Skip to content

Classes

brickflow.engine.task.Task(task_id: str, task_func: Callable, workflow: Workflow, cluster: Cluster, description: Optional[str] = None, libraries: List[TaskLibrary] = field(default_factory=lambda: []), depends_on: List[Union[Callable, str]] = field(default_factory=lambda: []), task_type: TaskType = TaskType.BRICKFLOW_TASK, trigger_rule: BrickflowTriggerRule = BrickflowTriggerRule.ALL_SUCCESS, task_settings: Optional[TaskSettings] = None, custom_execute_callback: Optional[Callable] = None, ensure_brickflow_plugins: bool = False, health: Optional[List[JobsTasksHealthRules]] = None, if_else_outcome: Optional[Dict[Union[str, str], str]] = None) dataclass

Attributes

brickflow_default_params: Dict[str, str] property

builtin_notebook_params: Dict[str, str] property

cluster: Cluster instance-attribute

custom_execute_callback: Optional[Callable] = None class-attribute instance-attribute

custom_task_parameters: Dict[str, str] property

databricks_task_type_str: str property

depends_on: List[Union[Callable, str]] = field(default_factory=lambda: []) class-attribute instance-attribute

depends_on_names: Iterator[Dict[str, Optional[str]]] property

description: Optional[str] = None class-attribute instance-attribute

ensure_brickflow_plugins: bool = False class-attribute instance-attribute

health: Optional[List[JobsTasksHealthRules]] = None class-attribute instance-attribute

if_else_outcome: Optional[Dict[Union[str, str], str]] = None class-attribute instance-attribute

libraries: List[TaskLibrary] = field(default_factory=lambda: []) class-attribute instance-attribute

name: str property

parents: List[str] property

task_func: Callable instance-attribute

task_func_name: str property

task_id: str instance-attribute

task_settings: Optional[TaskSettings] = None class-attribute instance-attribute

task_type: TaskType = TaskType.BRICKFLOW_TASK class-attribute instance-attribute

trigger_rule: BrickflowTriggerRule = BrickflowTriggerRule.ALL_SUCCESS class-attribute instance-attribute

workflow: Workflow instance-attribute

Functions

execute(ignore_all_deps: bool = False) -> Any

Source code in brickflow/engine/task.py
@with_brickflow_logger
def execute(self, ignore_all_deps: bool = False) -> Any:
    """Run this task through the brickflow task hook, honoring the skip checks.

    Args:
        ignore_all_deps: When True, both skip checks are still evaluated but
            their outcome is ignored, so the task is always executed.

    Returns:
        ``resp.response`` as produced by the hook's ``handle_results`` call,
        or ``None`` when the task is skipped.
    """
    # Workflow is:
    #   1. Check whether a task selection is active and, if so, whether this task is in it
    #   2. Check whether a parent task signalled a skip, together with the trigger rule
    #   3. Check whether this is a custom python task and execute it
    #   4. Execute the task function
    # NOTE(review): ctx._reset_current_task() is called explicitly on each exit
    # path below; if any call in between raises, the reset is not reached.
    _ilog.setLevel(logging.INFO)  # enable logging for task execution
    ctx._set_current_task(self.name)
    self._ensure_brickflow_plugins()  # if you are expecting brickflow plugins to be installed
    if ignore_all_deps is True:
        _ilog.info(
            "Ignoring all dependencies for task: %s due to debugging", self.name
        )
    _select_task_skip, _select_task_skip_reason = self._skip_because_not_selected()
    if _select_task_skip is True and ignore_all_deps is False:
        # this task is skipped because of task selection
        _ilog.info(
            "Skipping task... %s for reason: %s",
            self.name,
            _select_task_skip_reason,
        )
        ctx._reset_current_task()
        return
    _skip, reason = self.should_skip()
    if _skip is True and ignore_all_deps is False:
        _ilog.info("Skipping task... %s for reason: %s", self.name, reason)
        # Record SKIP_EXCEPT_HACK under BRANCH_SKIP_EXCEPT for this task; should_skip
        # reads that key from parent tasks, so presumably this propagates the skip
        # to downstream tasks.
        ctx.task_coms.put(self.name, BRANCH_SKIP_EXCEPT, SKIP_EXCEPT_HACK)
        ctx._reset_current_task()
        return

    _ilog.info("Executing task... %s", self.name)
    _ilog.info("%s", pretty_print_function_source(self.name, self.task_func))

    brickflow_execution_hook = get_brickflow_tasks_hook()

    # Two-phase hook call: execute first, then let the hook post-process the response.
    initial_resp: TaskResponse = brickflow_execution_hook.task_execute(
        task=self, workflow=self.workflow
    )
    resp: TaskResponse = brickflow_execution_hook.handle_results(
        resp=initial_resp, task=self, workflow=self.workflow
    )
    if resp.push_return_value is True:
        # Store the return value in task_coms under RETURN_VALUE_KEY for this task.
        ctx.task_coms.put(self.name, RETURN_VALUE_KEY, resp.response)
    ctx._reset_current_task()
    return resp.response

