+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"bytes"
+ "fmt"
"log"
- "os/exec"
+ "sort"
"strings"
"sync"
"time"
)
+type slurmJob struct { // one entry of the slurm queue as tracked by SqueueChecker
+ uuid string // job name as parsed from squeue output; also the key in SqueueChecker.queue
+ wantPriority int64 // desired (Arvados) priority recorded by SetPriority; 0 means unknown, and reniceAll skips such jobs
+ priority int64 // current slurm priority (incorporates nice value)
+ nice int64 // current slurm nice value
+}
+
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type SqueueChecker struct {
- Period time.Duration
- uuids map[string]bool
- startOnce sync.Once
- done chan struct{}
+ Period time.Duration // NOTE(review): presumably the polling interval; the ticker setup is not visible here — confirm
+ PrioritySpread int64 // passed through to wantNice by reniceAll
+ Slurm Slurm // supplies the squeue command (QueueCommand) and the renice operation (Renice)
+ queue map[string]*slurmJob // jobs from the latest successful squeue run, keyed by job name; replaced wholesale by check
+ startOnce sync.Once // makes sure sqc.start runs at most once (triggered from SetPriority and All)
+ done chan struct{} // NOTE(review): presumably tells the monitoring goroutine to stop — confirm against Stop/start
sync.Cond
}
-func squeueFunc() *exec.Cmd {
- return exec.Command("squeue", "--all", "--format=%j")
-}
-
-var squeueCmd = squeueFunc
-
// HasUUID checks if a given container UUID is in the slurm queue.
// This does not run squeue directly, but instead blocks until woken
// up by next successful update of squeue.
// block until next squeue broadcast signaling an update.
sqc.Wait()
- return sqc.uuids[uuid]
+ _, exists := sqc.queue[uuid]
+ return exists
+}
+
+// SetPriority records the desired (Arvados) priority for the
+// container with the given uuid, so that reniceAll can take it into
+// account.
+//
+// If the uuid is not in the current queue snapshot, SetPriority
+// blocks until the next squeue broadcast and looks once more, since
+// the slurm job may have been submitted only a moment ago. If the job
+// is still absent after that, the requested priority is dropped.
+func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
+	sqc.startOnce.Do(sqc.start)
+	sqc.L.Lock()
+	defer sqc.L.Unlock()
+
+	entry, found := sqc.queue[uuid]
+	if !found {
+		// Not listed yet: give the next squeue update a chance
+		// to report it, then retry the lookup exactly once.
+		sqc.Wait()
+		entry, found = sqc.queue[uuid]
+	}
+	if found {
+		entry.wantPriority = want
+	}
+}
+
+// reniceAll adjusts slurm job nice values as needed to ensure slurm
+// priority order matches Arvados priority order.
+//
+// It holds sqc.L for the whole pass. Jobs whose wantPriority is zero
+// are left alone. The remaining jobs are ordered by descending
+// wantPriority (ties broken by uuid), wantNice computes the target
+// nice value for each position, and Renice is invoked for every job
+// whose current nice value differs from its target.
+func (sqc *SqueueChecker) reniceAll() {
+	sqc.L.Lock()
+	defer sqc.L.Unlock()
+
+	jobs := make([]*slurmJob, 0, len(sqc.queue))
+	for _, j := range sqc.queue {
+		if j.wantPriority == 0 {
+			// SLURM job with unknown Arvados priority
+			// (perhaps it's not an Arvados job)
+			continue
+		}
+		jobs = append(jobs, j)
+	}
+
+	sort.Slice(jobs, func(i, j int) bool {
+		if jobs[i].wantPriority != jobs[j].wantPriority {
+			return jobs[i].wantPriority > jobs[j].wantPriority
+		}
+		// Break ties by uuid. sort.Slice is not stable and the
+		// slice is filled from map iteration, so without a
+		// total order jobs of equal priority could swap places
+		// from one poll to the next and be reniced for no
+		// reason.
+		return jobs[i].uuid < jobs[j].uuid
+	})
+	renice := wantNice(jobs, sqc.PrioritySpread)
+	for i, job := range jobs {
+		if renice[i] == job.nice {
+			// Already at the target nice value.
+			continue
+		}
+		log.Printf("updating slurm priority for %q: nice %d => %d", job.uuid, job.nice, renice[i])
+		// NOTE(review): whatever Renice returns is not inspected
+		// here; confirm against the Slurm interface whether a
+		// failure should be logged.
+		sqc.Slurm.Renice(job.uuid, renice[i])
+	}
}
// Stop stops the squeue monitoring goroutine. Do not call HasUUID
}
// check gets the names of jobs in the SLURM queue (running and
-// queued). If it succeeds, it updates squeue.uuids and wakes up any
+// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
// Mutex between squeue sync and running sbatch or scancel. This
sqc.L.Lock()
defer sqc.L.Unlock()
- cmd := squeueCmd()
+ cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
return
}
- uuids := strings.Split(stdout.String(), "\n")
- sqc.uuids = make(map[string]bool, len(uuids))
- for _, uuid := range uuids {
- sqc.uuids[uuid] = true
+ lines := strings.Split(stdout.String(), "\n")
+ newq := make(map[string]*slurmJob, len(lines))
+ for _, line := range lines {
+ if line == "" {
+ continue
+ }
+ var uuid string
+ var n, p int64
+ if _, err := fmt.Sscan(line, &uuid, &n, &p); err != nil {
+ log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+ continue
+ }
+ replacing, ok := sqc.queue[uuid]
+ if !ok {
+ replacing = &slurmJob{uuid: uuid}
+ }
+ replacing.priority = p
+ replacing.nice = n
+ newq[uuid] = replacing
}
+ sqc.queue = newq
sqc.Broadcast()
}
return
case <-ticker.C:
sqc.check()
+ sqc.reniceAll()
}
}
}()
// All waits for the next squeue invocation, and returns all job
// names reported by squeue.
func (sqc *SqueueChecker) All() []string {
+ sqc.startOnce.Do(sqc.start) // run sqc.start on first use; sync.Once turns later calls into no-ops
sqc.L.Lock()
defer sqc.L.Unlock()
sqc.Wait()
var uuids []string
- for uuid := range sqc.uuids {
- uuids = append(uuids, uuid)
+ for u := range sqc.queue { // map keys are the job names parsed from squeue output by check
+ uuids = append(uuids, u) // order follows Go map iteration, so it is unspecified
}
return uuids
}