9595: Fix hardcoded temp dir in test. Add checks for CleanupDirs().
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 package main
2
3 // Dispatcher service for Crunch that submits containers to the slurm queue.
4
5 import (
6         "encoding/json"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/dispatch"
12         "io/ioutil"
13         "log"
14         "math"
15         "os"
16         "os/exec"
17         "strings"
18         "time"
19 )
20
21 // Config used by crunch-dispatch-slurm
22 type Config struct {
23         SbatchArguments  []string
24         PollPeriod       *time.Duration
25         CrunchRunCommand *string
26 }
27
28 func main() {
29         err := doMain()
30         if err != nil {
31                 log.Fatalf("%q", err)
32         }
33 }
34
35 var (
36         config        Config
37         squeueUpdater Squeue
38 )
39
40 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/config.json"
41
42 func doMain() error {
43         flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
44
45         configPath := flags.String(
46                 "config",
47                 defaultConfigPath,
48                 "`path` to json configuration file")
49
50         config.PollPeriod = flags.Duration(
51                 "poll-interval",
52                 10*time.Second,
53                 "Time duration to poll for queued containers")
54
55         config.CrunchRunCommand = flags.String(
56                 "crunch-run-command",
57                 "/usr/bin/crunch-run",
58                 "Crunch command to run container")
59
60         // Parse args; omit the first arg which is the command name
61         flags.Parse(os.Args[1:])
62
63         err := readConfig(&config, *configPath)
64         if err != nil {
65                 log.Printf("Error reading configuration: %v", err)
66                 return err
67         }
68
69         arv, err := arvadosclient.MakeArvadosClient()
70         if err != nil {
71                 log.Printf("Error making Arvados client: %v", err)
72                 return err
73         }
74         arv.Retries = 25
75
76         squeueUpdater.StartMonitor(*config.PollPeriod)
77         defer squeueUpdater.Done()
78
79         dispatcher := dispatch.Dispatcher{
80                 Arv:            arv,
81                 RunContainer:   run,
82                 PollInterval:   *config.PollPeriod,
83                 DoneProcessing: make(chan struct{})}
84
85         err = dispatcher.RunDispatcher()
86         if err != nil {
87                 return err
88         }
89
90         return nil
91 }
92
93 // sbatchCmd
94 func sbatchFunc(container arvados.Container) *exec.Cmd {
95         memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
96
97         var sbatchArgs []string
98         sbatchArgs = append(sbatchArgs, "--share")
99         sbatchArgs = append(sbatchArgs, config.SbatchArguments...)
100         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
101         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
102         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
103
104         return exec.Command("sbatch", sbatchArgs...)
105 }
106
107 // scancelCmd
108 func scancelFunc(container arvados.Container) *exec.Cmd {
109         return exec.Command("scancel", "--name="+container.UUID)
110 }
111
112 // Wrap these so that they can be overridden by tests
113 var sbatchCmd = sbatchFunc
114 var scancelCmd = scancelFunc
115
116 // Submit job to slurm using sbatch.
117 func submit(dispatcher *dispatch.Dispatcher,
118         container arvados.Container, crunchRunCommand string) (submitErr error) {
119         defer func() {
120                 // If we didn't get as far as submitting a slurm job,
121                 // unlock the container and return it to the queue.
122                 if submitErr == nil {
123                         // OK, no cleanup needed
124                         return
125                 }
126                 err := dispatcher.Arv.Update("containers", container.UUID,
127                         arvadosclient.Dict{
128                                 "container": arvadosclient.Dict{"state": "Queued"}},
129                         nil)
130                 if err != nil {
131                         log.Printf("Error unlocking container %s: %v", container.UUID, err)
132                 }
133         }()
134
135         // Create the command and attach to stdin/stdout
136         cmd := sbatchCmd(container)
137         stdinWriter, stdinerr := cmd.StdinPipe()
138         if stdinerr != nil {
139                 submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
140                 return
141         }
142
143         stdoutReader, stdoutErr := cmd.StdoutPipe()
144         if stdoutErr != nil {
145                 submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
146                 return
147         }
148
149         stderrReader, stderrErr := cmd.StderrPipe()
150         if stderrErr != nil {
151                 submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
152                 return
153         }
154
155         // Mutex between squeue sync and running sbatch or scancel.
156         squeueUpdater.SlurmLock.Lock()
157         defer squeueUpdater.SlurmLock.Unlock()
158
159         err := cmd.Start()
160         if err != nil {
161                 submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
162                 return
163         }
164
165         stdoutChan := make(chan []byte)
166         go func() {
167                 b, _ := ioutil.ReadAll(stdoutReader)
168                 stdoutReader.Close()
169                 stdoutChan <- b
170         }()
171
172         stderrChan := make(chan []byte)
173         go func() {
174                 b, _ := ioutil.ReadAll(stderrReader)
175                 stderrReader.Close()
176                 stderrChan <- b
177         }()
178
179         // Send a tiny script on stdin to execute the crunch-run command
180         // slurm actually enforces that this must be a #! script
181         fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
182         stdinWriter.Close()
183
184         err = cmd.Wait()
185
186         stdoutMsg := <-stdoutChan
187         stderrmsg := <-stderrChan
188
189         close(stdoutChan)
190         close(stderrChan)
191
192         if err != nil {
193                 submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
194                 return
195         }
196
197         log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
198         return
199 }
200
201 // If the container is marked as Locked, check if it is already in the slurm
202 // queue.  If not, submit it.
203 //
204 // If the container is marked as Running, check if it is in the slurm queue.
205 // If not, mark it as Cancelled.
206 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
207         submitted := false
208         for !*monitorDone {
209                 if squeueUpdater.CheckSqueue(container.UUID) {
210                         // Found in the queue, so continue monitoring
211                         submitted = true
212                 } else if container.State == dispatch.Locked && !submitted {
213                         // Not in queue but in Locked state and we haven't
214                         // submitted it yet, so submit it.
215
216                         log.Printf("About to submit queued container %v", container.UUID)
217
218                         if err := submit(dispatcher, container, *config.CrunchRunCommand); err != nil {
219                                 log.Printf("Error submitting container %s to slurm: %v",
220                                         container.UUID, err)
221                                 // maybe sbatch is broken, put it back to queued
222                                 dispatcher.UpdateState(container.UUID, dispatch.Queued)
223                         }
224                         submitted = true
225                 } else {
226                         // Not in queue and we are not going to submit it.
227                         // Refresh the container state. If it is
228                         // Complete/Cancelled, do nothing, if it is Locked then
229                         // release it back to the Queue, if it is Running then
230                         // clean up the record.
231
232                         var con arvados.Container
233                         err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
234                         if err != nil {
235                                 log.Printf("Error getting final container state: %v", err)
236                         }
237
238                         var st arvados.ContainerState
239                         switch con.State {
240                         case dispatch.Locked:
241                                 st = dispatch.Queued
242                         case dispatch.Running:
243                                 st = dispatch.Cancelled
244                         default:
245                                 // Container state is Queued, Complete or Cancelled so stop monitoring it.
246                                 return
247                         }
248
249                         log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
250                                 container.UUID, con.State, st)
251                         dispatcher.UpdateState(container.UUID, st)
252                 }
253         }
254 }
255
256 // Run or monitor a container.
257 //
258 // Monitor status updates.  If the priority changes to zero, cancel the
259 // container using scancel.
260 func run(dispatcher *dispatch.Dispatcher,
261         container arvados.Container,
262         status chan arvados.Container) {
263
264         log.Printf("Monitoring container %v started", container.UUID)
265         defer log.Printf("Monitoring container %v finished", container.UUID)
266
267         monitorDone := false
268         go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
269
270         for container = range status {
271                 if container.State == dispatch.Locked || container.State == dispatch.Running {
272                         if container.Priority == 0 {
273                                 log.Printf("Canceling container %s", container.UUID)
274
275                                 // Mutex between squeue sync and running sbatch or scancel.
276                                 squeueUpdater.SlurmLock.Lock()
277                                 err := scancelCmd(container).Run()
278                                 squeueUpdater.SlurmLock.Unlock()
279
280                                 if err != nil {
281                                         log.Printf("Error stopping container %s with scancel: %v",
282                                                 container.UUID, err)
283                                         if squeueUpdater.CheckSqueue(container.UUID) {
284                                                 log.Printf("Container %s is still in squeue after scancel.",
285                                                         container.UUID)
286                                                 continue
287                                         }
288                                 }
289
290                                 err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
291                         }
292                 }
293         }
294         monitorDone = true
295 }
296
297 func readConfig(dst interface{}, path string) error {
298         if buf, err := ioutil.ReadFile(path); err != nil && os.IsNotExist(err) {
299                 if path == defaultConfigPath {
300                         log.Printf("Config not specified. Continue with default configuration.")
301                 } else {
302                         return fmt.Errorf("Config file not found %q: %v", path, err)
303                 }
304         } else if err != nil {
305                 return fmt.Errorf("Error reading config %q: %v", path, err)
306         } else if err = json.Unmarshal(buf, dst); err != nil {
307                 return fmt.Errorf("Error decoding config %q: %v", path, err)
308         }
309         return nil
310 }