package dispatch
import (
+ "bytes"
"context"
"fmt"
"sync"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "github.com/Sirupsen/logrus"
+ "git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "github.com/sirupsen/logrus"
)
const (
Logger Logger
// Batch size for container queries
- BatchSize int64
+ BatchSize int
// Queue polling frequency
PollPeriod time.Duration
// running, and return.
//
// The DispatchFunc should not return until the container is finished.
-type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+//
+// If the DispatchFunc returns a non-nil error, the dispatcher logs it
+// and records it as a "dispatch" log entry on the container. After the
+// DispatchFunc returns (with or without an error), if subsequent queue
+// updates still report the container as Locked or Running, the
+// dispatcher moves it to a fallback state: Cancelled when the error is
+// a dispatchcloud.ConstraintsNotSatisfiableError, otherwise Queued.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container) error
// Run watches the API server's queue for containers that are either
// ready to run and available to lock, or are already locked by this
case <-poll.C:
break
case <-ctx.Done():
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+ for _, tracker := range d.trackers {
+ tracker.close()
+ }
return ctx.Err()
}
}
tracker.updates <- c
go func() {
- d.RunContainer(d, c, tracker.updates)
- // RunContainer blocks for the lifetime of the container. When
- // it returns, the tracker should delete itself.
- d.mtx.Lock()
- delete(d.trackers, c.UUID)
- d.mtx.Unlock()
+ fallbackState := Queued
+ err := d.RunContainer(d, c, tracker.updates)
+ if err != nil {
+ text := fmt.Sprintf("Error running container %s: %s", c.UUID, err)
+ if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+ fallbackState = Cancelled
+ var logBuf bytes.Buffer
+ fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", c.UUID, err)
+ if len(err.AvailableTypes) == 0 {
+ fmt.Fprint(&logBuf, "No instance types are configured.\n")
+ } else {
+ fmt.Fprint(&logBuf, "Available instance types:\n")
+ for _, t := range err.AvailableTypes {
+ fmt.Fprintf(&logBuf,
+ "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+ t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price)
+ }
+ }
+ text = logBuf.String()
+ }
+ d.Logger.Printf("%s", text)
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": c.UUID,
+ "event_type": "dispatch",
+ "properties": map[string]string{"text": text}}}
+ d.Arv.Create("logs", lr, nil)
+ }
+ // If checkListForUpdates() doesn't close the tracker
+ // after 2 queue updates, try to move the container to
+ // the fallback state, which should eventually work
+ // and cause the tracker to close.
+ updates := 0
+ for upd := range tracker.updates {
+ updates++
+ if upd.State == Locked || upd.State == Running {
+ // Tracker didn't clean up before
+ // returning -- or this is the first
+ // update and it contains stale
+ // information from before
+ // RunContainer() returned.
+ if updates < 2 {
+ // Avoid generating confusing
+ // logs / API calls in the
+ // stale-info case.
+ continue
+ }
+ d.Logger.Printf("container %s state is still %s, changing to %s", c.UUID, upd.State, fallbackState)
+ d.UpdateState(c.UUID, fallbackState)
+ }
+ }
}()
return tracker
}
d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
switch c.State {
- case Queued:
+ case Queued, Cancelled, Complete:
+ d.Logger.Debugf("update has %s in state %s, closing tracker", c.UUID, c.State)
tracker.close()
+ delete(d.trackers, c.UUID)
case Locked, Running:
+ d.Logger.Debugf("update has %s in state %s, updating tracker", c.UUID, c.State)
tracker.update(c)
- case Cancelled, Complete:
- tracker.close()
}
} else {
switch c.State {