import (
"fmt"
+ "sync"
"time"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
)
// Queue is a test stub for container.Queue. The caller specifies the
// must not be nil.
ChooseType func(*arvados.Container) (arvados.InstanceType, error)
- entries map[string]container.QueueEnt
- updTime time.Time
+ Logger logrus.FieldLogger
+
+ entries map[string]container.QueueEnt
+ updTime time.Time
+ subscribers map[<-chan struct{}]chan struct{}
+
+ mtx sync.Mutex
}
// Entries returns the containers that were queued when Update was
// last called.
func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
updTime := q.updTime
r := map[string]container.QueueEnt{}
for uuid, ent := range q.entries {
// the state has been changed (via Lock, Unlock, or Cancel) since the
// last Update, the updated state is returned.
func (q *Queue) Get(uuid string) (arvados.Container, bool) {
// Hold the mutex for the whole lookup; the deferred Unlock releases
// it on return.
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
// For a uuid with no entry, ok is false and ent is the zero value.
ent, ok := q.entries[uuid]
return ent.Container, ok
}
// Forget removes the entry for uuid from q.entries. It is a no-op if
// there is no such entry, and it does not touch q.Containers.
func (q *Queue) Forget(uuid string) {
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
delete(q.entries, uuid)
}
// Lock moves the entry for uuid from state Queued to state Locked.
// It returns an error if the entry is not currently in state Queued.
func (q *Queue) Lock(uuid string) error {
// q.mtx is the mutex guarding the queue's fields; it is unrelated
// to the container "Locked" state this method sets.
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
}
// Unlock moves the entry for uuid from state Locked back to state
// Queued. It returns an error if the entry is not currently in state
// Locked.
func (q *Queue) Unlock(uuid string) error {
// q.mtx is the mutex guarding the queue's fields; it is unrelated
// to the container "Locked" state this method clears.
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
}
// Cancel moves the entry for uuid to state Cancelled, whatever state
// it is currently in.
func (q *Queue) Cancel(uuid string) error {
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
// The entry's own current state is passed as the expected "from"
// state, so the state check inside changeState always succeeds.
// NOTE(review): for a uuid with no entry the lookup yields a
// zero-value entry, so changeState would still proceed and store an
// entry under that uuid -- confirm this is intended.
return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
}
// Subscribe returns a channel that receives a value whenever notify
// runs, i.e. after a successful changeState or an Update. The channel
// has a buffer of one and notify never blocks, so notifications that
// arrive while one is still pending are dropped.
//
// Pass the returned channel to Unsubscribe to stop receiving
// notifications.
+func (q *Queue) Subscribe() <-chan struct{} {
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
// The map is created lazily so a zero-value Queue is usable.
+ if q.subscribers == nil {
+ q.subscribers = map[<-chan struct{}]chan struct{}{}
+ }
+ ch := make(chan struct{}, 1)
// Keyed by the receive-only view the caller holds, so Unsubscribe
// can find the entry; the value is the sendable side used by notify.
+ q.subscribers[ch] = ch
+ return ch
+}
+
// Unsubscribe stops notifications to ch, which should be a channel
// returned by Subscribe. The channel is not closed. It is a no-op if
// ch is not currently subscribed.
+func (q *Queue) Unsubscribe(ch <-chan struct{}) {
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
+ delete(q.subscribers, ch)
+}
+
// notify sends one value to every subscriber channel without blocking.
+// caller must have lock.
+func (q *Queue) notify() {
+ for _, ch := range q.subscribers {
// Non-blocking send: if the channel's buffer is already full, a
// notification is already pending for that subscriber, so this one
// is dropped instead of waiting.
+ select {
+ case ch <- struct{}{}:
+ default:
+ }
+ }
+}
+
// changeState sets the state of the entry for uuid to "to", provided
// its current state is "from"; otherwise it returns an error and
// changes nothing. Subscribers are notified after a successful change.
+// caller must have lock.
func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
// A uuid with no entry yields a zero-value entry here.
ent := q.entries[uuid]
if ent.Container.State != from {
- return fmt.Errorf("lock failed: state=%q", ent.Container.State)
+ return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
}
ent.Container.State = to
// ent is a copy of the map value, so it has to be stored back.
q.entries[uuid] = ent
break
}
}
// Wake subscribers only on the success path.
+ q.notify()
return nil
}
// Update rebuilds the current entries from the Containers slice.
func (q *Queue) Update() error {
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
// The timestamp is taken before rebuilding and stored alongside the
// new entries once the rebuild is done.
updTime := time.Now()
upd := map[string]container.QueueEnt{}
for _, ctr := range q.Containers {
// Containers in state Complete or Cancelled are left out unless
// exists is true.
// NOTE(review): exists is assigned on a line not shown in this
// listing -- confirm what it tests.
if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
continue
}
- it, _ := q.ChooseType(&ctr)
- upd[ctr.UUID] = container.QueueEnt{
- Container: ctr,
- InstanceType: it,
// If this UUID was already added earlier in this pass, keep that
// entry (including its InstanceType) and replace only the
// Container; otherwise choose an instance type and add a new entry.
+ if ent, ok := upd[ctr.UUID]; ok {
+ ent.Container = ctr
+ upd[ctr.UUID] = ent
+ } else {
// NOTE(review): the error returned by ChooseType is discarded.
+ it, _ := q.ChooseType(&ctr)
+ upd[ctr.UUID] = container.QueueEnt{
+ Container: ctr,
+ InstanceType: it,
+ }
}
}
// Swap in the rebuilt map and timestamp, then wake subscribers.
q.entries = upd
q.updTime = updTime
+ q.notify()
return nil
}
//
// The resulting changes are not exposed through Get() or Entries()
// until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition).
+func (q *Queue) Notify(upd arvados.Container) bool {
+ q.mtx.Lock()
+ defer q.mtx.Unlock()
// Look for a container with the same UUID already in q.Containers.
for i, ctr := range q.Containers {
if ctr.UUID == upd.UUID {
- q.Containers[i] = upd
- return
// Replace it only if the transition from its current state to the
// new state is listed in allowContainerUpdate.
+ if allowContainerUpdate[ctr.State][upd.State] {
+ q.Containers[i] = upd
+ return true
+ }
// Rejected: log if a Logger is configured, and leave q.Containers
// unchanged.
+ if q.Logger != nil {
+ q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State)
+ }
+ return false
}
}
// No container with this UUID yet: append it without any state check.
q.Containers = append(q.Containers, upd)
+ return true
+}
+
// allowContainerUpdate lists the state transitions that Notify accepts
// for a container that is already in q.Containers:
// allowContainerUpdate[from][to] is true when an update from state
// "from" to state "to" is allowed. Any pair not listed is rejected;
// in particular there is no entry for leaving Complete or Cancelled.
+var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{
+ arvados.ContainerStateQueued: {
+ arvados.ContainerStateQueued: true,
+ arvados.ContainerStateLocked: true,
+ arvados.ContainerStateCancelled: true,
+ },
+ arvados.ContainerStateLocked: {
+ arvados.ContainerStateQueued: true,
+ arvados.ContainerStateLocked: true,
+ arvados.ContainerStateRunning: true,
+ arvados.ContainerStateCancelled: true,
+ },
+ arvados.ContainerStateRunning: {
+ arvados.ContainerStateRunning: true,
+ arvados.ContainerStateCancelled: true,
+ arvados.ContainerStateComplete: true,
+ },
}