Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 package main
2
3 import (
4         "bufio"
5         "log"
6         "os/exec"
7         "sync"
8         "time"
9 )
10
11 // Squeue implements asynchronous polling monitor of the SLURM queue using the
12 // command 'squeue'.
13 type Squeue struct {
14         squeueContents []string
15         squeueDone     chan struct{}
16         squeueCond     *sync.Cond
17         SlurmLock      sync.Mutex
18 }
19
20 // squeueFunc
21 func squeueFunc() *exec.Cmd {
22         return exec.Command("squeue", "--all", "--format=%j")
23 }
24
25 var squeueCmd = squeueFunc
26
27 // RunSqueue runs squeue once and captures the output.  If it succeeds, set
28 // "squeueContents" and then wake up any goroutines waiting squeueCond in
29 // CheckSqueue().  If there was an error, log it and leave the threads blocked.
30 func (squeue *Squeue) RunSqueue() {
31         var newSqueueContents []string
32
33         // Mutex between squeue sync and running sbatch or scancel.  This
34         // establishes a sequence so that squeue doesn't run concurrently with
35         // sbatch or scancel; the next update of squeue will occur only after
36         // sbatch or scancel has completed.
37         squeue.SlurmLock.Lock()
38         defer squeue.SlurmLock.Unlock()
39
40         // Also ensure unlock on all return paths
41
42         cmd := squeueCmd()
43         sq, err := cmd.StdoutPipe()
44         if err != nil {
45                 log.Printf("Error creating stdout pipe for squeue: %v", err)
46                 return
47         }
48         err = cmd.Start()
49         if err != nil {
50                 log.Printf("Error running squeue: %v", err)
51                 return
52         }
53         scanner := bufio.NewScanner(sq)
54         for scanner.Scan() {
55                 newSqueueContents = append(newSqueueContents, scanner.Text())
56         }
57         if err := scanner.Err(); err != nil {
58                 cmd.Wait()
59                 log.Printf("Error reading from squeue pipe: %v", err)
60                 return
61         }
62
63         err = cmd.Wait()
64         if err != nil {
65                 log.Printf("Error running squeue: %v", err)
66                 return
67         }
68
69         squeue.squeueCond.L.Lock()
70         squeue.squeueContents = newSqueueContents
71         squeue.squeueCond.Broadcast()
72         squeue.squeueCond.L.Unlock()
73 }
74
75 // CheckSqueue checks if a given container UUID is in the slurm queue.  This
76 // does not run squeue directly, but instead blocks until woken up by next
77 // successful update of squeue.
78 func (squeue *Squeue) CheckSqueue(uuid string) bool {
79         squeue.squeueCond.L.Lock()
80         // block until next squeue broadcast signaling an update.
81         squeue.squeueCond.Wait()
82         contents := squeue.squeueContents
83         squeue.squeueCond.L.Unlock()
84
85         for _, k := range contents {
86                 if k == uuid {
87                         return true
88                 }
89         }
90         return false
91 }
92
93 // StartMonitor starts the squeue monitoring goroutine.
94 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
95         squeue.squeueCond = sync.NewCond(&sync.Mutex{})
96         squeue.squeueDone = make(chan struct{})
97         go squeue.SyncSqueue(pollInterval)
98 }
99
100 // Done stops the squeue monitoring goroutine.
101 func (squeue *Squeue) Done() {
102         squeue.squeueDone <- struct{}{}
103         close(squeue.squeueDone)
104 }
105
106 // SyncSqueue periodically polls RunSqueue() at the given duration until
107 // terminated by calling Done().
108 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
109         ticker := time.NewTicker(pollInterval)
110         for {
111                 select {
112                 case <-squeue.squeueDone:
113                         return
114                 case <-ticker.C:
115                         squeue.RunSqueue()
116                 }
117         }
118 }