Steps
Steps Module
This module contains the definition of the Step class, which serves as the base class for custom units of logic that can be executed. It also includes the StepOutput class, which defines the output data model for a Step.
The Step class is designed to be subclassed for creating new steps in a data pipeline. Each subclass should implement the execute method, specifying the expected inputs and outputs.
This module also exports the SparkStep class for steps that interact with Spark.
Classes:
- Step: Base class for a custom unit of logic that can be executed.
- StepOutput: Defines the output data model for a Step.
koheesio.steps.Step
Base class for a step
A custom unit of logic that can be executed.
The Step class is designed to be subclassed. To create a new step, subclass Step and implement the def execute(self) method, specifying the expected inputs and outputs.
Note: since the Step class is metaclassed, the execute method is wrapped with the do_execute function, making it always return the Step's output. Hence, an explicit return is not needed when implementing execute.
Methods and Attributes
The Step class has several attributes and methods.
INPUT
The following fields are available by default on the Step class:
- name: Name of the Step. If not set, the name of the class will be used.
- description: Description of the Step. If not set, the docstring of the class will be used. If the docstring contains multiple lines, only the first line will be used.
When subclassing a Step, any additional pydantic field will be treated as input to the Step. See also the explanation of the .execute() method below.
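The defaulting rules for name and description can be illustrated with a small standard-library sketch. The helpers `default_name` and `default_description` and the class `ExampleStep` are hypothetical names chosen for illustration; they are not part of koheesio, and the library's own logic may differ in detail.

```python
import inspect


def default_name(step_cls: type) -> str:
    """Fall back to the class name when no name is given (hypothetical helper)."""
    return step_cls.__name__


def default_description(step_cls: type) -> str:
    """Fall back to the first line of the class docstring (hypothetical helper)."""
    doc = (inspect.getdoc(step_cls) or "").strip()
    return doc.splitlines()[0] if doc else ""


class ExampleStep:
    """Add a suffix to the input string.

    This longer explanation would not be part of the default description.
    """


print(default_name(ExampleStep))         # ExampleStep
print(default_description(ExampleStep))  # Add a suffix to the input string.
```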
OUTPUT
Every Step has an Output class, which is a subclass of StepOutput. This class is used to validate the output of the Step. It is defined as an inner class of the Step class and can be accessed through the Step.Output attribute. The Output class can be extended to add additional fields to the output of the Step. See also the explanation of the .execute() method.
- Output: A nested class representing the output of the Step. It is based on the StepOutput class and is used to validate the output of the Step.
- output: Allows you to interact with the Output of the Step lazily (see above and StepOutput).
When subclassing a Step, any additional pydantic field added to the nested Output class will be treated as output of the Step. See also the description of StepOutput for more information.
Methods:
- execute: Abstract method to implement for new steps.
  - The inputs of the step can be accessed using self.input_name.
  - The output of the step can be accessed using self.output.output_name.
- run: Alias to the .execute() method. You can use this to run the step, but execute is preferred.
- to_yaml: YAML dump the step.
- get_description: Get the description of the Step.
When subclassing a Step, execute is the only method that needs to be implemented. Any additional method added to the class will be treated as a method of the Step.
Note: since the Step class is metaclassed, the execute method is automatically wrapped with the do_execute function, making it always return a StepOutput. See also the explanation of the do_execute function.
class methods:
- from_step: Returns a new Step instance based on the data of another Step instance, for example: MyStep.from_step(other_step, a="foo")
- get_description: Get the description of the Step.
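The idea behind from_step (copy the data of one step, then apply keyword overrides) can be sketched with standard-library dataclasses. This is a stand-in under stated assumptions, not koheesio's implementation: `BaseStep`, `OtherStep`, and the choice to drop fields the target class does not declare are all illustrative.

```python
from dataclasses import dataclass, fields


@dataclass
class BaseStep:
    """Hypothetical stand-in for a step base class."""

    @classmethod
    def from_step(cls, step, **kwargs):
        """Build a `cls` instance from another step's data plus overrides."""
        wanted = {f.name for f in fields(cls)}
        # Assumption: only fields known to the target class are copied.
        data = {k: v for k, v in vars(step).items() if k in wanted}
        data.update(kwargs)  # explicit keyword arguments win
        return cls(**data)


@dataclass
class OtherStep(BaseStep):
    a: str
    b: int


@dataclass
class MyStep(BaseStep):
    a: str
    b: int = 0


other_step = OtherStep(a="bar", b=3)
my_step = MyStep.from_step(other_step, a="foo")
print(my_step)  # MyStep(a='foo', b=3)
```

The original step is left untouched; only the new instance carries the overridden value.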
dunder methods:
- __getattr__: Allows input to be accessed through self.input_name.
- __repr__ and __str__: String representation of a step.
Background
A Step is an atomic operation and serves as the building block of data pipelines built with the framework. Tasks typically consist of a series of Steps.
A step can be seen as an operation on a set of inputs that returns a set of outputs. This does not, however, imply that steps are stateless: a step that writes data, for example, has side effects.
The diagram serves to illustrate the concept of a Step:
┌─────────┐        ┌──────────────────┐        ┌─────────┐
│ Input 1 │───────▶│                  ├───────▶│Output 1 │
└─────────┘        │                  │        └─────────┘
                   │                  │
┌─────────┐        │                  │        ┌─────────┐
│ Input 2 │───────▶│       Step       │───────▶│Output 2 │
└─────────┘        │                  │        └─────────┘
                   │                  │
┌─────────┐        │                  │        ┌─────────┐
│ Input 3 │───────▶│                  ├───────▶│Output 3 │
└─────────┘        └──────────────────┘        └─────────┘
Steps are built on top of Pydantic, a data validation and settings management library that uses Python type annotations. This allows for the automatic validation of the inputs and outputs of a Step.
- Step inherits from BaseModel, which is a Pydantic class used to define data models. This allows Step to automatically validate data against the defined fields and their types.
- Step is metaclassed by StepMetaClass, which is a custom metaclass that wraps the execute method of the Step class with the _execute_wrapper function. This ensures that the execute method always returns the output of the Step, along with providing logging and validation of the output.
- Step has an Output class, which is a subclass of StepOutput. This class is used to validate the output of the Step. The Output class is defined as an inner class of the Step class and can be accessed through the Step.Output attribute.
- The Output class can be extended to add additional fields to the output of the Step.
Examples:
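from koheesio.steps import Step, StepOutput

class MyStep(Step):
    a: str  # input

    class Output(StepOutput):  # output
        b: str

    def execute(self) -> "MyStep.Output":
        # No explicit return needed: the wrapped execute returns the output
        self.output.b = f"{self.a}-some-suffix"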
Output
Output class for Step
execute (abstractmethod)
execute() -> InstanceOf[StepOutput]
Abstract method to implement for new steps.
The inputs of the step can be accessed using self.input_name.
Note: since the Step class is metaclassed, the execute method is wrapped with the do_execute function, making it always return the Step's output.
Source code in src/koheesio/steps/__init__.py
from_step (classmethod)
from_step(step: Step, **kwargs) -> InstanceOf[BaseModel]
Returns a new Step instance based on the data of another Step or BaseModel instance
repr_json
Dump the step to JSON, meant for representation.
Note: use to_json if you want to dump the step to JSON for serialization. This method is meant for representation purposes only!
Examples:
Parameters:
Name | Type | Description | Default |
---|---|---|---|
simple | bool | When toggled to True, a briefer output will be produced. This is friendlier for logging purposes. | False |
Returns:
Type | Description |
---|---|
str | A string, which is valid JSON |
repr_yaml
Dump the step to YAML, meant for representation.
Note: use to_yaml if you want to dump the step to YAML for serialization. This method is meant for representation purposes only!
Examples:
Parameters:
Name | Type | Description | Default |
---|---|---|---|
simple | bool | When toggled to True, a briefer output will be produced. This is friendlier for logging purposes. | False |
Returns:
Type | Description |
---|---|
str | A string, which is valid YAML |
run
run() -> InstanceOf[StepOutput]
koheesio.steps.StepMetaClass
StepMetaClass has to be set up as a metaclass extending ModelMetaclass, so that Pydantic is unaffected while the execute method is auto-decorated with do_execute.
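The auto-decoration idea can be sketched with the standard library alone. The following is a minimal illustration, not koheesio's code: `StepMeta` extends plain `type` rather than Pydantic's ModelMetaclass, `do_execute` here is a simplified stand-in without logging or validation, and the `_is_wrapped` marker is an assumption added to avoid wrapping twice.

```python
import functools


class StepOutput:
    """Stand-in output container: plain attributes, no validation."""


def do_execute(execute):
    """Wrap `execute` so that it always returns the step's output."""

    @functools.wraps(execute)
    def wrapper(self, *args, **kwargs):
        execute(self, *args, **kwargs)  # any return value is ignored
        return self.output

    wrapper._is_wrapped = True  # sketch-only marker against double wrapping
    return wrapper


class StepMeta(type):
    """Hypothetical metaclass: decorates `execute` when a class is created."""

    def __new__(mcs, name, bases, namespace):
        execute = namespace.get("execute")
        if callable(execute) and not getattr(execute, "_is_wrapped", False):
            namespace["execute"] = do_execute(execute)
        return super().__new__(mcs, name, bases, namespace)


class Step(metaclass=StepMeta):
    Output = StepOutput

    def __init__(self, **inputs):
        self.__dict__.update(inputs)  # inputs become plain attributes
        self._output = None

    @property
    def output(self):
        # The output object is created lazily, on first access.
        if self._output is None:
            self._output = self.Output()
        return self._output

    def execute(self):
        raise NotImplementedError


class MyStep(Step):
    def execute(self):  # note: no explicit return
        self.output.b = f"{self.a}-some-suffix"


result = MyStep(a="foo").execute()
print(result.b)  # foo-some-suffix
```

Because the wrapping happens at class creation time, subclasses get the behaviour without decorating execute themselves.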
koheesio.steps.StepOutput
Class for the StepOutput model
Usage
Setting up the StepOutput class is done like this:
model_config (class-attribute, instance-attribute)
validate_output
validate_output() -> StepOutput
Validate the output of the Step
Essentially, this method is a wrapper around the validate method of the BaseModel class