Merge branch '12552-slurm-priority'
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 1 Mar 2018 21:41:07 +0000 (16:41 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 1 Mar 2018 21:41:07 +0000 (16:41 -0500)
refs #12552

Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

1  2 
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go

index f77023697e0f54ccaa12e2e7bc1bf3dd39f71509,dab6025cd832fefdc1e5c0043913c078905bb2ef..23e4b3a8cb456aac06f644c0219148967586fe71
@@@ -25,6 -25,8 +25,8 @@@ import 
        "github.com/coreos/go-systemd/daemon"
  )
  
+ const initialNiceValue int64 = 10000
  var (
        version           = "dev"
        defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@@ -40,6 -42,7 +42,7 @@@ type Dispatcher struct 
  
        SbatchArguments []string
        PollPeriod      arvados.Duration
+       PrioritySpread  int64
  
        // crunch-run command to invoke. The container UUID will be
        // appended. If nil, []string{"crunch-run"} will be used.
        // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
        CrunchRunCommand []string
  
 +      // Extra RAM to reserve (in Bytes) for SLURM job, in addition
 +      // to the amount specified in the container's RuntimeConstraints
 +      ReserveExtraRAM int64
 +
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
  }
@@@ -154,8 -153,9 +157,9 @@@ func (disp *Dispatcher) setup() 
  
        disp.slurm = &slurmCLI{}
        disp.sqCheck = &SqueueChecker{
-               Period: time.Duration(disp.PollPeriod),
-               Slurm:  disp.slurm,
+               Period:         time.Duration(disp.PollPeriod),
+               PrioritySpread: disp.PrioritySpread,
+               Slurm:          disp.slurm,
        }
        disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
@@@ -198,19 -198,8 +202,8 @@@ func (disp *Dispatcher) checkSqueueForO
        }
  }
  
- func (disp *Dispatcher) niceness(priority int) int {
-       if priority > 1000 {
-               priority = 1000
-       }
-       if priority < 0 {
-               priority = 0
-       }
-       // Niceness range 1-10000
-       return (1000 - priority) * 10
- }
  func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
 -      mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 +      mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
  
        var disk int64
        for _, m := range container.Mounts {
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
-       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", initialNiceValue))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
@@@ -327,12 -316,19 +320,19 @@@ func (disp *Dispatcher) runContainer(_ 
                                log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                disp.scancel(ctr)
                        } else {
-                               disp.renice(updated)
+                               p := int64(updated.Priority)
+                               if p <= 1000 {
+                                       // API is providing
+                                       // user-assigned priority. If
+                                       // ctrs have equal priority,
+                                       // run the older one first.
+                                       p = int64(p)<<50 - (updated.CreatedAt.UnixNano() >> 14)
+                               }
+                               disp.sqCheck.SetPriority(ctr.UUID, p)
                        }
                }
        }
  }
  func (disp *Dispatcher) scancel(ctr arvados.Container) {
        disp.sqCheck.L.Lock()
        err := disp.slurm.Cancel(ctr.UUID)
        }
  }
  
- func (disp *Dispatcher) renice(ctr arvados.Container) {
-       nice := disp.niceness(ctr.Priority)
-       oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
-       if nice == oldnice || oldnice == -1 {
-               return
-       }
-       log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
-       disp.sqCheck.L.Lock()
-       err := disp.slurm.Renice(ctr.UUID, nice)
-       disp.sqCheck.L.Unlock()
-       if err != nil {
-               log.Printf("renice: %s", err)
-               time.Sleep(time.Second)
-               return
-       }
-       if disp.sqCheck.HasUUID(ctr.UUID) {
-               log.Printf("container %s has arvados priority %d, slurm nice %d",
-                       ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
-       }
- }
  func (disp *Dispatcher) readConfig(path string) error {
        err := config.LoadFile(disp, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {