
Replace

Transformation to replace a particular value in a column with another one

koheesio.spark.transformations.replace.Replace

Replace a particular value in a column with another one

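Can handle NULL / None values: when from_value is None or an empty string (""), every NULL value in the column is replaced with to_value. An empty string is treated the same as None (the check is "if not from_value"), so it matches NULL values, not empty strings.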
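The NULL-matching rule can be modelled in plain Python. This is a minimal sketch, not koheesio code: `matches` is a hypothetical helper that mirrors the `if not from_value` check in the `replace` function further down this page, with Python's `None` standing in for a Spark NULL.

```python
from typing import Optional


def matches(value: Optional[str], from_value: Optional[str]) -> bool:
    """Hypothetical model of the condition built by `replace`.

    `None` stands in for a Spark NULL. Any falsy `from_value`
    (None or "") selects NULL values, mirroring `if not from_value`.
    """
    if not from_value:
        return value is None
    # A NULL cell never equals a concrete from_value
    return value == from_value


print(matches(None, None))        # True
print(matches(None, ""))          # True: "" behaves like None
print(matches("", ""))            # False: an empty string is not NULL
print(matches("hello", "hello"))  # True
```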

Unsupported datatypes:

The following casts are not supported and will raise an error in Spark:

  • binary
  • boolean
  • array<...>
  • map<...,...>

Supported datatypes:

The following casts are supported:

  • byte
  • short
  • integer
  • long
  • float
  • double
  • decimal
  • timestamp
  • date
  • string
  • void (skipped by default)
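The two lists can be read as a simple lookup. The sketch below is illustrative only: `classify` and its return labels are hypothetical, it works on the type names exactly as written in the lists, and it does not call Spark or koheesio.

```python
# Type names copied from the lists above (hypothetical lookup tables).
UNSUPPORTED_PREFIXES = ("binary", "boolean", "array", "map")
SUPPORTED = {
    "byte", "short", "integer", "long", "float",
    "double", "decimal", "timestamp", "date", "string",
}


def classify(type_name: str) -> str:
    """Hypothetical helper: classify a type name against the lists above."""
    name = type_name.strip().lower()
    if name.startswith(UNSUPPORTED_PREFIXES):
        return "unsupported"
    if name == "void":
        return "skipped by default"
    # Parameterised types such as "decimal(10,2)" are matched on their base name
    if name.split("(")[0] in SUPPORTED:
        return "supported"
    return "unknown"


for t in ("integer", "decimal(10,2)", "boolean", "array<string>", "void"):
    print(t, "->", classify(t))
```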

Any supported non-string datatype will be cast to string before the replacement is done.
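A single-cell model of the cast-then-replace behavior described above. This is a sketch under stated assumptions, not the library's implementation: `replace_value` is hypothetical, and Python's `str()` stands in for Spark's cast to string, which only lines up for simple values such as plain integers.

```python
from typing import Optional, Union


def replace_value(
    value: Union[int, str, None], to_value: str, from_value: Optional[str] = None
) -> Optional[str]:
    """Hypothetical row-level model of Replace for a single cell.

    Non-string values are converted with str() first, standing in for
    Spark's cast to string (assumed to agree for plain integers only).
    """
    as_string = None if value is None else str(value)
    if not from_value:
        # No from_value: only NULL cells are replaced
        return to_value if as_string is None else as_string
    return to_value if as_string == from_value else as_string


print(replace_value(42, "answer", "42"))  # answer
print(replace_value(7, "answer", "42"))   # 7
print(replace_value(None, "unknown"))     # unknown
```

The argument order follows the `replace(column, to_value, from_value)` signature documented further down this page.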

Example

input_df:

| id | string |
|----|--------|
| 1  | hello  |
| 2  | world  |
| 3  |        |

```python
from koheesio.spark.transformations.replace import Replace

output_df = Replace(
    column="string",
    from_value="hello",
    to_value="programmer",
).transform(input_df)
```

output_df:

| id | string     |
|----|------------|
| 1  | programmer |
| 2  | world      |
| 3  |            |

In this example, the value "hello" in the column "string" is replaced with "programmer".

from_value class-attribute instance-attribute

```python
from_value: Optional[str] = Field(
    default=None,
    alias="from",
    description="The original value that needs to be replaced. If no value is given, all 'null' values will be replaced with the to_value",
)
```

to_value class-attribute instance-attribute

```python
to_value: str = Field(
    default=...,
    alias="to",
    description="The new value to replace this with",
)
```

ColumnConfig

Column type configurations for the column to be replaced

limit_data_type class-attribute instance-attribute

```python
limit_data_type = [*run_for_all_data_type, VOID]
```

run_for_all_data_type class-attribute instance-attribute

```python
run_for_all_data_type = [
    BYTE,
    SHORT,
    INTEGER,
    LONG,
    FLOAT,
    DOUBLE,
    DECIMAL,
    STRING,
    TIMESTAMP,
    DATE,
]
```
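The difference between the two lists can be sketched in plain Python. Assumption: as the names suggest, `run_for_all_data_type` decides which columns are picked up when the transformation runs over all columns, while `limit_data_type` (which adds VOID) is the set accepted for explicitly named columns. `select_columns`, the lowercase type names, the sample schema and the ValueError are all hypothetical and only illustrate that reading.

```python
from typing import Dict, List, Optional

# Hypothetical mirrors of the two ColumnConfig lists, as lowercase names.
RUN_FOR_ALL_DATA_TYPE = [
    "byte", "short", "integer", "long", "float",
    "double", "decimal", "string", "timestamp", "date",
]
LIMIT_DATA_TYPE = [*RUN_FOR_ALL_DATA_TYPE, "void"]


def select_columns(schema: Dict[str, str], columns: Optional[List[str]] = None) -> List[str]:
    """Hypothetical helper: pick the columns the transformation would touch."""
    if columns is None:
        # Running for all columns: VOID (and unsupported) columns are skipped
        return [name for name, dtype in schema.items() if dtype in RUN_FOR_ALL_DATA_TYPE]
    # Explicit columns: VOID is allowed; raising here is an assumption of this sketch
    for name in columns:
        if schema[name] not in LIMIT_DATA_TYPE:
            raise ValueError(f"column {name!r} has unsupported type {schema[name]!r}")
    return list(columns)


schema = {"id": "integer", "flag": "boolean", "empty": "void", "name": "string"}
print(select_columns(schema))             # ['id', 'name']
print(select_columns(schema, ["empty"]))  # ['empty']
```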

func

```python
func(column: Column) -> Column
```
Source code in src/koheesio/spark/transformations/replace.py
```python
def func(self, column: Column) -> Column:
    return replace(column=column, from_value=self.from_value, to_value=self.to_value)
```

koheesio.spark.transformations.replace.replace

```python
replace(
    column: Union[Column, str],
    to_value: str,
    from_value: Optional[str] = None,
) -> Column
```

Function to replace a particular value in a column with another one

Source code in src/koheesio/spark/transformations/replace.py
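```python
from typing import Optional, Union

from pyspark.sql import Column
from pyspark.sql.functions import col, lit, when


def replace(column: Union[Column, str], to_value: str, from_value: Optional[str] = None) -> Column:
    """Function to replace a particular value in a column with another one"""
    # make sure we have a Column object
    if isinstance(column, str):
        column = col(column)

    # None or "" means: replace NULL values; otherwise match from_value exactly
    if not from_value:
        condition = column.isNull()
    else:
        condition = column == from_value

    # matching rows get to_value, all other rows keep their original value
    return when(condition, lit(to_value)).otherwise(column)
```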
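The `when(...).otherwise(...)` expression corresponds to a SQL CASE expression. The hypothetical helper `replace_sql` below renders that CASE expression as a string, following the same `if not from_value` branch; it is a sketch for reading purposes, not part of koheesio, and it deliberately refuses values containing single quotes instead of escaping them.

```python
from typing import Optional


def replace_sql(column: str, to_value: str, from_value: Optional[str] = None) -> str:
    """Hypothetical helper: render the CASE expression that `replace` describes."""
    for value in (to_value, from_value):
        if value is not None and "'" in value:
            # Kept simple on purpose: quotes inside literals are not escaped
            raise ValueError("single quotes in values are not handled by this sketch")
    name = f"`{column}`"
    # Same branch as replace(): a falsy from_value targets NULL values
    if not from_value:
        condition = f"{name} IS NULL"
    else:
        condition = f"{name} = '{from_value}'"
    return f"CASE WHEN {condition} THEN '{to_value}' ELSE {name} END"


print(replace_sql("string", "programmer", "hello"))
# CASE WHEN `string` = 'hello' THEN 'programmer' ELSE `string` END
```

Calling `replace_sql("string", "n/a")` without a `from_value` yields the `IS NULL` variant of the same expression.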