Kafka

Kafka writer to write batch or streaming data into Kafka topics.

koheesio.spark.writers.kafka.KafkaWriter #

Kafka writer to write batch or streaming data into Kafka topics.

All Kafka-specific options can be provided as additional init params.

Parameters:

broker : str (required)
    Broker URL of the Kafka cluster.

topic : str (required)
    Full topic name to write the data to.

trigger : Optional[Union[Trigger, str, Dict]] (default: Trigger(available_now=True))
    Optionally indicates how to stream the data into Kafka: continuous or batch.

checkpoint_location : str (required)
    In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs.
Example
KafkaWriter(
    broker="broker.com:9500",
    topic="test-topic",
    trigger=Trigger(continuous=True),
    checkpoint_location="s3://bucket/test-topic",
    **{
        "includeHeaders": "true",
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "kafka.group.id": "test-group",
    },
)
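Kafka option keys that contain dots (e.g. key.serializer) are not valid Python identifiers, so in the example above they are passed via dict unpacking rather than as plain keyword arguments.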

batch_writer property #

batch_writer: DataFrameWriter

Returns a batch writer.

Returns:

DataFrameWriter

broker class-attribute instance-attribute #

broker: str = Field(
    default=..., description="Kafka brokers to write to"
)

checkpoint_location class-attribute instance-attribute #

checkpoint_location: str = Field(
    default=...,
    alias="checkpointLocation",
    description="In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information and the running aggregates to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system that is accessible by Spark (e.g. Databricks Unity Catalog External Location)",
)

format class-attribute instance-attribute #

format: str = 'kafka'

logged_option_keys property #

logged_option_keys

Keys to be logged.

options property #

options

Retrieve the Kafka options, including topic and broker.

Returns:

dict
    The combination of the Kafka options, the topic, and the broker.
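A rough illustration of the shape of options (a sketch; the mapping of broker onto Spark's kafka.bootstrap.servers option key is an assumption based on the standard Spark Kafka sink options):

writer = KafkaWriter(
    broker="broker.com:9500",
    topic="test-topic",
    checkpoint_location="s3://bucket/test-topic",
)
# writer.options would then resemble something like:
# {"kafka.bootstrap.servers": "broker.com:9500", "topic": "test-topic", ...}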

stream_writer property #

stream_writer: DataStreamWriter

Returns a stream writer.

Returns:

DataStreamWriter

streaming_query property #

streaming_query: Optional[Union[str, StreamingQuery]]

Returns the streaming query.

topic class-attribute instance-attribute #

topic: str = Field(
    default=..., description="Kafka topic to write to"
)

trigger class-attribute instance-attribute #

trigger: Optional[Union[Trigger, str, Dict]] = Field(
    Trigger(available_now=True),
    description="Set the trigger for the stream query. If not set data is processed in batch",
)
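The trigger can be given as a Trigger instance, as in the class example above, or (per the type hint) as a string or dict. A minimal sketch of a micro-batch interval trigger; the processing_time field name is an assumption mirroring Spark's processingTime trigger option:

KafkaWriter(
    broker="broker.com:9500",
    topic="test-topic",
    checkpoint_location="s3://bucket/test-topic",
    trigger=Trigger(processing_time="10 seconds"),  # assumption: mirrors Spark's processingTime
)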

writer property #

writer: Union[DataStreamWriter, DataFrameWriter]

Returns the writer of the proper type, depending on whether the data to be written is a stream or not. This function will also set the trigger property in the case of a data stream.

Returns:

Union[DataStreamWriter, DataFrameWriter]
    In case of streaming data -> DataStreamWriter, else -> DataFrameWriter.
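A rough sketch of which writer is returned (assuming the DataFrame is attached via a df field, as with other Koheesio Spark writers; the df parameter is an assumption of this sketch):

writer = KafkaWriter(
    broker="broker.com:9500",
    topic="test-topic",
    checkpoint_location="s3://bucket/test-topic",
    df=streaming_df,  # assumption: DataFrame attached via the df field
)
writer.writer  # DataStreamWriter when streaming_df.isStreaming is True,
               # a DataFrameWriter for a regular batch DataFrame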

Output #

Output of the KafkaWriter

streaming_query class-attribute instance-attribute #

streaming_query: Optional[Union[str, StreamingQuery]] = (
    Field(
        default=None,
        description="Query ID of the stream query",
    )
)

execute #

execute()

Write the data from the DataFrame (streaming or batch) to the Kafka topic.

Returns:

Output
    The streaming_query attribute can be used to gain insight into the running write.

Source code in src/koheesio/spark/writers/kafka.py
def execute(self):
    """Effectively write the data from the dataframe (streaming of batch) to kafka topic.

    Returns
    -------
    KafkaWriter.Output
        streaming_query function can be used to gain insights on running write.
    """
    applied_options = {k: v for k, v in self.options.items() if k in self.logged_option_keys}
    self.log.debug(f"Applying options {applied_options}")

    self._validate_dataframe()

    _writer = self.writer.format(self.format).options(**self.options)
    self.output.streaming_query = _writer.start() if self.streaming else _writer.save()
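
A short end-to-end sketch of running the writer (the df field and the Trigger import path are assumptions of this sketch):

from koheesio.spark.writers.kafka import KafkaWriter
from koheesio.spark.writers.stream import Trigger  # assumption: Trigger import path

writer = KafkaWriter(
    broker="broker.com:9500",
    topic="test-topic",
    checkpoint_location="s3://bucket/test-topic",
    trigger=Trigger(available_now=True),
    df=df,  # assumption: DataFrame attached via the df field
)
writer.execute()
query = writer.output.streaming_query  # StreamingQuery for a streaming write, None for a batch write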