X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/dc021c3b57dcdebe464c148d55f9990a74e8246b..ae92d144610446849eb568247a44f02ae985c281:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index df43c2b10d..d34ea68d7a 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -7,11 +7,13 @@ package dispatch import ( + "bytes" "context" "fmt" "sync" "time" + "git.arvados.org/arvados.git/lib/dispatchcloud" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "github.com/sirupsen/logrus" @@ -66,7 +68,7 @@ type Dispatcher struct { // running, and return. // // The DispatchFunc should not return until the container is finished. -type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) +type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error // Run watches the API server's queue for containers that are either // ready to run and available to lock, or are already locked by this @@ -96,6 +98,11 @@ func (d *Dispatcher) Run(ctx context.Context) error { case <-poll.C: break case <-ctx.Done(): + d.mtx.Lock() + defer d.mtx.Unlock() + for _, tracker := range d.trackers { + tracker.close() + } return ctx.Err() } @@ -170,12 +177,56 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } tracker.updates <- c go func() { - d.RunContainer(d, c, tracker.updates) - // RunContainer blocks for the lifetime of the container. When - // it returns, the tracker should delete itself. - d.mtx.Lock() - delete(d.trackers, c.UUID) - d.mtx.Unlock() + fallbackState := Queued + err := d.RunContainer(d, c, tracker.updates) + if err != nil { + text := fmt.Sprintf("Error running container %s: %s", c.UUID, err) + if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok { + fallbackState = Cancelled + var logBuf bytes.Buffer + fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err) + if len(err.AvailableTypes) == 0 { + fmt.Fprint(&logBuf, "No instance types are configured.\n") + } else { + fmt.Fprint(&logBuf, "Available instance types:\n") + for _, t := range err.AvailableTypes { + fmt.Fprintf(&logBuf, + "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", + t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price) + } + } + text = logBuf.String() + } + d.Logger.Printf("%s", text) + lr := arvadosclient.Dict{"log": arvadosclient.Dict{ + "object_uuid": c.UUID, + "event_type": "dispatch", + "properties": map[string]string{"text": text}}} + d.Arv.Create("logs", lr, nil) + } + // If checkListForUpdates() doesn't close the tracker + // after 2 queue updates, try to move the container to + // the fallback state, which should eventually work + // and cause the tracker to close. + updates := 0 + for upd := range tracker.updates { + updates++ + if upd.State == Locked || upd.State == Running { + // Tracker didn't clean up before + // returning -- or this is the first + // update and it contains stale + // information from before + // RunContainer() returned. + if updates < 2 { + // Avoid generating confusing + // logs / API calls in the + // stale-info case. + continue + } + d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState) + d.UpdateState(c.UUID, fallbackState) + } + } }() return tracker } @@ -236,12 +287,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID) } else if alreadyTracking { switch c.State { - case Queued: + case Queued, Cancelled, Complete: + d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State) tracker.close() + delete(d.trackers, c.UUID) case Locked, Running: + d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State) tracker.update(c) - case Cancelled, Complete: - tracker.close() } } else { switch c.State {