X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c3c538444c15e68e96780f157935f2baa4ba0bc5..63842ff25274318349f511a507effe27f47d9fcc:/lib/dispatchcloud/test/queue.go diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go index 909f561144..11d410fb1b 100644 --- a/lib/dispatchcloud/test/queue.go +++ b/lib/dispatchcloud/test/queue.go @@ -6,10 +6,11 @@ package test import ( "fmt" + "sync" "time" - "git.curoverse.com/arvados.git/lib/dispatchcloud/container" - "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/lib/dispatchcloud/container" + "git.arvados.org/arvados.git/sdk/go/arvados" ) // Queue is a test stub for container.Queue. The caller specifies the @@ -22,13 +23,18 @@ type Queue struct { // must not be nil. ChooseType func(*arvados.Container) (arvados.InstanceType, error) - entries map[string]container.QueueEnt - updTime time.Time + entries map[string]container.QueueEnt + updTime time.Time + subscribers map[<-chan struct{}]chan struct{} + + mtx sync.Mutex } // Entries returns the containers that were queued when Update was // last called. func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) { + q.mtx.Lock() + defer q.mtx.Unlock() updTime := q.updTime r := map[string]container.QueueEnt{} for uuid, ent := range q.entries { @@ -42,30 +48,68 @@ func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) { // the state has been changed (via Lock, Unlock, or Cancel) since the // last Update, the updated state is returned. func (q *Queue) Get(uuid string) (arvados.Container, bool) { + q.mtx.Lock() + defer q.mtx.Unlock() ent, ok := q.entries[uuid] return ent.Container, ok } func (q *Queue) Forget(uuid string) { + q.mtx.Lock() + defer q.mtx.Unlock() delete(q.entries, uuid) } func (q *Queue) Lock(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked) } func (q *Queue) Unlock(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued) } func (q *Queue) Cancel(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled) } +func (q *Queue) Subscribe() <-chan struct{} { + q.mtx.Lock() + defer q.mtx.Unlock() + if q.subscribers == nil { + q.subscribers = map[<-chan struct{}]chan struct{}{} + } + ch := make(chan struct{}, 1) + q.subscribers[ch] = ch + return ch +} + +func (q *Queue) Unsubscribe(ch <-chan struct{}) { + q.mtx.Lock() + defer q.mtx.Unlock() + delete(q.subscribers, ch) +} + +// caller must have lock. +func (q *Queue) notify() { + for _, ch := range q.subscribers { + select { + case ch <- struct{}{}: + default: + } + } +} + +// caller must have lock. func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error { ent := q.entries[uuid] if ent.Container.State != from { - return fmt.Errorf("lock failed: state=%q", ent.Container.State) + return fmt.Errorf("changeState failed: state=%q", ent.Container.State) } ent.Container.State = to q.entries[uuid] = ent @@ -75,11 +119,14 @@ func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error break } } + q.notify() return nil } // Update rebuilds the current entries from the Containers slice. func (q *Queue) Update() error { + q.mtx.Lock() + defer q.mtx.Unlock() updTime := time.Now() upd := map[string]container.QueueEnt{} for _, ctr := range q.Containers { @@ -87,14 +134,20 @@ func (q *Queue) Update() error { if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) { continue } - it, _ := q.ChooseType(&ctr) - upd[ctr.UUID] = container.QueueEnt{ - Container: ctr, - InstanceType: it, + if ent, ok := upd[ctr.UUID]; ok { + ent.Container = ctr + upd[ctr.UUID] = ent + } else { + it, _ := q.ChooseType(&ctr) + upd[ctr.UUID] = container.QueueEnt{ + Container: ctr, + InstanceType: it, + } } } q.entries = upd q.updTime = updTime + q.notify() return nil } @@ -105,12 +158,21 @@ func (q *Queue) Update() error { // // The resulting changes are not exposed through Get() or Entries() // until the next call to Update(). -func (q *Queue) Notify(upd arvados.Container) { +// +// Return value is true unless the update is rejected (invalid state +// transition). +func (q *Queue) Notify(upd arvados.Container) bool { + q.mtx.Lock() + defer q.mtx.Unlock() for i, ctr := range q.Containers { if ctr.UUID == upd.UUID { - q.Containers[i] = upd - return + if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled { + q.Containers[i] = upd + return true + } + return false } } q.Containers = append(q.Containers, upd) + return true }