Workflows
A Workflow is similar to an Airflow dag that lets you encapsulate a set of tasks.
Here is an example of a workflow. Click the plus buttons to understand all the parts of the workflow file.
from datetime import timedelta
from brickflow import Workflow, Cluster, WorkflowPermissions, User, \
TaskSettings, EmailNotifications, PypiTaskLibrary, MavenTaskLibrary
wf = Workflow( # (1)!
"wf_test", # (2)!
default_cluster=Cluster.from_existing_cluster("your_existing_cluster_id"), # (3)!
# Optional parameters below
schedule_quartz_expression="0 0/20 0 ? * * *", # (4)!
timezone="UTC", # (5)!
schedule_pause_status="PAUSED", # (15)!
default_task_settings=TaskSettings( # (6)!
email_notifications=EmailNotifications(
on_start=["email@nike.com"],
on_success=["email@nike.com"],
on_failure=["email@nike.com"],
on_duration_warning_threshold_exceeded=["email@nike.com"]
),
timeout_seconds=timedelta(hours=2).seconds
),
libraries=[ # (7)!
PypiTaskLibrary(package="requests"),
MavenTaskLibrary(coordinates="com.cronutils:cron-utils:9.2.0"),
],
tags={ # (8)!
"product_id": "brickflow_demo",
"slack_channel": "nike-sole-brickflow-support"
},
max_concurrent_runs=1, # (9)!
permissions=WorkflowPermissions( # (10)!
can_manage_run=[User("abc@abc.com")],
can_view=[User("abc@abc.com")],
can_manage=[User("abc@abc.com")],
),
prefix="feature-jira-xxx", # (11)!
suffix="_qa1", # (12)!
common_task_parameters={ # (13)!
"catalog": "development",
"database": "your_database"
},
health = { # (16)!
"metric": "RUN_DURATION_SECONDS",
"op": "GREATER_THAN",
"value": 7200
}
)
@wf.task() # (14)!
def task_function(*, test="var"):
return "hello world"
- Workflow definition which constructs the workflow object
- Define the workflow name
- The default cluster used for all the tasks in the workflow. This is an all-purpose cluster, but you can also create a job cluster
- Cron expression in the quartz format
- Define the timezone for your workflow. It is defaulted to UTC
- Default task setting that can be used for all the tasks
- Libraries that need to be installed for all the tasks
- Tags for the resulting workflow and other objects created during the workflow.
- Define the maximum number of concurrent runs
- Define the permissions on the workflow
- Prefix for the name of the workflow
- Suffix for the name of the workflow
- Define the common task parameters that can be used in all the tasks
- Define a workflow task and associate it to the workflow
- Define the schedule pause status. It is defaulted to "UNPAUSED"
- Define health check condition that triggers duration warning threshold exceeded notifications
- Define timeout_seconds check condition that triggers workflow failure if duration exceeds threshold
Clusters¶
There are two ways to define the cluster for the workflow or a task
Using an existing cluster¶
from brickflow import Cluster
default_cluster=Cluster.from_existing_cluster("your_existing_cluster_id")
Use a job cluster¶
from brickflow import Cluster
default_cluster=Cluster(
name="your_cluster_name",
spark_version='11.3.x-scala2.12',
node_type_id='m6g.xlarge',
driver_node_type_id='m6g.xlarge',
min_workers=1,
max_workers=3,
enable_elastic_disk=True,
policy_id='your_policy_id',
aws_attributes={
"first_on_demand": 1,
"availability": "SPOT_WITH_FALLBACK",
"instance_profile_arn": "arn:aws:iam::XXXX:instance-profile/XXXX/group/XX",
"spot_bid_price_percent": 100,
"ebs_volume_type": "GENERAL_PURPOSE_SSD",
"ebs_volume_count": 3,
"ebs_volume_size": 100
}
)
Use a job cluster with driver and worker instances from the same pool¶
from brickflow import Cluster
default_cluster=Cluster(
name="your_cluster_name",
spark_version='11.3.x-scala2.12',
min_workers=1,
max_workers=3,
instance_pool_id="your_instance_pool_id",
policy_id='policy_id_of_pool',
)
Use a job cluster with driver and worker instances from different pools¶
from brickflow import Cluster
default_cluster=Cluster(
name="your_cluster_name",
spark_version='11.3.x-scala2.12',
min_workers=1,
max_workers=3,
instance_pool_id="your_workers_instance_pool_id",
driver_instance_pool_id="your_driver_instance_pool_id",
policy_id='policy_id_of_pools',
)
Permissions¶
Brickflow provides an opportunity to manage permissions on the workflows. You can provide individual users or to a group or to a ServicePrincipal that can help manage, run or view the workflows.
Below example is for reference
from brickflow import WorkflowPermissions, User, Group, ServicePrincipal
permissions=WorkflowPermissions(
can_manage_run=[
User("abc@abc.com"),
Group("app.xyz.team.Developer"),
ServicePrincipal("ServicePrinciple_dbx_url.app.xyz.team.Developer")
],
can_view=[User("abc@abc.com")],
can_manage=[User("abc@abc.com")],
)
Tags¶
Using brickflow, custom tags can be created on the workflow - but there are also some default tags that are created while the job is deployed.
The defaults tags that gets automatically attached to the workflow are below
- "brickflow_project_name" : Brickflow Project Name that is referred from the entrypoint.py file
- "brickflow_version" : Brickflow Version that is used to deploy the workflow
- "databricks_tf_provider_version" : Databricks terraform provider version that is used to deploy the workflow
- "deployed_by" : Email id of the profile that is used to deploy the workflow. It can be a user or a service principle. Whichever id is used to deploy the workflow, automatically becomes the owner of the workflow
- "environment" : Environment to which the workflow is identified to
Use the below reference to define more tags and attach to the workflow. These can be used for collecting various metrics and build dashboards.
tags={
"product_id": "brickflow_demo",
"slack_channel": "nike-sole-brickflow-support"
}
Schedule¶
Databricks workflows uses Quartz cron expression unlike airflow's unix based cron scheduler. A typical Quartz cron expression have six or seven fields, seperated by spaces
Below is a sampleTasksettings¶
Task setting at workflow level can be used to have common setting defined that will be applicable for all the tasks. Below is a sample that can be used for reference and all the parameters in TaskSettings are optional
from datetime import timedelta
from brickflow import TaskSettings, EmailNotifications
default_task_settings=TaskSettings(
email_notifications=EmailNotifications(
on_start=["email@nike.com"],
on_success=["email@nike.com"],
on_failure=["email@nike.com"],
on_duration_warning_threshold_exceeded=["email@nike.com"]
),
timeout_seconds=timedelta(hours=2).seconds,
max_retries=2,
min_retry_interval_millis=60000,
retry_on_timeout=True
)
Libraries¶
Brickflow allows to specify libraries that are need to be installed and used across different tasks. There are many ways to install library from different repositories/sources
from brickflow import PypiTaskLibrary, MavenTaskLibrary, StorageBasedTaskLibrary, \
JarTaskLibrary, EggTaskLibrary, WheelTaskLibrary
libraries=[
PypiTaskLibrary(package="requests"),
MavenTaskLibrary(coordinates="com.cronutils:cron-utils:9.2.0"),
StorageBasedTaskLibrary("s3://..."),
StorageBasedTaskLibrary("dbfs://..."),
JarTaskLibrary("s3://..."),
JarTaskLibrary("dbfs://..."),
EggTaskLibrary("s3://..."),
EggTaskLibrary("dbfs://..."),
WheelTaskLibrary("s3://..."),
WheelTaskLibrary("dbfs://..."),
]
Common task parameters¶
Define the common parameters that can be used in all the tasks. Example could be database name, secrets_id etc