1 // Framework for monitoring the Arvados container queue, locking container
2 // records, and running goroutine callbacks which implement execution and
3 // monitoring of the containers.
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
16 // Constants for container states
22 Cancelled = "Cancelled"
// apiClientAuthorization holds the uuid and api_token fields of an API client
// authorization record. RunDispatcher fills one in from the "current"
// api_client_authorizations record.
25 type apiClientAuthorization struct {
26 UUID string `json:"uuid"`
27 APIToken string `json:"api_token"`
// apiClientAuthorizationList holds the "items" of a list of
// apiClientAuthorization records.
30 type apiClientAuthorizationList struct {
31 Items []apiClientAuthorization `json:"items"`
34 // Container represents an Arvados container record.
35 type Container struct {
36 UUID string `json:"uuid"`
37 State string `json:"state"` // handleUpdate compares this against Queued, Locked and Running
38 Priority int `json:"priority"` // handleUpdate only tries to lock Queued containers with Priority > 0
39 RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
40 LockedByUUID string `json:"locked_by_uuid"` // handleUpdate compares this against Dispatcher.Auth.UUID
43 // ContainerList is the result of listing containers from the API.
44 type ContainerList struct {
45 Items []Container `json:"items"`
46 ItemsAvailable int `json:"items_available"` // getContainers warns when this exceeds len(Items); paging is not supported
49 // Dispatcher holds the state of the dispatcher
50 type Dispatcher struct {
52 Arv arvadosclient.ArvadosClient // client used for the List, Update and Call requests in this file
54 // When a new queued container appears and is either already owned by
55 // this dispatcher or is successfully locked, the dispatcher will call
56 // go RunContainer(). The RunContainer() goroutine gets a channel over
57 // which it will receive updates to the container state. The
58 // RunContainer() goroutine should only assume status updates come when
59 // the container record changes on the API server; if it needs to
60 // monitor the job submission to the underlying slurm/grid engine/etc
61 // queue it should spin up its own polling goroutines. When the
62 // channel is closed, that means the container is no longer being
63 // handled by this dispatcher and the goroutine should terminate. The
64 // goroutine is responsible for draining the 'status' channel; failure
65 // to do so may deadlock the dispatcher.
66 RunContainer func(*Dispatcher, Container, chan Container)
68 // Amount of time to wait between polling for updates.
69 PollInterval time.Duration
71 // Channel used to signal that RunDispatcher loop should exit.
72 DoneProcessing chan struct{}
// NOTE(review): setMine, notMine, checkMine and pollContainers lock
// dispatcher.mineMutex around mineMap; the mutex's declaration is not
// visible in this listing.
75 mineMap map[string]chan Container // container uuid -> status update channel, for containers being monitored
76 Auth apiClientAuthorization // filled in by RunDispatcher; Auth.UUID identifies containers locked by this dispatcher
77 containers chan Container // created in RunDispatcher; getContainers sends on it, RunDispatcher receives
80 // setMine goroutine-safely adds uuid to the set of "my" containers, i.e., ones
81 // for which this process is actively starting/monitoring. Returns channel to
82 // be used to send container status updates.
83 func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
84 dispatcher.mineMutex.Lock()
85 defer dispatcher.mineMutex.Unlock()
// NOTE(review): what is done with an already-registered channel is not
// visible in this listing — confirm against the full file.
86 if ch, ok := dispatcher.mineMap[uuid]; ok {
// Unbuffered: a send on this channel blocks until the receiver (the
// RunContainer goroutine started in handleUpdate) is ready.
90 ch := make(chan Container)
91 dispatcher.mineMap[uuid] = ch
95 // notMine releases a container which is no longer being monitored.
// The container's entry is removed from mineMap while holding mineMutex.
96 func (dispatcher *Dispatcher) notMine(uuid string) {
97 dispatcher.mineMutex.Lock()
98 defer dispatcher.mineMutex.Unlock()
// NOTE(review): the statement between this lookup and the delete is not
// visible in this listing; presumably it closes ch — confirm.
99 if ch, ok := dispatcher.mineMap[uuid]; ok {
101 delete(dispatcher.mineMap, uuid)
105 // checkMine reports whether there is a channel for updates associated
106 // with container c. If update is true, it also sends the container record on
// that channel.
108 func (dispatcher *Dispatcher) checkMine(c Container, update bool) bool {
109 dispatcher.mineMutex.Lock()
110 defer dispatcher.mineMutex.Unlock()
111 ch, ok := dispatcher.mineMap[c.UUID]
// getContainers lists the containers matching params, marks each returned
// UUID in touched, and sends each container on dispatcher.containers. A list
// error is logged, and a warning is logged when the API reports more
// containers available than were returned.
121 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
122 var containers ContainerList
123 err := dispatcher.Arv.List("containers", params, &containers)
125 log.Printf("Error getting list of containers: %q", err)
129 if containers.ItemsAvailable > len(containers.Items) {
130 // TODO: support paging
131 log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
132 containers.ItemsAvailable,
133 len(containers.Items))
135 for _, container := range containers.Items {
136 touched[container.UUID] = true
// dispatcher.containers is created unbuffered in RunDispatcher, so this
// send blocks until the RunDispatcher loop receives the container.
137 dispatcher.containers <- container
// pollContainers queries the API for containers of interest and feeds them to
// dispatcher.containers through getContainers. It creates a ticker with
// dispatcher.PollInterval and closes dispatcher.containers when
// DoneProcessing is signalled.
// NOTE(review): the loop/select structure around these statements is only
// partly visible in this listing.
141 func (dispatcher *Dispatcher) pollContainers() {
142 ticker := time.NewTicker(dispatcher.PollInterval)
// Queued containers with priority greater than zero, highest priority first.
144 paramsQ := arvadosclient.Dict{
145 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
146 "order": []string{"priority desc"},
// Containers whose locked_by_uuid is this dispatcher's token UUID.
148 paramsP := arvadosclient.Dict{
149 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
155 touched := make(map[string]bool)
156 dispatcher.getContainers(paramsQ, touched)
157 dispatcher.getContainers(paramsP, touched)
// Collect monitored containers that neither query returned, so their
// current records can be fetched explicitly by uuid below.
158 dispatcher.mineMutex.Lock()
159 var monitored []string
160 for k := range dispatcher.mineMap {
161 if _, ok := touched[k]; !ok {
162 monitored = append(monitored, k)
165 dispatcher.mineMutex.Unlock()
166 if monitored != nil {
167 dispatcher.getContainers(arvadosclient.Dict{
168 "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
// Shutdown: closing dispatcher.containers ends the range loop in RunDispatcher.
170 case <-dispatcher.DoneProcessing:
171 close(dispatcher.containers)
// handleUpdate processes one container record received from the poll loop.
// It stops monitoring a container that was re-queued or is no longer locked
// by this dispatcher, forwards the record to an existing monitor if there is
// one, tries to lock a queued container with positive priority, and starts
// RunContainer for a Locked or Running container.
// NOTE(review): the early returns between these steps are not visible in
// this listing.
178 func (dispatcher *Dispatcher) handleUpdate(container Container) {
179 if container.State == Queued && dispatcher.checkMine(container, false) {
180 // If we previously started the job, something failed, and it
181 // was re-queued, this dispatcher might still be monitoring it.
182 // Stop the existing monitor, then try to lock and run it
184 dispatcher.notMine(container.UUID)
187 if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
188 // If container is Complete, Cancelled, or Queued, LockedByUUID
189 // will be nil. If the container was formerly Locked, moved
190 // back to Queued and then locked by another dispatcher,
191 // LockedByUUID will be different. In either case, we want
192 // to stop monitoring it.
193 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
194 dispatcher.notMine(container.UUID)
198 if dispatcher.checkMine(container, true) {
199 // Already monitored, sent status update
203 if container.State == Queued && container.Priority > 0 {
204 // Try to take the lock
205 if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
// Mirror the new state in the local copy so the Locked/Running check
// below applies to it.
208 container.State = Locked
211 if container.State == Locked || container.State == Running {
212 // Not currently monitored but in Locked or Running state and
213 // owned by this dispatcher, so start monitoring.
214 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
218 // UpdateState makes an API call to change the state of a container.
// The update for the container identified by uuid carries
// {"container": {"state": newState}}; a failure is logged with the uuid and
// the requested state.
219 func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
220 err := dispatcher.Arv.Update("containers", uuid,
222 "container": arvadosclient.Dict{"state": newState}},
225 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
230 // RunDispatcher runs the main loop of the dispatcher until receiving a message
231 // on the dispatcher.DoneProcessing channel. It also installs a signal handler
232 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
233 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
// Look up the API token this dispatcher is using; pollContainers uses
// Auth.UUID to find containers locked by this dispatcher.
234 err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
236 log.Printf("Error getting my token UUID: %v", err)
240 dispatcher.mineMap = make(map[string]chan Container)
// Unbuffered: getContainers blocks on each send until the loop below receives.
241 dispatcher.containers = make(chan Container)
243 // Graceful shutdown on signal
// NOTE(review): sigChan is unbuffered. The os/signal documentation states
// that Notify does not block when sending and that the caller must provide
// enough buffer space, so a signal arriving while the goroutine below is
// busy can be dropped — consider make(chan os.Signal, 1).
244 sigChan := make(chan os.Signal)
245 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
247 go func(sig <-chan os.Signal) {
248 for sig := range sig {
249 log.Printf("Caught signal: %v", sig)
// Forward the signal as a shutdown request; pollContainers receives it.
250 dispatcher.DoneProcessing <- struct{}{}
255 defer signal.Stop(sigChan)
257 go dispatcher.pollContainers()
// This loop ends when pollContainers closes dispatcher.containers.
258 for container := range dispatcher.containers {
259 dispatcher.handleUpdate(container)