}
// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
//
// Any errors encountered are logged but the program would continue to run (not exit).
// This is because, once one or more crunch jobs are running,
}
// sbatchCmd
-var sbatchCmd = func(uuid string) *exec.Cmd {
+func sbatchFunc(uuid string) *exec.Cmd {
return exec.Command("sbatch", "--job-name="+uuid, "--share", "--parsable")
}
+var sbatchCmd = sbatchFunc
+
// striggerCmd
-var striggerCmd = func(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
+func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
}
-func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) {
- submiterr = nil
+var striggerCmd = striggerFunc
+
+// submit sends the container's job to slurm using sbatch and returns the job ID read from sbatch's stdout.
+func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+ submitErr = nil
+ // Mark record as complete if anything errors out.
defer func() {
- if submiterr != nil {
+ if submitErr != nil {
// This really should be an "Error" state, see #8018
updateErr := arv.Update("containers", container.UUID,
arvadosclient.Dict{
}
}()
+ // Create the command and attach pipes to its stdin, stdout and stderr
cmd := sbatchCmd(container.UUID)
stdinWriter, stdinerr := cmd.StdinPipe()
if stdinerr != nil {
- submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
+ submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
return
}
- stdoutReader, stdouterr := cmd.StdoutPipe()
- if stdouterr != nil {
- submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr)
+ stdoutReader, stdoutErr := cmd.StdoutPipe()
+ if stdoutErr != nil {
+ submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
return
}
- stderrReader, stderrerr := cmd.StderrPipe()
- if stderrerr != nil {
- submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr)
+ stderrReader, stderrErr := cmd.StderrPipe()
+ if stderrErr != nil {
+ submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
return
}
err := cmd.Start()
if err != nil {
- submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+ submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
return
}
- stdoutchan := make(chan []byte)
+ stdoutChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stdoutReader)
- stdoutchan <- b
- close(stdoutchan)
+ stdoutChan <- b
+ close(stdoutChan)
}()
- stderrchan := make(chan []byte)
+ stderrChan := make(chan []byte)
go func() {
b, _ := ioutil.ReadAll(stderrReader)
- stderrchan <- b
- close(stderrchan)
+ stderrChan <- b
+ close(stderrChan)
}()
+ // Send a tiny script on stdin to execute the crunch-run command.
+ // Slurm enforces that this must be a #! script.
fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
stdinWriter.Close()
err = cmd.Wait()
- stdoutmsg := <-stdoutchan
- stderrmsg := <-stderrchan
+ stdoutMsg := <-stdoutChan
+ stderrmsg := <-stderrChan
if err != nil {
- submiterr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
+ submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
return
}
- jobid = string(stdoutmsg)
+ // If everything worked out, the jobid was written to stdout
+ jobid = string(stdoutMsg)
return
}
-func strigger(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
+// finalizeRecordOnFinish uses the 'strigger' command to register a script that will run on
+// the slurm controller when the job finishes.
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
}
-// Run queued container:
+// Run a queued container.
// Set container state to locked (TBD)
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
+// Submit a job to slurm to execute the crunch-run command for the container.
+// If the container priority becomes zero while the crunch job is still running, cancel the job.
func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
jobid, err := submit(container, crunchRunCommand)
if arv.ApiInsecure {
insecure = "1"
}
- strigger(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+ finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
- // Update container status to Running
+ // Update container status to Running. This is a temporary workaround
+ // to avoid resubmitting queued containers, because record locking isn't
+ // implemented yet.
err = arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Running"}},