
Commit 7930a1f

Revert "Avoid publishing string set metrics on the Dataflow legacy runner." (#32002)
* Revert "Avoid publishing string set metrics on the Dataflow legacy runner. (#…"
  This reverts commit a226094.
* Unskip stringset tests
* Fix ClassCastException

1 parent 4479281 · commit 7930a1f

File tree: 5 files changed, +7 -26 lines

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 0 additions & 1 deletion

@@ -183,7 +183,6 @@ def commonLegacyExcludeCategories = [
   'org.apache.beam.sdk.testing.UsesExternalService',
   'org.apache.beam.sdk.testing.UsesDistributionMetrics',
   'org.apache.beam.sdk.testing.UsesGaugeMetrics',
-  'org.apache.beam.sdk.testing.UsesStringSetMetrics',
   'org.apache.beam.sdk.testing.UsesMultimapState',
   'org.apache.beam.sdk.testing.UsesTestStream',
   'org.apache.beam.sdk.testing.UsesParDoLifecycle',

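Dropping `UsesStringSetMetrics` from `commonLegacyExcludeCategories` means JUnit tests tagged with that category are no longer filtered out of the legacy-runner test tasks. A minimal sketch of how such a tag gates a test; the test class and body below are illustrative, not taken from the Beam codebase:

```java
import org.apache.beam.sdk.testing.UsesStringSetMetrics;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class StringSetMetricsSmokeTest {
  @Test
  @Category({ValidatesRunner.class, UsesStringSetMetrics.class})
  public void stringSetMetricIsPublished() {
    // The Gradle excludeCategories list previously skipped this test on the legacy
    // runner; with the category no longer excluded, the test runs there again.
  }
}
```
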
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowMetrics.java

Lines changed: 2 additions & 2 deletions

@@ -24,11 +24,11 @@
 import com.google.api.services.dataflow.model.JobMetrics;
 import com.google.api.services.dataflow.model.MetricUpdate;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
@@ -191,7 +191,7 @@ private StringSetResult getStringSetValue(MetricUpdate metricUpdate) {
     if (metricUpdate.getSet() == null) {
       return StringSetResult.empty();
     }
-    return StringSetResult.create(ImmutableSet.copyOf(((Set) metricUpdate.getSet())));
+    return StringSetResult.create(ImmutableSet.copyOf(((Collection) metricUpdate.getSet())));
   }

   private DistributionResult getDistributionValue(MetricUpdate metricUpdate) {

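This is the ClassCastException fix mentioned in the commit message: `MetricUpdate.getSet()` is typed as `Object`, and the decoded service response is not guaranteed to be a `java.util.Set`, so the narrower cast could fail at runtime. Casting to `Collection` is all `ImmutableSet.copyOf` needs. A small sketch of the failure mode; the `List` payload below is an assumption about how the response may be decoded, not taken from the Dataflow client:

```java
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.Collection;

class CollectionCastSketch {
  public static void main(String[] args) {
    // Stand-in for metricUpdate.getSet(): declared as Object, actually a List here.
    Object fromService = Arrays.asList("ab", "cd", "ab");

    Collection<?> values = (Collection<?>) fromService; // fine for List, Set, or any Collection
    // Set<?> values = (Set<?>) fromService;            // would throw ClassCastException

    System.out.println(ImmutableSet.copyOf(values));    // copyOf dedupes: [ab, cd]
  }
}
```
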
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchModeExecutionContext.java

Lines changed: 4 additions & 15 deletions

@@ -19,7 +19,6 @@

 import com.google.api.services.dataflow.model.CounterUpdate;
 import com.google.api.services.dataflow.model.SideInputInfo;
-import java.util.Collections;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.InMemoryStateInternals;
@@ -78,18 +77,14 @@ public class BatchModeExecutionContext
   protected static final String BIGQUERY_READ_THROTTLE_TIME_NAMESPACE =
       "org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$StorageClientImpl";

-  // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
-  private final boolean populateStringSetMetrics;
-
   private BatchModeExecutionContext(
       CounterFactory counterFactory,
       Cache<?, WeightedValue<?>> dataCache,
       Cache<?, ?> logicalReferenceCache,
       ReaderFactory readerFactory,
       PipelineOptions options,
       DataflowExecutionStateTracker executionStateTracker,
-      DataflowExecutionStateRegistry executionStateRegistry,
-      boolean populateStringSetMetrics) {
+      DataflowExecutionStateRegistry executionStateRegistry) {
     super(
         counterFactory,
         createMetricsContainerRegistry(),
@@ -102,7 +97,6 @@ private BatchModeExecutionContext(
     this.dataCache = dataCache;
     this.containerRegistry =
         (MetricsContainerRegistry<MetricsContainerImpl>) getMetricsContainerRegistry();
-    this.populateStringSetMetrics = populateStringSetMetrics;
   }

   private static MetricsContainerRegistry<MetricsContainerImpl> createMetricsContainerRegistry() {
@@ -138,8 +132,7 @@ public static BatchModeExecutionContext forTesting(
             counterFactory,
             options,
             "test-work-item-id"),
-        stateRegistry,
-        true);
+        stateRegistry);
   }

   public static BatchModeExecutionContext forTesting(PipelineOptions options, String stageName) {
@@ -252,8 +245,7 @@ public static BatchModeExecutionContext create(
             counterFactory,
             options,
             workItemId),
-        executionStateRegistry,
-        false);
+        executionStateRegistry);
   }

   /** Create a new {@link StepContext}. */
@@ -523,10 +515,7 @@ public Iterable<CounterUpdate> extractMetricUpdates(boolean isFinalUpdate) {
                 update ->
                     MetricsToCounterUpdateConverter.fromDistribution(
                         update.getKey(), true, update.getUpdate())),
-        FluentIterable.from(
-                populateStringSetMetrics
-                    ? updates.stringSetUpdates()
-                    : Collections.emptyList())
+        FluentIterable.from(updates.stringSetUpdates())
             .transform(
                 update ->
                     MetricsToCounterUpdateConverter.fromStringSet(

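With the `populateStringSetMetrics` flag gone, `extractMetricUpdates` converts string-set deltas unconditionally, using the same concat-and-transform pattern as counters and distributions. A rough sketch of that Guava `FluentIterable` pattern with stand-in element types; the real code maps metric updates through `MetricsToCounterUpdateConverter`:

```java
import com.google.common.collect.FluentIterable;
import java.util.Arrays;
import java.util.List;

class ConcatTransformSketch {
  public static void main(String[] args) {
    // Stand-ins for updates.distributionUpdates() and updates.stringSetUpdates().
    List<String> distributions = Arrays.asList("d1", "d2");
    List<String> stringSets = Arrays.asList("s1");

    Iterable<String> all =
        FluentIterable.from(distributions)
            .transform(u -> "distribution:" + u)
            .append(FluentIterable.from(stringSets).transform(u -> "stringset:" + u));

    // String-set updates are now always part of the combined iterable.
    all.forEach(System.out::println);
  }
}
```
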
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java

Lines changed: 1 addition & 6 deletions

@@ -23,7 +23,6 @@
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -89,9 +88,6 @@ public class StreamingStepMetricsContainer implements MetricsContainer {

   private final Clock clock;

-  // TODO(BEAM-31814): Remove once Dataflow legacy runner supports this.
-  @VisibleForTesting boolean populateStringSetUpdates = false;
-
   private StreamingStepMetricsContainer(String stepName) {
     this.stepName = stepName;
     this.perWorkerCountersByFirstStaleTime = new ConcurrentHashMap<>();
@@ -191,8 +187,7 @@ public Histogram getPerWorkerHistogram(
   public Iterable<CounterUpdate> extractUpdates() {
     return counterUpdates()
         .append(distributionUpdates())
-        .append(gaugeUpdates())
-        .append(populateStringSetUpdates ? stringSetUpdates() : Collections.emptyList());
+        .append(gaugeUpdates().append(stringSetUpdates()));
   }

   private FluentIterable<CounterUpdate> counterUpdates() {

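The streaming container gets the same treatment: `extractUpdates()` now always appends `stringSetUpdates()`. Those updates originate from string-set metrics reported in user code. A hedged sketch of that reporting path, assuming `Metrics.stringSet` is available in the SDK version this commit targets; the DoFn and metric names are illustrative:

```java
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.metrics.StringSet;
import org.apache.beam.sdk.transforms.DoFn;

class RecordSourcesFn extends DoFn<String, String> {
  // Delegating metric handle; the runner accumulates the values per step.
  private final StringSet sourcesSeen = Metrics.stringSet("example.namespace", "sources_seen");

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    sourcesSeen.add(element); // distinct values end up in the step's string-set update
    out.output(element);
  }
}
```
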
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainerTest.java

Lines changed: 0 additions & 2 deletions

@@ -292,7 +292,6 @@ public void testStringSetUpdateExtraction() {
             .setCumulative(false)
             .setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh")));

-    ((StreamingStepMetricsContainer) c1).populateStringSetUpdates = true;
     Iterable<CounterUpdate> updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
     assertThat(updates, containsInAnyOrder(name1Update));

@@ -315,7 +314,6 @@
             .setCumulative(false)
             .setStringList(new StringList().setElements(Arrays.asList("ij", "kl", "mn")));

-    ((StreamingStepMetricsContainer) c2).populateStringSetUpdates = true;
     updates = StreamingStepMetricsContainer.extractMetricUpdates(registry);
     assertThat(updates, containsInAnyOrder(name1Update, name2Update));

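Only the two assignments to the removed flag go away in the test; the extraction calls and assertions are unchanged, since string-set updates are now extracted by default. For reference, the shape of the expected `CounterUpdate` the test matches against, limited to the fields visible in the hunks above; name and metadata fields the diff does not show are omitted rather than guessed:

```java
import com.google.api.services.dataflow.model.CounterUpdate;
import com.google.api.services.dataflow.model.StringList;
import java.util.Arrays;

class ExpectedStringSetUpdateSketch {
  public static void main(String[] args) {
    // Cumulative=false marks a delta update; the payload is carried as a StringList.
    CounterUpdate expected =
        new CounterUpdate()
            .setCumulative(false)
            .setStringList(new StringList().setElements(Arrays.asList("ab", "cd", "ef", "gh")));
    System.out.println(expected); // generated model prints as JSON
  }
}
```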