From 1edffa89f42b3b3e53df9f5669cc3d7e2c99ea4b Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 25 Feb 2016 22:09:16 -0500 Subject: [PATCH] 6518: Working on using strigger to update job records when crunch-run cannot. --- .../crunch-dispatch-slurm.go | 133 ++++++++++++++---- .../crunch-finish-slurm.sh | 7 + services/crunch-run/crunchrun.go | 2 +- 3 files changed, 116 insertions(+), 26 deletions(-) create mode 100755 services/crunch-dispatch-slurm/crunch-finish-slurm.sh diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index 5133c9fd81..875eaa37fc 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -4,7 +4,7 @@ import ( "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "io" + "io/ioutil" "log" "os" "os/exec" @@ -48,6 +48,11 @@ func doMain() error { "/usr/bin/crunch-run", "Crunch command to run container") + finishCommand := flags.String( + "finish-command", + "/usr/bin/crunch-finish-slurm.sh", + "Command to run from strigger when job is finished") + // Parse args; omit the first arg which is the command name flags.Parse(os.Args[1:]) @@ -71,7 +76,7 @@ func doMain() error { }(sigChan) // Run all queued containers - runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand) + runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand, *finishCommand) // Wait for all running crunch jobs to complete / terminate waitGroup.Wait() @@ -85,13 +90,13 @@ func doMain() error { // Any errors encountered are logged but the program would continue to run (not exit). // This is because, once one or more crunch jobs are running, // we would need to wait for them complete. 
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) { +func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand, finishCommand string) { ticker := time.NewTicker(time.Duration(pollInterval) * time.Second) for { select { case <-ticker.C: - dispatchSlurm(priorityPollInterval, crunchRunCommand) + dispatchSlurm(priorityPollInterval, crunchRunCommand, finishCommand) case <-doneProcessing: ticker.Stop() return @@ -112,7 +117,7 @@ type ContainerList struct { } // Get the list of queued containers from API server and invoke run for each container. -func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) { +func dispatchSlurm(priorityPollInterval int, crunchRunCommand, finishCommand string) { params := arvadosclient.Dict{ "filters": [][]string{[]string{"state", "=", "Queued"}}, } @@ -127,7 +132,90 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) { for i := 0; i < len(containers.Items); i++ { log.Printf("About to submit queued container %v", containers.Items[i].UUID) // Run the container - go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval) + go run(containers.Items[i], crunchRunCommand, finishCommand, priorityPollInterval) + } +} + +func submit(container Container, crunchRunCommand string) (jobid string, submiterr error) { + submiterr = nil + + defer func() { + if submiterr != nil { + // This really should be an "Error" state, see #8018 + updateErr := arv.Update("containers", container.UUID, + arvadosclient.Dict{ + "container": arvadosclient.Dict{"state": "Complete"}}, + nil) + if updateErr != nil { + log.Printf("Error updating container state to 'Complete' for %v: %q", container.UUID, updateErr) + } + } + }() + + cmd := exec.Command("sbatch", "--job-name="+container.UUID, "--share", "--parsable") + stdinWriter, stdinerr := cmd.StdinPipe() + if stdinerr != nil { + submiterr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr) + 
return + } + + stdoutReader, stdouterr := cmd.StdoutPipe() + if stdouterr != nil { + submiterr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdouterr) + return + } + + stderrReader, stderrerr := cmd.StderrPipe() + if stderrerr != nil { + submiterr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrerr) + return + } + + err := cmd.Start() + if err != nil { + submiterr = fmt.Errorf("Error starting %v: %v", cmd.Args, err) + return + } + + stdoutchan := make(chan []byte) + go func() { + b, _ := ioutil.ReadAll(stdoutReader) + stdoutchan <- b + close(stdoutchan) + }() + + stderrchan := make(chan []byte) + go func() { + b, _ := ioutil.ReadAll(stderrReader) + stderrchan <- b + close(stderrchan) + }() + + fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID) + stdinWriter.Close() + + stdoutmsg := <-stdoutchan // drain both pipes before Wait, which closes them once sbatch exits + stderrmsg := <-stderrchan + + err = cmd.Wait() + + if err != nil { + submiterr = fmt.Errorf("Container submission failed %v: %v %s", cmd.Args, err, stderrmsg) + return + } + + fmt.Sscan(string(stdoutmsg), &jobid) // best effort: keep the first token so sbatch's trailing newline is dropped + + return +} + +func strigger(jobid, containerUUID, finishCommand string) { + cmd := exec.Command("strigger", "--set", "--jobid="+jobid, "--fini", fmt.Sprintf("--program=%s", finishCommand)) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Run() + if err != nil { + log.Printf("While setting up strigger: %v", err) } } @@ -136,47 +224,42 @@ func dispatchSlurm(priorityPollInterval int, crunchRunCommand string) { // Run container using the given crunch-run command // Set the container state to Running // If the container priority becomes zero while crunch job is still running, terminate it. 
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) { - stdinReader, stdinWriter := io.Pipe() +func run(container Container, crunchRunCommand, finishCommand string, priorityPollInterval int) { - cmd := exec.Command("sbatch", "--job-name="+uuid, "--share") - cmd.Stdin = stdinReader - cmd.Stderr = os.Stderr - cmd.Stdout = os.Stderr - if err := cmd.Start(); err != nil { - log.Printf("Error running container for %v: %q", uuid, err) + jobid, err := submit(container, crunchRunCommand) + if err != nil { + log.Printf("Error queuing container run: %v", err) return } - fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, uuid) - - stdinWriter.Close() - cmd.Wait() + strigger(jobid, container.UUID, finishCommand) // Update container status to Running - err := arv.Update("containers", uuid, + err = arv.Update("containers", container.UUID, arvadosclient.Dict{ "container": arvadosclient.Dict{"state": "Running"}}, nil) if err != nil { - log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err) + log.Printf("Error updating container state to 'Running' for %v: %q", container.UUID, err) } - log.Printf("Submitted container run for %v", uuid) + log.Printf("Submitted container run for %v", container.UUID) + + containerUUID := container.UUID // A goroutine to terminate the runner if container priority becomes zero priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second) go func() { for _ = range priorityTicker.C { var container Container - err := arv.Get("containers", uuid, nil, &container) + err := arv.Get("containers", containerUUID, nil, &container) if err != nil { - log.Printf("Error getting container info for %v: %q", uuid, err) + log.Printf("Error getting container info for %v: %q", containerUUID, err) } else { if container.Priority == 0 { - log.Printf("Canceling container %v", uuid) + log.Printf("Canceling container %v", container.UUID) priorityTicker.Stop() - cancelcmd := 
exec.Command("scancel", "--name="+uuid) + cancelcmd := exec.Command("scancel", "--name="+container.UUID) cancelcmd.Run() } if container.State == "Complete" { diff --git a/services/crunch-dispatch-slurm/crunch-finish-slurm.sh b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh new file mode 100755 index 0000000000..8be6fdd9e0 --- /dev/null +++ b/services/crunch-dispatch-slurm/crunch-finish-slurm.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +jobid=$1 + +uuid=$(squeue --jobs=$jobid --states=all --format=%j --noheader) + +arv containers update --uuid $uuid --container '{"state": "Complete"}' diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 64f0d77b7e..039a649a6d 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -604,7 +604,7 @@ func (runner *ContainerRunner) Run() (err error) { if hosterr != nil { runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr) } else { - runner.CrunchLog.Printf("Executing on host '%s'", runner.ContainerRecord.UUID, hostname) + runner.CrunchLog.Printf("Executing on host '%s'", hostname) } var runerr, waiterr error -- 2.39.5