Autoloader

Read from a location using Databricks' autoloader

Autoloader can ingest JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats.

koheesio.spark.readers.databricks.autoloader.AutoLoader

Notes

Autoloader is a Spark Structured Streaming source, so reading with it returns a streaming DataFrame.

Most transformations are compatible with Spark Structured Streaming, but not all of them are, so check that your downstream transformations support streaming.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| format | Union[str, AutoLoaderFormat] | The file format, used in cloudFiles.format. Autoloader supports JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats. | required |
| location | str | The location where the files are located, used in cloudFiles.location | required |
| schema_location | str | The location for storing inferred schema and supporting schema evolution, used in cloudFiles.schemaLocation. | required |
| options | Optional[Dict[str, Any]] | Extra inputs to provide to the autoloader. For a full list of inputs, see https://docs.databricks.com/ingestion/auto-loader/options.html | {} |

Example
from koheesio.spark.readers.databricks.autoloader import AutoLoader, AutoLoaderFormat

result_df = AutoLoader(
    format=AutoLoaderFormat.JSON,
    location="some_s3_path",
    schema_location="other_s3_path",
    options={"multiLine": "true"},
).read()

format class-attribute instance-attribute

format: Union[str, AutoLoaderFormat] = Field(
    default=..., description=__doc__
)

location class-attribute instance-attribute

location: str = Field(
    default=...,
    description="The location where the files are located, used in `cloudFiles.location`",
)

options class-attribute instance-attribute

options: Optional[Dict[str, Any]] = Field(
    default_factory=dict,
    description="Extra inputs to provide to the autoloader. For a full list of inputs, see https://docs.databricks.com/ingestion/auto-loader/options.html",
)

schema_ class-attribute instance-attribute

schema_: Optional[
    Union[
        str,
        StructType,
        List[str],
        Tuple[str, ...],
        AtomicType,
    ]
] = Field(
    default=None,
    description="Explicit schema to apply to the input files.",
    alias="schema",
)

schema_location class-attribute instance-attribute

schema_location: str = Field(
    default=...,
    alias="schemaLocation",
    description="The location for storing inferred schema and supporting schema evolution, used in `cloudFiles.schemaLocation`.",
)

execute

execute() -> Output

Reads from the given location with the given options using Autoloader

Source code in src/koheesio/spark/readers/databricks/autoloader.py
def execute(self) -> Reader.Output:
    """Reads from the given location with the given options using Autoloader"""
    self.output.df = self.reader().load(self.location)

get_options

get_options() -> Dict[str, Any]

Get the options for the autoloader

Source code in src/koheesio/spark/readers/databricks/autoloader.py
def get_options(self) -> Dict[str, Any]:
    """Get the options for the autoloader"""
    self.options.update(
        {
            "cloudFiles.format": self.format,
            "cloudFiles.schemaLocation": self.schema_location,
        }
    )
    return self.options
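
The merge behaviour of `get_options` can be illustrated without Spark. The following is a minimal sketch, assuming a hypothetical standalone function `merge_autoloader_options` that mirrors the method body shown above; it is not part of koheesio. It shows two consequences of using `dict.update`: a user-supplied `cloudFiles.format` entry is replaced by the value of the `format` field, and the same dictionary object is modified in place and returned.

```python
from typing import Any, Dict


def merge_autoloader_options(
    user_options: Dict[str, Any], file_format: str, schema_location: str
) -> Dict[str, Any]:
    """Hypothetical stand-in for get_options: update the dict in place and return it."""
    user_options.update(
        {
            "cloudFiles.format": file_format,
            "cloudFiles.schemaLocation": schema_location,
        }
    )
    return user_options


# A user-supplied "cloudFiles.format" is included on purpose to show the override.
options = {"multiLine": "true", "cloudFiles.format": "csv"}
merged = merge_autoloader_options(options, "json", "other_s3_path")

print(merged["cloudFiles.format"])  # the field value wins over the user-supplied entry
print(merged is options)  # the original dict object is returned, not a copy
```

Because the dictionary is updated in place, any other reference to the same `options` object sees the added `cloudFiles.*` keys as well.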

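reader

reader() -> DataStreamReader

Builds the `cloudFiles` stream reader, applying the explicit schema (if one is set) and the options returned by get_options.

Source code in src/koheesio/spark/readers/databricks/autoloader.py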
def reader(self) -> DataStreamReader:
    reader = self.spark.readStream.format("cloudFiles")
    if self.schema_ is not None:
        reader = reader.schema(self.schema_)  # type: ignore
    reader = reader.options(**self.get_options())
    return reader
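
The order of calls made by `reader` and `execute` can be traced without a Spark session. The sketch below assumes a hypothetical `FakeStreamReader` that records each call in place of a real `DataStreamReader`, and a hypothetical helper `build_and_load` that follows the same sequence as the source above: `format("cloudFiles")`, an optional `schema(...)`, `options(...)`, then `load(location)`. Neither name exists in koheesio or PySpark.

```python
from typing import Any, Dict, List, Optional, Tuple


class FakeStreamReader:
    """Hypothetical stand-in for DataStreamReader that records each call."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Any]] = []

    def format(self, source: str) -> "FakeStreamReader":
        self.calls.append(("format", source))
        return self

    def schema(self, schema: Any) -> "FakeStreamReader":
        self.calls.append(("schema", schema))
        return self

    def options(self, **options: Any) -> "FakeStreamReader":
        self.calls.append(("options", options))
        return self

    def load(self, path: str) -> str:
        self.calls.append(("load", path))
        return f"stream:{path}"


def build_and_load(
    stream_reader: FakeStreamReader,
    location: str,
    options: Dict[str, Any],
    schema: Optional[Any] = None,
) -> str:
    """Follow the same call sequence as reader() followed by execute()."""
    reader = stream_reader.format("cloudFiles")
    if schema is not None:
        # The explicit schema is applied only when one was provided.
        reader = reader.schema(schema)
    reader = reader.options(**options)
    return reader.load(location)


fake = FakeStreamReader()
result = build_and_load(
    fake,
    "some_s3_path",
    {"cloudFiles.format": "json", "cloudFiles.schemaLocation": "other_s3_path"},
)
call_names = [name for name, _ in fake.calls]
print(call_names)  # ['format', 'options', 'load']
```

With no schema supplied, the `schema` step is skipped, which leaves schema inference to the options passed through `cloudFiles.schemaLocation`.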

validate_format

validate_format(
    format_specified: Union[str, AutoLoaderFormat]
) -> str

Validate format value

Source code in src/koheesio/spark/readers/databricks/autoloader.py
@field_validator("format")
def validate_format(cls, format_specified: Union[str, AutoLoaderFormat]) -> str:
    """Validate `format` value"""
    if isinstance(format_specified, str):
        if format_specified.upper() in [f.value.upper() for f in AutoLoaderFormat]:
            format_specified = getattr(AutoLoaderFormat, format_specified.upper())
    return str(format_specified.value)
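
The normalization performed by `validate_format` can be tried in isolation. This is a minimal sketch, assuming a hypothetical `Format` enum that stands in for `AutoLoaderFormat` with a subset of its members, and a plain function `normalize_format` that copies the validator body without the pydantic decorator. It shows that string input is matched case-insensitively and that both strings and enum members are reduced to the lowercase enum value. In this standalone copy, a string that matches no member still reaches `.value` and raises `AttributeError`; how that surfaces through koheesio's model validation is not covered here.

```python
from enum import Enum
from typing import Union


class Format(Enum):
    """Hypothetical stand-in for AutoLoaderFormat with a subset of its members."""

    JSON = "json"
    CSV = "csv"
    BINARYFILE = "binaryfile"


def normalize_format(format_specified: Union[str, Format]) -> str:
    """Copy of the validator logic, without the pydantic decorator."""
    if isinstance(format_specified, str):
        # Case-insensitive match against the enum values.
        if format_specified.upper() in [f.value.upper() for f in Format]:
            format_specified = getattr(Format, format_specified.upper())
    return str(format_specified.value)


print(normalize_format("Json"))      # mixed-case string input
print(normalize_format(Format.CSV))  # enum member input

# An unsupported string is left as a plain str, so accessing .value fails.
unsupported_error = None
try:
    normalize_format("xml")
except AttributeError as exc:
    unsupported_error = exc
```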

koheesio.spark.readers.databricks.autoloader.AutoLoaderFormat

The file format, used in cloudFiles.format. Autoloader supports JSON, CSV, PARQUET, AVRO, ORC, TEXT, and BINARYFILE file formats.

AVRO class-attribute instance-attribute

AVRO = 'avro'

BINARYFILE class-attribute instance-attribute

BINARYFILE = 'binaryfile'

CSV class-attribute instance-attribute

CSV = 'csv'

JSON class-attribute instance-attribute

JSON = 'json'

ORC class-attribute instance-attribute

ORC = 'orc'

PARQUET class-attribute instance-attribute

PARQUET = 'parquet'

TEXT class-attribute instance-attribute

TEXT = 'text'