Merge branch '8784-dir-listings'
[arvados.git] / sdk / go / dispatch / dispatch.go
index ce536de47a07be129d7f9e1f9ed730bdf30f6918..356d087a46f31c7f97ce92ad975300f0a0959990 100644 (file)
@@ -1,17 +1,20 @@
-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
 package dispatch
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "context"
+       "fmt"
        "log"
-       "os"
-       "os/signal"
        "sync"
-       "syscall"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 )
 
 const (
@@ -22,218 +25,245 @@ const (
        Cancelled = arvados.ContainerStateCancelled
 )
 
-// Dispatcher holds the state of the dispatcher
+// Dispatcher struct
 type Dispatcher struct {
-       // The Arvados client
-       Arv arvadosclient.ArvadosClient
-
-       // When a new queued container appears and is either already owned by
-       // this dispatcher or is successfully locked, the dispatcher will call
-       // go RunContainer().  The RunContainer() goroutine gets a channel over
-       // which it will receive updates to the container state.  The
-       // RunContainer() goroutine should only assume status updates come when
-       // the container record changes on the API server; if it needs to
-       // monitor the job submission to the underlying slurm/grid engine/etc
-       // queue it should spin up its own polling goroutines.  When the
-       // channel is closed, that means the container is no longer being
-       // handled by this dispatcher and the goroutine should terminate.  The
-       // goroutine is responsible for draining the 'status' channel, failure
-       // to do so may deadlock the dispatcher.
-       RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
-
-       // Amount of time to wait between polling for updates.
-       PollInterval time.Duration
-
-       // Channel used to signal that RunDispatcher loop should exit.
-       DoneProcessing chan struct{}
-
-       mineMutex  sync.Mutex
-       mineMap    map[string]chan arvados.Container
-       Auth       arvados.APIClientAuthorization
-       containers chan arvados.Container
-}
+       Arv *arvadosclient.ArvadosClient
 
-// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
-// for which this process is actively starting/monitoring.  Returns channel to
-// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       if ch, ok := dispatcher.mineMap[uuid]; ok {
-               return ch
-       }
+       // Queue polling frequency
+       PollPeriod time.Duration
 
-       ch := make(chan arvados.Container)
-       dispatcher.mineMap[uuid] = ch
-       return ch
-}
+       // Time to wait between successive attempts to run the same container
+       MinRetryPeriod time.Duration
 
-// Release a container which is no longer being monitored.
-func (dispatcher *Dispatcher) notMine(uuid string) {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       if ch, ok := dispatcher.mineMap[uuid]; ok {
-               close(ch)
-               delete(dispatcher.mineMap, uuid)
-       }
-}
+       // Func that implements the container lifecycle. Must be set
+       // to a non-nil DispatchFunc before calling Run().
+       RunContainer DispatchFunc
 
-// checkMine returns true if there is a channel for updates associated
-// with container c.  If update is true, also send the container record on
-// the channel.
-func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       ch, ok := dispatcher.mineMap[c.UUID]
-       if ok {
-               if update {
-                       ch <- c
-               }
-               return true
-       }
-       return false
+       auth     arvados.APIClientAuthorization
+       mtx      sync.Mutex
+       trackers map[string]*runTracker
+       throttle throttle
 }
 
