Make sure that we can set ARVADOS_KEEP_SERVICES via the
[arvados.git] / services / crunch-dispatch-slurm / crunch-dispatch-slurm.go
1 package main
2
3 // Dispatcher service for Crunch that submits containers to the slurm queue.
4
5 import (
6         "flag"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/arvados"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/config"
11         "git.curoverse.com/arvados.git/sdk/go/dispatch"
12         "github.com/coreos/go-systemd/daemon"
13         "io"
14         "io/ioutil"
15         "log"
16         "math"
17         "os"
18         "os/exec"
19         "strings"
20         "time"
21 )
22
23 // Config used by crunch-dispatch-slurm
24 type Config struct {
25         Client arvados.Client
26
27         SbatchArguments []string
28         PollPeriod      arvados.Duration
29
30         // crunch-run command to invoke. The container UUID will be
31         // appended. If nil, []string{"crunch-run"} will be used.
32         //
33         // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
34         CrunchRunCommand []string
35
36         ArvadosKeepServices []string
37 }
38
39 func main() {
40         err := doMain()
41         if err != nil {
42                 log.Fatal(err)
43         }
44 }
45
46 var (
47         theConfig     Config
48         squeueUpdater Squeue
49 )
50
51 const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
52
53 func doMain() error {
54         flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
55         flags.Usage = func() { usage(flags) }
56
57         configPath := flags.String(
58                 "config",
59                 defaultConfigPath,
60                 "`path` to JSON or YAML configuration file")
61
62         // Parse args; omit the first arg which is the command name
63         flags.Parse(os.Args[1:])
64
65         err := readConfig(&theConfig, *configPath)
66         if err != nil {
67                 return err
68         }
69
70         if theConfig.CrunchRunCommand == nil {
71                 theConfig.CrunchRunCommand = []string{"crunch-run"}
72         }
73
74         if theConfig.PollPeriod == 0 {
75                 theConfig.PollPeriod = arvados.Duration(10 * time.Second)
76         }
77
78         if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
79                 // Copy real configs into env vars so [a]
80                 // MakeArvadosClient() uses them, and [b] they get
81                 // propagated to crunch-run via SLURM.
82                 os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
83                 os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
84                 os.Setenv("ARVADOS_API_INSECURE", "")
85                 if theConfig.Client.Insecure {
86                         os.Setenv("ARVADOS_API_INSECURE", "1")
87                 }
88                 os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.ArvadosKeepServices, " "))
89                 os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
90         } else {
91                 log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
92         }
93
94         arv, err := arvadosclient.MakeArvadosClient()
95         if err != nil {
96                 log.Printf("Error making Arvados client: %v", err)
97                 return err
98         }
99         arv.Retries = 25
100
101         squeueUpdater.StartMonitor(time.Duration(theConfig.PollPeriod))
102         defer squeueUpdater.Done()
103
104         dispatcher := dispatch.Dispatcher{
105                 Arv:            arv,
106                 RunContainer:   run,
107                 PollInterval:   time.Duration(theConfig.PollPeriod),
108                 DoneProcessing: make(chan struct{})}
109
110         if _, err := daemon.SdNotify("READY=1"); err != nil {
111                 log.Printf("Error notifying init daemon: %v", err)
112         }
113
114         err = dispatcher.RunDispatcher()
115         if err != nil {
116                 return err
117         }
118
119         return nil
120 }
121
122 // sbatchCmd
123 func sbatchFunc(container arvados.Container) *exec.Cmd {
124         memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
125
126         var sbatchArgs []string
127         sbatchArgs = append(sbatchArgs, "--share")
128         sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
129         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
130         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
131         sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
132
133         return exec.Command("sbatch", sbatchArgs...)
134 }
135
136 // scancelCmd
137 func scancelFunc(container arvados.Container) *exec.Cmd {
138         return exec.Command("scancel", "--name="+container.UUID)
139 }
140
141 // Wrap these so that they can be overridden by tests
142 var sbatchCmd = sbatchFunc
143 var scancelCmd = scancelFunc
144
145 // Submit job to slurm using sbatch.
146 func submit(dispatcher *dispatch.Dispatcher,
147         container arvados.Container, crunchRunCommand []string) (submitErr error) {
148         defer func() {
149                 // If we didn't get as far as submitting a slurm job,
150                 // unlock the container and return it to the queue.
151                 if submitErr == nil {
152                         // OK, no cleanup needed
153                         return
154                 }
155                 err := dispatcher.Unlock(container.UUID)
156                 if err != nil {
157                         log.Printf("Error unlocking container %s: %v", container.UUID, err)
158                 }
159         }()
160
161         // Create the command and attach to stdin/stdout
162         cmd := sbatchCmd(container)
163         stdinWriter, stdinerr := cmd.StdinPipe()
164         if stdinerr != nil {
165                 submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)
166                 return
167         }
168
169         stdoutReader, stdoutErr := cmd.StdoutPipe()
170         if stdoutErr != nil {
171                 submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)
172                 return
173         }
174
175         stderrReader, stderrErr := cmd.StderrPipe()
176         if stderrErr != nil {
177                 submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)
178                 return
179         }
180
181         // Mutex between squeue sync and running sbatch or scancel.
182         squeueUpdater.SlurmLock.Lock()
183         defer squeueUpdater.SlurmLock.Unlock()
184
185         err := cmd.Start()
186         if err != nil {
187                 submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
188                 return
189         }
190
191         stdoutChan := make(chan []byte)
192         go func() {
193                 b, _ := ioutil.ReadAll(stdoutReader)
194                 stdoutReader.Close()
195                 stdoutChan <- b
196         }()
197
198         stderrChan := make(chan []byte)
199         go func() {
200                 b, _ := ioutil.ReadAll(stderrReader)
201                 stderrReader.Close()
202                 stderrChan <- b
203         }()
204
205         // Send a tiny script on stdin to execute the crunch-run command
206         // slurm actually enforces that this must be a #! script
207         io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
208         stdinWriter.Close()
209
210         err = cmd.Wait()
211
212         stdoutMsg := <-stdoutChan
213         stderrmsg := <-stderrChan
214
215         close(stdoutChan)
216         close(stderrChan)
217
218         if err != nil {
219                 submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
220                 return
221         }
222
223         log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
224         return
225 }
226
227 // If the container is marked as Locked, check if it is already in the slurm
228 // queue.  If not, submit it.
229 //
230 // If the container is marked as Running, check if it is in the slurm queue.
231 // If not, mark it as Cancelled.
232 func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
233         submitted := false
234         for !*monitorDone {
235                 if squeueUpdater.CheckSqueue(container.UUID) {
236                         // Found in the queue, so continue monitoring
237                         submitted = true
238                 } else if container.State == dispatch.Locked && !submitted {
239                         // Not in queue but in Locked state and we haven't
240                         // submitted it yet, so submit it.
241
242                         log.Printf("About to submit queued container %v", container.UUID)
243
244                         if err := submit(dispatcher, container, theConfig.CrunchRunCommand); err != nil {
245                                 log.Printf("Error submitting container %s to slurm: %v",
246                                         container.UUID, err)
247                                 // maybe sbatch is broken, put it back to queued
248                                 dispatcher.Unlock(container.UUID)
249                         }
250                         submitted = true
251                 } else {
252                         // Not in queue and we are not going to submit it.
253                         // Refresh the container state. If it is
254                         // Complete/Cancelled, do nothing, if it is Locked then
255                         // release it back to the Queue, if it is Running then
256                         // clean up the record.
257
258                         var con arvados.Container
259                         err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
260                         if err != nil {
261                                 log.Printf("Error getting final container state: %v", err)
262                         }
263
264                         switch con.State {
265                         case dispatch.Locked:
266                                 log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
267                                         container.UUID, con.State, dispatch.Queued)
268                                 dispatcher.Unlock(container.UUID)
269                         case dispatch.Running:
270                                 st := dispatch.Cancelled
271                                 log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
272                                         container.UUID, con.State, st)
273                                 dispatcher.UpdateState(container.UUID, st)
274                         default:
275                                 // Container state is Queued, Complete or Cancelled so stop monitoring it.
276                                 return
277                         }
278                 }
279         }
280 }
281
282 // Run or monitor a container.
283 //
284 // Monitor status updates.  If the priority changes to zero, cancel the
285 // container using scancel.
286 func run(dispatcher *dispatch.Dispatcher,
287         container arvados.Container,
288         status chan arvados.Container) {
289
290         log.Printf("Monitoring container %v started", container.UUID)
291         defer log.Printf("Monitoring container %v finished", container.UUID)
292
293         monitorDone := false
294         go monitorSubmitOrCancel(dispatcher, container, &monitorDone)
295
296         for container = range status {
297                 if container.State == dispatch.Locked || container.State == dispatch.Running {
298                         if container.Priority == 0 {
299                                 log.Printf("Canceling container %s", container.UUID)
300
301                                 // Mutex between squeue sync and running sbatch or scancel.
302                                 squeueUpdater.SlurmLock.Lock()
303                                 err := scancelCmd(container).Run()
304                                 squeueUpdater.SlurmLock.Unlock()
305
306                                 if err != nil {
307                                         log.Printf("Error stopping container %s with scancel: %v",
308                                                 container.UUID, err)
309                                         if squeueUpdater.CheckSqueue(container.UUID) {
310                                                 log.Printf("Container %s is still in squeue after scancel.",
311                                                         container.UUID)
312                                                 continue
313                                         }
314                                 }
315
316                                 err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
317                         }
318                 }
319         }
320         monitorDone = true
321 }
322
323 func readConfig(dst interface{}, path string) error {
324         err := config.LoadFile(dst, path)
325         if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
326                 log.Printf("Config not specified. Continue with default configuration.")
327                 err = nil
328         }
329         return err
330 }