+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
// Package dispatch is a helper library for building Arvados container
// dispatchers.
package dispatch
Cancelled = arvados.ContainerStateCancelled
)
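+// Dispatcher polls the API server for containers to dispatch and
+// keeps a tracker for each container it is handling.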
type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
defer poll.Stop()
for {
- tracked := d.trackedUUIDs()
- d.checkForUpdates([][]interface{}{
- {"uuid", "in", tracked}})
- d.checkForUpdates([][]interface{}{
- {"locked_by_uuid", "=", d.auth.UUID},
- {"uuid", "not in", tracked}})
- d.checkForUpdates([][]interface{}{
- {"state", "=", Queued},
- {"priority", ">", "0"},
- {"uuid", "not in", tracked}})
select {
case <-poll.C:
- continue
+ break
case <-ctx.Done():
return ctx.Err()
}
- }
-}
-func (d *Dispatcher) trackedUUIDs() []string {
- d.mtx.Lock()
- defer d.mtx.Unlock()
- if len(d.trackers) == 0 {
- // API bug: ["uuid", "not in", []] does not work as
- // expected, but this does:
- return []string{"this-uuid-does-not-exist"}
- }
- uuids := make([]string, 0, len(d.trackers))
- for x := range d.trackers {
- uuids = append(uuids, x)
+ todo := make(map[string]*runTracker)
+ d.mtx.Lock()
+ // Make a copy of trackers
+ for uuid, tracker := range d.trackers {
+ todo[uuid] = tracker
+ }
+ d.mtx.Unlock()
+
+ // Containers I currently own (Locked/Running)
+ querySuccess := d.checkForUpdates([][]interface{}{
+ {"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+ // Containers I should try to dispatch
+ querySuccess = d.checkForUpdates([][]interface{}{
+ {"state", "=", Queued},
+ {"priority", ">", "0"}}, todo) && querySuccess
+
+ if !querySuccess {
+ // There was an error in one of the previous queries,
+ // we probably didn't get updates for all the
+ // containers we should have. Don't check them
+ // individually because it may be expensive.
+ continue
+ }
+
+ // Containers I know about but didn't fall into the
+ // above two categories (probably Complete/Cancelled)
+ var missed []string
+ for uuid := range todo {
+ missed = append(missed, uuid)
+ }
+
+ for len(missed) > 0 {
+ var batch []string
+ if len(missed) > 20 {
+ batch = missed[0:20]
+ missed = missed[20:]
+ } else {
+ batch = missed
+ missed = missed[0:0]
+ }
+ querySuccess = d.checkForUpdates([][]interface{}{
+ {"uuid", "in", batch}}, todo) && querySuccess
+ }
+
+ if !querySuccess {
+ // There was an error in one of the previous queries, we probably
+ // didn't see all the containers we should have, so don't shut down
+ // the missed containers.
+ continue
+ }
+
+ // Containers that I know about that didn't show up in any
+ // query should be let go.
+ for uuid, tracker := range todo {
+ log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+ tracker.close()
+ }
+
}
- return uuids
}
// Start a runner in a new goroutine, and send the initial container
tracker.updates <- c
go func() {
d.RunContainer(d, c, tracker.updates)
-
+ // RunContainer blocks for the lifetime of the container. When
+ // it returns, the tracker should delete itself.
d.mtx.Lock()
delete(d.trackers, c.UUID)
d.mtx.Unlock()
return tracker
}
-func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
params := arvadosclient.Dict{
"filters": filters,
"order": []string{"priority desc"}}
err := d.Arv.List("containers", params, &list)
if err != nil {
log.Printf("Error getting list of containers: %q", err)
- return
+ return false
}
more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
- d.checkListForUpdates(list.Items)
+ d.checkListForUpdates(list.Items, todo)
}
+ return true
}
-func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
d.mtx.Lock()
defer d.mtx.Unlock()
if d.trackers == nil {
for _, c := range containers {
tracker, alreadyTracking := d.trackers[c.UUID]
+ delete(todo, c.UUID)
+
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
}
+// TrackContainer makes sure a tracker exists for the container with
+// the given UUID, whatever state that container is currently in. The
+// one exception is a container locked by a different dispatcher: no
+// tracker is started for it. When the container is in Queued,
+// Cancelled, or Complete state, the newly started tracker is closed
+// right away.
+//
+// A dispatcher can use this to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// previous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+//
+// The only error returned is the one from the API lookup; every
+// other path, including the ones where no tracker is started,
+// returns nil.
+func (d *Dispatcher) TrackContainer(uuid string) error {
+	var cntr arvados.Container
+	if err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr); err != nil {
+		return err
+	}
+	lockedByOther := cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID
+	if lockedByOther {
+		// Locked by a different dispatcher: leave it alone.
+		return nil
+	}
+
+	d.mtx.Lock()
+	defer d.mtx.Unlock()
+	if _, found := d.trackers[uuid]; found {
+		return nil
+	}
+	if d.trackers == nil {
+		d.trackers = make(map[string]*runTracker)
+	}
+	tracker := d.start(cntr)
+	d.trackers[uuid] = tracker
+	if cntr.State == Queued || cntr.State == Cancelled || cntr.State == Complete {
+		tracker.close()
+	}
+	return nil
+}
+
type runTracker struct {
closing bool
updates chan arvados.Container