Skip to content

Commit 6b545ee

Browse files
authored
Merge pull request #420 from Robban1980/feature/kafka-workflow-event-listener
[Feature] Adding support for Kafka in the Workflow Event Listener
2 parents 54791b4 + e9688d0 commit 6b545ee

File tree

9 files changed

+612
-23
lines changed

9 files changed

+612
-23
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2025 Conductor Authors.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package com.netflix.conductor.common.run;
14+
15+
import java.util.Map;
16+
17+
import com.netflix.conductor.annotations.protogen.ProtoField;
18+
19+
import com.fasterxml.jackson.annotation.JsonIgnore;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
22+
/** Extended version of WorkflowSummary that retains input/output as Map */
23+
public class WorkflowSummaryExtended extends WorkflowSummary {
24+
25+
@ProtoField(id = 9) // Ensure Protobuf compatibility
26+
@JsonIgnore
27+
private Map<String, Object> inputMap;
28+
29+
@ProtoField(id = 10)
30+
@JsonIgnore
31+
private Map<String, Object> outputMap;
32+
33+
public WorkflowSummaryExtended(Workflow workflow) {
34+
super(workflow);
35+
if (workflow.getInput() != null) {
36+
this.inputMap = workflow.getInput();
37+
}
38+
if (workflow.getOutput() != null) {
39+
this.outputMap = workflow.getOutput();
40+
}
41+
}
42+
43+
/** New method for JSON serialization */
44+
@JsonProperty("input")
45+
public Map<String, Object> getInputMap() {
46+
return inputMap;
47+
}
48+
49+
/** New method for JSON serialization */
50+
@JsonProperty("output")
51+
public Map<String, Object> getOutputMap() {
52+
return outputMap;
53+
}
54+
55+
public void setInputMap(Map<String, Object> inputMap) {
56+
this.inputMap = inputMap;
57+
}
58+
59+
public void setOutputMap(Map<String, Object> outputMap) {
60+
this.outputMap = outputMap;
61+
}
62+
}

core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorOps.java

Lines changed: 106 additions & 20 deletions
Large diffs are not rendered by default.

core/src/main/java/com/netflix/conductor/core/listener/WorkflowStatusListener.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,29 @@
1414

1515
import com.netflix.conductor.model.WorkflowModel;
1616

17+
import com.fasterxml.jackson.annotation.JsonValue;
18+
1719
/** Listener for the completed and terminated workflows */
1820
public interface WorkflowStatusListener {
1921

22+
enum WorkflowEventType {
23+
STARTED,
24+
RERAN,
25+
RETRIED,
26+
PAUSED,
27+
RESUMED,
28+
RESTARTED,
29+
COMPLETED,
30+
TERMINATED,
31+
FINALIZED;
32+
33+
@JsonValue // Ensures correct JSON serialization
34+
@Override
35+
public String toString() {
36+
return name().toLowerCase(); // Convert to lowercase for consistency
37+
}
38+
}
39+
2040
default void onWorkflowCompletedIfEnabled(WorkflowModel workflow) {
2141
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
2242
onWorkflowCompleted(workflow);
@@ -35,9 +55,57 @@ default void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) {
3555
}
3656
}
3757

58+
default void onWorkflowStartedIfEnabled(WorkflowModel workflow) {
59+
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
60+
onWorkflowStarted(workflow);
61+
}
62+
}
63+
64+
default void onWorkflowRestartedIfEnabled(WorkflowModel workflow) {
65+
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
66+
onWorkflowRestarted(workflow);
67+
}
68+
}
69+
70+
default void onWorkflowRerunIfEnabled(WorkflowModel workflow) {
71+
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
72+
onWorkflowRerun(workflow);
73+
}
74+
}
75+
76+
default void onWorkflowRetriedIfEnabled(WorkflowModel workflow) {
77+
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
78+
onWorkflowRetried(workflow);
79+
}
80+
}
81+
82+
default void onWorkflowPausedIfEnabled(WorkflowModel workflow) {
83+
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
84+
onWorkflowPaused(workflow);
85+
}
86+
}
87+
88+
default void onWorkflowResumedIfEnabled(WorkflowModel workflow) {
89+
if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) {
90+
onWorkflowResumed(workflow);
91+
}
92+
}
93+
3894
void onWorkflowCompleted(WorkflowModel workflow);
3995

4096
void onWorkflowTerminated(WorkflowModel workflow);
4197

4298
default void onWorkflowFinalized(WorkflowModel workflow) {}
99+
100+
default void onWorkflowStarted(WorkflowModel workflow) {}
101+
102+
default void onWorkflowRestarted(WorkflowModel workflow) {}
103+
104+
default void onWorkflowRerun(WorkflowModel workflow) {}
105+
106+
default void onWorkflowPaused(WorkflowModel workflow) {}
107+
108+
default void onWorkflowResumed(WorkflowModel workflow) {}
109+
110+
default void onWorkflowRetried(WorkflowModel workflow) {}
43111
}