get_obj_dict(entrypoint: str) -> Dict[str, Any]

Source code in brickflow/engine/task.py
def get_obj_dict(self, entrypoint: str) -> Dict[str, Any]:
    """Build the notebook-task payload for this task.

    Args:
        entrypoint: Notebook path, passed through ``handle_notebook_path``.

    Returns:
        A dict with the resolved ``notebook_path`` and the merged
        ``base_parameters``.
    """
    resolved_path = self.handle_notebook_path(entrypoint)
    # Later groups win on key collisions: builtin, then brickflow defaults,
    # then custom task parameters.
    merged_parameters = {
        **self.builtin_notebook_params,
        **self.brickflow_default_params,
        **self.custom_task_parameters,  # type: ignore
    }
    # TODO: implement custom_unique_task_parameters only after validating
    # the limit on parameters
    return {
        "notebook_path": resolved_path,
        "base_parameters": merged_parameters,
    }

get_runtime_parameter_values() -> Dict[str, Any]

Source code in brickflow/engine/task.py
def get_runtime_parameter_values(self) -> Dict[str, Any]:
    """Resolve each keyword-only default of the task function to its runtime value.

    Returns:
        A mapping from parameter name to the value returned by
        ``ctx.get_parameter`` (called with the stringified default), or to
        the declared default when that lookup yields a falsy value.
    """
    declared_defaults = inspect.getfullargspec(self.task_func).kwonlydefaults
    if not declared_defaults:
        return {}
    resolved = {}
    for param_name, default_value in declared_defaults.items():
        runtime_value = ctx.get_parameter(param_name, str(default_value))
        # fall back to the declared default when the lookup returns None/falsy
        resolved[param_name] = runtime_value or default_value
    return resolved

handle_notebook_path(entrypoint: str) -> str staticmethod

Source code in brickflow/engine/task.py
@staticmethod
def handle_notebook_path(entrypoint: str) -> str:
    """Return the notebook path, adding a ``.py`` suffix in the local environment.

    In the local environment the job is created as a workspace notebook job
    rather than a git source job, so the path must end in ``.py``. In every
    other environment the entrypoint is returned unchanged.
    """
    running_locally = ctx.env == BrickflowDefaultEnvs.LOCAL.value
    if running_locally and not entrypoint.endswith(".py"):
        return f"{entrypoint}.py"
    return entrypoint

is_valid_task_signature() -> None

Source code in brickflow/engine/task.py
def is_valid_task_signature(self) -> None:
    """Validate the signature of ``self.task_func``.

    The function must declare no positional parameters and no ``*args``, and
    every keyword-only default must be a ``str`` or ``None``. A catch-all
    ``**kwargs`` is allowed.

    Raises:
        InvalidTaskSignatureDefinition: If a keyword-only default is neither
            a string nor ``None`` (checked first), or if the function
            declares positional parameters or ``*args``.
    """
    spec: inspect.FullArgSpec = inspect.getfullargspec(self.task_func)
    sig: inspect.Signature = inspect.signature(self.task_func)
    # The first fragment ends with ". " so the sentences do not run together
    # ("**kwargsFor example") once the adjacent literals are concatenated.
    signature_error_msg = (
        "Task signatures only supports kwargs with defaults. or catch all varkw **kwargs. "
        "For example def execute(*, variable_a=None, variable_b=None, **kwargs). "
        f"Please fix function def {self.task_func_name}{sig}: ..."
    )
    kwargs_default_error_msg = (
        "Keyword arguments must be Strings. "
        "Please handle booleans and numbers via strings. "
        f"Please fix function def {self.task_func_name}{sig}: ..."
    )

    # Default values are validated before the overall shape, so a bad default
    # is reported even when the signature also has positional parameters.
    for default_value in (spec.kwonlydefaults or {}).values():
        # bool is a subclass of int in Python; neither is a str, so both are rejected
        if not (isinstance(default_value, str) or default_value is None):
            raise InvalidTaskSignatureDefinition(kwargs_default_error_msg)

    has_only_keyword_params = (
        spec.args == [] and spec.varargs is None and spec.defaults is None
    )
    if not has_only_keyword_params:
        raise InvalidTaskSignatureDefinition(signature_error_msg)

should_skip() -> Tuple[bool, Optional[str]]

