Merge branch '14660-arvbox-workbench2' refs #14660
[arvados.git] / lib / dispatchcloud / worker / pool.go
index be66895a98e8abb927943a198a72140b6e9c5a91..ff5f762c1d225575f0ad4eeb1b69bb4de463d281 100644 (file)
@@ -60,6 +60,7 @@ const (
        defaultTimeoutIdle        = time.Minute
        defaultTimeoutBooting     = time.Minute * 10
        defaultTimeoutProbe       = time.Minute * 10
+       defaultTimeoutShutdown    = time.Second * 10
 )
 
 func duration(conf arvados.Duration, def time.Duration) time.Duration {
@@ -88,6 +89,8 @@ func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cl
                timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
        go func() {
@@ -115,10 +118,11 @@ type Pool struct {
        timeoutIdle        time.Duration
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
+       timeoutShutdown    time.Duration
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+       creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
@@ -168,25 +172,41 @@ func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
 
 // Unallocated returns the number of unallocated (creating + booting +
 // idle + unknown) workers for each instance type.
-//
-// The returned counts should be interpreted as upper bounds, rather
-// than exact counts: they are sometimes artificially high when a
-// newly created instance appears in the driver's Instances() list
-// before the Create() call returns.
 func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
-       u := map[arvados.InstanceType]int{}
-       for it, c := range wp.creating {
-               u[it] = c
+       unalloc := map[arvados.InstanceType]int{}
+       creating := map[arvados.InstanceType]int{}
+       for it, times := range wp.creating {
+               creating[it] = len(times)
        }
        for _, wkr := range wp.workers {
-               if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
-                       u[wkr.instType]++
+               if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+                       continue
+               }
+               it := wkr.instType
+               unalloc[it]++
+               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+                       // If up to N new workers appear in
+                       // Instances() while we are waiting for N
+                       // Create() calls to complete, we assume we're
+                       // just seeing a race between Instances() and
+                       // Create() responses.
+                       //
+                       // The other common reason why nodes have
+                       // state==Unknown is that they appeared at
+                       // startup, before any Create calls. They
+                       // don't match the above timing condition, so
+                       // we never mistakenly attribute them to
+                       // pending Create calls.
+                       creating[it]--
                }
        }
-       return u
+       for it, c := range creating {
+               unalloc[it] += c
+       }
+       return unalloc
 }
 
 // Create a new instance with the given type, and add it to the worker
@@ -201,13 +221,21 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
                return wp.atQuotaErr
        }
        tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
-       wp.creating[it]++
+       now := time.Now()
+       wp.creating[it] = append(wp.creating[it], now)
        go func() {
                defer wp.notify()
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               wp.creating[it]--
+               // Remove our timestamp marker from wp.creating
+               for i, t := range wp.creating[it] {
+                       if t == now {
+                               copy(wp.creating[it][i:], wp.creating[it][i+1:])
+                               wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
+                               break
+                       }
+               }
                if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                        wp.atQuotaErr = err
                        wp.atQuotaUntil = time.Now().Add(time.Minute)
@@ -230,19 +258,21 @@ func (wp *Pool) AtQuota() bool {
 }
 
 // Add or update worker attached to the given instance. Use
-// initialState if a new worker is created. Caller must have lock.
+// initialState if a new worker is created.
+//
+// The second return value is true if a new worker is created.
 //
-// Returns true when a new worker is created.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) bool {
+// Caller must have lock.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
        id := inst.ID()
-       if wp.workers[id] != nil {
-               wp.workers[id].executor.SetTarget(inst)
-               wp.workers[id].instance = inst
-               wp.workers[id].updated = time.Now()
-               if initialState == StateBooting && wp.workers[id].state == StateUnknown {
-                       wp.workers[id].state = StateBooting
+       if wkr := wp.workers[id]; wkr != nil {
+               wkr.executor.SetTarget(inst)
+               wkr.instance = inst
+               wkr.updated = time.Now()
+               if initialState == StateBooting && wkr.state == StateUnknown {
+                       wkr.state = StateBooting
                }
-               return false
+               return wkr, false
        }
        if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
                initialState = StateHold
@@ -253,7 +283,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
        })
        logger.WithField("State", initialState).Infof("instance appeared in cloud")
        now := time.Now()
-       wp.workers[id] = &worker{
+       wkr := &worker{
                mtx:      &wp.mtx,
                wp:       wp,
                logger:   logger,
@@ -261,6 +291,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                state:    initialState,
                instance: inst,
                instType: it,
+               appeared: now,
                probed:   now,
                busy:     now,
                updated:  now,
@@ -268,7 +299,8 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                starting: make(map[string]struct{}),
                probing:  make(chan struct{}, 1),
        }
-       return true
+       wp.workers[id] = wkr
+       return wkr, true
 }
 
 // caller must have lock.
@@ -298,8 +330,8 @@ func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
        return false
 }
 
-// Workers returns the current number of workers in each state.
-func (wp *Pool) Workers() map[State]int {
+// CountWorkers returns the current number of workers in each state.
+func (wp *Pool) CountWorkers() map[State]int {
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
@@ -379,7 +411,7 @@ func (wp *Pool) kill(wkr *worker, uuid string) {
                "Instance":      wkr.instance,
        })
        logger.Debug("killing process")
-       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
+       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
@@ -573,7 +605,7 @@ func (wp *Pool) Instances() []InstanceView {
 }
 
 func (wp *Pool) setup() {
-       wp.creating = map[arvados.InstanceType]int{}
+       wp.creating = map[arvados.InstanceType][]time.Time{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
@@ -619,8 +651,11 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               if wp.updateWorker(inst, it, StateUnknown) {
+               if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
                        notify = true
+               } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
+                       wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+                       wkr.shutdown()
                }
        }