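Update crunch-dispatch-slurm to use the new cluster config (legacy crunch-dispatch-slurm config files are still loaded and migrated).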
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>
Debug *bool
}
-// update config using values from an old-style keepstore config file.
-func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
- path := ldr.KeepstorePath
+func (ldr *Loader) loadOldConfigHelper(component, path, defaultPath string, target interface{}) error {
if path == "" {
return nil
}
buf, err := ioutil.ReadFile(path)
- if os.IsNotExist(err) && path == defaultKeepstoreConfigPath {
+ if os.IsNotExist(err) && path == defaultPath {
return nil
} else if err != nil {
return err
} else {
- ldr.Logger.Warnf("you should remove the legacy keepstore config file (%s) after migrating all config keys to the cluster configuration file (%s)", path, ldr.Path)
+ ldr.Logger.Warnf("you should remove the legacy %v config file (%s) after migrating all config keys to the cluster configuration file (%s)", component, path, ldr.Path)
}
- cluster, err := cfg.GetCluster("")
+
+ err = yaml.Unmarshal(buf, target)
if err != nil {
- return err
+ return fmt.Errorf("%s: %s", path, err)
}
+ return nil
+}
+// update config using values from an old-style keepstore config file.
+func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
var oc oldKeepstoreConfig
- err = yaml.Unmarshal(buf, &oc)
+ err := ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, defaultKeepstoreConfigPath, &oc)
if err != nil {
- return fmt.Errorf("%s: %s", path, err)
+ return err
+ }
+
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return err
}
if v := oc.Debug; v == nil {
cfg.Clusters[cluster.ClusterID] = *cluster
return nil
}
+
+type oldCrunchDispatchSlurmConfig struct {
+ Client arvados.Client
+
+ SbatchArguments []string
+ PollPeriod arvados.Duration
+ PrioritySpread int64
+
+ // crunch-run command to invoke. The container UUID will be
+ // appended. If nil, []string{"crunch-run"} will be used.
+ //
+ // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
+ CrunchRunCommand []string
+
+ // Extra RAM to reserve (in Bytes) for SLURM job, in addition
+ // to the amount specified in the container's RuntimeConstraints
+ ReserveExtraRAM int64
+
+ // Minimum time between two attempts to run the same container
+ MinRetryPeriod arvados.Duration
+
+ // Batch size for container queries
+ BatchSize int64
+}
+
+const defaultCrunchDispatchSlurmConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+
+// update config using values from an crunch-dispatch-slurm config file.
+func (ldr *Loader) loadOldCrunchDispatchSlurmConfig(cfg *arvados.Config) error {
+ var oc oldCrunchDispatchSlurmConfig
+ err := ldr.loadOldConfigHelper("crunch-dispatch-slurm", ldr.CrunchDispatchSlurmPath, defaultCrunchDispatchSlurmConfigPath, &oc)
+ if err != nil {
+ return err
+ }
+
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return err
+ }
+
+ u := arvados.URL{}
+ u.Host = oc.Client.APIHost
+ if oc.Client.Scheme != "" {
+ u.Scheme = oc.Client.Scheme
+ } else {
+ u.Scheme = "https"
+ }
+ cluster.Services.Controller.ExternalURL = u
+ cluster.SystemRootToken = oc.Client.AuthToken
+ cluster.TLS.Insecure = oc.Client.Insecure
+
+ cluster.Containers.SLURM.SbatchArgumentsList = oc.SbatchArguments
+ cluster.Containers.CloudVMs.PollInterval = oc.PollPeriod
+ cluster.Containers.SLURM.PrioritySpread = oc.PrioritySpread
+ if len(oc.CrunchRunCommand) >= 1 {
+ cluster.Containers.CrunchRunCommand = oc.CrunchRunCommand[0]
+ }
+ if len(oc.CrunchRunCommand) >= 2 {
+ cluster.Containers.CrunchRunArgumentsList = oc.CrunchRunCommand[1:]
+ }
+ cluster.Containers.ReserveExtraRAM = arvados.ByteSize(oc.ReserveExtraRAM)
+ cluster.Containers.MinRetryPeriod = oc.MinRetryPeriod
+
+ cluster.API.MaxItemsPerResponse = int(oc.BatchSize)
+
+ cfg.Clusters[cluster.ClusterID] = *cluster
+ return nil
+}
"Collections.TrustAllContent": false,
"Containers": true,
"Containers.CloudVMs": false,
+ "Containers.CrunchRunCommand": false,
+ "Containers.CrunchRunArgumentsList": false,
"Containers.DefaultKeepCacheRAM": true,
"Containers.DispatchPrivateKey": false,
"Containers.JobsAPI": true,
"Containers.MaxComputeVMs": false,
"Containers.MaxDispatchAttempts": false,
"Containers.MaxRetryAttempts": true,
+ "Containers.MinRetryPeriod": true,
+ "Containers.ReserveExtraRam": true,
"Containers.SLURM": false,
"Containers.StaleLockTimeout": false,
"Containers.SupportedDockerImageFormats": true,
Logger logrus.FieldLogger
SkipDeprecated bool // Don't load legacy/deprecated config keys/files
- Path string
- KeepstorePath string
+ Path string
+ KeepstorePath string
+ CrunchDispatchSlurmPath string
configdata []byte
}
// SetupFlags registers the loader's command-line flags on flagset:
// -config for the site (cluster) configuration file, plus one
// -legacy-*-config flag per supported legacy component config file.
func (ldr *Loader) SetupFlags(flagset *flag.FlagSet) {
	for _, f := range []struct {
		dest             *string
		name, def, usage string
	}{
		{&ldr.Path, "config", arvados.DefaultConfigFile, "Site configuration `file` (default may be overridden by setting an ARVADOS_CONFIG environment variable)"},
		{&ldr.KeepstorePath, "legacy-keepstore-config", defaultKeepstoreConfigPath, "Legacy keepstore configuration `file`"},
		{&ldr.CrunchDispatchSlurmPath, "legacy-crunch-dispatch-slurm-config", defaultCrunchDispatchSlurmConfigPath, "Legacy crunch-dispatch-slurm configuration `file`"},
	} {
		flagset.StringVar(f.dest, f.name, f.def, f.usage)
	}
}
// MungeLegacyConfigArgs checks args for a -config flag whose argument
}
for _, err := range []error{
ldr.loadOldKeepstoreConfig(&cfg),
+ ldr.loadOldCrunchDispatchSlurmConfig(&cfg),
} {
if err != nil {
return nil, err
type ContainersConfig struct {
CloudVMs CloudVMsConfig
+ CrunchRunCommand string
+ CrunchRunArgumentsList []string
DefaultKeepCacheRAM ByteSize
DispatchPrivateKey string
LogReuseDecisions bool
MaxComputeVMs int
MaxDispatchAttempts int
MaxRetryAttempts int
+ MinRetryPeriod Duration
+ ReserveExtraRAM ByteSize
StaleLockTimeout Duration
SupportedDockerImageFormats []string
UsePreemptibleInstances bool
LogUpdateSize ByteSize
}
SLURM struct {
- Managed struct {
+ PrioritySpread int64
+ SbatchArgumentsList []string
+ Managed struct {
DNSServerConfDir string
DNSServerConfTemplate string
DNSServerReloadCommand string
Logger Logger
// Batch size for container queries
- BatchSize int64
+ BatchSize int
// Queue polling frequency
PollPeriod time.Duration
"git.curoverse.com/arvados.git/sdk/go/dispatch"
"github.com/coreos/go-systemd/daemon"
"github.com/sirupsen/logrus"
+ "gopkg.in/yaml.v2"
)
type logger interface {
const initialNiceValue int64 = 10000
var (
- version = "dev"
- defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+ version = "dev"
)
type Dispatcher struct {
// configure() loads config files. Tests skip this.
func (disp *Dispatcher) configure(prog string, args []string) error {
+ if disp.logger == nil {
+ disp.logger = logrus.StandardLogger()
+ }
flags := flag.NewFlagSet(prog, flag.ExitOnError)
flags.Usage = func() { usage(flags) }
- loader := config.NewLoader(stdin, log)
+ loader := config.NewLoader(nil, disp.logger)
loader.SetupFlags(flags)
- configPath := flags.String(
- "config",
- defaultConfigPath,
- "`path` to JSON or YAML configuration file")
dumpConfig := flag.Bool(
"dump-config",
false,
false,
"Print version information and exit.")
- args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-crunch-dispatch-slurm-config")
+ args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-legacy-crunch-dispatch-slurm-config")
// Parse args; omit the first arg which is the command name
- flags.Parse(args)
+ err := flags.Parse(args)
if err == flag.ErrHelp {
return nil
return fmt.Errorf("config error: %s", err)
}
- disp.Client.APIHost = fmt.Sprintf("%s:%d", disp.cluster.Services.Controller.ExternalURL.Host,
- disp.cluster.Services.Controller.ExternalURL.Port)
+ disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
disp.Client.AuthToken = disp.cluster.SystemRootToken
disp.Client.Insecure = disp.cluster.TLS.Insecure
}
if *dumpConfig {
- return config.DumpAndExit(cfg)
+ out, err := yaml.Marshal(cfg)
+ if err != nil {
+ return err
+ }
+ _, err = os.Stdout.Write(out)
+ if err != nil {
+ return err
+ }
}
return nil
// setup() initializes private fields after configure().
func (disp *Dispatcher) setup() {
- if disp.logger == nil {
- disp.logger = logrus.StandardLogger()
- }
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
disp.logger.Fatalf("Error making Arvados client: %v", err)
disp.slurm = NewSlurmCLI()
disp.sqCheck = &SqueueChecker{
Logger: disp.logger,
- Period: time.Duration(disp.PollPeriod),
- PrioritySpread: disp.PrioritySpread,
+ Period: time.Duration(disp.cluster.Containers.CloudVMs.PollInterval),
+ PrioritySpread: disp.cluster.Containers.SLURM.PrioritySpread,
Slurm: disp.slurm,
}
disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
Logger: disp.logger,
- BatchSize: disp.BatchSize,
+ BatchSize: disp.cluster.API.MaxItemsPerResponse,
RunContainer: disp.runContainer,
- PollPeriod: time.Duration(disp.PollPeriod),
- MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+ PollPeriod: time.Duration(disp.cluster.Containers.CloudVMs.PollInterval),
+ MinRetryPeriod: time.Duration(disp.cluster.Containers.MinRetryPeriod),
}
}
}
func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []string {
- mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
+ mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+ container.RuntimeConstraints.KeepCacheRAM+
+ int64(disp.cluster.Containers.ReserveExtraRAM)) / float64(1048576)))
disk := dispatchcloud.EstimateScratchSpace(&container)
disk = int64(math.Ceil(float64(disk) / float64(1048576)))
func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
var args []string
- args = append(args, disp.SbatchArguments...)
+ args = append(args, disp.cluster.Containers.SLURM.SbatchArgumentsList...)
args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue), "--no-requeue")
if disp.cluster == nil {
if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
- if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
+ cmd := []string{disp.cluster.Containers.CrunchRunCommand}
+ cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
+ if err := disp.submit(ctr, cmd); err != nil {
var text string
if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
var logBuf bytes.Buffer
time.Sleep(time.Second)
}
}
-
-func (disp *Dispatcher) readConfig(path string) error {
- err := config.LoadFile(disp, path)
- if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
- log.Printf("Config not specified. Continue with default configuration.")
- err = nil
- }
- return err
-}
"fmt"
"io"
"io/ioutil"
+ "log"
"net/http"
"net/http/httptest"
"os"
arvadostest.StartAPI()
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
s.disp = Dispatcher{}
+ s.disp.cluster = &arvados.Cluster{}
s.disp.setup()
s.slurm = slurmFake{}
}
c.Check(err, IsNil)
c.Assert(len(containers.Items), Equals, 1)
- s.disp.CrunchRunCommand = []string{"echo"}
+ s.disp.cluster.Containers.CrunchRunCommand = "echo"
ctx, cancel := context.WithCancel(context.Background())
doneRun := make(chan struct{})
func (s *StubbedSuite) SetUpTest(c *C) {
	// setup() reads its settings through disp.cluster, so each test
	// starts from a fresh Dispatcher carrying an empty cluster config.
	s.disp = Dispatcher{cluster: &arvados.Cluster{}}
	s.disp.setup()
}
logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
defer logrus.SetOutput(os.Stderr)
- s.disp.CrunchRunCommand = []string{crunchCmd}
+ s.disp.cluster.Containers.CrunchRunCommand = "crunchCmd"
ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
-func (s *StubbedSuite) TestNoSuchConfigFile(c *C) {
- err := s.disp.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
- c.Assert(err, NotNil)
-}
-
-func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) {
- tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
- c.Check(err, IsNil)
- defer os.Remove(tmpfile.Name())
-
- _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
- c.Check(err, IsNil)
-
- err = s.disp.readConfig(tmpfile.Name())
- c.Assert(err, NotNil)
-}
-
-func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) {
- tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
- c.Check(err, IsNil)
- defer os.Remove(tmpfile.Name())
-
- _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
- c.Check(err, IsNil)
-
- err = s.disp.readConfig(tmpfile.Name())
- c.Assert(err, IsNil)
- c.Check(0, Equals, len(s.disp.SbatchArguments))
-}
-
-func (s *StubbedSuite) TestReadConfig(c *C) {
- tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
- c.Check(err, IsNil)
- defer os.Remove(tmpfile.Name())
-
- args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
- argsS := `{"SbatchArguments": ["--arg1=v1", "--arg2", "--arg3=v3"]}`
- _, err = tmpfile.Write([]byte(argsS))
- c.Check(err, IsNil)
-
- err = s.disp.readConfig(tmpfile.Name())
- c.Assert(err, IsNil)
- c.Check(args, DeepEquals, s.disp.SbatchArguments)
-}
-
func (s *StubbedSuite) TestSbatchArgs(c *C) {
container := arvados.Container{
UUID: "123",
{"--arg1=v1", "--arg2"},
} {
c.Logf("%#v", defaults)
- s.disp.SbatchArguments = defaults
+ s.disp.cluster.Containers.SLURM.SbatchArgumentsList = defaults
args, err := s.disp.sbatchArgs(container)
c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--no-requeue", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
})
c.Check(err, IsNil)
}
+
+func (s *StubbedSuite) TestLoadLegacyConfig(c *C) {
+ content := []byte(`
+Client:
+ APIHost: example.com
+ APIToken: abcdefg
+SbatchArguments: ["--foo", "bar"]
+PollPeriod: 12s
+PrioritySpread: 42
+CrunchRunCommand: ["x-crunch-run", "--cgroup-parent-subsystem=memory"]
+ReserveExtraRAM: 12345
+MinRetryPeriod: 13s
+BatchSize: 99
+`)
+ tmpfile, err := ioutil.TempFile("", "example")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ defer os.Remove(tmpfile.Name()) // clean up
+
+ if _, err := tmpfile.Write(content); err != nil {
+ log.Fatal(err)
+ }
+ if err := tmpfile.Close(); err != nil {
+ log.Fatal(err)
+
+ }
+ err = s.disp.configure("crunch-dispatch-slurm", []string{"-config", tmpfile.Name()})
+ c.Check(err, IsNil)
+
+ c.Check(s.disp.cluster.Services.Controller.ExternalURL, Equals, arvados.URL{Scheme: "https", Host: "example.com"})
+ c.Check(s.disp.cluster.Containers.SLURM.SbatchArgumentsList, DeepEquals, []string{"--foo", "bar"})
+ c.Check(s.disp.cluster.Containers.CloudVMs.PollInterval, Equals, arvados.Duration(12*time.Second))
+ c.Check(s.disp.cluster.Containers.SLURM.PrioritySpread, Equals, int64(42))
+ c.Check(s.disp.cluster.Containers.CrunchRunCommand, Equals, "x-crunch-run")
+ c.Check(s.disp.cluster.Containers.CrunchRunArgumentsList, DeepEquals, []string{"--cgroup-parent-subsystem=memory"})
+ c.Check(s.disp.cluster.Containers.ReserveExtraRAM, Equals, arvados.ByteSize(12345))
+ c.Check(s.disp.cluster.Containers.MinRetryPeriod, Equals, arvados.Duration(13*time.Second))
+ c.Check(s.disp.cluster.API.MaxItemsPerResponse, Equals, 99)
+}