Merge branch '11097-reuse-impure'
[arvados.git] / sdk / go / dispatch / dispatch.go
index 7342c3b8cb9d98577d8639ef24d7de96c48e8f47..fd507371b1c451bd15a5962b74fe6e4c989a44d9 100644 (file)
@@ -1,6 +1,5 @@
-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
 package dispatch
 
 import (
@@ -22,76 +21,63 @@ const (
        Cancelled = arvados.ContainerStateCancelled
 )
 
-type runner struct {
-       closing bool
-       updates chan arvados.Container
-}
-
-func (ex *runner) close() {
-       if !ex.closing {
-               close(ex.updates)
-       }
-       ex.closing = true
-}
+type Dispatcher struct {
+       Arv *arvadosclient.ArvadosClient
 
-func (ex *runner) update(c arvados.Container) {
-       if ex.closing {
-               return
-       }
-       select {
-       case <-ex.updates:
-               log.Print("debug: executor is handling updates slowly, discarded previous update for %s", c.UUID)
-       default:
-       }
-       ex.updates <- c
-}
+       // Queue polling interval
+       PollPeriod time.Duration
 
-type Dispatcher struct {
-       Arv            *arvadosclient.ArvadosClient
-       PollPeriod     time.Duration
+       // Time to wait between successive attempts to run the same container
        MinRetryPeriod time.Duration
-       RunContainer   Runner
+
+       // Func that implements the container lifecycle. Must be set
+       // to a non-nil DispatchFunc before calling Run().
+       RunContainer DispatchFunc
 
        auth     arvados.APIClientAuthorization
        mtx      sync.Mutex
-       running  map[string]*runner
+       trackers map[string]*runTracker
        throttle throttle
 }
 
-// A Runner executes a container. If it starts any goroutines, it must
-// not return until it can guarantee that none of those goroutines
-// will do anything with this container.
-type Runner func(*Dispatcher, arvados.Container, <-chan arvados.Container)
-
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resumes monitoring an already-running container, and
+// waits until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
+
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
        err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
        if err != nil {
                return fmt.Errorf("error getting my token UUID: %v", err)
        }
 
+       d.throttle.hold = d.MinRetryPeriod
+
        poll := time.NewTicker(d.PollPeriod)
        defer poll.Stop()
 
        for {
-               running := make([]string, 0, len(d.running))
-               d.mtx.Lock()
-               for uuid := range d.running {
-                       running = append(running, uuid)
-               }
-               d.mtx.Unlock()
-               if len(running) == 0 {
-                       // API bug: ["uuid", "not in", []] does not match everything
-                       running = []string{"X"}
-               }
+               tracked := d.trackedUUIDs()
                d.checkForUpdates([][]interface{}{
-                       {"uuid", "in", running}})
+                       {"uuid", "in", tracked}})
+               d.checkForUpdates([][]interface{}{
+                       {"locked_by_uuid", "=", d.auth.UUID},
+                       {"uuid", "not in", tracked}})
                d.checkForUpdates([][]interface{}{
                        {"state", "=", Queued},
                        {"priority", ">", "0"},
-                       {"uuid", "not in", running}})
-               d.checkForUpdates([][]interface{}{
-                       {"locked_by_uuid", "=", d.auth.UUID},
-                       {"uuid", "not in", running}})
+                       {"uuid", "not in", tracked}})
                select {
                case <-poll.C:
                        continue
@@ -101,71 +87,94 @@ func (d *Dispatcher) Run(ctx context.Context) error {
        }
 }
 
