# Source code for core_ftp.etls.file_based
# -*- coding: utf-8 -*-
"""
SFTP-based ETL Module
=====================
This module provides ETL base classes for processing files retrieved from SFTP servers.
Key Features:
- Abstract base class for SFTP file processing tasks
- Built-in SFTP connection management and authentication
- File filtering by extension and prefix
- Optional automatic file cleanup after processing
- Comprehensive error handling and logging
Example:
    >>> class DataProcessor(IBaseEtlFromFtpFile):
    ...     def process_file(self, path: str, **kwargs):
    ...         # Custom file processing logic
    ...         print(f"Processing {path}")
    ...
    >>> task = DataProcessor(
    ...     connection=SftpConnectionConfig(
    ...         host="data.server.com",
    ...         user="admin",
    ...     ),
    ...     file_config=SftpFileConfig(
    ...         path="/data/csv",
    ...         file_ext=".csv",
    ...         delete_file_on_success=True,
    ...     ),
    ... )
    >>> task.execute()
"""
from __future__ import annotations
from abc import ABC
from dataclasses import dataclass
from typing import Iterator
from typing import Optional
from core_etl.file_based import IBaseEtlFromFile
from core_ftp.clients.sftp import SftpClient
from core_ftp.clients.sftp import SftpClientError
from core_ftp.clients.sftp import SftpConnectionConfig
from core_ftp.clients.sftp import SftpTransportConfig
[docs]
@dataclass
class SftpFileConfig:
    """
    File-selection and post-processing options for SFTP-based ETL tasks.

    Every field has a default, so ``SftpFileConfig()`` is a valid config:
    no directory, no name filters, ``delay_in_days`` of 1, ``monthly_basis``
    off, and deletion after success disabled.
    """

    # Remote directory to scan for files; ``None`` means no directory is set.
    path: Optional[str] = None
    # Keep only file names starting with this prefix (unset/empty keeps all).
    file_prefix: Optional[str] = None
    # Keep only file names ending with this suffix, e.g. ".csv" (unset/empty keeps all).
    file_ext: Optional[str] = None
    # NOTE(review): not read by the SFTP task code in this module; presumably a
    # look-back window consumed by the parent ETL class -- TODO confirm.
    delay_in_days: int = 1
    # NOTE(review): not read by the SFTP task code in this module; meaning
    # unconfirmed (presumably switches the delay to a monthly basis) -- TODO confirm.
    monthly_basis: bool = False
    # When True, each successfully processed file is deleted from the server.
    delete_file_on_success: bool = False
