Steps
Steps Module
This module contains the definition of the Step class, which serves as the base class for custom units of logic that
can be executed. It also includes the StepOutput class, which defines the output data model for a Step.
The Step class is designed to be subclassed for creating new steps in a data pipeline. Each subclass should implement
the execute method, specifying the expected inputs and outputs.
This module also exports the SparkStep class for steps that interact with Spark.
Classes:
- Step: Base class for a custom unit of logic that can be executed.
- StepOutput: Defines the output data model for a Step.
koheesio.steps.Step
Base class for a step
A custom unit of logic that can be executed.
The Step class is designed to be subclassed. To create a new step, one would subclass Step and implement the
def execute(self) method, specifying the expected inputs and outputs.
Note: since the Step class is metaclassed, the execute method is wrapped with the
do_execute function, making it always return the Step's output. Hence, an explicit return is not needed when implementing execute.
Methods and Attributes
The Step class has several attributes and methods.
INPUT
The following fields are available by default on the Step class:
- name: Name of the Step. If not set, the name of the class will be used.
- description: Description of the Step. If not set, the docstring of the class will be used. If the docstring
contains multiple lines, only the first line will be used.
When subclassing a Step, any additional pydantic field will be treated as input to the Step. See also the
explanation on the .execute() method below.
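The default behaviour of name and description can be pictured with a small stand-in class. This is only a sketch using the standard library: SketchStep and LoadCustomers are hypothetical names, and the real Step resolves these defaults through Pydantic rather than a hand-written __init__.

```python
import inspect
from typing import Optional


class SketchStep:
    """Hypothetical stand-in for Step; not the koheesio implementation."""

    def __init__(self, name: Optional[str] = None, description: Optional[str] = None):
        # Fall back to the class name when no name is given.
        self.name = name or type(self).__name__
        # Fall back to the first line of the class docstring.
        doc = inspect.getdoc(type(self)) or ""
        self.description = description or (doc.splitlines()[0] if doc else "")


class LoadCustomers(SketchStep):
    """Load the customers table.

    Longer explanation that is not used as the description.
    """


step = LoadCustomers()
print(step.name)         # LoadCustomers
print(step.description)  # Load the customers table.
```

Passing name or description explicitly overrides the fallback in this sketch.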
OUTPUT
Every Step has an Output class, which is a subclass of StepOutput. This class is used to validate the output of
the Step. The Output class is defined as an inner class of the Step class. The Output class can be accessed
through the Step.Output attribute. The Output class can be extended to add additional fields to the output of
the Step. See also the explanation of the .execute() method below.
- Output: A nested class representing the output of the Step, used to validate the output of the Step and based on the StepOutput class.
- output: Allows you to interact with the Output of the Step lazily (see above and StepOutput).
When subclassing a Step, any additional pydantic field added to the nested Output class will be treated as
output of the Step. See also the description of StepOutput for more information.
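The relationship between the nested Output class and the lazy output attribute can be sketched in plain Python. The names SketchOutput, SketchStep and AddSuffix are hypothetical, and no Pydantic validation happens here; the sketch only assumes that "lazily" means the Output instance is created on first access.

```python
class SketchOutput:
    """Hypothetical stand-in for StepOutput (no validation)."""


class SketchStep:
    class Output(SketchOutput):
        """Default, empty output model."""

    _output = None

    @property
    def output(self):
        # Create the Output instance on first access only.
        if self._output is None:
            self._output = self.Output()
        return self._output


class AddSuffix(SketchStep):
    class Output(SketchOutput):
        b: str = ""  # extra field declared on the nested Output class

    def __init__(self, a: str):
        self.a = a

    def execute(self):
        self.output.b = f"{self.a}-some-suffix"


step = AddSuffix(a="foo")
# At this point no Output instance exists yet (step._output is None).
step.execute()
print(step.output.b)  # foo-some-suffix
```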
Methods:
- execute: Abstract method to implement for new steps.
  - The inputs of the step can be accessed using self.input_name.
  - The output of the step can be accessed using self.output.output_name.
- run: Alias to the .execute() method. You can use this to run the step, but execute is preferred.
- to_yaml: YAML dump the step.
- get_description: Get the description of the Step.
When subclassing a Step, execute is the only method that needs to be implemented. Any additional method added
to the class will be treated as a method of the Step.
Note: since the Step class is meta-classed, the execute method is automatically wrapped with the do_execute
function making it always return a StepOutput. See also the explanation on the do_execute function.
class methods:
- from_step: Returns a new Step instance based on the data of another Step instance, for example: MyStep.from_step(other_step, a="foo")
- get_description: Get the description of the Step
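The idea behind from_step (copy another step's data, then apply keyword overrides) might look roughly like the following. This sketch uses dataclasses in place of Pydantic models and assumes both classes share the same field names; SourceStep and this MyStep are hypothetical, and how the real method treats fields that do not match is not covered here.

```python
from dataclasses import asdict, dataclass


@dataclass
class SourceStep:
    a: str = "bar"
    b: int = 1


@dataclass
class MyStep:
    a: str
    b: int = 0

    @classmethod
    def from_step(cls, step, **kwargs):
        # Start from the other step's field values, then apply overrides.
        data = asdict(step)
        data.update(kwargs)
        return cls(**data)


other_step = SourceStep()
new_step = MyStep.from_step(other_step, a="foo")
print(new_step)  # MyStep(a='foo', b=1)
```

The original step is left untouched; only the new instance carries the overridden value.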
dunder methods:
- __getattr__: Allows input to be accessed through self.input_name
- __repr__ and __str__: String representation of a step
Background
A Step is an atomic operation and serves as the building block of data pipelines built with the framework. Tasks typically consist of a series of Steps.
A step can be seen as an operation on a set of inputs that returns a set of outputs. This does not imply that steps are stateless, however (a data write, for example, has side effects)!
The diagram serves to illustrate the concept of a Step:
┌─────────┐        ┌──────────────────┐        ┌─────────┐
│ Input 1 │───────▶│                  ├───────▶│Output 1 │
└─────────┘        │                  │        └─────────┘
                   │                  │
┌─────────┐        │                  │        ┌─────────┐
│ Input 2 │───────▶│       Step       │───────▶│Output 2 │
└─────────┘        │                  │        └─────────┘
                   │                  │
┌─────────┐        │                  │        ┌─────────┐
│ Input 3 │───────▶│                  ├───────▶│Output 3 │
└─────────┘        └──────────────────┘        └─────────┘
Steps are built on top of Pydantic, which is a data validation and settings management library that uses Python type annotations. This allows for the automatic validation of the inputs and outputs of a Step.
- Step inherits from BaseModel, which is a Pydantic class used to define data models. This allows Step to automatically validate data against the defined fields and their types.
- Step is metaclassed by StepMetaClass, which is a custom metaclass that wraps the execute method of the Step class with the _execute_wrapper function. This ensures that the execute method always returns the output of the Step along with providing logging and validation of the output.
- Step has an Output class, which is a subclass of StepOutput. This class is used to validate the output of the Step. The Output class is defined as an inner class of the Step class. The Output class can be accessed through the Step.Output attribute.
- The Output class can be extended to add additional fields to the output of the Step.
Examples:
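    from koheesio.steps import Step, StepOutput

    class MyStep(Step):
        a: str  # input

        class Output(StepOutput):  # output
            b: str

        def execute(self) -> "MyStep.Output":
            # No explicit return needed: the wrapped execute returns the output
            self.output.b = f"{self.a}-some-suffix"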
Output
Output class for Step
execute (abstractmethod)
Abstract method to implement for new steps.
The inputs of the step can be accessed using self.input_name.
Note: since the Step class is metaclassed, the execute method is wrapped with the do_execute function, making
it always return the Step's output.
Source code in src/koheesio/steps/__init__.py
from_step (classmethod)
from_step(step: Step, **kwargs)
Returns a new Step instance based on the data of another Step or BaseModel instance
repr_json
repr_json(simple=False) -> str
Dump the step to JSON, meant for representation.
Note: use to_json if you want to dump the step to JSON for serialization. This method is meant for representation purposes only!
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| simple | | When toggled to True, a briefer output will be produced. This is friendlier for logging purposes | False |
Returns:
| Type | Description |
|---|---|
| str | A string, which is valid JSON |
repr_yaml
repr_yaml(simple=False) -> str
Dump the step to YAML, meant for representation.
Note: use to_yaml if you want to dump the step to YAML for serialization. This method is meant for representation purposes only!
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| simple | | When toggled to True, a briefer output will be produced. This is friendlier for logging purposes | False |
Returns:
| Type | Description |
|---|---|
| str | A string, which is valid YAML |
koheesio.steps.StepMetaClass
StepMetaClass has to be set up as a metaclass extending ModelMetaclass, so that Pydantic is unaffected while the execute method is auto-decorated with do_execute.
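The general pattern (a metaclass that extends an existing metaclass and decorates execute at class-creation time) can be sketched with the standard library alone. Everything below is a simplified assumption: BaseMeta stands in for Pydantic's ModelMetaclass, do_execute_sketch for do_execute, and the real wrapper also performs logging and output validation, which this sketch omits.

```python
import functools


class BaseMeta(type):
    """Stand-in for a parent metaclass such as Pydantic's ModelMetaclass."""


def do_execute_sketch(execute):
    """Wrap execute so that it always returns the step's output."""

    @functools.wraps(execute)
    def wrapper(self, *args, **kwargs):
        execute(self, *args, **kwargs)  # any return value is ignored
        return self.output

    wrapper._is_wrapped = True  # marker to avoid wrapping twice
    return wrapper


class SketchMeta(BaseMeta):
    def __new__(mcs, name, bases, namespace, **kwargs):
        execute = namespace.get("execute")
        # Wrap only an execute defined on this class, and only once.
        if callable(execute) and not getattr(execute, "_is_wrapped", False):
            namespace["execute"] = do_execute_sketch(execute)
        return super().__new__(mcs, name, bases, namespace, **kwargs)


class SketchStep(metaclass=SketchMeta):
    def __init__(self):
        self.output = {}


class Greet(SketchStep):
    def execute(self):
        self.output["greeting"] = "hello"  # no explicit return


print(Greet().execute())  # {'greeting': 'hello'}
```

Because the wrapping happens in the metaclass, subclasses get the behaviour without decorating execute themselves, and the parent metaclass still controls the rest of class creation.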
koheesio.steps.StepOutput
Class for the StepOutput model
Usage
Setting up the StepOutput class is done like this:
model_config (class-attribute, instance-attribute)
validate_output
validate_output() -> StepOutput
Validate the output of the Step
Essentially, this method is a wrapper around the validate method of the BaseModel class
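To illustrate what re-validating an output object means, here is a deliberately simple sketch that checks assigned values against the class annotations with isinstance. It is not how Pydantic validates (no coercion, no nested or generic types), and SketchOutput is a hypothetical stand-in for StepOutput.

```python
from typing import get_type_hints


class SketchOutput:
    """Hypothetical stand-in for StepOutput with plain isinstance checks."""

    def validate_output(self):
        # Check every annotated field against its declared type.
        for field, expected in get_type_hints(type(self)).items():
            value = getattr(self, field, None)
            if not isinstance(value, expected):
                raise TypeError(f"{field} must be {expected.__name__}, got {value!r}")
        return self  # return the validated output object


class Output(SketchOutput):
    b: str


out = Output()
out.b = "foo-some-suffix"
out.validate_output()  # passes and returns out

out.b = 42
try:
    out.validate_output()
except TypeError as err:
    print(err)  # b must be str, got 42
```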