8128: Update crunch-dispatch-slurm to use new Locked state.
author Tom Clegg <tom@curoverse.com>
Thu, 5 May 2016 21:15:51 +0000 (17:15 -0400)
committer Tom Clegg <tom@curoverse.com>
Thu, 12 May 2016 15:32:46 +0000 (11:32 -0400)
sdk/go/arvadostest/fixtures.go
services/api/app/models/container.rb
services/api/test/fixtures/api_client_authorizations.yml
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

index bebef79074ebb1758f3f5298502f235a8a02f1e1..84a3bff06c0f09e3d326925d89b30ef4deaf0804 100644 (file)
@@ -13,6 +13,9 @@ const (
        FooBarDirCollection   = "zzzzz-4zz18-foonbarfilesdir"
        FooPdh                = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
        HelloWorldPdh         = "55713e6a34081eb03609e7ad5fcad129+62"
+
+       Dispatch1Token    = "kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw"
+       Dispatch1AuthUUID = "zzzzz-gj3su-k9dvestay1plssr"
 )
 
 // A valid manifest designed to test various edge cases and parsing
index 5856eddaf6715e881e6818fd72edb1bcdae02e7f..a0145523d4af3377d8426bd94c10d7578c7963a6 100644 (file)
@@ -27,6 +27,7 @@ class Container < ArvadosModel
     t.add :environment
     t.add :exit_code
     t.add :finished_at
+    t.add :locked_by_uuid
     t.add :log
     t.add :mounts
     t.add :output
@@ -74,6 +75,13 @@ class Container < ArvadosModel
     end
   end
 
+  def locked_by_uuid
+    # Stub to permit a single dispatch to recognize its own containers
+    if current_user.is_admin
+      Thread.current[:api_client_authorization].andand.uuid
+    end
+  end
+
   protected
 
   def fill_field_defaults
index f99a9fb941f1b26f44d2d4b4035a28afd84fbc08..485b6d1d3c59abd039b58f94442583e89fdb2a99 100644 (file)
@@ -271,3 +271,9 @@ fuse:
   api_token: 4nagbkv8eap0uok7pxm72nossq5asihls3yn5p4xmvqx5t5e7p
   expires_at: 2038-01-01 00:00:00
 
+dispatch1:
+  uuid: zzzzz-gj3su-k9dvestay1plssr
+  api_client: trusted
+  user: system_user
+  api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
+  expires_at: 2038-01-01 00:00:00
\ No newline at end of file
index f45c2a106a07a61f8c9af871936b51f83c178e12..8eefd35ec25081f0a157e4674e60d14681078c06 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bufio"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -86,6 +87,15 @@ func doMain() error {
        return nil
 }
 
+type apiClientAuthorization struct {
+       UUID     string `json:"uuid"`
+       APIToken string `json:"api_token"`
+}
+
+type apiClientAuthorizationList struct {
+       Items []apiClientAuthorization `json:"items"`
+}
+
 // Poll for queued containers using pollInterval.
 // Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
 //