Source code in brickflow/engine/task.py
def should_skip(self) -> Tuple[bool, Optional[str]]:
    """Decide whether this task should be skipped based on its parents.

    For every parent other than ``ROOT_NODE`` the value stored under
    ``BRANCH_SKIP_EXCEPT`` is looked up; a parent votes "skip" when that
    value differs from this task's name. A failed lookup counts as
    "do not skip".

    Returns:
        ``(False, None)`` when there are no parent votes; otherwise the
        result of ``self._get_skip_with_reason`` using ``all`` of the votes
        for ``NONE_FAILED`` and ``any`` of the votes for every other rule.
    """
    parent_votes = []
    for upstream in self.parents:
        if upstream == ROOT_NODE:
            continue
        try:
            allowed_task = ctx.task_coms.get(upstream, BRANCH_SKIP_EXCEPT)
            # a value other than this task's name means the parent wants us skipped
            parent_votes.append(bool(self.name != allowed_task))
        except Exception:
            # ignore errors as the key probably does not exist
            # TODO: log errors
            parent_votes.append(False)

    if len(parent_votes) == 0:
        return False, None

    # NOTE(review): the two reason strings look swapped relative to the
    # all()/any() conditions they accompany -- confirm before changing, since
    # the reason is part of the return value.
    if self.trigger_rule == BrickflowTriggerRule.NONE_FAILED:
        # by default a task failure automatically skips
        skip_flag = all(parent_votes)
        skip_reason = "At least one task before this were not successful"
    else:
        # default is BrickflowTriggerRule.ALL_SUCCESS
        skip_flag = any(parent_votes)
        skip_reason = "All tasks before this were not successful"
    return self._get_skip_with_reason(skip_flag, skip_reason)

brickflow.engine.task.EmailNotifications(on_failure: Optional[List[str]] = None, on_success: Optional[List[str]] = None, on_start: Optional[List[str]] = None) dataclass

Attributes

on_failure: Optional[List[str]] = None class-attribute instance-attribute

on_start: Optional[List[str]] = None class-attribute instance-attribute

on_success: Optional[List[str]] = None class-attribute instance-attribute

Functions

to_tf_dict() -> Dict[str, Optional[List[str]]]

Source code in brickflow/engine/task.py
def to_tf_dict(self) -> Dict[str, Optional[List[str]]]:
    """Return the notification recipient lists keyed by event name."""
    event_names = ("on_start", "on_failure", "on_success")
    return {event: getattr(self, event) for event in event_names}

brickflow.engine.task.JarTaskLibrary(jar: str) dataclass

Bases: StorageBasedTaskLibrary

Parameters:

Name Type Description Default
jar str

String to s3/dbfs path for jar

required

Attributes

jar: str instance-attribute

brickflow.engine.task.EggTaskLibrary(egg: str) dataclass

Bases: StorageBasedTaskLibrary

Parameters:

Name Type Description Default
egg str

String to s3/dbfs path for egg

required

Attributes

egg: str instance-attribute

brickflow.engine.task.WheelTaskLibrary(whl: str) dataclass

Bases: StorageBasedTaskLibrary

Parameters:

Name Type Description Default
whl str

String to s3/dbfs path for whl

required

Attributes

whl: str instance-attribute

brickflow.engine.task.PypiTaskLibrary(package: str, repo: Optional[str] = None) dataclass

Bases: TaskLibrary

Parameters:

Name Type Description Default
package str

The package to install from PyPI, e.g. requests, requests==x.y.z, or git+https://github.com/Nike-Inc/brickflow.git

required
repo Optional[str]

The repository where the package can be found. By default pypi is used

None

Attributes

dict: Dict[str, Union[str, Dict[str, str]]] property

package: str instance-attribute

repo: Optional[str] = None class-attribute instance-attribute

brickflow.engine.task.MavenTaskLibrary(coordinates: str, repo: Optional[str] = None, exclusions: Optional[List[str]] = None) dataclass

Bases: TaskLibrary

Parameters:

Name Type Description Default
coordinates str

Gradle-style Maven coordinates. For example: org.jsoup:jsoup:1.7.2.

required
repo Optional[str]

Maven repo to install the Maven package from. If omitted, both Maven Central Repository and Spark Packages are searched.

None
exclusions Optional[List[str]]

List of dependencies to exclude. For example: ["slf4j:slf4j", "*:hadoop-client"]. Maven dependency exclusions: https://maven.apache.org/guides/introduction/introduction-to-optional-and-excludes-dependencies.html.

None

Attributes

coordinates: str instance-attribute

dict: Dict[str, Union[str, Dict[str, str]]] property

exclusions: Optional[List[str]] = None class-attribute instance-attribute

repo: Optional[str] = None class-attribute instance-attribute

brickflow.engine.task.CranTaskLibrary(package: str, repo: Optional[str] = None) dataclass

Bases: TaskLibrary

Parameters:

Name Type Description Default
package str

The name of the CRAN package to install.

required
repo Optional[str]

The repository where the package can be found. If not specified, the default CRAN repo is used.

None

Attributes

dict: Dict[str, Union[str, Dict[str, str]]] property

