15924: Change import paths to git.arvados.org.
[arvados.git] / lib / dispatchcloud / container / queue.go
index 297782c35b9a972668fd7623d251e3411b26389b..f684105c1231c14cd284d2a57f4e5f919b562aae 100644 (file)
@@ -5,11 +5,12 @@
 package container
 
 import (
+       "errors"
        "io"
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -52,7 +53,6 @@ func (c *QueueEnt) String() string {
 // cache up to date.
 type Queue struct {
        logger     logrus.FieldLogger
-       reg        *prometheus.Registry
        chooseType typeChooser
        client     APIClient
 
@@ -78,14 +78,17 @@ type Queue struct {
 // Arvados cluster's queue during Update, chooseType will be called to
 // assign an appropriate arvados.InstanceType for the queue entry.
 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
-       return &Queue{
+       cq := &Queue{
                logger:      logger,
-               reg:         reg,
                chooseType:  chooseType,
                client:      client,
                current:     map[string]QueueEnt{},
                subscribers: map[<-chan struct{}]chan struct{}{},
        }
+       if reg != nil {
+               go cq.runMetrics(reg)
+       }
+       return cq
 }
 
 // Subscribe returns a channel that becomes ready to receive when an
@@ -130,8 +133,8 @@ func (cq *Queue) Forget(uuid string) {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        ctr := cq.current[uuid].Container
-       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
-               delete(cq.current, uuid)
+       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
+               cq.delEnt(uuid, ctr.State)
        }
 }
 
@@ -196,7 +199,7 @@ func (cq *Queue) Update() error {
                        cq.current[uuid] = cur
                }
        }
-       for uuid := range cq.current {
+       for uuid, ent := range cq.current {
                if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
                        // Don't expunge an entry that was
                        // added/updated locally after we started
@@ -207,7 +210,7 @@ func (cq *Queue) Update() error {
                        // the poll response (evidently it's
                        // cancelled, completed, deleted, or taken by
                        // a different dispatcher).
-                       delete(cq.current, uuid)
+                       cq.delEnt(uuid, ent.Container.State)
                }
        }
        cq.dontupdate = nil
@@ -216,6 +219,15 @@ func (cq *Queue) Update() error {
        return nil
 }
 
+// Caller must have lock.
+func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
+       cq.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "State":         state,
+       }).Info("dropping container from queue")
+       delete(cq.current, uuid)
+}
+
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
        it, err := cq.chooseType(&ctr)
        if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
@@ -269,6 +281,12 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                }()
                return
        }
+       cq.logger.WithFields(logrus.Fields{
+               "ContainerUUID": ctr.UUID,
+               "State":         ctr.State,
+               "Priority":      ctr.Priority,
+               "InstanceType":  it.Name,
+       }).Info("adding container to queue")
        cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
 }
 
@@ -296,15 +314,14 @@ func (cq *Queue) setRuntimeError(uuid, errorString string) error {
 
 // Cancel cancels the given container.
 func (cq *Queue) Cancel(uuid string) error {
-       err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+       var resp arvados.Container
+       err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
                "container": {"state": arvados.ContainerStateCancelled},
        })
        if err != nil {
                return err
        }
-       cq.mtx.Lock()
-       defer cq.mtx.Unlock()
-       cq.notify()
+       cq.updateWithResp(uuid, resp)
        return nil
 }
 
@@ -314,7 +331,13 @@ func (cq *Queue) apiUpdate(uuid, action string) error {
        if err != nil {
                return err
        }
+       cq.updateWithResp(uuid, resp)
+       return nil
+}
 
+// Update the local queue with the response received from a
+// state-changing API request (lock/unlock/cancel).
+func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        if cq.dontupdate != nil {
@@ -327,7 +350,6 @@ func (cq *Queue) apiUpdate(uuid, action string) error {
                cq.current[uuid] = ent
        }
        cq.notify()
-       return nil
 }
 
 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
@@ -383,32 +405,62 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
        }
        apply(avail)
 
-       var missing []string
+       missing := map[string]bool{}
        cq.mtx.Lock()
        for uuid, ent := range cq.current {
                if next[uuid] == nil &&
                        ent.Container.State != arvados.ContainerStateCancelled &&
                        ent.Container.State != arvados.ContainerStateComplete {
-                       missing = append(missing, uuid)
+                       missing[uuid] = true
                }
        }
        cq.mtx.Unlock()
 
-       for i, page := 0, 20; i < len(missing); i += page {
-               batch := missing[i:]
-               if len(batch) > page {
-                       batch = batch[:page]
+       for len(missing) > 0 {
+               var batch []string
+               for uuid := range missing {
+                       batch = append(batch, uuid)
+                       if len(batch) == 20 {
+                               break
+                       }
                }
+               filters := []arvados.Filter{{"uuid", "in", batch}}
                ended, err := cq.fetchAll(arvados.ResourceListParams{
                        Select:  selectParam,
                        Order:   "uuid",
                        Count:   "none",
-                       Filters: []arvados.Filter{{"uuid", "in", batch}},
+                       Filters: filters,
                })
                if err != nil {
                        return nil, err
                }
                apply(ended)
+               if len(ended) == 0 {
+                       // This is the only case where we can conclude
+                       // a container has been deleted from the
+                       // database. A short (but non-zero) page, on
+                       // the other hand, can be caused by a response
+                       // size limit.
+                       for _, uuid := range batch {
+                               cq.logger.WithField("ContainerUUID", uuid).Warn("container not found by controller (deleted?)")
+                               delete(missing, uuid)
+                               cq.mtx.Lock()
+                               cq.delEnt(uuid, cq.current[uuid].Container.State)
+                               cq.mtx.Unlock()
+                       }
+                       continue
+               }
+               for _, ctr := range ended {
+                       if _, ok := missing[ctr.UUID]; !ok {
+                               msg := "BUG? server response did not match requested filters, erroring out rather than risk deadlock"
+                               cq.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": ctr.UUID,
+                                       "Filters":       filters,
+                               }).Error(msg)
+                               return nil, errors.New(msg)
+                       }
+                       delete(missing, ctr.UUID)
+               }
        }
        return next, nil
 }
@@ -441,3 +493,34 @@ func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.C
        }
        return results, nil
 }
+
+func (cq *Queue) runMetrics(reg *prometheus.Registry) {
+       mEntries := prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "queue_entries",
+               Help:      "Number of active container entries in the controller database.",
+       }, []string{"state", "instance_type"})
+       reg.MustRegister(mEntries)
+
+       type entKey struct {
+               state arvados.ContainerState
+               inst  string
+       }
+       count := map[entKey]int{}
+
+       ch := cq.Subscribe()
+       defer cq.Unsubscribe(ch)
+       for range ch {
+               for k := range count {
+                       count[k] = 0
+               }
+               ents, _ := cq.Entries()
+               for _, ent := range ents {
+                       count[entKey{ent.Container.State, ent.InstanceType.Name}]++
+               }
+               for k, v := range count {
+                       mEntries.WithLabelValues(string(k.state), k.inst).Set(float64(v))
+               }
+       }
+}