Merge branch '9848-copy-container-output' refs #9848
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 package main
2
3 import (
4         "bufio"
5         "log"
6         "os/exec"
7         "sync"
8         "time"
9 )
10
11 // Squeue implements asynchronous polling monitor of the SLURM queue using the
12 // command 'squeue'.
13 type Squeue struct {
14         squeueContents []string
15         squeueDone     chan struct{}
16         squeueCond     *sync.Cond
17         SlurmLock      sync.Mutex
18 }
19
20 // squeueFunc
21 func squeueFunc() *exec.Cmd {
22         return exec.Command("squeue", "--format=%j")
23 }
24
25 var squeueCmd = squeueFunc
26
27 // RunSqueue runs squeue once and captures the output.  If it succeeds, set
28 // "squeueContents" and then wake up any goroutines waiting squeueCond in
29 // CheckSqueue().  If there was an error, log it and leave the threads blocked.
30 func (squeue *Squeue) RunSqueue() {
31         var newSqueueContents []string
32
33         // Mutex between squeue sync and running sbatch or scancel.  This
34         // establishes a sequence so that squeue doesn't run concurrently with
35         // sbatch or scancel; the next update of squeue will occur only after
36         // sbatch or scancel has completed.
37         squeue.SlurmLock.Lock()
38         defer squeue.SlurmLock.Unlock()
39
40         // Also ensure unlock on all return paths
41
42         cmd := squeueCmd()
43         sq, err := cmd.StdoutPipe()
44         if err != nil {
45                 log.Printf("Error creating stdout pipe for squeue: %v", err)
46                 return
47         }
48         cmd.Start()
49         scanner := bufio.NewScanner(sq)
50         for scanner.Scan() {
51                 newSqueueContents = append(newSqueueContents, scanner.Text())
52         }
53         if err := scanner.Err(); err != nil {
54                 cmd.Wait()
55                 log.Printf("Error reading from squeue pipe: %v", err)
56                 return
57         }
58
59         err = cmd.Wait()
60         if err != nil {
61                 log.Printf("Error running squeue: %v", err)
62                 return
63         }
64
65         squeue.squeueCond.L.Lock()
66         squeue.squeueContents = newSqueueContents
67         squeue.squeueCond.Broadcast()
68         squeue.squeueCond.L.Unlock()
69 }
70
71 // CheckSqueue checks if a given container UUID is in the slurm queue.  This
72 // does not run squeue directly, but instead blocks until woken up by next
73 // successful update of squeue.
74 func (squeue *Squeue) CheckSqueue(uuid string) bool {
75         squeue.squeueCond.L.Lock()
76         // block until next squeue broadcast signaling an update.
77         squeue.squeueCond.Wait()
78         contents := squeue.squeueContents
79         squeue.squeueCond.L.Unlock()
80
81         for _, k := range contents {
82                 if k == uuid {
83                         return true
84                 }
85         }
86         return false
87 }
88
89 // StartMonitor starts the squeue monitoring goroutine.
90 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
91         squeue.squeueCond = sync.NewCond(&sync.Mutex{})
92         squeue.squeueDone = make(chan struct{})
93         go squeue.SyncSqueue(pollInterval)
94 }
95
96 // Done stops the squeue monitoring goroutine.
97 func (squeue *Squeue) Done() {
98         squeue.squeueDone <- struct{}{}
99         close(squeue.squeueDone)
100 }
101
102 // SyncSqueue periodically polls RunSqueue() at the given duration until
103 // terminated by calling Done().
104 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
105         ticker := time.NewTicker(pollInterval)
106         for {
107                 select {
108                 case <-squeue.squeueDone:
109                         return
110                 case <-ticker.C:
111                         squeue.RunSqueue()
112                 }
113         }
114 }