Stream
Module containing classes and functions for writing to a stream.
Classes:
Name | Description |
---|---|
Trigger | class to set the trigger for a stream query |
StreamWriter | abstract class for stream writers |
ForEachBatchStreamWriter | class to run a writer for each batch |
Functions:
Name | Description |
---|---|
writer_to_foreachbatch | function to be used as batch_function for StreamWriter (sub)classes |
koheesio.spark.writers.stream.ForEachBatchStreamWriter #
koheesio.spark.writers.stream.StreamWriter #
ABC Stream Writer
batch_function class-attribute instance-attribute #
batch_function: Optional[Callable] = Field(
default=None,
description="allows you to run custom batch functions for each micro batch",
alias="batch_function_for_each_df",
)
checkpoint_location class-attribute instance-attribute #
checkpoint_location: str = Field(
default=...,
alias="checkpointLocation",
description="In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information and the running aggregates to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system that is accessible by Spark (e.g. Databricks Unity Catalog External Location)",
)
output_mode class-attribute instance-attribute #
output_mode: StreamingOutputMode = Field(
default=APPEND, alias="outputMode", description=__doc__
)
stream_writer property #
Returns the stream writer for the given DataFrame and settings
streaming_query class-attribute instance-attribute #
streaming_query: Optional[Union[str, StreamingQuery]] = (
Field(
default=None,
description="Query ID of the stream query",
)
)
trigger class-attribute instance-attribute #
trigger: Optional[Union[Trigger, str, Dict]] = Field(
default=Trigger(available_now=True),
description="Set the trigger for the stream query. If this is not set it process data as batch",
)
writer property #
Returns the stream writer since we don't have a batch mode for streams
await_termination #
koheesio.spark.writers.stream.Trigger #
Trigger types for a stream query.
Only one trigger can be set!
Example
- processingTime='5 seconds'
- continuous='5 seconds'
- availableNow=True
- once=True
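The "only one trigger" rule can be illustrated with a small, self-contained sketch. This is not the koheesio implementation: `trigger_kwargs` is a hypothetical helper, and it assumes that exactly one option must be provided (the text above only states that more than one is not allowed).

```python
from typing import Any, Dict, Optional


def trigger_kwargs(
    processing_time: Optional[str] = None,
    continuous: Optional[str] = None,
    available_now: Optional[bool] = None,
    once: Optional[bool] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: return the single trigger that was set, keyed by its camelCase name."""
    candidates = {
        "processingTime": processing_time,
        "continuous": continuous,
        "availableNow": available_now,
        "once": once,
    }
    # Keep only the options that were actually provided.
    provided = {name: value for name, value in candidates.items() if value is not None}
    # Assumption: exactly one trigger is required, not merely "at most one".
    if len(provided) != 1:
        raise ValueError(
            f"Exactly one trigger must be set, got {len(provided)}: {sorted(provided)}"
        )
    return provided


print(trigger_kwargs(processing_time="5 seconds"))
```

The returned dictionary uses the same keyword names as PySpark's `DataStreamWriter.trigger`, so in a Spark job it could be unpacked as `df.writeStream.trigger(**kwargs)`.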
available_now class-attribute instance-attribute #
available_now: Optional[bool] = Field(
default=None,
alias="availableNow",
description="if set to True, set a trigger that processes all available data in multiple batches then terminates the query.",
)
continuous class-attribute instance-attribute #
continuous: Optional[str] = Field(
default=None,
description="a time interval as a string, e.g. '5 seconds', '1 minute'.Set a trigger that runs a continuous query with a given checkpoint interval.",
)
model_config class-attribute instance-attribute #
once class-attribute instance-attribute #
once: Optional[bool] = Field(
default=None,
deprecated=True,
description="if set to True, set a trigger that processes only one batch of data in a streaming query then terminates the query. use `available_now` instead of `once`.",
)
processing_time class-attribute instance-attribute #
processing_time: Optional[str] = Field(
default=None,
alias="processingTime",
description="a processing time interval as a string, e.g. '5 seconds', '1 minute'.Set a trigger that runs a microbatch query periodically based on the processing time.",
)
execute #
Returns the trigger value as a dictionary.
This method can be skipped, as the value can be accessed directly from the value property.
Source code in src/koheesio/spark/writers/stream.py
from_any classmethod #
Dynamically creates a Trigger class based on either another Trigger class, a passed string value, or a dictionary
This way Trigger.from_any can be used as part of a validator, without needing to worry about supported types
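The dispatch described here can be sketched roughly as follows. `SketchTrigger` is a hypothetical stand-in used only for illustration, not the library's `Trigger` class; it assumes single-entry dictionaries and plain `key=value` strings, and it does no validation or type coercion.

```python
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class SketchTrigger:
    """Hypothetical stand-in for a trigger model; holds one key/value pair."""

    key: str
    value: Any

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SketchTrigger":
        # Assumes a single-entry dict such as {"processingTime": "5 seconds"}.
        ((key, value),) = data.items()
        return cls(key, value)

    @classmethod
    def from_string(cls, text: str) -> "SketchTrigger":
        # Assumes the "key=value" form; surrounding quotes are stripped.
        key, _, value = text.partition("=")
        return cls(key.strip(), value.strip().strip("'\""))

    @classmethod
    def from_any(cls, value: Union["SketchTrigger", str, Dict[str, Any]]) -> "SketchTrigger":
        # Dispatch on the runtime type so callers can pass any supported form.
        if isinstance(value, cls):
            return value
        if isinstance(value, str):
            return cls.from_string(value)
        if isinstance(value, dict):
            return cls.from_dict(value)
        raise TypeError(f"Unsupported trigger type: {type(value).__name__}")
```

A single entry point of this kind lets a field validator accept all three input forms without the caller having to convert them first.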
from_dict classmethod #
from_string classmethod #
Creates a Trigger class based on a string
Example
happy flow #
- processingTime='5 seconds'
- processing_time="5 hours"
- processingTime=4 minutes
- once=True
- once=true
- available_now=true
- continuous='3 hours'
- once=TrUe
- once=TRUE
unhappy flow #
valid values, but should fail the validation check of the class
- availableNow=False
- continuous=True
- once=false
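A minimal sketch of how strings like the ones above could be parsed is shown here. `parse_trigger_string` and the `_ALIASES` table are hypothetical names, and the behaviour is an assumption rather than the library's code: keys are matched case-insensitively in both camelCase and snake_case, quotes are optional, and `true`/`false` are coerced to booleans in any casing. The sketch only parses; rejecting the "unhappy flow" values is left to the class validators.

```python
from typing import Dict, Union

# Map both spellings used in the examples onto one canonical snake_case name.
_ALIASES = {
    "processingtime": "processing_time",
    "processing_time": "processing_time",
    "availablenow": "available_now",
    "available_now": "available_now",
    "continuous": "continuous",
    "once": "once",
}


def parse_trigger_string(text: str) -> Dict[str, Union[str, bool]]:
    """Hypothetical parser for strings such as "processingTime='5 seconds'"."""
    key, sep, raw = text.partition("=")
    if not sep:
        raise ValueError(f"Expected 'key=value', got {text!r}")
    name = _ALIASES.get(key.strip().lower())
    if name is None:
        raise ValueError(f"Unknown trigger {key.strip()!r}")
    # Quotes around the value are optional, so strip them if present.
    value = raw.strip().strip("'\"")
    # Booleans are matched case-insensitively: true, TRUE, TrUe, ...
    if value.lower() in {"true", "false"}:
        return {name: value.lower() == "true"}
    return {name: value}


print(parse_trigger_string("processingTime=4 minutes"))
print(parse_trigger_string("once=TrUe"))
```

With this split, `availableNow=False` parses cleanly into `{"available_now": False}`, and it is then up to the model's validation step to refuse it.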
validate_available_now #
Validate the available_now trigger value
validate_continuous #
Validate the continuous trigger value
validate_once #
Validate the once trigger value
validate_processing_time #
Validate the processing time trigger value
validate_triggers #
Validate the trigger value
koheesio.spark.writers.stream.writer_to_foreachbatch #
Call writer.execute on each batch.
To be passed as batch_function for StreamWriter (sub)classes.
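The general pattern of adapting a writer to a per-batch callback can be sketched without Spark. `RecordingWriter` and `to_foreachbatch` below are hypothetical names for illustration only; the sketch assumes the writer exposes a `df` attribute and an `execute()` method, and that the callback receives `(batch_df, batch_id)`, which is the signature PySpark's `DataStreamWriter.foreachBatch` expects.

```python
from typing import Any, Callable, List


class RecordingWriter:
    """Hypothetical writer: stores the batch it was asked to write."""

    def __init__(self) -> None:
        self.df: Any = None
        self.written: List[Any] = []

    def execute(self) -> None:
        # A real writer would persist self.df; this one only records it.
        self.written.append(self.df)


def to_foreachbatch(writer: RecordingWriter) -> Callable[[Any, int], None]:
    """Wrap a writer in a function with the (batch_df, batch_id) signature."""

    def inner(batch_df: Any, batch_id: int) -> None:
        # Point the writer at the current micro-batch, then run it.
        writer.df = batch_df
        writer.execute()

    return inner


writer = RecordingWriter()
batch_function = to_foreachbatch(writer)
# Plain lists stand in for the micro-batch DataFrames of a real stream.
for batch_id, batch in enumerate([["a", "b"], ["c"]]):
    batch_function(batch, batch_id)
print(writer.written)  # [['a', 'b'], ['c']]
```

Returning a closure keeps the writer's configuration in one place while still giving the streaming engine the two-argument function it calls for every micro-batch.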