
Steps

Steps Module

This module contains the definition of the Step class, which serves as the base class for custom units of logic that can be executed. It also includes the StepOutput class, which defines the output data model for a Step.

The Step class is designed to be subclassed for creating new steps in a data pipeline. Each subclass should implement the execute method, specifying the expected inputs and outputs.

This module also exports the SparkStep class for steps that interact with Spark.

Classes:
  • Step: Base class for a custom unit of logic that can be executed.
  • StepOutput: Defines the output data model for a Step.

koheesio.steps.Step

Base class for a step

A custom unit of logic that can be executed.

The Step class is designed to be subclassed. To create a new step, one would subclass Step and implement the def execute(self) method, specifying the expected inputs and outputs.

Note: since the Step class is metaclassed, the execute method is wrapped with the do_execute function, making it always return the Step's output. Hence, an explicit return is not needed when implementing execute.

Methods and Attributes

The Step class has several attributes and methods.

INPUT

The following fields are available by default on the Step class:

  • name: Name of the Step. If not set, the name of the class will be used.
  • description: Description of the Step. If not set, the docstring of the class will be used. If the docstring contains multiple lines, only the first line will be used.

When subclassing a Step, any additional pydantic field will be treated as input to the Step. See also the explanation on the .execute() method below.
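The defaulting rule for name and description can be illustrated with a small stdlib-only sketch. It uses neither koheesio nor Pydantic: the class DefaultedStep and its constructor are hypothetical and only mimic the behavior described above, falling back to the class name and to the first line of the class docstring.

```python
import inspect
from typing import Optional


class DefaultedStep:
    """Add a suffix to a string.

    Longer explanation that should not end up in the description.
    """

    def __init__(self, name: Optional[str] = None, description: Optional[str] = None) -> None:
        # Fall back to the class name when no explicit name is given
        self.name = name or type(self).__name__
        # Fall back to the first line of the (indentation-cleaned) class docstring
        doc = inspect.getdoc(type(self)) or ""
        self.description = description or (doc.splitlines()[0] if doc else "")


step = DefaultedStep()
print(step.name)         # DefaultedStep
print(step.description)  # Add a suffix to a string.
```

Explicitly passed values always win over the derived defaults, which matches the "if not set" wording above.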

OUTPUT

Every Step has an Output class, which is a subclass of StepOutput. This class is used to validate the output of the Step. The Output class is defined as an inner class of the Step class and can be accessed through the Step.Output attribute. It can be extended to add additional fields to the output of the Step. See also the explanation on the .execute() method.

  • Output: A nested class representing the output of the Step used to validate the output of the Step and based on the StepOutput class.
  • output: Allows you to interact with the Output of the Step lazily (see above and StepOutput)

When subclassing a Step, any additional pydantic field added to the nested Output class will be treated as output of the Step. See also the description of StepOutput for more information.

Methods:
  • execute: Abstract method to implement for new steps.
    • The inputs of the step can be accessed using self.input_name.
    • The output of the step can be accessed using self.output.output_name.
  • run: Alias to the .execute() method. You can use this to run the step, but execute is preferred.
  • to_yaml: Dump the step to YAML.
  • get_description: Get the description of the Step.

When subclassing a Step, execute is the only method that needs to be implemented. Any additional method added to the class will be treated as a method of the Step.

Note: since the Step class is metaclassed, the execute method is automatically wrapped with the do_execute function, making it always return a StepOutput. See also the explanation on the do_execute function.

Class methods:
  • from_step: Returns a new Step instance based on the data of another Step instance. For example: MyStep.from_step(other_step, a="foo")
  • get_description: Get the description of the Step

Dunder methods:
  • __getattr__: Allows input to be accessed through self.input_name
  • __repr__ and __str__: String representation of a step

Background

A Step is an atomic operation and serves as the building block of data pipelines built with the framework. Tasks typically consist of a series of Steps.

A step can be seen as an operation on a set of inputs that returns a set of outputs. However, this does not imply that steps are stateless (e.g. data writes).

The diagram serves to illustrate the concept of a Step:

┌─────────┐        ┌──────────────────┐        ┌─────────┐
│ Input 1 │───────▶│                  ├───────▶│Output 1 │
└─────────┘        │                  │        └─────────┘
                   │                  │
┌─────────┐        │                  │        ┌─────────┐
│ Input 2 │───────▶│       Step       │───────▶│Output 2 │
└─────────┘        │                  │        └─────────┘
                   │                  │
┌─────────┐        │                  │        ┌─────────┐
│ Input 3 │───────▶│                  ├───────▶│Output 3 │
└─────────┘        └──────────────────┘        └─────────┘

Steps are built on top of Pydantic, a data validation and settings management library that uses Python type annotations. This allows the inputs and outputs of a Step to be validated automatically.

  • Step inherits from BaseModel, which is a Pydantic class used to define data models. This allows Step to automatically validate data against the defined fields and their types.
  • Step is metaclassed by StepMetaClass, which is a custom metaclass that wraps the execute method of the Step class with the _execute_wrapper function. This ensures that the execute method always returns the output of the Step along with providing logging and validation of the output.
  • Step has an Output class, which is a subclass of StepOutput. This class is used to validate the output of the Step. The Output class is defined as an inner class of the Step class. The Output class can be accessed through the Step.Output attribute.
  • The Output class can be extended to add additional fields to the output of the Step.

Examples:

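from koheesio.steps import Step, StepOutput


class MyStep(Step):
    a: str  # input

    class Output(StepOutput):  # output
        b: str

    def execute(self) -> "MyStep.Output":  # string annotation: MyStep is not defined yet here
        # no explicit return needed: the metaclass wrapper returns self.output
        self.output.b = f"{self.a}-some-suffix"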

output property writable

output: Output

Interact with the output of the Step
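A lazily created, writable output property can be sketched in plain Python. This is an illustration under stated assumptions, not koheesio's implementation: the _output attribute name and the SketchStep and Output classes are hypothetical.

```python
from typing import Optional


class Output:
    """Hypothetical stand-in for a StepOutput subclass."""

    def __init__(self) -> None:
        self.b: Optional[str] = None


class SketchStep:
    """Hypothetical step exposing a lazily created, writable output."""

    def __init__(self) -> None:
        self._output: Optional[Output] = None  # nothing is created up front

    @property
    def output(self) -> Output:
        # Create the Output instance on first access only
        if self._output is None:
            self._output = Output()
        return self._output

    @output.setter
    def output(self, value: Output) -> None:
        # The property is writable: a whole Output instance can be assigned
        self._output = value


step = SketchStep()
step.output.b = "foo"  # first access creates the Output instance
print(step.output.b)   # foo
```

Because every access after the first returns the same instance, field assignments such as self.output.b = ... inside execute accumulate on one object.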

Output

Output class for Step

execute abstractmethod

execute() -> InstanceOf[StepOutput]

Abstract method to implement for new steps.

The inputs of the step can be accessed using self.input_name.

Note: since the Step class is metaclassed, the execute method is wrapped with the do_execute function, making it always return the Step's output.

Source code in src/koheesio/steps/__init__.py
@abstractmethod
def execute(self) -> InstanceOf[StepOutput]:
    """Abstract method to implement for new steps.

    The Inputs of the step can be accessed, using `self.input_name`

    Note: since the Step class is meta-classed, the execute method is wrapped with the `do_execute` function making
      it always return the Steps output
    """
    raise NotImplementedError

from_step classmethod

from_step(step: Step, **kwargs) -> InstanceOf[BaseModel]

Returns a new Step instance based on the data of another Step or BaseModel instance

Source code in src/koheesio/steps/__init__.py
@classmethod
def from_step(cls, step: Step, **kwargs) -> InstanceOf[PydanticBaseModel]:  # type: ignore[no-untyped-def]
    """Returns a new Step instance based on the data of another Step or BaseModel instance"""
    return cls.from_basemodel(step, **kwargs)
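from_step delegates to from_basemodel, whose internals are not shown on this page. The following stdlib-only sketch illustrates the general idea with dataclasses: copy the field values the target class understands from another instance, then apply keyword overrides. The from_step function, ReadStep and OtherReadStep are hypothetical, and silently dropping fields the target does not declare is an assumption of this sketch, not documented koheesio behavior.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Type, TypeVar

T = TypeVar("T")


def from_step(cls: Type[T], step: Any, **kwargs: Any) -> T:
    """Build a `cls` instance from the field values of another instance (hypothetical helper)."""
    target_fields = {f.name for f in fields(cls)}
    # Keep only the values that the target class declares as fields
    data: Dict[str, Any] = {
        f.name: getattr(step, f.name)
        for f in fields(step)
        if f.name in target_fields
    }
    # Explicit keyword arguments take precedence over copied values
    data.update(kwargs)
    return cls(**data)


@dataclass
class ReadStep:
    path: str
    fmt: str = "csv"


@dataclass
class OtherReadStep:
    path: str
    fmt: str = "parquet"
    limit: int = 10


other = OtherReadStep(path="/data/in", limit=5)
new = from_step(ReadStep, other, fmt="json")
print(new)  # ReadStep(path='/data/in', fmt='json')
```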

repr_json

repr_json(simple: bool = False) -> str

Dump the step to JSON, meant for representation.

Note: use to_json if you want to dump the step to JSON for serialization. This method is meant for representation purposes only!

Examples:

>>> step = MyStep(a="foo")
>>> print(step.repr_json())
{"input": {"a": "foo"}}

Parameters:

  • simple (bool, default: False): When toggled to True, a briefer output will be produced. This is friendlier for logging purposes.

Returns:

  • str: A string, which is valid JSON.
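How the simple flag shapes the result can be mirrored with plain dictionaries. This sketch follows the order of operations in the repr_json source (pop name and description from the input, keep them as top-level keys only when simple is False, add the output only if it is non-empty), but it works on already-dumped dicts instead of Pydantic models. The assemble_repr function and the sample values are hypothetical.

```python
import json
from typing import Any, Dict


def assemble_repr(inputs: Dict[str, Any], outputs: Dict[str, Any], simple: bool = False) -> str:
    """Mirror the result layout of repr_json using plain dicts (hypothetical helper)."""
    inputs = dict(inputs)  # avoid mutating the caller's dicts
    outputs = dict(outputs)
    result: Dict[str, Any] = {}

    name = inputs.pop("name", None)
    description = inputs.pop("description", None)
    if not simple:
        # Full mode: name and description become top-level keys
        if name:
            result["name"] = name
        if description:
            result["description"] = description
    else:
        # Simple mode: name and description are also left out of the output
        outputs.pop("name", None)
        outputs.pop("description", None)

    if outputs:
        result["output"] = outputs
    result["input"] = inputs
    return json.dumps(result)


step_input = {"name": "MyStep", "description": "Add a suffix.", "a": "foo"}
print(assemble_repr(step_input, {}))
# {"name": "MyStep", "description": "Add a suffix.", "input": {"a": "foo"}}
print(assemble_repr(step_input, {"b": "foo-some-suffix"}, simple=True))
# {"output": {"b": "foo-some-suffix"}, "input": {"a": "foo"}}
```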

Source code in src/koheesio/steps/__init__.py
def repr_json(self, simple: bool = False) -> str:
    """dump the step to json, meant for representation

    Note: use to_json if you want to dump the step to json for serialization
    This method is meant for representation purposes only!

    Examples
    --------
    ```python
    >>> step = MyStep(a="foo")
    >>> print(step.repr_json())
    {"input": {"a": "foo"}}
    ```

    Parameters
    ----------
    simple: bool
        When toggled to True, a briefer output will be produced. This is friendlier for logging purposes

    Returns
    -------
    str
        A string, which is valid json
    """
    model_dump_options = dict(warnings="none", exclude_unset=True)

    _result = {}

    # extract input
    _input = self.model_dump(**model_dump_options)  # type: ignore[arg-type]

    # remove name and description from input and add to result if simple is not set
    name = _input.pop("name", None)
    description = _input.pop("description", None)
    if not simple:
        if name:
            _result["name"] = name
        if description:
            _result["description"] = description
    else:
        model_dump_options["exclude"] = {"name", "description"}

    # extract output
    _output = self.output.model_dump(**model_dump_options)  # type: ignore[arg-type]

    # add output to result
    if _output:
        _result["output"] = _output

    # add input to result
    _result["input"] = _input

    class MyEncoder(json.JSONEncoder):
        """Custom JSON Encoder to handle non-serializable types"""

        def default(self, o: Any) -> Any:
            try:
                return super().default(o)
            except TypeError:
                return o.__class__.__name__

    # Use MyEncoder when converting the dictionary to a JSON string
    json_str = json.dumps(_result, cls=MyEncoder)

    return json_str
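The MyEncoder fallback used in repr_json can be tried in isolation. This standalone sketch reuses the same idea (delegate to json.JSONEncoder.default and, on TypeError, emit the class name); the FallbackEncoder name and the sample payload are illustrative.

```python
import json
from datetime import date
from typing import Any


class FallbackEncoder(json.JSONEncoder):
    """Encode anything the json module cannot handle as the name of its class."""

    def default(self, o: Any) -> Any:
        try:
            # The base implementation raises TypeError for unsupported types
            return super().default(o)
        except TypeError:
            return o.__class__.__name__


payload = {"a": "foo", "when": date(2024, 1, 1), "conn": object()}
print(json.dumps(payload, cls=FallbackEncoder))
# {"a": "foo", "when": "date", "conn": "object"}
```

This is why repr_json is suitable for representation but not for serialization: any non-serializable value is reduced to its class name and cannot be restored from the output.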

repr_yaml

repr_yaml(simple: bool = False) -> str

Dump the step to YAML, meant for representation.

Note: use to_yaml if you want to dump the step to YAML for serialization. This method is meant for representation purposes only!

Examples:

>>> step = MyStep(a="foo")
>>> print(step.repr_yaml())
input:
  a: foo

Parameters:

  • simple (bool, default: False): When toggled to True, a briefer output will be produced. This is friendlier for logging purposes.

Returns:

  • str: A string, which is valid YAML.

Source code in src/koheesio/steps/__init__.py
def repr_yaml(self, simple: bool = False) -> str:
    """dump the step to yaml, meant for representation

    Note: use to_yaml if you want to dump the step to yaml for serialization
    This method is meant for representation purposes only!

    Examples
    --------
    ```python
    >>> step = MyStep(a="foo")
    >>> print(step.repr_yaml())
    input:
      a: foo
    ```

    Parameters
    ----------
    simple: bool
        When toggled to True, a briefer output will be produced. This is friendlier for logging purposes

    Returns
    -------
    str
        A string, which is valid yaml
    """
    json_str = self.repr_json(simple=simple)

    # Parse the JSON string back into a dictionary
    _result = json.loads(json_str)

    return yaml.dump(_result)

run

run() -> InstanceOf[StepOutput]

Alias to .execute()

Source code in src/koheesio/steps/__init__.py
def run(self) -> InstanceOf[StepOutput]:
    """Alias to .execute()"""
    return self.execute()

koheesio.steps.StepMetaClass

StepMetaClass is set up as a metaclass extending Pydantic's ModelMetaclass, so that Pydantic's behavior is unaffected while the execute method is automatically decorated with do_execute.
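The wrapping idea can be sketched with a plain type metaclass. This is a simplified, stdlib-only illustration under stated assumptions: it is not StepMetaClass (which extends Pydantic's ModelMetaclass and, per the description above, also adds logging and output validation), and the names WrapExecuteMeta, _wrap_execute, BaseSketchStep and MySketchStep are hypothetical.

```python
import functools
from typing import Any, Callable, Dict, Tuple


def _wrap_execute(execute: Callable[..., Any]) -> Callable[..., Any]:
    @functools.wraps(execute)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        execute(self, *args, **kwargs)  # the return value of execute is ignored
        return self.output              # callers always get the step's output
    return wrapper


class WrapExecuteMeta(type):
    def __new__(mcs, name: str, bases: Tuple[type, ...], namespace: Dict[str, Any]) -> type:
        # Decorate execute only in classes that define it themselves
        if "execute" in namespace:
            namespace["execute"] = _wrap_execute(namespace["execute"])
        return super().__new__(mcs, name, bases, namespace)


class Output:
    b: str = ""


class BaseSketchStep(metaclass=WrapExecuteMeta):
    def __init__(self, a: str) -> None:
        self.a = a
        self.output = Output()


class MySketchStep(BaseSketchStep):
    def execute(self) -> None:
        # no explicit return: the metaclass wrapper returns self.output
        self.output.b = f"{self.a}-some-suffix"


result = MySketchStep(a="foo").execute()
print(result.b)  # foo-some-suffix
```

Doing the decoration in the metaclass means subclass authors never have to remember a decorator: every class created with the metaclass gets its own execute wrapped at class-creation time.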

koheesio.steps.StepOutput

Class for the StepOutput model

Usage

A custom output class is set up by subclassing StepOutput:

from koheesio.steps import StepOutput

class YourOwnOutput(StepOutput):
    a: str
    b: int

model_config class-attribute instance-attribute

model_config = ConfigDict(
    validate_default=False, defer_build=True
)

validate_output

validate_output() -> StepOutput

Validate the output of the Step

Essentially, this method is a wrapper around the validate method of the BaseModel class

Source code in src/koheesio/steps/__init__.py
def validate_output(self) -> StepOutput:
    """Validate the output of the Step

    Essentially, this method is a wrapper around the validate method of the BaseModel class
    """
    validated_model = self.validate()  # type: ignore[call-arg]
    return StepOutput.from_basemodel(validated_model)
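The idea behind validate_output, checking an output object once execute has filled in its fields, can be sketched without Pydantic. The validate_fields helper and SketchOutput class below are hypothetical: the helper only checks presence and isinstance for simple, non-generic annotations, whereas koheesio delegates to the validate method of its BaseModel.

```python
from typing import Any, get_type_hints


class SketchOutput:
    """Hypothetical output with two declared fields."""

    a: str
    b: int


def validate_fields(output: Any) -> Any:
    """Raise if a declared field is missing or has the wrong type (simple types only)."""
    for field_name, field_type in get_type_hints(type(output)).items():
        if not hasattr(output, field_name):
            raise ValueError(f"missing output field: {field_name}")
        value = getattr(output, field_name)
        if not isinstance(value, field_type):
            raise TypeError(
                f"{field_name} should be {field_type.__name__}, got {type(value).__name__}"
            )
    return output


out = SketchOutput()
out.a = "foo"
out.b = 42
validate_fields(out)  # passes

out.b = "not-an-int"
try:
    validate_fields(out)
except TypeError as exc:
    print(exc)  # b should be int, got str
```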