From 6b3a880d607ee3e3dd273f019981fd6cae62373c Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 4 Oct 2021 10:55:49 -0400 Subject: [PATCH] 16347: Run a dedicated keepstore process for each container. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/config/config.default.yml | 18 +++ lib/config/generated_config.go | 18 +++ lib/config/load.go | 13 +++ lib/crunchrun/background.go | 22 +++- lib/crunchrun/bufthenwrite.go | 34 ++++++ lib/crunchrun/crunchrun.go | 147 ++++++++++++++++++++---- lib/dispatchcloud/test/stub_driver.go | 11 +- lib/dispatchcloud/worker/pool.go | 2 + lib/dispatchcloud/worker/pool_test.go | 70 ++++++----- lib/dispatchcloud/worker/runner.go | 20 ++-- lib/dispatchcloud/worker/worker_test.go | 21 +++- sdk/go/arvados/config.go | 1 + 12 files changed, 302 insertions(+), 75 deletions(-) create mode 100644 lib/crunchrun/bufthenwrite.go diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index 4e2a0e26d4..429b9fe48b 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -911,6 +911,24 @@ Clusters: # Container runtime: "docker" (default) or "singularity" RuntimeEngine: docker + # When running a container, run a dedicated keepstore process, + # using the specified number of 64 MiB memory buffers per + # allocated CPU core (VCPUs in the container's runtime + # constraints). The dedicated keepstore handles I/O for + # collections mounted in the container, as well as saving + # container logs. + # + # A zero value disables this feature. + # + # This feature has security implications. (1) Container logs + # will include keepstore log files, which typically reveal some + # volume configuration details, error messages from the cloud + # storage provider, etc., which are not otherwise visible to + # users. (2) The entire cluster configuration file, including + # the system root token, is copied to the worker node and held + # in memory for the duration of the container. + LocalKeepBlobBuffersPerVCPU: 0 + Logging: # When you run the db:delete_old_container_logs task, it will find # containers that have been finished for at least this many seconds, diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go index 875939a3e1..dfa406bbe6 100644 --- a/lib/config/generated_config.go +++ b/lib/config/generated_config.go @@ -917,6 +917,24 @@ Clusters: # Container runtime: "docker" (default) or "singularity" RuntimeEngine: docker + # When running a container, run a dedicated keepstore process, + # using the specified number of 64 MiB memory buffers per + # allocated CPU core (VCPUs in the container's runtime + # constraints). The dedicated keepstore handles I/O for + # collections mounted in the container, as well as saving + # container logs. + # + # A zero value disables this feature. + # + # This feature has security implications. (1) Container logs + # will include keepstore log files, which typically reveal some + # volume configuration details, error messages from the cloud + # storage provider, etc., which are not otherwise visible to + # users. (2) The entire cluster configuration file, including + # the system root token, is copied to the worker node and held + # in memory for the duration of the container. 
+ LocalKeepBlobBuffersPerVCPU: 0 + Logging: # When you run the db:delete_old_container_logs task, it will find # containers that have been finished for at least this many seconds, diff --git a/lib/config/load.go b/lib/config/load.go index 248960beb9..b6375c820f 100644 --- a/lib/config/load.go +++ b/lib/config/load.go @@ -297,6 +297,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) { checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection), ldr.checkEmptyKeepstores(cc), ldr.checkUnlistedKeepstores(cc), + ldr.checkLocalKeepstoreVolumes(cc), ldr.checkStorageClasses(cc), // TODO: check non-empty Rendezvous on // services other than Keepstore @@ -361,6 +362,18 @@ cluster: return nil } +func (ldr *Loader) checkLocalKeepstoreVolumes(cc arvados.Cluster) error { + if cc.Containers.LocalKeepBlobBuffersPerVCPU < 1 { + return nil + } + for _, vol := range cc.Volumes { + if len(vol.AccessViaHosts) == 0 { + return nil + } + } + return fmt.Errorf("LocalKeepBlobBuffersPerVCPU is %d, but no volumes would be accessible from a worker instance", cc.Containers.LocalKeepBlobBuffersPerVCPU) +} + func (ldr *Loader) checkStorageClasses(cc arvados.Cluster) error { classOnVolume := map[string]bool{} for volid, vol := range cc.Volumes { diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go index 4bb249380f..07f8f5b885 100644 --- a/lib/crunchrun/background.go +++ b/lib/crunchrun/background.go @@ -36,10 +36,10 @@ type procinfo struct { // // Stdout and stderr in the child process are sent to the systemd // journal using the systemd-cat program. -func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int { - return exitcode(stderr, detach(uuid, prog, args, stdout, stderr)) +func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { + return exitcode(stderr, detach(uuid, prog, args, stdin, stdout)) } -func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error { +func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error { lockfile, err := func() (*os.File, error) { // We must hold the dir-level lock between // opening/creating the lockfile and acquiring LOCK_EX @@ -99,10 +99,26 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e // from parent (sshd) while sending lockfile content to // caller. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + // We need to manage our own OS pipe here to ensure the child + // process reads all of our stdin pipe before we return. + piper, pipew, err := os.Pipe() + if err != nil { + return err + } + defer pipew.Close() + cmd.Stdin = piper err = cmd.Start() if err != nil { return fmt.Errorf("exec %s: %s", cmd.Path, err) } + _, err = io.Copy(pipew, stdin) + if err != nil { + return err + } + err = pipew.Close() + if err != nil { + return err + } w := io.MultiWriter(stdout, lockfile) return json.NewEncoder(w).Encode(procinfo{ diff --git a/lib/crunchrun/bufthenwrite.go b/lib/crunchrun/bufthenwrite.go new file mode 100644 index 0000000000..2d1c407161 --- /dev/null +++ b/lib/crunchrun/bufthenwrite.go @@ -0,0 +1,34 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package crunchrun + +import ( + "bytes" + "io" + "sync" +) + +type bufThenWrite struct { + buf bytes.Buffer + w io.Writer + mtx sync.Mutex +} + +func (btw *bufThenWrite) SetWriter(w io.Writer) error { + btw.mtx.Lock() + defer btw.mtx.Unlock() + btw.w = w + _, err := io.Copy(w, &btw.buf) + return err +} + +func (btw *bufThenWrite) Write(p []byte) (int, error) { + btw.mtx.Lock() + defer btw.mtx.Unlock() + if btw.w == nil { + btw.w = &btw.buf + } + return btw.w.Write(p) +} diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 42f143f1cb..7a2afeacca 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -6,6 +6,7 @@ package crunchrun import ( "bytes" + "context" "encoding/json" "errors" "flag" @@ -13,6 +14,8 @@ import ( "io" "io/ioutil" "log" + "net" + "net/http" "os" "os/exec" "os/signal" @@ -33,13 +36,20 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" "git.arvados.org/arvados.git/sdk/go/manifest" - "golang.org/x/net/context" ) type command struct{} var Command = command{} +// ConfigData contains environment variables and (when needed) cluster +// configuration, passed from dispatchcloud to crunch-run on stdin. +type ConfigData struct { + Env map[string]string + KeepBuffers int + Cluster *arvados.Cluster +} + // IArvadosClient is the minimal Arvados API methods used by crunch-run. type IArvadosClient interface { Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error @@ -1644,7 +1654,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates") detach := flags.Bool("detach", false, "Detach from parent process and run in the background") - stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin") + stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin") sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes") @@ -1674,33 +1684,45 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } - if *stdinEnv && !ignoreDetachFlag { - // Load env vars on stdin if asked (but not in a - // detached child process, in which case stdin is - // /dev/null). 
- err := loadEnv(os.Stdin) - if err != nil { - log.Print(err) - return 1 - } - } - containerUUID := flags.Arg(0) switch { case *detach && !ignoreDetachFlag: - return Detach(containerUUID, prog, args, os.Stdout, os.Stderr) + return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr) case *kill >= 0: return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr) case *list: return ListProcesses(os.Stdout, os.Stderr) } - if containerUUID == "" { + if len(containerUUID) != 27 { log.Printf("usage: %s [options] UUID", prog) return 1 } + var conf ConfigData + if *stdinConfig { + err := json.NewDecoder(os.Stdin).Decode(&conf) + if err != nil { + log.Print(err) + return 1 + } + for k, v := range conf.Env { + err = os.Setenv(k, v) + if err != nil { + log.Printf("setenv(%q): %s", k, err) + return 1 + } + } + if conf.Cluster != nil { + // ClusterID is missing from the JSON + // representation, but we need it to generate + // a valid config file for keepstore, so we + // fill it using the container UUID prefix. + conf.Cluster.ClusterID = containerUUID[:5] + } + } + log.Printf("crunch-run %s started", cmd.Version.String()) time.Sleep(*sleep) @@ -1708,6 +1730,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s arvadosclient.CertFiles = []string{*caCertsPath} } + var keepstoreLog bufThenWrite + keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLog, stderr)) + if err != nil { + log.Print(err) + return 1 + } + if keepstore != nil { + defer keepstore.Process.Kill() + } + api, err := arvadosclient.MakeArvadosClient() if err != nil { log.Printf("%s: %v", containerUUID, err) @@ -1729,6 +1761,19 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + if keepstore != nil { + w, err := cr.NewLogWriter("keepstore") + if err != nil { + log.Print(err) + return 1 + } + err = keepstoreLog.SetWriter(NewThrottledLogger(w)) + if err != nil { + log.Print(err) + return 1 + } + } + switch *runtimeEngine { case "docker": cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval) @@ -1816,21 +1861,73 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 0 } -func loadEnv(rdr io.Reader) error { - buf, err := ioutil.ReadAll(rdr) +func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) { + if configData.Cluster == nil || configData.KeepBuffers < 1 { + return nil, nil + } + + // Rather than have an alternate way to tell keepstore how + // many buffers to use when starting it this way, we just + // modify the cluster configuration that we feed it on stdin. 
+ configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers + + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, err + } + _, port, err := net.SplitHostPort(ln.Addr().String()) + if err != nil { + ln.Close() + return nil, err + } + ln.Close() + url := "http://localhost:" + port + + fmt.Fprintf(logbuf, "starting keepstore on %s\n", url) + + var confJSON bytes.Buffer + err = json.NewEncoder(&confJSON).Encode(arvados.Config{ + Clusters: map[string]arvados.Cluster{ + configData.Cluster.ClusterID: *configData.Cluster, + }, + }) if err != nil { - return fmt.Errorf("read stdin: %s", err) + return nil, err + } + cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-") + cmd.Stdin = &confJSON + cmd.Stdout = logbuf + cmd.Stderr = logbuf + cmd.Env = []string{ + "GOGC=10", + "ARVADOS_SERVICE_INTERNAL_URL=" + url, } - var env map[string]string - err = json.Unmarshal(buf, &env) + err = cmd.Start() if err != nil { - return fmt.Errorf("decode stdin: %s", err) + return nil, fmt.Errorf("error starting keepstore process: %w", err) } - for k, v := range env { - err = os.Setenv(k, v) + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) + defer cancel() + poll := time.NewTicker(time.Second / 10) + defer poll.Stop() + client := http.Client{} + for range poll.C { + testReq, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { - return fmt.Errorf("setenv(%q): %s", k, err) + return nil, err + } + resp, err := client.Do(testReq) + if err == nil { + // Success -- don't need to check the + // response, we just need to know it's + // accepting requests. + resp.Body.Close() + break + } + if ctx.Err() != nil { + return nil, fmt.Errorf("timed out waiting for new keepstore process to accept a request") } } - return nil + os.Setenv("ARVADOS_KEEP_SERVICES", url) + return cmd, nil } diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go index 1b31a71a26..f57db0f09f 100644 --- a/lib/dispatchcloud/test/stub_driver.go +++ b/lib/dispatchcloud/test/stub_driver.go @@ -18,6 +18,7 @@ import ( "time" "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/crunchrun" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" @@ -193,7 +194,7 @@ type StubVM struct { ArvMountDeadlockRate float64 ExecuteContainer func(arvados.Container) int CrashRunningContainer func(arvados.Container) - ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-env " + ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config " sis *StubInstanceSet id cloud.InstanceID @@ -252,15 +253,15 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, fmt.Fprint(stderr, "crunch-run: command not found\n") return 1 } - if strings.HasPrefix(command, "crunch-run --detach --stdin-env "+svm.ExtraCrunchRunArgs) { - var stdinKV map[string]string - err := json.Unmarshal(stdinData, &stdinKV) + if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) { + var configData crunchrun.ConfigData + err := json.Unmarshal(stdinData, &configData) if err != nil { fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData) return 1 } for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} { - if stdinKV[name] == "" { + if configData.Env[name] == "" { fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData) return 1 } diff 
--git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index a5924cf997..37e3fa9882 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -103,6 +103,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe instanceSetID: instanceSetID, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, newExecutor: newExecutor, + cluster: cluster, bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand, runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary, imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID), @@ -144,6 +145,7 @@ type Pool struct { instanceSetID cloud.InstanceSetID instanceSet *throttledInstanceSet newExecutor func(cloud.Instance) Executor + cluster *arvados.Cluster bootProbeCommand string runnerSource string imageID cloud.ImageID diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go index 0f5c5ee196..7b5634605f 100644 --- a/lib/dispatchcloud/worker/pool_test.go +++ b/lib/dispatchcloud/worker/pool_test.go @@ -10,10 +10,12 @@ import ( "time" "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/dispatchcloud/test" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" check "gopkg.in/check.v1" ) @@ -31,7 +33,18 @@ func (*lessChecker) Check(params []interface{}, names []string) (result bool, er var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}} -type PoolSuite struct{} +type PoolSuite struct { + logger logrus.FieldLogger + testCluster *arvados.Cluster +} + +func (suite *PoolSuite) SetUpTest(c *check.C) { + suite.logger = ctxlog.TestLogger(c) + cfg, err := config.NewLoader(nil, suite.logger).Load() + c.Assert(err, check.IsNil) + suite.testCluster, err = cfg.GetCluster("") + c.Assert(err, check.IsNil) +} func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { type1 := test.InstanceType(1) @@ -63,10 +76,9 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { } } - logger := ctxlog.TestLogger(c) driver := &test.StubDriver{} instanceSetID := cloud.InstanceSetID("test-instance-set-id") - is, err := driver.InstanceSet(nil, instanceSetID, nil, logger) + is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger) c.Assert(err, check.IsNil) newExecutor := func(cloud.Instance) Executor { @@ -78,25 +90,21 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { } } - cluster := &arvados.Cluster{ - Containers: arvados.ContainersConfig{ - CloudVMs: arvados.CloudVMsConfig{ - BootProbeCommand: "true", - MaxProbesPerSecond: 1000, - ProbeInterval: arvados.Duration(time.Millisecond * 10), - SyncInterval: arvados.Duration(time.Millisecond * 10), - TagKeyPrefix: "testprefix:", - }, - CrunchRunCommand: "crunch-run-custom", - }, - InstanceTypes: arvados.InstanceTypeMap{ - type1.Name: type1, - type2.Name: type2, - type3.Name: type3, - }, + suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{ + BootProbeCommand: "true", + MaxProbesPerSecond: 1000, + ProbeInterval: arvados.Duration(time.Millisecond * 10), + SyncInterval: arvados.Duration(time.Millisecond * 10), + TagKeyPrefix: "testprefix:", + } + suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom" + suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{ + type1.Name: type1, + type2.Name: type2, + type3.Name: type3, } - pool := 
NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster) + pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster) notify := pool.Subscribe() defer pool.Unsubscribe(notify) pool.Create(type1) @@ -111,7 +119,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { } } // Wait for the tags to save to the cloud provider - tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior + tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior deadline := time.Now().Add(time.Second) for !func() bool { pool.mtx.RLock() @@ -132,7 +140,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { c.Log("------- starting new pool, waiting to recover state") - pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster) + pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster) notify2 := pool2.Subscribe() defer pool2.Unsubscribe(notify2) waitForIdle(pool2, notify2) @@ -148,9 +156,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) { } func (suite *PoolSuite) TestDrain(c *check.C) { - logger := ctxlog.TestLogger(c) driver := test.StubDriver{} - instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger) + instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger) c.Assert(err, check.IsNil) ac := arvados.NewClientFromEnv() @@ -158,8 +165,9 @@ func (suite *PoolSuite) TestDrain(c *check.C) { type1 := test.InstanceType(1) pool := &Pool{ arvClient: ac, - logger: logger, + logger: suite.logger, newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} }, + cluster: suite.testCluster, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, instanceTypes: arvados.InstanceTypeMap{ type1.Name: type1, @@ -201,15 +209,15 @@ func (suite *PoolSuite) TestDrain(c *check.C) { } func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) { - logger := ctxlog.TestLogger(c) driver := test.StubDriver{HoldCloudOps: true} - instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger) + instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger) c.Assert(err, check.IsNil) type1 := test.InstanceType(1) pool := &Pool{ - logger: logger, + logger: suite.logger, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, + cluster: suite.testCluster, maxConcurrentInstanceCreateOps: 1, instanceTypes: arvados.InstanceTypeMap{ type1.Name: type1, @@ -241,17 +249,17 @@ func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) { } func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) { - logger := ctxlog.TestLogger(c) driver := test.StubDriver{HoldCloudOps: true} - instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger) + instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger) c.Assert(err, check.IsNil) type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01} type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02} type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04} pool := &Pool{ - logger: logger, + logger: suite.logger, newExecutor: func(cloud.Instance) Executor { 
return &stubExecutor{} }, + cluster: suite.testCluster, instanceSet: &throttledInstanceSet{InstanceSet: instanceSet}, instanceTypes: arvados.InstanceTypeMap{ type1.Name: type1, diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go index 63561874c9..29c4b8e0a3 100644 --- a/lib/dispatchcloud/worker/runner.go +++ b/lib/dispatchcloud/worker/runner.go @@ -13,6 +13,7 @@ import ( "syscall" "time" + "git.arvados.org/arvados.git/lib/crunchrun" "github.com/sirupsen/logrus" ) @@ -21,7 +22,7 @@ import ( type remoteRunner struct { uuid string executor Executor - envJSON json.RawMessage + configJSON json.RawMessage runnerCmd string runnerArgs []string remoteUser string @@ -47,7 +48,8 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner { if err := enc.Encode(wkr.instType); err != nil { panic(err) } - env := map[string]string{ + var configData crunchrun.ConfigData + configData.Env = map[string]string{ "ARVADOS_API_HOST": wkr.wp.arvClient.APIHost, "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken, "InstanceType": instJSON.String(), @@ -55,16 +57,20 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner { "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid), } if wkr.wp.arvClient.Insecure { - env["ARVADOS_API_HOST_INSECURE"] = "1" + configData.Env["ARVADOS_API_HOST_INSECURE"] = "1" } - envJSON, err := json.Marshal(env) + if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 { + configData.Cluster = wkr.wp.cluster + configData.KeepBuffers = bufs * wkr.instType.VCPUs + } + configJSON, err := json.Marshal(configData) if err != nil { panic(err) } rr := &remoteRunner{ uuid: uuid, executor: wkr.executor, - envJSON: envJSON, + configJSON: configJSON, runnerCmd: wkr.wp.runnerCmd, runnerArgs: wkr.wp.runnerArgs, remoteUser: wkr.instance.RemoteUser(), @@ -84,7 +90,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner { // assume the remote process _might_ have started, at least until it // probes the worker and finds otherwise. func (rr *remoteRunner) Start() { - cmd := rr.runnerCmd + " --detach --stdin-env" + cmd := rr.runnerCmd + " --detach --stdin-config" for _, arg := range rr.runnerArgs { cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'" } @@ -92,7 +98,7 @@ func (rr *remoteRunner) Start() { if rr.remoteUser != "root" { cmd = "sudo " + cmd } - stdin := bytes.NewBuffer(rr.envJSON) + stdin := bytes.NewBuffer(rr.configJSON) stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin) if err != nil { rr.logger.WithField("stdout", string(stdout)). 
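The runner.go change above replaces the old --stdin-env payload (a flat map of environment variables) with a ConfigData document that can also carry the full cluster configuration and a keepstore buffer count scaled by the instance type's VCPUs. The standalone Go sketch below illustrates that assembly step under simplified assumptions: configData and clusterConfig are cut-down stand-ins for crunchrun.ConfigData and arvados.Cluster, and the 4-VCPU instance, API host, token, and container UUID are illustrative values rather than anything taken from the patch.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// configData is a cut-down stand-in for crunchrun.ConfigData.
type configData struct {
	Env         map[string]string
	KeepBuffers int
	Cluster     *clusterConfig
}

// clusterConfig is a cut-down stand-in for arvados.Cluster; the real struct
// carries the entire cluster configuration, including the system root token.
type clusterConfig struct {
	ClusterID                   string
	LocalKeepBlobBuffersPerVCPU int
}

func main() {
	cluster := &clusterConfig{LocalKeepBlobBuffersPerVCPU: 1}
	instVCPUs := 4 // illustrative instance type

	conf := configData{
		Env: map[string]string{
			"ARVADOS_API_HOST":  "zzzzz.example.com",
			"ARVADOS_API_TOKEN": "exampletoken",
		},
	}
	// Same rule as newRemoteRunner: ship the cluster config, and a buffer
	// count scaled by VCPUs, only when the feature is enabled.
	if bufs := cluster.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
		conf.Cluster = cluster
		conf.KeepBuffers = bufs * instVCPUs
	}

	payload, err := json.Marshal(conf)
	if err != nil {
		panic(err)
	}
	stdin := bytes.NewBuffer(payload)

	// In the real code this command runs on the worker node over SSH via
	// the pool's Executor, with stdin attached; here we just print both.
	fmt.Println("command: crunch-run --detach --stdin-config 'zzzzz-dz642-queuedcontainer'")
	fmt.Println("stdin:  ", stdin.String())
}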
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go index 4134788b2e..2ee6b7c362 100644 --- a/lib/dispatchcloud/worker/worker_test.go +++ b/lib/dispatchcloud/worker/worker_test.go @@ -14,24 +14,36 @@ import ( "time" "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/dispatchcloud/test" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/ctxlog" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" check "gopkg.in/check.v1" ) var _ = check.Suite(&WorkerSuite{}) -type WorkerSuite struct{} +type WorkerSuite struct { + logger logrus.FieldLogger + testCluster *arvados.Cluster +} + +func (suite *WorkerSuite) SetUpTest(c *check.C) { + suite.logger = ctxlog.TestLogger(c) + cfg, err := config.NewLoader(nil, suite.logger).Load() + c.Assert(err, check.IsNil) + suite.testCluster, err = cfg.GetCluster("") + c.Assert(err, check.IsNil) +} func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { - logger := ctxlog.TestLogger(c) bootTimeout := time.Minute probeTimeout := time.Second ac := arvados.NewClientFromEnv() - is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger) + is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger) c.Assert(err, check.IsNil) inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil) c.Assert(err, check.IsNil) @@ -232,6 +244,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { wp := &Pool{ arvClient: ac, newExecutor: func(cloud.Instance) Executor { return exr }, + cluster: suite.testCluster, bootProbeCommand: "bootprobe", timeoutBooting: bootTimeout, timeoutProbe: probeTimeout, @@ -249,7 +262,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) { exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed } wkr := &worker{ - logger: logger, + logger: suite.logger, executor: exr, wp: wp, mtx: &wp.mtx, diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index f1d27b8dcd..b84e1eefaa 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -435,6 +435,7 @@ type ContainersConfig struct { SupportedDockerImageFormats StringSet UsePreemptibleInstances bool RuntimeEngine string + LocalKeepBlobBuffersPerVCPU int JobsAPI struct { Enable string -- 2.30.2
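The new bufThenWrite type exists because the local keepstore starts producing log output before crunch-run has created the container's "keepstore" log collection writer; its output is buffered in memory and flushed to the real destination once SetWriter is called. A minimal usage sketch follows; the type is reproduced verbatim from lib/crunchrun/bufthenwrite.go so the example compiles on its own, while the log lines and the bytes.Buffer destination are illustrative.

package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// bufThenWrite is copied from lib/crunchrun/bufthenwrite.go (unexported
// there; reproduced here so this sketch is self-contained).
type bufThenWrite struct {
	buf bytes.Buffer
	w   io.Writer
	mtx sync.Mutex
}

func (btw *bufThenWrite) SetWriter(w io.Writer) error {
	btw.mtx.Lock()
	defer btw.mtx.Unlock()
	btw.w = w
	_, err := io.Copy(w, &btw.buf)
	return err
}

func (btw *bufThenWrite) Write(p []byte) (int, error) {
	btw.mtx.Lock()
	defer btw.mtx.Unlock()
	if btw.w == nil {
		btw.w = &btw.buf
	}
	return btw.w.Write(p)
}

func main() {
	var logbuf bufThenWrite
	fmt.Fprintln(&logbuf, "keepstore starting...") // no writer yet: buffered in memory

	var dest bytes.Buffer // stands in for the container's "keepstore" log writer
	if err := logbuf.SetWriter(&dest); err != nil {
		panic(err)
	}
	fmt.Fprintln(&logbuf, "keepstore ready") // goes straight to dest now

	fmt.Print(dest.String())
	// Output:
	// keepstore starting...
	// keepstore ready
}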
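startLocalKeepstore also illustrates a reusable pattern for bringing up a child service on the same host: bind localhost:0 to discover a free port, close the listener, hand the resulting URL to the child (via ARVADOS_SERVICE_INTERNAL_URL in the patch), then poll it over HTTP until it accepts a request or a 10-second deadline passes. The sketch below reproduces that loop in isolation; the delayed in-process HTTP server is a stand-in for the re-exec'd keepstore child, and, like the patch, it accepts the small window in which another process could claim the port between Close and the child's bind.

package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"time"
)

func main() {
	// Pick a free port the same way the patch does: bind to localhost:0,
	// note the port, and close the listener so the service can claim it.
	ln, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		panic(err)
	}
	_, port, err := net.SplitHostPort(ln.Addr().String())
	if err != nil {
		panic(err)
	}
	ln.Close()
	url := "http://localhost:" + port

	// Stand-in for the keepstore child process: an HTTP server that only
	// starts listening after a short delay.
	go func() {
		time.Sleep(300 * time.Millisecond)
		http.ListenAndServe("localhost:"+port, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintln(w, "ok")
		}))
	}()

	// Poll until the service accepts a request, or give up after 10
	// seconds; this mirrors the loop shape in startLocalKeepstore.
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
	defer cancel()
	poll := time.NewTicker(time.Second / 10)
	defer poll.Stop()
	client := http.Client{}
	for range poll.C {
		req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
		if err != nil {
			panic(err)
		}
		resp, err := client.Do(req)
		if err == nil {
			// Success: no need to inspect the response, we only
			// need to know the service is accepting requests.
			resp.Body.Close()
			break
		}
		if ctx.Err() != nil {
			panic("timed out waiting for service to accept a request")
		}
	}
	fmt.Println("service is up at", url)
}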