Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index 34e6632b48aa11b84ef1ebadaea7cf9fafb21568..819c2d2510f992ab75c18249d8b46d6c13828d6d 100644 (file)
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
-       "bufio"
+       "bytes"
+       "fmt"
        "log"
        "os/exec"
+       "strings"
        "sync"
        "time"
 )
 
+type jobPriority struct {
+       niceness        int
+       currentPriority int
+}
+
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
-type Squeue struct {
-       squeueContents []string
-       squeueDone     chan struct{}
-       squeueCond     *sync.Cond
-       SlurmLock      sync.Mutex
+type SqueueChecker struct {
+       Period    time.Duration
+       uuids     map[string]jobPriority
+       startOnce sync.Once
+       done      chan struct{}
+       sync.Cond
 }
 
-// squeueFunc
 func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--format=%j")
+       return exec.Command("squeue", "--all", "--format=%j %y %Q")
 }
 
 var squeueCmd = squeueFunc
 
-// RunSqueue runs squeue once and captures the output.  If it succeeds, set
-// "squeueContents" and then wake up any goroutines waiting squeueCond in
-// CheckSqueue().  If there was an error, log it and leave the threads blocked.
-func (squeue *Squeue) RunSqueue() {
-       var newSqueueContents []string
+// HasUUID checks if a given container UUID is in the slurm queue.
+// This does not run squeue directly, but instead blocks until woken
+// up by next successful update of squeue.
+func (sqc *SqueueChecker) HasUUID(uuid string) bool {
+       sqc.startOnce.Do(sqc.start)
+
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+
+       // block until next squeue broadcast signaling an update.
+       sqc.Wait()
+       _, exists := sqc.uuids[uuid]
+       return exists
+}
+
+// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
+func (sqc *SqueueChecker) GetNiceness(uuid string) int {
+       sqc.startOnce.Do(sqc.start)
+
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+
+       n, exists := sqc.uuids[uuid]
+       if exists {
+               return n.niceness
+       } else {
+               return -1
+       }
+}
+
+// Stop stops the squeue monitoring goroutine. Do not call HasUUID
+// after calling Stop.
+func (sqc *SqueueChecker) Stop() {
+       if sqc.done != nil {
+               close(sqc.done)
+       }
+}
 
+// check gets the names of jobs in the SLURM queue (running and
+// queued). If it succeeds, it updates squeue.uuids and wakes up any
+// goroutines that are waiting in HasUUID() or All().
+func (sqc *SqueueChecker) check() {
        // Mutex between squeue sync and running sbatch or scancel.  This
        // establishes a sequence so that squeue doesn't run concurrently with
        // sbatch or scancel; the next update of squeue will occur only after
        // sbatch or scancel has completed.
-       squeue.SlurmLock.Lock()
-       defer squeue.SlurmLock.Unlock()
-
-       // Also ensure unlock on all return paths
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
 
        cmd := squeueCmd()
-       sq, err := cmd.StdoutPipe()
-       if err != nil {
-               log.Printf("Error creating stdout pipe for squeue: %v", err)
-               return
-       }
-       cmd.Start()
-       scanner := bufio.NewScanner(sq)
-       for scanner.Scan() {
-               newSqueueContents = append(newSqueueContents, scanner.Text())
-       }
-       if err := scanner.Err(); err != nil {
-               cmd.Wait()
-               log.Printf("Error reading from squeue pipe: %v", err)
+       stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
+       cmd.Stdout, cmd.Stderr = stdout, stderr
+       if err := cmd.Run(); err != nil {
+               log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
                return
        }
 
-       err = cmd.Wait()
-       if err != nil {
-               log.Printf("Error running squeue: %v", err)
-               return
-       }
-
-       squeue.squeueCond.L.Lock()
-       squeue.squeueContents = newSqueueContents
-       squeue.squeueCond.Broadcast()
-       squeue.squeueCond.L.Unlock()
-}
-
-// CheckSqueue checks if a given container UUID is in the slurm queue.  This
-// does not run squeue directly, but instead blocks until woken up by next
-// successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) bool {
-       squeue.squeueCond.L.Lock()
-       // block until next squeue broadcast signaling an update.
-       squeue.squeueCond.Wait()
-       contents := squeue.squeueContents
-       squeue.squeueCond.L.Unlock()
-
-       for _, k := range contents {
-               if k == uuid {
-                       return true
+       lines := strings.Split(stdout.String(), "\n")
+       sqc.uuids = make(map[string]jobPriority, len(lines))
+       for _, line := range lines {
+               var uuid string
+               var nice int
+               var prio int
+               fmt.Sscan(line, &uuid, &nice, &prio)
+               if uuid != "" {
+                       sqc.uuids[uuid] = jobPriority{nice, prio}
                }
        }
-       return false
+       sqc.Broadcast()
 }
 
-// StartMonitor starts the squeue monitoring goroutine.
-func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
-       squeue.squeueCond = sync.NewCond(&sync.Mutex{})
-       squeue.squeueDone = make(chan struct{})
-       go squeue.SyncSqueue(pollInterval)
-}
-
-// Done stops the squeue monitoring goroutine.
-func (squeue *Squeue) Done() {
-       squeue.squeueDone <- struct{}{}
-       close(squeue.squeueDone)
+// Initialize, and start a goroutine to call check() once per
+// squeue.Period until terminated by calling Stop().
+func (sqc *SqueueChecker) start() {
+       sqc.L = &sync.Mutex{}
+       sqc.done = make(chan struct{})
+       go func() {
+               ticker := time.NewTicker(sqc.Period)
+               for {
+                       select {
+                       case <-sqc.done:
+                               ticker.Stop()
+                               return
+                       case <-ticker.C:
+                               sqc.check()
+                       }
+               }
+       }()
 }
 
-// SyncSqueue periodically polls RunSqueue() at the given duration until
-// terminated by calling Done().
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
-       ticker := time.NewTicker(pollInterval)
-       for {
-               select {
-               case <-squeue.squeueDone:
-                       return
-               case <-ticker.C:
-                       squeue.RunSqueue()
-               }
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+       sqc.startOnce.Do(sqc.start)
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+       sqc.Wait()
+       var uuids []string
+       for uuid := range sqc.uuids {
+               uuids = append(uuids, uuid)
        }
+       return uuids
 }