import os

from pyspark.sql import DataFrame

# `spark` (the SparkSession), `se` (the SparkExpectations instance),
# `user_config` and `global_spark_Conf` are assumed to be created by the
# example's setup code, which is not repeated here.


@se.with_expectations(
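    # Pull the DQ rules registered for the target table; run statistics are
    # written to the dq_stats table.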
    se.reader.get_rules_from_table(
        product_rules_table="dq_spark_local.dq_rules",
        target_table_name="dq_spark_local.customer_order",
        dq_stats_table_name="dq_spark_local.dq_stats",
    ),
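    # Apply row-level rules and write the validated DataFrame to the target table.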
    write_to_table=True,
    row_dq=True,
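    # Aggregate rules, run against the source data and the final (post row-DQ) data.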
    agg_dq={
        user_config.se_agg_dq: True,
        user_config.se_source_agg_dq: True,
        user_config.se_final_agg_dq: True,
    },
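    # Query-based rules; the target data is exposed to them through the "order" view.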
    query_dq={
        user_config.se_query_dq: True,
        user_config.se_source_query_dq: True,
        user_config.se_final_query_dq: True,
        user_config.se_target_table_view: "order",
    },
    spark_conf=global_spark_Conf,
)
def build_new() -> DataFrame:
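    # Read the local CSV fixtures and register them as temp views so the DQ
    # rules can reference "order", "product" and "customer".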
    _df_order: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/order.csv"))
    )
    _df_order.createOrReplaceTempView("order")

    _df_product: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/product.csv"))
    )
    _df_product.createOrReplaceTempView("product")

    _df_customer: DataFrame = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(os.path.join(os.path.dirname(__file__), "resources/customer.csv"))
    )
    _df_customer.createOrReplaceTempView("customer")
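
    # The decorated function must return the DataFrame that the expectations run against.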
    return _df_order