10703: Do not catch signals in crunch-dispatch-slurm. Simplify "stop dispatcher loop...
[arvados.git] / sdk / go / dispatch / dispatch.go
index a48613292eba54c594cd4eb8ed9a13cf64011c70..4129b24f94f45e3360f922adc3d694ce55706088 100644 (file)
@@ -7,10 +7,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "log"
-       "os"
-       "os/signal"
        "sync"
-       "syscall"
        "time"
 )
 
@@ -25,7 +22,7 @@ const (
 // Dispatcher holds the state of the dispatcher
 type Dispatcher struct {
        // The Arvados client
-       Arv arvadosclient.ArvadosClient
+       Arv *arvadosclient.ArvadosClient
 
        // When a new queued container appears and is either already owned by
        // this dispatcher or is successfully locked, the dispatcher will call
@@ -44,13 +41,11 @@ type Dispatcher struct {
        // Amount of time to wait between polling for updates.
        PollInterval time.Duration
 
-       // Channel used to signal that RunDispatcher loop should exit.
-       DoneProcessing chan struct{}
+       mineMutex sync.Mutex
+       mineMap   map[string]chan arvados.Container
+       Auth      arvados.APIClientAuthorization
 
-       mineMutex  sync.Mutex
-       mineMap    map[string]chan arvados.Container
-       Auth       arvados.APIClientAuthorization
-       containers chan arvados.Container
+       stop chan struct{}
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
@@ -110,12 +105,13 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
        }
        for _, container := range containers.Items {
                touched[container.UUID] = true
-               dispatcher.containers <- container
+               dispatcher.handleUpdate(container)
        }
 }
 
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
        ticker := time.NewTicker(dispatcher.PollInterval)
+       defer ticker.Stop()
 
        paramsQ := arvadosclient.Dict{
                "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
@@ -126,26 +122,24 @@ func (dispatcher *Dispatcher) pollContainers() {
                "limit":   "1000"}
 
        for {
+               touched := make(map[string]bool)
+               dispatcher.getContainers(paramsQ, touched)
+               dispatcher.getContainers(paramsP, touched)
+               dispatcher.mineMutex.Lock()
+               var monitored []string
+               for k := range dispatcher.mineMap {
+                       if _, ok := touched[k]; !ok {
+                               monitored = append(monitored, k)
+                       }
+               }
+               dispatcher.mineMutex.Unlock()
+               if monitored != nil {
+                       dispatcher.getContainers(arvadosclient.Dict{
+                               "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+               }
                select {
                case <-ticker.C:
-                       touched := make(map[string]bool)
-                       dispatcher.getContainers(paramsQ, touched)
-                       dispatcher.getContainers(paramsP, touched)
-                       dispatcher.mineMutex.Lock()
-                       var monitored []string
-                       for k := range dispatcher.mineMap {
-                               if _, ok := touched[k]; !ok {
-                                       monitored = append(monitored, k)
-                               }
-                       }
-                       dispatcher.mineMutex.Unlock()
-                       if monitored != nil {
-                               dispatcher.getContainers(arvadosclient.Dict{
-                                       "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
-                       }
-               case <-dispatcher.DoneProcessing:
-                       close(dispatcher.containers)
-                       ticker.Stop()
+               case <-stop:
                        return
                }
        }
@@ -221,10 +215,18 @@ func (dispatcher *Dispatcher) Unlock(uuid string) error {
        return err
 }
 
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+// Stop causes Run to return after the current polling cycle.
+func (dispatcher *Dispatcher) Stop() {
+       if dispatcher.stop == nil {
+               // already stopped
+               return
+       }
+       close(dispatcher.stop)
+       dispatcher.stop = nil
+}
+
+// Run runs the main loop of the dispatcher.
+func (dispatcher *Dispatcher) Run() (err error) {
        err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
        if err != nil {
                log.Printf("Error getting my token UUID: %v", err)
@@ -232,26 +234,7 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
        }
 
        dispatcher.mineMap = make(map[string]chan arvados.Container)
-       dispatcher.containers = make(chan arvados.Container)
-
-       // Graceful shutdown on signal
-       sigChan := make(chan os.Signal)
-       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
-       go func(sig <-chan os.Signal) {
-               for sig := range sig {
-                       log.Printf("Caught signal: %v", sig)
-                       dispatcher.DoneProcessing <- struct{}{}
-               }
-       }(sigChan)
-
-       defer close(sigChan)
-       defer signal.Stop(sigChan)
-
-       go dispatcher.pollContainers()
-       for container := range dispatcher.containers {
-               dispatcher.handleUpdate(container)
-       }
-
+       dispatcher.stop = make(chan struct{})
+       dispatcher.pollContainers(dispatcher.stop)
        return nil
 }