Interfaces
SFTP-based ETL Module
This module provides ETL base classes for processing files retrieved from SFTP servers.
Key Features:
- Abstract base class for SFTP file processing tasks
- Built-in SFTP connection management and authentication
- File filtering by extension and prefix
- Optional automatic file cleanup after processing
- Comprehensive error handling and logging
Example
>>> class DataProcessor(IBaseEtlFromFtpFile):
...     def process_file(self, path: str, **kwargs):
...         # Custom file processing logic
...         print(f"Processing {path}")
...
>>> task = DataProcessor(
...     connection=SftpConnectionConfig(
...         host="data.server.com",
...         user="admin"
...     ),
...     file_config=SftpFileConfig(
...         path="/data/csv",
...         file_ext=".csv",
...         delete_file_on_success=True,
...     ),
... )
>>> task.execute()
- class core_ftp.etls.file_based.SftpFileConfig(path: str | None = None, file_prefix: str | None = None, file_ext: str | None = None, delay_in_days: int = 1, monthly_basis: bool = False, delete_file_on_success: bool = False)
Bases: object
Configuration for SFTP file-processing options.
- class core_ftp.etls.file_based.IBaseEtlFromFtpFile(connection: SftpConnectionConfig, file_config: SftpFileConfig | None = None, **kwargs)
Bases: IBaseEtlFromFile, ABC
Base class for ETL tasks that process files retrieved from an SFTP server.
This class extends IBaseEtlFromFile to provide SFTP-specific functionality, including connection management, file filtering, and optional cleanup. It handles the complete lifecycle of SFTP-based file processing.
Features:
- SFTP connection management with authentication support
- File filtering by extension and prefix
- Optional file deletion after successful processing
- Proper resource cleanup
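The lifecycle mentioned above can be pictured with the sketch below. It is a hypothetical stand-in (`LifecycleSketch` and `RecordingTask` are not part of core_ftp) that assumes `execute()` iterates over `get_paths()`, calls `process_file()` for each path, calls `on_success()` only when processing did not raise, and always finishes with `clean_resources()`. How the real parent class `IBaseEtlFromFile` handles a failure in one file (stop or continue) is not stated here; in this sketch the exception propagates after cleanup.

```python
from abc import ABC, abstractmethod
from typing import Iterator, List


class LifecycleSketch(ABC):
    """Hypothetical model of the flow IBaseEtlFromFtpFile is assumed to drive."""

    def execute(self) -> None:
        try:
            for path in self.get_paths():
                self.process_file(path)
                # Only reached when process_file did not raise.
                self.on_success(path)
        finally:
            # Release resources whether or not processing failed.
            self.clean_resources()

    @abstractmethod
    def get_paths(self) -> Iterator[str]: ...

    @abstractmethod
    def process_file(self, path: str) -> None: ...

    def on_success(self, path: str) -> None:
        pass

    def clean_resources(self) -> None:
        pass


class RecordingTask(LifecycleSketch):
    """Records each lifecycle call instead of touching a real server."""

    def __init__(self, names: List[str]) -> None:
        self.names = names
        self.events: List[str] = []

    def get_paths(self) -> Iterator[str]:
        yield from self.names

    def process_file(self, path: str) -> None:
        self.events.append(f"process:{path}")

    def on_success(self, path: str) -> None:
        self.events.append(f"success:{path}")

    def clean_resources(self) -> None:
        self.events.append("cleanup")


task = RecordingTask(["a.csv", "b.csv"])
task.execute()
print(task.events)
# → ['process:a.csv', 'success:a.csv', 'process:b.csv', 'success:b.csv', 'cleanup']
```

Putting `clean_resources()` in a `finally` block is what guarantees the connection is released even when a file fails to process.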
Example
# Start test SFTP server
docker run -v /home/user/Documents:/home/foo/upload -p 22:22 -d atmoz/sftp foo:pass:::upload

class SftpTask(IBaseEtlFromFtpFile):
    @classmethod
    def registered_name(cls) -> str:
        return "SftpTask"

    def process_file(self, path: str, *args, **kwargs):
        # Process the file here
        pass

# Execute the task
SftpTask(
    connection=SftpConnectionConfig(host="localhost", user="foo", password="pass"),
    file_config=SftpFileConfig(
        path="/upload",
        file_prefix="data_",
        file_ext=".csv",
        delete_file_on_success=True,
    ),
).execute()
- __init__(connection: SftpConnectionConfig, file_config: SftpFileConfig | None = None, **kwargs) -> None
Initialize the SFTP-based ETL task.
- Parameters:
connection (SftpConnectionConfig) – SFTP connection parameters.
file_config (Optional[SftpFileConfig]) – File-processing options.
**kwargs – Additional keyword arguments passed to the parent class.
- get_paths() -> Iterator[str]
Retrieves file paths from the remote SFTP server.
Filters files by extension and prefix when file_ext and file_prefix are set. Connection errors are caught and logged instead of being raised.
- Returns:
Iterator of filtered file names.
- Return type:
Iterator[str]
- Raises:
Does not raise: errors are logged and not re-raised, so a failed listing does not abort the caller.
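The filtering and log-instead-of-raise behavior described for get_paths() can be sketched as a standalone generator. This is a minimal illustration, not the module's code: `filtered_paths` and its `list_remote` callable are hypothetical, and a real implementation would list the remote directory through its SFTP client (for example, paramiko's `SFTPClient.listdir`, if that is the backend, which is an assumption). Matching is assumed to be a case-sensitive `startswith`/`endswith` check on the file name.

```python
import logging
from typing import Callable, Iterable, Iterator, Optional

logger = logging.getLogger(__name__)


def filtered_paths(
    list_remote: Callable[[], Iterable[str]],
    file_prefix: Optional[str] = None,
    file_ext: Optional[str] = None,
) -> Iterator[str]:
    """Yield names from `list_remote()` matching the optional prefix and extension.

    `list_remote` is a hypothetical stand-in for the real remote directory listing.
    """
    try:
        names = list(list_remote())
    except Exception as error:  # broad on purpose: errors are logged, not re-raised
        logger.error("Could not list remote directory: %s", error)
        return  # ends the generator, so the caller simply receives no paths
    for name in names:
        if file_prefix and not name.startswith(file_prefix):
            continue
        if file_ext and not name.endswith(file_ext):
            continue
        yield name


listing = ["data_01.csv", "data_02.txt", "other.csv"]
print(list(filtered_paths(lambda: listing, file_prefix="data_", file_ext=".csv")))
# → ['data_01.csv']
```

Because the function is a generator, the `return` in the error branch ends iteration cleanly, which matches the documented behavior of logging the error without propagating it.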
- process_file(path: str)
Processes a single file from the SFTP server.
This method should be overridden by subclasses to implement specific file processing logic.
- Parameters:
path – Path to the remote file to process.
- on_success(path: str)
Called after successful file processing.
Deletes the processed file from the SFTP server when delete_file_on_success is enabled in the file configuration.
- Parameters:
path – Path to the successfully processed file.
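A minimal sketch of the delete-on-success decision follows. `delete_if_enabled` and `FakeClient` are hypothetical names; the sketch only assumes the underlying client exposes a `remove(path)` method, which paramiko's `SFTPClient` does (whether core_ftp uses paramiko is an assumption). It also assumes `path` is already the full remote path, and it lets removal errors propagate, since the source does not say how they are handled.

```python
import logging
from typing import List, Protocol

logger = logging.getLogger(__name__)


class RemovableClient(Protocol):
    """Any client exposing remove(path), e.g. paramiko's SFTPClient (assumed backend)."""

    def remove(self, path: str) -> None: ...


def delete_if_enabled(client: RemovableClient, path: str, delete_file_on_success: bool) -> bool:
    """Remove `path` on the server when the flag is set; return True if it was deleted."""
    if not delete_file_on_success:
        # Default (False): leave the processed file on the server.
        return False
    client.remove(path)
    logger.info("Deleted processed file %s", path)
    return True


class FakeClient:
    """In-memory stand-in so the sketch runs without an SFTP server."""

    def __init__(self) -> None:
        self.removed: List[str] = []

    def remove(self, path: str) -> None:
        self.removed.append(path)


client = FakeClient()
delete_if_enabled(client, "/upload/data_01.csv", delete_file_on_success=True)
delete_if_enabled(client, "/upload/data_02.csv", delete_file_on_success=False)
print(client.removed)  # → ['/upload/data_01.csv']
```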
- clean_resources()
Closes the SFTP connection and releases its resources. Errors raised while closing are caught so they do not propagate to the caller.
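The "close without propagating errors" pattern described for clean_resources() might look like the helper below. `close_quietly` is a hypothetical name, and the sketch only assumes each handle has a `close()` method (paramiko's `SFTPClient` and `SSHClient` both do; whether core_ftp uses paramiko is an assumption).

```python
import logging
from typing import Optional, Protocol

logger = logging.getLogger(__name__)


class Closable(Protocol):
    """Any object with close(); paramiko's SFTPClient and SSHClient both qualify."""

    def close(self) -> None: ...


def close_quietly(resource: Optional[Closable]) -> None:
    """Close `resource` if it exists, logging instead of raising on failure."""
    if resource is None:
        # Nothing to close, e.g. the connection was never established.
        return
    try:
        resource.close()
    except Exception:
        # Logged rather than raised, so that when this runs inside a `finally`
        # block it cannot replace an exception raised earlier by the task.
        logger.exception("Error while closing SFTP resource")
```

Calling it once per handle (the SFTP channel first, then the SSH client) keeps a failure in one `close()` from skipping the other.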