18670: Fix abandoned job tracker during race.
author Tom Clegg <tom@curii.com>
Wed, 26 Jan 2022 18:32:13 +0000 (13:32 -0500)
committer Tom Clegg <tom@curii.com>
Wed, 26 Jan 2022 18:32:13 +0000 (13:32 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

sdk/go/dispatch/dispatch.go

index 00c75154f656a70e0b42deed7ef0e34fa7a01d7d..a0a61f2b6d8aa28acc412376e058db8a5842215d 100644 (file)
@@ -172,10 +172,12 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
        }
        tracker.updates <- c
        go func() {
+               fallbackState := Queued
                err := d.RunContainer(d, c, tracker.updates)
                if err != nil {
                        text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
                        if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+                               fallbackState = Cancelled
                                var logBuf bytes.Buffer
                                fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
                                if len(err.AvailableTypes) == 0 {
@@ -189,7 +191,6 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
                                        }
                                }
                                text = logBuf.String()
-                               d.UpdateState(c.UUID, Cancelled)
                        }
                        d.Logger.Printf("%s", text)
                        lr := arvadosclient.Dict{"log": arvadosclient.Dict{
@@ -197,12 +198,30 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker {
                                "event_type":  "dispatch",
                                "properties":  map[string]string{"text": text}}}
                        d.Arv.Create("logs", lr, nil)
-                       d.Unlock(c.UUID)
                }
-
-               d.mtx.Lock()
-               delete(d.trackers, c.UUID)
-               d.mtx.Unlock()
+               // If checkListForUpdates() doesn't close the tracker
+               // after 2 queue updates, try to move the container to
+               // the fallback state, which should eventually work
+               // and cause the tracker to close.
+               updates := 0
+               for upd := range tracker.updates {
+                       updates++
+                       if upd.State == Locked || upd.State == Running {
+                               // Tracker didn't clean up before
+                               // returning -- or this is the first
+                               // update and it contains stale
+                               // information from before
+                               // RunContainer() returned.
+                               if updates < 2 {
+                                       // Avoid generating confusing
+                                       // logs / API calls in the
+                                       // stale-info case.
+                                       continue
+                               }
+                               d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+                               d.UpdateState(c.UUID, fallbackState)
+                       }
+               }
        }()
        return tracker
 }
@@ -263,12 +282,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                        d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
                } else if alreadyTracking {
                        switch c.State {
-                       case Queued:
+                       case Queued, Cancelled, Complete:
+                               d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
                                tracker.close()
+                               delete(d.trackers, c.UUID)
                        case Locked, Running:
+                               d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
                                tracker.update(c)
-                       case Cancelled, Complete:
-                               tracker.close()
                        }
                } else {
                        switch c.State {