11 // Squeue implements asynchronous polling monitor of the SLURM queue using the
14 squeueContents []string
15 squeueDone chan struct{}
// squeueFunc builds (but does not start) the command `squeue --format=%j`.
// In squeue's format syntax %j selects the job name, so the command is
// expected to print one job name per output line.
22 func squeueFunc() *exec.Cmd {
23 return exec.Command("squeue", "--format=%j")
26 var squeueCmd = squeueFunc
28 // RunSqueue runs squeue once and captures the output. If there is an error,
29 // set "squeueError". If it succeeds, set "squeueContents" and then wake up
30 // any goroutines waiting on squeueCond in CheckSqueue().
//
// All state touched in the lines visible here lives on the package-level
// squeueUpdater; the method receiver itself is not referenced.
31 func (squeue *Squeue) RunSqueue() error {
32 var newSqueueContents []string
34 // Mutex between squeue sync and running sbatch or scancel. This
35 // establishes a sequence so that squeue doesn't run concurrently with
36 // sbatch or scancel; the next update of squeue will occur only after
37 // sbatch or scancel has completed.
38 squeueUpdater.SlurmLock.Lock()
39 defer squeueUpdater.SlurmLock.Unlock()
41 // Also ensure unlock on all return paths
// NOTE(review): this Unlock is deferred before squeueCond.L has been locked.
// Each return path therefore has to Lock squeueCond.L exactly once before
// returning (the Lock calls below appear to serve that purpose); unlocking
// an unlocked sync.Mutex is a fatal runtime error. Verify every return path.
42 defer squeueUpdater.squeueCond.L.Unlock()
45 sq, err := cmd.StdoutPipe()
47 log.Printf("Error creating stdout pipe for squeue: %v", err)
48 squeueUpdater.squeueCond.L.Lock()
49 squeueUpdater.squeueError = err
// bufio.Scanner splits on lines by default, so every line of squeue output
// becomes one entry of newSqueueContents.
53 scanner := bufio.NewScanner(sq)
55 newSqueueContents = append(newSqueueContents, scanner.Text())
57 if err := scanner.Err(); err != nil {
59 log.Printf("Error reading from squeue pipe: %v", err)
60 squeueUpdater.squeueCond.L.Lock()
61 squeueUpdater.squeueError = err
67 log.Printf("Error running squeue: %v", err)
68 squeueUpdater.squeueCond.L.Lock()
69 squeueUpdater.squeueError = err
// Success: clear the error, publish the freshly built slice and wake every
// goroutine blocked in squeueCond.Wait().
// NOTE(review): no Broadcast is visible on the error paths above; unless the
// lines not shown here add one, waiters stay blocked until a later
// successful run — confirm this is intended.
73 squeueUpdater.squeueCond.L.Lock()
74 squeueUpdater.squeueError = nil
75 squeueUpdater.squeueContents = newSqueueContents
76 squeueUpdater.squeueCond.Broadcast()
81 // CheckSqueue checks if a given container UUID is in the slurm queue. This
82 // does not run squeue directly, but instead blocks until woken up by the next
83 // successful update of squeue.
84 func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
85 squeueUpdater.squeueCond.L.Lock()
86 // block until next squeue broadcast signaling an update.
// Wait releases squeueCond.L while suspended and re-acquires it before
// returning. There is no predicate loop around it: the caller always waits
// for a fresh Broadcast, even when squeueContents is already populated.
87 squeueUpdater.squeueCond.Wait()
88 if squeueUpdater.squeueError != nil {
// Copy the error while still holding the lock, then release it.
89 e := squeueUpdater.squeueError
90 squeueUpdater.squeueCond.L.Unlock()
// Grab a reference to the current snapshot under the lock and scan it after
// unlocking; RunSqueue assigns a newly built slice to squeueContents rather
// than modifying the published one in place.
93 contents := squeueUpdater.squeueContents
94 squeueUpdater.squeueCond.L.Unlock()
96 for _, k := range contents {
104 // StartMonitor starts the squeue monitoring goroutine.
//
// It creates the condition variable and the (unbuffered) done channel on the
// package-level squeueUpdater, runs RunSqueue once synchronously, and then
// launches SyncSqueue(pollInterval) in a new goroutine.
// NOTE(review): squeueCond is only assigned here in the visible code, so this
// presumably has to be called before RunSqueue or CheckSqueue is used —
// confirm.
105 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
106 squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
107 squeueUpdater.squeueDone = make(chan struct{})
// The error returned by this initial run is discarded here.
108 squeueUpdater.RunSqueue()
109 go squeueUpdater.SyncSqueue(pollInterval)
112 // Done stops the squeue monitoring goroutine.
//
// It sends one value on squeueDone and then closes the channel. StartMonitor
// creates squeueDone unbuffered, so the send blocks until a receiver takes
// the value.
// NOTE(review): a second call would send on an already-closed channel, which
// panics, and a call with no goroutine receiving would block — confirm that
// callers invoke Done exactly once, after StartMonitor.
113 func (squeue *Squeue) Done() {
114 squeueUpdater.squeueDone <- struct{}{}
115 close(squeueUpdater.squeueDone)
118 // SyncSqueue calls RunSqueue() once every pollInterval until
119 // terminated by calling Done().
120 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
121 ticker := time.NewTicker(pollInterval)
124 case <-squeueUpdater.squeueDone:
127 squeueUpdater.RunSqueue()