Skip to content

Attributes

brickflow.context.context.BRANCH_SKIP_EXCEPT = 'branch_skip_except' module-attribute

brickflow.context.context.RETURN_VALUE_KEY = 'return_value' module-attribute

brickflow.context.context.SKIP_EXCEPT_HACK = 'brickflow_hack_skip_all' module-attribute

brickflow.context.context.T = TypeVar('T') module-attribute

brickflow.context.context.ctx = Context() module-attribute

Classes

brickflow.context.context.BrickflowBuiltInTaskVariables

Bases: Enum

Attributes

job_id = 'brickflow_job_id' class-attribute instance-attribute

parent_run_id = 'brickflow_parent_run_id' class-attribute instance-attribute

run_id = 'brickflow_run_id' class-attribute instance-attribute

start_date = 'brickflow_start_date' class-attribute instance-attribute

start_time = 'brickflow_start_time' class-attribute instance-attribute

task_key = 'brickflow_task_key' class-attribute instance-attribute

task_retry_count = 'brickflow_task_retry_count' class-attribute instance-attribute

brickflow.context.context.BrickflowInternalVariables

Bases: Enum

Attributes

env = BrickflowEnvVars.BRICKFLOW_ENV.value.lower() class-attribute instance-attribute

only_run_tasks = 'brickflow_internal_only_run_tasks' class-attribute instance-attribute

task_id = 'brickflow_internal_task_name' class-attribute instance-attribute

workflow_id = 'brickflow_internal_workflow_name' class-attribute instance-attribute

workflow_prefix = 'brickflow_internal_workflow_prefix' class-attribute instance-attribute

workflow_suffix = 'brickflow_internal_workflow_suffix' class-attribute instance-attribute

brickflow.context.context.BrickflowTaskComs dataclass

Attributes

dbutils: Optional[Any] = None class-attribute instance-attribute

storage: Dict[str, Any] = field(init=False, default_factory=lambda : {}) class-attribute instance-attribute

Functions

get(task_id: str, key: Optional[str] = None) -> Any

Source code in brickflow/context/context.py
def get(self, task_id: str, key: Optional[str] = None) -> Any:
    if key is None:
        return BrickflowTaskComsDict(task_id=task_id, task_coms=self)
    if self.dbutils is not None:
        encoded_value = self.dbutils.jobs.taskValues.get(
            key=key, taskKey=task_id, debugValue="debug"
        )
        return BrickflowTaskComsObject.from_encoded_value(encoded_value).value
    else:
        # TODO: logging using local task coms
        encoded_value = self.storage[self._key(task_id, key)]
        return BrickflowTaskComsObject.from_encoded_value(encoded_value).value

put(task_id: str, key: str, value: Any) -> None

Source code in brickflow/context/context.py
def put(self, task_id: str, key: str, value: Any) -> None:
    encoded_value = BrickflowTaskComsObject(value).to_encoded_value
    if self.dbutils is not None:
        self.dbutils.jobs.taskValues.set(key, encoded_value)
    else:
        # TODO: logging using local task coms
        self.storage[self._key(task_id, key)] = encoded_value

brickflow.context.context.BrickflowTaskComsDict dataclass

Attributes

task_coms: BrickflowTaskComs instance-attribute

task_id: str instance-attribute

brickflow.context.context.BrickflowTaskComsObject dataclass

Attributes

to_encoded_value: str property

value: Any property

Functions

from_encoded_value(encoded_value: Union[str, bytes]) -> BrickflowTaskComsObject classmethod

Source code in brickflow/context/context.py
@classmethod
def from_encoded_value(
    cls, encoded_value: Union[str, bytes]
) -> "BrickflowTaskComsObject":
    try:
        _encoded_value = (
            encoded_value
            if isinstance(encoded_value, bytes)
            else encoded_value.encode("utf-8")
        )
        b64_bytes = base64.b64decode(_encoded_value)
        return cls(pickle.loads(b64_bytes).value)
    except binascii.Error:
        _decoded_value = (
            encoded_value.decode("utf-8")
            if isinstance(encoded_value, bytes)
            else encoded_value
        )
        return cls(_decoded_value)

brickflow.context.context.Context()

Source code in brickflow/context/context.py
def __init__(self) -> None:
    # Order of init matters todo: fix this

    self._dbutils: Optional[Any] = None
    self._spark: Optional[Any] = None
    self._task_coms: BrickflowTaskComs
    self._current_task: Optional[str] = None
    self._configure()
    self._current_project: Optional[str] = None

