8460: Merge branch 'master' into 8460-websocket-go
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 package main
2
3 import (
4         "bufio"
5         "io"
6         "io/ioutil"
7         "log"
8         "os/exec"
9         "sync"
10         "time"
11 )
12
13 // Squeue implements asynchronous polling monitor of the SLURM queue using the
14 // command 'squeue'.
15 type Squeue struct {
16         squeueContents []string
17         squeueDone     chan struct{}
18         squeueCond     *sync.Cond
19         SlurmLock      sync.Mutex
20 }
21
22 // squeueFunc
23 func squeueFunc() *exec.Cmd {
24         return exec.Command("squeue", "--all", "--format=%j")
25 }
26
27 var squeueCmd = squeueFunc
28
29 // RunSqueue runs squeue once and captures the output.  If it succeeds, set
30 // "squeueContents" and then wake up any goroutines waiting squeueCond in
31 // CheckSqueue().  If there was an error, log it and leave the threads blocked.
32 func (squeue *Squeue) RunSqueue() {
33         var newSqueueContents []string
34
35         // Mutex between squeue sync and running sbatch or scancel.  This
36         // establishes a sequence so that squeue doesn't run concurrently with
37         // sbatch or scancel; the next update of squeue will occur only after
38         // sbatch or scancel has completed.
39         squeue.SlurmLock.Lock()
40         defer squeue.SlurmLock.Unlock()
41
42         // Also ensure unlock on all return paths
43
44         cmd := squeueCmd()
45         sq, err := cmd.StdoutPipe()
46         if err != nil {
47                 log.Printf("Error creating stdout pipe for squeue: %v", err)
48                 return
49         }
50
51         stderrReader, err := cmd.StderrPipe()
52         if err != nil {
53                 log.Printf("Error creating stderr pipe for squeue: %v", err)
54                 return
55         }
56
57         err = cmd.Start()
58         if err != nil {
59                 log.Printf("Error running squeue: %v", err)
60                 return
61         }
62
63         stderrChan := make(chan []byte)
64         go func() {
65                 b, _ := ioutil.ReadAll(stderrReader)
66                 stderrChan <- b
67                 close(stderrChan)
68         }()
69
70         scanner := bufio.NewScanner(sq)
71         for scanner.Scan() {
72                 newSqueueContents = append(newSqueueContents, scanner.Text())
73         }
74         io.Copy(ioutil.Discard, sq)
75
76         stderrmsg := <-stderrChan
77
78         err = cmd.Wait()
79
80         if scanner.Err() != nil {
81                 log.Printf("Error reading from squeue pipe: %v", err)
82         }
83         if err != nil {
84                 log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
85         }
86
87         if scanner.Err() == nil && err == nil {
88                 squeue.squeueCond.L.Lock()
89                 squeue.squeueContents = newSqueueContents
90                 squeue.squeueCond.Broadcast()
91                 squeue.squeueCond.L.Unlock()
92         }
93 }
94
95 // CheckSqueue checks if a given container UUID is in the slurm queue.  This
96 // does not run squeue directly, but instead blocks until woken up by next
97 // successful update of squeue.
98 func (squeue *Squeue) CheckSqueue(uuid string) bool {
99         squeue.squeueCond.L.Lock()
100         // block until next squeue broadcast signaling an update.
101         squeue.squeueCond.Wait()
102         contents := squeue.squeueContents
103         squeue.squeueCond.L.Unlock()
104
105         for _, k := range contents {
106                 if k == uuid {
107                         return true
108                 }
109         }
110         return false
111 }
112
113 // StartMonitor starts the squeue monitoring goroutine.
114 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
115         squeue.squeueCond = sync.NewCond(&sync.Mutex{})
116         squeue.squeueDone = make(chan struct{})
117         go squeue.SyncSqueue(pollInterval)
118 }
119
120 // Done stops the squeue monitoring goroutine.
121 func (squeue *Squeue) Done() {
122         squeue.squeueDone <- struct{}{}
123         close(squeue.squeueDone)
124 }
125
126 // SyncSqueue periodically polls RunSqueue() at the given duration until
127 // terminated by calling Done().
128 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
129         ticker := time.NewTicker(pollInterval)
130         for {
131                 select {
132                 case <-squeue.squeueDone:
133                         return
134                 case <-ticker.C:
135                         squeue.RunSqueue()
136                 }
137         }
138 }