8028: add command to waitGroup in the run method itself, not during signal handling.
author radhika <radhika@curoverse.com>
Wed, 20 Jan 2016 16:40:30 +0000 (11:40 -0500)
committer radhika <radhika@curoverse.com>
Wed, 20 Jan 2016 17:36:02 +0000 (12:36 -0500)
services/crunch-dispatch-local/crunch-dispatch-local.go

index eb8550eceb8ebcaca25ace2ad2771a4bd587a227..47c3564502b4c5f27157102a93257386f2f1c96b 100644 (file)
@@ -19,8 +19,14 @@ func main() {
        }
 }
 
-var arv arvadosclient.ArvadosClient
-var runningCmds map[string]*exec.Cmd
+var (
+       arv              arvadosclient.ArvadosClient
+       runningCmds      map[string]*exec.Cmd
+       runningCmdsMutex sync.Mutex
+       waitGroup        sync.WaitGroup
+       doneProcessing   chan bool
+       sigChan          chan os.Signal
+)
 
 func doMain() error {
        flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
@@ -49,44 +55,46 @@ func doMain() error {
                return err
        }
 
+       // Channel to terminate
+       doneProcessing = make(chan bool)
+
+       // Map of running crunch jobs
        runningCmds = make(map[string]*exec.Cmd)
+
+       // Graceful shutdown
        sigChan = make(chan os.Signal, 1)
        signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
        go func(sig <-chan os.Signal) {
-               var wg sync.WaitGroup
                for sig := range sig {
+                       log.Printf("Caught signal: %v", sig)
                        doneProcessing <- true
-                       caught := sig
-                       for uuid, cmd := range runningCmds {
-                               go func(uuid string) {
-                                       wg.Add(1)
-                                       defer wg.Done()
-                                       cmd.Process.Signal(caught)
-                                       if _, err := cmd.Process.Wait(); err != nil {
-                                               log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
-                                       }
-                               }(uuid)
-                       }
                }
-               wg.Wait()
        }(sigChan)
 
-       // channel to terminate
-       doneProcessing = make(chan bool)
-
-       // run all queued containers
+       // Run all queued containers
        runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+
+       // Finished dispatching; interrupt any crunch jobs that are still running
+       for uuid, cmd := range runningCmds {
+               go func(uuid string) {
+                       cmd.Process.Signal(os.Interrupt)
+                       if _, err := cmd.Process.Wait(); err != nil {
+                               log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
+                       }
+               }(uuid)
+       }
+
+       // Wait for all running crunch jobs to complete / terminate
+       waitGroup.Wait()
+
        return nil
 }
 
-var doneProcessing chan bool
-var sigChan chan os.Signal
-
 // Poll for queued containers using pollInterval.
 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
 //
 // Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more child processes are running,
+// This is because, once one or more crunch jobs are running,
 // we would need to wait for them complete.
 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
        ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
@@ -129,6 +137,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 
        for i := 0; i < len(containers.Items); i++ {
                log.Printf("About to run queued container %v", containers.Items[i].UUID)
+               // Run the container
                go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
        }
 }
@@ -149,10 +158,18 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                return
        }
 
+       // Add this crunch job to the list of runningCmds
+       runningCmdsMutex.Lock()
        runningCmds[uuid] = cmd
+       runningCmdsMutex.Unlock()
 
        log.Printf("Started container run for %v", uuid)
 
+       // Add this crunch job to waitGroup
+       waitGroup.Add(1)
+       defer waitGroup.Done()
+
+       // Update container status to Running
        err := arv.Update("containers", uuid,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": "Running"}},
@@ -161,7 +178,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
        }
 
-       // Terminate the runner if container priority becomes zero
+       // A goroutine to terminate the runner if container priority becomes zero
        priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
        go func() {
                for {
@@ -175,7 +192,9 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                                        if container.Priority == 0 {
                                                priorityTicker.Stop()
                                                cmd.Process.Signal(os.Interrupt)
+                                               runningCmdsMutex.Lock()
                                                delete(runningCmds, uuid)
+                                               runningCmdsMutex.Unlock()
                                                return
                                        }
                                }
@@ -183,14 +202,19 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                }
        }()
 
-       // Wait for the process to exit
+       // Wait for the crunch job to exit
        if _, err := cmd.Process.Wait(); err != nil {
-               log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+               log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
        }
+
+       // Remove the crunch job from runningCmds
+       runningCmdsMutex.Lock()
        delete(runningCmds, uuid)
+       runningCmdsMutex.Unlock()
 
        priorityTicker.Stop()
 
+       // The container state should be 'Complete'
        var container Container
        err = arv.Get("containers", uuid, nil, &container)
        if container.State == "Running" {