Skip to content

Utils

koheesio.spark.utils.SPARK_MINOR_VERSION module-attribute #

SPARK_MINOR_VERSION: float = get_spark_minor_version()

koheesio.spark.utils.SparkDatatype #

Allowed spark datatypes

The following table lists the data types that are supported by Spark SQL.

Data type SQL name
ByteType BYTE, TINYINT
ShortType SHORT, SMALLINT
IntegerType INT, INTEGER
LongType LONG, BIGINT
FloatType FLOAT, REAL
DoubleType DOUBLE
DecimalType DECIMAL, DEC, NUMERIC
StringType STRING
BinaryType BINARY
BooleanType BOOLEAN
TimestampType TIMESTAMP, TIMESTAMP_LTZ
DateType DATE
ArrayType ARRAY
MapType MAP
NullType VOID
Not supported yet
  • TimestampNTZType TIMESTAMP_NTZ
  • YearMonthIntervalType INTERVAL YEAR, INTERVAL YEAR TO MONTH, INTERVAL MONTH
  • DayTimeIntervalType INTERVAL DAY, INTERVAL DAY TO HOUR, INTERVAL DAY TO MINUTE, INTERVAL DAY TO SECOND, INTERVAL HOUR, INTERVAL HOUR TO MINUTE, INTERVAL HOUR TO SECOND, INTERVAL MINUTE, INTERVAL MINUTE TO SECOND, INTERVAL SECOND
See Also

https://spark.apache.org/docs/latest/sql-ref-datatypes.html#supported-data-types

ARRAY class-attribute instance-attribute #

ARRAY = 'array'

BIGINT class-attribute instance-attribute #

BIGINT = 'long'

BINARY class-attribute instance-attribute #

BINARY = 'binary'

BOOLEAN class-attribute instance-attribute #

BOOLEAN = 'boolean'

BYTE class-attribute instance-attribute #

BYTE = 'byte'

DATE class-attribute instance-attribute #

DATE = 'date'

DEC class-attribute instance-attribute #

DEC = 'decimal'

DECIMAL class-attribute instance-attribute #

DECIMAL = 'decimal'

DOUBLE class-attribute instance-attribute #

DOUBLE = 'double'

FLOAT class-attribute instance-attribute #

FLOAT = 'float'

INT class-attribute instance-attribute #

INT = 'integer'

INTEGER class-attribute instance-attribute #

INTEGER = 'integer'

LONG class-attribute instance-attribute #

LONG = 'long'

MAP class-attribute instance-attribute #

MAP = 'map'

NUMERIC class-attribute instance-attribute #

NUMERIC = 'decimal'

REAL class-attribute instance-attribute #

REAL = 'float'

SHORT class-attribute instance-attribute #

SHORT = 'short'

SMALLINT class-attribute instance-attribute #

SMALLINT = 'short'

STRING class-attribute instance-attribute #

STRING = 'string'

TIMESTAMP class-attribute instance-attribute #

TIMESTAMP = 'timestamp'

TIMESTAMP_LTZ class-attribute instance-attribute #

TIMESTAMP_LTZ = 'timestamp'

TINYINT class-attribute instance-attribute #

TINYINT = 'byte'

VOID class-attribute instance-attribute #

VOID = 'void'

spark_type property #

spark_type: type

Returns the spark type for the given enum value

from_string classmethod #

from_string(value: str) -> SparkDatatype

Allows for getting the right Enum value by simply passing a string value. This method is not case-sensitive.

Source code in src/koheesio/spark/utils/common.py
@classmethod
def from_string(cls, value: str) -> "SparkDatatype":
    """Look up an Enum member by its name, ignoring case

    The given string is upper-cased and then resolved as an attribute of the class, so `"string"`, `"String"` and
    `"STRING"` all resolve to the same member.

    Parameters
    ----------
    value : str
        Name of the member to look up, in any casing

    Returns
    -------
    SparkDatatype
        The attribute of the class whose name equals the upper-cased `value`

    Raises
    ------
    AttributeError
        If the class has no attribute with that name
    """
    member_name = value.upper()
    return getattr(cls, member_name)

koheesio.spark.utils.check_if_pyspark_connect_is_supported #

check_if_pyspark_connect_is_supported() -> bool

Check if the current version of PySpark supports the connect module

Source code in src/koheesio/spark/utils/common.py
def check_if_pyspark_connect_is_supported() -> bool:
    """Check if the current version of PySpark supports the connect module

    Returns
    -------
    bool
        True only when the Spark minor version is at least 3.5 and both `pyspark.sql.connect` and its `Column`
        class can be imported; False otherwise
    """
    # older Spark versions are reported as unsupported without attempting any import
    version_is_recent_enough = SPARK_MINOR_VERSION >= 3.5
    if not version_is_recent_enough:
        return False

    try:
        importlib.import_module("pyspark.sql.connect")
        from pyspark.sql.connect.column import Column
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so a missing module ends up here as well
        return False

    # bare annotation: keeps the imported name referenced, it has no runtime effect
    _col: Column  # type: ignore
    return True

