8953: shutdown_eligible() returns a tuple. Report reason for shutdown decision.
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index 29d58c528eba673532ff77ac62279913ec5111b2..8fbc0fa8b63810bd6a06fadab097836dfd06b113 100644 (file)
@@ -4,7 +4,7 @@ import (
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "io"
+       "io/ioutil"
        "log"
        "os"
        "os/exec"
@@ -48,6 +48,11 @@ func doMain() error {
                "/usr/bin/crunch-run",
                "Crunch command to run container")
 
+       finishCommand := flags.String(
+               "finish-command",
+               "/usr/bin/crunch-finish-slurm.sh",
+               "Command to run from strigger when job is finished")
+
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
@@ -71,7 +76,7 @@ func doMain() error {
        }(sigChan)
 
        // Run all queued containers
-       runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+       runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand)
 
        // Wait for all running crunch jobs to complete / terminate
        waitGroup.Wait()
@@ -80,18 +85,18 @@ func doMain() error {
 }
 
 // Poll for queued containers using pollInterval.
-// Invoke dispatchLocal for each ticker cycle, which will run all the queued containers.
+// Invoke dispatchSlurm for each ticker cycle, which will run all the queued containers.
 //
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
+func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) {
        ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
 
        for {
                select {
                case <-ticker.C:
-                       dispatchSlurm(priorityPollInterval, crunchRunCommand)
+                       dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand)
                case <-doneProcessing:
                        ticker.Stop()
                        return
@@ -112,7 +117,7 @@ type ContainerList struct {
 }
 
 // Get the list of queued containers from API server and invoke run for each container.
-func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
+func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) {
        params := arvadosclient.Dict{
                "filters": [][]string{[]string{"state", "=", "Queued"}},
        }
@@ -127,57 +132,167 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) {
        for i := 0; i < len(containers.Items); i++ {
                log.Printf("About to submit queued container %v", containers.Items[i].UUID)
                // Run the container
-               go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+               go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval)
        }
 }
 
