1 // Package dispatch is a helper library for building Arvados container
12 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
17 Queued = arvados.ContainerStateQueued
18 Locked = arvados.ContainerStateLocked
19 Running = arvados.ContainerStateRunning
20 Complete = arvados.ContainerStateComplete
21 Cancelled = arvados.ContainerStateCancelled
25 type Dispatcher struct {
26 Arv *arvadosclient.ArvadosClient
28 // Queue polling frequency
29 PollPeriod time.Duration
31 // Time to wait between successive attempts to run the same container
32 MinRetryPeriod time.Duration
34 // Func that implements the container lifecycle. Must be set
35 // to a non-nil DispatchFunc before calling Run().
36 RunContainer DispatchFunc
38 auth arvados.APIClientAuthorization
40 trackers map[string]*runTracker
44 // A DispatchFunc executes a container (if the container record is
45 // Locked) or resume monitoring an already-running container, and wait
46 // until that container exits.
48 // While the container runs, the DispatchFunc should listen for
49 // updated container records on the provided channel. When the channel
50 // closes, the DispatchFunc should stop the container if it's still
51 // running, and return.
53 // The DispatchFunc should not return until the container is finished.
54 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
56 // Run watches the API server's queue for containers that are either
57 // ready to run and available to lock, or are already locked by this
58 // dispatcher's token. When a new one appears, Run calls RunContainer
59 // in a new goroutine.
60 func (d *Dispatcher) Run(ctx context.Context) error {
61 err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
63 return fmt.Errorf("error getting my token UUID: %v", err)
66 d.throttle.hold = d.MinRetryPeriod
68 poll := time.NewTicker(d.PollPeriod)
72 tracked := d.trackedUUIDs()
73 d.checkForUpdates([][]interface{}{
74 {"uuid", "in", tracked}})
75 d.checkForUpdates([][]interface{}{
76 {"locked_by_uuid", "=", d.auth.UUID},
77 {"uuid", "not in", tracked}})
78 d.checkForUpdates([][]interface{}{
79 {"state", "=", Queued},
80 {"priority", ">", "0"},
81 {"uuid", "not in", tracked}})
91 func (d *Dispatcher) trackedUUIDs() []string {
94 if len(d.trackers) == 0 {
95 // API bug: ["uuid", "not in", []] does not work as
96 // expected, but this does:
97 return []string{"this-uuid-does-not-exist"}
99 uuids := make([]string, 0, len(d.trackers))
100 for x := range d.trackers {
101 uuids = append(uuids, x)
106 // Start a runner in a new goroutine, and send the initial container
107 // record to its updates channel.
108 func (d *Dispatcher) start(c arvados.Container) *runTracker {
109 tracker := &runTracker{updates: make(chan arvados.Container, 1)}
112 d.RunContainer(d, c, tracker.updates)
115 delete(d.trackers, c.UUID)
121 func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
122 params := arvadosclient.Dict{
124 "order": []string{"priority desc"}}
126 var list arvados.ContainerList
127 for offset, more := 0, true; more; offset += len(list.Items) {
128 params["offset"] = offset
129 err := d.Arv.List("containers", params, &list)
131 log.Printf("Error getting list of containers: %q", err)
134 more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
135 d.checkListForUpdates(list.Items)
139 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
142 if d.trackers == nil {
143 d.trackers = make(map[string]*runTracker)
146 for _, c := range containers {
147 tracker, alreadyTracking := d.trackers[c.UUID]
148 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
149 log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
150 } else if alreadyTracking {
154 case Locked, Running:
156 case Cancelled, Complete:
162 if !d.throttle.Check(c.UUID) {
165 err := d.lock(c.UUID)
167 log.Printf("debug: error locking container %s: %s", c.UUID, err)
171 d.trackers[c.UUID] = d.start(c)
172 case Locked, Running:
173 if !d.throttle.Check(c.UUID) {
176 d.trackers[c.UUID] = d.start(c)
177 case Cancelled, Complete:
178 // no-op (we already stopped monitoring)
184 // UpdateState makes an API call to change the state of a container.
185 func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
186 err := d.Arv.Update("containers", uuid,
188 "container": arvadosclient.Dict{"state": state},
191 log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
196 // Lock makes the lock API call which updates the state of a container to Locked.
197 func (d *Dispatcher) lock(uuid string) error {
198 return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
201 // Unlock makes the unlock API call which updates the state of a container to Queued.
202 func (d *Dispatcher) Unlock(uuid string) error {
203 return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
206 // TrackContainer ensures a tracker is running for the given UUID,
207 // regardless of the current state of the container (except: if the
208 // container is locked by a different dispatcher, a tracker will not
209 // be started). If the container is not in Locked or Running state,
210 // the new tracker will close down immediately.
212 // This allows the dispatcher to put its own RunContainer func into a
213 // cleanup phase (for example, to kill local processes created by a
214 // prevous dispatch process that are still running even though the
215 // container state is final) without the risk of having multiple
216 // goroutines monitoring the same UUID.
217 func (d *Dispatcher) TrackContainer(uuid string) error {
218 var cntr arvados.Container
219 err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
223 if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
229 if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
232 if d.trackers == nil {
233 d.trackers = make(map[string]*runTracker)
235 d.trackers[uuid] = d.start(cntr)
237 case Queued, Cancelled, Complete:
238 d.trackers[uuid].close()
243 type runTracker struct {
245 updates chan arvados.Container
248 func (tracker *runTracker) close() {
249 if !tracker.closing {
250 close(tracker.updates)
252 tracker.closing = true
255 func (tracker *runTracker) update(c arvados.Container) {
260 case <-tracker.updates:
261 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)