@@ -99,6 +99,7 @@ private Boolean fetchSingleStream()
9999 private Boolean fetchMultipleStreams ()
100100 throws Exception {
101101 int numStreams = _streamConfigs .size ();
102+ int permanentTopicIndex = 0 ;
102103 for (int i = 0 ; i < numStreams ; i ++) {
103104 StreamConfig streamConfig = _streamConfigs .get (i );
104105 String topicName = streamConfig .getTopicName ();
@@ -107,21 +108,32 @@ private Boolean fetchMultipleStreams()
107108 + topicName ;
108109 StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider .create (streamConfig );
109110 int index = i ;
111+ int finalPermanentTopicIndex = permanentTopicIndex ;
112+ // For permanent topics, we use the index of the stream config to get the partition group consumption status.
113+ // For ephemeral backfill topics, we use the topic name to filter the partition group consumption status.
110114 List <PartitionGroupConsumptionStatus > topicPartitionGroupConsumptionStatusList =
111115 _partitionGroupConsumptionStatusList .stream ()
112- .filter (partitionGroupConsumptionStatus -> IngestionConfigUtils .getStreamConfigIndexFromPinotPartitionId (
113- partitionGroupConsumptionStatus .getPartitionGroupId ()) == index )
116+ .filter (partitionGroupConsumptionStatus -> _streamConfigs .get (index ).isEphemeralBackfillTopic ()
117+ ? _streamConfigs .get (index ).getTopicName ().equals (partitionGroupConsumptionStatus .getTopicName ())
118+ : IngestionConfigUtils .getStreamConfigIndexFromPinotPartitionId (
119+ partitionGroupConsumptionStatus .getPartitionGroupId ()) == finalPermanentTopicIndex )
114120 .collect (Collectors .toList ());
115121 try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory .createStreamMetadataProvider (
116122 StreamConsumerFactory .getUniqueClientId (clientId ))) {
123+ // Similarly, for ephemeral backfill topics, we create the partition group metadata with the topic name.
117124 _newPartitionGroupMetadataList .addAll (
118125 streamMetadataProvider .computePartitionGroupMetadata (clientId ,
119- streamConfig , topicPartitionGroupConsumptionStatusList , /*maxWaitTimeMs=*/ 15000 ,
126+ _streamConfigs . get ( i ) , topicPartitionGroupConsumptionStatusList , /*maxWaitTimeMs=*/ 15000 ,
120127 _forceGetOffsetFromStream )
121128 .stream ()
122129 .map (metadata -> new PartitionGroupMetadata (
123- IngestionConfigUtils .getPinotPartitionIdFromStreamPartitionId (metadata .getPartitionGroupId (),
124- index ), metadata .getStartOffset ()))
130+ _streamConfigs .get (finalPermanentTopicIndex ).isEphemeralBackfillTopic () ? _streamConfigs .get (index )
131+ .getTopicName () : "" ,
132+ _streamConfigs .get (finalPermanentTopicIndex ).isEphemeralBackfillTopic ()
133+ ? metadata .getPartitionGroupId ()
134+ : IngestionConfigUtils .getPinotPartitionIdFromStreamPartitionId (metadata .getPartitionGroupId (),
135+ finalPermanentTopicIndex ),
136+ metadata .getStartOffset ()))
125137 .collect (Collectors .toList ()));
126138 if (_exception != null ) {
127139 // We had at least one failure, but succeeded now. Log an info
@@ -136,6 +148,7 @@ private Boolean fetchMultipleStreams()
136148 _exception = e ;
137149 throw e ;
138150 }
151+ permanentTopicIndex += _streamConfigs .get (i ).isEphemeralBackfillTopic () ? 0 : 1 ;
139152 }
140153 return Boolean .TRUE ;
141154 }
0 commit comments