6518: Merge in tests. Code cleanup around variable naming and comments.
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 16 Mar 2016 00:01:19 +0000 (20:01 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 16 Mar 2016 00:01:19 +0000 (20:01 -0400)
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/crunch-finish-slurm.sh

index c94526937316fb9866b33865a32b2f3f6d7383b3..bc1b0a5228264cfbade661d97e1785fb5f05baa7 100644 (file)
@@ -85,7 +85,7 @@ func doMain() error {
 }
 
 // Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
 //
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
@@ -147,11 +147,13 @@ var striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, a
                fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
 }
 
-func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
-       submiterr = nil
+// Submit job to slurm using sbatch.
+func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+       submitErr = nil
 
+       // Mark record as complete if anything errors out.
        defer func() {
-               if submiterr != nil {
+               if submitErr != nil {
                        // This really should be an "Error" state, see #8018
                        updateErr := arv.Update("containers", container.UUID,
                                arvadosclient.Dict{
@@ -163,64 +165,70 @@ func submit(container Container, crunchRunCommand string) (jobid string, submite
                }
        }()
 
+       // Create the command and attach to stdin/stdout
        cmd := sbatchCmd(container.UUID)
        stdinWriter, stdinerr := cmd.StdinPipe()
        if stdinerr != nil {
-               submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
+               submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
                return
        }
 
-       stdoutReader, stdouterr := cmd.StdoutPipe()
-       if stdouterr != nil {
-               submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
+       stdoutReader, stdoutErr := cmd.StdoutPipe()
+       if stdoutErr != nil {
+               submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
                return
        }
 
-       stderrReader, stderrerr := cmd.StderrPipe()
-       if stderrerr != nil {
-               submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
+       stderrReader, stderrErr := cmd.StderrPipe()
+       if stderrErr != nil {
+               submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
                return
        }
 
        err := cmd.Start()
        if err != nil {
-               submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+               submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
                return
        }
 
-       stdoutchan := make(chan []byte)
+       stdoutChan := make(chan []byte)
        go func() {
                b, _ := ioutil.ReadAll(stdoutReader)
-               stdoutchan <- b
-               close(stdoutchan)
+               stdoutChan <- b
+               close(stdoutChan)
        }()
 
-       stderrchan := make(chan []byte)
+       stderrChan := make(chan []byte)
        go func() {
                b, _ := ioutil.ReadAll(stderrReader)
-               stderrchan <- b
-               close(stderrchan)
+               stderrChan <- b
+               close(stderrChan)
        }()
 
+       // Send a tiny script on stdin to execute the crunch-run command
+       // slurm actually enforces that this must be a #! script
        fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
        stdinWriter.Close()
 
        err = cmd.Wait()
 
-       stdoutmsg := <-stdoutchan
-       stderrmsg := <-stderrchan
+       stdoutMsg := <-stdoutChan
+       stderrmsg := <-stderrChan
 
        if err != nil {
-               submiterr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
+               submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
                return
        }
 
-       jobid = string(stdoutmsg)
+       // If everything worked out, got the jobid on stdout
+       jobid = string(stdoutMsg)
 
        return
 }
 
-func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
+// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
+// the slurm controller when the job finishes.
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
        cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
@@ -230,11 +238,10 @@ func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecur
        }
 }
 
-// Run queued container:
+// Run a queued container.
 // Set container state to locked (TBD)
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
+// Submit job to slurm to execute crunch-run command for the container
+// If the container priority becomes zero while crunch job is still running, cancel the job.
 func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
 
        jobid, err := submit(container, crunchRunCommand)
@@ -247,9 +254,11 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
        if arv.ApiInsecure {
                insecure = "1"
        }
-       strigger(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+       finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 
-       // Update container status to Running
+       // Update container status to Running, this is a temporary workaround
+       // to avoid resubmitting queued containers because record locking isn't
+       // implemented yet.
        err = arv.Update("containers", container.UUID,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": "Running"}},
index 9b13f00a07b0340755fcc2d8eefb9269ca535721..56dd3cee797334ab1d46c7bff99ff5a02e62b7e6 100644 (file)
@@ -92,14 +92,19 @@ func (s *TestSuite) Test_doMain(c *C) {
                sigChan <- syscall.SIGINT
        }()
 
-       err := doMain()
-       c.Check(err, IsNil)
-
        // There should be no queued containers now
        params := arvadosclient.Dict{
                "filters": [][]string{[]string{"state", "=", "Queued"}},
        }
        var containers ContainerList
+       err := arv.List("containers", params, &containers)
+       c.Check(err, IsNil)
+       c.Assert(len(containers.Items), Equals, 1)
+
+       err = doMain()
+       c.Check(err, IsNil)
+
+       // There should be no queued containers now
        err = arv.List("containers", params, &containers)
        c.Check(err, IsNil)
        c.Assert(len(containers.Items), Equals, 0)
index 2977e1c6ecd11fa2d3d7db05866adae64b63beba..95a37ba4ef4d37c3d7c26d639146d7920312834d 100755 (executable)
@@ -1,7 +1,8 @@
 #!/bin/sh
 
-# I wonder if it is possible to attach metadata to job records to look these
-# things up instead of having to provide it on the command line.
+# Script to be called by strigger when a job finishes.  This ensures the job
+# record has the correct state "Complete" even if the node running the job
+# failed.
 
 ARVADOS_API_HOST=$1
 ARVADOS_API_TOKEN=$2
@@ -9,6 +10,9 @@ ARVADOS_API_HOST_INSECURE=$3
 uuid=$4
 jobid=$5
 
+# If it is possible to attach metadata to job records we could look up the
+# above information instead of getting it on the command line.  For example,
+# this is the recipe for getting the job name (container uuid) from the job id.
 #uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader)
 
 export ARVADOS_API_HOST ARVADOS_API_TOKEN ARVADOS_API_HOST_INSECURE