Skip to content

Commit def0355

Browse files
committed
Handle watermarks in branching
1 parent 10da6e3 commit def0355

File tree

2 files changed

+74
-9
lines changed

2 files changed

+74
-9
lines changed

quixstreams/core/stream/functions/base.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,15 +54,21 @@ def wrapper(
5454
headers: Any,
5555
is_watermark: bool = False,
5656
):
57-
# TODO: Handle a watermark in branched operations
58-
first_branch_executor, *branch_executors = child_executors
59-
copier = pickle_copier(value)
60-
61-
# Pass the original value to the first branch to reduce copying
62-
first_branch_executor(value, key, timestamp, headers)
63-
# Copy the value for the rest of the branches
64-
for branch_executor in branch_executors:
65-
branch_executor(copier(), key, timestamp, headers)
57+
if is_watermark:
58+
# Watermarks should not be mutated, so no need to copy
59+
# Pass the watermark to all branches
60+
for branch_executor in child_executors:
61+
branch_executor(value, key, timestamp, headers, True)
62+
else:
63+
# Regular data: copy for each branch to prevent mutation issues
64+
first_branch_executor, *branch_executors = child_executors
65+
copier = pickle_copier(value)
66+
67+
# Pass the original value to the first branch to reduce copying
68+
first_branch_executor(value, key, timestamp, headers)
69+
# Copy the value for the rest of the branches
70+
for branch_executor in branch_executors:
71+
branch_executor(copier(), key, timestamp, headers)
6672

6773
return wrapper
6874

tests/test_quixstreams/test_core/test_stream.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,65 @@ def wrapper(value, k, t, h):
571571
# each operation is only called once (no redundant processing)
572572
assert sink == expected
573573

574+
def test_watermark_in_branching(self):
575+
"""
576+
Test that watermarks are properly propagated through all branches.
577+
Each branch should receive the watermark.
578+
"""
579+
watermark_calls = []
580+
581+
def track_watermark(branch_id):
582+
def on_watermark(value, key, timestamp, headers):
583+
watermark_calls.append((branch_id, timestamp))
584+
yield value, key, timestamp, headers
585+
586+
return on_watermark
587+
588+
# Create a branching topology with watermark tracking
589+
stream = Stream()
590+
stream.add_transform(
591+
lambda v, k, t, h: [(v + 1, k, t, h)],
592+
expand=True,
593+
on_watermark=track_watermark("branch1"),
594+
)
595+
stream.add_transform(
596+
lambda v, k, t, h: [(v + 2, k, t, h)],
597+
expand=True,
598+
on_watermark=track_watermark("branch2"),
599+
)
600+
stream = stream.add_transform(
601+
lambda v, k, t, h: [(v + 3, k, t, h)],
602+
expand=True,
603+
on_watermark=track_watermark("main"),
604+
)
605+
606+
sink = Sink()
607+
key, timestamp, headers = "key", 1000, []
608+
609+
# Compose and execute with a regular message
610+
executor = stream.compose_single(sink=sink.append_record)
611+
executor(0, key, timestamp, headers, is_watermark=False)
612+
613+
# Verify regular message was processed
614+
expected_data = [
615+
(1, key, timestamp, headers),
616+
(2, key, timestamp, headers),
617+
(3, key, timestamp, headers),
618+
]
619+
assert sink == expected_data
620+
621+
# Clear the sink and send a watermark
622+
sink.clear()
623+
watermark_calls.clear()
624+
watermark_timestamp = 2000
625+
executor(None, None, watermark_timestamp, [], is_watermark=True)
626+
627+
# Verify watermark was received by all branches
628+
assert len(watermark_calls) == 3
629+
assert ("branch1", watermark_timestamp) in watermark_calls
630+
assert ("branch2", watermark_timestamp) in watermark_calls
631+
assert ("main", watermark_timestamp) in watermark_calls
632+
574633

575634
class TestStreamMerge:
576635
def test_merge_different_streams_success(self):

0 commit comments

Comments
 (0)