X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/17479bd75a29c52470abe0049cb447e114eb39e9..bd8bdd90055d61263eff5bdb9a953c57319aa83d:/lib/dispatchcloud/test/queue.go diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go index 0bad5a391e..74b84122f2 100644 --- a/lib/dispatchcloud/test/queue.go +++ b/lib/dispatchcloud/test/queue.go @@ -9,8 +9,9 @@ import ( "sync" "time" - "git.curoverse.com/arvados.git/lib/dispatchcloud/container" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/lib/dispatchcloud/container" + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" ) // Queue is a test stub for container.Queue. The caller specifies the @@ -23,6 +24,8 @@ type Queue struct { // must not be nil. ChooseType func(*arvados.Container) (arvados.InstanceType, error) + Logger logrus.FieldLogger + entries map[string]container.QueueEnt updTime time.Time subscribers map[<-chan struct{}]chan struct{} @@ -61,14 +64,20 @@ func (q *Queue) Forget(uuid string) { } func (q *Queue) Lock(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked) } func (q *Queue) Unlock(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued) } func (q *Queue) Cancel(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled) } @@ -89,6 +98,7 @@ func (q *Queue) Unsubscribe(ch <-chan struct{}) { delete(q.subscribers, ch) } +// caller must have lock. func (q *Queue) notify() { for _, ch := range q.subscribers { select { @@ -98,12 +108,11 @@ func (q *Queue) notify() { } } +// caller must have lock. func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error { - q.mtx.Lock() - defer q.mtx.Unlock() ent := q.entries[uuid] if ent.Container.State != from { - return fmt.Errorf("lock failed: state=%q", ent.Container.State) + return fmt.Errorf("changeState failed: state=%q", ent.Container.State) } ent.Container.State = to q.entries[uuid] = ent @@ -128,10 +137,15 @@ func (q *Queue) Update() error { if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) { continue } - it, _ := q.ChooseType(&ctr) - upd[ctr.UUID] = container.QueueEnt{ - Container: ctr, - InstanceType: it, + if ent, ok := upd[ctr.UUID]; ok { + ent.Container = ctr + upd[ctr.UUID] = ent + } else { + it, _ := q.ChooseType(&ctr) + upd[ctr.UUID] = container.QueueEnt{ + Container: ctr, + InstanceType: it, + } } } q.entries = upd @@ -147,14 +161,44 @@ func (q *Queue) Update() error { // // The resulting changes are not exposed through Get() or Entries() // until the next call to Update(). -func (q *Queue) Notify(upd arvados.Container) { +// +// Return value is true unless the update is rejected (invalid state +// transition). +func (q *Queue) Notify(upd arvados.Container) bool { q.mtx.Lock() defer q.mtx.Unlock() for i, ctr := range q.Containers { if ctr.UUID == upd.UUID { - q.Containers[i] = upd - return + if allowContainerUpdate[ctr.State][upd.State] { + q.Containers[i] = upd + return true + } else { + if q.Logger != nil { + q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State) + } + return false + } } } q.Containers = append(q.Containers, upd) + return true +} + +var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{ + arvados.ContainerStateQueued: map[arvados.ContainerState]bool{ + arvados.ContainerStateQueued: true, + arvados.ContainerStateLocked: true, + arvados.ContainerStateCancelled: true, + }, + arvados.ContainerStateLocked: map[arvados.ContainerState]bool{ + arvados.ContainerStateQueued: true, + arvados.ContainerStateLocked: true, + arvados.ContainerStateRunning: true, + arvados.ContainerStateCancelled: true, + }, + arvados.ContainerStateRunning: map[arvados.ContainerState]bool{ + arvados.ContainerStateRunning: true, + arvados.ContainerStateCancelled: true, + arvados.ContainerStateComplete: true, + }, }