14713: Migrate old crunch-dispatch-slurm config to new config
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Wed, 17 Jul 2019 20:54:01 +0000 (16:54 -0400)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Thu, 25 Jul 2019 13:34:09 +0000 (09:34 -0400)
Update code to use new config.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

lib/config/deprecated.go
lib/config/export.go
lib/config/load.go
sdk/go/arvados/config.go
sdk/go/dispatch/dispatch.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

index 0b0bb26689902af4fb8c2b668e75cc4cc7e00981..614f5dcf97767851f3a6ae90408ab2a4309e79d5 100644 (file)
@@ -108,29 +108,37 @@ type oldKeepstoreConfig struct {
        Debug *bool
 }
 
-// update config using values from an old-style keepstore config file.
-func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
-       path := ldr.KeepstorePath
+func (ldr *Loader) loadOldConfigHelper(component, path, defaultPath string, target interface{}) error {
        if path == "" {
                return nil
        }
        buf, err := ioutil.ReadFile(path)
-       if os.IsNotExist(err) && path == defaultKeepstoreConfigPath {
+       if os.IsNotExist(err) && path == defaultPath {
                return nil
        } else if err != nil {
                return err
        } else {
-               ldr.Logger.Warnf("you should remove the legacy keepstore config file (%s) after migrating all config keys to the cluster configuration file (%s)", path, ldr.Path)
+               ldr.Logger.Warnf("you should remove the legacy %v config file (%s) after migrating all config keys to the cluster configuration file (%s)", component, path, ldr.Path)
        }
-       cluster, err := cfg.GetCluster("")
+
+       err = yaml.Unmarshal(buf, target)
        if err != nil {
-               return err
+               return fmt.Errorf("%s: %s", path, err)
        }
+       return nil
+}
 
+// update config using values from an old-style keepstore config file.
+func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
        var oc oldKeepstoreConfig
-       err = yaml.Unmarshal(buf, &oc)
+       err := ldr.loadOldConfigHelper("keepstore", ldr.KeepstorePath, defaultKeepstoreConfigPath, &oc)
        if err != nil {
-               return fmt.Errorf("%s: %s", path, err)
+               return err
+       }
+
+       cluster, err := cfg.GetCluster("")
+       if err != nil {
+               return err
        }
 
        if v := oc.Debug; v == nil {
@@ -143,3 +151,71 @@ func (ldr *Loader) loadOldKeepstoreConfig(cfg *arvados.Config) error {
        cfg.Clusters[cluster.ClusterID] = *cluster
        return nil
 }
+
+type oldCrunchDispatchSlurmConfig struct {
+       Client arvados.Client
+
+       SbatchArguments []string
+       PollPeriod      arvados.Duration
+       PrioritySpread  int64
+
+       // crunch-run command to invoke. The container UUID will be
+       // appended. If nil, []string{"crunch-run"} will be used.
+       //
+       // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
+       CrunchRunCommand []string
+
+       // Extra RAM to reserve (in Bytes) for SLURM job, in addition
+       // to the amount specified in the container's RuntimeConstraints
+       ReserveExtraRAM int64
+
+       // Minimum time between two attempts to run the same container
+       MinRetryPeriod arvados.Duration
+
+       // Batch size for container queries
+       BatchSize int64
+}
+
+const defaultCrunchDispatchSlurmConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+
+// update config using values from an crunch-dispatch-slurm config file.
+func (ldr *Loader) loadOldCrunchDispatchSlurmConfig(cfg *arvados.Config) error {
+       var oc oldCrunchDispatchSlurmConfig
+       err := ldr.loadOldConfigHelper("crunch-dispatch-slurm", ldr.CrunchDispatchSlurmPath, defaultCrunchDispatchSlurmConfigPath, &oc)
+       if err != nil {
+               return err
+       }
+
+       cluster, err := cfg.GetCluster("")
+       if err != nil {
+               return err
+       }
+
+       u := arvados.URL{}
+       u.Host = oc.Client.APIHost
+       if oc.Client.Scheme != "" {
+               u.Scheme = oc.Client.Scheme
+       } else {
+               u.Scheme = "https"
+       }
+       cluster.Services.Controller.ExternalURL = u
+       cluster.SystemRootToken = oc.Client.AuthToken
+       cluster.TLS.Insecure = oc.Client.Insecure
+
+       cluster.Containers.SLURM.SbatchArgumentsList = oc.SbatchArguments
+       cluster.Containers.CloudVMs.PollInterval = oc.PollPeriod
+       cluster.Containers.SLURM.PrioritySpread = oc.PrioritySpread
+       if len(oc.CrunchRunCommand) >= 1 {
+               cluster.Containers.CrunchRunCommand = oc.CrunchRunCommand[0]
+       }
+       if len(oc.CrunchRunCommand) >= 2 {
+               cluster.Containers.CrunchRunArgumentsList = oc.CrunchRunCommand[1:]
+       }
+       cluster.Containers.ReserveExtraRAM = arvados.ByteSize(oc.ReserveExtraRAM)
+       cluster.Containers.MinRetryPeriod = oc.MinRetryPeriod
+
+       cluster.API.MaxItemsPerResponse = int(oc.BatchSize)
+
+       cfg.Clusters[cluster.ClusterID] = *cluster
+       return nil
+}
index b79dec4d9d1532b1f348965e5c657e71df21704e..a050492b3e422706961c5b0920c51c5f2835c49b 100644 (file)
@@ -83,6 +83,8 @@ var whitelist = map[string]bool{
        "Collections.TrustAllContent":                  false,
        "Containers":                                   true,
        "Containers.CloudVMs":                          false,
+       "Containers.CrunchRunCommand":                  false,
+       "Containers.CrunchRunArgumentsList":            false,
        "Containers.DefaultKeepCacheRAM":               true,
        "Containers.DispatchPrivateKey":                false,
        "Containers.JobsAPI":                           true,
@@ -98,6 +100,8 @@ var whitelist = map[string]bool{
        "Containers.MaxComputeVMs":                     false,
        "Containers.MaxDispatchAttempts":               false,
        "Containers.MaxRetryAttempts":                  true,
+       "Containers.MinRetryPeriod":                    true,
+       "Containers.ReserveExtraRam":                   true,
        "Containers.SLURM":                             false,
        "Containers.StaleLockTimeout":                  false,
        "Containers.SupportedDockerImageFormats":       true,
index 168c1aa22a8554ef649cc65463b10b8437970494..bce57d75982c2b5f8c2ef8d1be6b9d43688c1dc0 100644 (file)
@@ -28,8 +28,9 @@ type Loader struct {
        Logger         logrus.FieldLogger
        SkipDeprecated bool // Don't load legacy/deprecated config keys/files
 
-       Path          string
-       KeepstorePath string
+       Path                    string
+       KeepstorePath           string
+       CrunchDispatchSlurmPath string
 
        configdata []byte
 }
@@ -57,6 +58,7 @@ func NewLoader(stdin io.Reader, logger logrus.FieldLogger) *Loader {
 func (ldr *Loader) SetupFlags(flagset *flag.FlagSet) {
        flagset.StringVar(&ldr.Path, "config", arvados.DefaultConfigFile, "Site configuration `file` (default may be overridden by setting an ARVADOS_CONFIG environment variable)")
        flagset.StringVar(&ldr.KeepstorePath, "legacy-keepstore-config", defaultKeepstoreConfigPath, "Legacy keepstore configuration `file`")
+       flagset.StringVar(&ldr.CrunchDispatchSlurmPath, "legacy-crunch-dispatch-slurm-config", defaultCrunchDispatchSlurmConfigPath, "Legacy crunch-dispatch-slurm configuration `file`")
 }
 
 // MungeLegacyConfigArgs checks args for a -config flag whose argument
@@ -205,6 +207,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
                }
                for _, err := range []error{
                        ldr.loadOldKeepstoreConfig(&cfg),
+                       ldr.loadOldCrunchDispatchSlurmConfig(&cfg),
                } {
                        if err != nil {
                                return nil, err
index c8206c7da437c48ff963d563e976cc77cdb4ac3b..12ec8e6b26dc66fd3f4444dfe6a4c62897dbd1f8 100644 (file)
@@ -253,12 +253,16 @@ type InstanceType struct {
 
 type ContainersConfig struct {
        CloudVMs                    CloudVMsConfig
+       CrunchRunCommand            string
+       CrunchRunArgumentsList      []string
        DefaultKeepCacheRAM         ByteSize
        DispatchPrivateKey          string
        LogReuseDecisions           bool
        MaxComputeVMs               int
        MaxDispatchAttempts         int
        MaxRetryAttempts            int
+       MinRetryPeriod              Duration
+       ReserveExtraRAM             ByteSize
        StaleLockTimeout            Duration
        SupportedDockerImageFormats []string
        UsePreemptibleInstances     bool
@@ -285,7 +289,9 @@ type ContainersConfig struct {
                LogUpdateSize                ByteSize
        }
        SLURM struct {
-               Managed struct {
+               PrioritySpread      int64
+               SbatchArgumentsList []string
+               Managed             struct {
                        DNSServerConfDir       string
                        DNSServerConfTemplate  string
                        DNSServerReloadCommand string
index fdb52e510bd34e36ffe7f22b2975fc95bc05bf60..587c9999c4c98d3e68c957ccfd88d6eb130ae5d7 100644 (file)
@@ -38,7 +38,7 @@ type Dispatcher struct {
        Logger Logger
 
        // Batch size for container queries
-       BatchSize int64
+       BatchSize int
 
        // Queue polling frequency
        PollPeriod time.Duration
index 09e3d591a8b5107082556ef1e30227801fc610c8..1a7ad6fac745067fa9da535ba3052321f062e9de 100644 (file)
@@ -25,6 +25,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
        "github.com/coreos/go-systemd/daemon"
        "github.com/sirupsen/logrus"
+       "gopkg.in/yaml.v2"
 )
 
 type logger interface {
@@ -35,8 +36,7 @@ type logger interface {
 const initialNiceValue int64 = 10000
 
 var (
-       version           = "dev"
-       defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+       version = "dev"
 )
 
 type Dispatcher struct {
@@ -74,16 +74,15 @@ func (disp *Dispatcher) Run(prog string, args []string) error {
 
 // configure() loads config files. Tests skip this.
 func (disp *Dispatcher) configure(prog string, args []string) error {
+       if disp.logger == nil {
+               disp.logger = logrus.StandardLogger()
+       }
        flags := flag.NewFlagSet(prog, flag.ExitOnError)
        flags.Usage = func() { usage(flags) }
 
-       loader := config.NewLoader(stdin, log)
+       loader := config.NewLoader(nil, disp.logger)
        loader.SetupFlags(flags)
 
-       configPath := flags.String(
-               "config",
-               defaultConfigPath,
-               "`path` to JSON or YAML configuration file")
        dumpConfig := flag.Bool(
                "dump-config",
                false,
@@ -93,10 +92,10 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
                false,
                "Print version information and exit.")
 
-       args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-crunch-dispatch-slurm-config")
+       args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-legacy-crunch-dispatch-slurm-config")
 
        // Parse args; omit the first arg which is the command name
-       flags.Parse(args)
+       err := flags.Parse(args)
 
        if err == flag.ErrHelp {
                return nil
@@ -119,8 +118,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
                return fmt.Errorf("config error: %s", err)
        }
 
-       disp.Client.APIHost = fmt.Sprintf("%s:%d", disp.cluster.Services.Controller.ExternalURL.Host,
-               disp.cluster.Services.Controller.ExternalURL.Port)
+       disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
        disp.Client.AuthToken = disp.cluster.SystemRootToken
        disp.Client.Insecure = disp.cluster.TLS.Insecure
 
@@ -141,7 +139,14 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
        }
 
        if *dumpConfig {
-               return config.DumpAndExit(cfg)
+               out, err := yaml.Marshal(cfg)
+               if err != nil {
+                       return err
+               }
+               _, err = os.Stdout.Write(out)
+               if err != nil {
+                       return err
+               }
        }
 
        return nil
@@ -149,9 +154,6 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 
 // setup() initializes private fields after configure().
 func (disp *Dispatcher) setup() {
-       if disp.logger == nil {
-               disp.logger = logrus.StandardLogger()
-       }
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                disp.logger.Fatalf("Error making Arvados client: %v", err)
@@ -161,17 +163,17 @@ func (disp *Dispatcher) setup() {
        disp.slurm = NewSlurmCLI()
        disp.sqCheck = &SqueueChecker{
                Logger:         disp.logger,
-               Period:         time.Duration(disp.PollPeriod),
-               PrioritySpread: disp.PrioritySpread,
+               Period:         time.Duration(disp.cluster.Containers.CloudVMs.PollInterval),
+               PrioritySpread: disp.cluster.Containers.SLURM.PrioritySpread,
                Slurm:          disp.slurm,
        }
        disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
                Logger:         disp.logger,
-               BatchSize:      disp.BatchSize,
+               BatchSize:      disp.cluster.API.MaxItemsPerResponse,
                RunContainer:   disp.runContainer,
-               PollPeriod:     time.Duration(disp.PollPeriod),
-               MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+               PollPeriod:     time.Duration(disp.cluster.Containers.CloudVMs.PollInterval),
+               MinRetryPeriod: time.Duration(disp.cluster.Containers.MinRetryPeriod),
        }
 }
 
@@ -209,7 +211,9 @@ func (disp *Dispatcher) checkSqueueForOrphans() {
 }
 
 func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []string {
-       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
+       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+
+               container.RuntimeConstraints.KeepCacheRAM+
+               int64(disp.cluster.Containers.ReserveExtraRAM)) / float64(1048576)))
 
        disk := dispatchcloud.EstimateScratchSpace(&container)
        disk = int64(math.Ceil(float64(disk) / float64(1048576)))
@@ -222,7 +226,7 @@ func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []strin
 
 func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
        var args []string
-       args = append(args, disp.SbatchArguments...)
+       args = append(args, disp.cluster.Containers.SLURM.SbatchArgumentsList...)
        args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue), "--no-requeue")
 
        if disp.cluster == nil {
@@ -270,7 +274,9 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 
        if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
                log.Printf("Submitting container %s to slurm", ctr.UUID)
-               if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
+               cmd := []string{disp.cluster.Containers.CrunchRunCommand}
+               cmd = append(cmd, disp.cluster.Containers.CrunchRunArgumentsList...)
+               if err := disp.submit(ctr, cmd); err != nil {
                        var text string
                        if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
                                var logBuf bytes.Buffer
@@ -361,12 +367,3 @@ func (disp *Dispatcher) scancel(ctr arvados.Container) {
                time.Sleep(time.Second)
        }
 }
-
-func (disp *Dispatcher) readConfig(path string) error {
-       err := config.LoadFile(disp, path)
-       if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
-               log.Printf("Config not specified. Continue with default configuration.")
-               err = nil
-       }
-       return err
-}
index eea102012befe3c09dbb22a21c5b2a5ad532af4e..ca3944d76e6bc59b2d9df758a0b2a55742e5c069 100644 (file)
@@ -11,6 +11,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "net/http"
        "net/http/httptest"
        "os"
@@ -45,6 +46,7 @@ func (s *IntegrationSuite) SetUpTest(c *C) {
        arvadostest.StartAPI()
        os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
        s.disp = Dispatcher{}
+       s.disp.cluster = &arvados.Cluster{}
        s.disp.setup()
        s.slurm = slurmFake{}
 }
@@ -118,7 +120,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
        c.Check(err, IsNil)
        c.Assert(len(containers.Items), Equals, 1)
 
-       s.disp.CrunchRunCommand = []string{"echo"}
+       s.disp.cluster.Containers.CrunchRunCommand = "echo"
 
        ctx, cancel := context.WithCancel(context.Background())
        doneRun := make(chan struct{})
@@ -243,6 +245,7 @@ type StubbedSuite struct {
 
 func (s *StubbedSuite) SetUpTest(c *C) {
        s.disp = Dispatcher{}
+       s.disp.cluster = &arvados.Cluster{}
        s.disp.setup()
 }
 
@@ -272,7 +275,7 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
        logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
        defer logrus.SetOutput(os.Stderr)
 
-       s.disp.CrunchRunCommand = []string{crunchCmd}
+       s.disp.cluster.Containers.CrunchRunCommand = "crunchCmd"
 
        ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
@@ -302,51 +305,6 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
        c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }
 
-func (s *StubbedSuite) TestNoSuchConfigFile(c *C) {
-       err := s.disp.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
-       c.Assert(err, NotNil)
-}
-
-func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) {
-       tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
-       c.Check(err, IsNil)
-       defer os.Remove(tmpfile.Name())
-
-       _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
-       c.Check(err, IsNil)
-
-       err = s.disp.readConfig(tmpfile.Name())
-       c.Assert(err, NotNil)
-}
-
-func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) {
-       tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
-       c.Check(err, IsNil)
-       defer os.Remove(tmpfile.Name())
-
-       _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
-       c.Check(err, IsNil)
-
-       err = s.disp.readConfig(tmpfile.Name())
-       c.Assert(err, IsNil)
-       c.Check(0, Equals, len(s.disp.SbatchArguments))
-}
-
-func (s *StubbedSuite) TestReadConfig(c *C) {
-       tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
-       c.Check(err, IsNil)
-       defer os.Remove(tmpfile.Name())
-
-       args := []string{"--arg1=v1", "--arg2", "--arg3=v3"}
-       argsS := `{"SbatchArguments": ["--arg1=v1",  "--arg2", "--arg3=v3"]}`
-       _, err = tmpfile.Write([]byte(argsS))
-       c.Check(err, IsNil)
-
-       err = s.disp.readConfig(tmpfile.Name())
-       c.Assert(err, IsNil)
-       c.Check(args, DeepEquals, s.disp.SbatchArguments)
-}
-
 func (s *StubbedSuite) TestSbatchArgs(c *C) {
        container := arvados.Container{
                UUID:               "123",
@@ -360,7 +318,7 @@ func (s *StubbedSuite) TestSbatchArgs(c *C) {
                {"--arg1=v1", "--arg2"},
        } {
                c.Logf("%#v", defaults)
-               s.disp.SbatchArguments = defaults
+               s.disp.cluster.Containers.SLURM.SbatchArgumentsList = defaults
 
                args, err := s.disp.sbatchArgs(container)
                c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--no-requeue", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
@@ -432,3 +390,44 @@ func (s *StubbedSuite) TestSbatchPartition(c *C) {
        })
        c.Check(err, IsNil)
 }
+
+func (s *StubbedSuite) TestLoadLegacyConfig(c *C) {
+       content := []byte(`
+Client:
+  APIHost: example.com
+  APIToken: abcdefg
+SbatchArguments: ["--foo", "bar"]
+PollPeriod: 12s
+PrioritySpread: 42
+CrunchRunCommand: ["x-crunch-run", "--cgroup-parent-subsystem=memory"]
+ReserveExtraRAM: 12345
+MinRetryPeriod: 13s
+BatchSize: 99
+`)
+       tmpfile, err := ioutil.TempFile("", "example")
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       defer os.Remove(tmpfile.Name()) // clean up
+
+       if _, err := tmpfile.Write(content); err != nil {
+               log.Fatal(err)
+       }
+       if err := tmpfile.Close(); err != nil {
+               log.Fatal(err)
+
+       }
+       err = s.disp.configure("crunch-dispatch-slurm", []string{"-config", tmpfile.Name()})
+       c.Check(err, IsNil)
+
+       c.Check(s.disp.cluster.Services.Controller.ExternalURL, Equals, arvados.URL{Scheme: "https", Host: "example.com"})
+       c.Check(s.disp.cluster.Containers.SLURM.SbatchArgumentsList, DeepEquals, []string{"--foo", "bar"})
+       c.Check(s.disp.cluster.Containers.CloudVMs.PollInterval, Equals, arvados.Duration(12*time.Second))
+       c.Check(s.disp.cluster.Containers.SLURM.PrioritySpread, Equals, int64(42))
+       c.Check(s.disp.cluster.Containers.CrunchRunCommand, Equals, "x-crunch-run")
+       c.Check(s.disp.cluster.Containers.CrunchRunArgumentsList, DeepEquals, []string{"--cgroup-parent-subsystem=memory"})
+       c.Check(s.disp.cluster.Containers.ReserveExtraRAM, Equals, arvados.ByteSize(12345))
+       c.Check(s.disp.cluster.Containers.MinRetryPeriod, Equals, arvados.Duration(13*time.Second))
+       c.Check(s.disp.cluster.API.MaxItemsPerResponse, Equals, 99)
+}