Skip to content

Commit a7f8564

Browse files
authored
cleanup launcher logic and ctx handling (#83)
1 parent 7343568 commit a7f8564

File tree

3 files changed

+47
-87
lines changed

3 files changed

+47
-87
lines changed

launcher.go

Lines changed: 38 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -145,11 +145,6 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
145145
opt.DoneTopic = defaultDoneTopic
146146
}
147147

148-
lgr := log.New(os.Stderr, "", log.LstdFlags)
149-
if opt.Logger != nil {
150-
lgr = opt.Logger
151-
}
152-
153148
// make sure maxInProgress is at least 1
154149
maxInProgress := uint(1)
155150
if opt.MaxInProgress > 1 {
@@ -171,20 +166,11 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
171166
// lifetime max remaining (0; no lifetime max)
172167
remaining := opt.LifetimeWorkers
173168

174-
// doneCncl (done cancel function)
175-
// - is called internally by the Launcher to signal that the Launcher
176-
// has COMPLETED shutting down.
177-
//
178-
// doneCtx (done context)
179-
// - is for communicating externally that the Launcher is DONE and
180-
// has shutdown gracefully.
181-
doneCtx, doneCncl := context.WithCancel(context.Background())
182-
183169
// stop context and cancel func: shutdown Launcher/workers
184170
//
185171
// stopCtx - Launcher will listen on stopCtx.Done() for external forced shutdown.
186172
// stopCncl - used externally of Launcher to initiate forced Launcher shutdown.
187-
stopCtx, stopCncl := context.WithCancel(context.Background())
173+
//stopCtx, stopCncl := context.WithCancel(context.Background())
188174

189175
// last context and cancel func - for indicating the last task
190176
// is in progress.
@@ -194,7 +180,7 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
194180
//
195181
// lastCncl - for sending a signal indicating the last task has
196182
// been received and is currently being processed.
197-
lastCtx, lastCncl := context.WithCancel(context.Background())
183+
//lastCtx, lastCncl := context.WithCancel(context.Background())
198184

199185
// make sure logger is not nil
200186
if opt.Logger == nil {
@@ -213,10 +199,10 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
213199
// warn if typeHandling is set and
214200
// a task type is not provided.
215201
if typeHandling != "" && opt.TaskType == "" {
216-
lgr.Printf("NO WORKERS WILL BE LAUNCHED! task type handling is set to '%v' but no task type is provided", typeHandling)
202+
opt.Logger.Printf("NO WORKERS WILL BE LAUNCHED! task type handling is set to '%v' but no task type is provided", typeHandling)
217203
}
218204

219-
l := &Launcher{
205+
return &Launcher{
220206
initTime: time.Now(),
221207
isInitialized: true,
222208
consumer: c,
@@ -226,19 +212,11 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
226212
lgr: opt.Logger,
227213
taskType: opt.TaskType,
228214
typeHandling: typeHandling,
229-
doneCtx: doneCtx,
230-
doneCncl: doneCncl,
231-
stopCtx: stopCtx,
232-
stopCncl: stopCncl,
233-
lastCtx: lastCtx,
234-
lastCncl: lastCncl,
235215
maxInProgress: maxInProgress,
236216
remaining: remaining,
237217
slots: slots,
238218
closeTimeout: workerTimeout,
239219
}
240-
241-
return l
242220
}
243221

244222
// Launcher handles the heavy lifting of worker lifecycle, general
@@ -267,19 +245,6 @@ type Launcher struct {
267245
taskType string // registered task type; used for identifying the worker and handling task types that do not match.
268246
typeHandling string // how to handle unmatching task types: one of "reject", "ignore"
269247

270-
// communicating Launcher has finished shutting down
271-
doneCtx context.Context // Launcher context (highest level context)
272-
doneCncl context.CancelFunc // Launcher cancel func (calling it indicates the Launcher has cleanly closed up)
273-
274-
// forcing workers/Launcher to shut down
275-
// all worker contexts inherit from stopCtx.
276-
stopCtx context.Context // for listening to Launcher shutdown signal. Initiates shutdown process.
277-
stopCncl context.CancelFunc // for telling the Launcher to shutdown. Initiates shutdown process. Shutdown is complete when doneCtx.Done() is closed
278-
279-
// indicate the last task is in progress
280-
lastCtx context.Context // main loop will listen on lastCtx.Done() to know if the last task is in progress
281-
lastCncl context.CancelFunc // called to indicate the last task is in progress
282-
283248
// closeTimeout tells the Launcher how long to wait
284249
// when forcing a task to close.
285250
closeTimeout time.Duration
@@ -350,8 +315,8 @@ func (l *Launcher) Stats() LauncherStats {
350315
finishedTasks := createdTasks - activeTasks
351316
resp := LauncherStats{
352317
RunTime: time.Now().Sub(l.initTime).String(),
353-
TasksConsumed: l.tasksConsumed,
354-
TasksRunning: l.tasksRunning,
318+
TasksConsumed: atomic.LoadInt64(&l.tasksConsumed),
319+
TasksRunning: atomic.LoadInt64(&l.tasksRunning),
355320
Producer: l.producer.Info(),
356321
Consumer: l.consumer.Info(),
357322
}
@@ -362,8 +327,14 @@ func (l *Launcher) Stats() LauncherStats {
362327
return resp
363328
}
364329

365-
// DoTasks will start the task loop and immediately
366-
// begin working on tasks if any are available.
330+
// Deprecated: Use Start() instead.
331+
func (l *Launcher) DoTasks() (doneCtx context.Context, stopCncl context.CancelFunc) {
332+
ctx, cancel := context.WithCancel(context.Background())
333+
go l.Start(ctx)
334+
return ctx, cancel
335+
}
336+
337+
// Start is a BLOCKING task loop and immediately begin working on tasks if any are available.
367338
//
368339
// The Launcher assumes the producer and consumer
369340
// are fully initialized when the Launcher is created.
@@ -375,28 +346,19 @@ func (l *Launcher) Stats() LauncherStats {
375346
// will not do anything. If called more than once will
376347
// return a copy of the same context and cancel function
377348
// received the first time.
378-
func (l *Launcher) DoTasks() (doneCtx context.Context, stopCncl context.CancelFunc) {
349+
func (l *Launcher) Start(ctx context.Context) {
379350
if !l.isInitialized {
380351
panic("launcher not initialized")
381352
}
382-
383353
if l.isDoing {
384-
return l.doneCtx, l.stopCncl
354+
return
385355
}
386-
go l.do()
387-
388356
l.isDoing = true
389-
return l.doneCtx, l.stopCncl
390-
}
391-
392-
// do is the main task loop.
393-
func (l *Launcher) do() {
394-
defer l.doneCncl()
357+
stopCtx, stopCancel := context.WithCancel(ctx)
358+
defer stopCancel()
395359
for {
396360
select {
397-
case <-l.stopCtx.Done():
398-
goto Shutdown
399-
case <-l.lastCtx.Done():
361+
case <-stopCtx.Done():
400362
goto Shutdown
401363
case <-l.slots:
402364
l.wg.Add(1)
@@ -408,14 +370,14 @@ func (l *Launcher) do() {
408370
if l.remaining == 0 {
409371
// the message about to be requested will
410372
// be the last one.
411-
l.lastCncl()
373+
stopCancel()
412374
}
413375
}
414376
l.mu.Unlock()
415377

416378
// next() needs to be non-blocking so
417-
// the application can shut down when asked to.
418-
go l.next()
379+
// the application can shut down when asked to
380+
go l.next(stopCtx, stopCancel)
419381
}
420382
}
421383

@@ -440,29 +402,29 @@ Shutdown:
440402
}
441403

442404
// next handles getting and processing the next task.
443-
func (l *Launcher) next() {
405+
func (l *Launcher) next(ctx context.Context, cancel context.CancelFunc) {
444406
tskB, done, err := l.consumer.Msg()
445407
if done {
446-
l.lastCncl()
408+
cancel()
447409
}
448410
if err != nil {
449411
l.log(err.Error())
450-
l.giveBackSlot()
412+
l.giveBackSlot(ctx)
451413

452414
return
453415
}
454416

455417
// handle a zero byte message
456418
if len(tskB) == 0 {
457-
l.giveBackSlot()
419+
l.giveBackSlot(ctx)
458420

459421
return
460422
}
461423

462424
tsk, err := NewFromBytes(tskB)
463-
if err != nil {
464-
l.log(err.Error())
465-
l.giveBackSlot()
425+
if err != nil || tsk == nil {
426+
l.log(err)
427+
l.giveBackSlot(ctx)
466428

467429
return
468430
}
@@ -472,7 +434,7 @@ func (l *Launcher) next() {
472434
go func() {
473435
atomic.AddInt64(&l.tasksConsumed, 1)
474436
atomic.AddInt64(&l.tasksRunning, 1)
475-
l.doLaunch(tsk)
437+
l.doLaunch(tsk, ctx)
476438
atomic.AddInt64(&l.tasksRunning, -1)
477439
}()
478440
}
@@ -481,15 +443,15 @@ func (l *Launcher) next() {
481443
// doLaunch will safely handle the wait
482444
// group and cleanly close down a worker
483445
// and report back on the task result.
484-
func (l *Launcher) doLaunch(tsk *Task) {
485-
defer l.giveBackSlot()
446+
func (l *Launcher) doLaunch(tsk *Task, ctx context.Context) {
447+
defer l.giveBackSlot(ctx)
486448

487449
var wCtx context.Context
488450
var cncl context.CancelFunc
489451
if l.completeTimeout > time.Duration(0) {
490-
wCtx, cncl = context.WithTimeout(l.stopCtx, l.completeTimeout)
452+
wCtx, cncl = context.WithTimeout(ctx, l.completeTimeout)
491453
} else {
492-
wCtx, cncl = context.WithCancel(l.stopCtx)
454+
wCtx, cncl = context.WithCancel(ctx)
493455
}
494456
defer cncl() // clean up worker context
495457

@@ -565,16 +527,16 @@ func (l *Launcher) sendTsk(tsk *Task) {
565527
//
566528
// will not give back the slot if the application is
567529
// shutting down or processing the last task.
568-
func (l *Launcher) giveBackSlot() {
569-
if l.stopCtx.Err() == nil && l.lastCtx.Err() == nil {
530+
func (l *Launcher) giveBackSlot(ctx context.Context) {
531+
if ctx.Err() == nil {
570532
//atomic.AddInt64(&l.tasksRunning, -1)
571533
l.slots <- 1
572534
}
573535
l.wg.Done()
574536
}
575537

576538
// log is the central point of operational logging.
577-
func (l *Launcher) log(msg string) {
539+
func (l *Launcher) log(msg interface{}) {
578540
l.lgr.Println(msg)
579541
}
580542

@@ -587,9 +549,5 @@ func (l *Launcher) log(msg string) {
587549
func (l *Launcher) Err() error {
588550
l.mu.Lock()
589551
defer l.mu.Unlock()
590-
if l.doneCtx.Err() != nil {
591-
return l.closeErr
592-
}
593-
594-
return nil
552+
return l.closeErr
595553
}

launcher_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,16 +110,14 @@ func TestDoLaunch(t *testing.T) {
110110
}
111111

112112
launcher := &Launcher{
113-
stopCtx: context.Background(),
114-
lastCtx: context.Background(),
115113
slots: make(chan int, 10),
116114
opt: NewLauncherOptions(""),
117115
producer: p,
118116
newWkr: wrkFn,
119117
}
120118
launcher.wg.Add(1)
121119
tsk := &Task{Info: in.info, Meta: in.meta}
122-
launcher.doLaunch(tsk)
120+
launcher.doLaunch(tsk, context.Background())
123121
if err := json.Unmarshal([]byte(p.Messages["done"][0]), tsk); err != nil {
124122
return nil, err
125123
}

task_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package task
33
import (
44
"testing"
55
"time"
6+
7+
"github.com/hydronica/trial"
68
)
79

810
func TestNew(t *testing.T) {
@@ -336,16 +338,18 @@ func TestTask_String(t *testing.T) {
336338
// task to string plumbing test
337339
tsk := &Task{
338340
Type: "test-type",
339-
Info: "test-info",
341+
Info: "./test-info?arg1=abc&arg2=123",
342+
Msg: "<message>",
340343
}
341344

342345
tskStr := tsk.JSONString()
343346

344347
// correct byte count
345348
// {"type":"test-type","info":"test-info"}
346-
expectedCnt := 39
347-
if len(tskStr) != expectedCnt {
348-
t.Errorf("expected '%v' but got '%v'\n", expectedCnt, len(tskStr))
349+
expected := `{"type":"test-type","info":"./test-info?arg1=abc&arg2=123","msg":"<message>"}`
350+
t.Log(tskStr)
351+
if eq, diff := trial.Equal(expected, tskStr); !eq {
352+
t.Errorf("FAIL: %v", diff)
349353
}
350354
}
351355

0 commit comments

Comments
 (0)