-func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-       var containers arvados.ContainerList
-       err := dispatcher.Arv.List("containers", params, &containers)
-       if err != nil {
-               log.Printf("Error getting list of containers: %q", err)
-               return
-       }
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resume monitoring an already-running container, and wait
+// until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 
-       if containers.ItemsAvailable > len(containers.Items) {
-               // TODO: support paging
-               log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-                       containers.ItemsAvailable,
-                       len(containers.Items))
-       }
-       for _, container := range containers.Items {
-               touched[container.UUID] = true
-               dispatcher.containers <- container
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
+func (d *Dispatcher) Run(ctx context.Context) error {
+       err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
+       if err != nil {
+               return fmt.Errorf("error getting my token UUID: %v", err)
        }
-}
 
-func (dispatcher *Dispatcher) pollContainers() {
-       ticker := time.NewTicker(dispatcher.PollInterval)
+       d.throttle.hold = d.MinRetryPeriod
 
-       paramsQ := arvadosclient.Dict{
-               "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
-               "order":   []string{"priority desc"},
-               "limit":   "1000"}
-       paramsP := arvadosclient.Dict{
-               "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
-               "limit":   "1000"}
+       poll := time.NewTicker(d.PollPeriod)
+       defer poll.Stop()
 
        for {
+               tracked := d.trackedUUIDs()
+               d.checkForUpdates([][]interface{}{
+                       {"uuid", "in", tracked}})
+               d.checkForUpdates([][]interface{}{
+                       {"locked_by_uuid", "=", d.auth.UUID},
+                       {"uuid", "not in", tracked}})
+               d.checkForUpdates([][]interface{}{
+                       {"state", "=", Queued},
+                       {"priority", ">", "0"},
+                       {"uuid", "not in", tracked}})
                select {
-               case <-ticker.C:
-                       touched := make(map[string]bool)
-                       dispatcher.getContainers(paramsQ, touched)
-                       dispatcher.getContainers(paramsP, touched)
-                       dispatcher.mineMutex.Lock()
-                       var monitored []string
-                       for k := range dispatcher.mineMap {
-                               if _, ok := touched[k]; !ok {
-                                       monitored = append(monitored, k)
-                               }
-                       }
-                       dispatcher.mineMutex.Unlock()
-                       if monitored != nil {
-                               dispatcher.getContainers(arvadosclient.Dict{
-                                       "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
-                       }
-               case <-dispatcher.DoneProcessing:
-                       close(dispatcher.containers)
-                       ticker.Stop()
-                       return
+               case <-poll.C:
+                       continue
+               case <-ctx.Done():
+                       return ctx.Err()
                }
        }
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
-       if container.State == Queued && dispatcher.checkMine(container, false) {
-               // If we previously started the job, something failed, and it
-               // was re-queued, this dispatcher might still be monitoring it.
-               // Stop the existing monitor, then try to lock and run it
-               // again.
-               dispatcher.notMine(container.UUID)
-       }
-
-       if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
-               // If container is Complete, Cancelled, or Queued, LockedByUUID
-               // will be nil.  If the container was formerly Locked, moved
-               // back to Queued and then locked by another dispatcher,
-               // LockedByUUID will be different.  In either case, we want
-               // to stop monitoring it.
-               log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
-               dispatcher.notMine(container.UUID)
-               return
+func (d *Dispatcher) trackedUUIDs() []string {
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if len(d.trackers) == 0 {
+               // API bug: ["uuid", "not in", []] does not work as
+               // expected, but this does:
+               return []string{"this-uuid-does-not-exist"}
        }
-
-       if dispatcher.checkMine(container, true) {
-               // Already monitored, sent status update
-               return
+       uuids := make([]string, 0, len(d.trackers))
+       for x := range d.trackers {
+               uuids = append(uuids, x)
        }
+       return uuids
+}
+
+// Start a runner in a new goroutine, and send the initial container
+// record to its updates channel.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+       tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+       tracker.updates <- c
+       go func() {
+               d.RunContainer(d, c, tracker.updates)
+
+               d.mtx.Lock()
+               delete(d.trackers, c.UUID)
+               d.mtx.Unlock()
+       }()
+       return tracker
+}
 
-       if container.State == Queued && container.Priority > 0 {
-               // Try to take the lock
-               if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}) {
+       params := arvadosclient.Dict{
+               "filters": filters,
+               "order":   []string{"priority desc"}}
+
+       var list arvados.ContainerList
+       for offset, more := 0, true; more; offset += len(list.Items) {
+               params["offset"] = offset
+               err := d.Arv.List("containers", params, &list)
+               if err != nil {
+                       log.Printf("Error getting list of containers: %q", err)
                        return
                }
-               container.State = Locked
+               more = len(list.Items) > 0 && list.ItemsAvailable > len(list.Items)+offset
+               d.checkListForUpdates(list.Items)
        }
+}
 
-       if container.State == Locked || container.State == Running {
-               // Not currently monitored but in Locked or Running state and
-               // owned by this dispatcher, so start monitoring.
-               go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container) {
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
+       }
+
+       for _, c := range containers {
+               tracker, alreadyTracking := d.trackers[c.UUID]
+               if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+                       log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+               } else if alreadyTracking {
+                       switch c.State {
+                       case Queued:
+                               tracker.close()
+                       case Locked, Running:
+                               tracker.update(c)
+                       case Cancelled, Complete:
+                               tracker.close()
+                       }
+               } else {
+                       switch c.State {
+                       case Queued:
+                               if !d.throttle.Check(c.UUID) {
+                                       break
+                               }
+                               err := d.lock(c.UUID)
+                               if err != nil {
+                                       log.Printf("debug: error locking container %s: %s", c.UUID, err)
+                                       break
+                               }
+                               c.State = Locked
+                               d.trackers[c.UUID] = d.start(c)
+                       case Locked, Running:
+                               if !d.throttle.Check(c.UUID) {
+                                       break
+                               }
+                               d.trackers[c.UUID] = d.start(c)
+                       case Cancelled, Complete:
+                               // no-op (we already stopped monitoring)
+                       }
+               }
        }
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
-       err := dispatcher.Arv.Update("containers", uuid,
+func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
+       err := d.Arv.Update("containers", uuid,
                arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"state": newState}},
-               nil)
+                       "container": arvadosclient.Dict{"state": state},
+               }, nil)
        if err != nil {
-               log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
+               log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
        }
        return err
 }
 
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
-       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
-       if err != nil {
-               log.Printf("Error getting my token UUID: %v", err)
-               return
-       }
+// Lock makes the lock API call which updates the state of a container to Locked.
+func (d *Dispatcher) lock(uuid string) error {
+       return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
+}
 
