// When no network update is in progress, this protection is
// not needed, and dontupdate is nil.
dontupdate map[string]struct{}
+
+ // active notification subscribers (see Subscribe)
+ subscribers map[<-chan struct{}]chan struct{}
}
// NewQueue returns a new Queue that stores the given logger, metrics
// registry, chooseType callback and API client, and starts with empty
// maps of current entries and notification subscribers.
//
// NOTE(review): the previous wording of this comment was cut off
// mid-sentence ("When a new container appears in the ... assign an
// appropriate arvados.InstanceType for the queue entry."). Presumably
// chooseType is what assigns the instance type to a newly seen
// container -- confirm against the complete file.
func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
return &Queue{
- logger: logger,
- reg: reg,
- chooseType: chooseType,
- client: client,
- current: map[string]QueueEnt{},
+ logger: logger,
+ reg: reg,
+ chooseType: chooseType,
+ client: client,
+ current: map[string]QueueEnt{},
+ subscribers: map[<-chan struct{}]chan struct{}{}, // must be non-nil: Subscribe inserts into it, and writing to a nil map panics
+ }
+}
+
+// Subscribe registers a new subscriber and returns its channel, which
+// becomes ready to receive each time notify reports a Queue update.
+//
+// ch := q.Subscribe()
+// defer q.Unsubscribe(ch)
+// for range ch {
+// // ...
+// }
+func (cq *Queue) Subscribe() <-chan struct{} {
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ ch := make(chan struct{}, 1) // capacity 1: a single pending signal can be queued; notify drops further ones instead of blocking
+ cq.subscribers[ch] = ch // keyed by the receive-only view the caller holds, so Unsubscribe can find the entry
+ return ch
+}
+
+// Unsubscribe removes ch from the set of subscribers, so notify no
+// longer sends to it. ch is a channel returned by Subscribe.
+func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ delete(cq.subscribers, ch) // only deregisters; the channel itself is not closed here
+}
+
+// notify signals every current subscriber without blocking. Caller must hold cq.mtx.
+func (cq *Queue) notify() {
+ for _, ch := range cq.subscribers {
+ select {
+ case ch <- struct{}{}:
+ default: // send would block (a signal is already pending in the 1-slot buffer); skip this subscriber
+ }
}
}
}
cq.dontupdate = nil
cq.updated = updateStarted
+ cq.notify()
return nil
}
// Cancel cancels the given container by sending a PUT request that sets
// its state to arvados.ContainerStateCancelled. If the request fails,
// the error is returned and subscribers are not notified; on success,
// subscribers are notified and nil is returned.
//
// NOTE(review): this method does not modify cq.current itself, so
// subscribers woken here may still find the previous state there until
// it is refreshed elsewhere -- confirm this is intended.
func (cq *Queue) Cancel(uuid string) error {
- return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+ err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
"container": {"state": arvados.ContainerStateCancelled},
})
+ if err != nil {
+ return err
+ }
+ cq.mtx.Lock() // notify requires the caller to hold the lock
+ defer cq.mtx.Unlock()
+ cq.notify()
+ return nil
}
func (cq *Queue) apiUpdate(uuid, action string) error {
ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
cq.current[uuid] = ent
}
+ cq.notify()
return nil
}