Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 38 additions & 80 deletions launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,6 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
opt.DoneTopic = defaultDoneTopic
}

lgr := log.New(os.Stderr, "", log.LstdFlags)
if opt.Logger != nil {
lgr = opt.Logger
}

// make sure maxInProgress is at least 1
maxInProgress := uint(1)
if opt.MaxInProgress > 1 {
Expand All @@ -171,20 +166,11 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
// lifetime max remaining (0; no lifetime max)
remaining := opt.LifetimeWorkers

// doneCncl (done cancel function)
// - is called internally by the Launcher to signal that the Launcher
// has COMPLETED shutting down.
//
// doneCtx (done context)
// - is for communicating externally that the Launcher is DONE and
// has shutdown gracefully.
doneCtx, doneCncl := context.WithCancel(context.Background())

// stop context and cancel func: shutdown Launcher/workers
//
// stopCtx - Launcher will listen on stopCtx.Done() for external forced shutdown.
// stopCncl - used externally of Launcher to initiate forced Launcher shutdown.
stopCtx, stopCncl := context.WithCancel(context.Background())
//stopCtx, stopCncl := context.WithCancel(context.Background())

// last context and cancel func - for indicating the last task
// is in progress.
Expand All @@ -194,7 +180,7 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
//
// lastCncl - for sending a signal indicating the last task has
// been received and is currently being processed.
lastCtx, lastCncl := context.WithCancel(context.Background())
//lastCtx, lastCncl := context.WithCancel(context.Background())

// make sure logger is not nil
if opt.Logger == nil {
Expand All @@ -213,10 +199,10 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
// warn if typeHandling is set and
// a task type is not provided.
if typeHandling != "" && opt.TaskType == "" {
lgr.Printf("NO WORKERS WILL BE LAUNCHED! task type handling is set to '%v' but no task type is provided", typeHandling)
opt.Logger.Printf("NO WORKERS WILL BE LAUNCHED! task type handling is set to '%v' but no task type is provided", typeHandling)
}

l := &Launcher{
return &Launcher{
initTime: time.Now(),
isInitialized: true,
consumer: c,
Expand All @@ -226,19 +212,11 @@ func NewLauncherFromBus(newWkr NewWorker, c bus.Consumer, p bus.Producer, opt *L
lgr: opt.Logger,
taskType: opt.TaskType,
typeHandling: typeHandling,
doneCtx: doneCtx,
doneCncl: doneCncl,
stopCtx: stopCtx,
stopCncl: stopCncl,
lastCtx: lastCtx,
lastCncl: lastCncl,
maxInProgress: maxInProgress,
remaining: remaining,
slots: slots,
closeTimeout: workerTimeout,
}

return l
}

// Launcher handles the heavy lifting of worker lifecycle, general
Expand Down Expand Up @@ -267,19 +245,6 @@ type Launcher struct {
taskType string // registered task type; used for identifying the worker and handling task types that do not match.
typeHandling string // how to handle unmatching task types: one of "reject", "ignore"

// communicating Launcher has finished shutting down
doneCtx context.Context // Launcher context (highest level context)
doneCncl context.CancelFunc // Launcher cancel func (calling it indicates the Launcher has cleanly closed up)

// forcing workers/Launcher to shut down
// all worker contexts inherit from stopCtx.
stopCtx context.Context // for listening to Launcher shutdown signal. Initiates shutdown process.
stopCncl context.CancelFunc // for telling the Launcher to shutdown. Initiates shutdown process. Shutdown is complete when doneCtx.Done() is closed

// indicate the last task is in progress
lastCtx context.Context // main loop will listen on lastCtx.Done() to know if the last task is in progress
lastCncl context.CancelFunc // called to indicate the last task is in progress

// closeTimeout tells the Launcher how long to wait
// when forcing a task to close.
closeTimeout time.Duration
Expand Down Expand Up @@ -350,8 +315,8 @@ func (l *Launcher) Stats() LauncherStats {
finishedTasks := createdTasks - activeTasks
resp := LauncherStats{
RunTime: time.Now().Sub(l.initTime).String(),
TasksConsumed: l.tasksConsumed,
TasksRunning: l.tasksRunning,
TasksConsumed: atomic.LoadInt64(&l.tasksConsumed),
TasksRunning: atomic.LoadInt64(&l.tasksRunning),
Producer: l.producer.Info(),
Consumer: l.consumer.Info(),
}
Expand All @@ -362,8 +327,14 @@ func (l *Launcher) Stats() LauncherStats {
return resp
}

// DoTasks will start the task loop and immediately
// begin working on tasks if any are available.
// Deprecated: Use Start() instead.
func (l *Launcher) DoTasks() (doneCtx context.Context, stopCncl context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
go l.Start(ctx)
return ctx, cancel
}

// Start is a BLOCKING task loop and immediately begin working on tasks if any are available.
//
// The Launcher assumes the producer and consumer
// are fully initialized when the Launcher is created.
Expand All @@ -375,28 +346,19 @@ func (l *Launcher) Stats() LauncherStats {
// will not do anything. If called more than once will
// return a copy of the same context and cancel function
// received the first time.
func (l *Launcher) DoTasks() (doneCtx context.Context, stopCncl context.CancelFunc) {
func (l *Launcher) Start(ctx context.Context) {
if !l.isInitialized {
panic("launcher not initialized")
}

if l.isDoing {
return l.doneCtx, l.stopCncl
return
}
go l.do()

l.isDoing = true
return l.doneCtx, l.stopCncl
}

// do is the main task loop.
func (l *Launcher) do() {
defer l.doneCncl()
stopCtx, stopCancel := context.WithCancel(ctx)
defer stopCancel()
for {
select {
case <-l.stopCtx.Done():
goto Shutdown
case <-l.lastCtx.Done():
case <-stopCtx.Done():
goto Shutdown
case <-l.slots:
l.wg.Add(1)
Expand All @@ -408,14 +370,14 @@ func (l *Launcher) do() {
if l.remaining == 0 {
// the message about to be requested will
// be the last one.
l.lastCncl()
stopCancel()
}
}
l.mu.Unlock()

// next() needs to be non-blocking so
// the application can shut down when asked to.
go l.next()
// the application can shut down when asked to
go l.next(stopCtx, stopCancel)
}
}

Expand All @@ -440,29 +402,29 @@ Shutdown:
}

// next handles getting and processing the next task.
func (l *Launcher) next() {
func (l *Launcher) next(ctx context.Context, cancel context.CancelFunc) {
tskB, done, err := l.consumer.Msg()
if done {
l.lastCncl()
cancel()
}
if err != nil {
l.log(err.Error())
l.giveBackSlot()
l.giveBackSlot(ctx)

return
}

// handle a zero byte message
if len(tskB) == 0 {
l.giveBackSlot()
l.giveBackSlot(ctx)

return
}

tsk, err := NewFromBytes(tskB)
if err != nil {
l.log(err.Error())
l.giveBackSlot()
if err != nil || tsk == nil {
l.log(err)
l.giveBackSlot(ctx)

return
}
Expand All @@ -472,7 +434,7 @@ func (l *Launcher) next() {
go func() {
atomic.AddInt64(&l.tasksConsumed, 1)
atomic.AddInt64(&l.tasksRunning, 1)
l.doLaunch(tsk)
l.doLaunch(tsk, ctx)
atomic.AddInt64(&l.tasksRunning, -1)
}()
}
Expand All @@ -481,15 +443,15 @@ func (l *Launcher) next() {
// doLaunch will safely handle the wait
// group and cleanly close down a worker
// and report back on the task result.
func (l *Launcher) doLaunch(tsk *Task) {
defer l.giveBackSlot()
func (l *Launcher) doLaunch(tsk *Task, ctx context.Context) {
defer l.giveBackSlot(ctx)

var wCtx context.Context
var cncl context.CancelFunc
if l.completeTimeout > time.Duration(0) {
wCtx, cncl = context.WithTimeout(l.stopCtx, l.completeTimeout)
wCtx, cncl = context.WithTimeout(ctx, l.completeTimeout)
} else {
wCtx, cncl = context.WithCancel(l.stopCtx)
wCtx, cncl = context.WithCancel(ctx)
}
defer cncl() // clean up worker context

Expand Down Expand Up @@ -565,16 +527,16 @@ func (l *Launcher) sendTsk(tsk *Task) {
//
// will not give back the slot if the application is
// shutting down or processing the last task.
func (l *Launcher) giveBackSlot() {
if l.stopCtx.Err() == nil && l.lastCtx.Err() == nil {
func (l *Launcher) giveBackSlot(ctx context.Context) {
if ctx.Err() == nil {
//atomic.AddInt64(&l.tasksRunning, -1)
l.slots <- 1
}
l.wg.Done()
}

// log is the central point of operational logging.
func (l *Launcher) log(msg string) {
func (l *Launcher) log(msg interface{}) {
l.lgr.Println(msg)
}

Expand All @@ -587,9 +549,5 @@ func (l *Launcher) log(msg string) {
func (l *Launcher) Err() error {
l.mu.Lock()
defer l.mu.Unlock()
if l.doneCtx.Err() != nil {
return l.closeErr
}

return nil
return l.closeErr
}
4 changes: 1 addition & 3 deletions launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,14 @@ func TestDoLaunch(t *testing.T) {
}

launcher := &Launcher{
stopCtx: context.Background(),
lastCtx: context.Background(),
slots: make(chan int, 10),
opt: NewLauncherOptions(""),
producer: p,
newWkr: wrkFn,
}
launcher.wg.Add(1)
tsk := &Task{Info: in.info, Meta: in.meta}
launcher.doLaunch(tsk)
launcher.doLaunch(tsk, context.Background())
if err := json.Unmarshal([]byte(p.Messages["done"][0]), tsk); err != nil {
return nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package task
import (
"testing"
"time"

"github.com/hydronica/trial"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -336,16 +338,18 @@ func TestTask_String(t *testing.T) {
// task to string plumbing test
tsk := &Task{
Type: "test-type",
Info: "test-info",
Info: "./test-info?arg1=abc&arg2=123",
Msg: "<message>",
}

tskStr := tsk.JSONString()

// correct byte count
// {"type":"test-type","info":"test-info"}
expectedCnt := 39
if len(tskStr) != expectedCnt {
t.Errorf("expected '%v' but got '%v'\n", expectedCnt, len(tskStr))
expected := `{"type":"test-type","info":"./test-info?arg1=abc&arg2=123","msg":"<message>"}`
t.Log(tskStr)
if eq, diff := trial.Equal(expected, tskStr); !eq {
t.Errorf("FAIL: %v", diff)
}
}

Expand Down