Skip to content

Commit 86a71ca

Browse files
authored
Another attempt to fix timestamp logical type. (#35426)
* Another fix for JDBC timestamp logical type.
  - Used schema registry directly if it exists.
  - Fixed a bug on schema registry.
  - Triggered post commit tests for jdbcio.
  - Fixed failed tests and triggered dataflow test on xlang.
  - Fixed schema test by adding field description in named_fields_to_schema.
* Apply yapf.
* Revert "Fix post commit xlang jdbcio test failure (#35428)". This reverts commit 6ac5009.
* Enforce MillisInstant as the logical type for Timestamp in ReadFromJdbc.
* Fix a failed test on DECIMAL field.
1 parent 9b50ef8 commit 86a71ca

File tree

7 files changed

+38
-24
lines changed

7 files changed

+38
-24
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 2
3+
"modification": 4
44
}
55

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 9
3+
"modification": 11
44
}

sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
from apache_beam.testing.test_pipeline import TestPipeline
3737
from apache_beam.testing.util import assert_that
3838
from apache_beam.testing.util import equal_to
39-
from apache_beam.typehints.schemas import LogicalType
40-
from apache_beam.typehints.schemas import MillisInstant
4139
from apache_beam.utils.timestamp import Timestamp
4240

4341
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -242,10 +240,6 @@ def test_xlang_jdbc_write_read(self, database):
242240

243241
config = self.jdbc_configs[database]
244242

245-
# Register MillisInstant logical type to override the mapping from Timestamp
246-
# originally handled by MicrosInstant.
247-
LogicalType.register_logical_type(MillisInstant)
248-
249243
with TestPipeline() as p:
250244
p.not_use_test_runner_api = True
251245
_ = (
@@ -356,10 +350,6 @@ def custom_row_equals(expected, actual):
356350
classpath=config['classpath'],
357351
))
358352

359-
# Register MillisInstant logical type to override the mapping from Timestamp
360-
# originally handled by MicrosInstant.
361-
LogicalType.register_logical_type(MillisInstant)
362-
363353
# Run read pipeline with custom schema
364354
with TestPipeline() as p:
365355
p.not_use_test_runner_api = True

sdks/python/apache_beam/io/jdbc.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686

8787
# pytype: skip-file
8888

89+
import contextlib
8990
import datetime
9091
import typing
9192

@@ -257,6 +258,17 @@ def __init__(
257258
)
258259

259260

261+
@contextlib.contextmanager
262+
def enforce_millis_instant_for_timestamp():
263+
old_registry = LogicalType._known_logical_types
264+
LogicalType._known_logical_types = old_registry.copy()
265+
try:
266+
LogicalType.register_logical_type(MillisInstant)
267+
yield
268+
finally:
269+
LogicalType._known_logical_types = old_registry
270+
271+
260272
class ReadFromJdbc(ExternalTransform):
261273
"""A PTransform which reads Rows from the specified database via JDBC.
262274
@@ -352,8 +364,9 @@ def __init__(
352364

353365
dataSchema = None
354366
if schema is not None:
355-
# Convert Python schema to Beam Schema proto
356-
schema_proto = typing_to_runner_api(schema).row_type.schema
367+
with enforce_millis_instant_for_timestamp():
368+
# Convert Python schema to Beam Schema proto
369+
schema_proto = typing_to_runner_api(schema).row_type.schema
357370
# Serialize the proto to bytes for transmission
358371
dataSchema = schema_proto.SerializeToString()
359372

sdks/python/apache_beam/typehints/row_type.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ def __init__(
228228
schema_id=schema_id,
229229
schema_options=schema_options,
230230
field_options=field_options,
231+
field_descriptions=field_descriptions,
231232
**kwargs)
232233
user_type = named_tuple_from_schema(schema, **kwargs)
233234

sdks/python/apache_beam/typehints/schema_registry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def generate_new_id(self):
4040
"schemas.")
4141

4242
def add(self, typing, schema):
43-
if not schema.id:
43+
if schema.id:
4444
self.by_id[schema.id] = (typing, schema)
4545

4646
def get_typing_by_id(self, unique_id):

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,11 @@ def named_fields_to_schema(
142142
schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
143143
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
144144
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY,
145+
field_descriptions: Optional[Dict[str, str]] = None,
145146
):
146147
schema_options = schema_options or []
147148
field_options = field_options or {}
149+
field_descriptions = field_descriptions or {}
148150

149151
if isinstance(names_and_types, dict):
150152
names_and_types = names_and_types.items()
@@ -158,7 +160,8 @@ def named_fields_to_schema(
158160
option_to_runner_api(option_tuple)
159161
for option_tuple in field_options.get(name, [])
160162
],
161-
) for (name, type) in names_and_types
163+
description=field_descriptions.get(name, None))
164+
for (name, type) in names_and_types
162165
],
163166
options=[
164167
option_to_runner_api(option_tuple) for option_tuple in schema_options
@@ -616,6 +619,13 @@ def schema_from_element_type(element_type: type) -> schema_pb2.Schema:
616619
if isinstance(element_type, row_type.RowTypeConstraint):
617620
return named_fields_to_schema(element_type._fields)
618621
elif match_is_named_tuple(element_type):
622+
if hasattr(element_type, row_type._BEAM_SCHEMA_ID):
623+
# if the named tuple's schema is in registry, we just use it instead of
624+
# regenerating one.
625+
schema_id = getattr(element_type, row_type._BEAM_SCHEMA_ID)
626+
schema = SCHEMA_REGISTRY.get_schema_by_id(schema_id)
627+
if schema is not None:
628+
return schema
619629
return named_tuple_to_schema(element_type)
620630
else:
621631
raise TypeError(
@@ -1017,15 +1027,15 @@ def representation_type(cls):
10171027
def language_type(cls):
10181028
return decimal.Decimal
10191029

1020-
def to_representation_type(self, value):
1021-
# type: (decimal.Decimal) -> bytes
1022-
1023-
return DecimalLogicalType().to_representation_type(value)
1024-
1025-
def to_language_type(self, value):
1026-
# type: (bytes) -> decimal.Decimal
1030+
# from language type (decimal.Decimal) to representation type
1031+
# (the type corresponding to the coder used in DecimalLogicalType)
1032+
def to_representation_type(self, value: decimal.Decimal) -> decimal.Decimal:
1033+
return value
10271034

1028-
return DecimalLogicalType().to_language_type(value)
1035+
# from representation type (the type corresponding to the coder used in
1036+
# DecimalLogicalType) to language type
1037+
def to_language_type(self, value: decimal.Decimal) -> decimal.Decimal:
1038+
return value
10291039

10301040
@classmethod
10311041
def argument_type(cls):

0 commit comments

Comments
 (0)