1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
16 // SqueueChecker implements an asynchronous polling monitor of the SLURM queue using the
18 type SqueueChecker struct {
26 func squeueFunc() *exec.Cmd {
27 return exec.Command("squeue", "--all", "--format=%j")
30 var squeueCmd = squeueFunc
32 // HasUUID checks if a given container UUID is in the slurm queue.
33 // This does not run squeue directly, but instead blocks until woken
34 // up by the next successful update of squeue.
35 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
36 sqc.startOnce.Do(sqc.start)
41 // block until next squeue broadcast signaling an update.
43 return sqc.uuids[uuid]
46 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
47 // after calling Stop.
48 func (sqc *SqueueChecker) Stop() {
54 // check gets the names of jobs in the SLURM queue (running and
55 // queued). If it succeeds, it updates sqc.uuids and wakes up any
56 // goroutines that are waiting in HasUUID() or All().
57 func (sqc *SqueueChecker) check() {
58 // Mutex between squeue sync and running sbatch or scancel. This
59 // establishes a sequence so that squeue doesn't run concurrently with
60 // sbatch or scancel; the next update of squeue will occur only after
61 // sbatch or scancel has completed.
66 stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
67 cmd.Stdout, cmd.Stderr = stdout, stderr
68 if err := cmd.Run(); err != nil {
69 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
73 uuids := strings.Split(stdout.String(), "\n")
74 sqc.uuids = make(map[string]bool, len(uuids))
75 for _, uuid := range uuids {
76 sqc.uuids[uuid] = true
81 // Initialize, and start a goroutine to call check() once per
82 // sqc.Period until terminated by calling Stop().
83 func (sqc *SqueueChecker) start() {
85 sqc.done = make(chan struct{})
87 ticker := time.NewTicker(sqc.Period)
100 // All waits for the next squeue invocation, and returns all job
101 // names reported by squeue.
102 func (sqc *SqueueChecker) All() []string {
103 sqc.startOnce.Do(sqc.start)
108 for uuid := range sqc.uuids {
109 uuids = append(uuids, uuid)