-// Run queued container:
-// Set container state to locked (TBD)
-// Run container using the given crunch-run command
-// Set the container state to Running
-// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
-       stdinReader, stdinWriter := io.Pipe()
-
-       cmd := exec.Command("sbatch", "--job-name="+uuid)
-       cmd.Stdin = stdinReader
-       cmd.Stderr = os.Stderr
-       cmd.Stdout = os.Stderr
-       if err := cmd.Start(); err != nil {
-               log.Printf("Error running container for %v: %q", uuid, err)
+// sbatchCmd
+func sbatchFunc(uuid string) *exec.Cmd {
+       return exec.Command("sbatch", "--job-name="+uuid, "--share", "--parsable")
+}
+
+var sbatchCmd = sbatchFunc
+
+// striggerCmd
+func striggerFunc(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) *exec.Cmd {
+       return exec.Command("strigger", "--set", "--jobid="+jobid, "--fini",
+               fmt.Sprintf("--program=%s %s %s %s %s", finishCommand, apiHost, apiToken, apiInsecure, containerUUID))
+}
+
+var striggerCmd = striggerFunc
+
+// Submit job to slurm using sbatch.
+func submit(container Container, crunchRunCommand string) (jobid string, submitErr error) {
+       submitErr = nil
+
+       // Mark record as complete if anything errors out.
+       defer func() {
+               if submitErr != nil {
+                       // This really should be an "Error" state, see #8018
+                       updateErr := arv.Update("containers", container.UUID,
+                               arvadosclient.Dict{
+                                       "container": arvadosclient.Dict{"state": "Complete"}},
+                               nil)
+                       if updateErr != nil {
+                               log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr)
+                       }
+               }
+       }()
+
+       // Create the command and attach to stdin/stdout
+       cmd := sbatchCmd(container.UUID)
+       stdinWriter, stdinerr := cmd.StdinPipe()
+       if stdinerr != nil {
+               submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
                return
        }
 
-       fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec %s %s\n", crunchRunCommand, uuid)
+       stdoutReader, stdoutErr := cmd.StdoutPipe()
+       if stdoutErr != nil {
+               submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
+               return
+       }
+
+       stderrReader, stderrErr := cmd.StderrPipe()
+       if stderrErr != nil {
+               submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
+               return
+       }
+
+       err := cmd.Start()
+       if err != nil {
+               submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+               return
+       }
+
+       stdoutChan := make(chan []byte)
+       go func() {
+               b, _ := ioutil.ReadAll(stdoutReader)
+               stdoutChan <- b
+               close(stdoutChan)
+       }()
+
+       stderrChan := make(chan []byte)
+       go func() {
+               b, _ := ioutil.ReadAll(stderrReader)
+               stderrChan <- b
+               close(stderrChan)
+       }()
 
+       // Send a tiny script on stdin to execute the crunch-run command
+       // slurm actually enforces that this must be a #! script
+       fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
        stdinWriter.Close()
-       cmd.Wait()
 
-       // Update container status to Running
-       err := arv.Update("containers", uuid,
+       err = cmd.Wait()
+
+       stdoutMsg := <-stdoutChan
+       stderrmsg := <-stderrChan
+
+       if err != nil {
+               submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
+               return
+       }
+
+       // If everything worked out, got the jobid on stdout
+       jobid = string(stdoutMsg)
+
+       return
+}
+
+// finalizeRecordOnFinish uses 'strigger' command to register a script that will run on
+// the slurm controller when the job finishes.
+func finalizeRecordOnFinish(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure string) {
+       cmd := striggerCmd(jobid, containerUUID, finishCommand, apiHost, apiToken, apiInsecure)
+       cmd.Stdout = os.Stdout
+       cmd.Stderr = os.Stderr
+       err := cmd.Run()
+       if err != nil {
+               log.Printf("While setting up strigger: %v", err)
+       }
+}
+
+// Run a queued container.
+// Set container state to locked (TBD)
+// Submit job to slurm to execute crunch-run command for the container
+// If the container priority becomes zero while crunch job is still running, cancel the job.
+func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) {
+
+       jobid, err := submit(container, crunchRunCommand)
+       if err != nil {
+               log.Printf("Error queuing container run: %v", err)
+               return
+       }
+
+       insecure := "0"
+       if arv.ApiInsecure {
+               insecure = "1"
+       }
+       finalizeRecordOnFinish(jobid, container.UUID, finishCommand, arv.ApiServer, arv.ApiToken, insecure)
+
+       // Update container status to Running, this is a temporary workaround
+       // to avoid resubmitting queued containers because record locking isn't
+       // implemented yet.
+       err = arv.Update("containers", container.UUID,
                arvadosclient.Dict{
                        "container": arvadosclient.Dict{"state": "Running"}},
                nil)
        if err != nil {
-               log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+               log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err)
        }
 
-       log.Printf("Submitted container run for %v", uuid)
+       log.Printf("Submitted container run for %v", container.UUID)
+
+       containerUUID := container.UUID
 
        // A goroutine to terminate the runner if container priority becomes zero
        priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
        go func() {
                for _ = range priorityTicker.C {
                        var container Container
-                       err := arv.Get("containers", uuid, nil, &container)
+                       err := arv.Get("containers", containerUUID, nil, &container)
                        if err != nil {
-                               log.Printf("Error getting container info for %v: %q", uuid, err)
+                               log.Printf("Error getting container info for %v: %q", container.UUID, err)
                        } else {
                                if container.Priority == 0 {
+                                       log.Printf("Canceling container %v", container.UUID)
                                        priorityTicker.Stop()
-                                       cancelcmd := exec.Command("scancel", "--name="+uuid)
+                                       cancelcmd := exec.Command("scancel", "--name="+container.UUID)
                                        cancelcmd.Run()
                                }
+                               if container.State == "Complete" {
+                                       priorityTicker.Stop()
+                               }
                        }
                }
        }()