
Kafka

Module for KafkaReader and KafkaStreamReader.

koheesio.spark.readers.kafka.KafkaReader #

Reader for Kafka topics.

Wrapper around Spark's kafka read format. Supports both batch and streaming reads.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| read_broker | str | Kafka brokers to read from. Should be passed as a single string, with multiple brokers in a comma-separated list. | required |
| topic | str | Kafka topic to consume. | required |
| streaming | Optional[bool] | Whether to read the kafka topic as a stream or not. | False |
| params | Optional[Dict[str, str]] | Arbitrary options to be applied when creating NSP Reader. If a user provides values for subscribe or kafka.bootstrap.servers, they will be ignored in favor of the configuration passed through topic and read_broker respectively. | empty dictionary |
Notes
  • The read_broker and topic parameters are required.
  • The streaming parameter defaults to False.
  • The params parameter defaults to an empty dictionary and is also aliased as kafka_options.
  • Any extra Kafka options can also be passed as keyword arguments; these will be merged with the params parameter.
Example
from koheesio.spark.readers.kafka import KafkaReader

kafka_reader = KafkaReader(
    read_broker="kafka-broker-1:9092,kafka-broker-2:9092",
    topic="my-topic",
    streaming=True,
    # extra kafka options can be passed as keyword arguments
    startingOffsets="earliest",
)

In the example above, the KafkaReader reads from the my-topic Kafka topic using the brokers kafka-broker-1:9092 and kafka-broker-2:9092. It reads the topic as a stream, starting from the earliest available offset.

The stream can be started by calling the read or execute method on the kafka_reader object.
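A minimal sketch of consuming the stream, assuming read() returns the resulting DataFrame (per the execute source further down, which stores it on output.df); the writeStream part is plain Spark Structured Streaming, not part of KafkaReader itself:

df = kafka_reader.read()  # runs execute() under the hood

# df is a streaming DataFrame; start the query with any Spark sink
query = df.writeStream.format("console").start()
query.awaitTermination()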

Note: KafkaStreamReader could be used in the example above to achieve the same result; streaming defaults to True in that case and can be omitted from the parameters, as sketched below.
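A minimal equivalent sketch:

from koheesio.spark.readers.kafka import KafkaStreamReader

kafka_reader = KafkaStreamReader(
    read_broker="kafka-broker-1:9092,kafka-broker-2:9092",
    topic="my-topic",
    # streaming=True is implied by KafkaStreamReader
    startingOffsets="earliest",
)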


batch_reader property #

batch_reader: DataFrameReader

Returns the Spark read object for batch processing.

logged_option_keys property #

logged_option_keys: set

Keys that are allowed to be logged for the options.

options property #

options: Dict[str, str]

Merges fixed parameters with arbitrary options provided by the user.
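For illustration, a sketch of what the merged options are expected to contain; the exact fixed keys (subscribe, kafka.bootstrap.servers) are an assumption drawn from the params description below:

from koheesio.spark.readers.kafka import KafkaReader

reader = KafkaReader(
    read_broker="kafka-broker-1:9092",
    topic="my-topic",
    startingOffsets="earliest",
)
# expected to hold the user option plus the fixed keys, roughly:
# {"startingOffsets": "earliest",
#  "subscribe": "my-topic",
#  "kafka.bootstrap.servers": "kafka-broker-1:9092"}
print(reader.options)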

params class-attribute instance-attribute #

params: Dict[str, str] = Field(
    default_factory=dict,
    alias="kafka_options",
    description="Arbitrary options to be applied when creating NSP Reader. If a user provides values for 'subscribe' or 'kafka.bootstrap.servers', they will be ignored in favor of configuration passed through 'topic' and 'read_broker' respectively.",
)
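Since params is aliased as kafka_options, the same options can be passed as a dictionary instead of keyword arguments; a minimal sketch:

from koheesio.spark.readers.kafka import KafkaReader

reader = KafkaReader(
    read_broker="kafka-broker-1:9092",
    topic="my-topic",
    kafka_options={"startingOffsets": "earliest"},  # same effect as passing params
)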

read_broker class-attribute instance-attribute #

read_broker: str = Field(
    ...,
    description="Kafka brokers to read from, should be passed as a single string with multiple brokers passed in a comma separated list",
)

reader property #

reader: Reader

Returns the appropriate reader based on the streaming flag.
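A rough sketch of the dispatch (the helper name _pick_reader is hypothetical; the property itself does this internally):

from koheesio.spark.readers.kafka import KafkaReader

def _pick_reader(r: KafkaReader):
    # hypothetical helper illustrating the property's behavior:
    # the streaming flag decides between the readStream and read objects
    return r.stream_reader if r.streaming else r.batch_reader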

stream_reader property #

stream_reader: DataStreamReader

Returns the Spark readStream object.

streaming class-attribute instance-attribute #

streaming: Optional[bool] = Field(
    default=False,
    description="Whether to read the kafka topic as a stream or not. Defaults to False.",
)

topic class-attribute instance-attribute #

topic: str = Field(
    default=..., description="Kafka topic to consume."
)

execute #

execute() -> Output
Source code in src/koheesio/spark/readers/kafka.py
def execute(self) -> Reader.Output:
    applied_options = {k: v for k, v in self.options.items() if k in self.logged_option_keys}
    self.log.debug(f"Applying options {applied_options}")

    self.output.df = self.reader.format("kafka").options(**self.options).load()  # type: ignore
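A short usage sketch of a batch read (streaming defaults to False); casting the binary key/value columns to strings is standard Spark Kafka handling, not something KafkaReader does for you:

from koheesio.spark.readers.kafka import KafkaReader

reader = KafkaReader(
    read_broker="kafka-broker-1:9092",
    topic="my-topic",
)
df = reader.read()  # runs execute() and returns output.df

# Kafka delivers key and value as binary columns; cast them to inspect
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show()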

koheesio.spark.readers.kafka.KafkaStreamReader #

KafkaStreamReader is a KafkaReader that reads data as a stream.

This class is identical to KafkaReader, with the streaming parameter defaulting to True.

streaming class-attribute instance-attribute #

streaming: bool = True