3 // Dispatcher service for Crunch that submits containers to the slurm queue.
17 "git.curoverse.com/arvados.git/sdk/go/arvados"
18 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
19 "git.curoverse.com/arvados.git/sdk/go/config"
20 "git.curoverse.com/arvados.git/sdk/go/dispatch"
21 "github.com/coreos/go-systemd/daemon"
// Config used by crunch-dispatch-slurm
// (fragmentary view: the enclosing "type Config struct {" line and any
// additional fields, e.g. the arvados.Client settings referenced in main,
// are not visible in this excerpt)

	// Extra command-line arguments passed to every sbatch invocation,
	// inserted after "--share" and before the per-container options
	// (see sbatchFunc).
	SbatchArguments []string
	// How often to poll squeue / the dispatcher; 0 means "use the
	// 10-second default" (see the defaulting logic in main).
	PollPeriod arvados.Duration

	// crunch-run command to invoke. The container UUID will be
	// appended. If nil, []string{"crunch-run"} will be used.
	//
	// Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
	CrunchRunCommand []string
// defaultConfigPath is the configuration file consulted when no explicit
// path is given; readConfig tolerates (logs and continues) a missing file
// only when this default path was used.
const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
// (fragmentary view: the enclosing function header and most error-return
// branches are not visible in this excerpt)
flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
flags.Usage = func() { usage(flags) }

configPath := flags.String(
	"`path` to JSON or YAML configuration file")
// NOTE(review): flag.Bool registers this option on the global
// flag.CommandLine, but only the local "flags" FlagSet is parsed below,
// so -dump-config can never be set from the command line. This should be
// flags.Bool.
dumpConfig := flag.Bool(
	"write current configuration to stdout and exit")

// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])

err := readConfig(&theConfig, *configPath)

