Merge branch '9353-retry-http-error' closes #9353
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 package main
2
3 // Dispatcher service for Crunch that submits containers to the slurm queue.
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvados"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/dispatch"
11         "io/ioutil"
12         "log"
13         "math"
14         "os"
15         "os/exec"
16         "strings"
17         "time"
18 )
19
20 func main() {
21         err := doMain()
22         if err != nil {
23                 log.Fatalf("%q", err)
24         }
25 }
26
27 var (
28         crunchRunCommand *string
29         squeueUpdater    Squeue
30 )
31
32 func doMain() error {
33         flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
34
35         pollInterval := flags.Int(
36                 "poll-interval",
37                 10,
38                 "Interval in seconds to poll for queued containers")
39
40         crunchRunCommand = flags.String(
41                 "crunch-run-command",
42                 "/usr/bin/crunch-run",
43                 "Crunch command to run container")
44
45         // Parse args; omit the first arg which is the command name
46         flags.Parse(os.Args[1:])
47
48         arv, err := arvadosclient.MakeArvadosClient()
49         if err != nil {
50                 log.Printf("Error making Arvados client: %v", err)
51                 return err
52         }
53         arv.Retries = 25
54
55         squeueUpdater.StartMonitor(time.Duration(*pollInterval) * time.Second)
56         defer squeueUpdater.Done()
57
58         dispatcher := dispatch.Dispatcher{
59                 Arv:            arv,
60                 RunContainer:   run,
61                 PollInterval:   time.Duration(*pollInterval) * time.Second,
62                 DoneProcessing: make(chan struct{})}
63
64         err = dispatcher.RunDispatcher()
65         if err != nil {
66                 return err
67         }
68
69         return nil
70 }
71
72 // sbatchCmd
73 func sbatchFunc(container arvados.Container) *exec.Cmd {
74         memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
75         return exec.Command("sbatch", "--share", "--parsable",
76                 fmt.Sprintf("--job-name=%s", container.UUID),
77                 fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
78                 fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs),
79                 fmt.Sprintf("--priority=%d", container.Priority))
80 }
81
82 // scancelCmd
83 func scancelFunc(container arvados.Container) *exec.Cmd {
84         return exec.Command("scancel", "--name="+container.UUID)
85 }
86
87 // Wrap these so that they can be overridden by tests
88 var sbatchCmd = sbatchFunc
89 var scancelCmd = scancelFunc
90
91 // Submit job to slurm using sbatch.
92 func submit(dispatcher *dispatch.Dispatcher,
93         container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) {
94         submitErr = nil
95
96         defer func() {
97                 // If we didn't get as far as submitting a slurm job,
98                 // unlock the container and return it to the queue.
99                 if submitErr == nil {
100                         // OK, no cleanup needed
101                         return
102                 }
103                 err := dispatcher.Arv.Update("containers", container.UUID,
104                         arvadosclient.Dict{
105                                 "container": arvadosclient.Dict{"state": "Queued"}},
106                         nil)
107                 if err != nil {
108                         log.Printf("Error unlocking container %s: %v", container.UUID, err)
109                 }
110         }()
111
112         // Create the command and attach to stdin/stdout
113         cmd := sbatchCmd(container)
114         stdinWriter, stdinerr := cmd.StdinPipe()
115         if stdinerr != nil {
116                 submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
117                 return
118         }
119
120         stdoutReader, stdoutErr := cmd.StdoutPipe()
121         if stdoutErr != nil {
122                 submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
123                 return
124         }
125
126         stderrReader, stderrErr := cmd.StderrPipe()
127         if stderrErr != nil {
128                 submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
129                 return
130         }
131
132         // Mutex between squeue sync and running sbatch or scancel.
133         squeueUpdater.SlurmLock.Lock()
134         defer squeueUpdater.SlurmLock.Unlock()
135
136         err := cmd.Start()
137         if err != nil {
138                 submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
139                 return
140         }
141
142         stdoutChan := make(chan []byte)
143         go func() {
144                 b, _ := ioutil.ReadAll(stdoutReader)
145                 stdoutReader.Close()
146                 stdoutChan <- b
147         }()
148
149         stderrChan := make(chan []byte)
150         go func() {
151                 b, _ := ioutil.ReadAll(stderrReader)
152                 stderrReader.Close()
153                 stderrChan <- b
154         }()
155
156         // Send a tiny script on stdin to execute the crunch-run command
157         // slurm actually enforces that this must be a #! script
158         fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
159         stdinWriter.Close()
160
161         err = cmd.Wait()
162
163         stdoutMsg := <-stdoutChan
164         stderrmsg := <-stderrChan
165
166         close(stdoutChan)
167         close(stderrChan)
168
169         if err != nil {
170                 submitErr = fmt.Errorf("Container submission failed %v: %v %v", cmd.Args, err, stderrmsg)
171                 return
172         }
173
174         // If everything worked out, got the jobid on stdout
175         jobid = strings.TrimSpace(string(stdoutMsg))
176
177         return
178 }
179
180 // If the container is marked as Locked, check if it is already in the slurm
181 // queue.  If not, submit it.
182 //
183 // If the container is marked as Running, check if it is in the slurm queue.
184 // If not, mark it as Cancelled.
185 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
186         submitted := false
187         for !*monitorDone {
188                 if squeueUpdater.CheckSqueue(container.UUID) {
189                         // Found in the queue, so continue monitoring
190                         submitted = true
191                 } else if container.State == dispatch.Locked && !submitted {
192                         // Not in queue but in Locked state and we haven't
193                         // submitted it yet, so submit it.
194
195                         log.Printf("About to submit queued container %v", container.UUID)
196
197                         if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
198                                 log.Printf("Error submitting container %s to slurm: %v",
199                                         container.UUID, err)
200                                 // maybe sbatch is broken, put it back to queued
201                                 dispatcher.UpdateState(container.UUID, dispatch.Queued)
202                         }
203                         submitted = true
204                 } else {
205                         // Not in queue and we are not going to submit it.
206                         // Refresh the container state. If it is
207                         // Complete/Cancelled, do nothing, if it is Locked then
208                         // release it back to the Queue, if it is Running then
209                         // clean up the record.
210
211                         var con arvados.Container
212                         err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
213                         if err != nil {
214                                 log.Printf("Error getting final container state: %v", err)
215                         }
216
217                         var st arvados.ContainerState
218                         switch con.State {
219                         case dispatch.Locked:
220                                 st = dispatch.Queued
221                         case dispatch.Running:
222                                 st = dispatch.Cancelled
223                         default:
224                                 // Container state is Queued, Complete or Cancelled so stop monitoring it.
225                                 return
226                         }
227
228                         log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
229                                 container.UUID, con.State, st)
230                         dispatcher.UpdateState(container.UUID, st)
231                 }
232         }
233 }
234
235 // Run or monitor a container.
236 //
237 // Monitor status updates.  If the priority changes to zero, cancel the
238 // container using scancel.
239 func run(dispatcher *dispatch.Dispatcher,
240         container arvados.Container,
241         status chan arvados.Container) {
242
243         log.Printf("Monitoring container %v started", container.UUID)
244         defer log.Printf("Monitoring container %v finished", container.UUID)
245
246         monitorDone := false
247         go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
248
249         for container = range status {
250                 if container.State == dispatch.Locked || container.State == dispatch.Running {
251                         if container.Priority == 0 {
252                                 log.Printf("Canceling container %s", container.UUID)
253
254                                 // Mutex between squeue sync and running sbatch or scancel.
255                                 squeueUpdater.SlurmLock.Lock()
256                                 err := scancelCmd(container).Run()
257                                 squeueUpdater.SlurmLock.Unlock()
258
259                                 if err != nil {
260                                         log.Printf("Error stopping container %s with scancel: %v",
261                                                 container.UUID, err)
262                                         if squeueUpdater.CheckSqueue(container.UUID) {
263                                                 log.Printf("Container %s is still in squeue after scancel.",
264                                                         container.UUID)
265                                                 continue
266                                         }
267                                 }
268
269                                 err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
270                         }
271                 }
272         }
273         monitorDone = true
274 }