14325: Cancel containers with unsatisfiable runtime constraints.
[arvados.git] / lib / dispatchcloud / container / queue.go
index 17a38259d31cf7579f479aa460b1461992465615..7a41d47c395c57b7224b5063ae715776c2e14e8c 100644 (file)
@@ -10,8 +10,8 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/Sirupsen/logrus"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
 )
 
 // typeChooser returns the arvados.InstanceType to use for running
 // the given container. A non-nil error means no suitable instance
 // type could be chosen (addEnt reacts to that by trying to cancel the
 // container if it is still Queued or Locked).
 type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
@@ -56,11 +56,22 @@ type Queue struct {
        chooseType typeChooser
        client     APIClient
 
-       auth      *arvados.APIClientAuthorization
-       current   map[string]QueueEnt
-       updated   time.Time
-       mtx       sync.Mutex
-       keeplocal map[string]struct{}
+       auth    *arvados.APIClientAuthorization
+       current map[string]QueueEnt
+       updated time.Time
+       mtx     sync.Mutex
+
+       // Methods that modify the Queue (like Lock) add the affected
+       // container UUIDs to dontupdate. When applying a batch of
+       // updates received from the network, anything appearing in
+       // dontupdate is skipped, in case the received update has
+       // already been superseded by the locally initiated change.
+       // When no network update is in progress, this protection is
+       // not needed, and dontupdate is nil.
+       dontupdate map[string]struct{}
+
+       // active notification subscribers (see Subscribe)
+       subscribers map[<-chan struct{}]chan struct{}
 }
 
 // NewQueue returns a new Queue. When a new container appears in the
@@ -68,11 +79,46 @@ type Queue struct {
 // assign an appropriate arvados.InstanceType for the queue entry.
 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
        return &Queue{
-               logger:     logger,
-               reg:        reg,
-               chooseType: chooseType,
-               client:     client,
-               current:    map[string]QueueEnt{},
+               logger:      logger,
+               reg:         reg,
+               chooseType:  chooseType,
+               client:      client,
+               current:     map[string]QueueEnt{},
+               subscribers: map[<-chan struct{}]chan struct{}{},
+       }
+}
+
+// Subscribe returns a channel that becomes ready to receive when an
+// entry in the Queue is updated.
+//
+//     ch := q.Subscribe()
+//     defer q.Unsubscribe(ch)
+//     for range ch {
+//             // ...
+//     }
+func (cq *Queue) Subscribe() <-chan struct{} {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       ch := make(chan struct{}, 1)
+       cq.subscribers[ch] = ch
+       return ch
+}
+
+// Unsubscribe stops sending updates to the given channel. See
+// Subscribe.
+func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       delete(cq.subscribers, ch)
+}
+
+// Caller must have lock.
+func (cq *Queue) notify() {
+       for _, ch := range cq.subscribers {
+               select {
+               case ch <- struct{}{}:
+               default:
+               }
        }
 }
 
@@ -126,7 +172,7 @@ func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
 // containers.
 func (cq *Queue) Update() error {
        cq.mtx.Lock()
-       cq.keeplocal = map[string]struct{}{}
+       cq.dontupdate = map[string]struct{}{}
        updateStarted := time.Now()
        cq.mtx.Unlock()
 
@@ -138,7 +184,7 @@ func (cq *Queue) Update() error {
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
        for uuid, ctr := range next {
-               if _, keep := cq.keeplocal[uuid]; keep {
+               if _, keep := cq.dontupdate[uuid]; keep {
                        continue
                }
                if cur, ok := cq.current[uuid]; !ok {
@@ -149,7 +195,7 @@ func (cq *Queue) Update() error {
                }
        }
        for uuid := range cq.current {
-               if _, keep := cq.keeplocal[uuid]; keep {
+               if _, keep := cq.dontupdate[uuid]; keep {
                        continue
                } else if _, keep = next[uuid]; keep {
                        continue
@@ -157,16 +203,49 @@ func (cq *Queue) Update() error {
                        delete(cq.current, uuid)
                }
        }
-       cq.keeplocal = nil
+       cq.dontupdate = nil
        cq.updated = updateStarted
+       cq.notify()
        return nil
 }
 
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
        it, err := cq.chooseType(&ctr)
-       if err != nil {
-               // FIXME: throttle warnings, cancel after timeout
-               cq.logger.Warnf("cannot run %s", &ctr)
+       if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
+               errorString := err.Error()
+               cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
+               go func() {
+                       var err error
+                       defer func() {
+                               if err == nil {
+                                       return
+                               }
+                               // On failure, check current container
+                               // state, and don't log the error if
+                               // the failure came from losing a
+                               // race.
+                               var latest arvados.Container
+                               cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
+                               if latest.State == arvados.ContainerStateCancelled {
+                                       return
+                               }
+                               cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container")
+                       }()
+                       if ctr.State == arvados.ContainerStateQueued {
+                               err = cq.Lock(ctr.UUID)
+                               if err != nil {
+                                       return
+                               }
+                       }
+                       err = cq.setRuntimeError(ctr.UUID, errorString)
+                       if err != nil {
+                               return
+                       }
+                       err = cq.Cancel(ctr.UUID)
+                       if err != nil {
+                               return
+                       }
+               }()
                return
        }
        cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
@@ -182,11 +261,30 @@ func (cq *Queue) Unlock(uuid string) error {
        return cq.apiUpdate(uuid, "unlock")
 }
 
+// setRuntimeError sets runtime_status["error"] to the given value.
+// Container should already have state==Locked or Running.
+func (cq *Queue) setRuntimeError(uuid, errorString string) error {
+       return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
+               "container": {
+                       "runtime_status": {
+                               "error": errorString,
+                       },
+               },
+       })
+}
+
 // Cancel cancels the given container.
 func (cq *Queue) Cancel(uuid string) error {
-       return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+       err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
                "container": {"state": arvados.ContainerStateCancelled},
        })
+       if err != nil {
+               return err
+       }
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       cq.notify()
+       return nil
 }
 
 func (cq *Queue) apiUpdate(uuid, action string) error {
@@ -198,8 +296,8 @@ func (cq *Queue) apiUpdate(uuid, action string) error {
 
        cq.mtx.Lock()
        defer cq.mtx.Unlock()
-       if cq.keeplocal != nil {
-               cq.keeplocal[uuid] = struct{}{}
+       if cq.dontupdate != nil {
+               cq.dontupdate[uuid] = struct{}{}
        }
        if ent, ok := cq.current[uuid]; !ok {
                cq.addEnt(uuid, resp)
@@ -207,6 +305,7 @@ func (cq *Queue) apiUpdate(uuid, action string) error {
                ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
                cq.current[uuid] = ent
        }
+       cq.notify()
        return nil
 }