Skip to content

Classes

brickflow.engine.workflow.Workflow(_name: str, schedule_quartz_expression: Optional[str] = None, schedule_continuous: Optional[JobsContinuous] = None, timezone: str = 'UTC', schedule_pause_status: str = 'UNPAUSED', default_cluster: Optional[Cluster] = None, clusters: List[Cluster] = field(default_factory=lambda: []), health: Optional[List[JobsHealthRules]] = None, timeout_seconds: Optional[int] = None, default_task_settings: TaskSettings = TaskSettings(), email_notifications: Optional[WorkflowEmailNotifications] = None, webhook_notifications: Optional[WorkflowWebhookNotifications] = None, notification_settings: Optional[WorkflowNotificationSettings] = None, trigger: Optional[Trigger] = None, libraries: List[TaskLibrary] = field(default_factory=lambda: []), tags: Optional[Dict[str, str]] = None, max_concurrent_runs: int = 1, permissions: WorkflowPermissions = WorkflowPermissions(), active_task: Optional[str] = None, graph: nx.DiGraph = field(default_factory=nx.DiGraph), tasks: Dict[str, Task] = field(default_factory=lambda: {}), prefix: Optional[str] = None, suffix: Optional[str] = None, common_task_parameters: Optional[Dict[str, str]] = None, run_as_user: Optional[str] = None, run_as_service_principal: Optional[str] = None, max_tasks_in_workflow: int = 100, enable_plugins: Optional[bool] = None) dataclass

Attributes

active_task: Optional[str] = None class-attribute instance-attribute

bfs_layers: List[str] property

clusters: List[Cluster] = field(default_factory=lambda: []) class-attribute instance-attribute

common_task_parameters: Optional[Dict[str, str]] = None class-attribute instance-attribute

default_cluster: Optional[Cluster] = None class-attribute instance-attribute

default_task_settings: TaskSettings = TaskSettings() class-attribute instance-attribute

email_notifications: Optional[WorkflowEmailNotifications] = None class-attribute instance-attribute

enable_plugins: Optional[bool] = None class-attribute instance-attribute

graph: nx.DiGraph = field(default_factory=nx.DiGraph) class-attribute instance-attribute

health: Optional[List[JobsHealthRules]] = None class-attribute instance-attribute

libraries: List[TaskLibrary] = field(default_factory=lambda: []) class-attribute instance-attribute

max_concurrent_runs: int = 1 class-attribute instance-attribute

max_tasks_in_workflow: int = 100 class-attribute instance-attribute

name: str property

notification_settings: Optional[WorkflowNotificationSettings] = None class-attribute instance-attribute

permissions: WorkflowPermissions = WorkflowPermissions() class-attribute instance-attribute

prefix: Optional[str] = None class-attribute instance-attribute

run_as_service_principal: Optional[str] = None class-attribute instance-attribute

run_as_user: Optional[str] = None class-attribute instance-attribute

schedule_continuous: Optional[JobsContinuous] = None class-attribute instance-attribute

schedule_pause_status: str = 'UNPAUSED' class-attribute instance-attribute

schedule_quartz_expression: Optional[str] = None class-attribute instance-attribute

suffix: Optional[str] = None class-attribute instance-attribute

tags: Optional[Dict[str, str]] = None class-attribute instance-attribute

tasks: Dict[str, Task] = field(default_factory=lambda: {}) class-attribute instance-attribute

timeout_seconds: Optional[int] = None class-attribute instance-attribute

timezone: str = 'UTC' class-attribute instance-attribute

trigger: Optional[Trigger] = None class-attribute instance-attribute

unique_new_clusters: List[Cluster] property

webhook_notifications: Optional[WorkflowWebhookNotifications] = None class-attribute instance-attribute

Functions

bfs_task_iter() -> Iterator[Task]

Source code in brickflow/engine/workflow.py
def bfs_task_iter(self) -> Iterator[Task]:
    """Yield the workflow's tasks in the order given by ``self.bfs_layers``.

    Every entry of ``self.bfs_layers`` is iterated for task keys, and each
    key is resolved to its task through ``self.get_task``. Resolution is
    lazy: a task is only looked up when the consumer asks for it.
    """
    yield from (
        self.get_task(key) for layer in self.bfs_layers for key in layer
    )

check_no_active_task() -> None

