
Snowflake

Module containing Snowflake reader classes.

This module contains classes for reading data from Snowflake. The classes are used to create a Spark DataFrame from a Snowflake table or a query.

Classes:

Name | Description
SnowflakeReader | Reader for Snowflake tables.
Query | Reader for Snowflake queries.
DbTableQuery | Reader for Snowflake tables that uses the dbtable option instead of query.

Notes

The classes are defined in the koheesio.integrations.spark.snowflake module (see the "Source code in" references below). This module imports them from there.

See Also

More detailed class descriptions can be found in the class docstrings.

koheesio.spark.readers.snowflake.DbTableQuery

Read a table from Snowflake using the dbtable option instead of query.

Example
from koheesio.spark.readers.snowflake import DbTableQuery

DbTableQuery(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="user",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="db.schema.table",
).execute().df
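To show what "using the dbtable option instead of query" means at the connector level, the sketch below builds the options dictionary for a table read. It assumes the standard Spark Snowflake connector option names (`sfURL`, `sfUser`, `sfDatabase`, `sfSchema`, `sfWarehouse`, `sfRole`, `dbtable`). The helper `build_table_read_options` is hypothetical and is not part of koheesio. The password is left out for brevity.

```python
from typing import Dict


def build_table_read_options(
    url: str,
    user: str,
    database: str,
    schema: str,
    warehouse: str,
    role: str,
    table: str,
) -> Dict[str, str]:
    """Hypothetical helper: map reader arguments to Spark Snowflake connector options."""
    return {
        "sfURL": url,
        "sfUser": user,
        "sfDatabase": database,
        "sfSchema": schema,
        "sfWarehouse": warehouse,
        "sfRole": role,
        # A table read sets `dbtable` and leaves `query` unset; the `table`
        # argument mirrors the `table` alias of DbTableQuery.dbtable.
        "dbtable": table,
    }


opts = build_table_read_options(
    url="foo.snowflakecomputing.com",
    user="user",
    database="MY_DB",
    schema="MY_SCHEMA",
    warehouse="MY_WH",
    role="APPLICATION.SNOWFLAKE.ADMIN",
    table="db.schema.table",
)
print(opts["dbtable"])  # db.schema.table
```

A query read would set `query` in place of `dbtable`. The two are alternative ways of telling the connector what to read.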

dbtable class-attribute instance-attribute

dbtable: str = Field(
    default=...,
    alias="table",
    description="The name of the table",
)

koheesio.spark.readers.snowflake.Query

Query data from Snowflake and return the result as a DataFrame.

Example
from koheesio.spark.readers.snowflake import Query

Query(
    database="MY_DB",
    schema_="MY_SCHEMA",
    warehouse="MY_WH",
    user="gid.account@abc.com",
    password=Secret("super-secret-password"),
    role="APPLICATION.SNOWFLAKE.ADMIN",
    query="SELECT * FROM MY_TABLE",
).execute().df

query class-attribute instance-attribute

query: str = Field(
    default=..., description="The query to run"
)

get_options

get_options(
    by_alias: bool = True, include: Set[str] = None
) -> Dict[str, Any]

Add the query to the options returned by the parent class.

Source code in src/koheesio/integrations/spark/snowflake.py
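def get_options(self, by_alias: bool = True, include: Set[str] = None) -> Dict[str, Any]:
    """add query to options"""
    # Start from the parent reader's options; `include` is not forwarded here.
    options = super().get_options(by_alias)
    # Pass the query text to the connector through its `query` option.
    options["query"] = self.query
    return options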
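The override follows a common pattern: take the options produced by the parent class and add one key on top. The stand-alone sketch below mimics that pattern. The two classes, `BaseSnowflakeOptions` and `QueryOptions`, are hypothetical stand-ins for the real koheesio classes, and the `sfURL`/`sfDatabase` keys assume the Spark Snowflake connector's option names. As in the source above, `include` is accepted but not used.

```python
from typing import Any, Dict, Optional, Set


class BaseSnowflakeOptions:
    """Hypothetical stand-in for the parent reader's option handling."""

    def __init__(self, url: str, database: str) -> None:
        self.url = url
        self.database = database

    def get_options(self, by_alias: bool = True, include: Optional[Set[str]] = None) -> Dict[str, Any]:
        # Connector-style names when by_alias is True, attribute names otherwise.
        if by_alias:
            return {"sfURL": self.url, "sfDatabase": self.database}
        return {"url": self.url, "database": self.database}


class QueryOptions(BaseSnowflakeOptions):
    """Hypothetical stand-in for Query: parent options plus the query text."""

    def __init__(self, url: str, database: str, query: str) -> None:
        super().__init__(url, database)
        self.query = query

    def get_options(self, by_alias: bool = True, include: Optional[Set[str]] = None) -> Dict[str, Any]:
        options = super().get_options(by_alias)  # `include` is ignored, as in the original
        options["query"] = self.query
        return options


q = QueryOptions("foo.snowflakecomputing.com", "MY_DB", "SELECT * FROM MY_TABLE")
print(q.get_options())
# {'sfURL': 'foo.snowflakecomputing.com', 'sfDatabase': 'MY_DB', 'query': 'SELECT * FROM MY_TABLE'}
```

Because the subclass only adds a key, any option the parent class learns to emit is picked up automatically.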

validate_query

validate_query(query: str) -> str

Replace literal \n and \t escape sequences with real newline and tab characters, then strip leading and trailing whitespace.

Source code in src/koheesio/integrations/spark/snowflake.py
@field_validator("query")
def validate_query(cls, query: str) -> str:
    """Replace escape characters"""
    query = query.replace("\\n", "\n").replace("\\t", "\t").strip()
    return query
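The validator converts literal backslash sequences, such as a `\n` typed into a config file, into real newline and tab characters before the query is used. The function below reproduces the same string logic. Its name, `normalize_query`, is ours and is not part of koheesio.

```python
def normalize_query(query: str) -> str:
    """Same replacements as Query.validate_query, without the pydantic decorator."""
    # "\\n" in Python source is the two characters backslash + n.
    return query.replace("\\n", "\n").replace("\\t", "\t").strip()


raw = "  SELECT *\\nFROM MY_TABLE\\n\\tWHERE id = 1  "
print(normalize_query(raw))
```

Because `strip()` runs after the replacements, a trailing literal `\n` becomes a newline and is then removed along with the other surrounding whitespace.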

koheesio.spark.readers.snowflake.SnowflakeReader

Wrapper around JdbcReader for Snowflake.

Example
from koheesio.spark.readers.snowflake import SnowflakeReader

sr = SnowflakeReader(
    url="foo.snowflakecomputing.com",
    user="YOUR_USERNAME",
    password="***",
    database="db",
    schema="schema",
)
df = sr.read()

driver class-attribute instance-attribute

driver: Optional[str] = None

format class-attribute instance-attribute

format: str = Field(
    default="snowflake",
    description="The format to use when reading from Snowflake",
)
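When `format` is `"snowflake"`, a read presumably reduces to the standard PySpark chain `spark.read.format(...).options(...).load()`. This is an assumption about how the options are consumed, not koheesio's actual implementation. The sketch runs that chain against a hypothetical `RecordingReader` stub in place of a real `SparkSession`, so it needs neither Spark nor the Snowflake connector.

```python
from typing import Any, Dict, Optional


class RecordingReader:
    """Hypothetical stub mimicking pyspark's DataFrameReader.format/options/load chain."""

    def __init__(self) -> None:
        self.source: Optional[str] = None
        self.opts: Dict[str, Any] = {}

    def format(self, source: str) -> "RecordingReader":
        self.source = source
        return self

    def options(self, **opts: Any) -> "RecordingReader":
        self.opts.update(opts)
        return self

    def load(self) -> Dict[str, Any]:
        # A real DataFrameReader would return a DataFrame; the stub returns what it was given.
        return {"format": self.source, "options": dict(self.opts)}


def read_snowflake(reader: RecordingReader, fmt: str, options: Dict[str, Any]) -> Dict[str, Any]:
    # With pyspark this would be: spark.read.format(fmt).options(**options).load()
    return reader.format(fmt).options(**options).load()


result = read_snowflake(
    RecordingReader(),
    "snowflake",
    {"sfURL": "foo.snowflakecomputing.com", "dbtable": "db.schema.table"},
)
print(result["format"])  # snowflake
```

Swapping the stub for `spark.read` would send the same format and options to Spark.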

execute

execute() -> Output

Read from Snowflake

Source code in src/koheesio/integrations/spark/snowflake.py
def execute(self) -> SparkStep.Output:
    """Read from Snowflake"""
    super().execute()