Merge branch '10211-double-close-crash' closes #10211
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index 3ee8b6f8b69a9c737127ff8d3cb4d06899b9ff90..61decde61c4bd61d0a92e96bde20ff0c82780f57 100644 (file)
@@ -13,22 +13,21 @@ import (
 type Squeue struct {
        squeueContents []string
        squeueDone     chan struct{}
-       squeueError    error
        squeueCond     *sync.Cond
        SlurmLock      sync.Mutex
 }
 
 // squeueFunc
 func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--format=%j")
+       return exec.Command("squeue", "--all", "--format=%j")
 }
 
 var squeueCmd = squeueFunc
 
-// RunSqueue runs squeue once and captures the output.  If there is an error,
-// set "squeueError".  If it succeeds, set "squeueContents" and then wake up
-// any goroutines waiting squeueCond in CheckSqueue().
-func (squeue *Squeue) RunSqueue() error {
+// RunSqueue runs squeue once and captures the output.  If it succeeds, set
+// "squeueContents" and then wake up any goroutines waiting squeueCond in
+// CheckSqueue().  If there was an error, log it and leave the threads blocked.
+func (squeue *Squeue) RunSqueue() {
        var newSqueueContents []string
 
        // Mutex between squeue sync and running sbatch or scancel.  This
@@ -39,17 +38,18 @@ func (squeue *Squeue) RunSqueue() error {
        defer squeue.SlurmLock.Unlock()
 
        // Also ensure unlock on all return paths
-       defer squeue.squeueCond.L.Unlock()
 
        cmd := squeueCmd()
        sq, err := cmd.StdoutPipe()
        if err != nil {
                log.Printf("Error creating stdout pipe for squeue: %v", err)
-               squeue.squeueCond.L.Lock()
-               squeue.squeueError = err
-               return err
+               return
+       }
+       err = cmd.Start()
+       if err != nil {
+               log.Printf("Error running squeue: %v", err)
+               return
        }
-       cmd.Start()
        scanner := bufio.NewScanner(sq)
        for scanner.Scan() {
                newSqueueContents = append(newSqueueContents, scanner.Text())
@@ -57,55 +57,43 @@ func (squeue *Squeue) RunSqueue() error {
        if err := scanner.Err(); err != nil {
                cmd.Wait()
                log.Printf("Error reading from squeue pipe: %v", err)
-               squeue.squeueCond.L.Lock()
-               squeue.squeueError = err
-               return err
+               return
        }
 
        err = cmd.Wait()
        if err != nil {
                log.Printf("Error running squeue: %v", err)
-               squeue.squeueCond.L.Lock()
-               squeue.squeueError = err
-               return err
+               return
        }
 
        squeue.squeueCond.L.Lock()
-       squeue.squeueError = nil
        squeue.squeueContents = newSqueueContents
        squeue.squeueCond.Broadcast()
-
-       return nil
+       squeue.squeueCond.L.Unlock()
 }
 
 // CheckSqueue checks if a given container UUID is in the slurm queue.  This
 // does not run squeue directly, but instead blocks until woken up by next
 // successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
+func (squeue *Squeue) CheckSqueue(uuid string) bool {
        squeue.squeueCond.L.Lock()
        // block until next squeue broadcast signaling an update.
        squeue.squeueCond.Wait()
-       if squeue.squeueError != nil {
-               e := squeue.squeueError
-               squeue.squeueCond.L.Unlock()
-               return false, e
-       }
        contents := squeue.squeueContents
        squeue.squeueCond.L.Unlock()
 
        for _, k := range contents {
                if k == uuid {
-                       return true, nil
+                       return true
                }
        }
-       return false, nil
+       return false
 }
 
 // StartMonitor starts the squeue monitoring goroutine.
 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
        squeue.squeueCond = sync.NewCond(&sync.Mutex{})
        squeue.squeueDone = make(chan struct{})
-       squeue.RunSqueue()
        go squeue.SyncSqueue(pollInterval)
 }