X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/e4b791cbe3536f08fbb5df10cf8de11c2d816e04..fdc9a9308c646d23ec50073833f141ceebf78613:/services/crunch-dispatch-local/crunch-dispatch-local.go diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go index 9fb4cb9bad..be1fef86e1 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local.go @@ -6,6 +6,9 @@ import ( "log" "os" "os/exec" + "os/signal" + "sync" + "syscall" "time" ) @@ -16,7 +19,14 @@ func main() { } } -var arv arvadosclient.ArvadosClient +var ( + arv arvadosclient.ArvadosClient + runningCmds map[string]*exec.Cmd + runningCmdsMutex sync.Mutex + waitGroup sync.WaitGroup + doneProcessing chan bool + sigChan chan os.Signal +) func doMain() error { flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError) @@ -45,21 +55,41 @@ func doMain() error { return err } - // channel to terminate + // Channel to terminate doneProcessing = make(chan bool) - // run all queued containers + // Map of running crunch jobs + runningCmds = make(map[string]*exec.Cmd) + + // Graceful shutdown + sigChan = make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + go func(sig <-chan os.Signal) { + for sig := range sig { + log.Printf("Caught signal: %v", sig) + doneProcessing <- true + } + }(sigChan) + + // Run all queued containers runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand) + + // Finished dispatching; interrupt any crunch jobs that are still running + for _, cmd := range runningCmds { + cmd.Process.Signal(os.Interrupt) + } + + // Wait for all running crunch jobs to complete / terminate + waitGroup.Wait() + return nil } -var doneProcessing chan bool - // Poll for queued containers using pollInterval. // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers. // // Any errors encountered are logged but the program would continue to run (not exit). -// This is because, once one or more child processes are running, +// This is because, once one or more crunch jobs are running, // we would need to wait for them complete. func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) { ticker := time.NewTicker(time.Duration(pollInterval) * time.Second) @@ -84,8 +114,7 @@ type Container struct { // ContainerList is a list of the containers from api type ContainerList struct { - ItemsAvailable int `json:"items_available"` - Items []Container `json:"items"` + Items []Container `json:"items"` } // Get the list of queued containers from API server and invoke run for each container. @@ -101,8 +130,9 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) { return } - for i := 0; i < containers.ItemsAvailable; i++ { + for i := 0; i < len(containers.Items); i++ { log.Printf("About to run queued container %v", containers.Items[i].UUID) + // Run the container go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval) } } @@ -113,7 +143,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) { // Set the container state to Running // If the container priority becomes zero while crunch job is still running, terminate it. func run(uuid string, crunchRunCommand string, priorityPollInterval int) { - cmd := exec.Command(crunchRunCommand, "--job", uuid) + cmd := exec.Command(crunchRunCommand, uuid) cmd.Stdin = nil cmd.Stderr = os.Stderr @@ -123,8 +153,18 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) { return } + // Add this crunch job to the list of runningCmds + runningCmdsMutex.Lock() + runningCmds[uuid] = cmd + runningCmdsMutex.Unlock() + log.Printf("Started container run for %v", uuid) + // Add this crunch job to waitGroup + waitGroup.Add(1) + defer waitGroup.Done() + + // Update container status to Running err := arv.Update("containers", uuid, arvadosclient.Dict{ "container": arvadosclient.Dict{"state": "Running"}}, @@ -133,34 +173,36 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) { log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err) } - // Terminate the runner if container priority becomes zero + // A goroutine to terminate the runner if container priority becomes zero priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second) go func() { - for { - select { - case <-priorityTicker.C: - var container Container - err := arv.Get("containers", uuid, nil, &container) - if err != nil { - log.Printf("Error getting container info for %v: %q", uuid, err) - } else { - if container.Priority == 0 { - priorityTicker.Stop() - cmd.Process.Kill() - return - } + for _ = range priorityTicker.C { + var container Container + err := arv.Get("containers", uuid, nil, &container) + if err != nil { + log.Printf("Error getting container info for %v: %q", uuid, err) + } else { + if container.Priority == 0 { + priorityTicker.Stop() + cmd.Process.Signal(os.Interrupt) } } } }() - // Wait for the process to exit + // Wait for the crunch job to exit if _, err := cmd.Process.Wait(); err != nil { - log.Printf("Error while waiting for process to finish for %v: %q", uuid, err) + log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err) } + // Remove the crunch job to runningCmds + runningCmdsMutex.Lock() + delete(runningCmds, uuid) + runningCmdsMutex.Unlock() + priorityTicker.Stop() + // The container state should be 'Complete' var container Container err = arv.Get("containers", uuid, nil, &container) if container.State == "Running" { @@ -168,7 +210,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) { err = arv.Update("containers", uuid, arvadosclient.Dict{ "container": arvadosclient.Dict{"state": "Complete"}}, - &container) + nil) if err != nil { log.Printf("Error updating container state to Complete for %v: %q", uuid, err) }