1 // Package dispatch is a helper library for building Arvados container
12 "git.curoverse.com/arvados.git/sdk/go/arvados"
13 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
// Short local names for the container states, aliasing the
// arvados.ContainerState* values.
// NOTE(review): the enclosing declaration line (e.g. `const (`) and its
// closing paren are not present in this excerpt (embedded numbering jumps
// from 13 to 17 and from 21 to 24).
17 Queued = arvados.ContainerStateQueued
18 Locked = arvados.ContainerStateLocked
19 Running = arvados.ContainerStateRunning
20 Complete = arvados.ContainerStateComplete
21 Cancelled = arvados.ContainerStateCancelled
// Dispatcher polls the API server's container queue and hands each
// container it takes on to RunContainer (see Run).
24 type Dispatcher struct {
// Arv is the API client used for the dispatcher's API calls.
25 Arv *arvadosclient.ArvadosClient
27 // Interval between polls of the container queue
28 PollPeriod time.Duration
30 // Time to wait between successive attempts to run the same container
31 MinRetryPeriod time.Duration
33 // Func that implements the container lifecycle. Must be set
34 // to a non-nil DispatchFunc before calling Run().
35 RunContainer DispatchFunc
// auth is this dispatcher's own token record, loaded by Run. Its UUID is
// compared with each container's locked_by_uuid.
37 auth arvados.APIClientAuthorization
// trackers maps container UUID to the runTracker of a container that has
// been started; it is created lazily by checkListForUpdates.
39 trackers map[string]*runTracker
// NOTE(review): other fields used by the methods (e.g. d.throttle) are not
// declared in the visible lines; original lines 40-42, including the
// struct's closing brace, are missing from this excerpt.
43 // A DispatchFunc executes a container (if the container record is
44 // Locked) or resumes monitoring an already-running container, and waits
45 // until that container exits.
47 // While the container runs, the DispatchFunc should listen for
48 // updated container records on the provided channel. When the channel
49 // closes, the DispatchFunc should stop the container if it's still
50 // running, and return.
52 // The DispatchFunc should not return until the container is finished.
// It is called with the dispatcher, the initial container record, and the
// channel on which later records for that container are delivered.
53 type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
55 // Run watches the API server's queue for containers that are either
56 // ready to run and available to lock, or are already locked by this
57 // dispatcher's token. When a new one appears, Run calls RunContainer
58 // in a new goroutine.
// Run fails early with an error if it cannot fetch this dispatcher's own
// token record.
59 func (d *Dispatcher) Run(ctx context.Context) error {
// Look up our own token so containers we have locked can be recognized.
60 err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
// NOTE(review): the guard for this return (presumably `if err != nil {`) is
// not shown. Consider %w instead of %v so callers can use errors.Is/As.
62 return fmt.Errorf("error getting my token UUID: %v", err)
65 d.throttle.hold = d.MinRetryPeriod
67 poll := time.NewTicker(d.PollPeriod)
// Each poll issues three queries:
//  1. containers already tracked;
//  2. containers locked by our token that are not tracked yet;
//  3. queued containers with positive priority that are not tracked yet.
71 tracked := d.trackedUUIDs()
72 d.checkForUpdates([][]interface{}{
73 {"uuid", "in", tracked}})
74 d.checkForUpdates([][]interface{}{
75 {"locked_by_uuid", "=", d.auth.UUID},
76 {"uuid", "not in", tracked}})
77 d.checkForUpdates([][]interface{}{
78 {"state", "=", Queued},
// NOTE(review): the priority bound is sent as the string "0"; confirm the
// API compares it numerically.
79 {"priority", ">", "0"},
80 {"uuid", "not in", tracked}})
// NOTE(review): original lines 66, 68-70 and 81-89 are missing from this
// excerpt. The polling loop, any use of ctx, any poll.Stop(), and the
// function's closing brace are not visible.
// trackedUUIDs returns the UUIDs of the containers in d.trackers, for use
// in "in" / "not in" API filters. The order is unspecified (map iteration).
// If nothing is tracked, it returns a single placeholder UUID instead of an
// empty list, to work around the API behavior noted below.
90 func (d *Dispatcher) trackedUUIDs() []string {
// NOTE(review): original lines 91-92 are not shown. d.trackers is also
// modified by the goroutine that start launches (presumably), so this read
// needs whatever synchronization those lines provide -- confirm.
93 if len(d.trackers) == 0 {
94 // API bug: ["uuid", "not in", []] does not work as
95 // expected, but this does:
96 return []string{"this-uuid-does-not-exist"}
98 uuids := make([]string, 0, len(d.trackers))
99 for x := range d.trackers {
100 uuids = append(uuids, x)
// NOTE(review): the closing braces and the final return (presumably
// `return uuids`, original lines 101-103) are missing from this excerpt.
105 // start starts a runner in a new goroutine and sends the initial container
106 // record to its updates channel.
// The tracker's updates channel buffers one record. Once RunContainer
// returns, the container's entry is deleted from d.trackers.
107 func (d *Dispatcher) start(c arvados.Container) *runTracker {
108 tracker := &runTracker{updates: make(chan arvados.Container, 1)}
111 d.RunContainer(d, c, tracker.updates)
114 delete(d.trackers, c.UUID)
// NOTE(review): original lines 109-110, 112-113 and 115-118 are missing from
// this excerpt: the goroutine launch, the send of c, any locking around the
// delete, the return of the tracker, and the closing brace.
// checkForUpdates lists the containers matching filters, highest priority
// first, one page at a time, and passes each page to checkListForUpdates.
// List errors are logged.
120 func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
121 params := arvadosclient.Dict{
// NOTE(review): original line 122 is not shown; presumably it puts filters
// into params, since the parameter is not used on any visible line.
123 "order": []string{"priority desc"}}
125 var list arvados.ContainerList
// Page through the results, advancing offset by the number of items just
// received.
126 for offset, more := 0, true; more; offset += len(list.Items) {
127 params["offset"] = offset
128 err := d.Arv.List("containers", params, &list)
130 log.Printf("Error getting list of containers: %q", err)
// NOTE(review): possible non-terminating loop. If ItemsAvailable is the
// total number of matches (independent of offset), this compares it only
// with the size of the current page. Once the results span more than one
// page, it stays true -- including for the empty page past the end, where
// offset then stops advancing. A terminating condition would be
// `more = len(list.Items) > 0 && list.ItemsAvailable > offset+len(list.Items)`.
133 more = list.ItemsAvailable > len(list.Items)
134 d.checkListForUpdates(list.Items)
// NOTE(review): the error-handling lines around the log call (original
// 129, 131-132) and the closing braces (135-136) are missing from this
// excerpt.
// checkListForUpdates reconciles one page of container records with
// d.trackers:
//   - containers locked by another token are logged and ignored;
//   - containers already tracked are handled according to their state;
//   - untracked containers may be locked and/or started, subject to
//     d.throttle.Check.
138 func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
// NOTE(review): original lines 139-140 are not shown; any locking of
// d.trackers would be expected there -- confirm.
141 if d.trackers == nil {
142 d.trackers = make(map[string]*runTracker)
145 for _, c := range containers {
146 tracker, alreadyTracking := d.trackers[c.UUID]
147 if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
148 log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
149 } else if alreadyTracking {
// NOTE(review): the switch header and case bodies for tracked containers
// (original lines 150-152, 154, 156-157) are not shown.
153 case Locked, Running:
155 case Cancelled, Complete:
// Untracked container. NOTE(review): the branch/switch header and the case
// label before this throttle check are not shown (presumably `case Queued:`,
// since the container is locked before it is started).
161 if !d.throttle.Check(c.UUID) {
164 err := d.lock(c.UUID)
// NOTE(review): the lines after this log (original 167-169) are not shown;
// confirm a lock failure skips the start below.
166 log.Printf("debug: error locking container %s: %s", c.UUID, err)
170 d.trackers[c.UUID] = d.start(c)
171 case Locked, Running:
172 if !d.throttle.Check(c.UUID) {
175 d.trackers[c.UUID] = d.start(c)
176 case Cancelled, Complete:
177 // no-op (we already stopped monitoring)
// NOTE(review): the closing braces of the switch, if/else, loop and function
// (original lines 178-182) are missing from this excerpt.
183 // UpdateState makes an API call to change the state of a container.
// A failed update is logged with the container UUID and target state.
190 // NOTE(review) placeholder removed
195 // lock makes the lock API call, which updates the state of a container to Locked.
// It returns the error from the API call, if any.
196 func (d *Dispatcher) lock(uuid string) error {
197 return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
// NOTE(review): the function's closing brace is missing from this excerpt
// (embedded numbering jumps from 197 to 200).
200 // Unlock makes the unlock API call which updates the state of a container to Queued.
// It returns the error from the API call, if any.
201 func (d *Dispatcher) Unlock(uuid string) error {
202 return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
// NOTE(review): the function's closing brace is missing from this excerpt
// (embedded numbering jumps from 202 to 205).
// runTracker holds the channel that start passes to RunContainer, on which
// updated records for one container are delivered.
205 type runTracker struct {
// NOTE(review): original line 206 is not shown. close() reads and sets a
// bool-typed tracker.closing, which is presumably declared there.
207 updates chan arvados.Container
// NOTE(review): the struct's closing brace is missing from this excerpt.
// close closes the updates channel the first time it is called and marks
// the tracker as closing, so repeated calls do not close the channel twice.
210 func (tracker *runTracker) close() {
211 if !tracker.closing {
212 close(tracker.updates)
// NOTE(review): original lines 213 and 215 (presumably closing braces) are
// not shown. Wherever the `if` ends, closing is true after this call.
214 tracker.closing = true
217 func (tracker *runTracker) update(c arvados.Container) {
222 case <-tracker.updates:
223 log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)