From 2ca82cf645eb7d9dad60f98e1feca67042c38c47 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 4 May 2022 14:52:22 -0400 Subject: [PATCH] 15370: Add dispatchcloud loopback driver. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/boot/passenger.go | 11 +- lib/boot/supervisor.go | 9 +- lib/cloud/loopback/loopback.go | 164 ++++++++++++++++++++++++++++ lib/cloud/loopback/loopback_test.go | 127 +++++++++++++++++++++ lib/config/config.default.yml | 4 +- lib/config/load.go | 68 +++++++++++- lib/config/load_test.go | 63 +++++++++++ lib/controller/integration_test.go | 105 ++++++++++++++++++ lib/crunchrun/executor_test.go | 32 +----- lib/dispatchcloud/driver.go | 6 +- lib/dispatchcloud/worker/pool.go | 10 +- lib/dispatchcloud/worker/worker.go | 9 ++ lib/install/init.go | 3 + sdk/go/arvados/container.go | 1 + sdk/go/arvadostest/busybox_image.go | 43 ++++++++ 15 files changed, 613 insertions(+), 42 deletions(-) create mode 100644 lib/cloud/loopback/loopback.go create mode 100644 lib/cloud/loopback/loopback_test.go create mode 100644 sdk/go/arvadostest/busybox_image.go diff --git a/lib/boot/passenger.go b/lib/boot/passenger.go index 410befab90..f86f1f9303 100644 --- a/lib/boot/passenger.go +++ b/lib/boot/passenger.go @@ -28,8 +28,8 @@ var railsEnv = []string{ // Install a Rails application's dependencies, including phusion // passenger. type installPassenger struct { - src string - varlibdir string + src string // path to app in source tree + varlibdir string // path to app (relative to /var/lib/arvados) in OS package: "railsapi" or "workbench1" depends []supervisedTask } @@ -52,6 +52,11 @@ func (runner installPassenger) Run(ctx context.Context, fail func(error), super appdir := runner.src if super.ClusterType == "test" { + // In the multi-cluster test setup, if we run multiple + // Rails instances directly from the source tree, they + // step on one another's files in {source}/tmp, log, + // etc. So instead we copy the source directory into a + // temp dir and run the Rails app from there. 
appdir = filepath.Join(super.tempdir, runner.varlibdir) err = super.RunProgram(ctx, super.tempdir, runOptions{}, "mkdir", "-p", appdir) if err != nil { @@ -112,7 +117,7 @@ func (runner installPassenger) Run(ctx context.Context, fail func(error), super type runPassenger struct { src string // path to app in source tree - varlibdir string // path to app (relative to /var/lib/arvados) in OS package + varlibdir string // path to app (relative to /var/lib/arvados) in OS package: "railsapi" or "workbench1" svc arvados.Service depends []supervisedTask } diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go index 5ec1b22448..d69cc9f18c 100644 --- a/lib/boot/supervisor.go +++ b/lib/boot/supervisor.go @@ -387,10 +387,14 @@ func (super *Supervisor) runCluster() error { } if super.ClusterType != "test" { tasks = append(tasks, - runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud}, runServiceCommand{name: "keep-balance", svc: super.cluster.Services.Keepbalance}, ) } + if super.cluster.Containers.CloudVMs.Enable { + tasks = append(tasks, + runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud}, + ) + } super.tasksReady = map[string]chan bool{} for _, task := range tasks { super.tasksReady[task.String()] = make(chan bool) @@ -824,9 +828,6 @@ func (super *Supervisor) autofillConfig() error { &super.cluster.Services.Workbench1, &super.cluster.Services.Workbench2, } { - if svc == &super.cluster.Services.DispatchCloud && super.ClusterType == "test" { - continue - } if svc.ExternalURL.Host == "" { port, err := nextPort(defaultExtHost) if err != nil { diff --git a/lib/cloud/loopback/loopback.go b/lib/cloud/loopback/loopback.go new file mode 100644 index 0000000000..6ad4f876d9 --- /dev/null +++ b/lib/cloud/loopback/loopback.go @@ -0,0 +1,164 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package loopback + +import ( + "bytes" + "crypto/rand" + "crypto/rsa" + "encoding/json" + "errors" + "io" + "os/exec" + "os/user" + "strings" + "sync" + "syscall" + + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/dispatchcloud/test" + "git.arvados.org/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" + "golang.org/x/crypto/ssh" +) + +// Driver is the loopback implementation of the cloud.Driver interface. 
+var Driver = cloud.DriverFunc(newInstanceSet) + +var ( + errUnimplemented = errors.New("function not implemented by loopback driver") + errQuota = quotaError("loopback driver is always at quota") +) + +type quotaError string + +func (e quotaError) IsQuotaError() bool { return true } +func (e quotaError) Error() string { return string(e) } + +type instanceSet struct { + instanceSetID cloud.InstanceSetID + logger logrus.FieldLogger + instances []*instance + mtx sync.Mutex +} + +func newInstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) { + is := &instanceSet{ + instanceSetID: instanceSetID, + logger: logger, + } + return is, nil +} + +func (is *instanceSet) Create(it arvados.InstanceType, _ cloud.ImageID, tags cloud.InstanceTags, _ cloud.InitCommand, pubkey ssh.PublicKey) (cloud.Instance, error) { + is.mtx.Lock() + defer is.mtx.Unlock() + if len(is.instances) > 0 { + return nil, errQuota + } + u, err := user.Current() + if err != nil { + return nil, err + } + hostRSAKey, err := rsa.GenerateKey(rand.Reader, 1024) + if err != nil { + return nil, err + } + hostKey, err := ssh.NewSignerFromKey(hostRSAKey) + if err != nil { + return nil, err + } + hostPubKey, err := ssh.NewPublicKey(hostRSAKey.Public()) + if err != nil { + return nil, err + } + inst := &instance{ + is: is, + instanceType: it, + adminUser: u.Username, + tags: tags, + hostPubKey: hostPubKey, + sshService: test.SSHService{ + HostKey: hostKey, + AuthorizedUser: u.Username, + AuthorizedKeys: []ssh.PublicKey{pubkey}, + }, + } + inst.sshService.Exec = inst.sshExecFunc + go inst.sshService.Start() + is.instances = []*instance{inst} + return inst, nil +} + +func (is *instanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) { + is.mtx.Lock() + defer is.mtx.Unlock() + var ret []cloud.Instance + for _, inst := range is.instances { + ret = append(ret, inst) + } + return ret, nil +} + +func (is *instanceSet) Stop() { + is.mtx.Lock() + defer is.mtx.Unlock() + for _, inst := range is.instances { + inst.sshService.Close() + } +} + +type instance struct { + is *instanceSet + instanceType arvados.InstanceType + adminUser string + tags cloud.InstanceTags + hostPubKey ssh.PublicKey + sshService test.SSHService +} + +func (i *instance) ID() cloud.InstanceID { return cloud.InstanceID(i.instanceType.ProviderType) } +func (i *instance) String() string { return i.instanceType.ProviderType } +func (i *instance) ProviderType() string { return i.instanceType.ProviderType } +func (i *instance) Address() string { return i.sshService.Address() } +func (i *instance) RemoteUser() string { return i.adminUser } +func (i *instance) Tags() cloud.InstanceTags { return i.tags } +func (i *instance) SetTags(tags cloud.InstanceTags) error { + i.tags = tags + return nil +} +func (i *instance) Destroy() error { + i.is.mtx.Lock() + defer i.is.mtx.Unlock() + i.is.instances = i.is.instances[:0] + return nil +} +func (i *instance) VerifyHostKey(pubkey ssh.PublicKey, _ *ssh.Client) error { + if !bytes.Equal(pubkey.Marshal(), i.hostPubKey.Marshal()) { + return errors.New("host key mismatch") + } + return nil +} +func (i *instance) sshExecFunc(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 { + cmd := exec.Command("sh", "-c", strings.TrimPrefix(command, "sudo ")) + cmd.Stdin = stdin + cmd.Stdout = stdout + cmd.Stderr = stderr + for k, v := range env { + cmd.Env = append(cmd.Env, k+"="+v) + } + // Prevent child process from 
using our tty. + cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true} + err := cmd.Run() + if err == nil { + return 0 + } else if err, ok := err.(*exec.ExitError); !ok { + return 1 + } else if code := err.ExitCode(); code < 0 { + return 1 + } else { + return uint32(code) + } +} diff --git a/lib/cloud/loopback/loopback_test.go b/lib/cloud/loopback/loopback_test.go new file mode 100644 index 0000000000..5c30f5f0e1 --- /dev/null +++ b/lib/cloud/loopback/loopback_test.go @@ -0,0 +1,127 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package loopback + +import ( + "crypto/rand" + "crypto/rsa" + "encoding/json" + "testing" + "time" + + "git.arvados.org/arvados.git/lib/cloud" + "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor" + "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "golang.org/x/crypto/ssh" + check "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type suite struct{} + +var _ = check.Suite(&suite{}) + +func (*suite) TestCreateListExecDestroy(c *check.C) { + logger := ctxlog.TestLogger(c) + is, err := Driver.InstanceSet(json.RawMessage("{}"), "testInstanceSetID", cloud.SharedResourceTags{"sharedTag": "sharedTagValue"}, logger) + c.Assert(err, check.IsNil) + + clientRSAKey, err := rsa.GenerateKey(rand.Reader, 1024) + c.Assert(err, check.IsNil) + clientSSHKey, err := ssh.NewSignerFromKey(clientRSAKey) + c.Assert(err, check.IsNil) + clientSSHPubKey, err := ssh.NewPublicKey(clientRSAKey.Public()) + c.Assert(err, check.IsNil) + + it := arvados.InstanceType{ + Name: "localhost", + ProviderType: "localhost", + RAM: 1002003004, + VCPUs: 5, + } + + // First call to Create should succeed, and the returned + // instance's SSH target address should be available in << 1s. + inst, err := is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey) + c.Assert(err, check.IsNil) + for deadline := time.Now().Add(time.Second); inst.Address() == ""; time.Sleep(time.Second / 100) { + if deadline.Before(time.Now()) { + c.Fatal("timed out") + } + } + + // Another call to Create should fail with a quota error. + inst2, err := is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey) + c.Check(inst2, check.IsNil) + qerr, ok := err.(cloud.QuotaError) + if c.Check(ok, check.Equals, true, check.Commentf("expect cloud.QuotaError, got %#v", err)) { + c.Check(qerr.IsQuotaError(), check.Equals, true) + } + + // Instance list should now have one entry, for the new + // instance. + list, err := is.Instances(nil) + c.Assert(err, check.IsNil) + c.Assert(list, check.HasLen, 1) + inst = list[0] + c.Check(inst.String(), check.Equals, "localhost") + + // Instance's SSH server should execute shell commands. + exr := sshexecutor.New(inst) + exr.SetSigners(clientSSHKey) + + stdout, stderr, err := exr.Execute(nil, "echo ok", nil) + c.Check(err, check.IsNil) + c.Check(string(stdout), check.Equals, "ok\n") + c.Check(string(stderr), check.Equals, "") + + // SSH server should propagate stderr and non-zero exit + // status. + stdout, stderr, err = exr.Execute(nil, "echo fail && echo -n fail2 >&2 && false", nil) + c.Check(err, check.FitsTypeOf, &ssh.ExitError{}) + c.Check(string(stdout), check.Equals, "fail\n") + c.Check(string(stderr), check.Equals, "fail2") + + // SSH server should strip "sudo" from the front of the + // command. 
+ withoutsudo, _, err := exr.Execute(nil, "whoami", nil) + c.Check(err, check.IsNil) + withsudo, _, err := exr.Execute(nil, "sudo whoami", nil) + c.Check(err, check.IsNil) + c.Check(string(withsudo), check.Equals, string(withoutsudo)) + + // SSH server should reject keys other than the one whose + // public key we passed to Create. + badRSAKey, err := rsa.GenerateKey(rand.Reader, 1024) + c.Assert(err, check.IsNil) + badSSHKey, err := ssh.NewSignerFromKey(badRSAKey) + c.Assert(err, check.IsNil) + // Create a new executor here, otherwise Execute would reuse + // the existing connection instead of authenticating with + // badRSAKey. + exr = sshexecutor.New(inst) + exr.SetSigners(badSSHKey) + stdout, stderr, err = exr.Execute(nil, "true", nil) + c.Check(err, check.ErrorMatches, `.*unable to authenticate.*`) + + // Destroying the instance causes it to disappear from the + // list, and allows us to create one more. + err = inst.Destroy() + c.Check(err, check.IsNil) + list, err = is.Instances(nil) + c.Assert(err, check.IsNil) + c.Assert(list, check.HasLen, 0) + _, err = is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey) + c.Check(err, check.IsNil) + _, err = is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey) + c.Check(err, check.NotNil) + list, err = is.Instances(nil) + c.Assert(err, check.IsNil) + c.Assert(list, check.HasLen, 1) +} diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index e60880c217..b09c1ecb9f 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -1278,7 +1278,9 @@ Clusters: # need to be detected and cleaned up manually. TagKeyPrefix: Arvados - # Cloud driver: "azure" (Microsoft Azure) or "ec2" (Amazon AWS). + # Cloud driver: "azure" (Microsoft Azure), "ec2" (Amazon AWS), + # or "loopback" (run containers on dispatch host for testing + # purposes). Driver: ec2 # Cloud-specific driver parameters. diff --git a/lib/config/load.go b/lib/config/load.go index 8f8ab2bf27..abb3a804ba 100644 --- a/lib/config/load.go +++ b/lib/config/load.go @@ -16,6 +16,7 @@ import ( "io/ioutil" "os" "regexp" + "runtime" "strconv" "strings" "time" @@ -25,6 +26,7 @@ import ( "github.com/imdario/mergo" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" ) //go:embed config.default.yml @@ -297,7 +299,10 @@ func (ldr *Loader) Load() (*arvados.Config, error) { ldr.loadOldKeepBalanceConfig, ) } - loadFuncs = append(loadFuncs, ldr.setImplicitStorageClasses) + loadFuncs = append(loadFuncs, + ldr.setImplicitStorageClasses, + ldr.setLoopbackInstanceType, + ) for _, f := range loadFuncs { err = f(&cfg) if err != nil { @@ -414,6 +419,67 @@ func (ldr *Loader) checkEnum(label, value string, accepted ...string) error { return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted) } +func (ldr *Loader) setLoopbackInstanceType(cfg *arvados.Config) error { + for id, cc := range cfg.Clusters { + if !cc.Containers.CloudVMs.Enable || cc.Containers.CloudVMs.Driver != "loopback" { + continue + } + if len(cc.InstanceTypes) == 1 { + continue + } + if len(cc.InstanceTypes) > 1 { + return fmt.Errorf("Clusters.%s.InstanceTypes: cannot use multiple InstanceTypes with loopback driver", id) + } + // No InstanceTypes configured. Fill in implicit + // default. 
+ hostram, err := getHostRAM() + if err != nil { + return err + } + scratch, err := getFilesystemSize(os.TempDir()) + if err != nil { + return err + } + cc.InstanceTypes = arvados.InstanceTypeMap{"localhost": { + Name: "localhost", + ProviderType: "localhost", + VCPUs: runtime.NumCPU(), + RAM: hostram, + Scratch: scratch, + IncludedScratch: scratch, + }} + cfg.Clusters[id] = cc + } + return nil +} + +func getFilesystemSize(path string) (arvados.ByteSize, error) { + var stat unix.Statfs_t + err := unix.Statfs(path, &stat) + if err != nil { + return 0, err + } + return arvados.ByteSize(stat.Blocks * uint64(stat.Bsize)), nil +} + +var reMemTotal = regexp.MustCompile(`(^|\n)MemTotal: *(\d+) kB\n`) + +func getHostRAM() (arvados.ByteSize, error) { + buf, err := os.ReadFile("/proc/meminfo") + if err != nil { + return 0, err + } + m := reMemTotal.FindSubmatch(buf) + if m == nil { + return 0, errors.New("error parsing /proc/meminfo: no MemTotal") + } + kb, err := strconv.ParseInt(string(m[2]), 10, 64) + if err != nil { + return 0, fmt.Errorf("error parsing /proc/meminfo: %q: %w", m[2], err) + } + return arvados.ByteSize(kb) * 1024, nil +} + func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error { cluster: for id, cc := range cfg.Clusters { diff --git a/lib/config/load_test.go b/lib/config/load_test.go index 4ae9a513c8..256e8a3e8c 100644 --- a/lib/config/load_test.go +++ b/lib/config/load_test.go @@ -13,6 +13,7 @@ import ( "os/exec" "reflect" "regexp" + "runtime" "strings" "testing" "time" @@ -601,6 +602,55 @@ func (s *LoadSuite) TestListKeys(c *check.C) { } } +func (s *LoadSuite) TestLoopbackInstanceTypes(c *check.C) { + ldr := testLoader(c, ` +Clusters: + z1111: + Containers: + CloudVMs: + Enable: true + Driver: loopback + InstanceTypes: + a: {} + b: {} +`, nil) + cfg, err := ldr.Load() + c.Check(err, check.ErrorMatches, `Clusters\.z1111\.InstanceTypes: cannot use multiple InstanceTypes with loopback driver`) + + ldr = testLoader(c, ` +Clusters: + z1111: + Containers: + CloudVMs: + Enable: true + Driver: loopback +`, nil) + cfg, err = ldr.Load() + c.Assert(err, check.IsNil) + cc, err := cfg.GetCluster("") + c.Assert(err, check.IsNil) + c.Check(cc.InstanceTypes, check.HasLen, 1) + c.Check(cc.InstanceTypes["localhost"].VCPUs, check.Equals, runtime.NumCPU()) + + ldr = testLoader(c, ` +Clusters: + z1111: + Containers: + CloudVMs: + Enable: true + Driver: loopback + InstanceTypes: + a: + VCPUs: 9 +`, nil) + cfg, err = ldr.Load() + c.Assert(err, check.IsNil) + cc, err = cfg.GetCluster("") + c.Assert(err, check.IsNil) + c.Check(cc.InstanceTypes, check.HasLen, 1) + c.Check(cc.InstanceTypes["a"].VCPUs, check.Equals, 9) +} + func (s *LoadSuite) TestImplicitStorageClasses(c *check.C) { // If StorageClasses and Volumes.*.StorageClasses are all // empty, there is a default storage class named "default". 
@@ -824,3 +874,16 @@ arvados_config_source_timestamp_seconds{sha256="83aea5d82eb1d53372cd65c936c60acc `) } } + +func (s *LoadSuite) TestGetHostRAM(c *check.C) { + hostram, err := getHostRAM() + c.Check(err, check.IsNil) + c.Logf("getHostRAM() == %v", hostram) +} + +func (s *LoadSuite) TestGetFilesystemSize(c *check.C) { + path := c.MkDir() + size, err := getFilesystemSize(path) + c.Check(err, check.IsNil) + c.Logf("getFilesystemSize(%q) == %v", path, size) +} diff --git a/lib/controller/integration_test.go b/lib/controller/integration_test.go index 44be17c77f..d65bb3149e 100644 --- a/lib/controller/integration_test.go +++ b/lib/controller/integration_test.go @@ -11,6 +11,7 @@ import ( "encoding/json" "fmt" "io" + "io/fs" "io/ioutil" "math" "net" @@ -71,6 +72,17 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) { Insecure: true SystemLogs: Format: text + Containers: + CloudVMs: + Enable: true + Driver: loopback + BootProbeCommand: id + ProbeInterval: 1s + PollInterval: 5s + SyncInterval: 10s + TimeoutIdle: 1s + TimeoutBooting: 2s + RuntimeEngine: singularity RemoteClusters: z1111: Host: ` + hostport["z1111"] + ` @@ -1111,3 +1123,96 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) { c.Check(err, check.ErrorMatches, `request failed: .* 401 Unauthorized: cannot use a locally issued token to forward a request to our login cluster \(z1111\)`) c.Check(err, check.Not(check.ErrorMatches), `(?ms).*127\.0\.0\.11.*`) } + +func (s *IntegrationSuite) TestRunTrivialContainer(c *check.C) { + conn1 := s.super.Conn("z1111") + rootctx1, _, _ := s.super.RootClients("z1111") + _, ac1, kc1, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true) + + c.Log("[docker load]") + out, err := exec.Command("docker", "load", "--input", arvadostest.BusyboxDockerImage(c)).CombinedOutput() + c.Logf("[docker load done] %s", out) + c.Check(err, check.IsNil) + + c.Log("[arv-keepdocker]") + akd := exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc") + akd.Env = append(os.Environ(), "ARVADOS_API_HOST="+ac1.APIHost, "ARVADOS_API_HOST_INSECURE=1", "ARVADOS_API_TOKEN="+ac1.AuthToken) + c.Logf("[arv-keepdocker env] %q", akd.Env) + out, err = akd.CombinedOutput() + c.Logf("[arv-keepdocker done] %s", out) + c.Check(err, check.IsNil) + + var cr arvados.ContainerRequest + err = ac1.RequestAndDecode(&cr, "POST", "/arvados/v1/container_requests", nil, map[string]interface{}{ + "container_request": map[string]interface{}{ + "command": []string{"touch", "/out/hello_world"}, + "container_image": "busybox:uclibc", + "cwd": "/tmp", + "environment": map[string]string{}, + "mounts": map[string]arvados.Mount{"/out": {Kind: "tmp", Capacity: 10000}}, + "output_path": "/out", + "runtime_constraints": arvados.RuntimeConstraints{RAM: 100000000, VCPUs: 1}, + "priority": 1, + "state": arvados.ContainerRequestStateCommitted, + }, + }) + c.Assert(err, check.IsNil) + + showlogs := func(collectionID string) { + var logcoll arvados.Collection + err = ac1.RequestAndDecode(&logcoll, "GET", "/arvados/v1/collections/"+collectionID, nil, nil) + c.Assert(err, check.IsNil) + cfs, err := logcoll.FileSystem(ac1, kc1) + c.Assert(err, check.IsNil) + fs.WalkDir(arvados.FS(cfs), "/", func(path string, d fs.DirEntry, err error) error { + if d.IsDir() || strings.HasPrefix(path, "/log for container") { + return nil + } + f, err := cfs.Open(path) + c.Assert(err, check.IsNil) + defer f.Close() + buf, err := ioutil.ReadAll(f) + c.Assert(err, check.IsNil) + c.Logf("=== %s\n%s\n", path, buf) + return nil 
+ }) + } + + var ctr arvados.Container + var lastState arvados.ContainerState + deadline := time.Now().Add(time.Minute) +wait: + for ; ; lastState = ctr.State { + err = ac1.RequestAndDecode(&ctr, "GET", "/arvados/v1/containers/"+cr.ContainerUUID, nil, nil) + c.Assert(err, check.IsNil) + switch ctr.State { + case lastState: + if time.Now().After(deadline) { + c.Errorf("timed out, container request state is %q", cr.State) + showlogs(ctr.Log) + c.FailNow() + } + time.Sleep(time.Second / 2) + case arvados.ContainerStateComplete: + break wait + case arvados.ContainerStateQueued, arvados.ContainerStateLocked, arvados.ContainerStateRunning: + c.Logf("container state changed to %q", ctr.State) + default: + c.Errorf("unexpected container state %q", ctr.State) + showlogs(ctr.Log) + c.FailNow() + } + } + c.Check(ctr.ExitCode, check.Equals, 0) + + err = ac1.RequestAndDecode(&cr, "GET", "/arvados/v1/container_requests/"+cr.UUID, nil, nil) + c.Assert(err, check.IsNil) + + showlogs(cr.LogUUID) + + var outcoll arvados.Collection + err = ac1.RequestAndDecode(&outcoll, "GET", "/arvados/v1/collections/"+cr.OutputUUID, nil, nil) + c.Assert(err, check.IsNil) + c.Check(outcoll.ManifestText, check.Matches, `\. d41d8.* 0:0:hello_world\n`) + c.Check(outcoll.PortableDataHash, check.Equals, "dac08d558cfb6c9536f604ca89e3c002+53") +} diff --git a/lib/crunchrun/executor_test.go b/lib/crunchrun/executor_test.go index 99af0530ff..1833fc8ac4 100644 --- a/lib/crunchrun/executor_test.go +++ b/lib/crunchrun/executor_test.go @@ -7,43 +7,15 @@ package crunchrun import ( "bytes" "io" - "io/ioutil" - "net/http" - "os" "strings" "time" "git.arvados.org/arvados.git/sdk/go/arvados" + "git.arvados.org/arvados.git/sdk/go/arvadostest" "golang.org/x/net/context" . "gopkg.in/check.v1" ) -func busyboxDockerImage(c *C) string { - fnm := "busybox_uclibc.tar" - cachedir := c.MkDir() - cachefile := cachedir + "/" + fnm - if _, err := os.Stat(cachefile); err == nil { - return cachefile - } - - f, err := ioutil.TempFile(cachedir, "") - c.Assert(err, IsNil) - defer f.Close() - defer os.Remove(f.Name()) - - resp, err := http.Get("https://cache.arvados.org/" + fnm) - c.Assert(err, IsNil) - defer resp.Body.Close() - _, err = io.Copy(f, resp.Body) - c.Assert(err, IsNil) - err = f.Close() - c.Assert(err, IsNil) - err = os.Rename(f.Name(), cachefile) - c.Assert(err, IsNil) - - return cachefile -} - type nopWriteCloser struct{ io.Writer } func (nopWriteCloser) Close() error { return nil } @@ -71,7 +43,7 @@ func (s *executorSuite) SetUpTest(c *C) { Stdout: nopWriteCloser{&s.stdout}, Stderr: nopWriteCloser{&s.stderr}, } - err := s.executor.LoadImage("", busyboxDockerImage(c), arvados.Container{}, "", nil) + err := s.executor.LoadImage("", arvadostest.BusyboxDockerImage(c), arvados.Container{}, "", nil) c.Assert(err, IsNil) } diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go index 5fcc0903f5..93515defb7 100644 --- a/lib/dispatchcloud/driver.go +++ b/lib/dispatchcloud/driver.go @@ -11,6 +11,7 @@ import ( "git.arvados.org/arvados.git/lib/cloud" "git.arvados.org/arvados.git/lib/cloud/azure" "git.arvados.org/arvados.git/lib/cloud/ec2" + "git.arvados.org/arvados.git/lib/cloud/loopback" "git.arvados.org/arvados.git/sdk/go/arvados" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" @@ -21,8 +22,9 @@ import ( // Clusters.*.Containers.CloudVMs.Driver configuration values // correspond to keys in this map. 
var Drivers = map[string]cloud.Driver{ - "azure": azure.Driver, - "ec2": ec2.Driver, + "azure": azure.Driver, + "ec2": ec2.Driver, + "loopback": loopback.Driver, } func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) { diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go index 37e3fa9882..66e0bfee91 100644 --- a/lib/dispatchcloud/worker/pool.go +++ b/lib/dispatchcloud/worker/pool.go @@ -13,6 +13,7 @@ import ( "fmt" "io" "io/ioutil" + mathrand "math/rand" "sort" "strings" "sync" @@ -774,6 +775,13 @@ func (wp *Pool) runProbes() { workers := []cloud.InstanceID{} for range probeticker.C { + // Add some jitter. Without this, if probeInterval is + // a multiple of syncInterval and sync is + // instantaneous (as with the loopback driver), the + // first few probes race with sync operations and + // don't update the workers. + time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23))) + workers = workers[:0] wp.mtx.Lock() for id, wkr := range wp.workers { @@ -900,7 +908,7 @@ func (wp *Pool) loadRunnerData() error { } wp.runnerData = buf wp.runnerMD5 = md5.Sum(buf) - wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5) + wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5) return nil } diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go index 9e89d7daaf..1c8d62c20e 100644 --- a/lib/dispatchcloud/worker/worker.go +++ b/lib/dispatchcloud/worker/worker.go @@ -313,6 +313,10 @@ func (wkr *worker) probeAndUpdate() { // not yet running when ctrUUIDs was generated. Leave // wkr.running alone and wait for the next probe to // catch up on any changes. + logger.WithFields(logrus.Fields{ + "updated": updated, + "wkr.updated": wkr.updated, + }).Debug("skipping worker state update due to probe/sync race") return } @@ -387,6 +391,11 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) { wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds()) return } + wkr.logger.WithFields(logrus.Fields{ + "Command": cmd, + "stdout": string(stdout), + "stderr": string(stderr), + }).Debug("probe succeeded") wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds()) ok = true diff --git a/lib/install/init.go b/lib/install/init.go index 6a52441d71..d2fed1dd7a 100644 --- a/lib/install/init.go +++ b/lib/install/init.go @@ -160,6 +160,9 @@ func (initcmd *initCommand) RunCommand(prog string, args []string, stdin io.Read {{end}} Containers: DispatchPrivateKey: {{printf "%q" .GenerateSSHPrivateKey}} + CloudVMs: + Enable: true + Driver: loopback ManagementToken: {{printf "%q" ( .RandomHex 50 )}} PostgreSQL: Connection: diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go index 3510a6db04..d6d75192f0 100644 --- a/sdk/go/arvados/container.go +++ b/sdk/go/arvados/container.go @@ -37,6 +37,7 @@ type Container struct { RuntimeAuthScopes []string `json:"runtime_auth_scopes"` RuntimeToken string `json:"runtime_token"` AuthUUID string `json:"auth_uuid"` + Log string `json:"log"` } // ContainerRequest is an arvados#container_request resource. diff --git a/sdk/go/arvadostest/busybox_image.go b/sdk/go/arvadostest/busybox_image.go new file mode 100644 index 0000000000..0411491a63 --- /dev/null +++ b/sdk/go/arvadostest/busybox_image.go @@ -0,0 +1,43 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package arvadostest + +import ( + "io" + "io/ioutil" + "net/http" + "os" + + . "gopkg.in/check.v1" +) + +// BusyboxDockerImage downloads the busybox:uclibc docker image +// (busybox_uclibc.tar) from cache.arvados.org into a temporary file +// and returns the temporary file name. +func BusyboxDockerImage(c *C) string { + fnm := "busybox_uclibc.tar" + cachedir := c.MkDir() + cachefile := cachedir + "/" + fnm + if _, err := os.Stat(cachefile); err == nil { + return cachefile + } + + f, err := ioutil.TempFile(cachedir, "") + c.Assert(err, IsNil) + defer f.Close() + defer os.Remove(f.Name()) + + resp, err := http.Get("https://cache.arvados.org/" + fnm) + c.Assert(err, IsNil) + defer resp.Body.Close() + _, err = io.Copy(f, resp.Body) + c.Assert(err, IsNil) + err = f.Close() + c.Assert(err, IsNil) + err = os.Rename(f.Name(), cachefile) + c.Assert(err, IsNil) + + return cachefile +} -- 2.30.2