From: Tom Clegg Date: Wed, 26 Jan 2022 18:32:13 +0000 (-0500) Subject: 18670: Fix abandoned job tracker during race. X-Git-Tag: 2.4.0~104^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/1789aa86c580495f0a722289cec41c4e31872e26 18670: Fix abandoned job tracker during race. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 00c75154f6..a0a61f2b6d 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -172,10 +172,12 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } tracker.updates <- c go func() { + fallbackState := Queued err := d.RunContainer(d, c, tracker.updates) if err != nil { text := fmt.Sprintf("Error running container %s: %s", c.UUID, err) if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok { + fallbackState = Cancelled var logBuf bytes.Buffer fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err) if len(err.AvailableTypes) == 0 { @@ -189,7 +191,6 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } } text = logBuf.String() - d.UpdateState(c.UUID, Cancelled) } d.Logger.Printf("%s", text) lr := arvadosclient.Dict{"log": arvadosclient.Dict{ @@ -197,12 +198,30 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { "event_type": "dispatch", "properties": map[string]string{"text": text}}} d.Arv.Create("logs", lr, nil) - d.Unlock(c.UUID) } - - d.mtx.Lock() - delete(d.trackers, c.UUID) - d.mtx.Unlock() + // If checkListForUpdates() doesn't close the tracker + // after 2 queue updates, try to move the container to + // the fallback state, which should eventually work + // and cause the tracker to close. + updates := 0 + for upd := range tracker.updates { + updates++ + if upd.State == Locked || upd.State == Running { + // Tracker didn't clean up before + // returning -- or this is the first + // update and it contains stale + // information from before + // RunContainer() returned. + if updates < 2 { + // Avoid generating confusing + // logs / API calls in the + // stale-info case. + continue + } + d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState) + d.UpdateState(c.UUID, fallbackState) + } + } }() return tracker } @@ -263,12 +282,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID) } else if alreadyTracking { switch c.State { - case Queued: + case Queued, Cancelled, Complete: + d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State) tracker.close() + delete(d.trackers, c.UUID) case Locked, Running: + d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State) tracker.update(c) - case Cancelled, Complete: - tracker.close() } } else { switch c.State {