Skip to content

Commit 83ce8da

Browse files
committed
OPIK-2164: Add more async inserts
1 parent c5d1f84 commit 83ce8da

File tree

9 files changed

+32
-13
lines changed

9 files changed

+32
-13
lines changed

apps/opik-backend/src/main/java/com/comet/opik/domain/FeedbackScoreDAO.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import com.comet.opik.api.FeedbackScore;
44
import com.comet.opik.api.FeedbackScoreItem;
55
import com.comet.opik.api.ScoreSource;
6+
import com.comet.opik.infrastructure.OpikConfiguration;
67
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
8+
import com.comet.opik.utils.ClickhouseUtils;
79
import com.comet.opik.utils.TemplateUtils;
810
import com.google.common.base.Preconditions;
911
import com.google.inject.ImplementedBy;
@@ -92,7 +94,7 @@ INSERT INTO feedback_scores(
9294
source,
9395
created_by,
9496
last_updated_by
95-
)
97+
) <settings_clause>
9698
VALUES
9799
<items:{item |
98100
(
@@ -253,6 +255,7 @@ ORDER BY (workspace_id, project_id, entity_type, entity_id, name) DESC, last_upd
253255
""";
254256

255257
private final @NonNull TransactionTemplateAsync asyncTemplate;
258+
private final @NonNull OpikConfiguration opikConfiguration;
256259

257260
@Override
258261
@WithSpan
@@ -338,6 +341,10 @@ public Mono<Long> scoreBatchOf(@NonNull EntityType entityType,
338341

339342
ST template = TemplateUtils.getBatchSql(BULK_INSERT_FEEDBACK_SCORE, scores.size());
340343

344+
if (opikConfiguration.getAsyncInsert().enabled()) {
345+
template.add("settings_clause", ClickhouseUtils.ASYNC_INSERT);
346+
}
347+
341348
var statement = connection.createStatement(template.render());
342349

343350
bindParameters(entityType, scores, statement);

apps/opik-backend/src/main/java/com/comet/opik/domain/evaluators/AutomationRuleEvaluatorLogsDAO.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import ch.qos.logback.classic.spi.ILoggingEvent;
44
import com.comet.opik.api.LogCriteria;
55
import com.comet.opik.api.LogItem;
6+
import com.comet.opik.infrastructure.OpikConfiguration;
7+
import com.comet.opik.utils.ClickhouseUtils;
68
import com.google.inject.ImplementedBy;
79
import io.r2dbc.spi.ConnectionFactory;
810
import io.r2dbc.spi.Row;
@@ -33,8 +35,8 @@ public interface AutomationRuleEvaluatorLogsDAO extends UserLogTableDAO {
3335

3436
List<String> CUSTOM_MARKER_KEYS = List.of("trace_id", "thread_model_id");
3537

36-
static AutomationRuleEvaluatorLogsDAO create(ConnectionFactory factory) {
37-
return new AutomationRuleEvaluatorLogsDAOImpl(factory);
38+
static AutomationRuleEvaluatorLogsDAO create(ConnectionFactory factory, OpikConfiguration configuration) {
39+
return new AutomationRuleEvaluatorLogsDAOImpl(factory, configuration);
3840
}
3941

4042
Mono<LogPage> findLogs(LogCriteria criteria);
@@ -48,6 +50,7 @@ class AutomationRuleEvaluatorLogsDAOImpl implements AutomationRuleEvaluatorLogsD
4850

4951
private static final String INSERT_STATEMENT = """
5052
INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule_id, message, markers)
53+
<settings_clause>
5154
VALUES <items:{item |
5255
(
5356
parseDateTime64BestEffort(:timestamp<item.index>, 9),
@@ -72,6 +75,7 @@ INSERT INTO automation_rule_evaluator_logs (timestamp, level, workspace_id, rule
7275
""";
7376

7477
private final @NonNull ConnectionFactory connectionFactory;
78+
private final @NonNull OpikConfiguration opikConfiguration;
7579

7680
public Mono<LogPage> findLogs(@NonNull LogCriteria criteria) {
7781
return Mono.from(connectionFactory.create())
@@ -135,6 +139,10 @@ public Mono<Void> saveAll(@NonNull List<ILoggingEvent> events) {
135139
.flatMapMany(connection -> {
136140
var template = new ST(INSERT_STATEMENT);
137141

142+
if (opikConfiguration.getAsyncInsert().enabled()) {
143+
template.add("settings_clause", ClickhouseUtils.ASYNC_INSERT);
144+
}
145+
138146
List<QueryItem> queryItems = getQueryItemPlaceHolder(events.size());
139147
template.add("items", queryItems);
140148
Statement statement = connection.createStatement(template.render());

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ protected void configure() {
2828

2929
// Initialize the UserFacingRuleLollingFactory
3030
UserFacingLoggingFactory.init(connectionFactory, clickHouseLogAppenderConfig.getBatchSize(),
31-
clickHouseLogAppenderConfig.getFlushIntervalDuration());
31+
clickHouseLogAppenderConfig.getFlushIntervalDuration(), configuration(OpikConfiguration.class));
3232
}
3333

3434
@Provides

apps/opik-backend/src/main/java/com/comet/opik/domain/IdGeneratorImpl.java renamed to apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/IdGeneratorImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
package com.comet.opik.domain;
1+
package com.comet.opik.infrastructure.db;
22

3+
import com.comet.opik.domain.IdGenerator;
34
import com.fasterxml.uuid.Generators;
45
import com.fasterxml.uuid.impl.TimeBasedEpochGenerator;
56

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/IdGeneratorModule.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.comet.opik.infrastructure.db;
22

33
import com.comet.opik.domain.IdGenerator;
4-
import com.comet.opik.domain.IdGeneratorImpl;
54
import com.comet.opik.infrastructure.OpikConfiguration;
65
import com.google.inject.Provides;
76
import jakarta.inject.Singleton;

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/UserFacingLoggingFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import ch.qos.logback.classic.AsyncAppender;
44
import ch.qos.logback.classic.Logger;
55
import ch.qos.logback.classic.LoggerContext;
6+
import com.comet.opik.infrastructure.OpikConfiguration;
67
import com.comet.opik.infrastructure.log.tables.UserLogTableFactory;
78
import io.r2dbc.spi.ConnectionFactory;
89
import lombok.NonNull;
@@ -18,9 +19,9 @@ public class UserFacingLoggingFactory {
1819
private static AsyncAppender asyncAppender;
1920

2021
public static synchronized void init(@NonNull ConnectionFactory connectionFactory, int batchSize,
21-
@NonNull Duration flushIntervalSeconds) {
22+
@NonNull Duration flushIntervalSeconds, @NonNull OpikConfiguration configuration) {
2223

23-
UserLogTableFactory tableFactory = UserLogTableFactory.getInstance(connectionFactory);
24+
UserLogTableFactory tableFactory = UserLogTableFactory.getInstance(connectionFactory, configuration);
2425
ClickHouseAppender clickHouseAppender = ClickHouseAppender.init(tableFactory, batchSize, flushIntervalSeconds,
2526
CONTEXT);
2627

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/log/tables/UserLogTableFactory.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import ch.qos.logback.classic.spi.ILoggingEvent;
44
import com.comet.opik.domain.evaluators.AutomationRuleEvaluatorLogsDAO;
55
import com.comet.opik.domain.evaluators.UserLog;
6+
import com.comet.opik.infrastructure.OpikConfiguration;
67
import io.r2dbc.spi.ConnectionFactory;
78
import lombok.NonNull;
89
import reactor.core.publisher.Mono;
@@ -12,8 +13,9 @@
1213

1314
public interface UserLogTableFactory {
1415

15-
static UserLogTableFactory getInstance(ConnectionFactory factory) {
16-
return new UserLogTableFactoryImpl(factory);
16+
static UserLogTableFactory getInstance(@NonNull ConnectionFactory factory,
17+
@NonNull OpikConfiguration configuration) {
18+
return new UserLogTableFactoryImpl(factory, configuration);
1719
}
1820

1921
interface UserLogTableDAO {
@@ -28,9 +30,9 @@ class UserLogTableFactoryImpl implements UserLogTableFactory {
2830

2931
private final Map<UserLog, UserLogTableDAO> daoMap;
3032

31-
UserLogTableFactoryImpl(@NonNull ConnectionFactory factory) {
33+
UserLogTableFactoryImpl(@NonNull ConnectionFactory factory, @NonNull OpikConfiguration configuration) {
3234
daoMap = Map.of(
33-
UserLog.AUTOMATION_RULE_EVALUATOR, AutomationRuleEvaluatorLogsDAO.create(factory));
35+
UserLog.AUTOMATION_RULE_EVALUATOR, AutomationRuleEvaluatorLogsDAO.create(factory, configuration));
3436
}
3537

3638
@Override

apps/opik-backend/src/test/java/com/comet/opik/api/resources/v1/priv/DatasetsResourceIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
import com.comet.opik.api.sorting.SortingFactoryDatasets;
88
import com.comet.opik.domain.DatasetItemService;
99
import com.comet.opik.domain.DatasetService;
10-
import com.comet.opik.domain.IdGeneratorImpl;
1110
import com.comet.opik.domain.Streamer;
1211
import com.comet.opik.domain.filter.FilterQueryBuilder;
1312
import com.comet.opik.infrastructure.auth.RequestContext;
13+
import com.comet.opik.infrastructure.db.IdGeneratorImpl;
1414
import com.comet.opik.infrastructure.json.JsonNodeMessageBodyWriter;
1515
import com.comet.opik.podam.PodamFactoryUtils;
1616
import com.comet.opik.utils.JsonUtils;

apps/opik-backend/src/test/java/com/comet/opik/domain/TraceServiceImplTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.comet.opik.api.sorting.TraceSortingFactory;
66
import com.comet.opik.api.sorting.TraceThreadSortingFactory;
77
import com.comet.opik.infrastructure.auth.RequestContext;
8+
import com.comet.opik.infrastructure.db.IdGeneratorImpl;
89
import com.comet.opik.infrastructure.db.TransactionTemplateAsync;
910
import com.comet.opik.infrastructure.lock.LockService;
1011
import com.comet.opik.utils.ErrorUtils;

0 commit comments

Comments
 (0)