Stream

Module that holds classes and functions for writing to a stream.

Classes:

  • Trigger: class to set the trigger for a stream query
  • StreamWriter: abstract class for stream writers
  • ForEachBatchStreamWriter: class to run a writer for each batch

Functions:

  • writer_to_foreachbatch: function to be used as batch_function for StreamWriter (sub)classes

koheesio.spark.writers.stream.ForEachBatchStreamWriter #

Runnable ForEachBatchWriter: runs the configured batch_function for each micro-batch of the stream.

execute #

execute() -> None
Source code in src/koheesio/spark/writers/stream.py
def execute(self) -> None:
    self.streaming_query = self.writer.start()
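
A hedged usage sketch: wire a batch Writer into a ForEachBatchStreamWriter through writer_to_foreachbatch (documented at the bottom of this page) and start the stream. Here streaming_df and batch_writer are placeholders for a streaming DataFrame and any Koheesio batch Writer.

from koheesio.spark.writers.stream import (
    ForEachBatchStreamWriter,
    writer_to_foreachbatch,
)

writer = ForEachBatchStreamWriter(
    df=streaming_df,  # placeholder: a streaming DataFrame
    checkpointLocation="/tmp/checkpoints/demo",
    batch_function=writer_to_foreachbatch(batch_writer),  # placeholder: any Koheesio batch Writer
)
writer.execute()  # starts the query and stores the handle in writer.streaming_query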

koheesio.spark.writers.stream.StreamWriter #

Abstract base class (ABC) for stream writers.

batch_function class-attribute instance-attribute #

batch_function: Optional[Callable] = Field(
    default=None,
    description="allows you to run custom batch functions for each micro batch",
    alias="batch_function_for_each_df",
)

checkpoint_location class-attribute instance-attribute #

checkpoint_location: str = Field(
    default=...,
    alias="checkpointLocation",
    description="In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information and the running aggregates to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system that is accessible by Spark (e.g. Databricks Unity Catalog External Location)",
)

output_mode class-attribute instance-attribute #

output_mode: StreamingOutputMode = Field(
    default=APPEND, alias="outputMode", description=__doc__
)

stream_writer property #

stream_writer: DataStreamWriter

Returns the stream writer for the given DataFrame and settings

streaming_query class-attribute instance-attribute #

streaming_query: Optional[Union[str, StreamingQuery]] = (
    Field(
        default=None,
        description="Query ID of the stream query",
    )
)

trigger class-attribute instance-attribute #

trigger: Optional[Union[Trigger, str, Dict]] = Field(
    default=Trigger(available_now=True),
    description="Set the trigger for the stream query. If this is not set it process data as batch",
)
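
Because the field type is Optional[Union[Trigger, str, Dict]], a stream writer can be configured with any of the three forms below. A hedged sketch, where MyStreamWriter is a placeholder for a concrete StreamWriter subclass (for example the DeltaTableStreamWriter used in the writer_to_foreachbatch example at the bottom of this page):

from koheesio.spark.writers.stream import Trigger

# all three configurations are equivalent
MyStreamWriter(checkpointLocation="/tmp/checkpoints/demo", trigger=Trigger(available_now=True))
MyStreamWriter(checkpointLocation="/tmp/checkpoints/demo", trigger="availableNow=True")
MyStreamWriter(checkpointLocation="/tmp/checkpoints/demo", trigger={"availableNow": True})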

writer property #

writer: DataStreamWriter

Returns the stream writer since we don't have a batch mode for streams

await_termination #

await_termination(timeout: Optional[int] = None) -> None

Await termination of the stream query

Source code in src/koheesio/spark/writers/stream.py
def await_termination(self, timeout: Optional[int] = None) -> None:
    """Await termination of the stream query"""
    self.streaming_query.awaitTermination(timeout=timeout)
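
A minimal sketch of blocking until the stream stops; writer is a placeholder for any started StreamWriter instance.

writer.execute()                      # starts the stream query
writer.await_termination(timeout=60)  # block for at most 60 seconds; omit timeout to wait indefinitely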

execute abstractmethod #

execute() -> None
Source code in src/koheesio/spark/writers/stream.py
@abstractmethod
def execute(self) -> None:
    raise NotImplementedError

koheesio.spark.writers.stream.Trigger #

Trigger types for a stream query.

Only one trigger can be set!

Example
  • processingTime='5 seconds'
  • continuous='5 seconds'
  • availableNow=True
  • once=True
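
A minimal sketch (assuming koheesio is installed) of equivalent ways to build the same trigger; only one trigger keyword may be set per instance.

from koheesio.spark.writers.stream import Trigger

t1 = Trigger(available_now=True)                # keyword arguments
t2 = Trigger.from_string("availableNow=True")   # parsed from a string
t3 = Trigger.from_dict({"availableNow": True})  # built from a dict
print(t1.value)                                 # the Spark-ready dict, e.g. {"availableNow": True}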
See Also

available_now class-attribute instance-attribute #

available_now: Optional[bool] = Field(
    default=None,
    alias="availableNow",
    description="if set to True, set a trigger that processes all available data in multiple batches then terminates the query.",
)

continuous class-attribute instance-attribute #

continuous: Optional[str] = Field(
    default=None,
    description="a time interval as a string, e.g. '5 seconds', '1 minute'.Set a trigger that runs a continuous query with a given checkpoint interval.",
)

model_config class-attribute instance-attribute #

model_config = ConfigDict(
    validate_default=False, extra="forbid"
)

once class-attribute instance-attribute #

once: Optional[bool] = Field(
    default=None,
    deprecated=True,
    description="if set to True, set a trigger that processes only one batch of data in a streaming query then terminates the query. use `available_now` instead of `once`.",
)

processing_time class-attribute instance-attribute #

processing_time: Optional[str] = Field(
    default=None,
    alias="processingTime",
    description="a processing time interval as a string, e.g. '5 seconds', '1 minute'.Set a trigger that runs a microbatch query periodically based on the processing time.",
)

triggers property #

triggers: Dict

Returns a dictionary with the value for each trigger field

value property #

value: Dict[str, str]

Returns the trigger value as a dictionary
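
The dictionary is shaped so it can be unpacked straight into PySpark's DataStreamWriter.trigger. A hedged sketch, assuming value carries the Spark-style (alias) keys and streaming_df is a placeholder streaming DataFrame:

from koheesio.spark.writers.stream import Trigger

t = Trigger(processing_time="5 seconds")
print(t.value)  # assumed shape: {"processingTime": "5 seconds"}
query = streaming_df.writeStream.format("console").trigger(**t.value).start()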

execute #

execute() -> None

Returns the trigger value as a dictionary. This method can be skipped, as the value can be accessed directly from the value property.

Source code in src/koheesio/spark/writers/stream.py
def execute(self) -> None:
    """Returns the trigger value as a dictionary
    This method can be skipped, as the value can be accessed directly from the `value` property
    """
    self.log.warning("Trigger.execute is deprecated. Use Trigger.value directly instead")
    self.output.value = self.value

from_any classmethod #

from_any(value: Union[Trigger, str, dict]) -> Trigger

Dynamically creates a Trigger class based on either another Trigger class, a passed string value, or a dictionary

This way Trigger.from_any can be used as part of a validator, without needing to worry about supported types

Source code in src/koheesio/spark/writers/stream.py
@classmethod
def from_any(cls, value: Union["Trigger", str, dict]) -> "Trigger":
    """Dynamically creates a Trigger class based on either another Trigger class, a passed string value, or a
    dictionary

    This way Trigger.from_any can be used as part of a validator, without needing to worry about supported types
    """
    if isinstance(value, Trigger):
        return value

    if isinstance(value, str):
        return cls.from_string(value)

    if isinstance(value, dict):
        return cls.from_dict(value)

    raise RuntimeError(f"Unable to create Trigger based on the given value: {value}")
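
A short sketch of the normalization: whichever form the caller passes, from_any returns a Trigger instance, which is what lets a single validator accept all three types.

from koheesio.spark.writers.stream import Trigger

print(Trigger.from_any(Trigger(available_now=True)))   # returned as-is
print(Trigger.from_any("processingTime='5 seconds'"))  # parsed via from_string
print(Trigger.from_any({"continuous": "1 minute"}))    # built via from_dict
# any other type raises RuntimeError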

from_dict classmethod #

from_dict(_dict: dict) -> Trigger

Creates a Trigger class based on a dictionary

Source code in src/koheesio/spark/writers/stream.py
@classmethod
def from_dict(cls, _dict: dict) -> "Trigger":
    """Creates a Trigger class based on a dictionary"""
    return cls(**_dict)

from_string classmethod #

from_string(trigger: str) -> Trigger

Creates a Trigger class based on a string

Example
happy flow #
  • processingTime='5 seconds'
  • processing_time="5 hours"
  • processingTime=4 minutes
  • once=True
  • once=true
  • available_now=true
  • continuous='3 hours'
  • once=TrUe
  • once=TRUE
unhappy flow #

valid values, but should fail the validation check of the class

  • availableNow=False
  • continuous=True
  • once=false
Source code in src/koheesio/spark/writers/stream.py
@classmethod
def from_string(cls, trigger: str) -> "Trigger":
    """Creates a Trigger class based on a string

    Example
    -------
    ### happy flow

    * processingTime='5 seconds'
    * processing_time="5 hours"
    * processingTime=4 minutes
    * once=True
    * once=true
    * available_now=true
    * continuous='3 hours'
    * once=TrUe
    * once=TRUE

    ### unhappy flow
    valid values, but should fail the validation check of the class

    * availableNow=False
    * continuous=True
    * once=false
    """
    import re

    trigger_from_string = re.compile(r"(?P<triggerType>\w+)=[\'\"]?(?P<value>.+)[\'\"]?")
    _match = trigger_from_string.match(trigger)

    if _match is None:
        raise ValueError(
            f"Cannot parse value for Trigger: '{trigger}'. \n"
            f"Valid types are {', '.join(cls._all_triggers_with_alias())}"
        )

    trigger_type, value = _match.groups()

    # strip the value of any quotes
    value = value.strip("'").strip('"')

    # making value a boolean when given
    value = convert_str_to_bool(value)

    return cls.from_dict({trigger_type: value})
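
A hedged sketch of both flows: happy-flow strings parse into a valid Trigger, while unhappy-flow strings parse but then fail the field validators (surfacing as a pydantic ValidationError).

from koheesio.spark.writers.stream import Trigger

print(Trigger.from_string("processingTime='5 seconds'").processing_time)  # 5 seconds
print(Trigger.from_string("available_now=true").available_now)            # True
try:
    Trigger.from_string("continuous=True")  # parses, but continuous must be an interval string
except Exception as exc:
    print(type(exc).__name__)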

validate_available_now #

validate_available_now(available_now: str) -> bool

Validate the available_now trigger value

Source code in src/koheesio/spark/writers/stream.py
@field_validator("available_now", mode="before")
def validate_available_now(cls, available_now: str) -> bool:
    """Validate the available_now trigger value"""
    # making value a boolean when given
    available_now = convert_str_to_bool(available_now)

    # adapted from `pyspark.sql.streaming.readwriter.DataStreamWriter.trigger`
    if available_now is not True:
        raise ValueError(f"Value for availableNow must be True. Got:{available_now}")
    return available_now

validate_continuous #

validate_continuous(continuous: str) -> str

Validate the continuous trigger value

Source code in src/koheesio/spark/writers/stream.py
@field_validator("continuous", mode="before")
def validate_continuous(cls, continuous: str) -> str:
    """Validate the continuous trigger value"""
    # adapted from `pyspark.sql.streaming.readwriter.DataStreamWriter.trigger` except that the if statement is not
    # split in two parts
    if not isinstance(continuous, str):
        raise ValueError(f"Value for continuous must be a string. Got: {continuous}")

    if len(continuous.strip()) == 0:
        raise ValueError(f"Value for continuous must be a non empty string. Got: {continuous}")
    return continuous

validate_once #

validate_once(once: str) -> bool

Validate the once trigger value

Source code in src/koheesio/spark/writers/stream.py
@field_validator("once", mode="before")
def validate_once(cls, once: str) -> bool:
    """Validate the once trigger value"""
    # making value a boolean when given
    once = convert_str_to_bool(once)

    # adapted from `pyspark.sql.streaming.readwriter.DataStreamWriter.trigger`
    if once is not True:
        raise ValueError(f"Value for once must be True. Got: {once}")
    return once

validate_processing_time #

validate_processing_time(processing_time: str) -> str

Validate the processing time trigger value

Source code in src/koheesio/spark/writers/stream.py
@field_validator("processing_time", mode="before")
def validate_processing_time(cls, processing_time: str) -> str:
    """Validate the processing time trigger value"""
    # adapted from `pyspark.sql.streaming.readwriter.DataStreamWriter.trigger`
    if not isinstance(processing_time, str):
        raise ValueError(f"Value for processing_time must be a string. Got: {processing_time}")

    if len(processing_time.strip()) == 0:
        raise ValueError(f"Value for processingTime must be a non empty string. Got: {processing_time}")
    return processing_time

validate_triggers #

validate_triggers(triggers: Dict) -> Dict

Validate the trigger value

Source code in src/koheesio/spark/writers/stream.py
@model_validator(mode="before")
def validate_triggers(cls, triggers: Dict) -> Dict:
    """Validate the trigger value"""
    params = [*triggers.values()]

    # adapted from `pyspark.sql.streaming.readwriter.DataStreamWriter.trigger`; modified to work with pydantic v2
    if not triggers:
        raise ValueError("No trigger provided")
    if len(params) > 1:
        raise ValueError("Multiple triggers not allowed.")

    return triggers
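
A short sketch of the guard in action: passing more than one trigger keyword fails before any field validation runs.

from koheesio.spark.writers.stream import Trigger

try:
    Trigger(available_now=True, processing_time="5 seconds")
except Exception as exc:  # pydantic wraps the "Multiple triggers not allowed." ValueError
    print(type(exc).__name__)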

koheesio.spark.writers.stream.writer_to_foreachbatch #

writer_to_foreachbatch(writer: Writer) -> Callable

Call writer.execute on each batch

To be passed as batch_function for StreamWriter (sub)classes.

Example
Writing to a Delta table and a Snowflake table #
DeltaTableStreamWriter(
    table="my_table",
    checkpointLocation="my_checkpointlocation",
    batch_function=writer_to_foreachbatch(
        SnowflakeWriter(
            **sfOptions,
            table="snowflake_table",
            insert_type=SnowflakeWriter.InsertType.APPEND,
        )
    ),
)
Source code in src/koheesio/spark/writers/stream.py
def writer_to_foreachbatch(writer: Writer) -> Callable:
    """Call `writer.execute` on each batch

    To be passed as batch_function for StreamWriter (sub)classes.

    Example
    -------
    ### Writing to a Delta table and a Snowflake table
    ```python
    DeltaTableStreamWriter(
        table="my_table",
        checkpointLocation="my_checkpointlocation",
        batch_function=writer_to_foreachbatch(
            SnowflakeWriter(
                **sfOptions,
                table="snowflake_table",
                insert_type=SnowflakeWriter.InsertType.APPEND,
            )
        ),
    )
    ```
    """

    def inner(df: DataFrame, batch_id: int) -> None:
        """Inner method

        As per the Spark documentation:
        In every micro-batch, the provided function will be called with (i) the output rows as a
        DataFrame and (ii) the batch identifier. The batchId can be used to deduplicate and transactionally
        write the output (that is, the provided Dataset) to external systems. The output DataFrame is
        guaranteed to be exactly the same for the same batchId (assuming all operations are deterministic
        in the query).
        """
        writer.log.debug(f"Running batch function for batch {batch_id}")
        writer.write(df)

    return inner
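
The returned inner callable has the (DataFrame, batch_id) signature that PySpark's DataStreamWriter.foreachBatch expects, so it can also be used with plain PySpark outside a StreamWriter. A hedged sketch, where streaming_df and my_batch_writer are placeholders for a streaming DataFrame and any Koheesio batch Writer:

batch_fn = writer_to_foreachbatch(my_batch_writer)
query = streaming_df.writeStream.foreachBatch(batch_fn).start()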