// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
- "encoding/json"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"github.com/coreos/go-systemd/daemon"
"io"
// main is the program entry point. It delegates all work to doMain and,
// if doMain returns an error, logs that error and exits via the log
// package's fatal logging call.
func main() {
err := doMain()
if err != nil {
- log.Fatalf("%q", err)
+ log.Fatal(err)
}
}
// Package-level state used by the dispatcher. The Config value is filled
// in by readConfig (called from doMain) and supplies CrunchRunCommand,
// PollPeriod, SbatchArguments and the Client settings. squeueUpdater is
// started with StartMonitor in doMain and also exposes SlurmLock, which
// is held around sbatch and scancel invocations.
// NOTE(review): the rename from config to theConfig presumably avoids a
// clash with the newly imported sdk/go/config package — confirm.
var (
- config Config
+ theConfig Config
squeueUpdater Squeue
)
// defaultConfigPath is the default value of the -config flag. readConfig
// treats a missing file as non-fatal only when the path equals this value.
-const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/config.json"
+const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
func doMain() error {
flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
configPath := flags.String(
"config",
defaultConfigPath,
- "`path` to json configuration file")
+ "`path` to JSON or YAML configuration file")
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
- err := readConfig(&config, *configPath)
+ err := readConfig(&theConfig, *configPath)
if err != nil {
- log.Printf("Error reading configuration: %v", err)
return err
}
- if config.CrunchRunCommand == nil {
- config.CrunchRunCommand = []string{"crunch-run"}
+ if theConfig.CrunchRunCommand == nil {
+ theConfig.CrunchRunCommand = []string{"crunch-run"}
}
- if config.PollPeriod == 0 {
- config.PollPeriod = arvados.Duration(10 * time.Second)
+ if theConfig.PollPeriod == 0 {
+ theConfig.PollPeriod = arvados.Duration(10 * time.Second)
}
- if config.Client.APIHost != "" || config.Client.AuthToken != "" {
+ if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
// MakeArvadosClient() uses them, and [b] they get
// propagated to crunch-run via SLURM.
- os.Setenv("ARVADOS_API_HOST", config.Client.APIHost)
- os.Setenv("ARVADOS_API_TOKEN", config.Client.AuthToken)
- os.Setenv("ARVADOS_API_INSECURE", "")
- if config.Client.Insecure {
- os.Setenv("ARVADOS_API_INSECURE", "1")
+ os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
+ os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "")
+ if theConfig.Client.Insecure {
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_KEEP_SERVICES", "")
+ os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
}
arv.Retries = 25
- squeueUpdater.StartMonitor(time.Duration(config.PollPeriod))
+ squeueUpdater.StartMonitor(time.Duration(theConfig.PollPeriod))
defer squeueUpdater.Done()
dispatcher := dispatch.Dispatcher{
Arv: arv,
RunContainer: run,
- PollInterval: time.Duration(config.PollPeriod),
+ PollInterval: time.Duration(theConfig.PollPeriod),
DoneProcessing: make(chan struct{})}
- if _, err := daemon.SdNotify("READY=1"); err != nil {
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
var sbatchArgs []string
sbatchArgs = append(sbatchArgs, "--share")
- sbatchArgs = append(sbatchArgs, config.SbatchArguments...)
+ sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
+ if container.SchedulingParameters.Partitions != nil {
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
+ }
return exec.Command("sbatch", sbatchArgs...)
}
// OK, no cleanup needed
return
}
- err := dispatcher.Arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Queued"}},
- nil)
+ err := dispatcher.Unlock(container.UUID)
if err != nil {
log.Printf("Error unlocking container %s: %v", container.UUID, err)
}
squeueUpdater.SlurmLock.Lock()
defer squeueUpdater.SlurmLock.Unlock()
+ log.Printf("sbatch starting: %+q", cmd.Args)
err := cmd.Start()
if err != nil {
- submitErr = fmt.Errorf("Error starting %v: %v", cmd.Args, err)
+ submitErr = fmt.Errorf("Error starting sbatch: %v", err)
return
}
b, _ := ioutil.ReadAll(stdoutReader)
stdoutReader.Close()
stdoutChan <- b
+ close(stdoutChan)
}()
stderrChan := make(chan []byte)
b, _ := ioutil.ReadAll(stderrReader)
stderrReader.Close()
stderrChan <- b
+ close(stderrChan)
}()
// Send a tiny script on stdin to execute the crunch-run command
io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
stdinWriter.Close()
- err = cmd.Wait()
-
stdoutMsg := <-stdoutChan
stderrmsg := <-stderrChan
- close(stdoutChan)
- close(stderrChan)
+ err = cmd.Wait()
if err != nil {
submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
log.Printf("About to submit queued container %v", container.UUID)
- if err := submit(dispatcher, container, config.CrunchRunCommand); err != nil {
+ if err := submit(dispatcher, container, theConfig.CrunchRunCommand); err != nil {
log.Printf("Error submitting container %s to slurm: %v",
container.UUID, err)
// maybe sbatch is broken, put it back to queued
- dispatcher.UpdateState(container.UUID, dispatch.Queued)
+ dispatcher.Unlock(container.UUID)
}
submitted = true
} else {
log.Printf("Error getting final container state: %v", err)
}
- var st arvados.ContainerState
switch con.State {
case dispatch.Locked:
- st = dispatch.Queued
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ container.UUID, con.State, dispatch.Queued)
+ dispatcher.Unlock(container.UUID)
case dispatch.Running:
- st = dispatch.Cancelled
+ st := dispatch.Cancelled
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ container.UUID, con.State, st)
+ dispatcher.UpdateState(container.UUID, st)
default:
// Container state is Queued, Complete or Cancelled so stop monitoring it.
return
}
-
- log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
- container.UUID, con.State, st)
- dispatcher.UpdateState(container.UUID, st)
}
}
}
// Mutex between squeue sync and running sbatch or scancel.
squeueUpdater.SlurmLock.Lock()
- err := scancelCmd(container).Run()
+ cmd := scancelCmd(container)
+ msg, err := cmd.CombinedOutput()
squeueUpdater.SlurmLock.Unlock()
if err != nil {
- log.Printf("Error stopping container %s with scancel: %v",
- container.UUID, err)
+ log.Printf("Error stopping container %s with %v %v: %v %v",
+ container.UUID, cmd.Path, cmd.Args, err, string(msg))
if squeueUpdater.CheckSqueue(container.UUID) {
log.Printf("Container %s is still in squeue after scancel.",
container.UUID)
}
// readConfig loads the configuration file at path into dst.
//
// If loading fails with an error that satisfies os.IsNotExist and path is
// defaultConfigPath, a notice is logged and nil is returned, so the caller
// continues with whatever defaults dst already holds. Any other error
// (including a missing file at a non-default path) is returned to the
// caller, which is responsible for reporting it.
func readConfig(dst interface{}, path string) error {
- if buf, err := ioutil.ReadFile(path); err != nil && os.IsNotExist(err) {
- if path == defaultConfigPath {
- log.Printf("Config not specified. Continue with default configuration.")
- } else {
- return fmt.Errorf("Config file not found %q: %v", path, err)
- }
- } else if err != nil {
- return fmt.Errorf("Error reading config %q: %v", path, err)
- } else if err = json.Unmarshal(buf, dst); err != nil {
- return fmt.Errorf("Error decoding config %q: %v", path, err)
+ err := config.LoadFile(dst, path)
+ if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
+ log.Printf("Config not specified. Continue with default configuration.")
+ err = nil
}
- return nil
+ return err
}