11097: Merge branch 'master' into 11097-reuse-impure
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Package dispatch is a helper library for building Arvados container
2 // dispatchers.
3 package dispatch
4
5 import (
6         "context"
7         "fmt"
8         "log"
9         "sync"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
14 )
15
16 const (
17         Queued    = arvados.ContainerStateQueued
18         Locked    = arvados.ContainerStateLocked
19         Running   = arvados.ContainerStateRunning
20         Complete  = arvados.ContainerStateComplete
21         Cancelled = arvados.ContainerStateCancelled
22 )
23
24 type Dispatcher struct {
25         Arv *arvadosclient.ArvadosClient
26
27         // Queue polling frequency
28         PollPeriod time.Duration
29
30         // Time to wait between successive attempts to run the same container
31         MinRetryPeriod time.Duration
32
33         // Func that implements the container lifecycle. Must be set
34         // to a non-nil DispatchFunc before calling Run().
35         RunContainer DispatchFunc
36
37         auth     arvados.APIClientAuthorization
38         mtx      sync.Mutex
39         trackers map[string]*runTracker
40         throttle throttle
41 }
42
43 // A DispatchFunc executes a container (if the container record is
44 // Locked) or resume monitoring an already-running container, and wait
45 // until that container exits.
46 //
47 // While the container runs, the DispatchFunc should listen for
48 // updated container records on the provided channel. When the channel
49 // closes, the DispatchFunc should stop the container if it's still
50 // running, and return.
51 //
52 // The DispatchFunc should not return until the container is finished.
53 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
54
55 // Run watches the API server's queue for containers that are either
56 // ready to run and available to lock, or are already locked by this
57 // dispatcher's token. When a new one appears, Run calls RunContainer
58 // in a new goroutine.
59 func (d *Dispatcher) Run(ctx context.Context) error {
60         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
61         if err != nil {
62                 return fmt.Errorf("error getting my token UUID: %v", err)
63         }
64
65         d.throttle.hold = d.MinRetryPeriod
66
67         poll := time.NewTicker(d.PollPeriod)
68         defer poll.Stop()
69
70         for {
71                 tracked := d.trackedUUIDs()
72                 d.checkForUpdates([][]interface{}{
73                         {"uuid", "in", tracked}})
74                 d.checkForUpdates([][]interface{}{
75                         {"locked_by_uuid", "=", d.auth.UUID},
76                         {"uuid", "not in", tracked}})
77                 d.checkForUpdates([][]interface{}{
78                         {"state", "=", Queued},
79                         {"priority", ">", "0"},
80                         {"uuid", "not in", tracked}})
81                 select {
82                 case <-poll.C:
83                         continue
84                 case <-ctx.Done():
85                         return ctx.Err()
86                 }
87         }
88 }
89
90 func (d *Dispatcher) trackedUUIDs() []string {
91         d.mtx.Lock()
92         defer d.mtx.Unlock()
93         if len(d.trackers) == 0 {
94                 // API bug: ["uuid", "not in", []] does not work as
95                 // expected, but this does:
96                 return []string{"this-uuid-does-not-exist"}
97         }
98         uuids := make([]string, 0, len(d.trackers))
99         for x := range d.trackers {
100                 uuids = append(uuids, x)
101         }
102         return uuids
103 }
104
105 // Start a runner in a new goroutine, and send the initial container
106 // record to its updates channel.
107 func (d *Dispatcher) start(c arvados.Container) *runTracker {
108         tracker := &runTracker{updates: make(chan arvados.Container, 1)}
109         tracker.updates <- c
110         go func() {
111                 d.RunContainer(d, c, tracker.updates)
112
113                 d.mtx.Lock()
114                 delete(d.trackers, c.UUID)
115                 d.mtx.Unlock()
116         }()
117         return tracker
118 }
119
120 func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
121         params := arvadosclient.Dict{
122                 "filters": filters,
123                 "order":   []string{"priority desc"}}
124
125         var list arvados.ContainerList
126         for offset, more := 0, true; more; offset += len(list.Items) {
127                 params["offset"] = offset
128                 err := d.Arv.List("containers", params, &list)
129                 if err != nil {
130                         log.Printf("Error getting list of containers: %q", err)
131                         return
132                 }
133                 more = list.ItemsAvailable > len(list.Items)
134                 d.checkListForUpdates(list.Items)
135         }
136 }
137
138 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
139         d.mtx.Lock()
140         defer d.mtx.Unlock()
141         if d.trackers == nil {
142                 d.trackers = make(map[string]*runTracker)
143         }
144
145         for _, c := range containers {
146                 tracker, alreadyTracking := d.trackers[c.UUID]
147                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
148                         log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
149                 } else if alreadyTracking {
150                         switch c.State {
151                         case Queued:
152                                 tracker.close()
153                         case Locked, Running:
154                                 tracker.update(c)
155                         case Cancelled, Complete:
156                                 tracker.close()
157                         }
158                 } else {
159                         switch c.State {
160                         case Queued:
161                                 if !d.throttle.Check(c.UUID) {
162                                         break
163                                 }
164                                 err := d.lock(c.UUID)
165                                 if err != nil {
166                                         log.Printf("debug: error locking container %s: %s", c.UUID, err)
167                                         break
168                                 }
169                                 c.State = Locked
170                                 d.trackers[c.UUID] = d.start(c)
171                         case Locked, Running:
172                                 if !d.throttle.Check(c.UUID) {
173                                         break
174                                 }
175                                 d.trackers[c.UUID] = d.start(c)
176                         case Cancelled, Complete:
177                                 tracker.close()
178                         }
179                 }
180         }
181 }
182
183 // UpdateState makes an API call to change the state of a container.
184 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
185         err := d.Arv.Update("containers", uuid,
186                 arvadosclient.Dict{
187                         "container": arvadosclient.Dict{"state": state},
188                 }, nil)
189         if err != nil {
190                 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
191         }
192         return err
193 }
194
195 // Lock makes the lock API call which updates the state of a container to Locked.
196 func (d *Dispatcher) lock(uuid string) error {
197         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
198 }
199
200 // Unlock makes the unlock API call which updates the state of a container to Queued.
201 func (d *Dispatcher) Unlock(uuid string) error {
202         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
203 }
204
205 type runTracker struct {
206         closing bool
207         updates chan arvados.Container
208 }
209
210 func (tracker *runTracker) close() {
211         if !tracker.closing {
212                 close(tracker.updates)
213         }
214         tracker.closing = true
215 }
216
217 func (tracker *runTracker) update(c arvados.Container) {
218         if tracker.closing {
219                 return
220         }
221         select {
222         case <-tracker.updates:
223                 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
224         default:
225         }
226         tracker.updates <- c
227 }