9187: Add documentation comments to Squeue functions.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 package main
2
3 import (
4         "bufio"
5         "log"
6         "os/exec"
7         "sync"
8         "time"
9 )
10
11 // Squeue implements asynchronous polling monitor of the SLURM queue using the
12 // command 'squeue'.
13 type Squeue struct {
14         squeueContents []string
15         squeueDone     chan struct{}
16         squeueError    error
17         squeueCond     *sync.Cond
18         SlurmLock      sync.Mutex
19 }
20
21 // squeueFunc
22 func squeueFunc() *exec.Cmd {
23         return exec.Command("squeue", "--format=%j")
24 }
25
26 var squeueCmd = squeueFunc
27
28 // RunSqueue runs squeue once and captures the output.  If there is an error,
29 // set "squeueError".  If it succeeds, set "squeueContents" and then wake up
30 // any goroutines waiting squeueCond in CheckSqueue().
31 func (squeue *Squeue) RunSqueue() error {
32         var newSqueueContents []string
33
34         // Mutex between squeue sync and running sbatch or scancel.  This
35         // establishes a sequence so that squeue doesn't run concurrently with
36         // sbatch or scancel; the next update of squeue will occur only after
37         // sbatch or scancel has completed.
38         squeueUpdater.SlurmLock.Lock()
39         defer squeueUpdater.SlurmLock.Unlock()
40
41         // Also ensure unlock on all return paths
42         defer squeueUpdater.squeueCond.L.Unlock()
43
44         cmd := squeueCmd()
45         sq, err := cmd.StdoutPipe()
46         if err != nil {
47                 log.Printf("Error creating stdout pipe for squeue: %v", err)
48                 squeueUpdater.squeueCond.L.Lock()
49                 squeueUpdater.squeueError = err
50                 return err
51         }
52         cmd.Start()
53         scanner := bufio.NewScanner(sq)
54         for scanner.Scan() {
55                 newSqueueContents = append(newSqueueContents, scanner.Text())
56         }
57         if err := scanner.Err(); err != nil {
58                 cmd.Wait()
59                 log.Printf("Error reading from squeue pipe: %v", err)
60                 squeueUpdater.squeueCond.L.Lock()
61                 squeueUpdater.squeueError = err
62                 return err
63         }
64
65         err = cmd.Wait()
66         if err != nil {
67                 log.Printf("Error running squeue: %v", err)
68                 squeueUpdater.squeueCond.L.Lock()
69                 squeueUpdater.squeueError = err
70                 return err
71         }
72
73         squeueUpdater.squeueCond.L.Lock()
74         squeueUpdater.squeueError = nil
75         squeueUpdater.squeueContents = newSqueueContents
76         squeueUpdater.squeueCond.Broadcast()
77
78         return nil
79 }
80
81 // CheckSqueue checks if a given container UUID is in the slurm queue.  This
82 // does not run squeue directly, but instead blocks until woken up by next
83 // successful update of squeue.
84 func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
85         squeueUpdater.squeueCond.L.Lock()
86         // block until next squeue broadcast signaling an update.
87         squeueUpdater.squeueCond.Wait()
88         if squeueUpdater.squeueError != nil {
89                 e := squeueUpdater.squeueError
90                 squeueUpdater.squeueCond.L.Unlock()
91                 return false, e
92         }
93         contents := squeueUpdater.squeueContents
94         squeueUpdater.squeueCond.L.Unlock()
95
96         for _, k := range contents {
97                 if k == uuid {
98                         return true, nil
99                 }
100         }
101         return false, nil
102 }
103
104 // StartMonitor starts the squeue monitoring goroutine.
105 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
106         squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
107         squeueUpdater.squeueDone = make(chan struct{})
108         squeueUpdater.RunSqueue()
109         go squeueUpdater.SyncSqueue(pollInterval)
110 }
111
112 // Done stops the squeue monitoring goroutine.
113 func (squeue *Squeue) Done() {
114         squeueUpdater.squeueDone <- struct{}{}
115         close(squeueUpdater.squeueDone)
116 }
117
118 // SyncSqueue periodically polls RunSqueue() at the given duration until
119 // terminated by calling Done().
120 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
121         ticker := time.NewTicker(pollInterval)
122         for {
123                 select {
124                 case <-squeueUpdater.squeueDone:
125                         return
126                 case <-ticker.C:
127                         squeueUpdater.RunSqueue()
128                 }
129         }
130 }