13959: Use logrus for crunch-dispatch-local logging.
[arvados.git] / sdk / go / dispatch / dispatch.go
index a48613292eba54c594cd4eb8ed9a13cf64011c70..152207ea94bb4bb4c4e21de36ca2b440691109a2 100644 (file)
@@ -1,17 +1,20 @@
-// Framework for monitoring the Arvados container Queue, Locks container
-// records, and runs goroutine callbacks which implement execution and
-// monitoring of the containers.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+// Package dispatch is a helper library for building Arvados container
+// dispatchers.
 package dispatch
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "log"
-       "os"
-       "os/signal"
+       "context"
+       "fmt"
        "sync"
-       "syscall"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "github.com/Sirupsen/logrus"
 )
 
 const (
@@ -22,236 +25,308 @@ const (
        Cancelled = arvados.ContainerStateCancelled
 )
 
-// Dispatcher holds the state of the dispatcher
-type Dispatcher struct {
-       // The Arvados client
-       Arv arvadosclient.ArvadosClient
-
-       // When a new queued container appears and is either already owned by
-       // this dispatcher or is successfully locked, the dispatcher will call
-       // go RunContainer().  The RunContainer() goroutine gets a channel over
-       // which it will receive updates to the container state.  The
-       // RunContainer() goroutine should only assume status updates come when
-       // the container record changes on the API server; if it needs to
-       // monitor the job submission to the underlying slurm/grid engine/etc
-       // queue it should spin up its own polling goroutines.  When the
-       // channel is closed, that means the container is no longer being
-       // handled by this dispatcher and the goroutine should terminate.  The
-       // goroutine is responsible for draining the 'status' channel, failure
-       // to do so may deadlock the dispatcher.
-       RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
-
-       // Amount of time to wait between polling for updates.
-       PollInterval time.Duration
-
-       // Channel used to signal that RunDispatcher loop should exit.
-       DoneProcessing chan struct{}
-
-       mineMutex  sync.Mutex
-       mineMap    map[string]chan arvados.Container
-       Auth       arvados.APIClientAuthorization
-       containers chan arvados.Container
+type Logger interface {
+       Printf(string, ...interface{})
+       Warnf(string, ...interface{})
+       Debugf(string, ...interface{})
 }
 
-// Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
-// for which this process is actively starting/monitoring.  Returns channel to
-// be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       if ch, ok := dispatcher.mineMap[uuid]; ok {
-               return ch
-       }
+// Dispatcher struct
+type Dispatcher struct {
+       Arv *arvadosclient.ArvadosClient
 
-       ch := make(chan arvados.Container)
-       dispatcher.mineMap[uuid] = ch
-       return ch
-}
+       Logger Logger
 
-// Release a container which is no longer being monitored.
-func (dispatcher *Dispatcher) notMine(uuid string) {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       if ch, ok := dispatcher.mineMap[uuid]; ok {
-               close(ch)
-               delete(dispatcher.mineMap, uuid)
-       }
-}
+       // Queue polling frequency
+       PollPeriod time.Duration
 
-// checkMine returns true if there is a channel for updates associated
-// with container c.  If update is true, also send the container record on
-// the channel.
-func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
-       dispatcher.mineMutex.Lock()
-       defer dispatcher.mineMutex.Unlock()
-       ch, ok := dispatcher.mineMap[c.UUID]
-       if ok {
-               if update {
-                       ch <- c
-               }
-               return true
-       }
-       return false
+       // Time to wait between successive attempts to run the same container
+       MinRetryPeriod time.Duration
+
+       // Func that implements the container lifecycle. Must be set
+       // to a non-nil DispatchFunc before calling Run().
+       RunContainer DispatchFunc
+
+       auth     arvados.APIClientAuthorization
+       mtx      sync.Mutex
+       trackers map[string]*runTracker
+       throttle throttle
 }
 