koheesio.spark.utils.get_column_name #

get_column_name(col: Column) -> str

Get the column name from a Column object

Normally, the name of a Column object is not directly accessible in the regular pyspark API. This function extracts the name of the given column object without needing to provide it in the context of a DataFrame.

Parameters:

Name Type Description Default
col Column

The Column object

required

Returns:

Type Description
str

The name of the given column

Source code in src/koheesio/spark/utils/common.py
def get_column_name(col: Column) -> str:  # type: ignore
    """Extract the name of a Column object without a DataFrame context

    The regular pyspark API does not expose the name of a Column directly. Two kinds of Column objects are handled
    here: those carrying a `_jc` attribute (local session) and those whose class hierarchy contains a class defined
    in the `pyspark.sql.connect.column` module (remote session).

    Parameters
    ----------
    col: Column
        The Column object

    Returns
    -------
    str
        The name of the given column

    Raises
    ------
    ValueError
        If the object is neither of the two supported Column kinds
    """
    # local session: the string form of the underlying `_jc` object is used as the name
    if hasattr(col, "_jc"):
        # noinspection PyProtectedMember
        return col._jc.toString()  # type: ignore[operator]

    # remote session: accept the object only when a class in its MRO lives in the connect column module
    defining_modules = {klass.__module__ for klass in inspect.getmro(col.__class__)}
    if "pyspark.sql.connect.column" in defining_modules:
        # noinspection PyProtectedMember
        return col._expr.name()

    raise ValueError("Column object is not a valid Column object")

koheesio.spark.utils.get_spark_minor_version #

get_spark_minor_version() -> float

Returns the minor version of the spark instance.

For example, if the spark version is 3.3.2, this function would return 3.3

Source code in src/koheesio/spark/utils/common.py
def get_spark_minor_version() -> float:
    """Return the `major.minor` part of the spark version as a float

    Everything after the second dot-separated component of the version string is ignored. For example, if the spark
    version is 3.3.2, this function would return 3.3

    Returns
    -------
    float
        The first two version components, joined by a dot and parsed as a float
    """
    # NOTE: because the result is a float, a two-digit minor such as "3.10" parses to 3.1
    major_minor = spark_version.split(".")[:2]
    return float(".".join(major_minor))

koheesio.spark.utils.import_pandas_based_on_pyspark_version #

import_pandas_based_on_pyspark_version() -> ModuleType

This function checks the installed version of PySpark and then tries to import the appropriate version of pandas. If the correct version of pandas is not installed, it raises an ImportError with a message indicating which version of pandas should be installed.

Source code in src/koheesio/spark/utils/common.py
def import_pandas_based_on_pyspark_version() -> ModuleType:
    """Import pandas and verify that its major version matches the installed PySpark version

    PySpark versions below 3.4 are paired with pandas < 2, PySpark 3.4 and above with pandas >= 2.

    Returns
    -------
    ModuleType
        The imported pandas module

    Raises
    ------
    ImportError
        If pandas is not installed ("Pandas module is not installed."), or if the installed pandas version does not
        match the PySpark version (the message states which pandas version to install)
    """
    # Only the import itself is guarded: a version mismatch raised further down must not be re-labelled as
    # "Pandas module is not installed."
    try:
        import pandas as pd
    except ImportError as e:
        raise ImportError("Pandas module is not installed.") from e

    pyspark_version = get_spark_minor_version()
    # Compare the major version numerically; a plain string comparison would rank "10.0" below "2"
    pandas_major = int(pd.__version__.split(".", 1)[0])

    if (pyspark_version < 3.4 and pandas_major >= 2) or (pyspark_version >= 3.4 and pandas_major < 2):
        raise ImportError(
            f"For PySpark {pyspark_version}, "
            f"please install Pandas version {'< 2' if pyspark_version < 3.4 else '>= 2'}"
        )

    return pd

koheesio.spark.utils.on_databricks #

on_databricks() -> bool

Retrieve if we're running on databricks or elsewhere

Source code in src/koheesio/spark/utils/common.py
def on_databricks() -> bool:
    """Tell whether the code is running on databricks

    Returns
    -------
    bool
        True when the `DATABRICKS_RUNTIME_VERSION` environment variable is set to a non-empty value, False when it
        is unset or empty
    """
    # an unset variable (None) and an empty string are both falsy
    return bool(os.getenv("DATABRICKS_RUNTIME_VERSION"))

