X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/df5c912a9eb5af7222e5446bc437ee97262542c8..b4f1461e394cc1fe0b7b5f9f56b96121cc46ff2e:/lib/dispatchcloud/test/queue.go diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go index fda04d52b3..8a8f686e16 100644 --- a/lib/dispatchcloud/test/queue.go +++ b/lib/dispatchcloud/test/queue.go @@ -9,8 +9,9 @@ import ( "sync" "time" - "git.curoverse.com/arvados.git/lib/dispatchcloud/container" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/lib/dispatchcloud/container" + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" ) // Queue is a test stub for container.Queue. The caller specifies the @@ -23,6 +24,8 @@ type Queue struct { // must not be nil. ChooseType func(*arvados.Container) (arvados.InstanceType, error) + Logger logrus.FieldLogger + entries map[string]container.QueueEnt updTime time.Time subscribers map[<-chan struct{}]chan struct{} @@ -95,6 +98,7 @@ func (q *Queue) Unsubscribe(ch <-chan struct{}) { delete(q.subscribers, ch) } +// caller must have lock. func (q *Queue) notify() { for _, ch := range q.subscribers { select { @@ -133,10 +137,15 @@ func (q *Queue) Update() error { if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) { continue } - it, _ := q.ChooseType(&ctr) - upd[ctr.UUID] = container.QueueEnt{ - Container: ctr, - InstanceType: it, + if ent, ok := upd[ctr.UUID]; ok { + ent.Container = ctr + upd[ctr.UUID] = ent + } else { + it, _ := q.ChooseType(&ctr) + upd[ctr.UUID] = container.QueueEnt{ + Container: ctr, + InstanceType: it, + } } } q.entries = upd @@ -152,14 +161,44 @@ func (q *Queue) Update() error { // // The resulting changes are not exposed through Get() or Entries() // until the next call to Update(). -func (q *Queue) Notify(upd arvados.Container) { +// +// Return value is true unless the update is rejected (invalid state +// transition). +func (q *Queue) Notify(upd arvados.Container) bool { q.mtx.Lock() defer q.mtx.Unlock() for i, ctr := range q.Containers { if ctr.UUID == upd.UUID { - q.Containers[i] = upd - return + if allowContainerUpdate[ctr.State][upd.State] { + q.Containers[i] = upd + return true + } else { + if q.Logger != nil { + q.Logger.WithField("ContainerUUID", ctr.UUID).Infof("test.Queue rejected update from %s to %s", ctr.State, upd.State) + } + return false + } } } q.Containers = append(q.Containers, upd) + return true +} + +var allowContainerUpdate = map[arvados.ContainerState]map[arvados.ContainerState]bool{ + arvados.ContainerStateQueued: { + arvados.ContainerStateQueued: true, + arvados.ContainerStateLocked: true, + arvados.ContainerStateCancelled: true, + }, + arvados.ContainerStateLocked: { + arvados.ContainerStateQueued: true, + arvados.ContainerStateLocked: true, + arvados.ContainerStateRunning: true, + arvados.ContainerStateCancelled: true, + }, + arvados.ContainerStateRunning: { + arvados.ContainerStateRunning: true, + arvados.ContainerStateCancelled: true, + arvados.ContainerStateComplete: true, + }, }