-func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-       var containers arvados.ContainerList
-       err := dispatcher.Arv.List("containers", params, &containers)
-       if err != nil {
-               log.Printf("Error getting list of containers: %q", err)
-               return
-       }
+// A DispatchFunc executes a container (if the container record is
+// Locked) or resume monitoring an already-running container, and wait
+// until that container exits.
+//
+// While the container runs, the DispatchFunc should listen for
+// updated container records on the provided channel. When the channel
+// closes, the DispatchFunc should stop the container if it's still
+// running, and return.
+//
+// The DispatchFunc should not return until the container is finished.
+type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 
-       if containers.ItemsAvailable > len(containers.Items) {
-               // TODO: support paging
-               log.Printf("Warning!  %d containers are available but only received %d, paged requests are not yet supported, some containers may be ignored.",
-                       containers.ItemsAvailable,
-                       len(containers.Items))
+// Run watches the API server's queue for containers that are either
+// ready to run and available to lock, or are already locked by this
+// dispatcher's token. When a new one appears, Run calls RunContainer
+// in a new goroutine.
+func (d *Dispatcher) Run(ctx context.Context) error {
+       if d.Logger == nil {
+               d.Logger = logrus.StandardLogger()
        }
-       for _, container := range containers.Items {
-               touched[container.UUID] = true
-               dispatcher.containers <- container
+
+       err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
+       if err != nil {
+               return fmt.Errorf("error getting my token UUID: %v", err)
        }
-}
 
-func (dispatcher *Dispatcher) pollContainers() {
-       ticker := time.NewTicker(dispatcher.PollInterval)
+       d.throttle.hold = d.MinRetryPeriod
 
-       paramsQ := arvadosclient.Dict{
-               "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
-               "order":   []string{"priority desc"},
-               "limit":   "1000"}
-       paramsP := arvadosclient.Dict{
-               "filters": [][]interface{}{{"locked_by_uuid", "=", dispatcher.Auth.UUID}},
-               "limit":   "1000"}
+       poll := time.NewTicker(d.PollPeriod)
+       defer poll.Stop()
 
        for {
                select {
-               case <-ticker.C:
-                       touched := make(map[string]bool)
-                       dispatcher.getContainers(paramsQ, touched)
-                       dispatcher.getContainers(paramsP, touched)
-                       dispatcher.mineMutex.Lock()
-                       var monitored []string
-                       for k := range dispatcher.mineMap {
-                               if _, ok := touched[k]; !ok {
-                                       monitored = append(monitored, k)
-                               }
-                       }
-                       dispatcher.mineMutex.Unlock()
-                       if monitored != nil {
-                               dispatcher.getContainers(arvadosclient.Dict{
-                                       "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+               case <-poll.C:
+                       break
+               case <-ctx.Done():
+                       return ctx.Err()
+               }
+
+               todo := make(map[string]*runTracker)
+               d.mtx.Lock()
+               // Make a copy of trackers
+               for uuid, tracker := range d.trackers {
+                       todo[uuid] = tracker
+               }
+               d.mtx.Unlock()
+
+               // Containers I currently own (Locked/Running)
+               querySuccess := d.checkForUpdates([][]interface{}{
+                       {"locked_by_uuid", "=", d.auth.UUID}}, todo)
+
+               // Containers I should try to dispatch
+               querySuccess = d.checkForUpdates([][]interface{}{
+                       {"state", "=", Queued},
+                       {"priority", ">", "0"}}, todo) && querySuccess
+
+               if !querySuccess {
+                       // There was an error in one of the previous queries,
+                       // we probably didn't get updates for all the
+                       // containers we should have.  Don't check them
+                       // individually because it may be expensive.
+                       continue
+               }
+
+               // Containers I know about but didn't fall into the
+               // above two categories (probably Complete/Cancelled)
+               var missed []string
+               for uuid := range todo {
+                       missed = append(missed, uuid)
+               }
+
+               for len(missed) > 0 {
+                       var batch []string
+                       if len(missed) > 20 {
+                               batch = missed[0:20]
+                               missed = missed[20:]
+                       } else {
+                               batch = missed
+                               missed = missed[0:0]
                        }
-               case <-dispatcher.DoneProcessing:
-                       close(dispatcher.containers)
-                       ticker.Stop()
-                       return
+                       querySuccess = d.checkForUpdates([][]interface{}{
+                               {"uuid", "in", batch}}, todo) && querySuccess
                }
+
+               if !querySuccess {
+                       // There was an error in one of the previous queries, we probably
+                       // didn't see all the containers we should have, so don't shut down
+                       // the missed containers.
+                       continue
+               }
+
+               // Containers that I know about that didn't show up in any
+               // query should be let go.
+               for uuid, tracker := range todo {
+                       d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+                       tracker.close()
+               }
+
        }
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
-       if container.State == Queued && dispatcher.checkMine(container, false) {
-               // If we previously started the job, something failed, and it
-               // was re-queued, this dispatcher might still be monitoring it.
-               // Stop the existing monitor, then try to lock and run it
-               // again.
-               dispatcher.notMine(container.UUID)
+// Start a runner in a new goroutine, and send the initial container
+// record to its updates channel.
+func (d *Dispatcher) start(c arvados.Container) *runTracker {
+       tracker := &runTracker{
+               updates: make(chan arvados.Container, 1),
+               logger:  d.Logger,
        }
+       tracker.updates <- c
+       go func() {
+               d.RunContainer(d, c, tracker.updates)
+               // RunContainer blocks for the lifetime of the container.  When
+               // it returns, the tracker should delete itself.
+               d.mtx.Lock()
+               delete(d.trackers, c.UUID)
+               d.mtx.Unlock()
+       }()
+       return tracker
+}
 
-       if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
-               // If container is Complete, Cancelled, or Queued, LockedByUUID
-               // will be nil.  If the container was formerly Locked, moved
-               // back to Queued and then locked by another dispatcher,
-               // LockedByUUID will be different.  In either case, we want
-               // to stop monitoring it.
-               log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
-               dispatcher.notMine(container.UUID)
-               return
-       }
+func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*runTracker) bool {
+       params := arvadosclient.Dict{
+               "filters": filters,
+               "order":   []string{"priority desc"}}
+       offset := 0
+       for {
+               params["offset"] = offset
 
-       if dispatcher.checkMine(container, true) {
-               // Already monitored, sent status update
-               return
-       }
+               // This list variable must be a new one declared
+               // inside the loop: otherwise, items in the API
+               // response would get deep-merged into the items
+               // loaded in previous iterations.
+               var list arvados.ContainerList
 
-       if container.State == Queued && container.Priority > 0 {
-               // Try to take the lock
-               if err := dispatcher.Lock(container.UUID); err != nil {
-                       return
+               err := d.Arv.List("containers", params, &list)
+               if err != nil {
+                       d.Logger.Warnf("error getting list of containers: %q", err)
+                       return false
+               }
+               d.checkListForUpdates(list.Items, todo)
+               offset += len(list.Items)
+               if len(list.Items) == 0 || list.ItemsAvailable <= offset {
+                       return true
                }
-               container.State = Locked
+       }
+}
+
+func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo map[string]*runTracker) {
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
        }
 
-       if container.State == Locked || container.State == Running {
-               // Not currently monitored but in Locked or Running state and
-               // owned by this dispatcher, so start monitoring.
-               go dispatcher.RunContainer(dispatcher, container, dispatcher.setMine(container.UUID))
+       for _, c := range containers {
+               tracker, alreadyTracking := d.trackers[c.UUID]
+               delete(todo, c.UUID)
+
+               if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
+                       d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+               } else if alreadyTracking {
+                       switch c.State {
+                       case Queued:
+                               tracker.close()
+                       case Locked, Running:
+                               tracker.update(c)
+                       case Cancelled, Complete:
+                               tracker.close()
+                       }
+               } else {
+                       switch c.State {
+                       case Queued:
+                               if !d.throttle.Check(c.UUID) {
+                                       break
+                               }
+                               err := d.lock(c.UUID)
+                               if err != nil {
+                                       d.Logger.Warnf("error locking container %s: %s", c.UUID, err)
+                                       break
+                               }
+                               c.State = Locked
+                               d.trackers[c.UUID] = d.start(c)
+                       case Locked, Running:
+                               if !d.throttle.Check(c.UUID) {
+                                       break
+                               }
+                               d.trackers[c.UUID] = d.start(c)
+                       case Cancelled, Complete:
+                               // no-op (we already stopped monitoring)
+                       }
+               }
        }
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
-       err := dispatcher.Arv.Update("containers", uuid,
+func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) error {
+       err := d.Arv.Update("containers", uuid,
                arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"state": newState}},
-               nil)
+                       "container": arvadosclient.Dict{"state": state},
+               }, nil)
        if err != nil {
-               log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
+               d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
        }
        return err
 }
 
 // Lock makes the lock API call which updates the state of a container to Locked.
-func (dispatcher *Dispatcher) Lock(uuid string) error {
-       err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
-       if err != nil {
-               log.Printf("Error locking container %s: %q", uuid, err)
-       }
-       return err
+func (d *Dispatcher) lock(uuid string) error {
+       return d.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
 }
 
 // Unlock makes the unlock API call which updates the state of a container to Queued.
-func (dispatcher *Dispatcher) Unlock(uuid string) error {
-       err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
-       if err != nil {
-               log.Printf("Error unlocking container %s: %q", uuid, err)
-       }
-       return err
+func (d *Dispatcher) Unlock(uuid string) error {
+       return d.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
 }
 
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
-       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
+// TrackContainer ensures a tracker is running for the given UUID,
+// regardless of the current state of the container (except: if the
+// container is locked by a different dispatcher, a tracker will not
+// be started). If the container is not in Locked or Running state,
+// the new tracker will close down immediately.
+//
+// This allows the dispatcher to put its own RunContainer func into a
+// cleanup phase (for example, to kill local processes created by a
+// prevous dispatch process that are still running even though the
+// container state is final) without the risk of having multiple
+// goroutines monitoring the same UUID.
+func (d *Dispatcher) TrackContainer(uuid string) error {
+       var cntr arvados.Container
+       err := d.Arv.Call("GET", "containers", uuid, "", nil, &cntr)
        if err != nil {
-               log.Printf("Error getting my token UUID: %v", err)
-               return
+               return err
+       }
+       if cntr.LockedByUUID != "" && cntr.LockedByUUID != d.auth.UUID {
+               return nil
        }
 
-       dispatcher.mineMap = make(map[string]chan arvados.Container)
-       dispatcher.containers = make(chan arvados.Container)
-
-       // Graceful shutdown on signal
-       sigChan := make(chan os.Signal)
-       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
-       go func(sig <-chan os.Signal) {
-               for sig := range sig {
-                       log.Printf("Caught signal: %v", sig)
-                       dispatcher.DoneProcessing <- struct{}{}
-               }
-       }(sigChan)
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+       if _, alreadyTracking := d.trackers[uuid]; alreadyTracking {
+               return nil
+       }
+       if d.trackers == nil {
+               d.trackers = make(map[string]*runTracker)
+       }
+       d.trackers[uuid] = d.start(cntr)
+       switch cntr.State {
+       case Queued, Cancelled, Complete:
+               d.trackers[uuid].close()
+       }
+       return nil
+}
 
-       defer close(sigChan)
-       defer signal.Stop(sigChan)
+type runTracker struct {
+       closing bool
+       updates chan arvados.Container
+       logger  Logger
+}
 
-       go dispatcher.pollContainers()
-       for container := range dispatcher.containers {
-               dispatcher.handleUpdate(container)
+func (tracker *runTracker) close() {
+       if !tracker.closing {
+               close(tracker.updates)
        }
+       tracker.closing = true
+}
 
-       return nil
+func (tracker *runTracker) update(c arvados.Container) {
+       if tracker.closing {
+               return
+       }
+       select {
+       case <-tracker.updates:
+               tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
+       default:
+       }
+       tracker.updates <- c
 }