X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5cf8c18e735bb15da3f131e7ae57bb4b222bb4ed..a5ce43cd7493e17fe1bb5f67451957b283881228:/services/crunch-dispatch-slurm/squeue.go diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index e157469711..fafa3c3607 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -13,41 +13,37 @@ import ( type Squeue struct { squeueContents []string squeueDone chan struct{} - squeueError error squeueCond *sync.Cond SlurmLock sync.Mutex } // squeueFunc func squeueFunc() *exec.Cmd { - return exec.Command("squeue", "--format=%j") + return exec.Command("squeue", "--all", "--format=%j") } var squeueCmd = squeueFunc -// RunSqueue runs squeue once and captures the output. If there is an error, -// set "squeueError". If it succeeds, set "squeueContents" and then wake up -// any goroutines waiting squeueCond in CheckSqueue(). -func (squeue *Squeue) RunSqueue() error { +// RunSqueue runs squeue once and captures the output. If it succeeds, set +// "squeueContents" and then wake up any goroutines waiting squeueCond in +// CheckSqueue(). If there was an error, log it and leave the threads blocked. +func (squeue *Squeue) RunSqueue() { var newSqueueContents []string // Mutex between squeue sync and running sbatch or scancel. This // establishes a sequence so that squeue doesn't run concurrently with // sbatch or scancel; the next update of squeue will occur only after // sbatch or scancel has completed. - squeueUpdater.SlurmLock.Lock() - defer squeueUpdater.SlurmLock.Unlock() + squeue.SlurmLock.Lock() + defer squeue.SlurmLock.Unlock() // Also ensure unlock on all return paths - defer squeueUpdater.squeueCond.L.Unlock() cmd := squeueCmd() sq, err := cmd.StdoutPipe() if err != nil { log.Printf("Error creating stdout pipe for squeue: %v", err) - squeueUpdater.squeueCond.L.Lock() - squeueUpdater.squeueError = err - return err + return } cmd.Start() scanner := bufio.NewScanner(sq) @@ -57,62 +53,50 @@ func (squeue *Squeue) RunSqueue() error { if err := scanner.Err(); err != nil { cmd.Wait() log.Printf("Error reading from squeue pipe: %v", err) - squeueUpdater.squeueCond.L.Lock() - squeueUpdater.squeueError = err - return err + return } err = cmd.Wait() if err != nil { log.Printf("Error running squeue: %v", err) - squeueUpdater.squeueCond.L.Lock() - squeueUpdater.squeueError = err - return err + return } - squeueUpdater.squeueCond.L.Lock() - squeueUpdater.squeueError = nil - squeueUpdater.squeueContents = newSqueueContents - squeueUpdater.squeueCond.Broadcast() - - return nil + squeue.squeueCond.L.Lock() + squeue.squeueContents = newSqueueContents + squeue.squeueCond.Broadcast() + squeue.squeueCond.L.Unlock() } // CheckSqueue checks if a given container UUID is in the slurm queue. This // does not run squeue directly, but instead blocks until woken up by next // successful update of squeue. -func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) { - squeueUpdater.squeueCond.L.Lock() +func (squeue *Squeue) CheckSqueue(uuid string) bool { + squeue.squeueCond.L.Lock() // block until next squeue broadcast signaling an update. - squeueUpdater.squeueCond.Wait() - if squeueUpdater.squeueError != nil { - e := squeueUpdater.squeueError - squeueUpdater.squeueCond.L.Unlock() - return false, e - } - contents := squeueUpdater.squeueContents - squeueUpdater.squeueCond.L.Unlock() + squeue.squeueCond.Wait() + contents := squeue.squeueContents + squeue.squeueCond.L.Unlock() for _, k := range contents { if k == uuid { - return true, nil + return true } } - return false, nil + return false } // StartMonitor starts the squeue monitoring goroutine. func (squeue *Squeue) StartMonitor(pollInterval time.Duration) { - squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{}) - squeueUpdater.squeueDone = make(chan struct{}) - squeueUpdater.RunSqueue() - go squeueUpdater.SyncSqueue(pollInterval) + squeue.squeueCond = sync.NewCond(&sync.Mutex{}) + squeue.squeueDone = make(chan struct{}) + go squeue.SyncSqueue(pollInterval) } // Done stops the squeue monitoring goroutine. func (squeue *Squeue) Done() { - squeueUpdater.squeueDone <- struct{}{} - close(squeueUpdater.squeueDone) + squeue.squeueDone <- struct{}{} + close(squeue.squeueDone) } // SyncSqueue periodically polls RunSqueue() at the given duration until @@ -121,10 +105,10 @@ func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) { ticker := time.NewTicker(pollInterval) for { select { - case <-squeueUpdater.squeueDone: + case <-squeue.squeueDone: return case <-ticker.C: - squeueUpdater.RunSqueue() + squeue.RunSqueue() } } }