"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/coreos/go-systemd/daemon"
"io"
"io/ioutil"
"log"
// Config used by crunch-dispatch-slurm
type Config struct {
+ Client arvados.Client
+
SbatchArguments []string
- PollPeriod *time.Duration
+ PollPeriod arvados.Duration
// crunch-run command to invoke. The container UUID will be
// appended. If nil, []string{"crunch-run"} will be used.
func doMain() error {
flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+ flags.Usage = func() { usage(flags) }
configPath := flags.String(
"config",
defaultConfigPath,
"`path` to json configuration file")
- config.PollPeriod = flags.Duration(
- "poll-interval",
- 10*time.Second,
- "Time duration to poll for queued containers")
-
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
config.CrunchRunCommand = []string{"crunch-run"}
}
+ if config.PollPeriod == 0 {
+ config.PollPeriod = arvados.Duration(10 * time.Second)
+ }
+
+ if config.Client.APIHost != "" || config.Client.AuthToken != "" {
+ // Copy real configs into env vars so [a]
+ // MakeArvadosClient() uses them, and [b] they get
+ // propagated to crunch-run via SLURM.
+ os.Setenv("ARVADOS_API_HOST", config.Client.APIHost)
+ os.Setenv("ARVADOS_API_TOKEN", config.Client.AuthToken)
+ os.Setenv("ARVADOS_API_INSECURE", "")
+ if config.Client.Insecure {
+ os.Setenv("ARVADOS_API_INSECURE", "1")
+ }
+ os.Setenv("ARVADOS_KEEP_SERVICES", "")
+ os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+ } else {
+ log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
+ }
+
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("Error making Arvados client: %v", err)
}
arv.Retries = 25
- squeueUpdater.StartMonitor(*config.PollPeriod)
+ squeueUpdater.StartMonitor(time.Duration(config.PollPeriod))
defer squeueUpdater.Done()
dispatcher := dispatch.Dispatcher{
Arv: arv,
RunContainer: run,
- PollInterval: *config.PollPeriod,
+ PollInterval: time.Duration(config.PollPeriod),
DoneProcessing: make(chan struct{})}
+ if _, err := daemon.SdNotify("READY=1"); err != nil {
+ log.Printf("Error notifying init daemon: %v", err)
+ }
+
err = dispatcher.RunDispatcher()
if err != nil {
return err
// OK, no cleanup needed
return
}
- err := dispatcher.Arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Queued"}},
- nil)
+ err := dispatcher.Unlock(container.UUID)
if err != nil {
log.Printf("Error unlocking container %s: %v", container.UUID, err)
}
log.Printf("Error submitting container %s to slurm: %v",
container.UUID, err)
// maybe sbatch is broken, put it back to queued
- dispatcher.UpdateState(container.UUID, dispatch.Queued)
+ dispatcher.Unlock(container.UUID)
}
submitted = true
} else {
log.Printf("Error getting final container state: %v", err)
}
- var st arvados.ContainerState
switch con.State {
case dispatch.Locked:
- st = dispatch.Queued
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ container.UUID, con.State, dispatch.Queued)
+ dispatcher.Unlock(container.UUID)
case dispatch.Running:
- st = dispatch.Cancelled
+ st := dispatch.Cancelled
+ log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
+ container.UUID, con.State, st)
+ dispatcher.UpdateState(container.UUID, st)
default:
// Container state is Queued, Complete or Cancelled so stop monitoring it.
return
}
-
- log.Printf("Container %s in state %v but missing from slurm queue, changing to %v.",
- container.UUID, con.State, st)
- dispatcher.UpdateState(container.UUID, st)
}
}
}