Skip to content

Commit 01be776

Browse files
committed
cleanup launcher logic and ctx handling
1 parent 7343568 commit 01be776

File tree

3 files changed

+61
-79
lines changed

3 files changed

+61
-79
lines changed

launcher.go

Lines changed: 52 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ type LauncherOptions struct {
6969
// in progress at one time.
7070
MaxInProgress uint `toml:"max_in_progress" commented:"true" comment:"maximum number of workers within the application at one time"`
7171

72+
TaskLimit int `toml:"task_limit" commented:"true" comment:"hourly task rate limit, 0 disables limit"`
73+
7274
// WorkerKillTime is how long the Launcher will
7375
// wait for a forced-shutdown worker to cleanup.
7476
WorkerKillTime time.Duration `toml:"worker_kill_time" commented:"true" comment:"how long the application will wait for a task to finish before shutting down when being forced to shut down"`
@@ -145,9 +147,8 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
145147
opt.DoneTopic = defaultDoneTopic
146148
}
147149

148-
lgr := log.New(os.Stderr, "", log.LstdFlags)
149150
if opt.Logger != nil {
150-
lgr = opt.Logger
151+
opt.Logger = log.New(os.Stderr, "", log.LstdFlags)
151152
}
152153

153154
// make sure maxInProgress is at least 1
@@ -171,20 +172,11 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
171172
// lifetime max remaining (0; no lifetime max)
172173
remaining := opt.LifetimeWorkers
173174

174-
// doneCncl (done cancel function)
175-
// - is called internally by the Launcher to signal that the Launcher
176-
// has COMPLETED shutting down.
177-
//
178-
// doneCtx (done context)
179-
// - is for communicating externally that the Launcher is DONE and
180-
// has shutdown gracefully.
181-
doneCtx, doneCncl := context.WithCancel(context.Background())
182-
183175
// stop context and cancel func: shutdown Launcher/workers
184176
//
185177
// stopCtx - Launcher will listen on stopCtx.Done() for external forced shutdown.
186178
// stopCncl - used externally of Launcher to initiate forced Launcher shutdown.
187-
stopCtx, stopCncl := context.WithCancel(context.Background())
179+
//stopCtx, stopCncl := context.WithCancel(context.Background())
188180

189181
// last context and cancel func - for indicating the last task
190182
// is in progress.
@@ -194,7 +186,7 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
194186
//
195187
// lastCncl - for sending a signal indicating the last task has
196188
// been received and is currently being processed.
197-
lastCtx, lastCncl := context.WithCancel(context.Background())
189+
//lastCtx, lastCncl := context.WithCancel(context.Background())
198190

