12891: Don't use SIGKILL when telling crunch-run to cancel.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index c1bbe920ede523850a07fdc578103097b53f3650..5ecfe8ff2fc049201eb27c8e58c7a02eb84cbbb4 100644 (file)
@@ -1,64 +1,83 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
        "bytes"
+       "fmt"
        "log"
-       "os/exec"
        "strings"
        "sync"
        "time"
 )
 
+type jobPriority struct {
+       niceness        int
+       currentPriority int
+}
+
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
-type Squeue struct {
+type SqueueChecker struct {
        Period    time.Duration
-       hasUUID   map[string]bool
+       uuids     map[string]jobPriority
        startOnce sync.Once
        done      chan struct{}
        sync.Cond
 }
 
-func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--all", "--format=%j")
-}
-
-var squeueCmd = squeueFunc
-
 // HasUUID checks if a given container UUID is in the slurm queue.
 // This does not run squeue directly, but instead blocks until woken
 // up by next successful update of squeue.
-func (squeue *Squeue) HasUUID(uuid string) bool {
-       squeue.startOnce.Do(squeue.start)
+func (sqc *SqueueChecker) HasUUID(uuid string) bool {
+       sqc.startOnce.Do(sqc.start)
 
-       squeue.L.Lock()
-       defer squeue.L.Unlock()
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
 
        // block until next squeue broadcast signaling an update.
-       squeue.Wait()
-       return squeue.hasUUID[uuid]
+       sqc.Wait()
+       _, exists := sqc.uuids[uuid]
+       return exists
+}
+
+// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
+func (sqc *SqueueChecker) GetNiceness(uuid string) int {
+       sqc.startOnce.Do(sqc.start)
+
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+
+       n, exists := sqc.uuids[uuid]
+       if exists {
+               return n.niceness
+       } else {
+               return -1
+       }
 }
 
 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
 // after calling Stop.
-func (squeue *Squeue) Stop() {
-       if squeue.done != nil {
-               close(squeue.done)
+func (sqc *SqueueChecker) Stop() {
+       if sqc.done != nil {
+               close(sqc.done)
        }
 }
 
 // check gets the names of jobs in the SLURM queue (running and
-// queued). If it succeeds, it updates squeue.hasUUID and wakes up any
-// goroutines that are waiting in HasUUID().
-func (squeue *Squeue) check() {
+// queued). If it succeeds, it updates squeue.uuids and wakes up any
+// goroutines that are waiting in HasUUID() or All().
+func (sqc *SqueueChecker) check() {
        // Mutex between squeue sync and running sbatch or scancel.  This
        // establishes a sequence so that squeue doesn't run concurrently with
        // sbatch or scancel; the next update of squeue will occur only after
        // sbatch or scancel has completed.
-       squeue.L.Lock()
-       defer squeue.L.Unlock()
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
 
-       cmd := squeueCmd()
+       cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
        cmd.Stdout, cmd.Stderr = stdout, stderr
        if err := cmd.Run(); err != nil {
@@ -66,29 +85,49 @@ func (squeue *Squeue) check() {
                return
        }
 
-       uuids := strings.Split(stdout.String(), "\n")
-       squeue.hasUUID = make(map[string]bool, len(uuids))
-       for _, uuid := range uuids {
-               squeue.hasUUID[uuid] = true
+       lines := strings.Split(stdout.String(), "\n")
+       sqc.uuids = make(map[string]jobPriority, len(lines))
+       for _, line := range lines {
+               var uuid string
+               var nice int
+               var prio int
+               fmt.Sscan(line, &uuid, &nice, &prio)
+               if uuid != "" {
+                       sqc.uuids[uuid] = jobPriority{nice, prio}
+               }
        }
-       squeue.Broadcast()
+       sqc.Broadcast()
 }
 
 // Initialize, and start a goroutine to call check() once per
 // squeue.Period until terminated by calling Stop().
-func (squeue *Squeue) start() {
-       squeue.L = &sync.Mutex{}
-       squeue.done = make(chan struct{})
+func (sqc *SqueueChecker) start() {
+       sqc.L = &sync.Mutex{}
+       sqc.done = make(chan struct{})
        go func() {
-               ticker := time.NewTicker(squeue.Period)
+               ticker := time.NewTicker(sqc.Period)
                for {
                        select {
-                       case <-squeue.done:
+                       case <-sqc.done:
                                ticker.Stop()
                                return
                        case <-ticker.C:
-                               squeue.check()
+                               sqc.check()
                        }
                }
        }()
 }
+
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+       sqc.startOnce.Do(sqc.start)
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+       sqc.Wait()
+       var uuids []string
+       for uuid := range sqc.uuids {
+               uuids = append(uuids, uuid)
+       }
+       return uuids
+}