API Reference

koheesio.ABOUT module-attribute #

ABOUT = _about()

koheesio.VERSION module-attribute #

VERSION = __version__

koheesio.BaseModel #

Base model for all models.

Extends pydantic BaseModel with some additional configuration. To be used as a base class for all models in Koheesio instead of pydantic.BaseModel.

Additional methods and properties:
Fields#

Every Koheesio BaseModel has two predefined fields: name and description. These fields are used to provide a name and a description to the model.

  • name: This is the name of the Model. If not provided, it defaults to the class name.

  • description: This is the description of the Model. It has several default behaviors:

    • If not provided, it defaults to the docstring of the class.
    • If the docstring is not provided, it defaults to the name of the class.
    • For multi-line descriptions, it has the following behaviors:
      • Only the first non-empty line is used.
      • Empty lines are removed.
      • Only the first 3 lines are considered.
      • Only the first 120 characters are considered.
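
A minimal sketch of these defaulting rules (the Transformation class below is purely illustrative):

from koheesio.models import BaseModel


class Transformation(BaseModel):
    """Applies a simple transformation."""


t = Transformation()
print(t.name)  # 'Transformation' - falls back to the class name
print(t.description)  # 'Applies a simple transformation.' - falls back to the docstring
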
Validators#
  • _set_name_and_description: Set the name and description of the Model as per the rules mentioned above.
Properties#
  • log: Returns a logger with the name of the class.
Class Methods#
  • from_basemodel: Returns a new BaseModel instance based on the data of another BaseModel.
  • from_context: Creates BaseModel instance from a given Context.
  • from_dict: Creates BaseModel instance from a given dictionary.
  • from_json: Creates BaseModel instance from a given JSON string.
  • from_toml: Creates BaseModel object from a given toml file.
  • from_yaml: Creates BaseModel object from a given yaml file.
  • lazy: Constructs the model without doing validation.
Dunder Methods#
  • __add__: Allows to add two BaseModel instances together.
  • __enter__: Allows for using the model in a with-statement.
  • __exit__: Allows for using the model in a with-statement.
  • __setitem__: Set Item dunder method for BaseModel.
  • __getitem__: Get Item dunder method for BaseModel.
Instance Methods#
  • hasattr: Check if given key is present in the model.
  • get: Get an attribute of the model, but don't fail if not present.
  • merge: Merge key,value map with self.
  • set: Allows for subscribing / assigning to class[key].
  • to_context: Converts the BaseModel instance to a Context object.
  • to_dict: Converts the BaseModel instance to a dictionary.
  • to_json: Converts the BaseModel instance to a JSON string.
  • to_yaml: Converts the BaseModel instance to a YAML string.
Different Modes

This BaseModel class supports lazy mode. This means that validation of the items stored in the class can be called at will instead of being forced to run it upfront.

  • Normal mode: you need to know the values ahead of time

    normal_mode = YourOwnModel(a="foo", b=42)
    

  • Lazy mode: being able to defer the validation until later

    lazy_mode = YourOwnModel.lazy()
    lazy_mode.a = "foo"
    lazy_mode.b = 42
    lazy_mode.validate_output()
    
    The prime advantage of using lazy mode is that you don't have to know all your outputs up front, and can add them as they become available. All while still being able to validate that you have collected all your output at the end.

  • With statements: With statements are also allowed. The validate_output method from the earlier example will run upon exit of the with-statement.

    with YourOwnModel.lazy() as with_output:
        with_output.a = "foo"
        with_output.b = 42
    
    Note: a lazy mode BaseModel object is required to work with a with-statement.

Examples:

from koheesio.models import BaseModel


class Person(BaseModel):
    name: str
    age: int


# Using the lazy method to create an instance without immediate validation
person = Person.lazy()

# Setting attributes
person.name = "John Doe"
person.age = 30

# Now we validate the instance
person.validate_output()

print(person)

In this example, the Person instance is created without immediate validation. The attributes name and age are set afterward. The validate_output method is then called to validate the instance.

Koheesio specific configuration:

Koheesio models are configured differently from Pydantic defaults. The following configuration is used:

  1. extra="allow"

    This setting allows for extra fields that are not specified in the model definition. If a field is present in the data but not in the model, it will not raise an error. Pydantic default is "ignore", which means that extra attributes are ignored.

  2. arbitrary_types_allowed=True

    This setting allows for fields in the model to be of any type. This is useful when you want to include fields in your model that are not standard Python types. Pydantic default is False, which means that fields must be of a standard Python type.

  3. populate_by_name=True

    This setting allows an aliased field to be populated by its name as given by the model attribute, as well as the alias. This was known as allow_population_by_field_name in pydantic v1. Pydantic default is False, which means that fields can only be populated by their alias.

  4. validate_assignment=False

    This setting determines whether the model should be revalidated when the data is changed. If set to True, every time a field is assigned a new value, the entire model is validated again.

    Pydantic default is (also) False, which means that the model is not revalidated when the data is changed. The default behavior of Pydantic is to validate the data when the model is created. In case the user changes the data after the model is created, the model is not revalidated.

  5. revalidate_instances="subclass-instances"

    This setting determines whether to revalidate models during validation if the instance is a subclass of the model. This is important as inheritance is used a lot in Koheesio. Pydantic default is never, which means that the model and dataclass instances are not revalidated during validation.

  6. validate_default=True

    This setting determines whether to validate default values during validation. When set to True, default values are checked during the validation process. We opt to set this to True, as we are attempting to make sure that the data is valid prior to running / executing any Step. Pydantic default is False, which means that default values are not validated during validation.

  7. frozen=False

    This setting determines whether the model is immutable. If set to True, once a model is created, its fields cannot be changed. Pydantic default is also False, which means that the model is mutable.

  8. coerce_numbers_to_str=True

    This setting determines whether to convert number fields to strings. When set to True, enables automatic coercion of any Number type to str. Pydantic doesn't allow number types (int, float, Decimal) to be coerced as type str by default.

  9. use_enum_values=True

    This setting determines whether to use the values of Enum fields. If set to True, the actual value of the Enum is used instead of the reference. Pydantic default is False, which means that the reference to the Enum is used.
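
A short sketch of how a few of these settings behave in practice (the model and values below are illustrative only):

from enum import Enum

from koheesio.models import BaseModel


class Color(Enum):
    RED = "red"


class MyModel(BaseModel):
    color: Color
    code: str


# extra="allow": unknown fields are kept instead of raising an error
m = MyModel(color=Color.RED, code=42, tag="kept")

print(m.code)  # '42'  - coerce_numbers_to_str turned the int into a str
print(m.color)  # 'red' - use_enum_values stored the value rather than the Enum member
print(m.tag)  # 'kept'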

description class-attribute instance-attribute #

description: Optional[str] = Field(
    default=None, description="Description of the Model"
)

log property #

log: Logger

Returns a logger with the name of the class

model_config class-attribute instance-attribute #

model_config = ConfigDict(
    extra="allow",
    arbitrary_types_allowed=True,
    populate_by_name=True,
    validate_assignment=False,
    revalidate_instances="subclass-instances",
    validate_default=True,
    frozen=False,
    coerce_numbers_to_str=True,
    use_enum_values=True,
)

name class-attribute instance-attribute #

name: Optional[str] = Field(
    default=None, description="Name of the Model"
)

from_basemodel classmethod #

from_basemodel(
    basemodel: BaseModel, **kwargs
) -> InstanceOf[BaseModel]

Returns a new BaseModel instance based on the data of another BaseModel

Source code in src/koheesio/models/__init__.py
@classmethod
def from_basemodel(cls, basemodel: BaseModel, **kwargs) -> InstanceOf[BaseModel]:
    """Returns a new BaseModel instance based on the data of another BaseModel"""
    kwargs = {**basemodel.model_dump(), **kwargs}
    return cls(**kwargs)
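
A brief usage sketch (the PersonModel class is hypothetical); keyword arguments override the copied values:

class PersonModel(BaseModel):
    age: int
    city: str


source = PersonModel(age=30, city="Amsterdam")
copy = PersonModel.from_basemodel(source, city="Utrecht")
print(copy.age, copy.city)  # 30 Utrecht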

from_context classmethod #

from_context(context: Context) -> BaseModel

Creates BaseModel instance from a given Context

You have to make sure that the Context object has the necessary attributes to create the model.

Examples:

class SomeStep(BaseModel):
    foo: str


context = Context(foo="bar")
some_step = SomeStep.from_context(context)
print(some_step.foo)  # prints 'bar'

Parameters:

Name Type Description Default
context Context
required

Returns:

Type Description
BaseModel
Source code in src/koheesio/models/__init__.py
@classmethod
def from_context(cls, context: Context) -> BaseModel:
    """Creates BaseModel instance from a given Context

    You have to make sure that the Context object has the necessary attributes to create the model.

    Examples
    --------
    ```python
    class SomeStep(BaseModel):
        foo: str


    context = Context(foo="bar")
    some_step = SomeStep.from_context(context)
    print(some_step.foo)  # prints 'bar'
    ```

    Parameters
    ----------
    context: Context

    Returns
    -------
    BaseModel
    """
    return cls(**context)

from_dict classmethod #

from_dict(data: Dict[str, Any]) -> BaseModel

Creates BaseModel instance from a given dictionary

Parameters:

Name Type Description Default
data Dict[str, Any]
required

Returns:

Type Description
BaseModel
Source code in src/koheesio/models/__init__.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> BaseModel:
    """Creates BaseModel instance from a given dictionary

    Parameters
    ----------
    data: Dict[str, Any]

    Returns
    -------
    BaseModel
    """
    return cls(**data)

from_json classmethod #

from_json(json_file_or_str: Union[str, Path]) -> BaseModel

Creates BaseModel instance from a given JSON string

BaseModel offloads the serialization and deserialization of the JSON string to Context class. Context uses jsonpickle library to serialize and deserialize the JSON string. This is done to allow for objects to be stored in the BaseModel object, which is not possible with the standard json library.

See Also

Context.from_json : Deserializes a JSON string to a Context object

Parameters:

Name Type Description Default
json_file_or_str Union[str, Path]

Pathlike string or Path that points to the json file or string containing json

required

Returns:

Type Description
BaseModel
Source code in src/koheesio/models/__init__.py
@classmethod
def from_json(cls, json_file_or_str: Union[str, Path]) -> BaseModel:
    """Creates BaseModel instance from a given JSON string

    BaseModel offloads the serialization and deserialization of the JSON string to Context class. Context uses
    jsonpickle library to serialize and deserialize the JSON string. This is done to allow for objects to be stored
    in the BaseModel object, which is not possible with the standard json library.

    See Also
    --------
    Context.from_json : Deserializes a JSON string to a Context object

    Parameters
    ----------
    json_file_or_str : Union[str, Path]
        Pathlike string or Path that points to the json file or string containing json

    Returns
    -------
    BaseModel
    """
    _context = Context.from_json(json_file_or_str)
    return cls.from_context(_context)
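
For illustration (reusing a hypothetical SomeStep model, analogous to the from_context example above):

class SomeStep(BaseModel):
    foo: str


some_step = SomeStep.from_json('{"foo": "bar"}')
print(some_step.foo)  # prints 'bar'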

from_toml classmethod #

from_toml(toml_file_or_str: Union[str, Path]) -> BaseModel

Creates BaseModel object from a given toml file

Note: BaseModel offloads the serialization and deserialization of the TOML string to Context class.

Parameters:

Name Type Description Default
toml_file_or_str Union[str, Path]

Pathlike string or Path that points to the toml file, or string containing toml

required

Returns:

Type Description
BaseModel
Source code in src/koheesio/models/__init__.py
@classmethod
def from_toml(cls, toml_file_or_str: Union[str, Path]) -> BaseModel:
    """Creates BaseModel object from a given toml file

    Note: BaseModel offloads the serialization and deserialization of the TOML string to Context class.

    Parameters
    ----------
    toml_file_or_str: str or Path
        Pathlike string or Path that points to the toml file, or string containing toml

    Returns
    -------
    BaseModel
    """
    _context = Context.from_toml(toml_file_or_str)
    return cls.from_context(_context)

from_yaml classmethod #

from_yaml(yaml_file_or_str: str) -> BaseModel

Creates BaseModel object from a given yaml file

Note: BaseModel offloads the serialization and deserialization of the YAML string to Context class.

Parameters:

Name Type Description Default
yaml_file_or_str str

Pathlike string or Path that points to the yaml file, or string containing yaml

required

Returns:

Type Description
BaseModel
Source code in src/koheesio/models/__init__.py
@classmethod
def from_yaml(cls, yaml_file_or_str: str) -> BaseModel:
    """Creates BaseModel object from a given yaml file

    Note: BaseModel offloads the serialization and deserialization of the YAML string to Context class.

    Parameters
    ----------
    yaml_file_or_str: str or Path
        Pathlike string or Path that points to the yaml file, or string containing yaml

    Returns
    -------
    BaseModel
    """
    _context = Context.from_yaml(yaml_file_or_str)
    return cls.from_context(_context)
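
For illustration, with a YAML string (a path to a yaml file would work the same way; SomeStep is the same hypothetical model as above):

some_step = SomeStep.from_yaml("foo: bar")
print(some_step.foo)  # prints 'bar'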

get #

get(key: str, default: Optional[Any] = None)

Get an attribute of the model, but don't fail if not present

Similar to dict.get()

Examples:

step_output = StepOutput(foo="bar")
step_output.get("foo")  # returns 'bar'
step_output.get("non_existent_key", "oops")  # returns 'oops'

Parameters:

Name Type Description Default
key str

name of the key to get

required
default Optional[Any]

Default value in case the attribute does not exist

None

Returns:

Type Description
Any

The value of the attribute

Source code in src/koheesio/models/__init__.py
def get(self, key: str, default: Optional[Any] = None):
    """Get an attribute of the model, but don't fail if not present

    Similar to dict.get()

    Examples
    --------
    ```python
    step_output = StepOutput(foo="bar")
    step_output.get("foo")  # returns 'bar'
    step_output.get("non_existent_key", "oops")  # returns 'oops'
    ```

    Parameters
    ----------
    key: str
        name of the key to get
    default: Optional[Any]
        Default value in case the attribute does not exist

    Returns
    -------
    Any
        The value of the attribute
    """
    if self.hasattr(key):
        return self.__getitem__(key)
    return default

hasattr #

hasattr(key: str) -> bool

Check if given key is present in the model

Parameters:

Name Type Description Default
key str
required

Returns:

Type Description
bool
Source code in src/koheesio/models/__init__.py
def hasattr(self, key: str) -> bool:
    """Check if given key is present in the model

    Parameters
    ----------
    key: str

    Returns
    -------
    bool
    """
    return hasattr(self, key)

lazy classmethod #

lazy()

Constructs the model without doing validation

Essentially an alias to BaseModel.construct()

Source code in src/koheesio/models/__init__.py
@classmethod
def lazy(cls):
    """Constructs the model without doing validation

    Essentially an alias to BaseModel.construct()
    """
    return cls.model_construct()

merge #

merge(other: Union[Dict, BaseModel])

Merge key,value map with self

Functionally similar to adding two dicts together; like running {**dict_a, **dict_b}.

Examples:

step_output = StepOutput(foo="bar")
step_output.merge(
    {"lorem": "ipsum"}
)  # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}

Parameters:

Name Type Description Default
other Union[Dict, BaseModel]

Dict or another instance of a BaseModel class that will be added to self

required
Source code in src/koheesio/models/__init__.py
def merge(self, other: Union[Dict, BaseModel]):
    """Merge key,value map with self

    Functionally similar to adding two dicts together; like running `{**dict_a, **dict_b}`.

    Examples
    --------
    ```python
    step_output = StepOutput(foo="bar")
    step_output.merge(
        {"lorem": "ipsum"}
    )  # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'}
    ```

    Parameters
    ----------
    other: Union[Dict, BaseModel]
        Dict or another instance of a BaseModel class that will be added to self
    """
    if isinstance(other, BaseModel):
        other = other.model_dump()  # ensures we really have a dict

    for k, v in other.items():
        self.set(k, v)

    return self

set #

set(key: str, value: Any)

Allows for subscribing / assigning to class[key].

Examples:

step_output = StepOutput(foo="bar")
step_output.set("foo", "baz")  # overwrites 'foo' to be 'baz'

Parameters:

Name Type Description Default
key str

The key of the attribute to assign to

required
value Any

Value that should be assigned to the given key

required
Source code in src/koheesio/models/__init__.py
def set(self, key: str, value: Any):
    """Allows for subscribing / assigning to `class[key]`.

    Examples
    --------
    ```python
    step_output = StepOutput(foo="bar")
    step_output.set("foo", "baz")  # overwrites 'foo' to be 'baz'
    ```

    Parameters
    ----------
    key: str
        The key of the attribute to assign to
    value: Any
        Value that should be assigned to the given key
    """
    self.__setitem__(key, value)

to_context #

to_context() -> Context

Converts the BaseModel instance to a Context object

Returns:

Type Description
Context
Source code in src/koheesio/models/__init__.py
def to_context(self) -> Context:
    """Converts the BaseModel instance to a Context object

    Returns
    -------
    Context
    """
    return Context(**self.to_dict())
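
A small round-trip sketch (SomeStep as in the hypothetical examples above):

some_step = SomeStep(foo="bar")
context = some_step.to_context()
print(context.get("foo"))  # prints 'bar'

# the Context can be turned back into a model again
same_step = SomeStep.from_context(context)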

to_dict #

to_dict() -> Dict[str, Any]

Converts the BaseModel instance to a dictionary

Returns:

Type Description
Dict[str, Any]
Source code in src/koheesio/models/__init__.py
def to_dict(self) -> Dict[str, Any]:
    """Converts the BaseModel instance to a dictionary

    Returns
    -------
    Dict[str, Any]
    """
    return self.model_dump()

to_json #

to_json(pretty: bool = False)

Converts the BaseModel instance to a JSON string

BaseModel offloads the serialization and deserialization of the JSON string to Context class. Context uses jsonpickle library to serialize and deserialize the JSON string. This is done to allow for objects to be stored in the BaseModel object, which is not possible with the standard json library.

See Also

Context.to_json : Serializes a Context object to a JSON string

Parameters:

Name Type Description Default
pretty bool

Toggles whether to return a pretty json string or not

False

Returns:

Type Description
str

containing all parameters of the BaseModel instance

Source code in src/koheesio/models/__init__.py
def to_json(self, pretty: bool = False):
    """Converts the BaseModel instance to a JSON string

    BaseModel offloads the serialization and deserialization of the JSON string to Context class. Context uses
    jsonpickle library to serialize and deserialize the JSON string. This is done to allow for objects to be stored
    in the BaseModel object, which is not possible with the standard json library.

    See Also
    --------
    Context.to_json : Serializes a Context object to a JSON string

    Parameters
    ----------
    pretty : bool, optional, default=False
        Toggles whether to return a pretty json string or not

    Returns
    -------
    str
        containing all parameters of the BaseModel instance
    """
    _context = self.to_context()
    return _context.to_json(pretty=pretty)

to_yaml #

to_yaml(clean: bool = False) -> str

Converts the BaseModel instance to a YAML string

BaseModel offloads the serialization and deserialization of the YAML string to Context class.

Parameters:

Name Type Description Default
clean bool

Toggles whether to remove !!python/object:... from yaml or not. Default: False

False

Returns:

Type Description
str

containing all parameters of the BaseModel instance

Source code in src/koheesio/models/__init__.py
def to_yaml(self, clean: bool = False) -> str:
    """Converts the BaseModel instance to a YAML string

    BaseModel offloads the serialization and deserialization of the YAML string to Context class.

    Parameters
    ----------
    clean: bool
        Toggles whether to remove `!!python/object:...` from yaml or not.
        Default: False

    Returns
    -------
    str
        containing all parameters of the BaseModel instance
    """
    _context = self.to_context()
    return _context.to_yaml(clean=clean)

validate #

validate() -> BaseModel

Validate the BaseModel instance

This method is used to validate the BaseModel instance. It is used in conjunction with the lazy method to validate the instance after all the attributes have been set.

This method is intended to be used with the lazy method. The lazy method is used to create an instance of the BaseModel without immediate validation. The validate method is then used to validate the instance after.

Note: in the Pydantic BaseModel, the validate method throws a deprecated warning. This is because Pydantic recommends using the validate_model method instead. However, we are using the validate method here in a different context and a slightly different way.

Examples:

class FooModel(BaseModel):
    foo: str
    lorem: str


foo_model = FooModel.lazy()
foo_model.foo = "bar"
foo_model.lorem = "ipsum"
foo_model.validate()
In this example, the foo_model instance is created without immediate validation. The attributes foo and lorem are set afterward. The validate method is then called to validate the instance.

Returns:

Type Description
BaseModel

The BaseModel instance

Source code in src/koheesio/models/__init__.py
def validate(self) -> BaseModel:
    """Validate the BaseModel instance

    This method is used to validate the BaseModel instance. It is used in conjunction with the lazy method to
    validate the instance after all the attributes have been set.

    This method is intended to be used with the `lazy` method. The `lazy` method is used to create an instance of
    the BaseModel without immediate validation. The `validate` method is then used to validate the instance after.

    > Note: in the Pydantic BaseModel, the `validate` method throws a deprecated warning. This is because Pydantic
    recommends using the `validate_model` method instead. However, we are using the `validate` method here in a
    different context and a slightly different way.

    Examples
    --------
    ```python
    class FooModel(BaseModel):
        foo: str
        lorem: str


    foo_model = FooModel.lazy()
    foo_model.foo = "bar"
    foo_model.lorem = "ipsum"
    foo_model.validate()
    ```
    In this example, the `foo_model` instance is created without immediate validation. The attributes foo and lorem
    are set afterward. The `validate` method is then called to validate the instance.

    Returns
    -------
    BaseModel
        The BaseModel instance
    """
    return self.model_validate(self.model_dump())

koheesio.Context #

Context(*args, **kwargs)

The Context class is a key component of the Koheesio framework, designed to manage configuration data and shared variables across tasks and steps in your application. It behaves much like a dictionary, but with added functionalities.

Key Features
  • Nested keys: Supports accessing and adding nested keys similar to dictionary keys.
  • Recursive merging: Merges two Contexts together, with the incoming Context having priority.
  • Serialization/Deserialization: Easily created from a yaml, toml, or json file, or a dictionary, and can be converted back to a dictionary.
  • Handling complex Python objects: Uses jsonpickle for serialization and deserialization of complex Python objects to and from JSON.

For a comprehensive guide on the usage, examples, and additional features of the Context class, please refer to the reference/concepts/context section of the Koheesio documentation.

Methods:

Name Description
add

Add a key/value pair to the context.

get

Get value of a given key.

get_item

Acts just like .get, except that it returns the key also.

contains

Check if the context contains a given key.

merge

Merge this context with the context of another, where the incoming context has priority.

to_dict

Returns all parameters of the context as a dict.

from_dict

Creates Context object from the given dict.

from_yaml

Creates Context object from a given yaml file.

from_json

Creates Context object from a given json file.

Dunder methods
  • __iter__(): Allows for iteration across a Context.
  • __len__(): Returns the length of the Context.
  • __getitem__(item): Makes class subscriptable.
Inherited from Mapping
  • items(): Returns all items of the Context.
  • keys(): Returns all keys of the Context.
  • values(): Returns all values of the Context.
Source code in src/koheesio/context.py
def __init__(self, *args, **kwargs):
    """Initializes the Context object with given arguments."""
    for arg in args:
        if isinstance(arg, dict):
            kwargs.update(arg)
        if isinstance(arg, Context):
            kwargs.update(arg.to_dict())

    for key, value in kwargs.items():
        self.__dict__[key] = self.process_value(value)
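
A quick sketch of constructing a Context and reading nested values (the data shown is illustrative):

from koheesio import Context

context = Context({"env": "dev", "spark": {"app_name": "my_app"}})

print(context.env)  # 'dev'
print(context.get("spark.app_name"))  # 'my_app' - nested keys use dotted notation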

add #

add(key: str, value: Any) -> Context

Add a key/value pair to the context

Source code in src/koheesio/context.py
def add(self, key: str, value: Any) -> Context:
    """Add a key/value pair to the context"""
    self.__dict__[key] = value
    return self

contains #

contains(key: str) -> bool

Check if the context contains a given key

Parameters:

Name Type Description Default
key str
required

Returns:

Type Description
bool
Source code in src/koheesio/context.py
def contains(self, key: str) -> bool:
    """Check if the context contains a given key

    Parameters
    ----------
    key: str

    Returns
    -------
    bool
    """
    try:
        self.get(key, safe=False)
        return True
    except KeyError:
        return False

from_dict classmethod #

from_dict(kwargs: dict) -> Context

Creates Context object from the given dict

Parameters:

Name Type Description Default
kwargs dict
required

Returns:

Type Description
Context
Source code in src/koheesio/context.py
@classmethod
def from_dict(cls, kwargs: dict) -> Context:
    """Creates Context object from the given dict

    Parameters
    ----------
    kwargs: dict

    Returns
    -------
    Context
    """
    return cls(kwargs)

from_json classmethod #

from_json(json_file_or_str: Union[str, Path]) -> Context

Creates Context object from a given json file

Note: jsonpickle is used to serialize/deserialize the Context object. This is done to allow for objects to be stored in the Context object, which is not possible with the standard json library.

Why jsonpickle?

(from https://jsonpickle.github.io/)

Data serialized with python’s pickle (or cPickle or dill) is not easily readable outside of python. Using the json format, jsonpickle allows simple data types to be stored in a human-readable format, and more complex data types such as numpy arrays and pandas dataframes, to be machine-readable on any platform that supports json.

Security

(from https://jsonpickle.github.io/)

jsonpickle should be treated the same as the Python stdlib pickle module from a security perspective.

! Warning !#

The jsonpickle module is not secure. Only unpickle data you trust. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling. Never unpickle data that could have come from an untrusted source, or that could have been tampered with. Consider signing data with an HMAC if you need to ensure that it has not been tampered with. Safer deserialization approaches, such as reading JSON directly, may be more appropriate if you are processing untrusted data.

Parameters:

Name Type Description Default
json_file_or_str Union[str, Path]

Pathlike string or Path that points to the json file or string containing json

required

Returns:

Type Description
Context
Source code in src/koheesio/context.py
@classmethod
def from_json(cls, json_file_or_str: Union[str, Path]) -> Context:
    """Creates Context object from a given json file

    Note: jsonpickle is used to serialize/deserialize the Context object. This is done to allow for objects to be
    stored in the Context object, which is not possible with the standard json library.

    Why jsonpickle?
    ---------------
    (from https://jsonpickle.github.io/)

    > Data serialized with python’s pickle (or cPickle or dill) is not easily readable outside of python. Using the
    json format, jsonpickle allows simple data types to be stored in a human-readable format, and more complex
    data types such as numpy arrays and pandas dataframes, to be machine-readable on any platform that supports
    json.

    Security
    --------
    (from https://jsonpickle.github.io/)

    > jsonpickle should be treated the same as the Python stdlib pickle module from a security perspective.

    ### ! Warning !
    > The jsonpickle module is not secure. Only unpickle data you trust.
    It is possible to construct malicious pickle data which will execute arbitrary code during unpickling.
    Never unpickle data that could have come from an untrusted source, or that could have been tampered with.
    Consider signing data with an HMAC if you need to ensure that it has not been tampered with.
    Safer deserialization approaches, such as reading JSON directly, may be more appropriate if you are processing
    untrusted data.

    Parameters
    ----------
    json_file_or_str : Union[str, Path]
        Pathlike string or Path that points to the json file or string containing json

    Returns
    -------
    Context
    """
    json_str = json_file_or_str

    # check if json_str is pathlike
    if (json_file := Path(json_file_or_str)).exists():
        json_str = json_file.read_text(encoding="utf-8")

    json_dict = jsonpickle.loads(json_str)
    return cls.from_dict(json_dict)

from_toml classmethod #

from_toml(toml_file_or_str: Union[str, Path]) -> Context

Creates Context object from a given toml file

Parameters:

Name Type Description Default
toml_file_or_str Union[str, Path]

Pathlike string or Path that points to the toml file or string containing toml

required

Returns:

Type Description
Context
Source code in src/koheesio/context.py
@classmethod
def from_toml(cls, toml_file_or_str: Union[str, Path]) -> Context:
    """Creates Context object from a given toml file

    Parameters
    ----------
    toml_file_or_str: Union[str, Path]
        Pathlike string or Path that points to the toml file or string containing toml

    Returns
    -------
    Context
    """
    toml_str = toml_file_or_str

    # check if toml_str is pathlike
    if (toml_file := Path(toml_file_or_str)).exists():
        toml_str = toml_file.read_text(encoding="utf-8")

    toml_dict = tomli.loads(toml_str)
    return cls.from_dict(toml_dict)

from_yaml classmethod #

from_yaml(yaml_file_or_str: str) -> Context

Creates Context object from a given yaml file

Parameters:

Name Type Description Default
yaml_file_or_str str

Pathlike string or Path that points to the yaml file, or string containing yaml

required

Returns:

Type Description
Context
Source code in src/koheesio/context.py
@classmethod
def from_yaml(cls, yaml_file_or_str: str) -> Context:
    """Creates Context object from a given yaml file

    Parameters
    ----------
    yaml_file_or_str: str or Path
        Pathlike string or Path that points to the yaml file, or string containing yaml

    Returns
    -------
    Context
    """
    yaml_str = yaml_file_or_str

    # check if yaml_str is pathlike
    if (yaml_file := Path(yaml_file_or_str)).exists():
        yaml_str = yaml_file.read_text(encoding="utf-8")

    # Bandit: disable yaml.load warning
    yaml_dict = yaml.load(yaml_str, Loader=yaml.Loader)  # nosec B506: yaml_load

    return cls.from_dict(yaml_dict)

get #

get(
    key: str, default: Any = None, safe: bool = True
) -> Any

Get value of a given key

The key can either be an actual key (top level) or the key of a nested value. Behaves a lot like a dict's .get() method otherwise.

Parameters:

Name Type Description Default
key str

Can be a real key, or can be a dotted notation of a nested key

required
default Any

Default value to return

None
safe bool

Toggles whether to fail or not when item cannot be found

True

Returns:

Type Description
Any

Value of the requested item

Example

Example of a nested call:

context = Context({"a": {"b": "c", "d": "e"}, "f": "g"})
context.get("a.b")

Returns c

Source code in src/koheesio/context.py
def get(self, key: str, default: Any = None, safe: bool = True) -> Any:
    """Get value of a given key

    The key can either be an actual key (top level) or the key of a nested value.
    Behaves a lot like a dict's `.get()` method otherwise.

    Parameters
    ----------
    key:
        Can be a real key, or can be a dotted notation of a nested key
    default:
        Default value to return
    safe:
        Toggles whether to fail or not when item cannot be found

    Returns
    -------
    Any
        Value of the requested item

    Example
    -------
    Example of a nested call:

    ```python
    context = Context({"a": {"b": "c", "d": "e"}, "f": "g"})
    context.get("a.b")
    ```

    Returns `c`
    """
    try:
        if "." not in key:
            return self.__dict__[key]

        # handle nested keys
        nested_keys = key.split(".")
        value = self  # parent object
        for k in nested_keys:
            value = value[k]  # iterate through nested values
        return value

    except (AttributeError, KeyError, TypeError) as e:
        if not safe:
            raise KeyError(f"requested key '{key}' does not exist in {self}") from e
        return default

get_all #

get_all() -> dict

alias to to_dict()

Source code in src/koheesio/context.py
def get_all(self) -> dict:
    """alias to to_dict()"""
    return self.to_dict()

get_item #

get_item(
    key: str, default: Any = None, safe: bool = True
) -> Dict[str, Any]

Acts just like .get, except that it returns the key also

Returns:

Type Description
Dict[str, Any]

key/value-pair of the requested item

Example

Example of a nested call:

context = Context({"a": {"b": "c", "d": "e"}, "f": "g"})
context.get_item("a.b")

Returns {'a.b': 'c'}

Source code in src/koheesio/context.py
def get_item(self, key: str, default: Any = None, safe: bool = True) -> Dict[str, Any]:
    """Acts just like `.get`, except that it returns the key also

    Returns
    -------
    Dict[str, Any]
        key/value-pair of the requested item

    Example
    -------
    Example of a nested call:

    ```python
    context = Context({"a": {"b": "c", "d": "e"}, "f": "g"})
    context.get_item("a.b")
    ```

    Returns `{'a.b': 'c'}`
    """
    value = self.get(key, default, safe)
    return {key: value}

merge #

merge(context: Context, recursive: bool = False) -> Context

Merge this context with the context of another, where the incoming context has priority.

Parameters:

Name Type Description Default
context Context

Another Context class

required
recursive bool

Recursively merge two dictionaries to an arbitrary depth

False

Returns:

Type Description
Context

updated context

Source code in src/koheesio/context.py
def merge(self, context: Context, recursive: bool = False) -> Context:
    """Merge this context with the context of another, where the incoming context has priority.

    Parameters
    ----------
    context: Context
        Another Context class
    recursive: bool
        Recursively merge two dictionaries to an arbitrary depth

    Returns
    -------
    Context
        updated context
    """
    if recursive:
        return Context.from_dict(self._recursive_merge(target_context=self, merge_context=context).to_dict())

    # just merge on the top level keys
    return Context.from_dict({**self.to_dict(), **context.to_dict()})
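
A sketch of the difference the recursive flag makes (values are illustrative):

base = Context({"spark": {"app_name": "my_app", "master": "local"}})
incoming = Context({"spark": {"app_name": "other_app"}})

# top-level merge: the whole 'spark' key is replaced by the incoming one
print(base.merge(incoming).get("spark.master", "missing"))  # 'missing'

# recursive merge: nested keys are merged, incoming values win on conflict
print(base.merge(incoming, recursive=True).get("spark.master"))  # 'local'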

process_value #

process_value(value: Any) -> Any

Processes the given value, converting dictionaries to Context objects as needed.

Source code in src/koheesio/context.py
def process_value(self, value: Any) -> Any:
    """Processes the given value, converting dictionaries to Context objects as needed."""
    if isinstance(value, dict):
        return self.from_dict(value)

    if isinstance(value, (list, set)):
        return [self.from_dict(v) if isinstance(v, dict) else v for v in value]

    return value

to_dict #

to_dict() -> Dict[str, Any]

Returns all parameters of the context as a dict

Returns:

Type Description
dict

containing all parameters of the context

Source code in src/koheesio/context.py
def to_dict(self) -> Dict[str, Any]:
    """Returns all parameters of the context as a dict

    Returns
    -------
    dict
        containing all parameters of the context
    """
    result = {}

    for key, value in self.__dict__.items():
        if isinstance(value, Context):
            result[key] = value.to_dict()
        elif isinstance(value, list):
            result[key] = [e.to_dict() if isinstance(e, Context) else e for e in value]
        else:
            result[key] = value

    return result

to_json #

to_json(pretty: bool = False) -> str

Returns all parameters of the context as a json string

Note: jsonpickle is used to serialize/deserialize the Context object. This is done to allow for objects to be stored in the Context object, which is not possible with the standard json library.

Why jsonpickle?

(from https://jsonpickle.github.io/)

Data serialized with python's pickle (or cPickle or dill) is not easily readable outside of python. Using the json format, jsonpickle allows simple data types to be stored in a human-readable format, and more complex data types such as numpy arrays and pandas dataframes, to be machine-readable on any platform that supports json.

Parameters:

Name Type Description Default
pretty bool

Toggles whether to return a pretty json string or not

False

Returns:

Type Description
str

containing all parameters of the context

Source code in src/koheesio/context.py
def to_json(self, pretty: bool = False) -> str:
    """Returns all parameters of the context as a json string

    Note: jsonpickle is used to serialize/deserialize the Context object. This is done to allow for objects to be
    stored in the Context object, which is not possible with the standard json library.

    Why jsonpickle?
    ---------------
    (from https://jsonpickle.github.io/)

    > Data serialized with python's pickle (or cPickle or dill) is not easily readable outside of python. Using the
    json format, jsonpickle allows simple data types to be stored in a human-readable format, and more complex
    data types such as numpy arrays and pandas dataframes, to be machine-readable on any platform that supports
    json.

    Parameters
    ----------
    pretty : bool, optional, default=False
        Toggles whether to return a pretty json string or not

    Returns
    -------
    str
        containing all parameters of the context
    """
    d = self.to_dict()
    return jsonpickle.dumps(d, indent=4) if pretty else jsonpickle.dumps(d)

to_yaml #

to_yaml(clean: bool = False) -> str

Returns all parameters of the context as a yaml string

Parameters:

Name Type Description Default
clean bool

Toggles whether to remove !!python/object:... from yaml or not. Default: False

False

Returns:

Type Description
str

containing all parameters of the context

Source code in src/koheesio/context.py
def to_yaml(self, clean: bool = False) -> str:
    """Returns all parameters of the context as a yaml string

    Parameters
    ----------
    clean: bool
        Toggles whether to remove `!!python/object:...` from yaml or not.
        Default: False

    Returns
    -------
    str
        containing all parameters of the context
    """
    # sort_keys=False to preserve order of keys
    yaml_str = yaml.dump(self.to_dict(), sort_keys=False)

    # remove `!!python/object:...` from yaml
    if clean:
        remove_pattern = re.compile(r"!!python/object:.*?\n")
        yaml_str = re.sub(remove_pattern, "\n", yaml_str)

    return yaml_str

koheesio.ExtraParamsMixin #

Mixin class that adds support for arbitrary keyword arguments to Pydantic models.

The keyword arguments are extracted from the model's values and moved to a params dictionary.
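
A short sketch of how arbitrary keyword arguments end up in extra_params (the Dummy class is hypothetical):

from koheesio import BaseModel, ExtraParamsMixin


class Dummy(BaseModel, ExtraParamsMixin):
    a: str


dummy = Dummy(a="foo", b=42)
print(dummy.extra_params)  # {'b': 42} - 'b' was not declared on the model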

extra_params cached property #

extra_params: Dict[str, Any]

Extract params (passed as arbitrary kwargs) from values and move them to params dict

params class-attribute instance-attribute #

params: Dict[str, Any] = Field(default_factory=dict)

koheesio.LoggingFactory #

LoggingFactory(
    name: Optional[str] = None,
    env: Optional[str] = None,
    level: Optional[str] = None,
    logger_id: Optional[str] = None,
)

Logging factory to be used to generate logger instances.

Parameters:

Name Type Description Default
name Optional[str]

logger name

None
env Optional[str]

environment ("local", "qa", "prod")

None
level Optional[str]

logging level

None
logger_id Optional[str]

unique identifier for the logger

None
Source code in src/koheesio/logger.py
def __init__(
    self,
    name: Optional[str] = None,
    env: Optional[str] = None,
    level: Optional[str] = None,
    logger_id: Optional[str] = None,
):
    """Logging factory to be used in pipeline. Prepare logger instance.

    Parameters
    ----------
    name logger name.
    env environment ("local", "qa", "prod").
    logger_id unique identifier for the logger.
    """

    LoggingFactory.LOGGER_NAME = name or LoggingFactory.LOGGER_NAME
    LoggerIDFilter.LOGGER_ID = logger_id or LoggerIDFilter.LOGGER_ID
    LoggingFactory.LOGGER_FILTER = LoggingFactory.LOGGER_FILTER or LoggerIDFilter()
    LoggingFactory.ENV = env or LoggingFactory.ENV

    console_handler = logging.StreamHandler(sys.stdout if LoggingFactory.ENV == "local" else sys.stderr)
    console_handler.setFormatter(LoggingFactory.LOGGER_FORMATTER)
    console_handler.addFilter(LoggingFactory.LOGGER_FILTER)
    # WARNING is default level for root logger in python
    logging.basicConfig(level=logging.WARNING, handlers=[console_handler], force=True)

    LoggingFactory.CONSOLE_HANDLER = console_handler

    logger = getLogger(LoggingFactory.LOGGER_NAME)
    logger.setLevel(level or LoggingFactory.LOGGER_LEVEL)
    LoggingFactory.LOGGER = logger
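
A minimal usage sketch (the names and level below are illustrative):

from koheesio import LoggingFactory

# configure the koheesio root logger once, e.g. at application start
LoggingFactory(name="my_pipeline", env="local", level="INFO")

# child loggers inherit that configuration when inherit_from_koheesio=True
logger = LoggingFactory.get_logger("ingest", inherit_from_koheesio=True)
logger.info("hello from the ingest step")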

CONSOLE_HANDLER class-attribute instance-attribute #

CONSOLE_HANDLER: Optional[Handler] = None

ENV class-attribute instance-attribute #

ENV: Optional[str] = None

LOGGER class-attribute instance-attribute #

LOGGER: Optional[Logger] = None

LOGGER_ENV class-attribute instance-attribute #

LOGGER_ENV: str = 'local'

LOGGER_FILTER class-attribute instance-attribute #

LOGGER_FILTER: Optional[Filter] = None

LOGGER_FORMAT class-attribute instance-attribute #

LOGGER_FORMAT: str = (
    "[%(logger_id)s] [%(asctime)s] [%(levelname)s] [%(name)s] {%(module)s.py:%(funcName)s:%(lineno)d} - %(message)s"
)

LOGGER_FORMATTER class-attribute instance-attribute #

LOGGER_FORMATTER: Formatter = Formatter(LOGGER_FORMAT)

LOGGER_LEVEL class-attribute instance-attribute #

LOGGER_LEVEL: str = get("KOHEESIO_LOGGING_LEVEL", "WARNING")

LOGGER_NAME class-attribute instance-attribute #

LOGGER_NAME: str = 'koheesio'

add_handlers staticmethod #

add_handlers(handlers: List[Tuple[str, Dict]]) -> None

Add handlers to existing root logger.

Parameters:

Name Type Description Default
handlers List[Tuple[str, Dict]]

List of tuples, each containing the handler module and class (for importing) and the configuration for that handler

required
Source code in src/koheesio/logger.py
@staticmethod
def add_handlers(handlers: List[Tuple[str, Dict]]) -> None:
    """Add handlers to existing root logger.

    Parameters
    ----------
    handler_class handler module and class for importing.
    handlers_config configuration for handler.

    """
    for handler_module_class, handler_conf in handlers:
        handler_class: logging.Handler = import_class(handler_module_class)
        handler_level = handler_conf.pop("level") if "level" in handler_conf else "WARNING"
        # noinspection PyCallingNonCallable
        handler = handler_class(**handler_conf)
        handler.setLevel(handler_level)
        handler.addFilter(LoggingFactory.LOGGER_FILTER)
        handler.setFormatter(LoggingFactory.LOGGER_FORMATTER)
        LoggingFactory.LOGGER.addHandler(handler)
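
For illustration, each entry is a (module.Class, config) tuple, where the config may include an optional level key (the file name below is just an example):

from koheesio import LoggingFactory

LoggingFactory(name="my_pipeline", level="INFO")
LoggingFactory.add_handlers(
    [("logging.FileHandler", {"filename": "pipeline.log", "level": "DEBUG"})]
)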

get_logger staticmethod #

get_logger(
    name: str, inherit_from_koheesio: bool = False
) -> Logger

Provide a logger. If inherit_from_koheesio is True, the returned logger inherits from the koheesio root logger (LoggingFactory.LOGGER_NAME).

Parameters:

Name Type Description Default
name str
required
inherit_from_koheesio bool
False

Returns:

Name Type Description
logger Logger
Source code in src/koheesio/logger.py
@staticmethod
def get_logger(name: str, inherit_from_koheesio: bool = False) -> Logger:
    """Provide logger. If inherit_from_koheesio then inherit from LoggingFactory.PIPELINE_LOGGER_NAME.

    Parameters
    ----------
    name: Name of logger.
    inherit_from_koheesio: Inherit logger from koheesio

    Returns
    -------
    logger: Logger

    """
    if inherit_from_koheesio:
        LoggingFactory.__check_koheesio_logger_initialized()
        name = f"{LoggingFactory.LOGGER_NAME}.{name}"

    return getLogger(name)

koheesio.Step #

Base class for a step

A custom unit of logic that can be executed.

The Step class is designed to be subclassed. To create a new step, one would subclass Step and implement the def execute(self) method, specifying the expected inputs and outputs.

Note: since the Step class is meta classed, the execute method is wrapped with the do_execute function making it always return the Step's output. Hence, an explicit return is not needed when implementing execute.

Methods and Attributes

The Step class has several attributes and methods.

INPUT#

The following fields are available by default on the Step class:

  • name: Name of the Step. If not set, the name of the class will be used.
  • description: Description of the Step. If not set, the docstring of the class will be used. If the docstring contains multiple lines, only the first line will be used.

When subclassing a Step, any additional pydantic field will be treated as input to the Step. See also the explanation on the .execute() method below.

OUTPUT#

Every Step has an Output class, which is a subclass of StepOutput. This class is used to validate the output of the Step. The Output class is defined as an inner class of the Step class. The Output class can be accessed through the Step.Output attribute. The Output class can be extended to add additional fields to the output of the Step. See also the explanation on the .execute().

  • Output: A nested class representing the output of the Step used to validate the output of the Step and based on the StepOutput class.
  • output: Allows you to interact with the Output of the Step lazily (see above and StepOutput)

When subclassing a Step, any additional pydantic field added to the nested Output class will be treated as output of the Step. See also the description of StepOutput for more information.

Methods:#
  • execute: Abstract method to implement for new steps.
    • The Inputs of the step can be accessed, using self.input_name.
    • The output of the step can be accessed, using self.output.output_name.
  • run: Alias to .execute() method. You can use this to run the step, but execute is preferred.
  • to_yaml: YAML dump the step
  • get_description: Get the description of the Step

When subclassing a Step, execute is the only method that needs to be implemented. Any additional method added to the class will be treated as a method of the Step.

Note: since the Step class is meta-classed, the execute method is automatically wrapped with the do_execute function making it always return a StepOutput. See also the explanation on the do_execute function.

class methods:#
  • from_step: Returns a new Step instance based on the data of another Step instance. For example: MyStep.from_step(other_step, a="foo")
  • get_description: Get the description of the Step
dunder methods:#
  • __getattr__: Allows input to be accessed through self.input_name
  • __repr__ and __str__: String representation of a step
Background

A Step is an atomic operation and serves as the building block of data pipelines built with the framework. Tasks typically consist of a series of Steps.

A step can be seen as an operation on a set of inputs, that returns a set of outputs. This however does not imply that steps are stateless (e.g. data writes)!

The diagram serves to illustrate the concept of a Step:

┌─────────┐        ┌──────────────────┐        ┌─────────┐
│ Input 1 │───────▶│                  ├───────▶│Output 1 │
└─────────┘        │                  │        └─────────┘
                   │                  │
┌─────────┐        │                  │        ┌─────────┐
│ Input 2 │───────▶│       Step       │───────▶│Output 2 │
└─────────┘        │                  │        └─────────┘
                   │                  │
┌─────────┐        │                  │        ┌─────────┐
│ Input 3 │───────▶│                  ├───────▶│Output 3 │
└─────────┘        └──────────────────┘        └─────────┘

Steps are built on top of Pydantic, which is a data validation and settings management using python type annotations. This allows for the automatic validation of the inputs and outputs of a Step.

  • Step inherits from BaseModel, which is a Pydantic class used to define data models. This allows Step to automatically validate data against the defined fields and their types.
  • Step is metaclassed by StepMetaClass, which is a custom metaclass that wraps the execute method of the Step class with the _execute_wrapper function. This ensures that the execute method always returns the output of the Step along with providing logging and validation of the output.
  • Step has an Output class, which is a subclass of StepOutput. This class is used to validate the output of the Step. The Output class is defined as an inner class of the Step class. The Output class can be accessed through the Step.Output attribute.
  • The Output class can be extended to add additional fields to the output of the Step.

Examples:

class MyStep(Step):
    a: str  # input

    class Output(StepOutput):  # output
        b: str

    def execute(self) -> MyStep.Output:
        self.output.b = f"{self.a}-some-suffix"
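
Running the step defined above could then look like this; because of the metaclass wrapper, execute returns the Step's Output, and the same values stay available through step.output:

step = MyStep(a="foo")
result = step.execute()

print(result.b)  # 'foo-some-suffix'
print(step.output.b)  # 'foo-some-suffix'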

output property writable #

output: Output

Interact with the output of the Step

Output #

Output class for Step

execute abstractmethod #

execute()

Abstract method to implement for new steps.

The Inputs of the step can be accessed, using self.input_name

Note: since the Step class is meta-classed, the execute method is wrapped with the do_execute function making it always return the Steps output

Source code in src/koheesio/steps/__init__.py
@abstractmethod
def execute(self):
    """Abstract method to implement for new steps.

    The Inputs of the step can be accessed, using `self.input_name`

    Note: since the Step class is meta-classed, the execute method is wrapped with the `do_execute` function making
      it always return the Steps output
    """
    raise NotImplementedError

from_step classmethod #

from_step(step: Step, **kwargs)

Returns a new Step instance based on the data of another Step or BaseModel instance

Source code in src/koheesio/steps/__init__.py
@classmethod
def from_step(cls, step: Step, **kwargs):
    """Returns a new Step instance based on the data of another Step or BaseModel instance"""
    return cls.from_basemodel(step, **kwargs)

repr_json #

repr_json(simple=False) -> str

dump the step to json, meant for representation

Note: use to_json if you want to dump the step to json for serialization. This method is meant for representation purposes only!

Examples:

>>> step = MyStep(a="foo")
>>> print(step.repr_json())
{"input": {"a": "foo"}}

Parameters:

Name Type Description Default
simple

When toggled to True, a briefer output will be produced. This is friendlier for logging purposes

False

Returns:

Type Description
str

A string, which is valid json

Source code in src/koheesio/steps/__init__.py
def repr_json(self, simple=False) -> str:
    """dump the step to json, meant for representation

    Note: use to_json if you want to dump the step to json for serialization
    This method is meant for representation purposes only!

    Examples
    --------
    ```python
    >>> step = MyStep(a="foo")
    >>> print(step.repr_json())
    {"input": {"a": "foo"}}
    ```

    Parameters
    ----------
    simple: bool
        When toggled to True, a briefer output will be produced. This is friendlier for logging purposes

    Returns
    -------
    str
        A string, which is valid json
    """
    model_dump_options = dict(warnings="none", exclude_unset=True)

    _result = {}

    # extract input
    _input = self.model_dump(**model_dump_options)

    # remove name and description from input and add to result if simple is not set
    name = _input.pop("name", None)
    description = _input.pop("description", None)
    if not simple:
        if name:
            _result["name"] = name
        if description:
            _result["description"] = description
    else:
        model_dump_options["exclude"] = {"name", "description"}

    # extract output
    _output = self.output.model_dump(**model_dump_options)

    # add output to result
    if _output:
        _result["output"] = _output

    # add input to result
    _result["input"] = _input

    class MyEncoder(json.JSONEncoder):
        """Custom JSON Encoder to handle non-serializable types"""

        def default(self, o: Any) -> Any:
            try:
                return super().default(o)
            except TypeError:
                return o.__class__.__name__

    # Use MyEncoder when converting the dictionary to a JSON string
    json_str = json.dumps(_result, cls=MyEncoder)

    return json_str

repr_yaml #

repr_yaml(simple=False) -> str

dump the step to yaml, meant for representation

Note: use to_yaml if you want to dump the step to yaml for serialization. This method is meant for representation purposes only!

Examples:

>>> step = MyStep(a="foo")
>>> print(step.repr_yaml())
input:
  a: foo

Parameters:

Name Type Description Default
simple

When toggled to True, a briefer output will be produced. This is friendlier for logging purposes

False

Returns:

Type Description
str

A string, which is valid yaml

Source code in src/koheesio/steps/__init__.py
def repr_yaml(self, simple=False) -> str:
    """dump the step to yaml, meant for representation

    Note: use to_yaml if you want to dump the step to yaml for serialization
    This method is meant for representation purposes only!

    Examples
    --------
    ```python
    >>> step = MyStep(a="foo")
    >>> print(step.repr_yaml())
    input:
      a: foo
    ```

    Parameters
    ----------
    simple: bool
        When toggled to True, a briefer output will be produced. This is friendlier for logging purposes

    Returns
    -------
    str
        A string, which is valid yaml
    """
    json_str = self.repr_json(simple=simple)

    # Parse the JSON string back into a dictionary
    _result = json.loads(json_str)

    return yaml.dump(_result)

run #

run()

Alias to .execute()

Source code in src/koheesio/steps/__init__.py
def run(self):
    """Alias to .execute()"""
    return self.execute()

koheesio.StepOutput #

Class for the StepOutput model

Usage

Setting up a StepOutput subclass is done like this:

class YourOwnOutput(StepOutput):
    a: str
    b: int

model_config class-attribute instance-attribute #

model_config = ConfigDict(
    validate_default=False, defer_build=True
)

validate_output #

validate_output() -> StepOutput

Validate the output of the Step

Essentially, this method is a wrapper around the validate method of the BaseModel class

Source code in src/koheesio/steps/__init__.py
def validate_output(self) -> StepOutput:
    """Validate the output of the Step

    Essentially, this method is a wrapper around the validate method of the BaseModel class
    """
    validated_model = self.validate()
    return StepOutput.from_basemodel(validated_model)

koheesio.print_logo #

print_logo()

Source code in src/koheesio/__init__.py
def print_logo():
    global _logo_printed
    global _koheesio_print_logo

    if not _logo_printed and _koheesio_print_logo:
        print(ABOUT)
        _logo_printed = True