"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
// typeChooser selects the arvados.InstanceType that a container should
// run on. A non-nil error means no suitable instance type could be
// chosen for the container.
type typeChooser func(ctr *arvados.Container) (arvados.InstanceType, error)
chooseType typeChooser
client APIClient
- auth *arvados.APIClientAuthorization
- current map[string]QueueEnt
- updated time.Time
- mtx sync.Mutex
- keeplocal map[string]struct{}
+ auth *arvados.APIClientAuthorization
+ current map[string]QueueEnt
+ updated time.Time
+ mtx sync.Mutex
+
+ // Methods that modify the Queue (like Lock) add the affected
+ // container UUIDs to dontupdate. When applying a batch of
+ // updates received from the network, anything appearing in
+ // dontupdate is skipped, in case the received update has
+ // already been superseded by the locally initiated change.
+ // When no network update is in progress, this protection is
+ // not needed, and dontupdate is nil.
+ dontupdate map[string]struct{}
+
+ // active notification subscribers (see Subscribe)
+ subscribers map[<-chan struct{}]chan struct{}
}
// NewQueue returns a new Queue. When a new container appears in the
// assign an appropriate arvados.InstanceType for the queue entry.
func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
return &Queue{
- logger: logger,
- reg: reg,
- chooseType: chooseType,
- client: client,
- current: map[string]QueueEnt{},
+ logger: logger,
+ reg: reg,
+ chooseType: chooseType,
+ client: client,
+ current: map[string]QueueEnt{},
+ subscribers: map[<-chan struct{}]chan struct{}{},
+ }
+}
+
+// Subscribe returns a channel that becomes ready to receive when an
+// entry in the Queue is updated.
+//
+// ch := q.Subscribe()
+// defer q.Unsubscribe(ch)
+// for range ch {
+// // ...
+// }
+func (cq *Queue) Subscribe() <-chan struct{} {
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ ch := make(chan struct{}, 1)
+ cq.subscribers[ch] = ch
+ return ch
+}
+
+// Unsubscribe stops sending updates to the given channel. See
+// Subscribe.
+func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ delete(cq.subscribers, ch)
+}
+
+// Caller must have lock.
+func (cq *Queue) notify() {
+ for _, ch := range cq.subscribers {
+ select {
+ case ch <- struct{}{}:
+ default:
+ }
}
}
// containers.
func (cq *Queue) Update() error {
cq.mtx.Lock()
- cq.keeplocal = map[string]struct{}{}
+ cq.dontupdate = map[string]struct{}{}
updateStarted := time.Now()
cq.mtx.Unlock()
cq.mtx.Lock()
defer cq.mtx.Unlock()
for uuid, ctr := range next {
- if _, keep := cq.keeplocal[uuid]; keep {
+ if _, keep := cq.dontupdate[uuid]; keep {
continue
}
if cur, ok := cq.current[uuid]; !ok {
}
}
for uuid := range cq.current {
- if _, keep := cq.keeplocal[uuid]; keep {
+ if _, keep := cq.dontupdate[uuid]; keep {
continue
} else if _, keep = next[uuid]; keep {
continue
delete(cq.current, uuid)
}
}
- cq.keeplocal = nil
+ cq.dontupdate = nil
cq.updated = updateStarted
+ cq.notify()
return nil
}
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
- if err != nil {
- // FIXME: throttle warnings, cancel after timeout
- cq.logger.Warnf("cannot run %s", &ctr)
+ if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
+ errorString := err.Error()
+ cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
+ go func() {
+ var err error
+ defer func() {
+ if err == nil {
+ return
+ }
+ // On failure, check current container
+ // state, and don't log the error if
+ // the failure came from losing a
+ // race.
+ var latest arvados.Container
+ cq.client.RequestAndDecode(&latest, "GET", "arvados/v1/containers/"+ctr.UUID, nil, map[string][]string{"select": {"state"}})
+ if latest.State == arvados.ContainerStateCancelled {
+ return
+ }
+ cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container")
+ }()
+ if ctr.State == arvados.ContainerStateQueued {
+ err = cq.Lock(ctr.UUID)
+ if err != nil {
+ return
+ }
+ }
+ err = cq.setRuntimeError(ctr.UUID, errorString)
+ if err != nil {
+ return
+ }
+ err = cq.Cancel(ctr.UUID)
+ if err != nil {
+ return
+ }
+ }()
return
}
cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
return cq.apiUpdate(uuid, "unlock")
}
+// setRuntimeError sets runtime_status["error"] to the given value.
+// Container should already have state==Locked or Running.
+func (cq *Queue) setRuntimeError(uuid, errorString string) error {
+ return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]map[string]interface{}{
+ "container": {
+ "runtime_status": {
+ "error": errorString,
+ },
+ },
+ })
+}
+
// Cancel cancels the given container.
func (cq *Queue) Cancel(uuid string) error {
- return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+ err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
"container": {"state": arvados.ContainerStateCancelled},
})
+ if err != nil {
+ return err
+ }
+ cq.mtx.Lock()
+ defer cq.mtx.Unlock()
+ cq.notify()
+ return nil
}
func (cq *Queue) apiUpdate(uuid, action string) error {
cq.mtx.Lock()
defer cq.mtx.Unlock()
- if cq.keeplocal != nil {
- cq.keeplocal[uuid] = struct{}{}
+ if cq.dontupdate != nil {
+ cq.dontupdate[uuid] = struct{}{}
}
if ent, ok := cq.current[uuid]; !ok {
cq.addEnt(uuid, resp)
ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
cq.current[uuid] = ent
}
+ cq.notify()
return nil
}