Kafka
Module for KafkaReader and KafkaStreamReader.
koheesio.spark.readers.kafka.KafkaReader
Reader for Kafka topics.
Wrapper around Spark's kafka read format. Supports both batch and streaming reads.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`read_broker` | `str` | Kafka brokers to read from. Pass a single string; separate multiple brokers with commas. | required |
`topic` | `str` | Kafka topic to consume. | required |
`streaming` | `Optional[bool]` | Whether to read the Kafka topic as a stream or not. | `False` |
`params` | `Optional[Dict[str, str]]` | Arbitrary options to be applied when creating NSP Reader. If a user provides values for 'subscribe' or 'kafka.bootstrap.servers', they will be ignored in favor of configuration passed through 'topic' and 'read_broker' respectively. | empty dict |
Notes
- The `read_broker` and `topic` parameters are required.
- The `streaming` parameter defaults to `False`.
- The `params` parameter defaults to an empty dictionary. This parameter is also aliased as `kafka_options`.
- Any extra kafka options can also be passed as key-word arguments; these will be merged with the `params` parameter.
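The merge rules in these notes, together with the `params` description (user-supplied `subscribe` and `kafka.bootstrap.servers` values are ignored in favor of `topic` and `read_broker`), can be sketched in plain Python. The helper `merge_kafka_options` below is hypothetical and is not Koheesio's implementation. It also assumes that key-word arguments win over `params` when both supply the same key, which this page does not specify.

```python
from typing import Dict, Optional


def merge_kafka_options(
    read_broker: str,
    topic: str,
    params: Optional[Dict[str, str]] = None,
    **extra_options: str,
) -> Dict[str, str]:
    """Hypothetical helper illustrating the documented precedence rules."""
    # Start from the user-supplied params (an empty dictionary by default).
    options: Dict[str, str] = dict(params or {})
    # Assumption: key-word arguments override params on key collision.
    options.update(extra_options)
    # Documented rule: 'topic' and 'read_broker' take precedence over any
    # user-supplied 'subscribe' or 'kafka.bootstrap.servers' value.
    options["subscribe"] = topic
    options["kafka.bootstrap.servers"] = read_broker
    return options


merged = merge_kafka_options(
    read_broker="kafka-broker-1:9092,kafka-broker-2:9092",
    topic="my-topic",
    params={"subscribe": "ignored-topic", "startingOffsets": "latest"},
    startingOffsets="earliest",
)
```

Here `merged["subscribe"]` ends up as `"my-topic"` rather than `"ignored-topic"`, mirroring the rule that `topic` overrides a `subscribe` entry in `params`.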
Example
from koheesio.spark.readers.kafka import KafkaReader

kafka_reader = KafkaReader(
    read_broker="kafka-broker-1:9092,kafka-broker-2:9092",
    topic="my-topic",
    streaming=True,
    # extra kafka options can be passed as key-word arguments
    startingOffsets="earliest",
)
In the example above, the `KafkaReader` will read from the `my-topic` Kafka topic, using the brokers `kafka-broker-1:9092` and `kafka-broker-2:9092`. The reader will read the topic as a stream and will start reading from the earliest available offset.
The stream can be started by calling the `read` or `execute` method on the `kafka_reader` object.
Note: The `KafkaStreamReader` could be used in the example above to achieve the same result. `streaming` would default to `True` in that case and could be omitted from the parameters.
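For orientation, the batch versus streaming choice maps onto plain PySpark roughly as sketched below. This is an illustration of Spark's `kafka` read format, not Koheesio's source code. The function name `read_kafka` is hypothetical, `spark` is assumed to be an active `SparkSession`, and the Spark Kafka integration package (`spark-sql-kafka-0-10`) is assumed to be on the classpath.

```python
from typing import Any


def read_kafka(
    spark: Any,
    read_broker: str,
    topic: str,
    streaming: bool = False,
    **kafka_options: str,
) -> Any:
    """Hypothetical plain-PySpark equivalent of a batch or streaming Kafka read."""
    # spark.readStream builds a streaming DataFrame, spark.read a batch one.
    reader = spark.readStream if streaming else spark.read
    return (
        reader.format("kafka")
        # Extra options go first so that broker and topic below take precedence.
        .options(**kafka_options)
        .option("kafka.bootstrap.servers", read_broker)
        .option("subscribe", topic)
        .load()
    )
```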
See Also
- Official Spark Documentation: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
logged_option_keys
property
Keys that are allowed to be logged for the options.
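An allow-list like this is commonly used to keep sensitive option values (for example credentials in `kafka.sasl.jaas.config`) out of log output; whether that is the exact intent here is an assumption. The sketch below shows the general idea only. The key set `LOGGABLE_KEYS` and the helper `loggable_options` are hypothetical and do not reflect the actual keys returned by `logged_option_keys`.

```python
from typing import Dict, FrozenSet

# Hypothetical allow-list; the real keys are defined by KafkaReader.logged_option_keys.
LOGGABLE_KEYS: FrozenSet[str] = frozenset(
    {"kafka.bootstrap.servers", "subscribe", "startingOffsets"}
)


def loggable_options(
    options: Dict[str, str], allowed: FrozenSet[str] = LOGGABLE_KEYS
) -> Dict[str, str]:
    """Return only the options whose keys are on the allow-list."""
    return {key: value for key, value in options.items() if key in allowed}


safe = loggable_options(
    {
        "subscribe": "my-topic",
        "startingOffsets": "earliest",
        "kafka.sasl.jaas.config": "<credentials>",
    }
)
```

Only `subscribe` and `startingOffsets` remain in `safe`; the credentials entry is dropped before anything is logged.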
params
class-attribute
instance-attribute
params: Optional[Dict[str, str]] = Field(
    default_factory=dict,
    alias="kafka_options",
    description="Arbitrary options to be applied when creating NSP Reader. If a user provides values for 'subscribe' or 'kafka.bootstrap.servers', they will be ignored in favor of configuration passed through 'topic' and 'read_broker' respectively.",
)
read_broker
class-attribute
instance-attribute
read_broker: str = Field(
    ...,
    description="Kafka brokers to read from, should be passed as a single string with multiple brokers passed in a comma separated list",
)
streaming
class-attribute
instance-attribute
streaming: Optional[bool] = Field(
    default=False,
    description="Whether to read the kafka topic as a stream or not. Defaults to False.",
)
topic
class-attribute
instance-attribute
topic: str = Field(
    default=..., description="Kafka topic to consume."
)