Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 52 additions & 13 deletions workflow/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ func getFunctionName(f interface{}) (string, error) {
}

callSplit := strings.Split(runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name(), ".")

funcName := callSplit[len(callSplit)-1]

if funcName == "1" {
Expand All @@ -114,18 +113,50 @@ func wrapWorkflow(w Workflow) task.Orchestrator {
}
}

type registerOptions struct {
Name string
}

type registerOption func(*registerOptions) error

// WithName allows you to specify a custom name for the workflow or activity being registered.
// Activities and Workflows registered without an explicit name will use the function name as the name.
func WithName(name string) registerOption {
return func(opts *registerOptions) error {
opts.Name = name
return nil
}
}

func processRegisterOptions(options registerOptions, opts ...registerOption) (registerOptions, error) {
for _, opt := range opts {
if err := opt(&options); err != nil {
return options, fmt.Errorf("failed processing options: %w", err)
}
}
return options, nil
}

// RegisterWorkflow adds a workflow function to the registry
func (ww *WorkflowWorker) RegisterWorkflow(w Workflow) error {
func (ww *WorkflowWorker) RegisterWorkflow(w Workflow, opts ...registerOption) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks the func API- please can we create new funcs which take an options.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which API do you think this break?
Technically this is backwards compatible 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is somewhat compatible however there are some cases if you're implementing it within an interface or using reflection it would be a breaking change as the function signature is distinctly different

I think in the interests of time I would be inclined to agree that a new method with the variadic parameters for the time being would be the best path forwards while a deprecation/upgrade path is defined. Wdyt?

Copy link
Contributor Author

@tscolari tscolari Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind adding another function for this,
my only concern is that it will mean that there will be 2 functions doing the same work - and removing one in the future will then indeed cause a breaking change, which means a bigger pain long term. The question is how much would add the optional parameter here affect existing users vs the perpetual ergonomic cost of adding to the api.

wrappedOrchestration := wrapWorkflow(w)

// get the function name for the passed workflow
name, err := getFunctionName(w)
options, err := processRegisterOptions(registerOptions{}, opts...)
if err != nil {
return fmt.Errorf("failed to get workflow decorator: %v", err)
return err
}

if options.Name == "" {
// get the function name for the passed workflow if there's
// no explicit name provided.
name, err := getFunctionName(w)
if err != nil {
return fmt.Errorf("failed to get workflow decorator: %v", err)
}
options.Name = name
}

err = ww.tasks.AddOrchestratorN(name, wrappedOrchestration)
return err
return ww.tasks.AddOrchestratorN(options.Name, wrappedOrchestration)
}

func wrapActivity(a Activity) task.Activity {
Expand All @@ -143,17 +174,25 @@ func wrapActivity(a Activity) task.Activity {
}

// RegisterActivity adds an activity function to the registry
func (ww *WorkflowWorker) RegisterActivity(a Activity) error {
func (ww *WorkflowWorker) RegisterActivity(a Activity, opts ...registerOption) error {
wrappedActivity := wrapActivity(a)

// get the function name for the passed activity
name, err := getFunctionName(a)
options, err := processRegisterOptions(registerOptions{}, opts...)
if err != nil {
return fmt.Errorf("failed to get activity decorator: %v", err)
return err
}

if options.Name == "" {
// get the function name for the passed workflow if there's
// no explicit name provided.
name, err := getFunctionName(a)
if err != nil {
return fmt.Errorf("failed to get activity decorator: %v", err)
}
options.Name = name
}

err = ww.tasks.AddActivityN(name, wrappedActivity)
return err
return ww.tasks.AddActivityN(options.Name, wrappedActivity)
}

// Start initialises a non-blocking worker to handle workflows and activities registered
Expand Down
52 changes: 52 additions & 0 deletions workflow/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License.
package workflow

import (
"errors"
"testing"

daprClient "github.com/dapr/go-sdk/client"
Expand Down Expand Up @@ -43,22 +44,46 @@ func TestWorkflowRuntime(t *testing.T) {
t.Run("register workflow", func(t *testing.T) {
err := testWorker.RegisterWorkflow(testWorkflow)
require.NoError(t, err)

t.Run("with explicit name", func(t *testing.T) {
err := testWorker.RegisterWorkflow(testWorkflow, WithName("MyWorkflow"))
require.NoError(t, err)
})
})
t.Run("register workflow - anonymous func", func(t *testing.T) {
err := testWorker.RegisterWorkflow(func(ctx *WorkflowContext) (any, error) {
return nil, nil
})
require.Error(t, err)

t.Run("with explicit name", func(t *testing.T) {
err := testWorker.RegisterWorkflow(func(ctx *WorkflowContext) (any, error) {
return nil, nil
}, WithName("MyWorkflow2"))
require.NoError(t, err)
})
})
t.Run("register activity", func(t *testing.T) {
err := testWorker.RegisterActivity(testActivity)
require.NoError(t, err)

t.Run("with explicit name", func(t *testing.T) {
err := testWorker.RegisterActivity(testActivity, WithName("MyActivity"))
require.NoError(t, err)
})
})
t.Run("register activity - anonymous func", func(t *testing.T) {
err := testWorker.RegisterActivity(func(ctx ActivityContext) (any, error) {
return nil, nil
})
require.Error(t, err)

t.Run("with explicit name", func(t *testing.T) {
err := testWorker.RegisterActivity(func(ctx ActivityContext) (any, error) {
return nil, nil
}, WithName("MyActivity2"))
require.NoError(t, err)
})
})
}

Expand All @@ -69,6 +94,27 @@ func TestWorkerOptions(t *testing.T) {
})
}

func TestRegisterOptions(t *testing.T) {
t.Run("WithName", func(t *testing.T) {
defaultOpts := registerOptions{}
options, err := processRegisterOptions(defaultOpts, WithName("testWorkflow"))
require.NoError(t, err)
assert.NotEmpty(t, options.Name)
assert.Equal(t, "testWorkflow", options.Name)
})

t.Run("error handling", func(t *testing.T) {
optionThatFails := func(opts *registerOptions) error {
return errors.New("this always fails")
}

defaultOpts := registerOptions{}
_, err := processRegisterOptions(defaultOpts, optionThatFails)
require.Error(t, err)
require.ErrorContains(t, err, "this always fails")
})
}

func returnWorkerOptions(opts ...workerOption) workerOptions {
options := new(workerOptions)
for _, configure := range opts {
Expand Down Expand Up @@ -104,6 +150,12 @@ func TestGetFunctionName(t *testing.T) {
require.Error(t, err)
assert.Equal(t, "", name)
})

t.Run("get function name - anonymous", func(t *testing.T) {
name, err := getFunctionName(func() {})
require.Error(t, err)
assert.Equal(t, "", name)
})
}

func testWorkflow(ctx *WorkflowContext) (any, error) {
Expand Down