type Squeue struct {
+ // squeueContents holds the entries from the last successful squeue run.
+ // RunSqueue replaces it and CheckSqueue reads it, both under squeueCond.L;
+ // CheckSqueue compares each entry against a container UUID.
squeueContents []string
+ // squeueDone carries the stop request from Done to SyncSqueue. It is
+ // created (unbuffered) by StartMonitor.
squeueDone chan struct{}
- squeueError error
+ // squeueCond is created by StartMonitor. RunSqueue broadcasts on it after
+ // each successful update, and CheckSqueue waits on it.
squeueCond *sync.Cond
+ // SlurmLock is held by RunSqueue for the whole squeue run. Per the comment
+ // in RunSqueue, it is meant to serialize squeue with sbatch and scancel
+ // (those callers are not visible here).
SlurmLock sync.Mutex
}
+// squeueCmd builds the command that RunSqueue executes. It is held in a
+// variable rather than called directly, presumably so it can be replaced
+// (e.g. in tests); TODO confirm against callers.
var squeueCmd = squeueFunc
-// RunSqueue runs squeue once and captures the output. If there is an error,
-// set "squeueError". If it succeeds, set "squeueContents" and then wake up
-// any goroutines waiting squeueCond in CheckSqueue().
-func (squeue *Squeue) RunSqueue() error {
+// RunSqueue runs squeue once and captures the output. It holds SlurmLock for
+// the whole run so that squeue never overlaps sbatch or scancel.
+//
+// If the run succeeds, RunSqueue stores the output in "squeueContents" and
+// wakes up any goroutines waiting on squeueCond in CheckSqueue(). If any step
+// fails (creating the pipe, starting squeue, reading its output, or squeue
+// exiting with an error), the error is logged and RunSqueue returns without
+// broadcasting. Those goroutines stay blocked until a later run succeeds.
+func (squeue *Squeue) RunSqueue() {
var newSqueueContents []string
// Mutex between squeue sync and running sbatch or scancel. This
// establishes a sequence so that squeue doesn't run concurrently with
// sbatch or scancel; the next update of squeue will occur only after
// sbatch or scancel has completed.
- squeueUpdater.SlurmLock.Lock()
- defer squeueUpdater.SlurmLock.Unlock()
+ squeue.SlurmLock.Lock()
+ defer squeue.SlurmLock.Unlock()
- // Also ensure unlock on all return paths
- defer squeueUpdater.squeueCond.L.Unlock()
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Error creating stdout pipe for squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
- return err
+ return
}
- cmd.Start()
+ // Report a failed start directly rather than letting it surface later as
+ // a confusing pipe-read or Wait error.
+ if err := cmd.Start(); err != nil {
+ log.Printf("Error starting squeue: %v", err)
+ return
+ }
scanner := bufio.NewScanner(sq)
if err := scanner.Err(); err != nil {
+ // Wait's own error is deliberately ignored; the read error is what gets
+ // reported.
cmd.Wait()
log.Printf("Error reading from squeue pipe: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
- return err
+ return
}
err = cmd.Wait()
if err != nil {
log.Printf("Error running squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
- return err
+ return
}
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = nil
- squeueUpdater.squeueContents = newSqueueContents
- squeueUpdater.squeueCond.Broadcast()
-
- return nil
+ // Publish the new contents and wake every goroutine blocked in CheckSqueue.
+ squeue.squeueCond.L.Lock()
+ squeue.squeueContents = newSqueueContents
+ squeue.squeueCond.Broadcast()
+ squeue.squeueCond.L.Unlock()
}
// CheckSqueue checks if a given container UUID is in the slurm queue. This
// does not run squeue directly, but instead blocks until woken up by next
// successful update of squeue.
+//
+// RunSqueue does not broadcast when squeue fails, so a caller stays blocked
+// for as long as squeue keeps failing. StartMonitor must have been called
+// first, since it is where squeueCond is created.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
- squeueUpdater.squeueCond.L.Lock()
+func (squeue *Squeue) CheckSqueue(uuid string) bool {
+ squeue.squeueCond.L.Lock()
// block until next squeue broadcast signaling an update.
- squeueUpdater.squeueCond.Wait()
- if squeueUpdater.squeueError != nil {
- e := squeueUpdater.squeueError
- squeueUpdater.squeueCond.L.Unlock()
- return false, e
- }
- contents := squeueUpdater.squeueContents
- squeueUpdater.squeueCond.L.Unlock()
+ squeue.squeueCond.Wait()
+ // Wait re-acquires L before returning, so this read is synchronized with
+ // the write in RunSqueue. As far as the code here shows, RunSqueue assigns
+ // a fresh slice rather than mutating the old one, so contents can be
+ // scanned after unlocking.
+ contents := squeue.squeueContents
+ squeue.squeueCond.L.Unlock()
for _, k := range contents {
if k == uuid {
- return true, nil
+ return true
}
}
- return false, nil
+ return false
}
// StartMonitor starts the squeue monitoring goroutine.
+// It must be called before CheckSqueue or Done, because it creates
+// squeueCond and squeueDone. No squeue run happens here: the first update
+// comes from SyncSqueue when its ticker first fires. CheckSqueue therefore
+// returns no earlier than about one pollInterval after StartMonitor.
func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
- squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
- squeueUpdater.squeueDone = make(chan struct{})
- squeueUpdater.RunSqueue()
- go squeueUpdater.SyncSqueue(pollInterval)
+ squeue.squeueCond = sync.NewCond(&sync.Mutex{})
+ squeue.squeueDone = make(chan struct{})
+ go squeue.SyncSqueue(pollInterval)
}
// Done stops the squeue monitoring goroutine.
+// squeueDone is unbuffered, so the send blocks until SyncSqueue receives it
+// between polls. Once Done returns, SyncSqueue has stopped and no further
+// RunSqueue calls will be started. Done must be called at most once: a second
+// call would send on the already-closed channel and panic.
func (squeue *Squeue) Done() {
- squeueUpdater.squeueDone <- struct{}{}
- close(squeueUpdater.squeueDone)
+ squeue.squeueDone <- struct{}{}
+ close(squeue.squeueDone)
}
// SyncSqueue periodically polls RunSqueue() at the given duration until
ticker := time.NewTicker(pollInterval)
for {
select {
- case <-squeueUpdater.squeueDone:
+ case <-squeue.squeueDone:
return
case <-ticker.C:
- squeueUpdater.RunSqueue()
+ squeue.RunSqueue()
}
}
}