Skip to content

Commit 80bff11

Browse files
author
Xin Gao
committed
Introduce ephemeral topic type into multi-topic ingestion
1 parent 3d44c91 commit 80bff11

File tree

14 files changed

+482
-250
lines changed

14 files changed

+482
-250
lines changed

pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030

3131
public class LLCSegmentName implements Comparable<LLCSegmentName> {
32-
private static final String SEPARATOR = "__";
32+
public static final String SEPARATOR = "__";
3333
private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
3434
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
3535

@@ -38,25 +38,62 @@ public class LLCSegmentName implements Comparable<LLCSegmentName> {
3838
private final int _sequenceNumber;
3939
private final String _creationTime;
4040
private final String _segmentName;
41+
private final String _topicName;
4142

4243
public LLCSegmentName(String segmentName) {
4344
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
44-
Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: %s", segmentName);
45+
// Validate the segment name format should have 4 or 5 parts:
46+
// e.g. tableName__partitionGroupId__sequenceNumber__creationTime
47+
// or tableName__topicName__partitionGroupId__sequenceNumber__creationTime
48+
Preconditions.checkArgument(
49+
parts.length >= 4 && parts.length <= 5, "Invalid LLC segment name: %s", segmentName);
4550
_tableName = parts[0];
46-
_partitionGroupId = Integer.parseInt(parts[1]);
47-
_sequenceNumber = Integer.parseInt(parts[2]);
48-
_creationTime = parts[3];
51+
if (parts.length == 4) {
52+
_topicName = "";
53+
_partitionGroupId = Integer.parseInt(parts[1]);
54+
_sequenceNumber = Integer.parseInt(parts[2]);
55+
_creationTime = parts[3];
56+
} else {
57+
_topicName = parts[1];
58+
_partitionGroupId = Integer.parseInt(parts[2]);
59+
_sequenceNumber = Integer.parseInt(parts[3]);
60+
_creationTime = parts[4];
61+
}
4962
_segmentName = segmentName;
5063
}
5164

5265
/**
 * Builds an LLC segment name without a topic name, i.e.
 * {@code tableName__partitionGroupId__sequenceNumber__creationTime}.
 *
 * @param tableName table name; must not contain {@link #SEPARATOR}
 * @param partitionGroupId partition group id
 * @param sequenceNumber sequence number of the segment
 * @param msSinceEpoch creation time in epoch millis, rendered with the class's date format in UTC
 */
public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
  this(tableName, "", partitionGroupId, sequenceNumber, msSinceEpoch);
}

/**
 * Builds an LLC segment name that optionally carries a topic name. With a non-empty topic the name is
 * {@code tableName__topicName__partitionGroupId__sequenceNumber__creationTime}; with an empty or null topic the
 * topic part is omitted.
 *
 * @param tableName table name; must not contain {@link #SEPARATOR}
 * @param topicName topic name, or null/empty for none; must not contain {@link #SEPARATOR}
 * @param partitionGroupId partition group id
 * @param sequenceNumber sequence number of the segment
 * @param msSinceEpoch creation time in epoch millis, rendered with the class's date format in UTC
 * @throws IllegalArgumentException if the table name or topic name contains {@link #SEPARATOR}
 */
public LLCSegmentName(
    String tableName, String topicName, int partitionGroupId, int sequenceNumber, long msSinceEpoch) {
  Preconditions.checkArgument(!tableName.contains(SEPARATOR), "Illegal table name: %s", tableName);
  // Report the offending topic name (not the table name) when the check fails.
  Preconditions.checkArgument(topicName == null || !topicName.contains(SEPARATOR),
      "Illegal topic name: %s", topicName);
  _tableName = tableName;
  // The validation above tolerates null, so normalize it to "": the field is dereferenced elsewhere in this class
  // and a null would otherwise be concatenated into the segment name as the literal text "null".
  _topicName = topicName == null ? "" : topicName;
  _partitionGroupId = partitionGroupId;
  _sequenceNumber = sequenceNumber;
  // ISO8601 date: 20160120T1234Z
  _creationTime = DATE_FORMATTER.print(msSinceEpoch);
  if (_topicName.isEmpty()) {
    _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR + _creationTime;
  } else {
    _segmentName =
        tableName + SEPARATOR + _topicName + SEPARATOR + partitionGroupId + SEPARATOR + sequenceNumber + SEPARATOR
            + _creationTime;
  }
}
88+
89+
/**
 * Internal constructor that stores already-derived components verbatim: nothing is validated, parsed or
 * formatted, and the topic name is always set to the empty string.
 * NOTE(review): no caller of this constructor is visible in this view - confirm it is still needed.
 */
private LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber, String creationTime,
    String segmentName) {
  _segmentName = segmentName;
  _tableName = tableName;
  _topicName = "";
  _partitionGroupId = partitionGroupId;
  _sequenceNumber = sequenceNumber;
  _creationTime = creationTime;
}
6198

