1 // Framework for monitoring the Arvados container queue, locking container
2 // records, and running goroutine callbacks which implement execution and
3 // monitoring of the containers.
7 "git.curoverse.com/arvados.git/sdk/go/arvados"
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Short names for the container state values defined in the arvados package.
15 Queued = arvados.ContainerStateQueued
16 Locked = arvados.ContainerStateLocked
17 Running = arvados.ContainerStateRunning
18 Complete = arvados.ContainerStateComplete
19 Cancelled = arvados.ContainerStateCancelled
22 // Dispatcher holds the state of the dispatcher.
23 type Dispatcher struct {
// Arv is the Arvados API client used for the list, update, lock and
// unlock calls made by the dispatcher.
25 Arv *arvadosclient.ArvadosClient
27 // When a new queued container appears and is either already owned by
28 // this dispatcher or is successfully locked, the dispatcher will call
29 // go RunContainer(). The RunContainer() goroutine gets a channel over
30 // which it will receive updates to the container state. The
31 // RunContainer() goroutine should only assume status updates come when
32 // the container record changes on the API server; if it needs to
33 // monitor the job submission to the underlying slurm/grid engine/etc
34 // queue it should spin up its own polling goroutines. When the
35 // channel is closed, that means the container is no longer being
36 // handled by this dispatcher and the goroutine should terminate. The
37 // goroutine is responsible for draining the 'status' channel; failure
38 // to do so may deadlock the dispatcher.
// (The channel is created unbuffered, so a send on it blocks until the
// goroutine receives the value.)
39 RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
41 // Amount of time to wait between polling for updates.
42 PollPeriod time.Duration
44 // Minimum time between two attempts to run the same container.
45 MinRetryPeriod time.Duration
// mineMap maps the UUID of each container this process is monitoring to
// the channel used for that container's status updates. It is accessed
// with mineMutex held.
48 mineMap map[string]chan arvados.Container
// Auth is filled in by Run with the API client authorization record of
// the current token; its UUID is compared against locked_by_uuid to
// recognize containers locked by this dispatcher.
49 Auth arvados.APIClientAuthorization
56 // setMine goroutine-safely adds uuid to the set of "my" containers, i.e., ones
57 // for which this process is actively starting/monitoring. Returns the channel
58 // to be used to send container status updates.
//
// The lookup and the insertion both happen while mineMutex is held. A newly
// registered uuid gets a fresh unbuffered channel.
59 func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
60 dispatcher.mineMutex.Lock()
61 defer dispatcher.mineMutex.Unlock()
// uuid is already registered.
// NOTE(review): presumably the existing channel is handed back here rather
// than being replaced — confirm.
62 if ch, ok := dispatcher.mineMap[uuid]; ok {
// Not registered yet: create the update channel and remember it under uuid.
66 ch := make(chan arvados.Container)
67 dispatcher.mineMap[uuid] = ch
71 // notMine releases a container which is no longer being monitored.
//
// While holding mineMutex it looks up the update channel registered for uuid
// and deletes uuid's entry from mineMap.
// NOTE(review): the RunContainer contract documented on Dispatcher says the
// channel is closed when a container is no longer handled — confirm that ch is
// closed before the entry is deleted.
72 func (dispatcher *Dispatcher) notMine(uuid string) {
73 dispatcher.mineMutex.Lock()
74 defer dispatcher.mineMutex.Unlock()
75 if ch, ok := dispatcher.mineMap[uuid]; ok {
77 delete(dispatcher.mineMap, uuid)
81 // checkMine returns true if there is a channel for updates associated
82 // with container c. If update is true, also send the container record on
// that channel.
//
// mineMutex is held for the whole call, so the map lookup cannot race with
// setMine or notMine.
84 func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
85 dispatcher.mineMutex.Lock()
86 defer dispatcher.mineMutex.Unlock()
// Look up the update channel registered for this container's UUID, if any.
87 ch, ok := dispatcher.mineMap[c.UUID]
// getContainers lists the containers matching params and passes each returned
// record to handleUpdate. The UUID of every record received is set in touched,
// which lets the caller tell which containers this query reported.
97 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
98 var containers arvados.ContainerList
99 err := dispatcher.Arv.List("containers", params, &containers)
101 log.Printf("Error getting list of containers: %q", err)
// The server can report more matching containers than it returned in this
// response. Only the returned items are processed, so warn about the rest.
105 if containers.ItemsAvailable > len(containers.Items) {
106 // TODO: support paging
107 log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
108 containers.ItemsAvailable,
109 len(containers.Items))
// Mark each received container as seen, then act on its current state.
111 for _, container := range containers.Items {
112 touched[container.UUID] = true
113 dispatcher.handleUpdate(container)
// pollContainers queries the API server for containers of interest and feeds
// the results to handleUpdate (via getContainers). A ticker with period
// PollPeriod is created for pacing the polls.
//
// NOTE(review): stop is the channel Run passes in and Stop closes; presumably
// polling ends once it is closed — confirm against the loop's exit handling.
117 func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
118 ticker := time.NewTicker(dispatcher.PollPeriod)
// Query 1: containers in state Queued with priority above zero, highest
// priority first.
121 paramsQ := arvadosclient.Dict{
122 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
123 "order": []string{"priority desc"},
// Query 2: containers whose locked_by_uuid is this dispatcher's token UUID.
125 paramsP := arvadosclient.Dict{
126 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
// touched collects the UUIDs of every container reported by the queries below.
130 touched := make(map[string]bool)
131 dispatcher.getContainers(paramsQ, touched)
132 dispatcher.getContainers(paramsP, touched)
// Collect the containers we are monitoring that neither query reported, so
// their current records can be fetched explicitly. The map is read under
// mineMutex; the lock is released before making the API call.
133 dispatcher.mineMutex.Lock()
134 var monitored []string
135 for k := range dispatcher.mineMap {
136 if _, ok := touched[k]; !ok {
137 monitored = append(monitored, k)
140 dispatcher.mineMutex.Unlock()
// monitored stays nil when nothing was appended, in which case the extra
// query is skipped.
141 if monitored != nil {
142 dispatcher.getContainers(arvadosclient.Dict{
143 "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
// handleUpdate reacts to one container record received from the API server.
// Depending on the record's state and lock owner it stops monitoring the
// container, forwards the record to the goroutine already monitoring it, or
// tries to lock the container and starts a RunContainer goroutine for it.
//
// container is passed by value, so the state assignment below only affects
// this function's local copy of the record.
153 func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
154 if container.State == Queued && dispatcher.checkMine(container, false) {
155 // If we previously started the job, something failed, and it
156 // was re-queued, this dispatcher might still be monitoring it.
157 // Stop the existing monitor, then try to lock and run it.
159 dispatcher.notMine(container.UUID)
162 if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
163 // If container is Complete, Cancelled, or Queued, LockedByUUID
164 // will be nil. If the container was formerly Locked, moved
165 // back to Queued and then locked by another dispatcher,
166 // LockedByUUID will be different. In either case, we want
167 // to stop monitoring it.
168 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
169 dispatcher.notMine(container.UUID)
173 if dispatcher.checkMine(container, true) {
174 // Already monitored, sent status update
178 if container.State == Queued && container.Priority > 0 {
// NOTE(review): throttle.Check is defined elsewhere; presumably it enforces
// MinRetryPeriod between attempts on the same container — confirm.
179 if !dispatcher.throttle.Check(container.UUID) {
182 // Try to take the lock
183 if err := dispatcher.Lock(container.UUID); err != nil {
// Reflect the new state in the local copy of the record.
186 container.State = Locked
189 if container.State == Locked || container.State == Running {
190 // Not currently monitored but in Locked or Running state and
191 // owned by this dispatcher, so start monitoring.
// setMine registers the container and supplies the update channel that is
// handed to the new goroutine.
192 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
196 // UpdateState makes an API call to change the state of a container.
//
// It issues an update on the "containers" resource for uuid, setting the
// container's "state" attribute to newState. A failure is logged with the
// uuid and the requested state.
197 func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
198 err := dispatcher.Arv.Update("containers", uuid,
200 "container": arvadosclient.Dict{"state": newState}},
203 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
208 // Lock makes the lock API call which updates the state of a container to Locked.
//
// The call is a POST to the "lock" action of the container identified by
// uuid, with no parameters and no response object. A failure is logged.
209 func (dispatcher *Dispatcher) Lock(uuid string) error {
210 err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
212 log.Printf("Error locking container %s: %q", uuid, err)
217 // Unlock makes the unlock API call which updates the state of a container to Queued.
//
// The call is a POST to the "unlock" action of the container identified by
// uuid, with no parameters and no response object. A failure is logged.
218 func (dispatcher *Dispatcher) Unlock(uuid string) error {
219 err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
221 log.Printf("Error unlocking container %s: %q", uuid, err)
226 // Stop causes Run to return after the current polling cycle.
//
// It closes the stop channel that Run created and handed to pollContainers,
// then sets the field to nil.
// NOTE(review): nothing visible here synchronizes the nil check with the
// close, so Stop looks unsafe to call from several goroutines at once —
// confirm how callers use it.
227 func (dispatcher *Dispatcher) Stop() {
// A nil stop channel means there is nothing to close (Run has not set it,
// or an earlier Stop already cleared it).
228 if dispatcher.stop == nil {
232 close(dispatcher.stop)
233 dispatcher.stop = nil
236 // Run runs the main loop of the dispatcher.
//
// It first asks the API server for the authorization record of the current
// token and stores it in Auth; a failure is logged. It then creates the map of
// monitored containers and the stop channel, sets the retry throttle's hold
// time to MinRetryPeriod, and calls pollContainers.
237 func (dispatcher *Dispatcher) Run() (err error) {
238 err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
240 log.Printf("Error getting my token UUID: %v", err)
// Per-run state; mineMap must be non-nil before setMine writes to it.
244 dispatcher.mineMap = make(map[string]chan arvados.Container)
245 dispatcher.stop = make(chan struct{})
246 dispatcher.throttle.hold = dispatcher.MinRetryPeriod
247 dispatcher.pollContainers(dispatcher.stop)