Open
Labels
area:providers, good first issue, kind:bug (This is clearly a bug), provider:google (Google (including GCP) related issues)
Description
Apache Airflow version
3.0.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
There is an issue within airflow/providers/google/cloud/hooks/datafusion.py concerning how it interacts with CDAP's Lifecycle Microservices for starting programs.
- Problem: The Airflow hook currently uses the
POST /v3/namespaces/<namespace-id>/start
endpoint (documented here: https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs) to start a single program. This endpoint returns an overall HTTP 200 OK
status code even if some individual programs fail, but the Airflow code (specifically around self._check_response_status_and_data()) only validates the HTTP status code and never inspects the statusCode
field within the response body. - Impact: When there is a problem with, for example, the runtime args, the response body contains the error code and no runId. This leads to a failure on the next line, where the code tries to extract the run ID.
Full Error
[2025-05-05T12:19:53.950+0000] {taskinstance.py:2907} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/datafusion.py", line 825, in execute
pipeline_id = hook.start_pipeline(
^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/datafusion.py", line 500, in start_pipeline
return response_json[0]["runId"]
~~~~~~~~~~~~~~~~^^^^^^^^^
KeyError: 'runId'
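To illustrate, here is a minimal sketch of the two response shapes involved. The bodies below are illustrative assumptions, not captured output; the field names follow the CDAP documentation:

```python
# Hedged sketch: both bodies arrive with an overall HTTP 200 from the
# batch /start endpoint, but only the successful entry carries a "runId".
success_body = [{"statusCode": 200, "runId": "example-run-id"}]
failure_body = [{"statusCode": 400, "error": "Invalid runtime arguments"}]

# The hook effectively does response_json[0]["runId"], which works for
# success_body but raises KeyError: 'runId' for failure_body, producing
# the traceback above.
run_id = success_body[0]["runId"]
```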
What you think should happen instead?
There are two issues within airflow/providers/google/cloud/hooks/datafusion.py concerning how it interacts with CDAP's Lifecycle Microservices for starting programs.
1. Insufficient Status Code Checking
- Problem: The Airflow hook currently uses the
POST /v3/namespaces/<namespace-id>/start
endpoint (documented here: https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs) to start a single program. This endpoint returns an overall HTTP 200 OK
status code even if some individual programs fail, but the Airflow code (specifically around self._check_response_status_and_data()) only validates the HTTP status code and never inspects the statusCode
field within the response body. - Proposed Fix: The hook should be updated to parse the JSON response body and check the
statusCode
for each program entry, to ensure all intended programs started successfully.
2. [Preferred Approach] Suboptimal API Usage for Single Program Starts
- Problem: When starting a single program, the Airflow hook (around
def start_pipeline
) still uses the
POST /v3/namespaces/<namespace-id>/start
endpoint, which is designed for starting multiple programs. - Recommendation: It is highly recommended to use the dedicated
POST /v3/namespaces/<namespace-id>/apps/<app-id>/<program-type>/<program-id>/start
endpoint for starting a single program (documented here: https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program). - Benefit: The single-program endpoint provides clearer, more direct error handling for individual program starts and avoids the complexity of parsing a multi-program response for a single operation.
How to reproduce
Trigger the CDF start pipeline with wrong runtime args or an incorrect program.
Operating System
I am not sure how that is related to the issue. N/A
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct