Skip to content

Commit 7c081b5

Browse files
authored
Merge branch 'main' into support-named-workflows-and-activities
2 parents 37deb0d + 2e40804 commit 7c081b5

File tree

14 files changed

+261
-32
lines changed

14 files changed

+261
-32
lines changed

.github/workflows/validate_examples.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ jobs:
3838
CHECKOUT_REF: ${{ github.ref }}
3939
outputs:
4040
DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }}
41-
DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }}
41+
DAPR_CLI_VER: 1.16.0-rc.1
4242
DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }}
43-
DAPR_RUNTIME_VER: ${{ steps.outputs.outputs.DAPR_RUNTIME_VER }}
43+
DAPR_RUNTIME_VER: 1.16.0-rc.3
4444
CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }}
4545
CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }}
4646
DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }}
@@ -175,6 +175,7 @@ jobs:
175175
"socket",
176176
"workflow",
177177
"workflow-parallel",
178+
"workflow-taskexecutionid"
178179
]
179180
steps:
180181
- name: Check out code onto GOPATH

client/client.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func NewClientWithAddressContext(ctx context.Context, address string) (client Cl
355355
return nil, fmt.Errorf("error parsing address '%s': %w", address, err)
356356
}
357357

358-
at := &authToken{}
358+
at := newAuthToken()
359359

