1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
12 "git.arvados.org/arvados.git/lib/dispatchcloud/container"
13 "git.arvados.org/arvados.git/sdk/go/arvados"
16 // Queue is a test stub for container.Queue. The caller specifies the
17 // initial queue state.
19 // Containers represent the API server database contents.
20 Containers []arvados.Container
22 // ChooseType will be called for each entry in Containers. It
// must not be nil: the visible call in Update has no nil check.
24 ChooseType func(*arvados.Container) (arvados.InstanceType, error)
// entries is the cached view of the queue that Entries and Get read
// from, keyed by container UUID.
26 entries map[string]container.QueueEnt
// subscribers maps the receive-only handle given out by Subscribe to
// the same channel in sendable form, so notify can send on it and
// Unsubscribe can delete it using the caller's handle.
28 subscribers map[<-chan struct{}]chan struct{}
33 // Entries returns the containers that were queued when Update was
// last called, as a map keyed by container UUID, together with a
// timestamp.
// NOTE(review): the statements that fill the result map, produce the
// timestamp, and return are not visible in this excerpt -- confirm
// against the full file.
35 func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
// A fresh map is allocated; presumably each entry is copied into it so
// that the caller never holds q.entries itself.
39 r := map[string]container.QueueEnt{}
40 for uuid, ent := range q.entries {
46 // Get returns the container from the cached queue, i.e., as it was
47 // when Update was last called -- just like a container.Queue does. If
48 // the state has been changed (via Lock, Unlock, or Cancel) since the
49 // last Update, the updated state is returned.
//
// The second result is false when uuid is not in the cached queue; in
// that case the returned Container is the zero value.
50 func (q *Queue) Get(uuid string) (arvados.Container, bool) {
// Comma-ok lookup: a missing key yields a zero-value QueueEnt.
53 ent, ok := q.entries[uuid]
54 return ent.Container, ok
// Forget removes the entry for uuid from the cached queue (q.entries).
// The visible code does not touch the Containers slice. Deleting a
// uuid that is not present is a no-op.
57 func (q *Queue) Forget(uuid string) {
60 delete(q.entries, uuid)
// Lock moves the container identified by uuid from state Queued to
// state Locked. It returns the error from changeState, which fails
// when the cached state is not Queued.
63 func (q *Queue) Lock(uuid string) error {
66 return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
// Unlock moves the container identified by uuid from state Locked
// back to state Queued. It returns the error from changeState, which
// fails when the cached state is not Locked.
69 func (q *Queue) Unlock(uuid string) error {
72 return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
// Cancel sets the state of the container identified by uuid to
// Cancelled, whatever its current cached state is.
75 func (q *Queue) Cancel(uuid string) error {
// The entry's own current state is passed as the expected "from"
// state, so changeState's precondition always holds.
// NOTE(review): for a uuid that is not in q.entries the zero-value
// state is compared with itself, so the check also passes and
// changeState stores a new Cancelled entry under that uuid -- confirm
// this is intended.
78 return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
// Subscribe registers a new notification channel in q.subscribers.
// The declared result is the receive-only view of that channel (the
// return statement itself is not visible in this excerpt).
81 func (q *Queue) Subscribe() <-chan struct{} {
// Create the subscribers map on first use, so the zero-value Queue
// can be subscribed to.
84 if q.subscribers == nil {
85 q.subscribers = map[<-chan struct{}]chan struct{}{}
// The channel has a buffer of one.
87 ch := make(chan struct{}, 1)
// Key: receive-only view (what callers hold); value: sendable channel.
88 q.subscribers[ch] = ch
// Unsubscribe removes ch, a channel previously obtained from
// Subscribe, from the set of subscribers. Passing a channel that is
// not registered is a no-op. The visible code does not close ch.
92 func (q *Queue) Unsubscribe(ch <-chan struct{}) {
95 delete(q.subscribers, ch)
98 // caller must have lock.
//
// notify sends an empty struct value to every subscriber channel.
99 func (q *Queue) notify() {
100 for _, ch := range q.subscribers {
// NOTE(review): this send is a select case; the enclosing select and
// any default branch are not visible in this excerpt. Presumably a
// default branch makes the send non-blocking when the subscriber's
// buffer is already full -- confirm against the full file.
102 case ch <- struct{}{}:
108 // caller must have lock.
//
// changeState moves the container identified by uuid from state
// "from" to state "to". It returns an error, and changes nothing in
// the visible code, if the cached state is not "from".
109 func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
// A uuid missing from q.entries yields a zero-value entry here.
110 ent := q.entries[uuid]
111 if ent.Container.State != from {
112 return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
// ent is a copy of the map value, so it is modified and then stored
// back into the map.
114 ent.Container.State = to
115 q.entries[uuid] = ent
// Apply the same state to the matching element of Containers.
116 for i, ctr := range q.Containers {
117 if ctr.UUID == uuid {
// Assign through the index: ctr is a copy of the slice element.
118 q.Containers[i].State = to
126 // Update rebuilds the current entries from the Containers slice.
127 func (q *Queue) Update() error {
// Timestamp taken before the rebuild starts.
130 updTime := time.Now()
// upd collects the new set of entries, keyed by container UUID.
131 upd := map[string]container.QueueEnt{}
132 for _, ctr := range q.Containers {
// Special case: a container that is not in the current entries and is
// already Complete or Cancelled. The branch body is not visible in
// this excerpt; presumably such containers are skipped -- confirm.
133 _, exists := q.entries[ctr.UUID]
134 if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
// Special case: this UUID was already added to upd earlier in this
// pass, i.e. Containers holds it more than once. The branch body is
// not visible in this excerpt.
137 if ent, ok := upd[ctr.UUID]; ok {
// NOTE(review): the error returned by ChooseType is discarded here.
141 it, _ := q.ChooseType(&ctr)
142 upd[ctr.UUID] = container.QueueEnt{
154 // Notify adds/updates an entry in the Containers slice. This
155 // simulates the effect of an API update from someone other than the
156 // dispatcher -- e.g., crunch-run updating state to "Complete".
//
159 // The resulting changes are not exposed through Get() or Entries()
160 // until the next call to Update().
//
162 // Return value is true unless the update is rejected (invalid state
// transition). In the visible code, an existing container is
// overwritten only if its state is neither Complete nor Cancelled.
164 func (q *Queue) Notify(upd arvados.Container) bool {
// Look for an existing container with the same UUID.
167 for i, ctr := range q.Containers {
168 if ctr.UUID == upd.UUID {
169 if ctr.State != arvados.ContainerStateComplete && ctr.State != arvados.ContainerStateCancelled {
// Replace the stored container with the update.
170 q.Containers[i] = upd
// NOTE(review): presumably reached only when no container with this
// UUID was found; the return statements between the loop and this
// line are not visible in this excerpt.
176 q.Containers = append(q.Containers, upd)