2121import static io .grpc .census .CensusStatsModule .CallAttemptsTracerFactory .RETRIES_PER_CALL ;
2222import static io .grpc .census .CensusStatsModule .CallAttemptsTracerFactory .RETRY_DELAY_PER_CALL ;
2323import static io .grpc .census .CensusStatsModule .CallAttemptsTracerFactory .TRANSPARENT_RETRIES_PER_CALL ;
24+ import static io .grpc .census .internal .ObservabilityCensusConstants .API_LATENCY_PER_CALL ;
2425import static java .util .concurrent .TimeUnit .MILLISECONDS ;
2526import static org .junit .Assert .assertEquals ;
2627import static org .junit .Assert .assertFalse ;
6364import io .grpc .Status ;
6465import io .grpc .census .CensusTracingModule .CallAttemptsTracerFactory ;
6566import io .grpc .census .internal .DeprecatedCensusConstants ;
67+ import io .grpc .census .internal .ObservabilityCensusConstants ;
6668import io .grpc .internal .FakeClock ;
6769import io .grpc .internal .testing .StatsTestUtils ;
6870import io .grpc .internal .testing .StatsTestUtils .FakeStatsRecorder ;
121123 */
122124@ RunWith (JUnit4 .class )
123125public class CensusModulesTest {
126+
127+ private static final double TOLERANCE = 1e-6 ;
124128 private static final CallOptions .Key <String > CUSTOM_OPTION =
125129 CallOptions .Key .createWithDefault ("option1" , "default" );
126130 private static final CallOptions CALL_OPTIONS =
@@ -368,7 +372,7 @@ record = statsRecorder.pollRecord();
368372 .setSampleToLocalSpanStore (false )
369373 .build ());
370374 verify (spyClientSpan , never ()).end ();
371- assertZeroRetryRecorded ( );
375+ assertPerCallMetrics ( 0D );
372376 }
373377
374378 @ Test
@@ -503,7 +507,7 @@ private void subtestClientBasicStatsDefaultContext(
503507 DeprecatedCensusConstants .RPC_CLIENT_UNCOMPRESSED_RESPONSE_BYTES ));
504508 assertEquals (30 + 100 + 16 + 24 ,
505509 record .getMetricAsLongOrFail (RpcMeasureConstants .GRPC_CLIENT_ROUNDTRIP_LATENCY ));
506- assertZeroRetryRecorded ( );
510+ assertPerCallMetrics ( 30D + 100 + 16 + 24 );
507511 } else {
508512 assertNull (statsRecorder .pollRecord ());
509513 }
@@ -691,6 +695,8 @@ record = statsRecorder.pollRecord();
691695 assertThat (record .getMetric (RETRIES_PER_CALL )).isEqualTo (1 );
692696 assertThat (record .getMetric (TRANSPARENT_RETRIES_PER_CALL )).isEqualTo (2 );
693697 assertThat (record .getMetric (RETRY_DELAY_PER_CALL )).isEqualTo (1000D + 10 + 10 );
698+ assertThat (record .getMetric (API_LATENCY_PER_CALL ))
699+ .isEqualTo (30D + 100 + 24 + 1000 + 100 + 10 + 10 + 16 + 24 );
694700 }
695701
696702 private void assertRealTimeMetric (
@@ -716,13 +722,14 @@ private void assertRealTimeMetric(
716722 assertEquals (expectedValue , record .getMetricAsLongOrFail (measure ));
717723 }
718724
719- private void assertZeroRetryRecorded ( ) {
725+ private void assertPerCallMetrics ( double expectedLatencyValue ) {
720726 StatsTestUtils .MetricsRecord record = statsRecorder .pollRecord ();
721727 TagValue methodTag = record .tags .get (RpcMeasureConstants .GRPC_CLIENT_METHOD );
722728 assertEquals (method .getFullMethodName (), methodTag .asString ());
723729 assertThat (record .getMetric (RETRIES_PER_CALL )).isEqualTo (0 );
724730 assertThat (record .getMetric (TRANSPARENT_RETRIES_PER_CALL )).isEqualTo (0 );
725731 assertThat (record .getMetric (RETRY_DELAY_PER_CALL )).isEqualTo (0D );
732+ assertThat (record .getMetric (API_LATENCY_PER_CALL )).isEqualTo (expectedLatencyValue );
726733 }
727734
728735 @ Test
@@ -849,7 +856,7 @@ record = statsRecorder.pollRecord();
849856 3000 ,
850857 record .getMetricAsLongOrFail (RpcMeasureConstants .GRPC_CLIENT_ROUNDTRIP_LATENCY ));
851858 assertNull (record .getMetric (RpcMeasureConstants .GRPC_CLIENT_SERVER_LATENCY ));
852- assertZeroRetryRecorded ( );
859+ assertPerCallMetrics ( 3000D );
853860 }
854861
855862 @ Test
@@ -989,7 +996,7 @@ private void subtestStatsHeadersPropagateTags(boolean propagate, boolean recordS
989996 assertNull (clientRecord .getMetric (DeprecatedCensusConstants .RPC_CLIENT_ERROR_COUNT ));
990997 TagValue clientPropagatedTag = clientRecord .tags .get (StatsTestUtils .EXTRA_TAG );
991998 assertEquals ("extra-tag-value-897" , clientPropagatedTag .asString ());
992- assertZeroRetryRecorded ( );
999+ assertPerCallMetrics ( 0D );
9931000 }
9941001
9951002 if (!recordStats ) {
@@ -1507,6 +1514,81 @@ public Long apply(AggregationData arg) {
15071514 });
15081515 }
15091516
1517+ @ Test
1518+ public void callLatencyView () throws InterruptedException {
1519+ StatsComponent localStats = new StatsComponentImpl ();
1520+
1521+ localStats
1522+ .getViewManager ()
1523+ .registerView (ObservabilityCensusConstants .GRPC_CLIENT_API_LATENCY_VIEW );
1524+
1525+ CensusStatsModule localCensusStats = new CensusStatsModule (
1526+ tagger , tagCtxSerializer , localStats .getStatsRecorder (), fakeClock .getStopwatchSupplier (),
1527+ false , false , true , false /* real-time */ , true );
1528+
1529+ CensusStatsModule .CallAttemptsTracerFactory callAttemptsTracerFactory =
1530+ new CensusStatsModule .CallAttemptsTracerFactory (
1531+ localCensusStats , tagger .empty (), method .getFullMethodName ());
1532+
1533+ Metadata headers = new Metadata ();
1534+ ClientStreamTracer tracer =
1535+ callAttemptsTracerFactory .newClientStreamTracer (STREAM_INFO , headers );
1536+ tracer .streamCreated (Attributes .EMPTY , headers );
1537+ fakeClock .forwardTime (50 , MILLISECONDS );
1538+ Status status = Status .OK .withDescription ("Success" );
1539+ tracer .streamClosed (status );
1540+ callAttemptsTracerFactory .callEnded (status );
1541+
1542+ // Give OpenCensus a chance to update the views asynchronously.
1543+ Thread .sleep (100 );
1544+
1545+ assertDistributionData (
1546+ localStats ,
1547+ ObservabilityCensusConstants .GRPC_CLIENT_API_LATENCY_VIEW ,
1548+ ImmutableList .of (TagValue .create (method .getFullMethodName ()), TagValue .create ("OK" )),
1549+ 50.0 , 1 , 0.0 ,
1550+ ImmutableList .of (
1551+ 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 1L ,
1552+ 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L , 0L ));
1553+ }
1554+
1555+ private void assertDistributionData (StatsComponent localStats , View view ,
1556+ List <TagValue > dimension , double mean , long count , double sumOfSquaredDeviations ,
1557+ List <Long > expectedBucketCounts ) {
1558+ AggregationData aggregationData = localStats .getViewManager ()
1559+ .getView (view .getName ())
1560+ .getAggregationMap ()
1561+ .get (dimension );
1562+
1563+ aggregationData .match (
1564+ Functions .</*@Nullable*/ Void >throwAssertionError (),
1565+ Functions .</*@Nullable*/ Void >throwAssertionError (),
1566+ Functions .</*@Nullable*/ Void >throwAssertionError (),
1567+ /* p3= */ new Function <AggregationData .DistributionData , Void >() {
1568+ @ Override
1569+ public Void apply (AggregationData .DistributionData arg ) {
1570+ assertThat (arg .getMean ()).isWithin (TOLERANCE ).of (mean );
1571+ assertThat (arg .getCount ()).isEqualTo (count );
1572+ assertThat (arg .getSumOfSquaredDeviations ())
1573+ .isWithin (TOLERANCE )
1574+ .of (sumOfSquaredDeviations );
1575+ assertThat (arg .getBucketCounts ())
1576+ .containsExactlyElementsIn (expectedBucketCounts )
1577+ .inOrder ();
1578+ return null ;
1579+ }
1580+ },
1581+ Functions .</*@Nullable*/ Void >throwAssertionError (),
1582+ Functions .</*@Nullable*/ Void >throwAssertionError (),
1583+ new Function <AggregationData , Void >() {
1584+ @ Override
1585+ public Void apply (AggregationData arg ) {
1586+ assertThat (((AggregationData .DistributionData ) arg ).getCount ()).isEqualTo (count );
1587+ return null ;
1588+ }
1589+ });
1590+ }
1591+
15101592 static class CallInfo <ReqT , RespT > extends ServerCallInfo <ReqT , RespT > {
15111593 private final MethodDescriptor <ReqT , RespT > methodDescriptor ;
15121594 private final Attributes attributes ;
0 commit comments