8028: crunch-dispatch-local implementation
author radhika <radhika@curoverse.com>
Wed, 13 Jan 2016 16:15:28 +0000 (11:15 -0500)
committer radhika <radhika@curoverse.com>
Wed, 20 Jan 2016 17:36:02 +0000 (12:36 -0500)
services/api/test/fixtures/containers.yml [new file with mode: 0644]
services/crunch-dispatch-local/.gitignore [new file with mode: 0644]
services/crunch-dispatch-local/crunch-dispatch-local.go [new file with mode: 0644]
services/crunch-dispatch-local/crunch-dispatch-local_test.go [new file with mode: 0644]

diff --git a/services/api/test/fixtures/containers.yml b/services/api/test/fixtures/containers.yml
new file mode 100644 (file)
index 0000000..22004b4
--- /dev/null
@@ -0,0 +1,25 @@
+queued:  # Queued-state fixture; Test_doMain (crunch-dispatch-local_test.go) fetches this UUID after dispatch and expects state Complete
+  uuid: zzzzz-dz642-queuedcontainer
+  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  state: Queued
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  container_image: test
+  cwd: test
+  output: test
+  output_path: test
+  command: ["echo", "hello"]
+
+completed:  # Complete-state fixture; it does not match the state = Queued filter used by dispatchLocal
+  uuid: zzzzz-dz642-compltcontainer
+  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  state: Complete
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  container_image: test
+  cwd: test
+  output: test
+  output_path: test
+  command: ["echo", "hello"]
diff --git a/services/crunch-dispatch-local/.gitignore b/services/crunch-dispatch-local/.gitignore
new file mode 100644 (file)
index 0000000..7c1070a
--- /dev/null
@@ -0,0 +1 @@
+crunch-dispatch-local
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
new file mode 100644 (file)
index 0000000..9fb4cb9
--- /dev/null
@@ -0,0 +1,176 @@
+package main
+
+import (
+       "flag"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "log"
+       "os"
+       "os/exec"
+       "time"
+)
+
+func main() {
+       err := doMain()
+       if err != nil {
+               log.Fatalf("%q", err)
+       }
+}
+
+var arv arvadosclient.ArvadosClient // API client used by every function in this file; assigned in doMain
+
+// doMain parses the command line flags, creates the Arvados API client in
+// the package-level arv, and then polls for queued containers until a value
+// is received on doneProcessing.
+//
+// It returns a non-nil error only when the API client cannot be created.
+func doMain() error {
+       var (
+               pollSecs         int
+               priorityPollSecs int
+               runnerCommand    string
+       )
+
+       flags := flag.NewFlagSet("crunch-dispatch-local", flag.ExitOnError)
+       flags.IntVar(&pollSecs, "poll-interval", 10,
+               "Interval in seconds to poll for queued containers")
+       flags.IntVar(&priorityPollSecs, "container-priority-poll-interval", 60,
+               "Interval in seconds to check priority of a dispatched container")
+       flags.StringVar(&runnerCommand, "crunch-run-command", "/usr/bin/crunch-run",
+               "Crunch command to run container")
+
+       // os.Args[0] is the command name, so only the remaining args are parsed.
+       flags.Parse(os.Args[1:])
+
+       var err error
+       if arv, err = arvadosclient.MakeArvadosClient(); err != nil {
+               return err
+       }
+
+       // Sending on this channel terminates the polling loop below.
+       doneProcessing = make(chan bool)
+
+       // Blocks, running all queued containers, until told to stop.
+       runQueuedContainers(pollSecs, priorityPollSecs, runnerCommand)
+       return nil
+}
+
+var doneProcessing chan bool // created in doMain; receiving a value makes runQueuedContainers stop its ticker and return
+
+// Poll for queued containers using pollInterval.
+// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+//
+// Any errors encountered are logged but the program would continue to run (not exit).
+// This is because, once one or more child processes are running,
+// we would need to wait for them complete.
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+       ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+
+       for {
+               select {
+               case <-ticker.C:
+                       dispatchLocal(priorityPollInterval, crunchRunCommand)
+               case <-doneProcessing:
+                       ticker.Stop()
+                       return
+               }
+       }
+}
+
+type (
+       // Container holds the fields of an API container record that the
+       // dispatcher reads: its UUID, state and priority.
+       Container struct {
+               UUID     string `json:"uuid"`
+               State    string `json:"state"`
+               Priority int    `json:"priority"`
+       }
+
+       // ContainerList is the decoded form of a container list response
+       // from the API.
+       ContainerList struct {
+               ItemsAvailable int         `json:"items_available"`
+               Items          []Container `json:"items"`
+       }
+)
+
+// dispatchLocal asks the API server for the containers whose state is
+// "Queued" and starts one goroutine per returned container to run it.
+//
+// priorityPollInterval and crunchRunCommand are handed to run unchanged.
+// If the list request fails, the error is logged and nothing is started
+// for this cycle.
+func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+       params := arvadosclient.Dict{
+               "filters": [][]string{[]string{"state", "=", "Queued"}},
+       }
+
+       var containers ContainerList
+       err := arv.List("containers", params, &containers)
+       if err != nil {
+               log.Printf("Error getting list of queued containers: %q", err)
+               return
+       }
+
+       // Iterate over the items actually present in this response.
+       // ItemsAvailable is a separate count reported by the server and can be
+       // larger than len(Items) (e.g. when the result is paged), so using it
+       // as the loop bound could index past the end of the slice and panic.
+       for _, container := range containers.Items {
+               log.Printf("About to run queued container %v", container.UUID)
+               go run(container.UUID, crunchRunCommand, priorityPollInterval)
+       }
+}
+
+// run executes one queued container and keeps its API record up to date:
+//
+//   - set the container state to locked (TBD)
+//   - start crunchRunCommand with the arguments "--job" and uuid
+//   - set the container state to "Running"
+//   - every priorityPollInterval seconds re-read the container and kill the
+//     child process if its priority has become zero
+//   - after the child has exited, set the state to "Complete" if the API
+//     still reports "Running"
+//
+// Every failure is logged; nothing is returned to the caller.
+func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+       cmd := exec.Command(crunchRunCommand, "--job", uuid)
+
+       cmd.Stdin = nil
+       cmd.Stderr = os.Stderr
+       cmd.Stdout = os.Stderr
+       if err := cmd.Start(); err != nil {
+               log.Printf("Error running container for %v: %q", uuid, err)
+               return
+       }
+
+       log.Printf("Started container run for %v", uuid)
+
+       // Best effort: a failed state update is logged but the child keeps running.
+       err := arv.Update("containers", uuid,
+               arvadosclient.Dict{
+                       "container": arvadosclient.Dict{"state": "Running"}},
+               nil)
+       if err != nil {
+               log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+       }
+
+       // Terminate the runner if container priority becomes zero.
+       //
+       // processDone is closed once the child has exited so that the watcher
+       // goroutine returns. Stopping the ticker alone is not enough: Stop does
+       // not close the ticker channel, so a goroutine that only receives from
+       // it would stay blocked forever.
+       priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+       processDone := make(chan struct{})
+       go func() {
+               for {
+                       select {
+                       case <-processDone:
+                               return
+                       case <-priorityTicker.C:
+                               var container Container
+                               if err := arv.Get("containers", uuid, nil, &container); err != nil {
+                                       log.Printf("Error getting container info for %v: %q", uuid, err)
+                                       continue
+                               }
+                               if container.Priority == 0 {
+                                       priorityTicker.Stop()
+                                       if err := cmd.Process.Kill(); err != nil {
+                                               log.Printf("Error killing process for %v: %q", uuid, err)
+                                       }
+                                       return
+                               }
+                       }
+               }
+       }()
+
+       // Wait for the process to exit
+       if _, err := cmd.Process.Wait(); err != nil {
+               log.Printf("Error while waiting for process to finish for %v: %q", uuid, err)
+       }
+
+       priorityTicker.Stop()
+       close(processDone)
+
+       // Without a successful read the current state is unknown, so do not
+       // attempt the "Complete" update in that case.
+       var container Container
+       if err := arv.Get("containers", uuid, nil, &container); err != nil {
+               log.Printf("Error getting final container state for %v: %q", uuid, err)
+               return
+       }
+       if container.State == "Running" {
+               log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
+               err = arv.Update("containers", uuid,
+                       arvadosclient.Dict{
+                               "container": arvadosclient.Dict{"state": "Complete"}},
+                       &container)
+               if err != nil {
+                       log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
+               }
+       }
+}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
new file mode 100644 (file)
index 0000000..1d526b9
--- /dev/null
@@ -0,0 +1,158 @@
+package main
+
+import (
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+
+       "io/ioutil"
+       "log"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "strings"
+       "testing"
+       "time"
+
+       . "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       TestingT(t)
+}
+
+// Suite types. Neither carries state; they only group the test methods.
+type (
+       TestSuite              struct{}
+       MockArvadosServerSuite struct{}
+)
+
+// Register both suites with gocheck.
+var (
+       _ = Suite(&TestSuite{})
+       _ = Suite(&MockArvadosServerSuite{})
+)
+
+// initialArgs keeps the os.Args seen by SetUpSuite so that TearDownTest can
+// put them back.
+var initialArgs []string
+
+func (s *TestSuite) SetUpSuite(c *C) {
+       initialArgs = os.Args
+       arvadostest.StartAPI()
+}
+
+func (s *TestSuite) TearDownSuite(c *C) {
+       arvadostest.StopAPI()
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+       args := []string{"crunch-dispatch-local"}
+       os.Args = args
+
+       var err error
+       arv, err = arvadosclient.MakeArvadosClient()
+       if err != nil {
+               c.Fatalf("Error making arvados client: %s", err)
+       }
+}
+
+func (s *TestSuite) TearDownTest(c *C) {
+       arvadostest.ResetEnv()
+       os.Args = initialArgs
+}
+
+func (s *MockArvadosServerSuite) TearDownTest(c *C) {
+       arvadostest.ResetEnv()
+}
+
+// Test_doMain runs doMain against the test API server with one-second poll
+// intervals and "echo" as the crunch-run command, stops it after two
+// seconds, and then checks that no container is left in state Queued and
+// that the container zzzzz-dz642-queuedcontainer is now Complete.
+func (s *TestSuite) Test_doMain(c *C) {
+       os.Args = append(os.Args,
+               "-poll-interval", "1",
+               "-container-priority-poll-interval", "1",
+               "-crunch-run-command", "echo")
+
+       // Tell the polling loop to stop after it has had time to dispatch.
+       go func() {
+               time.Sleep(2 * time.Second)
+               doneProcessing <- true
+       }()
+
+       c.Check(doMain(), IsNil)
+
+       // There should be no queued containers now
+       var queued ContainerList
+       err := arv.List("containers", arvadosclient.Dict{
+               "filters": [][]string{[]string{"state", "=", "Queued"}},
+       }, &queued)
+       c.Check(err, IsNil)
+       c.Assert(queued.ItemsAvailable, Equals, 0)
+
+       // Previously "Queued" container should now be in "Complete" state
+       var dispatched Container
+       err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &dispatched)
+       c.Check(err, IsNil)
+       c.Check(dispatched.State, Equals, "Complete")
+}
+
+func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
+       apiStubResponses := make(map[string]arvadostest.StubResponse)
+       apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
+
+       testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
+}
+
+func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
+       apiStubResponses := make(map[string]arvadostest.StubResponse)
+       apiStubResponses["/arvados/v1/containers"] =
+               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx1"}]}`)}
+       apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
+               arvadostest.StubResponse{500, string(`{}`)}
+
+       testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+}
+
+func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
+       apiStubResponses := make(map[string]arvadostest.StubResponse)
+       apiStubResponses["/arvados/v1/containers"] =
+               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2"}]}`)}
+       apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx2"] =
+               arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx2", "state":"Running", "priority":1}`)}
+
+       testWithServerStub(c, apiStubResponses, "echo",
+               "After crunch-run process termination, the state is still 'Running' for zzzzz-dz642-xxxxxxxxxxxxxx2")
+}
+
+func (s *MockArvadosServerSuite) Test_ErrorRunningContainer(c *C) {
+       apiStubResponses := make(map[string]arvadostest.StubResponse)
+       apiStubResponses["/arvados/v1/containers"] =
+               arvadostest.StubResponse{200, string(`{"items_available":1, "items":[{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3"}]}`)}
+       apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx3"] =
+               arvadostest.StubResponse{200, string(`{"uuid":"zzzzz-dz642-xxxxxxxxxxxxxx3", "state":"Running", "priority":1}`)}
+
+       testWithServerStub(c, apiStubResponses, "nosuchcommand", "Error running container for zzzzz-dz642-xxxxxxxxxxxxxx3")
+}
+
+func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+       apiStub := arvadostest.ServerStub{apiStubResponses}
+
+       api := httptest.NewServer(&apiStub)
+       defer api.Close()
+
+       arv = arvadosclient.ArvadosClient{
+               Scheme:    "http",
+               ApiServer: api.URL[7:],
+               ApiToken:  "abc123",
+               Client:    &http.Client{Transport: &http.Transport{}},
+               Retries:   0,
+       }
+
+       tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
+       c.Check(err, IsNil)
+       defer os.Remove(tempfile.Name())
+       log.SetOutput(tempfile)
+
+       go func() {
+               time.Sleep(1 * time.Second)
+               doneProcessing <- true
+       }()
+
+       runQueuedContainers(1, 1, crunchCmd)
+
+       // Give some time for run goroutine to complete
+       time.Sleep(5 * time.Second)
+
+       buf, _ := ioutil.ReadFile(tempfile.Name())
+       c.Check(strings.Contains(string(buf), expected), Equals, true)
+}