11 // Squeue implements an asynchronous polling monitor of the SLURM queue using the
14 squeueContents []string
15 squeueDone chan struct{}
21 func squeueFunc() *exec.Cmd {
22 return exec.Command("squeue", "--all", "--format=%j")
25 var squeueCmd = squeueFunc // package-level indirection for building the squeue command; presumably so tests can substitute a fake — TODO confirm
27 // RunSqueue runs squeue once and captures the output. If it succeeds, it sets
28 // "squeueContents" and then wakes up any goroutines waiting on squeueCond in
29 // CheckSqueue(). If there was an error, it logs it and leaves the waiters blocked.
30 func (squeue *Squeue) RunSqueue() {
31 var newSqueueContents []string
33 // Mutex between squeue sync and running sbatch or scancel. This
34 // establishes a sequence so that squeue doesn't run concurrently with
35 // sbatch or scancel; the next update of squeue will occur only after
36 // sbatch or scancel has completed.
37 squeue.SlurmLock.Lock()
38 defer squeue.SlurmLock.Unlock()
40 // The deferred Unlock above releases SlurmLock on every return path.
43 sq, err := cmd.StdoutPipe()
45 log.Printf("Error creating stdout pipe for squeue: %v", err)
50 log.Printf("Error running squeue: %v", err)
53 scanner := bufio.NewScanner(sq)
55 newSqueueContents = append(newSqueueContents, scanner.Text())
57 if err := scanner.Err(); err != nil {
59 log.Printf("Error reading from squeue pipe: %v", err)
65 log.Printf("Error running squeue: %v", err)
69 squeue.squeueCond.L.Lock()
70 squeue.squeueContents = newSqueueContents
71 squeue.squeueCond.Broadcast()
72 squeue.squeueCond.L.Unlock()
75 // CheckSqueue checks if a given container UUID is in the SLURM queue. This
76 // does not run squeue directly, but instead blocks until woken up by the next
77 // successful update of squeue.
78 func (squeue *Squeue) CheckSqueue(uuid string) bool {
79 squeue.squeueCond.L.Lock()
80 // block until next squeue broadcast signaling an update.
81 squeue.squeueCond.Wait()
82 contents := squeue.squeueContents
83 squeue.squeueCond.L.Unlock()
85 for _, k := range contents {
93 // StartMonitor starts the squeue monitoring goroutine.
94 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
95 squeue.squeueCond = sync.NewCond(&sync.Mutex{})
96 squeue.squeueDone = make(chan struct{})
97 go squeue.SyncSqueue(pollInterval)
100 // Done stops the squeue monitoring goroutine.
101 func (squeue *Squeue) Done() {
102 squeue.squeueDone <- struct{}{}
103 close(squeue.squeueDone)
106 // SyncSqueue periodically calls RunSqueue(), once every pollInterval, until
107 // terminated by calling Done().
108 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
109 ticker := time.NewTicker(pollInterval)
112 case <-squeue.squeueDone: