Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import datadog.trace.core.DDSpan
import datadog.trace.instrumentation.kotlin.coroutines.AbstractKotlinCoroutineInstrumentationTest
import kotlinx.coroutines.CoroutineDispatcher
import spock.lang.Ignore

class KotlinCoroutineInstrumentationTest extends AbstractKotlinCoroutineInstrumentationTest<KotlinCoroutineTests> {

Expand Down Expand Up @@ -44,7 +43,6 @@ class KotlinCoroutineInstrumentationTest extends AbstractKotlinCoroutineInstrume
[dispatcherName, dispatcher] << dispatchersToTest
}

@Ignore("Not working: disconnected trace")
def "kotlin trace consistent after flow"() {
setup:
KotlinCoroutineTests kotlinTest = new KotlinCoroutineTests(dispatcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class KotlinCoroutineTests(dispatcher: CoroutineDispatcher) : CoreKotlinCoroutin
emit(1)
}.flowOn(Dispatchers.IO)
val ff = f.single()
// FIXME: This span is detached

childSpan("outside-flow").activateAndUse {
println("hello $ff")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package datadog.trace.instrumentation.kotlin.coroutines;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import kotlin.coroutines.CoroutineContext;
import kotlinx.coroutines.AbstractCoroutine;
import kotlinx.coroutines.ChildHandle;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.JobNode;
import kotlinx.coroutines.JobSupport;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CoroutineContextHelper {
private static final Logger log = LoggerFactory.getLogger(CoroutineContextHelper.class);

/*
IntelliJ shows a warning here for Job being out of bounds, but that's not true, the class compiles.
*/

@Nullable
@SuppressWarnings("unchecked")
public static Job getJob(final CoroutineContext context) {
Expand Down Expand Up @@ -40,7 +49,57 @@ public static void closeScopeStateContext(final AbstractCoroutine<?> coroutine)
final ScopeStateCoroutineContext scopeStackContext =
getScopeStateContext(coroutine.getContext());
if (scopeStackContext != null) {
scopeStackContext.maybeCloseScopeAndCancelContinuation(coroutine);
scopeStackContext.maybeCloseScopeAndCancelContinuation(coroutine, getParentJob(coroutine));
}
}

private static final MethodHandle PARENT_HANDLE_METHOD;
private static final MethodHandle PARENT_HANDLE_FIELD;
private static final MethodHandle JOB_FIELD;

static {
MethodHandle parentHandleMethod = null;
MethodHandle parentHandleField = null;
MethodHandle jobField = null;

MethodHandles.Lookup lookup = MethodHandles.publicLookup();
try {
// Kotlin coroutines 1.5+
parentHandleMethod =
lookup.findVirtual(
JobSupport.class,
"getParentHandle$kotlinx_coroutines_core",
MethodType.methodType(ChildHandle.class));
jobField = lookup.findGetter(JobNode.class, "job", JobSupport.class);
} catch (Throwable ignore) {
try {
// Kotlin coroutines 1.3
parentHandleField = lookup.findGetter(JobSupport.class, "parentHandle", ChildHandle.class);
jobField = lookup.findGetter(JobNode.class, "job", Job.class);
} catch (Throwable e) {
log.debug("Unable to access parent handle", e);
}
}

PARENT_HANDLE_METHOD = parentHandleMethod;
PARENT_HANDLE_FIELD = parentHandleField;
JOB_FIELD = jobField;
}

private static Job getParentJob(JobSupport coroutine) {
try {
Object parentHandle = null;
if (null != PARENT_HANDLE_METHOD) {
parentHandle = PARENT_HANDLE_METHOD.invoke(coroutine);
} else if (null != PARENT_HANDLE_FIELD) {
parentHandle = PARENT_HANDLE_FIELD.invoke(coroutine);
}
if (parentHandle instanceof JobNode) {
return (Job) JOB_FIELD.invoke((JobNode) parentHandle);
}
} catch (Throwable e) {
log.debug("Unable to extract parent job", e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,20 @@ public ScopeState updateThreadContext(@NotNull final CoroutineContext coroutineC
}

/** If there's a context item for the coroutine then try to close it */
public void maybeCloseScopeAndCancelContinuation(final Job coroutine) {
public void maybeCloseScopeAndCancelContinuation(final Job coroutine, final Job parent) {
final ScopeStateCoroutineContextItem contextItem = contextItemPerCoroutine.get(coroutine);
if (contextItem != null) {
final ScopeState currentThreadScopeState = AgentTracer.get().newScopeState();
currentThreadScopeState.fetchFromActive();
ScopeState currentThreadScopeState = null;
if (parent != null) {
final ScopeStateCoroutineContextItem parentItem = contextItemPerCoroutine.get(parent);
if (parentItem != null) {
currentThreadScopeState = parentItem.getScopeState();
}
}
if (currentThreadScopeState == null) {
currentThreadScopeState = AgentTracer.get().newScopeState();
currentThreadScopeState.fetchFromActive();
}

contextItem.maybeCloseScopeAndCancelContinuation();
contextItemPerCoroutine.remove(coroutine);
Expand Down Expand Up @@ -107,6 +116,10 @@ public ScopeStateCoroutineContextItem() {
coroutineScopeState = AgentTracer.get().newScopeState();
}

public ScopeState getScopeState() {
return coroutineScopeState;
}

public void activate() {
coroutineScopeState.activate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import datadog.trace.core.DDSpan
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadPoolDispatcherKt
import spock.lang.Ignore
import spock.lang.Shared

abstract class AbstractKotlinCoroutineInstrumentationTest<T extends CoreKotlinCoroutineTests> extends AgentTestRunner {
Expand Down Expand Up @@ -366,7 +365,6 @@ abstract class AbstractKotlinCoroutineInstrumentationTest<T extends CoreKotlinCo
[dispatcherName, dispatcher] << dispatchersToTest
}

@Ignore("Not working: disconnected trace")
def "kotlin trace consistent with timeout"() {
setup:
CoreKotlinCoroutineTests kotlinTest = getCoreKotlinCoroutineTestsInstance(dispatcher)
Expand All @@ -389,7 +387,7 @@ abstract class AbstractKotlinCoroutineInstrumentationTest<T extends CoreKotlinCo
}
span(2) {
operationName "3-after-timeout"
childOf span(5)
childOf span(1)
}
span(3) {
operationName "4-after-timeout-2"
Expand All @@ -406,7 +404,6 @@ abstract class AbstractKotlinCoroutineInstrumentationTest<T extends CoreKotlinCo
[dispatcherName, dispatcher] << dispatchersToTest
}

@Ignore("Not working: disconnected trace")
def "kotlin trace consistent after delay"() {
setup:
CoreKotlinCoroutineTests kotlinTest = getCoreKotlinCoroutineTestsInstance(dispatcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ abstract class CoreKotlinCoroutineTests(private val dispatcher: CoroutineDispatc
delay(10)
}
}
// FIXME: This span is detached
childSpan("3-after-timeout").activateAndUse {
delay(10)
}
Expand Down Expand Up @@ -351,7 +350,6 @@ abstract class CoreKotlinCoroutineTests(private val dispatcher: CoroutineDispatc
}
}

// FIXME: This span is detached
tracedChild("after-process")

6
Expand Down