X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f87bda0e69ec38a7485f250328c32643c5587fd8..090e546b14e13798d15ac61728da05a0d9fb1317:/services/crunch-dispatch-local/crunch-dispatch-local.go diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go index eb8550eceb..e05c0c5da4 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local.go @@ -19,8 +19,14 @@ func main() { } } -var arv arvadosclient.ArvadosClient -var runningCmds map[string]*exec.Cmd +var ( + arv arvadosclient.ArvadosClient + runningCmds map[string]*exec.Cmd + runningCmdsMutex sync.Mutex + waitGroup sync.WaitGroup + doneProcessing chan bool + sigChan chan os.Signal +) func doMain() error { flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError) @@ -49,44 +55,41 @@ func doMain() error { return err } + // Channel to terminate + doneProcessing = make(chan bool) + + // Map of running crunch jobs runningCmds = make(map[string]*exec.Cmd) + + // Graceful shutdown sigChan = make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) go func(sig <-chan os.Signal) { - var wg sync.WaitGroup for sig := range sig { + log.Printf("Caught signal: %v", sig) doneProcessing <- true - caught := sig - for uuid, cmd := range runningCmds { - go func(uuid string) { - wg.Add(1) - defer wg.Done() - cmd.Process.Signal(caught) - if _, err := cmd.Process.Wait(); err != nil { - log.Printf("Error while waiting for process to finish for %v: %q", uuid, err) - } - }(uuid) - } } - wg.Wait() }(sigChan) - // channel to terminate - doneProcessing = make(chan bool) - - // run all queued containers + // Run all queued containers runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand) + + // Finished dispatching; interrupt any crunch jobs that are still running + for _, cmd := range runningCmds { + cmd.Process.Signal(os.Interrupt) + } + + // Wait for all running crunch jobs to complete / terminate + waitGroup.Wait() + return nil } -var doneProcessing chan bool -var sigChan chan os.Signal - // Poll for queued containers using pollInterval. // Invoke dispatchLocal for each ticker cycle, which will run all the queued containers. // // Any errors encountered are logged but the program would continue to run (not exit). -// This is because, once one or more child processes are running, +// This is because, once one or more crunch jobs are running, // we would need to wait for them complete. func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) { ticker := time.NewTicker(time.Duration(pollInterval) * time.Second) @@ -129,6 +132,7 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) { for i := 0; i < len(containers.Items); i++ { log.Printf("About to run queued container %v", containers.Items[i].UUID) + // Run the container go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval) } } @@ -149,10 +153,18 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) { return } + // Add this crunch job to the list of runningCmds + runningCmdsMutex.Lock() runningCmds[uuid] = cmd + runningCmdsMutex.Unlock() log.Printf("Started container run for %v", uuid) + // Add this crunch job to waitGroup + waitGroup.Add(1) + defer waitGroup.Done() + + // Update container status to Running err := arv.Update("containers", uuid, arvadosclient.Dict{ "container": arvadosclient.Dict{"state": "Running"}}, @@ -161,36 +173,36 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) { log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err) } - // Terminate the runner if container priority becomes zero + // A goroutine to terminate the runner if container priority becomes zero priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second) go func() { - for { - select { - case <-priorityTicker.C: - var container Container - err := arv.Get("containers", uuid, nil, &container) - if err != nil { - log.Printf("Error getting container info for %v: %q", uuid, err) - } else { - if container.Priority == 0 { - priorityTicker.Stop() - cmd.Process.Signal(os.Interrupt) - delete(runningCmds, uuid) - return - } + for _ = range priorityTicker.C { + var container Container + err := arv.Get("containers", uuid, nil, &container) + if err != nil { + log.Printf("Error getting container info for %v: %q", uuid, err) + } else { + if container.Priority == 0 { + priorityTicker.Stop() + cmd.Process.Signal(os.Interrupt) } } } }() - // Wait for the process to exit + // Wait for the crunch job to exit if _, err := cmd.Process.Wait(); err != nil { - log.Printf("Error while waiting for process to finish for %v: %q", uuid, err) + log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err) } + + // Remove the crunch job to runningCmds + runningCmdsMutex.Lock() delete(runningCmds, uuid) + runningCmdsMutex.Unlock() priorityTicker.Stop() + // The container state should be 'Complete' var container Container err = arv.Get("containers", uuid, nil, &container) if container.State == "Running" { @@ -203,4 +215,6 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) { log.Printf("Error updating container state to Complete for %v: %q", uuid, err) } } + + log.Printf("Finished container run for %v", uuid) }