
Conversation

@lnbest0707-uber (Contributor) commented Aug 4, 2025

real-time ingestion feature
Part 2 of #15782, but this is independent from part 1
Issue #14815
Design doc https://docs.google.com/document/d/1NKPeNh6V2ctaQ4T_X3OKJ6Gcy5TRanLiU1uIDT8_9UA/edit?usp=sharing

Introduce a new ephemeral type of topic in multi-topic ingestion. The main difference is the segment name: for this type of topic, the segment name is tableName__topicName__partitionGroupId__sequenceNumber__creationTime instead of tableName__partitionGroupId__sequenceNumber__creationTime.
This change is a no-op at the moment because the new naming is not yet applied to any ingestion. Afterwards, it will be used to:

  • [Short-term] Support backfilling the skipped interval after an offset auto reset during real-time ingestion lag (configurable).
  • [Mid/Long-term] Replace the existing multi-topic ingestion naming format, so that topics can easily be removed from a multi-topic table.

Other changes:

  • Ensure each stream uses its own streamConfig instead of sharing the configs from the first streamConfig; removes the streamConfigs.get(0) usage.
  • Disallow duplicate streams (same stream type with the same topic name); see the sketch after this list.
  • [Tentative] Disallow deleting topics. Deleting topics is not banned currently, but it is not a safe operation because the segment metadata mapping would get messed up.
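
For illustration only, the duplicate-stream check could look roughly like this (a sketch, assuming streamConfigMaps is the List<Map<String, String>> from the table config and Guava Preconditions is available; key names follow the example config below, and this is not the PR's actual TableConfigUtils code):

    // Sketch: reject two entries in streamConfigMaps that point at the same stream
    // (same streamType and same topic name).
    Set<String> seenStreams = new HashSet<>();
    for (Map<String, String> streamConfigMap : streamConfigMaps) {
      String streamType = streamConfigMap.get("streamType");
      String topicName = streamConfigMap.get("stream." + streamType + ".topic.name");
      Preconditions.checkState(seenStreams.add(streamType + ":" + topicName),
          "Duplicate stream found for type: %s, topic: %s", streamType, topicName);
    }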

E.g. when we have streamConfigMaps like

"streamConfigMaps": [
          {
            "realtime.segment.flush.threshold.rows": "0",
            "stream.kafka.decoder.prop.fieldsForClpEncoding": "message",
            "realtime.segment.flush.autotune.initialRows": "2000",
            "stream.kafka.decoder.prop.errorSamplingPeriod": "100000000",
            "stream.kafka.decoder.prop.unencodableFieldSuffix": "_noindex",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.clplog.CLPLogMessageDecoder",
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "realtime.segment.flush.threshold.segment.size": "200MB",
            "stream.kafka.broker.list": "<broker>",
            "realtime.segment.flush.threshold.time": "9000000",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "realtime.segment.offsetAutoReset.timeSecThreshold": "1800",
            "stream.kafka.topic.name": "topic1"
          },
          {
            "realtime.segment.flush.threshold.rows": "0",
            "stream.kafka.decoder.prop.fieldsForClpEncoding": "message",
            "realtime.segment.flush.autotune.initialRows": "2000",
            "stream.kafka.decoder.prop.errorSamplingPeriod": "100000000",
            "stream.kafka.decoder.prop.unencodableFieldSuffix": "_noindex",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.clplog.CLPLogMessageDecoder",
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "realtime.segment.flush.threshold.segment.size": "200MB",
            "stream.kafka.broker.list": "<broker>",
            "realtime.segment.flush.threshold.time": "9000000",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "realtime.segment.offsetAutoReset.timeSecThreshold": "1800",
            "stream.kafka.topic.name": "topic2"
          },
          {
            "realtime.segment.flush.threshold.rows": "0",
            "stream.kafka.decoder.prop.fieldsForClpEncoding": "message",
            "realtime.segment.isBackfillTopic": "true",
            "realtime.segment.flush.autotune.initialRows": "2000",
            "stream.kafka.decoder.prop.errorSamplingPeriod": "100000000",
            "stream.kafka.decoder.prop.unencodableFieldSuffix": "_noindex",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.clplog.CLPLogMessageDecoder",
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowlevel",
            "realtime.segment.flush.threshold.segment.size": "200MB",
            "stream.kafka.broker.list": "<broker>",
            "realtime.segment.flush.threshold.time": "9000000",
            "realtime.segment.offsetAutoReset.timeSecThreshold": "1800",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.topic.name": "topic1_backfill"
          }
]

With "realtime.segment.isBackfillTopic": "true", in one of the topic, then the segment name would be (assuming 2 partitions per topic)

table1__0__1__20250101T0000Z
table1__1__1__20250101T0001Z
table1__10000__1__20250101T0000Z
table1__10001__1__20250101T0001Z
table1__topic1_backfill__0__1__20250101T0010Z
table1__topic1_backfill__1__1__20250101T0010Z

Right now, we are not able to remove topic1, because topic2's corresponding Pinot partition ids would shift from 10000 -> 0 and 10001 -> 1, and topic2 would inherit topic1's segment commit info.
But for this new type of topic, it is safe to remove topic1_backfill, because doing so would not mess up topic1's and topic2's Pinot partition id mapping.
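
For illustration, a minimal sketch of the two mappings (hypothetical helper names; the 10000 padding value is the one described above, not necessarily the exact constant in the code):

    // Legacy multi-topic mapping: Pinot partition id = streamConfig index * 10000 + stream partition id.
    // Removing topic1 shifts topic2 from index 1 to index 0, so all of its Pinot partition ids change.
    static int legacyPinotPartitionId(int streamConfigIndex, int streamPartitionId) {
      return streamConfigIndex * 10000 + streamPartitionId;
    }

    // Ephemeral backfill topic: the partition group is identified by the topic name plus the stream
    // partition id, so removing the backfill topic does not disturb any other topic's ids.
    static String ephemeralPartitionGroupId(String topicName, int streamPartitionId) {
      return topicName + "__" + streamPartitionId;
    }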

@lnbest0707-uber changed the title from "Introduce ephemeral topic type into multi-topic ingestion" to "[Auto reset 2/3] Introduce ephemeral topic type into multi-topic ingestion" on Aug 5, 2025
@lnbest0707-uber (Contributor, Author) commented:

@Jackie-Jiang @noob-se7en @KKcorps please help review, thanks.

@lnbest0707-uber force-pushed the upstream_fork_segmentname_2 branch 2 times, most recently from 9091900 to 6e3771a on August 15, 2025 17:00
@codecov-commenter commented Aug 15, 2025

Codecov Report

❌ Patch coverage is 63.83929% with 81 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.34%. Comparing base (5b151f7) to head (0835b15).
⚠️ Report is 5 commits behind head on master.

Files with missing lines Patch % Lines
.../core/realtime/PinotLLCRealtimeSegmentManager.java 69.81% 27 Missing and 5 partials ⚠️
.../org/apache/pinot/common/utils/LLCSegmentName.java 59.52% 13 Missing and 4 partials ⚠️
...inot/spi/stream/PartitionGroupMetadataFetcher.java 0.00% 14 Missing ⚠️
...g/apache/pinot/spi/utils/IngestionConfigUtils.java 44.44% 3 Missing and 2 partials ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 33.33% 4 Missing ⚠️
...ot/spi/stream/PartitionGroupConsumptionStatus.java 57.14% 3 Missing ⚠️
.../config/table/ingestion/StreamIngestionConfig.java 33.33% 2 Missing ⚠️
...pache/pinot/spi/stream/PartitionGroupMetadata.java 75.00% 1 Missing and 1 partial ⚠️
...java/org/apache/pinot/spi/stream/StreamConfig.java 71.42% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16494      +/-   ##
============================================
- Coverage     63.34%   63.34%   -0.01%     
- Complexity     1379     1380       +1     
============================================
  Files          3027     3027              
  Lines        176446   176527      +81     
  Branches      27088    27101      +13     
============================================
+ Hits         111765   111816      +51     
- Misses        56115    56133      +18     
- Partials       8566     8578      +12     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
java-11 63.29% <63.83%> (-0.02%) ⬇️
java-21 63.29% <63.83%> (+6.88%) ⬆️
temurin 63.34% <63.83%> (-0.01%) ⬇️
unittests 63.33% <63.83%> (-0.01%) ⬇️
unittests1 56.41% <33.00%> (-0.07%) ⬇️
unittests2 33.17% <58.03%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown.

public int getPartitionGroupId() {
  return _partitionGroupId;
}

public String getPartitionGroupInfo() {
Contributor:
This can cause confusion. Can you move the topic name check to caller side?

Author:
Changed the function name to make it more intuitive.

@lnbest0707-uber force-pushed the upstream_fork_segmentname_2 branch from ff2b1ef to 0835b15 on August 20, 2025 16:20
@noob-se7en (Contributor):

@lnbest0707-uber can you please add a problem and solution description specific to this PR in the PR description?
If I understand correctly, this PR adds logic to consume from a new topic type, and this topic will have the data that was skipped before by the controller.

  1. If the above is correct, can you tell how this topic is created in real time upstream?
  2. Also, why do we have to make changes to LLCSegmentName?

@lnbest0707-uber (Contributor, Author) commented Aug 20, 2025

@lnbest0707-uber can you please add a problem and solution description specific to this PR in the PR description? If I understand correctly, this PR adds logic to consume from a new topic type, and this topic will have the data that was skipped before by the controller.

  1. If the above is correct, can you tell how this topic is created in real time upstream?
  2. Also, why do we have to make changes to LLCSegmentName?

The PR description is exactly for this PR only; you can forget about the offset auto reset story. The PR is about multi-topic ingestion.
Previously, we used the 10000 offset to pad/map from the stream partition id to the Pinot partition id (e.g. partition 1 of the second topic would have partition id 10001 in Pinot).
With this PR, we support a special type of topic in multi-topic ingestion, and the mapping between the stream partition id and the Pinot partition id becomes <topic_name>__<stream_partitionId>.
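
(Illustrative only, not the actual LLCSegmentName implementation: the ephemeral format simply carries one extra topic-name token, which is why LLCSegmentName needs to understand both shapes.)

    // Sketch: telling the two segment name formats apart by the number of "__"-separated tokens.
    // tableName__partitionGroupId__sequenceNumber__creationTime            -> 4 tokens (existing)
    // tableName__topicName__partitionGroupId__sequenceNumber__creationTime -> 5 tokens (ephemeral topic)
    static boolean hasTopicName(String llcSegmentName) {
      return llcSegmentName.split("__").length == 5;
    }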

Other benefits for this change are in the description.

@Jackie-Jiang Jackie-Jiang requested a review from Copilot August 21, 2025 23:08
@Copilot (Copilot AI) left a comment

Pull Request Overview

This PR introduces support for ephemeral topic types in multi-topic ingestion as part of auto reset functionality. The main purpose is to enable different segment naming patterns for ephemeral backfill topics while maintaining compatibility with existing multi-topic ingestion.

Key changes include:

  • Introduction of ephemeral backfill topic configuration and handling
  • Modified segment naming to include topic names for ephemeral topics (format: tableName__topicName__partitionGroupId__sequenceNumber__creationTime)
  • Enhanced stream configuration handling to prevent duplicate topics and ensure proper configuration selection
  • Updated partition group metadata handling to support topic-aware operations

Reviewed Changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 4 comments.

File Description
IngestionConfigUtils.java Added ephemeral topic partition offset and stream config selection method
StreamConfigProperties.java Added ephemeral backfill topic configuration property
StreamConfig.java Added ephemeral topic flag field with getter method
PartitionGroupMetadataFetcher.java Enhanced fetching logic to handle ephemeral topics differently
PartitionGroupMetadata.java Added topic name field and partition group identifier methods
PartitionGroupConsumptionStatus.java Added topic name field to consumption status tracking
StreamIngestionConfig.java Added realtime offset auto reset handler class configuration
TableConfigUtils.java Added validation to prevent duplicate topic names in stream configs
LLCSegmentName.java Extended to support topic names in segment naming format
Various other files Updated to handle topic-aware partition group operations

@@ -431,6 +435,10 @@ public long getOffsetAutoResetTimeSecThreshold() {
    return _offsetAutoResetTimeSecThreshold;
  }

  public Boolean isEphemeralBackfillTopic() {
    return Boolean.TRUE.equals(_ephemeralBackfillTopic);
Copilot AI commented on Aug 21, 2025:

The return statement uses Boolean.TRUE.equals() which is unnecessarily complex. Since _ephemeralBackfillTopic is initialized to false when null, a simple null check and return would be clearer: return _ephemeralBackfillTopic != null && _ephemeralBackfillTopic;

Suggested change
return Boolean.TRUE.equals(_ephemeralBackfillTopic);
return _ephemeralBackfillTopic != null && _ephemeralBackfillTopic;


    .map(String::trim)
    .map(Integer::parseInt)
    .collect(Collectors.toSet());
Set<String> targetSegments = allConsumingSegments.stream()
    .filter(segmentName -> partitionsToCommit.contains(new LLCSegmentName(segmentName).getPartitionGroupId()))
    .filter(
        segmentName -> partitionsToCommit.contains(new LLCSegmentName(segmentName).getPartitionGroupTopicAndId()))
Copilot AI commented on Aug 21, 2025:

The filter is calling getPartitionGroupTopicAndId() but comparing against partitionsToCommit which contains Integer values parsed from comma-separated string. This will cause ClassCastException since String cannot be compared to Integer.

Suggested change
segmentName -> partitionsToCommit.contains(new LLCSegmentName(segmentName).getPartitionGroupTopicAndId()))
segmentName -> partitionsToCommit.contains(new LLCSegmentName(segmentName).getPartitionGroupId()))


}

/**
 * Sets up a new partition group.
 * <p>Persists the ZK metadata for the first CONSUMING segment, and returns the segment name.
 */
private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig streamConfig,
private String setupNewPartitionGroup(TableConfig tableConfig, List<StreamConfig> streamConfigs,
Contributor:
I don't follow this change. What does it mean to set up a new partition group on multiple StreamConfigs? Which StreamConfig should we reference when setting it up?

Author:
If you check the usages of setupNewPartitionGroup, they were all doing setupNewPartitionGroup(xx, streamConfigs.get(0), xxx).
The function mainly relies on partitionGroupMetadata to set up the new partition; the streamConfig is only used for fetching the flush threshold parameters, and it was using the first config's values instead of the corresponding topic's.
With this change, we pass in the entire table config's streamConfigMaps, so the follow-up usages can pick the correct streamConfig by cross-checking the partitionGroupMetadata and use their own flush threshold values.
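
(A rough sketch of that lookup, using a hypothetical helper; the real code lives in PinotLLCRealtimeSegmentManager and may differ.)

    // Sketch: resolve the StreamConfig for a partition group from its Pinot partition id
    // instead of always falling back to streamConfigs.get(0).
    private StreamConfig resolveStreamConfig(List<StreamConfig> streamConfigs,
        PartitionGroupMetadata partitionGroupMetadata) {
      int index = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
          partitionGroupMetadata.getPartitionGroupId());
      return streamConfigs.get(index);
    }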

@@ -2776,54 +2842,4 @@ public List<String> getCommittingSegments(String realtimeTableName) {
private List<String> getCommittingSegments(String realtimeTableName, Collection<String> segmentsToCheck) {
  return getCommittingSegments(realtimeTableName, segmentsToCheck, _helixResourceManager::getSegmentZKMetadata);
}

public static List<String> getCommittingSegments(String realtimeTableName,
Contributor:
Several methods/comments are moved. What's the reason? It seems these methods fit better in their original position.

Author:
The IDE moved the static methods before the non-static ones. We do not have strict enforcement of this in the Pinot style guide. If preferred, I can move them back.

List<PartitionGroupConsumptionStatus> topicPartitionGroupConsumptionStatusList =
    _partitionGroupConsumptionStatusList.stream()
        .filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
            partitionGroupConsumptionStatus.getPartitionGroupId()) == index)
        .filter(partitionGroupConsumptionStatus -> _streamConfigs.get(index).isEphemeralBackfillTopic()
Contributor:
Can we move this check to PinotLLCRealtimeSegmentManager.setupNewPartitionGroup()? Currently it is deeply nested, which makes it very hard to follow (I missed it several times, and in the end found it after this rewrite).

Author:
Agree, this code is hard to follow. I've refactored it to completely split the permanent and ephemeral paths.
Also, this class had only simple functionality and no UTs; I've added extra UTs to cover single/multiple topics, with and without ephemeral topics.

.filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
    partitionGroupConsumptionStatus.getPartitionGroupId()) == index)
.filter(partitionGroupConsumptionStatus -> _streamConfigs.get(index).isEphemeralBackfillTopic()
    ? _streamConfigs.get(index).getTopicName().equals(partitionGroupConsumptionStatus.getTopicName())
Contributor:
Do we need this check?

Author:
Code refactored, please check again, thanks.

@chenboat (Contributor):

Given that most of this PR is related to the segment name format change, plus the fact that the LLC segment format is used and checked in many Pinot classes: is it possible to keep the current format and encode the topic name within it instead? Putting topic names in the segment name can also result in segment name length explosion and a related metadata increase.

One possible encoding scheme is to encode the topic name, backfill id and partition id within the max allowed int limit of 2,147,483,647. The first 4 digits are for the topic, the next 2 digits for the backfill id and the next 4 digits for the partition id. I think it should accommodate nearly all practical needs.
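
(Illustrative sketch of the packing described above; not code from this PR.)

    // Sketch: pack topic index, backfill id and partition id into one int.
    // [up to 4 digits topic index][2 digits backfill id][4 digits partition id]
    static int encodePartitionId(int topicIndex, int backfillId, int partitionId) {
      return topicIndex * 1_000_000 + backfillId * 10_000 + partitionId;
    }
    // e.g. encodePartitionId(11, 1, 5) == 11010005 -> the 5th partition of the 11th topic, backfill id 01.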

@lnbest0707-uber (Contributor, Author):

One possible encoding scheme is to encode the topic name, backfill id and partition id within the max allowed int limit of 2,147,483,647. The first 4 digits are for the topic, the next 2 digits for the backfill id and the next 4 digits for the partition id. I think it should accommodate nearly all practical needs.

I can think of 2 encoding methods, but neither fulfills the requirement:

  • Use the sequence number as the backfill or topic id -> it becomes hard to remove the topic.
  • Hardcode a mapping from the string to an int id -> it is hard to avoid and deal with collisions.

@chenboat (Contributor):

One possible encoding scheme is to encode the topic name, backfill id and partition id within the max allowed int limit of 2,147,483,647. The first 4 digits are for the topic, the next 2 digits for the backfill id and the next 4 digits for the partition id. I think it should accommodate nearly all practical needs.

I can think of 2 encoding methods, but neither fulfills the requirement:

  • Use the sequence number as the backfill or topic id -> it becomes hard to remove the topic.

The backfill topic id is encoded in the partition id today. E.g., 11010005 means the 5th partition of the 11th topic (in the topic map), and it is a backfill topic (hence the 01). This is how we can detect a backfill topic.

  • Hardcode a mapping from the string to an int id -> it is hard to avoid and deal with collisions.
    Today in multi-topic, we use the index of the topic in the streamConfig map to map a topic to an int. The above proposal uses the same idea. Any issue with continuing today's practice?

@lnbest0707-uber (Contributor, Author):

E.g., 11010005 means the 5th partition of the 11th topic (in the topic map), and it is a backfill topic (hence the 01). This is how we can detect a backfill topic.

With this naming format, if we remove a topic, it might affect other topics' Pinot partition naming.
Say the 11th topic A is a backfill topic (1101000x) and the 12th topic B is also a backfill topic (1201000x). When we remove topic A, how should we name topic B's partitions? With this naming format they would become 1101000x, which would be messed up.

Today in multi-topic, we use the index of the topic in the streamConfig map to map a topic to an int. The above proposal uses the same idea. Any issue with continuing today's practice?

As in the above example, the issue is about removing topics. This index mapping makes a topic's partition id depend on its position, and when other topics get removed, that position changes.

@lnbest0707-uber (Contributor, Author):

Added an alternative solution in #16692.
Will let the community vote for the preferred solution.

@lnbest0707-uber (Contributor, Author):

Abandoning this one as the other option in #16692 is preferred.
