Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,10 @@ public class DataSourceReactiveRuntimeConfig {
*/
@ConfigItem
public PfxConfiguration keyCertificatePfx;

/**
* Experimental: use one connection pool per thread.
*/
@ConfigItem
public Optional<Boolean> threadLocal;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package io.quarkus.reactive.datasource.runtime;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import org.jboss.logging.Logger;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.sqlclient.Pool;
import io.vertx.sqlclient.PoolOptions;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Query;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.Transaction;

public abstract class ThreadLocalPool<PoolType extends Pool> implements Pool {

private static final Logger log = Logger.getLogger(ThreadLocalPool.class);

private final AtomicReference<ThreadLocal<PoolType>> pool = new AtomicReference<>(new ThreadLocal<>());
private static final List<Pool> threadLocalPools = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be a copy on write list and relieve from synchronization, given the list will not change often.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 that's probably a good idea.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW github UI seems confusing: we're now commenting on FroMage 's original PR but looking at the version of code I had modified further in a follow-up PR.. weird.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah nevermind, for some reason it looked like my follow up PR. Please check the new version, now in master ;)


protected final PoolOptions poolOptions;
protected final Vertx vertx;

public ThreadLocalPool(Vertx vertx, PoolOptions poolOptions) {
this.vertx = vertx;
this.poolOptions = poolOptions;
}

private PoolType pool() {
ThreadLocal<PoolType> poolThreadLocal = pool.get();
PoolType ret = poolThreadLocal.get();
if (ret == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you have an AtomicReference, you could use compare and swap and avoid adding a superfluous ThreadLocalPool to the list if two threads execute this part concurrently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I've done something similar in the follow-up PR, I was more concerned about making sure we'd not leak than getting most performance out of it: having chatted with @johnaohara we believe there's actually room for a better design (similar to what Agroal and Hikari have) and we should do that, but then contribute it into the pgclient project.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, looking forward to it

log.debugf("Making pool for thread: %s", Thread.currentThread());
ret = createThreadLocalPool();
synchronized (threadLocalPools) {
threadLocalPools.add(ret);
}
poolThreadLocal.set(ret);
}
return ret;
}

protected abstract PoolType createThreadLocalPool();

@Override
public void getConnection(Handler<AsyncResult<SqlConnection>> handler) {
pool().getConnection(handler);
}

@Override
public Query<RowSet<Row>> query(String sql) {
return pool().query(sql);
}

@Override
public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
return pool().preparedQuery(sql);
}

@Override
public void begin(Handler<AsyncResult<Transaction>> handler) {
pool().begin(handler);
}

/**
* This is a bit weird because it works on all ThreadLocal pools, but it's only
* called from a single thread, when doing shutdown, and needs to close all the
* pools and reinitialise the thread local so that all newly created pools after
* the restart will start with an empty thread local instead of a closed one.
*/
@Override
public void close() {
// close all the thread-local pools
synchronized (threadLocalPools) {
for (Pool pool : threadLocalPools) {
log.debugf("Closing pool: %s", pool);
pool.close();
}
threadLocalPools.clear();
}
// discard the TL to clear them all
pool.set(new ThreadLocal<PoolType>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim
dataSourceReactiveDB2Config);
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactiveDB2Config);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalDB2Pool(vertx, connectOptions, poolOptions);
}
return DB2Pool.pool(vertx, connectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.db2.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.db2client.DB2ConnectOptions;
import io.vertx.db2client.DB2Pool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalDB2Pool extends ThreadLocalPool<DB2Pool> implements DB2Pool {

private final DB2ConnectOptions db2ConnectOptions;

public ThreadLocalDB2Pool(Vertx vertx, DB2ConnectOptions db2ConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.db2ConnectOptions = db2ConnectOptions;
}

@Override
protected DB2Pool createThreadLocalPool() {
return DB2Pool.pool(vertx, db2ConnectOptions, poolOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt
dataSourceReactiveMySQLConfig);
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalMySQLPool(vertx, mysqlConnectOptions, poolOptions);
}
return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.mysql.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.mysqlclient.MySQLConnectOptions;
import io.vertx.mysqlclient.MySQLPool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalMySQLPool extends ThreadLocalPool<MySQLPool> implements MySQLPool {

private final MySQLConnectOptions mySQLConnectOptions;

public ThreadLocalMySQLPool(Vertx vertx, MySQLConnectOptions mySQLConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.mySQLConnectOptions = mySQLConnectOptions;
}

@Override
protected MySQLPool createThreadLocalPool() {
return MySQLPool.pool(vertx, mySQLConnectOptions, poolOptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ private PgPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntime
dataSourceReactivePostgreSQLConfig);
PgConnectOptions pgConnectOptions = toPgConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
dataSourceReactivePostgreSQLConfig);
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
return new ThreadLocalPgPool(vertx, pgConnectOptions, poolOptions);
}
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.reactive.pg.client.runtime;

import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.PoolOptions;

public class ThreadLocalPgPool extends ThreadLocalPool<PgPool> implements PgPool {

private final PgConnectOptions pgConnectOptions;

public ThreadLocalPgPool(Vertx vertx, PgConnectOptions pgConnectOptions, PoolOptions poolOptions) {
super(vertx, poolOptions);
this.pgConnectOptions = pgConnectOptions;
}

@Override
protected PgPool createThreadLocalPool() {
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
}
}
10 changes: 10 additions & 0 deletions integration-tests/reactive-pg-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
Expand All @@ -51,6 +56,11 @@
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/test/resources</directory>
<filtering>true</filtering>
<targetPath>../test-classes</targetPath>
</resource>
</resources>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.quarkus.it.reactive.pg.client;

import java.util.concurrent.CompletionStage;

import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;

@Path("/hot-fruits")
public class HotReloadFruitResource {

@Inject
PgPool client;

@PostConstruct
void setupDb() {
client.query("DROP TABLE IF EXISTS fruits").execute()
.flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Orange')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pear')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Apple')").execute())
.await().indefinitely();
}

@GET
@Produces(MediaType.APPLICATION_JSON)
public CompletionStage<JsonArray> listFruits() {
return client.query("SELECT * FROM fruits").execute()
.map(pgRowSet -> {
JsonArray jsonArray = new JsonArray();
for (Row row : pgRowSet) {
jsonArray.add(toJson(row));
}
return jsonArray;
})
.subscribeAsCompletionStage();
}

private JsonObject toJson(Row row) {
return new JsonObject()
.put("id", row.getLong("id"))
.put("name", row.getString("name"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.quarkus.it.reactive.pg.client;

import static io.restassured.RestAssured.given;
import static org.hamcrest.CoreMatchers.containsString;

import java.util.List;
import java.util.function.Function;
import java.util.logging.LogRecord;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusDevModeTest;

public class HotReloadTestCase {
@RegisterExtension
final static QuarkusDevModeTest TEST = new QuarkusDevModeTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(HotReloadFruitResource.class)
.addAsResource("application-tl.properties", "application.properties"))
.setLogRecordPredicate(record -> {
return record.getLoggerName().startsWith("io.quarkus.reactive.datasource");
});

@AfterAll
public static void afterAll() {
List<LogRecord> records = TEST.getLogRecords();
Assertions.assertEquals(8, records.size());
// make sure that we closed all thread-local pools on reload and close
Assertions.assertEquals("Making pool for thread: %s", records.get(0).getMessage());
Assertions.assertEquals("Making pool for thread: %s", records.get(1).getMessage());
Assertions.assertEquals("Closing pool: %s", records.get(2).getMessage());
Assertions.assertEquals("Closing pool: %s", records.get(3).getMessage());
Assertions.assertEquals("Making pool for thread: %s", records.get(4).getMessage());
Assertions.assertEquals("Making pool for thread: %s", records.get(5).getMessage());
Assertions.assertEquals("Closing pool: %s", records.get(6).getMessage());
Assertions.assertEquals("Closing pool: %s", records.get(7).getMessage());
}

@Test
public void testAddNewFieldToEntity() {
checkRequest("Orange");
TEST.modifySourceFile(HotReloadFruitResource.class, new Function<String, String>() {
@Override
public String apply(String s) {
return s.replace("'Orange'", "'Strawberry'");
}
});
// trigger a pool hot reload by changing the config
TEST.modifyResourceFile("application.properties", new Function<String, String>() {
@Override
public String apply(String s) {
return s.replace("quarkus.datasource.reactive.thread-local=true",
"quarkus.datasource.reactive.thread-local = true");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the goal of this replacement? Might need a comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to trigger a hot reload.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment. And is the change to the entity not enought to trigger one?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it does not reload the pools. The entity change is to verify that a reload happened. I've added a comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok thanks. Off topic, but I wonder if we should improve on that: the configuration isn't really different so there shouldn't be any need to reload :)

}
});

checkRequest("Strawberry");
}

private void checkRequest(String fruit) {
given()
.when().get("/hot-fruits")
.then()
.statusCode(200)
.body(
containsString(fruit),
containsString("Pear"),
containsString("Apple"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=hibernate_orm_test
quarkus.datasource.password=hibernate_orm_test
quarkus.datasource.reactive.url=${reactive-postgres.url}
quarkus.datasource.reactive.thread-local=true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The configuration property isn't suggesting that it relates to an enhanced connection pool.

Maybe threadlocal-pool ? Or pool-strategy= [enum: "threadlocal" | "simple" ] ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, but max-size also relates to the pool without saying it, so…

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gsmet how would you name it? You recently refactored all datasource configuration right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've un-blocked the PR. Let's go ahead with the feature as we need it asap for benchmarking, but I'd still like to revisit the configuration in a follow up.

quarkus.log.category."io.quarkus.reactive.datasource".level=DEBUG
Loading