package container
import (
+ "errors"
"io"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
// typeChooser selects the arvados.InstanceType to use for the given
// container, or returns an error if it cannot choose one.
type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
// QueueEnt is a queue entry: a container together with the instance
// type that was chosen for it.
type QueueEnt struct {
// The container to run. Only the UUID, State, Priority, and
// RuntimeConstraints fields are populated.
- Container arvados.Container
- InstanceType arvados.InstanceType
+ Container arvados.Container `json:"container"`
// The instance type assigned to the container; addEnt fills this in
// from the result of chooseType.
+ InstanceType arvados.InstanceType `json:"instance_type"`
}
// String implements fmt.Stringer by returning the queued container's
// cache up to date.
type Queue struct {
logger logrus.FieldLogger
- reg *prometheus.Registry
chooseType typeChooser
client APIClient
// Arvados cluster's queue during Update, chooseType will be called to
// assign an appropriate arvados.InstanceType for the queue entry.
func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
- return &Queue{
+ cq := &Queue{
logger: logger,
- reg: reg,
chooseType: chooseType,
client: client,
// Start with empty (non-nil) maps so entries and subscribers can be
// inserted without any further initialization.
current: map[string]QueueEnt{},
subscribers: map[<-chan struct{}]chan struct{}{},
}
// reg is optional: metrics are only registered and kept up to date
// (by runMetrics, in its own goroutine) when a registry is supplied.
+ if reg != nil {
+ go cq.runMetrics(reg)
+ }
+ return cq
}
// Subscribe returns a channel that becomes ready to receive when an
defer cq.mtx.Unlock()
ctr := cq.current[uuid].Container
if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
- delete(cq.current, uuid)
+ cq.delEnt(uuid, ctr.State)
}
}
cq.mtx.Lock()
defer cq.mtx.Unlock()
for uuid, ctr := range next {
- if _, keep := cq.dontupdate[uuid]; keep {
+ if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
+ // Don't clobber a local update that happened
+ // after we started polling.
continue
}
if cur, ok := cq.current[uuid]; !ok {
cq.current[uuid] = cur
}
}
- for uuid := range cq.current {
- if _, keep := cq.dontupdate[uuid]; keep {
- continue
- } else if _, keep = next[uuid]; keep {
+ for uuid, ent := range cq.current {
+ if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
+ // Don't expunge an entry that was
+ // added/updated locally after we started
+ // polling.
continue
- } else {
- delete(cq.current, uuid)
+ } else if _, stillpresent := next[uuid]; !stillpresent {
+ // Expunge an entry that no longer appears in
+ // the poll response (evidently it's
+ // cancelled, completed, deleted, or taken by
+ // a different dispatcher).
+ cq.delEnt(uuid, ent.Container.State)
}
}
cq.dontupdate = nil
return nil
}
// delEnt logs (at Info level, with the container UUID and the given
// state) that the container is being dropped, then deletes its entry
// from cq.current.
+// Caller must have lock.
+func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": state,
+ }).Info("dropping container from queue")
+ delete(cq.current, uuid)
+}
+
// addEnt asks chooseType for an instance type and stores the container
// in cq.current under uuid.
//
// If chooseType fails while the container is still Queued or Locked,
// the container is not stored; instead a goroutine tries to cancel it
// (locking it first if it is only Queued). If chooseType fails for a
// container in any other state, the entry is still stored, with
// whatever InstanceType chooseType returned alongside the error.
//
// NOTE(review): this writes cq.current without taking cq.mtx, so
// presumably the caller must hold the lock -- confirm against callers.
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
- if err != nil {
- // FIXME: throttle warnings, cancel after timeout
- cq.logger.Warnf("cannot run %s", &ctr)
+ if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
+ // We assume here that any chooseType error is a hard
+ // error: it wouldn't help to try again, or to leave
+ // it for a different dispatcher process to attempt.
// The error text and a per-container logger are captured here, before
// the goroutine starts, for use inside it.
+ errorString := err.Error()
+ logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
+ logger.WithError(err).Warn("cancel container with no suitable instance type")
// The API calls below run in a separate goroutine; addEnt itself
// returns immediately without storing the entry.
+ go func() {
+ if ctr.State == arvados.ContainerStateQueued {
+ // Can't set runtime error without
+ // locking first. If Lock() is
+ // successful, it will call addEnt()
+ // again itself, and we'll fall
+ // through to the
+ // setRuntimeError/Cancel code below.
+ err := cq.Lock(ctr.UUID)
+ if err != nil {
+ logger.WithError(err).Warn("lock failed")
+ // ...and try again on the
+ // next Update, if the problem
+ // still exists.
+ }
+ return
+ }
// err is declared before the deferred func so that the closure sees
// the result of the setRuntimeError/Cancel calls made further down.
+ var err error
+ defer func() {
+ if err == nil {
+ return
+ }
+ // On failure, check current container
+ // state, and don't log the error if
+ // the failure came from losing a
+ // race.
+ var latest arvados.Container
// The error from this lookup is not checked: if the lookup fails,
// latest.State keeps its zero value and the original error is logged.
+ cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
+ if latest.State == arvados.ContainerStateCancelled {
+ return
+ }
+ logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
+ }()
// Record the reason first, then cancel. Either failure ends the
// goroutine and is reported by the deferred func above.
+ err = cq.setRuntimeError(ctr.UUID, errorString)
+ if err != nil {
+ return
+ }
+ err = cq.Cancel(ctr.UUID)
+ if err != nil {
+ return
+ }
+ }()
return
}
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "State": ctr.State,
+ "Priority": ctr.Priority,
+ "InstanceType": it.Name,
+ }).Info("adding container to queue")
cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
}
return cq.apiUpdate(uuid, "unlock")
}
+// setRuntimeError sets runtime_status["error"] to the given value.
+// Container should already have state==Locked or Running.
// It sends a PUT request for the container whose parameters have the
// shape {"container": {"runtime_status": {"error": errorString}}},
// discards the response body (nil destination), and returns the error
// from RequestAndDecode unchanged.
+func (cq *Queue) setRuntimeError(uuid, errorString string) error {
+ return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
+ "container": {
+ "runtime_status": {
+ "error": errorString,
+ },
+ },
+ })
+}
+
// Cancel cancels the given container.
//
// It sends a PUT request setting the container's state to Cancelled.
// If the request fails, the error is returned and the local queue is
// left untouched; otherwise the decoded response is handed to
// updateWithResp and nil is returned.
func (cq *Queue) Cancel(uuid string) error {
- err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+ var resp arvados.Container
+ err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
"container": {"state": arvados.ContainerStateCancelled},
})
if err != nil {
return err
}
- cq.mtx.Lock()
- defer cq.mtx.Unlock()
- cq.notify()
// The response is passed on so the local queue can be updated from it.
+ cq.updateWithResp(uuid, resp)
return nil
}
if err != nil {
return err
}
+ cq.updateWithResp(uuid, resp)
+ return nil
+}
+// Update the local queue with the response received from a
+// state-changing API request (lock/unlock/cancel).
// cq.mtx is acquired here and held until the function returns, so the
// cq.notify() call at the end runs with the lock held.
+func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
cq.mtx.Lock()
defer cq.mtx.Unlock()
if cq.dontupdate != nil {
cq.current[uuid] = ent
}
cq.notify()
- return nil
}
func (cq *Queue) poll() (map[string]*arvados.Container, error) {
}
apply(avail)

