Merge branch '10700-dispatch'
author Tom Clegg <tom@curoverse.com>
Tue, 31 Jan 2017 21:32:05 +0000 (16:32 -0500)
committer Tom Clegg <tom@curoverse.com>
Tue, 31 Jan 2017 21:32:05 +0000 (16:32 -0500)
closes #10700
  refs #10701
closes #10702
closes #10703
closes #10704

Conflicts:
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go

1  2 
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go

index f1f5d1e6b84df0473ffdbe71edb64342cca16d81,60dc6071b40459ccbab6920ba80727917a69d9eb..1c080f36ac13133b12ad4308fb62d6f53549ded3
@@@ -3,22 -3,20 +3,21 @@@ package mai
  // Dispatcher service for Crunch that submits containers to the slurm queue.
  
  import (
+       "bytes"
        "flag"
        "fmt"
-       "io"
-       "io/ioutil"
 -      "git.curoverse.com/arvados.git/sdk/go/arvados"
 -      "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 -      "git.curoverse.com/arvados.git/sdk/go/config"
 -      "git.curoverse.com/arvados.git/sdk/go/dispatch"
 -      "github.com/coreos/go-systemd/daemon"
        "log"
        "math"
        "os"
        "os/exec"
        "strings"
        "time"
 +
 +      "git.curoverse.com/arvados.git/sdk/go/arvados"
 +      "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 +      "git.curoverse.com/arvados.git/sdk/go/config"
 +      "git.curoverse.com/arvados.git/sdk/go/dispatch"
 +      "github.com/coreos/go-systemd/daemon"
  )
  
  // Config used by crunch-dispatch-slurm
@@@ -33,6 -31,9 +32,9 @@@ type Config struct 
        //
        // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
        CrunchRunCommand []string
+       // Minimum time between two attempts to run the same container
+       MinRetryPeriod arvados.Duration
  }
  
  func main() {
@@@ -43,8 -44,8 +45,8 @@@
  }
  
  var (
-       theConfig     Config
-       squeueUpdater Squeue
+       theConfig Config
+       sqCheck   SqueueChecker
  )
  
  const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@@ -57,10 -58,6 +59,10 @@@ func doMain() error 
                "config",
                defaultConfigPath,
                "`path` to JSON or YAML configuration file")
 +      dumpConfig := flag.Bool(
 +              "dump-config",
 +              false,
 +              "write current configuration to stdout and exit")
  
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
                log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
        }
  
 +      if *dumpConfig {
 +              log.Fatal(config.DumpAndExit(theConfig))
 +      }
 +
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Printf("Error making Arvados client: %v", err)
        }
        arv.Retries = 25
  
-       squeueUpdater.StartMonitor(time.Duration(theConfig.PollPeriod))
-       defer squeueUpdater.Done()
+       sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+       defer sqCheck.Stop()
  
        dispatcher := dispatch.Dispatcher{
                Arv:            arv,
                RunContainer:   run,
-               PollInterval:   time.Duration(theConfig.PollPeriod),
-               DoneProcessing: make(chan struct{})}
+               PollPeriod:     time.Duration(theConfig.PollPeriod),
+               MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
+       }
  
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
  
-       err = dispatcher.RunDispatcher()
-       if err != nil {
-               return err
-       }
-       return nil
+       return dispatcher.Run()
  }
  
  // sbatchCmd
@@@ -168,70 -157,31 +166,31 @@@ func submit(dispatcher *dispatch.Dispat
                }
        }()
  
-       // Create the command and attach to stdin/stdout
        cmd := sbatchCmd(container)
-       stdinWriter, stdinerr := cmd.StdinPipe()
-       if stdinerr != nil {
-               submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
-               return
-       }
  
-       stdoutReader, stdoutErr := cmd.StdoutPipe()
-       if stdoutErr != nil {
-               submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
-               return
-       }
+       // Send a tiny script on stdin to execute the crunch-run
+       // command (slurm requires this to be a #! script)
+       cmd.Stdin = strings.NewReader(execScript(append(crunchRunCommand, container.UUID)))
  
