From 8e5206a5b1910ee7bc1d0a45af754ce507a7f237 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 5 May 2016 17:50:44 -0400 Subject: [PATCH] 8128: Update crunch-dispatch-local to use new Locked state. --- .../crunch-dispatch-local.go | 136 +++++++++++------- .../crunch-dispatch-local_test.go | 19 ++- 2 files changed, 91 insertions(+), 64 deletions(-) diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go index e05c0c5da4..848d723380 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local.go @@ -72,7 +72,7 @@ func doMain() error { }(sigChan) // Run all queued containers - runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand) + runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand) // Finished dispatching; interrupt any crunch jobs that are still running for _, cmd := range runningCmds { @@ -91,8 +91,8 @@ func doMain() error { // Any errors encountered are logged but the program would continue to run (not exit). // This is because, once one or more crunch jobs are running, // we would need to wait for them complete. -func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) { - ticker := time.NewTicker(time.Duration(pollInterval) * time.Second) +func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) { + ticker := time.NewTicker(pollInterval) for { select { @@ -107,9 +107,10 @@ func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunComman // Container data type Container struct { - UUID string `json:"uuid"` - State string `json:"state"` - Priority int `json:"priority"` + UUID string `json:"uuid"` + State string `json:"state"` + Priority int `json:"priority"` + LockedByUUID string `json:"locked_by_uuid"` } // ContainerList is a list of the containers from api @@ -118,7 +119,7 @@ type ContainerList struct { } // Get the list of queued containers from API server and invoke run for each container. -func dispatchLocal(priorityPollInterval int, crunchRunCommand string) { +func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) { params := arvadosclient.Dict{ "filters": [][]string{[]string{"state", "=", "Queued"}}, } @@ -133,88 +134,117 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) { for i := 0; i < len(containers.Items); i++ { log.Printf("About to run queued container %v", containers.Items[i].UUID) // Run the container - go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval) + go run(containers.Items[i].UUID, crunchRunCommand, pollInterval) } } +func updateState(uuid, newState string) error { + err := arv.Update("containers", uuid, + arvadosclient.Dict{ + "container": arvadosclient.Dict{"state": newState}}, + nil) + if err != nil { + log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err) + } + return err +} + // Run queued container: -// Set container state to locked (TBD) +// Set container state to Locked // Run container using the given crunch-run command // Set the container state to Running // If the container priority becomes zero while crunch job is still running, terminate it. -func run(uuid string, crunchRunCommand string, priorityPollInterval int) { - cmd := exec.Command(crunchRunCommand, uuid) +func run(uuid string, crunchRunCommand string, pollInterval time.Duration) { + if err := updateState(uuid, "Locked"); err != nil { + return + } + cmd := exec.Command(crunchRunCommand, uuid) cmd.Stdin = nil cmd.Stderr = os.Stderr cmd.Stdout = os.Stderr + + // Add this crunch job to the list of runningCmds only if we + // succeed in starting crunch-run. + runningCmdsMutex.Lock() if err := cmd.Start(); err != nil { - log.Printf("Error running container for %v: %q", uuid, err) + log.Printf("Error starting crunch-run for %v: %q", uuid, err) + runningCmdsMutex.Unlock() + updateState(uuid, "Queued") return } - - // Add this crunch job to the list of runningCmds - runningCmdsMutex.Lock() runningCmds[uuid] = cmd runningCmdsMutex.Unlock() - log.Printf("Started container run for %v", uuid) + defer func() { + setFinalState(uuid) + + // Remove the crunch job from runningCmds + runningCmdsMutex.Lock() + delete(runningCmds, uuid) + runningCmdsMutex.Unlock() + }() + + log.Printf("Starting container %v", uuid) // Add this crunch job to waitGroup waitGroup.Add(1) defer waitGroup.Done() - // Update container status to Running - err := arv.Update("containers", uuid, - arvadosclient.Dict{ - "container": arvadosclient.Dict{"state": "Running"}}, - nil) - if err != nil { - log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err) - } + updateState(uuid, "Running") + + cmdExited := make(chan struct{}) - // A goroutine to terminate the runner if container priority becomes zero - priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second) + // Kill the child process if container priority changes to zero go func() { - for _ = range priorityTicker.C { + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for { + select { + case <-cmdExited: + return + case <-ticker.C: + } var container Container err := arv.Get("containers", uuid, nil, &container) if err != nil { - log.Printf("Error getting container info for %v: %q", uuid, err) - } else { - if container.Priority == 0 { - priorityTicker.Stop() - cmd.Process.Signal(os.Interrupt) - } + log.Printf("Error getting container %v: %q", uuid, err) + continue + } + if container.Priority == 0 { + log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid) + cmd.Process.Signal(os.Interrupt) } } }() - // Wait for the crunch job to exit + // Wait for crunch-run to exit if _, err := cmd.Process.Wait(); err != nil { log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err) } + close(cmdExited) - // Remove the crunch job to runningCmds - runningCmdsMutex.Lock() - delete(runningCmds, uuid) - runningCmdsMutex.Unlock() - - priorityTicker.Stop() + log.Printf("Finished container run for %v", uuid) +} - // The container state should be 'Complete' +func setFinalState(uuid string) { + // The container state should now be 'Complete' if everything + // went well. If it started but crunch-run didn't change its + // final state to 'Running', fix that now. If it never even + // started, cancel it as unrunnable. (TODO: Requeue instead, + // and fix tests so they can tell something happened even if + // the final state is Queued.) var container Container - err = arv.Get("containers", uuid, nil, &container) - if container.State == "Running" { - log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid) - err = arv.Update("containers", uuid, - arvadosclient.Dict{ - "container": arvadosclient.Dict{"state": "Complete"}}, - nil) - if err != nil { - log.Printf("Error updating container state to Complete for %v: %q", uuid, err) - } + err := arv.Get("containers", uuid, nil, &container) + if err != nil { + log.Printf("Error getting final container state: %v", err) + } + fixState := map[string]string{ + "Running": "Complete", + "Locked": "Cancelled", + } + if newState, ok := fixState[container.State]; ok { + log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState) + updateState(uuid, newState) } - - log.Printf("Finished container run for %v", uuid) } diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go index 3ec1e2ec6b..e3ab3a4e1d 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go @@ -4,12 +4,11 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/arvadostest" - "io/ioutil" + "bytes" "log" "net/http" "net/http/httptest" "os" - "strings" "syscall" "testing" "time" @@ -101,7 +100,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) { apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] = arvadostest.StubResponse{500, string(`{}`)} - testWithServerStub(c, apiStubResponses, "echo", "Error updating container state") + testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state") } func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) { @@ -122,7 +121,7 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) { apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] = arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)} - testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3") + testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3") } func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) { @@ -139,21 +138,19 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon Retries: 0, } - tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file") - c.Check(err, IsNil) - defer os.Remove(tempfile.Name()) - log.SetOutput(tempfile) + buf := bytes.NewBuffer(nil) + log.SetOutput(buf) + defer log.SetOutput(os.Stderr) go func() { time.Sleep(2 * time.Second) sigChan <- syscall.SIGTERM }() - runQueuedContainers(1, 1, crunchCmd) + runQueuedContainers(time.Second, time.Second, crunchCmd) // Wait for all running crunch jobs to complete / terminate waitGroup.Wait() - buf, _ := ioutil.ReadFile(tempfile.Name()) - c.Check(strings.Contains(string(buf), expected), Equals, true) + c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`) } -- 2.30.2