Skip to content

Conversation

@FroMage
Copy link
Member

@FroMage FroMage commented Jun 24, 2020

Added the quarkus.datasource.reactive.thread-local experimental configuration, which stores the pool into a thread-local.

Copy link
Member

@Sanne Sanne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good at high level, I think some little race conditions need to be made water tight.

Maybe the configuration property should be renamed.. no hard position on that.


@Override
public void beforeEach(ExtensionContext extensionContext) throws Exception {
rootLogger.addHandler(inMemoryLogHandler);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not too familiar with loggers and junit extensions, but isn't this going to leak in the testsuite?

Looks like it will keep adding new InMemoryLogHandler instances, and also it's not emptying each of them after usage. I suppose the rootLogger is a global static so this might get nasty?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@geoand any opinion? I copied this from your QuarkusProdModeTest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In QuarkusProdModeTest this is done in beforeAll, is there any specific reason why it was added to beforeEach here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having it in beforeAll might be slightly better, but would still leak no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't follow, what would leak?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not understanding what you mean by "but because each test replaces it, it shouldn't be a problem" but it's ok, we can avoid blocking this PR just because I don't get it :)

With @FroMage having added the afterEach I'm happy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean that each new instance of QuarkusDevModeTest will provide a new handler thus replacing the old one.
But for sure the best solution is to get a hold of the initial handlers in beforeAll and then restore them in afterAll

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's adding handlers, not replacing existing ones.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll take care of this in a separate PR

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override
public String apply(String s) {
return s.replace("quarkus.datasource.reactive.thread-local=true",
"quarkus.datasource.reactive.thread-local = true");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the goal of this replacement? Might need a comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to trigger a hot reload.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment. And is the change to the entity not enought to trigger one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it does not reload the pools. The entity change is to verify that a reload happened. I've added a comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok thanks. Off topic, but I wonder if we should improve on that: the configuration isn't really different so there shouldn't be any need to reload :)

quarkus.datasource.username=hibernate_orm_test
quarkus.datasource.password=hibernate_orm_test
quarkus.datasource.reactive.url=${reactive-postgres.url}
quarkus.datasource.reactive.thread-local=true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The configuration property isn't suggesting that it relates to an enhanced connection pool.

Maybe threadlocal-pool ? Or pool-strategy= [enum: "threadlocal" | "simple" ] ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, but max-size also relates to the pool without saying it, so…

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gsmet how would you name it? You recently refactored all datasource configuration right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've un-blocked the PR. Let's go ahead with the feature as we need it asap for benchmarking, but I'd still like to revisit the configuration in a follow up.

@FroMage FroMage force-pushed the pgpool-thread-local branch from 307b35a to f66f865 Compare June 24, 2020 12:47
@FroMage FroMage force-pushed the pgpool-thread-local branch from f66f865 to d2433d8 Compare June 24, 2020 13:04
@Sanne Sanne added the area/hibernate-reactive Hibernate Reactive label Jun 24, 2020
@Sanne Sanne added this to the 1.6.0 - master milestone Jun 24, 2020
@Sanne Sanne merged commit c00822d into quarkusio:master Jun 24, 2020
@geoand
Copy link
Contributor

geoand commented Jun 24, 2020

@FroMage, @Sanne I think we need to revert because CI failed the new test (and I can also reproduce the failure locally).

Also the failure has nothing to do with the logs, the HotReloadTestCase fails even if one disables all the log handling from QuarkusDevModeTest.

Here is the output:

2020-06-24 23:46:59,286 ERROR [io.qua.dep.dev.DevModeMain] (main) Quarkus dev mode failed to start: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: Cannot reset a started application
        at io.quarkus.bootstrap.app.CuratedApplication.runInCl(CuratedApplication.java:131)
        at io.quarkus.bootstrap.app.CuratedApplication.runInAugmentClassLoader(CuratedApplication.java:81)
        at io.quarkus.deployment.dev.DevModeMain.start(DevModeMain.java:132)
        at io.quarkus.test.QuarkusDevModeTest.beforeEach(QuarkusDevModeTest.java:174)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachCallbacks$1(TestMethodTestDescriptor.java:161)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$5(TestMethodTestDescriptor.java:197)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:197)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:160)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:220)
        at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$6(DefaultLauncher.java:188)
        at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:202)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:181)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:128)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:142)
        at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:117)
        at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
        at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.IllegalStateException: Cannot reset a started application
        at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:321)
        at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:42)
        at io.quarkus.bootstrap.app.CuratedApplication.runInCl(CuratedApplication.java:128)
        ... 52 more
Caused by: java.lang.RuntimeException: java.lang.IllegalStateException: Cannot reset a started application
        at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:302)
        ... 54 more
Caused by: java.lang.IllegalStateException: Cannot reset a started application
        at io.quarkus.dev.appstate.ApplicationStateNotification.reset(ApplicationStateNotification.java:20)
        at io.quarkus.runner.bootstrap.StartupActionImpl.runMainClass(StartupActionImpl.java:122)
        at io.quarkus.deployment.dev.IsolatedDevModeMain.firstStart(IsolatedDevModeMain.java:95)
        at io.quarkus.deployment.dev.IsolatedDevModeMain.accept(IsolatedDevModeMain.java:296)
        ... 54 more

Copy link
Contributor

@tsegismont tsegismont left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments @FroMage

private static final Logger log = Logger.getLogger(ThreadLocalPool.class);

