13063: Use upstream azure-sdk-for-go.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "strings"
12         "sync"
13         "time"
14 )
15
16 type jobPriority struct {
17         niceness        int
18         currentPriority int
19 }
20
21 // Squeue implements asynchronous polling monitor of the SLURM queue using the
22 // command 'squeue'.
23 type SqueueChecker struct {
24         Period    time.Duration
25         Slurm     Slurm
26         uuids     map[string]jobPriority
27         startOnce sync.Once
28         done      chan struct{}
29         sync.Cond
30 }
31
32 // HasUUID checks if a given container UUID is in the slurm queue.
33 // This does not run squeue directly, but instead blocks until woken
34 // up by next successful update of squeue.
35 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
36         sqc.startOnce.Do(sqc.start)
37
38         sqc.L.Lock()
39         defer sqc.L.Unlock()
40
41         // block until next squeue broadcast signaling an update.
42         sqc.Wait()
43         _, exists := sqc.uuids[uuid]
44         return exists
45 }
46
47 // GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
48 func (sqc *SqueueChecker) GetNiceness(uuid string) int {
49         sqc.startOnce.Do(sqc.start)
50
51         sqc.L.Lock()
52         defer sqc.L.Unlock()
53
54         n, exists := sqc.uuids[uuid]
55         if exists {
56                 return n.niceness
57         } else {
58                 return -1
59         }
60 }
61
62 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
63 // after calling Stop.
64 func (sqc *SqueueChecker) Stop() {
65         if sqc.done != nil {
66                 close(sqc.done)
67         }
68 }
69
70 // check gets the names of jobs in the SLURM queue (running and
71 // queued). If it succeeds, it updates squeue.uuids and wakes up any
72 // goroutines that are waiting in HasUUID() or All().
73 func (sqc *SqueueChecker) check() {
74         // Mutex between squeue sync and running sbatch or scancel.  This
75         // establishes a sequence so that squeue doesn't run concurrently with
76         // sbatch or scancel; the next update of squeue will occur only after
77         // sbatch or scancel has completed.
78         sqc.L.Lock()
79         defer sqc.L.Unlock()
80
81         cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
82         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
83         cmd.Stdout, cmd.Stderr = stdout, stderr
84         if err := cmd.Run(); err != nil {
85                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
86                 return
87         }
88
89         lines := strings.Split(stdout.String(), "\n")
90         sqc.uuids = make(map[string]jobPriority, len(lines))
91         for _, line := range lines {
92                 var uuid string
93                 var nice int
94                 var prio int
95                 fmt.Sscan(line, &uuid, &nice, &prio)
96                 if uuid != "" {
97                         sqc.uuids[uuid] = jobPriority{nice, prio}
98                 }
99         }
100         sqc.Broadcast()
101 }
102
103 // Initialize, and start a goroutine to call check() once per
104 // squeue.Period until terminated by calling Stop().
105 func (sqc *SqueueChecker) start() {
106         sqc.L = &sync.Mutex{}
107         sqc.done = make(chan struct{})
108         go func() {
109                 ticker := time.NewTicker(sqc.Period)
110                 for {
111                         select {
112                         case <-sqc.done:
113                                 ticker.Stop()
114                                 return
115                         case <-ticker.C:
116                                 sqc.check()
117                         }
118                 }
119         }()
120 }
121
122 // All waits for the next squeue invocation, and returns all job
123 // names reported by squeue.
124 func (sqc *SqueueChecker) All() []string {
125         sqc.startOnce.Do(sqc.start)
126         sqc.L.Lock()
127         defer sqc.L.Unlock()
128         sqc.Wait()
129         var uuids []string
130         for uuid := range sqc.uuids {
131                 uuids = append(uuids, uuid)
132         }
133         return uuids
134 }