-       stderrReader, stderrErr := cmd.StderrPipe()
-       if stderrErr != nil {
-               submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
-               return
-       }
+       var stdout, stderr bytes.Buffer
+       cmd.Stdout = &stdout
+       cmd.Stderr = &stderr
  
        // Mutex between squeue sync and running sbatch or scancel.
-       squeueUpdater.SlurmLock.Lock()
-       defer squeueUpdater.SlurmLock.Unlock()
-       log.Printf("sbatch starting: %+q", cmd.Args)
-       err := cmd.Start()
-       if err != nil {
-               submitErr = fmt.Errorf("Error starting sbatch: %v", err)
-               return
-       }
-       stdoutChan := make(chan []byte)
-       go func() {
-               b, _ := ioutil.ReadAll(stdoutReader)
-               stdoutReader.Close()
-               stdoutChan <- b
-               close(stdoutChan)
-       }()
-       stderrChan := make(chan []byte)
-       go func() {
-               b, _ := ioutil.ReadAll(stderrReader)
-               stderrReader.Close()
-               stderrChan <- b
-               close(stderrChan)
-       }()
-       // Send a tiny script on stdin to execute the crunch-run command
-       // slurm actually enforces that this must be a #! script
-       io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
-       stdinWriter.Close()
-       stdoutMsg := <-stdoutChan
-       stderrmsg := <-stderrChan
-       err = cmd.Wait()
-       if err != nil {
-               submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
-               return
+       sqCheck.L.Lock()
+       defer sqCheck.L.Unlock()
+       log.Printf("exec sbatch %+q", cmd.Args)
+       err := cmd.Run()
+       switch err.(type) {
+       case nil:
+               log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
+               return nil
+       case *exec.ExitError:
+               return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr)
+       default:
+               return fmt.Errorf("exec failed: %v", err)
        }
-       log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
-       return
  }
  
  // If the container is marked as Locked, check if it is already in the slurm
  func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
        submitted := false
        for !*monitorDone {
-               if squeueUpdater.CheckSqueue(container.UUID) {
+               if sqCheck.HasUUID(container.UUID) {
                        // Found in the queue, so continue monitoring
                        submitted = true
                } else if container.State == dispatch.Locked && !submitted {
@@@ -304,28 -254,24 +263,24 @@@ func run(dispatcher *dispatch.Dispatche
        go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
  
        for container = range status {
-               if container.State == dispatch.Locked || container.State == dispatch.Running {
-                       if container.Priority == 0 {
-                               log.Printf("Canceling container %s", container.UUID)
-                               // Mutex between squeue sync and running sbatch or scancel.
-                               squeueUpdater.SlurmLock.Lock()
-                               cmd := scancelCmd(container)
-                               msg, err := cmd.CombinedOutput()
-                               squeueUpdater.SlurmLock.Unlock()
-                               if err != nil {
-                                       log.Printf("Error stopping container %s with %v %v: %v %v",
-                                               container.UUID, cmd.Path, cmd.Args, err, string(msg))
-                                       if squeueUpdater.CheckSqueue(container.UUID) {
-                                               log.Printf("Container %s is still in squeue after scancel.",
-                                                       container.UUID)
-                                               continue
-                                       }
-                               }
+               if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
+                       log.Printf("Canceling container %s", container.UUID)
+                       // Mutex between squeue sync and running sbatch or scancel.
+                       sqCheck.L.Lock()
+                       cmd := scancelCmd(container)
+                       msg, err := cmd.CombinedOutput()
+                       sqCheck.L.Unlock()
  
-                               err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
+                       if err != nil {
+                               log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
+                               if sqCheck.HasUUID(container.UUID) {
+                                       log.Printf("Container %s is still in squeue after scancel.", container.UUID)
+                                       continue
+                               }
                        }
+                       // Ignore errors; if necessary, we'll try again next time
+                       dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
                }
        }
        monitorDone = true