"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "io"
"io/ioutil"
"log"
"math"
// Config holds the settings used by crunch-dispatch-slurm.
//
// PollPeriod is handed to both the squeue monitor and the dispatcher
// as their polling interval; when it is zero, startup code replaces it
// with 10 seconds.
//
// NOTE(review): SbatchArguments is presumably a list of extra
// arguments passed through to sbatch -- its use is not visible in
// this excerpt, so confirm against the sbatch command construction.
type Config struct {
- SbatchArguments []string
- PollPeriod *time.Duration
- CrunchRunCommand *string
+ SbatchArguments []string
+ PollPeriod arvados.Duration
+
+ // Command used to invoke crunch-run, given as an argv-style list.
+ // The container UUID is appended. If nil, []string{"crunch-run"} is used.
+ //
+ // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
+ CrunchRunCommand []string
}
func main() {
func doMain() error {
flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+ flags.Usage = func() { usage(flags) }
configPath := flags.String(
"config",
defaultConfigPath,
"`path` to json configuration file")
- config.PollPeriod = flags.Duration(
- "poll-interval",
- 10*time.Second,
- "Time duration to poll for queued containers")
-
- config.CrunchRunCommand = flags.String(
- "crunch-run-command",
- "/usr/bin/crunch-run",
- "Crunch command to run container")
-
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
return err
}
+ if config.CrunchRunCommand == nil {
+ config.CrunchRunCommand = []string{"crunch-run"}
+ }
+
+ if config.PollPeriod == 0 {
+ config.PollPeriod = arvados.Duration(10 * time.Second)
+ }
+
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("Error making Arvados client: %v", err)
}
arv.Retries = 25
- squeueUpdater.StartMonitor(*config.PollPeriod)
+ squeueUpdater.StartMonitor(time.Duration(config.PollPeriod))
defer squeueUpdater.Done()
dispatcher := dispatch.Dispatcher{
Arv: arv,
RunContainer: run,
- PollInterval: *config.PollPeriod,
+ PollInterval: time.Duration(config.PollPeriod),
DoneProcessing: make(chan struct{})}
err = dispatcher.RunDispatcher()
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher,
- container arvados.Container, crunchRunCommand string) (submitErr error) {
+ container arvados.Container, crunchRunCommand []string) (submitErr error) {
defer func() {
// If we didn't get as far as submitting a slurm job,
// unlock the container and return it to the queue.
// Send a tiny script on stdin to execute the crunch-run command
// slurm actually enforces that this must be a #! script
- fmt.Fprintf(stdinWriter, "#!/bin/sh\nexec '%s' '%s'\n", crunchRunCommand, container.UUID)
+ io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
stdinWriter.Close()
err = cmd.Wait()
log.Printf("About to submit queued container %v", container.UUID)
- if err := submit(dispatcher, container, *config.CrunchRunCommand); err != nil {
+ if err := submit(dispatcher, container, config.CrunchRunCommand); err != nil {
log.Printf("Error submitting container %s to slurm: %v",
container.UUID, err)
// maybe sbatch is broken, put it back to queued