Merge branch '9684-workflows'
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 package main
2
3 // Dispatcher service for Crunch that submits containers to the slurm queue.
4
5 import (
6         "encoding/json"
7         "flag"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvados"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/dispatch"
12         "io"
13         "io/ioutil"
14         "log"
15         "math"
16         "os"
17         "os/exec"
18         "strings"
19         "time"
20 )
21
22 // Config used by crunch-dispatch-slurm
23 type Config struct {
24         arvados.Client
25
26         SbatchArguments []string
27         PollPeriod      arvados.Duration
28
29         // crunch-run command to invoke. The container UUID will be
30         // appended. If nil, []string{"crunch-run"} will be used.
31         //
32         // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
33         CrunchRunCommand []string
34 }
35
36 func main() {
37         err := doMain()
38         if err != nil {
39                 log.Fatalf("%q", err)
40         }
41 }
42
43 var (
44         config        Config
45         squeueUpdater Squeue
46 )
47
48 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/config.json"
49
50 func doMain() error {
51         flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
52         flags.Usage = func() { usage(flags) }
53
54         configPath := flags.String(
55                 "config",
56                 defaultConfigPath,
57                 "`path` to json configuration file")
58
59         // Parse args; omit the first arg which is the command name
60         flags.Parse(os.Args[1:])
61
62         err := readConfig(&config, *configPath)
63         if err != nil {
64                 log.Printf("Error reading configuration: %v", err)
65                 return err
66         }
67
68         if config.CrunchRunCommand == nil {
69                 config.CrunchRunCommand = []string{"crunch-run"}
70         }
71
72         if config.PollPeriod == 0 {
73                 config.PollPeriod = arvados.Duration(10 * time.Second)
74         }
75
76         if config.Client.APIHost != "" || config.Client.AuthToken != "" {
77                 // Copy real configs into env vars so [a]
78                 // MakeArvadosClient() uses them, and [b] they get
79                 // propagated to crunch-run via SLURM.
80                 os.Setenv("ARVADOS_API_HOST", config.Client.APIHost)
81                 os.Setenv("ARVADOS_API_TOKEN", config.Client.AuthToken)
82                 os.Setenv("ARVADOS_API_INSECURE", "")
83                 if config.Client.Insecure {
84                         os.Setenv("ARVADOS_API_INSECURE", "1")
85                 }
86                 os.Setenv("ARVADOS_KEEP_SERVICES", "")
87                 os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
88         } else {
89                 log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
90         }
91
92         arv, err := arvadosclient.MakeArvadosClient()
93         if err != nil {
94                 log.Printf("Error making Arvados client: %v", err)
95                 return err
96         }
97         arv.Retries = 25
98
99         squeueUpdater.StartMonitor(time.Duration(config.PollPeriod))
100         defer squeueUpdater.Done()
101
102         dispatcher := dispatch.Dispatcher{
103                 Arv:            arv,
104                 RunContainer:   run,
105                 PollInterval:   time.Duration(config.PollPeriod),
106                 DoneProcessing: make(chan struct{})}
107
108         err = dispatcher.RunDispatcher()
109         if err != nil {
110                 return err
111         }
112
113         return nil
114 }
115
116 // sbatchCmd
117 func sbatchFunc(container arvados.Container) *exec.Cmd {
118         memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
119
120         var sbatchArgs []string
121         sbatchArgs = append(sbatchArgs, "--share")
122         sbatchArgs = append(sbatchArgs, config.SbatchArguments...)
123         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
124         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
125         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
126
127         return exec.Command("sbatch", sbatchArgs...)
128 }
129
130 // scancelCmd
131 func scancelFunc(container arvados.Container) *exec.Cmd {
132         return exec.Command("scancel", "--name="+container.UUID)
133 }
134
135 // Wrap these so that they can be overridden by tests
136 var sbatchCmd = sbatchFunc
137 var scancelCmd = scancelFunc
138
139 // Submit job to slurm using sbatch.
140 func submit(dispatcher *dispatch.Dispatcher,
141         container arvados.Container, crunchRunCommand []string) (submitErr error) {
142         defer func() {
143                 // If we didn't get as far as submitting a slurm job,
144                 // unlock the container and return it to the queue.
145                 if submitErr == nil {
146                         // OK, no cleanup needed
147                         return
148                 }
149                 err := dispatcher.Arv.Update("containers", container.UUID,
150                         arvadosclient.Dict{
151                                 "container": arvadosclient.Dict{"state": "Queued"}},
152                         nil)
153                 if err != nil {
154                         log.Printf("Error unlocking container %s: %v", container.UUID, err)
155                 }
156         }()
157
158         // Create the command and attach to stdin/stdout
159         cmd := sbatchCmd(container)
160         stdinWriter, stdinerr := cmd.StdinPipe()
161         if stdinerr != nil {
162                 submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
163                 return
164         }
165
166         stdoutReader, stdoutErr := cmd.StdoutPipe()
167         if stdoutErr != nil {
168                 submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
169                 return
170         }
171
172         stderrReader, stderrErr := cmd.StderrPipe()
173         if stderrErr != nil {
174                 submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
175                 return
176         }
177
178         // Mutex between squeue sync and running sbatch or scancel.
179         squeueUpdater.SlurmLock.Lock()
180         defer squeueUpdater.SlurmLock.Unlock()
181
182         err := cmd.Start()
183         if err != nil {
184                 submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
185                 return
186         }
187
188         stdoutChan := make(chan []byte)
189         go func() {
190                 b, _ := ioutil.ReadAll(stdoutReader)
191                 stdoutReader.Close()
192                 stdoutChan <- b
193         }()
194
195         stderrChan := make(chan []byte)
196         go func() {
197                 b, _ := ioutil.ReadAll(stderrReader)
198                 stderrReader.Close()
199                 stderrChan <- b
200         }()
201
202         // Send a tiny script on stdin to execute the crunch-run command
203         // slurm actually enforces that this must be a #! script
204         io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
205         stdinWriter.Close()
206
207         err = cmd.Wait()
208
209         stdoutMsg := <-stdoutChan
210         stderrmsg := <-stderrChan
211
212         close(stdoutChan)
213         close(stderrChan)
214
215         if err != nil {
216                 submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
217                 return
218         }
219
220         log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
221         return
222 }
223
224 // If the container is marked as Locked, check if it is already in the slurm
225 // queue.  If not, submit it.
226 //
227 // If the container is marked as Running, check if it is in the slurm queue.
228 // If not, mark it as Cancelled.
229 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
230         submitted := false
231         for !*monitorDone {
232                 if squeueUpdater.CheckSqueue(container.UUID) {
233                         // Found in the queue, so continue monitoring
234                         submitted = true
235                 } else if container.State == dispatch.Locked && !submitted {
236                         // Not in queue but in Locked state and we haven't
237                         // submitted it yet, so submit it.
238
239                         log.Printf("About to submit queued container %v", container.UUID)
240
241                         if err := submit(dispatcher, container, config.CrunchRunCommand); err != nil {
242                                 log.Printf("Error submitting container %s to slurm: %v",
243                                         container.UUID, err)
244                                 // maybe sbatch is broken, put it back to queued
245                                 dispatcher.UpdateState(container.UUID, dispatch.Queued)
246                         }
247                         submitted = true
248                 } else {
249                         // Not in queue and we are not going to submit it.
250                         // Refresh the container state. If it is
251                         // Complete/Cancelled, do nothing, if it is Locked then
252                         // release it back to the Queue, if it is Running then
253                         // clean up the record.
254
255                         var con arvados.Container
256                         err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
257                         if err != nil {
258                                 log.Printf("Error getting final container state: %v", err)
259                         }
260
261                         var st arvados.ContainerState
262                         switch con.State {
263                         case dispatch.Locked:
264                                 st = dispatch.Queued
265                         case dispatch.Running:
266                                 st = dispatch.Cancelled
267                         default:
268                                 // Container state is Queued, Complete or Cancelled so stop monitoring it.
269                                 return
270                         }
271
272                         log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
273                                 container.UUID, con.State, st)
274                         dispatcher.UpdateState(container.UUID, st)
275                 }
276         }
277 }
278
279 // Run or monitor a container.
280 //
281 // Monitor status updates.  If the priority changes to zero, cancel the
282 // container using scancel.
283 func run(dispatcher *dispatch.Dispatcher,
284         container arvados.Container,
285         status chan arvados.Container) {
286
287         log.Printf("Monitoring container %v started", container.UUID)
288         defer log.Printf("Monitoring container %v finished", container.UUID)
289
290         monitorDone := false
291         go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
292
293         for container = range status {
294                 if container.State == dispatch.Locked || container.State == dispatch.Running {
295                         if container.Priority == 0 {
296                                 log.Printf("Canceling container %s", container.UUID)
297
298                                 // Mutex between squeue sync and running sbatch or scancel.
299                                 squeueUpdater.SlurmLock.Lock()
300                                 err := scancelCmd(container).Run()
301                                 squeueUpdater.SlurmLock.Unlock()
302
303                                 if err != nil {
304                                         log.Printf("Error stopping container %s with scancel: %v",
305                                                 container.UUID, err)
306                                         if squeueUpdater.CheckSqueue(container.UUID) {
307                                                 log.Printf("Container %s is still in squeue after scancel.",
308                                                         container.UUID)
309                                                 continue
310                                         }
311                                 }
312
313                                 err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
314                         }
315                 }
316         }
317         monitorDone = true
318 }
319
320 func readConfig(dst interface{}, path string) error {
321         if buf, err := ioutil.ReadFile(path); err != nil && os.IsNotExist(err) {
322                 if path == defaultConfigPath {
323                         log.Printf("Config not specified. Continue with default configuration.")
324                 } else {
325                         return fmt.Errorf("Config file not found %q: %v", path, err)
326                 }
327         } else if err != nil {
328                 return fmt.Errorf("Error reading config %q: %v", path, err)
329         } else if err = json.Unmarshal(buf, dst); err != nil {
330                 return fmt.Errorf("Error decoding config %q: %v", path, err)
331         }
332         return nil
333 }