Merge branch '14920-unknown-booting-race'
author: Tom Clegg <tclegg@veritasgenetics.com>
Thu, 7 Mar 2019 19:35:01 +0000 (14:35 -0500)
committer: Tom Clegg <tclegg@veritasgenetics.com>
Thu, 7 Mar 2019 19:35:01 +0000 (14:35 -0500)
fixes #14920

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/worker.go

index fe1c6ecc0304f64345135f6016d29a8b1512fea1..e81c2c091f1c37c7b52488b4d919bdb9a9fe4d79 100644 (file)
@@ -140,7 +140,7 @@ type Pool struct {
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
-       creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+       creating     map[string]createCall // unfinished (cloud.InstanceSet)Create calls (key is instance secret)
        workers      map[cloud.InstanceID]*worker
        loaded       bool                 // loaded list of instances from InstanceSet at least once
        exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
@@ -160,6 +160,11 @@ type Pool struct {
        mMemory            *prometheus.GaugeVec
 }
 
+type createCall struct {
+       time         time.Time
+       instanceType arvados.InstanceType
+}
+
 // Subscribe returns a buffered channel that becomes ready after any
 // change to the pool's state that could have scheduling implications:
 // a worker's state changes, a new worker appears, the cloud
@@ -205,8 +210,13 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
        defer wp.mtx.RUnlock()
        unalloc := map[arvados.InstanceType]int{}
        creating := map[arvados.InstanceType]int{}
-       for it, times := range wp.creating {
-               creating[it] = len(times)
+       oldestCreate := map[arvados.InstanceType]time.Time{}
+       for _, cc := range wp.creating {
+               it := cc.instanceType
+               creating[it]++
+               if t, ok := oldestCreate[it]; !ok || t.After(cc.time) {
+                       oldestCreate[it] = cc.time
+               }
        }
        for _, wkr := range wp.workers {
                // Skip workers that are not expected to become
@@ -221,7 +231,7 @@ func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
                }
                it := wkr.instType
                unalloc[it]++
-               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(oldestCreate[it]) {
                        // If up to N new workers appear in
                        // Instances() while we are waiting for N
                        // Create() calls to complete, we assume we're
@@ -260,10 +270,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                return false
        }
        now := time.Now()
-       wp.creating[it] = append(wp.creating[it], now)
+       secret := randomHex(instanceSecretLength)
+       wp.creating[secret] = createCall{time: now, instanceType: it}
        go func() {
                defer wp.notify()
-               secret := randomHex(instanceSecretLength)
                tags := cloud.InstanceTags{
                        tagKeyInstanceType:   it.Name,
                        tagKeyIdleBehavior:   string(IdleBehaviorRun),
@@ -273,14 +283,10 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
-               // Remove our timestamp marker from wp.creating
-               for i, t := range wp.creating[it] {
-                       if t == now {
-                               copy(wp.creating[it][i:], wp.creating[it][i+1:])
-                               wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
-                               break
-                       }
-               }
+               // delete() is deferred so the updateWorker() call
+               // below knows to use StateBooting when adding a new
+               // worker.
+               defer delete(wp.creating, secret)
                if err != nil {
                        if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
                                wp.atQuotaErr = err
@@ -291,7 +297,7 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
                        wp.instanceSet.throttleCreate.CheckRateLimitError(err, wp.logger, "create instance", wp.notify)
                        return
                }
-               wp.updateWorker(inst, it, StateBooting)
+               wp.updateWorker(inst, it)
        }()
        return true
 }
@@ -319,26 +325,30 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
        return nil
 }
 
-// Add or update worker attached to the given instance. Use
-// initialState if a new worker is created.
+// Add or update worker attached to the given instance.
 //
 // The second return value is true if a new worker is created.
 //
+// A newly added instance has state=StateBooting if its tags match an
+// entry in wp.creating, otherwise StateUnknown.
+//
 // Caller must have lock.
-func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
        inst = tagVerifier{inst}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
                wkr.instance = inst
                wkr.updated = time.Now()
-               if initialState == StateBooting && wkr.state == StateUnknown {
-                       wkr.state = StateBooting
-               }
                wkr.saveTags()
                return wkr, false
        }
 
+       state := StateUnknown
+       if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+               state = StateBooting
+       }
+
        // If an instance has a valid IdleBehavior tag when it first
        // appears, initialize the new worker accordingly (this is how
        // we restore IdleBehavior that was set by a prior dispatch
@@ -356,7 +366,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
-               "State":        initialState,
+               "State":        state,
                "IdleBehavior": idleBehavior,
        }).Infof("instance appeared in cloud")
        now := time.Now()
@@ -365,7 +375,7 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
                wp:           wp,
                logger:       logger,
                executor:     wp.newExecutor(inst),
-               state:        initialState,
+               state:        state,
                idleBehavior: idleBehavior,
                instance:     inst,
                instType:     it,
@@ -703,7 +713,7 @@ func (wp *Pool) Instances() []InstanceView {
 }
 
 func (wp *Pool) setup() {
-       wp.creating = map[arvados.InstanceType][]time.Time{}
+       wp.creating = map[string]createCall{}
        wp.exited = map[string]time.Time{}
        wp.workers = map[cloud.InstanceID]*worker{}
        wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
@@ -753,7 +763,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
                        continue
                }
-               if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+               if wkr, isNew := wp.updateWorker(inst, it); isNew {
                        notify = true
                } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
                        wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
index 9be9f41f43b7ef51cbb1d1257e4ac39f642472aa..64e1f7797af8634be63502faea5faaaa8b30a5f9 100644 (file)
@@ -101,7 +101,10 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
        logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
-       wkr.state = StateRunning
+       if wkr.state != StateRunning {
+               wkr.state = StateRunning
+               go wkr.wp.notify()
+       }
        go func() {
                env := map[string]string{
                        "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,