Attributes

current_project: Optional[str] property

current_task: Optional[str] property

dbutils: DBUtils property

env: str property

log: logging.Logger property

spark: SparkSession property

task_coms: BrickflowTaskComs property

Functions

dbutils_widget_get_or_else(key: str, debug: Optional[str]) -> Optional[str]

Source code in brickflow/context/context.py
@deprecated
def dbutils_widget_get_or_else(
    self, key: str, debug: Optional[str]
) -> Optional[str]:
    try:
        return self.dbutils.widgets.get(key)
    except Exception:
        # todo: log error
        return debug

get_by_env(purpose: str, *, default: Optional[T] = None, local: Optional[T] = None, dev: Optional[T] = None, non_prod: Optional[T] = None, test: Optional[T] = None, qa: Optional[T] = None, prod: Optional[T] = None, uat: Optional[T] = None, **kwargs: Optional[T]) -> Optional[T]

Source code in brickflow/context/context.py
def get_by_env(
    self,
    purpose: str,
    *,
    default: Optional[T] = None,
    local: Optional[T] = None,
    dev: Optional[T] = None,
    non_prod: Optional[T] = None,
    test: Optional[T] = None,
    qa: Optional[T] = None,
    prod: Optional[T] = None,
    uat: Optional[T] = None,
    **kwargs: Optional[T],
) -> Optional[T]:
    # deep copy without modifying kwargs
    def add_if_not_none(
        _d: Dict[str, Optional[T]], _k: str, _v: Optional[T]
    ) -> None:
        if _v is None:
            return
        _d[_k] = _v

    _dict = copy.deepcopy(kwargs)
    add_if_not_none(_dict, "local", local)
    add_if_not_none(_dict, "non_prod", non_prod)
    add_if_not_none(_dict, "dev", dev)
    add_if_not_none(_dict, "test", test)
    add_if_not_none(_dict, "qa", qa)
    add_if_not_none(_dict, "prod", prod)
    add_if_not_none(_dict, "uat", uat)
    _env = self.env
    _ilog.info("Configuring: %s; Using env: '%s' to fetch value...", purpose, _env)
    if _env not in _dict and default is None:
        raise KeyError(
            f"Configuring: {purpose}; Unable to find environment key: {_env}, "
            f"only found env definitions: {list(_dict.keys())}"
        )
    if _env not in _dict and default is not None:
        _ilog.info(
            "Configuring: %s; Found no value configured with env: '%s' using default value...",
            purpose,
            _env,
        )
    res = _dict.get(_env, default)
    return res

get_parameter(key: str, debug: Optional[str] = None) -> Optional[str]

Source code in brickflow/context/context.py
def get_parameter(self, key: str, debug: Optional[str] = None) -> Optional[str]:
    try:
        return self.dbutils.widgets.get(key)
    except Exception:
        # todo: log error
        _ilog.debug("Unable to get parameter: %s from dbutils", key)
        return debug

get_return_value(task_key: Union[str, Callable]) -> Any

Source code in brickflow/context/context.py
def get_return_value(self, task_key: Union[str, Callable]) -> Any:
    task_key = task_key.__name__ if callable(task_key) else task_key
    return self.task_coms.get(task_key, RETURN_VALUE_KEY)

is_local() -> bool

Source code in brickflow/context/context.py
def is_local(self) -> bool:
    return self.env == BrickflowDefaultEnvs.LOCAL.value

job_id(*, debug: Optional[str] = None) -> Any

This function fetches the job_id value using the bind_variable decorator. The implementation is intentionally empty because the decorator handles the logic.

Source code in brickflow/context/context.py
@bind_variable(BrickflowBuiltInTaskVariables.job_id)
def job_id(self, *, debug: Optional[str] = None) -> Any:
    """
    This function fetches the job_id value using the bind_variable decorator.
    The implementation is intentionally empty because the decorator handles the logic.
    """
    pass

parent_run_id(*, debug: Optional[str] = None) -> Any

This function fetches the parent_run_id value using the bind_variable decorator. The implementation is intentionally empty because the decorator handles the logic.

Source code in brickflow/context/context.py
@bind_variable(BrickflowBuiltInTaskVariables.parent_run_id)
def parent_run_id(self, *, debug: Optional[str] = None) -> Any:
    """
    This function fetches the parent_run_id value using the bind_variable decorator.
    The implementation is intentionally empty because the decorator handles the logic.
    """
    pass

run_id(*, debug: Optional[str] = None) -> Any

