1- import com.datadog.debugger.agent.ClassesToRetransformFinder
2- import com.datadog.debugger.agent.Configuration
3- import com.datadog.debugger.agent.ConfigurationUpdater
4- import com.datadog.debugger.agent.DebuggerTransformer
5- import com.datadog.debugger.agent.DenyListHelper
6- import com.datadog.debugger.agent.JsonSnapshotSerializer
7- import com.datadog.debugger.codeorigin.DefaultCodeOriginRecorder
8- import com.datadog.debugger.instrumentation.InstrumentationResult
9- import com.datadog.debugger.probe.ProbeDefinition
10- import com.datadog.debugger.sink.DebuggerSink
11- import com.datadog.debugger.sink.ProbeStatusSink
1+ import datadog.trace.bootstrap.debugger.DebuggerContext.CodeOriginRecorder
122import com.google.common.util.concurrent.MoreExecutors
133import datadog.trace.agent.test.naming.VersionedNamingTestBase
14- import datadog.trace.api.Config
154import datadog.trace.api.DDSpanTypes
16- import datadog.trace.api.DDTags
175import datadog.trace.bootstrap.debugger.DebuggerContext
186import datadog.trace.bootstrap.instrumentation.api.Tags
19- import datadog.trace.util.AgentTaskScheduler
207import example.GreeterGrpc
218import example.Helloworld
229import io.grpc.BindableService
@@ -25,20 +12,18 @@ import io.grpc.Server
2512import io.grpc.inprocess.InProcessChannelBuilder
2613import io.grpc.inprocess.InProcessServerBuilder
2714import io.grpc.stub.StreamObserver
28- import net.bytebuddy.agent.ByteBuddyAgent
15+ import org.mockito.internal.util.MockUtil
2916
3017import java.util.concurrent.CopyOnWriteArrayList
3118import java.util.concurrent.Executors
3219import java.util.concurrent.TimeUnit
3320import java.util.concurrent.atomic.AtomicReference
3421
3522import static datadog.trace.api.config.TraceInstrumentationConfig.*
36- import static datadog.trace.util.AgentThreadFactory.AgentThread.TASK_SCHEDULER
37- import static java.lang.String.format
38- import static org.mockito.Mockito.mock
39- import static org.mockito.Mockito.when
23+ import org.mockito.Mockito
4024
4125abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
26+ private CodeOriginRecorder codeOriginRecorder
4227
4328 @Override
4429 final String service () {
@@ -81,79 +66,78 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
8166 def error = new AtomicReference ()
8267
8368 BindableService greeter = new GreeterGrpc.GreeterImplBase () {
84- @Override
85- StreamObserver<Helloworld.Response > conversation (StreamObserver<Helloworld.Response > observer ) {
86- return new StreamObserver<Helloworld.Response > () {
87- @Override
88- void onNext (Helloworld.Response value ) {
89-
90- serverReceived << value. message
91-
92- (1 .. msgCount). each {
93- if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
94- observer. onNext(value)
95- } else {
96- observer. onError(new IllegalStateException (" not async propagating!" ))
69+ @Override
70+ StreamObserver<Helloworld.Response > conversation (StreamObserver<Helloworld.Response > observer ) {
71+ return new StreamObserver<Helloworld.Response > () {
72+ @Override
73+ void onNext (Helloworld.Response value ) {
74+
75+ serverReceived << value. message
76+
77+ (1 .. msgCount). each {
78+ if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
79+ observer. onNext(value)
80+ } else {
81+ observer. onError(new IllegalStateException (" not async propagating!" ))
82+ }
83+ }
9784 }
98- }
99- }
10085
101- @Override
102- void onError (Throwable t ) {
103- if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
104- error. set(t)
105- observer. onError(t)
106- } else {
107- observer. onError(new IllegalStateException (" not async propagating!" ))
108- }
109- }
86+ @Override
87+ void onError (Throwable t ) {
88+ if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
89+ error. set(t)
90+ observer. onError(t)
91+ } else {
92+ observer. onError(new IllegalStateException (" not async propagating!" ))
93+ }
94+ }
11095
111- @Override
112- void onCompleted () {
113- if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
114- observer. onCompleted()
115- } else {
116- observer. onError(new IllegalStateException (" not async propagating!" ))
96+ @Override
97+ void onCompleted () {
98+ if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
99+ observer. onCompleted()
100+ } else {
101+ observer. onError(new IllegalStateException (" not async propagating!" ))
102+ }
103+ }
117104 }
118- }
119105 }
120106 }
121- }
122107 Server server = InProcessServerBuilder . forName(getClass(). name). addService(greeter)
123- .executor(directExecutor ? MoreExecutors . directExecutor() : Executors . newCachedThreadPool())
124- .build(). start()
108+ .executor(directExecutor ? MoreExecutors . directExecutor() : Executors . newCachedThreadPool())
109+ .build(). start()
125110
126- Thread . sleep(1000 )
127111 ManagedChannel channel = InProcessChannelBuilder . forName(getClass(). name). build()
128112 GreeterGrpc.GreeterStub client = GreeterGrpc . newStub(channel). withWaitForReady()
129113
130114 when :
131115 def streamObserver = client. conversation(new StreamObserver<Helloworld.Response > () {
132- @Override
133- void onNext (Helloworld.Response value ) {
134- if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
135- clientReceived << value. message
136- } else {
137- error. set(new IllegalStateException (" not async propagating!" ))
116+ @Override
117+ void onNext (Helloworld.Response value ) {
118+ if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
119+ clientReceived << value. message
120+ } else {
121+ error. set(new IllegalStateException (" not async propagating!" ))
122+ }
138123 }
139- }
140124
141- @Override
142- void onError (Throwable t ) {
143- if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
144- error. set(t)
145- } else {
146- error. set(new IllegalStateException (" not async propagating!" ))
125+ @Override
126+ void onError (Throwable t ) {
127+ if (TEST_TRACER . activeScope(). isAsyncPropagating()) {
128+ error. set(t)
129+ } else {
130+ error. set(new IllegalStateException (" not async propagating!" ))
131+ }
147132 }
148- }
149133
150- @Override
151- void onCompleted () {
152- if (! TEST_TRACER . activeScope(). isAsyncPropagating()) {
153- error. set(new IllegalStateException (" not async propagating!" ))
134+ @Override
135+ void onCompleted () {
136+ if (! TEST_TRACER . activeScope(). isAsyncPropagating()) {
137+ error. set(new IllegalStateException (" not async propagating!" ))
138+ }
154139 }
155- }
156- })
140+ })
157141
158142 clientRange. each {
159143 def message = Helloworld.Response . newBuilder(). setMessage(" call $it " ). build()
@@ -172,6 +156,9 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
172156 }
173157 }. flatten(). sort()
174158
159+
160+ def invocations = MockUtil . getInvocationContainer(codeOriginRecorder)
161+ assert invocations. invocations. stream(). anyMatch { it. method. name == " captureCodeOrigin" }
175162 assertTraces(2 ) {
176163 trace((hasClientMessageSpans() ? clientMessageCount * serverMessageCount : 0 ) + 1 ) {
177164 span {
@@ -220,14 +207,7 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
220207 " $Tags . COMPONENT " " grpc-server"
221208 " $Tags . SPAN_KIND " Tags . SPAN_KIND_SERVER
222209 " status.code" " OK"
223- isPresent(DDTags . DD_CODE_ORIGIN_TYPE )
224- isPresent(format(DDTags . DD_CODE_ORIGIN_FRAME , 0 , " signature" ))
225210
226- for (i in 0 .. < 8 ) {
227- for (label in [" file" , " line" , " method" , " type" ]) {
228- isPresent(format(DDTags . DD_CODE_ORIGIN_FRAME , i, label))
229- }
230- }
231211 defaultTags(true )
232212 }
233213 }
@@ -273,35 +253,8 @@ abstract class GrpcCodeOriginTest extends VersionedNamingTestBase {
273253
274254 void codeOriginSetup () {
275255 injectSysConfig(CODE_ORIGIN_FOR_SPANS_ENABLED , " true" , true )
276-
277- def configuration = Configuration . builder()
278- .setService(" grpc code origin test" )
279- .build()
280-
281- def config = mock(Config . class)
282- when(config. isDebuggerEnabled()). thenReturn(true )
283- when(config. isDebuggerClassFileDumpEnabled()). thenReturn(true )
284- when(config. isDebuggerVerifyByteCode()). thenReturn(false )
285- when(config. getFinalDebuggerSnapshotUrl())
286- .thenReturn(" http://localhost:8126/debugger/v1/input" )
287- when(config. getFinalDebuggerSymDBUrl()). thenReturn(" http://localhost:8126/symdb/v1/input" )
288- when(config. getDebuggerCodeOriginMaxUserFrames()). thenReturn(8 )
289-
290- def probeStatusSink = mock(ProbeStatusSink . class)
291-
292- def sink = new DebuggerSink (config, probeStatusSink)
293- def configurationUpdater = new ConfigurationUpdater (INSTRUMENTATION , DebuggerTransformer ::new , config, sink, new ClassesToRetransformFinder ())
294-
295- def currentTransformer = new DebuggerTransformer (config, configuration, {
296- ProbeDefinition definition, InstrumentationResult result ->
297- }, sink)
298- INSTRUMENTATION . addTransformer(currentTransformer)
299-
300- DebuggerContext . initProbeResolver(configurationUpdater)
301- DebuggerContext . initClassFilter(new DenyListHelper (null ))
302- DebuggerContext . initValueSerializer(new JsonSnapshotSerializer ())
303-
304- DebuggerContext . initCodeOrigin(new DefaultCodeOriginRecorder (config, configurationUpdater))
256+ codeOriginRecorder = Mockito . mock(CodeOriginRecorder )
257+ DebuggerContext . initCodeOrigin(codeOriginRecorder)
305258 }
306259}
307260
0 commit comments