X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d3b76845c740935f7474f535d308303c748b0b4a..3583e494ed815632bbaa2582fd0a49110a21123b:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 00c75154f6..d34ea68d7a 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -98,6 +98,11 @@ func (d *Dispatcher) Run(ctx context.Context) error { case <-poll.C: break case <-ctx.Done(): + d.mtx.Lock() + defer d.mtx.Unlock() + for _, tracker := range d.trackers { + tracker.close() + } return ctx.Err() } @@ -172,10 +177,12 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } tracker.updates <- c go func() { + fallbackState := Queued err := d.RunContainer(d, c, tracker.updates) if err != nil { text := fmt.Sprintf("Error running container %s: %s", c.UUID, err) if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok { + fallbackState = Cancelled var logBuf bytes.Buffer fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err) if len(err.AvailableTypes) == 0 { @@ -189,7 +196,6 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } } text = logBuf.String() - d.UpdateState(c.UUID, Cancelled) } d.Logger.Printf("%s", text) lr := arvadosclient.Dict{"log": arvadosclient.Dict{ @@ -197,12 +203,30 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { "event_type": "dispatch", "properties": map[string]string{"text": text}}} d.Arv.Create("logs", lr, nil) - d.Unlock(c.UUID) } - - d.mtx.Lock() - delete(d.trackers, c.UUID) - d.mtx.Unlock() + // If checkListForUpdates() doesn't close the tracker + // after 2 queue updates, try to move the container to + // the fallback state, which should eventually work + // and cause the tracker to close. + updates := 0 + for upd := range tracker.updates { + updates++ + if upd.State == Locked || upd.State == Running { + // Tracker didn't clean up before + // returning -- or this is the first + // update and it contains stale + // information from before + // RunContainer() returned. + if updates < 2 { + // Avoid generating confusing + // logs / API calls in the + // stale-info case. + continue + } + d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState) + d.UpdateState(c.UUID, fallbackState) + } + } }() return tracker } @@ -263,12 +287,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID) } else if alreadyTracking { switch c.State { - case Queued: + case Queued, Cancelled, Complete: + d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State) tracker.close() + delete(d.trackers, c.UUID) case Locked, Running: + d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State) tracker.update(c) - case Cancelled, Complete: - tracker.close() } } else { switch c.State {