workflow-event-listener/README.md

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
# Workflow Event Listeners
22
Workflow Event listeners can be configured for the purpose in Conductor:
3-
1. Remove and/or archive workflows from primary datasource (e.g. Redis) once the workflow reaches a terminal status
4-
2. Publish a message to a conductor queue as the workflows complete that can be used to trigger other workflows
3+
1. Remove and/or archive workflows from primary datasource (e.g. Redis) once the workflow reaches a terminal status.
4+
2. Publish a message to a conductor queue as the workflows complete that can be used to trigger other workflows.
5+
3. Publish workflow status changes to Kafka as it moves along its lifecycle.
56

67
## Published Artifacts
78

@@ -47,3 +48,65 @@ conductor.workflow-status-listener.queue-publisher.failureQueue=_callbackFailure
4748
#Queue for terminal state workflows (success or failed)
4849
conductor.workflow-status-listener.queue-publisher.finalizeQueue=_callbackFinalizeQueue
4950
```
51+
52+
### Kafka Publisher
53+
Publish a summary of workflow WorkflowSummary to a Kafka topic(s) as a workflow moves through its lifecycle.
54+
55+
This publisher introduced some new events
56+
- STARTED
57+
- RERAN
58+
- RETRIED
59+
- PAUSED
60+
- RESUMED
61+
- RESTARTED
62+
- COMPLETED (supported by queue_publisher)
63+
- TERMINATED (supported by queue_publisher)
64+
- FINALIZED (supported by queue_publisher)
65+
66+
Example of a default configuration:
67+
68+
```properties
69+
conductor.workflow-status-listener.type=kafka
70+
71+
# Kafka Producer Configurations
72+
conductor.workflow-status-listener.kafka.producer.bootstrap.servers=kafka:29092
73+
74+
# Kafka Producer Configuration
75+
conductor.workflow-status-listener.kafka.producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
76+
conductor.workflow-status-listener.kafka.producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
77+
78+
# Reliability Settings
79+
conductor.workflow-status-listener.kafka.producer.acks=all
80+
conductor.workflow-status-listener.kafka.producer.enable.idempotence=true
81+
82+
# Retry sending messages if failure
83+
conductor.workflow-status-listener.kafka.producer.retries=5
84+
conductor.workflow-status-listener.kafka.producer.retry.backoff.ms=100
85+
86+
# Allow batching (default 0)
87+
conductor.workflow-status-listener.kafka.producer.linger.ms=10
88+
conductor.workflow-status-listener.kafka.producer.batch.size=65536
89+
conductor.workflow-status-listener.kafka.producer.buffer.memory=67108864
90+
# Reduce network load
91+
conductor.workflow-status-listener.kafka.producer.compression.type=zstd
92+
93+
# Allow multiple in-flight messages (better throughput)
94+
conductor.workflow-status-listener.kafka.producer.max.in.flight.requests.per.connection=1
95+
96+
# Default Topic for All Workflow Status Events
97+
conductor.workflow-status-listener.kafka.default-topic=workflow-status-events
98+
99+
```
100+
101+
For configuration it supports the Kafka Producer clients settings prefixed with `conductor.workflow-status-listener.kafka.producer`.
102+
103+
`conductor.workflow-status-listener.kafka.default-topic` defines the default topic to use for all events.
104+
Each event can also have its dedicated topic prefix the proeprty with `conductor.workflow-status-listener.kafka.event-topics.` followed by the event name in lowercase.
105+
106+
Example of using specific topics for the events:
107+
```properties
108+
# Custom Topics for Specific Events
109+
conductor.workflow-status-listener.kafka.event-topics.completed=workflow-completed-events
110+
conductor.workflow-status-listener.kafka.event-topics.terminated=workflow-terminated-events
111+
conductor.workflow-status-listener.kafka.event-topics.started=workflow-started-events
112+
```

workflow-event-listener/build.gradle

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ dependencies {
1313
implementation "org.apache.commons:commons-lang3:"
1414
implementation group: 'org.apache.httpcomponents', name: 'httpclient', version: '4.5.14'
1515
implementation "com.amazonaws:aws-java-sdk-s3:${revAwsSdk}"
16+
implementation "org.apache.kafka:kafka-clients:${revKafka}"
17+
18+
implementation "com.fasterxml.jackson.core:jackson-databind"
19+
implementation "com.fasterxml.jackson.core:jackson-core"
1620

1721
compileOnly 'org.springframework.boot:spring-boot-starter'
1822
compileOnly 'org.springframework.boot:spring-boot-starter-web'
@@ -21,14 +25,15 @@ dependencies {
2125
testImplementation "org.apache.groovy:groovy-all:${revGroovy}"
2226
testImplementation "org.spockframework:spock-core:${revSpock}"
2327
testImplementation "org.spockframework:spock-spring:${revSpock}"
28+
2429
implementation "org.springframework.boot:spring-boot-starter-log4j2"
2530
testImplementation 'org.springframework.retry:spring-retry'
2631
testImplementation 'org.springframework.boot:spring-boot-starter-web'
32+
2733
testImplementation "com.netflix.dyno-queues:dyno-queues-redis:${revDynoQueues}"
2834
testImplementation project(':conductor-test-util').sourceSets.test.output
2935

3036
//In memory
3137
implementation "org.rarefiedredis.redis:redis-java:${revRarefiedRedis}"
3238
testImplementation "redis.clients:jedis:${revJedis}"
33-
3439
}

0 commit comments

Comments
 (0)