+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"bytes"
+ "fmt"
"log"
- "os/exec"
"strings"
"sync"
"time"
)
+type jobPriority struct {
+ niceness int
+ currentPriority int
+}
+
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type SqueueChecker struct {
Period time.Duration
- uuids map[string]bool
+ Slurm Slurm
+ uuids map[string]jobPriority
startOnce sync.Once
done chan struct{}
sync.Cond
}
-func squeueFunc() *exec.Cmd {
- return exec.Command("squeue", "--all", "--format=%j")
-}
-
-var squeueCmd = squeueFunc
-
// HasUUID checks if a given container UUID is in the slurm queue.
// This does not run squeue directly, but instead blocks until woken
// up by next successful update of squeue.
// block until next squeue broadcast signaling an update.
sqc.Wait()
- return sqc.uuids[uuid]
+ _, exists := sqc.uuids[uuid]
+ return exists
+}
+
+// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
+func (sqc *SqueueChecker) GetNiceness(uuid string) int {
+ sqc.startOnce.Do(sqc.start)
+
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
+
+ n, exists := sqc.uuids[uuid]
+ if exists {
+ return n.niceness
+ } else {
+ return -1
+ }
}
// Stop stops the squeue monitoring goroutine. Do not call HasUUID
// check gets the names of jobs in the SLURM queue (running and
// queued). If it succeeds, it updates squeue.uuids and wakes up any
-// goroutines that are waiting in HasUUID().
+// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
// Mutex between squeue sync and running sbatch or scancel. This
// establishes a sequence so that squeue doesn't run concurrently with
sqc.L.Lock()
defer sqc.L.Unlock()
- cmd := squeueCmd()
+ cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
return
}
- uuids := strings.Split(stdout.String(), "\n")
- sqc.uuids = make(map[string]bool, len(uuids))
- for _, uuid := range uuids {
- sqc.uuids[uuid] = true
+ lines := strings.Split(stdout.String(), "\n")
+ sqc.uuids = make(map[string]jobPriority, len(lines))
+ for _, line := range lines {
+ var uuid string
+ var nice int
+ var prio int
+ fmt.Sscan(line, &uuid, &nice, &prio)
+ if uuid != "" {
+ sqc.uuids[uuid] = jobPriority{nice, prio}
+ }
}
sqc.Broadcast()
}
}()
}
-// All Uuids in squeue
-func (sqc *SqueueChecker) AllUuids() map[string]bool {
- return sqc.uuids
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+ sqc.startOnce.Do(sqc.start)
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
+ sqc.Wait()
+ var uuids []string
+ for uuid := range sqc.uuids {
+ uuids = append(uuids, uuid)
+ }
+ return uuids
}