Merge branch '9066-max-requests'
[arvados.git] / services / crunch-dispatch-local / crunch-dispatch-local.go
index 9fb4cb9bad7e4cf9e1e6675ef65596880450ef96..e05c0c5da4439e44931837ea5a259885624b80d8 100644 (file)
@@ -6,6 +6,9 @@ import (
        "log"
        "os"
        "os/exec"
+       "os/signal"
+       "sync"
+       "syscall"
        "time"
 )
 
@@ -16,7 +19,14 @@ func main() {
        }
 }
 
-var arv arvadosclient.ArvadosClient
+var (
+       arv              arvadosclient.ArvadosClient
+       runningCmds      map[string]*exec.Cmd
+       runningCmdsMutex sync.Mutex
+       waitGroup        sync.WaitGroup
+       doneProcessing   chan bool
+       sigChan          chan os.Signal
+)
 
 func doMain() error {
        flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
@@ -45,21 +55,41 @@ func doMain() error {
                return err
        }
 
-       // channel to terminate
+       // Channel to terminate
        doneProcessing = make(chan bool)
 
-       // run all queued containers
+       // Map of running crunch jobs
+       runningCmds = make(map[string]*exec.Cmd)
+
+       // Graceful shutdown
+       sigChan = make(chan os.Signal, 1)
+       signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
+       go func(sig <-chan os.Signal) {
+               for sig := range sig {
+                       log.Printf("Caught signal: %v", sig)
+                       doneProcessing <- true
+               }
+       }(sigChan)
+
+       // Run all queued containers
        runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+
+       // Finished dispatching; interrupt any crunch jobs that are still running
+       for _, cmd := range runningCmds {
+               cmd.Process.Signal(os.Interrupt)
+       }
+
+       // Wait for all running crunch jobs to complete / terminate
+       waitGroup.Wait()
+
        return nil
 }
 
-var doneProcessing chan bool
-
 // Poll for queued containers using pollInterval.
 // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
 //
 // Any errors encountered are logged but the program would continue to run (not exit).
-// This is because, once one or more child processes are running,
+// This is because, once one or more crunch jobs are running,
 // we would need to wait for them complete.
 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
        ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
@@ -84,8 +114,7 @@ type Container struct {
 
 // ContainerList is a list of the containers from api
 type ContainerList struct {
-       ItemsAvailable int         `json:"items_available"`
-       Items          []Container `json:"items"`
+       Items []Container `json:"items"`
 }
 
 // Get the list of queued containers from API server and invoke run for each container.
@@ -101,8 +130,9 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
                return
        }
 
-       for i := 0; i < containers.ItemsAvailable; i++ {
+       for i := 0; i < len(containers.Items); i++ {
                log.Printf("About to run queued container %v", containers.Items[i].UUID)
+               // Run the container
                go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
        }
 }
@@ -113,7 +143,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
 func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
-       cmd := exec.Command(crunchRunCommand, "--job", uuid)
+       cmd := exec.Command(crunchRunCommand, uuid)
 
        cmd.Stdin = nil
        cmd.Stderr = os.Stderr
@@ -123,8 +153,18 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                return
        }
 
+       // Add this crunch job to the list of runningCmds
+       runningCmdsMutex.Lock()
+       runningCmds[uuid] = cmd
+       runningCmdsMutex.Unlock()
+
        log.Printf("Started container run for %v", uuid)
 
+       // Add this crunch job to waitGroup
+       waitGroup.Add(1)
+       defer waitGroup.Done()
+
+       // Update container status to Running
        err := arv.Update("containers", uuid,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": "Running"}},
@@ -133,34 +173,36 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
        }
 
-       // Terminate the runner if container priority becomes zero
+       // A goroutine to terminate the runner if container priority becomes zero
        priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
        go func() {
-               for {
-                       select {
-                       case <-priorityTicker.C:
-                               var container Container
-                               err := arv.Get("containers", uuid, nil, &container)
-                               if err != nil {
-                                       log.Printf("Error getting container info for %v: %q", uuid, err)
-                               } else {
-                                       if container.Priority == 0 {
-                                               priorityTicker.Stop()
-                                               cmd.Process.Kill()
-                                               return
-                                       }
+               for _ = range priorityTicker.C {
+                       var container Container
+                       err := arv.Get("containers", uuid, nil, &container)
+                       if err != nil {
+                               log.Printf("Error getting container info for %v: %q", uuid, err)
+                       } else {
+                               if container.Priority == 0 {
+                                       priorityTicker.Stop()
+                                       cmd.Process.Signal(os.Interrupt)
                                }
                        }
                }
        }()
 
-       // Wait for the process to exit
+       // Wait for the crunch job to exit
        if _, err := cmd.Process.Wait(); err != nil {
-               log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+               log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
        }
 
+       // Remove the crunch job to runningCmds
+       runningCmdsMutex.Lock()
+       delete(runningCmds, uuid)
+       runningCmdsMutex.Unlock()
+
        priorityTicker.Stop()
 
+       // The container state should be 'Complete'
        var container Container
        err = arv.Get("containers", uuid, nil, &container)
        if container.State == "Running" {
@@ -168,9 +210,11 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                err = arv.Update("containers", uuid,
                        arvadosclient.Dict{
                                "container": arvadosclient.Dict{"state": "Complete"}},
-                       &container)
+                       nil)
                if err != nil {
                        log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
                }
        }
+
+       log.Printf("Finished container run for %v", uuid)
 }