// establishes a sequence so that squeue doesn't run concurrently with
// sbatch or scancel; the next update of squeue will occur only after
// sbatch or scancel has completed.
- squeueUpdater.SlurmLock.Lock()
- defer squeueUpdater.SlurmLock.Unlock()
+ squeue.SlurmLock.Lock()
+ defer squeue.SlurmLock.Unlock()
// Also ensure unlock on all return paths
- defer squeueUpdater.squeueCond.L.Unlock()
+ defer squeue.squeueCond.L.Unlock()
cmd := squeueCmd()
sq, err := cmd.StdoutPipe()
if err != nil {
log.Printf("Error creating stdout pipe for squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
cmd.Start()
if err := scanner.Err(); err != nil {
cmd.Wait()
log.Printf("Error reading from squeue pipe: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
err = cmd.Wait()
if err != nil {
log.Printf("Error running squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = err
return err
}
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = nil
- squeueUpdater.squeueContents = newSqueueContents
- squeueUpdater.squeueCond.Broadcast()
+ squeue.squeueCond.L.Lock()
+ squeue.squeueError = nil
+ squeue.squeueContents = newSqueueContents
+ squeue.squeueCond.Broadcast()
return nil
}
// does not run squeue directly, but instead blocks until woken up by next
// successful update of squeue.
func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
- squeueUpdater.squeueCond.L.Lock()
+ squeue.squeueCond.L.Lock()
// block until next squeue broadcast signaling an update.
- squeueUpdater.squeueCond.Wait()
- if squeueUpdater.squeueError != nil {
- e := squeueUpdater.squeueError
- squeueUpdater.squeueCond.L.Unlock()
+ squeue.squeueCond.Wait()
+ if squeue.squeueError != nil {
+ e := squeue.squeueError
+ squeue.squeueCond.L.Unlock()
return false, e
}
- contents := squeueUpdater.squeueContents
- squeueUpdater.squeueCond.L.Unlock()
+ contents := squeue.squeueContents
+ squeue.squeueCond.L.Unlock()
for _, k := range contents {
if k == uuid {
// StartMonitor starts the squeue monitoring goroutine.
func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
- squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
- squeueUpdater.squeueDone = make(chan struct{})
- squeueUpdater.RunSqueue()
- go squeueUpdater.SyncSqueue(pollInterval)
+ squeue.squeueCond = sync.NewCond(&sync.Mutex{})
+ squeue.squeueDone = make(chan struct{})
+ squeue.RunSqueue()
+ go squeue.SyncSqueue(pollInterval)
}
// Done stops the squeue monitoring goroutine.
func (squeue *Squeue) Done() {
- squeueUpdater.squeueDone <- struct{}{}
- close(squeueUpdater.squeueDone)
+ squeue.squeueDone <- struct{}{}
+ close(squeue.squeueDone)
}
// SyncSqueue periodically polls RunSqueue() at the given duration until
ticker := time.NewTicker(pollInterval)
for {
select {
- case <-squeueUpdater.squeueDone:
+ case <-squeue.squeueDone:
return
case <-ticker.C:
- squeueUpdater.RunSqueue()
+ squeue.RunSqueue()
}
}
}