projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '13752-index-all-filenames'
[arvados.git]
/
services
/
crunch-dispatch-slurm
/
squeue.go
diff --git
a/services/crunch-dispatch-slurm/squeue.go
b/services/crunch-dispatch-slurm/squeue.go
index fd4851eb0a8a92b48fcacef0e4552ce99d0a7f48..20305ab90abe91b150ae71a7749fd39c8e529548 100644
(file)
--- a/
services/crunch-dispatch-slurm/squeue.go
+++ b/
services/crunch-dispatch-slurm/squeue.go
@@
-33,7
+33,8
@@
type SqueueChecker struct {
queue map[string]*slurmJob
startOnce sync.Once
done chan struct{}
queue map[string]*slurmJob
startOnce sync.Once
done chan struct{}
- sync.Cond
+ lock sync.RWMutex
+ notify sync.Cond
}
// HasUUID checks if a given container UUID is in the slurm queue.
}
// HasUUID checks if a given container UUID is in the slurm queue.
@@
-42,11
+43,11
@@
type SqueueChecker struct {
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
sqc.startOnce.Do(sqc.start)
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
- defer sqc.L.Unlock()
+ sqc.lock.RLock()
+ defer sqc.lock.RUnlock()
// block until next squeue broadcast signaling an update.
// block until next squeue broadcast signaling an update.
- sqc.Wait()
+ sqc.notify.Wait()
_, exists := sqc.queue[uuid]
return exists
}
_, exists := sqc.queue[uuid]
return exists
}
@@
-55,25
+56,30
@@
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
// container.
func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
sqc.startOnce.Do(sqc.start)
// container.
func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
- defer sqc.L.Unlock()
- job, ok := sqc.queue[uuid]
- if !ok {
+
+ sqc.lock.RLock()
+ job := sqc.queue[uuid]
+ if job == nil {
// Wait in case the slurm job was just submitted and
// will appear in the next squeue update.
// Wait in case the slurm job was just submitted and
// will appear in the next squeue update.
- sqc.Wait()
- if job, ok = sqc.queue[uuid]; !ok {
- return
- }
+ sqc.notify.Wait()
+ job = sqc.queue[uuid]
+ }
+ needUpdate := job != nil && job.wantPriority != want
+ sqc.lock.RUnlock()
+
+ if needUpdate {
+ sqc.lock.Lock()
+ job.wantPriority = want
+ sqc.lock.Unlock()
}
}
- job.wantPriority = want
}
// adjust slurm job nice values as needed to ensure slurm priority
// order matches Arvados priority order.
func (sqc *SqueueChecker) reniceAll() {
}
// adjust slurm job nice values as needed to ensure slurm priority
// order matches Arvados priority order.
func (sqc *SqueueChecker) reniceAll() {
- sqc.L.Lock()
- defer sqc.L.Unlock()
+ sqc.lock.RLock()
+ defer sqc.lock.RUnlock()
jobs := make([]*slurmJob, 0, len(sqc.queue))
for _, j := range sqc.queue {
jobs := make([]*slurmJob, 0, len(sqc.queue))
for _, j := range sqc.queue {
@@
-82,7
+88,7
@@
func (sqc *SqueueChecker) reniceAll() {
// (perhaps it's not an Arvados job)
continue
}
// (perhaps it's not an Arvados job)
continue
}
- if j.priority == 0 {
+ if j.priority <= 2*slurm15NiceLimit {
// SLURM <= 15.x implements "hold" by setting
// priority to 0. If we include held jobs
// here, we'll end up trying to push other
// SLURM <= 15.x implements "hold" by setting
// priority to 0. If we include held jobs
// here, we'll end up trying to push other
@@
-133,12
+139,8
@@
func (sqc *SqueueChecker) Stop() {
// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
- // Mutex between squeue sync and running sbatch or scancel. This
- // establishes a sequence so that squeue doesn't run concurrently with
- // sbatch or scancel; the next update of squeue will occur only after
- // sbatch or scancel has completed.
- sqc.L.Lock()
- defer sqc.L.Unlock()
+ sqc.lock.Lock()
+ defer sqc.lock.Unlock()
cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
@@
-191,18
+193,18
@@
func (sqc *SqueueChecker) check() {
// resolved the same way.
log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
sqc.Slurm.Release(uuid)
// resolved the same way.
log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
sqc.Slurm.Release(uuid)
- } else if p < 1<<20 && replacing.wantPriority > 0 {
+ } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
}
}
sqc.queue = newq
log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
}
}
sqc.queue = newq
- sqc.Broadcast()
+ sqc.notify.Broadcast()
}
// Initialize, and start a goroutine to call check() once per
// squeue.Period until terminated by calling Stop().
func (sqc *SqueueChecker) start() {
}
// Initialize, and start a goroutine to call check() once per
// squeue.Period until terminated by calling Stop().
func (sqc *SqueueChecker) start() {
- sqc.L = &sync.Mutex{}
+ sqc.notify.L = sqc.lock.RLocker()
sqc.done = make(chan struct{})
go func() {
ticker := time.NewTicker(sqc.Period)
sqc.done = make(chan struct{})
go func() {
ticker := time.NewTicker(sqc.Period)
@@
-214,6
+216,15
@@
func (sqc *SqueueChecker) start() {
case <-ticker.C:
sqc.check()
sqc.reniceAll()
case <-ticker.C:
sqc.check()
sqc.reniceAll()
+ select {
+ case <-ticker.C:
+ // If this iteration took
+ // longer than sqc.Period,
+ // consume the next tick and
+ // wait. Otherwise we would
+ // starve other goroutines.
+ default:
+ }
}
}
}()
}
}
}()
@@
-223,9
+234,9
@@
func (sqc *SqueueChecker) start() {
// names reported by squeue.
func (sqc *SqueueChecker) All() []string {
sqc.startOnce.Do(sqc.start)
// names reported by squeue.
func (sqc *SqueueChecker) All() []string {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
- defer sqc.L.Unlock()
- sqc.Wait()
+ sqc.lock.RLock()
+ defer sqc.lock.RUnlock()
+ sqc.notify.Wait()
var uuids []string
for u := range sqc.queue {
uuids = append(uuids, u)
var uuids []string
for u := range sqc.queue {
uuids = append(uuids, u)