1 // Framework for monitoring the Arvados container Queue, Locks container
2 // records, and runs goroutine callbacks which implement execution and
3 // monitoring of the containers.
7 "git.curoverse.com/arvados.git/sdk/go/arvados"
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Short aliases for the arvados.ContainerState values used throughout
// this package.
18 Queued = arvados.ContainerStateQueued
19 Locked = arvados.ContainerStateLocked
20 Running = arvados.ContainerStateRunning
21 Complete = arvados.ContainerStateComplete
22 Cancelled = arvados.ContainerStateCancelled
// apiClientAuthorization is a minimal view of an Arvados
// api_client_authorization record: the token's UUID and its secret.
// The dispatcher uses its own record (see Dispatcher.Auth) to identify
// containers it has locked (locked_by_uuid).
25 type apiClientAuthorization struct {
26 UUID string `json:"uuid"`
27 APIToken string `json:"api_token"`
// apiClientAuthorizationList mirrors the API server's list-response
// envelope for api_client_authorization records.
30 type apiClientAuthorizationList struct {
31 Items []apiClientAuthorization `json:"items"`
34 // Dispatcher holds the state of the dispatcher
35 type Dispatcher struct {
	// Arvados API client used for all list/lock/update calls.
37 Arv arvadosclient.ArvadosClient
39 // When a new queued container appears and is either already owned by
40 // this dispatcher or is successfully locked, the dispatcher will call
41 // go RunContainer(). The RunContainer() goroutine gets a channel over
42 // which it will receive updates to the container state. The
43 // RunContainer() goroutine should only assume status updates come when
44 // the container record changes on the API server; if it needs to
45 // monitor the job submission to the underlying slurm/grid engine/etc
46 // queue it should spin up its own polling goroutines. When the
47 // channel is closed, that means the container is no longer being
48 // handled by this dispatcher and the goroutine should terminate. The
49 // goroutine is responsible for draining the 'status' channel, failure
50 // to do so may deadlock the dispatcher.
51 RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
53 // Amount of time to wait between polling for updates.
54 PollInterval time.Duration
56 // Channel used to signal that RunDispatcher loop should exit.
57 DoneProcessing chan struct{}
	// mineMap tracks the containers currently handled by this
	// dispatcher, keyed by container UUID; each value is the status
	// channel passed to that container's RunContainer goroutine.
	// Guarded by mineMutex (field declared on an elided line).
60 mineMap map[string]chan arvados.Container
	// Auth is this dispatcher's own token record, fetched in
	// RunDispatcher; Auth.UUID is compared against locked_by_uuid to
	// recognize containers this dispatcher has locked.
61 Auth apiClientAuthorization
	// containers carries polled container records from pollContainers
	// to the handleUpdate loop in RunDispatcher; closed to end that loop.
62 containers chan arvados.Container
65 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
66 // for which this process is actively starting/monitoring. Returns channel to
67 // be used to send container status updates.
68 func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
69 dispatcher.mineMutex.Lock()
70 defer dispatcher.mineMutex.Unlock()
	// If the uuid is already registered, reuse its existing channel
	// (the return on the elided lines presumably hands back ch).
71 if ch, ok := dispatcher.mineMap[uuid]; ok {
	// Otherwise create a fresh unbuffered status channel and register it.
75 ch := make(chan arvados.Container)
76 dispatcher.mineMap[uuid] = ch
80 // Release a container which is no longer being monitored.
81 func (dispatcher *Dispatcher) notMine(uuid string) {
82 dispatcher.mineMutex.Lock()
83 defer dispatcher.mineMutex.Unlock()
	// Remove the uuid's entry so checkMine no longer reports it as ours.
	// Per the RunContainer contract, the channel is closed (on an elided
	// line) to tell the monitoring goroutine to terminate.
84 if ch, ok := dispatcher.mineMap[uuid]; ok {
86 delete(dispatcher.mineMap, uuid)
90 // checkMine returns true if there is a channel for updates associated
91 // with container c. If update is true, also send the container record on
// the channel so the monitoring goroutine sees the latest state.
93 func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
94 dispatcher.mineMutex.Lock()
95 defer dispatcher.mineMutex.Unlock()
	// Look up the status channel for this container; the send/return
	// logic follows on lines elided from this view.
96 ch, ok := dispatcher.mineMap[c.UUID]
// getContainers lists containers matching params, marks each returned
// UUID in touched, and forwards every record to dispatcher.containers
// for processing by handleUpdate.
106 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
107 var containers arvados.ContainerList
108 err := dispatcher.Arv.List("containers", params, &containers)
	// On a list error we log and (presumably, on an elided line) return
	// without touching any state; the next poll cycle will retry.
110 log.Printf("Error getting list of containers: %q", err)
114 if containers.ItemsAvailable > len(containers.Items) {
115 // TODO: support paging
116 log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
117 containers.ItemsAvailable,
118 len(containers.Items))
120 for _, container := range containers.Items {
121 touched[container.UUID] = true
122 dispatcher.containers <- container
// pollContainers periodically queries the API server (every
// PollInterval) for containers of interest and feeds the results to
// dispatcher.containers, until DoneProcessing signals shutdown.
126 func (dispatcher *Dispatcher) pollContainers() {
127 ticker := time.NewTicker(dispatcher.PollInterval)
	// paramsQ: runnable work — queued containers with positive priority,
	// highest priority first.
129 paramsQ := arvadosclient.Dict{
130 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
131 "order": []string{"priority desc"},
	// paramsP: containers this dispatcher has already locked (matched by
	// our token UUID), so we keep receiving their state changes.
133 paramsP := arvadosclient.Dict{
134 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
140 touched := make(map[string]bool)
141 dispatcher.getContainers(paramsQ, touched)
142 dispatcher.getContainers(paramsP, touched)
	// Collect containers we are monitoring that appeared in neither
	// query (e.g. now Complete/Cancelled or locked by someone else)...
143 dispatcher.mineMutex.Lock()
144 var monitored []string
145 for k := range dispatcher.mineMap {
146 if _, ok := touched[k]; !ok {
147 monitored = append(monitored, k)
150 dispatcher.mineMutex.Unlock()
	// ...and fetch them explicitly so handleUpdate can observe the state
	// change and release them.
151 if monitored != nil {
152 dispatcher.getContainers(arvadosclient.Dict{
153 "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
	// Shutdown: closing dispatcher.containers ends the range loop in
	// RunDispatcher.
155 case <-dispatcher.DoneProcessing:
156 close(dispatcher.containers)
// handleUpdate reacts to a single polled container record: it releases
// containers we should no longer monitor, forwards updates to existing
// monitors, tries to lock new runnable work, and starts a RunContainer
// goroutine for containers locked by us.
163 func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
164 if container.State == Queued && dispatcher.checkMine(container, false) {
165 // If we previously started the job, something failed, and it
166 // was re-queued, this dispatcher might still be monitoring it.
167 // Stop the existing monitor, then try to lock and run it
// again (fall through to the lock attempt below).
169 dispatcher.notMine(container.UUID)
172 if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
173 // If container is Complete, Cancelled, or Queued, LockedByUUID
174 // will be nil. If the container was formerly Locked, moved
175 // back to Queued and then locked by another dispatcher,
176 // LockedByUUID will be different. In either case, we want
177 // to stop monitoring it.
178 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
179 dispatcher.notMine(container.UUID)
// Already monitored: checkMine(container, true) sends the status
// update on the monitor's channel, so there is nothing more to do.
183 if dispatcher.checkMine(container, true) {
184 // Already monitored; status update sent
188 if container.State == Queued && container.Priority > 0 {
189 // Try to take the lock
	// On a lock error another dispatcher likely won the race; the
	// elided lines presumably skip this container rather than fail.
190 if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
	// Lock succeeded: reflect the new state locally so the check
	// below starts monitoring immediately instead of waiting for
	// the next poll.
193 container.State = Locked
196 if container.State == Locked || container.State == Running {
197 // Not currently monitored but in Locked or Running state and
198 // owned by this dispatcher, so start monitoring.
199 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
203 // UpdateState makes an API call to change the state of a container.
// Returns the API error, if any, after logging it.
204 func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
205 err := dispatcher.Arv.Update("containers", uuid,
207 "container": arvadosclient.Dict{"state": newState}},
210 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
215 // RunDispatcher runs the main loop of the dispatcher until receiving a message
216 // on the dispatcher.DoneProcessing channel. It also installs a signal handler
217 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
218 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
	// Fetch our own token record; Auth.UUID is later matched against
	// containers' locked_by_uuid.
219 err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
221 log.Printf("Error getting my token UUID: %v", err)
225 dispatcher.mineMap = make(map[string]chan arvados.Container)
226 dispatcher.containers = make(chan arvados.Container)
228 // Graceful shutdown on signal
	// NOTE(review): signal.Notify with an unbuffered channel can miss a
	// signal delivered before the receiver is ready; the os/signal docs
	// recommend make(chan os.Signal, 1).
229 sigChan := make(chan os.Signal)
230 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
232 go func(sig <-chan os.Signal) {
233 for sig := range sig {
234 log.Printf("Caught signal: %v", sig)
235 dispatcher.DoneProcessing <- struct{}{}
240 defer signal.Stop(sigChan)
	// Main loop: pollContainers feeds records into dispatcher.containers
	// until DoneProcessing fires and the channel is closed, ending this
	// range loop.
242 go dispatcher.pollContainers()
243 for container := range dispatcher.containers {
244 dispatcher.handleUpdate(container)