8784: Fix test for latest firefox.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index 61decde61c4bd61d0a92e96bde20ff0c82780f57..ee87a44fff5866632b1a27aa8910b332a0ba3b8d 100644 (file)
 package main
 
 import (
-       "bufio"
+       "bytes"
        "log"
        "os/exec"
+       "strings"
        "sync"
        "time"
 )
 
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
-type Squeue struct {
-       squeueContents []string
-       squeueDone     chan struct{}
-       squeueCond     *sync.Cond
-       SlurmLock      sync.Mutex
+type SqueueChecker struct {
+       Period    time.Duration
+       uuids     map[string]bool
+       startOnce sync.Once
+       done      chan struct{}
+       sync.Cond
 }
 
-// squeueFunc
 func squeueFunc() *exec.Cmd {
        return exec.Command("squeue", "--all", "--format=%j")
 }
 
 var squeueCmd = squeueFunc
 
-// RunSqueue runs squeue once and captures the output.  If it succeeds, set
-// "squeueContents" and then wake up any goroutines waiting squeueCond in
-// CheckSqueue().  If there was an error, log it and leave the threads blocked.
-func (squeue *Squeue) RunSqueue() {
-       var newSqueueContents []string
+// HasUUID checks if a given container UUID is in the slurm queue.
+// This does not run squeue directly, but instead blocks until woken
+// up by next successful update of squeue.
+func (sqc *SqueueChecker) HasUUID(uuid string) bool {
+       sqc.startOnce.Do(sqc.start)
 
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+
+       // block until next squeue broadcast signaling an update.
+       sqc.Wait()
+       return sqc.uuids[uuid]
+}
+
+// Stop stops the squeue monitoring goroutine. Do not call HasUUID
+// after calling Stop.
+func (sqc *SqueueChecker) Stop() {
+       if sqc.done != nil {
+               close(sqc.done)
+       }
+}
+
+// check gets the names of jobs in the SLURM queue (running and
+// queued). If it succeeds, it updates squeue.uuids and wakes up any
+// goroutines that are waiting in HasUUID() or All().
+func (sqc *SqueueChecker) check() {
        // Mutex between squeue sync and running sbatch or scancel.  This
        // establishes a sequence so that squeue doesn't run concurrently with
        // sbatch or scancel; the next update of squeue will occur only after
        // sbatch or scancel has completed.
-       squeue.SlurmLock.Lock()
-       defer squeue.SlurmLock.Unlock()
-
-       // Also ensure unlock on all return paths
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
 
        cmd := squeueCmd()
-       sq, err := cmd.StdoutPipe()
-       if err != nil {
-               log.Printf("Error creating stdout pipe for squeue: %v", err)
-               return
-       }
-       err = cmd.Start()
-       if err != nil {
-               log.Printf("Error running squeue: %v", err)
-               return
-       }
-       scanner := bufio.NewScanner(sq)
-       for scanner.Scan() {
-               newSqueueContents = append(newSqueueContents, scanner.Text())
-       }
-       if err := scanner.Err(); err != nil {
-               cmd.Wait()
-               log.Printf("Error reading from squeue pipe: %v", err)
+       stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
+       cmd.Stdout, cmd.Stderr = stdout, stderr
+       if err := cmd.Run(); err != nil {
+               log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
                return
        }
 
-       err = cmd.Wait()
-       if err != nil {
-               log.Printf("Error running squeue: %v", err)
-               return
+       uuids := strings.Split(stdout.String(), "\n")
+       sqc.uuids = make(map[string]bool, len(uuids))
+       for _, uuid := range uuids {
+               sqc.uuids[uuid] = true
        }
-
-       squeue.squeueCond.L.Lock()
-       squeue.squeueContents = newSqueueContents
-       squeue.squeueCond.Broadcast()
-       squeue.squeueCond.L.Unlock()
+       sqc.Broadcast()
 }
 
-// CheckSqueue checks if a given container UUID is in the slurm queue.  This
-// does not run squeue directly, but instead blocks until woken up by next
-// successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) bool {
-       squeue.squeueCond.L.Lock()
-       // block until next squeue broadcast signaling an update.
-       squeue.squeueCond.Wait()
-       contents := squeue.squeueContents
-       squeue.squeueCond.L.Unlock()
-
-       for _, k := range contents {
-               if k == uuid {
-                       return true
+// Initialize, and start a goroutine to call check() once per
+// squeue.Period until terminated by calling Stop().
+func (sqc *SqueueChecker) start() {
+       sqc.L = &sync.Mutex{}
+       sqc.done = make(chan struct{})
+       go func() {
+               ticker := time.NewTicker(sqc.Period)
+               for {
+                       select {
+                       case <-sqc.done:
+                               ticker.Stop()
+                               return
+                       case <-ticker.C:
+                               sqc.check()
+                       }
                }
-       }
-       return false
+       }()
 }
 
-// StartMonitor starts the squeue monitoring goroutine.
-func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
-       squeue.squeueCond = sync.NewCond(&sync.Mutex{})
-       squeue.squeueDone = make(chan struct{})
-       go squeue.SyncSqueue(pollInterval)
-}
-
-// Done stops the squeue monitoring goroutine.
-func (squeue *Squeue) Done() {
-       squeue.squeueDone <- struct{}{}
-       close(squeue.squeueDone)
-}
-
-// SyncSqueue periodically polls RunSqueue() at the given duration until
-// terminated by calling Done().
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
-       ticker := time.NewTicker(pollInterval)
-       for {
-               select {
-               case <-squeue.squeueDone:
-                       return
-               case <-ticker.C:
-                       squeue.RunSqueue()
-               }
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+       sqc.startOnce.Do(sqc.start)
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+       sqc.Wait()
+       var uuids []string
+       for uuid := range sqc.uuids {
+               uuids = append(uuids, uuid)
        }
+       return uuids
 }