14360: Improve comments.
[arvados.git] / lib / dispatchcloud / worker / pool.go
index a7b5132a5ab6307c46b26699bfd2500667817170..1e759e38f3a31992a9fd628248c7d5c3b196b885 100644 (file)
@@ -23,8 +23,8 @@ const (
        tagKeyHold         = "Hold"
 )
 
-// A View shows a worker's current state and recent activity.
-type View struct {
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
        Instance             string
        Price                float64
        ArvadosInstanceType  string
@@ -233,6 +233,8 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
 // AtQuota returns true if Create is not expected to work at the
 // moment.
 func (wp *Pool) AtQuota() bool {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
        return time.Now().Before(wp.atQuotaUntil)
 }
 
@@ -311,7 +313,7 @@ func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
        go func() {
                err := wkr.instance.Destroy()
                if err != nil {
-                       logger.WithError(err).Warn("shutdown failed")
+                       logger.WithError(err).WithField("Instance", wkr.instance).Warn("shutdown failed")
                        return
                }
                wp.mtx.Lock()
@@ -382,9 +384,12 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
                stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               wkr.updated = time.Now()
+               now := time.Now()
+               wkr.updated = now
+               wkr.busy = now
                delete(wkr.starting, ctr.UUID)
                wkr.running[ctr.UUID] = struct{}{}
+               wkr.lastUUID = ctr.UUID
                if err != nil {
                        logger.WithField("stdout", string(stdout)).
                                WithField("stderr", string(stderr)).
@@ -404,6 +409,9 @@ func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) b
 
 // KillContainer kills the crunch-run process for the given container
 // UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
 func (wp *Pool) KillContainer(uuid string) {
        wp.mtx.Lock()
        defer wp.mtx.Unlock()
@@ -542,7 +550,7 @@ func (wp *Pool) runProbes() {
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
-                       if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
+                       if wkr.state == StateShutdown || wp.shutdownIfIdle(wkr) {
                                continue
                        }
                        workers = append(workers, id)
@@ -596,7 +604,28 @@ func (wp *Pool) runSync() {
 }
 
 // caller must have lock.
-func (wp *Pool) autoShutdown(wkr *worker) bool {
+func (wp *Pool) shutdownIfBroken(wkr *worker, dur time.Duration) {
+       if wkr.state == StateHold {
+               return
+       }
+       label, threshold := "", wp.timeoutProbe
+       if wkr.state == StateBooting {
+               label, threshold = "new ", wp.timeoutBooting
+       }
+       if dur < threshold {
+               return
+       }
+       wp.logger.WithFields(logrus.Fields{
+               "Instance": wkr.instance,
+               "Duration": dur,
+               "Since":    wkr.probed,
+               "State":    wkr.state,
+       }).Warnf("%sinstance unresponsive, shutting down", label)
+       wp.shutdown(wkr, wp.logger)
+}
+
+// caller must have lock.
+func (wp *Pool) shutdownIfIdle(wkr *worker) bool {
        if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
                return false
        }
@@ -619,13 +648,14 @@ func (wp *Pool) Stop() {
        close(wp.stop)
 }
 
-// View reports status information for every worker in the pool.
-func (wp *Pool) View() []View {
-       var r []View
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+       var r []InstanceView
        wp.setupOnce.Do(wp.setup)
        wp.mtx.Lock()
        for _, w := range wp.workers {
-               r = append(r, View{
+               r = append(r, InstanceView{
                        Instance:             w.instance.String(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
@@ -762,28 +792,12 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
                } else {
                        logger.Info("instance not responding")
                }
-
-               if wkr.state == StateHold {
-                       return
-               }
-
-               label, threshold := "", wp.timeoutProbe
-               if wkr.state == StateBooting {
-                       label, threshold = "new ", wp.timeoutBooting
-               }
-               if dur > threshold {
-                       logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
-                       wp.shutdown(wkr, logger)
-               }
+               wp.shutdownIfBroken(wkr, dur)
                return
        }
 
        updateTime := time.Now()
        wkr.probed = updateTime
-       if len(ctrUUIDs) > 0 {
-               wkr.busy = updateTime
-               wkr.lastUUID = ctrUUIDs[0]
-       }
        if wkr.state == StateShutdown || wkr.state == StateHold {
        } else if booted {
                if wkr.state != StateRunning {
@@ -795,13 +809,18 @@ func (wp *Pool) probeAndUpdate(wkr *worker) {
        }
 
        if updated != wkr.updated {
-               // Worker was updated (e.g., by starting a new
-               // container) after the probe began. Avoid clobbering
-               // those changes with the probe results.
+               // Worker was updated after the probe began, so
+               // wkr.running might have a container UUID that was
+               // not yet running when ctrUUIDs was generated. Leave
+               // wkr.running alone and wait for the next probe to
+               // catch up on any changes.
                return
        }
 
-       if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
+       if len(ctrUUIDs) > 0 {
+               wkr.busy = updateTime
+               wkr.lastUUID = ctrUUIDs[0]
+       } else if len(wkr.running) > 0 {
                wkr.unallocated = updateTime
        }
        running := map[string]struct{}{}