private final AtomicReference<ThreadLocal<PoolType>> pool = new AtomicReference<>(new ThreadLocal<>());
private static final List<Pool> threadLocalPools = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a copy on write list and relieve from synchronization, given the list will not change often.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 that's probably a good idea.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW github UI seems confusing: we're now commenting on FroMage 's original PR but looking at the version of code I had modified further in a follow-up PR.. weird.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah nevermind, for some reason it looked like my follow up PR. Please check the new version, now in master ;)

private PoolType pool() {
ThreadLocal<PoolType> poolThreadLocal = pool.get();
PoolType ret = poolThreadLocal.get();
if (ret == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have an AtomicReference, you could use compare and swap and avoid adding a superfluous ThreadLocalPool to the list if two threads execute this part concurrently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I've done something similar in the follow-up PR, I was more concerned about making sure we'd not leak than getting most performance out of it: having chatted with @johnaohara we believe there's actually room for a better design (similar to what Agroal and Hikari have) and we should do that, but then contribute it into the pgclient project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, looking forward to it

@robotmrv
Copy link

@FroMage could you please describe why do we need to use Pool per thread?
What issues does it solve and how to reproduce them?
I see from https://github.com/quarkusio/quarkus/blob/master/extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/ThreadLocalPool.java#L38 what it solves some
Thread safety issues
but in what scenarios do they appear?

@Sanne
Copy link
Member

Sanne commented Feb 12, 2021

@robotmrv it's quite simple: the pool in vert.x v.3 (which is what we're using here currently) is not designed to work with multiple thread, but is designed to be used by a single thread. So, each thread should have its own.

This will likely change when we switch to vert.x v4

@Sanne
Copy link
Member

Sanne commented Feb 12, 2021

BTW, configuration property quarkus.datasource.reactive.thread-local was removed now as it's always on (since it's very important, but we didn't know this back then)

@robotmrv
Copy link

@Sanne

@robotmrv it's quite simple: the pool in vert.x v.3 (which is what we're using here currently) is not designed to work with multiple thread, but is designed to be used by a single thread. So, each thread should have its own.

I'm looking at the io.vertx.sqlclient.impl.PoolBase code and I see that inside it always works with single netty EventLoop Thread, so it looks Thread safe as state is changed only by single Thread.

Are you saying that this change is only to "scale" Pool so it could use all Netty event loops but not because of thread safety issues?
Or am I missing something?

But if so - it creates Pool for every new Thread, every pool creates connections and If access to the Pool is performed from non netty thread, e.g. ExecutorRecorder#getCurrent it will create a lot of connections (at least one per Thread) and it could be much more than configured.

@Sanne
Copy link
Member

Sanne commented Feb 14, 2021

I'm looking at the io.vertx.sqlclient.impl.PoolBase code and I see that inside it always works with single netty EventLoop Thread, so it looks Thread safe as state is changed only by single Thread.

The "pool" is not used only by its own internals (which is indeed also expecting a single Thread), but within Quarkus there's multiple threads who are accessing it the pool.

... will create a lot of connections (at least one per Thread) and it could be much more than configured.

Yes, this concern has been raised before, and the Vert.x team is working on a better pool implementation for Vert.x 4.

@robotmrv
Copy link

The "pool" is not used only by its own internals (which is indeed also expecting a single Thread), but within Quarkus there's multiple threads who are accessing it the pool.

sorry but I do not understand why it is not thread safe to use Pool by multiple Threads
Pool has such methods

Query<RowSet<Row>> query(String sql);
PreparedQuery<RowSet<Row>> preparedQuery(String sql);
void begin(Handler<AsyncResult<Transaction>> handler);
void getConnection(Handler<AsyncResult<SqlConnection>> handler);
void close()

By using I mean invoke above methods from any Thread, and then process callbacks without switching Thread.
All this methods offload command execution to the vert.x EventLoopContext which is created during Pool creation
by such or similar logic

  public void getConnection(Handler<AsyncResult<SqlConnection>> handler) {
    Context current = Vertx.currentContext();
    if (current == context) {
      pool.acquire(new ConnectionWaiter(handler));
    } else {
      context.runOnContext(v -> getConnection(handler));
    }
  }

so every command is running on a single Thread and Pool / Connection state is mutated by a single Thread.
And all results in callbacks are processed on this Thread.

@Sanne
Could you please point me to the test case where it is not true when I use only Pool API?

The "pool" is not used only by its own internals

Or are you talking about hibernate reactive case that uses Pool internals and it is not thread safe?

Sorry for asking so many questions
I do not use HR at the moment just Pool API and want to know if there is a problem with my use case.
And in addition, solution with thread locals does not look as a real solution to the problem as it probably just better hides the real problem and adds new problems like additional connections.

This is how I see ThreadLocal solution
X Thread accesses Pool, there is no Pool instance for it - new Pool is created.
new Pool per Thread - means connection pooling does not work at the moment. (not good)
Then X Thread creates command like query() and then Event Loop Thread executes it.
X Thread is released and returns to the Thread Pool while query is running.
Then there is a new incoming request and That X Thread starts process it.
It accesses Pool and there is already Pool for it, but first query() is a long one and is not completed at the moment.
so there is 2 commands which are executed by this Pool, which are executed on different connections but started by the same Thread.
Isn't it the same that these two commands would be created by different Threads?

@robotmrv
Copy link

@Sanne
sorry for pinging you, but could you please explain how does ThreadLocal solves issue with accessing by a single Thread in non blocking environment?
as far as I see the only thing that is performed on a the same thread is - Pool acquiring and sending a query command (that is thread safe).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants