Source code for ace.tools.sftp.utils

import os
import paramiko
import logging
from typing import Union
from ace.config import settings


[docs] class SFTPConnector:
[docs] def __init__( self, host: str = settings.SFTP_HOST, username: str = settings.SFTP_USERNAME, password: str = settings.SFTP_PASSWORD, private_key: str = settings.SFTP_PRIVATE_KEY, port: Union[str, int] = settings.SFTP_PORT, auth_method: str = settings.SFTP_AUTH_METHOD, ): self.host = host self.port = port self.username = username self.password = password self.private_key = private_key self.client: paramiko.client.SSHClient = paramiko.client.SSHClient() self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) if auth_method not in ['password', 'private_key']: raise ValueError("Invalid authentication method. Use 'password' or 'private_key'.") self.auth_method = auth_method if self.auth_method == 'private_key' and not self.private_key: raise ValueError("Private key path must be provided for private key authentication.") if self.auth_method == 'password' and not self.password: raise ValueError("Password must be provided for password authentication.")
[docs] def connect(self) -> None: if self.auth_method != 'password': return self.private_key_auth() return self.password_auth()
[docs] def disconnect(self): # Implement disconnection logic here if self.client: self.client.close() logging.info("Disconnected from SFTP server")
[docs] def password_auth(self) -> None: try: # Initialize the SSH client logging.info("Connecting to SFTP server using password authentication") self.client.connect(hostname=self.host, port=self.port, username=self.username, password=self.password) logging.info("Connected to SFTP server using password authentication") except paramiko.AuthenticationException as e: logging.error(f"Authentication failed: {e}")
[docs] def private_key_auth(self) -> None: try: # Initialize the SSH client logging.info("Connecting to SFTP server using private key authentication") pkey = paramiko.RSAKey.from_private_key_file(self.private_key) self.client.connect(hostname=self.host, port=self.port, username=self.username, pkey=pkey) logging.info("Connected to SFTP server using private key authentication") except paramiko.AuthenticationException as e: logging.error(f"Authentication failed: {e}")
[docs] def upload(sftp_connector: SFTPConnector, file_obj, remote_path: str): sftp_connector.connect() try: logging.info(f"Uploading file to {remote_path}") sftp = sftp_connector.client.open_sftp() file_obj.seek(0) sftp.putfo(file_obj, remote_path) logging.info(f"File uploaded successfully to {remote_path}") sftp.close() except paramiko.SSHException as e: logging.error(f"SSH error: {e}") finally: sftp_connector.disconnect()
[docs] def download(sftp_connector: SFTPConnector, file_name: str): sftp_connector.connect() try: logging.info(f"Downloading file {file_name} from SFTP server") sftp = sftp_connector.client.open_sftp() # Dowload file as bytes sftp_file = sftp.open(file_name, 'r') logging.info(f"File {file_name} downloaded successfully from SFTP server") file_content = sftp_file.read() sftp.close() sftp_connector.disconnect() return file_content except paramiko.SSHException as e: logging.error(f"SSH error: {e}") return None except FileNotFoundError as e: logging.error(f"File not found: {e}") return None