From 22422ab1e539977ca730aedd46b4cd919e73d05e Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Wed, 17 Jul 2019 16:54:01 -0400 Subject: [PATCH] 14713: Migrate old crunch-dispatch-slurm config to new config Update code to use new config. Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- lib/config/deprecated.go | 94 ++++++++++++++++-- lib/config/export.go | 4 + lib/config/load.go | 7 +- sdk/go/arvados/config.go | 8 +- sdk/go/dispatch/dispatch.go | 2 +- .../crunch-dispatch-slurm.go | 61 ++++++------ .../crunch-dispatch-slurm_test.go | 95 +++++++++---------- 7 files changed, 178 insertions(+), 93 deletions(-) diff --git a/lib/config/deprecated.go b/lib/config/deprecated.go index 0b0bb26689..614f5dcf97 100644 --- a/lib/config/deprecated.go +++ b/lib/config/deprecated.go @@ -108,29 +108,37 @@ type oldKeepstoreConfig struct { Debug *bool } -// update config using values from an old-style keepstore config file. -func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { - path := ldr.KeepstorePath +func (ldr *Loader) loadOldConfigHelper(component, path, defaultPath string, target interface{}) error { if path == "" { return nil } buf, err := ioutil.ReadFile(path) - if os.IsNotExist(err) && path == defaultKeepstoreConfigPath { + if os.IsNotExist(err) && path == defaultPath { return nil } else if err != nil { return err } else { - ldr.Logger.Warnf("you should remove the legacy keepstore config file (%s) after migrating all config keys to the cluster configuration file (%s)", path, ldr.Path) + ldr.Logger.Warnf("you should remove the legacy %v config file (%s) after migrating all config keys to the cluster configuration file (%s)", component, path, ldr.Path) } - cluster, err := cfg.GetCluster("") + + err = yaml.Unmarshal(buf, target) if err != nil { - return err + return fmt.Errorf("%s: %s", path, err) } + return nil +} +// update config using values from an old-style keepstore config file. 
+func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { var oc oldKeepstoreConfig - err = yaml.Unmarshal(buf, &oc) + err := ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, defaultKeepstoreConfigPath, &oc) if err != nil { - return fmt.Errorf("%s: %s", path, err) + return err + } + + cluster, err := cfg.GetCluster("") + if err != nil { + return err } if v := oc.Debug; v == nil { @@ -143,3 +151,71 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error { cfg.Clusters[cluster.ClusterID] = *cluster return nil } + +type oldCrunchDispatchSlurmConfig struct { + Client arvados.Client + + SbatchArguments []string + PollPeriod arvados.Duration + PrioritySpread int64 + + // crunch-run command to invoke. The container UUID will be + // appended. If nil, []string{"crunch-run"} will be used. + // + // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"} + CrunchRunCommand []string + + // Extra RAM to reserve (in Bytes) for SLURM job, in addition + // to the amount specified in the container's RuntimeConstraints + ReserveExtraRAM int64 + + // Minimum time between two attempts to run the same container + MinRetryPeriod arvados.Duration + + // Batch size for container queries + BatchSize int64 +} + +const defaultCrunchDispatchSlurmConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml" + +// update config using values from an old-style crunch-dispatch-slurm config file.
+func (ldr *Loader) loadOldCrunchDispatchSlurmConfig(cfg *arvados.Config) error { + var oc oldCrunchDispatchSlurmConfig + err := ldr.loadOldConfigHelper("crunch-dispatch-slurm", ldr.CrunchDispatchSlurmPath, defaultCrunchDispatchSlurmConfigPath, &oc) + if err != nil { + return err + } + + cluster, err := cfg.GetCluster("") + if err != nil { + return err + } + + u := arvados.URL{} + u.Host = oc.Client.APIHost + if oc.Client.Scheme != "" { + u.Scheme = oc.Client.Scheme + } else { + u.Scheme = "https" + } + cluster.Services.Controller.ExternalURL = u + cluster.SystemRootToken = oc.Client.AuthToken + cluster.TLS.Insecure = oc.Client.Insecure + + cluster.Containers.SLURM.SbatchArgumentsList = oc.SbatchArguments + cluster.Containers.CloudVMs.PollInterval = oc.PollPeriod + cluster.Containers.SLURM.PrioritySpread = oc.PrioritySpread + if len(oc.CrunchRunCommand) >= 1 { + cluster.Containers.CrunchRunCommand = oc.CrunchRunCommand[0] + } + if len(oc.CrunchRunCommand) >= 2 { + cluster.Containers.CrunchRunArgumentsList = oc.CrunchRunCommand[1:] + } + cluster.Containers.ReserveExtraRAM = arvados.ByteSize(oc.ReserveExtraRAM) + cluster.Containers.MinRetryPeriod = oc.MinRetryPeriod + + cluster.API.MaxItemsPerResponse = int(oc.BatchSize) + + cfg.Clusters[cluster.ClusterID] = *cluster + return nil +} diff --git a/lib/config/export.go b/lib/config/export.go index b79dec4d9d..a050492b3e 100644 --- a/lib/config/export.go +++ b/lib/config/export.go @@ -83,6 +83,8 @@ var whitelist = map[string]bool{ "Collections.TrustAllContent": false, "Containers": true, "Containers.CloudVMs": false, + "Containers.CrunchRunCommand": false, + "Containers.CrunchRunArgumentsList": false, "Containers.DefaultKeepCacheRAM": true, "Containers.DispatchPrivateKey": false, "Containers.JobsAPI": true, @@ -98,6 +100,8 @@ var whitelist = map[string]bool{ "Containers.MaxComputeVMs": false, "Containers.MaxDispatchAttempts": false, "Containers.MaxRetryAttempts": true, + "Containers.MinRetryPeriod": true, +
"Containers.ReserveExtraRAM": true, "Containers.SLURM": false, "Containers.StaleLockTimeout": false, "Containers.SupportedDockerImageFormats": true, diff --git a/lib/config/load.go b/lib/config/load.go index 168c1aa22a..bce57d7598 100644 --- a/lib/config/load.go +++ b/lib/config/load.go @@ -28,8 +28,9 @@ type Loader struct { Logger logrus.FieldLogger SkipDeprecated bool // Don't load legacy/deprecated config keys/files - Path string - KeepstorePath string + Path string + KeepstorePath string + CrunchDispatchSlurmPath string configdata []byte } @@ -57,6 +58,7 @@ func NewLoader(stdin io.Reader, logger logrus.FieldLogger) *Loader { func (ldr *Loader) SetupFlags(flagset *flag.FlagSet) { flagset.StringVar(&ldr.Path, "config", arvados.DefaultConfigFile, "Site configuration `file` (default may be overridden by setting an ARVADOS_CONFIG environment variable)") flagset.StringVar(&ldr.KeepstorePath, "legacy-keepstore-config", defaultKeepstoreConfigPath, "Legacy keepstore configuration `file`") + flagset.StringVar(&ldr.CrunchDispatchSlurmPath, "legacy-crunch-dispatch-slurm-config", defaultCrunchDispatchSlurmConfigPath, "Legacy crunch-dispatch-slurm configuration `file`") } // MungeLegacyConfigArgs checks args for a -config flag whose argument @@ -205,6 +207,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) { } for _, err := range []error{ ldr.loadOldKeepstoreConfig(&cfg), + ldr.loadOldCrunchDispatchSlurmConfig(&cfg), } { if err != nil { return nil, err diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index c8206c7da4..12ec8e6b26 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -253,12 +253,16 @@ type InstanceType struct { type ContainersConfig struct { CloudVMs CloudVMsConfig + CrunchRunCommand string + CrunchRunArgumentsList []string DefaultKeepCacheRAM ByteSize DispatchPrivateKey string LogReuseDecisions bool MaxComputeVMs int MaxDispatchAttempts int MaxRetryAttempts int + MinRetryPeriod Duration + ReserveExtraRAM ByteSize
StaleLockTimeout Duration SupportedDockerImageFormats []string UsePreemptibleInstances bool @@ -285,7 +289,9 @@ type ContainersConfig struct { LogUpdateSize ByteSize } SLURM struct { - Managed struct { + PrioritySpread int64 + SbatchArgumentsList []string + Managed struct { DNSServerConfDir string DNSServerConfTemplate string DNSServerReloadCommand string diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go index fdb52e510b..587c9999c4 100644 --- a/sdk/go/dispatch/dispatch.go +++ b/sdk/go/dispatch/dispatch.go @@ -38,7 +38,7 @@ type Dispatcher struct { Logger Logger // Batch size for container queries - BatchSize int64 + BatchSize int // Queue polling frequency PollPeriod time.Duration diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index 09e3d591a8..1a7ad6fac7 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -25,6 +25,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/dispatch" "github.com/coreos/go-systemd/daemon" "github.com/sirupsen/logrus" + "gopkg.in/yaml.v2" ) type logger interface { @@ -35,8 +36,7 @@ type logger interface { const initialNiceValue int64 = 10000 var ( - version = "dev" - defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml" + version = "dev" ) type Dispatcher struct { @@ -74,16 +74,15 @@ func (disp *Dispatcher) Run(prog string, args []string) error { // configure() loads config files. Tests skip this. 
func (disp *Dispatcher) configure(prog string, args []string) error { + if disp.logger == nil { + disp.logger = logrus.StandardLogger() + } flags := flag.NewFlagSet(prog, flag.ExitOnError) flags.Usage = func() { usage(flags) } - loader := config.NewLoader(stdin, log) + loader := config.NewLoader(nil, disp.logger) loader.SetupFlags(flags) - configPath := flags.String( - "config", - defaultConfigPath, - "`path` to JSON or YAML configuration file") dumpConfig := flag.Bool( "dump-config", false, @@ -93,10 +92,10 @@ func (disp *Dispatcher) configure(prog string, args []string) error { false, "Print version information and exit.") - args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-crunch-dispatch-slurm-config") + args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-legacy-crunch-dispatch-slurm-config") // Parse args; omit the first arg which is the command name - flags.Parse(args) + err := flags.Parse(args) if err == flag.ErrHelp { return nil @@ -119,8 +118,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error { return fmt.Errorf("config error: %s", err) } - disp.Client.APIHost = fmt.Sprintf("%s:%d", disp.cluster.Services.Controller.ExternalURL.Host, - disp.cluster.Services.Controller.ExternalURL.Port) + disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host disp.Client.AuthToken = disp.cluster.SystemRootToken disp.Client.Insecure = disp.cluster.TLS.Insecure @@ -141,7 +139,14 @@ func (disp *Dispatcher) configure(prog string, args []string) error { } if *dumpConfig { - return config.DumpAndExit(cfg) + out, err := yaml.Marshal(cfg) + if err != nil { + return err + } + _, err = os.Stdout.Write(out) + if err != nil { + return err + } } return nil @@ -149,9 +154,6 @@ func (disp *Dispatcher) configure(prog string, args []string) error { // setup() initializes private fields after configure(). 
func (disp *Dispatcher) setup() { - if disp.logger == nil { - disp.logger = logrus.StandardLogger() - } arv, err := arvadosclient.MakeArvadosClient() if err != nil { disp.logger.Fatalf("Error making Arvados client: %v", err) @@ -161,17 +163,17 @@ func (disp *Dispatcher) setup() { disp.slurm = NewSlurmCLI() disp.sqCheck = &SqueueChecker{ Logger: disp.logger, - Period: time.Duration(disp.PollPeriod), - PrioritySpread: disp.PrioritySpread, + Period: time.Duration(disp.cluster.Containers.CloudVMs.PollInterval), + PrioritySpread: disp.cluster.Containers.SLURM.PrioritySpread, Slurm: disp.slurm, } disp.Dispatcher = &dispatch.Dispatcher{ Arv: arv, Logger: disp.logger, - BatchSize: disp.BatchSize, + BatchSize: disp.cluster.API.MaxItemsPerResponse, RunContainer: disp.runContainer, - PollPeriod: time.Duration(disp.PollPeriod), - MinRetryPeriod: time.Duration(disp.MinRetryPeriod), + PollPeriod: time.Duration(disp.cluster.Containers.CloudVMs.PollInterval), + MinRetryPeriod: time.Duration(disp.cluster.Containers.MinRetryPeriod), } } @@ -209,7 +211,9 @@ func (disp *Dispatcher) checkSqueueForOrphans() { } func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []string { - mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576))) + mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+ + container.RuntimeConstraints.KeepCacheRAM+ + int64(disp.cluster.Containers.ReserveExtraRAM)) / float64(1048576))) disk := dispatchcloud.EstimateScratchSpace(&container) disk = int64(math.Ceil(float64(disk) / float64(1048576))) @@ -222,7 +226,7 @@ func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []strin func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) { var args []string - args = append(args, disp.SbatchArguments...) + args = append(args, disp.cluster.Containers.SLURM.SbatchArgumentsList...) 
args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue), "--no-requeue") if disp.cluster == nil { @@ -270,7 +274,9 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) { log.Printf("Submitting container %s to slurm", ctr.UUID) - if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil { + cmd := []string{disp.cluster.Containers.CrunchRunCommand} + cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...) + if err := disp.submit(ctr, cmd); err != nil { var text string if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok { var logBuf bytes.Buffer @@ -361,12 +367,3 @@ func (disp *Dispatcher) scancel(ctr arvados.Container) { time.Sleep(time.Second) } } - -func (disp *Dispatcher) readConfig(path string) error { - err := config.LoadFile(disp, path) - if err != nil && os.IsNotExist(err) && path == defaultConfigPath { - log.Printf("Config not specified. 
Continue with default configuration.") - err = nil - } - return err -} diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go index eea102012b..ca3944d76e 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "io/ioutil" + "log" "net/http" "net/http/httptest" "os" @@ -45,6 +46,7 @@ func (s *IntegrationSuite) SetUpTest(c *C) { arvadostest.StartAPI() os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token) s.disp = Dispatcher{} + s.disp.cluster = &arvados.Cluster{} s.disp.setup() s.slurm = slurmFake{} } @@ -118,7 +120,7 @@ func (s *IntegrationSuite) integrationTest(c *C, c.Check(err, IsNil) c.Assert(len(containers.Items), Equals, 1) - s.disp.CrunchRunCommand = []string{"echo"} + s.disp.cluster.Containers.CrunchRunCommand = "echo" ctx, cancel := context.WithCancel(context.Background()) doneRun := make(chan struct{}) @@ -243,6 +245,7 @@ type StubbedSuite struct { func (s *StubbedSuite) SetUpTest(c *C) { s.disp = Dispatcher{} + s.disp.cluster = &arvados.Cluster{} s.disp.setup() } @@ -272,7 +275,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva logrus.SetOutput(io.MultiWriter(buf, os.Stderr)) defer logrus.SetOutput(os.Stderr) - s.disp.CrunchRunCommand = []string{crunchCmd} + s.disp.cluster.Containers.CrunchRunCommand = crunchCmd ctx, cancel := context.WithCancel(context.Background()) dispatcher := dispatch.Dispatcher{ @@ -302,51 +305,6 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`) } -func (s *StubbedSuite) TestNoSuchConfigFile(c *C) { - err := s.disp.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7") - c.Assert(err, NotNil) -} - -func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) { - tmpfile, err := ioutil.TempFile(os.TempDir(), "config") -
c.Check(err, IsNil) - defer os.Remove(tmpfile.Name()) - - _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`)) - c.Check(err, IsNil) - - err = s.disp.readConfig(tmpfile.Name()) - c.Assert(err, NotNil) -} - -func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) { - tmpfile, err := ioutil.TempFile(os.TempDir(), "config") - c.Check(err, IsNil) - defer os.Remove(tmpfile.Name()) - - _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`)) - c.Check(err, IsNil) - - err = s.disp.readConfig(tmpfile.Name()) - c.Assert(err, IsNil) - c.Check(0, Equals, len(s.disp.SbatchArguments)) -} - -func (s *StubbedSuite) TestReadConfig(c *C) { - tmpfile, err := ioutil.TempFile(os.TempDir(), "config") - c.Check(err, IsNil) - defer os.Remove(tmpfile.Name()) - - args := []string{"--arg1=v1", "--arg2", "--arg3=v3"} - argsS := `{"SbatchArguments": ["--arg1=v1", "--arg2", "--arg3=v3"]}` - _, err = tmpfile.Write([]byte(argsS)) - c.Check(err, IsNil) - - err = s.disp.readConfig(tmpfile.Name()) - c.Assert(err, IsNil) - c.Check(args, DeepEquals, s.disp.SbatchArguments) -} - func (s *StubbedSuite) TestSbatchArgs(c *C) { container := arvados.Container{ UUID: "123", @@ -360,7 +318,7 @@ func (s *StubbedSuite) TestSbatchArgs(c *C) { {"--arg1=v1", "--arg2"}, } { c.Logf("%#v", defaults) - s.disp.SbatchArguments = defaults + s.disp.cluster.Containers.SLURM.SbatchArgumentsList = defaults args, err := s.disp.sbatchArgs(container) c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--no-requeue", "--mem=239", "--cpus-per-task=2", "--tmp=0")) @@ -432,3 +390,44 @@ func (s *StubbedSuite) TestSbatchPartition(c *C) { }) c.Check(err, IsNil) } + +func (s *StubbedSuite) TestLoadLegacyConfig(c *C) { + content := []byte(` +Client: + APIHost: example.com + APIToken: abcdefg +SbatchArguments: ["--foo", "bar"] +PollPeriod: 12s +PrioritySpread: 42 +CrunchRunCommand: ["x-crunch-run", "--cgroup-parent-subsystem=memory"] 
+ReserveExtraRAM: 12345 +MinRetryPeriod: 13s +BatchSize: 99 +`) + tmpfile, err := ioutil.TempFile("", "example") + if err != nil { + log.Fatal(err) + } + + defer os.Remove(tmpfile.Name()) // clean up + + if _, err := tmpfile.Write(content); err != nil { + log.Fatal(err) + } + if err := tmpfile.Close(); err != nil { + log.Fatal(err) + + } + err = s.disp.configure("crunch-dispatch-slurm", []string{"-config", tmpfile.Name()}) + c.Check(err, IsNil) + + c.Check(s.disp.cluster.Services.Controller.ExternalURL, Equals, arvados.URL{Scheme: "https", Host: "example.com"}) + c.Check(s.disp.cluster.Containers.SLURM.SbatchArgumentsList, DeepEquals, []string{"--foo", "bar"}) + c.Check(s.disp.cluster.Containers.CloudVMs.PollInterval, Equals, arvados.Duration(12*time.Second)) + c.Check(s.disp.cluster.Containers.SLURM.PrioritySpread, Equals, int64(42)) + c.Check(s.disp.cluster.Containers.CrunchRunCommand, Equals, "x-crunch-run") + c.Check(s.disp.cluster.Containers.CrunchRunArgumentsList, DeepEquals, []string{"--cgroup-parent-subsystem=memory"}) + c.Check(s.disp.cluster.Containers.ReserveExtraRAM, Equals, arvados.ByteSize(12345)) + c.Check(s.disp.cluster.Containers.MinRetryPeriod, Equals, arvados.Duration(13*time.Second)) + c.Check(s.disp.cluster.API.MaxItemsPerResponse, Equals, 99) +} -- 2.30.2