Skip to content

Commit d2433d8

Browse files
committed
Added a setting to turn the reactive Pools into thread-local pools
1 parent 0c0b348 commit d2433d8

File tree

12 files changed

+319
-0
lines changed

12 files changed

+319
-0
lines changed

extensions/reactive-datasource/runtime/src/main/java/io/quarkus/reactive/datasource/runtime/DataSourceReactiveRuntimeConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,10 @@ public class DataSourceReactiveRuntimeConfig {
8888
*/
8989
@ConfigItem
9090
public PfxConfiguration keyCertificatePfx;
91+
92+
/**
93+
* Experimental: use one connection pool per thread.
94+
*/
95+
@ConfigItem
96+
public Optional<Boolean> threadLocal;
9197
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package io.quarkus.reactive.datasource.runtime;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.atomic.AtomicReference;
6+
7+
import org.jboss.logging.Logger;
8+
9+
import io.vertx.core.AsyncResult;
10+
import io.vertx.core.Handler;
11+
import io.vertx.core.Vertx;
12+
import io.vertx.sqlclient.Pool;
13+
import io.vertx.sqlclient.PoolOptions;
14+
import io.vertx.sqlclient.PreparedQuery;
15+
import io.vertx.sqlclient.Query;
16+
import io.vertx.sqlclient.Row;
17+
import io.vertx.sqlclient.RowSet;
18+
import io.vertx.sqlclient.SqlConnection;
19+
import io.vertx.sqlclient.Transaction;
20+
21+
public abstract class ThreadLocalPool<PoolType extends Pool> implements Pool {
22+
23+
private static final Logger log = Logger.getLogger(ThreadLocalPool.class);
24+
25+
private final AtomicReference<ThreadLocal<PoolType>> pool = new AtomicReference<>(new ThreadLocal<>());
26+
private static final List<Pool> threadLocalPools = new ArrayList<>();
27+
28+
protected final PoolOptions poolOptions;
29+
protected final Vertx vertx;
30+
31+
public ThreadLocalPool(Vertx vertx, PoolOptions poolOptions) {
32+
this.vertx = vertx;
33+
this.poolOptions = poolOptions;
34+
}
35+
36+
private PoolType pool() {
37+
ThreadLocal<PoolType> poolThreadLocal = pool.get();
38+
PoolType ret = poolThreadLocal.get();
39+
if (ret == null) {
40+
log.debugf("Making pool for thread: %s", Thread.currentThread());
41+
ret = createThreadLocalPool();
42+
synchronized (threadLocalPools) {
43+
threadLocalPools.add(ret);
44+
}
45+
poolThreadLocal.set(ret);
46+
}
47+
return ret;
48+
}
49+
50+
protected abstract PoolType createThreadLocalPool();
51+
52+
@Override
53+
public void getConnection(Handler<AsyncResult<SqlConnection>> handler) {
54+
pool().getConnection(handler);
55+
}
56+
57+
@Override
58+
public Query<RowSet<Row>> query(String sql) {
59+
return pool().query(sql);
60+
}
61+
62+
@Override
63+
public PreparedQuery<RowSet<Row>> preparedQuery(String sql) {
64+
return pool().preparedQuery(sql);
65+
}
66+
67+
@Override
68+
public void begin(Handler<AsyncResult<Transaction>> handler) {
69+
pool().begin(handler);
70+
}
71+
72+
/**
73+
* This is a bit weird because it works on all ThreadLocal pools, but it's only
74+
* called from a single thread, when doing shutdown, and needs to close all the
75+
* pools and reinitialise the thread local so that all newly created pools after
76+
* the restart will start with an empty thread local instead of a closed one.
77+
*/
78+
@Override
79+
public void close() {
80+
// close all the thread-local pools
81+
synchronized (threadLocalPools) {
82+
for (Pool pool : threadLocalPools) {
83+
log.debugf("Closing pool: %s", pool);
84+
pool.close();
85+
}
86+
threadLocalPools.clear();
87+
}
88+
// discard the TL to clear them all
89+
pool.set(new ThreadLocal<PoolType>());
90+
}
91+
}

extensions/reactive-db2-client/runtime/src/main/java/io/quarkus/reactive/db2/client/runtime/DB2PoolRecorder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ private DB2Pool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntim
4646
dataSourceReactiveDB2Config);
4747
DB2ConnectOptions connectOptions = toConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
4848
dataSourceReactiveDB2Config);
49+
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
50+
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
51+
return new ThreadLocalDB2Pool(vertx, connectOptions, poolOptions);
52+
}
4953
return DB2Pool.pool(vertx, connectOptions, poolOptions);
5054
}
5155

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.quarkus.reactive.db2.client.runtime;
2+
3+
import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.db2client.DB2ConnectOptions;
6+
import io.vertx.db2client.DB2Pool;
7+
import io.vertx.sqlclient.PoolOptions;
8+
9+
public class ThreadLocalDB2Pool extends ThreadLocalPool<DB2Pool> implements DB2Pool {
10+
11+
private final DB2ConnectOptions db2ConnectOptions;
12+
13+
public ThreadLocalDB2Pool(Vertx vertx, DB2ConnectOptions db2ConnectOptions, PoolOptions poolOptions) {
14+
super(vertx, poolOptions);
15+
this.db2ConnectOptions = db2ConnectOptions;
16+
}
17+
18+
@Override
19+
protected DB2Pool createThreadLocalPool() {
20+
return DB2Pool.pool(vertx, db2ConnectOptions, poolOptions);
21+
}
22+
}

