14360: Encapsulate scheduler object.
[arvados.git] / lib / dispatchcloud / container / queue.go
index e0e1cd0cd1ddc7d1d9b1f7891d4ea2ab2c42e280..432f4d4884a27f1d9e3ba6df15c0c3a1e4cd954d 100644 (file)
@@ -69,6 +69,9 @@ type Queue struct {
        // When no network update is in progress, this protection is
        // not needed, and dontupdate is nil.
        dontupdate map[string]struct{}
+
+       // active notification subscribers (see Subscribe)
+       subscribers map[<-chan struct{}]chan struct{}
 }
 
 // NewQueue returns a new Queue. When a new container appears in the
@@ -76,11 +79,46 @@ type Queue struct {
 // assign an appropriate arvados.InstanceType for the queue entry.
 func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
        return &Queue{
-               logger:     logger,
-               reg:        reg,
-               chooseType: chooseType,
-               client:     client,
-               current:    map[string]QueueEnt{},
+               logger:      logger,
+               reg:         reg,
+               chooseType:  chooseType,
+               client:      client,
+               current:     map[string]QueueEnt{},
+               subscribers: map[<-chan struct{}]chan struct{}{},
+       }
+}
+
+// Subscribe returns a channel that becomes ready to receive when an
+// entry in the Queue is updated.
+//
+//     ch := q.Subscribe()
+//     defer q.Unsubscribe(ch)
+//     for range ch {
+//             // ...
+//     }
+func (cq *Queue) Subscribe() <-chan struct{} {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       ch := make(chan struct{}, 1)
+       cq.subscribers[ch] = ch
+       return ch
+}
+
+// Unsubscribe stops sending updates to the given channel. See
+// Subscribe.
+func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       delete(cq.subscribers, ch)
+}
+
+// Caller must have lock.
+func (cq *Queue) notify() {
+       for _, ch := range cq.subscribers {
+               select {
+               case ch <- struct{}{}:
+               default:
+               }
        }
 }
 
@@ -167,6 +205,7 @@ func (cq *Queue) Update() error {
        }
        cq.dontupdate = nil
        cq.updated = updateStarted
+       cq.notify()
        return nil
 }
 
@@ -192,9 +231,16 @@ func (cq *Queue) Unlock(uuid string) error {
 
 // Cancel cancels the given container.
 func (cq *Queue) Cancel(uuid string) error {
-       return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+       err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
                "container": {"state": arvados.ContainerStateCancelled},
        })
+       if err != nil {
+               return err
+       }
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       cq.notify()
+       return nil
 }
 
 func (cq *Queue) apiUpdate(uuid, action string) error {
@@ -215,6 +261,7 @@ func (cq *Queue) apiUpdate(uuid, action string) error {
                ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
                cq.current[uuid] = ent
        }
+       cq.notify()
        return nil
 }