Kafka
Kafka writer to write batch or streaming data into Kafka topics.
koheesio.spark.writers.kafka.KafkaWriter #
Kafka writer to write batch or streaming data into Kafka topics.
All Kafka-specific options can be provided as additional init params.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
broker | str | Broker URL of the Kafka cluster | required |
topic | str | Full topic name to write the data to | required |
trigger | Optional[Union[Trigger, str, Dict]] | Optionally indicates how to stream the data into Kafka, continuous or batch | Trigger(available_now=True) |
checkpoint_location | str | In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query and continue where it left off. This is done using checkpointing and write-ahead logs. | required |
Example
KafkaWriter(
    broker="broker.com:9500",
    topic="test-topic",
    trigger=Trigger(continuous=True),
    # Kafka-specific options are passed as additional init params;
    # option names containing dots are supplied via dict unpacking.
    includeHeaders="true",
    **{
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "kafka.group.id": "test-group",
    },
    checkpoint_location="s3://bucket/test-topic",
)
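For context, a minimal end-to-end sketch of running the writer is shown below. It assumes the standard Koheesio step interface, i.e. that the DataFrame is passed via a df parameter and the write is triggered with execute(); verify both against the Writer base class.

```python
# Hypothetical usage sketch -- `df` and `execute()` are assumptions about
# the Koheesio Writer interface, not confirmed by this page.
writer = KafkaWriter(
    broker="broker.com:9500",
    topic="test-topic",
    checkpoint_location="s3://bucket/test-topic",
    df=batch_df,  # batch_df: a previously created, non-streaming Spark DataFrame
)
writer.execute()
```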
batch_writer property #
Returns a batch writer.
Returns:
Type | Description |
---|---|
DataFrameWriter | The DataFrameWriter used when writing batch data. |
broker class-attribute instance-attribute #
broker: str = Field(
default=..., description="Kafka brokers to write to"
)
checkpoint_location class-attribute instance-attribute #
checkpoint_location: str = Field(
default=...,
alias="checkpointLocation",
description="In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information and the running aggregates to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system that is accessible by Spark (e.g. Databricks Unity Catalog External Location)",
)
options property #
Retrieve the Kafka options, including topic and broker.
Returns:
Type | Description |
---|---|
dict | Dict combining the Kafka options with the topic and broker. |
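Purely as an illustration of the shape of that dict (the Spark Kafka sink documents the kafka.bootstrap.servers and topic options; the exact keys used here are an assumption), the combined options for the earlier example might look like:

```python
# Illustrative only -- assumed key names; the Spark Kafka sink expects
# "kafka.bootstrap.servers" and "topic", and extra init params are merged in.
options = {
    "kafka.bootstrap.servers": "broker.com:9500",
    "topic": "test-topic",
    "includeHeaders": "true",
    "kafka.group.id": "test-group",
}
```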
stream_writer property #
Returns a stream writer.
Returns:
Type | Description |
---|---|
DataStreamWriter | The DataStreamWriter used when writing streaming data. |
streaming_query property #
Returns the streaming query.
topic class-attribute instance-attribute #
topic: str = Field(
default=..., description="Kafka topic to write to"
)
trigger class-attribute instance-attribute #
trigger: Optional[Union[Trigger, str, Dict]] = Field(
Trigger(available_now=True),
description="Set the trigger for the stream query. If not set data is processed in batch",
)
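Besides a Trigger instance, the annotation also accepts a string or dict. The exact spellings those forms expect are not documented on this page; the values below mirror Spark's streaming trigger options and are assumptions to be verified against the Trigger class.

```python
# Assumed string/dict spellings, mirroring Spark's trigger options
# (processingTime, availableNow, once, continuous) -- verify before use.
trigger_as_dict = {"processingTime": "10 seconds"}  # micro-batch every 10 seconds
trigger_as_str = "availableNow"                     # process available data, then stop
```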
writer property #
writer: Union[DataStreamWriter, DataFrameWriter]
Returns the writer of the proper type, depending on whether the data to be written is a stream or not. This property also sets the trigger in case of a data stream.
Returns:
Type | Description |
---|---|
Union[DataStreamWriter, DataFrameWriter] | DataStreamWriter in case of streaming data, DataFrameWriter otherwise. |
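Conceptually the selection is a simple branch on whether the DataFrame is streaming; the sketch below is not the library's actual code, but df.isStreaming is the standard Spark check:

```python
# Conceptual sketch of the writer selection -- not the library's actual code.
def _select_writer(self):
    if self.df.isStreaming:
        return self.stream_writer  # DataStreamWriter, trigger applied
    return self.batch_writer       # DataFrameWriter
```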
Output #
execute #
Effectively writes the data from the DataFrame (streaming or batch) to the Kafka topic.
Returns:
Type | Description |
---|---|
Output | The streaming_query property can be used to gain insight into the running write. |
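When the input is a stream, the running write can be inspected through Spark's standard StreamingQuery API, for example (assuming writer is an executed streaming KafkaWriter):

```python
# Spark's StreamingQuery API on the query exposed via streaming_query.
query = writer.streaming_query
print(query.status)        # current state of the query
print(query.lastProgress)  # metrics for the most recent progress update
query.awaitTermination()   # block until the stream stops or fails
```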