Skip to content

Commit fdf8ee4

Browse files
committed
[Enhancement] Add retry metrics for sink version V1 (StarRocks#229)
Signed-off-by: PengFei Li <[email protected]>
1 parent 5086a2e commit fdf8ee4

File tree

2 files changed

+14
-0
lines changed

2 files changed

+14
-0
lines changed

src/main/java/com/starrocks/connector/flink/manager/StarRocksSinkManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,11 @@ public class StarRocksSinkManager implements Serializable {
6565
private transient Counter totalFlushTimeWithoutRetries;
6666
private transient Counter totalFlushSucceededTimes;
6767
private transient Counter totalFlushFailedTimes;
68+
private transient Histogram flushTimeNsWithRetry;
6869
private transient Histogram flushTimeNs;
6970
private transient Histogram offerTimeNs;
71+
private transient Counter totalFlushRetry;
72+
private transient Histogram flushRetry;
7073

7174
private transient Counter totalFilteredRows;
7275
private transient Histogram commitAndPublishTimeMs;
@@ -82,8 +85,11 @@ public class StarRocksSinkManager implements Serializable {
8285
private static final String COUNTER_TOTAL_FLUSH_COST_TIME = "totalFlushTimeNs";
8386
private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES = "totalFlushSucceededTimes";
8487
private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES = "totalFlushFailedTimes";
88+
private static final String HISTOGRAM_FLUSH_TIME_WITH_RETRY = "flushTimeNsWithRetry";
8589
private static final String HISTOGRAM_FLUSH_TIME= "flushTimeNs";
8690
private static final String HISTOGRAM_OFFER_TIME_NS = "offerTimeNs";
91+
private static final String COUNTER_TOTAL_FLUSH_RETRY = "totalFlushRetry";
92+
private static final String HISTOGRAM_FLUSH_RETRY = "flushRetry";
8793

8894
// from stream load result
8995
private static final String COUNTER_NUMBER_FILTERED_ROWS = "totalFilteredRows";
@@ -143,8 +149,12 @@ public void setRuntimeContext(RuntimeContext runtimeCtx) {
143149
totalFlushTimeWithoutRetries = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
144150
totalFlushSucceededTimes = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
145151
totalFlushFailedTimes = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
152+
flushTimeNsWithRetry = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME_WITH_RETRY,
153+
new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
146154
flushTimeNs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
147155
offerTimeNs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
156+
totalFlushRetry = runtimeCtx.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_RETRY);
157+
flushRetry = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_FLUSH_RETRY, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
148158

149159
totalFilteredRows = runtimeCtx.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS);
150160
commitAndPublishTimeMs = runtimeCtx.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
@@ -334,6 +344,9 @@ private boolean asyncFlush() throws Exception {
334344
totalFlushTimeWithoutRetries.inc(System.nanoTime() - start);
335345
totalFlushSucceededTimes.inc();
336346
flushTimeNs.update(System.nanoTime() - start);
347+
flushTimeNsWithRetry.update(System.nanoTime() - startWithRetries);
348+
totalFlushRetry.inc(i);
349+
flushRetry.update(i);
337350
updateMetricsFromStreamLoadResult(result);
338351
}
339352
startScheduler();

src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public Set<ConfigOption<?>> optionalOptions() {
6565
optionalOptions.add(StarRocksSinkOptions.SINK_MAX_RETRIES);
6666
optionalOptions.add(StarRocksSinkOptions.SINK_SEMANTIC);
6767
optionalOptions.add(StarRocksSinkOptions.SINK_BATCH_OFFER_TIMEOUT);
68+
optionalOptions.add(StarRocksSinkOptions.SINK_METRIC_HISTOGRAM_WINDOW_SIZE);
6869
optionalOptions.add(StarRocksSinkOptions.SINK_PARALLELISM);
6970
optionalOptions.add(StarRocksSinkOptions.SINK_LABEL_PREFIX);
7071
optionalOptions.add(StarRocksSinkOptions.SINK_CONNECT_TIMEOUT);

0 commit comments

Comments
 (0)