8028: add signal handling to dispatcher.
author: radhika <radhika@curoverse.com>
Wed, 20 Jan 2016 02:40:59 +0000 (21:40 -0500)
committer: radhika <radhika@curoverse.com>
Wed, 20 Jan 2016 17:36:02 +0000 (12:36 -0500)
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go

index 2b7bd2492edbc7e11ac6d4d8db5ebb8ab763bbfc..e5758390403591c03d52a8fbf60313891b7f4d94 100644 (file)
@@ -6,6 +6,8 @@ import (
        "log"
        "os"
        "os/exec"
+       "os/signal"
+       "syscall"
        "time"
 )
 
@@ -17,6 +19,7 @@ func main() {
 }
 
 var arv arvadosclient.ArvadosClient
+var runningCmds map[string]*exec.Cmd
 
 func doMain() error {
        flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
@@ -45,6 +48,22 @@ func doMain() error {
                return err
        }
 
+       runningCmds = make(map[string]*exec.Cmd)
+       sigChan = make(chan os.Signal, 1)
+       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+       go func(sig <-chan os.Signal) {
+               for sig := range sig {
+                       doneProcessing <- true
+                       caught := sig
+                       for uuid, cmd := range runningCmds {
+                               cmd.Process.Signal(caught)
+                               if _, err := cmd.Process.Wait(); err != nil {
+                                       log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+                               }
+                       }
+               }
+       }(sigChan)
+
        // channel to terminate
        doneProcessing = make(chan bool)
 
@@ -54,6 +73,7 @@ func doMain() error {
 }
 
 var doneProcessing chan bool
+var sigChan chan os.Signal
 
 // Poll for queued containers using pollInterval.
 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
@@ -122,6 +142,8 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                return
        }
 
+       runningCmds[uuid] = cmd
+
        log.Printf("Started container run for %v", uuid)
 
        err := arv.Update("containers", uuid,
@@ -146,6 +168,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                                        if container.Priority == 0 {
                                                priorityTicker.Stop()
                                                cmd.Process.Signal(os.Interrupt)
+                                               delete(runningCmds, uuid)
                                                return
                                        }
                                }
@@ -157,6 +180,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
        if _, err := cmd.Process.Wait(); err != nil {
                log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
        }
+       delete(runningCmds, uuid)
 
        priorityTicker.Stop()
 
index 997c63a003285b03cbe2dfba987a41d842f52dfd..ef8cb836c9b5727d32d7fd4fdc0ee51d0c5888c6 100644 (file)
@@ -10,6 +10,7 @@ import (
        "net/http/httptest"
        "os"
        "strings"
+       "syscall"
        "testing"
        "time"
 
@@ -63,16 +64,13 @@ func (s *TestSuite) Test_doMain(c *C) {
        os.Args = append(os.Args, args...)
 
        go func() {
-               time.Sleep(2 * time.Second)
-               doneProcessing <- true
+               time.Sleep(5 * time.Second)
+               sigChan <- syscall.SIGINT
        }()
 
        err := doMain()
        c.Check(err, IsNil)
 
-       // Give some time for run goroutine to complete
-       time.Sleep(1 * time.Second)
-
        // There should be no queued containers now
        params := arvadosclient.Dict{
                "filters": [][]string{[]string{"state", "=", "Queued"}},
@@ -148,7 +146,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 
        go func() {
                time.Sleep(1 * time.Second)
-               doneProcessing <- true
+               sigChan <- syscall.SIGTERM
        }()
 
        runQueuedContainers(1, 1, crunchCmd)