"sync"
"time"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
)
// Queue is a test stub for container.Queue. The caller specifies the
delete(q.subscribers, ch)
}
+// Caller must hold the lock.
func (q *Queue) notify() {
for _, ch := range q.subscribers {
select {
func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
ent := q.entries[uuid]
if ent.Container.State != from {
- return fmt.Errorf("lock failed: state=%q", ent.Container.State)
+ return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
}
ent.Container.State = to
q.entries[uuid] = ent
if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
continue
}
- it, _ := q.ChooseType(&ctr)
- upd[ctr.UUID] = container.QueueEnt{
- Container: ctr,
- InstanceType: it,
+ if ent, ok := upd[ctr.UUID]; ok {
+ ent.Container = ctr
+ upd[ctr.UUID] = ent
+ } else {
+ it, _ := q.ChooseType(&ctr)
+ upd[ctr.UUID] = container.QueueEnt{
+ Container: ctr,
+ InstanceType: it,
+ }
}
}
q.entries = upd
//
// The resulting changes are not exposed through Get() or Entries()
// until the next call to Update().
-func (q *Queue) Notify(upd arvados.Container) {
+//
+// Return value is true unless the update is rejected (invalid state
+// transition): if an entry with the same UUID already exists and its
+// state is Complete or Cancelled, it is left unchanged and false is
+// returned. If no entry with that UUID exists, upd is appended and
+// true is returned.
+func (q *Queue) Notify(upd arvados.Container) bool {
// Hold q.mtx for the whole scan-and-update.
q.mtx.Lock()
defer q.mtx.Unlock()
// Linear search of q.Containers for an entry with the same UUID.
for i, ctr := range q.Containers {
if ctr.UUID == upd.UUID {
- q.Containers[i] = upd
- return
// Replace the stored container only while it has not yet reached
// Complete or Cancelled; otherwise reject the update.
+ if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
+ q.Containers[i] = upd
+ return true
+ }
+ return false
}
}
// No existing entry matched: add upd as a new container.
q.Containers = append(q.Containers, upd)
+ return true
}