This function fetches the run_id value using the bind_variable decorator. The implementation is intentionally empty because the decorator handles the logic.

Source code in brickflow/context/context.py
@bind_variable(BrickflowBuiltInTaskVariables.run_id)
def run_id(self, *, debug: Optional[str] = None) -> Any:
    """
    This function fetches the run_id value using the bind_variable decorator.
    The implementation is intentionally empty because the decorator handles the logic.
    """
    pass

set_current_project(project: str) -> None

Source code in brickflow/context/context.py
def set_current_project(self, project: str) -> None:
    # TODO: not a public api move to internal context or deployment context
    self._ensure_valid_project(project)
    self._current_project = project

skip_all_except(branch_task: Union[Callable, str]) -> None

Source code in brickflow/context/context.py
def skip_all_except(self, branch_task: Union[Callable, str]) -> None:
    if self._current_task is None:
        raise RuntimeError("Current task is empty unable to skip...")
    branch_task_key = (
        branch_task.__name__
        if callable(branch_task) and hasattr(branch_task, "__name__") is True
        else branch_task
    )
    self._task_coms.put(self._current_task, BRANCH_SKIP_EXCEPT, branch_task_key)

skip_all_following() -> None

Source code in brickflow/context/context.py
def skip_all_following(self) -> None:
    if self._current_task is None:
        raise RuntimeError("Current task is empty unable to skip...")
    self._task_coms.put(self._current_task, BRANCH_SKIP_EXCEPT, SKIP_EXCEPT_HACK)

start_date(*, debug: Optional[str] = None) -> Any

This function fetches the start_date value using the bind_variable decorator. The implementation is intentionally empty because the decorator handles the logic.

Source code in brickflow/context/context.py
@bind_variable(BrickflowBuiltInTaskVariables.start_date)
def start_date(self, *, debug: Optional[str] = None) -> Any:
    """
    This function fetches the start_date value using the bind_variable decorator.
    The implementation is intentionally empty because the decorator handles the logic.
    """
    pass

start_time(*, debug: Optional[str] = None) -> Any

This function fetches the start_time value using the bind_variable decorator. The implementation is intentionally empty because the decorator handles the logic.

Source code in brickflow/context/context.py
@bind_variable(BrickflowBuiltInTaskVariables.start_time)
def start_time(self, *, debug: Optional[str] = None) -> Any:
    """
    This function fetches the start_time value using the bind_variable decorator.
    The implementation is intentionally empty because the decorator handles the logic.
    """
    pass

task_key(*, debug: Optional[str] = None) -> Any

This function fetches the task_key value using the bind_variable decorator. The implementation is intentionally empty because the decorator handles the logic.

Source code in brickflow/context/context.py
@bind_variable(BrickflowBuiltInTaskVariables.task_key)
def task_key(self, *, debug: Optional[str] = None) -> Any:
    """
    This function fetches the task_key value using the bind_variable decorator.
    The implementation is intentionally empty because the decorator handles the logic.
    """
    pass

task_retry_count(*, debug: Optional[str] = None) -> Any

This function fetches the task_retry_count value using the bind_variable decorator. The implementation is intentionally empty because the decorator handles the logic.

Source code in brickflow/context/context.py
@bind_variable(BrickflowBuiltInTaskVariables.task_retry_count)
def task_retry_count(self, *, debug: Optional[str] = None) -> Any:
    """
    This function fetches the task_retry_count value using the bind_variable decorator.
    The implementation is intentionally empty because the decorator handles the logic.
    """
    pass

brickflow.context.context.ContextMode

Bases: Enum

Attributes

databricks = 'databricks' class-attribute instance-attribute

not_databricks = 'not_databricks' class-attribute instance-attribute

brickflow.context.context.TaskComsObjectResult

Bases: Enum

Attributes

NO_RESULTS = 'NO_RESULTS' class-attribute instance-attribute

Functions

brickflow.context.context.bind_variable(builtin: BrickflowBuiltInTaskVariables) -> Callable

Source code in brickflow/context/context.py
def bind_variable(builtin: BrickflowBuiltInTaskVariables) -> Callable:
    def wrapper(f: Callable) -> Callable:
        @functools.wraps(f)
        def func(*args, **kwargs):  # type: ignore
            _self: Context = args[0]  # type: ignore
            debug = kwargs["debug"]
            f(*args, **kwargs)  # no-op
            if _self.dbutils is not None:
                return _self.get_parameter(builtin.value, debug)
            return debug

        return func

    return wrapper