Merge branch '12573-crunch2-slurm-priority' closes #12573
author Peter Amstutz <pamstutz@veritasgenetics.com>
Mon, 4 Dec 2017 21:32:04 +0000 (16:32 -0500)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Mon, 4 Dec 2017 21:32:11 +0000 (16:32 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

1  2 
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go

index d322b0f3f6dcf53735df8e920f07d0329397bbbf,9dea3cbe9294666d9b2eb08c2bc6d2d9b052caab..3d094c7f6e3a68f745f02be1950cc76eb8fdecd2
@@@ -26,8 -26,6 +26,8 @@@ import 
        "github.com/coreos/go-systemd/daemon"
  )
  
var (
	// version identifies this build of crunch-dispatch-slurm. It is
	// printed by the -version flag and logged when the dispatcher starts.
	// Defaults to "dev".
	version = "dev"
)
 +
  // Config used by crunch-dispatch-slurm
  type Config struct {
        Client arvados.Client
@@@ -71,21 -69,10 +71,21 @@@ func doMain() error 
                "dump-config",
                false,
                "write current configuration to stdout and exit")
 -
 +      getVersion := flags.Bool(
 +              "version",
 +              false,
 +              "Print version information and exit.")
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
  
 +      // Print version information if requested
 +      if *getVersion {
 +              fmt.Printf("crunch-dispatch-slurm %s\n", version)
 +              return nil
 +      }
 +
 +      log.Printf("crunch-dispatch-slurm %s started", version)
 +
        err := readConfig(&theConfig, *configPath)
        if err != nil {
                return err
@@@ -164,6 -151,17 +164,17 @@@ func checkSqueueForOrphans(dispatcher *
        }
  }
  
// niceness converts an Arvados container priority into a SLURM nice value.
//
// The priority is first clamped to [0, 1000], then mapped linearly and
// inversely onto [0, 10000] in steps of 10: priority 1000 (or more) yields
// niceness 0, and priority 0 (or less) yields niceness 10000. In SLURM a
// lower nice value gives a job higher scheduling priority (see sbatch --nice).
func niceness(priority int) int {
	const maxPriority = 1000
	if priority > maxPriority {
		priority = maxPriority
	}
	if priority < 0 {
		priority = 0
	}
	// Resulting niceness range is 0-10000 (0 for the highest priority).
	return (maxPriority - priority) * 10
}
  // sbatchCmd
  func sbatchFunc(container arvados.Container) *exec.Cmd {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
@@@ -194,9 -193,15 +206,15 @@@ func scancelFunc(container arvados.Cont
        return exec.Command("scancel", "--name="+container.UUID)
  }
  
+ // scontrolCmd
+ func scontrolFunc(container arvados.Container) *exec.Cmd {
+       return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+ }
  // Wrap these so that they can be overridden by tests
  var sbatchCmd = sbatchFunc
  var scancelCmd = scancelFunc
+ var scontrolCmd = scontrolFunc
  
  // Submit job to slurm using sbatch.
  func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
@@@ -290,6 -295,10 +308,10 @@@ func run(disp *dispatch.Dispatcher, ct
                        } else if updated.Priority == 0 {
                                log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                scancel(ctr)
+                       } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
+                               // dynamically adjust priority
+                               log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
+                               scontrolUpdate(updated)
                        }
                }
        }
@@@ -310,6 -319,21 +332,21 @@@ func scancel(ctr arvados.Container) 
        }
  }
  
+ func scontrolUpdate(ctr arvados.Container) {
+       sqCheck.L.Lock()
+       cmd := scontrolCmd(ctr)
+       msg, err := cmd.CombinedOutput()
+       sqCheck.L.Unlock()
+       if err != nil {
+               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               time.Sleep(time.Second)
+       } else if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("Container %s priority is now %v, niceness is now %v",
+                       ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+       }
+ }
  func readConfig(dst interface{}, path string) error {
        err := config.LoadFile(dst, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {