X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/83c8f1685d812c31d8bd568f3c2ac1edcd120aed..f44a15adce692614ecb816dbe2d0205704d9a4ab:/services/crunch-dispatch-slurm/squeue.go diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index 0c90679d3f..819c2d2510 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -1,7 +1,12 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( "bytes" + "fmt" "log" "os/exec" "strings" @@ -9,18 +14,23 @@ import ( "time" ) +type jobPriority struct { + niceness int + currentPriority int +} + // Squeue implements asynchronous polling monitor of the SLURM queue using the // command 'squeue'. type SqueueChecker struct { Period time.Duration - uuids map[string]bool + uuids map[string]jobPriority startOnce sync.Once done chan struct{} sync.Cond } func squeueFunc() *exec.Cmd { - return exec.Command("squeue", "--all", "--format=%j") + return exec.Command("squeue", "--all", "--format=%j %y %Q") } var squeueCmd = squeueFunc @@ -36,7 +46,23 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool { // block until next squeue broadcast signaling an update. sqc.Wait() - return sqc.uuids[uuid] + _, exists := sqc.uuids[uuid] + return exists +} + +// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist. +func (sqc *SqueueChecker) GetNiceness(uuid string) int { + sqc.startOnce.Do(sqc.start) + + sqc.L.Lock() + defer sqc.L.Unlock() + + n, exists := sqc.uuids[uuid] + if exists { + return n.niceness + } else { + return -1 + } } // Stop stops the squeue monitoring goroutine. Do not call HasUUID @@ -49,7 +75,7 @@ func (sqc *SqueueChecker) Stop() { // check gets the names of jobs in the SLURM queue (running and // queued). If it succeeds, it updates squeue.uuids and wakes up any -// goroutines that are waiting in HasUUID(). +// goroutines that are waiting in HasUUID() or All(). func (sqc *SqueueChecker) check() { // Mutex between squeue sync and running sbatch or scancel. This // establishes a sequence so that squeue doesn't run concurrently with @@ -66,10 +92,16 @@ func (sqc *SqueueChecker) check() { return } - uuids := strings.Split(stdout.String(), "\n") - sqc.uuids = make(map[string]bool, len(uuids)) - for _, uuid := range uuids { - sqc.uuids[uuid] = true + lines := strings.Split(stdout.String(), "\n") + sqc.uuids = make(map[string]jobPriority, len(lines)) + for _, line := range lines { + var uuid string + var nice int + var prio int + fmt.Sscan(line, &uuid, &nice, &prio) + if uuid != "" { + sqc.uuids[uuid] = jobPriority{nice, prio} + } } sqc.Broadcast() } @@ -93,7 +125,16 @@ func (sqc *SqueueChecker) start() { }() } -// All Uuids in squeue -func (sqc *SqueueChecker) AllUuids() map[string]bool { - return sqc.uuids +// All waits for the next squeue invocation, and returns all job +// names reported by squeue. +func (sqc *SqueueChecker) All() []string { + sqc.startOnce.Do(sqc.start) + sqc.L.Lock() + defer sqc.L.Unlock() + sqc.Wait() + var uuids []string + for uuid := range sqc.uuids { + uuids = append(uuids, uuid) + } + return uuids }