package: str instance-attribute

repo: Optional[str] = None class-attribute instance-attribute

brickflow.engine.task.BrickflowTriggerRule

Bases: Enum

Attributes

ALL_SUCCESS = 'all_success' class-attribute instance-attribute

NONE_FAILED = 'none_failed' class-attribute instance-attribute

brickflow.engine.task.BrickflowTaskEnvVars

Bases: Enum

Attributes

BRICKFLOW_SELECT_TASKS = 'BRICKFLOW_SELECT_TASKS' class-attribute instance-attribute

brickflow.engine.task.TaskSettings(email_notifications: Optional[EmailNotifications] = None, notification_settings: Optional[TaskNotificationSettings] = None, timeout_seconds: Optional[int] = None, max_retries: Optional[int] = None, min_retry_interval_millis: Optional[int] = None, retry_on_timeout: Optional[bool] = None, run_if: Optional[TaskRunCondition] = None) dataclass

Attributes

email_notifications: Optional[EmailNotifications] = None class-attribute instance-attribute

max_retries: Optional[int] = None class-attribute instance-attribute

min_retry_interval_millis: Optional[int] = None class-attribute instance-attribute

notification_settings: Optional[TaskNotificationSettings] = None class-attribute instance-attribute

retry_on_timeout: Optional[bool] = None class-attribute instance-attribute

run_if: Optional[TaskRunCondition] = None class-attribute instance-attribute

timeout_seconds: Optional[int] = None class-attribute instance-attribute

Functions

merge(other: Optional['TaskSettings']) -> 'TaskSettings'

Source code in brickflow/engine/task.py
def merge(self, other: Optional["TaskSettings"]) -> "TaskSettings":
    """Return settings in which values set on ``other`` override this instance.

    A field counts as "set" on ``other`` when it is not ``None``, so explicit
    falsy overrides such as ``max_retries=0``, ``timeout_seconds=0`` or
    ``retry_on_timeout=False`` are honored instead of silently falling back
    to this instance's value.

    Args:
        other: Settings whose non-``None`` fields take precedence. When
            ``None``, this instance is returned unchanged.

    Returns:
        ``self`` when ``other`` is ``None``; otherwise a new ``TaskSettings``.
        ``timeout_seconds`` falls back to ``0`` when neither side sets it.
    """
    if other is None:
        return self

    def _pick(override, fallback):
        # Only None means "not set"; 0 and False are legitimate overrides.
        return fallback if override is None else override

    return TaskSettings(
        email_notifications=_pick(
            other.email_notifications, self.email_notifications
        ),
        notification_settings=_pick(
            other.notification_settings, self.notification_settings
        ),
        timeout_seconds=_pick(
            _pick(other.timeout_seconds, self.timeout_seconds), 0
        ),
        max_retries=_pick(other.max_retries, self.max_retries),
        min_retry_interval_millis=_pick(
            other.min_retry_interval_millis, self.min_retry_interval_millis
        ),
        retry_on_timeout=_pick(other.retry_on_timeout, self.retry_on_timeout),
        run_if=_pick(other.run_if, self.run_if),
    )

to_tf_dict() -> Dict[str, Optional[str] | Optional[int] | Optional[bool] | Optional[Dict[str, Optional[List[str]]]]]

Source code in brickflow/engine/task.py
def to_tf_dict(
    self,
) -> Dict[
    str,
    Optional[str]
    | Optional[int]
    | Optional[bool]
    | Optional[Dict[str, Optional[List[str]]]],
]:
    email_not = (
        self.email_notifications.to_tf_dict()
        if self.email_notifications is not None
        else {}
    )
    notification_settings = (
        {}
        if self.notification_settings is None
        else {"notification_settings": self.notification_settings.dict()}
    )
    return {
        **notification_settings,
        "email_notifications": email_not,
        "timeout_seconds": self.timeout_seconds,
        "max_retries": self.max_retries,
        "min_retry_interval_millis": self.min_retry_interval_millis,
        "retry_on_timeout": self.retry_on_timeout,
        **({"run_if": self.run_if.value} if self.run_if else {}),
    }

brickflow.engine.task.TaskType

Bases: Enum

Attributes

BRICKFLOW_TASK = 'brickflow_task' class-attribute instance-attribute

CUSTOM_PYTHON_TASK = 'custom_python_task' class-attribute instance-attribute

DLT = 'pipeline_task' class-attribute instance-attribute

IF_ELSE_CONDITION_TASK = 'condition_task' class-attribute instance-attribute

NOTEBOOK_TASK = 'notebook_task' class-attribute instance-attribute

RUN_JOB_TASK = 'run_job_task' class-attribute instance-attribute

SPARK_JAR_TASK = 'spark_jar_task' class-attribute instance-attribute

SQL = 'sql_task' class-attribute instance-attribute