Source code in brickflow/engine/workflow.py
def check_no_active_task(self) -> None:
    """Ensure no task is currently marked as active on this workflow.

    Raises:
        AnotherActiveTaskError: if ``self.active_task`` is not ``None``.
    """
    if self.active_task is None:
        return
    raise AnotherActiveTaskError(
        "You are calling another active task in another task. Please abstract the code more."
    )

dlt_task(task_func: Optional[Callable] = None, name: Optional[str] = None, task_settings: Optional[TaskSettings] = None, depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, if_else_outcome: Optional[Dict[Union[str, str], str]] = None) -> Callable

Source code in brickflow/engine/workflow.py
def dlt_task(
    self,
    task_func: Optional[Callable] = None,
    name: Optional[str] = None,
    task_settings: Optional[TaskSettings] = None,
    depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
    if_else_outcome: Optional[Dict[Union[str, str], str]] = None,
) -> Callable:
    """Register a task of type ``TaskType.DLT``.

    Thin convenience wrapper: every argument is forwarded unchanged to
    ``self.task`` with ``task_type`` fixed to ``TaskType.DLT``, and whatever
    ``self.task`` returns is returned as-is.
    """
    return self.task(
        task_func=task_func,
        name=name,
        task_type=TaskType.DLT,
        task_settings=task_settings,
        depends_on=depends_on,
        if_else_outcome=if_else_outcome,
    )

get_task(task_id: str) -> Task

Source code in brickflow/engine/workflow.py
@wraps_keyerror(TaskNotFoundError, "Unable to find task: ")
def get_task(self, task_id: str) -> Task:
    """Return the task stored in ``self.tasks`` under ``task_id``.

    A missing id surfaces as a ``KeyError`` from the dict lookup; the
    ``wraps_keyerror`` decorator presumably converts it into
    ``TaskNotFoundError`` (decorator is defined outside this view).
    """
    found = self.tasks[task_id]
    return found

if_else_condition_task(task_func: Optional[Callable] = None, name: Optional[str] = None, task_settings: Optional[TaskSettings] = None, depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, if_else_outcome: Optional[Dict[Union[str, str], str]] = None) -> Callable

Source code in brickflow/engine/workflow.py
def if_else_condition_task(
    self,
    task_func: Optional[Callable] = None,
    name: Optional[str] = None,
    task_settings: Optional[TaskSettings] = None,
    depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
    if_else_outcome: Optional[Dict[Union[str, str], str]] = None,
) -> Callable:
    """Register a task of type ``TaskType.IF_ELSE_CONDITION_TASK``.

    Thin convenience wrapper: every argument is forwarded unchanged to
    ``self.task`` with ``task_type`` fixed to
    ``TaskType.IF_ELSE_CONDITION_TASK``.
    """
    return self.task(
        task_func=task_func,
        name=name,
        task_type=TaskType.IF_ELSE_CONDITION_TASK,
        task_settings=task_settings,
        depends_on=depends_on,
        if_else_outcome=if_else_outcome,
    )

log_timeout_warning(task_settings: TaskSettings) -> bool

Source code in brickflow/engine/workflow.py
def log_timeout_warning(self, task_settings: TaskSettings) -> bool:
    """Tell whether a task-level timeout exceeds the workflow-level timeout.

    Returns:
        ``True`` only when ``task_settings`` is given, both it and the
        workflow define ``timeout_seconds``, and the task's value is strictly
        greater than the workflow's. ``False`` in every other case.
    """
    if task_settings is None or self.timeout_seconds is None:
        return False

    task_timeout = task_settings.timeout_seconds
    if task_timeout is None:
        return False

    return bool(task_timeout > self.timeout_seconds)

notebook_task(task_func: Optional[Callable] = None, name: Optional[str] = None, cluster: Optional[Cluster] = None, libraries: Optional[List[TaskLibrary]] = None, task_settings: Optional[TaskSettings] = None, depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, if_else_outcome: Optional[Dict[Union[str, str], str]] = None) -> Callable

