Merge branch 'master' into 10293-container-request-output-uuid
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index b86a4d95689489fbb492b821d356f5c8400af3a7..61decde61c4bd61d0a92e96bde20ff0c82780f57 100644 (file)
@@ -8,43 +8,48 @@ import (
        "time"
 )
 
+// Squeue implements asynchronous polling monitor of the SLURM queue using the
+// command 'squeue'.
 type Squeue struct {
        squeueContents []string
        squeueDone     chan struct{}
-       squeueError    error
        squeueCond     *sync.Cond
        SlurmLock      sync.Mutex
 }
 
 // squeueFunc
 func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--format=%j")
+       return exec.Command("squeue", "--all", "--format=%j")
 }
 
 var squeueCmd = squeueFunc
 
-func (squeue *Squeue) RunSqueue() error {
+// RunSqueue runs squeue once and captures the output.  If it succeeds, set
+// "squeueContents" and then wake up any goroutines waiting squeueCond in
+// CheckSqueue().  If there was an error, log it and leave the threads blocked.
+func (squeue *Squeue) RunSqueue() {
        var newSqueueContents []string
 
        // Mutex between squeue sync and running sbatch or scancel.  This
        // establishes a sequence so that squeue doesn't run concurrently with
        // sbatch or scancel; the next update of squeue will occur only after
        // sbatch or scancel has completed.
-       squeueUpdater.SlurmLock.Lock()
-       defer squeueUpdater.SlurmLock.Unlock()
+       squeue.SlurmLock.Lock()
+       defer squeue.SlurmLock.Unlock()
 
        // Also ensure unlock on all return paths
-       defer squeueUpdater.squeueCond.L.Unlock()
 
        cmd := squeueCmd()
        sq, err := cmd.StdoutPipe()
        if err != nil {
                log.Printf("Error creating stdout pipe for squeue: %v", err)
-               squeueUpdater.squeueCond.L.Lock()
-               squeueUpdater.squeueError = err
-               return err
+               return
+       }
+       err = cmd.Start()
+       if err != nil {
+               log.Printf("Error running squeue: %v", err)
+               return
        }
-       cmd.Start()
        scanner := bufio.NewScanner(sq)
        for scanner.Scan() {
                newSqueueContents = append(newSqueueContents, scanner.Text())
@@ -52,69 +57,62 @@ func (squeue *Squeue) RunSqueue() error {
        if err := scanner.Err(); err != nil {
                cmd.Wait()
                log.Printf("Error reading from squeue pipe: %v", err)
-               squeueUpdater.squeueCond.L.Lock()
-               squeueUpdater.squeueError = err
-               return err
+               return
        }
 
        err = cmd.Wait()
        if err != nil {
                log.Printf("Error running squeue: %v", err)
-               squeueUpdater.squeueCond.L.Lock()
-               squeueUpdater.squeueError = err
-               return err
+               return
        }
 
-       squeueUpdater.squeueCond.L.Lock()
-       squeueUpdater.squeueError = nil
-       squeueUpdater.squeueContents = newSqueueContents
-       squeueUpdater.squeueCond.Broadcast()
-
-       return nil
+       squeue.squeueCond.L.Lock()
+       squeue.squeueContents = newSqueueContents
+       squeue.squeueCond.Broadcast()
+       squeue.squeueCond.L.Unlock()
 }
 
-// Check if a container UUID is in the slurm queue.  This will block until the
-// next successful update from SLURM.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
-       squeueUpdater.squeueCond.L.Lock()
+// CheckSqueue checks if a given container UUID is in the slurm queue.  This
+// does not run squeue directly, but instead blocks until woken up by next
+// successful update of squeue.
+func (squeue *Squeue) CheckSqueue(uuid string) bool {
+       squeue.squeueCond.L.Lock()
        // block until next squeue broadcast signaling an update.
-       squeueUpdater.squeueCond.Wait()
-       if squeueUpdater.squeueError != nil {
-               e := squeueUpdater.squeueError
-               squeueUpdater.squeueCond.L.Unlock()
-               return false, e
-       }
-       contents := squeueUpdater.squeueContents
-       squeueUpdater.squeueCond.L.Unlock()
+       squeue.squeueCond.Wait()
+       contents := squeue.squeueContents
+       squeue.squeueCond.L.Unlock()
 
        for _, k := range contents {
                if k == uuid {
-                       return true, nil
+                       return true
                }
        }
-       return false, nil
+       return false
 }
 
+// StartMonitor starts the squeue monitoring goroutine.
 func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
-       squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
-       squeueUpdater.squeueDone = make(chan struct{})
-       squeueUpdater.RunSqueue()
-       go squeueUpdater.SyncSqueue(pollInterval)
+       squeue.squeueCond = sync.NewCond(&sync.Mutex{})
+       squeue.squeueDone = make(chan struct{})
+       go squeue.SyncSqueue(pollInterval)
 }
 
+// Done stops the squeue monitoring goroutine.
 func (squeue *Squeue) Done() {
-       squeueUpdater.squeueDone <- struct{}{}
-       close(squeueUpdater.squeueDone)
+       squeue.squeueDone <- struct{}{}
+       close(squeue.squeueDone)
 }
 
+// SyncSqueue periodically polls RunSqueue() at the given duration until
+// terminated by calling Done().
 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
        ticker := time.NewTicker(pollInterval)
        for {
                select {
-               case <-squeueUpdater.squeueDone:
+               case <-squeue.squeueDone:
                        return
                case <-ticker.C:
-                       squeueUpdater.RunSqueue()
+                       squeue.RunSqueue()
                }
        }
 }