Merge branch '12573-crunch2-slurm-priority' closes #12573
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "os/exec"
12         "strings"
13         "sync"
14         "time"
15 )
16
17 type jobPriority struct {
18         niceness        int
19         currentPriority int
20 }
21
22 // Squeue implements asynchronous polling monitor of the SLURM queue using the
23 // command 'squeue'.
24 type SqueueChecker struct {
25         Period    time.Duration
26         uuids     map[string]jobPriority
27         startOnce sync.Once
28         done      chan struct{}
29         sync.Cond
30 }
31
32 func squeueFunc() *exec.Cmd {
33         return exec.Command("squeue", "--all", "--format=%j %y %Q")
34 }
35
36 var squeueCmd = squeueFunc
37
38 // HasUUID checks if a given container UUID is in the slurm queue.
39 // This does not run squeue directly, but instead blocks until woken
40 // up by next successful update of squeue.
41 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
42         sqc.startOnce.Do(sqc.start)
43
44         sqc.L.Lock()
45         defer sqc.L.Unlock()
46
47         // block until next squeue broadcast signaling an update.
48         sqc.Wait()
49         _, exists := sqc.uuids[uuid]
50         return exists
51 }
52
53 // GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
54 func (sqc *SqueueChecker) GetNiceness(uuid string) int {
55         sqc.startOnce.Do(sqc.start)
56
57         sqc.L.Lock()
58         defer sqc.L.Unlock()
59
60         n, exists := sqc.uuids[uuid]
61         if exists {
62                 return n.niceness
63         } else {
64                 return -1
65         }
66 }
67
68 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
69 // after calling Stop.
70 func (sqc *SqueueChecker) Stop() {
71         if sqc.done != nil {
72                 close(sqc.done)
73         }
74 }
75
76 // check gets the names of jobs in the SLURM queue (running and
77 // queued). If it succeeds, it updates squeue.uuids and wakes up any
78 // goroutines that are waiting in HasUUID() or All().
79 func (sqc *SqueueChecker) check() {
80         // Mutex between squeue sync and running sbatch or scancel.  This
81         // establishes a sequence so that squeue doesn't run concurrently with
82         // sbatch or scancel; the next update of squeue will occur only after
83         // sbatch or scancel has completed.
84         sqc.L.Lock()
85         defer sqc.L.Unlock()
86
87         cmd := squeueCmd()
88         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
89         cmd.Stdout, cmd.Stderr = stdout, stderr
90         if err := cmd.Run(); err != nil {
91                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
92                 return
93         }
94
95         lines := strings.Split(stdout.String(), "\n")
96         sqc.uuids = make(map[string]jobPriority, len(lines))
97         for _, line := range lines {
98                 var uuid string
99                 var nice int
100                 var prio int
101                 fmt.Sscan(line, &uuid, &nice, &prio)
102                 if uuid != "" {
103                         sqc.uuids[uuid] = jobPriority{nice, prio}
104                 }
105         }
106         sqc.Broadcast()
107 }
108
109 // Initialize, and start a goroutine to call check() once per
110 // squeue.Period until terminated by calling Stop().
111 func (sqc *SqueueChecker) start() {
112         sqc.L = &sync.Mutex{}
113         sqc.done = make(chan struct{})
114         go func() {
115                 ticker := time.NewTicker(sqc.Period)
116                 for {
117                         select {
118                         case <-sqc.done:
119                                 ticker.Stop()
120                                 return
121                         case <-ticker.C:
122                                 sqc.check()
123                         }
124                 }
125         }()
126 }
127
128 // All waits for the next squeue invocation, and returns all job
129 // names reported by squeue.
130 func (sqc *SqueueChecker) All() []string {
131         sqc.startOnce.Do(sqc.start)
132         sqc.L.Lock()
133         defer sqc.L.Unlock()
134         sqc.Wait()
135         var uuids []string
136         for uuid := range sqc.uuids {
137                 uuids = append(uuids, uuid)
138         }
139         return uuids
140 }