extensions/reactive-mysql-client/runtime/src/main/java/io/quarkus/reactive/mysql/client/runtime/MySQLPoolRecorder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ private MySQLPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRunt
6868
dataSourceReactiveMySQLConfig);
6969
MySQLConnectOptions mysqlConnectOptions = toMySQLConnectOptions(dataSourceRuntimeConfig,
7070
dataSourceReactiveRuntimeConfig, dataSourceReactiveMySQLConfig);
71+
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
72+
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
73+
return new ThreadLocalMySQLPool(vertx, mysqlConnectOptions, poolOptions);
74+
}
7175
return MySQLPool.pool(vertx, mysqlConnectOptions, poolOptions);
7276
}
7377

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.quarkus.reactive.mysql.client.runtime;
2+
3+
import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.mysqlclient.MySQLConnectOptions;
6+
import io.vertx.mysqlclient.MySQLPool;
7+
import io.vertx.sqlclient.PoolOptions;
8+
9+
public class ThreadLocalMySQLPool extends ThreadLocalPool<MySQLPool> implements MySQLPool {
10+
11+
private final MySQLConnectOptions mySQLConnectOptions;
12+
13+
public ThreadLocalMySQLPool(Vertx vertx, MySQLConnectOptions mySQLConnectOptions, PoolOptions poolOptions) {
14+
super(vertx, poolOptions);
15+
this.mySQLConnectOptions = mySQLConnectOptions;
16+
}
17+
18+
@Override
19+
protected MySQLPool createThreadLocalPool() {
20+
return MySQLPool.pool(vertx, mySQLConnectOptions, poolOptions);
21+
}
22+
}

