X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/83c8f1685d812c31d8bd568f3c2ac1edcd120aed..59eb2058e7111581645c960ba868f49f0fed152b:/services/crunch-dispatch-slurm/squeue.go

diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 0c90679d3f..5ecfe8ff2f 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -1,30 +1,33 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
 	"bytes"
+	"fmt"
 	"log"
-	"os/exec"
 	"strings"
 	"sync"
 	"time"
 )
 
+type jobPriority struct {
+	niceness        int
+	currentPriority int
+}
+
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
 type SqueueChecker struct {
 	Period    time.Duration
-	uuids     map[string]bool
+	uuids     map[string]jobPriority
 	startOnce sync.Once
 	done      chan struct{}
 	sync.Cond
 }
 
-func squeueFunc() *exec.Cmd {
-	return exec.Command("squeue", "--all", "--format=%j")
-}
-
-var squeueCmd = squeueFunc
-
 // HasUUID checks if a given container UUID is in the slurm queue.
 // This does not run squeue directly, but instead blocks until woken
 // up by next successful update of squeue.
@@ -36,7 +39,23 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool {
 
 	// block until next squeue broadcast signaling an update.
 	sqc.Wait()
-	return sqc.uuids[uuid]
+	_, exists := sqc.uuids[uuid]
+	return exists
+}
+
+// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
+func (sqc *SqueueChecker) GetNiceness(uuid string) int {
+	sqc.startOnce.Do(sqc.start)
+
+	sqc.L.Lock()
+	defer sqc.L.Unlock()
+
+	n, exists := sqc.uuids[uuid]
+	if exists {
+		return n.niceness
+	} else {
+		return -1
+	}
 }
 
 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
@@ -49,7 +68,7 @@ func (sqc *SqueueChecker) Stop() {
 
 // check gets the names of jobs in the SLURM queue (running and
 // queued). If it succeeds, it updates squeue.uuids and wakes up any
-// goroutines that are waiting in HasUUID().
+// goroutines that are waiting in HasUUID() or All().
 func (sqc *SqueueChecker) check() {
 	// Mutex between squeue sync and running sbatch or scancel. This
 	// establishes a sequence so that squeue doesn't run concurrently with
@@ -58,7 +77,7 @@ func (sqc *SqueueChecker) check() {
 	sqc.L.Lock()
 	defer sqc.L.Unlock()
 
-	cmd := squeueCmd()
+	cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
 	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
 	cmd.Stdout, cmd.Stderr = stdout, stderr
 	if err := cmd.Run(); err != nil {
@@ -66,10 +85,16 @@ func (sqc *SqueueChecker) check() {
 		return
 	}
 
-	uuids := strings.Split(stdout.String(), "\n")
-	sqc.uuids = make(map[string]bool, len(uuids))
-	for _, uuid := range uuids {
-		sqc.uuids[uuid] = true
+	lines := strings.Split(stdout.String(), "\n")
+	sqc.uuids = make(map[string]jobPriority, len(lines))
+	for _, line := range lines {
+		var uuid string
+		var nice int
+		var prio int
+		fmt.Sscan(line, &uuid, &nice, &prio)
+		if uuid != "" {
+			sqc.uuids[uuid] = jobPriority{nice, prio}
+		}
 	}
 	sqc.Broadcast()
 }
@@ -93,7 +118,16 @@ func (sqc *SqueueChecker) start() {
 	}()
 }
 
-// All Uuids in squeue
-func (sqc *SqueueChecker) AllUuids() map[string]bool {
-	return sqc.uuids
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+	sqc.startOnce.Do(sqc.start)
+	sqc.L.Lock()
+	defer sqc.L.Unlock()
+	sqc.Wait()
+	var uuids []string
+	for uuid := range sqc.uuids {
+		uuids = append(uuids, uuid)
+	}
+	return uuids
 }