Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public final class TracingAssembly {
private static Function<? super ParallelFlowable, ? extends ParallelFlowable>
oldOnParallelAssembly;

@GuardedBy("TracingAssembly.class")
@Nullable
private static Function<? super Runnable, ? extends Runnable> oldScheduleHandler;

@GuardedBy("TracingAssembly.class")
private static boolean enabled;

Expand All @@ -118,6 +122,8 @@ public void enable() {

enableObservable();

enableWrappedScheduleHandler();

enableCompletable();

enableSingle();
Expand All @@ -142,6 +148,8 @@ public void disable() {

disableObservable();

disableObservableAssembly();

disableCompletable();

disableSingle();
Expand Down Expand Up @@ -219,6 +227,25 @@ private static void enableObservable() {
}
}

@GuardedBy("TracingAssembly.class")
private static void enableWrappedScheduleHandler() {
oldScheduleHandler = RxJavaPlugins.getScheduleHandler();
RxJavaPlugins.setScheduleHandler(
runnable -> {
Context context = Context.current();
Runnable wrappedRunnable =
() -> {
try (Scope ignored = context.makeCurrent()) {
runnable.run();
}
};
// If there was a previous handler, apply it to our wrapped runnable
return oldScheduleHandler != null
? oldScheduleHandler.apply(wrappedRunnable)
: wrappedRunnable;
});
}

@GuardedBy("TracingAssembly.class")
@SuppressWarnings({"rawtypes", "unchecked"})
private static void enableSingle() {
Expand Down Expand Up @@ -274,6 +301,12 @@ private static void disableObservable() {
oldOnObservableSubscribe = null;
}

@GuardedBy("TracingAssembly.class")
private static void disableObservableAssembly() {
RxJavaPlugins.setScheduleHandler(oldScheduleHandler);
oldScheduleHandler = null;
}

@GuardedBy("TracingAssembly.class")
private static void disableCompletable() {
RxJavaPlugins.setOnCompletableSubscribe(oldOnCompletableSubscribe);
Expand Down
Loading
Loading