fix(dataflow): do not create new KafkaStreams app for existing pipelines #5550


Merged
merged 3 commits into from
May 2, 2024

Conversation

Member

@lc525 lc525 commented Apr 25, 2024

This fixes a dataflow-engine bug triggered when the scheduler sends/re-sends pipeline creation messages after a restart (because it's not aware of their status across various components).

Previous behaviour:
Dataflow-engine, on receiving a command to create a pipeline, would first create a new Kafka Streams application for this pipeline, before checking whether one already existed and was running.

Because of this, triggering a control-plane restart of the scheduler would result in dataflow errors for pipelines that kept internal state (mostly pipelines making use of triggers/joins). Kafka Streams would complain about an existing application using the same state directory, fail the newly created pipeline and inform the scheduler about this.

However, if the old pipeline had previously been running correctly, it would in fact continue to do so inside dataflow. This created a disconnect between the actual state of dataflow-engine and what the scheduler knew about it.

New behaviour:
The introduced changes mean that dataflow-engine first checks if a pipeline with the same id is already running. If its state is ok, dataflow simply informs the scheduler that the pipeline is created, without taking further action.

If a pipeline with the same id already exists but is in a failed state, it is first stopped (local Kafka Streams state is cleaned), then an attempt is made to re-create it, with the corresponding status being sent to the scheduler.
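The decision logic described above can be sketched roughly as follows. This is a minimal illustration only: `Pipeline`, `PipelineStatus`, and the map of running pipelines here are simplified stand-ins modelled on the PR description, not the actual dataflow-engine types.

```kotlin
// Simplified stand-ins for the dataflow-engine types (illustrative only).
sealed class PipelineStatus {
    object Started : PipelineStatus()
    data class Error(val reason: String) : PipelineStatus()
}

class Pipeline(val id: String, var status: PipelineStatus) {
    // In the real engine this tears down the Kafka Streams app and
    // cleans its local state directory.
    fun stop() {}
}

class DataflowEngine(val pipelines: MutableMap<String, Pipeline>) {
    // Returns the status string reported back to the scheduler.
    fun handleCreate(id: String, create: () -> Pipeline): String {
        val previous = pipelines[id]
        if (previous != null && previous.status !is PipelineStatus.Error) {
            // A healthy pipeline with this id is already running:
            // just ack the scheduler, take no further action.
            return "pipeline created"
        }
        // Either no pipeline with this id exists, or it failed: stop the
        // failed one first (cleaning local Kafka Streams state), then
        // attempt to re-create it.
        previous?.stop()
        val fresh = create()
        pipelines[id] = fresh
        return when (val s = fresh.status) {
            is PipelineStatus.Error -> s.reason
            else -> "pipeline created"
        }
    }
}
```

The key ordering change is that the existence/health check happens before any new Kafka Streams application is instantiated, so a healthy running pipeline is never disturbed by a re-sent create message.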

Which issue(s) this PR fixes:

  • INFRA-978 (internal issue) Pipelines fail to start because Kafka Streams state hasn't been cleared

Special notes for your reviewer:

  • Tested by restarting scheduler while a pipeline using state (a join) was running on dataflow engine
  • Ran pipeline smoke-tests to confirm the change hasn't introduced unexpected behaviour

@lc525 lc525 added the v2 label Apr 25, 2024
@lc525 lc525 requested a review from sakoush as a code owner April 25, 2024 12:34
@lc525 lc525 force-pushed the fix-create-existing-pipeline branch 2 times, most recently from ca6a027 to e034353 Compare April 25, 2024 13:48
@lc525 lc525 force-pushed the fix-create-existing-pipeline branch from e034353 to a07605a Compare April 25, 2024 14:04
Contributor

@sakoush sakoush left a comment


lgtm. I left a comment on a potential state inconsistency that is not clear to me.

metadata = metadata,
operation = PipelineOperation.Create,
success = true,
reason = previous.status.getDescription() ?: "pipeline created",
Contributor

@sakoush sakoush Apr 25, 2024


given that this code is used elsewhere (line 255 in this file), should we at least make the "pipeline created" message a constant?

Contributor

@sakoush sakoush Apr 25, 2024


also, can a particular pipeline be in a rebalancing/creating state? i.e. if dataflow is loading the pipeline, and the scheduler restarts and re-sends the load request for the same pipeline, are we returning ready?

or is the assumption, as you mentioned, that it is single-threaded, so we guarantee that the first load cmd has finished (it is not 100% clear to me)?

Member Author

@lc525 lc525 Apr 25, 2024


To the first point:
I first considered putting "pipeline running" here, but at the moment those constants don't really play any role (the reason is taken from pipeline.status.getDescription(), and only if that doesn't exist do we return the constant). However, for all the states we currently use, the description exists. I'll define this default value as a constant though, it makes sense.

To the second point:
On the control-plane operations, indeed: if dataflow is within handleCreate() for a pipeline, then it will not process other scheduler messages until that pipeline either gets into a "running" state or fails. The concurrent case would be considerably more complex to handle here. What I don't know (and I'll check) is how the code in handleCreate() acts if the scheduler grpc endpoint disappears. It certainly won't be able to send the status updates. There might be some grpc-level retries, but I suspect that eventually I'd get an exception.

More generally, I believe this is a fair point/question which is worth some discussion. To start with the current state: we don't issue "as-it-happens" updates about the state of the pipeline to the scheduler, not even if the pipeline fails entirely. The only time we do issue updates is during the first creation of the pipeline: i.e. we wait for it to become "running" before confirming that it was created successfully, and if it fails before reaching that "running" state, we return that particular error. I believe it would make sense to start issuing updates "during the pipeline lifetime" (like kafka streams rebalances), but it would be a separate PR, and I need to think carefully/test so that we don't spam the scheduler unnecessarily.

Now onto the code here: I've identified that a previous pipeline (1) exists and (2) is not in an error state. So I took the decision to return "success" to the "Create" scheduler operation, and return the actual description of the current state as the reason. The logic is that the create was certainly successful previously (pipeline not in an error state), and we keep consistency with not providing up-to-date status updates. For example, if the pipeline is currently "Rebalancing", and I'm returning success=false, at the moment there isn't a code path to re-issue the pipelineUpdateEvent later when rebalancing is done.

There is also a concurrency issue: the pipeline might be "Running" now, but it could have changed to an error state since I read its state, so I'd still send something wrong to the scheduler. So in general, I take the view that the scheduler should receive updates about less transient events (once the pipeline state is somewhat more "final"). The only issue there is that the "Rebalancing" state is actually quite disruptive: dataflow processing for the pipeline stops during rebalancing, as the clients re-divide amongst themselves the partitions they are responsible for reading/writing. So this might be of interest to a scheduler, but practically it can't do much besides inform end-users: by definition, all clients that are part of the same Kafka Streams app (pipeline) pause at the same time to do the rebalancing.

Contributor

@sakoush sakoush Apr 25, 2024


Thanks for the detailed response, what happens if the scheduler disconnects while dataflow is processing handleCreate? Is the execution of handleCreate interrupted or is it still guaranteed to finish?

Member Author


That's the bit that I want to check. It will depend on the grpc-layer setup and how it reacts to disconnections. It is conceivable that at some point it could throw an exception during the client.pipelineUpdateEvent(...) rpc call, which we don't handle.

Member Author

@lc525 lc525 Apr 26, 2024


The results of tests here indicate reasonable behaviour: if the scheduler goes away while dataflow is in handleCreate, the pipeline creation logic continues to work correctly. More exactly, handleCreate() doesn't get interrupted up to the point where it needs to send a pipelineUpdateEvent to the scheduler (the pipeline gets created, we wait for it to reach the "Running" state, etc.). Now, we have two cases, both ending up with similar behaviour:

  1. If the scheduler is not yet up when handleCreate calls the first grpc function (to update the scheduler state):
  • an io.grpc.StatusException is thrown from handleCreate(), which is caught in subscribePipelines(), which finishes by reporting the error
  • the functionality to retry connecting to the scheduler in subscribe() kicks in, and continues until a scheduler becomes available
  • when the new scheduler becomes available, dataflow connects to it, receives new "Create" messages, and responds with the actual status of the pipelines (which we failed to transmit initially). If the pipeline ended up in an error state, we retry creating it at this point
  2. If a new scheduler instance has come up in the meantime:
  • the io.grpc.StatusException still gets thrown, because the previous grpc connection transport has dropped; a new connection is established via the retry mechanism in subscribe() and everything behaves as in case 1 above.

A note on how this is tested:

  • if one restarts dataflow-engine, the scheduler re-sends pipeline create messages to the new instance, therefore handleCreate() gets called. However, the creation process takes a long time (tens of seconds), because Kafka Streams needs to sync with remote state from Kafka topics. This gives us time to kill the scheduler while handleCreate() waits for the pipeline to start and transition to a Running state.
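The retry behaviour discussed here can be sketched as a simple loop. This is illustrative only: `StatusException` below is a local stand-in for `io.grpc.StatusException`, and the real reconnect logic in `subscribe()` also involves backoff and re-establishing the grpc subscription.

```kotlin
// Local stand-in for io.grpc.StatusException (illustrative).
class StatusException(message: String) : Exception(message)

// Keep retrying the status update until a scheduler accepts it, or give up
// after maxAttempts. Returns the number of attempts made.
fun sendUpdateWithRetry(maxAttempts: Int, sendUpdate: () -> Unit): Int {
    var attempts = 0
    while (attempts < maxAttempts) {
        attempts++
        try {
            sendUpdate() // e.g. client.pipelineUpdateEvent(...)
            return attempts
        } catch (e: StatusException) {
            // Scheduler unreachable: in the real code we back off and
            // re-subscribe before retrying.
        }
    }
    return attempts
}
```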

Contributor

@sakoush sakoush left a comment


lgtm, thanks for adding tests. I left some questions but it is good to go.

metadata.id,
previous.status.getDescription(),
)
previous.stop()
Contributor


maybe add a note in the code on why we need to call stop explicitly

Comment on lines +110 to +111
this is StreamStarting || this is Started -> true
this.state in
Contributor


what's the difference between these two conditions? is the second one happening at runtime perhaps?

Member Author


When one calls pipeline.start(), the state becomes StreamStarting. It only transitions to Started when the underlying Kafka Streams application declares that it's "Started" (i.e. after a successful rebalance).
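A minimal sketch of this transition and the active-state check (the types here are simplified stand-ins, not the actual pipeline state implementation):

```kotlin
sealed class Status {
    // pipeline.start() puts the state here first...
    object StreamStarting : Status()
    // ...and only a successful Kafka Streams rebalance moves it here.
    object Started : Status()
    object StreamStopped : Status()

    // Both the pre- and post-rebalance states count as "active".
    fun isActive(): Boolean =
        when (this) {
            is StreamStarting, is Started -> true
            else -> false
        }
}
```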


fun isError(): Boolean {
return when {
this is Error -> true
Contributor


ditto

Member Author


The pipeline status is controlled internally within the Pipeline object (for example, when Kafka Streams transitions from one state to another), but it can also be set externally (i.e. if application logic external to the pipeline determines that an error has occurred: for example, if we couldn't create all the Kafka topics required by the pipeline).

PipelineStatus.Error is a way for objects external to the Pipeline to declare those kinds of errors. The other error states are controlled internally within the Pipeline.

"StreamStopped(prevState=nested StreamStopped without error)",
!IS_ACTIVE,
!IS_ERROR,
PipelineStatus.StreamStopped(
Contributor


can we have this condition for real? perhaps from multiple restarts of the scheduler?

Member Author

@lc525 lc525 May 1, 2024


Yes, in the sense that: A pipeline is in state PipelineStatus.Started(). If stop() is called on this pipeline, then it has state s1=PipelineStatus.StreamStopped(PipelineStatus.Started()).

Now, say stop() gets called again on this pipeline. The new state will be initialised as s2=PipelineStatus.StreamStopped(s1). s2 is essentially what gets tested above. However, because in the implementation of StreamStopped we take measures not to deeply nest StreamStopped states, s2 == s1 (where == compares the two states by their internal contents).
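One way to implement that flattening, sketched under the assumption that `StreamStopped` simply unwraps a nested `StreamStopped` previous state (the types below are illustrative stand-ins; the real implementation may differ):

```kotlin
sealed class PStatus {
    object Started : PStatus()

    class StreamStopped(prev: PStatus) : PStatus() {
        // Avoid deep nesting: StreamStopped(StreamStopped(x)) keeps x,
        // not the wrapper, as its previous state.
        val prevState: PStatus =
            if (prev is StreamStopped) prev.prevState else prev

        // Compare by content so s2 == s1 holds after a double stop().
        override fun equals(other: Any?): Boolean =
            other is StreamStopped && other.prevState == prevState

        override fun hashCode(): Int = prevState.hashCode()
    }
}
```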

private const val IS_ERROR = true

@JvmStatic
fun checkState(): Stream<Arguments> =
Contributor


is there a way to parametrize choices as opposed to listing them?

Member Author


The test is already parameterised (it is written once in the checkState function above -- the one outside the companion object). The following lists the states and the expected results for each given state. If one were to automate the way the expected results are created, then you would need to test that the automation produces the expected results, and so on... So I think further parameterising here would not help, but perhaps I'm misunderstanding your point.

Member Author

@lc525 lc525 May 1, 2024


After our discussion: it would indeed be worthwhile to think of ways of ensuring that all possible combinations have been covered. Those types of combinatorial tests are possible, but not for the case where the expected values need to be manually provided per test case. Here, the logic of isActive() and isError() would need to be replicated within the test, providing less value.

"PipelineStatus(state=non-error,active state, hasError=true)",
IS_ACTIVE,
IS_ERROR,
PipelineStatus(KafkaStreams.State.CREATED, true),
Contributor


how can it be created and have an error?

Member Author


We don't typically encounter this, but the goal of "hasError" is to mark that there has been some exception/error message recorded together with the state. It could be that it's not a fatal error (the pipeline still got created), but there was an exception and we recovered.
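In other words, `hasError` is orthogonal to the Kafka Streams state itself. A rough illustration of that interaction, using simplified stand-ins (not the actual `PipelineStatus` class, and the `isError()` body below is an assumption for illustration):

```kotlin
// Stand-in for KafkaStreams.State (illustrative).
enum class KState { CREATED, RUNNING, ERROR, NOT_RUNNING }

class PStatusEx(val state: KState, val hasError: Boolean) {
    // A recorded exception marks the status as an error even when the
    // state itself is a non-error one, such as CREATED after a
    // recovered (non-fatal) exception.
    fun isError(): Boolean = hasError || state == KState.ERROR
}
```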

- remove duplicate `PipelineState` test case
- test `StreamStopped(prevState)` with `prevState=StreamStopped` avoids nesting
- update test names for clarity
- update build config to print test PASS/SKIP/FAIL results
- add `test` target to Makefile
@lc525 lc525 merged commit c346347 into SeldonIO:v2 May 2, 2024
jtayl222 pushed a commit to jtayl222/seldon-core that referenced this pull request Jul 20, 2025
…nes (SeldonIO#5550)
