Spark Expectations

Koheesio step for running data quality rules with the Spark Expectations engine.

koheesio.integrations.spark.dq.spark_expectations.SparkExpectationsTransformation #

Run DQ rules on an input DataFrame with the Spark Expectations engine.

References

Spark Expectations: https://engineering.nike.com/spark-expectations/1.0.0/
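
A minimal usage sketch (all table names and the product id are illustrative placeholders; the keyword names use the field aliases documented on this page, and the `.transform()` call assumes the standard Koheesio Transformation interface):

from koheesio.integrations.spark.dq.spark_expectations import (
    SparkExpectationsTransformation,
)

# Placeholder configuration; keyword names are the documented field aliases
dq_step = SparkExpectationsTransformation(
    product_id="my_product",
    product_rules_table="dq.product_rules",       # rules_table
    dq_stats_table_name="dq.dq_stats",            # statistics_table
    target_table_name="catalog.schema.my_table",  # target_table
    drop_meta_columns=True,                       # drop_meta_column
)

# transform() runs the DQ rules and returns the validated DataFrame;
# `input_df` is a placeholder for the DataFrame under validation
validated_df = dq_step.transform(input_df)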

drop_meta_column class-attribute instance-attribute #

drop_meta_column: bool = Field(
    default=False,
    alias="drop_meta_columns",
    description="Whether to drop meta columns added by spark expectations on the output df",
)

enable_debugger class-attribute instance-attribute #

enable_debugger: bool = Field(
    default=False, alias="debugger", description="..."
)

error_writer_format class-attribute instance-attribute #

error_writer_format: Optional[str] = Field(
    default="delta",
    alias="dataframe_writer_format",
    description="The format used to write to the err table",
)

error_writer_mode class-attribute instance-attribute #

error_writer_mode: Optional[Union[str, BatchOutputMode]] = (
    Field(
        default=APPEND,
        alias="dataframe_writer_mode",
        description="The write mode that will be used to write to the err table",
    )
)

error_writing_options class-attribute instance-attribute #

error_writing_options: Optional[Dict[str, str]] = Field(
    default_factory=dict,
    alias="error_writing_options",
    description="Options for writing to the error table",
)

format class-attribute instance-attribute #

format: str = Field(
    default="delta",
    alias="dataframe_writer_format",
    description="The format used to write to the stats and err table. Separate output formats can be specified for each table using the error_writer_format and stats_writer_format params",
)

mode class-attribute instance-attribute #

mode: Union[str, BatchOutputMode] = Field(
    default=APPEND,
    alias="dataframe_writer_mode",
    description="The write mode that will be used to write to the err and stats table. Separate output modes can be specified for each table using the error_writer_mode and stats_writer_mode params",
)
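
The sketch below illustrates how the shared format/mode settings interact with the per-table overrides; values and table names are placeholders, and the error table can be overridden in the same way via error_writer_format and error_writer_mode:

dq_step = SparkExpectationsTransformation(
    product_id="my_product",
    product_rules_table="dq.product_rules",
    dq_stats_table_name="dq.dq_stats",
    target_table_name="catalog.schema.my_table",
    # The err table keeps the shared defaults (format="delta", mode=APPEND);
    # only the stats table is overridden here (illustrative values)
    stats_writer_format="parquet",
    stats_writer_mode="overwrite",
)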

product_id class-attribute instance-attribute #

product_id: str = Field(
    default=...,
    description="Spark Expectations product identifier",
)

rules_table class-attribute instance-attribute #

rules_table: str = Field(
    default=...,
    alias="product_rules_table",
    description="DQ rules table",
)

se_user_conf class-attribute instance-attribute #

se_user_conf: Dict[str, Any] = Field(
    default={
        se_notifications_enable_email: False,
        se_notifications_enable_slack: False,
    },
    alias="user_conf",
    description="SE user provided confs",
    validate_default=False,
)
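
A hedged sketch of overriding the defaults above, e.g. to enable Slack notifications. The import path and constant names are assumed to match the Spark Expectations user_config module used for the defaults; the exact keys available depend on the Spark Expectations version in use:

from spark_expectations.config.user_config import Constants as user_config

# Illustrative override: keep email notifications off, turn Slack on
my_user_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_enable_slack: True,
}

dq_step = SparkExpectationsTransformation(
    product_id="my_product",
    product_rules_table="dq.product_rules",
    dq_stats_table_name="dq.dq_stats",
    target_table_name="catalog.schema.my_table",
    user_conf=my_user_conf,  # alias for se_user_conf
)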

statistics_streaming class-attribute instance-attribute #

statistics_streaming: Dict[str, Any] = Field(
    default={se_enable_streaming: False},
    alias="stats_streaming_options",
    description="SE stats streaming options ",
    validate_default=False,
)

statistics_table class-attribute instance-attribute #

statistics_table: str = Field(
    default=...,
    alias="dq_stats_table_name",
    description="DQ stats table",
)

stats_writer_format class-attribute instance-attribute #

stats_writer_format: Optional[str] = Field(
    default="delta",
    alias="stats_writer_format",
    description="The format used to write to the stats table",
)

stats_writer_mode class-attribute instance-attribute #

stats_writer_mode: Optional[Union[str, BatchOutputMode]] = (
    Field(
        default=APPEND,
        alias="stats_writer_mode",
        description="The write mode that will be used to write to the stats table",
    )
)

target_table class-attribute instance-attribute #

target_table: str = Field(
    default=...,
    alias="target_table_name",
    description="The table that will contain good records. Won't write to it, but will write to the err table with same name plus _err suffix",
)

Output #

Output of the SparkExpectationsTransformation step.

error_table_writer class-attribute instance-attribute #

error_table_writer: WrappedDataFrameWriter = Field(
    default=...,
    description="Spark Expectations error table writer",
)

rules_df class-attribute instance-attribute #

rules_df: DataFrame = Field(
    default=..., description="DQ rules dataframe read from the rules table"
)

se class-attribute instance-attribute #

se: SparkExpectations = Field(
    default=..., description="Spark Expectations object"
)

stats_table_writer class-attribute instance-attribute #

stats_table_writer: WrappedDataFrameWriter = Field(
    default=...,
    description="Spark Expectations stats table writer",
)

execute #

execute() -> Output

Apply data quality rules to a dataframe using the out-of-the-box SE decorator

Source code in src/koheesio/integrations/spark/dq/spark_expectations.py
def execute(self) -> Output:
    """
    Apply data quality rules to a dataframe using the out-of-the-box SE decorator
    """
    # read rules table
    rules_df = self.spark.read.table(self.rules_table).cache()
    self.output.rules_df = rules_df

    @self._se.with_expectations(
        target_table=self.target_table,
        user_conf=self.se_user_conf,
        # Below params are `False` by default, however exposing them here for extra visibility
        # The writes can be handled by downstream Koheesio steps
        write_to_table=False,
        write_to_temp_table=False,
    )
    def inner(df: DataFrame) -> DataFrame:
        """Just a wrapper to be able to use Spark Expectations decorator"""
        return df

    output_df = inner(self.df)

    if self.drop_meta_column:
        output_df = output_df.drop("meta_dq_run_id", "meta_dq_run_datetime")

    self.output.df = output_df
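
Since write_to_table and write_to_temp_table are disabled, the step only exposes the validated records on its output; persisting them is left to downstream steps. A sketch of how the results might be consumed (a plain PySpark write is shown for illustration; table names and the input DataFrame are placeholders):

# Run the rules and collect the results
validated_df = dq_step.transform(input_df)   # validated records; meta columns dropped if configured
rules_df = dq_step.output.rules_df           # cached copy of the DQ rules table

# The step never writes the good records itself; hand them to a downstream writer
validated_df.write.format("delta").mode("append").saveAsTable("catalog.schema.my_table")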