Buffer
This module contains classes for writing data to a buffer before writing to the final destination.

The `BufferWriter` class is a base class for writers that write to a buffer first. It provides methods for writing to, reading from, and resetting the buffer, as well as for checking whether the buffer is compressed and for compressing it.

The `PandasCsvBufferWriter` class is a subclass of `BufferWriter` that writes a Spark DataFrame to CSV file(s) using Pandas. It is not meant for writing huge amounts of data; rather, it is intended for writing smaller amounts of data to more arbitrary file systems (e.g., SFTP).

The `PandasJsonBufferWriter` class is a subclass of `BufferWriter` that writes a Spark DataFrame to JSON file(s) using Pandas. Like the CSV writer, it is intended for smaller amounts of data written to more arbitrary file systems (e.g., SFTP).
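A minimal usage sketch (assuming the DataFrame is passed via the df argument, as with other Koheesio writers; spark_df and the SFTP call are placeholders, not part of this module's API):

from koheesio.spark.writers.buffer import PandasCsvBufferWriter

# Write a small Spark DataFrame to an in-memory CSV buffer, then hand the
# raw bytes to any file-like consumer (e.g. an SFTP client).
writer = PandasCsvBufferWriter(df=spark_df)
output = writer.execute()

output.buffer.seek(0)             # rewind the buffer before reading
csv_bytes = output.buffer.read()  # the CSV content as bytes
# csv_bytes (or the rewound buffer itself) can now go to any transport,
# e.g. a hypothetical sftp_client.putfo(...) call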
koheesio.spark.writers.buffer.BufferWriter #
Base class for writers that write to a buffer first, before writing to the final destination.

The `execute()` method should implement how the incoming DataFrame is written to the buffer object (e.g. `BytesIO`) in the output.

The default implementation uses a `SpooledTemporaryFile` as the buffer. This is a file-like object that starts off stored in memory and automatically rolls over to a temporary file on disk if it exceeds a certain size. A `SpooledTemporaryFile` behaves similarly to `BytesIO`, but with the added benefit of being able to handle larger amounts of data.

This approach provides a balance between speed and memory usage: fast in-memory operations for smaller amounts of data, while still being able to handle amounts of data that would not otherwise fit in memory.
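To make the rollover behavior concrete, here is a standard-library-only sketch. Note that _rolled is a CPython implementation detail, used here purely for illustration; also note that a max_size of 0 (as in the default buffer field below) disables automatic rollover, keeping data in memory until rollover() is called explicitly.

from tempfile import SpooledTemporaryFile

buf = SpooledTemporaryFile(mode="w+b", max_size=1024)
buf.write(b"x" * 512)
print(buf._rolled)   # False: still held in memory
buf.write(b"x" * 1024)
print(buf._rolled)   # True: exceeded max_size, now backed by a temp file on disk
buf.seek(0)
data = buf.read()    # reading works the same either way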
Output #
Output class for BufferWriter
buffer #
buffer: InstanceOf[SpooledTemporaryFile] = Field(
default_factory=partial(
SpooledTemporaryFile, mode="w+b", max_size=0
),
exclude=True,
)
compress #
Compress the file_buffer in place using GZIP
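A minimal sketch of the general idea behind compress, using the standard gzip module to rewrite a file-like buffer in place (illustrative only, not necessarily the exact Koheesio implementation):

import gzip
from tempfile import SpooledTemporaryFile

def compress_in_place(buffer: SpooledTemporaryFile) -> None:
    # Illustrative only: replace the buffer's contents with their GZIP form.
    buffer.seek(0)
    compressed = gzip.compress(buffer.read())
    buffer.seek(0)
    buffer.truncate()
    buffer.write(compressed)
    buffer.seek(0)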
is_compressed #
Check if the buffer is compressed.
reset_buffer #
Reset the buffer.
koheesio.spark.writers.buffer.PandasCsvBufferWriter #
Write a Spark DataFrame to CSV file(s) using Pandas.
Takes inspiration from https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html
See also: https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option
Note
This class is not meant to be used for writing huge amounts of data. It is meant to be used for writing smaller amounts of data to more arbitrary file systems (e.g. SFTP).
Pyspark vs Pandas

The following table shows the mapping between PySpark, Pandas, and Koheesio properties. Note that the default values are mostly the same as PySpark's `DataFrameWriter` implementation, with some exceptions (see below).

This class implements the most commonly used properties. If a property is not explicitly implemented, it can be accessed through `params`.
| PySpark Property | Default PySpark | Pandas Property | Default Pandas | Koheesio Property | Default Koheesio | Notes |
|---|---|---|---|---|---|---|
| maxRecordsPerFile | ... | chunksize | None | max_records_per_file | ... | Spark property name: spark.sql.files.maxRecordsPerFile |
| sep | , | sep | , | sep | , | |
| lineSep | \n | line_terminator | os.linesep | lineSep (alias=line_terminator) | \n | |
| N/A | ... | index | True | index | False | Determines whether row labels (index) are included in the output |
| header | False | header | True | header | True | |
| quote | " | quotechar | " | quote (alias=quotechar) | " | |
| quoteAll | False | doublequote | True | quoteAll (alias=doublequote) | False | |
| escape | \ | escapechar | None | escape (alias=escapechar) | \ | |
| escapeQuotes | True | N/A | N/A | N/A | ... | Not available in Pandas |
| ignoreLeadingWhiteSpace | True | N/A | N/A | N/A | ... | Not available in Pandas |
| ignoreTrailingWhiteSpace | True | N/A | N/A | N/A | ... | Not available in Pandas |
| charToEscapeQuoteEscaping | escape or \0 | N/A | N/A | N/A | ... | Not available in Pandas |
| dateFormat | yyyy-MM-dd | N/A | N/A | N/A | ... | Pandas implements Timestamp, not Date |
| timestampFormat | yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] | date_format | N/A | timestampFormat (alias=date_format) | %Y-%m-%dT%H:%M:%S.%f | Follows PySpark defaults |
| timestampNTZFormat | yyyy-MM-dd'T'HH:mm:ss[.SSS] | N/A | N/A | N/A | ... | Pandas implements Timestamp, see above |
| compression | None | compression | infer | compression | None | |
| encoding | utf-8 | encoding | utf-8 | N/A | ... | Not explicitly implemented |
| nullValue | "" | na_rep | "" | N/A | "" | Not explicitly implemented |
| emptyValue | "" | na_rep | "" | N/A | "" | Not explicitly implemented |
| N/A | ... | float_format | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | decimal | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | index_label | None | N/A | ... | Not explicitly implemented |
| N/A | ... | columns | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | mode | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | quoting | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | errors | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | storage_options | N/A | N/A | ... | Not explicitly implemented |
Differences with PySpark:

- dateFormat -> Pandas implements Timestamp, not just Date. Hence, Koheesio sets the default to the Python equivalent of PySpark's default.
- compression -> Spark does not compress by default, hence Koheesio does not compress by default. Compression can be provided, though.
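For example, to opt into compression explicitly, a minimal sketch (spark_df is a placeholder for your Spark DataFrame, and the df argument is assumed to follow the usual Koheesio writer pattern):

from koheesio.spark.writers.buffer import PandasCsvBufferWriter

# Koheesio leaves CSV output uncompressed unless asked otherwise:
writer = PandasCsvBufferWriter(df=spark_df, compression="gzip")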
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| header | bool | Whether to write the names of columns as the first line. In Pandas a list of strings can be given, assumed to be aliases for the column names; this is not supported in this class. Instead, the column names as used in the dataframe are used as the header. Koheesio sets this default to True to match Pandas' default. | True |
| sep | str | Field delimiter for the output file. | , |
| quote | str | String of length 1. Character used to quote fields. In PySpark this is called 'quote', in Pandas 'quotechar'. | " |
| quoteAll | bool | A flag indicating whether all values should always be enclosed in quotes in a field. Koheesio sets the default (False) to only escape values containing a quote character; this is PySpark's default behavior. In Pandas, this is called 'doublequote'. | False |
| escape | str | String of length 1. Character used to escape sep and quotechar when appropriate. Koheesio sets this default to `\` to match PySpark's default behavior. In Pandas, this is called 'escapechar', and defaults to None. | \ |
| timestampFormat | str | Sets the string that indicates a date format for datetime objects. Koheesio sets this default to a close equivalent of PySpark's default (excluding timezone information). In Pandas, this field is called 'date_format'. Note: Pandas does not support Java Timestamps, only Python Timestamps. The PySpark default is yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX], which mimics the ISO 8601 format (datetime.isoformat()). | %Y-%m-%dT%H:%M:%S.%f |
| lineSep | str, optional | String of length 1. Defines the character used as line separator that should be used for writing. | os.linesep |
| compression | Optional[Literal['infer', 'gzip', 'bz2', 'zip', 'xz', 'zstd', 'tar']] | A string representing the compression to use for on-the-fly compression of the output data. Note: Pandas sets this default to 'infer', Koheesio sets this default to None, leaving the data uncompressed by default. Can be set to one of 'infer', 'gzip', 'bz2', 'zip', 'xz', 'zstd', or 'tar'. See the Pandas documentation for more details. | None |
compression #
compression: Optional[CompressionOptions] = Field(
default=None,
description="A string representing the compression to use for on-the-fly compression of the output data.Note: Pandas sets this default to 'infer', Koheesio sets this default to 'None' leaving the data uncompressed by default. Can be set to one of 'infer', 'gzip', 'bz2', 'zip', 'xz', 'zstd', or 'tar'. See Pandas documentation for more details.",
)
emptyValue #
emptyValue: Optional[str] = Field(
default="",
description="The string to use for missing values. Koheesio sets this default to an empty string.",
)
escape #
escape: constr(max_length=1) = Field(
default="\\",
description="String of length 1. Character used to escape sep and quotechar when appropriate. Koheesio sets this default to `\\` to match Pyspark's default behavior. In Pandas, this is called 'escapechar', and defaults to None.",
alias="escapechar",
)
header #
header: bool = Field(
default=True,
description="Whether to write the names of columns as the first line. In Pandas a list of strings can be given assumed to be aliases for the column names - this is not supported in this class. Instead, the column names as used in the dataframe are used as the header. Koheesio sets this default to True to match Pandas' default.",
)
index #
index: bool = Field(
default=False,
description="Toggles whether to write row names (index). Default False in Koheesio - pandas default is True.",
)
lineSep #
lineSep: Optional[constr(max_length=1)] = Field(
default=linesep,
description="String of length 1. Defines the character used as line separator that should be used for writing.",
alias="line_terminator",
)
nullValue #
nullValue: Optional[str] = Field(
default="",
description="The string to use for missing values. Koheesio sets this default to an empty string.",
)
quote #
quote: constr(max_length=1) = Field(
default='"',
description="String of length 1. Character used to quote fields. In PySpark, this is called 'quote', in Pandas this is called 'quotechar'.",
alias="quotechar",
)
quoteAll #
quoteAll: bool = Field(
default=False,
description="A flag indicating whether all values should always be enclosed in quotes in a field. Koheesio set the default (False) to only escape values containing a quote character - this is Pyspark's default behavior. In Pandas, this is called 'doublequote'.",
alias="doublequote",
)
sep #
sep: constr(max_length=1) = Field(
default=",",
description="Field delimiter for the output file",
)
timestampFormat #
timestampFormat: str = Field(
default="%Y-%m-%dT%H:%M:%S.%f",
description="Sets the string that indicates a date format for datetime objects. Koheesio sets this default to a close equivalent of Pyspark's default (excluding timezone information). In Pandas, this field is called 'date_format'. Note: Pandas does not support Java Timestamps, only Python Timestamps. The Pyspark default is `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]` which mimics the iso8601 format (`datetime.isoformat()`).",
alias="date_format",
)
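The strftime pattern above lines up with datetime.isoformat() for timezone-naive values, as this small check illustrates:

from datetime import datetime

ts = datetime(2024, 1, 15, 13, 45, 30, 123456)
print(ts.strftime("%Y-%m-%dT%H:%M:%S.%f"))  # 2024-01-15T13:45:30.123456
print(ts.isoformat())                       # same string for naive datetimes with microseconds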
Output #
execute #
execute() -> Output
Write the DataFrame to the buffer using Pandas' to_csv() method, which also handles compression.
get_options #
Returns the options to pass to Pandas' to_csv() method.
koheesio.spark.writers.buffer.PandasJsonBufferWriter #
Write a Spark DataFrame to JSON file(s) using Pandas.
Takes inspiration from https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_json.html
Note
This class is not meant to be used for writing huge amounts of data. It is meant to be used for writing smaller amounts of data to more arbitrary file systems (e.g. SFTP).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| orient | Literal['split', 'records', 'index', 'columns', 'values', 'table'] | Format of the resulting JSON string. | records |
| lines | bool | Format output as one JSON object per line. If True, the output is written as one JSON object per line; if False, all records are written as a single JSON array. Note: this value is only used when orient='records' and is ignored otherwise. | True |
| date_format | Literal['iso', 'epoch'] | Type of date conversion. See "Date and Timestamp Formats in JSON" below for details. | iso |
| double_precision | int | Number of decimal places for encoding floating point values. | 10 |
| force_ascii | bool | Force encoded string to be ASCII. | True |
| compression | Optional[Literal['gzip']] | A string representing the compression to use for on-the-fly compression of the output data. Koheesio sets this default to None, leaving the data uncompressed. Can optionally be set to 'gzip'. Other compression options are currently not supported by Koheesio for JSON output. | None |
Other Possible Parameters

Date and Timestamp Formats in JSON #

The `date_format` and `date_unit` parameters in pandas' `to_json()` method control the representation of dates and timestamps in the resulting JSON.

- `date_format`: Determines the format of date strings. It accepts two options: 'epoch' and 'iso'.
  - 'epoch': Dates are represented as the number of milliseconds since the epoch (1970-01-01).
  - 'iso': Dates are represented in ISO8601 format, 'YYYY-MM-DDTHH:MM:SS'.

  In Pandas the default value depends on the `orient` parameter: for `orient='table'` the default is 'iso', for all other `orient` values it is 'epoch'. In Koheesio, however, the default is set to 'iso' irrespective of the `orient` parameter.
- `date_unit`: Specifies the time unit for encoding timestamps and datetime objects. It accepts four options: 's' for seconds, 'ms' for milliseconds, 'us' for microseconds, and 'ns' for nanoseconds. The default is 'ms'. Note that this parameter is ignored when `date_format='iso'`.
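A pandas-only sketch of the difference (output shown for a recent pandas version):

import pandas as pd

df = pd.DataFrame({"ts": [pd.Timestamp("2024-01-15 13:45:30")]})

print(df.to_json(orient="records", date_format="iso"))
# [{"ts":"2024-01-15T13:45:30.000"}]
print(df.to_json(orient="records", date_format="epoch", date_unit="s"))
# [{"ts":1705326330}]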
Orient Parameter #

The `orient` parameter specifies the format of the resulting JSON string. Each option is useful in different scenarios, depending on whether you need to preserve the index, data types, and/or column names of the original DataFrame. The set of possible orients is (see the sketch after this list for what each produces):

- 'records': List of dictionaries, with each dictionary representing a row of data. This is the default orient in Koheesio.
  - Does not preserve the index.
  - If `lines=True` (the default), each record is written as a separate line in the output file.
  - If `lines=False`, all records are written within a single JSON array.
- 'split': Dictionary containing indexes, columns, and data.
  - Preserves data types and indexes of the original DataFrame.
- 'index': Dictionary with DataFrame indexes as keys and dictionaries of column names and values as values.
  - Preserves the index.
- 'columns': Dictionary with column names as keys and dictionaries of indexes and values as values.
  - Preserves data types and indexes of the original DataFrame.
- 'values': Just the values in the DataFrame.
  - Does not preserve the index or columns.
- 'table': Dictionary with 'schema' and 'data' keys; 'schema' holds information about data types and the index, 'data' holds the actual data.
  - Preserves data types and indexes of the original DataFrame.
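A pandas-only sketch showing what each orient produces for a tiny DataFrame (the 'table' output is abbreviated):

import pandas as pd

df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})

print(df.to_json(orient="records"))  # [{"a":1,"b":"x"},{"a":2,"b":"y"}]
print(df.to_json(orient="split"))    # {"columns":["a","b"],"index":[0,1],"data":[[1,"x"],[2,"y"]]}
print(df.to_json(orient="index"))    # {"0":{"a":1,"b":"x"},"1":{"a":2,"b":"y"}}
print(df.to_json(orient="columns"))  # {"a":{"0":1,"1":2},"b":{"0":"x","1":"y"}}
print(df.to_json(orient="values"))   # [[1,"x"],[2,"y"]]
print(df.to_json(orient="table"))    # {"schema":{...field types and index...},"data":[...]}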
Note

For 'records' orient, set `lines` to True to write each record as a separate line; the pandas output will then match the PySpark output (orient='records' and lines=True parameters).
columns #
columns: Optional[list[str]] = Field(
default=None,
description="The columns to write. If None, all columns will be written.",
)
compression #
compression: Optional[Literal["gzip"]] = Field(
default=None,
description="A string representing the compression to use for on-the-fly compression of the output data.Koheesio sets this default to 'None' leaving the data uncompressed by default. Can be set to 'gzip' optionally.",
)
date_format #
date_format: Literal["iso", "epoch"] = Field(
default="iso",
description="Type of date conversion. Default is 'iso'.",
)
double_precision #
double_precision: int = Field(
default=10,
description="Number of decimal places for encoding floating point values. Default is 10.",
)
force_ascii #
force_ascii: bool = Field(
default=True,
description="Force encoded string to be ASCII. Default is True.",
)
lines #
lines: bool = Field(
default=True,
description="Format output as one JSON object per line. Only used when orient='records'. Default is True.",
)
orient #
orient: Literal[
"split",
"records",
"index",
"columns",
"values",
"table",
] = Field(
default="records",
description="Format of the resulting JSON string. Default is 'records'.",
)
Output #
execute #
execute() -> Output
Write the DataFrame to the buffer using Pandas to_json() method.
get_options #
get_options() -> dict
Returns the options to pass to Pandas' to_json() method.
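To close, a usage sketch mirroring the CSV example earlier (again assuming the DataFrame is passed via the df argument; spark_df is a placeholder):

from koheesio.spark.writers.buffer import PandasJsonBufferWriter

# Newline-delimited JSON, matching PySpark's JSON output shape.
writer = PandasJsonBufferWriter(df=spark_df, orient="records", lines=True)
output = writer.execute()
output.buffer.seek(0)
json_bytes = output.buffer.read()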