XCom unable to parse tuple response from DatabricksSQLOperator on SQL query execution #39448

@KanikaAdik

Description

Apache Airflow Provider(s)

databricks

Versions of Apache Airflow Providers

apache-airflow-providers-databricks==6.3.0

Apache Airflow version

2.7.3

Operating System

```
(airflow) cat /etc/os-release
PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
```

Deployment

Other Docker-based deployment

Deployment details

No response

What happened

I am trying to execute a few queries using DatabricksSqlOperator in an Airflow DAG. The output returned by the operator cannot be serialized by XCom, so the task fails with the following error log:
```
[2024-05-06T20:17:44.253+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T20:17:44.254+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T20:17:44.259+0000] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=ct_sql_xcom, task_id=select_data, execution_date=20240506T170918, start_date=20240506T201741, end_date=20240506T201744
[2024-05-06T20:17:44.266+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 238 for task select_data (Object of type tuple is not JSON serializable; 1850)
```

What you think should happen instead

XCom should be able to handle the JSON serialization, or the Databricks provider should standardize on a response type that XCom can serialize, so that the generic case just works.
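
For illustration, here is a minimal sketch of the kind of conversion the provider (or a user callback) could apply before the XCom push. It assumes the hook's Row objects behave like namedtuples, which is what the traceback suggests; `rows_to_json_safe` is a hypothetical helper, not an existing provider API:

```python
# Hypothetical helper, not part of the provider: flatten namedtuple-like
# Row objects into JSON-serializable structures before pushing to XCom.
def rows_to_json_safe(rows):
    # _asdict() keeps column names (standard namedtuple API);
    # falling back to list() drops the names but still serializes cleanly.
    return [row._asdict() if hasattr(row, "_asdict") else list(row) for row in rows]
```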

How to reproduce

  1. Create a DAG with a DatabricksSqlOperator task.
  2. Set do_xcom_push=True on it.
  3. Add a separate downstream task to parse and use the SQL query result.

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

env = Variable.get('AIRFLOW_VAR_ENV_NAME')

with DAG('ct_sql_xcom',
         start_date=datetime(2024, 1, 30),
         schedule_interval=None,
         ) as dag:

    create_file = DatabricksSqlOperator(
        databricks_conn_id='databricks',
        sql_endpoint_name='Serverless',
        task_id="create_and_populate_from_file",
        sql="select table_name from system.information_schema.tables where table_catalog = 'rsg_prod'",
        do_xcom_push=True,
    )

    def downstream_task(**kwargs):
        # Pull by the upstream task_id, not the Python variable name.
        result = kwargs['task_instance'].xcom_pull(task_ids='create_and_populate_from_file')
        print("Received result from XCom:", result)

    parse_result = PythonOperator(
        task_id='parse_result',
        python_callable=downstream_task,
    )

    create_file >> parse_result
```
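
Until the provider handles this, one possible workaround is to subclass the operator and coerce the rows to plain lists before they are pushed. This is a sketch only: it assumes DatabricksSqlOperator exposes the common-sql `_process_output` hook and that the pushed value is a flat list of Row objects; the exact shape varies with `output_path` and `split_statements`, so adjust accordingly:

```python
from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator


class JsonSafeDatabricksSqlOperator(DatabricksSqlOperator):
    """Sketch of a workaround, not a provider API: flatten the hook's
    namedtuple-like Row objects into plain lists before the XCom push."""

    def _process_output(self, results, descriptions):
        output = super()._process_output(results, descriptions)
        # Assumption: output is a flat list of Row objects; adapt this if the
        # operator returns nested lists for multiple statements.
        return [list(row) for row in output]
```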

Anything else

```
[2024-05-06T19:13:57.614+0000] {xcom.py:661} ERROR - Object of type tuple is not JSON serializable. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config or make sure to decorate your object with attr.
[2024-05-06T19:13:57.615+0000] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 91, in default
    return serialize(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 145, in serialize
    return encode(classname, version, serialize(data, depth + 1))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in serialize
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 124, in <listcomp>
    return [serialize(d, depth + 1) for d in o]
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/serialization/serde.py", line 178, in serialize
    raise TypeError(f"cannot serialize object of type {cls}")
TypeError: cannot serialize object of type <class '***.providers.databricks.hooks.databricks_sql.Row'>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 2479, in xcom_push
    XCom.set(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 244, in set
    value = cls.serialize_value(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/xcom.py", line 659, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/usr/local/lib/python3.8/json/__init__.py", line 234, in dumps
    return cls(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 102, in encode
    o = self.default(o)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/json.py", line 93, in default
    return super().default(o)
  File "/usr/local/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type tuple is not JSON serializable
[2024-05-06T19:13:57.620+0000] {taskinstance.py:1400} INFO - Marking task as UP_FOR_RETRY. dag_id=ct_sql_xcom, task_id=create_file, execution_date=20240506T170918, start_date=20240506T191355, end_date=20240506T191357
[2024-05-06T19:13:57.627+0000] {standard_task_runner.py:104} ERROR - Failed to execute job 233 for task create_file (Object of type tuple is not JSON serializable; 842)
```
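
As the error message itself suggests, a stopgap is to switch XCom from JSON to pickle via the standard Airflow 2.x setting, with the usual caveat that pickled XComs carry security and portability drawbacks:

```ini
# airflow.cfg (equivalently, set AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True in the environment)
[core]
enable_xcom_pickling = True
```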

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
