
fix(dataflow): do not create new KafkaStreams app for existing pipelines #5550

Merged
merged 3 commits into from May 2, 2024

Changes from 1 commit
@@ -141,12 +141,57 @@ class PipelineSubscriber(
         kafkaConsumerGroupIdPrefix: String,
         namespace: String,
     ) {
-        logger.info(
-            "Create pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}",
-            metadata.name,
-            metadata.version,
-            metadata.id,
-        )
+        // If a pipeline with the same id exists, we assume it has the same name & version
+        // If it's in an error state, try re-creating.
+        //
+        // WARNING: at the moment handleCreate is called sequentially on each update in
+        // Flow<PipelineUpdateMessage> from subscribePipelines(). This allows us to sidestep issues
+        // related to race conditions on `pipelines.containsKey(...)` below. If we ever move to
+        // concurrent creation of pipelines, this needs to be revisited.
+        if (pipelines.containsKey(metadata.id)) {
+            val previous = pipelines[metadata.id]!!
+            if (!previous.status.isError) {
+                client.pipelineUpdateEvent(
+                    makePipelineUpdateEvent(
+                        metadata = metadata,
+                        operation = PipelineOperation.Create,
+                        success = true,
+                        reason = previous.status.getDescription() ?: "pipeline created",
Contributor

@sakoush sakoush Apr 25, 2024

given that this code is used elsewhere (line 255 in this file), should we at least make the "pipeline created" message a constant?

Contributor

@sakoush sakoush Apr 25, 2024

Also, couldn't a particular pipeline be in a rebalancing / creating state? I.e. if dataflow is still loading the pipeline and the scheduler restarts and re-sends the load request for the same pipeline, are we returning ready?

Or is the assumption, as you mentioned, that this is single-threaded, so we are guaranteed that the first load command has finished? (It is not 100% clear to me.)

Member Author

@lc525 lc525 Apr 25, 2024
To the first point:
I was first considering putting "pipeline running" here, but at the moment those constants don't really play any role (the reason is taken from pipeline.status.getDescription(), and only if that doesn't exist do we return the constant). However, for all the states we currently use, the description exists. I'll define this default value as a constant though; it makes sense.
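
Roughly what I have in mind (sketch only; the constant name below is a placeholder, not necessarily what will land in the PR):

    // Placeholder name: default reason, used only when the pipeline status
    // carries no description of its own.
    private const val DEFAULT_CREATE_REASON = "pipeline created"

    // at the call site:
    reason = previous.status.getDescription() ?: DEFAULT_CREATE_REASON,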

To the second point:
On the control plane operations, indeed: if dataflow is within handleCreate() for a pipeline, it will not process other scheduler messages until that pipeline either gets into a "running" state or fails. The concurrent case would be considerably more complex to handle here. What I don't know (and I'll check) is how the code in handleCreate() acts if the scheduler grpc endpoint disappears. It certainly won't be able to send the status updates. There might be some grpc-level retries, but I suspect that eventually I'd get an exception.
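
For reference, the sequential behaviour comes from collecting the update flow one message at a time; a simplified sketch (not the actual subscribePipelines() code, and the fields on PipelineUpdateMessage are assumptions):

    // Sketch only: collect {} suspends on each message until the handler returns,
    // so handleCreate() for one pipeline runs to completion before the next
    // scheduler update is taken from the Flow.
    suspend fun subscribePipelines(updates: Flow<PipelineUpdateMessage>) {
        updates.collect { update ->
            handleCreate(update.metadata, update.steps, kafkaConsumerGroupIdPrefix, namespace)
        }
    }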

More generally, I believe this is a fair point/question which is worth some discussion. To start with the current state: we don't issue "as-it-happens" updates about the state of the pipeline to the scheduler, not even if the pipeline fails entirely. The only time we do issue updates is during the first creation of the pipeline: i.e. we wait for it to become "running" before confirming that it was created successfully, and if it fails before reaching that "running" state, we return that particular error. I believe it would make sense to start issuing updates "during the pipeline lifetime" (like kafka streams rebalances), but it would be a separate PR, and I need to think carefully/test so that we don't spam the scheduler unnecessarily.

Now onto the code here: I've identified that a previous pipeline (1) exists and (2) is not in an error state. So I took the decision to return "success" for the "Create" scheduler operation, and to return the actual description of the current state as the reason. The logic is that the create was certainly successful previously (the pipeline is not in an error state), and we keep consistency with not providing up-to-date status updates. For example, if the pipeline is currently "Rebalancing" and I returned success=false, at the moment there isn't a code path to re-issue the pipelineUpdateEvent later, once rebalancing is done.

There is also a concurrency issue: the pipeline might be "Running" now, but it could have changed to an error state since I read its state, so I'd still send something wrong to the scheduler. So in general, I take the view that the scheduler should receive updates about less transient events (once the pipeline state is somewhat more "final"). The only issue there is that the "Rebalancing" state is actually quite disruptive: dataflow processing for the pipeline stops during rebalancing, as the clients re-divide amongst themselves the partitions they are responsible for reading/writing. So this might be of interest to a scheduler, but practically it can't do much (besides informing end-users): by definition, all clients that are part of the same Kafka Streams app (pipeline) pause at the same time to do the rebalancing.
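
If we do end up emitting such lifetime updates in a follow-up PR, the natural hook would be a Kafka Streams state listener (a sketch, assuming direct access to the KafkaStreams instance; the update call is hypothetical):

    // Sketch: Kafka Streams reports state transitions (including REBALANCING)
    // through a state listener; "during the pipeline lifetime" updates to the
    // scheduler could be generated from here.
    streams.setStateListener { newState, _ ->
        if (newState == KafkaStreams.State.REBALANCING) {
            logger.info("pipeline {pipelineName} is rebalancing", metadata.name)
            // hypothetical: send a transient status update to the scheduler here
        }
    }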

Contributor

@sakoush sakoush Apr 25, 2024

Thanks for the detailed response. What happens if the scheduler disconnects while dataflow is processing handleCreate? Is the execution of handleCreate interrupted, or is it still guaranteed to finish?

Member Author

That's the bit that I want to check. It will depend on the grpc-layer setup and how it reacts to disconnections. It is conceivable that at some point it could throw an exception during the client.pipelineUpdateEvent(...) rpc call, which we don't handle.
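
If that turns out to be needed, one option (not part of this PR; names as in the diff above) would be to guard that call explicitly:

    // Hypothetical hardening, not in this PR: tolerate the scheduler being
    // unreachable when reporting the pipeline status back.
    try {
        client.pipelineUpdateEvent(
            makePipelineUpdateEvent(
                metadata = metadata,
                operation = PipelineOperation.Create,
                success = true,
                reason = previous.status.getDescription() ?: "pipeline created",
            ),
        )
    } catch (e: io.grpc.StatusException) {
        logger.warn("failed to send pipeline update for {pipelineId}: {error}", metadata.id, e.status)
    }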

Member Author

@lc525 lc525 Apr 26, 2024

The results of testing here indicate reasonable behaviour: if the scheduler goes away while dataflow is in handleCreate, the pipeline creation logic continues to work correctly. More exactly, handleCreate() doesn't get interrupted up to the point where it needs to send a pipelineUpdateEvent to the scheduler (the pipeline gets created, waited on until it reaches the "Running" state, etc.). Now, we have two cases, both ending up with similar behaviour:

  1. If the scheduler is not yet up when handleCreate calls the first grpc function (to update the scheduler state)
  • an io.grpc.StatusException is thrown from handleCreate(), which is caught in subscribePipelines(), which finishes by reporting the error
  • the functionality to retry connecting to the scheduler in subscribe() kicks in, and continues until a scheduler becomes available
  • when the new scheduler becomes available, dataflow connects to it, receives new "Create" messages, and responds with the actual status of the pipelines (which we failed to transmit initially). If the pipeline ended up in an error state, we retry creating it at this point
  2. If a new scheduler instance has come up in the meantime
  • the io.grpc.StatusException still gets thrown, because the previous grpc connection transport has dropped; a new connection is established via the retry mechanism in subscribe(), and everything behaves as in case 1 above.

A note on how this is tested:

  • if one restarts dataflow-engine, the scheduler re-sends pipeline create messages to the new instance, therefore handleCreate() gets called. However, the creation process takes a long time (tens of seconds), because Kafka Streams needs to sync with remote state from Kafka topics. This gives us time to kill the scheduler while handleCreate() waits for the pipeline to start and transition to a Running state.
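
For context, the retry behaviour referenced above has roughly this shape (a sketch of the mechanism, not the exact subscribe() code; the names and back-off are illustrative):

    // Sketch: if the subscription fails (e.g. the scheduler goes away and an rpc
    // throws io.grpc.StatusException), report the error and keep retrying until a
    // scheduler instance becomes reachable again; it will then re-send "Create"
    // messages for the pipelines it expects to be running.
    suspend fun subscribe() {
        while (true) {
            try {
                subscribePipelines(kafkaConsumerGroupIdPrefix, namespace)
            } catch (e: io.grpc.StatusException) {
                logger.error("scheduler connection error: {error}", e.status)
            }
            delay(1_000) // kotlinx.coroutines.delay: back off before reconnecting
        }
    }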

+                    ),
+                )
+                logger.debug(
+                    "response to scheduler: pipeline {pipelineName} continues to run normally; " +
+                        "pipeline version: {pipelineVersion}, id: {pipelineId}",
+                    metadata.name,
+                    metadata.version,
+                    metadata.id,
+                )
+                return
+            } else { // pipeline exists but in failed state; cleanup state and re-create
+                logger.info(
+                    "Recreating failed pipeline {pipelineName} version: {pipelineVersion}, id: {pipelineId}",
+                    metadata.name,
+                    metadata.version,
+                    metadata.id,
+                )
+                logger.debug(
+                    "Previous state for failed pipeline {pipelineName} version: {pipelineVersion}, id: {pipelineId}: {pipelineStatus}",
+                    metadata.name,
+                    metadata.version,
+                    metadata.id,
+                    previous.status.getDescription(),
+                )
+                previous.stop()
Contributor

maybe add a note in the code on why we need to call stop explicitly

+            }
+        } else { // pipeline doesn't exist
+            logger.info(
+                "Creating pipeline {pipelineName} version: {pipelineVersion} id: {pipelineId}",
+                metadata.name,
+                metadata.version,
+                metadata.id,
+            )
+        }
+
         val (pipeline, err) =
             Pipeline.forSteps(
                 metadata,
@@ -184,28 +229,19 @@ class PipelineSubscriber(
             return
         }

-        val previous = pipelines.putIfAbsent(metadata.id, pipeline)
-        var pipelineStatus: PipelineStatus
-        if (previous == null) {
-            val errTopics = kafkaAdmin.ensureTopicsExist(steps)
-            if (errTopics == null) {
-                pipelineStatus = pipeline.start()
-            } else {
-                pipelineStatus =
-                    PipelineStatus.Error(null)
-                        .withException(errTopics)
-                        .withMessage("kafka streams topic creation error")
-                pipeline.stop()
-            }
+        // This overwrites any previous pipelines with the same id. We can only get here if those previous pipelines
+        // are in a failed state and they are being re-created by the scheduler.
+        pipelines[metadata.id] = pipeline
+        val pipelineStatus: PipelineStatus
+        val errTopics = kafkaAdmin.ensureTopicsExist(steps)
+        if (errTopics == null) {
+            pipelineStatus = pipeline.start()
         } else {
-            pipelineStatus = previous.status
-            logger.warn("pipeline {pipelineName} with id {pipelineId} already exists", metadata.name, metadata.id)
-            if (pipelineStatus.isError) {
-                // do not try to resuscitate an existing pipeline if in a failed state
-                // it's up to the scheduler to delete it & reinitialize it, as it might require
-                // coordination with {model, pipeline}gateway
-                previous.stop()
-            }
+            pipelineStatus =
+                PipelineStatus.Error(null)
+                    .withException(errTopics)
+                    .withMessage("kafka streams topic creation error")
+            pipeline.stop()
         }

         // We don't want to mark the PipelineOperation.Create as successful unless the
@@ -215,7 +251,7 @@ class PipelineSubscriber(
         if (pipelineStatus !is PipelineStatus.Started) {
             pipelineStatus.isError = true
         }
-        pipelineStatus.log(logger, Level.INFO)
+        pipelineStatus.log(logger, Level.DEBUG)
         client.pipelineUpdateEvent(
             makePipelineUpdateEvent(
                 metadata = metadata,