Skip to content

Commit 71b0e9a

Browse files
authored
Merge pull request #393 from conductor-oss/sqlite_persistence_mtdao
Feature: Adding MetadataDAO implementation for Sqlite
2 parents 8e77db8 + fe5d09b commit 71b0e9a

File tree

10 files changed

+1005
-4
lines changed

10 files changed

+1005
-4
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,5 @@ build/
3636
# asdf version file
3737
.tool-versions
3838

39+
40+
.qodo

sqlite-persistence/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@ dependencies {
2121

2222

2323
implementation "org.springframework.boot:spring-boot-starter-jdbc"
24+
25+
testImplementation "org.apache.groovy:groovy-all:${revGroovy}"
26+
testImplementation project(':conductor-server')
27+
testImplementation project(':conductor-client')
28+
testImplementation project(':conductor-grpc-client')
29+
testImplementation project(':conductor-es7-persistence')
30+
31+
testImplementation project(':conductor-test-util').sourceSets.test.output
32+
testImplementation project(':conductor-common-persistence').sourceSets.test.output
2433
}
2534

2635
test {

sqlite-persistence/src/main/java/com/netflix/conductor/sqlite/config/SqliteConfiguration.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,23 @@
2424
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
2525
import org.springframework.boot.context.properties.ConfigurationProperties;
2626
import org.springframework.boot.context.properties.EnableConfigurationProperties;
27-
import org.springframework.context.annotation.*;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.context.annotation.DependsOn;
30+
import org.springframework.context.annotation.Import;
2831
import org.springframework.retry.RetryContext;
2932
import org.springframework.retry.backoff.NoBackOffPolicy;
3033
import org.springframework.retry.policy.SimpleRetryPolicy;
3134
import org.springframework.retry.support.RetryTemplate;
3235

3336
import com.netflix.conductor.sqlite.dao.*;
37+
import com.netflix.conductor.sqlite.dao.metadata.SqliteEventHandlerMetadataDAO;
38+
import com.netflix.conductor.sqlite.dao.metadata.SqliteMetadataDAO;
39+
import com.netflix.conductor.sqlite.dao.metadata.SqliteTaskMetadataDAO;
40+
import com.netflix.conductor.sqlite.dao.metadata.SqliteWorkflowMetadataDAO;
3441

3542
import com.fasterxml.jackson.databind.ObjectMapper;
36-
import jakarta.annotation.*;
43+
import jakarta.annotation.PostConstruct;
3744

3845
@Configuration(proxyBeanMethods = false)
3946
@EnableConfigurationProperties(SqliteProperties.class)
@@ -70,10 +77,35 @@ public Flyway flywayForPrimaryDb() {
7077
@Bean
7178
@DependsOn({"flywayForPrimaryDb"})
7279
public SqliteMetadataDAO sqliteMetadataDAO(
80+
SqliteTaskMetadataDAO taskMetadataDAO,
81+
SqliteWorkflowMetadataDAO workflowMetadataDAO,
82+
SqliteEventHandlerMetadataDAO eventHandlerMetadataDAO) {
83+
return new SqliteMetadataDAO(taskMetadataDAO, workflowMetadataDAO, eventHandlerMetadataDAO);
84+
}
85+
86+
@Bean
87+
@DependsOn({"flywayForPrimaryDb"})
88+
public SqliteEventHandlerMetadataDAO sqliteEventHandlerMetadataDAO(
89+
@Qualifier("sqliteRetryTemplate") RetryTemplate retryTemplate,
90+
ObjectMapper objectMapper) {
91+
return new SqliteEventHandlerMetadataDAO(retryTemplate, objectMapper, dataSource);
92+
}
93+
94+
@Bean
95+
@DependsOn({"flywayForPrimaryDb"})
96+
public SqliteWorkflowMetadataDAO sqliteWorkflowMetadataDAO(
7397
@Qualifier("sqliteRetryTemplate") RetryTemplate retryTemplate,
7498
ObjectMapper objectMapper,
7599
SqliteProperties properties) {
76-
return new SqliteMetadataDAO(retryTemplate, objectMapper, dataSource, properties);
100+
return new SqliteWorkflowMetadataDAO(retryTemplate, objectMapper, dataSource, properties);
101+
}
102+
103+
@Bean
104+
@DependsOn({"flywayForPrimaryDb"})
105+
public SqliteTaskMetadataDAO sqliteTaskMetadataDAO(
106+
@Qualifier("sqliteRetryTemplate") RetryTemplate retryTemplate,
107+
ObjectMapper objectMapper) {
108+
return new SqliteTaskMetadataDAO(retryTemplate, objectMapper, dataSource);
77109
}
78110

79111
@Bean

sqlite-persistence/src/main/java/com/netflix/conductor/sqlite/dao/SqliteBaseDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ private <R> R getWithTransaction(final TransactionalFunction<R> function) {
114114
}
115115
}
116116

117-
<R> R getWithRetriedTransactions(final TransactionalFunction<R> function) {
117+
protected <R> R getWithRetriedTransactions(final TransactionalFunction<R> function) {
118118
try {
119119
return retryTemplate.execute(context -> getWithTransaction(function));
120120
} catch (Exception e) {
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2025 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.sqlite.dao.metadata;
14+
15+
import java.sql.Connection;
16+
import java.util.ArrayList;
17+
import java.util.List;
18+
19+
import javax.sql.DataSource;
20+
21+
import org.springframework.retry.support.RetryTemplate;
22+
23+
import com.netflix.conductor.common.metadata.events.EventHandler;
24+
import com.netflix.conductor.core.exception.ConflictException;
25+
import com.netflix.conductor.core.exception.NotFoundException;
26+
import com.netflix.conductor.sqlite.dao.SqliteBaseDAO;
27+
28+
import com.fasterxml.jackson.databind.ObjectMapper;
29+
import com.google.common.base.Preconditions;
30+
31+
public class SqliteEventHandlerMetadataDAO extends SqliteBaseDAO {
32+
33+
public SqliteEventHandlerMetadataDAO(
34+
RetryTemplate retryTemplate, ObjectMapper objectMapper, DataSource dataSource) {
35+
super(retryTemplate, objectMapper, dataSource);
36+
}
37+
38+
public void addEventHandler(EventHandler eventHandler) {
39+
Preconditions.checkNotNull(eventHandler.getName(), "EventHandler name cannot be null");
40+
41+
final String INSERT_EVENT_HANDLER_QUERY =
42+
"INSERT INTO meta_event_handler (name, event, active, json_data) "
43+
+ "VALUES (?, ?, ?, ?)";
44+
45+
withTransaction(
46+
tx -> {
47+
if (getEventHandler(tx, eventHandler.getName()) != null) {
48+
throw new ConflictException(
49+
"EventHandler with name "
50+
+ eventHandler.getName()
51+
+ " already exists!");
52+
}
53+
54+
execute(
55+
tx,
56+
INSERT_EVENT_HANDLER_QUERY,
57+
q ->
58+
q.addParameter(eventHandler.getName())
59+
.addParameter(eventHandler.getEvent())
60+
.addParameter(eventHandler.isActive())
61+
.addJsonParameter(eventHandler)
62+
.executeUpdate());
63+
});
64+
}
65+
66+
public void updateEventHandler(EventHandler eventHandler) {
67+
Preconditions.checkNotNull(eventHandler.getName(), "EventHandler name cannot be null");
68+
69+
// @formatter:off
70+
final String UPDATE_EVENT_HANDLER_QUERY =
71+
"UPDATE meta_event_handler SET "
72+
+ "event = ?, active = ?, json_data = ?, "
73+
+ "modified_on = CURRENT_TIMESTAMP WHERE name = ?";
74+
// @formatter:on
75+
76+
withTransaction(
77+
tx -> {
78+
EventHandler existing = getEventHandler(tx, eventHandler.getName());
79+
if (existing == null) {
80+
throw new NotFoundException(
81+
"EventHandler with name " + eventHandler.getName() + " not found!");
82+
}
83+
84+
execute(
85+
tx,
86+
UPDATE_EVENT_HANDLER_QUERY,
87+
q ->
88+
q.addParameter(eventHandler.getEvent())
89+
.addParameter(eventHandler.isActive())
90+
.addJsonParameter(eventHandler)
91+
.addParameter(eventHandler.getName())
92+
.executeUpdate());
93+
});
94+
}
95+
96+
public void removeEventHandler(String name) {
97+
final String DELETE_EVENT_HANDLER_QUERY = "DELETE FROM meta_event_handler WHERE name = ?";
98+
99+
withTransaction(
100+
tx -> {
101+
EventHandler existing = getEventHandler(tx, name);
102+
if (existing == null) {
103+
throw new NotFoundException(
104+
"EventHandler with name " + name + " not found!");
105+
}
106+
107+
execute(
108+
tx,
109+
DELETE_EVENT_HANDLER_QUERY,
110+
q -> q.addParameter(name).executeDelete());
111+
});
112+
}
113+
114+
public List<EventHandler> getEventHandlersForEvent(String event, boolean activeOnly) {
115+
final String READ_ALL_EVENT_HANDLER_BY_EVENT_QUERY =
116+
"SELECT json_data FROM meta_event_handler WHERE event = ?";
117+
return queryWithTransaction(
118+
READ_ALL_EVENT_HANDLER_BY_EVENT_QUERY,
119+
q -> {
120+
q.addParameter(event);
121+
return q.executeAndFetch(
122+
rs -> {
123+
List<EventHandler> handlers = new ArrayList<>();
124+
while (rs.next()) {
125+
EventHandler h = readValue(rs.getString(1), EventHandler.class);
126+
if (!activeOnly || h.isActive()) {
127+
handlers.add(h);
128+
}
129+
}
130+
131+
return handlers;
132+
});
133+
});
134+
}
135+
136+
public List<EventHandler> getAllEventHandlers() {
137+
final String READ_ALL_EVENT_HANDLER_QUERY = "SELECT json_data FROM meta_event_handler";
138+
return queryWithTransaction(
139+
READ_ALL_EVENT_HANDLER_QUERY, q -> q.executeAndFetch(EventHandler.class));
140+
}
141+
142+
private EventHandler getEventHandler(Connection connection, String name) {
143+
final String READ_ONE_EVENT_HANDLER_QUERY =
144+
"SELECT json_data FROM meta_event_handler WHERE name = ?";
145+
146+
return query(
147+
connection,
148+
READ_ONE_EVENT_HANDLER_QUERY,
149+
q -> q.addParameter(name).executeAndFetchFirst(EventHandler.class));
150+
}
151+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright 2025 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.sqlite.dao.metadata;
14+
15+
import java.util.List;
16+
import java.util.Optional;
17+
18+
import com.netflix.conductor.common.metadata.events.EventHandler;
19+
import com.netflix.conductor.common.metadata.tasks.TaskDef;
20+
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
21+
import com.netflix.conductor.dao.EventHandlerDAO;
22+
import com.netflix.conductor.dao.MetadataDAO;
23+
24+
import lombok.RequiredArgsConstructor;
25+
26+
@RequiredArgsConstructor
27+
public class SqliteMetadataDAO implements MetadataDAO, EventHandlerDAO {
28+
29+
private final SqliteTaskMetadataDAO taskMetadataDAO;
30+
private final SqliteWorkflowMetadataDAO workflowMetadataDAO;
31+
private final SqliteEventHandlerMetadataDAO eventHandlerMetadataDAO;
32+
33+
@Override
34+
public TaskDef createTaskDef(TaskDef taskDef) {
35+
return taskMetadataDAO.createTaskDef(taskDef);
36+
}
37+
38+
@Override
39+
public TaskDef updateTaskDef(TaskDef taskDef) {
40+
return taskMetadataDAO.updateTaskDef(taskDef);
41+
}
42+
43+
@Override
44+
public TaskDef getTaskDef(String name) {
45+
return taskMetadataDAO.getTaskDef(name);
46+
}
47+
48+
@Override
49+
public List<TaskDef> getAllTaskDefs() {
50+
return taskMetadataDAO.getAllTaskDefs();
51+
}
52+
53+
@Override
54+
public void removeTaskDef(String name) {
55+
taskMetadataDAO.removeTaskDef(name);
56+
}
57+
58+
@Override
59+
public void createWorkflowDef(WorkflowDef def) {
60+
workflowMetadataDAO.createWorkflowDef(def);
61+
}
62+
63+
@Override
64+
public void updateWorkflowDef(WorkflowDef def) {
65+
workflowMetadataDAO.updateWorkflowDef(def);
66+
}
67+
68+
@Override
69+
public Optional<WorkflowDef> getLatestWorkflowDef(String name) {
70+
return workflowMetadataDAO.getLatestWorkflowDef(name);
71+
}
72+
73+
@Override
74+
public Optional<WorkflowDef> getWorkflowDef(String name, int version) {
75+
return workflowMetadataDAO.getWorkflowDef(name, version);
76+
}
77+
78+
@Override
79+
public void removeWorkflowDef(String name, Integer version) {
80+
workflowMetadataDAO.removeWorkflowDef(name, version);
81+
}
82+
83+
@Override
84+
public List<WorkflowDef> getAllWorkflowDefs() {
85+
return workflowMetadataDAO.getAllWorkflowDefs();
86+
}
87+
88+
@Override
89+
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
90+
return workflowMetadataDAO.getAllWorkflowDefsLatestVersions();
91+
}
92+
93+
@Override
94+
public void addEventHandler(EventHandler eventHandler) {
95+
eventHandlerMetadataDAO.addEventHandler(eventHandler);
96+
}
97+
98+
@Override
99+
public void updateEventHandler(EventHandler eventHandler) {
100+
eventHandlerMetadataDAO.updateEventHandler(eventHandler);
101+
}
102+
103+
@Override
104+
public void removeEventHandler(String name) {
105+
eventHandlerMetadataDAO.removeEventHandler(name);
106+
}
107+
108+
@Override
109+
public List<EventHandler> getAllEventHandlers() {
110+
return eventHandlerMetadataDAO.getAllEventHandlers();
111+
}
112+
113+
@Override
114+
public List<EventHandler> getEventHandlersForEvent(String event, boolean activeOnly) {
115+
return eventHandlerMetadataDAO.getEventHandlersForEvent(event, activeOnly);
116+
}
117+
118+
public List<String> findAll() {
119+
return workflowMetadataDAO.findAll();
120+
}
121+
122+
public List<WorkflowDef> getAllLatest() {
123+
return workflowMetadataDAO.getAllLatest();
124+
}
125+
126+
public List<WorkflowDef> getAllVersions(String name) {
127+
return workflowMetadataDAO.getAllVersions(name);
128+
}
129+
}

0 commit comments

Comments
 (0)