X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/58044098495d066effa7fd4742b6635d9a10fdfb..ae92d144610446849eb568247a44f02ae985c281:/sdk/go/dispatch/dispatch.go diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index 152207ea94..d34ea68d7a 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -7,14 +7,16 @@ package dispatch import ( + "bytes" "context" "fmt" "sync" "time" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "github.com/Sirupsen/logrus" + "git.arvados.org/arvados.git/lib/dispatchcloud" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "github.com/sirupsen/logrus" ) const ( @@ -37,6 +39,9 @@ type Dispatcher struct { Logger Logger + // Batch size for container queries + BatchSize int + // Queue polling frequency PollPeriod time.Duration @@ -63,7 +68,7 @@ type Dispatcher struct { // running, and return. // // The DispatchFunc should not return until the container is finished. -type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) +type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error // Run watches the API server's queue for containers that are either // ready to run and available to lock, or are already locked by this @@ -84,11 +89,20 @@ func (d *Dispatcher) Run(ctx context.Context) error { poll := time.NewTicker(d.PollPeriod) defer poll.Stop() + if d.BatchSize == 0 { + d.BatchSize = 100 + } + for { select { case <-poll.C: break case <-ctx.Done(): + d.mtx.Lock() + defer d.mtx.Unlock() + for _, tracker := range d.trackers { + tracker.close() + } return ctx.Err() } @@ -163,19 +177,77 @@ func (d *Dispatcher) start(c arvados.Container) *runTracker { } tracker.updates <- c go func() { - d.RunContainer(d, c, tracker.updates) - // RunContainer blocks for the lifetime of the container. When - // it returns, the tracker should delete itself. - d.mtx.Lock() - delete(d.trackers, c.UUID) - d.mtx.Unlock() + fallbackState := Queued + err := d.RunContainer(d, c, tracker.updates) + if err != nil { + text := fmt.Sprintf("Error running container %s: %s", c.UUID, err) + if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok { + fallbackState = Cancelled + var logBuf bytes.Buffer + fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err) + if len(err.AvailableTypes) == 0 { + fmt.Fprint(&logBuf, "No instance types are configured.\n") + } else { + fmt.Fprint(&logBuf, "Available instance types:\n") + for _, t := range err.AvailableTypes { + fmt.Fprintf(&logBuf, + "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n", + t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price) + } + } + text = logBuf.String() + } + d.Logger.Printf("%s", text) + lr := arvadosclient.Dict{"log": arvadosclient.Dict{ + "object_uuid": c.UUID, + "event_type": "dispatch", + "properties": map[string]string{"text": text}}} + d.Arv.Create("logs", lr, nil) + } + // If checkListForUpdates() doesn't close the tracker + // after 2 queue updates, try to move the container to + // the fallback state, which should eventually work + // and cause the tracker to close. + updates := 0 + for upd := range tracker.updates { + updates++ + if upd.State == Locked || upd.State == Running { + // Tracker didn't clean up before + // returning -- or this is the first + // update and it contains stale + // information from before + // RunContainer() returned. + if updates < 2 { + // Avoid generating confusing + // logs / API calls in the + // stale-info case. + continue + } + d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState) + d.UpdateState(c.UUID, fallbackState) + } + } }() return tracker } func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool { + var countList arvados.ContainerList params := arvadosclient.Dict{ "filters": filters, + "count": "exact", + "limit": 0, + "order": []string{"priority desc"}} + err := d.Arv.List("containers", params, &countList) + if err != nil { + d.Logger.Warnf("error getting count of containers: %q", err) + return false + } + itemsAvailable := countList.ItemsAvailable + params = arvadosclient.Dict{ + "filters": filters, + "count": "none", + "limit": d.BatchSize, "order": []string{"priority desc"}} offset := 0 for { @@ -194,7 +266,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r } d.checkListForUpdates(list.Items, todo) offset += len(list.Items) - if len(list.Items) == 0 || list.ItemsAvailable <= offset { + if len(list.Items) == 0 || itemsAvailable <= offset { return true } } @@ -215,12 +287,13 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID) } else if alreadyTracking { switch c.State { - case Queued: + case Queued, Cancelled, Complete: + d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State) tracker.close() + delete(d.trackers, c.UUID) case Locked, Running: + d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State) tracker.update(c) - case Cancelled, Complete: - tracker.close() } } else { switch c.State {