Source code for core_ftp.etls.file_based

# -*- coding: utf-8 -*-

"""
SFTP-based ETL Module
=====================

This module provides ETL base classes for processing files retrieved from SFTP servers.

Key Features:
    - Abstract base class for SFTP file processing tasks
    - Built-in SFTP connection management and authentication
    - File filtering by extension and prefix
    - Optional automatic file cleanup after processing
    - Comprehensive error handling and logging

Example:
    >>> class DataProcessor(IBaseEtlFromFtpFile):
    ...     def process_file(self, path: str, **kwargs):
    ...         # Custom file processing logic
    ...         print(f"Processing {path}")
    ...
    >>> task = DataProcessor(
    ...     connection=SftpConnectionConfig(
    ...         host="data.server.com",
    ...         user="admin"
    ...     ),
    ...     file_config=SftpFileConfig(
    ...         path="/data/csv",
    ...         file_ext=".csv",
    ...         delete_file_on_success=True,
    ...     ),
    ... )
    >>> task.execute()
"""

from __future__ import annotations

from abc import ABC
from dataclasses import dataclass
from typing import Iterator
from typing import Optional

from core_etl.file_based import IBaseEtlFromFile

from core_ftp.clients.sftp import SftpClient
from core_ftp.clients.sftp import SftpClientError
from core_ftp.clients.sftp import SftpConnectionConfig
from core_ftp.clients.sftp import SftpTransportConfig


@dataclass
class SftpFileConfig:
    """
    File-selection and post-processing options for an SFTP-based ETL task.

    Every field has a default, so ``SftpFileConfig()`` is a valid
    "nothing configured" value.
    """

    # Remote directory handed to the SFTP client when listing files.
    # ``None`` means no directory was configured.
    path: Optional[str] = None

    # When set (non-empty), only names starting with this prefix are selected.
    file_prefix: Optional[str] = None

    # When set (non-empty), only names ending with this suffix are selected.
    file_ext: Optional[str] = None

    # NOTE(review): not read anywhere in this module; presumably a look-back
    # window (in days) consumed by subclasses -- confirm against callers.
    delay_in_days: int = 1

    # NOTE(review): not read anywhere in this module; presumably switches the
    # task to a monthly schedule -- confirm against callers.
    monthly_basis: bool = False

    # When true, the remote file is deleted after it was processed successfully.
    delete_file_on_success: bool = False
class IBaseEtlFromFtpFile(IBaseEtlFromFile, ABC):
    """
    Abstract ETL task whose input files live on a remote SFTP server.

    Builds on ``IBaseEtlFromFile`` and adds the SFTP-specific pieces:

    - an ``SftpClient`` created from the supplied connection settings,
      connected in :meth:`pre_processing` and closed in :meth:`clean_resources`
    - selection of remote files by optional prefix and extension
    - optional removal of a remote file once it was processed successfully

    Example:

    .. code-block:: python

        # Start a test SFTP server first:
        #   docker run -v /home/user/Documents:/home/foo/upload \\
        #       -p 22:22 \\
        #       -d atmoz/sftp foo:pass:::upload

        class SftpTask(IBaseEtlFromFtpFile):
            @classmethod
            def registered_name(cls) -> str:
                return "SftpTask"

            def process_file(self, path: str, *args, **kwargs):
                # Process the file here
                pass

        # Execute the task
        SftpTask(
            connection=SftpConnectionConfig(host="localhost", user="foo", password="pass"),
            file_config=SftpFileConfig(
                path="/upload",
                file_prefix="data_",
                file_ext=".csv",
                delete_file_on_success=True,
            ),
        ).execute()
    """

    def __init__(
        self,
        connection: SftpConnectionConfig,
        file_config: Optional[SftpFileConfig] = None,
        **kwargs,
    ) -> None:
        """
        Store the configuration and build (but do not connect) the SFTP client.

        :param connection: SFTP connection parameters.
        :type connection: SftpConnectionConfig
        :param file_config: File-processing options; a default
            ``SftpFileConfig()`` is used when omitted.
        :type file_config: Optional[SftpFileConfig]
        :param kwargs: Forwarded unchanged to the parent class initializer.
        """

        super().__init__(**kwargs)

        self.connection = connection

        if file_config is None:
            file_config = SftpFileConfig()
        self.file_config = file_config

        # NOTE(review): the transport is always created with
        # ``disabled_algorithms=True``; what that flag toggles is defined by
        # ``SftpTransportConfig`` -- confirm it is intended for every server.
        transport_config = SftpTransportConfig(disabled_algorithms=True)
        self.ftp_client = SftpClient(
            connection=self.connection,
            transport=transport_config,
        )

    def pre_processing(self) -> None:
        """Run the parent's pre-processing step, then open the SFTP connection."""

        super().pre_processing()
        self.ftp_client.connect()

    def get_paths(self) -> Iterator[str]:
        """
        Yield the names of remote entries that pass the configured filters.

        Entries come from ``self.ftp_client.list_files(path)``. An entry is kept
        when ``file_ext`` is unset/empty or the name ends with it, and
        ``file_prefix`` is unset/empty or the name starts with it.

        Nothing is yielded (and a warning is logged) when no path is configured.
        An ``SftpClientError`` raised while listing is logged as an error and
        ends the iteration; it is not propagated to the caller.

        :return: Iterator over the matching names, exactly as returned by the client.
        :rtype: Iterator[str]
        """

        if self.file_config.path is None:
            self.warning("No path specified for SFTP file scanning")
            return

        try:
            for file_name, _ in self.ftp_client.list_files(self.file_config.path):
                wanted_ext = self.file_config.file_ext
                if wanted_ext and not file_name.endswith(wanted_ext):
                    continue

                wanted_prefix = self.file_config.file_prefix
                if wanted_prefix and not file_name.startswith(wanted_prefix):
                    continue

                # NOTE(review): the value is yielded as-is, not joined with
                # ``file_config.path`` -- confirm whether ``list_files``
                # returns full paths or bare names.
                yield file_name

        except SftpClientError as listing_error:
            self.error(
                "Error listing files from SFTP path '%s': %s",
                self.file_config.path,
                listing_error,
            )

    def process_file(self, path: str):
        """
        Handle one remote file.

        The base implementation only logs the file being processed; subclasses
        are expected to override it with the actual processing logic.

        :param path: Remote file to process, as yielded by :meth:`get_paths`.
        """

        self.info("Processing remote file: %s...", path)

    def on_success(self, path: str):
        """
        Post-processing hook for a file that was handled successfully.

        When ``delete_file_on_success`` is enabled, the file is removed from the
        SFTP server. A failed deletion is logged and otherwise ignored.

        :param path: The successfully processed file; passed unchanged to the
            client's ``delete``.
        """

        if not self.file_config.delete_file_on_success:
            return

        try:
            self.ftp_client.delete(path)
            self.info('File "%s" was deleted successfully!', path)

        except SftpClientError as delete_error:
            # Don't re-raise to avoid failing the entire process
            self.error('Failed to delete file "%s": %s', path, delete_error)

    def clean_resources(self):
        """
        Close the SFTP connection.

        An ``SftpClientError`` raised while closing is logged as a warning
        instead of being propagated.
        """

        try:
            self.ftp_client.close()
            self.info("SFTP connection closed successfully")

        except SftpClientError as close_error:
            self.warning("Error closing SFTP connection: %s", close_error)