7658: update connect error test to use stream handler to read the log file, instead...
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index c94526937316fb9866b33865a32b2f3f6d7383b3..8fbc0fa8b63810bd6a06fadab097836dfd06b113 100644 (file)
@@ -85,7 +85,7 @@ func doMain() error {
 }
 
 // Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
 //
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
@@ -137,21 +137,27 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand str
 }
 
 // sbatchCmd
-var sbatchCmd = func(uuid string) *exec.Cmd {
+func sbatchFunc(uuid string) *exec.Cmd {
        return exec.Command("sbatch", "--job-name="+uuid, "--share", "--parsable")
 }
 
+var sbatchCmd = sbatchFunc
+
 // striggerCmd
-var striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
+func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
        return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
                fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
 }
 
-func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
-       submiterr = nil
+var striggerCmd = striggerFunc
+
+// Submit job to slurm using sbatch.
+func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+       submitErr = nil
 
+       // Mark record as complete if anything errors out.
        defer func() {
-               if submiterr != nil {
+               if submitErr != nil {
                        // This really should be an "Error" state, see #8018
                        updateErr := arv.Update("containers", container.UUID,
                                arvadosclient.Dict{
@@ -163,64 +169,70 @@ func submit(container Container, crunchRunCommand string) (jobid string, submite
                }
        }()
 
+       // Create the command and attach to stdin/stdout
        cmd := sbatchCmd(container.UUID)
        stdinWriter, stdinerr := cmd.StdinPipe()
        if stdinerr != nil {
-               submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
+               submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
                return
        }
 
-       stdoutReader, stdouterr := cmd.StdoutPipe()
-       if stdouterr != nil {
-               submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
+       stdoutReader, stdoutErr := cmd.StdoutPipe()
+       if stdoutErr != nil {
+               submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
                return
        }
 
-       stderrReader, stderrerr := cmd.StderrPipe()
-       if stderrerr != nil {
-               submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
+       stderrReader, stderrErr := cmd.StderrPipe()
+       if stderrErr != nil {
+               submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
                return
        }
 
        err := cmd.Start()
        if err != nil {
-               submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+               submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
                return
        }
 
-       stdoutchan := make(chan []byte)
+       stdoutChan := make(chan []byte)
        go func() {
                b, _ := ioutil.ReadAll(stdoutReader)
-               stdoutchan <- b
-               close(stdoutchan)
+               stdoutChan <- b
+               close(stdoutChan)
        }()
 
-       stderrchan := make(chan []byte)
+       stderrChan := make(chan []byte)
        go func() {
                b, _ := ioutil.ReadAll(stderrReader)
-               stderrchan <- b
-               close(stderrchan)
+               stderrChan <- b
+               close(stderrChan)
        }()
 
+       // Send a tiny script on stdin to execute the crunch-run command
+       // slurm actually enforces that this must be a #! script
        fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
        stdinWriter.Close()
 
        err = cmd.Wait()
 
-       stdoutmsg := <-stdoutchan
-       stderrmsg := <-stderrchan
+       stdoutMsg := <-stdoutChan
+       stderrmsg := <-stderrChan
 
        if err != nil {
-               submiterr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
+               submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
                return
        }
 
-       jobid = string(stdoutmsg)
+       // If everything worked out, got the jobid on stdout
+       jobid = string(stdoutMsg)
 
        return
 }
 
-func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
+// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
+// the slurm controller when the job finishes.
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
        cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
        cmd.Stdout = os.Stdout
        cmd.Stderr = os.Stderr
@@ -230,11 +242,10 @@ func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecur
        }
 }
 
-// Run queued container:
+// Run a queued container.
 // Set container state to locked (TBD)
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
+// Submit job to slurm to execute crunch-run command for the container
+// If the container priority becomes zero while crunch job is still running, cancel the job.
 func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
 
        jobid, err := submit(container, crunchRunCommand)
@@ -247,9 +258,11 @@ func run(container Container, crunchRunCommand, finishCommand string, priorityPo
        if arv.ApiInsecure {
                insecure = "1"
        }
-       strigger(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+       finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
 
-       // Update container status to Running
+       // Update container status to Running, this is a temporary workaround
+       // to avoid resubmitting queued containers because record locking isn't
+       // implemented yet.
        err = arv.Update("containers", container.UUID,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": "Running"}},