10701: Add back MinRetryPeriod throttle. Update comments/identifiers.
[arvados.git] / sdk / go / dispatch / dispatch.go
1 // Package dispatch is a helper library for building Arvados container
2 // dispatchers.
3 package dispatch
4
5 import (
6         "context"
7         "fmt"
8         "log"
9         "sync"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
14 )
15
16 const (
17         Queued    = arvados.ContainerStateQueued
18         Locked    = arvados.ContainerStateLocked
19         Running   = arvados.ContainerStateRunning
20         Complete  = arvados.ContainerStateComplete
21         Cancelled = arvados.ContainerStateCancelled
22 )
23
24 type Dispatcher struct {
25         Arv *arvadosclient.ArvadosClient
26
27         // Queue polling frequency
28         PollPeriod time.Duration
29
30         // Time to wait between successive attempts to run the same container
31         MinRetryPeriod time.Duration
32
33         // Func that implements the container lifecycle. Must be set
34         // to a non-nil DispatchFunc before calling Run().
35         RunContainer DispatchFunc
36
37         auth     arvados.APIClientAuthorization
38         mtx      sync.Mutex
39         running  map[string]*runTracker
40         throttle throttle
41 }
42
43 // A DispatchFunc executes a container (if the container record is
44 // Locked) or resume monitoring an already-running container, and wait
45 // until that container exits.
46 //
47 // While the container runs, the DispatchFunc should listen for
48 // updated container records on the provided channel. When the channel
49 // closes, the DispatchFunc should stop the container if it's still
50 // running, and return.
51 //
52 // The DispatchFunc should not return until the container is finished.
53 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
54
55 // Run watches the API server's queue for containers that are either
56 // ready to run and available to lock, or are already locked by this
57 // dispatcher's token. When a new one appears, Run calls RunContainer
58 // in a new goroutine.
59 func (d *Dispatcher) Run(ctx context.Context) error {
60         err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
61         if err != nil {
62                 return fmt.Errorf("error getting my token UUID: %v", err)
63         }
64
65         d.throttle.hold = d.MinRetryPeriod
66
67         poll := time.NewTicker(d.PollPeriod)
68         defer poll.Stop()
69
70         for {
71                 d.checkForUpdates([][]interface{}{
72                         {"uuid", "in", d.runningUUIDs()}})
73                 d.checkForUpdates([][]interface{}{
74                         {"locked_by_uuid", "=", d.auth.UUID},
75                         {"uuid", "not in", d.runningUUIDs()}})
76                 d.checkForUpdates([][]interface{}{
77                         {"state", "=", Queued},
78                         {"priority", ">", "0"},
79                         {"uuid", "not in", d.runningUUIDs()}})
80                 select {
81                 case <-poll.C:
82                         continue
83                 case <-ctx.Done():
84                         return ctx.Err()
85                 }
86         }
87 }
88
89 func (d *Dispatcher) runningUUIDs() []string {
90         d.mtx.Lock()
91         defer d.mtx.Unlock()
92         if len(d.running) == 0 {
93                 // API bug: ["uuid", "not in", []] does not match everything
94                 return []string{"X"}
95         }
96         uuids := make([]string, 0, len(d.running))
97         for x := range d.running {
98                 uuids = append(uuids, x)
99         }
100         return uuids
101 }
102
103 // Start a runner in a new goroutine, and send the initial container
104 // record to its updates channel.
105 func (d *Dispatcher) start(c arvados.Container) *runTracker {
106         updates := make(chan arvados.Container, 1)
107         tracker := &runTracker{updates: updates}
108         tracker.updates <- c
109         go func() {
110                 d.RunContainer(d, c, tracker.updates)
111
112                 d.mtx.Lock()
113                 delete(d.running, c.UUID)
114                 d.mtx.Unlock()
115         }()
116         return tracker
117 }
118
119 func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
120         params := arvadosclient.Dict{
121                 "filters": filters,
122                 "order":   []string{"priority desc"},
123                 "limit":   "1000"}
124
125         var list arvados.ContainerList
126         err := d.Arv.List("containers", params, &list)
127         if err != nil {
128                 log.Printf("Error getting list of containers: %q", err)
129                 return
130         }
131
132         if list.ItemsAvailable > len(list.Items) {
133                 // TODO: support paging
134                 log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
135                         list.ItemsAvailable,
136                         len(list.Items))
137         }
138
139         d.mtx.Lock()
140         defer d.mtx.Unlock()
141         if d.running == nil {
142                 d.running = make(map[string]*runTracker)
143         }
144
145         for _, c := range list.Items {
146                 tracker, running := d.running[c.UUID]
147                 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
148                         log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
149                 } else if running {
150                         switch c.State {
151                         case Queued:
152                                 tracker.close()
153                         case Locked, Running:
154                                 tracker.update(c)
155                         case Cancelled, Complete:
156                                 tracker.close()
157                         }
158                 } else {
159                         switch c.State {
160                         case Queued:
161                                 if !d.throttle.Check(c.UUID) {
162                                         break
163                                 }
164                                 err := d.lock(c.UUID)
165                                 if err != nil {
166                                         log.Printf("debug: error locking container %s: %s", c.UUID, err)
167                                         break
168                                 }
169                                 c.State = Locked
170                                 d.running[c.UUID] = d.start(c)
171                         case Locked, Running:
172                                 if !d.throttle.Check(c.UUID) {
173                                         break
174                                 }
175                                 d.running[c.UUID] = d.start(c)
176                         case Cancelled, Complete:
177                                 tracker.close()
178                         }
179                 }
180         }
181 }
182
183 // UpdateState makes an API call to change the state of a container.
184 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
185         err := d.Arv.Update("containers", uuid,
186                 arvadosclient.Dict{
187                         "container": arvadosclient.Dict{"state": state},
188                 }, nil)
189         if err != nil {
190                 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
191         }
192         return err
193 }
194
195 // Lock makes the lock API call which updates the state of a container to Locked.
196 func (d *Dispatcher) lock(uuid string) error {
197         return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
198 }
199
200 // Unlock makes the unlock API call which updates the state of a container to Queued.
201 func (d *Dispatcher) Unlock(uuid string) error {
202         return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
203 }
204
205 type runTracker struct {
206         closing bool
207         updates chan<- arvados.Container
208 }
209
210 func (tracker *runTracker) close() {
211         if !tracker.closing {
212                 close(tracker.updates)
213         }
214         tracker.closing = true
215 }
216
217 func (tracker *runTracker) update(c arvados.Container) {
218         if tracker.closing {
219                 return
220         }
221         select {
222         case <-tracker.updates:
223                 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
224         default:
225         }
226         tracker.updates <- c
227 }