Skip to content

Sftp

This module contains the SFTPWriter class and the SFTPWriteMode enum.

The SFTPWriter class is used to write data to a file on an SFTP server. It uses the Paramiko library to establish an SFTP connection and write data to the server. The data to be written is provided by a BufferWriter, which generates the data in a buffer. See the docstring of the SFTPWriter class for more details. Refer to koheesio.spark.writers.buffer for more details on the BufferWriter interface.

The SFTPWriteMode enum defines the different write modes that the SFTPWriter can use. These modes determine how the SFTPWriter behaves when the file it is trying to write to already exists on the server. For more details on each mode, see the docstring of the SFTPWriteMode enum.

koheesio.integrations.spark.sftp.SFTPWriteMode #

The different write modes for the SFTPWriter.

OVERWRITE:#
  • If the file exists, it will be overwritten.
  • If it does not exist, a new file will be created.
APPEND:#
  • If the file exists, the new data will be appended to it.
  • If it does not exist, a new file will be created.
IGNORE:#
  • If the file exists, the method will return without writing anything.
  • If it does not exist, a new file will be created.
EXCLUSIVE:#
  • If the file exists, an error will be raised.
  • If it does not exist, a new file will be created.
BACKUP:#
  • If the file exists and the new data is different from the existing data, a backup will be created and the file will be overwritten.
  • If it does not exist, a new file will be created.
UPDATE:#
  • If the file exists and the new data is different from the existing data, the file will be overwritten.
  • If the file exists and the new data is the same as the existing data, the method will return without writing anything.
  • If the file does not exist, a new file will be created.

APPEND class-attribute instance-attribute #

APPEND = 'append'

BACKUP class-attribute instance-attribute #

BACKUP = 'backup'

EXCLUSIVE class-attribute instance-attribute #

EXCLUSIVE = 'exclusive'

IGNORE class-attribute instance-attribute #

IGNORE = 'ignore'

OVERWRITE class-attribute instance-attribute #

OVERWRITE = 'overwrite'

UPDATE class-attribute instance-attribute #

UPDATE = 'update'

write_mode property #

write_mode

Return the write mode for the given SFTPWriteMode.

from_string classmethod #

from_string(mode: str)

Return the SFTPWriteMode for the given string.

Source code in src/koheesio/integrations/spark/sftp.py
@classmethod
def from_string(cls, mode: str):
    """Return the SFTPWriteMode for the given string."""
    return cls[mode.upper()]

koheesio.integrations.spark.sftp.SFTPWriter #

Write a Dataframe to SFTP through a BufferWriter

Concept
  • This class uses Paramiko to connect to an SFTP server and write the contents of a buffer to a file on the server.
  • This implementation takes inspiration from https://github.com/springml/spark-sftp

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the folder to write to

required
file_name Optional[str]

Name of the file. If not provided, the file name is expected to be part of the path. Make sure to add the desired file extension.

None
host str

SFTP Host

required
port int

SFTP Port

required
username SecretStr

SFTP Server Username

None
password SecretStr

SFTP Server Password

None
buffer_writer BufferWriter

This is the writer that will generate the body of the file that will be written to the specified file through SFTP. Details on how the DataFrame is written to the buffer should be implemented in the implementation of the BufferWriter class. Any BufferWriter can be used here, as long as it implements the BufferWriter interface.

required
mode SFTPWriteMode

Write mode: overwrite, append, ignore, exclusive, backup, or update. See the docstring of SFTPWriteMode for more details.

SFTPWriteMode.OVERWRITE

buffer_writer class-attribute instance-attribute #

buffer_writer: InstanceOf[BufferWriter] = Field(
    default=...,
    description="This is the writer that will generate the body of the file that will be written to the specified file through SFTP. Details on how the DataFrame is written to the buffer should be implemented in the implementation of the BufferWriter class. Any BufferWriter can be used here, as long as it implements the BufferWriter interface.",
)

client property #

client: SFTPClient

Return the SFTP client. If it doesn't exist, create it.

file_name class-attribute instance-attribute #

file_name: Optional[str] = Field(
    default=None,
    description="Name of the file. If not provided, the file name is expected to be part of the path. Make sure to add the desired file extension!",
    alias="filename",
)

host class-attribute instance-attribute #

host: str = Field(default=..., description='SFTP Host')

mode class-attribute instance-attribute #

mode: SFTPWriteMode = Field(
    default=OVERWRITE,
    description="Write mode: overwrite, append, ignore, exclusive, backup, or update."
    + __doc__,
)

