X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/609646134bcd8fc3a7fd500848220741ecc4a9d2..1bff2ab0181be31492c53351afc1c3c1e58ea05d:/services/crunch-dispatch-slurm/squeue.go diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index 45d06c8c1e..742943f197 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -1,138 +1,220 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( - "bufio" - "io" - "io/ioutil" + "bytes" + "fmt" "log" - "os/exec" + "sort" + "strings" "sync" "time" ) -// Squeue implements asynchronous polling monitor of the SLURM queue using the -// command 'squeue'. -type Squeue struct { - squeueContents []string - squeueDone chan struct{} - squeueCond *sync.Cond - SlurmLock sync.Mutex +type slurmJob struct { + uuid string + wantPriority int64 + priority int64 // current slurm priority (incorporates nice value) + nice int64 // current slurm nice value } -// squeueFunc -func squeueFunc() *exec.Cmd { - return exec.Command("squeue", "--all", "--format=%j") +// Squeue implements asynchronous polling monitor of the SLURM queue using the +// command 'squeue'. +type SqueueChecker struct { + Period time.Duration + PrioritySpread int64 + Slurm Slurm + queue map[string]*slurmJob + startOnce sync.Once + done chan struct{} + sync.Cond } -var squeueCmd = squeueFunc +// HasUUID checks if a given container UUID is in the slurm queue. +// This does not run squeue directly, but instead blocks until woken +// up by next successful update of squeue. +func (sqc *SqueueChecker) HasUUID(uuid string) bool { + sqc.startOnce.Do(sqc.start) -// RunSqueue runs squeue once and captures the output. If it succeeds, set -// "squeueContents" and then wake up any goroutines waiting squeueCond in -// CheckSqueue(). If there was an error, log it and leave the threads blocked. -func (squeue *Squeue) RunSqueue() { - var newSqueueContents []string + sqc.L.Lock() + defer sqc.L.Unlock() - // Mutex between squeue sync and running sbatch or scancel. This - // establishes a sequence so that squeue doesn't run concurrently with - // sbatch or scancel; the next update of squeue will occur only after - // sbatch or scancel has completed. - squeue.SlurmLock.Lock() - defer squeue.SlurmLock.Unlock() - - // Also ensure unlock on all return paths - - cmd := squeueCmd() - sq, err := cmd.StdoutPipe() - if err != nil { - log.Printf("Error creating stdout pipe for squeue: %v", err) - return - } - - stderrReader, err := cmd.StderrPipe() - if err != nil { - log.Printf("Error creating stderr pipe for squeue: %v", err) - return - } + // block until next squeue broadcast signaling an update. + sqc.Wait() + _, exists := sqc.queue[uuid] + return exists +} - err = cmd.Start() - if err != nil { - log.Printf("Error running squeue: %v", err) - return +// SetPriority sets or updates the desired (Arvados) priority for a +// container. +func (sqc *SqueueChecker) SetPriority(uuid string, want int64) { + sqc.startOnce.Do(sqc.start) + sqc.L.Lock() + defer sqc.L.Unlock() + job, ok := sqc.queue[uuid] + if !ok { + // Wait in case the slurm job was just submitted and + // will appear in the next squeue update. + sqc.Wait() + if job, ok = sqc.queue[uuid]; !ok { + return + } } + job.wantPriority = want +} - stderrChan := make(chan []byte) - go func() { - b, _ := ioutil.ReadAll(stderrReader) - stderrChan <- b - close(stderrChan) - }() - - scanner := bufio.NewScanner(sq) - for scanner.Scan() { - newSqueueContents = append(newSqueueContents, scanner.Text()) +// adjust slurm job nice values as needed to ensure slurm priority +// order matches Arvados priority order. +func (sqc *SqueueChecker) reniceAll() { + sqc.L.Lock() + defer sqc.L.Unlock() + + jobs := make([]*slurmJob, 0, len(sqc.queue)) + for _, j := range sqc.queue { + if j.wantPriority == 0 { + // SLURM job with unknown Arvados priority + // (perhaps it's not an Arvados job) + continue + } + if j.priority == 0 { + // SLURM <= 15.x implements "hold" by setting + // priority to 0. If we include held jobs + // here, we'll end up trying to push other + // jobs below them using negative priority, + // which won't help anything. + continue + } + jobs = append(jobs, j) } - io.Copy(ioutil.Discard, sq) - - stderrmsg := <-stderrChan - - err = cmd.Wait() - if scanner.Err() != nil { - log.Printf("Error reading from squeue pipe: %v", err) - } - if err != nil { - log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg)) + sort.Slice(jobs, func(i, j int) bool { + if jobs[i].wantPriority != jobs[j].wantPriority { + return jobs[i].wantPriority > jobs[j].wantPriority + } else { + // break ties with container uuid -- + // otherwise, the ordering would change from + // one interval to the next, and we'd do many + // pointless slurm queue rearrangements. + return jobs[i].uuid > jobs[j].uuid + } + }) + renice := wantNice(jobs, sqc.PrioritySpread) + for i, job := range jobs { + if renice[i] == job.nice { + continue + } + sqc.Slurm.Renice(job.uuid, renice[i]) } +} - if scanner.Err() == nil && err == nil { - squeue.squeueCond.L.Lock() - squeue.squeueContents = newSqueueContents - squeue.squeueCond.Broadcast() - squeue.squeueCond.L.Unlock() +// Stop stops the squeue monitoring goroutine. Do not call HasUUID +// after calling Stop. +func (sqc *SqueueChecker) Stop() { + if sqc.done != nil { + close(sqc.done) } } -// CheckSqueue checks if a given container UUID is in the slurm queue. This -// does not run squeue directly, but instead blocks until woken up by next -// successful update of squeue. -func (squeue *Squeue) CheckSqueue(uuid string) bool { - squeue.squeueCond.L.Lock() - // block until next squeue broadcast signaling an update. - squeue.squeueCond.Wait() - contents := squeue.squeueContents - squeue.squeueCond.L.Unlock() +// check gets the names of jobs in the SLURM queue (running and +// queued). If it succeeds, it updates sqc.queue and wakes up any +// goroutines that are waiting in HasUUID() or All(). +func (sqc *SqueueChecker) check() { + // Mutex between squeue sync and running sbatch or scancel. This + // establishes a sequence so that squeue doesn't run concurrently with + // sbatch or scancel; the next update of squeue will occur only after + // sbatch or scancel has completed. + sqc.L.Lock() + defer sqc.L.Unlock() + + cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"}) + stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{} + cmd.Stdout, cmd.Stderr = stdout, stderr + if err := cmd.Run(); err != nil { + log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String()) + return + } - for _, k := range contents { - if k == uuid { - return true + lines := strings.Split(stdout.String(), "\n") + newq := make(map[string]*slurmJob, len(lines)) + for _, line := range lines { + if line == "" { + continue + } + var uuid, state, reason string + var n, p int64 + if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil { + log.Printf("warning: ignoring unparsed line in squeue output: %q", line) + continue + } + replacing, ok := sqc.queue[uuid] + if !ok { + replacing = &slurmJob{uuid: uuid} + } + replacing.priority = p + replacing.nice = n + newq[uuid] = replacing + + if state == "PENDING" && ((reason == "BadConstraints" && p == 0) || reason == "launch failed requeued held") && replacing.wantPriority > 0 { + // When using SLURM 14.x or 15.x, our queued + // jobs land in this state when "scontrol + // reconfigure" invalidates their feature + // constraints by clearing all node features. + // They stay in this state even after the + // features reappear, until we run "scontrol + // release {jobid}". + // + // "scontrol release" is silent and successful + // regardless of whether the features have + // reappeared, so rather than second-guessing + // whether SLURM is ready, we just keep trying + // this until it works. + // + // "launch failed requeued held" seems to be + // another manifestation of this problem, + // resolved the same way. + log.Printf("releasing held job %q", uuid) + sqc.Slurm.Release(uuid) + } else if p < 1<<20 && replacing.wantPriority > 0 { + log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason) } } - return false -} - -// StartMonitor starts the squeue monitoring goroutine. -func (squeue *Squeue) StartMonitor(pollInterval time.Duration) { - squeue.squeueCond = sync.NewCond(&sync.Mutex{}) - squeue.squeueDone = make(chan struct{}) - go squeue.SyncSqueue(pollInterval) + sqc.queue = newq + sqc.Broadcast() } -// Done stops the squeue monitoring goroutine. -func (squeue *Squeue) Done() { - squeue.squeueDone <- struct{}{} - close(squeue.squeueDone) +// Initialize, and start a goroutine to call check() once per +// squeue.Period until terminated by calling Stop(). +func (sqc *SqueueChecker) start() { + sqc.L = &sync.Mutex{} + sqc.done = make(chan struct{}) + go func() { + ticker := time.NewTicker(sqc.Period) + for { + select { + case <-sqc.done: + ticker.Stop() + return + case <-ticker.C: + sqc.check() + sqc.reniceAll() + } + } + }() } -// SyncSqueue periodically polls RunSqueue() at the given duration until -// terminated by calling Done(). -func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) { - ticker := time.NewTicker(pollInterval) - for { - select { - case <-squeue.squeueDone: - return - case <-ticker.C: - squeue.RunSqueue() - } +// All waits for the next squeue invocation, and returns all job +// names reported by squeue. +func (sqc *SqueueChecker) All() []string { + sqc.startOnce.Do(sqc.start) + sqc.L.Lock() + defer sqc.L.Unlock() + sqc.Wait() + var uuids []string + for u := range sqc.queue { + uuids = append(uuids, u) } + return uuids }