Merge branch '13752-index-all-filenames'
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index 45d06c8c1e27f12f2bc6e83ca262ab2ff7f08a53..20305ab90abe91b150ae71a7749fd39c8e529548 100644 (file)
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
-       "bufio"
-       "io"
-       "io/ioutil"
+       "bytes"
+       "fmt"
        "log"
-       "os/exec"
+       "sort"
+       "strings"
        "sync"
        "time"
 )
 
-// Squeue implements asynchronous polling monitor of the SLURM queue using the
-// command 'squeue'.
-type Squeue struct {
-       squeueContents []string
-       squeueDone     chan struct{}
-       squeueCond     *sync.Cond
-       SlurmLock      sync.Mutex
-}
+const slurm15NiceLimit int64 = 10000
 
-// squeueFunc
-func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--all", "--format=%j")
+type slurmJob struct {
+       uuid         string
+       wantPriority int64
+       priority     int64 // current slurm priority (incorporates nice value)
+       nice         int64 // current slurm nice value
+       hitNiceLimit bool
 }
 
-var squeueCmd = squeueFunc
-
-// RunSqueue runs squeue once and captures the output.  If it succeeds, set
-// "squeueContents" and then wake up any goroutines waiting squeueCond in
-// CheckSqueue().  If there was an error, log it and leave the threads blocked.
-func (squeue *Squeue) RunSqueue() {
-       var newSqueueContents []string
+// Squeue implements asynchronous polling monitor of the SLURM queue using the
+// command 'squeue'.
+type SqueueChecker struct {
+       Period         time.Duration
+       PrioritySpread int64
+       Slurm          Slurm
+       queue          map[string]*slurmJob
+       startOnce      sync.Once
+       done           chan struct{}
+       lock           sync.RWMutex
+       notify         sync.Cond
+}
 
-       // Mutex between squeue sync and running sbatch or scancel.  This
-       // establishes a sequence so that squeue doesn't run concurrently with
-       // sbatch or scancel; the next update of squeue will occur only after
-       // sbatch or scancel has completed.
-       squeue.SlurmLock.Lock()
-       defer squeue.SlurmLock.Unlock()
+// HasUUID checks if a given container UUID is in the slurm queue.
+// This does not run squeue directly, but instead blocks until woken
+// up by next successful update of squeue.
+func (sqc *SqueueChecker) HasUUID(uuid string) bool {
+       sqc.startOnce.Do(sqc.start)
 
-       // Also ensure unlock on all return paths
+       sqc.lock.RLock()
+       defer sqc.lock.RUnlock()
 
-       cmd := squeueCmd()
-       sq, err := cmd.StdoutPipe()
-       if err != nil {
-               log.Printf("Error creating stdout pipe for squeue: %v", err)
-               return
-       }
+       // block until next squeue broadcast signaling an update.
+       sqc.notify.Wait()
+       _, exists := sqc.queue[uuid]
+       return exists
+}
 
-       stderrReader, err := cmd.StderrPipe()
-       if err != nil {
-               log.Printf("Error creating stderr pipe for squeue: %v", err)
-               return
+// SetPriority sets or updates the desired (Arvados) priority for a
+// container.
+func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
+       sqc.startOnce.Do(sqc.start)
+
+       sqc.lock.RLock()
+       job := sqc.queue[uuid]
+       if job == nil {
+               // Wait in case the slurm job was just submitted and
+               // will appear in the next squeue update.
+               sqc.notify.Wait()
+               job = sqc.queue[uuid]
        }
+       needUpdate := job != nil && job.wantPriority != want
+       sqc.lock.RUnlock()
 
-       err = cmd.Start()
-       if err != nil {
-               log.Printf("Error running squeue: %v", err)
-               return
+       if needUpdate {
+               sqc.lock.Lock()
+               job.wantPriority = want
+               sqc.lock.Unlock()
        }
+}
 
-       stderrChan := make(chan []byte)
-       go func() {
-               b, _ := ioutil.ReadAll(stderrReader)
-               stderrChan <- b
-               close(stderrChan)
-       }()
-
-       scanner := bufio.NewScanner(sq)
-       for scanner.Scan() {
-               newSqueueContents = append(newSqueueContents, scanner.Text())
+// adjust slurm job nice values as needed to ensure slurm priority
+// order matches Arvados priority order.
+func (sqc *SqueueChecker) reniceAll() {
+       sqc.lock.RLock()
+       defer sqc.lock.RUnlock()
+
+       jobs := make([]*slurmJob, 0, len(sqc.queue))
+       for _, j := range sqc.queue {
+               if j.wantPriority == 0 {
+                       // SLURM job with unknown Arvados priority
+                       // (perhaps it's not an Arvados job)
+                       continue
+               }
+               if j.priority <= 2*slurm15NiceLimit {
+                       // SLURM <= 15.x implements "hold" by setting
+                       // priority to 0. If we include held jobs
+                       // here, we'll end up trying to push other
+                       // jobs below them using negative priority,
+                       // which won't help anything.
+                       continue
+               }
+               jobs = append(jobs, j)
        }
-       io.Copy(ioutil.Discard, sq)
 
-       stderrmsg := <-stderrChan
-
-       err = cmd.Wait()
-
-       if scanner.Err() != nil {
-               log.Printf("Error reading from squeue pipe: %v", err)
-       }
-       if err != nil {
-               log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
+       sort.Slice(jobs, func(i, j int) bool {
+               if jobs[i].wantPriority != jobs[j].wantPriority {
+                       return jobs[i].wantPriority > jobs[j].wantPriority
+               } else {
+                       // break ties with container uuid --
+                       // otherwise, the ordering would change from
+                       // one interval to the next, and we'd do many
+                       // pointless slurm queue rearrangements.
+                       return jobs[i].uuid > jobs[j].uuid
+               }
+       })
+       renice := wantNice(jobs, sqc.PrioritySpread)
+       for i, job := range jobs {
+               niceNew := renice[i]
+               if job.hitNiceLimit && niceNew > slurm15NiceLimit {
+                       niceNew = slurm15NiceLimit
+               }
+               if niceNew == job.nice {
+                       continue
+               }
+               err := sqc.Slurm.Renice(job.uuid, niceNew)
+               if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
+                       log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+                       job.hitNiceLimit = true
+               }
        }
+}
 
-       if scanner.Err() == nil && err == nil {
-               squeue.squeueCond.L.Lock()
-               squeue.squeueContents = newSqueueContents
-               squeue.squeueCond.Broadcast()
-               squeue.squeueCond.L.Unlock()
+// Stop stops the squeue monitoring goroutine. Do not call HasUUID
+// after calling Stop.
+func (sqc *SqueueChecker) Stop() {
+       if sqc.done != nil {
+               close(sqc.done)
        }
 }
 
-// CheckSqueue checks if a given container UUID is in the slurm queue.  This
-// does not run squeue directly, but instead blocks until woken up by next
-// successful update of squeue.
-func (squeue *Squeue) CheckSqueue(uuid string) bool {
-       squeue.squeueCond.L.Lock()
-       // block until next squeue broadcast signaling an update.
-       squeue.squeueCond.Wait()
-       contents := squeue.squeueContents
-       squeue.squeueCond.L.Unlock()
+// check gets the names of jobs in the SLURM queue (running and
+// queued). If it succeeds, it updates sqc.queue and wakes up any
+// goroutines that are waiting in HasUUID() or All().
+func (sqc *SqueueChecker) check() {
+       sqc.lock.Lock()
+       defer sqc.lock.Unlock()
+
+       cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
+       stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
+       cmd.Stdout, cmd.Stderr = stdout, stderr
+       if err := cmd.Run(); err != nil {
+               log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+               return
+       }
 
-       for _, k := range contents {
-               if k == uuid {
-                       return true
+       lines := strings.Split(stdout.String(), "\n")
+       newq := make(map[string]*slurmJob, len(lines))
+       for _, line := range lines {
+               if line == "" {
+                       continue
+               }
+               var uuid, state, reason string
+               var n, p int64
+               if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
+                       log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+                       continue
+               }
+               replacing, ok := sqc.queue[uuid]
+               if !ok {
+                       replacing = &slurmJob{uuid: uuid}
+               }
+               replacing.priority = p
+               replacing.nice = n
+               newq[uuid] = replacing
+
+               if state == "PENDING" && ((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held") && replacing.wantPriority > 0 {
+                       // When using SLURM 14.x or 15.x, our queued
+                       // jobs land in this state when "scontrol
+                       // reconfigure" invalidates their feature
+                       // constraints by clearing all node features.
+                       // They stay in this state even after the
+                       // features reappear, until we run "scontrol
+                       // release {jobid}". Priority is usually 0 in
+                       // this state, but sometimes (due to a race
+                       // with nice adjustments?) it's a small
+                       // positive value.
+                       //
+                       // "scontrol release" is silent and successful
+                       // regardless of whether the features have
+                       // reappeared, so rather than second-guessing
+                       // whether SLURM is ready, we just keep trying
+                       // this until it works.
+                       //
+                       // "launch failed requeued held" seems to be
+                       // another manifestation of this problem,
+                       // resolved the same way.
+                       log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
+                       sqc.Slurm.Release(uuid)
+               } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
+                       log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
                }
        }
-       return false
-}
-
-// StartMonitor starts the squeue monitoring goroutine.
-func (squeue *Squeue) StartMonitor(pollInterval time.Duration) {
-       squeue.squeueCond = sync.NewCond(&sync.Mutex{})
-       squeue.squeueDone = make(chan struct{})
-       go squeue.SyncSqueue(pollInterval)
+       sqc.queue = newq
+       sqc.notify.Broadcast()
 }
 
-// Done stops the squeue monitoring goroutine.
-func (squeue *Squeue) Done() {
-       squeue.squeueDone <- struct{}{}
-       close(squeue.squeueDone)
+// Initialize, and start a goroutine to call check() once per
+// squeue.Period until terminated by calling Stop().
+func (sqc *SqueueChecker) start() {
+       sqc.notify.L = sqc.lock.RLocker()
+       sqc.done = make(chan struct{})
+       go func() {
+               ticker := time.NewTicker(sqc.Period)
+               for {
+                       select {
+                       case <-sqc.done:
+                               ticker.Stop()
+                               return
+                       case <-ticker.C:
+                               sqc.check()
+                               sqc.reniceAll()
+                               select {
+                               case <-ticker.C:
+                                       // If this iteration took
+                                       // longer than sqc.Period,
+                                       // consume the next tick and
+                                       // wait. Otherwise we would
+                                       // starve other goroutines.
+                               default:
+                               }
+                       }
+               }
+       }()
 }
 
-// SyncSqueue periodically polls RunSqueue() at the given duration until
-// terminated by calling Done().
-func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
-       ticker := time.NewTicker(pollInterval)
-       for {
-               select {
-               case <-squeue.squeueDone:
-                       return
-               case <-ticker.C:
-                       squeue.RunSqueue()
-               }
+// All waits for the next squeue invocation, and returns all job
+// names reported by squeue.
+func (sqc *SqueueChecker) All() []string {
+       sqc.startOnce.Do(sqc.start)
+       sqc.lock.RLock()
+       defer sqc.lock.RUnlock()
+       sqc.notify.Wait()
+       var uuids []string
+       for u := range sqc.queue {
+               uuids = append(uuids, u)
        }
+       return uuids
 }