Skip to content

Commit ff672d6

Browse files
authored
Merge pull request #19699 from njr-11/18669-transaction-context-for-ManagedExecutorDefinition
transaction context for ContextServiceDefinition
2 parents ff246eb + 6fa9c65 commit ff672d6

File tree

11 files changed

+250
-68
lines changed

11 files changed

+250
-68
lines changed

dev/com.ibm.ws.concurrent/src/com/ibm/ws/concurrent/WSManagedExecutorService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2017,2021 IBM Corporation and others.
2+
* Copyright (c) 2017,2022 IBM Corporation and others.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -27,7 +27,12 @@ public interface WSManagedExecutorService {
2727
* or creates new thread context as determined by the execution properties.
2828
* Do not expect the captured context to be serializable.</p>
2929
*
30-
* @param props execution properties. Custom property keys must not begin with "javax.enterprise.concurrent."
30+
* @param props execution properties. Custom property keys must not begin with
31+
* "javax.enterprise.concurrent." or "jakarta.enterprise.concurrent.".
32+
* Null indicates to use execution properties that are consistent with
33+
* with the managed executor's ContextServiceDefinition, or lacking a
34+
* ContextServiceDefinition use execution properties that suspend the
35+
* transaction on the thread of execution.
3136
* @return captured thread context.
3237
*/
3338
ThreadContextDescriptor captureThreadContext(Map<String, String> props);

dev/com.ibm.ws.concurrent/src/com/ibm/ws/concurrent/internal/AsyncMethod.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2021 IBM Corporation and others.
2+
* Copyright (c) 2021,2022 IBM Corporation and others.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -39,7 +39,7 @@ public class AsyncMethod<I, T> extends ManagedCompletableFuture<T> {
3939
* that is returned to the application, but it has the choice of
4040
* returning a different stage.
4141
*/
42-
private final BiFunction<I, CompletableFuture<T>, CompletionStage<T>> action;
42+
private final BiFunction<I, CompletableFuture<T>, CompletionStage<T>> asyncMethodImpl;
4343

4444
/**
4545
* Thread context that is captured when the asynchronous method is requested,
@@ -78,10 +78,8 @@ public AsyncMethod(BiFunction<I, CompletableFuture<T>, CompletionStage<T>> invok
7878
if (JAVA8)
7979
throw new UnsupportedOperationException();
8080

81-
rejectManagedTask(invoker);
82-
83-
this.action = invoker;
84-
this.contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(XPROPS_SUSPEND_TRAN);
81+
this.asyncMethodImpl = invoker;
82+
this.contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(null);
8583
this.invocation = invocation;
8684

8785
((Executor) futureRef).execute(this::runIfNotStarted);
@@ -120,22 +118,22 @@ public T join() {
120118
@FFDCIgnore({ CompletionException.class, Error.class, RuntimeException.class })
121119
private void runIfNotStarted() {
122120
if (!isDone() && started.compareAndSet(false, true)) {
123-
CompletionStage<T> asyncMethodResultStage = null;
124121
Throwable failure = null;
125122
ArrayList<ThreadContext> contextApplied = null;
126123
try {
127124
if (contextDescriptor != null)
128125
contextApplied = contextDescriptor.taskStarting();
129-
asyncMethodResultStage = action.apply(invocation, this);
126+
127+
CompletionStage<T> asyncMethodResultStage = asyncMethodImpl.apply(invocation, this);
130128

131129
// The asynchronous method implementation can return a different stage or null if it wants to
132130
if (asyncMethodResultStage != this)
133131
if (asyncMethodResultStage == null) {
134132
complete(null);
133+
} else if (asyncMethodResultStage instanceof ManagedCompletableFuture) {
134+
// bypass thread context capture & propagation because it is unnecessary here
135+
((ManagedCompletableFuture<T>) asyncMethodResultStage).super_whenComplete(this::complete);
135136
} else {
136-
// TODO inefficient if a ManagedCompletableFuture. Instead do:
137-
//} else if (asyncMethodResultStage instanceof ManagedCompletableFuture) {
138-
// ((ManagedCompletableFuture) asyncMethodResultStage).super_whenComplete(this::complete);
139137
asyncMethodResultStage.whenComplete(this::complete);
140138
}
141139
} catch (CompletionException x) {

dev/com.ibm.ws.concurrent/src/com/ibm/ws/concurrent/internal/ContextServiceImpl.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2012, 2021 IBM Corporation and others.
2+
* Copyright (c) 2012, 2022 IBM Corporation and others.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -136,6 +136,13 @@ public class ContextServiceImpl implements ContextService, //
136136
*/
137137
private ServiceReference<JavaEEVersion> eeVersionRef;
138138

139+
/**
140+
* Execution properties.
141+
* If ContextServiceDefinition is used, the execution properties are populated upon activate
142+
* to control which context types are cleared vs left unchanged. Otherwise, it remains empty.
143+
*/
144+
Map<String, String> execProps = Collections.emptyMap();
145+
139146
/**
140147
* Hash code for this instance.
141148
*/
@@ -242,6 +249,15 @@ protected void activate(ComponentContext context) {
242249
if (contextSvcName == null)
243250
contextSvcName = (String) props.get(CONFIG_ID);
244251

252+
if (!"file".equals(props.get("config.source"))) {
253+
// execution properties for ContextServiceDefinition
254+
execProps = new TreeMap<String, String>();
255+
execProps.put(WSContextService.DEFAULT_CONTEXT, WSContextService.UNCONFIGURED_CONTEXT_TYPES);
256+
String contextToSkip = (String) props.get("context.unchanged");
257+
if (contextToSkip != null)
258+
execProps.put(WSContextService.SKIP_CONTEXT_PROVIDERS, contextToSkip);
259+
}
260+
245261
lock.writeLock().lock();
246262
try {
247263
componentContext = context;
@@ -362,7 +378,7 @@ public <R> Callable<R> contextualCallable(Callable<R> callable) {
362378
throw new IllegalArgumentException(ContextualCallable.class.getSimpleName());
363379

364380
@SuppressWarnings("unchecked")
365-
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
381+
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
366382
return new ContextualCallable<R>(contextDescriptor, callable);
367383
}
368384

@@ -372,7 +388,7 @@ public <T, U> BiConsumer<T, U> contextualConsumer(BiConsumer<T, U> consumer) {
372388
throw new IllegalArgumentException(ContextualBiConsumer.class.getSimpleName());
373389

374390
@SuppressWarnings("unchecked")
375-
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
391+
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
376392
return new ContextualBiConsumer<T, U>(contextDescriptor, consumer);
377393
}
378394

@@ -382,7 +398,7 @@ public <T> Consumer<T> contextualConsumer(Consumer<T> consumer) {
382398
throw new IllegalArgumentException(ContextualConsumer.class.getSimpleName());
383399

384400
@SuppressWarnings("unchecked")
385-
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
401+
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
386402
return new ContextualConsumer<T>(contextDescriptor, consumer);
387403
}
388404

@@ -392,7 +408,7 @@ public <T, U, R> BiFunction<T, U, R> contextualFunction(BiFunction<T, U, R> func
392408
throw new IllegalArgumentException(ContextualBiFunction.class.getSimpleName());
393409

394410
@SuppressWarnings("unchecked")
395-
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
411+
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
396412
return new ContextualBiFunction<T, U, R>(contextDescriptor, function);
397413
}
398414

@@ -402,7 +418,7 @@ public <T, R> Function<T, R> contextualFunction(Function<T, R> function) {
402418
throw new IllegalArgumentException(ContextualFunction.class.getSimpleName());
403419

404420
@SuppressWarnings("unchecked")
405-
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
421+
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
406422
return new ContextualFunction<T, R>(contextDescriptor, function);
407423
}
408424

@@ -412,7 +428,7 @@ public Runnable contextualRunnable(Runnable runnable) {
412428
throw new IllegalArgumentException(ContextualRunnable.class.getSimpleName());
413429

414430
@SuppressWarnings("unchecked")
415-
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
431+
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
416432
return new ContextualRunnable(contextDescriptor, runnable);
417433
}
418434

@@ -422,7 +438,7 @@ public <R> Supplier<R> contextualSupplier(Supplier<R> supplier) {
422438
throw new IllegalArgumentException(ContextualSupplier.class.getSimpleName());
423439

424440
@SuppressWarnings("unchecked")
425-
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
441+
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
426442
return new ContextualSupplier<R>(contextDescriptor, supplier);
427443
}
428444

@@ -556,7 +572,7 @@ public Object createResource(ResourceInfo ref) throws Exception {
556572
@Override
557573
public Executor currentContextExecutor() {
558574
@SuppressWarnings("unchecked")
559-
ThreadContextDescriptor contextDescriptor = captureThreadContext(Collections.emptyMap());
575+
ThreadContextDescriptor contextDescriptor = captureThreadContext(execProps);
560576
return new ContextualExecutor(contextDescriptor);
561577
}
562578

dev/com.ibm.ws.concurrent/src/com/ibm/ws/concurrent/internal/ManagedCompletableFuture.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2017, 2021 IBM Corporation and others.
2+
* Copyright (c) 2017, 2022 IBM Corporation and others.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -118,15 +118,6 @@ public class ManagedCompletableFuture<T> extends CompletableFuture<T> {
118118
}
119119
}
120120

121-
/**
122-
* Execution property that indicates a task should run with any previous transaction suspended.
123-
*/
124-
static final Map<String, String> XPROPS_SUSPEND_TRAN = new TreeMap<String, String>();
125-
static {
126-
XPROPS_SUSPEND_TRAN.put("jakarta.enterprise.concurrent.TRANSACTION", "SUSPEND");
127-
XPROPS_SUSPEND_TRAN.put("javax.enterprise.concurrent.TRANSACTION", "SUSPEND");
128-
}
129-
130121
/**
131122
* Privileged action that obtains the Liberty non-deferrable ScheduledExecutorService.
132123
*/
@@ -466,7 +457,7 @@ public static CompletableFuture<Void> runAsync(Runnable action, Executor executo
466457
contextDescriptor = r.getContextDescriptor();
467458
action = r.getAction();
468459
} else if (executor instanceof WSManagedExecutorService) {
469-
contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(XPROPS_SUSPEND_TRAN);
460+
contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(null);
470461
} else {
471462
contextDescriptor = null;
472463
}
@@ -515,7 +506,7 @@ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> action, Executor
515506
contextDescriptor = s.getContextDescriptor();
516507
action = s.getAction();
517508
} else if (executor instanceof WSManagedExecutorService) {
518-
contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(XPROPS_SUSPEND_TRAN);
509+
contextDescriptor = ((WSManagedExecutorService) executor).captureThreadContext(null);
519510
} else {
520511
contextDescriptor = null;
521512
}
@@ -697,7 +688,7 @@ private ThreadContextDescriptor captureThreadContext(Executor executor) {
697688
if (managedExecutor == null)
698689
return null;
699690

700-
return managedExecutor.captureThreadContext(XPROPS_SUSPEND_TRAN);
691+
return managedExecutor.captureThreadContext(null);
701692
}
702693

703694
/**
@@ -1410,6 +1401,17 @@ final boolean super_completeExceptionally(Throwable x) {
14101401
return super.completeExceptionally(x);
14111402
}
14121403

1404+
/**
1405+
* Invokes whenComplete on the superclass, bypassing thread context capture and
1406+
* propagation.
1407+
*/
1408+
final void super_whenComplete(BiConsumer<? super T, ? super Throwable> action) {
1409+
if (JAVA8)
1410+
throw new UnsupportedOperationException();
1411+
else
1412+
super.whenComplete(action);
1413+
}
1414+
14131415
/**
14141416
* Convenience method to validate that an executor supports running asynchronously
14151417
* and to wrap the executor, if an ExecutorService, with FutureRefExecutor.

dev/com.ibm.ws.concurrent/src/com/ibm/ws/concurrent/internal/ManagedExecutorServiceImpl.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2012, 2021 IBM Corporation and others.
2+
* Copyright (c) 2012, 2022 IBM Corporation and others.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -117,6 +117,15 @@ public Void run() {
117117
*/
118118
private static final Map<String, String> JAVAX_SUSPEND_TRAN = Collections.singletonMap("javax.enterprise.concurrent.TRANSACTION", "SUSPEND");
119119

120+
/**
121+
* Execution properties that specify to suspend the current transaction.
122+
*/
123+
private static final Map<String, String> XPROPS_SUSPEND_TRAN = new TreeMap<String, String>();
124+
static {
125+
XPROPS_SUSPEND_TRAN.putAll(JAKARTA_SUSPEND_TRAN);
126+
XPROPS_SUSPEND_TRAN.putAll(JAVAX_SUSPEND_TRAN);
127+
}
128+
120129
private final boolean allowLifeCycleMethods;
121130

122131
/**
@@ -302,12 +311,15 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
302311

303312
@Override
304313
public ThreadContextDescriptor captureThreadContext(Map<String, String> props) {
305-
WSContextService contextSvc;
314+
ContextServiceImpl contextSvc;
306315
if (mpContextService == null)
307-
contextSvc = contextSvcRef.getServiceWithException();
316+
contextSvc = (ContextServiceImpl) contextSvcRef.getServiceWithException();
308317
else
309318
contextSvc = mpContextService;
310319

320+
if (props == null)
321+
props = contextSvc.execProps.isEmpty() ? XPROPS_SUSPEND_TRAN : contextSvc.execProps;
322+
311323
@SuppressWarnings("unchecked")
312324
ThreadContextDescriptor threadContext = contextSvc.captureThreadContext(props);
313325
return threadContext;

dev/com.ibm.ws.concurrent_fat_cdi/test-applications/concurrentCDIApp/src/concurrent/cdi/web/ApplicationScopedBean.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*******************************************************************************
2-
* Copyright (c) 2017,2021 IBM Corporation and others.
2+
* Copyright (c) 2017,2022 IBM Corporation and others.
33
* All rights reserved. This program and the accompanying materials
44
* are made available under the terms of the Eclipse Public License v1.0
55
* which accompanies this distribution, and is available at
@@ -81,13 +81,12 @@ public char getCharacter() {
8181
* Obtain the transaction key and status and then commit the active transaction.
8282
*/
8383
@Asynchronous(executor = "java:module/concurrent/txexecutor")
84-
public CompletableFuture<Entry<Object, Integer>> getTransactionInfoAndCommit() {
84+
public CompletableFuture<Entry<Object, Integer>> getTransactionInfoAndCommit(TransactionSynchronizationRegistry tranSyncRegistry,
85+
UserTransaction tx) {
8586
try {
86-
TransactionSynchronizationRegistry tranSyncRegistry = InitialContext.doLookup("java:comp/TransactionSynchronizationRegistry");
8787
Object txKey = tranSyncRegistry.getTransactionKey();
8888
int txStatus = tranSyncRegistry.getTransactionStatus();
8989

90-
UserTransaction tx = InitialContext.doLookup("java:comp/UserTransaction");
9190
tx.commit();
9291

9392
return Asynchronous.Result.complete(new SimpleEntry<Object, Integer>(txKey, txStatus));

0 commit comments

Comments
 (0)