12573: Translate Arvados priority to slurm niceness.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "os/exec"
12         "strings"
13         "sync"
14         "time"
15 )
16
17 // Squeue implements asynchronous polling monitor of the SLURM queue using the
18 // command 'squeue'.
19 type SqueueChecker struct {
20         Period    time.Duration
21         uuids     map[string]int
22         startOnce sync.Once
23         done      chan struct{}
24         sync.Cond
25 }
26
27 func squeueFunc() *exec.Cmd {
28         return exec.Command("squeue", "--all", "--format=%j %y")
29 }
30
31 var squeueCmd = squeueFunc
32
33 // HasUUID checks if a given container UUID is in the slurm queue.
34 // This does not run squeue directly, but instead blocks until woken
35 // up by next successful update of squeue.
36 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
37         sqc.startOnce.Do(sqc.start)
38
39         sqc.L.Lock()
40         defer sqc.L.Unlock()
41
42         // block until next squeue broadcast signaling an update.
43         sqc.Wait()
44         _, exists := sqc.uuids[uuid]
45         return exists
46 }
47
48 // GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
49 func (sqc *SqueueChecker) GetNiceness(uuid string) int {
50         sqc.startOnce.Do(sqc.start)
51
52         sqc.L.Lock()
53         defer sqc.L.Unlock()
54
55         n, exists := sqc.uuids[uuid]
56         if exists {
57                 return n
58         } else {
59                 return -1
60         }
61 }
62
63 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
64 // after calling Stop.
65 func (sqc *SqueueChecker) Stop() {
66         if sqc.done != nil {
67                 close(sqc.done)
68         }
69 }
70
71 // check gets the names of jobs in the SLURM queue (running and
72 // queued). If it succeeds, it updates squeue.uuids and wakes up any
73 // goroutines that are waiting in HasUUID() or All().
74 func (sqc *SqueueChecker) check() {
75         // Mutex between squeue sync and running sbatch or scancel.  This
76         // establishes a sequence so that squeue doesn't run concurrently with
77         // sbatch or scancel; the next update of squeue will occur only after
78         // sbatch or scancel has completed.
79         sqc.L.Lock()
80         defer sqc.L.Unlock()
81
82         cmd := squeueCmd()
83         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
84         cmd.Stdout, cmd.Stderr = stdout, stderr
85         if err := cmd.Run(); err != nil {
86                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
87                 return
88         }
89
90         lines := strings.Split(stdout.String(), "\n")
91         sqc.uuids = make(map[string]int, len(lines))
92         for _, line := range lines {
93                 var uuid string
94                 var nice int
95                 fmt.Sscan(line, &uuid, &nice)
96                 if uuid != "" {
97                         sqc.uuids[uuid] = nice
98                 }
99         }
100         sqc.Broadcast()
101 }
102
103 // Initialize, and start a goroutine to call check() once per
104 // squeue.Period until terminated by calling Stop().
105 func (sqc *SqueueChecker) start() {
106         sqc.L = &sync.Mutex{}
107         sqc.done = make(chan struct{})
108         go func() {
109                 ticker := time.NewTicker(sqc.Period)
110                 for {
111                         select {
112                         case <-sqc.done:
113                                 ticker.Stop()
114                                 return
115                         case <-ticker.C:
116                                 sqc.check()
117                         }
118                 }
119         }()
120 }
121
122 // All waits for the next squeue invocation, and returns all job
123 // names reported by squeue.
124 func (sqc *SqueueChecker) All() []string {
125         sqc.startOnce.Do(sqc.start)
126         sqc.L.Lock()
127         defer sqc.L.Unlock()
128         sqc.Wait()
129         var uuids []string
130         for uuid := range sqc.uuids {
131                 uuids = append(uuids, uuid)
132         }
133         return uuids
134 }