3 // Dispatcher service for Crunch that submits containers to the slurm queue.
9 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10 "git.curoverse.com/arvados.git/sdk/go/dispatch"
23 squeueContents []string
24 SqueueDone chan struct{}
35 crunchRunCommand *string
40 flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
42 pollInterval := flags.Int(
45 "Interval in seconds to poll for queued containers")
47 crunchRunCommand = flags.String(
49 "/usr/bin/crunch-run",
50 "Crunch command to run container")
52 // Parse args; omit the first arg which is the command name
53 flags.Parse(os.Args[1:])
55 arv, err := arvadosclient.MakeArvadosClient()
57 log.Printf("Error making Arvados client: %v", err)
62 dispatcher := dispatch.Dispatcher{
65 PollInterval: time.Duration(*pollInterval) * time.Second,
66 DoneProcessing: make(chan struct{})}
68 squeueUpdater.SqueueDone = make(chan struct{})
69 go squeueUpdater.SyncSqueue(time.Duration(*pollInterval) * time.Second)
71 err = dispatcher.RunDispatcher()
76 squeueUpdater.SqueueDone <- struct{}{}
77 close(squeueUpdater.SqueueDone)
83 func sbatchFunc(container dispatch.Container) *exec.Cmd {
84 memPerCPU := math.Ceil((float64(container.RuntimeConstraints["ram"])) / (float64(container.RuntimeConstraints["vcpus"] * 1048576)))
85 return exec.Command("sbatch", "--share", "--parsable",
86 fmt.Sprintf("--job-name=%s", container.UUID),
87 fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
88 fmt.Sprintf("--cpus-per-task=%d", int(container.RuntimeConstraints["vcpus"])),
89 fmt.Sprintf("--priority=%d", container.Priority))
93 func squeueFunc() *exec.Cmd {
94 return exec.Command("squeue", "--format=%j")
97 // Wrap these so that they can be overridden by tests
98 var sbatchCmd = sbatchFunc
99 var squeueCmd = squeueFunc
101 // Submit job to slurm using sbatch.
102 func submit(dispatcher *dispatch.Dispatcher,
103 container dispatch.Container, crunchRunCommand string) (jobid string, submitErr error) {
107 // If we didn't get as far as submitting a slurm job,
108 // unlock the container and return it to the queue.
109 if submitErr == nil {
110 // OK, no cleanup needed
113 err := dispatcher.Arv.Update("containers", container.UUID,
115 "container": arvadosclient.Dict{"state": "Queued"}},
118 log.Printf("Error unlocking container %s: %v", container.UUID, err)
122 // Create the command and attach to stdin/stdout
123 cmd := sbatchCmd(container)
124 stdinWriter, stdinerr := cmd.StdinPipe()
126 submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
130 stdoutReader, stdoutErr := cmd.StdoutPipe()
131 if stdoutErr != nil {
132 submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
136 stderrReader, stderrErr := cmd.StderrPipe()
137 if stderrErr != nil {
138 submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
144 submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
148 stdoutChan := make(chan []byte)
150 b, _ := ioutil.ReadAll(stdoutReader)
155 stderrChan := make(chan []byte)
157 b, _ := ioutil.ReadAll(stderrReader)
162 // Send a tiny script on stdin to execute the crunch-run command
163 // slurm actually enforces that this must be a #! script
164 fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
169 stdoutMsg := <-stdoutChan
170 stderrmsg := <-stderrChan
176 submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
180 // If everything worked out, the jobid is on stdout
181 jobid = strings.TrimSpace(string(stdoutMsg))
186 func (squeue *Squeue) runSqueue() ([]string, error) {
187 var newSqueueContents []string
190 sq, err := cmd.StdoutPipe()
195 scanner := bufio.NewScanner(sq)
197 newSqueueContents = append(newSqueueContents, scanner.Text())
199 if err := scanner.Err(); err != nil {
209 return newSqueueContents, nil
212 func (squeue *Squeue) CheckSqueue(uuid string, check bool) (bool, error) {
214 n, err := squeue.runSqueue()
219 squeue.squeueContents = n
225 defer squeue.Unlock()
226 for _, k := range squeue.squeueContents {
235 func (squeue *Squeue) SyncSqueue(pollInterval time.Duration) {
236 // TODO: consider using "squeue -i" instead of polling squeue.
237 ticker := time.NewTicker(pollInterval)
240 case <-squeueUpdater.SqueueDone:
243 squeue.CheckSqueue("", true)
248 // Run or monitor a container.
250 // If the container is marked as Locked, check if it is already in the slurm
251 // queue. If not, submit it.
253 // If the container is marked as Running, check if it is in the slurm queue.
254 // If not, mark it as Cancelled.
256 // Monitor status updates. If the priority changes to zero, cancel the
257 // container using scancel.
258 func run(dispatcher *dispatch.Dispatcher,
259 container dispatch.Container,
260 status chan dispatch.Container) {
262 uuid := container.UUID
264 if container.State == dispatch.Locked {
265 if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
266 // maybe squeue is broken, put it back in the queue
267 log.Printf("Error running squeue: %v", err)
268 dispatcher.UpdateState(container.UUID, dispatch.Queued)
270 log.Printf("About to submit queued container %v", container.UUID)
272 if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
273 log.Printf("Error submitting container %s to slurm: %v",
275 // maybe sbatch is broken, put it back to queued
276 dispatcher.UpdateState(container.UUID, dispatch.Queued)
281 log.Printf("Monitoring container %v started", uuid)
283 // periodically check squeue
284 doneSqueue := make(chan struct{})
286 squeueUpdater.CheckSqueue(container.UUID, true)
287 ticker := time.NewTicker(dispatcher.PollInterval)
291 if inQ, err := squeueUpdater.CheckSqueue(container.UUID, false); err != nil {
292 log.Printf("Error running squeue: %v", err)
293 // don't cancel, just leave it the way it is
295 var con dispatch.Container
296 err := dispatcher.Arv.Get("containers", uuid, nil, &con)
298 log.Printf("Error getting final container state: %v", err)
303 case dispatch.Locked:
305 case dispatch.Running:
306 st = dispatch.Cancelled
312 log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
314 dispatcher.UpdateState(uuid, st)
325 for container = range status {
326 if container.State == dispatch.Locked || container.State == dispatch.Running {
327 if container.Priority == 0 {
328 log.Printf("Canceling container %s", container.UUID)
330 err := exec.Command("scancel", "--name="+container.UUID).Run()
332 log.Printf("Error stopping container %s with scancel: %v",
334 if inQ, err := squeueUpdater.CheckSqueue(container.UUID, true); err != nil {
335 log.Printf("Error running squeue: %v", err)
338 log.Printf("Container %s is still in squeue after scancel.",
344 err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
349 doneSqueue <- struct{}{}
351 log.Printf("Monitoring container %v finished", uuid)