// Fall back to a plain "crunch-run" invocation when the config does not
// specify a command.
if theConfig.CrunchRunCommand == nil {
	theConfig.CrunchRunCommand = []string{"crunch-run"}

// Default poll period is 10 seconds.
if theConfig.PollPeriod == 0 {
	theConfig.PollPeriod = arvados.Duration(10 * time.Second)

if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
	// Copy real configs into env vars so [a]
	// MakeArvadosClient() uses them, and [b] they get
	// propagated to crunch-run via SLURM.
	os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
	os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
	// Clear first, then re-set only when the config says insecure, so a
	// stale value from the caller's environment cannot leak through.
	os.Setenv("ARVADOS_API_HOST_INSECURE", "")
	if theConfig.Client.Insecure {
		os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
	os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
	os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
	log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")

// -dump-config: print the effective configuration and exit.
log.Fatal(config.DumpAndExit(theConfig))

arv, err := arvadosclient.MakeArvadosClient()
log.Printf("Error making Arvados client: %v", err)

// Start the background squeue poller; Done() stops it on return.
squeueUpdater.StartMonitor(time.Duration(theConfig.PollPeriod))
defer squeueUpdater.Done()

dispatcher := dispatch.Dispatcher{
	PollInterval: time.Duration(theConfig.PollPeriod),
	DoneProcessing: make(chan struct{})}

// Tell the init system (systemd) we are ready; failure is only logged.
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
	log.Printf("Error notifying init daemon: %v", err)

err = dispatcher.RunDispatcher()
130 func sbatchFunc(container arvados.Container) *exec.Cmd {
131 memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
133 var sbatchArgs []string
134 sbatchArgs = append(sbatchArgs, "--share")
135 sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
136 sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
137 sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
138 sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
139 if container.SchedulingParameters.Partitions != nil {
140 sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
143 return exec.Command("sbatch", sbatchArgs...)
147 func scancelFunc(container arvados.Container) *exec.Cmd {
148 return exec.Command("scancel", "--name="+container.UUID)
// Wrap these so that they can be overridden by tests: production code calls
// slurm through these variables, and tests substitute fake command builders.
var sbatchCmd = sbatchFunc
var scancelCmd = scancelFunc
// Submit job to slurm using sbatch.
// (fragmentary view: the deferred-cleanup wrapper, error-return branches,
// and the goroutine headers around the pipe readers are not visible in
// this excerpt)
func submit(dispatcher *dispatch.Dispatcher,
	container arvados.Container, crunchRunCommand []string) (submitErr error) {

	// If we didn't get as far as submitting a slurm job,
	// unlock the container and return it to the queue.
	// NOTE(review): this appears to run inside a deferred closure that
	// inspects the named return submitErr — confirm against the full file.
	if submitErr == nil {
		// OK, no cleanup needed
	err := dispatcher.Unlock(container.UUID)
		log.Printf("Error unlocking container %s: %v", container.UUID, err)

	// Create the command and attach to stdin/stdout
	cmd := sbatchCmd(container)
	stdinWriter, stdinerr := cmd.StdinPipe()
		submitErr = fmt.Errorf("Error creating stdin pipe %v: %q", container.UUID, stdinerr)

	stdoutReader, stdoutErr := cmd.StdoutPipe()
	if stdoutErr != nil {
		submitErr = fmt.Errorf("Error creating stdout pipe %v: %q", container.UUID, stdoutErr)

	stderrReader, stderrErr := cmd.StderrPipe()
	if stderrErr != nil {
		submitErr = fmt.Errorf("Error creating stderr pipe %v: %q", container.UUID, stderrErr)

	// Mutex between squeue sync and running sbatch or scancel.
	squeueUpdater.SlurmLock.Lock()
	defer squeueUpdater.SlurmLock.Unlock()

	log.Printf("sbatch starting: %+q", cmd.Args)
		submitErr = fmt.Errorf("Error starting sbatch: %v", err)

	// Collect sbatch's stdout/stderr via channels; the ReadAll calls below
	// presumably run in goroutines so sbatch cannot block on a full pipe —
	// confirm against the full file.
	stdoutChan := make(chan []byte)
		b, _ := ioutil.ReadAll(stdoutReader)

	stderrChan := make(chan []byte)
		b, _ := ioutil.ReadAll(stderrReader)

	// Send a tiny script on stdin to execute the crunch-run command
	// slurm actually enforces that this must be a #! script
	io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))

	// Wait for both pipe readers before inspecting the command's outcome.
	stdoutMsg := <-stdoutChan
	stderrmsg := <-stderrChan

		submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)

	log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
// If the container is marked as Locked, check if it is already in the slurm
// queue. If not, submit it.
//
// If the container is marked as Running, check if it is in the slurm queue.
// If not, mark it as Cancelled.
// (fragmentary view: the "submitted" flag declaration, the surrounding
// polling loop, and the *monitorDone assignment are not visible in this
// excerpt)
func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Container, monitorDone *bool) {
	if squeueUpdater.CheckSqueue(container.UUID) {
		// Found in the queue, so continue monitoring
	} else if container.State == dispatch.Locked && !submitted {
		// Not in queue but in Locked state and we haven't
		// submitted it yet, so submit it.

		log.Printf("About to submit queued container %v", container.UUID)

		if err := submit(dispatcher, container, theConfig.CrunchRunCommand); err != nil {
			log.Printf("Error submitting container %s to slurm: %v",
			// maybe sbatch is broken, put it back to queued
			dispatcher.Unlock(container.UUID)
		// Not in queue and we are not going to submit it.
		// Refresh the container state. If it is
		// Complete/Cancelled, do nothing, if it is Locked then
		// release it back to the Queue, if it is Running then
		// clean up the record.

		var con arvados.Container
		err := dispatcher.Arv.Get("containers", container.UUID, nil, &con)
			log.Printf("Error getting final container state: %v", err)

		case dispatch.Locked:
			log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
				container.UUID, con.State, dispatch.Queued)
			dispatcher.Unlock(container.UUID)
		case dispatch.Running:
			// A Running container that vanished from squeue cannot be
			// requeued; record it as Cancelled instead.
			st := dispatch.Cancelled
			log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
				container.UUID, con.State, st)
			dispatcher.UpdateState(container.UUID, st)
		// Container state is Queued, Complete or Cancelled so stop monitoring it.
// Run or monitor a container.
//
// Monitor status updates. If the priority changes to zero, cancel the
// container using scancel.
// (fragmentary view: the monitorDone declaration, some error-check lines,
// and the loop/function closings are not visible in this excerpt)
func run(dispatcher *dispatch.Dispatcher,
	container arvados.Container,
	status chan arvados.Container) {

	log.Printf("Monitoring container %v started", container.UUID)
	defer log.Printf("Monitoring container %v finished", container.UUID)

	// Background watcher: submits to slurm or cancels as the container's
	// state and the squeue contents dictate (see monitorSubmitOrCancel).
	go monitorSubmitOrCancel(dispatcher, container, &monitorDone)

	// React to every status update until the channel is closed.
	for container = range status {
		if container.State == dispatch.Locked || container.State == dispatch.Running {
			if container.Priority == 0 {
				log.Printf("Canceling container %s", container.UUID)

				// Mutex between squeue sync and running sbatch or scancel.
				squeueUpdater.SlurmLock.Lock()
				cmd := scancelCmd(container)
				msg, err := cmd.CombinedOutput()
				squeueUpdater.SlurmLock.Unlock()

					log.Printf("Error stopping container %s with %v %v: %v %v",
						container.UUID, cmd.Path, cmd.Args, err, string(msg))
					if squeueUpdater.CheckSqueue(container.UUID) {
						log.Printf("Container %s is still in squeue after scancel.",

				err = dispatcher.UpdateState(container.UUID, dispatch.Cancelled)
// readConfig loads the JSON/YAML configuration file at path into dst.
// A missing file is tolerated — logged, then execution continues with the
// zero-value configuration — but only when the caller asked for the
// built-in default path; (fragmentary view: the function's return
// statements are not visible in this excerpt).
func readConfig(dst interface{}, path string) error {
	err := config.LoadFile(dst, path)
	if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
		log.Printf("Config not specified. Continue with default configuration.")