X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d45be86b354adec485504bfc09f41e0e22241f34..59eb2058e7111581645c960ba868f49f0fed152b:/services/crunch-dispatch-slurm/squeue.go diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index 3ee8b6f8b6..5ecfe8ff2f 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -1,130 +1,133 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( - "bufio" + "bytes" + "fmt" "log" - "os/exec" + "strings" "sync" "time" ) +type jobPriority struct { + niceness int + currentPriority int +} + // Squeue implements asynchronous polling monitor of the SLURM queue using the // command 'squeue'. -type Squeue struct { - squeueContents []string - squeueDone chan struct{} - squeueError error - squeueCond *sync.Cond - SlurmLock sync.Mutex +type SqueueChecker struct { + Period time.Duration + uuids map[string]jobPriority + startOnce sync.Once + done chan struct{} + sync.Cond } -// squeueFunc -func squeueFunc() *exec.Cmd { - return exec.Command("squeue", "--format=%j") +// HasUUID checks if a given container UUID is in the slurm queue. +// This does not run squeue directly, but instead blocks until woken +// up by next successful update of squeue. +func (sqc *SqueueChecker) HasUUID(uuid string) bool { + sqc.startOnce.Do(sqc.start) + + sqc.L.Lock() + defer sqc.L.Unlock() + + // block until next squeue broadcast signaling an update. + sqc.Wait() + _, exists := sqc.uuids[uuid] + return exists } -var squeueCmd = squeueFunc +// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist. +func (sqc *SqueueChecker) GetNiceness(uuid string) int { + sqc.startOnce.Do(sqc.start) -// RunSqueue runs squeue once and captures the output. If there is an error, -// set "squeueError". If it succeeds, set "squeueContents" and then wake up -// any goroutines waiting squeueCond in CheckSqueue(). -func (squeue *Squeue) RunSqueue() error { - var newSqueueContents []string + sqc.L.Lock() + defer sqc.L.Unlock() - // Mutex between squeue sync and running sbatch or scancel. This - // establishes a sequence so that squeue doesn't run concurrently with - // sbatch or scancel; the next update of squeue will occur only after - // sbatch or scancel has completed. - squeue.SlurmLock.Lock() - defer squeue.SlurmLock.Unlock() - - // Also ensure unlock on all return paths - defer squeue.squeueCond.L.Unlock() - - cmd := squeueCmd() - sq, err := cmd.StdoutPipe() - if err != nil { - log.Printf("Error creating stdout pipe for squeue: %v", err) - squeue.squeueCond.L.Lock() - squeue.squeueError = err - return err - } - cmd.Start() - scanner := bufio.NewScanner(sq) - for scanner.Scan() { - newSqueueContents = append(newSqueueContents, scanner.Text()) - } - if err := scanner.Err(); err != nil { - cmd.Wait() - log.Printf("Error reading from squeue pipe: %v", err) - squeue.squeueCond.L.Lock() - squeue.squeueError = err - return err + n, exists := sqc.uuids[uuid] + if exists { + return n.niceness + } else { + return -1 } +} - err = cmd.Wait() - if err != nil { - log.Printf("Error running squeue: %v", err) - squeue.squeueCond.L.Lock() - squeue.squeueError = err - return err +// Stop stops the squeue monitoring goroutine. Do not call HasUUID +// after calling Stop. +func (sqc *SqueueChecker) Stop() { + if sqc.done != nil { + close(sqc.done) } - - squeue.squeueCond.L.Lock() - squeue.squeueError = nil - squeue.squeueContents = newSqueueContents - squeue.squeueCond.Broadcast() - - return nil } -// CheckSqueue checks if a given container UUID is in the slurm queue. This -// does not run squeue directly, but instead blocks until woken up by next -// successful update of squeue. -func (squeue *Squeue) CheckSqueue(uuid string) (bool, error) { - squeue.squeueCond.L.Lock() - // block until next squeue broadcast signaling an update. - squeue.squeueCond.Wait() - if squeue.squeueError != nil { - e := squeue.squeueError - squeue.squeueCond.L.Unlock() - return false, e +// check gets the names of jobs in the SLURM queue (running and +// queued). If it succeeds, it updates squeue.uuids and wakes up any +// goroutines that are waiting in HasUUID() or All(). +func (sqc *SqueueChecker) check() { + // Mutex between squeue sync and running sbatch or scancel. This + // establishes a sequence so that squeue doesn't run concurrently with + // sbatch or scancel; the next update of squeue will occur only after + // sbatch or scancel has completed. + sqc.L.Lock() + defer sqc.L.Unlock() + + cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"}) + stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{} + cmd.Stdout, cmd.Stderr = stdout, stderr + if err := cmd.Run(); err != nil { + log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String()) + return } - contents := squeue.squeueContents - squeue.squeueCond.L.Unlock() - for _, k := range contents { - if k == uuid { - return true, nil + lines := strings.Split(stdout.String(), "\n") + sqc.uuids = make(map[string]jobPriority, len(lines)) + for _, line := range lines { + var uuid string + var nice int + var prio int + fmt.Sscan(line, &uuid, &nice, &prio) + if uuid != "" { + sqc.uuids[uuid] = jobPriority{nice, prio} } } - return false, nil -} - -// StartMonitor starts the squeue monitoring goroutine. -func (squeue *Squeue) StartMonitor(pollInterval time.Duration) { - squeue.squeueCond = sync.NewCond(&sync.Mutex{}) - squeue.squeueDone = make(chan struct{}) - squeue.RunSqueue() - go squeue.SyncSqueue(pollInterval) + sqc.Broadcast() } -// Done stops the squeue monitoring goroutine. -func (squeue *Squeue) Done() { - squeue.squeueDone <- struct{}{} - close(squeue.squeueDone) +// Initialize, and start a goroutine to call check() once per +// squeue.Period until terminated by calling Stop(). +func (sqc *SqueueChecker) start() { + sqc.L = &sync.Mutex{} + sqc.done = make(chan struct{}) + go func() { + ticker := time.NewTicker(sqc.Period) + for { + select { + case <-sqc.done: + ticker.Stop() + return + case <-ticker.C: + sqc.check() + } + } + }() } -// SyncSqueue periodically polls RunSqueue() at the given duration until -// terminated by calling Done(). -func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) { - ticker := time.NewTicker(pollInterval) - for { - select { - case <-squeue.squeueDone: - return - case <-ticker.C: - squeue.RunSqueue() - } +// All waits for the next squeue invocation, and returns all job +// names reported by squeue. +func (sqc *SqueueChecker) All() []string { + sqc.startOnce.Do(sqc.start) + sqc.L.Lock() + defer sqc.L.Unlock() + sqc.Wait() + var uuids []string + for uuid := range sqc.uuids { + uuids = append(uuids, uuid) } + return uuids }