1 // Framework for monitoring the Arvados container queue, locking container
2 // records, and running goroutine callbacks which implement execution and
3 // monitoring of the containers.
7 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
16 // Constants for container states
// NOTE(review): only the Cancelled entry is visible in this view. Other code
// in this file also refers to Queued, Locked and Running, which are presumably
// declared alongside it as string constants of the same form.
22 Cancelled = "Cancelled"
// apiClientAuthorization carries the "uuid" and "api_token" JSON fields of an
// API client authorization record. The dispatcher keeps one in Dispatcher.Auth
// and compares its UUID against Container.LockedByUUID.
25 type apiClientAuthorization struct {
// UUID maps to the "uuid" JSON key.
26 UUID string `json:"uuid"`
// APIToken maps to the "api_token" JSON key.
27 APIToken string `json:"api_token"`
// apiClientAuthorizationList wraps a slice of apiClientAuthorization records
// stored under the "items" JSON key.
30 type apiClientAuthorizationList struct {
// Items maps to the "items" JSON key.
31 Items []apiClientAuthorization `json:"items"`
34 // Container represents an Arvados container record.
// Each field maps to the JSON key named in its struct tag.
35 type Container struct {
// UUID identifies the container; the dispatcher uses it as the key of its
// map of monitored containers.
36 UUID string `json:"uuid"`
// State is compared against the container state constants (Queued, Locked,
// Running, ...) when deciding how to handle an update.
37 State string `json:"state"`
// Priority maps to the "priority" JSON key.
38 Priority int `json:"priority"`
// RuntimeConstraints maps to the "runtime_constraints" JSON key; values are
// decoded as int64 keyed by constraint name.
39 RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
// LockedByUUID is compared with the dispatcher's own authorization UUID to
// decide whether this dispatcher owns the container.
40 LockedByUUID string `json:"locked_by_uuid"`
43 // ContainerList is a list of the containers from api
// getContainers decodes list responses into this type and compares
// ItemsAvailable with len(Items) to detect results it did not receive.
44 type ContainerList struct {
// Items holds the container records included in the response.
45 Items []Container `json:"items"`
// ItemsAvailable maps to "items_available"; when it exceeds len(Items),
// getContainers logs a warning that paging is not supported.
46 ItemsAvailable int `json:"items_available"`
49 // Dispatcher holds the state of the dispatcher
50 type Dispatcher struct {
// Arv is the API client used for the list, update and call requests made by
// this dispatcher.
52 Arv arvadosclient.ArvadosClient
54 // When a new queued container appears and is either already owned by
55 // this dispatcher or is successfully locked, the dispatcher will call
56 // go RunContainer(). The RunContainer() goroutine gets a channel over
57 // which it will receive updates to the container state. The
58 // RunContainer() goroutine should only assume status updates come when
59 // the container record changes on the API server; if it needs to
60 // monitor the job submission to the underlying slurm/grid engine/etc
61 // queue it should spin up its own polling goroutines. When the
62 // channel is closed, that means the container is no longer being
63 // handled by this dispatcher and the goroutine should terminate. The
64 // goroutine is responsible for draining the 'status' channel, failure
65 // to do so may deadlock the dispatcher.
66 RunContainer func(*Dispatcher, Container, chan Container)
68 // Amount of time to wait between polling for updates.
69 PollInterval time.Duration
71 // Channel used to signal that RunDispatcher loop should exit.
72 DoneProcessing chan struct{}
// mineMap maps a container UUID to the channel on which status updates for
// that container are delivered. Methods in this file access it while holding
// dispatcher.mineMutex.
75 mineMap map[string]chan Container
// Auth is the authorization record fetched by RunDispatcher; its UUID is
// used to recognize containers locked by this dispatcher.
76 Auth apiClientAuthorization
// containers carries records from the polling goroutine to the loop in
// RunDispatcher.
77 containers chan Container
80 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
81 // for which this process is actively starting/monitoring. Returns channel to
82 // be used to send container status updates.
//
// setMine is the "add" half: it registers uuid in mineMap while holding
// mineMutex.
83 func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
84 dispatcher.mineMutex.Lock()
85 defer dispatcher.mineMutex.Unlock()
// uuid is already registered.
// NOTE(review): the body of this branch is not visible in this view;
// presumably it returns the existing channel instead of replacing it.
86 if ch, ok := dispatcher.mineMap[uuid]; ok {
// First registration: create an unbuffered channel and remember it under uuid.
90 ch := make(chan Container)
91 dispatcher.mineMap[uuid] = ch
95 // Release a container which is no longer being monitored.
// Holds mineMutex for the duration of the call and removes uuid from mineMap
// when it is present; unknown uuids are ignored.
96 func (dispatcher *Dispatcher) notMine(uuid string) {
97 dispatcher.mineMutex.Lock()
98 defer dispatcher.mineMutex.Unlock()
// NOTE(review): ch is bound here but its use is not visible in this view.
// The comment on Dispatcher.RunContainer says a closed channel tells the
// goroutine to terminate, so presumably ch is closed before the entry is
// deleted — confirm.
99 if ch, ok := dispatcher.mineMap[uuid]; ok {
101 delete(dispatcher.mineMap, uuid)
105 // Check if there is a channel for updates associated with this container. If
106 // so send the container record on the channel and return true, if not return
// false (presumably — the end of this sentence is not visible in this view).
//
// The lookup runs while holding mineMutex.
// NOTE(review): the send itself is not visible here. If it also happens with
// mineMutex held, a RunContainer goroutine that stops draining its channel
// would block every other mineMap operation, which matches the deadlock
// warning on Dispatcher.RunContainer.
108 func (dispatcher *Dispatcher) updateMine(c Container) bool {
109 dispatcher.mineMutex.Lock()
110 defer dispatcher.mineMutex.Unlock()
// ok reports whether c.UUID is currently being monitored.
111 ch, ok := dispatcher.mineMap[c.UUID]
// getContainers lists containers matching params through the API client and
// forwards every returned record on dispatcher.containers, marking each UUID
// in touched so the caller can tell which containers this poll cycle has seen.
// It returns nothing; a failed list request is reported with log.Printf.
119 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
120 var containers ContainerList
121 err := dispatcher.Arv.List("containers", params, &containers)
// NOTE(review): the error check guarding this log line (and whatever follows
// it) is not visible in this view.
123 log.Printf("Error getting list of containers: %q", err)
// The server reports more matches than it returned. Only one request is made
// here, so the remainder are not fetched; warn about it.
127 if containers.ItemsAvailable > len(containers.Items) {
128 // TODO: support paging
129 log.Printf("Warning! %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
130 containers.ItemsAvailable,
131 len(containers.Items))
133 for _, container := range containers.Items {
// Record that the container was seen, then hand it to the receiver of
// dispatcher.containers. RunDispatcher creates that channel unbuffered, so
// this send blocks until the record is received.
134 touched[container.UUID] = true
135 dispatcher.containers <- container
// pollContainers queries the API server and feeds the results into
// dispatcher.containers via getContainers. A cycle issues up to three list
// requests: queued containers with priority > 0, containers locked by this
// dispatcher, and any containers in mineMap that neither query returned.
// When DoneProcessing is signalled it closes dispatcher.containers.
//
// NOTE(review): the loop/select lines around the cycle, and any ticker
// cleanup, are not visible in this view. Presumably a cycle runs on each tick
// of the PollInterval ticker — confirm.
139 func (dispatcher *Dispatcher) pollContainers() {
140 ticker := time.NewTicker(dispatcher.PollInterval)
// paramsQ selects queued containers with positive priority, highest first.
142 paramsQ := arvadosclient.Dict{
143 "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
144 "order": []string{"priority desc"},
// paramsP selects containers whose lock is held by this dispatcher's token.
146 paramsP := arvadosclient.Dict{
147 "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
// touched collects the UUIDs returned by the queries of this cycle.
153 touched := make(map[string]bool)
154 dispatcher.getContainers(paramsQ, touched)
155 dispatcher.getContainers(paramsP, touched)
// Collect monitored containers that neither query returned, so their current
// records can be fetched explicitly by UUID. The mutex is held only while
// reading mineMap and is released before the follow-up request.
156 dispatcher.mineMutex.Lock()
157 var monitored []string
158 for k := range dispatcher.mineMap {
159 if _, ok := touched[k]; !ok {
160 monitored = append(monitored, k)
163 dispatcher.mineMutex.Unlock()
// monitored stays nil when nothing was appended, so the extra request is
// skipped in that case.
164 if monitored != nil {
165 dispatcher.getContainers(arvadosclient.Dict{
166 "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
// Shutdown requested: closing the channel ends the range loop in RunDispatcher.
168 case <-dispatcher.DoneProcessing:
169 close(dispatcher.containers)
// handleUpdate processes one container record received from the poller.
// In order, it:
//   - stops monitoring a container that is neither locked by this dispatcher
//     nor Queued;
//   - forwards the record to the monitoring goroutine if one exists;
//   - tries to lock a Queued container;
//   - starts RunContainer in a new goroutine for a Locked or Running
//     container that is not yet monitored.
//
// NOTE(review): the early returns between these steps are not visible in this
// view; the ordering above assumes each handled case returns — confirm.
176 func (dispatcher *Dispatcher) handleUpdate(container Container) {
177 if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
178 // If container is Complete, Cancelled, or Queued, LockedByUUID
179 // will be nil. If the container was formerly Locked, moved
180 // back to Queued and then locked by another dispatcher,
181 // LockedByUUID will be different. In either case, we want
182 // to stop monitoring it.
183 log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
184 dispatcher.notMine(container.UUID)
// updateMine reports true when a channel is registered for this container.
188 if dispatcher.updateMine(container) {
189 // Already monitored, sent status update
193 if container.State == Queued {
194 // Try to take the lock
195 if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
// The lock request succeeded; update the local copy so the check below sees it.
198 container.State = Locked
201 if container.State == Locked || container.State == Running {
202 // Not currently monitored but in Locked or Running state and
203 // owned by this dispatcher, so start monitoring.
// setMine is evaluated here, before the goroutine starts, so the channel is
// registered by the time the next update for this container is handled.
204 go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
208 // UpdateState makes an API call to change the state of a container.
// It sends an update for the "containers" resource identified by uuid whose
// "container" attribute sets "state" to newState. A failure is reported with
// log.Printf.
// NOTE(review): the error check and return statement are not visible in this
// view; presumably the error from Arv.Update is returned to the caller.
209 func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
210 err := dispatcher.Arv.Update("containers", uuid,
212 "container": arvadosclient.Dict{"state": newState}},
215 log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
220 // RunDispatcher runs the main loop of the dispatcher until receiving a message
221 // on the dispatcher.DoneProcessing channel. It also installs a signal handler
222 // to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
//
// It first fetches the current API client authorization into dispatcher.Auth,
// then initializes mineMap and the containers channel, starts pollContainers
// in a goroutine, and passes every record received on dispatcher.containers
// to handleUpdate until that channel is closed.
223 func (dispatcher *Dispatcher) RunDispatcher() (err error) {
224 err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
// NOTE(review): the error check guarding this log line is not visible in this view.
226 log.Printf("Error getting my token UUID: %v", err)
230 dispatcher.mineMap = make(map[string]chan Container)
231 dispatcher.containers = make(chan Container)
233 // Graceful shutdown on signal
// NOTE(review): sigChan is unbuffered. signal.Notify does not block when
// delivering to the channel, so a signal arriving while the goroutine below
// is not ready to receive (for example while blocked on the DoneProcessing
// send) is dropped. The os/signal documentation recommends a buffered channel;
// a buffer of 1 would be enough here.
234 sigChan := make(chan os.Signal)
235 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
// Translate each received signal into a DoneProcessing message. The loop
// variable shadows the channel parameter of the same name.
237 go func(sig <-chan os.Signal) {
238 for sig := range sig {
239 log.Printf("Caught signal: %v", sig)
// Blocks until a receiver (pollContainers) takes the message.
240 dispatcher.DoneProcessing <- struct{}{}
245 defer signal.Stop(sigChan)
247 go dispatcher.pollContainers()
// This loop ends when pollContainers closes dispatcher.containers.
248 for container := range dispatcher.containers {
249 dispatcher.handleUpdate(container)