Sftp
This module contains the SFTPWriter class and the SFTPWriteMode enum.
The SFTPWriter class is used to write data to a file on an SFTP server. It uses the Paramiko library to establish an SFTP connection and write data to the server. The data to be written is provided by a BufferWriter, which generates the data in a buffer. See the docstring of the SFTPWriter class for more details. Refer to koheesio.spark.writers.buffer for more details on the BufferWriter interface.
The SFTPWriteMode enum defines the different write modes that the SFTPWriter can use. These modes determine how the SFTPWriter behaves when the file it is trying to write to already exists on the server. For more details on each mode, see the docstring of the SFTPWriteMode enum.
koheesio.integrations.spark.sftp.SFTPWriteMode #
The different write modes for the SFTPWriter.
OVERWRITE:#
- If the file exists, it will be overwritten.
- If it does not exist, a new file will be created.
APPEND:#
- If the file exists, the new data will be appended to it.
- If it does not exist, a new file will be created.
IGNORE:#
- If the file exists, the method will return without writing anything.
- If it does not exist, a new file will be created.
EXCLUSIVE:#
- If the file exists, an error will be raised.
- If it does not exist, a new file will be created.
BACKUP:#
- If the file exists and the new data is different from the existing data, a backup will be created and the file will be overwritten.
- If it does not exist, a new file will be created.
UPDATE:#
- If the file exists and the new data is different from the existing data, the file will be overwritten.
- If the file exists and the new data is the same as the existing data, the method will return without writing anything.
- If the file does not exist, a new file will be created.
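The mode descriptions above can be read as a small decision table. The following is a minimal sketch of that table, not the library's implementation: `WriteMode` and `plan_write` are hypothetical names, and the returned action strings are illustrative. The BACKUP description does not say what happens when the new data is identical to the existing data; the sketch assumes nothing is written in that case.

```python
from enum import Enum


class WriteMode(str, Enum):
    """Hypothetical stand-in for SFTPWriteMode, for illustration only."""

    OVERWRITE = "overwrite"
    APPEND = "append"
    IGNORE = "ignore"
    EXCLUSIVE = "exclusive"
    BACKUP = "backup"
    UPDATE = "update"


def plan_write(mode: WriteMode, exists: bool, same_content: bool = False) -> str:
    """Return the action implied by the mode descriptions."""
    # Every mode creates the file when it does not exist yet.
    if not exists:
        return "create"
    if mode is WriteMode.OVERWRITE:
        return "overwrite"
    if mode is WriteMode.APPEND:
        return "append"
    if mode is WriteMode.IGNORE:
        return "skip"
    if mode is WriteMode.EXCLUSIVE:
        raise FileExistsError("file already exists")
    # BACKUP and UPDATE compare the new data with the existing data.
    # Assumption: BACKUP skips the write when the data is identical.
    if same_content:
        return "skip"
    if mode is WriteMode.BACKUP:
        return "backup_then_overwrite"
    return "overwrite"
```

For example, `plan_write(WriteMode.UPDATE, exists=True, same_content=True)` yields `"skip"`, while the same call with `WriteMode.EXCLUSIVE` raises `FileExistsError`.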
koheesio.integrations.spark.sftp.SFTPWriter #
Write a DataFrame to SFTP through a BufferWriter.
Concept
- This class uses Paramiko to connect to an SFTP server and write the contents of a buffer to a file on the server.
- This implementation takes inspiration from https://github.com/springml/spark-sftp
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, Path] | Path to the folder to write to | required |
file_name | Optional[str] | Name of the file. If not provided, the file name is expected to be part of the path. Make sure to add the desired file extension. | None |
host | str | SFTP Host | required |
port | int | SFTP Port | required |
username | SecretStr | SFTP Server Username | None |
password | SecretStr | SFTP Server Password | None |
buffer_writer | BufferWriter | This is the writer that will generate the body of the file that will be written to the specified file through SFTP. Details on how the DataFrame is written to the buffer should be implemented in the implementation of the BufferWriter class. Any BufferWriter can be used here, as long as it implements the BufferWriter interface. | required |
mode | SFTPWriteMode | Write mode: overwrite, append, ignore, exclusive, backup, or update. See the docstring of SFTPWriteMode for more details. | SFTPWriteMode.OVERWRITE |
buffer_writer
class-attribute
instance-attribute
#
buffer_writer: InstanceOf[BufferWriter] = Field(
    default=...,
    description="This is the writer that will generate the body of the file that will be written to the specified file through SFTP. Details on how the DataFrame is written to the buffer should be implemented in the implementation of the BufferWriter class. Any BufferWriter can be used here, as long as it implements the BufferWriter interface.",
)
file_name
class-attribute
instance-attribute
#
file_name: Optional[str] = Field(
    default=None,
    description="Name of the file. If not provided, the file name is expected to be part of the path. Make sure to add the desired file extension!",
    alias="filename",
)
mode
class-attribute
instance-attribute
#
mode: SFTPWriteMode = Field(
    default=OVERWRITE,
    description="Write mode: overwrite, append, ignore, exclusive, backup, or update."
    + __doc__,
)
password
class-attribute
instance-attribute
#
password: Optional[SecretStr] = Field(
    default=None, description="SFTP Server Password"
)
path
class-attribute
instance-attribute
#
path: Union[str, Path] = Field(
    default=...,
    description="Path to the folder to write to",
    alias="prefix",
)
transport
property
#
Return the transport for the SFTP connection. If it doesn't exist, create it.
If the username and password are provided, use them to connect to the SFTP server.
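The "create it if it doesn't exist" behaviour is a lazy-initialization pattern. Below is a minimal sketch of that pattern only; `LazyConnection` and its `factory` argument are hypothetical and stand in for whatever actually builds the connection. With Paramiko, such a factory might build a `paramiko.Transport((host, port))` and call `connect(username=..., password=...)` on it, but that wiring is an assumption and is not shown here.

```python
from typing import Callable, Optional


class LazyConnection:
    """Hypothetical helper: build the transport on first access, then reuse it."""

    def __init__(self, factory: Callable[[], object]) -> None:
        self._factory = factory
        self._transport: Optional[object] = None

    @property
    def transport(self) -> object:
        # Create the transport only once; later accesses return the cached object.
        if self._transport is None:
            self._transport = self._factory()
        return self._transport
```

Reading `transport` twice on the same instance calls the factory once and returns the same object both times.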
username
class-attribute
instance-attribute
#
username: Optional[SecretStr] = Field(
    default=None, description="SFTP Server Username"
)
check_file_exists #
Check if a file exists on the SFTP server.
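One common way to implement such a check is to request the file's metadata and treat a "not found" error as absence. The sketch below illustrates that idea under stated assumptions: `FakeSftpClient` is a hypothetical in-memory stand-in, and the client is assumed to expose a `stat(path)` method that raises `FileNotFoundError` for a missing path. Paramiko's `SFTPClient` also has a `stat` method, but how the real class handles errors is not documented on this page.

```python
class FakeSftpClient:
    """Hypothetical in-memory stand-in for an SFTP client."""

    def __init__(self, files):
        self._files = set(files)

    def stat(self, path: str) -> dict:
        # Assumption: a missing path is reported as FileNotFoundError.
        if path not in self._files:
            raise FileNotFoundError(path)
        return {"path": path}


def file_exists(client, path: str) -> bool:
    """Return True if the client can stat the path, False if it is missing."""
    try:
        client.stat(path)
    except FileNotFoundError:
        return False
    return True
```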
execute #
Source code in src/koheesio/integrations/spark/sftp.py
validate_path_and_file_name #
Validate the path, make sure path and file_name are Path objects.
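Since file_name may be omitted when it is already part of path, a validator of this kind has to split the two. A minimal sketch of that split, assuming POSIX-style remote paths; `split_path_and_file_name` is a hypothetical helper, not the library's validator:

```python
from pathlib import PurePosixPath
from typing import Optional, Tuple


def split_path_and_file_name(
    path: str, file_name: Optional[str] = None
) -> Tuple[PurePosixPath, PurePosixPath]:
    """Return (folder, file name) as path objects."""
    folder = PurePosixPath(path)
    if file_name is None:
        # Assumption: the last path component is the file name.
        return folder.parent, PurePosixPath(folder.name)
    return folder, PurePosixPath(file_name)
```

Both `split_path_and_file_name("/path/to/folder/file.csv")` and `split_path_and_file_name("/path/to/folder", "file.csv")` return the folder `/path/to/folder` and the file name `file.csv`.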
validate_sftp_host #
validate_sftp_host(v) -> str
Validate the host
write_file #
Using Paramiko, write the data in the buffer to SFTP.
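Conceptually, writing the buffer means rewinding it and copying its bytes into a remote file handle. The sketch below shows that step with a hypothetical `write_buffer` helper. It assumes an opener with the same `(path, mode)` calling convention as Python's built-in `open`; Paramiko's `SFTPClient.open` accepts a path and a mode string in a similar way, but the real method may differ in its details.

```python
import io
from typing import Callable


def write_buffer(opener: Callable, path: str, buffer: io.BytesIO, append: bool = False) -> int:
    """Copy the whole buffer to the file at path and return the byte count."""
    # Rewind first: a buffer that was just written to is positioned at its end.
    buffer.seek(0)
    data = buffer.read()
    # "ab" appends to an existing file, "wb" truncates or creates it.
    with opener(path, "ab" if append else "wb") as remote_file:
        remote_file.write(data)
    return len(data)
```

Passing the built-in `open` as `opener` makes it possible to try the helper against a local file before pointing it at a remote client.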
koheesio.integrations.spark.sftp.SendCsvToSftp #
Write a DataFrame to an SFTP server as a CSV file.
This class uses the PandasCsvBufferWriter to generate the CSV data and the SFTPWriter to write the data to the SFTP server.
Example
import os

from koheesio.integrations.spark.sftp import SendCsvToSftp

writer = SendCsvToSftp(
    # SFTP Parameters
    host="sftp.example.com",
    port=22,
    username="user",
    password="password",
    path="/path/to/folder",
    file_name="file.tsv.gz",
    # CSV Parameters
    header=True,
    sep="\t",  # tab delimiter, so the output is a TSV file
    quote='"',
    timestampFormat="%Y-%m-%d",
    lineSep=os.linesep,
    compression="gzip",
    index=False,
)
writer.write(df)
In this example, the DataFrame df is written to the file file.tsv.gz in the folder /path/to/folder on the SFTP server. The file is written as a CSV file with a tab delimiter (TSV), double quotes as the quote character, and gzip compression.
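To make the resulting payload concrete, here is a standard-library sketch of the kind of bytes such a buffer ends up holding: tab-separated rows with a header line, gzip-compressed. It does not use PandasCsvBufferWriter; `rows_to_gzipped_tsv` and the sample rows are hypothetical.

```python
import csv
import gzip
import io


def rows_to_gzipped_tsv(header, rows) -> bytes:
    """Serialize rows as tab-separated text and gzip the result."""
    text = io.StringIO(newline="")
    writer = csv.writer(text, delimiter="\t", quotechar='"', lineterminator="\n")
    writer.writerow(header)
    writer.writerows(rows)
    return gzip.compress(text.getvalue().encode("utf-8"))


# Hypothetical sample data; the bytes in payload are what would be sent over SFTP.
payload = rows_to_gzipped_tsv(["id", "name"], [[1, "a"], [2, "b"]])
```

Decompressing `payload` with `gzip.decompress` gives back the plain TSV text, which is a quick way to check a file downloaded from the server.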
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, Path] | Path to the folder to write to. | required |
file_name | Optional[str] | Name of the file. If not provided, it's expected to be part of the path. | None |
host | str | SFTP Host. | required |
port | int | SFTP Port. | required |
username | SecretStr | SFTP Server Username. | None |
password | SecretStr | SFTP Server Password. | None |
mode | SFTPWriteMode | Write mode: overwrite, append, ignore, exclusive, backup, or update. | SFTPWriteMode.OVERWRITE |
header | bool | Whether to write column names as the first line. | True |
sep | str | Field delimiter for the output file. | ',' |
quote | str | Character used to quote fields. | '"' |
quoteAll | bool | Whether all values should be enclosed in quotes. | False |
escape | str | Character used to escape sep and quote when needed. | '\' |
timestampFormat | str | Date format for datetime objects. | '%Y-%m-%dT%H:%M:%S.%f' |
lineSep | str | Character used as line separator. | os.linesep |
compression | Optional[Literal['infer', 'gzip', 'bz2', 'zip', 'xz', 'zstd', 'tar']] | Compression to use for the output data. | None |
See Also
For more details on the CSV parameters, refer to the PandasCsvBufferWriter class documentation.
buffer_writer
class-attribute
instance-attribute
#
buffer_writer: PandasCsvBufferWriter = Field(
    default=None, validate_default=False
)
execute #
set_up_buffer_writer #
set_up_buffer_writer() -> SendCsvToSftp
Set up the buffer writer, passing all CSV related options to it.
koheesio.integrations.spark.sftp.SendJsonToSftp #
Write a DataFrame to an SFTP server as a JSON file.
This class uses the PandasJsonBufferWriter to generate the JSON data and the SFTPWriter to write the data to the SFTP server.
Example
from koheesio.integrations.spark.sftp import SendJsonToSftp

writer = SendJsonToSftp(
    # SFTP Parameters (Inherited from SFTPWriter)
    host="sftp.example.com",
    port=22,
    username="user",
    password="password",
    path="/path/to/folder",
    file_name="file.json.gz",
    # JSON Parameters (Inherited from PandasJsonBufferWriter)
    orient="records",
    date_format="iso",
    double_precision=2,
    date_unit="ms",
    lines=False,
    compression="gzip",
    index=False,
)
writer.write(df)
In this example, the DataFrame df is written to the file file.json.gz in the folder /path/to/folder on the SFTP server. The file is written as a JSON file with gzip compression.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[str, Path] | Path to the folder on the SFTP server. | required |
file_name | Optional[str] | Name of the file, including extension. If not provided, expected to be part of the path. | None |
host | str | SFTP Host. | required |
port | int | SFTP Port. | required |
username | SecretStr | SFTP Server Username. | None |
password | SecretStr | SFTP Server Password. | None |
mode | SFTPWriteMode | Write mode: overwrite, append, ignore, exclusive, backup, or update. | SFTPWriteMode.OVERWRITE |
orient | Literal['split', 'records', 'index', 'columns', 'values', 'table'] | Format of the JSON string. | 'records' |
lines | bool | If True, output is one JSON object per line. Only used when orient='records'. | True |
date_format | Literal['iso', 'epoch'] | Type of date conversion. | 'iso' |
double_precision | int | Decimal places for encoding floating point values. | 10 |
force_ascii | bool | If True, encoded string is ASCII. | True |
compression | Optional[Literal['gzip']] | Compression to use for output data. | None |
See Also
For more details on the JSON parameters, refer to the PandasJsonBufferWriter class documentation.
buffer_writer
class-attribute
instance-attribute
#
buffer_writer: PandasJsonBufferWriter = Field(
    default=None, validate_default=False
)
execute #
set_up_buffer_writer #
set_up_buffer_writer() -> SendJsonToSftp
Set up the buffer writer, passing all JSON related options to it.