password class-attribute instance-attribute #

password: Optional[SecretStr] = Field(
    default=None, description="SFTP Server Password"
)

path class-attribute instance-attribute #

path: Union[str, Path] = Field(
    default=...,
    description="Path to the folder to write to",
    alias="prefix",
)

port class-attribute instance-attribute #

port: int = Field(default=..., description='SFTP Port')

transport property #

transport

Return the transport for the SFTP connection. If it doesn't exist, create it.

If the username and password are provided, use them to connect to the SFTP server.

username class-attribute instance-attribute #

username: Optional[SecretStr] = Field(
    default=None, description="SFTP Server Username"
)

write_mode property #

write_mode

Return the write mode for the given SFTPWriteMode.

check_file_exists #

check_file_exists(file_path: str) -> bool

Check if a file exists on the SFTP server.

Source code in src/koheesio/integrations/spark/sftp.py
def check_file_exists(self, file_path: str) -> bool:
    """
    Check if a file exists on the SFTP server.
    """
    try:
        self.client.stat(file_path)
        return True
    except IOError:
        return False

execute #

execute()
Source code in src/koheesio/integrations/spark/sftp.py
def execute(self):
    buffer_output: InstanceOf[BufferWriter.Output] = self.buffer_writer.write(self.df)

    # write buffer to the SFTP server
    try:
        self._handle_write_mode(self.path.as_posix(), buffer_output)
    finally:
        self._close_client()

validate_path_and_file_name #

validate_path_and_file_name(data: dict) -> dict

Validate the path, make sure path and file_name are Path objects.

Source code in src/koheesio/integrations/spark/sftp.py
@model_validator(mode="before")
def validate_path_and_file_name(cls, data: dict) -> dict:
    """Validate the path, make sure path and file_name are Path objects."""
    path_or_str = data.get("path")

    if isinstance(path_or_str, str):
        # make sure the path is a Path object
        path_or_str = Path(path_or_str)

    if not isinstance(path_or_str, Path):
        raise ValueError(f"Invalid path: {path_or_str}")

    if file_name := data.get("file_name", data.get("filename")):
        path_or_str = path_or_str / file_name
        try:
            del data["filename"]
        except KeyError:
            pass
        data["file_name"] = file_name

    data["path"] = path_or_str
    return data

validate_sftp_host #

validate_sftp_host(v) -> str

Validate the host

Source code in src/koheesio/integrations/spark/sftp.py
@field_validator("host")
def validate_sftp_host(cls, v) -> str:
    """Validate the host"""
    # remove the sftp:// prefix if present
    if v.startswith("sftp://"):
        v = v.replace("sftp://", "")

    # remove the trailing slash if present
    if v.endswith("/"):
        v = v[:-1]

    return v

write_file #

write_file(
    file_path: str, buffer_output: InstanceOf[Output]
)

Using Paramiko, write the data in the buffer to SFTP.

Source code in src/koheesio/integrations/spark/sftp.py
def write_file(self, file_path: str, buffer_output: InstanceOf[BufferWriter.Output]):
    """
    Using Paramiko, write the data in the buffer to SFTP.
    """
    with self.client.open(file_path, self.write_mode) as file:
        self.log.debug(f"Writing file {file_path} to SFTP...")
        file.write(buffer_output.read())

koheesio.integrations.spark.sftp.SendCsvToSftp #

Write a DataFrame to an SFTP server as a CSV file.

This class uses the PandasCsvBufferWriter to generate the CSV data and the SFTPWriter to write the data to the SFTP server.

Example
from koheesio.spark.writers import SendCsvToSftp

writer = SendCsvToSftp(
    # SFTP Parameters
    host="sftp.example.com",
    port=22,
    username="user",
    password="password",
    path="/path/to/folder",
    file_name="file.tsv.gz",
    # CSV Parameters
    header=True,
    sep="   ",
    quote='"',
    timestampFormat="%Y-%m-%d",
    lineSep=os.linesep,
    compression="gzip",
    index=False,
)

writer.write(df)

In this example, the DataFrame df is written to the file file.csv.gz in the folder /path/to/folder on the SFTP server. The file is written as a CSV file with a tab delimiter (TSV), double quotes as the quote character, and gzip compression.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the folder to write to.

required
file_name Optional[str]

Name of the file. If not provided, it's expected to be part of the path.

required
host str

SFTP Host.

required
port int

SFTP Port.

