Interfaces

SFTP-based ETL Module

This module provides ETL base classes for processing files retrieved from SFTP servers.

Key Features:
  • Abstract base class for SFTP file processing tasks

  • Built-in SFTP connection management and authentication

  • File filtering by extension and prefix

  • Optional automatic file cleanup after processing

  • Comprehensive error handling and logging

Example

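>>> from core_ftp.etls.file_based import IBaseEtlFromFtpFile, SftpFileConfig
>>> # SftpConnectionConfig must also be imported; its module is not shown in this section.
>>> class DataProcessor(IBaseEtlFromFtpFile):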
...     def process_file(self, path: str, **kwargs):
...         # Custom file processing logic
...         print(f"Processing {path}")
...
>>> task = DataProcessor(
...     connection=SftpConnectionConfig(
...         host="data.server.com",
...         user="admin"
...     ),
...     file_config=SftpFileConfig(
...         path="/data/csv",
...         file_ext=".csv",
...         delete_file_on_success=True,
...     ),
... )
>>> task.execute()
class core_ftp.etls.file_based.SftpFileConfig(path: str | None = None, file_prefix: str | None = None, file_ext: str | None = None, delay_in_days: int = 1, monthly_basis: bool = False, delete_file_on_success: bool = False)

Bases: object

Configuration for SFTP file-processing options.

path: str | None = None
file_prefix: str | None = None
file_ext: str | None = None
delay_in_days: int = 1
monthly_basis: bool = False
delete_file_on_success: bool = False
__init__(path: str | None = None, file_prefix: str | None = None, file_ext: str | None = None, delay_in_days: int = 1, monthly_basis: bool = False, delete_file_on_success: bool = False) -> None
class core_ftp.etls.file_based.IBaseEtlFromFtpFile(connection: SftpConnectionConfig, file_config: SftpFileConfig | None = None, **kwargs)

Bases: IBaseEtlFromFile, ABC

Base class for ETL tasks that process files retrieved from an SFTP server.

This class extends IBaseEtlFromFile to provide SFTP-specific functionality, including connection management, file filtering, and optional cleanup. It handles the complete lifecycle of SFTP-based file processing.

Features:
  • SFTP connection management with authentication support

  • File filtering by extension and prefix

  • Optional file deletion after successful processing

  • Proper resource cleanup

Example

# Start test SFTP server
docker run -v /home/user/Documents:/home/foo/upload \
    -p 22:22 \
    -d atmoz/sftp foo:pass:::upload

class SftpTask(IBaseEtlFromFtpFile):
    @classmethod
    def registered_name(cls) -> str:
        return "SftpTask"

    def process_file(self, path: str, *args, **kwargs):
        # Process the file here
        pass

# Execute the task
SftpTask(
    connection=SftpConnectionConfig(host="localhost", user="foo", password="pass"),
    file_config=SftpFileConfig(
        path="/upload",
        file_prefix="data_",
        file_ext=".csv",
        delete_file_on_success=True,
    ),
).execute()
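
The class description mentions a complete processing lifecycle but does not spell out the order in which the hooks run. The following is a minimal sketch of one plausible order, using a hypothetical stand-in class (`LifecycleSketch`) rather than the real `IBaseEtlFromFtpFile`. The hook order inside `execute()` and the `events` list are assumptions made for illustration only.

```python
from abc import ABC, abstractmethod
from typing import Iterator, List


class LifecycleSketch(ABC):
    """Hypothetical stand-in; NOT the real IBaseEtlFromFtpFile."""

    def __init__(self, names: List[str]) -> None:
        self.names = names
        self.events: List[str] = []  # records which hooks ran, in order

    def pre_processing(self) -> None:
        self.events.append("pre_processing")

    def get_paths(self) -> Iterator[str]:
        # The real method lists a remote directory; here we use a fixed list.
        yield from self.names

    @abstractmethod
    def process_file(self, path: str) -> None:
        """Subclasses supply the file-processing logic."""

    def on_success(self, path: str) -> None:
        self.events.append(f"on_success:{path}")

    def clean_resources(self) -> None:
        self.events.append("clean_resources")

    def execute(self) -> None:
        # Assumed hook order; the real execute() may differ.
        try:
            self.pre_processing()
            for path in self.get_paths():
                self.process_file(path)
                self.on_success(path)
        finally:
            # Release resources even if a file fails to process.
            self.clean_resources()


class RecordingTask(LifecycleSketch):
    def process_file(self, path: str) -> None:
        self.events.append(f"process_file:{path}")


task = RecordingTask(["data_a.csv"])
task.execute()
print(task.events)
```

Placing `clean_resources()` in a `finally` block is a design choice of this sketch: it keeps the connection from leaking when `process_file` raises.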
__init__(connection: SftpConnectionConfig, file_config: SftpFileConfig | None = None, **kwargs) -> None

Initialize the SFTP-based ETL task.

Parameters:
  • connection (SftpConnectionConfig) – SFTP connection parameters.

  • file_config (Optional[SftpFileConfig]) – File-processing options.

  • kwargs – Additional arguments passed to the parent class.

pre_processing() -> None

Pre-processing actions…

get_paths() -> Iterator[str]

Retrieves file paths from the remote SFTP server.

Filters files by extension and prefix if specified. Handles connection errors gracefully and logs them appropriately.

Returns:

Iterator of filtered file names.

Return type:

Iterator[str]

Raises:

None. Errors are logged instead of being re-raised, which allows graceful handling.
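
The prefix and extension filtering described for `get_paths()` could look roughly like the sketch below. The helper name `filter_names` is hypothetical, and the choice of case-sensitive `str.startswith` / `str.endswith` matching is an assumption; the real method may match differently.

```python
from typing import Iterable, Iterator, Optional


def filter_names(
    names: Iterable[str],
    file_prefix: Optional[str] = None,
    file_ext: Optional[str] = None,
) -> Iterator[str]:
    """Yield names that match the prefix and extension, when each is given."""
    for name in names:
        # A filter set to None is treated as "not specified" and skipped.
        if file_prefix is not None and not name.startswith(file_prefix):
            continue
        if file_ext is not None and not name.endswith(file_ext):
            continue
        yield name


# Hypothetical directory listing, standing in for the remote SFTP folder.
listing = ["data_2024.csv", "data_2024.txt", "report.csv"]
matches = list(filter_names(listing, file_prefix="data_", file_ext=".csv"))
print(matches)
```

With both filters left as `None`, every name in the listing passes through unchanged, which mirrors the "if specified" wording above.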

process_file(path: str)

Processes a single file from the SFTP server.

This method should be overridden by subclasses to implement specific file processing logic.

Parameters:

path – Path to the remote file to process.

on_success(path: str)

Called after successful file processing.

Optionally deletes the processed file from the SFTP server if delete_file_on_success is enabled.

Parameters:

path – Path to the successfully processed file.
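
As a rough illustration of the optional cleanup step, the sketch below deletes a file only when the flag is enabled. `FakeSftpClient` and its `remove()` method are hypothetical stand-ins for whatever client the real class holds, and the free function `on_success` here is not the library's method.

```python
from typing import Iterable, List


class FakeSftpClient:
    """Hypothetical in-memory client; only remove() is modeled."""

    def __init__(self, files: Iterable[str]) -> None:
        self.files: List[str] = list(files)

    def remove(self, path: str) -> None:
        self.files.remove(path)


def on_success(client: FakeSftpClient, path: str, delete_file_on_success: bool) -> None:
    """Delete the processed file only when the option is enabled."""
    if delete_file_on_success:
        client.remove(path)


client = FakeSftpClient(["/upload/data_a.csv", "/upload/data_b.csv"])
on_success(client, "/upload/data_a.csv", delete_file_on_success=True)   # removed
on_success(client, "/upload/data_b.csv", delete_file_on_success=False)  # kept
print(client.files)
```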

clean_resources()

Cleans up SFTP connection resources. Safely closes the SFTP connection, handling any cleanup errors gracefully.
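
One way to close a connection without letting cleanup errors escape is sketched below. The `FlakyClient` class, the logger name, and the free function `clean_resources` are assumptions made for illustration; the real method's internals are not shown in this section.

```python
import logging
from typing import Optional

logger = logging.getLogger("sftp_etl_sketch")  # hypothetical logger name


class FlakyClient:
    """Hypothetical client whose close() always fails."""

    def __init__(self) -> None:
        self.close_calls = 0

    def close(self) -> None:
        self.close_calls += 1
        raise OSError("connection already dropped")


def clean_resources(client: Optional[FlakyClient]) -> None:
    """Close the client if present; log close errors, never raise them."""
    if client is None:
        return
    try:
        client.close()
    except Exception as error:
        logger.warning("Error while closing SFTP connection: %s", error)


flaky = FlakyClient()
clean_resources(flaky)  # the OSError is logged, not propagated
clean_resources(None)   # no connection was opened; nothing to do
```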
