+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
- "bufio"
+ "bytes"
+ "fmt"
"log"
- "os/exec"
+ "strings"
"sync"
"time"
)
-type Squeue struct {
- squeueContents []string
- squeueDone chan struct{}
- squeueError error
- squeueCond *sync.Cond
- SlurmLock sync.Mutex
+type jobPriority struct {
+ niceness int
+ currentPriority int
}
-// squeueFunc
-func squeueFunc() *exec.Cmd {
- return exec.Command("squeue", "--format=%j")
+// Squeue implements asynchronous polling monitor of the SLURM queue using the
+// command 'squeue'.
+type SqueueChecker struct {
+ Period time.Duration
+ Slurm Slurm
+ uuids map[string]jobPriority
+ startOnce sync.Once
+ done chan struct{}
+ sync.Cond
}
-var squeueCmd = squeueFunc
+// HasUUID checks if a given container UUID is in the slurm queue.
+// This does not run squeue directly, but instead blocks until woken
+// up by next successful update of squeue.
+func (sqc *SqueueChecker) HasUUID(uuid string) bool {
+ sqc.startOnce.Do(sqc.start)
-func (squeue *Squeue) RunSqueue() error {
- var newSqueueContents []string
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
- // Mutex between squeue sync and running sbatch or scancel. This
- // establishes a sequence so that squeue doesn't run concurrently with
- // sbatch or scancel; the next update of squeue will occur only after
- // sbatch or scancel has completed.
- squeueUpdater.SlurmLock.Lock()
- defer squeueUpdater.SlurmLock.Unlock()
-
- // Also ensure unlock on all return paths
- defer squeueUpdater.squeueCond.L.Unlock()
-
- cmd := squeueCmd()
- sq, err := cmd.StdoutPipe()
- if err != nil {
- log.Printf("Error creating stdout pipe for squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
- return err
- }
- cmd.Start()
- scanner := bufio.NewScanner(sq)
- for scanner.Scan() {
- newSqueueContents = append(newSqueueContents, scanner.Text())
- }
- if err := scanner.Err(); err != nil {
- cmd.Wait()
- log.Printf("Error reading from squeue pipe: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
- return err
- }
+ // block until next squeue broadcast signaling an update.
+ sqc.Wait()
+ _, exists := sqc.uuids[uuid]
+ return exists
+}
- err = cmd.Wait()
- if err != nil {
- log.Printf("Error running squeue: %v", err)
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = err
- return err
- }
+// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
+func (sqc *SqueueChecker) GetNiceness(uuid string) int {
+ sqc.startOnce.Do(sqc.start)
+
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
- squeueUpdater.squeueCond.L.Lock()
- squeueUpdater.squeueError = nil
- squeueUpdater.squeueContents = newSqueueContents
- squeueUpdater.squeueCond.Broadcast()
+ n, exists := sqc.uuids[uuid]
+ if exists {
+ return n.niceness
+ } else {
+ return -1
+ }
+}
- return nil
+// Stop stops the squeue monitoring goroutine. Do not call HasUUID
+// after calling Stop.
+func (sqc *SqueueChecker) Stop() {
+ if sqc.done != nil {
+ close(sqc.done)
+ }
}
-// Check if a container UUID is in the slurm queue. This will block until the
-// next successful update from SLURM.
-func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) {
- squeueUpdater.squeueCond.L.Lock()
- // block until next squeue broadcast signaling an update.
- squeueUpdater.squeueCond.Wait()
- if squeueUpdater.squeueError != nil {
- e := squeueUpdater.squeueError
- squeueUpdater.squeueCond.L.Unlock()
- return false, e
+// check gets the names of jobs in the SLURM queue (running and
+// queued). If it succeeds, it updates squeue.uuids and wakes up any
+// goroutines that are waiting in HasUUID() or All().
+func (sqc *SqueueChecker) check() {
+ // Mutex between squeue sync and running sbatch or scancel. This
+ // establishes a sequence so that squeue doesn't run concurrently with
+ // sbatch or scancel; the next update of squeue will occur only after
+ // sbatch or scancel has completed.
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
+
+ cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
+ stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
+ cmd.Stdout, cmd.Stderr = stdout, stderr
+ if err := cmd.Run(); err != nil {
+ log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+ return
}
- contents := squeueUpdater.squeueContents
- squeueUpdater.squeueCond.L.Unlock()
- for _, k := range contents {
- if k == uuid {
- return true, nil
+ lines := strings.Split(stdout.String(), "\n")
+ sqc.uuids = make(map[string]jobPriority, len(lines))
+ for _, line := range lines {
+ var uuid string
+ var nice int
+ var prio int
+ fmt.Sscan(line, &uuid, &nice, &prio)
+ if uuid != "" {
+ sqc.uuids[uuid] = jobPriority{nice, prio}
}
}
- return false, nil
-}
-
-func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
- squeueUpdater.squeueCond = sync.NewCond(&sync.Mutex{})
- squeueUpdater.squeueDone = make(chan struct{})
- squeueUpdater.RunSqueue()
- go squeueUpdater.SyncSqueue(pollInterval)
+ sqc.Broadcast()
}
-func (squeue *Squeue) Done() {
- squeueUpdater.squeueDone <- struct{}{}
- close(squeueUpdater.squeueDone)
+// Initialize, and start a goroutine to call check() once per
+// squeue.Period until terminated by calling Stop().
+func (sqc *SqueueChecker) start() {
+ sqc.L = &sync.Mutex{}
+ sqc.done = make(chan struct{})
+ go func() {
+ ticker := time.NewTicker(sqc.Period)
+ for {
+ select {
+ case <-sqc.done:
+ ticker.Stop()
+ return
+ case <-ticker.C:
+ sqc.check()
+ }
+ }
+ }()
}
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
- ticker := time.NewTicker(pollInterval)
- for {
- select {
- case <-squeueUpdater.squeueDone:
- return
- case <-ticker.C:
- squeueUpdater.RunSqueue()
- }
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+ sqc.startOnce.Do(sqc.start)
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
+ sqc.Wait()
+ var uuids []string
+ for uuid := range sqc.uuids {
+ uuids = append(uuids, uuid)
}
+ return uuids
}