[docs]
class IBaseEtlFromFtpFile(IBaseEtlFromFile, ABC):
    """
    Base class for ETL tasks that process files retrieved from an SFTP server.

    This class extends ``IBaseEtlFromFile`` with SFTP-specific behavior:

    - ``pre_processing`` opens the SFTP connection.
    - ``get_paths`` lists ``file_config.path`` and filters by prefix/extension.
    - ``on_success`` optionally deletes each successfully processed file.
    - ``clean_resources`` closes the connection.

    Features:
        - SFTP connection management with authentication support
        - File filtering by extension and prefix
        - Optional file deletion after successful processing
        - Proper resource cleanup

    Example:
        Start a test SFTP server:

        .. code-block:: bash

            docker run -v /home/user/Documents:/home/foo/upload \\
                -p 22:22 \\
                -d atmoz/sftp foo:pass:::upload

        Define and run a task:

        .. code-block:: python

            class SftpTask(IBaseEtlFromFtpFile):
                @classmethod
                def registered_name(cls) -> str:
                    return "SftpTask"

                def process_file(self, path: str, *args, **kwargs):
                    # Process the file here
                    pass

            # Execute the task
            SftpTask(
                connection=SftpConnectionConfig(host="localhost", user="foo", password="pass"),
                file_config=SftpFileConfig(
                    path="/upload",
                    file_prefix="data_",
                    file_ext=".csv",
                    delete_file_on_success=True,
                ),
            ).execute()
    """

    def __init__(
        self,
        connection: SftpConnectionConfig,
        file_config: Optional[SftpFileConfig] = None,
        *,
        transport: Optional[SftpTransportConfig] = None,
        **kwargs,
    ) -> None:
        """
        Initialize the SFTP-based ETL task.

        :param connection: SFTP connection parameters.
        :type connection: SftpConnectionConfig
        :param file_config: File-processing options. ``SftpFileConfig()`` is
            used when omitted.
        :type file_config: Optional[SftpFileConfig]
        :param transport: Transport options for the SFTP client. When omitted,
            ``SftpTransportConfig(disabled_algorithms=True)`` is used (the
            previously hard-coded value), so existing callers are unaffected.
        :type transport: Optional[SftpTransportConfig]
        :param kwargs: Additional arguments passed to the parent class.
        """
        super().__init__(**kwargs)
        self.connection = connection
        self.file_config = file_config if file_config is not None else SftpFileConfig()
        if transport is None:
            transport = SftpTransportConfig(disabled_algorithms=True)

        # The client is only constructed here; ``pre_processing`` connects it.
        self.ftp_client = SftpClient(connection=self.connection, transport=transport)

    def pre_processing(self) -> None:
        """Run the parent pre-processing step, then open the SFTP connection."""
        super().pre_processing()
        self.ftp_client.connect()

    def get_paths(self) -> Iterator[str]:
        """
        Yield the remote file names that pass the configured filters.

        Lists ``file_config.path`` on the SFTP server. It then yields every file
        name that ends with ``file_config.file_ext`` and starts with
        ``file_config.file_prefix``. An unset or empty filter matches everything.

        Nothing is yielded in two cases:

        - ``file_config.path`` is ``None``; a warning is logged.
        - Listing raises ``SftpClientError``; the error is logged and not
          re-raised, so the task can finish gracefully.

        :return: Iterator of filtered file names, exactly as returned by
            ``SftpClient.list_files``.
        :rtype: Iterator[str]
        """
        directory = self.file_config.path
        if directory is None:
            self.warning("No path specified for SFTP file scanning")
            return

        # NOTE(review): the yielded values are whatever ``list_files`` returns
        # (filtered by prefix, so presumably bare names, not full paths), and
        # ``on_success`` passes them unchanged to ``delete``. Confirm the client
        # resolves them relative to ``directory``.
        try:
            for file_name, _ in self.ftp_client.list_files(directory):
                if self._matches_filters(file_name):
                    yield file_name
        except SftpClientError as error:
            self.error("Error listing files from SFTP path '%s': %s", directory, error)

    def _matches_filters(self, file_name: str) -> bool:
        """Return True if ``file_name`` passes the extension and prefix filters."""
        file_ext = self.file_config.file_ext
        file_prefix = self.file_config.file_prefix
        if file_ext and not file_name.endswith(file_ext):
            return False
        if file_prefix and not file_name.startswith(file_prefix):
            return False
        return True

    def process_file(self, path: str, *args, **kwargs) -> None:
        """
        Process a single file from the SFTP server.

        This default implementation only logs the file. Subclasses override
        it with the actual processing logic. Extra positional and keyword
        arguments are accepted and ignored, so the base signature is
        compatible with overrides such as
        ``process_file(self, path, *args, **kwargs)``.

        :param path: Path to the remote file to process.
        """
        self.info("Processing remote file: %s...", path)

    def on_success(self, path: str) -> None:
        """
        Hook called after a file has been processed successfully.

        When ``file_config.delete_file_on_success`` is enabled, the file is
        deleted from the SFTP server. A deletion failure (``SftpClientError``)
        is logged and swallowed, so one undeletable file does not fail the
        whole run.

        :param path: Path of the processed file, passed unchanged to
            ``SftpClient.delete``.
        """
        if not self.file_config.delete_file_on_success:
            return

        try:
            self.ftp_client.delete(path)
        except SftpClientError as error:
            # Best-effort cleanup: deliberately not re-raised.
            self.error('Failed to delete file "%s": %s', path, error)
        else:
            self.info('File "%s" was deleted successfully!', path)

    def clean_resources(self) -> None:
        """
        Close the SFTP connection.

        An ``SftpClientError`` raised while closing is logged as a warning and
        not re-raised.
        """
        # NOTE(review): unlike ``pre_processing``, this does not call
        # ``super().clean_resources()``; confirm the parent has nothing to release.
        try:
            self.ftp_client.close()
        except SftpClientError as error:
            self.warning("Error closing SFTP connection: %s", error)
        else:
            self.info("SFTP connection closed successfully")