extensions/reactive-pg-client/runtime/src/main/java/io/quarkus/reactive/pg/client/runtime/PgPoolRecorder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ private PgPool initialize(Vertx vertx, DataSourceRuntimeConfig dataSourceRuntime
6868
dataSourceReactivePostgreSQLConfig);
6969
PgConnectOptions pgConnectOptions = toPgConnectOptions(dataSourceRuntimeConfig, dataSourceReactiveRuntimeConfig,
7070
dataSourceReactivePostgreSQLConfig);
71+
if (dataSourceReactiveRuntimeConfig.threadLocal.isPresent() &&
72+
dataSourceReactiveRuntimeConfig.threadLocal.get()) {
73+
return new ThreadLocalPgPool(vertx, pgConnectOptions, poolOptions);
74+
}
7175
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
7276
}
7377

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.quarkus.reactive.pg.client.runtime;
2+
3+
import io.quarkus.reactive.datasource.runtime.ThreadLocalPool;
4+
import io.vertx.core.Vertx;
5+
import io.vertx.pgclient.PgConnectOptions;
6+
import io.vertx.pgclient.PgPool;
7+
import io.vertx.sqlclient.PoolOptions;
8+
9+
public class ThreadLocalPgPool extends ThreadLocalPool<PgPool> implements PgPool {
10+
11+
private final PgConnectOptions pgConnectOptions;
12+
13+
public ThreadLocalPgPool(Vertx vertx, PgConnectOptions pgConnectOptions, PoolOptions poolOptions) {
14+
super(vertx, poolOptions);
15+
this.pgConnectOptions = pgConnectOptions;
16+
}
17+
18+
@Override
19+
protected PgPool createThreadLocalPool() {
20+
return PgPool.pool(vertx, pgConnectOptions, poolOptions);
21+
}
22+
}

integration-tests/reactive-pg-client/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838
<artifactId>quarkus-junit5</artifactId>
3939
<scope>test</scope>
4040
</dependency>
41+
<dependency>
42+
<groupId>io.quarkus</groupId>
43+
<artifactId>quarkus-junit5-internal</artifactId>
44+
<scope>test</scope>
45+
</dependency>
4146
<dependency>
4247
<groupId>io.rest-assured</groupId>
4348
<artifactId>rest-assured</artifactId>
@@ -51,6 +56,11 @@
5156
<directory>src/main/resources</directory>
5257
<filtering>true</filtering>
5358
</resource>
59+
<resource>
60+
<directory>src/test/resources</directory>
61+
<filtering>true</filtering>
62+
<targetPath>../test-classes</targetPath>
63+
</resource>
5464
</resources>
5565
<plugins>
5666
<plugin>
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.quarkus.it.reactive.pg.client;
2+
3+
import java.util.concurrent.CompletionStage;
4+
5+
import javax.annotation.PostConstruct;
6+
import javax.inject.Inject;
7+
import javax.ws.rs.GET;
8+
import javax.ws.rs.Path;
9+
import javax.ws.rs.Produces;
10+
import javax.ws.rs.core.MediaType;
11+
12+
import io.vertx.core.json.JsonArray;
13+
import io.vertx.core.json.JsonObject;
14+
import io.vertx.mutiny.pgclient.PgPool;
15+
import io.vertx.mutiny.sqlclient.Row;
16+
17+
@Path("/hot-fruits")
18+
public class HotReloadFruitResource {
19+
20+
@Inject
21+
PgPool client;
22+
23+
@PostConstruct
24+
void setupDb() {
25+
client.query("DROP TABLE IF EXISTS fruits").execute()
26+
.flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
27+
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Orange')").execute())
28+
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pear')").execute())
29+
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Apple')").execute())
30+
.await().indefinitely();
31+
}
32+
33+
@GET
34+
@Produces(MediaType.APPLICATION_JSON)
35+
public CompletionStage<JsonArray> listFruits() {
36+
return client.query("SELECT * FROM fruits").execute()
37+
.map(pgRowSet -> {
38+
JsonArray jsonArray = new JsonArray();
39+
for (Row row : pgRowSet) {
40+
jsonArray.add(toJson(row));
41+
}
42+
return jsonArray;
43+
})
44+
.subscribeAsCompletionStage();
45+
}
46+
47+
private JsonObject toJson(Row row) {
48+
return new JsonObject()
49+
.put("id", row.getLong("id"))
50+
.put("name", row.getString("name"));
51+
}
52+
53+
}

0 commit comments

Comments
 (0)