11 // Squeue implements an asynchronous polling monitor of the SLURM queue using the
14 squeueContents []string // output lines stored by RunSqueue(); written and read while holding squeueCond.L
15 squeueDone chan struct{} // stop signal: Done() sends on it and then closes it; SyncSqueue() receives from it
// squeueFunc returns a not-yet-started command that invokes "squeue" with the
// single argument "--format=%j". It is the initial value of squeueCmd.
// NOTE(review): %j is presumably squeue's job-name field, which would make the
// output one job name per line — confirm against the squeue man page.
21 func squeueFunc() *exec.Cmd {
22 return exec.Command("squeue", "--format=%j")
// squeueCmd holds the function that builds the squeue command; it is
// initialised to squeueFunc. Being a package-level variable it can be
// reassigned — presumably so tests can substitute a different command;
// confirm against its usages.
25 var squeueCmd = squeueFunc
27 // RunSqueue runs squeue once and captures the output. If it succeeds, it sets
28 // "squeueContents" and then wakes up any goroutines waiting on squeueCond in
29 // CheckSqueue(). If there was an error, it logs it and leaves them blocked.
30 func (squeue *Squeue) RunSqueue() {
// Collects one entry per line of squeue output; it is only published to
// squeue.squeueContents at the very end of the function.
31 var newSqueueContents []string
33 // Mutex between squeue sync and running sbatch or scancel. This
34 // establishes a sequence so that squeue doesn't run concurrently with
35 // sbatch or scancel; the next update of squeue will occur only after
36 // sbatch or scancel has completed.
37 squeue.SlurmLock.Lock()
38 defer squeue.SlurmLock.Unlock()
40 // The deferred Unlock above releases SlurmLock on every return path.
// NOTE(review): cmd is defined outside the lines visible here — presumably
// the command built by squeueCmd(); confirm.
43 sq, err := cmd.StdoutPipe()
45 log.Printf("Error creating stdout pipe for squeue: %v", err)
// Read the command's standard output line by line.
49 scanner := bufio.NewScanner(sq)
51 newSqueueContents = append(newSqueueContents, scanner.Text())
// scanner.Err() reports the first non-EOF error hit while scanning.
53 if err := scanner.Err(); err != nil {
55 log.Printf("Error reading from squeue pipe: %v", err)
61 log.Printf("Error running squeue: %v", err)
// Publish the new snapshot while holding the condition's lock, then wake all
// waiters so that each blocked CheckSqueue() call reads the updated contents.
65 squeue.squeueCond.L.Lock()
66 squeue.squeueContents = newSqueueContents
67 squeue.squeueCond.Broadcast()
68 squeue.squeueCond.L.Unlock()
71 // CheckSqueue checks if a given container UUID is in the slurm queue. This
72 // does not run squeue directly, but instead blocks until woken up by the next
73 // successful update of squeue.
//
// uuid is the container UUID to look for.
74 func (squeue *Squeue) CheckSqueue(uuid string) bool {
75 squeue.squeueCond.L.Lock()
76 // Block until the next squeue broadcast signaling an update. Wait() is called
// unconditionally, so every call waits for a fresh update instead of using
// contents that were stored earlier. Wait() releases L while blocked and
// re-acquires it before returning.
77 squeue.squeueCond.Wait()
// Grab the current slice under the lock, then release the lock before
// scanning it.
78 contents := squeue.squeueContents
79 squeue.squeueCond.L.Unlock()
// NOTE(review): the loop body and the return statements are not visible
// here — presumably each entry k is compared with uuid; confirm.
81 for _, k := range contents {
89 // StartMonitor starts the squeue monitoring goroutine. pollInterval is passed
// through unchanged to SyncSqueue().
//
// NOTE(review): squeueCond and squeueDone are assigned here; unless they are
// also initialised elsewhere, StartMonitor must be called before RunSqueue(),
// CheckSqueue() or Done() — confirm.
90 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
// Condition variable (with its own fresh mutex) that RunSqueue() broadcasts
// on and CheckSqueue() waits on.
91 squeue.squeueCond = sync.NewCond(&sync.Mutex{})
// Unbuffered stop channel used by Done().
92 squeue.squeueDone = make(chan struct{})
// Run the polling loop in the background.
93 go squeue.SyncSqueue(pollInterval)
96 // Done stops the squeue monitoring goroutine.
//
// With the unbuffered channel created by StartMonitor(), the send below blocks
// until a receiver (the select in SyncSqueue()) takes the value. The channel
// is closed afterwards, so calling Done() a second time would panic on the
// send to a closed channel.
97 func (squeue *Squeue) Done() {
// Hand the stop signal to the monitoring goroutine.
98 squeue.squeueDone <- struct{}{}
// Closing makes any later receive on squeueDone return immediately.
99 close(squeue.squeueDone)
102 // SyncSqueue periodically calls RunSqueue() at the given poll interval until
103 // terminated by calling Done().
104 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
105 ticker := time.NewTicker(pollInterval)
108 case <-squeue.squeueDone: