"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "io"
+ "io/ioutil"
"log"
"os"
"os/exec"
"/usr/bin/crunch-run",
"Crunch command to run container")
+ finishCommand := flags.String(
+ "finish-command",
+ "/usr/bin/crunch-finish-slurm.sh",
+ "Command to run from strigger when job is finished")
+
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
}(sigChan)
// Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+ runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
}
// Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
//
// Any errors encountered are logged but the program would continue to run (not exit).
// This is because, once one or more crunch jobs are running,
// we would need to wait for them to complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
for {
select {
case <-ticker.C:
- dispatchSlurm(priorityPollInterval, crunchRunCommand)
+ dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
case <-doneProcessing:
ticker.Stop()
return
}
// Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
+func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
for i := 0; i < len(containers.Items); i++ {
log.Printf("About to submit queued container %v", containers.Items[i].UUID)
// Run the container
- go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+ go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
}
}
-// Run queued container:
-// Set container state to locked (TBD)
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
- stdinReader, stdinWriter := io.Pipe()
-
- cmd := exec.Command("sbatch", "--job-name="+uuid)
- cmd.Stdin = stdinReader
- cmd.Stderr = os.Stderr
- cmd.Stdout = os.Stderr
- if err := cmd.Start(); err != nil {
- log.Printf("Error running container for %v: %q", uuid, err)
+// sbatchFunc builds the exec.Cmd used to submit a slurm job named after
+// the container UUID. "--share" lets the job share nodes with other
+// jobs; "--parsable" makes sbatch print only the job id on stdout,
+// which submit() captures as its return value.
+func sbatchFunc(uuid string) *exec.Cmd {
+ return exec.Command("sbatch", "--job-name="+uuid, "--share", "--parsable")
+}
+
+// sbatchCmd is a package-level indirection point so tests can replace
+// the real sbatch invocation with a stub.
+var sbatchCmd = sbatchFunc
+
+// striggerFunc builds the exec.Cmd that registers finishCommand with
+// slurm's strigger facility, to run when job jobid finishes ("--fini").
+// The API host/token/insecure flag and the container UUID are passed to
+// the program as space-separated arguments inside the --program string.
+func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
+ return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
+ fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
+}
+
+// striggerCmd is a package-level indirection point so tests can replace
+// the real strigger invocation with a stub.
+var striggerCmd = striggerFunc
+
+// submit starts an sbatch process, feeds it a one-line #! script that
+// execs crunchRunCommand for the given container, and returns the slurm
+// job id that sbatch (--parsable) prints on stdout. On any error the
+// deferred handler below marks the container record Complete so it is
+// not picked up again by the dispatch loop.
+func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+ submitErr = nil
+
+ // Mark record as complete if anything errors out.
+ defer func() {
+ if submitErr != nil {
+ // This really should be an "Error" state, see #8018
+ updateErr := arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"state": "Complete"}},
+ nil)
+ if updateErr != nil {
+ log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
+ }
+ }
+ }()
+
+ // Create the command and attach to stdin/stdout
+ cmd := sbatchCmd(container.UUID)
+ stdinWriter, stdinerr := cmd.StdinPipe()
+ if stdinerr != nil {
+ submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
 return
 }
- fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec %s %s\n", crunchRunCommand, uuid)
+ stdoutReader, stdoutErr := cmd.StdoutPipe()
+ if stdoutErr != nil {
+ submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
+ return
+ }
+
+ stderrReader, stderrErr := cmd.StderrPipe()
+ if stderrErr != nil {
+ submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
+ return
+ }
+
+ err := cmd.Start()
+ if err != nil {
+ submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+ return
+ }
+
+ // Collect stdout/stderr in the background so both pipes are fully
+ // read before cmd.Wait() is called below.
+ stdoutChan := make(chan []byte)
+ go func() {
+ b, _ := ioutil.ReadAll(stdoutReader)
+ stdoutChan <- b
+ close(stdoutChan)
+ }()
+
+ stderrChan := make(chan []byte)
+ go func() {
+ b, _ := ioutil.ReadAll(stderrReader)
+ stderrChan <- b
+ close(stderrChan)
+ }()
+ // Send a tiny script on stdin to execute the crunch-run command
+ // slurm actually enforces that this must be a #! script
+ fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
 stdinWriter.Close()
- cmd.Wait()
- // Update container status to Running
- err := arv.Update("containers", uuid,
+ err = cmd.Wait()
+
+ stdoutMsg := <-stdoutChan
+ stderrmsg := <-stderrChan
+
+ if err != nil {
+ // NOTE(review): stderrmsg is a []byte, so %v renders it as a list of
+ // byte values; string(stderrmsg) (or %s) would give a readable message.
+ submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
+ return
+ }
+
+ // If everything worked out, got the jobid on stdout
+ // NOTE(review): sbatch --parsable output ends with a newline, so jobid
+ // likely needs strings.TrimSpace before being passed to strigger — confirm.
+ jobid = string(stdoutMsg)
+
+ return
+}
+
+// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
+// the slurm controller when the job finishes. A strigger failure is
+// only logged here; in that case nothing will finalize the container
+// record when the job ends.
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
+ cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+ // Surface strigger's own output in this process's stdout/stderr.
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ err := cmd.Run()
+ if err != nil {
+ log.Printf("While setting up strigger: %v", err)
+ }
+}
+
+// Run a queued container.
+// Set container state to locked (TBD)
+// Submit job to slurm to execute crunch-run command for the container
+// If the container priority becomes zero while crunch job is still running, cancel the job.
+func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
+
+ jobid, err := submit(container, crunchRunCommand)
+ if err != nil {
+ log.Printf("Error queuing container run: %v", err)
+ return
+ }
+
+ insecure := "0"
+ if arv.ApiInsecure {
+ insecure = "1"
+ }
+ finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+
+ // Update container status to Running, this is a temporary workaround
+ // to avoid resubmitting queued containers because record locking isn't
+ // implemented yet.
+ err = arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"state": "Running"}},
nil)
if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+ log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
}
- log.Printf("Submitted container run for %v", uuid)
+ log.Printf("Submitted container run for %v", container.UUID)
+
+ containerUUID := container.UUID
// A goroutine to terminate the runner if container priority becomes zero
priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
go func() {
for _ = range priorityTicker.C {
var container Container
- err := arv.Get("containers", uuid, nil, &container)
+ err := arv.Get("containers", containerUUID, nil, &container)
if err != nil {
- log.Printf("Error getting container info for %v: %q", uuid, err)
+ log.Printf("Error getting container info for %v: %q", container.UUID, err)
} else {
if container.Priority == 0 {
+ log.Printf("Canceling container %v", container.UUID)
priorityTicker.Stop()
- cancelcmd := exec.Command("scancel", "--name="+uuid)
+ cancelcmd := exec.Command("scancel", "--name="+container.UUID)
cancelcmd.Run()
}
+ if container.State == "Complete" {
+ priorityTicker.Stop()
+ }
}
}
}()