360360
opts := []grpc.DialOption{
361361
grpc.WithUserAgent(userAgent()),
@@ -404,7 +404,7 @@ func NewClientWithSocket(socket string) (client Client, err error) {
404404
if socket == "" {
405405
return nil, errors.New("nil socket")
406406
}
407-
at := &authToken{}
407+
at := newAuthToken()
408408
logger.Printf("dapr client initializing for: %s", socket)
409409
addr := "unix://" + socket
410410
conn, err := grpc.Dial( //nolint:staticcheck
@@ -421,11 +421,6 @@ func NewClientWithSocket(socket string) (client Client, err error) {
421421
}
422422

423423
func newClientWithConnection(conn *grpc.ClientConn, authToken *authToken) Client {
424-
apiToken := os.Getenv(apiTokenEnvVarName)
425-
if apiToken != "" {
426-
logger.Println("client uses API token")
427-
authToken.set(apiToken)
428-
}
429424
return &GRPCClient{
430425
connection: conn,
431426
protoClient: pb.NewDaprClient(conn),
@@ -435,14 +430,26 @@ func newClientWithConnection(conn *grpc.ClientConn, authToken *authToken) Client
435430

436431
// NewClientWithConnection instantiates Dapr client using specific connection.
437432
func NewClientWithConnection(conn *grpc.ClientConn) Client {
438-
return newClientWithConnection(conn, &authToken{})
433+
return newClientWithConnection(conn, newAuthToken())
439434
}
440435

436+
// NOTE: authToken must be created using newAuthToken()
437+
// it is crucial to correctly initialize the dapr client with the API token from the environment variable
441438
type authToken struct {
442439
mu sync.RWMutex
443440
authToken string
444441
}
445442

443+
func newAuthToken() *authToken {
444+
apiToken := os.Getenv(apiTokenEnvVarName)
445+
if apiToken != "" {
446+
logger.Println("API Token loaded from the environment variable")
447+
}
448+
return &authToken{
449+
authToken: apiToken,
450+
}
451+
}
452+
446453
func (a *authToken) get() string {
447454
a.mu.RLock()
448455
defer a.mu.RUnlock()

client/conversation.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ func WithTemperature(temp float64) conversationRequestOption {
101101

102102
// ConverseAlpha1 can invoke an LLM given a request created by the NewConversationRequest function.
103103
func (c *GRPCClient) ConverseAlpha1(ctx context.Context, req conversationRequest, options ...conversationRequestOption) (*ConversationResponse, error) {
104+
//nolint:staticcheck
104105
cinputs := make([]*runtimev1pb.ConversationInput, len(req.inputs))
105106
for i, in := range req.inputs {
107+
//nolint:staticcheck
106108
cinputs[i] = &runtimev1pb.ConversationInput{
107109
Content: in.Content,
108110
Role: in.Role,
@@ -115,7 +117,7 @@ func (c *GRPCClient) ConverseAlpha1(ctx context.Context, req conversationRequest
115117
opt(&req)
116118
}
117119
}
118-
120+
//nolint:staticcheck
119121
request := runtimev1pb.ConversationRequest{
120122
Name: req.name,
121123
ContextID: req.ContextID,

examples/go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@ require (
1818
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
1919
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
2020
github.com/cespare/xxhash/v2 v2.3.0 // indirect
21-
github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b // indirect
22-
github.com/dapr/durabletask-go v0.7.2 // indirect
21+
github.com/dapr/dapr v1.16.0-rc.3 // indirect
22+
github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5 // indirect
2323
github.com/dapr/kit v0.15.4 // indirect
2424
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
25-
github.com/go-chi/chi/v5 v5.1.0 // indirect
25+
github.com/go-chi/chi/v5 v5.2.2 // indirect
2626
github.com/go-logr/logr v1.4.3 // indirect
2727
github.com/go-logr/stdr v1.2.2 // indirect
2828
github.com/sirupsen/logrus v1.9.3 // indirect

examples/go.sum

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3
66
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
77
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
88
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
9-
github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b h1:fR+ae4QXF8R4GqKrzEls7WaibF1wjiVvifUl+IoP37I=
10-
github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b/go.mod h1:kx/7l7wDGkKRVoE6CUtuNl1FjKA0hj7bn/6xJ1Fc6HY=
11-
github.com/dapr/durabletask-go v0.7.2 h1:ssNupibV65/o5HNJRceU6x7D4LSyrGsz6nfMFUcI540=
12-
github.com/dapr/durabletask-go v0.7.2/go.mod h1:JhMyDybRUFmmgieGxCPeg9e2cWwtx4LwNXjD+LBtKYk=
9+
github.com/dapr/dapr v1.16.0-rc.3 h1:D99V20GOhb+bZXH1PngME+wgzIZCcBFOvmaP7DOZxGo=
10+
github.com/dapr/dapr v1.16.0-rc.3/go.mod h1:uyKnxMohSg87LSFzZ/oyuiGSo0+qkzeR0eXncPyIV9c=
11+
github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5 h1:l8oBGwcfCwqvSYDZwla0A2fhENmXFc1Wk4lR0VEq+is=
12+
github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q=
1313
github.com/dapr/kit v0.15.4 h1:29DezCR22OuZhXX4yPEc+lqcOf/PNaeAuIEx9nGv394=
1414
github.com/dapr/kit v0.15.4/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw=
1515
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -20,8 +20,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
2020
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
2121
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
2222
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
23-
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
24-
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
23+
github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618=
24+
github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
2525
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
2626
github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI=
2727
github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Dapr Parallel Workflow Example with go-sdk
2+
3+
## Step
4+
5+
### Prepare
6+
7+
- Dapr installed
8+
9+
### Run Workflow
10+
11+
<!-- STEP
12+
name: Run Workflow
13+
output_match_mode: substring
14+
expected_stdout_lines:
15+
- '== APP == Workflow(s) and activities registered.'
16+
- 'work item listener started'
17+
- '== APP == RetryN 1'
18+
- '== APP == RetryN 2'
19+
- '== APP == RetryN 3'
20+
- '== APP == RetryN 4'
21+
- '== APP == RetryN 1'
22+
- '== APP == RetryN 2'
23+
- '== APP == RetryN 3'
24+
- '== APP == RetryN 4'
25+
- '== APP == workflow status: COMPLETED'
26+
- '== APP == workflow terminated'
27+
- '== APP == workflow purged'
28+
29+
background: true
30+
sleep: 30
31+
timeout_seconds: 60
32+
-->
33+
34+
```bash
35+
dapr run --app-id workflow-taskexecutionid \
36+
--dapr-grpc-port 50001 \
37+
--log-level debug \
38+
--resources-path ./config \
39+
-- go run ./main.go
40+
```
41+
42+
<!-- END_STEP -->
43+
44+
## Result
45+
46+
```
47+
- '== APP == Workflow(s) and activities registered.'
48+
- 'work item listener started'
49+
- '== APP == RetryN 1'
50+
- '== APP == RetryN 2'
51+
- '== APP == RetryN 3'
52+
- '== APP == RetryN 4'
53+
- '== APP == RetryN 1'
54+
- '== APP == RetryN 2'
55+
- '== APP == RetryN 3'
56+
- '== APP == RetryN 4'
57+
- '== APP == workflow status: COMPLETED'
58+
- '== APP == workflow terminated'
59+
- '== APP == workflow purged'
60+
```
61+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
apiVersion: dapr.io/v1alpha1
2+
kind: Component
3+
metadata:
4+
name: wf-store
5+
spec:
6+
type: state.redis
7+
version: v1
8+
metadata:
9+
- name: redisHost
10+
value: localhost:6379
11+
- name: redisPassword
12+
value: ""
13+
- name: actorStateStore
14+
value: "true"
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log"
7+
"sync"
8+
"sync/atomic"
9+
"time"
10+
11+
"github.com/dapr/go-sdk/workflow"
12+
)
13+
14+
func main() {
15+
w, err := workflow.NewWorker()
16+
if err != nil {
17+
log.Fatalf("failed to initialise worker: %v", err)
18+
}
19+
20+
if err := w.RegisterWorkflow(TaskExecutionIdWorkflow); err != nil {
21+
log.Fatalf("failed to register workflow: %v", err)
22+
}
23+
if err := w.RegisterActivity(RetryN); err != nil {
24+
log.Fatalf("failed to register activity: %v", err)
25+
}
26+
fmt.Println("Workflow(s) and activities registered.")
27+
28+
if err := w.Start(); err != nil {
29+
log.Fatalf("failed to start worker")
30+
}
31+
32+
wfClient, err := workflow.NewClient()
33+
if err != nil {
34+
log.Fatalf("failed to initialise client: %v", err)
35+
}
36+
ctx := context.Background()
37+
id, err := wfClient.ScheduleNewWorkflow(ctx, "TaskExecutionIdWorkflow", workflow.WithInput(5))
38+
if err != nil {
39+
log.Fatalf("failed to schedule a new workflow: %v", err)
40+
}
41+
42+
metadata, err := wfClient.WaitForWorkflowCompletion(ctx, id)
43+
if err != nil {
44+
log.Fatalf("failed to get workflow: %v", err)
45+
}
46+
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
47+
48+
err = wfClient.TerminateWorkflow(ctx, id)
49+
if err != nil {
50+
log.Fatalf("failed to terminate workflow: %v", err)
51+
}
52+
fmt.Println("workflow terminated")
53+
54+
err = wfClient.PurgeWorkflow(ctx, id)
55+
if err != nil {
56+
log.Fatalf("failed to purge workflow: %v", err)
57+
}
58+
fmt.Println("workflow purged")
59+
}
60+
61+
var eMap = sync.Map{}
62+
63+
func TaskExecutionIdWorkflow(ctx *workflow.WorkflowContext) (any, error) {
64+
var retries int
65+
if err := ctx.GetInput(&retries); err != nil {
66+
return 0, err
67+
}
68+
69+
var workBatch []int
70+
if err := ctx.CallActivity(RetryN, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
71+
MaxAttempts: retries,
72+
InitialRetryInterval: 100 * time.Millisecond,
73+
BackoffCoefficient: 2,
74+
MaxRetryInterval: 1 * time.Second,
75+
}), workflow.ActivityInput(retries)).Await(&workBatch); err != nil {
76+
return 0, err
77+
}
78+
79+
if err := ctx.CallActivity(RetryN, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
80+
MaxAttempts: retries,
81+
InitialRetryInterval: 100 * time.Millisecond,
82+
BackoffCoefficient: 2,
83+
MaxRetryInterval: 1 * time.Second,
84+
}), workflow.ActivityInput(retries)).Await(&workBatch); err != nil {
85+
return 0, err
86+
}
87+
88+
return 0, nil
89+
}
90+
91+
func RetryN(ctx workflow.ActivityContext) (any, error) {
92+
taskExecutionID := ctx.GetTaskExecutionID()
93+
counter, _ := eMap.LoadOrStore(taskExecutionID, &atomic.Int32{})
94+
var retries int32
95+
if err := ctx.GetInput(&retries); err != nil {
96+
return 0, err
97+
}
98+
99+
counter.(*atomic.Int32).Add(1)
100+
fmt.Println("RetryN ", counter.(*atomic.Int32).Load())
101+
102+
if counter.(*atomic.Int32).Load() < retries-1 {
103+
return nil, fmt.Errorf("failed")
104+
}
105+
106+
return nil, nil
107+
108+
}

examples/workflow/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ func main() {
6464
ctx := context.Background()
6565

6666
// Start workflow test
67+
// Set the start time to the current time to not wait for the workflow to
68+
// "start". This is useful for increasing the throughput of creating
69+
// workflows.
70+
// workflow.WithStartTime(time.Now())
6771
instanceID, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
6872
if err != nil {
6973
log.Fatalf("failed to start workflow: %v", err)

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ module github.com/dapr/go-sdk
33
go 1.24.4
44

55
require (
6-
github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b
7-
github.com/dapr/durabletask-go v0.7.2
6+
github.com/dapr/dapr v1.16.0-rc.3
7+
github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5
88
github.com/dapr/kit v0.15.4
9-
github.com/go-chi/chi/v5 v5.1.0
9+
github.com/go-chi/chi/v5 v5.2.2
1010
github.com/golang/mock v1.6.0
1111
github.com/google/uuid v1.6.0
1212
github.com/stretchr/testify v1.10.0

0 commit comments

Comments
 (0)