}
var (
- theConfig Config
- squeueUpdater Squeue
+ theConfig Config
+ sqCheck SqueueChecker
)
const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
}
arv.Retries = 25
- squeueUpdater = Squeue{Period: time.Duration(theConfig.PollPeriod)}
- defer squeueUpdater.Stop()
+ sqCheck = SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
+ defer sqCheck.Stop()
dispatcher := dispatch.Dispatcher{
Arv: arv,
cmd.Stderr = &stderr
// Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.L.Lock()
- defer squeueUpdater.L.Unlock()
+ sqCheck.L.Lock()
+ defer sqCheck.L.Unlock()
log.Printf("exec sbatch %+q", cmd.Args)
err := cmd.Run()
func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
submitted := false
for !*monitorDone {
- if squeueUpdater.HasUUID(container.UUID) {
+ if sqCheck.HasUUID(container.UUID) {
// Found in the queue, so continue monitoring
submitted = true
} else if container.State == dispatch.Locked && !submitted {
if container.Priority == 0 && (container.State == dispatch.Locked || container.State == dispatch.Running) {
log.Printf("Canceling container %s", container.UUID)
// Mutex between squeue sync and running sbatch or scancel.
- squeueUpdater.L.Lock()
+ sqCheck.L.Lock()
cmd := scancelCmd(container)
msg, err := cmd.CombinedOutput()
- squeueUpdater.L.Unlock()
+ sqCheck.L.Unlock()
if err != nil {
log.Printf("Error stopping container %s with %v %v: %v %v", container.UUID, cmd.Path, cmd.Args, err, string(msg))
- if squeueUpdater.HasUUID(container.UUID) {
+ if sqCheck.HasUUID(container.UUID) {
log.Printf("Container %s is still in squeue after scancel.", container.UUID)
continue
}
// SqueueChecker implements an asynchronous polling monitor of the SLURM
// queue using the command 'squeue'.
-type Squeue struct {
+type SqueueChecker struct {
Period time.Duration
hasUUID map[string]bool
startOnce sync.Once
// HasUUID checks if a given container UUID is in the slurm queue.
// This does not run squeue directly, but instead blocks until woken
// up by the next successful update of squeue.
-func (squeue *Squeue) HasUUID(uuid string) bool {
- squeue.startOnce.Do(squeue.start)
+func (sqc *SqueueChecker) HasUUID(uuid string) bool {
+ sqc.startOnce.Do(sqc.start)
- squeue.L.Lock()
- defer squeue.L.Unlock()
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
// block until next squeue broadcast signaling an update.
- squeue.Wait()
- return squeue.hasUUID[uuid]
+ sqc.Wait()
+ return sqc.hasUUID[uuid]
}
// Stop stops the squeue monitoring goroutine. Do not call HasUUID
// after calling Stop.
-func (squeue *Squeue) Stop() {
- if squeue.done != nil {
- close(squeue.done)
+func (sqc *SqueueChecker) Stop() {
+ if sqc.done != nil {
+ close(sqc.done)
}
}
// check gets the names of jobs in the SLURM queue (running and
// queued). If it succeeds, it updates sqc.hasUUID and wakes up any
// goroutines that are waiting in HasUUID().
-func (squeue *Squeue) check() {
+func (sqc *SqueueChecker) check() {
// Mutex between squeue sync and running sbatch or scancel. This
// establishes a sequence so that squeue doesn't run concurrently with
// sbatch or scancel; the next update of squeue will occur only after
// sbatch or scancel has completed.
- squeue.L.Lock()
- defer squeue.L.Unlock()
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
cmd := squeueCmd()
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
}
uuids := strings.Split(stdout.String(), "\n")
- squeue.hasUUID = make(map[string]bool, len(uuids))
+ sqc.hasUUID = make(map[string]bool, len(uuids))
for _, uuid := range uuids {
- squeue.hasUUID[uuid] = true
+ sqc.hasUUID[uuid] = true
}
- squeue.Broadcast()
+ sqc.Broadcast()
}
// start initializes sqc and starts a goroutine to call check() once
// per sqc.Period until terminated by calling Stop().
-func (squeue *Squeue) start() {
- squeue.L = &sync.Mutex{}
- squeue.done = make(chan struct{})
+func (sqc *SqueueChecker) start() {
+ sqc.L = &sync.Mutex{}
+ sqc.done = make(chan struct{})
go func() {
- ticker := time.NewTicker(squeue.Period)
+ ticker := time.NewTicker(sqc.Period)
for {
select {
- case <-squeue.done:
+ case <-sqc.done:
ticker.Stop()
return
case <-ticker.C:
- squeue.check()
+ sqc.check()
}
}
}()