14360: Add "idle" state.
author Tom Clegg <tclegg@veritasgenetics.com>
Tue, 6 Nov 2018 21:17:58 +0000 (16:17 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Tue, 6 Nov 2018 21:17:58 +0000 (16:17 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/worker.go

index e5e8c7ecfab8ae1297e471bad17d87eca67f3662..b8a03275f88bb78f01c2368ad5ffeb04b6e8508d 100644 (file)
@@ -75,7 +75,8 @@ func (p *stubPool) Shutdown(arvados.InstanceType) bool {
 func (p *stubPool) Workers() map[worker.State]int {
        return map[worker.State]int{
                worker.StateBooting: len(p.unalloc) - len(p.idle),
-               worker.StateRunning: len(p.idle) - len(p.running),
+               worker.StateIdle:    len(p.idle),
+               worker.StateRunning: len(p.running),
        }
 }
 func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
index b1b570bd900f3a661816a217910d35fde3bc563b..4e331256f9123d37f0c81bda8edaaa096fc82969 100644 (file)
@@ -148,8 +148,8 @@ type worker struct {
        updated  time.Time
        busy     time.Time
        lastUUID string
-       running  map[string]struct{}
-       starting map[string]struct{}
+       running  map[string]struct{} // remember to update state idle<->running when this changes
+       starting map[string]struct{} // remember to update state idle<->running when this changes
        probing  chan struct{}
 }
 
@@ -194,7 +194,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
                u[it] = c
        }
        for _, wkr := range wp.workers {
-               if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
+               if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
                        u[wkr.instType]++
                }
        }
@@ -288,21 +288,16 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
-       for _, tryState := range []State{StateBooting, StateRunning} {
+       for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
-               // time (Running) or the earliest create time
-               // (Booting)
+               // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
-                       if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
-                               continue
-                       }
-                       if wkr.instType != it {
-                               continue
+                       if wkr.state == tryState && wkr.instType == it {
+                               logger = logger.WithField("Instance", wkr.instance)
+                               logger.Info("shutting down")
+                               wp.shutdown(wkr, logger)
+                               return true
                        }
-                       logger = logger.WithField("Instance", wkr.instance)
-                       logger.Info("shutting down")
-                       wp.shutdown(wkr, logger)
-                       return true
                }
        }
        return false
@@ -370,7 +365,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
-               if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+               if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
@@ -382,6 +377,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
        logger = logger.WithField("Instance", wkr.instance)
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
+       wkr.state = StateRunning
        go func() {
                stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
                wp.mtx.Lock()
@@ -451,6 +447,9 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
        defer wp.mtx.Unlock()
        if _, ok := wkr.running[uuid]; ok {
                delete(wkr.running, uuid)
+               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+                       wkr.state = StateIdle
+               }
                wkr.updated = time.Now()
                go wp.notify()
        }
@@ -628,7 +627,7 @@ func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
 
 // caller must have lock.
 func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
-       if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
+       if wkr.state != StateIdle {
                return false
        }
        age := time.Since(wkr.busy)
@@ -755,7 +754,7 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
        logger := wp.logger.WithField("Instance", wkr.instance)
        wp.mtx.Lock()
        updated := wkr.updated
-       needProbeRunning := wkr.state == StateRunning
+       needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
        needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
        wp.mtx.Unlock()
        if !needProbeBooted && !needProbeRunning {
@@ -770,13 +769,10 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
        if needProbeBooted {
                ok, stderr = wp.probeBooted(wkr)
                wp.mtx.Lock()
-               if ok && (wkr.state == StateUnknown || wkr.state == StateBooting) {
-                       wkr.state = StateRunning
-                       wkr.probed = time.Now()
-                       logger.Info("instance booted")
-                       go wp.notify()
+               if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+                       logger.Info("instance booted; will try probeRunning")
+                       needProbeRunning = true
                }
-               needProbeRunning = wkr.state == StateRunning
                wp.mtx.Unlock()
        }
        if needProbeRunning {
@@ -786,7 +782,7 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if !ok {
-               if wkr.state == StateShutdown {
+               if wkr.state == StateShutdown && wkr.updated.After(updated) {
                        // Skip the logging noise if shutdown was
                        // initiated during probe.
                        return
@@ -841,8 +837,17 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
                        changed = true
                }
        }
+       if wkr.state == StateUnknown || wkr.state == StateBooting {
+               wkr.state = StateIdle
+               changed = true
+       }
        if changed {
                wkr.running = running
+               if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+                       wkr.state = StateRunning
+               } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+                       wkr.state = StateIdle
+               }
                wkr.updated = updateTime
                go wp.notify()
        }
index 7828d4f69fc0b81645e9665dfe798a275b3ea07b..c24efeb3c577eda3498b89cb2d5d730e45e54b15 100644 (file)
@@ -15,7 +15,8 @@ type State int
 const (
        StateUnknown  State = iota // might be running a container already
        StateBooting               // instance is booting
-       StateRunning               // instance is running
+       StateIdle                  // instance booted, no containers are running
+       StateRunning               // instance is running one or more containers
        StateShutdown              // worker has stopped monitoring the instance
        StateHold                  // running, but not available to run new containers
 )
@@ -28,6 +29,7 @@ const (
 var stateString = map[State]string{
        StateUnknown:  "unknown",
        StateBooting:  "booting",
+       StateIdle:     "idle",
        StateRunning:  "running",
        StateShutdown: "shutdown",
        StateHold:     "hold",