Asyncio

This module provides classes for asynchronous steps in the koheesio package.

koheesio.asyncio.AsyncStep

Asynchronous step class that inherits from Step and uses the AsyncStepMetaClass metaclass.

Attributes:

Name    Type             Description
Output  AsyncStepOutput  The output class for the asynchronous step.

Output

Output class for asyncio step.

This class represents the output of the asyncio step. It inherits from the AsyncStepOutput class.

koheesio.asyncio.AsyncStepMetaClass

Metaclass for asynchronous steps.

This metaclass is used to define asynchronous steps in the Koheesio framework. It inherits from StepMetaClass and adds support for executing asynchronous steps.

Attributes: None

Methods: _execute_wrapper: Wrapper method for executing asynchronous steps.
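
The page does not show how _execute_wrapper is implemented, so the following is only a generic sketch of the pattern the description suggests: a metaclass that wraps a coroutine `execute` method when the class is created. Every name here (SketchAsyncMeta, SketchStep, the `calls` list) is hypothetical and is not part of koheesio.

```python
import asyncio
import functools
import inspect


class SketchAsyncMeta(type):
    """Hypothetical metaclass: wraps an async `execute` at class creation."""

    def __new__(mcls, name, bases, namespace):
        execute = namespace.get("execute")
        # Only wrap when the class body defines a coroutine function.
        if execute is not None and inspect.iscoroutinefunction(execute):
            namespace["execute"] = mcls._execute_wrapper(execute)
        return super().__new__(mcls, name, bases, namespace)

    @staticmethod
    def _execute_wrapper(execute):
        @functools.wraps(execute)
        async def wrapper(self, *args, **kwargs):
            # Assumed behaviour: bookkeeping before and after the real work.
            self.calls.append("start")
            result = await execute(self, *args, **kwargs)
            self.calls.append("end")
            return result

        return wrapper


class SketchStep(metaclass=SketchAsyncMeta):
    def __init__(self):
        self.calls = []

    async def execute(self):
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        self.calls.append("execute")
        return "done"


step = SketchStep()
result = asyncio.run(step.execute())
```

The wrapper stays a coroutine function, so callers still await `execute` as usual; the metaclass only adds behaviour around it.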

koheesio.asyncio.AsyncStepOutput

Represents the output of an asynchronous step.

This class extends the base Step.Output class and provides additional functionality for merging key-value maps.

Attributes:

Name Type Description
...

Methods:

Name   Description
merge  Merge a key-value map with self.

merge

merge(other: Union[Dict, StepOutput])

Merge a key-value map with self.

Examples:

step_output = StepOutput(foo="bar")
step_output.merge(
    {"lorem": "ipsum"}
)  # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}

Functionally similar to adding two dicts together; like running {**dict_a, **dict_b}.
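
To make the dict comparison concrete, here is a small plain-Python illustration of the `{**dict_a, **dict_b}` behaviour, using made-up keys. One difference worth noting: unpacking builds a new dict, whereas the merge source listed on this page sets each key on self and returns self.

```python
dict_a = {"foo": "bar", "shared": 1}
dict_b = {"lorem": "ipsum", "shared": 2}

# Keys from dict_b are applied last, so they win on conflicts.
merged = {**dict_a, **dict_b}

# Unpacking creates a new dict; the inputs are left unchanged.
inputs_unchanged = dict_a == {"foo": "bar", "shared": 1}
```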

Parameters:

Name   Type                     Description                                                        Default
other  Union[Dict, StepOutput]  Dict or another StepOutput instance whose items are added to self  required
Source code in src/koheesio/asyncio/__init__.py
def merge(self, other: Union[Dict, StepOutput]):
    """Merge key,value map with self

    Examples
    --------
    ```python
    step_output = StepOutput(foo="bar")
    step_output.merge(
        {"lorem": "ipsum"}
    )  # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
    ```

    Functionally similar to adding two dicts together; like running `{**dict_a, **dict_b}`.

    Parameters
    ----------
    other: Union[Dict, StepOutput]
        Dict or another instance of a StepOutputs class that will be added to self
    """
    if isinstance(other, StepOutput):
        other = other.model_dump()  # ensures we really have a dict

    if not iscoroutine(other):
        for k, v in other.items():
            self.set(k, v)

    return self
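
The control flow above can be exercised without koheesio by using a stand-in class. This is a sketch under assumptions: SketchOutput is hypothetical, its `set` and `model_dump` only imitate the methods the listing calls, and `iscoroutine` is imported from `inspect` here because the listing does not show where the original import comes from.

```python
from inspect import iscoroutine


class SketchOutput:
    """Hypothetical stand-in for an output object with set/model_dump."""

    def __init__(self, **values):
        self._values = dict(values)

    def set(self, key, value):
        self._values[key] = value

    def model_dump(self):
        return dict(self._values)

    def merge(self, other):
        # Same three steps as the listing: normalise, guard, copy.
        if isinstance(other, SketchOutput):
            other = other.model_dump()
        if not iscoroutine(other):
            for k, v in other.items():
                self.set(k, v)
        return self


async def pending():
    return {"ignored": True}


out = SketchOutput(foo="bar")

# Returning self allows chaining; later values overwrite earlier ones.
chained = out.merge({"lorem": "ipsum"}).merge(SketchOutput(foo="baz"))

# A coroutine object that has not been awaited is skipped, not merged.
coro = pending()
out.merge(coro)
coro.close()  # close it explicitly to avoid a "never awaited" warning

final = out.model_dump()
```

Because of the guard, passing an un-awaited coroutine leaves the output unchanged rather than raising, so a caller has to await the coroutine first if its result should be merged.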