Merge branch '17995-filter-by-comparing-attrs'
[arvados.git] / lib / dispatchcloud / container / queue.go
index af17aaf3927ce9f3b8b94a03ca289201c11640d2..938ef915f251e4d27e1ea4f714b82f10425d4224 100644 (file)
@@ -10,7 +10,7 @@ import (
        "sync"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
@@ -26,10 +26,12 @@ type APIClient interface {
 // A QueueEnt is an entry in the queue, consisting of a container
 // record and the instance type that should be used to run it.
 type QueueEnt struct {
-       // The container to run. Only the UUID, State, Priority, and
-       // RuntimeConstraints fields are populated.
+       // The container to run. Only the UUID, State, Priority,
+       // RuntimeConstraints, Mounts, and ContainerImage fields are
+       // populated.
        Container    arvados.Container    `json:"container"`
        InstanceType arvados.InstanceType `json:"instance_type"`
+       FirstSeenAt  time.Time            `json:"first_seen_at"`
 }
 
 // String implements fmt.Stringer by returning the queued container's
@@ -133,7 +135,7 @@ func (cq *Queue) Forget(uuid string) {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        ctr := cq.current[uuid].Container
-       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled || (ctr.State == arvados.ContainerStateQueued && ctr.Priority == 0) {
                cq.delEnt(uuid, ctr.State)
        }
 }
@@ -144,11 +146,11 @@ func (cq *Queue) Forget(uuid string) {
 func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
-       if ctr, ok := cq.current[uuid]; !ok {
+       ctr, ok := cq.current[uuid]
+       if !ok {
                return arvados.Container{}, false
-       } else {
-               return ctr.Container, true
        }
+       return ctr.Container, true
 }
 
 // Entries returns all cache entries, keyed by container UUID.
@@ -228,6 +230,7 @@ func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
        delete(cq.current, uuid)
 }
 
+// Caller must have lock.
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
        it, err := cq.chooseType(&ctr)
        if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
@@ -240,19 +243,15 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                go func() {
                        if ctr.State == arvados.ContainerStateQueued {
                                // Can't set runtime error without
-                               // locking first. If Lock() is
-                               // successful, it will call addEnt()
-                               // again itself, and we'll fall
-                               // through to the
-                               // setRuntimeError/Cancel code below.
+                               // locking first.
                                err := cq.Lock(ctr.UUID)
                                if err != nil {
                                        logger.WithError(err).Warn("lock failed")
+                                       return
                                        // ...and try again on the
                                        // next Update, if the problem
                                        // still exists.
                                }
-                               return
                        }
                        var err error
                        defer func() {
@@ -287,7 +286,7 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                "Priority":      ctr.Priority,
                "InstanceType":  it.Name,
        }).Info("adding container to queue")
-       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it, FirstSeenAt: time.Now()}
 }
 
 // Lock acquires the dispatch lock for the given container.
@@ -314,15 +313,14 @@ func (cq *Queue) setRuntimeError(uuid, errorString string) error {
 
 // Cancel cancels the given container.
 func (cq *Queue) Cancel(uuid string) error {
-       err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+       var resp arvados.Container
+       err := cq.client.RequestAndDecode(&resp, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
                "container": {"state": arvados.ContainerStateCancelled},
        })
        if err != nil {
                return err
        }
-       cq.mtx.Lock()
-       defer cq.mtx.Unlock()
-       cq.notify()
+       cq.updateWithResp(uuid, resp)
        return nil
 }
 
@@ -332,20 +330,32 @@ func (cq *Queue) apiUpdate(uuid, action string) error {
        if err != nil {
                return err
        }
+       cq.updateWithResp(uuid, resp)
+       return nil
+}
 
+// Update the local queue with the response received from a
+// state-changing API request (lock/unlock/cancel).
+func (cq *Queue) updateWithResp(uuid string, resp arvados.Container) {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        if cq.dontupdate != nil {
                cq.dontupdate[uuid] = struct{}{}
        }
-       if ent, ok := cq.current[uuid]; !ok {
-               cq.addEnt(uuid, resp)
-       } else {
-               ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
-               cq.current[uuid] = ent
+       ent, ok := cq.current[uuid]
+       if !ok {
+               // Container is not in queue (e.g., it was not added
+               // because there is no suitable instance type, and
+               // we're just locking/updating it in order to set an
+               // error message). No need to add it, and we don't
+               // necessarily have enough information to add it here
+               // anyway because lock/unlock responses don't include
+               // runtime_constraints.
+               return
        }
+       ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+       cq.current[uuid] = ent
        cq.notify()
-       return nil
 }
 
 func (cq *Queue) poll() (map[string]*arvados.Container, error) {
@@ -374,7 +384,7 @@ func (cq *Queue) poll() (map[string]*arvados.Container, error) {
                        *next[upd.UUID] = upd
                }
        }
-       selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
+       selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters", "created_at"}
        limitParam := 1000
 
        mine, err := cq.fetchAll(arvados.ResourceListParams{