14360: Add "idle" state.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index d1e23eb6b2316d45bbcee3ae6b9db736fb3921db..4e331256f9123d37f0c81bda8edaaa096fc82969 100644 (file)
@@ -30,7 +30,7 @@ type InstanceView struct {
        ArvadosInstanceType  string
        ProviderInstanceType string
        LastContainerUUID    string
-       Unallocated          time.Time
+       LastBusy             time.Time
        WorkerState          string
 }
 
@@ -124,6 +124,7 @@ type Pool struct {
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
        atQuotaUntil time.Time
+       atQuotaErr   cloud.QuotaError
        stop         chan bool
        mtx          sync.RWMutex
        setupOnce    sync.Once
@@ -137,21 +138,19 @@ type Pool struct {
 }
 
 type worker struct {
-       state       State
-       instance    cloud.Instance
-       executor    Executor
-       instType    arvados.InstanceType
-       vcpus       int64
-       memory      int64
-       booted      bool
-       probed      time.Time
-       updated     time.Time
-       busy        time.Time
-       unallocated time.Time
-       lastUUID    string
-       running     map[string]struct{}
-       starting    map[string]struct{}
-       probing     chan struct{}
+       state    State // current lifecycle state; switched idle<->running as running/starting become empty or non-empty
+       instance cloud.Instance // cloud instance this worker record was created for
+       executor Executor // built with wp.newExecutor(instance); used to execute crunch-run on the instance
+       instType arvados.InstanceType // compared with the requested type in Shutdown and StartContainer
+       vcpus    int64
+       memory   int64
+       probed   time.Time // initialized to creation time; set to the current time by probeAndUpdate after a probe
+       updated  time.Time // bumped when running/state changes; probeAndUpdate compares it to detect changes made mid-probe
+       busy     time.Time // last time considered busy; idle timeout is measured from here; reported as InstanceView.LastBusy
+       lastUUID string // set to ctrUUIDs[0] from a probe; reported as InstanceView.LastContainerUUID
+       running  map[string]struct{} // remember to update state idle<->running when this changes
+       starting map[string]struct{} // remember to update state idle<->running when this changes
+       probing  chan struct{} // created with capacity 1; NOTE(review): presumably limits concurrent probes per worker -- confirm
 }
 
 // Subscribe returns a channel that becomes ready whenever a worker's
@@ -195,7 +194,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
                u[it] = c
        }
        for _, wkr := range wp.workers {
-               if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
+               if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
                        u[wkr.instType]++
                }
        }
@@ -210,6 +209,9 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
+       if time.Now().Before(wp.atQuotaUntil) {
+               return wp.atQuotaErr
+       }
        tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
        wp.creating[it]++
        go func() {
@@ -219,6 +221,7 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
                defer wp.mtx.Unlock()
                wp.creating[it]--
                if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                       wp.atQuotaErr = err
                        wp.atQuotaUntil = time.Now().Add(time.Minute)
                }
                if err != nil {
@@ -263,17 +266,16 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
        }).Infof("instance appeared in cloud")
        now := time.Now()
        wp.workers[id] = &worker{
-               executor:    wp.newExecutor(inst),
-               state:       initialState,
-               instance:    inst,
-               instType:    it,
-               probed:      now,
-               busy:        now,
-               updated:     now,
-               unallocated: now,
-               running:     make(map[string]struct{}),
-               starting:    make(map[string]struct{}),
-               probing:     make(chan struct{}, 1),
+               executor: wp.newExecutor(inst),
+               state:    initialState,
+               instance: inst,
+               instType: it,
+               probed:   now,
+               busy:     now,
+               updated:  now,
+               running:  make(map[string]struct{}),
+               starting: make(map[string]struct{}),
+               probing:  make(chan struct{}, 1),
        }
        return true
 }
@@ -286,21 +288,16 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        defer wp.mtx.Unlock()
        logger := wp.logger.WithField("InstanceType", it.Name)
        logger.Info("shutdown requested")
