X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1bc602ad5480b9b1ed78b318e9d3d9749d2b83ab..d121e087ad1b4e91f869dbd57534c6d6ce51d19d:/services/crunch-dispatch-slurm/squeue.go diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index c1bbe920ed..ee87a44fff 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -11,9 +11,9 @@ import ( // Squeue implements asynchronous polling monitor of the SLURM queue using the // command 'squeue'. -type Squeue struct { +type SqueueChecker struct { Period time.Duration - hasUUID map[string]bool + uuids map[string]bool startOnce sync.Once done chan struct{} sync.Cond @@ -28,35 +28,35 @@ var squeueCmd = squeueFunc // HasUUID checks if a given container UUID is in the slurm queue. // This does not run squeue directly, but instead blocks until woken // up by next successful update of squeue. -func (squeue *Squeue) HasUUID(uuid string) bool { - squeue.startOnce.Do(squeue.start) +func (sqc *SqueueChecker) HasUUID(uuid string) bool { + sqc.startOnce.Do(sqc.start) - squeue.L.Lock() - defer squeue.L.Unlock() + sqc.L.Lock() + defer sqc.L.Unlock() // block until next squeue broadcast signaling an update. - squeue.Wait() - return squeue.hasUUID[uuid] + sqc.Wait() + return sqc.uuids[uuid] } // Stop stops the squeue monitoring goroutine. Do not call HasUUID // after calling Stop. -func (squeue *Squeue) Stop() { - if squeue.done != nil { - close(squeue.done) +func (sqc *SqueueChecker) Stop() { + if sqc.done != nil { + close(sqc.done) } } // check gets the names of jobs in the SLURM queue (running and -// queued). If it succeeds, it updates squeue.hasUUID and wakes up any -// goroutines that are waiting in HasUUID(). -func (squeue *Squeue) check() { +// queued). If it succeeds, it updates squeue.uuids and wakes up any +// goroutines that are waiting in HasUUID() or All(). +func (sqc *SqueueChecker) check() { // Mutex between squeue sync and running sbatch or scancel. This // establishes a sequence so that squeue doesn't run concurrently with // sbatch or scancel; the next update of squeue will occur only after // sbatch or scancel has completed. - squeue.L.Lock() - defer squeue.L.Unlock() + sqc.L.Lock() + defer sqc.L.Unlock() cmd := squeueCmd() stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{} @@ -67,28 +67,42 @@ func (squeue *Squeue) check() { } uuids := strings.Split(stdout.String(), "\n") - squeue.hasUUID = make(map[string]bool, len(uuids)) + sqc.uuids = make(map[string]bool, len(uuids)) for _, uuid := range uuids { - squeue.hasUUID[uuid] = true + sqc.uuids[uuid] = true } - squeue.Broadcast() + sqc.Broadcast() } // Initialize, and start a goroutine to call check() once per // squeue.Period until terminated by calling Stop(). -func (squeue *Squeue) start() { - squeue.L = &sync.Mutex{} - squeue.done = make(chan struct{}) +func (sqc *SqueueChecker) start() { + sqc.L = &sync.Mutex{} + sqc.done = make(chan struct{}) go func() { - ticker := time.NewTicker(squeue.Period) + ticker := time.NewTicker(sqc.Period) for { select { - case <-squeue.done: + case <-sqc.done: ticker.Stop() return case <-ticker.C: - squeue.check() + sqc.check() } } }() } + +// All waits for the next squeue invocation, and returns all job +// names reported by squeue. +func (sqc *SqueueChecker) All() []string { + sqc.startOnce.Do(sqc.start) + sqc.L.Lock() + defer sqc.L.Unlock() + sqc.Wait() + var uuids []string + for uuid := range sqc.uuids { + uuids = append(uuids, uuid) + } + return uuids +}