-
Notifications
You must be signed in to change notification settings - Fork 1.2k
chasm(callback): implement backoff and invocation executor #8499
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use chasm/statemachine.go
and copy over all of the state transitions from the HSM implementation.
chasm/lib/callback/executor.go
Outdated
_, err := chasm.ReadComponent( | ||
ctx, | ||
invokerRef, | ||
func(c *Callback, ctx chasm.Context, _ any) (struct{}, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just return what you need here instead of cloning the entire proto IMHO. I typically would rather not cloning components for use outside of the chasm contexts.
chasm/lib/callback/executor.go
Outdated
task *callbackspb.InvocationTask, | ||
) error { | ||
var ns *namespace.Namespace | ||
// var invoker *Invoker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leftover comment.
chasm/lib/callback/executor.go
Outdated
) error { | ||
var ns *namespace.Namespace | ||
// var invoker *Invoker | ||
var callback *Callback |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be the return type from ReadComponent
.
chasm/lib/callback/executor.go
Outdated
_ chasm.TaskAttributes, | ||
_ *callbackspb.InvocationTask, | ||
) (bool, error) { | ||
return callback.Status == callbackspb.CALLBACK_STATUS_SCHEDULED, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should also validate the attempt matches, copy the attempt to the invocation task when scheduled. This would prevent duplicate tasks from being considered valid in some cases.
chasm/lib/callback/executor.go
Outdated
case *callbackspb.Callback_Nexus: | ||
// Parse URL to extract scheme and host, matching HSM's behavior | ||
// from statemachine.go:86-90 | ||
u, err := url.Parse(variant.Nexus.Url) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chaptersix we should follow up and change this to use the destination in the token in case the URL is temporal://system
.
map<string, string> header = 2; | ||
} | ||
|
||
message InvokerState { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
chasm/lib/callback/component.go
Outdated
|
||
// Fields from HSM's nexusInvocation struct (nexus_invocation.go:35-40) | ||
// These hold the invocation context needed for the HTTP request | ||
completion nexusrpc.OperationCompletion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't hold this here, create a separate struct for that as we did in the HSM version.
chasm/lib/callback/component.go
Outdated
// - HSM invocationResultOK -> CHASM CALLBACK_STATUS_SUCCEEDED | ||
// - HSM invocationResultRetry -> CHASM CALLBACK_STATUS_BACKING_OFF | ||
// - HSM invocationResultFail -> CHASM CALLBACK_STATUS_FAILED | ||
func (c *Callback) invoke( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shouldn't be a method on the component. I know we did this pattern in scheduler but I do not want us to reference components in general outside of the chasm context, it's too error prone and hard to reason about.
chasm/lib/callback/executor.go
Outdated
) | ||
defer cancel() | ||
|
||
result := callback.invoke(callCtx, ns, e, taskAttributes, task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're missing the chasm variant that @lina-temporal recently added.
|
||
string workflow_id = 10; | ||
string run_id = 11; | ||
|
||
string namespace_id = 12; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Namespace is used to get request timeout from dynamic config on a per-namespace basis
What changed?
Adding an
InvocationTaskExecutor
andBackoffTaskExecutor
tochasm/lib/callback
Why?
Second step of migrating callback from HSM -> CHASM
How did you test it?
Potential risks
None, this is not integrated.