
Spark Expectations

Koheesio step for running data quality rules with the Spark Expectations engine.

koheesio.integrations.spark.dq.spark_expectations.SparkExpectationsTransformation #

Run DQ rules on an input dataframe with the Spark Expectations engine.

References

Spark Expectations: https://engineering.nike.com/spark-expectations/1.0.0/
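
A minimal instantiation sketch (the product identifier and table names here are hypothetical; each field can also be set via the alias shown in the field list below):

from koheesio.integrations.spark.dq.spark_expectations import (
    SparkExpectationsTransformation,
)

dq_step = SparkExpectationsTransformation(
    product_id="my_product",               # hypothetical SE product identifier
    rules_table="dq.product_rules",        # hypothetical rules table (alias: product_rules_table)
    statistics_table="dq.dq_stats",        # hypothetical stats table (alias: dq_stats_table_name)
    target_table="catalog.schema.orders",  # alias: target_table_name; failed records go to catalog.schema.orders_err
    drop_meta_column=True,                 # drop the SE meta columns from the output df
)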

drop_meta_column class-attribute instance-attribute #

drop_meta_column: bool = Field(default=False, alias='drop_meta_columns', description='Whether to drop the meta columns added by Spark Expectations from the output df')

enable_debugger class-attribute instance-attribute #

enable_debugger: bool = Field(default=False, alias='debugger', description='...')

error_writer_format class-attribute instance-attribute #

error_writer_format: Optional[str] = Field(default='delta', alias='dataframe_writer_format', description='The format used to write to the err table')

error_writer_mode class-attribute instance-attribute #

error_writer_mode: Optional[Union[str, BatchOutputMode]] = Field(default=APPEND, alias='dataframe_writer_mode', description='The write mode that will be used to write to the err table')

error_writing_options class-attribute instance-attribute #

error_writing_options: Optional[Dict[str, str]] = Field(default_factory=dict, alias='error_writing_options', description='Options for writing to the error table')

format class-attribute instance-attribute #

format: str = Field(default='delta', alias='dataframe_writer_format', description='The format used to write to the stats and err table. Separate output formats can be specified for each table using the error_writer_format and stats_writer_format params')

mode class-attribute instance-attribute #

mode: Union[str, BatchOutputMode] = Field(default=APPEND, alias='dataframe_writer_mode', description='The write mode that will be used to write to the err and stats table. Separate output modes can be specified for each table using the error_writer_mode and stats_writer_mode params')
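
For example, the shared format and mode can be overridden per table; a sketch with illustrative values (same hypothetical names as above):

dq_step = SparkExpectationsTransformation(
    product_id="my_product",
    rules_table="dq.product_rules",
    statistics_table="dq.dq_stats",
    target_table="catalog.schema.orders",
    format="delta",                 # shared default for the err and stats tables
    mode="append",                  # shared default for the err and stats tables
    stats_writer_mode="overwrite",  # overrides `mode` for the stats table only
)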

product_id class-attribute instance-attribute #

product_id: str = Field(default=..., description='Spark Expectations product identifier')

rules_table class-attribute instance-attribute #

rules_table: str = Field(default=..., alias='product_rules_table', description='DQ rules table')

se_user_conf class-attribute instance-attribute #

se_user_conf: Dict[str, Any] = Field(default={user_config.se_notifications_enable_email: False, user_config.se_notifications_enable_slack: False}, alias='user_conf', description='SE user-provided confs', validate_default=False)
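
The keys of se_user_conf are the constants exposed by Spark Expectations' user_config module; a sketch that enables Slack notifications (the webhook URL is a placeholder, and the constant names are assumed from the SE docs):

from spark_expectations.config.user_config import Constants as user_config

se_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_enable_slack: True,
    # placeholder webhook URL
    user_config.se_notifications_slack_webhook_url: "https://hooks.slack.com/services/...",
}

dq_step = SparkExpectationsTransformation(
    product_id="my_product",
    rules_table="dq.product_rules",
    statistics_table="dq.dq_stats",
    target_table="catalog.schema.orders",
    se_user_conf=se_conf,  # alias: user_conf
)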

statistics_streaming class-attribute instance-attribute #

statistics_streaming: Dict[str, Any] = Field(default={user_config.se_enable_streaming: False}, alias='stats_streaming_options', description='SE stats streaming options', validate_default=False)

statistics_table class-attribute instance-attribute #

statistics_table: str = Field(default=..., alias='dq_stats_table_name', description='DQ stats table')

stats_writer_format class-attribute instance-attribute #

stats_writer_format: Optional[str] = Field(default='delta', alias='stats_writer_format', description='The format used to write to the stats table')

stats_writer_mode class-attribute instance-attribute #

stats_writer_mode: Optional[Union[str, BatchOutputMode]] = Field(default=APPEND, alias='stats_writer_mode', description='The write mode that will be used to write to the stats table')

target_table class-attribute instance-attribute #

target_table: str = Field(default=..., alias='target_table_name', description="The table that will contain the good records. This step won't write to it, but Spark Expectations will write failed records to an err table with the same name plus an _err suffix")

Output #

Output of the SparkExpectationsTransformation step.

error_table_writer class-attribute instance-attribute #

error_table_writer: WrappedDataFrameWriter = Field(default=..., description='Spark Expectations error table writer')

rules_df class-attribute instance-attribute #

rules_df: DataFrame = Field(default=..., description='Dataframe containing the DQ rules read from the rules table')

se class-attribute instance-attribute #

se: SparkExpectations = Field(default=..., description='Spark Expectations object')

stats_table_writer class-attribute instance-attribute #

stats_table_writer: WrappedDataFrameWriter = Field(default=..., description='Spark Expectations stats table writer')
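
After execute() has run, these fields can be read from the step's output; a short sketch (dq_step as constructed above, with its input df already set):

dq_step.execute()
rules = dq_step.output.rules_df  # the cached DQ rules dataframe
result = dq_step.output.df       # validated dataframe, incl. SE meta columns unless dropped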

execute #

execute() -> Output

Apply data quality rules to a dataframe using the out-of-the-box SE decorator

Source code in src/koheesio/integrations/spark/dq/spark_expectations.py
def execute(self) -> Output:
    """
    Apply data quality rules to a dataframe using the out-of-the-box SE decorator
    """
    # read rules table
    rules_df = self.spark.read.table(self.rules_table).cache()
    self.output.rules_df = rules_df

    @self._se.with_expectations(
        target_table=self.target_table,
        user_conf=self.se_user_conf,
        # Below params are `False` by default, however exposing them here for extra visibility
        # The writes can be handled by downstream Koheesio steps
        write_to_table=False,
        write_to_temp_table=False,
    )
    def inner(df: DataFrame) -> DataFrame:
        """Just a wrapper to be able to use Spark Expectations decorator"""
        return df

    output_df = inner(self.df)

    if self.drop_meta_column:
        output_df = output_df.drop("meta_dq_run_id", "meta_dq_run_datetime")

    self.output.df = output_df
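
A hedged end-to-end sketch, assuming the standard Koheesio Transformation entry point transform(), which assigns the input dataframe and calls execute() (the source table name is hypothetical):

input_df = spark.read.table("catalog.schema.staging_orders")  # hypothetical source table
validated_df = dq_step.transform(input_df)                    # runs the SE rules, returns output.df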