Merge branch '12552-slurm-priority'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
index f77023697e0f54ccaa12e2e7bc1bf3dd39f71509..23e4b3a8cb456aac06f644c0219148967586fe71 100644 (file)
@@ -25,6 +25,8 @@ import (
        "github.com/coreos/go-systemd/daemon"
 )
 
+const initialNiceValue int64 = 10000
+
 var (
        version           = "dev"
        defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -40,6 +42,7 @@ type Dispatcher struct {
 
        SbatchArguments []string
        PollPeriod      arvados.Duration
+       PrioritySpread  int64
 
        // crunch-run command to invoke. The container UUID will be
        // appended. If nil, []string{"crunch-run"} will be used.
@@ -154,8 +157,9 @@ func (disp *Dispatcher) setup() {
 
        disp.slurm = &slurmCLI{}
        disp.sqCheck = &SqueueChecker{
-               Period: time.Duration(disp.PollPeriod),
-               Slurm:  disp.slurm,
+               Period:         time.Duration(disp.PollPeriod),
+               PrioritySpread: disp.PrioritySpread,
+               Slurm:          disp.slurm,
        }
        disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
@@ -198,17 +202,6 @@ func (disp *Dispatcher) checkSqueueForOrphans() {
        }
 }
 
-func (disp *Dispatcher) niceness(priority int) int {
-       if priority > 1000 {
-               priority = 1000
-       }
-       if priority < 0 {
-               priority = 0
-       }
-       // Niceness range 1-10000
-       return (1000 - priority) * 10
-}
-
 func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
 
@@ -226,7 +219,7 @@ func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
-       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", initialNiceValue))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
@@ -327,12 +320,19 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                                log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                disp.scancel(ctr)
                        } else {
-                               disp.renice(updated)
+                               p := int64(updated.Priority)
+                               if p <= 1000 {
+                                       // Priority <= 1000 is a
+                                       // user-assigned value: put it in
+                                       // the high bits so it dominates,
+                                       // and let older containers win ties.
+                                       p = int64(p)<<50 - (updated.CreatedAt.UnixNano() >> 14)
+                               }
+                               disp.sqCheck.SetPriority(ctr.UUID, p)
                        }
                }
        }
 }
-
 func (disp *Dispatcher) scancel(ctr arvados.Container) {
        disp.sqCheck.L.Lock()
        err := disp.slurm.Cancel(ctr.UUID)
@@ -347,28 +347,6 @@ func (disp *Dispatcher) scancel(ctr arvados.Container) {
        }
 }
 
-func (disp *Dispatcher) renice(ctr arvados.Container) {
-       nice := disp.niceness(ctr.Priority)
-       oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
-       if nice == oldnice || oldnice == -1 {
-               return
-       }
-       log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
-       disp.sqCheck.L.Lock()
-       err := disp.slurm.Renice(ctr.UUID, nice)
-       disp.sqCheck.L.Unlock()
-
-       if err != nil {
-               log.Printf("renice: %s", err)
-               time.Sleep(time.Second)
-               return
-       }
-       if disp.sqCheck.HasUUID(ctr.UUID) {
-               log.Printf("container %s has arvados priority %d, slurm nice %d",
-                       ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
-       }
-}
-
 func (disp *Dispatcher) readConfig(path string) error {
        err := config.LoadFile(disp, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {