13 // Squeue implements an asynchronous polling monitor of the SLURM queue using the
16 squeueContents []string
17 squeueDone chan struct{}
// squeueFunc builds (but does not start) the command used to list the queue:
// "squeue --all --format=%j".
// NOTE(review): per the squeue man page, %j selects the job name column —
// confirm that jobs are submitted with the container UUID as their name,
// since CheckSqueue is documented as looking up a container UUID.
23 func squeueFunc() *exec.Cmd {
24 return exec.Command("squeue", "--all", "--format=%j")
// squeueCmd is a package-level variable initialised to squeueFunc.
// NOTE(review): presumably an indirection so tests can substitute a stub
// command — confirm against the test files.
27 var squeueCmd = squeueFunc
29 // RunSqueue runs squeue once and captures the output. If it succeeds, set
30 // "squeueContents" and then wake up any goroutines waiting on squeueCond in
31 // CheckSqueue(). If there was an error, log it and leave the threads blocked.
//
// Failures are reported only through the log; nothing is returned to the
// caller, and squeueContents keeps its previous value.
32 func (squeue *Squeue) RunSqueue() {
33 var newSqueueContents []string
35 // Mutex between squeue sync and running sbatch or scancel. This
36 // establishes a sequence so that squeue doesn't run concurrently with
37 // sbatch or scancel; the next update of squeue will occur only after
38 // sbatch or scancel has completed.
39 squeue.SlurmLock.Lock()
40 defer squeue.SlurmLock.Unlock()
42 // Also ensure unlock on all return paths
45 sq, err := cmd.StdoutPipe()
47 log.Printf("Error creating stdout pipe for squeue: %v", err)
51 stderrReader, err := cmd.StderrPipe()
53 log.Printf("Error creating stderr pipe for squeue: %v", err)
59 log.Printf("Error running squeue: %v", err)
// stderr is collected separately and handed over on stderrChan so it can
// be quoted in the failure log below.
63 stderrChan := make(chan []byte)
65 b, _ := ioutil.ReadAll(stderrReader)
// Each line of squeue's stdout becomes one entry of the new snapshot.
70 scanner := bufio.NewScanner(sq)
72 newSqueueContents = append(newSqueueContents, scanner.Text())
// Drain anything the scanner left unread on stdout.
// NOTE(review): presumably so the child never blocks on a full pipe after
// a scan error — confirm.
74 io.Copy(ioutil.Discard, sq)
76 stderrmsg := <-stderrChan
80 if scanner.Err() != nil {
// Log the scanner's own error: this branch is taken because scanner.Err()
// is non-nil, whereas err is a separate failure that is checked
// independently below and may well be nil here.
81 log.Printf("Error reading from squeue pipe: %v", scanner.Err())
84 log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
// Publish the new snapshot and wake every waiter only when both the read
// and the command succeeded; the swap and Broadcast happen under the
// condition variable's lock.
87 if scanner.Err() == nil && err == nil {
88 squeue.squeueCond.L.Lock()
89 squeue.squeueContents = newSqueueContents
90 squeue.squeueCond.Broadcast()
91 squeue.squeueCond.L.Unlock()
95 // CheckSqueue checks if a given container UUID is in the slurm queue. This
96 // does not run squeue directly, but instead blocks until woken up by next
97 // successful update of squeue.
//
// Wait is called unconditionally, so every call blocks until the next
// Broadcast even when squeueContents already holds an earlier snapshot.
// NOTE(review): squeueCond is assigned in StartMonitor; this presumably must
// not be called before StartMonitor — confirm no other initialisation exists.
98 func (squeue *Squeue) CheckSqueue(uuid string) bool {
99 squeue.squeueCond.L.Lock()
100 // block until next squeue broadcast signaling an update.
101 squeue.squeueCond.Wait()
// Grab the current snapshot while holding the lock; the scan below runs
// without it. RunSqueue replaces the slice wholesale instead of mutating it.
102 contents := squeue.squeueContents
103 squeue.squeueCond.L.Unlock()
// Walk the snapshot entry by entry.
105 for _, k := range contents {
113 // StartMonitor starts the squeue monitoring goroutine.
//
// It creates the condition variable (with its own fresh mutex) and the
// unbuffered done channel, then launches SyncSqueue in a new goroutine with
// the given pollInterval.
114 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
// Both fields are overwritten on every call.
// NOTE(review): calling StartMonitor twice would replace state that an
// earlier monitor goroutine may still be using — presumably call once only.
115 squeue.squeueCond = sync.NewCond(&sync.Mutex{})
116 squeue.squeueDone = make(chan struct{})
117 go squeue.SyncSqueue(pollInterval)
120 // Done stops the squeue monitoring goroutine.
//
// The send below is on the unbuffered channel created by StartMonitor, so
// Done blocks until a receiver (the monitor's select on squeueDone) takes the
// value. The channel is closed afterwards, which means a second call to Done
// would panic on the send to a closed channel.
121 func (squeue *Squeue) Done() {
// Hand the stop signal to the monitor; blocks until it is received.
122 squeue.squeueDone <- struct{}{}
// Close so that any later receive on squeueDone returns immediately.
123 close(squeue.squeueDone)
126 // SyncSqueue periodically polls RunSqueue() at the given duration until
127 // terminated by calling Done().
128 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
129 ticker := time.NewTicker(pollInterval)
132 case <-squeue.squeueDone: