1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 type jobPriority struct {
22 // SqueueChecker implements an asynchronous polling monitor of the SLURM queue using the
24 type SqueueChecker struct {
26 uuids map[string]jobPriority
32 func squeueFunc() *exec.Cmd {
33 return exec.Command("squeue", "--all", "--format=%j %y %Q")
36 var squeueCmd = squeueFunc
38 // HasUUID checks if a given container UUID is in the slurm queue.
39 // This does not run squeue directly, but instead blocks until woken
40 // up by next successful update of squeue.
41 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
42 sqc.startOnce.Do(sqc.start)
47 // block until next squeue broadcast signaling an update.
49 _, exists := sqc.uuids[uuid]
53 // GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
54 func (sqc *SqueueChecker) GetNiceness(uuid string) int {
55 sqc.startOnce.Do(sqc.start)
60 n, exists := sqc.uuids[uuid]
68 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
69 // after calling Stop.
70 func (sqc *SqueueChecker) Stop() {
76 // check gets the names of jobs in the SLURM queue (running and
77 // queued). If it succeeds, it updates sqc.uuids and wakes up any
78 // goroutines that are waiting in HasUUID() or All().
79 func (sqc *SqueueChecker) check() {
80 // Mutex between squeue sync and running sbatch or scancel. This
81 // establishes a sequence so that squeue doesn't run concurrently with
82 // sbatch or scancel; the next update of squeue will occur only after
83 // sbatch or scancel has completed.
88 stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
89 cmd.Stdout, cmd.Stderr = stdout, stderr
90 if err := cmd.Run(); err != nil {
91 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
95 lines := strings.Split(stdout.String(), "\n")
96 sqc.uuids = make(map[string]jobPriority, len(lines))
97 for _, line := range lines {
101 fmt.Sscan(line, &uuid, &nice, &prio)
103 sqc.uuids[uuid] = jobPriority{nice, prio}
109 // Initialize, and start a goroutine to call check() once per
110 // sqc.Period until terminated by calling Stop().
111 func (sqc *SqueueChecker) start() {
112 sqc.L = &sync.Mutex{}
113 sqc.done = make(chan struct{})
115 ticker := time.NewTicker(sqc.Period)
128 // All waits for the next squeue invocation, and returns all job
129 // names reported by squeue.
130 func (sqc *SqueueChecker) All() []string {
131 sqc.startOnce.Do(sqc.start)
136 for uuid := range sqc.uuids {
137 uuids = append(uuids, uuid)