Source code in brickflow/engine/workflow.py
def notebook_task(
    self,
    task_func: Optional[Callable] = None,
    name: Optional[str] = None,
    cluster: Optional[Cluster] = None,
    libraries: Optional[List[TaskLibrary]] = None,
    task_settings: Optional[TaskSettings] = None,
    depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
    if_else_outcome: Optional[Dict[Union[str, str], str]] = None,
) -> Callable:
    """Register a task of type ``TaskType.NOTEBOOK_TASK``.

    Thin convenience wrapper: every argument (including ``cluster`` and
    ``libraries``) is forwarded unchanged to ``self.task`` with ``task_type``
    fixed to ``TaskType.NOTEBOOK_TASK``.
    """
    return self.task(
        task_func=task_func,
        name=name,
        cluster=cluster,
        libraries=libraries,
        task_type=TaskType.NOTEBOOK_TASK,
        task_settings=task_settings,
        depends_on=depends_on,
        if_else_outcome=if_else_outcome,
    )

parents(node: str) -> Iterator

Source code in brickflow/engine/workflow.py
def parents(self, node: str) -> Iterator:
    """Return an iterator over the predecessors of ``node`` in ``self.graph``."""
    predecessor_iter = self.graph.predecessors(node)
    return predecessor_iter

pop_task(task_id: str) -> None

Source code in brickflow/engine/workflow.py
@wraps_keyerror(TaskNotFoundError, "Unable to find task: ")
def pop_task(self, task_id: str) -> None:
    """Remove ``task_id`` from both ``self.tasks`` and ``self.graph``."""
    # The dict entry is dropped first, so an unknown id raises KeyError
    # before the graph is modified.
    del self.tasks[task_id]
    self.graph.remove_node(task_id)

run_job_task(task_func: Optional[Callable] = None, name: Optional[str] = None, task_settings: Optional[TaskSettings] = None, depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, if_else_outcome: Optional[Dict[Union[str, str], str]] = None) -> Callable

Source code in brickflow/engine/workflow.py
def run_job_task(
    self,
    task_func: Optional[Callable] = None,
    name: Optional[str] = None,
    task_settings: Optional[TaskSettings] = None,
    depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
    if_else_outcome: Optional[Dict[Union[str, str], str]] = None,
) -> Callable:
    """Register a task of type ``TaskType.RUN_JOB_TASK``.

    Thin convenience wrapper: every argument is forwarded unchanged to
    ``self.task`` with ``task_type`` fixed to ``TaskType.RUN_JOB_TASK``.
    """
    return self.task(
        task_func=task_func,
        name=name,
        task_type=TaskType.RUN_JOB_TASK,
        task_settings=task_settings,
        depends_on=depends_on,
        if_else_outcome=if_else_outcome,
    )

spark_jar_task(task_func: Optional[Callable] = None, name: Optional[str] = None, cluster: Optional[Cluster] = None, libraries: Optional[List[TaskLibrary]] = None, task_settings: Optional[TaskSettings] = None, depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, if_else_outcome: Optional[Dict[Union[str, str], str]] = None) -> Callable

Source code in brickflow/engine/workflow.py
def spark_jar_task(
    self,
    task_func: Optional[Callable] = None,
    name: Optional[str] = None,
    cluster: Optional[Cluster] = None,
    libraries: Optional[List[TaskLibrary]] = None,
    task_settings: Optional[TaskSettings] = None,
    depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
    if_else_outcome: Optional[Dict[Union[str, str], str]] = None,
) -> Callable:
    """Register a task of type ``TaskType.SPARK_JAR_TASK``.

    Thin convenience wrapper: every argument (including ``cluster`` and
    ``libraries``) is forwarded unchanged to ``self.task`` with ``task_type``
    fixed to ``TaskType.SPARK_JAR_TASK``.
    """
    return self.task(
        task_func=task_func,
        name=name,
        cluster=cluster,
        libraries=libraries,
        task_type=TaskType.SPARK_JAR_TASK,
        task_settings=task_settings,
        depends_on=depends_on,
        if_else_outcome=if_else_outcome,
    )

sql_task(task_func: Optional[Callable] = None, name: Optional[str] = None, task_settings: Optional[TaskSettings] = None, depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, if_else_outcome: Optional[Dict[Union[str, str], str]] = None) -> Callable

Source code in brickflow/engine/workflow.py
def sql_task(
    self,
    task_func: Optional[Callable] = None,
    name: Optional[str] = None,
    task_settings: Optional[TaskSettings] = None,
    depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
    if_else_outcome: Optional[Dict[Union[str, str], str]] = None,
) -> Callable:
    """Register a task of type ``TaskType.SQL``.

    Thin convenience wrapper: every argument is forwarded unchanged to
    ``self.task`` with ``task_type`` fixed to ``TaskType.SQL``.
    """
    return self.task(
        task_func=task_func,
        name=name,
        task_type=TaskType.SQL,
        task_settings=task_settings,
        depends_on=depends_on,
        if_else_outcome=if_else_outcome,
    )

task(task_func: Optional[Callable] = None, name: Optional[str] = None, cluster: Optional[Cluster] = None, libraries: Optional[List[TaskLibrary]] = None, task_type: TaskType = TaskType.BRICKFLOW_TASK, depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, trigger_rule: BrickflowTriggerRule = BrickflowTriggerRule.ALL_SUCCESS, custom_execute_callback: Optional[Callable] = None, task_settings: Optional[TaskSettings] = None, ensure_brickflow_plugins: bool = False, if_else_outcome: Optional[Dict[Union[str, str], str]] = None) -> Callable

Source code in brickflow/engine/workflow.py
def task(
    self,
    task_func: Optional[Callable] = None,
    name: Optional[str] = None,
    cluster: Optional[Cluster] = None,
    libraries: Optional[List[TaskLibrary]] = None,
    task_type: TaskType = TaskType.BRICKFLOW_TASK,
    depends_on: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None,
    trigger_rule: BrickflowTriggerRule = BrickflowTriggerRule.ALL_SUCCESS,
    custom_execute_callback: Optional[Callable] = None,
    task_settings: Optional[TaskSettings] = None,
    ensure_brickflow_plugins: bool = False,
    if_else_outcome: Optional[Dict[Union[str, str], str]] = None,
) -> Callable:
    """Register a callable as a task of this workflow.

    Works both as a bare decorator (``task_func`` is the decorated callable)
    and as a decorator factory (``task_func`` is ``None`` and the returned
    wrapper is applied to the callable afterwards).

    The task id is ``name`` when it is truthy, otherwise the callable's
    ``__name__``. Registration is delegated to ``self._add_task`` with all
    remaining arguments passed through unchanged.

    Returns:
        The wrapped callable when ``task_func`` is given, otherwise the
        decorator to apply to a callable. The wrapped callable marks the task
        as active for the duration of the call and always clears the marker
        afterwards.

    Raises:
        ValueError: if ``self.tasks`` already holds ``max_tasks_in_workflow``
            (or more) entries when this method is called.
        NoCallableTaskError: if ``task_func`` is given but is not callable.
    """
    if len(self.tasks) >= self.max_tasks_in_workflow:
        raise ValueError(
            "You have reached the maximum number of tasks allowed in a databricks workflow. "
            "Please split your workflow into multiple workflows or raise a feature request "
            "with your Databricks team."
        )

    def task_wrapper(f: Callable) -> Callable:
        task_id = name or f.__name__

        self._add_task(
            f,
            task_id,
            cluster=cluster,
            task_type=task_type,
            libraries=libraries,
            depends_on=depends_on,
            trigger_rule=trigger_rule,
            custom_execute_callback=custom_execute_callback,
            task_settings=task_settings,
            ensure_brickflow_plugins=ensure_brickflow_plugins,
            if_else_outcome=if_else_outcome,
        )

        @functools.wraps(f)
        def func(*args, **kwargs):  # type: ignore
            try:
                self.check_no_active_task()
                self._set_active_task(task_id)
                return f(*args, **kwargs)
            finally:
                # ``finally`` runs on success and on any exception alike, so a
                # separate except/re-raise that also resets is unnecessary.
                self._reset_active_task()

        return func

    if task_func is None:
        # Used as ``@workflow.task(...)``: hand back the decorator itself.
        return task_wrapper

    if not callable(task_func):
        raise NoCallableTaskError(
            "Please use task decorator against a callable function."
        )

    return task_wrapper(task_func)

task_exists(task_id: str) -> bool

Source code in brickflow/engine/workflow.py
def task_exists(self, task_id: str) -> bool:
    """Report whether ``task_id`` is a key of ``self.tasks``."""
    registered = self.tasks
    return task_id in registered

task_iter() -> Iterator[Task]

Source code in brickflow/engine/workflow.py
def task_iter(self) -> Iterator[Task]:
    """Yield every task produced by ``self.bfs_task_iter()``, in that order."""
    yield from self.bfs_task_iter()

unique_new_clusters_dict() -> List[Dict[str, Any]]

Source code in brickflow/engine/workflow.py
def unique_new_clusters_dict(self) -> List[Dict[str, Any]]:
    """Build one job-cluster dictionary per entry of ``self.unique_new_clusters``.

    Cluster names are validated for uniqueness first (via
    ``self.validate_new_clusters_with_unique_names``). Each resulting
    dictionary carries the cluster name as ``job_cluster_key`` and the
    cluster's ``as_dict`` output, with the ``name`` field removed, as
    ``new_cluster``.
    """
    self.validate_new_clusters_with_unique_names()

    job_clusters: List[Dict[str, Any]] = []
    for new_cluster in self.unique_new_clusters:
        job_clusters.append(
            {
                "job_cluster_key": new_cluster.name,
                # job clusters do not need names
                "new_cluster": new_cluster.as_dict(remove_fields=["name"]),
            }
        )
    return job_clusters

validate_new_clusters_with_unique_names() -> None

Source code in brickflow/engine/workflow.py
def validate_new_clusters_with_unique_names(self) -> None:
    """Check that no two entries of ``self.unique_new_clusters`` share a name.

    Raises:
        DuplicateClustersDefinitionError: if any cluster name occurs more
            than once; the message lists each repeated name once.
    """
    seen_names = set()
    repeated = []
    for new_cluster in self.unique_new_clusters:
        cluster_name = new_cluster.name
        if cluster_name in seen_names:
            repeated.append(cluster_name)
            continue
        seen_names.add(cluster_name)

    # Collapse names repeated three or more times into a single entry.
    duplicate_list = list(set(repeated))
    if duplicate_list:
        raise DuplicateClustersDefinitionError(
            f"Found duplicate cluster definitions in your workflow: {self.name}, "
            f"with names: {duplicate_list}"
        )

validate_schedule_configs() -> None

Source code in brickflow/engine/workflow.py
def validate_schedule_configs(self) -> None:
    """Normalise and validate the workflow's scheduling options.

    ``schedule_pause_status`` is upper-cased in place and must then be
    ``PAUSED`` or ``UNPAUSED``. When ``schedule_continuous`` is set it may not
    be combined with ``schedule_quartz_expression`` or with ``trigger``, and
    its own ``pause_status`` is upper-cased in place and checked against the
    same two values.

    Raises:
        WorkflowConfigError: on the first violated rule, checked in the
            order described above.
    """
    allowed_scheduled_pause_statuses = ["PAUSED", "UNPAUSED"]

    normalized_status = self.schedule_pause_status.upper()
    self.schedule_pause_status = normalized_status
    if normalized_status not in allowed_scheduled_pause_statuses:
        raise WorkflowConfigError(
            f"schedule_pause_status must be one of {allowed_scheduled_pause_statuses}"
        )

    continuous = self.schedule_continuous
    if continuous is None:
        # Every remaining rule only applies to continuous schedules.
        return

    if self.schedule_quartz_expression is not None:
        raise WorkflowConfigError(
            "Please configure either schedule_quartz_expression or schedule_continuous for workflow"
        )

    if self.trigger is not None:
        raise WorkflowConfigError(
            "Please configure either trigger or schedule_continuous for workflow"
        )

    continuous.pause_status = continuous.pause_status.upper()
    if continuous.pause_status not in allowed_scheduled_pause_statuses:
        raise WorkflowConfigError(
            "Please configure either PAUSED or UNPAUSED for schedule_continuous.pause_status"
        )

brickflow.engine.workflow.User(name: str) dataclass

Bases: ScimEntity

Functions

to_access_control() -> Dict[str, str]

Source code in brickflow/engine/workflow.py
def to_access_control(self) -> Dict[str, str]:
    """Return the access-control mapping for this user: ``user_name`` -> ``self.name``."""
    return dict(user_name=self.name)

brickflow.engine.workflow.Group(name: str) dataclass

Bases: ScimEntity

Functions

to_access_control() -> Dict[str, str]

Source code in brickflow/engine/workflow.py
def to_access_control(self) -> Dict[str, str]:
    """Return the access-control mapping for this group: ``group_name`` -> ``self.name``."""
    return dict(group_name=self.name)

brickflow.engine.workflow.ServicePrincipal(name: str) dataclass

Bases: ScimEntity

Functions

to_access_control() -> Dict[str, str]

Source code in brickflow/engine/workflow.py
def to_access_control(self) -> Dict[str, str]:
    """Return the access-control mapping for this service principal:
    ``service_principal_name`` -> ``self.name``.
    """
    return dict(service_principal_name=self.name)