199191
// make sure logger is not nil
200192
if opt.Logger == nil {
@@ -213,10 +205,10 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
213205
// warn if typeHandling is set and
214206
// a task type is not provided.
215207
if typeHandling != "" && opt.TaskType == "" {
216-
lgr.Printf("NO WORKERS WILL BE LAUNCHED! task type handling is set to '%v' but no task type is provided", typeHandling)
208+
opt.Logger.Printf("NO WORKERS WILL BE LAUNCHED! task type handling is set to '%v' but no task type is provided", typeHandling)
217209
}
218210

219-
l := &Launcher{
211+
return &Launcher{
220212
initTime: time.Now(),
221213
isInitialized: true,
222214
consumer: c,
@@ -226,19 +218,15 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
226218
lgr: opt.Logger,
227219
taskType: opt.TaskType,
228220
typeHandling: typeHandling,
229-
doneCtx: doneCtx,
230-
doneCncl: doneCncl,
231-
stopCtx: stopCtx,
232-
stopCncl: stopCncl,
233-
lastCtx: lastCtx,
234-
lastCncl: lastCncl,
221+
// stopCtx: stopCtx,
222+
// stopCncl: stopCncl,
223+
// lastCtx: lastCtx,
224+
// lastCncl: lastCncl,
235225
maxInProgress: maxInProgress,
236226
remaining: remaining,
237227
slots: slots,
238228
closeTimeout: workerTimeout,
239229
}
240-
241-
return l
242230
}
243231

244232
// Launcher handles the heavy lifting of worker lifecycle, general
@@ -267,18 +255,14 @@ type Launcher struct {
267255
taskType string // registered task type; used for identifying the worker and handling task types that do not match.
268256
typeHandling string // how to handle unmatching task types: one of "reject", "ignore"
269257

270-
// communicating Launcher has finished shutting down
271-
doneCtx context.Context // Launcher context (highest level context)
272-
doneCncl context.CancelFunc // Launcher cancel func (calling it indicates the Launcher has cleanly closed up)
273-
274258
// forcing workers/Launcher to shut down
275259
// all worker contexts inherit from stopCtx.
276-
stopCtx context.Context // for listening to Launcher shutdown signal. Initiates shutdown process.
277-
stopCncl context.CancelFunc // for telling the Launcher to shutdown. Initiates shutdown process. Shutdown is complete when doneCtx.Done() is closed
260+
// stopCtx context.Context // for listening to Launcher shutdown signal. Initiates shutdown process.
261+
// stopCncl context.CancelFunc // for telling the Launcher to shutdown. Initiates shutdown process. Shutdown is complete when doneCtx.Done() is closed
278262

279263
// indicate the last task is in progress
280-
lastCtx context.Context // main loop will listen on lastCtx.Done() to know if the last task is in progress
281-
lastCncl context.CancelFunc // called to indicate the last task is in progress
264+
// lastCtx context.Context // main loop will listen on lastCtx.Done() to know if the last task is in progress
265+
// lastCncl context.CancelFunc // called to indicate the last task is in progress
282266

283267
// closeTimeout tells the Launcher how long to wait
284268
// when forcing a task to close.
@@ -350,8 +334,8 @@ func (l *Launcher) Stats() LauncherStats {
350334
finishedTasks := createdTasks - activeTasks
351335
resp := LauncherStats{
352336
RunTime: time.Now().Sub(l.initTime).String(),
353-
TasksConsumed: l.tasksConsumed,
354-
TasksRunning: l.tasksRunning,
337+
TasksConsumed: atomic.LoadInt64(&l.tasksConsumed),
338+
TasksRunning: atomic.LoadInt64(&l.tasksRunning),
355339
Producer: l.producer.Info(),
356340
Consumer: l.consumer.Info(),
357341
}
@@ -362,8 +346,14 @@ func (l *Launcher) Stats() LauncherStats {
362346
return resp
363347
}
364348

365-
// DoTasks will start the task loop and immediately
366-
// begin working on tasks if any are available.
349+
// Deprecated: Use Start() instead.
350+
func (l *Launcher) DoTasks() (doneCtx context.Context, stopCncl context.CancelFunc) {
351+
ctx, cancel := context.WithCancel(context.Background())
352+
go l.Start(ctx)
353+
return ctx, cancel
354+
}
355+
356+
// Start is a BLOCKING task loop and immediately begin working on tasks if any are available.
367357
//
368358
// The Launcher assumes the producer and consumer
369359
// are fully initialized when the Launcher is created.
@@ -375,29 +365,23 @@ func (l *Launcher) Stats() LauncherStats {
375365
// will not do anything. If called more than once will
376366
// return a copy of the same context and cancel function
377367
// received the first time.
378-
func (l *Launcher) DoTasks() (doneCtx context.Context, stopCncl context.CancelFunc) {
368+
func (l *Launcher) Start(ctx context.Context) {
379369
if !l.isInitialized {
380370
panic("launcher not initialized")
381371
}
382-
383372
if l.isDoing {
384-
return l.doneCtx, l.stopCncl
373+
return
385374
}
386-
go l.do()
387-
388375
l.isDoing = true
389-
return l.doneCtx, l.stopCncl
390-
}
391-
392-
// do is the main task loop.
393-
func (l *Launcher) do() {
394-
defer l.doneCncl()
376+
stopCtx, stopCancel := context.WithCancel(ctx)
377+
// lastCtx, lastCancel := context.WithCancel(ctx)
378+
defer stopCancel()
395379
for {
396380
select {
397-
case <-l.stopCtx.Done():
398-
goto Shutdown
399-
case <-l.lastCtx.Done():
381+
case <-stopCtx.Done():
400382
goto Shutdown
383+
// case <-lastCtx.Done():
384+
// goto Shutdown
401385
case <-l.slots:
402386
l.wg.Add(1)
403387
l.mu.Lock()
@@ -408,14 +392,14 @@ func (l *Launcher) do() {
408392
if l.remaining == 0 {
409393
// the message about to be requested will
410394
// be the last one.
411-
l.lastCncl()
395+
stopCancel()
412396
}
413397
}
414398
l.mu.Unlock()
415399

416400
// next() needs to be non-blocking so
417-
// the application can shut down when asked to.
418-
go l.next()
401+
// the application can shut down when asked to
402+
go l.next(stopCtx, stopCancel)
419403
}
420404
}
421405

@@ -440,29 +424,29 @@ Shutdown:
440424
}
441425

442426
// next handles getting and processing the next task.
443-
func (l *Launcher) next() {
427+
func (l *Launcher) next(ctx context.Context, cancel context.CancelFunc) {
444428
tskB, done, err := l.consumer.Msg()
445429
if done {
446-
l.lastCncl()
430+
cancel()
447431
}
448432
if err != nil {
449433
l.log(err.Error())
450-
l.giveBackSlot()
434+
l.giveBackSlot(ctx)
451435

452436
return
453437
}
454438

455439
// handle a zero byte message
456440
if len(tskB) == 0 {
457-
l.giveBackSlot()
441+
l.giveBackSlot(ctx)
458442

459443
return
460444
}
461445

462446
tsk, err := NewFromBytes(tskB)
463-
if err != nil {
464-
l.log(err.Error())
465-
l.giveBackSlot()
447+
if err != nil || tsk == nil {
448+
l.log(err)
449+
l.giveBackSlot(ctx)
466450

467451
return
468452
}
@@ -472,7 +456,7 @@ func (l *Launcher) next() {
472456
go func() {
473457
atomic.AddInt64(&l.tasksConsumed, 1)
474458
atomic.AddInt64(&l.tasksRunning, 1)
475-
l.doLaunch(tsk)
459+
l.doLaunch(tsk, ctx)
476460
atomic.AddInt64(&l.tasksRunning, -1)
477461
}()
478462
}
@@ -481,15 +465,15 @@ func (l *Launcher) next() {
481465
// doLaunch will safely handle the wait
482466
// group and cleanly close down a worker
483467
// and report back on the task result.
484-
func (l *Launcher) doLaunch(tsk *Task) {
485-
defer l.giveBackSlot()
468+
func (l *Launcher) doLaunch(tsk *Task, ctx context.Context) {
469+
defer l.giveBackSlot(ctx)
486470

487471
var wCtx context.Context
488472
var cncl context.CancelFunc
489473
if l.completeTimeout > time.Duration(0) {
490-
wCtx, cncl = context.WithTimeout(l.stopCtx, l.completeTimeout)
474+
wCtx, cncl = context.WithTimeout(ctx, l.completeTimeout)
491475
} else {
492-
wCtx, cncl = context.WithCancel(l.stopCtx)
476+
wCtx, cncl = context.WithCancel(ctx)
493477
}
494478
defer cncl() // clean up worker context
495479

@@ -565,16 +549,16 @@ func (l *Launcher) sendTsk(tsk *Task) {
565549
//
566550
// will not give back the slot if the application is
567551
// shutting down or processing the last task.
568-
func (l *Launcher) giveBackSlot() {
569-
if l.stopCtx.Err() == nil && l.lastCtx.Err() == nil {
552+
func (l *Launcher) giveBackSlot(ctx context.Context) {
553+
if ctx.Err() == nil {
570554
//atomic.AddInt64(&l.tasksRunning, -1)
571555
l.slots <- 1
572556
}
573557
l.wg.Done()
574558
}
575559

576560
// log is the central point of operational logging.
577-
func (l *Launcher) log(msg string) {
561+
func (l *Launcher) log(msg interface{}) {
578562
l.lgr.Println(msg)
579563
}
580564

@@ -587,9 +571,5 @@ func (l *Launcher) log(msg string) {
587571
func (l *Launcher) Err() error {
588572
l.mu.Lock()
589573
defer l.mu.Unlock()
590-
if l.doneCtx.Err() != nil {
591-
return l.closeErr
592-
}
593-
594-
return nil
574+
return l.closeErr
595575
}

launcher_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,16 +110,14 @@ func TestDoLaunch(t *testing.T) {
110110
}
111111

112112
launcher := &Launcher{
113-
stopCtx: context.Background(),
114-
lastCtx: context.Background(),
115113
slots: make(chan int, 10),
116114
opt: NewLauncherOptions(""),
117115
producer: p,
118116
newWkr: wrkFn,
119117
}
120118
launcher.wg.Add(1)
121119
tsk := &Task{Info: in.info, Meta: in.meta}
122-
launcher.doLaunch(tsk)
120+
launcher.doLaunch(tsk, context.Background())
123121
if err := json.Unmarshal([]byte(p.Messages["done"][0]), tsk); err != nil {
124122
return nil, err
125123
}

task_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package task
33
import (
44
"testing"
55
"time"
6+
7+
"github.com/hydronica/trial"
68
)
79

810
func TestNew(t *testing.T) {
@@ -336,16 +338,18 @@ func TestTask_String(t *testing.T) {
336338
// task to string plumbing test
337339
tsk := &Task{
338340
Type: "test-type",
339-
Info: "test-info",
341+
Info: "./test-info?arg1=abc&arg2=123",
342+
Msg: "<message>",
340343
}
341344

342345
tskStr := tsk.JSONString()
343346

344347
// correct byte count
345348
// {"type":"test-type","info":"test-info"}
346-
expectedCnt := 39
347-
if len(tskStr) != expectedCnt {
348-
t.Errorf("expected '%v' but got '%v'\n", expectedCnt, len(tskStr))
349+
expected := `{"type":"test-type","info":"./test-info?arg1=abc&arg2=123","msg":"<message>"}`
350+
t.Log(tskStr)
351+
if eq, diff := trial.Equal(expected, tskStr); !eq {
352+
t.Errorf("FAIL: %v", diff)
349353
}
350354
}
351355

0 commit comments

Comments
 (0)