koheesio.spark.utils.schema_struct_to_schema_str #

schema_struct_to_schema_str(schema: StructType) -> str

Converts a StructType to a schema str

Source code in src/koheesio/spark/utils/common.py
def schema_struct_to_schema_str(schema: StructType) -> str:
    """Render a StructType as a schema string

    Each field becomes `<name> <TYPE>`, where `<TYPE>` is the upper-cased `typeName()` of the field's data type.
    The fields are separated by a comma followed by a newline.

    Parameters
    ----------
    schema : StructType
        The schema to convert

    Returns
    -------
    str
        The schema string, or an empty string when `schema` is falsy
    """
    if schema:
        column_definitions = (f"{field.name} {field.dataType.typeName().upper()}" for field in schema.fields)
        return ",\n".join(column_definitions)
    return ""

koheesio.spark.utils.show_string #

show_string(
    df: DataFrame,
    n: int = 20,
    truncate: Union[bool, int] = True,
    vertical: bool = False,
) -> str

Returns a string representation of the DataFrame. The default implementation of DataFrame.show() hardcodes a print statement, which is not always desirable. With this function, you can get the string representation of the DataFrame instead, and choose how to display it.

Example
print(show_string(df))

# or use with a logger
logger.info(show_string(df))

Parameters:

Name Type Description Default
df DataFrame

The DataFrame to display

required
n int

The number of rows to display, by default 20

20
truncate Union[bool, int]

If set to True, truncate the displayed columns, by default True

True
vertical bool

If set to True, display the DataFrame vertically, by default False

False
Source code in src/koheesio/spark/utils/common.py
def show_string(df: DataFrame, n: int = 20, truncate: Union[bool, int] = True, vertical: bool = False) -> str:  # type: ignore
    """Returns a string representation of the DataFrame

    The default implementation of DataFrame.show() hardcodes a print statement, which is not always desirable.
    With this function, you can get the string representation of the DataFrame instead, and choose how to display it.

    Example
    -------
    ```python
    print(show_string(df))

    # or use with a logger
    logger.info(show_string(df))
    ```

    Parameters
    ----------
    df : DataFrame
        The DataFrame to display
    n : int, optional
        The number of rows to display, by default 20
    truncate : Union[bool, int], optional
        If set to True, truncate the displayed columns, by default True. An integer is used as the truncation
        width.
    vertical : bool, optional
        If set to True, display the DataFrame vertically, by default False

    Returns
    -------
    str
        The rendered table, as `DataFrame.show()` would have printed it
    """
    if SPARK_MINOR_VERSION < 3.5:
        # The JVM-side `showString` takes the truncation width as an integer, so the bool/int value is normalized
        # the way `DataFrame.show` does it: True means a width of 20, anything else is converted with int()
        # (False therefore becomes 0, i.e. no truncation)
        truncate_width = 20 if isinstance(truncate, bool) and truncate else int(truncate)
        # noinspection PyProtectedMember
        return df._jdf.showString(n, truncate_width, vertical)  # type: ignore
    # as per spark 3.5, the _show_string method is now available making calls to _jdf.showString obsolete
    # noinspection PyProtectedMember
    return df._show_string(n, truncate, vertical)

koheesio.spark.utils.spark_data_type_is_array #

spark_data_type_is_array(data_type: DataType) -> bool

Check if the column's dataType is of type ArrayType

Source code in src/koheesio/spark/utils/common.py
def spark_data_type_is_array(data_type: DataType) -> bool:  # type: ignore
    """Tell whether the given data type is an array type

    Parameters
    ----------
    data_type : DataType
        The data type to inspect

    Returns
    -------
    bool
        True if `data_type` is an instance of ArrayType, False otherwise
    """
    is_array = isinstance(data_type, ArrayType)
    return is_array

koheesio.spark.utils.spark_data_type_is_numeric #

spark_data_type_is_numeric(data_type: DataType) -> bool

Check if the column's dataType is a numeric type (IntegerType, LongType, FloatType, DoubleType or DecimalType)

Source code in src/koheesio/spark/utils/common.py
def spark_data_type_is_numeric(data_type: DataType) -> bool:  # type: ignore
    """Tell whether the given data type is one of the numeric types checked here

    Parameters
    ----------
    data_type : DataType
        The data type to inspect

    Returns
    -------
    bool
        True if `data_type` is an instance of IntegerType, LongType, FloatType, DoubleType or DecimalType,
        False otherwise
    """
    # only these five types are checked; any other type yields False
    numeric_types = (IntegerType, LongType, FloatType, DoubleType, DecimalType)
    return isinstance(data_type, numeric_types)