The Databricks task is not skipping in the Databricks workflow inside the Databricks cluster when run with DatabricksWorkflowTaskGroup #47024

@anilreddaboina54

Description

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==7.0.0

Apache Airflow version

2.10.4

Operating System

PRETTY_NAME="Debian GNU/Linux 12 (bookworm)", NAME="Debian GNU/Linux"

Deployment

Other Docker-based deployment

Deployment details

We build our own Docker image using apache/airflow:2.10.4-python3.10 as the base image and use it for deployment.
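
For reference, a minimal sketch of such an image build, assuming only the provider package pinned above is added on top of the base image (any other dependencies are omitted):

FROM apache/airflow:2.10.4-python3.10
RUN pip install --no-cache-dir "apache-airflow-providers-databricks==7.0.0"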

What happened

In the DAG code, the “db_workflow” group is of type “DatabricksWorkflowTaskGroup”.
The “feed_transformation” task is of type “spark_jar” and should run only when both “fact_feed_exists” and “dimension_feed_exists” succeed.

As shown in the image below, the “feed_transformation” Databricks task is marked as “skipped” in the Airflow DAG, even though it actually executes in Databricks.

[Image: Airflow DAG graph view with the “feed_transformation” task shown as skipped]

What you think should happen instead

The Databricks workflow should also mark the “feed_transformation” task as “skipped” and continue with the next tasks.
Both the Airflow DAG tasks and the Databricks workflow tasks should stay in sync.

How to reproduce

  • Create a DAG with a DatabricksWorkflowTaskGroup
  • Add a couple of tasks to it
  • Add a non-Databricks task group with a PythonOperator
  • Create a dependency from the non-Databricks task group to the DatabricksWorkflowTaskGroup
  • The Databricks tasks should have trigger_rule set to ALL_SUCCESS

Below is a sample DAG:

from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup
from airflow.utils.task_group import TaskGroup
from airflow.utils.trigger_rule import TriggerRule

JOB_CLUSTER_KEY = "db_workflow_cluster"
DATABRICKS_CONNECTION_ID = "databricks_conn"
SPARK_LIBS = [{"jar": "/testpath/etl.jar"}]

job_clusters_spec = [
    {
        "job_cluster_key": JOB_CLUSTER_KEY,
        "new_cluster": {
            "spark_version": "14.3.x-scala2.12",
            "node_type_id": "Standard_D3_v2",
            "autoscale": {
                "min_workers": 2,
                "max_workers": 4,
            },
        },
    }
]


def build_transform_task(dag: DAG):
    """
    This task runs only when all upstream tasks succeed; it depends on the
    fact and dimension feed check tasks and runs only when both dependency feeds are available.
    """
    return DatabricksTaskOperator(
        dag=dag,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        task_id="feed_transformation",
        databricks_conn_id=DATABRICKS_CONNECTION_ID,
        job_cluster_key=JOB_CLUSTER_KEY,
        task_config={
            "spark_jar_task": {
                "main_class_name": "com.etl.loader.Validation",
            },
            "libraries": SPARK_LIBS,
        },
    )


def end(dag: DAG):
    return EmptyOperator(dag=dag, task_id="end", trigger_rule=TriggerRule.ALL_SUCCESS)


def start(dag: DAG):
    return EmptyOperator(dag=dag, task_id="start", trigger_rule=TriggerRule.ALL_SUCCESS)


def is_dimension_feed_exists(dag):
    """
    Check whether the dimension feed is available; if not, this task is marked as skipped,
    so the downstream tasks also skip automatically in the Airflow DAG.
    """
    return PythonOperator(
        dag=dag,
        task_id="dimension_feed_exists",
        python_callable=lambda: is_optional_feed_present("dimension"),
    )


def is_optional_feed_present(feed_name: str):
    """
    Sample code that checks whether the feed is present. The feed_name check is
    hard-coded to mimic the real behaviour instead of sharing the original business logic;
    assume that both the dimension and fact feeds are not present for this run.
    """
    if feed_name == "other_feed":
        return True
    else:
        raise AirflowSkipException("skipped task when the feed is not present")


def is_fact_feed_exists(dag):
    """
    Check whether the fact feed is available; if not, this task is marked as skipped,
    so the downstream tasks also skip automatically in the Airflow DAG.
    """
    return PythonOperator(
        dag=dag,
        task_id="fact_feed_exists",
        python_callable=lambda: is_optional_feed_present("fact"),
    )


def build_etl_task(dag):
    return DatabricksTaskOperator(
        dag=dag,
        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
        task_id="etl_task2",
        databricks_conn_id=DATABRICKS_CONNECTION_ID,
        job_cluster_key=JOB_CLUSTER_KEY,
        task_config={
            "spark_jar_task": {
                "main_class_name": "com.etl.loader.sparkTask2",
            },
            "libraries": SPARK_LIBS,
        },
    )


dag = DAG(
    dag_id="db_workflow_etl",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    tags=["sanity", "etl", "databricks"],
)

with dag:
    workflow_group = DatabricksWorkflowTaskGroup(
        group_id="db_workflow",
        databricks_conn_id="databricks_conn",
        job_clusters=job_clusters_spec,
    )

    feed_group = TaskGroup(group_id="etl_feed_group")

    with feed_group:
        is_fact_exists_task = is_fact_feed_exists(dag)
        is_dimension_exist_task = is_dimension_feed_exists(dag)

    with workflow_group:
        # All the workflow tasks are added here
        transform_task = build_transform_task(dag)
        etl_general_task = build_etl_task(dag)

    [is_fact_exists_task, is_dimension_exist_task] >> transform_task
    transform_task >> etl_general_task
    start(dag) >> feed_group
    etl_general_task >> end(dag)

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
