Concat

Concatenates multiple input columns together into a single column, optionally using a given separator.

koheesio.spark.transformations.strings.concat.Concat

This is a wrapper around the PySpark concat() and concat_ws() functions.

The transformation works with string, date/timestamp, binary, and compatible array columns.

Concept

When working with arrays, the function concatenates the elements of the input arrays.

  • If a spacer is used, the output is a string with the array elements separated by the spacer.
  • If no spacer is used, the output is an array containing the elements of all input arrays.
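
The two array cases above can be sketched in plain Python. This is only a model of the described behaviour under the assumption that all elements are non-null strings; it is not Spark code, and the helper name concat_arrays_model is hypothetical.

```python
from typing import List, Optional, Union


def concat_arrays_model(
    arrays: List[List[str]], spacer: Optional[str] = None
) -> Union[str, List[str]]:
    """Plain-Python model of the two array cases (hypothetical helper, not Spark)."""
    # Flatten the input arrays in column order.
    flattened = [element for array in arrays for element in array]
    if spacer:
        # With a spacer: the result is a single string.
        return spacer.join(flattened)
    # Without a spacer: the result is still an array.
    return flattened


as_string = concat_arrays_model([["text1", "text2"], ["text3", "text4"]], spacer="--")
as_array = concat_arrays_model([["text1", "text2"], ["text3", "text4"]])

print(as_string)  # text1--text2--text3--text4
print(as_array)   # ['text1', 'text2', 'text3', 'text4']
```

The point of the sketch is the change in result type: the same inputs yield a string with a spacer and an array without one.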

When working with date/timestamps, the values are converted to strings before they are concatenated. A timestamp is converted using the default format of yyyy-MM-dd HH:mm:ss.

Parameters:

| Name          | Type                  | Description | Default  |
|---------------|-----------------------|-------------|----------|
| columns       | Union[str, List[str]] | The column (or list of columns) to concatenate. Alias: column. If at least one of the values is None or null, the resulting string will also be None/null (except when using arrays). Columns can be of any type but should ideally be of the same type. Different types can be used, but the function will convert them to string values first. | required |
| target_column | Optional[str]         | Target column name. When this is left blank, a name is generated by concatenating the names of the source columns with an '_'. | None |
| spacer        | Optional[str]         | Optional spacer / separator symbol. Alias: sep. When left blank, no spacer is used. | None |

Example
Example using a string column and a timestamp column

input_df:

| column_a | column_b            |
|----------|---------------------|
| text     | 1997-02-28 10:30:00 |

output_df = Concat(
    columns=["column_a", "column_b"],
    target_column="concatenated_column",
    spacer="--",
).transform(input_df)

output_df:

| column_a | column_b            | concatenated_column       |
|----------|---------------------|---------------------------|
| text     | 1997-02-28 10:30:00 | text--1997-02-28 10:30:00 |

In the example above, the resulting column is a string column.

If we had left out the spacer, the resulting column would have had the value of text1997-02-28 10:30:00 (a string). Note that the timestamp is converted to a string using the default format of yyyy-MM-dd HH:mm:ss.
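
As a rough illustration of the string conversion, the same values can be reproduced in plain Python. This is a sketch, not the Spark implementation: the variable names are hypothetical, and `%Y-%m-%d %H:%M:%S` is the strftime equivalent of the yyyy-MM-dd HH:mm:ss pattern.

```python
from datetime import datetime

# Values taken from the example above; the variable names are hypothetical.
column_a = "text"
column_b = datetime(1997, 2, 28, 10, 30, 0)

# strftime equivalent of the yyyy-MM-dd HH:mm:ss pattern.
column_b_as_string = column_b.strftime("%Y-%m-%d %H:%M:%S")

with_spacer = "--".join([column_a, column_b_as_string])
without_spacer = "".join([column_a, column_b_as_string])

print(with_spacer)     # text--1997-02-28 10:30:00
print(without_spacer)  # text1997-02-28 10:30:00
```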

Example using two array columns

input_df:

| array_col_1    | array_col_2   |
|----------------|---------------|
| [text1, text2] | [text3, null] |

output_df = Concat(
    columns=["array_col_1", "array_col_2"],
    target_column="concatenated_column",
    spacer="--",
).transform(input_df)

output_df:

| array_col_1    | array_col_2   | concatenated_column   |
|----------------|---------------|-----------------------|
| [text1, text2] | [text3, null] | "text1--text2--text3" |

Note that the null value in the second array is ignored. If we had left out the spacer, the resulting column would have been an array rather than a string.

Array columns can only be concatenated with other array columns. If you want to concatenate an array column with a non-array value, you will have to convert that value to an array first.

spacer class-attribute instance-attribute

spacer: Optional[str] = Field(
    default=None,
    description="Optional spacer / separator symbol. Defaults to None. When left blank, no spacer is used",
    alias="sep",
)

target_column class-attribute instance-attribute

target_column: Optional[str] = Field(
    default=None,
    description="Target column name. When this is left blank, a name will be generated by concatenating the names of the source columns with an '_'.",
)

execute

execute() -> Output
Source code in src/koheesio/spark/transformations/strings/concat.py
def execute(self) -> ColumnsTransformation.Output:
    columns = [col(s) for s in self.get_columns()]
    self.output.df = self.df.withColumn(  # type: ignore
        self.target_column, concat_ws(self.spacer, *columns) if self.spacer else concat(*columns)
    )
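
Judging only from the code shown above, the choice between concat_ws and concat depends on the truthiness of the spacer, so an empty string appears to be treated the same as None. A minimal sketch of that branch follows; the helper chosen_function is hypothetical and returns the function name instead of calling PySpark.

```python
from typing import Optional


def chosen_function(spacer: Optional[str]) -> str:
    """Return the name of the PySpark function the branch would select (hypothetical helper)."""
    # Mirrors `... if self.spacer else ...`: None and "" are both falsy in Python.
    return "concat_ws" if spacer else "concat"


for spacer in ("--", " ", "", None):
    print(repr(spacer), "->", chosen_function(spacer))
```

A single space is truthy and therefore selects concat_ws, while `""` and `None` both fall through to concat.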

get_target_column

get_target_column(
    target_column_value: str, values: dict
) -> str

Get the target column name if it is not provided.

If not provided, a name will be generated by concatenating the names of the source columns with an '_'.

Source code in src/koheesio/spark/transformations/strings/concat.py
@field_validator("target_column")
def get_target_column(cls, target_column_value: str, values: dict) -> str:
    """Get the target column name if it is not provided.

    If not provided, a name will be generated by concatenating the names of the source columns with an '_'."""
    if not target_column_value:
        columns_value: List = values["columns"]
        columns = list(dict.fromkeys(columns_value))  # dict.fromkeys is used to dedup while maintaining order
        return "_".join(columns)

    return target_column_value
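
The naming rule can be tried outside of the validator with a small standalone function. This is a sketch: default_target_column is a hypothetical helper that copies the logic above without the pydantic machinery.

```python
from typing import List, Optional


def default_target_column(columns: List[str], target_column: Optional[str] = None) -> str:
    """Standalone copy of the naming rule shown above (hypothetical helper)."""
    if not target_column:
        # dict.fromkeys drops duplicate names while preserving first-seen order.
        unique_columns = list(dict.fromkeys(columns))
        return "_".join(unique_columns)
    return target_column


print(default_target_column(["column_a", "column_b"]))              # column_a_column_b
print(default_target_column(["column_a", "column_b", "column_a"]))  # column_a_column_b
print(default_target_column(["column_a", "column_b"], "result"))    # result
```

Because duplicates are removed before joining, listing the same source column twice does not repeat its name in the generated target column name.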