// Collect the UUIDs of locally known entries that did not show up in
// next and are not already Cancelled or Complete; these are looked up
// explicitly below.
- var missing []string
+ missing := map[string]bool{}
cq.mtx.Lock()
for uuid, ent := range cq.current {
if next[uuid] == nil &&
ent.Container.State != arvados.ContainerStateCancelled &&
ent.Container.State != arvados.ContainerStateComplete {
- missing = append(missing, uuid)
+ missing[uuid] = true
}
}
cq.mtx.Unlock()

// Look up the missing containers in batches of at most 20 UUIDs. Each
// pass either removes the whole batch from missing (empty response),
// removes every returned container from missing, or returns an error,
// so the loop cannot spin without making progress.
- for i, page := 0, 20; i < len(missing); i += page {
- batch := missing[i:]
- if len(batch) > page {
- batch = batch[:page]
+ for len(missing) > 0 {
+ var batch []string
+ for uuid := range missing {
+ batch = append(batch, uuid)
+ if len(batch) == 20 {
+ break
+ }
}
+ filters := []arvados.Filter{{"uuid", "in", batch}}
ended, err := cq.fetchAll(arvados.ResourceListParams{
Select: selectParam,
Order: "uuid",
Count: "none",
- Filters: []arvados.Filter{{"uuid", "in", batch}},
+ Filters: filters,
})
if err != nil {
return nil, err
}
apply(ended)
+ if len(ended) == 0 {
+ // This is the only case where we can conclude
+ // a container has been deleted from the
+ // database. A short (but non-zero) page, on
+ // the other hand, can be caused by a response
+ // size limit.
+ for _, uuid := range batch {
+ cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
+ delete(missing, uuid)
// delEnt requires the lock; it is taken and released per entry here.
+ cq.mtx.Lock()
+ cq.delEnt(uuid, cq.current[uuid].Container.State)
+ cq.mtx.Unlock()
+ }
+ continue
+ }
// Every returned container must be one that was asked for. Anything
// else is reported as an error instead of being silently accepted.
+ for _, ctr := range ended {
+ if _, ok := missing[ctr.UUID]; !ok {
+ msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "Filters": filters,
+ }).Error(msg)
+ return nil, errors.New(msg)
+ }
+ delete(missing, ctr.UUID)
+ }
}
return next, nil
}
}
return results, nil
}
+
// runMetrics registers the arvados_dispatchcloud_queue_entries gauge
// vector (labelled by state and instance_type) with reg, then
// recomputes the gauges every time the channel obtained from
// cq.Subscribe() delivers a value. NewQueue starts it in its own
// goroutine.
//
// reg.MustRegister is used for registration.
// NOTE(review): MustRegister presumably panics if registration
// fails -- confirm against the prometheus client docs.
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+ mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "queue_entries",
+ Help: "Number of active container entries in the controller database.",
+ }, []string{"state", "instance_type"})
+ reg.MustRegister(mEntries)
+
// entKey identifies one gauge: a (container state, instance type name)
// pair.
+ type entKey struct {
+ state arvados.ContainerState
+ inst string
+ }
// count persists across iterations so that a key seen earlier is set
// back to 0 when no current entry matches it any more.
+ count := map[entKey]int{}
+
+ ch := cq.Subscribe()
+ defer cq.Unsubscribe(ch)
// The loop (and therefore this goroutine) ends only if ch is closed.
+ for range ch {
+ for k := range count {
+ count[k] = 0
+ }
// Only the first return value of Entries is used here.
+ ents, _ := cq.Entries()
+ for _, ent := range ents {
+ count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+ }
+ for k, v := range count {
+ mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+ }
+ }
+}