@@ -93,12 +103,21 @@ func doMain() error {
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them complete.
 func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
-       ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+       var authList apiClientAuthorizationList
+       err := arv.List("api_client_authorizations", map[string]interface{}{
+               "filters": [][]interface{}{{"api_token", "=", arv.ApiToken}},
+       }, &authList)
+       if err != nil || len(authList.Items) != 1 {
+               log.Printf("Error getting my token UUID: %v (%d)", err, len(authList.Items))
+               return
+       }
+       auth := authList.Items[0]
 
+       ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
        for {
                select {
                case <-ticker.C:
-                       dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
+                       dispatchSlurm(auth, time.Duration(priorityPollInterval)*time.Second, crunchRunCommand, finishCommand)
                case <-doneProcessing:
                        ticker.Stop()
                        return
@@ -112,6 +131,7 @@ type Container struct {
        State              string           `json:"state"`
        Priority           int              `json:"priority"`
        RuntimeConstraints map[string]int64 `json:"runtime_constraints"`
+       LockedByUUID       string           `json:"locked_by_uuid"`
 }
 
 // ContainerList is a list of the containers from api
@@ -119,10 +139,11 @@ type ContainerList struct {
        Items []Container `json:"items"`
 }
 
-// Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
+// Get the list of queued containers from API server and invoke run
+// for each container.
+func dispatchSlurm(auth apiClientAuthorization, pollInterval time.Duration, crunchRunCommand, finishCommand string) {
        params := arvadosclient.Dict{
-               "filters": [][]string{[]string{"state", "=", "Queued"}},
+               "filters": [][]interface{}{{"state", "in", []string{"Queued", "Locked"}}},
        }
 
        var containers ContainerList
@@ -132,16 +153,33 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand str
                return
        }
 
-       for i := 0; i < len(containers.Items); i++ {
-               log.Printf("About to submit queued container %v", containers.Items[i].UUID)
-               // Run the container
-               go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
+       for _, container := range containers.Items {
+               if container.State == "Locked" {
+                       if container.LockedByUUID != auth.UUID {
+                               // Locked by a different dispatcher
+                               continue
+                       } else if checkMine(container.UUID) {
+                               // I already have a goroutine running
+                               // for this container: it just hasn't
+                               // gotten past Locked state yet.
+                               continue
+                       }
+                       log.Printf("WARNING: found container %s already locked by my token %s, but I didn't submit it. "+
+                               "Assuming it was left behind by a previous dispatch process, and waiting for it to finish.",
+                               container.UUID, auth.UUID)
+                       setMine(container.UUID, true)
+                       go func() {
+                               waitContainer(container, pollInterval)
+                               setMine(container.UUID, false)
+                       }()
+               }
+               go run(container, crunchRunCommand, finishCommand, pollInterval)
        }
 }
 
 // sbatchCmd
 func sbatchFunc(container Container) *exec.Cmd {
-       memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"]*1048576)))
+       memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
        return exec.Command("sbatch", "--share", "--parsable",
                "--job-name="+container.UUID,
                "--mem-per-cpu="+strconv.Itoa(int(memPerCPU)),
@@ -162,17 +200,19 @@ var striggerCmd = striggerFunc
 func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
        submitErr = nil
 
-       // Mark record as complete if anything errors out.
        defer func() {
-               if submitErr != nil {
-                       // This really should be an "Error" state, see #8018
-                       updateErr := arv.Update("containers", container.UUID,
-                               arvadosclient.Dict{
-                                       "container": arvadosclient.Dict{"state": "Complete"}},
-                               nil)
-                       if updateErr != nil {
-                               log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
-                       }
+               // If we didn't get as far as submitting a slurm job,
+               // unlock the container and return it to the queue.
+               if submitErr == nil {
+                       // OK, no cleanup needed
+                       return
+               }
+               err := arv.Update("containers", container.UUID,
+                       arvadosclient.Dict{
+                               "container": arvadosclient.Dict{"state": "Queued"}},
+                       nil)
+               if err != nil {
+                       log.Printf("Error unlocking container %s: %v", container.UUID, err)
                }
        }()
 
@@ -205,6 +245,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
        stdoutChan := make(chan []byte)
        go func() {
                b, _ := ioutil.ReadAll(stdoutReader)
+               stdoutReader.Close()
                stdoutChan <- b
                close(stdoutChan)
        }()
@@ -212,6 +253,7 @@ func submit(container Container, crunchRunCommand string) (jobid string, submitE
        stderrChan := make(chan []byte)
        go func() {
                b, _ := ioutil.ReadAll(stderrReader)
+               stderrReader.Close()
                stderrChan <- b
                close(stderrChan)
        }()
@@ -246,18 +288,35 @@ func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiTok
        err := cmd.Run()
        if err != nil {
                log.Printf("While setting up strigger: %v", err)
+               // BUG: we drop the error here and forget about it. A
+               // human has to notice the container is stuck in
+               // Running state, and fix it manually.
        }
 }
 
-// Run a queued container.
-// Set container state to locked (TBD)
-// Submit job to slurm to execute crunch-run command for the container
-// If the container priority becomes zero while crunch job is still running, cancel the job.
-func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
+// Run a queued container: [1] Set container state to locked. [2]
+// Execute crunch-run as a slurm batch job. [3] waitContainer().
+func run(container Container, crunchRunCommand, finishCommand string, pollInterval time.Duration) {
+       setMine(container.UUID, true)
+       defer setMine(container.UUID, false)
+
+       // Update container status to Locked. This will fail if
+       // another dispatcher (token) has already locked it. It will
+       // succeed if *this* dispatcher has already locked it.
+       err := arv.Update("containers", container.UUID,
+               arvadosclient.Dict{
+                       "container": arvadosclient.Dict{"state": "Locked"}},
+               nil)
+       if err != nil {
+               log.Printf("Error updating container state to 'Locked' for %v: %q", container.UUID, err)
+               return
+       }
+
+       log.Printf("About to submit queued container %v", container.UUID)
 
        jobid, err := submit(container, crunchRunCommand)
        if err != nil {
-               log.Printf("Error queuing container run: %v", err)
+               log.Printf("Error submitting container %s to slurm: %v", container.UUID, err)
                return
        }
 
@@ -267,9 +326,9 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
        }
        finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 
-       // Update container status to Running, this is a temporary workaround
-       // to avoid resubmitting queued containers because record locking isn't
-       // implemented yet.
+       // Update container status to Running. This will fail if
+       // another dispatcher (token) has already locked it. It will
+       // succeed if *this* dispatcher has already locked it.
        err = arv.Update("containers", container.UUID,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": "Running"}},
@@ -277,31 +336,100 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
        if err != nil {
                log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
        }
+       log.Printf("Submitted container %v to slurm", container.UUID)
+       waitContainer(container, pollInterval)
+}
 
-       log.Printf("Submitted container run for %v", container.UUID)
-
-       containerUUID := container.UUID
+// Wait for a container to finish. Cancel the slurm job if the
+// container priority changes to zero before it ends.
+func waitContainer(container Container, pollInterval time.Duration) {
+       log.Printf("Monitoring container %v started", container.UUID)
+       defer log.Printf("Monitoring container %v finished", container.UUID)
+
+       pollTicker := time.NewTicker(pollInterval)
+       defer pollTicker.Stop()
+       for _ = range pollTicker.C {
+               var updated Container
+               err := arv.Get("containers", container.UUID, nil, &updated)
+               if err != nil {
+                       log.Printf("Error getting container %s: %q", container.UUID, err)
+                       continue
+               }
+               if updated.State == "Complete" || updated.State == "Cancelled" {
+                       return
+               }
+               if updated.Priority != 0 {
+                       continue
+               }
 
-       // A goroutine to terminate the runner if container priority becomes zero
-       priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
-       go func() {
-               for _ = range priorityTicker.C {
-                       var container Container
-                       err := arv.Get("containers", containerUUID, nil, &container)
-                       if err != nil {
-                               log.Printf("Error getting container info for %v: %q", container.UUID, err)
-                       } else {
-                               if container.Priority == 0 {
-                                       log.Printf("Canceling container %v", container.UUID)
-                                       priorityTicker.Stop()
-                                       cancelcmd := exec.Command("scancel", "--name="+container.UUID)
-                                       cancelcmd.Run()
-                               }
-                               if container.State == "Complete" {
-                                       priorityTicker.Stop()
-                               }
+               // Priority is zero, but state is Running or Locked
+               log.Printf("Canceling container %s", container.UUID)
+
+               err = exec.Command("scancel", "--name="+container.UUID).Run()
+               if err != nil {
+                       log.Printf("Error stopping container %s with scancel: %v", container.UUID, err)
+                       if inQ, err := checkSqueue(container.UUID); err != nil {
+                               log.Printf("Error running squeue: %v", err)
+                               continue
+                       } else if inQ {
+                               log.Printf("Container %s is still in squeue; will retry", container.UUID)
+                               continue
                        }
                }
-       }()
 
+               err = arv.Update("containers", container.UUID,
+                       arvadosclient.Dict{
+                               "container": arvadosclient.Dict{"state": "Cancelled"}},
+                       nil)
+               if err != nil {
+                       log.Printf("Error updating state for container %s: %s", container.UUID, err)
+                       continue
+               }
+
+               return
+       }
+}
+
+func checkSqueue(uuid string) (bool, error) {
+       cmd := exec.Command("squeue", "--format=%j")
+       sq, err := cmd.StdoutPipe()
+       if err != nil {
+               return false, err
+       }
+       cmd.Start()
+       defer cmd.Wait()
+       scanner := bufio.NewScanner(sq)
+       found := false
+       for scanner.Scan() {
+               if scanner.Text() == uuid {
+                       found = true
+               }
+       }
+       if err := scanner.Err(); err != nil {
+               return false, err
+       }
+       return found, nil
+}
+
+var mineMutex sync.RWMutex
+var mineMap = make(map[string]bool)
+
+// Goroutine-safely add/remove uuid to the set of "my" containers,
+// i.e., ones for which this process has a goroutine running.
+func setMine(uuid string, t bool) {
+       mineMutex.Lock()
+       if t {
+               mineMap[uuid] = true
+       } else {
+               delete(mineMap, uuid)
+       }
+       mineMutex.Unlock()
+}
+
+// Check whether there is already a goroutine running for this
+// container.
+func checkMine(uuid string) bool {
+       mineMutex.RLocker().Lock()
+       defer mineMutex.RLocker().Unlock()
+       return mineMap[uuid]
 }
index e58b9e4f546af23fbab7be74c82ce7fda2424e83..7fd20c15bdc2be84474170a695edb9e6ea52ebe6 100644 (file)
@@ -4,8 +4,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
 
+       "bytes"
        "fmt"
-       "io/ioutil"
        "log"
        "math"
        "net/http"
@@ -52,6 +52,7 @@ func (s *TestSuite) SetUpTest(c *C) {
        if err != nil {
                c.Fatalf("Error making arvados client: %s", err)
        }
+       os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
 }
 
 func (s *TestSuite) TearDownTest(c *C) {
@@ -89,10 +90,12 @@ func (s *TestSuite) Test_doMain(c *C) {
                        apiHost, apiToken, apiInsecure).Args
                go func() {
                        time.Sleep(5 * time.Second)
-                       arv.Update("containers", containerUUID,
-                               arvadosclient.Dict{
-                                       "container": arvadosclient.Dict{"state": "Complete"}},
-                               nil)
+                       for _, state := range []string{"Running", "Complete"} {
+                               arv.Update("containers", containerUUID,
+                                       arvadosclient.Dict{
+                                               "container": arvadosclient.Dict{"state": state}},
+                                       nil)
+                       }
                }()
                return exec.Command("echo", "strigger")
        }
@@ -122,7 +125,7 @@ func (s *TestSuite) Test_doMain(c *C) {
        c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
 
        c.Check(striggerCmdLine, DeepEquals, []string{"strigger", "--set", "--jobid=zzzzz-dz642-queuedcontainer\n", "--fini",
-               "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h 1 zzzzz-dz642-queuedcontainer"})
+               "--program=/usr/bin/crunch-finish-slurm.sh " + os.Getenv("ARVADOS_API_HOST") + " " + arvadostest.Dispatch1Token + " 1 zzzzz-dz642-queuedcontainer"})
 
        // There should be no queued containers now
        err = arv.List("containers", params, &containers)
@@ -138,6 +141,7 @@ func (s *TestSuite) Test_doMain(c *C) {
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
+       apiStubResponses["/arvados/v1/api_client_authorizations"] = arvadostest.StubResponse{200, string(`{"items":[{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}]}`)}
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
        testWithServerStub(c, apiStubResponses, "echo", "Error getting list of queued containers")
@@ -157,18 +161,18 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                Retries:   0,
        }
 
-       tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
-       c.Check(err, IsNil)
-       defer os.Remove(tempfile.Name())
-       log.SetOutput(tempfile)
+       buf := bytes.NewBuffer(nil)
+       log.SetOutput(buf)
+       defer log.SetOutput(os.Stderr)
 
        go func() {
-               time.Sleep(2 * time.Second)
+               for i := 0; i < 80 && !strings.Contains(buf.String(), expected); i++ {
+                       time.Sleep(100 * time.Millisecond)
+               }
                sigChan <- syscall.SIGTERM
        }()
 
        runQueuedContainers(2, 1, crunchCmd, crunchCmd)
 
-       buf, _ := ioutil.ReadFile(tempfile.Name())
-       c.Check(strings.Contains(string(buf), expected), Equals, true)
+       c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }