Workflows
A Workflow is similar to an Airflow DAG: it lets you encapsulate a set of tasks.
Here is an example of a workflow. The numbered markers in the comments, such as # (1)!, identify the parts of the workflow file described in the notes after the code, which are listed in marker order.
from datetime import timedelta
from brickflow import (
    Workflow, Cluster, WorkflowPermissions, User,
    TaskSettings, EmailNotifications, PypiTaskLibrary, MavenTaskLibrary, JobsParameters,
)

wf = Workflow( # (1)!
    "wf_test", # (2)!
    default_cluster=Cluster.from_existing_cluster("your_existing_cluster_id"), # (3)!
    # Optional parameters below
    schedule_quartz_expression="0 0/20 0 ? * * *", # (4)!
    timezone="UTC", # (5)!
    schedule_pause_status="PAUSED", # (15)!
    default_task_settings=TaskSettings( # (6)!
        email_notifications=EmailNotifications(
            on_start=["email@nike.com"],
            on_success=["email@nike.com"],
            on_failure=["email@nike.com"],
            on_duration_warning_threshold_exceeded=["email@nike.com"]
        ),
        timeout_seconds=timedelta(hours=2).seconds
    ),
    libraries=[ # (7)!
        PypiTaskLibrary(package="requests"),
        MavenTaskLibrary(coordinates="com.cronutils:cron-utils:9.2.0"),
    ],
    tags={ # (8)!
        "product_id": "brickflow_demo",
        "slack_channel": "nike-sole-brickflow-support"
    },
    max_concurrent_runs=1, # (9)!
    permissions=WorkflowPermissions( # (10)!
        can_manage_run=[User("abc@abc.com")],
        can_view=[User("abc@abc.com")],
        can_manage=[User("abc@abc.com")],
    ),
    prefix="feature-jira-xxx", # (11)!
    suffix="_qa1", # (12)!
    common_task_parameters={ # (13)!
        "catalog": "development",
        "database": "your_database"
    },
    health={ # (16)!
        "metric": "RUN_DURATION_SECONDS",
        "op": "GREATER_THAN",
        "value": 7200
    },
    parameters=[JobsParameters(default="INFO", name="jp_logging_level")], # (18)!
)

@wf.task() # (14)!
def task_function(*, test="var"):
    return "hello world"
- (1) Workflow definition, which constructs the workflow object
- (2) Define the workflow name
- (3) The default cluster used for all the tasks in the workflow. This example uses an all-purpose cluster, but you can also create a job cluster
- (4) Cron expression in the Quartz format
- (5) Define the timezone for your workflow. It defaults to UTC
- (6) Default task settings that apply to all the tasks
- (7) Libraries that need to be installed for all the tasks
- (8) Tags for the resulting workflow and other objects created during the workflow
- (9) Define the maximum number of concurrent runs
- (10) Define the permissions on the workflow
- (11) Prefix for the name of the workflow
- (12) Suffix for the name of the workflow
- (13) Define the common task parameters that can be used in all the tasks
- (14) Define a workflow task and associate it with the workflow
- (15) Define the schedule pause status. It defaults to "UNPAUSED"
- (16) Define the health check condition that triggers "duration warning threshold exceeded" notifications
- (17) Define the timeout_seconds condition that fails the workflow if its duration exceeds the threshold
- (18) Define the parameters at the workflow level (see the Databricks docs on job parameters)
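To make the health rule marked (16) in the example more concrete, the sketch below evaluates the same rule dictionary against a run duration. The function `breaches_health_rule` and the operator table `_OPS` are hypothetical helpers written for illustration only. They are not part of Brickflow or Databricks, where the rule is evaluated on the platform side.

```python
import operator

# Hypothetical operator table; only GREATER_THAN appears in the example above.
_OPS = {"GREATER_THAN": operator.gt}


def breaches_health_rule(rule: dict, run_duration_seconds: int) -> bool:
    """Return True when the run duration violates the given rule."""
    if rule["metric"] != "RUN_DURATION_SECONDS":
        raise ValueError(f"unsupported metric: {rule['metric']}")
    compare = _OPS[rule["op"]]
    return compare(run_duration_seconds, rule["value"])


# Same rule as in the workflow example: warn when a run exceeds 7200 seconds.
health = {"metric": "RUN_DURATION_SECONDS", "op": "GREATER_THAN", "value": 7200}

print(breaches_health_rule(health, 3600))  # one-hour run, within the threshold
print(breaches_health_rule(health, 7201))  # just over two hours, breaches it
```

With this rule, a run lasting exactly 7200 seconds does not breach the threshold, because the comparison is strictly greater than.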
Clusters
There are two ways to define the cluster for a workflow or a task: use an existing cluster or create a job cluster.
Using an existing cluster
from brickflow import Cluster
default_cluster=Cluster.from_existing_cluster("your_existing_cluster_id")
Use a job cluster
from brickflow import Cluster

default_cluster=Cluster(
    name="your_cluster_name",
    spark_version='11.3.x-scala2.12',
    node_type_id='m6g.xlarge',
    driver_node_type_id='m6g.xlarge',
    min_workers=1,
    max_workers=3,
    enable_elastic_disk=True,
    policy_id='your_policy_id',
    aws_attributes={
        "first_on_demand": 1,
        "availability": "SPOT_WITH_FALLBACK",
        "instance_profile_arn": "arn:aws:iam::XXXX:instance-profile/XXXX/group/XX",
        "spot_bid_price_percent": 100,
        "ebs_volume_type": "GENERAL_PURPOSE_SSD",
        "ebs_volume_count": 3,
        "ebs_volume_size": 100
    }
)
Use a job cluster with driver and worker instances from the same pool
from brickflow import Cluster

default_cluster=Cluster(
    name="your_cluster_name",
    spark_version='11.3.x-scala2.12',
    min_workers=1,
    max_workers=3,
    instance_pool_id="your_instance_pool_id",
    policy_id='policy_id_of_pool',
)
Use a job cluster with driver and worker instances from different pools
from brickflow import Cluster

default_cluster=Cluster(
    name="your_cluster_name",
    spark_version='11.3.x-scala2.12',
    min_workers=1,
    max_workers=3,
    instance_pool_id="your_workers_instance_pool_id",
    driver_instance_pool_id="your_driver_instance_pool_id",
    policy_id='policy_id_of_pools',
)
Permissions
Brickflow lets you manage permissions on workflows. You can grant individual users, groups, or service principals permission to manage, run, or view a workflow.
The example below is for reference:
from brickflow import WorkflowPermissions, User, Group, ServicePrincipal

permissions=WorkflowPermissions(
    can_manage_run=[
        User("abc@abc.com"),
        Group("app.xyz.team.Developer"),
        ServicePrincipal("ServicePrinciple_dbx_url.app.xyz.team.Developer")
    ],
    can_view=[User("abc@abc.com")],
    can_manage=[User("abc@abc.com")],
)
Tags
Brickflow lets you create custom tags on the workflow. Some default tags are also created when the job is deployed.
The default tags that are automatically attached to the workflow are listed below:
- "brickflow_project_name" : Brickflow project name, as referenced in the entrypoint.py file
- "brickflow_version" : Brickflow version used to deploy the workflow
- "databricks_tf_provider_version" : Databricks Terraform provider version used to deploy the workflow
- "deployed_by" : Email ID of the profile used to deploy the workflow. It can be a user or a service principal. Whichever ID is used to deploy the workflow automatically becomes the owner of the workflow
- "environment" : Environment that the workflow belongs to
Use the reference below to define more tags and attach them to the workflow. These tags can be used for collecting various metrics and building dashboards.
tags={
    "product_id": "brickflow_demo",
    "slack_channel": "nike-sole-brickflow-support"
}
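Schedule
Databricks workflows use Quartz cron expressions, unlike Airflow's Unix-based cron scheduler. A typical Quartz cron expression has six or seven fields, separated by spaces: seconds, minutes, hours, day of month, month, day of week, and an optional year.
Below is a sample, taken from the workflow example at the top of this page:
schedule_quartz_expression="0 0/20 0 ? * * *"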
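As an illustration of the field layout, the sketch below splits the Quartz expression from the workflow example at the top of this page into named fields. The helper `split_quartz_expression` and the `QUARTZ_FIELDS` names are assumptions made for this example. They are not part of Brickflow, and the helper only checks the field count rather than validating each field.

```python
# Field order of a Quartz cron expression; the seventh field (year) is optional.
QUARTZ_FIELDS = (
    "seconds", "minutes", "hours", "day_of_month", "month", "day_of_week", "year",
)


def split_quartz_expression(expression: str) -> dict:
    """Map each space-separated field of a Quartz expression to its name."""
    parts = expression.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}")
    # zip stops at the shorter input, so a 6-field expression has no "year" key.
    return dict(zip(QUARTZ_FIELDS, parts))


fields = split_quartz_expression("0 0/20 0 ? * * *")
for name, value in fields.items():
    print(f"{name}: {value}")
```

In this expression, `0/20` in the minutes field means every 20 minutes starting at minute 0, and `0` in the hours field restricts the schedule to the hour after midnight.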
TaskSettings
Task settings defined at the workflow level apply to all the tasks in the workflow. Below is a sample for reference. All the parameters in TaskSettings are optional.
from datetime import timedelta
from brickflow import TaskSettings, EmailNotifications

default_task_settings=TaskSettings(
    email_notifications=EmailNotifications(
        on_start=["email@nike.com"],
        on_success=["email@nike.com"],
        on_failure=["email@nike.com"],
        on_duration_warning_threshold_exceeded=["email@nike.com"]
    ),
    timeout_seconds=timedelta(hours=2).seconds,
    max_retries=2,
    min_retry_interval_millis=60000,
    retry_on_timeout=True
)
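One detail worth checking in the sample above is `timedelta(hours=2).seconds`. In Python, the `.seconds` attribute holds only the seconds component of the duration (whole days are stored separately), so it equals the full duration only for values shorter than one day. The following minimal sketch, which uses only the standard library, shows the difference. The variable names are chosen for illustration.

```python
from datetime import timedelta

# Under one day, .seconds and total_seconds() agree.
two_hours = timedelta(hours=2)
print(two_hours.seconds)                # 7200
print(int(two_hours.total_seconds()))   # 7200

# Past one day, .seconds drops the days component.
long_timeout = timedelta(days=1, hours=2)
print(long_timeout.seconds)                # 7200
print(int(long_timeout.total_seconds()))   # 93600
```

For timeouts that might exceed 24 hours, `int(timedelta(...).total_seconds())` is the safer way to compute the value.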
Libraries
Brickflow lets you specify libraries that need to be installed and used across different tasks. Libraries can be installed from several repositories and sources:
from brickflow import PypiTaskLibrary, MavenTaskLibrary, StorageBasedTaskLibrary, \
    JarTaskLibrary, EggTaskLibrary, WheelTaskLibrary

libraries=[
    PypiTaskLibrary(package="requests"),
    MavenTaskLibrary(coordinates="com.cronutils:cron-utils:9.2.0"),
    StorageBasedTaskLibrary("s3://..."),
    StorageBasedTaskLibrary("dbfs://..."),
    JarTaskLibrary("s3://..."),
    JarTaskLibrary("dbfs://..."),
    EggTaskLibrary("s3://..."),
    EggTaskLibrary("dbfs://..."),
    WheelTaskLibrary("s3://..."),
    WheelTaskLibrary("dbfs://..."),
]
Common task parameters
Define the common parameters that can be used in all the tasks. Examples include a database name or a secrets ID.
common_task_parameters={
    "catalog": "development",
    "database": "your_database"
}
Jobs Parameters
You can configure parameters on a job that are passed to any of the job’s tasks that accept key-value parameters, including Python wheel files configured to accept keyword arguments. Parameters set at the job level are added to configured task-level parameters. Job parameters passed to tasks are visible in the task configuration, along with any parameters configured on the task.
You can also pass job parameters to tasks that are not configured with key-value parameters such as JAR or Spark Submit tasks. To pass job parameters to these tasks, format arguments as {{job.parameters.[name]}}, replacing [name] with the key that identifies the parameter.
Job parameters take precedence over task parameters. If a job parameter and a task parameter have the same key, the job parameter overrides the task parameter.
You can override configured job parameters or add new job parameters when you run a job with different parameters or repair a job run.
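The precedence rule and the reference format described above can be sketched in plain Python. This models the documented behaviour only and is not how Databricks implements it. The helpers `effective_parameters` and `job_parameter_reference`, and the sample values, are hypothetical.

```python
def job_parameter_reference(name: str) -> str:
    """Build the {{job.parameters.<name>}} reference for tasks without key-value parameters."""
    return "{{job.parameters." + name + "}}"


def effective_parameters(task_params: dict, job_params: dict) -> dict:
    """Combine task-level and job-level parameters for a task."""
    # Later keys win in a dict merge, so job parameters override task parameters.
    return {**task_params, **job_params}


# Hypothetical values: the task sets its own logging level, the job overrides it.
task_params = {"jp_logging_level": "DEBUG", "database": "your_database"}
job_params = {"jp_logging_level": "INFO"}

print(effective_parameters(task_params, job_params))
print(job_parameter_reference("jp_logging_level"))
```

Here the task keeps its own `database` value, while `jp_logging_level` takes the job-level value because both levels define the same key.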