-func (d *Dispatcher) start(c arvados.Container) *runner {
-       ex := &runner{
-               updates: make(chan arvados.Container, 1),
+func (d *Dispatcher) trackedUUIDs() []string {
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if len(d.trackers) == 0 {
+               // API bug: ["uuid", "not in", []] does not work as
+               // expected, but this does:
+               return []string{"this-uuid-does-not-exist"}
        }
-       if d.running == nil {
-               d.running = make(map[string]*runner)
+       uuids := make([]string, 0, len(d.trackers))
+       for x := range d.trackers {
+               uuids = append(uuids, x)
        }
-       d.running[c.UUID] = ex
+       return uuids
+}
+
+// Start a runner in a new goroutine, and send the initial container
+// record to its updates channel.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+       tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+       tracker.updates <- c
        go func() {
-               d.RunContainer(d, c, ex.updates)
+               d.RunContainer(d, c, tracker.updates)
+
                d.mtx.Lock()
-               delete(d.running, c.UUID)
+               delete(d.trackers, c.UUID)
                d.mtx.Unlock()
        }()
-       return ex
+       return tracker
 }
 
 func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
        params := arvadosclient.Dict{
                "filters": filters,
-               "order":   []string{"priority desc"},
-               "limit":   "1000"}
+               "order":   []string{"priority desc"}}
 
        var list arvados.ContainerList
-       err := d.Arv.List("containers", params, &list)
-       if err != nil {
-               log.Printf("Error getting list of containers: %q", err)
-               return
-       }
-
-       if list.ItemsAvailable > len(list.Items) {
-               // TODO: support paging
-               log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-                       list.ItemsAvailable,
-                       len(list.Items))
+       for offset, more := 0, true; more; offset += len(list.Items) {
+               params["offset"] = offset
+               err := d.Arv.List("containers", params, &list)
+               if err != nil {
+                       log.Printf("Error getting list of containers: %q", err)
+                       return
+               }
+               more = list.ItemsAvailable > len(list.Items)
+               d.checkListForUpdates(list.Items)
        }
+}
 
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
        d.mtx.Lock()
        defer d.mtx.Unlock()
-       for _, c := range list.Items {
-               ex, running := d.running[c.UUID]
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
+       }
+
+       for _, c := range containers {
+               tracker, alreadyTracking := d.trackers[c.UUID]
                if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
                        log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
-               } else if running {
+               } else if alreadyTracking {
                        switch c.State {
                        case Queued:
-                               ex.close()
+                               tracker.close()
                        case Locked, Running:
-                               ex.update(c)
+                               tracker.update(c)
                        case Cancelled, Complete:
-                               ex.close()
+                               tracker.close()
                        }
                } else {
                        switch c.State {
                        case Queued:
-                               if err := d.lock(c.UUID); err != nil {
-                                       log.Printf("Error locking container %s: %s", c.UUID, err)
-                               } else {
-                                       c.State = Locked
-                                       d.start(c).update(c)
+                               if !d.throttle.Check(c.UUID) {
+                                       break
                                }
+                               err := d.lock(c.UUID)
+                               if err != nil {
+                                       log.Printf("debug: error locking container %s: %s", c.UUID, err)
+                                       break
+                               }
+                               c.State = Locked
+                               d.trackers[c.UUID] = d.start(c)
                        case Locked, Running:
-                               d.start(c).update(c)
+                               if !d.throttle.Check(c.UUID) {
+                                       break
+                               }
+                               d.trackers[c.UUID] = d.start(c)
                        case Cancelled, Complete:
-                               ex.close()
+                               // no-op (we already stopped monitoring)
                        }
                }
        }
@@ -192,3 +201,27 @@ func (d *Dispatcher) lock(uuid string) error {
 func (d *Dispatcher) Unlock(uuid string) error {
        return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
+
+type runTracker struct {
+       closing bool
+       updates chan arvados.Container
+}
+
+func (tracker *runTracker) close() {
+       if !tracker.closing {
+               close(tracker.updates)
+       }
+       tracker.closing = true
+}
+
+func (tracker *runTracker) update(c arvados.Container) {
+       if tracker.closing {
+               return
+       }
+       select {
+       case <-tracker.updates:
+               log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+       default:
+       }
+       tracker.updates <- c
+}