// Framework for monitoring the Arvados container queue, locking container
// records, and running goroutine callbacks that implement execution and
// monitoring of the containers.
7 "git.curoverse.com/arvados.git/sdk/go/arvados"
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Short package-local names for the arvados container states. Each one is
// the corresponding arvados.ContainerState* constant, re-exported so that
// dispatcher code and its callers can write Queued, Locked, etc.
const (
	Queued    = arvados.ContainerStateQueued
	Locked    = arvados.ContainerStateLocked
	Running   = arvados.ContainerStateRunning
	Complete  = arvados.ContainerStateComplete
	Cancelled = arvados.ContainerStateCancelled
)
// Dispatcher holds the state of the dispatcher.
type Dispatcher struct {
	// Arv is the API client used for every container list, update,
	// lock and unlock call, and for looking up this token's
	// authorization record.
	Arv arvadosclient.ArvadosClient

	// When a new queued container appears and is either already owned by
	// this dispatcher or is successfully locked, the dispatcher will call
	// go RunContainer(). The RunContainer() goroutine gets a channel over
	// which it will receive updates to the container state. The
	// RunContainer() goroutine should only assume status updates come when
	// the container record changes on the API server; if it needs to
	// monitor the job submission to the underlying slurm/grid engine/etc
	// queue it should spin up its own polling goroutines. When the
	// channel is closed, that means the container is no longer being
	// handled by this dispatcher and the goroutine should terminate. The
	// goroutine is responsible for draining the 'status' channel; the
	// channel is unbuffered, so failing to do so may deadlock the
	// dispatcher.
	RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)

	// Amount of time to wait between polling for updates.
	PollInterval time.Duration

	// Channel used to signal that RunDispatcher loop should exit.
	// NOTE(review): RunDispatcher's visible setup does not allocate this
	// channel, so the caller presumably must; with a nil channel the
	// shutdown signal could never be delivered — confirm.
	DoneProcessing chan struct{}

	// mineMutex guards mineMap.
	mineMutex sync.Mutex
	// mineMap maps the UUID of each container this process is starting or
	// monitoring to the channel its RunContainer goroutine reads updates
	// from.
	mineMap map[string]chan arvados.Container
	// Auth is filled in by RunDispatcher with the authorization record of
	// the current token; Auth.UUID is compared against each container's
	// locked_by_uuid to decide ownership.
	Auth arvados.APIClientAuthorization
	// containers carries container records from pollContainers to the
	// RunDispatcher loop, which hands each one to handleUpdate.
	// pollContainers closes it on shutdown.
	containers chan arvados.Container
}
56 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
57 // for which this process is actively starting/monitoring. Returns channel to
58 // be used to send container status updates.
59 func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
60 dispatcher.mineMutex.Lock()
61 defer dispatcher.mineMutex.Unlock()
62 if ch, ok := dispatcher.mineMap[uuid]; ok {
66 ch := make(chan arvados.Container)
67 dispatcher.mineMap[uuid] = ch
71 // Release a container which is no longer being monitored.
72 func (dispatcher *Dispatcher) notMine(uuid string) {
73 dispatcher.mineMutex.Lock()
74 defer dispatcher.mineMutex.Unlock()
75 if ch, ok := dispatcher.mineMap[uuid]; ok {
77 delete(dispatcher.mineMap, uuid)
81 // checkMine returns true if there is a channel for updates associated
82 // with container c. If update is true, also send the container record on
84 func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
85 dispatcher.mineMutex.Lock()
86 defer dispatcher.mineMutex.Unlock()
87 ch, ok := dispatcher.mineMap[c.UUID]
97 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
98 var containers arvados.ContainerList
99 err := dispatcher.Arv.List("containers", params, &containers)
101 log.Printf("Error getting list of containers: %q", err)
105 if containers.ItemsAvailable > len(containers.Items) {
106 // TODO: support paging
107 log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
108 containers.ItemsAvailable,
109 len(containers.Items))
111 for _, container := range containers.Items {
112 touched[container.UUID] = true
113 dispatcher.containers <- container
117 func (dispatcher *Dispatcher) pollContainers() {
118 ticker := time.NewTicker(dispatcher.PollInterval)
120 paramsQ := arvadosclient.Dict{
121 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
122 "order": []string{"priority desc"},
124 paramsP := arvadosclient.Dict{
125 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
131 touched := make(map[string]bool)
132 dispatcher.getContainers(paramsQ, touched)
133 dispatcher.getContainers(paramsP, touched)
134 dispatcher.mineMutex.Lock()
135 var monitored []string
136 for k := range dispatcher.mineMap {
137 if _, ok := touched[k]; !ok {
138 monitored = append(monitored, k)
141 dispatcher.mineMutex.Unlock()
142 if monitored != nil {
143 dispatcher.getContainers(arvadosclient.Dict{
144 "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
146 case <-dispatcher.DoneProcessing:
147 close(dispatcher.containers)
154 func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
155 if container.State == Queued && dispatcher.checkMine(container, false) {
156 // If we previously started the job, something failed, and it
157 // was re-queued, this dispatcher might still be monitoring it.
158 // Stop the existing monitor, then try to lock and run it
160 dispatcher.notMine(container.UUID)
163 if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
164 // If container is Complete, Cancelled, or Queued, LockedByUUID
165 // will be nil. If the container was formerly Locked, moved
166 // back to Queued and then locked by another dispatcher,
167 // LockedByUUID will be different. In either case, we want
168 // to stop monitoring it.
169 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
170 dispatcher.notMine(container.UUID)
174 if dispatcher.checkMine(container, true) {
175 // Already monitored, sent status update
179 if container.State == Queued && container.Priority > 0 {
180 // Try to take the lock
181 if err := dispatcher.Lock(container.UUID); err != nil {
184 container.State = Locked
187 if container.State == Locked || container.State == Running {
188 // Not currently monitored but in Locked or Running state and
189 // owned by this dispatcher, so start monitoring.
190 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
194 // UpdateState makes an API call to change the state of a container.
195 func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
196 err := dispatcher.Arv.Update("containers", uuid,
198 "container": arvadosclient.Dict{"state": newState}},
201 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
206 // Lock makes the lock API call which updates the state of a container to Locked.
207 func (dispatcher *Dispatcher) Lock(uuid string) error {
208 err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
210 log.Printf("Error locking container %s: %q", uuid, err)
215 // Unlock makes the unlock API call which updates the state of a container to Queued.
216 func (dispatcher *Dispatcher) Unlock(uuid string) error {
217 err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
219 log.Printf("Error unlocking container %s: %q", uuid, err)
224 // RunDispatcher runs the main loop of the dispatcher until receiving a message
225 // on the dispatcher.DoneProcessing channel. It also installs a signal handler
226 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
227 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
228 err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
230 log.Printf("Error getting my token UUID: %v", err)
234 dispatcher.mineMap = make(map[string]chan arvados.Container)
235 dispatcher.containers = make(chan arvados.Container)
237 // Graceful shutdown on signal
238 sigChan := make(chan os.Signal)
239 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
241 go func(sig <-chan os.Signal) {
242 for sig := range sig {
243 log.Printf("Caught signal: %v", sig)
244 dispatcher.DoneProcessing <- struct{}{}
249 defer signal.Stop(sigChan)
251 go dispatcher.pollContainers()
252 for container := range dispatcher.containers {
253 dispatcher.handleUpdate(container)