Buffer

This module contains classes for writing data to a buffer before writing to the final destination.

The BufferWriter class is a base class for writers that write to a buffer first. It provides methods for writing, reading, and resetting the buffer, as well as checking if the buffer is compressed and compressing the buffer.

The PandasCsvBufferWriter class is a subclass of BufferWriter that writes a Spark DataFrame to CSV file(s) using Pandas. It is not meant to be used for writing huge amounts of data, but rather for writing smaller amounts of data to more arbitrary file systems (e.g., SFTP).

The PandasJsonBufferWriter class is a subclass of BufferWriter that writes a Spark DataFrame to JSON file(s) using Pandas. It is not meant to be used for writing huge amounts of data, but rather for writing smaller amounts of data to more arbitrary file systems (e.g., SFTP).

koheesio.spark.writers.buffer.BufferWriter #

Base class for writers that write to a buffer first, before writing to the final destination.

The execute() method should implement how the incoming DataFrame is written to the buffer object (e.g., BytesIO) in the output.

The default implementation uses a SpooledTemporaryFile as the buffer. This is a file-like object that starts off stored in memory and automatically rolls over to a temporary file on disk if it exceeds a certain size. A SpooledTemporaryFile behaves similar to BytesIO, but with the added benefit of being able to handle larger amounts of data.

This approach provides a balance between speed and memory usage, allowing for fast in-memory operations for smaller amounts of data while still being able to handle larger amounts of data that would not otherwise fit in memory.
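
For illustration, here is a small standalone sketch of the rollover behavior (the max_size=1024 threshold is arbitrary and chosen only for the demo; _rolled is a private CPython attribute, inspected here purely to show what happens):

from tempfile import SpooledTemporaryFile

# Small payloads stay in memory; once the spool size is exceeded, the
# data is transparently moved to a temporary file on disk.
with SpooledTemporaryFile(mode="w+b", max_size=1024) as buf:
    buf.write(b"small payload")
    print(buf._rolled)      # False: still held in memory
    buf.write(b"x" * 2048)  # total size now exceeds max_size
    print(buf._rolled)      # True: now backed by a temp file on disk
    buf.seek(0)
    data = buf.read()       # reading works the same in both cases

Note that with max_size=0, as used by the buffer field below, CPython skips the size check entirely, so the buffer stays in memory unless rollover() is called explicitly (or a real file descriptor is requested via fileno()).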

Output #

Output class for BufferWriter

buffer class-attribute instance-attribute #

buffer: InstanceOf[SpooledTemporaryFile] = Field(
    default_factory=partial(
        SpooledTemporaryFile, mode="w+b", max_size=0
    ),
    exclude=True,
)

compress #

compress()

Compress the file_buffer in place using GZIP

Source code in src/koheesio/spark/writers/buffer.py
def compress(self):
    """Compress the file_buffer in place using GZIP"""
    # check if the buffer is already compressed
    if self.is_compressed():
        self.logger.warn("Buffer is already compressed. Nothing to compress...")
        return self

    # compress the file_buffer
    file_buffer = self.buffer
    compressed = gzip.compress(file_buffer.read())

    # write the compressed content back to the buffer
    self.reset_buffer()
    self.buffer.write(compressed)

    return self  # to allow for chaining

is_compressed #

is_compressed()

Check if the buffer is compressed.

Source code in src/koheesio/spark/writers/buffer.py
def is_compressed(self):
    """Check if the buffer is compressed."""
    self.rewind_buffer()
    magic_number_present = self.buffer.read(2) == b"\x1f\x8b"
    self.rewind_buffer()
    return magic_number_present
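
The check relies on the fact that every GZIP stream begins with the two magic bytes 0x1f 0x8b. A standalone sketch of the same idea, using plain BytesIO buffers:

import gzip
from io import BytesIO

raw = BytesIO(b"hello world")
compressed = BytesIO(gzip.compress(b"hello world"))

for buf in (raw, compressed):
    buf.seek(0)                        # rewind before peeking, as rewind_buffer() does
    print(buf.read(2) == b"\x1f\x8b")  # False for raw, True for compressed
    buf.seek(0)                        # rewind again so later reads see everything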

read #

read()

Read the buffer

Source code in src/koheesio/spark/writers/buffer.py
def read(self):
    """Read the buffer"""
    self.rewind_buffer()
    data = self.buffer.read()
    self.rewind_buffer()
    return data

reset_buffer #

reset_buffer()

Reset the buffer

Source code in src/koheesio/spark/writers/buffer.py
def reset_buffer(self):
    """Reset the buffer"""
    self.buffer.truncate(0)
    self.rewind_buffer()
    return self

rewind_buffer #

rewind_buffer()

Rewind the buffer

Source code in src/koheesio/spark/writers/buffer.py
def rewind_buffer(self):
    """Rewind the buffer"""
    self.buffer.seek(0)
    return self

write #

write(df=None) -> Output

Write the DataFrame to the buffer

Source code in src/koheesio/spark/writers/buffer.py
def write(self, df=None) -> Output:
    """Write the DataFrame to the buffer"""
    self.df = df or self.df
    if not self.df:
        raise RuntimeError("No valid Dataframe was passed")
    self.output.reset_buffer()
    self.execute()
    return self.output
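
To tie these methods together, here is a hypothetical sketch (not part of koheesio) of a minimal subclass whose execute() serializes the DataFrame into the buffer; the class name and serialization format are made up for illustration:

from koheesio.spark.writers.buffer import BufferWriter

class PlainTextBufferWriter(BufferWriter):  # hypothetical example class
    def execute(self):
        # Render each row as a comma-separated line and write it to the buffer.
        rows = self.df.collect()
        payload = "\n".join(",".join(str(v) for v in row) for row in rows)
        self.output.buffer.write(payload.encode("utf-8"))

# Usage (assuming a Spark DataFrame `spark_df`):
#   output = PlainTextBufferWriter(df=spark_df).write()  # resets buffer, runs execute()
#   print(output.read())                                 # rewinds, reads, rewinds again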

koheesio.spark.writers.buffer.PandasCsvBufferWriter #

Write a Spark DataFrame to CSV file(s) using Pandas.

Takes inspiration from https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html

See also: https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option

Note

This class is not meant to be used for writing huge amounts of data. It is meant to be used for writing smaller amounts of data to more arbitrary file systems (e.g. SFTP).

Pyspark vs Pandas

The following table shows the mapping between Pyspark, Pandas, and Koheesio properties. Note that the default values are mostly the same as Pyspark's DataFrameWriter implementation, with some exceptions (see below).

This class implements the most commonly used properties. If a property is not explicitly implemented, it can be accessed through params.

| PySpark Property | Default PySpark | Pandas Property | Default Pandas | Koheesio Property | Default Koheesio | Notes |
|---|---|---|---|---|---|---|
| maxRecordsPerFile | ... | chunksize | None | max_records_per_file | ... | Spark property name: spark.sql.files.maxRecordsPerFile |
| sep | , | sep | , | sep | , | |
| lineSep | \n | line_terminator | os.linesep | lineSep (alias=line_terminator) | \n | |
| N/A | ... | index | True | index | False | Determines whether row labels (index) are included in the output |
| header | False | header | True | header | True | |
| quote | " | quotechar | " | quote (alias=quotechar) | " | |
| quoteAll | False | doublequote | True | quoteAll (alias=doublequote) | False | |
| escape | \ | escapechar | None | escapechar (alias=escape) | \ | |
| escapeQuotes | True | N/A | N/A | N/A | ... | Not available in Pandas |
| ignoreLeadingWhiteSpace | True | N/A | N/A | N/A | ... | Not available in Pandas |
| ignoreTrailingWhiteSpace | True | N/A | N/A | N/A | ... | Not available in Pandas |
| charToEscapeQuoteEscaping | escape or \0 | N/A | N/A | N/A | ... | Not available in Pandas |
| dateFormat | yyyy-MM-dd | N/A | N/A | N/A | ... | Pandas implements Timestamp, not Date |
| timestampFormat | yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] | date_format | N/A | timestampFormat (alias=date_format) | yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] | Follows PySpark defaults |
| timestampNTZFormat | yyyy-MM-dd'T'HH:mm:ss[.SSS] | N/A | N/A | N/A | ... | Pandas implements Timestamp, see above |
| compression | None | compression | infer | compression | None | |
| encoding | utf-8 | encoding | utf-8 | N/A | ... | Not explicitly implemented |
| nullValue | "" | na_rep | "" | N/A | "" | Not explicitly implemented |
| emptyValue | "" | na_rep | "" | N/A | "" | Not explicitly implemented |
| N/A | ... | float_format | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | decimal | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | index_label | None | N/A | ... | Not explicitly implemented |
| N/A | ... | columns | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | mode | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | quoting | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | errors | N/A | N/A | ... | Not explicitly implemented |
| N/A | ... | storage_options | N/A | N/A | ... | Not explicitly implemented |
Differences with PySpark:
  • dateFormat: Pandas implements Timestamp, not just Date. Hence, Koheesio sets the default to the Python equivalent of PySpark's default.
  • compression: Spark does not compress by default, hence Koheesio does not compress by default. Compression can be provided, though.

A usage sketch follows below.
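
A hedged usage sketch; the DataFrame contents and the SparkSession variable `spark` are assumptions for illustration:

from koheesio.spark.writers.buffer import PandasCsvBufferWriter

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

writer = PandasCsvBufferWriter(
    df=df,
    sep=";",             # override the ',' default
    compression="gzip",  # handled by pandas' to_csv()
)
output = writer.write()
csv_bytes = output.read()  # gzip-compressed CSV, ready for e.g. an SFTP upload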

Parameters:

Name Type Description Default
header bool

Whether to write the names of columns as the first line. In Pandas, a list of strings can be given, which is assumed to contain aliases for the column names - this is not supported in this class; instead, the column names as used in the dataframe are used as the header. Koheesio sets this default to True to match Pandas' default.

True
sep str

Field delimiter for the output file. Default is ','.

,
quote str

String of length 1. Character used to quote fields. In PySpark, this is called 'quote', in Pandas this is called 'quotechar'. Default is '"'.

"
quoteAll bool

A flag indicating whether all values should always be enclosed in quotes in a field. Koheesio sets the default (False) to only escape values containing a quote character - this is Pyspark's default behavior. In Pandas, this is called 'doublequote'. Default is False.

False
escape str

String of length 1. Character used to escape sep and quotechar when appropriate. Koheesio sets this default to \ to match Pyspark's default behavior. In Pandas, this field is called 'escapechar', and defaults to None. Default is '\'.

\
timestampFormat str

Sets the string that indicates a date format for datetime objects. Koheesio sets this default to a close equivalent of Pyspark's default (excluding timezone information). In Pandas, this field is called 'date_format'. Note: Pandas does not support Java Timestamps, only Python Timestamps. The Pyspark default is yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX] which mimics the iso8601 format (datetime.isoformat()). Default is '%Y-%m-%dT%H:%M:%S.%f'.

%Y-%m-%dT%H:%M:%S.%f
lineSep str, optional

String of length 1. Defines the character used as line separator that should be used for writing. Default is os.linesep.

os.linesep
compression Optional[Literal['infer', 'gzip', 'bz2', 'zip', 'xz', 'zstd', 'tar']]

A string representing the compression to use for on-the-fly compression of the output data. Note: Pandas sets this default to 'infer', Koheesio sets this default to 'None' leaving the data uncompressed by default. Can be set to one of 'infer', 'gzip', 'bz2', 'zip', 'xz', 'zstd', or 'tar'. See Pandas documentation for more details.

None

compression class-attribute instance-attribute #

compression: Optional[CompressionOptions] = Field(
    default=None,
    description="A string representing the compression to use for on-the-fly compression of the output data.Note: Pandas sets this default to 'infer', Koheesio sets this default to 'None' leaving the data uncompressed by default. Can be set to one of 'infer', 'gzip', 'bz2', 'zip', 'xz', 'zstd', or 'tar'. See Pandas documentation for more details.",
)

escape class-attribute instance-attribute #

escape: constr(max_length=1) = Field(
    default="\\",
    description="String of length 1. Character used to escape sep and quotechar when appropriate. Koheesio sets this default to `\\` to match Pyspark's default behavior. In Pandas, this is called 'escapechar', and defaults to None.",
    alias="escapechar",
)

header class-attribute instance-attribute #

header: bool = Field(
    default=True,
    description="Whether to write the names of columns as the first line. In Pandas a list of strings can be given assumed to be aliases for the column names - this is not supported in this class. Instead, the column names as used in the dataframe are used as the header. Koheesio sets this default to True to match Pandas' default.",
)

index class-attribute instance-attribute #

index: bool = Field(
    default=False,
    description="Toggles whether to write row names (index). Default False in Koheesio - pandas default is True.",
)

lineSep class-attribute instance-attribute #

lineSep: Optional[constr(max_length=1)] = Field(
    default=linesep,
    description="String of length 1. Defines the character used as line separator that should be used for writing.",
    alias="line_terminator",
)

quote class-attribute instance-attribute #

quote: constr(max_length=1) = Field(
    default='"',
    description="String of length 1. Character used to quote fields. In PySpark, this is called 'quote', in Pandas this is called 'quotechar'.",
    alias="quotechar",
)

quoteAll class-attribute instance-attribute #

quoteAll: bool = Field(
    default=False,
    description="A flag indicating whether all values should always be enclosed in quotes in a field. Koheesio set the default (False) to only escape values containing a quote character - this is Pyspark's default behavior. In Pandas, this is called 'doublequote'.",
    alias="doublequote",
)

sep class-attribute instance-attribute #

sep: constr(max_length=1) = Field(
    default=",",
    description="Field delimiter for the output file",
)

timestampFormat class-attribute instance-attribute #

timestampFormat: str = Field(
    default="%Y-%m-%dT%H:%M:%S.%f",
    description="Sets the string that indicates a date format for datetime objects. Koheesio sets this default to a close equivalent of Pyspark's default (excluding timezone information). In Pandas, this field is called 'date_format'. Note: Pandas does not support Java Timestamps, only Python Timestamps. The Pyspark default is `yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]` which mimics the iso8601 format (`datetime.isoformat()`).",
    alias="date_format",
)
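
The default format string corresponds to datetime.isoformat()-style output, which can be verified directly:

from datetime import datetime

ts = datetime(2024, 1, 2, 3, 4, 5, 678900)
print(ts.strftime("%Y-%m-%dT%H:%M:%S.%f"))  # 2024-01-02T03:04:05.678900
print(ts.isoformat())                       # identical: 2024-01-02T03:04:05.678900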

Output #

Output class for PandasCsvBufferWriter

pandas_df class-attribute instance-attribute #

pandas_df: Optional[DataFrame] = Field(
    None,
    description="The Pandas DataFrame that was written",
)

execute #

execute()

Write the DataFrame to the buffer using Pandas to_csv() method. Compression is handled by pandas to_csv() method.

Source code in src/koheesio/spark/writers/buffer.py
def execute(self):
    """Write the DataFrame to the buffer using Pandas to_csv() method.
    Compression is handled by pandas to_csv() method.
    """
    # convert the Spark DataFrame to a Pandas DataFrame
    self.output.pandas_df = self.df.toPandas()

    # create csv file in memory
    file_buffer = self.output.buffer
    self.output.pandas_df.to_csv(file_buffer, **self.get_options(options_type="spark"))

get_options #

get_options(options_type: str = 'csv')

Returns the options to pass to Pandas' to_csv() method.

Source code in src/koheesio/spark/writers/buffer.py
def get_options(self, options_type: str = "csv"):
    """Returns the options to pass to Pandas' to_csv() method."""
    try:
        import pandas as _pd

        # Get the pandas version as a tuple of integers
        pandas_version = tuple(int(i) for i in _pd.__version__.split("."))
    except ImportError:
        raise ImportError("Pandas is required to use this writer")

    # Use line_separator for pandas 2.0.0 and later
    line_sep_option_naming = "line_separator" if pandas_version >= (2, 0, 0) else "line_terminator"

    csv_options = {
        "header": self.header,
        "sep": self.sep,
        "quotechar": self.quote,
        "doublequote": self.quoteAll,
        "escapechar": self.escape,
        "na_rep": self.emptyValue or self.nullValue,
        line_sep_option_naming: self.lineSep,
        "index": self.index,
        "date_format": self.timestampFormat,
        "compression": self.compression,
        **self.params,
    }

    if options_type == "spark":
        csv_options["lineterminator"] = csv_options.pop(line_sep_option_naming)
    elif options_type == "koheesio_pandas_buffer_writer":
        csv_options["line_terminator"] = csv_options.pop(line_sep_option_naming)

    return csv_options
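
As noted earlier, options without a dedicated Koheesio field can still be supplied. A hedged sketch, continuing the earlier usage example and assuming that extra keyword arguments are collected into params (which get_options() merges in via **self.params):

# Hypothetical sketch: float_format has no dedicated field, so it is assumed
# to travel through `params` into the options passed to pandas' to_csv().
writer = PandasCsvBufferWriter(df=df, float_format="%.2f")
options = writer.get_options()
print(options["float_format"])  # '%.2f'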

koheesio.spark.writers.buffer.PandasJsonBufferWriter #

Write a Spark DataFrame to JSON file(s) using Pandas.

Takes inspiration from https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_json.html

Note

This class is not meant to be used for writing huge amounts of data. It is meant to be used for writing smaller amounts of data to more arbitrary file systems (e.g. SFTP).

Parameters:

Name Type Description Default
orient Literal['split', 'records', 'index', 'columns', 'values', 'table']

Format of the resulting JSON string. Default is 'records'.

'records'
lines bool

Format output as one JSON object per line. Only used when orient='records'; the value is ignored otherwise. If True, each record is written on its own line; if False, the output is written as a single JSON array. Default is True.

True
date_format Literal['iso', 'epoch']

Type of date conversion. Default is 'iso'. See Date and Timestamp Formats for a detailed description and more information.

'iso'
double_precision int

Number of decimal places for encoding floating point values. Default is 10.

10
force_ascii bool

Force encoded string to be ASCII. Default is True.

True
compression Optional[Literal['gzip']]

A string representing the compression to use for on-the-fly compression of the output data. Koheesio sets this default to None, leaving the data uncompressed. Can optionally be set to 'gzip'. Other compression options are currently not supported by Koheesio for JSON output.

None
Other Possible Parameters
Date and Timestamp Formats in JSON#

The date_format and date_unit parameters in pandas to_json() method are used to control the representation of dates and timestamps in the resulting JSON.

  • date_format: This parameter determines the format of date strings. It accepts two options: 'epoch' and 'iso'.

    • 'epoch': Dates are represented as the number of milliseconds since the epoch (1970-01-01).
    • 'iso': Dates are represented in ISO8601 format, 'YYYY-MM-DDTHH:MM:SS'. In Pandas the default value depends on the orient parameter. For orient='table', the default is 'iso'. For all other orient values, the default is 'epoch'. However, in Koheesio, the default is set to 'iso' irrespective of the orient parameter.
  • date_unit: This parameter specifies the time unit for encoding timestamps and datetime objects. It accepts four options: 's' for seconds, 'ms' for milliseconds, 'us' for microseconds, and 'ns' for nanoseconds. The default is 'ms'. Note that this parameter is ignored when date_format='iso'. A short sketch contrasting 'iso' and 'epoch' follows this list.
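
A quick comparison of the two date_format settings (pandas output shown in comments; the millisecond value corresponds to 2024-01-02T03:04:05 UTC):

import pandas as pd

df = pd.DataFrame({"ts": [pd.Timestamp("2024-01-02 03:04:05")]})

print(df.to_json(orient="records", date_format="iso"))
# [{"ts":"2024-01-02T03:04:05.000"}]

print(df.to_json(orient="records", date_format="epoch", date_unit="ms"))
# [{"ts":1704164645000}]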

Orient Parameter#

The orient parameter is used to specify the format of the resulting JSON string. Each option is useful in different scenarios depending on whether you need to preserve the index, data types, and/or column names of the original DataFrame. The set of possible orients is:

  • 'records': List of dictionaries with each dictionary representing a row of data. This is the default orient in Koheesio.
    • Does not preserve the index.
    • If lines=True (default), each record is written as a separate line in the output file.
      • Example:
        {"column1": value1, "column2": value2}
        {"column1": value3, "column2": value4}
        
    • If lines=False, all records are written within a single JSON array.
      • Example:
        [{"column1": value1, "column2": value2}, {"column1": value3, "column2": value4}]
        
  • 'split': Dictionary containing indexes, columns, and data.
    • Preserves data types and indexes of the original DataFrame.
    • Example:
      {"index": [index], "columns": [columns], "data": [values]}
      
  • 'index': Dictionary with DataFrame indexes as keys and dictionaries with column names and values as values.
    • Preserves the index.
    • Example:
      {"index1": {"column1": value1, "column2": value2}}
      
  • 'columns': Dictionary with column names as keys and dictionaries with indexes and values as values.
    • Preserves data types and indexes of the original DataFrame.
    • Example:
      {"column1": {"index1": value1}, "column2": {"index1": value1}}
      
  • 'values': Just the values in the DataFrame.
    • Does not preserve the index or columns.
    • Example:
      [[value1, value2], ..., [valueN-1, valueN]]
      
  • 'table': Dictionary with 'schema' and 'data' keys. 'schema' has information about data types and index, 'data' has the actual data.
    • Preserves data types and indexes of the original DataFrame.
    • Example:
      {"schema":{"fields": [{"name": index, "type": dtype}], "primaryKey": [index]}, "pandas_version":"1.4.0"}, "data": [{"column1": value1, "column2": value2}]}
      

Note: For 'records' orient, set lines to True to write each record as a separate line. The pandas output will then match the PySpark output (orient='records' and lines=True parameters).
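
In pandas terms, a small self-contained demonstration of the two lines settings:

import pandas as pd

df = pd.DataFrame({"column1": [1, 3], "column2": [2, 4]})

# lines=True: one JSON object per line, matching PySpark's JSON writer.
print(df.to_json(orient="records", lines=True))
# {"column1":1,"column2":2}
# {"column1":3,"column2":4}

# lines=False: a single JSON array.
print(df.to_json(orient="records", lines=False))
# [{"column1":1,"column2":2},{"column1":3,"column2":4}]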

References:
  • Pandas DataFrame to_json documentation
  • Pandas IO tools (text, CSV, HDF5, …) documentation

columns class-attribute instance-attribute #

columns: Optional[list[str]] = Field(
    default=None,
    description="The columns to write. If None, all columns will be written.",
)

compression class-attribute instance-attribute #

compression: Optional[Literal["gzip"]] = Field(
    default=None,
    description="A string representing the compression to use for on-the-fly compression of the output data.Koheesio sets this default to 'None' leaving the data uncompressed by default. Can be set to 'gzip' optionally.",
)

date_format class-attribute instance-attribute #

date_format: Literal["iso", "epoch"] = Field(
    default="iso",
    description="Type of date conversion. Default is 'iso'.",
)

double_precision class-attribute instance-attribute #

double_precision: int = Field(
    default=10,
    description="Number of decimal places for encoding floating point values. Default is 10.",
)

force_ascii class-attribute instance-attribute #

force_ascii: bool = Field(
    default=True,
    description="Force encoded string to be ASCII. Default is True.",
)

lines class-attribute instance-attribute #

lines: bool = Field(
    default=True,
    description="Format output as one JSON object per line. Only used when orient='records'. Default is True.",
)

orient class-attribute instance-attribute #

orient: Literal[
    "split",
    "records",
    "index",
    "columns",
    "values",
    "table",
] = Field(
    default="records",
    description="Format of the resulting JSON string. Default is 'records'.",
)

Output #

Output class for PandasJsonBufferWriter

pandas_df class-attribute instance-attribute #

pandas_df: Optional[DataFrame] = Field(
    None,
    description="The Pandas DataFrame that was written",
)

execute #

execute()

Write the DataFrame to the buffer using Pandas to_json() method.

Source code in src/koheesio/spark/writers/buffer.py
def execute(self):
    """Write the DataFrame to the buffer using Pandas to_json() method."""
    df = self.df
    if self.columns:
        df = df[self.columns]

    # convert the Spark DataFrame to a Pandas DataFrame
    self.output.pandas_df = df.toPandas()

    # create json file in memory
    file_buffer = self.output.buffer
    self.output.pandas_df.to_json(file_buffer, **self.get_options())

    # compress the buffer if compression is set
    if self.compression:
        self.output.compress()

get_options #

get_options()

Returns the options to pass to Pandas' to_json() method.

Source code in src/koheesio/spark/writers/buffer.py
def get_options(self):
    """Returns the options to pass to Pandas' to_json() method."""
    json_options = {
        "orient": self.orient,
        "date_format": self.date_format,
        "double_precision": self.double_precision,
        "force_ascii": self.force_ascii,
        "lines": self.lines,
        **self.params,
    }

    # ignore the 'lines' parameter if orient is not 'records'
    if self.orient != "records":
        del json_options["lines"]

    return json_options
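
Putting it together, a hedged usage sketch (assuming a SparkSession `spark` is available; the DataFrame contents are made up for illustration):

from koheesio.spark.writers.buffer import PandasJsonBufferWriter

df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])

writer = PandasJsonBufferWriter(
    df=df,
    orient="records",    # Koheesio default: one JSON object per row
    compression="gzip",  # buffer is gzip-compressed after writing (see execute() above)
)
output = writer.write()
json_bytes = output.read()  # gzip-compressed JSON, e.g. for an SFTP upload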