X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/420a8e7fb7b159452da834062cc3e040dd1b411b..ae92d144610446849eb568247a44f02ae985c281:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index e9fb6b6d93..d34ea68d7a 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -7,14 +7,16 @@ package dispatch import ( + "bytes" "context" "fmt" - "log" "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/lib/dispatchcloud" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "github.com/sirupsen/logrus" ) const ( @@ -25,12 +27,20 @@ const ( Cancelled = arvados.ContainerStateCancelled ) +type Logger interface { + Printf(string, ...interface{}) + Warnf(string, ...interface{}) + Debugf(string, ...interface{}) +} + // Dispatcher struct type Dispatcher struct { Arv *arvadosclient.ArvadosClient + Logger Logger + // Batch size for container queries - BatchSize int64 + BatchSize int // Queue polling frequency PollPeriod time.Duration @@ -58,13 +68,17 @@ type Dispatcher struct { // running, and return. // // The DispatchFunc should not return until the container is finished. -type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) +type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error // Run watches the API server's queue for containers that are either // ready to run and available to lock, or are already locked by this // dispatcher's token. When a new one appears, Run calls RunContainer // in a new goroutine. func (d *Dispatcher) Run(ctx context.Context) error { + if d.Logger == nil { + d.Logger = logrus.StandardLogger() + } + err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth) if err != nil { return fmt.Errorf("error getting my token UUID: %v", err) @@ -84,6 +98,11 @@ func (d *Dispatcher) Run(ctx context.Context) error { case <-poll.C: break case <-ctx.Done(): + d.mtx.Lock() + defer d.mtx.Unlock() + for _, tracker := range d.trackers { + tracker.close() + } return ctx.Err() } @@ -142,7 +161,7 @@ func (d *Dispatcher) Run(ctx context.Context) error { // Containers that I know about that didn't show up in any // query should be let go. for uuid, tracker := range todo { - log.Printf("Container %q not returned by any query, stopping tracking.", uuid) + d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid) tracker.close() } @@ -152,15 +171,62 @@ func (d *Dispatcher) Run(ctx context.Context) error { // Start a runner in a new goroutine, and send the initial container // record to its updates channel. func (d *Dispatcher) start(c arvados.Container) *runTracker { - tracker := &runTracker{updates: make(chan arvados.Container, 1)} + tracker := &runTracker{ + updates: make(chan arvados.Container, 1), + logger: d.Logger, + } tracker.updates <- c go func() { - d.RunContainer(d, c, tracker.updates) - // RunContainer blocks for the lifetime of the container. When - // it returns, the tracker should delete itself. - d.mtx.Lock() - delete(d.trackers, c.UUID) - d.mtx.Unlock() + fallbackState := Queued + err := d.RunContainer(d, c, tracker.updates) + if err != nil { + text := fmt.Sprintf("Error running container %s: %s", c.UUID, err) + if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok { + fallbackState = Cancelled + var logBuf bytes.Buffer + fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err) + if len(err.AvailableTypes) == 0 { + fmt.Fprint(&logBuf, "No instance types are configured.\n") + } else { + fmt.Fprint(&logBuf, "Available instance types:\n") + for _, t := range err.AvailableTypes { + fmt.Fprintf(&logBuf, + "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", + t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price) + } + } + text = logBuf.String() + } + d.Logger.Printf("%s", text) + lr := arvadosclient.Dict{"log": arvadosclient.Dict{ + "object_uuid": c.UUID, + "event_type": "dispatch", + "properties": map[string]string{"text": text}}} + d.Arv.Create("logs", lr, nil) + } + // If checkListForUpdates() doesn't close the tracker + // after 2 queue updates, try to move the container to + // the fallback state, which should eventually work + // and cause the tracker to close. + updates := 0 + for upd := range tracker.updates { + updates++ + if upd.State == Locked || upd.State == Running { + // Tracker didn't clean up before + // returning -- or this is the first + // update and it contains stale + // information from before + // RunContainer() returned. + if updates < 2 { + // Avoid generating confusing + // logs / API calls in the + // stale-info case. + continue + } + d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState) + d.UpdateState(c.UUID, fallbackState) + } + } }() return tracker } @@ -169,27 +235,33 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r var countList arvados.ContainerList params := arvadosclient.Dict{ "filters": filters, - "count": "exact", - "limit": 0, + "count": "exact", + "limit": 0, "order": []string{"priority desc"}} err := d.Arv.List("containers", params, &countList) if err != nil { - log.Printf("Error getting count of containers: %q", err) + d.Logger.Warnf("error getting count of containers: %q", err) return false } - itemsAvailable := countList.ItemsAvailable + itemsAvailable := countList.ItemsAvailable params = arvadosclient.Dict{ "filters": filters, - "count": "none", - "limit": d.BatchSize, + "count": "none", + "limit": d.BatchSize, "order": []string{"priority desc"}} offset := 0 for { params["offset"] = offset + + // This list variable must be a new one declared + // inside the loop: otherwise, items in the API + // response would get deep-merged into the items + // loaded in previous iterations. var list arvados.ContainerList + err := d.Arv.List("containers", params, &list) if err != nil { - log.Printf("Error getting list of containers: %q", err) + d.Logger.Warnf("error getting list of containers: %q", err) return false } d.checkListForUpdates(list.Items, todo) @@ -212,15 +284,16 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma delete(todo, c.UUID) if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID { - log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID) + d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID) } else if alreadyTracking { switch c.State { - case Queued: + case Queued, Cancelled, Complete: + d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State) tracker.close() + delete(d.trackers, c.UUID) case Locked, Running: + d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State) tracker.update(c) - case Cancelled, Complete: - tracker.close() } } else { switch c.State { @@ -230,7 +303,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma } err := d.lock(c.UUID) if err != nil { - log.Printf("debug: error locking container %s: %s", c.UUID, err) + d.Logger.Warnf("error locking container %s: %s", c.UUID, err) break } c.State = Locked @@ -254,7 +327,7 @@ func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) erro "container": arvadosclient.Dict{"state": state}, }, nil) if err != nil { - log.Printf("Error updating container %s to state %q: %s", uuid, state, err) + d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err) } return err } @@ -309,6 +382,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error { type runTracker struct { closing bool updates chan arvados.Container + logger Logger } func (tracker *runTracker) close() { @@ -324,7 +398,7 @@ func (tracker *runTracker) update(c arvados.Container) { } select { case <-tracker.updates: - log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID) + tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID) default: } tracker.updates <- c