10703: Do not catch signals in crunch-dispatch-slurm. Simplify "stop dispatcher loop...
author: Tom Clegg <tom@curoverse.com>
Wed, 25 Jan 2017 15:59:48 +0000 (10:59 -0500)
committer: Tom Clegg <tom@curoverse.com>
Wed, 25 Jan 2017 15:59:48 +0000 (10:59 -0500)
sdk/go/dispatch/dispatch.go
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

index 4987c01055203e262b2534e2f001b200c7de21eb..4129b24f94f45e3360f922adc3d694ce55706088 100644 (file)
@@ -7,10 +7,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "log"
-       "os"
-       "os/signal"
        "sync"
-       "syscall"
        "time"
 )
 
@@ -44,13 +41,11 @@ type Dispatcher struct {
        // Amount of time to wait between polling for updates.
        PollInterval time.Duration
 
-       // Channel used to signal that RunDispatcher loop should exit.
-       DoneProcessing chan struct{}
+       mineMutex sync.Mutex
+       mineMap   map[string]chan arvados.Container
+       Auth      arvados.APIClientAuthorization
 
-       mineMutex  sync.Mutex
-       mineMap    map[string]chan arvados.Container
-       Auth       arvados.APIClientAuthorization
-       containers chan arvados.Container
+       stop chan struct{}
 }
 
 // Goroutine-safely add/remove uuid to the set of "my" containers, i.e., ones
@@ -110,12 +105,13 @@ func (dispatcher *Dispatcher) getContainers(params arvadosclient.Dict, touched m
        }
        for _, container := range containers.Items {
                touched[container.UUID] = true
-               dispatcher.containers <- container
+               dispatcher.handleUpdate(container)
        }
 }
 
-func (dispatcher *Dispatcher) pollContainers() {
+func (dispatcher *Dispatcher) pollContainers(stop chan struct{}) {
        ticker := time.NewTicker(dispatcher.PollInterval)
+       defer ticker.Stop()
 
        paramsQ := arvadosclient.Dict{
                "filters": [][]interface{}{{"state", "=", "Queued"}, {"priority", ">", "0"}},
@@ -126,26 +122,24 @@ func (dispatcher *Dispatcher) pollContainers() {
                "limit":   "1000"}
 
        for {
+               touched := make(map[string]bool)
+               dispatcher.getContainers(paramsQ, touched)
+               dispatcher.getContainers(paramsP, touched)
+               dispatcher.mineMutex.Lock()
+               var monitored []string
+               for k := range dispatcher.mineMap {
+                       if _, ok := touched[k]; !ok {
+                               monitored = append(monitored, k)
+                       }
+               }
+               dispatcher.mineMutex.Unlock()
+               if monitored != nil {
+                       dispatcher.getContainers(arvadosclient.Dict{
+                               "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
+               }
                select {
                case <-ticker.C:
-                       touched := make(map[string]bool)
-                       dispatcher.getContainers(paramsQ, touched)
-                       dispatcher.getContainers(paramsP, touched)
-                       dispatcher.mineMutex.Lock()
-                       var monitored []string
-                       for k := range dispatcher.mineMap {
-                               if _, ok := touched[k]; !ok {
-                                       monitored = append(monitored, k)
-                               }
-                       }
-                       dispatcher.mineMutex.Unlock()
-                       if monitored != nil {
-                               dispatcher.getContainers(arvadosclient.Dict{
-                                       "filters": [][]interface{}{{"uuid", "in", monitored}}}, touched)
-                       }
-               case <-dispatcher.DoneProcessing:
-                       close(dispatcher.containers)
-                       ticker.Stop()
+               case <-stop:
                        return
                }
        }
@@ -221,10 +215,18 @@ func (dispatcher *Dispatcher) Unlock(uuid string) error {
        return err
 }
 
-// RunDispatcher runs the main loop of the dispatcher until receiving a message
-// on the dispatcher.DoneProcessing channel.  It also installs a signal handler
-// to terminate gracefully on SIGINT, SIGTERM or SIGQUIT.
-func (dispatcher *Dispatcher) RunDispatcher() (err error) {
+// Stop causes Run to return after the current polling cycle.
+func (dispatcher *Dispatcher) Stop() {
+       if dispatcher.stop == nil {
+               // already stopped
+               return
+       }
+       close(dispatcher.stop)
+       dispatcher.stop = nil
+}
+
+// Run runs the main loop of the dispatcher.
+func (dispatcher *Dispatcher) Run() (err error) {
        err = dispatcher.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &dispatcher.Auth)
        if err != nil {
                log.Printf("Error getting my token UUID: %v", err)
@@ -232,26 +234,7 @@ func (dispatcher *Dispatcher) RunDispatcher() (err error) {
        }
 
        dispatcher.mineMap = make(map[string]chan arvados.Container)
-       dispatcher.containers = make(chan arvados.Container)
-
-       // Graceful shutdown on signal
-       sigChan := make(chan os.Signal)
-       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
-
-       go func(sig <-chan os.Signal) {
-               for sig := range sig {
-                       log.Printf("Caught signal: %v", sig)
-                       dispatcher.DoneProcessing <- struct{}{}
-               }
-       }(sigChan)
-
-       defer close(sigChan)
-       defer signal.Stop(sigChan)
-
-       go dispatcher.pollContainers()
-       for container := range dispatcher.containers {
-               dispatcher.handleUpdate(container)
-       }
-
+       dispatcher.stop = make(chan struct{})
+       dispatcher.pollContainers(dispatcher.stop)
        return nil
 }
index 0ca765185119c152dd11870641c15f905042311e..cfb0c7da947186d74b1a3526eb9a3b08d9d58bf5 100644 (file)
@@ -10,7 +10,9 @@ import (
        "log"
        "os"
        "os/exec"
+       "os/signal"
        "sync"
+       "syscall"
        "time"
 )
 
@@ -54,16 +56,24 @@ func doMain() error {
        arv.Retries = 25
 
        dispatcher := dispatch.Dispatcher{
-               Arv:            arv,
-               RunContainer:   run,
-               PollInterval:   time.Duration(*pollInterval) * time.Second,
-               DoneProcessing: make(chan struct{})}
+               Arv:          arv,
+               RunContainer: run,
+               PollInterval: time.Duration(*pollInterval) * time.Second,
+       }
 
-       err = dispatcher.RunDispatcher()
+       err = dispatcher.Run()
        if err != nil {
                return err
        }
 
+       c := make(chan os.Signal, 1)
+       signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT)
+       sig := <-c
+       log.Printf("Received %s, shutting down", sig)
+       signal.Stop(c)
+
+       dispatcher.Stop()
+
        runningCmdsMutex.Lock()
        // Finished dispatching; interrupt any crunch jobs that are still running
        for _, cmd := range runningCmds {
index bcb406eb8e47667143ae5b4c246b00fbf3a53260..04547308d528cbc9a2f1c3c448f062ded015e1ee 100644 (file)
@@ -62,7 +62,6 @@ func (s *TestSuite) TestIntegration(c *C) {
        echo := "echo"
        crunchRunCommand = &echo
 
-       doneProcessing := make(chan struct{})
        dispatcher := dispatch.Dispatcher{
                Arv:          arv,
                PollInterval: time.Second,
@@ -70,9 +69,9 @@ func (s *TestSuite) TestIntegration(c *C) {
                        container arvados.Container,
                        status chan arvados.Container) {
                        run(dispatcher, container, status)
-                       doneProcessing <- struct{}{}
+                       dispatcher.Stop()
                },
-               DoneProcessing: doneProcessing}
+       }
 
        startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
                dispatcher.UpdateState(container.UUID, "Running")
@@ -80,7 +79,7 @@ func (s *TestSuite) TestIntegration(c *C) {
                return cmd.Start()
        }
 
-       err = dispatcher.RunDispatcher()
+       err = dispatcher.Run()
        c.Assert(err, IsNil)
 
        // Wait for all running crunch jobs to complete / terminate
@@ -166,7 +165,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
        *crunchRunCommand = crunchCmd
 
-       doneProcessing := make(chan struct{})
        dispatcher := dispatch.Dispatcher{
                Arv:          arv,
                PollInterval: time.Duration(1) * time.Second,
@@ -174,9 +172,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                        container arvados.Container,
                        status chan arvados.Container) {
                        run(dispatcher, container, status)
-                       doneProcessing <- struct{}{}
+                       dispatcher.Stop()
                },
-               DoneProcessing: doneProcessing}
+       }
 
        startCmd = func(container arvados.Container, cmd *exec.Cmd) error {
                dispatcher.UpdateState(container.UUID, "Running")
@@ -188,10 +186,10 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
                        time.Sleep(100 * time.Millisecond)
                }
-               dispatcher.DoneProcessing <- struct{}{}
+               dispatcher.Stop()
        }()
 
-       err := dispatcher.RunDispatcher()
+       err := dispatcher.Run()
        c.Assert(err, IsNil)
 
        // Wait for all running crunch jobs to complete / terminate
index e768b509cd6f2c69bb529d9e9a90e2d923e422ce..ae70800874dc48a8369ef0a544bffb434178b2df 100644 (file)
@@ -100,21 +100,16 @@ func doMain() error {
        defer squeueUpdater.Done()
 
        dispatcher := dispatch.Dispatcher{
-               Arv:            arv,
-               RunContainer:   run,
-               PollInterval:   time.Duration(theConfig.PollPeriod),
-               DoneProcessing: make(chan struct{})}
+               Arv:          arv,
+               RunContainer: run,
+               PollInterval: time.Duration(theConfig.PollPeriod),
+       }
 
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
 
-       err = dispatcher.RunDispatcher()
-       if err != nil {
-               return err
-       }
-
-       return nil
+       return dispatcher.Run()
 }
 
 // sbatchCmd
index 40461031e214486f1dbed9feeda6aae0d97fb76c..86844dc61c09545561b10c4df27f42df03149016 100644 (file)
@@ -146,7 +146,6 @@ func (s *TestSuite) integrationTest(c *C,
 
        theConfig.CrunchRunCommand = []string{"echo"}
 
-       doneProcessing := make(chan struct{})
        dispatcher := dispatch.Dispatcher{
                Arv:          arv,
                PollInterval: time.Duration(1) * time.Second,
@@ -155,13 +154,13 @@ func (s *TestSuite) integrationTest(c *C,
                        status chan arvados.Container) {
                        go runContainer(dispatcher, container)
                        run(dispatcher, container, status)
-                       doneProcessing <- struct{}{}
+                       dispatcher.Stop()
                },
-               DoneProcessing: doneProcessing}
+       }
 
        squeueUpdater.StartMonitor(time.Duration(500) * time.Millisecond)
 
-       err = dispatcher.RunDispatcher()
+       err = dispatcher.Run()
        c.Assert(err, IsNil)
 
        squeueUpdater.Done()
@@ -208,7 +207,6 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
        theConfig.CrunchRunCommand = []string{crunchCmd}
 
-       doneProcessing := make(chan struct{})
        dispatcher := dispatch.Dispatcher{
                Arv:          arv,
                PollInterval: time.Duration(1) * time.Second,
@@ -221,18 +219,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                                dispatcher.UpdateState(container.UUID, dispatch.Complete)
                        }()
                        run(dispatcher, container, status)
-                       doneProcessing <- struct{}{}
+                       dispatcher.Stop()
                },
-               DoneProcessing: doneProcessing}
+       }
 
        go func() {
                for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
                        time.Sleep(100 * time.Millisecond)
                }
-               dispatcher.DoneProcessing <- struct{}{}
+               dispatcher.Stop()
        }()
 
-       err := dispatcher.RunDispatcher()
+       err := dispatcher.Run()
        c.Assert(err, IsNil)
 
        c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)