Skip to content

Commit 9091900

Browse files
Add the missing PartitionGroupMetadataFetcher change
1 parent 19c3e2d commit 9091900

File tree

1 file changed

+18
-5
lines changed

1 file changed

+18
-5
lines changed

pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ private Boolean fetchSingleStream()
9999
private Boolean fetchMultipleStreams()
100100
throws Exception {
101101
int numStreams = _streamConfigs.size();
102+
int permanentTopicIndex = 0;
102103
for (int i = 0; i < numStreams; i++) {
103104
StreamConfig streamConfig = _streamConfigs.get(i);
104105
String topicName = streamConfig.getTopicName();
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
107108
+ topicName;
108109
StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
109110
int index = i;
111+
int finalPermanentTopicIndex = permanentTopicIndex;
112+
// For permanent topics, we use the index of the stream config to get the partition group consumption status.
113+
// For ephemeral backfill topics, we use the topic name to filter the partition group consumption status.
110114
List<PartitionGroupConsumptionStatus> topicPartitionGroupConsumptionStatusList =
111115
_partitionGroupConsumptionStatusList.stream()
112-
.filter(partitionGroupConsumptionStatus -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
113-
partitionGroupConsumptionStatus.getPartitionGroupId()) == index)
116+
.filter(partitionGroupConsumptionStatus -> _streamConfigs.get(index).isEphemeralBackfillTopic()
117+
? _streamConfigs.get(index).getTopicName().equals(partitionGroupConsumptionStatus.getTopicName())
118+
: IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
119+
partitionGroupConsumptionStatus.getPartitionGroupId()) == finalPermanentTopicIndex)
114120
.collect(Collectors.toList());
115121
try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
116122
StreamConsumerFactory.getUniqueClientId(clientId))) {
123+
// Similarly, for ephemeral backfill topics, we create the partition group metadata with the topic name.
117124
_newPartitionGroupMetadataList.addAll(
118125
streamMetadataProvider.computePartitionGroupMetadata(clientId,
119-
streamConfig, topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
126+
_streamConfigs.get(i), topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
120127
_forceGetOffsetFromStream)
121128
.stream()
122129
.map(metadata -> new PartitionGroupMetadata(
123-
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
124-
index), metadata.getStartOffset()))
130+
_streamConfigs.get(finalPermanentTopicIndex).isEphemeralBackfillTopic() ? _streamConfigs.get(index)
131+
.getTopicName() : "",
132+
_streamConfigs.get(finalPermanentTopicIndex).isEphemeralBackfillTopic()
133+
? metadata.getPartitionGroupId()
134+
: IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
135+
finalPermanentTopicIndex),
136+
metadata.getStartOffset()))
125137
.collect(Collectors.toList()));
126138
if (_exception != null) {
127139
// We had at least one failure, but succeeded now. Log an info
@@ -136,6 +148,7 @@ private Boolean fetchMultipleStreams()
136148
_exception = e;
137149
throw e;
138150
}
151+
permanentTopicIndex += _streamConfigs.get(i).isEphemeralBackfillTopic() ? 0 : 1;
139152
}
140153
return Boolean.TRUE;
141154
}

0 commit comments

Comments
 (0)