@se.with_expectations(
    target_table="dq_spark_local.customer_order",
    write_to_table=True,
    user_conf=user_conf,
    target_table_view="order",
)
def build_new() -> DataFrame:
    """Load the CSV fixtures, expose each as a temp view, and return the order source.

    Every fixture is read from the ``resources`` directory that sits next to
    this module's parent directory, with the header row enabled and schema
    inference switched on, then registered under a fixed temp-view name.

    Returns:
        The DataFrame read from ``order_s.csv`` (the one registered as the
        ``order_source`` view).
    """
    module_dir = os.path.dirname(__file__)

    # (temp view name, CSV file name), in the order they are loaded.
    # NOTE(review): "customer_target" is read from customer_source.csv, the
    # same file as "customer_source" -- confirm this is intentional.
    fixtures = (
        ("order_source", "order_s.csv"),
        ("order_target", "order_t.csv"),
        ("product", "product.csv"),
        ("customer_source", "customer_source.csv"),
        ("customer_target", "customer_source.csv"),
    )

    frames_by_view = {}
    for view_name, csv_name in fixtures:
        csv_path = os.path.join(module_dir, "..", "resources", csv_name)
        reader = spark.read.option("header", "true").option("inferSchema", "true")
        frame: DataFrame = reader.csv(csv_path)
        frame.createOrReplaceTempView(view_name)
        frames_by_view[view_name] = frame

    return frames_by_view["order_source"]