6299
/**
@@ -65,6 +102,10 @@ public LLCSegmentName(String tableName, int partitionGroupId, int sequenceNumber
65102
*/
66103
@Nullable
67104
public static LLCSegmentName of(String segmentName) {
105+
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
106+
if (parts.length < 4 || parts.length > 5) {
107+
return null;
108+
}
68109
try {
69110
return new LLCSegmentName(segmentName);
70111
} catch (Exception e) {
@@ -76,13 +117,7 @@ public static LLCSegmentName of(String segmentName) {
76117
* Returns whether the given segment name represents an LLC segment.
77118
*/
78119
public static boolean isLLCSegment(String segmentName) {
79-
int numSeparators = 0;
80-
int index = 0;
81-
while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
82-
numSeparators++;
83-
index += 2; // SEPARATOR.length()
84-
}
85-
return numSeparators == 3;
120+
return of(segmentName) != null;
86121
}
87122

88123
@Deprecated
@@ -94,17 +129,34 @@ public static boolean isLowLevelConsumerSegmentName(String segmentName) {
94129
* Returns the sequence number of the given segment name.
95130
*/
96131
public static int getSequenceNumber(String segmentName) {
97-
return Integer.parseInt(StringUtils.splitByWholeSeparator(segmentName, SEPARATOR)[2]);
132+
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
133+
if (parts.length == 4) {
134+
return Integer.parseInt(parts[2]);
135+
} else {
136+
return Integer.parseInt(parts[3]);
137+
}
98138
}
99139

100140
public String getTableName() {
101141
return _tableName;
102142
}
103143

144+
public String getTopicName() {
145+
return _topicName;
146+
}
147+
104148
public int getPartitionGroupId() {
105149
return _partitionGroupId;
106150
}
107151

152+
public String getPartitionGroupInfo() {
153+
if (_topicName.isEmpty()) {
154+
return String.valueOf(_partitionGroupId);
155+
} else {
156+
return _topicName + SEPARATOR + _partitionGroupId;
157+
}
158+
}
159+
108160
public int getSequenceNumber() {
109161
return _sequenceNumber;
110162
}
@@ -127,6 +179,9 @@ public String getSegmentName() {
127179
public int compareTo(LLCSegmentName other) {
128180
Preconditions.checkArgument(_tableName.equals(other._tableName),
129181
"Cannot compare segment names from different table: %s, %s", _segmentName, other.getSegmentName());
182+
if (!_topicName.equals(other._topicName)) {
183+
return StringUtils.compare(_topicName, other._topicName);
184+
}
130185
if (_partitionGroupId != other._partitionGroupId) {
131186
return Integer.compare(_partitionGroupId, other._partitionGroupId);
132187
}

pinot-common/src/test/java/org/apache/pinot/common/utils/LLCSegmentNameTest.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,15 @@ public void testSegmentNameBuilder() {
4242
assertEquals(llcSegmentName.getPartitionGroupId(), 0);
4343
assertEquals(llcSegmentName.getSequenceNumber(), 1);
4444

45+
// Same checks as above, for a segment name that also carries a topic name.
LLCSegmentName llcSegmentNameWithTopicName = new LLCSegmentName("myTable", "myTopic", 0, 1, 1465508537069L);
String segmentNameWithTopicName = llcSegmentNameWithTopicName.getSegmentName();
assertEquals(segmentNameWithTopicName, "myTable__myTopic__0__1__20160609T2142Z");
// Validate the topic-qualified name itself rather than the topic-less name built earlier in this test.
assertTrue(LLCSegmentName.isLLCSegment(segmentNameWithTopicName));
assertEquals(llcSegmentNameWithTopicName.getTableName(), "myTable");
assertEquals(llcSegmentNameWithTopicName.getTopicName(), "myTopic");
assertEquals(llcSegmentNameWithTopicName.getPartitionGroupId(), 0);
assertEquals(llcSegmentNameWithTopicName.getSequenceNumber(), 1);
53+
4554
// Invalid segment name
4655
assertFalse(LLCSegmentName.isLLCSegment("a__abc__1__3__4__54__g__gg___h"));
4756
}
@@ -95,4 +104,61 @@ public void testLLCSegmentName() {
95104
Arrays.sort(testSorted);
96105
Assert.assertEquals(testSorted, new LLCSegmentName[]{segName5, segName1, segName6, segName3, segName4});
97106
}
107+
108+
@Test
109+
public void testLLCSegmentNameWithTopicName() {
110+
String tableName = "myTable";
111+
String topicName = "myTopic";
112+
final int partitionGroupId = 4;
113+
final int sequenceNumber = 27;
114+
final long msSinceEpoch = 1466200248000L;
115+
final String creationTime = "20160617T2150Z";
116+
final long creationTimeInMs = 1466200200000L;
117+
final String segmentName = "myTable__myTopic__4__27__" + creationTime;
118+
119+
LLCSegmentName segName1 = new LLCSegmentName(tableName, topicName, partitionGroupId, sequenceNumber, msSinceEpoch);
120+
Assert.assertEquals(segName1.getSegmentName(), segmentName);
121+
Assert.assertEquals(segName1.getPartitionGroupId(), partitionGroupId);
122+
Assert.assertEquals(segName1.getCreationTime(), creationTime);
123+
Assert.assertEquals(segName1.getCreationTimeMs(), creationTimeInMs);
124+
Assert.assertEquals(segName1.getSequenceNumber(), sequenceNumber);
125+
Assert.assertEquals(segName1.getTableName(), tableName);
126+
Assert.assertEquals(segName1.getTopicName(), topicName);
127+
128+
LLCSegmentName segName2 = new LLCSegmentName(segmentName);
129+
Assert.assertEquals(segName2.getSegmentName(), segmentName);
130+
Assert.assertEquals(segName2.getPartitionGroupId(), partitionGroupId);
131+
Assert.assertEquals(segName2.getCreationTime(), creationTime);
132+
Assert.assertEquals(segName2.getCreationTimeMs(), creationTimeInMs);
133+
Assert.assertEquals(segName2.getSequenceNumber(), sequenceNumber);
134+
Assert.assertEquals(segName2.getTableName(), tableName);
135+
Assert.assertEquals(segName2.getTopicName(), topicName);
136+
137+
Assert.assertEquals(segName1, segName2);
138+
139+
LLCSegmentName segName3 =
140+
new LLCSegmentName(tableName, topicName, partitionGroupId + 1, sequenceNumber - 1, msSinceEpoch);
141+
Assert.assertTrue(segName1.compareTo(segName3) < 0);
142+
LLCSegmentName segName4 =
143+
new LLCSegmentName(tableName, topicName, partitionGroupId + 1, sequenceNumber + 1, msSinceEpoch);
144+
Assert.assertTrue(segName1.compareTo(segName4) < 0);
145+
LLCSegmentName segName5 =
146+
new LLCSegmentName(tableName, topicName, partitionGroupId - 1, sequenceNumber + 1, msSinceEpoch);
147+
Assert.assertTrue(segName1.compareTo(segName5) > 0);
148+
LLCSegmentName segName6 =
149+
new LLCSegmentName(tableName, topicName, partitionGroupId, sequenceNumber + 1, msSinceEpoch);
150+
Assert.assertTrue(segName1.compareTo(segName6) < 0);
151+
152+
LLCSegmentName segName7 =
153+
new LLCSegmentName(tableName + "NotGood", topicName, partitionGroupId, sequenceNumber + 1, msSinceEpoch);
154+
try {
155+
segName1.compareTo(segName7);
156+
Assert.fail("Not failing when comparing " + segName1.getSegmentName() + " and " + segName7.getSegmentName());
157+
} catch (Exception e) {
158+
// expected
159+
}
160+
LLCSegmentName[] testSorted = new LLCSegmentName[]{segName3, segName1, segName4, segName5, segName6};
161+
Arrays.sort(testSorted);
162+
Assert.assertEquals(testSorted, new LLCSegmentName[]{segName5, segName1, segName6, segName3, segName4});
163+
}
98164
}

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class MissingConsumingSegmentFinder {
6161

6262
private final String _realtimeTableName;
6363
private final SegmentMetadataFetcher _segmentMetadataFetcher;
64-
private final Map<Integer, StreamPartitionMsgOffset> _partitionGroupIdToLargestStreamOffsetMap;
64+
private final Map<String, StreamPartitionMsgOffset> _partitionGroupInfoToLargestStreamOffsetMap;
6565
private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
6666

6767
private ControllerMetrics _controllerMetrics;
@@ -75,15 +75,16 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt
7575
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
7676

7777
// create partition group id to largest stream offset map
78-
_partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
78+
_partitionGroupInfoToLargestStreamOffsetMap = new HashMap<>();
7979
streamConfigs.stream().map(streamConfig -> {
8080
streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
8181
return streamConfig;
8282
});
8383
try {
8484
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, Collections.emptyList(), false)
8585
.forEach(metadata -> {
86-
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
86+
_partitionGroupInfoToLargestStreamOffsetMap.put(metadata.getPartitionGroupInfo(),
87+
metadata.getStartOffset());
8788
});
8889
} catch (Exception e) {
8990
LOGGER.warn("Problem encountered in fetching stream metadata for topics: {} of table: {}. "
@@ -95,11 +96,11 @@ public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertySt
9596

9697
/**
 * Test-only constructor: stores the supplied table name, segment metadata fetcher, partition-group-info to
 * largest-stream-offset map and offset factory exactly as given. It does not assign {@code _controllerMetrics}.
 */
@VisibleForTesting
MissingConsumingSegmentFinder(String realtimeTableName, SegmentMetadataFetcher segmentMetadataFetcher,
    Map<String, StreamPartitionMsgOffset> partitionGroupInfoToLargestStreamOffsetMap,
    StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory) {
  _streamPartitionMsgOffsetFactory = streamPartitionMsgOffsetFactory;
  _partitionGroupInfoToLargestStreamOffsetMap = partitionGroupInfoToLargestStreamOffsetMap;
  _segmentMetadataFetcher = segmentMetadataFetcher;
  _realtimeTableName = realtimeTableName;
}
105106

@@ -118,24 +119,24 @@ public void findAndEmitMetrics(IdealState idealState) {
118119
@VisibleForTesting
119120
MissingSegmentInfo findMissingSegments(Map<String, Map<String, String>> idealStateMap, Instant now) {
120121
// create the maps
121-
Map<Integer, LLCSegmentName> partitionGroupIdToLatestConsumingSegmentMap = new HashMap<>();
122-
Map<Integer, LLCSegmentName> partitionGroupIdToLatestCompletedSegmentMap = new HashMap<>();
122+
Map<String, LLCSegmentName> partitionGroupInfoToLatestConsumingSegmentMap = new HashMap<>();
123+
Map<String, LLCSegmentName> partitionGroupInfoToLatestCompletedSegmentMap = new HashMap<>();
123124
idealStateMap.forEach((segmentName, instanceToStatusMap) -> {
124125
LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
125126
if (llcSegmentName != null) { // Skip the uploaded realtime segments that don't conform to llc naming
126127
if (instanceToStatusMap.containsValue(SegmentStateModel.CONSUMING)) {
127-
updateMap(partitionGroupIdToLatestConsumingSegmentMap, llcSegmentName);
128+
updateMap(partitionGroupInfoToLatestConsumingSegmentMap, llcSegmentName);
128129
} else if (instanceToStatusMap.containsValue(SegmentStateModel.ONLINE)) {
129-
updateMap(partitionGroupIdToLatestCompletedSegmentMap, llcSegmentName);
130+
updateMap(partitionGroupInfoToLatestCompletedSegmentMap, llcSegmentName);
130131
}
131132
}
132133
});
133134

134135
MissingSegmentInfo missingSegmentInfo = new MissingSegmentInfo();
135-
if (!_partitionGroupIdToLargestStreamOffsetMap.isEmpty()) {
136-
_partitionGroupIdToLargestStreamOffsetMap.forEach((partitionGroupId, largestStreamOffset) -> {
137-
if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
138-
LLCSegmentName latestCompletedSegment = partitionGroupIdToLatestCompletedSegmentMap.get(partitionGroupId);
136+
if (!_partitionGroupInfoToLargestStreamOffsetMap.isEmpty()) {
137+
_partitionGroupInfoToLargestStreamOffsetMap.forEach((partitionGroupInfo, largestStreamOffset) -> {
138+
if (!partitionGroupInfoToLatestConsumingSegmentMap.containsKey(partitionGroupInfo)) {
139+
LLCSegmentName latestCompletedSegment = partitionGroupInfoToLatestCompletedSegmentMap.get(partitionGroupInfo);
139140
if (latestCompletedSegment == null) {
140141
// There's no consuming or completed segment for this partition group. Possibilities:
141142
// 1) it's a new partition group that has not yet been detected
@@ -152,37 +153,36 @@ MissingSegmentInfo findMissingSegments(Map<String, Map<String, String>> idealSta
152153
if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
153154
// there are unconsumed messages available on the stream
154155
missingSegmentInfo._totalCount++;
155-
updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentZKMetadata.getCreationTime(), now);
156+
updateMaxDurationInfo(missingSegmentInfo, partitionGroupInfo, segmentZKMetadata.getCreationTime(), now);
156157
}
157158
}
158159
}
159160
});
160161
} else {
161-
partitionGroupIdToLatestCompletedSegmentMap.forEach((partitionGroupId, latestCompletedSegment) -> {
162-
if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
162+
partitionGroupInfoToLatestCompletedSegmentMap.forEach((partitionGroupInfo, latestCompletedSegment) -> {
163+
if (!partitionGroupInfoToLatestConsumingSegmentMap.containsKey(partitionGroupInfo)) {
163164
missingSegmentInfo._totalCount++;
164165
long segmentCompletionTimeMillis = _segmentMetadataFetcher
165166
.fetchSegmentCompletionTime(_realtimeTableName, latestCompletedSegment.getSegmentName());
166-
updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentCompletionTimeMillis, now);
167+
updateMaxDurationInfo(missingSegmentInfo, partitionGroupInfo, segmentCompletionTimeMillis, now);
167168
}
168169
});
169170
}
170171
return missingSegmentInfo;
171172
}
172173

173-
private void updateMaxDurationInfo(MissingSegmentInfo missingSegmentInfo, Integer partitionGroupId,
174+
private void updateMaxDurationInfo(MissingSegmentInfo missingSegmentInfo, String partitionGroupInfo,
174175
long segmentCompletionTimeMillis, Instant now) {
175176
long duration = Duration.between(Instant.ofEpochMilli(segmentCompletionTimeMillis), now).toMinutes();
176177
if (duration > missingSegmentInfo._maxDurationInMinutes) {
177178
missingSegmentInfo._maxDurationInMinutes = duration;
178179
}
179-
LOGGER.warn("PartitionGroupId {} hasn't had a consuming segment for {} minutes!", partitionGroupId, duration);
180+
LOGGER.warn("PartitionGroupId {} hasn't had a consuming segment for {} minutes!", partitionGroupInfo, duration);
180181
}
181182

182-
private void updateMap(Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegmentMap,
183+
private void updateMap(Map<String, LLCSegmentName> partitionGroupInfoToLatestSegmentMap,
183184
LLCSegmentName llcSegmentName) {
184-
int partitionGroupId = llcSegmentName.getPartitionGroupId();
185-
partitionGroupIdToLatestSegmentMap.compute(partitionGroupId, (pid, existingSegment) -> {
185+
partitionGroupInfoToLatestSegmentMap.compute(llcSegmentName.getPartitionGroupInfo(), (pid, existingSegment) -> {
186186
if (existingSegment == null) {
187187
return llcSegmentName;
188188
} else {

0 commit comments

Comments
 (0)