12891: Refactor slurm commands.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "strings"
12         "sync"
13         "time"
14 )
15
16 type jobPriority struct {
17         niceness        int
18         currentPriority int
19 }
20
21 // Squeue implements asynchronous polling monitor of the SLURM queue using the
22 // command 'squeue'.
23 type SqueueChecker struct {
24         Period    time.Duration
25         uuids     map[string]jobPriority
26         startOnce sync.Once
27         done      chan struct{}
28         sync.Cond
29 }
30
31 // HasUUID checks if a given container UUID is in the slurm queue.
32 // This does not run squeue directly, but instead blocks until woken
33 // up by next successful update of squeue.
34 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
35         sqc.startOnce.Do(sqc.start)
36
37         sqc.L.Lock()
38         defer sqc.L.Unlock()
39
40         // block until next squeue broadcast signaling an update.
41         sqc.Wait()
42         _, exists := sqc.uuids[uuid]
43         return exists
44 }
45
46 // GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
47 func (sqc *SqueueChecker) GetNiceness(uuid string) int {
48         sqc.startOnce.Do(sqc.start)
49
50         sqc.L.Lock()
51         defer sqc.L.Unlock()
52
53         n, exists := sqc.uuids[uuid]
54         if exists {
55                 return n.niceness
56         } else {
57                 return -1
58         }
59 }
60
61 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
62 // after calling Stop.
63 func (sqc *SqueueChecker) Stop() {
64         if sqc.done != nil {
65                 close(sqc.done)
66         }
67 }
68
69 // check gets the names of jobs in the SLURM queue (running and
70 // queued). If it succeeds, it updates squeue.uuids and wakes up any
71 // goroutines that are waiting in HasUUID() or All().
72 func (sqc *SqueueChecker) check() {
73         // Mutex between squeue sync and running sbatch or scancel.  This
74         // establishes a sequence so that squeue doesn't run concurrently with
75         // sbatch or scancel; the next update of squeue will occur only after
76         // sbatch or scancel has completed.
77         sqc.L.Lock()
78         defer sqc.L.Unlock()
79
80         cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
81         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
82         cmd.Stdout, cmd.Stderr = stdout, stderr
83         if err := cmd.Run(); err != nil {
84                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
85                 return
86         }
87
88         lines := strings.Split(stdout.String(), "\n")
89         sqc.uuids = make(map[string]jobPriority, len(lines))
90         for _, line := range lines {
91                 var uuid string
92                 var nice int
93                 var prio int
94                 fmt.Sscan(line, &uuid, &nice, &prio)
95                 if uuid != "" {
96                         sqc.uuids[uuid] = jobPriority{nice, prio}
97                 }
98         }
99         sqc.Broadcast()
100 }
101
102 // Initialize, and start a goroutine to call check() once per
103 // squeue.Period until terminated by calling Stop().
104 func (sqc *SqueueChecker) start() {
105         sqc.L = &sync.Mutex{}
106         sqc.done = make(chan struct{})
107         go func() {
108                 ticker := time.NewTicker(sqc.Period)
109                 for {
110                         select {
111                         case <-sqc.done:
112                                 ticker.Stop()
113                                 return
114                         case <-ticker.C:
115                                 sqc.check()
116                         }
117                 }
118         }()
119 }
120
121 // All waits for the next squeue invocation, and returns all job
122 // names reported by squeue.
123 func (sqc *SqueueChecker) All() []string {
124         sqc.startOnce.Do(sqc.start)
125         sqc.L.Lock()
126         defer sqc.L.Unlock()
127         sqc.Wait()
128         var uuids []string
129         for uuid := range sqc.uuids {
130                 uuids = append(uuids, uuid)
131         }
132         return uuids
133 }