required
username SecretStr

SFTP Server Username.

required
password SecretStr

SFTP Server Password.

required
mode SFTPWriteMode

Write mode: overwrite, append, ignore, exclusive, backup, or update.

required
header bool

Whether to write column names as the first line. Default is True.

required
sep str

Field delimiter for the output file. Default is ','.

required
quote str

Character used to quote fields. Default is '"'.

required
quoteAll bool

Whether all values should be enclosed in quotes. Default is False.

required
escape str

Character used to escape sep and quote when needed. Default is '\'.

required
timestampFormat str

Date format for datetime objects. Default is '%Y-%m-%dT%H:%M:%S.%f'.

required
lineSep str

Character used as line separator. Default is os.linesep.

required
compression Optional[Literal[infer, gzip, bz2, zip, xz, zstd, tar]]

Compression to use for the output data. Default is None.

required
See Also

For more details on the CSV parameters, refer to the PandasCsvBufferWriter class documentation.

buffer_writer class-attribute instance-attribute #

buffer_writer: PandasCsvBufferWriter = Field(
    default=None, validate_default=False
)

execute #

execute()
Source code in src/koheesio/integrations/spark/sftp.py
def execute(self):
    SFTPWriter.execute(self)

set_up_buffer_writer #

set_up_buffer_writer() -> SendCsvToSftp

Set up the buffer writer, passing all CSV related options to it.

Source code in src/koheesio/integrations/spark/sftp.py
@model_validator(mode="after")
def set_up_buffer_writer(self) -> "SendCsvToSftp":
    """Set up the buffer writer, passing all CSV related options to it."""
    self.buffer_writer = PandasCsvBufferWriter(**self.get_options(options_type="koheesio_pandas_buffer_writer"))
    return self

koheesio.integrations.spark.sftp.SendJsonToSftp #

Write a DataFrame to an SFTP server as a JSON file.

This class uses the PandasJsonBufferWriter to generate the JSON data and the SFTPWriter to write the data to the SFTP server.

Example
from koheesio.spark.writers import SendJsonToSftp

writer = SendJsonToSftp(
    # SFTP Parameters (Inherited from SFTPWriter)
    host="sftp.example.com",
    port=22,
    username="user",
    password="password",
    path="/path/to/folder",
    file_name="file.json.gz",
    # JSON Parameters (Inherited from PandasJsonBufferWriter)
    orient="records",
    date_format="iso",
    double_precision=2,
    date_unit="ms",
    lines=False,
    compression="gzip",
    index=False,
)

writer.write(df)

In this example, the DataFrame df is written to the file file.json.gz in the folder /path/to/folder on the SFTP server. The file is written as a JSON file with gzip compression.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the folder on the SFTP server.

required
file_name Optional[str]

Name of the file, including extension. If not provided, expected to be part of the path.

required
host str

SFTP Host.

required
port int

SFTP Port.

required
username SecretStr

SFTP Server Username.

required
password SecretStr

SFTP Server Password.

required
mode SFTPWriteMode

Write mode: overwrite, append, ignore, exclusive, backup, or update.

required
orient Literal[split, records, index, columns, values, table]

Format of the JSON string. Default is 'records'.

required
lines bool

If True, output is one JSON object per line. Only used when orient='records'. Default is True.

required
date_format Literal[iso, epoch]

Type of date conversion. Default is 'iso'.

required
double_precision int

Decimal places for encoding floating point values. Default is 10.

required
force_ascii bool

If True, encoded string is ASCII. Default is True.

required
compression Optional[Literal[gzip]]

Compression to use for output data. Default is None.

required
See Also

For more details on the JSON parameters, refer to the PandasJsonBufferWriter class documentation.

buffer_writer class-attribute instance-attribute #

buffer_writer: PandasJsonBufferWriter = Field(
    default=None, validate_default=False
)

execute #

execute()
Source code in src/koheesio/integrations/spark/sftp.py
def execute(self):
    SFTPWriter.execute(self)

set_up_buffer_writer #

set_up_buffer_writer() -> SendJsonToSftp

Set up the buffer writer, passing all JSON related options to it.

Source code in src/koheesio/integrations/spark/sftp.py
@model_validator(mode="after")
def set_up_buffer_writer(self) -> "SendJsonToSftp":
    """Set up the buffer writer, passing all JSON related options to it."""
    self.buffer_writer = PandasJsonBufferWriter(
        **self.get_options(), compression=self.compression, columns=self.columns
    )
    return self