Merge branch '14660-arvbox-workbench2' refs #14660
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
index 20305ab90abe91b150ae71a7749fd39c8e529548..5aee7e087b2658945b2eebe1f2f309d67c351d16 100644 (file)
@@ -7,7 +7,6 @@ package main
 import (
        "bytes"
        "fmt"
-       "log"
        "sort"
        "strings"
        "sync"
@@ -27,6 +26,7 @@ type slurmJob struct {
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
 type SqueueChecker struct {
+       Logger         logger
        Period         time.Duration
        PrioritySpread int64
        Slurm          Slurm
@@ -78,9 +78,9 @@ func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
 // adjust slurm job nice values as needed to ensure slurm priority
 // order matches Arvados priority order.
 func (sqc *SqueueChecker) reniceAll() {
-       sqc.lock.RLock()
-       defer sqc.lock.RUnlock()
-
+       // This is slow (it shells out to scontrol many times) and no
+       // other goroutines update sqc.queue or any of the job fields
+       // we use here, so we don't acquire a lock.
        jobs := make([]*slurmJob, 0, len(sqc.queue))
        for _, j := range sqc.queue {
                if j.wantPriority == 0 {
@@ -121,7 +121,7 @@ func (sqc *SqueueChecker) reniceAll() {
                }
                err := sqc.Slurm.Renice(job.uuid, niceNew)
                if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
-                       log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+                       sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
                        job.hitNiceLimit = true
                }
        }
@@ -139,14 +139,11 @@ func (sqc *SqueueChecker) Stop() {
 // queued). If it succeeds, it updates sqc.queue and wakes up any
 // goroutines that are waiting in HasUUID() or All().
 func (sqc *SqueueChecker) check() {
-       sqc.lock.Lock()
-       defer sqc.lock.Unlock()
-
        cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
        cmd.Stdout, cmd.Stderr = stdout, stderr
        if err := cmd.Run(); err != nil {
-               log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+               sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
                return
        }
 
@@ -159,9 +156,13 @@ func (sqc *SqueueChecker) check() {
                var uuid, state, reason string
                var n, p int64
                if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
-                       log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+                       sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
                        continue
                }
+
+               // No other goroutines write to jobs' priority or nice
+               // fields, so we can read and write them without
+               // locks.
                replacing, ok := sqc.queue[uuid]
                if !ok {
                        replacing = &slurmJob{uuid: uuid}
@@ -191,13 +192,15 @@ func (sqc *SqueueChecker) check() {
                        // "launch failed requeued held" seems to be
                        // another manifestation of this problem,
                        // resolved the same way.
-                       log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
+                       sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
                        sqc.Slurm.Release(uuid)
                } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
-                       log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
+                       sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
                }
        }
+       sqc.lock.Lock()
        sqc.queue = newq
+       sqc.lock.Unlock()
        sqc.notify.Broadcast()
 }