Skip to content

Commit e407047

Browse files
committed
Make client fail fast when they face an issue with API.
1 parent 9acde9e commit e407047

File tree

2 files changed

+57
-2
lines changed

2 files changed

+57
-2
lines changed

internal/utils/workerpool.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@ package utils
22

33
import (
44
"context"
5+
"errors"
56
"sync"
67

78
"golang.org/x/sync/semaphore"
89
)
910

11+
// ErrTaskAborted error thrown when a taks is aborted
12+
var ErrTaskAborted = errors.New("Aborted")
13+
1014
// Task is a representation of a task
1115
type Task struct {
1216
fn func() error
@@ -19,6 +23,7 @@ type WorkerPool struct {
1923
taskQ chan Task
2024
wg sync.WaitGroup
2125
sem *semaphore.Weighted
26+
failed bool
2227
}
2328

2429
// NewWorkerPool instantiate a new worker pool
@@ -28,6 +33,7 @@ func NewWorkerPool(workers int) *WorkerPool {
2833
workers: workers,
2934
taskQ: taskQ,
3035
sem: semaphore.NewWeighted(int64(workers)),
36+
failed: false,
3137
}
3238

3339
wp.wg.Add(workers)
@@ -36,8 +42,18 @@ func NewWorkerPool(workers int) *WorkerPool {
3642
go func() {
3743
defer wp.wg.Done()
3844
for task := range taskQ {
39-
task.errC <- task.fn()
45+
// If the worker pool has been aborted, we skip all tasks in the queue.
46+
if wp.failed {
47+
task.errC <- ErrTaskAborted
48+
wp.sem.Release(1)
49+
continue
50+
}
51+
err := task.fn()
4052
wp.sem.Release(1)
53+
if err != nil {
54+
wp.failed = true
55+
}
56+
task.errC <- err
4157
}
4258
}()
4359
}
@@ -47,10 +63,15 @@ func NewWorkerPool(workers int) *WorkerPool {
4763

4864
// Exec enqueues one task into the worker pool
4965
func (wp *WorkerPool) Exec(f func() error) chan error {
66+
errC := make(chan error, 1)
67+
if wp.failed {
68+
errC <- ErrTaskAborted
69+
return errC
70+
}
5071
wp.sem.Acquire(context.Background(), 1)
5172
t := Task{
5273
fn: f,
53-
errC: make(chan error, 1),
74+
errC: errC,
5475
}
5576
wp.taskQ <- t
5677
return t.errC

internal/utils/workerpool_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"testing"
7+
"time"
78

89
"github.com/stretchr/testify/assert"
910
)
@@ -59,3 +60,36 @@ func TestWorkerPoolWithManyTasks(t *testing.T) {
5960
<-f
6061
}
6162
}
63+
64+
func TestWorkerPoolAborted(t *testing.T) {
65+
p := NewWorkerPool(5)
66+
defer p.Close()
67+
68+
var errFailed = errors.New("failed")
69+
70+
futures := make([]chan error, 0)
71+
for i := 0; i < 1000; i++ {
72+
ix := i
73+
fmt.Printf("task %d inserted\n", i)
74+
f := p.Exec(func() error {
75+
fmt.Printf("task %d started\n", ix)
76+
time.Sleep(3 * time.Second)
77+
if ix == 6 {
78+
fmt.Printf("task failed!")
79+
return errFailed
80+
}
81+
fmt.Printf("task %d ended\n", ix)
82+
return nil
83+
})
84+
futures = append(futures, f)
85+
}
86+
87+
errorsCount := 0
88+
for _, f := range futures {
89+
err := <-f
90+
if err == errFailed {
91+
errorsCount++
92+
}
93+
}
94+
assert.Equal(t, 1, errorsCount)
95+
}

0 commit comments

Comments
 (0)