queue map[string]*slurmJob
startOnce sync.Once
done chan struct{}
- sync.Cond
+ lock sync.RWMutex
+ notify sync.Cond
}
// HasUUID checks if a given container UUID is in the slurm queue.
//
// The first call on a checker runs sqc.start via startOnce. Every call
// then blocks until the next squeue broadcast and only afterwards looks
// uuid up in sqc.queue, so the result is read from the queue map as it
// stands after that wakeup.
//
// NOTE(review): this block is shown in diff form. Lines prefixed "-"
// are the implementation being removed (sqc.L / sqc.Wait); lines
// prefixed "+" are its replacement (sqc.lock / sqc.notify).
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
- defer sqc.L.Unlock()
// Replacement: hold sqc.lock only as a reader; the deferred RUnlock
// releases it on return.
+ sqc.lock.RLock()
+ defer sqc.lock.RUnlock()
// block until next squeue broadcast signaling an update.
- sqc.Wait()
// start() sets notify.L to sqc.lock.RLocker(), so per the sync.Cond
// contract this Wait drops the read lock while blocked and re-takes it
// before returning.
+ sqc.notify.Wait()
// Comma-ok lookup: only presence of the key matters, not the job value.
_, exists := sqc.queue[uuid]
return exists
}
// container.
//
// SetPriority records want as the wantPriority of the queue entry for
// uuid. If the entry is not in sqc.queue yet, it waits for one squeue
// update and looks again; if the entry is still absent, nothing is
// changed.
//
// NOTE(review): this block is shown in diff form. Lines prefixed "-"
// are the implementation being removed; lines prefixed "+" are its
// replacement.
func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
- defer sqc.L.Unlock()
- job, ok := sqc.queue[uuid]
- if !ok {
+
// Replacement: look the job up under the read lock. A missing key
// yields a nil *slurmJob, which is what the nil checks below test.
+ sqc.lock.RLock()
+ job := sqc.queue[uuid]
+ if job == nil {
// Wait in case the slurm job was just submitted and
// will appear in the next squeue update.
- sqc.Wait()
- if job, ok = sqc.queue[uuid]; !ok {
- return
- }
+ sqc.notify.Wait()
+ job = sqc.queue[uuid]
+ }
// Decide while still holding the read lock whether a write is needed:
// the job must exist and its wantPriority must differ from want.
+ needUpdate := job != nil && job.wantPriority != want
+ sqc.lock.RUnlock()
+
// The write lock is taken only when there is something to change.
// NOTE(review): job was captured under the read lock released above;
// check() assigns a new map to sqc.queue, so if that happens in
// between, this write lands on the captured *slurmJob. Confirm that
// check() carries wantPriority over from the entry it replaces.
+ if needUpdate {
+ sqc.lock.Lock()
+ job.wantPriority = want
+ sqc.lock.Unlock()
}
- job.wantPriority = want
}
// adjust slurm job nice values as needed to ensure slurm priority
// order matches Arvados priority order.
func (sqc *SqueueChecker) reniceAll() {
- sqc.L.Lock()
- defer sqc.L.Unlock()
+ sqc.lock.RLock()
+ defer sqc.lock.RUnlock()
jobs := make([]*slurmJob, 0, len(sqc.queue))
for _, j := range sqc.queue {
// (perhaps it's not an Arvados job)
continue
}
- if j.priority == 0 {
+ if j.priority <= 2*slurm15NiceLimit {
// SLURM <= 15.x implements "hold" by setting
// priority to 0. If we include held jobs
// here, we'll end up trying to push other
// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
- // Mutex between squeue sync and running sbatch or scancel. This
- // establishes a sequence so that squeue doesn't run concurrently with
- // sbatch or scancel; the next update of squeue will occur only after
- // sbatch or scancel has completed.
- sqc.L.Lock()
- defer sqc.L.Unlock()
+ sqc.lock.Lock()
+ defer sqc.lock.Unlock()
cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
// resolved the same way.
log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
sqc.Slurm.Release(uuid)
- } else if p < 1<<20 && replacing.wantPriority > 0 {
+ } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
}
}
sqc.queue = newq
- sqc.Broadcast()
+ sqc.notify.Broadcast()
}
// Initialize, and start a goroutine to call check() once per
// squeue.Period until terminated by calling Stop().
func (sqc *SqueueChecker) start() {
- sqc.L = &sync.Mutex{}
+ sqc.notify.L = sqc.lock.RLocker()
sqc.done = make(chan struct{})
go func() {
ticker := time.NewTicker(sqc.Period)
case <-ticker.C:
sqc.check()
sqc.reniceAll()
+ select {
+ case <-ticker.C:
+ // If this iteration took
+ // longer than sqc.Period,
+ // consume the next tick and
+ // wait. Otherwise we would
+ // starve other goroutines.
+ default:
+ }
}
}
}()
// names reported by squeue.
func (sqc *SqueueChecker) All() []string {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
- defer sqc.L.Unlock()
- sqc.Wait()
+ sqc.lock.RLock()
+ defer sqc.lock.RUnlock()
+ sqc.notify.Wait()
var uuids []string
for u := range sqc.queue {
uuids = append(uuids, u)