-       dispatcher.mineMap = make(map[string]chan arvados.Container)
-       dispatcher.containers = make(chan arvados.Container)
+// Unlock makes the unlock API call which updates the state of a container to Queued.
+func (d *Dispatcher) Unlock(uuid string) error {
+       return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
+}
 
-       // Graceful shutdown on signal
-       sigChan := make(chan os.Signal)
-       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// prevous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
+       var cntr arvados.Container
+       err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
+       if err != nil {
+               return err
+       }
+       if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+               return nil
+       }
 
-       go func(sig <-chan os.Signal) {
-               for sig := range sig {
-                       log.Printf("Caught signal: %v", sig)
-                       dispatcher.DoneProcessing <- struct{}{}
-               }
-       }(sigChan)
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+               return nil
+       }
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
+       }
+       d.trackers[uuid] = d.start(cntr)
+       switch cntr.State {
+       case Queued, Cancelled, Complete:
+               d.trackers[uuid].close()
+       }
+       return nil
+}
 
-       defer close(sigChan)
-       defer signal.Stop(sigChan)
+type runTracker struct {
+       closing bool
+       updates chan arvados.Container
+}
 
-       go dispatcher.pollContainers()
-       for container := range dispatcher.containers {
-               dispatcher.handleUpdate(container)
+func (tracker *runTracker) close() {
+       if !tracker.closing {
+               close(tracker.updates)
        }
+       tracker.closing = true
+}
 
-       return nil
+func (tracker *runTracker) update(c arvados.Container) {
+       if tracker.closing {
+               return
+       }
+       select {
+       case <-tracker.updates:
+               log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+       default:
+       }
+       tracker.updates <- c
 }