// Squeue holds the most recently captured squeue output together with the
// synchronization primitives used to publish it and to wait for it.
type Squeue struct {
// squeueContents is the value stored by RunSqueue after a successful squeue
// run; CheckSqueue compares its entries against a container UUID.
squeueContents []string
// squeueDone is created by StartMonitor.
// NOTE(review): presumably used to stop the monitoring goroutine; its
// consumer is not visible in this excerpt — confirm.
squeueDone chan struct{}
- squeueError error
// squeueCond is created by StartMonitor. Its lock is held while
// squeueContents is written (RunSqueue) or read (CheckSqueue), and RunSqueue
// broadcasts on it after storing new contents.
squeueCond *sync.Cond
// SlurmLock is the mutex between squeue sync and running sbatch or scancel.
SlurmLock sync.Mutex
}
// squeueCmd is called by RunSqueue to obtain the command to run; it is
// initialized to squeueFunc.
// NOTE(review): presumably kept as a variable so it can be substituted
// (e.g. in tests) — confirm.
var squeueCmd = squeueFunc
-// RunSqueue runs squeue once and captures the output. If there is an error,
-// set "squeueError". If it succeeds, set "squeueContents" and then wake up
-// any goroutines waiting squeueCond in CheckSqueue().
-func (squeue *Squeue) RunSqueue() error {
+// RunSqueue runs squeue once and captures the output. If it succeeds, set
+// "squeueContents" and then wake up any goroutines waiting squeueCond in
+// CheckSqueue(). If there was an error, log it and leave the threads blocked.
+func (squeue *Squeue) RunSqueue() {
// Collects the new contents locally; it is only stored in squeueContents
// after every step below has succeeded.
var newSqueueContents []string
// Mutex between squeue sync and running sbatch or scancel. This
// NOTE(review): the remainder of the comment above and the SlurmLock.Lock()
// call matching this deferred Unlock are not visible in this excerpt —
// confirm the lock is acquired before this point.
defer squeue.SlurmLock.Unlock()
// Also ensure unlock on all return paths
// NOTE(review): the deferred squeueCond.L.Unlock() that the comment above
// described is removed by this change; the condition lock is now released
// explicitly at the end of the success path instead.
- defer squeue.squeueCond.L.Unlock()
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Error creating stdout pipe for squeue: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
// NOTE(review): the result of Start is discarded here; if cmd is an
// *exec.Cmd, a failure to start would only surface later — confirm.
cmd.Start()
scanner := bufio.NewScanner(sq)
// NOTE(review): the loop that reads lines from scanner into
// newSqueueContents is not visible in this excerpt.
if err := scanner.Err(); err != nil {
// Wait for the command to finish before bailing out; its result is ignored
// because the read error is the one being reported.
cmd.Wait()
log.Printf("Error reading from squeue pipe: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
err = cmd.Wait()
if err != nil {
log.Printf("Error running squeue: %v", err)
- squeue.squeueCond.L.Lock()
- squeue.squeueError = err
- return err
+ return
}
// Success: publish the new contents under the condition lock and wake every
// goroutine blocked in CheckSqueue.
squeue.squeueCond.L.Lock()
- squeue.squeueError = nil
squeue.squeueContents = newSqueueContents
squeue.squeueCond.Broadcast()
-
- return nil
+ squeue.squeueCond.L.Unlock()
}
// CheckSqueue checks if a given container UUID is in the slurm queue. This
// does not run squeue directly, but instead blocks until woken up by next
// successful update of squeue.
//
// It reports true when uuid equals one of the entries of squeueContents as
// observed right after the wake-up, and false otherwise.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+func (squeue *Squeue) CheckSqueue(uuid string) bool {
squeue.squeueCond.L.Lock()
// block until next squeue broadcast signaling an update.
squeue.squeueCond.Wait()
- if squeue.squeueError != nil {
- e := squeue.squeueError
- squeue.squeueCond.L.Unlock()
- return false, e
- }
// Take a reference to the current contents while holding the lock, then
// release it so the scan below runs unlocked. RunSqueue replaces the slice
// with a freshly built one rather than modifying it in place.
contents := squeue.squeueContents
squeue.squeueCond.L.Unlock()
for _, k := range contents {
if k == uuid {
- return true, nil
+ return true
}
}
- return false, nil
+ return false
}
// StartMonitor starts the squeue monitoring goroutine.
//
// It creates squeueCond (backed by a new mutex) and the squeueDone channel,
// then launches SyncSqueue with pollInterval in its own goroutine.
// NOTE(review): SyncSqueue is defined outside this excerpt; presumably it
// calls RunSqueue once per pollInterval — confirm.
func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
squeue.squeueCond = sync.NewCond(&sync.Mutex{})
squeue.squeueDone = make(chan struct{})
// The synchronous initial RunSqueue call is removed by this change, so this
// function no longer runs squeue itself.
- squeue.RunSqueue()
go squeue.SyncSqueue(pollInterval)
}