8128: Update crunch-dispatch-local to use new Locked state.
author Tom Clegg <tom@curoverse.com>
Thu, 5 May 2016 21:50:44 +0000 (17:50 -0400)
committer Tom Clegg <tom@curoverse.com>
Thu, 12 May 2016 15:33:44 +0000 (11:33 -0400)
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go

index e05c0c5da4439e44931837ea5a259885624b80d8..848d723380da29d9a9e3337aa654e097fde782e1 100644 (file)
@@ -72,7 +72,7 @@ func doMain() error {
        }(sigChan)
 
        // Run all queued containers
-       runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+       runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
 
        // Finished dispatching; interrupt any crunch jobs that are still running
        for _, cmd := range runningCmds {
@@ -91,8 +91,8 @@ func doMain() error {
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
-       ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
+       ticker := time.NewTicker(pollInterval)
 
        for {
                select {
@@ -107,9 +107,10 @@ func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunComman
 
 // Container data
 type Container struct {
-       UUID     string `json:"uuid"`
-       State    string `json:"state"`
-       Priority int    `json:"priority"`
+       UUID         string `json:"uuid"`
+       State        string `json:"state"`
+       Priority     int    `json:"priority"`
+       LockedByUUID string `json:"locked_by_uuid"`
 }
 
 // ContainerList is a list of the containers from api
@@ -118,7 +119,7 @@ type ContainerList struct {
 }
 
 // Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
        params := arvadosclient.Dict{
                "filters": [][]string{[]string{"state", "=", "Queued"}},
        }
@@ -133,88 +134,117 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
        for i := 0; i < len(containers.Items); i++ {
                log.Printf("About to run queued container %v", containers.Items[i].UUID)
                // Run the container
-               go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+               go run(containers.Items[i].UUID, crunchRunCommand, pollInterval)
        }
 }
 
+func updateState(uuid, newState string) error {
+       err := arv.Update("containers", uuid,
+               arvadosclient.Dict{
+                       "container": arvadosclient.Dict{"state": newState}},
+               nil)
+       if err != nil {
+               log.Printf("Error updating container %s to '%s' state: %q", uuid, newState, err)
+       }
+       return err
+}
+
 // Run queued container:
-// Set container state to locked (TBD)
+// Set container state to Locked
 // Run container using the given crunch-run command
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
-       cmd := exec.Command(crunchRunCommand, uuid)
+func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
+       if err := updateState(uuid, "Locked"); err != nil {
+               return
+       }
 
+       cmd := exec.Command(crunchRunCommand, uuid)
        cmd.Stdin = nil
        cmd.Stderr = os.Stderr
        cmd.Stdout = os.Stderr
+
+       // Add this crunch job to the list of runningCmds only if we
+       // succeed in starting crunch-run.
+       runningCmdsMutex.Lock()
        if err := cmd.Start(); err != nil {
-               log.Printf("Error running container for %v: %q", uuid, err)
+               log.Printf("Error starting crunch-run for %v: %q", uuid, err)
+               runningCmdsMutex.Unlock()
+               updateState(uuid, "Queued")
                return
        }
-
-       // Add this crunch job to the list of runningCmds
-       runningCmdsMutex.Lock()
        runningCmds[uuid] = cmd
        runningCmdsMutex.Unlock()
 
-       log.Printf("Started container run for %v", uuid)
+       defer func() {
+               setFinalState(uuid)
+
+               // Remove the crunch job from runningCmds
+               runningCmdsMutex.Lock()
+               delete(runningCmds, uuid)
+               runningCmdsMutex.Unlock()
+       }()
+
+       log.Printf("Starting container %v", uuid)
 
        // Add this crunch job to waitGroup
        waitGroup.Add(1)
        defer waitGroup.Done()
 
-       // Update container status to Running
-       err := arv.Update("containers", uuid,
-               arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"state": "Running"}},
-               nil)
-       if err != nil {
-               log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
-       }
+       updateState(uuid, "Running")
+
+       cmdExited := make(chan struct{})
 
-       // A goroutine to terminate the runner if container priority becomes zero
-       priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+       // Kill the child process if container priority changes to zero
        go func() {
-               for _ = range priorityTicker.C {
+               ticker := time.NewTicker(pollInterval)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-cmdExited:
+                               return
+                       case <-ticker.C:
+                       }
                        var container Container
                        err := arv.Get("containers", uuid, nil, &container)
                        if err != nil {
-                               log.Printf("Error getting container info for %v: %q", uuid, err)
-                       } else {
-                               if container.Priority == 0 {
-                                       priorityTicker.Stop()
-                                       cmd.Process.Signal(os.Interrupt)
-                               }
+                               log.Printf("Error getting container %v: %q", uuid, err)
+                               continue
+                       }
+                       if container.Priority == 0 {
+                               log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+                               cmd.Process.Signal(os.Interrupt)
                        }
                }
        }()
 
