10703: Do not catch signals in crunch-dispatch-slurm. Simplify "stop dispatcher loop...
[arvados.git] / sdk / go / dispatch / dispatch.go
index a27971f90655ca8e49046688119092c837b77938..4129b24f94f45e3360f922adc3d694ce55706088 100644 (file)
@@ -1,72 +1,64 @@
+// Framework for monitoring the Arvados container Queue, Locks container
+// records, and runs goroutine callbacks which implement execution and
+// monitoring of the containers.
 package dispatch
 
 import (
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "log"
-       "os"
-       "os/signal"
        "sync"
-       "syscall"
        "time"
 )
 
-// Constants for container states
 const (
-       Queued    = "Queued"
-       Locked    = "Locked"
-       Running   = "Running"
-       Complete  = "Complete"
-       Cancelled = "Cancelled"
+       Queued    = arvados.ContainerStateQueued
+       Locked    = arvados.ContainerStateLocked
+       Running   = arvados.ContainerStateRunning
+       Complete  = arvados.ContainerStateComplete
+       Cancelled = arvados.ContainerStateCancelled
 )
 
-type apiClientAuthorization struct {
-       UUID     string `json:"uuid"`
-       APIToken string `json:"api_token"`
-}
-
-type apiClientAuthorizationList struct {
-       Items []apiClientAuthorization `json:"items"`
-}
-
-// Container data
-type Container struct {
-       UUID               string           `json:"uuid"`
-       State              string           `json:"state"`
-       Priority           int              `json:"priority"`
-       RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
-       LockedByUUID       string           `json:"locked_by_uuid"`
-}
-
-// ContainerList is a list of the containers from api
-type ContainerList struct {
-       Items          []Container `json:"items"`
-       ItemsAvailable int         `json:"items_available"`
-}
-
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
-       Arv            arvadosclient.ArvadosClient
-       RunContainer   func(*Dispatcher, Container, chan Container)
-       PollInterval   time.Duration
-       DoneProcessing chan struct{}
-
-       mineMutex  sync.Mutex
-       mineMap    map[string]chan Container
-       Auth       apiClientAuthorization
-       containers chan Container
+       // The Arvados client
+       Arv *arvadosclient.ArvadosClient
+
+       // When a new queued container appears and is either already owned by
+       // this dispatcher or is successfully locked, the dispatcher will call
+       // go RunContainer().  The RunContainer() goroutine gets a channel over
+       // which it will receive updates to the container state.  The
+       // RunContainer() goroutine should only assume status updates come when
+       // the container record changes on the API server; if it needs to
+       // monitor the job submission to the underlying slurm/grid engine/etc
+       // queue it should spin up its own polling goroutines.  When the
+       // channel is closed, that means the container is no longer being
+       // handled by this dispatcher and the goroutine should terminate.  The
+       // goroutine is responsible for draining the 'status' channel, failure
+       // to do so may deadlock the dispatcher.
+       RunContainer func(*Dispatcher, arvados.Container, chan arvados.Container)
+
+       // Amount of time to wait between polling for updates.
+       PollInterval time.Duration
+
+       mineMutex sync.Mutex
+       mineMap   map[string]chan arvados.Container
+       Auth      arvados.APIClientAuthorization
+
+       stop chan struct{}
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
 // for which this process is actively starting/monitoring.  Returns channel to
 // be used to send container status updates.
-func (dispatcher *Dispatcher) setMine(uuid string) chan Container {
+func (dispatcher *Dispatcher) setMine(uuid string) chan arvados.Container {
        dispatcher.mineMutex.Lock()
        defer dispatcher.mineMutex.Unlock()
        if ch, ok := dispatcher.mineMap[uuid]; ok {
                return ch
        }
 
-       ch := make(chan Container)
+       ch := make(chan arvados.Container)
        dispatcher.mineMap[uuid] = ch
        return ch
 }
@@ -81,22 +73,24 @@ func (dispatcher *Dispatcher) notMine(uuid string) {
        }
 }
 
-// Check if there is a channel for updates associated with this container.  If
-// so send the container record on the channel and return true, if not return
-// false.
-func (dispatcher *Dispatcher) updateMine(c Container) bool {
+// checkMine returns true if there is a channel for updates associated
+// with container c.  If update is true, also send the container record on
+// the channel.
+func (dispatcher *Dispatcher) checkMine(c arvados.Container, update bool) bool {
        dispatcher.mineMutex.Lock()
        defer dispatcher.mineMutex.Unlock()
        ch, ok := dispatcher.mineMap[c.UUID]
        if ok {
-               ch <- c
+               if update {
+                       ch <- c
+               }
                return true
        }
        return false
 }
 
 func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched map[string]bool) {
-       var containers ContainerList
+       var containers arvados.ContainerList
        err := dispatcher.Arv.List("containers", params, &containers)
        if err != nil {
                log.Printf("Error getting list of containers: %q", err)
@@ -111,12 +105,13 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
        }
        for _, container := range containers.Items {
                touched[container.UUID] = true
-               dispatcher.containers <- container
+               dispatcher.handleUpdate(container)
        }
 }
 
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
        ticker := time.NewTicker(dispatcher.PollInterval)
+       defer ticker.Stop()
 
        paramsQ := arvadosclient.Dict{
                "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
@@ -127,51 +122,57 @@ func (dispatcher *Dispatcher) pollContainers() {
                "limit":   "1000"}
 
        for {
+               touched := make(map[string]bool)
+               dispatcher.getContainers(paramsQ, touched)
+               dispatcher.getContainers(paramsP, touched)
+               dispatcher.mineMutex.Lock()
+               var monitored []string
+               for k := range dispatcher.mineMap {
+                       if _, ok := touched[k]; !ok {
+                               monitored = append(monitored, k)
+                       }
+               }
+               dispatcher.mineMutex.Unlock()
+               if monitored != nil {
+                       dispatcher.getContainers(arvadosclient.Dict{
+                               "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+               }
                select {
                case <-ticker.C:
-                       touched := make(map[string]bool)
-                       dispatcher.getContainers(paramsQ, touched)
-                       dispatcher.getContainers(paramsP, touched)
-                       dispatcher.mineMutex.Lock()
-                       var monitored []string
-                       for k := range dispatcher.mineMap {
-                               if _, ok := touched[k]; !ok {
-                                       monitored = append(monitored, k)
-                               }
-                       }
-                       dispatcher.mineMutex.Unlock()
-                       if monitored != nil {
-                               dispatcher.getContainers(arvadosclient.Dict{
-                                       "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
-                       }
-               case <-dispatcher.DoneProcessing:
-                       close(dispatcher.containers)
-                       ticker.Stop()
+               case <-stop:
                        return
                }
        }
 }
 
-func (dispatcher *Dispatcher) handleUpdate(container Container) {
+func (dispatcher *Dispatcher) handleUpdate(container arvados.Container) {
+       if container.State == Queued && dispatcher.checkMine(container, false) {
+               // If we previously started the job, something failed, and it
+               // was re-queued, this dispatcher might still be monitoring it.
+               // Stop the existing monitor, then try to lock and run it
+               // again.
+               dispatcher.notMine(container.UUID)
+       }
+
        if container.LockedByUUID != dispatcher.Auth.UUID && container.State != Queued {
                // If container is Complete, Cancelled, or Queued, LockedByUUID
-               // will be nil.  If the container was formally Locked, moved
+               // will be nil.  If the container was formerly Locked, moved
                // back to Queued and then locked by another dispatcher,
                // LockedByUUID will be different.  In either case, we want
                // to stop monitoring it.
-               log.Printf("Container %v now in state %v with locked_by_uuid %v", container.UUID, container.State, container.LockedByUUID)
+               log.Printf("Container %v now in state %q with locked_by_uuid %q", container.UUID, container.State, container.LockedByUUID)
                dispatcher.notMine(container.UUID)
                return
        }
 
-       if dispatcher.updateMine(container) {
+       if dispatcher.checkMine(container, true) {
                // Already monitored, sent status update
                return
        }
 
-       if container.State == Queued {
+       if container.State == Queued && container.Priority > 0 {
                // Try to take the lock
-               if err := dispatcher.UpdateState(container.UUID, Locked); err != nil {
+               if err := dispatcher.Lock(container.UUID); err != nil {
                        return
                }
                container.State = Locked
@@ -185,56 +186,55 @@ func (dispatcher *Dispatcher) handleUpdate(container Container) {
 }
 
 // UpdateState makes an API call to change the state of a container.
-func (dispatcher *Dispatcher) UpdateState(uuid, newState string) error {
+func (dispatcher *Dispatcher) UpdateState(uuid string, newState arvados.ContainerState) error {
        err := dispatcher.Arv.Update("containers", uuid,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": newState}},
                nil)
        if err != nil {
-               log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+               log.Printf("Error updating container %s to state %q: %q", uuid, newState, err)
        }
        return err
 }
 
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-//
-// When a new queued container appears and is successfully locked, the
-// dispatcher will call RunContainer() followed by MonitorContainer().  If a
-// container appears that is Locked or Running but not known to the dispatcher,
-// it will only call monitorContainer().  The monitorContainer() callback is
-// passed a channel over which it will receive updates to the container state.
-// The callback is responsible for draining the channel, if it fails to do so
-// it will deadlock the dispatcher.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
-       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
+// Lock makes the lock API call which updates the state of a container to Locked.
+func (dispatcher *Dispatcher) Lock(uuid string) error {
+       err := dispatcher.Arv.Call("POST", "containers", uuid, "lock", nil, nil)
        if err != nil {
-               log.Printf("Error getting my token UUID: %v", err)
-               return
+               log.Printf("Error locking container %s: %q", uuid, err)
        }
+       return err
+}
 
-       dispatcher.mineMap = make(map[string]chan Container)
-       dispatcher.containers = make(chan Container)
-
-       // Graceful shutdown on signal
-       sigChan := make(chan os.Signal)
-       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
-       go func(sig <-chan os.Signal) {
-               for sig := range sig {
-                       log.Printf("Caught signal: %v", sig)
-                       dispatcher.DoneProcessing <- struct{}{}
-               }
-       }(sigChan)
+// Unlock makes the unlock API call which updates the state of a container to Queued.
+func (dispatcher *Dispatcher) Unlock(uuid string) error {
+       err := dispatcher.Arv.Call("POST", "containers", uuid, "unlock", nil, nil)
+       if err != nil {
+               log.Printf("Error unlocking container %s: %q", uuid, err)
+       }
+       return err
+}
 
-       defer close(sigChan)
-       defer signal.Stop(sigChan)
+// Stop causes Run to return after the current polling cycle.
+func (dispatcher *Dispatcher) Stop() {
+       if dispatcher.stop == nil {
+               // already stopped
+               return
+       }
+       close(dispatcher.stop)
+       dispatcher.stop = nil
+}
 
-       go dispatcher.pollContainers()
-       for container := range dispatcher.containers {
-               dispatcher.handleUpdate(container)
+// Run runs the main loop of the dispatcher.
+func (dispatcher *Dispatcher) Run() (err error) {
+       err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
+       if err != nil {
+               log.Printf("Error getting my token UUID: %v", err)
+               return
        }
 
+       dispatcher.mineMap = make(map[string]chan arvados.Container)
+       dispatcher.stop = make(chan struct{})
+       dispatcher.pollContainers(dispatcher.stop)
        return nil
 }