
Commit 25b42e9

feat: add support for data source messages in FlexConnect

Add two utility functions to augment metadata and schemas with DataSourceMessages. Also, opt in to ruff formatting of code samples in docstrings.

JIRA: CQ-1301
risk: low
1 parent 3d4cd5a commit 25b42e9
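To give a feel for how the two new utilities fit together, here is a minimal sketch. Only DataSourceMessage, add_data_source_messages_metadata and with_data_source_messages (plus their re-export from the package root) come from this commit; the data source id, correlation id and column data are made up for illustration.

import pyarrow

from gooddata_flexconnect import (
    DataSourceMessage,
    add_data_source_messages_metadata,
    with_data_source_messages,
)

# A message describing what the (made-up) data source did for this request.
msg = DataSourceMessage(
    source="my_rest_api",      # illustrative data source id
    correlation_id="req-42",   # illustrative request/operation id
    type="info",
    data={"rows_fetched": 3},
)

# Option 1: building a Table directly, pass the metadata dictionary.
table = pyarrow.table(
    {"region": ["EU", "US", "APAC"]},
    metadata=add_data_source_messages_metadata([msg]),
)

# Option 2: streaming batches, carry the messages on the RecordBatchReader schema.
schema = with_data_source_messages([msg], pyarrow.schema([("region", pyarrow.string())]))
batch = pyarrow.record_batch([pyarrow.array(["EU", "US", "APAC"])], schema=schema)
reader = pyarrow.RecordBatchReader.from_batches(schema, [batch])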

File tree

4 files changed: +172 -0 lines changed


gooddata-flexconnect/gooddata_flexconnect/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -1,5 +1,10 @@
 # (C) 2024 GoodData Corporation
 
+from gooddata_flexconnect.function.data_source_messages import (
+    DataSourceMessage,
+    add_data_source_messages_metadata,
+    with_data_source_messages,
+)
 from gooddata_flexconnect.function.execution_context import (
     ExecutionContext,
     ExecutionContextAbsoluteDateFilter,
Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
+# (C) 2025 GoodData Corporation
+from collections.abc import Iterable
+from dataclasses import dataclass
+from typing import Any, Optional
+
+import orjson
+import pyarrow
+
+_DATA_SOURCE_MESSAGES_KEY = b"x-gdc-data-source-messages"
+"""
+Key used in the PyArrow metadata to carry the data source messages.
+"""
+
+
+@dataclass(frozen=True)
+class DataSourceMessage:
+    """
+    A message sent by the data source. This will be included in the execution results' metadata.
+    """
+
+    source: str
+    """
+    Identification of the source of the message, typically the id or other identifier of the data source.
+    """
+
+    correlation_id: str
+    """
+    Unique id of the operation, meant to correlate different messages together.
+    """
+
+    type: str
+    """
+    Type of the message, currently free-form; we might define an enum for these in the future.
+    """
+
+    data: Optional[Any] = None
+    """
+    Optional message-specific data. This can be anything that can be JSON-serialized.
+    Try to keep this as small as possible: the backend has a quite strict size limit on the messages.
+    """
+
+    def to_dict(self) -> dict[str, Any]:
+        """
+        Converts this instance to its dictionary representation.
+        """
+        return {"source": self.source, "correlation_id": self.correlation_id, "type": self.type, "data": self.data}
+
+
+def add_data_source_messages_metadata(
+    data_source_messages: Iterable[DataSourceMessage], original_metadata: Optional[dict] = None
+) -> dict[bytes, bytes]:
+    """
+    Given a list of DataSourceMessages, creates a PyArrow-compatible metadata dictionary.
+
+    This is useful when creating a PyArrow Table directly:
+
+    >>> t = pyarrow.table(
+    ...     [[1, 2, 3]],
+    ...     ["test"],
+    ...     metadata=add_data_source_messages_metadata(
+    ...         [DataSourceMessage(correlation_id="123", source="test_source", type="info")]
+    ...     ),
+    ... )
+
+    You can also add other metadata as needed:
+    >>> t2 = pyarrow.table(
+    ...     [[1, 2, 3]],
+    ...     ["test"],
+    ...     metadata=add_data_source_messages_metadata(
+    ...         [DataSourceMessage(correlation_id="123", source="test_source", type="info")],
+    ...         original_metadata={b"some": b"extra metadata"},
+    ...     ),
+    ... )
+
+    :param data_source_messages: list of DataSourceMessages to include
+    :param original_metadata: original metadata to add the DataSourceMessages to.
+    :return: a new metadata dictionary
+    """
+    if original_metadata is None:
+        original_metadata = {}
+    return {**original_metadata, _DATA_SOURCE_MESSAGES_KEY: orjson.dumps(data_source_messages)}
+
+
+def with_data_source_messages(
+    data_source_messages: Iterable[DataSourceMessage], schema: pyarrow.Schema
+) -> pyarrow.Schema:
+    """
+    Given a schema and a list of DataSourceMessages, returns a new schema with the DataSourceMessages included.
+
+    This is useful when creating PyArrow RecordBatchReaders:
+
+    >>> s = with_data_source_messages(
+    ...     [DataSourceMessage(correlation_id="123", source="test_source", type="info")],
+    ...     pyarrow.schema(...),  # the original schema for your RecordBatchReader
+    ... )
+    >>> rbr = pyarrow.RecordBatchReader.from_batches(schema=s, batches=...)
+
+    :param schema: the schema to enrich with the DataSourceMessages
+    :param data_source_messages: list of DataSourceMessages to include
+    """
+    return schema.with_metadata(add_data_source_messages_metadata(data_source_messages, schema.metadata))
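The messages travel as a JSON-encoded list under the x-gdc-data-source-messages schema metadata key, so they can be read back from an augmented schema. A short sketch of that round trip, mirroring what the tests below verify; the field values are the same illustrative ones used in the docstrings:

import orjson
import pyarrow

from gooddata_flexconnect import DataSourceMessage, with_data_source_messages

schema = with_data_source_messages(
    [DataSourceMessage(source="test_source", correlation_id="123", type="info")],
    pyarrow.schema([("name", pyarrow.string())]),
)

# The messages are stored as JSON bytes in the schema metadata.
raw = schema.metadata[b"x-gdc-data-source-messages"]
print(orjson.loads(raw))
# -> [{'source': 'test_source', 'correlation_id': '123', 'type': 'info', 'data': None}]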
Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+# (C) 2025 GoodData Corporation
+import orjson
+import pyarrow
+import pytest
+from gooddata_flexconnect.function.data_source_messages import (
+    _DATA_SOURCE_MESSAGES_KEY,
+    DataSourceMessage,
+    add_data_source_messages_metadata,
+    with_data_source_messages,
+)
+
+
+@pytest.fixture(scope="module")
+def sample_schema():
+    return pyarrow.schema(
+        [
+            pyarrow.field("name", pyarrow.string()),
+            pyarrow.field("height", pyarrow.int32()),
+        ]
+    )
+
+
+@pytest.fixture(scope="module")
+def sample_data_source_message():
+    return DataSourceMessage(correlation_id="123", source="test_source", type="test")
+
+
+def test_with_data_source_messages_empty_original(sample_schema, sample_data_source_message):
+    augmented_schema = with_data_source_messages([sample_data_source_message], sample_schema)
+
+    # with_data_source_messages creates a copy, the original schema must stay intact
+    assert sample_schema.metadata is None
+
+    assert augmented_schema.names == sample_schema.names
+    assert augmented_schema.metadata is not None
+    assert orjson.loads(augmented_schema.metadata[_DATA_SOURCE_MESSAGES_KEY]) == [sample_data_source_message.to_dict()]
+
+
+def test_with_data_source_messages_non_empty_original(sample_schema, sample_data_source_message):
+    sample_schema_with_some_metadata = sample_schema.with_metadata({b"foo": b"bar"})
+
+    augmented_schema = with_data_source_messages([sample_data_source_message], sample_schema_with_some_metadata)
+
+    # with_data_source_messages creates a copy, the original schema must stay intact
+    assert sample_schema_with_some_metadata.metadata == {b"foo": b"bar"}
+
+    # ensure the original metadata is still there
+    assert augmented_schema.names == sample_schema.names
+    assert augmented_schema.metadata is not None
+    assert augmented_schema.metadata[b"foo"] == b"bar"
+    assert orjson.loads(augmented_schema.metadata[_DATA_SOURCE_MESSAGES_KEY]) == [sample_data_source_message.to_dict()]
+
+
+def test_add_data_source_messages_metadata_empty_original(sample_data_source_message):
+    metadata = add_data_source_messages_metadata(data_source_messages=[sample_data_source_message])
+    assert orjson.loads(metadata[_DATA_SOURCE_MESSAGES_KEY]) == [sample_data_source_message.to_dict()]
+
+
+def test_add_data_source_messages_metadata_non_empty_original(sample_data_source_message):
+    metadata = add_data_source_messages_metadata(
+        data_source_messages=[sample_data_source_message], original_metadata={b"foo": b"bar"}
+    )
+    assert orjson.loads(metadata[_DATA_SOURCE_MESSAGES_KEY]) == [sample_data_source_message.to_dict()]
+    assert metadata[b"foo"] == b"bar"

pyproject.toml

Lines changed: 2 additions & 0 deletions
@@ -44,6 +44,8 @@ target-version = "py39"
 
 [tool.ruff.format]
 exclude = ['(gooddata-api-client|.*\.snapshot\..*)']
+# use ruff to format code examples in docstrings as well
+docstring-code-format = true
 
 [tool.ruff.lint.pydocstyle]
 convention = "google"
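For context on this change: with docstring-code-format set to true, ruff format also formats code embedded in docstrings (doctest-style examples and fenced code blocks), so the examples added in data_source_messages.py stay in sync with the regular code style. A contrived illustration of the effect; the unformatted line is made up:

# Hand-written doctest line inside a docstring, before running ruff format:
#     >>> DataSourceMessage(source='s',correlation_id='c',type='info')
# With docstring-code-format = true, ruff format normalizes it like ordinary code:
#     >>> DataSourceMessage(source="s", correlation_id="c", type="info")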