-       // Wait for the crunch job to exit
+       // Wait for crunch-run to exit
        if _, err := cmd.Process.Wait(); err != nil {
                log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
        }
+       close(cmdExited)
 
-       // Remove the crunch job to runningCmds
-       runningCmdsMutex.Lock()
-       delete(runningCmds, uuid)
-       runningCmdsMutex.Unlock()
-
-       priorityTicker.Stop()
+       log.Printf("Finished container run for %v", uuid)
+}
 
-       // The container state should be 'Complete'
+func setFinalState(uuid string) {
+       // The container state should now be 'Complete' if everything
+       // went well. If crunch-run exited but left the state at
+       // 'Running', set it to 'Complete' now. If it never even
+       // started (still 'Locked'), cancel it as unrunnable. (TODO:
+       // Requeue instead, and fix tests so they can tell something
+       // happened even if the final state is Queued.)
        var container Container
-       err = arv.Get("containers", uuid, nil, &container)
-       if container.State == "Running" {
-               log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
-               err = arv.Update("containers", uuid,
-                       arvadosclient.Dict{
-                               "container": arvadosclient.Dict{"state": "Complete"}},
-                       nil)
-               if err != nil {
-                       log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
-               }
+       err := arv.Get("containers", uuid, nil, &container)
+       if err != nil {
+               log.Printf("Error getting final container state: %v", err)
+       }
+       fixState := map[string]string{
+               "Running": "Complete",
+               "Locked": "Cancelled",
+       }
+       if newState, ok := fixState[container.State]; ok {
+               log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
+               updateState(uuid, newState)
        }
-
-       log.Printf("Finished container run for %v", uuid)
 }
index 3ec1e2ec6b41c4759bd6f75e6cddc847254633ab..e3ab3a4e1d55ddc5df609fc4273cdfc57e63430a 100644 (file)
@@ -4,12 +4,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 
-       "io/ioutil"
+       "bytes"
        "log"
        "net/http"
        "net/http/httptest"
        "os"
-       "strings"
        "syscall"
        "testing"
        "time"
@@ -101,7 +100,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
                arvadostest.StubResponse{500, string(`{}`)}
 
-       testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+       testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
 }
 
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -122,7 +121,7 @@ func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
        apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
                arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
 
-       testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3")
+       testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error starting crunch-run for zzzzz-dz642-xxxxxxxxxxxxxx3")
 }
 
 func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -139,21 +138,19 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                Retries:   0,
        }
 
-       tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
-       c.Check(err, IsNil)
-       defer os.Remove(tempfile.Name())
-       log.SetOutput(tempfile)
+       buf := bytes.NewBuffer(nil)
+       log.SetOutput(buf)
+       defer log.SetOutput(os.Stderr)
 
        go func() {
                time.Sleep(2 * time.Second)
                sigChan <- syscall.SIGTERM
        }()
 
-       runQueuedContainers(1, 1, crunchCmd)
+       runQueuedContainers(time.Second, time.Second, crunchCmd)
 
        // Wait for all running crunch jobs to complete / terminate
        waitGroup.Wait()
 
-       buf, _ := ioutil.ReadFile(tempfile.Name())
-       c.Check(strings.Contains(string(buf), expected), Equals, true)
+       c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }