Utils

Spark Utility functions

koheesio.spark.utils.spark_minor_version module-attribute #

spark_minor_version: float = get_spark_minor_version()

koheesio.spark.utils.SparkDatatype #

Allowed Spark datatypes

The following table lists the data types that are supported by Spark SQL.

Data type      SQL name
-------------  ------------------------
ByteType       BYTE, TINYINT
ShortType      SHORT, SMALLINT
IntegerType    INT, INTEGER
LongType       LONG, BIGINT
FloatType      FLOAT, REAL
DoubleType     DOUBLE
DecimalType    DECIMAL, DEC, NUMERIC
StringType     STRING
BinaryType     BINARY
BooleanType    BOOLEAN
TimestampType  TIMESTAMP, TIMESTAMP_LTZ
DateType       DATE
ArrayType      ARRAY
MapType        MAP
NullType       VOID
Not supported yet:
  • TimestampNTZType: TIMESTAMP_NTZ
  • YearMonthIntervalType: INTERVAL YEAR, INTERVAL YEAR TO MONTH, INTERVAL MONTH
  • DayTimeIntervalType: INTERVAL DAY, INTERVAL DAY TO HOUR, INTERVAL DAY TO MINUTE, INTERVAL DAY TO SECOND, INTERVAL HOUR, INTERVAL HOUR TO MINUTE, INTERVAL HOUR TO SECOND, INTERVAL MINUTE, INTERVAL MINUTE TO SECOND, INTERVAL SECOND
See Also

https://spark.apache.org/docs/latest/sql-ref-datatypes.html#supported-data-types
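
As a quick illustration (a minimal sketch; the member listing below is the authoritative mapping), the SQL-name aliases carry the same underlying type-name values:

from koheesio.spark.utils import SparkDatatype

# BIGINT is a SQL alias for LongType, so both members hold the value 'long'
# (see the BIGINT and LONG entries below)
assert SparkDatatype.BIGINT.value == "long"
assert SparkDatatype.LONG.value == "long"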

ARRAY class-attribute instance-attribute #

ARRAY = 'array'

BIGINT class-attribute instance-attribute #

BIGINT = 'long'

BINARY class-attribute instance-attribute #

BINARY = 'binary'

BOOLEAN class-attribute instance-attribute #

BOOLEAN = 'boolean'

BYTE class-attribute instance-attribute #

BYTE = 'byte'

DATE class-attribute instance-attribute #

DATE = 'date'

DEC class-attribute instance-attribute #

DEC = 'decimal'

DECIMAL class-attribute instance-attribute #

DECIMAL = 'decimal'

DOUBLE class-attribute instance-attribute #

DOUBLE = 'double'

FLOAT class-attribute instance-attribute #

FLOAT = 'float'

INT class-attribute instance-attribute #

INT = 'integer'

INTEGER class-attribute instance-attribute #

INTEGER = 'integer'

LONG class-attribute instance-attribute #

LONG = 'long'

MAP class-attribute instance-attribute #

MAP = 'map'

NUMERIC class-attribute instance-attribute #

NUMERIC = 'decimal'

REAL class-attribute instance-attribute #

REAL = 'float'

SHORT class-attribute instance-attribute #

SHORT = 'short'

SMALLINT class-attribute instance-attribute #

SMALLINT = 'short'

STRING class-attribute instance-attribute #

STRING = 'string'

TIMESTAMP class-attribute instance-attribute #

TIMESTAMP = 'timestamp'

TIMESTAMP_LTZ class-attribute instance-attribute #

TIMESTAMP_LTZ = 'timestamp'

TINYINT class-attribute instance-attribute #

TINYINT = 'byte'

VOID class-attribute instance-attribute #

VOID = 'void'

spark_type property #

spark_type: DataType

Returns the PySpark DataType for the given enum value

from_string classmethod #

from_string(value: str) -> SparkDatatype

Allows for getting the right Enum value by simply passing a string value. This method is not case-sensitive.

Source code in src/koheesio/spark/utils.py
@classmethod
def from_string(cls, value: str) -> "SparkDatatype":
    """Allows for getting the right Enum value by simply passing a string value
    This method is not case-sensitive
    """
    return getattr(cls, value.upper())
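
A short usage sketch based on the listing above (the exact DataType instance returned by spark_type is resolved by the property and not asserted here):

from koheesio.spark.utils import SparkDatatype

# Lookup is case-insensitive: the input is upper-cased before the getattr call
dtype = SparkDatatype.from_string("string")
assert dtype is SparkDatatype.STRING
assert dtype.value == "string"

# The matching PySpark DataType, per the spark_type property documented above
string_type = dtype.spark_type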

koheesio.spark.utils.get_spark_minor_version #

get_spark_minor_version() -> float

Returns the minor version of the Spark instance.

For example, if the Spark version is 3.3.2, this function returns 3.3.

Source code in src/koheesio/spark/utils.py
def get_spark_minor_version() -> float:
    """Returns the minor version of the spark instance.

    For example, if the spark version is 3.3.2, this function would return 3.3
    """
    return float(".".join(spark_version.split(".")[:2]))
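
To make the parsing concrete, here is a standalone re-implementation of the same logic (illustrative only; minor_version is a hypothetical helper, not part of koheesio):

def minor_version(version: str) -> float:
    # Keep the first two components: "3.3.2" -> "3.3" -> 3.3
    return float(".".join(version.split(".")[:2]))

assert minor_version("3.3.2") == 3.3
assert minor_version("3.5.1") == 3.5

One caveat of the float representation: a hypothetical Spark 3.10 would parse to 3.1, so it stays unambiguous only while minor versions remain single-digit.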

koheesio.spark.utils.import_pandas_based_on_pyspark_version #

import_pandas_based_on_pyspark_version()

This function checks the installed version of PySpark and then tries to import the appropriate version of pandas. If the correct version of pandas is not installed, it raises an ImportError with a message indicating which version of pandas should be installed.

Source code in src/koheesio/spark/utils.py
def import_pandas_based_on_pyspark_version():
    """
    This function checks the installed version of PySpark and then tries to import the appropriate version of pandas.
    If the correct version of pandas is not installed, it raises an ImportError with a message indicating which version
    of pandas should be installed.
    """
    try:
        import pandas as pd

        pyspark_version = get_spark_minor_version()
        pandas_version = pd.__version__

        if (pyspark_version < 3.4 and pandas_version >= "2") or (pyspark_version >= 3.4 and pandas_version < "2"):
            raise ImportError(
                f"For PySpark {pyspark_version}, "
                f"please install Pandas version {'< 2' if pyspark_version < 3.4 else '>= 2'}"
            )

        return pd
    except ImportError as e:
        raise ImportError("Pandas module is not installed.") from e
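
A minimal usage sketch: on success the function returns the imported pandas module, so it can stand in for a bare import:

from koheesio.spark.utils import import_pandas_based_on_pyspark_version

# Raises ImportError if pandas is missing, or if its major version does not
# match the installed PySpark version (per the checks above)
pd = import_pandas_based_on_pyspark_version()
df = pd.DataFrame({"a": [1, 2, 3]})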

koheesio.spark.utils.on_databricks #

on_databricks() -> bool

Determine whether we're running on Databricks or elsewhere

Source code in src/koheesio/spark/utils.py
def on_databricks() -> bool:
    """Retrieve if we're running on databricks or elsewhere"""
    dbr_version = os.getenv("DATABRICKS_RUNTIME_VERSION", None)
    return dbr_version is not None and dbr_version != ""
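
A minimal usage sketch; DATABRICKS_RUNTIME_VERSION is set by the Databricks runtime itself, so the check needs no configuration:

from koheesio.spark.utils import on_databricks

if on_databricks():
    # e.g. rely on libraries preinstalled by the Databricks runtime
    ...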

koheesio.spark.utils.schema_struct_to_schema_str #

schema_struct_to_schema_str(schema: StructType) -> str

Converts a StructType to a schema string

Source code in src/koheesio/spark/utils.py
def schema_struct_to_schema_str(schema: StructType) -> str:
    """Converts a StructType to a schema str"""
    if not schema:
        return ""
    return ",\n".join([f"{field.name} {field.dataType.typeName().upper()}" for field in schema.fields])

koheesio.spark.utils.spark_data_type_is_array #

spark_data_type_is_array(data_type: DataType) -> bool

Check if the column's dataType is of type ArrayType

Source code in src/koheesio/spark/utils.py
def spark_data_type_is_array(data_type: DataType) -> bool:
    """Check if the column's dataType is of type ArrayType"""
    return isinstance(data_type, ArrayType)
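
For example:

from pyspark.sql.types import ArrayType, StringType
from koheesio.spark.utils import spark_data_type_is_array

assert spark_data_type_is_array(ArrayType(StringType()))
assert not spark_data_type_is_array(StringType())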

koheesio.spark.utils.spark_data_type_is_numeric #

spark_data_type_is_numeric(data_type: DataType) -> bool

Check if the column's dataType is a numeric type

Source code in src/koheesio/spark/utils.py
def spark_data_type_is_numeric(data_type: DataType) -> bool:
    """Check if the column's dataType is of type ArrayType"""
    return isinstance(data_type, (IntegerType, LongType, FloatType, DoubleType, DecimalType))
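
For example (note that ByteType and ShortType are not part of the isinstance check above, so they are not treated as numeric here):

from pyspark.sql.types import DecimalType, ShortType, StringType
from koheesio.spark.utils import spark_data_type_is_numeric

assert spark_data_type_is_numeric(DecimalType(10, 2))
assert not spark_data_type_is_numeric(StringType())
assert not spark_data_type_is_numeric(ShortType())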