Commit ca6a027

fix(dataflow): do not create new KafkaStreams app for existing pipelines
This fixes a dataflow-engine bug triggered when the scheduler sends or re-sends pipeline creation messages after a restart (because it is not aware of pipeline status across the various components).

Previous behaviour: on receiving a command to create a pipeline, dataflow-engine would create a new Kafka Streams application for that pipeline before checking whether one already existed and was running. Because of this, triggering a control-plane restart of the scheduler in 2.8.1 would result in dataflow errors for pipelines that kept internal state (mostly pipelines making use of triggers/joins). Kafka Streams would complain about an existing application using the same state directory, fail the newly created pipeline, and inform the scheduler. However, if the old pipeline had previously been running ok, it would continue to do so inside dataflow. This created a disconnect between the actual state of dataflow-engine and what the scheduler knew about it.

New behaviour: dataflow-engine first checks whether a pipeline with the same id is already running. If its state is ok, dataflow simply informs the scheduler that the pipeline is created, without taking further action. If a pipeline with the same id already exists but is in a failed state, it is first stopped (local Kafka Streams state is cleaned up), then an attempt is made to re-create it, with the corresponding status being sent to the scheduler.
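In outline, the fix turns creation into a three-way decision on any existing pipeline with the same id. The following condensed Kotlin sketch is illustrative only (PipelineHandle and the action names are hypothetical stand-ins; the actual logic is in the diff below):

    // Hypothetical stand-in for the pipeline entry kept by dataflow-engine.
    class PipelineHandle(val isError: Boolean) {
        fun stop() { /* clean up local Kafka Streams state */ }
    }

    sealed interface CreateAction
    object ReportAlreadyRunning : CreateAction // healthy duplicate: ack to scheduler, no-op
    object StopThenRecreate : CreateAction     // failed duplicate: clean up state, re-create
    object CreateFresh : CreateAction          // no pipeline with this id yet

    fun decide(existing: PipelineHandle?): CreateAction =
        when {
            existing == null -> CreateFresh
            !existing.isError -> ReportAlreadyRunning
            else -> StopThenRecreate
        }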
1 parent a23fbc8 commit ca6a027

File tree

1 file changed: +64, -28 lines

scheduler/data-flow/src/main/kotlin/io/seldon/dataflow/PipelineSubscriber.kt

Lines changed: 64 additions & 28 deletions
@@ -141,12 +141,57 @@ class PipelineSubscriber(
         kafkaConsumerGroupIdPrefix: String,
         namespace: String,
     ) {
-        logger.info(
-            "Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}",
-            metadata.name,
-            metadata.version,
-            metadata.id,
-        )
+        // If a pipeline with the same id exists, we assume it has the same name & version
+        // If it's in an error state, try re-creating.
+        //
+        // WARNING: at the moment handleCreate is called sequentially on each update in
+        // Flow<PipelineUpdateMessage> from subscribePipelines(). This allows us to sidestep issues
+        // related to race conditions on `pipelines.containsKey(...)` below. If we ever move to
+        // concurrent creation of pipelines, this needs to be revisited.
+        if (pipelines.containsKey(metadata.id)) {
+            val previous = pipelines[metadata.id]!!
+            if (!previous.status.isError) {
+                client.pipelineUpdateEvent(
+                    makePipelineUpdateEvent(
+                        metadata = metadata,
+                        operation = PipelineOperation.Create,
+                        success = true,
+                        reason = previous.status.getDescription() ?: "pipeline created",
+                    ),
+                )
+                logger.debug(
+                    "response to scheduler: pipeline {pipelineName} continues to run normally; " +
+                        "pipeline version: {pipelineVersion}, id: {pipelineId}",
+                    metadata.name,
+                    metadata.version,
+                    metadata.id,
+                )
+                return
+            } else { // pipeline exists but in failed state; cleanup state and re-create
+                logger.info(
+                    "Recreating failed pipeline {pipelineName} version: {pipelineVersion}, id: {pipelineId}",
+                    metadata.name,
+                    metadata.version,
+                    metadata.id,
+                )
+                logger.debug(
+                    "Previous state for failed pipeline {pipelineName} version: {pipelineVersion}, id: {pipelineId}: {pipelineStatus}",
+                    metadata.name,
+                    metadata.version,
+                    metadata.id,
+                    previous.status.getDescription(),
+                )
+                previous.stop()
+            }
+        } else { // pipeline doesn't exist
+            logger.info(
+                "Creating pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}",
+                metadata.name,
+                metadata.version,
+                metadata.id,
+            )
+        }
+
         val (pipeline, err) =
             Pipeline.forSteps(
                 metadata,
@@ -184,28 +229,19 @@ class PipelineSubscriber(
             return
         }
 
-        val previous = pipelines.putIfAbsent(metadata.id, pipeline)
-        var pipelineStatus: PipelineStatus
-        if (previous == null) {
-            val errTopics = kafkaAdmin.ensureTopicsExist(steps)
-            if (errTopics == null) {
-                pipelineStatus = pipeline.start()
-            } else {
-                pipelineStatus =
-                    PipelineStatus.Error(null)
-                        .withException(errTopics)
-                        .withMessage("kafka streams topic creation error")
-                pipeline.stop()
-            }
+        // This overwrites any previous pipelines with the same id. We can only get here if those previous pipelines
+        // are in a failed state and they are being re-created by the scheduler.
+        pipelines[metadata.id] = pipeline
+        val pipelineStatus: PipelineStatus
+        val errTopics = kafkaAdmin.ensureTopicsExist(steps)
+        if (errTopics == null) {
+            pipelineStatus = pipeline.start()
         } else {
-            pipelineStatus = previous.status
-            logger.warn("pipeline {pipelineName} with id {pipelineId} already exists", metadata.name, metadata.id)
-            if (pipelineStatus.isError) {
-                // do not try to resuscitate an existing pipeline if in a failed state
-                // it's up to the scheduler to delete it & reinitialize it, as it might require
-                // coordination with {model, pipeline}gateway
-                previous.stop()
-            }
+            pipelineStatus =
+                PipelineStatus.Error(null)
+                    .withException(errTopics)
+                    .withMessage("kafka streams topic creation error")
+            pipeline.stop()
         }
 
         // We don't want to mark the PipelineOperation.Create as successful unless the
@@ -215,7 +251,7 @@ class PipelineSubscriber(
         if (pipelineStatus !is PipelineStatus.Started) {
            pipelineStatus.isError = true
         }
-        pipelineStatus.log(logger, Level.INFO)
+        pipelineStatus.log(logger, Level.DEBUG)
         client.pipelineUpdateEvent(
             makePipelineUpdateEvent(
                 metadata = metadata,