Attributes

examples.scripts.sample_dq_iceberg.current_dir = os.path.dirname(os.path.abspath(__file__)) module-attribute

examples.scripts.sample_dq_iceberg.dic_job_info = {'job': 'job_name', 'Region': 'NA', 'Snapshot': '2024-04-15'} module-attribute

examples.scripts.sample_dq_iceberg.job_info = str(dic_job_info) module-attribute
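
The job metadata is carried around as a plain string (see se_job_metadata below), so any string serialization works; a minimal sketch, assuming the option treats the value as an opaque string, using JSON instead of str():

import json

# alternative serialization of the same metadata; assumes se_job_metadata
# simply passes the value through as an opaque string
job_info = json.dumps(dic_job_info)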

examples.scripts.sample_dq_iceberg.se: SparkExpectations = SparkExpectations(
    product_id='your_product',
    rules_df=spark.sql('select * from dq_spark_local.dq_rules'),
    stats_table='dq_spark_local.dq_stats',
    stats_table_writer=writer,
    target_and_error_table_writer=writer,
    debugger=False,
    stats_streaming_options={user_config.se_enable_streaming: False},
) module-attribute

examples.scripts.sample_dq_iceberg.spark = set_up_iceberg() module-attribute

examples.scripts.sample_dq_iceberg.user_conf: Dict[str, Union[str, int, bool, Dict[str, str]]] = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_email_smtp_host: 'mailhost.com',
    user_config.se_notifications_email_smtp_port: 25,
    user_config.se_notifications_email_from: '',
    user_config.se_notifications_email_to_other_mail_id: '',
    user_config.se_notifications_email_subject: 'spark expectations - data quality - notifications',
    user_config.se_notifications_enable_slack: False,
    user_config.se_notifications_slack_webhook_url: '',
    user_config.se_notifications_on_start: True,
    user_config.se_notifications_on_completion: True,
    user_config.se_notifications_on_fail: True,
    user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
    user_config.se_notifications_on_error_drop_threshold: 15,
    user_config.se_enable_query_dq_detailed_result: True,
    user_config.se_enable_agg_dq_detailed_result: True,
    user_config.se_enable_error_table: True,
    user_config.se_dq_rules_params: {'env': 'local', 'table': 'product'},
    user_config.se_job_metadata: job_info,
} module-attribute
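
All notification channels are disabled in the config above; a minimal sketch of turning Slack on, reusing the keys already present (the webhook URL is a placeholder):

# hypothetical override: enable Slack notifications via the existing config keys
user_conf[user_config.se_notifications_enable_slack] = True
user_conf[user_config.se_notifications_slack_webhook_url] = 'https://hooks.slack.com/services/...'  # placeholder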

examples.scripts.sample_dq_iceberg.writer = WrappedDataFrameWriter().mode('append').format('iceberg') module-attribute
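
WrappedDataFrameWriter is a small builder around Spark writer options; a hypothetical variant targeting Delta instead of Iceberg differs only in the format string (assuming the builder accepts any Spark-supported format):

# sketch of the same builder pattern with a different sink format
delta_writer = WrappedDataFrameWriter().mode('append').format('delta')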

Functions

examples.scripts.sample_dq_iceberg.build_new() -> DataFrame

Source code in examples/scripts/sample_dq_iceberg.py
# evaluate the configured expectations against the DataFrame returned below and,
# since write_to_table=True, persist the validated rows to target_table
@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    write_to_table=True,
    user_conf=user_conf,
    target_table_view="order",
)
def build_new() -> DataFrame:
    # read the source orders CSV and register it as the "order_source" temp view
    _df_order_source: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "..", "resources", "order_s.csv"))
    )
    _df_order_source.createOrReplaceTempView("order_source")

    # read the target orders CSV and register it as the "order_target" temp view
    _df_order_target: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "..", "resources", "order_t.csv"))
    )
    _df_order_target.createOrReplaceTempView("order_target")

    # read the product reference data and register it as the "product" temp view
    _df_product: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "..", "resources", "product.csv"))
    )
    _df_product.createOrReplaceTempView("product")

    # read the source customers CSV and register it as the "customer_source" temp view
    _df_customer_source: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "..", "resources", "customer_source.csv"))
    )
    _df_customer_source.createOrReplaceTempView("customer_source")

    # note: the target view is built from the same customer_source.csv file as
    # above; only the registered view name ("customer_target") differs
    _df_customer_target: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "..", "resources", "customer_source.csv"))
    )
    _df_customer_target.createOrReplaceTempView("customer_target")

    # the decorator validates this returned DataFrame and handles the write to the target table
    return _df_order_source
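
A minimal sketch of running the example end to end: calling the decorated function evaluates the rules loaded from dq_spark_local.dq_rules against the returned DataFrame and, because write_to_table=True, writes the validated rows to dq_spark_local.customer_order, with run statistics going to dq_spark_local.dq_stats.

# hypothetical entry point for executing the example
if __name__ == "__main__":
    build_new()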