-       for _, tryState := range []State{StateBooting, StateRunning} {
+       for _, tryState := range []State{StateBooting, StateIdle} {
                // TODO: shutdown the worker with the longest idle
-               // time (Running) or the earliest create time
-               // (Booting)
+               // time (Idle) or the earliest create time (Booting)
                for _, wkr := range wp.workers {
-                       if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
-                               continue
+                       if wkr.state == tryState && wkr.instType == it {
+                               logger = logger.WithField("Instance", wkr.instance)
+                               logger.Info("shutting down")
+                               wp.shutdown(wkr, logger)
+                               return true
                        }
-                       if wkr.instType != it {
-                               continue
-                       }
-                       logger = logger.WithField("Instance", wkr.instance)
-                       logger.Info("shutting down")
-                       wp.shutdown(wkr, logger)
-                       return true
                }
        }
        return false
@@ -368,7 +365,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
        defer wp.mtx.Unlock()
        var wkr *worker
        for _, w := range wp.workers {
-               if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+               if w.instType == it && w.state == StateIdle {
                        if wkr == nil || w.busy.After(wkr.busy) {
                                wkr = w
                        }
@@ -380,6 +377,7 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
        logger = logger.WithField("Instance", wkr.instance)
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
+       wkr.state = StateRunning
        go func() {
                stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
                wp.mtx.Lock()
@@ -409,6 +407,9 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 
 // KillContainer kills the crunch-run process for the given container
 // UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
 func (wp *Pool) KillContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
@@ -446,6 +447,9 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
        defer wp.mtx.Unlock()
        if _, ok := wkr.running[uuid]; ok {
                delete(wkr.running, uuid)
+               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+                       wkr.state = StateIdle
+               }
                wkr.updated = time.Now()
                go wp.notify()
        }
@@ -623,10 +627,10 @@ func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
 
 // caller must have lock.
 func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
-       if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
+       if wkr.state != StateIdle {
                return false
        }
-       age := time.Since(wkr.unallocated)
+       age := time.Since(wkr.busy)
        if age < wp.timeoutIdle {
                return false
        }
@@ -658,7 +662,7 @@ func (wp *Pool) Instances() []InstanceView {
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
                        LastContainerUUID:    w.lastUUID,
-                       Unallocated:          w.unallocated,
+                       LastBusy:             w.busy,
                        WorkerState:          w.state.String(),
                })
        }
@@ -750,33 +754,37 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
        logger := wp.logger.WithField("Instance", wkr.instance)
        wp.mtx.Lock()
        updated := wkr.updated
-       booted := wkr.booted
+       needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
+       needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
        wp.mtx.Unlock()
+       if !needProbeBooted && !needProbeRunning {
+               return
+       }
 
        var (
                ctrUUIDs []string
                ok       bool
                stderr   []byte
        )
-       if !booted {
-               booted, stderr = wp.probeBooted(wkr)
+       if needProbeBooted {
+               ok, stderr = wp.probeBooted(wkr)
                wp.mtx.Lock()
-               if booted && !wkr.booted {
-                       wkr.booted = booted
-                       logger.Info("instance booted")
-               } else {
-                       booted = wkr.booted
+               if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+                       logger.Info("instance booted; will try probeRunning")
+                       needProbeRunning = true
                }
                wp.mtx.Unlock()
        }
-       if booted {
+       if needProbeRunning {
                ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
        }
        logger = logger.WithField("stderr", string(stderr))
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
        if !ok {
-               if wkr.state == StateShutdown {
+               if wkr.state == StateShutdown && wkr.updated.After(updated) {
+                       // Skip the logging noise if shutdown was
+                       // initiated during probe.
                        return
                }
                dur := time.Since(wkr.probed)
@@ -795,15 +803,6 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
 
        updateTime := time.Now()
        wkr.probed = updateTime
-       if wkr.state == StateShutdown || wkr.state == StateHold {
-       } else if booted {
-               if wkr.state != StateRunning {
-                       wkr.state = StateRunning
-                       go wp.notify()
-               }
-       } else {
-               wkr.state = StateBooting
-       }
 
        if updated != wkr.updated {
                // Worker was updated after the probe began, so
@@ -818,7 +817,10 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
                wkr.busy = updateTime
                wkr.lastUUID = ctrUUIDs[0]
        } else if len(wkr.running) > 0 {
-               wkr.unallocated = updateTime
+               // Actual last-busy time was sometime between wkr.busy
+               // and now. Now is the earliest opportunity to take
+               // advantage of the non-busy state, though.
+               wkr.busy = updateTime
        }
        running := map[string]struct{}{}
        changed := false
@@ -835,8 +837,17 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
                        changed = true
                }
        }
+       if wkr.state == StateUnknown || wkr.state == StateBooting {
+               wkr.state = StateIdle
+               changed = true
+       }
        if changed {
                wkr.running = running
+               if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+                       wkr.state = StateRunning
+               } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+                       wkr.state = StateIdle
+               }
                wkr.updated = updateTime
                go wp.notify()
        }