15370: Merge branch 'main' into 15370-loopback-dispatchcloud
author    Tom Clegg <tom@curii.com>
          Wed, 18 May 2022 18:01:21 +0000 (14:01 -0400)
committer Tom Clegg <tom@curii.com>
          Wed, 18 May 2022 18:01:21 +0000 (14:01 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

18 files changed:
build/run-tests.sh
lib/boot/passenger.go
lib/boot/supervisor.go
lib/cloud/loopback/loopback.go [new file with mode: 0644]
lib/cloud/loopback/loopback_test.go [new file with mode: 0644]
lib/config/config.default.yml
lib/config/load.go
lib/config/load_test.go
lib/controller/integration_test.go
lib/crunchrun/executor_test.go
lib/crunchrun/integration_test.go
lib/dispatchcloud/driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/worker.go
lib/install/init.go
sdk/go/arvados/container.go
sdk/go/arvadostest/busybox_image.go [new file with mode: 0644]
tools/sync-groups/federation_test.go

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 0f996f77e927b4c4d6e0eae6458c4d80f3229be7..9f5d37322ef73743ab4cc19d55ff883bd49d4890 100755 (executable)
@@ -736,7 +736,7 @@ do_test() {
 
 go_ldflags() {
     version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}
-    echo "-X git.arvados.org/arvados.git/lib/cmd.version=${version} -X main.version=${version}"
+    echo "-X git.arvados.org/arvados.git/lib/cmd.version=${version} -X main.version=${version} -s -w"
 }
 
 do_test_once() {
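A side note on the go_ldflags change above: -s omits the symbol table and -w omits DWARF debug information from the linked binaries, shrinking the executables built during test runs without changing their behavior.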
diff --git a/lib/boot/passenger.go b/lib/boot/passenger.go
index f0cd02946f376719c4ab8907d01d5533903a24d4..f86f1f930398f48a133cc33fe4752333246b73cb 100644 (file)
@@ -28,8 +28,9 @@ var railsEnv = []string{
 // Install a Rails application's dependencies, including phusion
 // passenger.
 type installPassenger struct {
-       src     string
-       depends []supervisedTask
+       src       string // path to app in source tree
+       varlibdir string // path to app (relative to /var/lib/arvados) in OS package: "railsapi" or "workbench1"
+       depends   []supervisedTask
 }
 
 func (runner installPassenger) String() string {
@@ -49,33 +50,61 @@ func (runner installPassenger) Run(ctx context.Context, fail func(error), super
        passengerInstallMutex.Lock()
        defer passengerInstallMutex.Unlock()
 
+       appdir := runner.src
+       if super.ClusterType == "test" {
+               // In the multi-cluster test setup, if we run multiple
+               // Rails instances directly from the source tree, they
+               // step on one another's files in {source}/tmp, log,
+               // etc. So instead we copy the source directory into a
+               // temp dir and run the Rails app from there.
+               appdir = filepath.Join(super.tempdir, runner.varlibdir)
+               err = super.RunProgram(ctx, super.tempdir, runOptions{}, "mkdir", "-p", appdir)
+               if err != nil {
+                       return err
+               }
+               err = super.RunProgram(ctx, filepath.Join(super.SourcePath, runner.src), runOptions{}, "rsync",
+                       "-a", "--no-owner", "--no-group", "--delete-after", "--delete-excluded",
+                       "--exclude", "/coverage",
+                       "--exclude", "/log",
+                       "--exclude", "/node_modules",
+                       "--exclude", "/tmp",
+                       "--exclude", "/public/assets",
+                       "--exclude", "/vendor",
+                       "--exclude", "/config/environments",
+                       "./",
+                       appdir+"/")
+               if err != nil {
+                       return err
+               }
+       }
+
        var buf bytes.Buffer
-       err = super.RunProgram(ctx, runner.src, runOptions{output: &buf}, "gem", "list", "--details", "bundler")
+       err = super.RunProgram(ctx, appdir, runOptions{output: &buf}, "gem", "list", "--details", "bundler")
        if err != nil {
                return err
        }
        for _, version := range []string{"2.2.19"} {
                if !strings.Contains(buf.String(), "("+version+")") {
-                       err = super.RunProgram(ctx, runner.src, runOptions{}, "gem", "install", "--user", "--conservative", "--no-document", "bundler:2.2.19")
+                       err = super.RunProgram(ctx, appdir, runOptions{}, "gem", "install", "--user", "--conservative", "--no-document", "bundler:2.2.19")
                        if err != nil {
                                return err
                        }
                        break
                }
        }
-       err = super.RunProgram(ctx, runner.src, runOptions{}, "bundle", "install", "--jobs", "4", "--path", filepath.Join(os.Getenv("HOME"), ".gem"))
+       err = super.RunProgram(ctx, appdir, runOptions{}, "bundle", "install", "--jobs", "4", "--path", filepath.Join(os.Getenv("HOME"), ".gem"))
        if err != nil {
                return err
        }
-       err = super.RunProgram(ctx, runner.src, runOptions{}, "bundle", "exec", "passenger-config", "build-native-support")
+       err = super.RunProgram(ctx, appdir, runOptions{}, "bundle", "exec", "passenger-config", "build-native-support")
        if err != nil {
                return err
        }
-       err = super.RunProgram(ctx, runner.src, runOptions{}, "bundle", "exec", "passenger-config", "install-standalone-runtime")
+       err = super.RunProgram(ctx, appdir, runOptions{}, "bundle", "exec", "passenger-config", "install-standalone-runtime")
        if err != nil {
                return err
        }
-       err = super.RunProgram(ctx, runner.src, runOptions{}, "bundle", "exec", "passenger-config", "validate-install")
+       err = super.RunProgram(ctx, appdir, runOptions{}, "bundle", "exec", "passenger-config", "validate-install")
        if err != nil && !strings.Contains(err.Error(), "exit status 2") {
                // Exit code 2 indicates there were warnings (like
                // "other passenger installations have been detected",
@@ -88,7 +117,7 @@ func (runner installPassenger) Run(ctx context.Context, fail func(error), super
 
 type runPassenger struct {
        src       string // path to app in source tree
-       varlibdir string // path to app (relative to /var/lib/arvados) in OS package
+       varlibdir string // path to app (relative to /var/lib/arvados) in OS package: "railsapi" or "workbench1"
        svc       arvados.Service
        depends   []supervisedTask
 }
@@ -107,9 +136,12 @@ func (runner runPassenger) Run(ctx context.Context, fail func(error), super *Sup
                return fmt.Errorf("bug: no internalPort for %q: %v (%#v)", runner, err, runner.svc)
        }
        var appdir string
-       if super.ClusterType == "production" {
+       switch super.ClusterType {
+       case "production":
                appdir = "/var/lib/arvados/" + runner.varlibdir
-       } else {
+       case "test":
+               appdir = filepath.Join(super.tempdir, runner.varlibdir)
+       default:
                appdir = runner.src
        }
        loglevel := "4"
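Taken together, the passenger.go changes give each test-mode cluster its own private copy of the Rails app: installPassenger rsyncs the source tree into the supervisor's tempdir (excluding logs, caches, and build products) and runs gem/bundle/passenger-config there, and runPassenger serves the app from that same per-cluster copy, so concurrent multi-cluster integration tests no longer collide in shared tmp/, log/, and asset directories.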
diff --git a/lib/boot/supervisor.go b/lib/boot/supervisor.go
index 4b1526600686d9bfbe63f6039ec7f1f30ed78306..7e641c62dd4407a69f6fbf7b3eeb6ff2cd97e002 100644 (file)
@@ -370,14 +370,14 @@ func (super *Supervisor) runCluster() error {
                runServiceCommand{name: "keepstore", svc: super.cluster.Services.Keepstore},
                runServiceCommand{name: "keep-web", svc: super.cluster.Services.WebDAV},
                runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{seedDatabase{}}},
-               installPassenger{src: "services/api"},
-               runPassenger{src: "services/api", varlibdir: "railsapi", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, seedDatabase{}, installPassenger{src: "services/api"}}},
+               installPassenger{src: "services/api", varlibdir: "railsapi"},
+               runPassenger{src: "services/api", varlibdir: "railsapi", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, seedDatabase{}, installPassenger{src: "services/api", varlibdir: "railsapi"}}},
                seedDatabase{},
        }
        if !super.NoWorkbench1 {
                tasks = append(tasks,
-                       installPassenger{src: "apps/workbench", depends: []supervisedTask{seedDatabase{}}}, // dependency ensures workbench doesn't delay api install/startup
-                       runPassenger{src: "apps/workbench", varlibdir: "workbench1", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}},
+                       installPassenger{src: "apps/workbench", varlibdir: "workbench1", depends: []supervisedTask{seedDatabase{}}}, // dependency ensures workbench doesn't delay api install/startup
+                       runPassenger{src: "apps/workbench", varlibdir: "workbench1", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench", varlibdir: "workbench1"}}},
                )
        }
        if !super.NoWorkbench2 {
@@ -387,10 +387,14 @@ func (super *Supervisor) runCluster() error {
        }
        if super.ClusterType != "test" {
                tasks = append(tasks,
-                       runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
                        runServiceCommand{name: "keep-balance", svc: super.cluster.Services.Keepbalance},
                )
        }
+       if super.cluster.Containers.CloudVMs.Enable {
+               tasks = append(tasks,
+                       runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
+               )
+       }
        super.tasksReady = map[string]chan bool{}
        for _, task := range tasks {
                super.tasksReady[task.String()] = make(chan bool)
@@ -824,9 +828,6 @@ func (super *Supervisor) autofillConfig() error {
                &super.cluster.Services.Workbench1,
                &super.cluster.Services.Workbench2,
        } {
-               if svc == &super.cluster.Services.DispatchCloud && super.ClusterType == "test" {
-                       continue
-               }
                if svc.ExternalURL.Host == "" {
                        port, err := nextPort(defaultExtHost)
                        if err != nil {
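With the supervisor.go hunks above, boot no longer keys dispatch-cloud on cluster type: the service starts whenever Containers.CloudVMs.Enable is true (which now includes test clusters using the loopback driver), and the test-mode exception that skipped assigning DispatchCloud an ExternalURL is dropped to match.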
diff --git a/lib/cloud/loopback/loopback.go b/lib/cloud/loopback/loopback.go
new file mode 100644 (file)
index 0000000..6ad4f87
--- /dev/null
+++ b/lib/cloud/loopback/loopback.go
@@ -0,0 +1,164 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package loopback
+
+import (
+       "bytes"
+       "crypto/rand"
+       "crypto/rsa"
+       "encoding/json"
+       "errors"
+       "io"
+       "os/exec"
+       "os/user"
+       "strings"
+       "sync"
+       "syscall"
+
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/test"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
+)
+
+// Driver is the loopback implementation of the cloud.Driver interface.
+var Driver = cloud.DriverFunc(newInstanceSet)
+
+var (
+       errUnimplemented = errors.New("function not implemented by loopback driver")
+       errQuota         = quotaError("loopback driver is always at quota")
+)
+
+type quotaError string
+
+func (e quotaError) IsQuotaError() bool { return true }
+func (e quotaError) Error() string      { return string(e) }
+
+type instanceSet struct {
+       instanceSetID cloud.InstanceSetID
+       logger        logrus.FieldLogger
+       instances     []*instance
+       mtx           sync.Mutex
+}
+
+func newInstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+       is := &instanceSet{
+               instanceSetID: instanceSetID,
+               logger:        logger,
+       }
+       return is, nil
+}
+
+func (is *instanceSet) Create(it arvados.InstanceType, _ cloud.ImageID, tags cloud.InstanceTags, _ cloud.InitCommand, pubkey ssh.PublicKey) (cloud.Instance, error) {
+       is.mtx.Lock()
+       defer is.mtx.Unlock()
+       if len(is.instances) > 0 {
+               return nil, errQuota
+       }
+       u, err := user.Current()
+       if err != nil {
+               return nil, err
+       }
+       hostRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+       if err != nil {
+               return nil, err
+       }
+       hostKey, err := ssh.NewSignerFromKey(hostRSAKey)
+       if err != nil {
+               return nil, err
+       }
+       hostPubKey, err := ssh.NewPublicKey(hostRSAKey.Public())
+       if err != nil {
+               return nil, err
+       }
+       inst := &instance{
+               is:           is,
+               instanceType: it,
+               adminUser:    u.Username,
+               tags:         tags,
+               hostPubKey:   hostPubKey,
+               sshService: test.SSHService{
+                       HostKey:        hostKey,
+                       AuthorizedUser: u.Username,
+                       AuthorizedKeys: []ssh.PublicKey{pubkey},
+               },
+       }
+       inst.sshService.Exec = inst.sshExecFunc
+       go inst.sshService.Start()
+       is.instances = []*instance{inst}
+       return inst, nil
+}
+
+func (is *instanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+       is.mtx.Lock()
+       defer is.mtx.Unlock()
+       var ret []cloud.Instance
+       for _, inst := range is.instances {
+               ret = append(ret, inst)
+       }
+       return ret, nil
+}
+
+func (is *instanceSet) Stop() {
+       is.mtx.Lock()
+       defer is.mtx.Unlock()
+       for _, inst := range is.instances {
+               inst.sshService.Close()
+       }
+}
+
+type instance struct {
+       is           *instanceSet
+       instanceType arvados.InstanceType
+       adminUser    string
+       tags         cloud.InstanceTags
+       hostPubKey   ssh.PublicKey
+       sshService   test.SSHService
+}
+
+func (i *instance) ID() cloud.InstanceID     { return cloud.InstanceID(i.instanceType.ProviderType) }
+func (i *instance) String() string           { return i.instanceType.ProviderType }
+func (i *instance) ProviderType() string     { return i.instanceType.ProviderType }
+func (i *instance) Address() string          { return i.sshService.Address() }
+func (i *instance) RemoteUser() string       { return i.adminUser }
+func (i *instance) Tags() cloud.InstanceTags { return i.tags }
+func (i *instance) SetTags(tags cloud.InstanceTags) error {
+       i.tags = tags
+       return nil
+}
+func (i *instance) Destroy() error {
+       i.is.mtx.Lock()
+       defer i.is.mtx.Unlock()
+       i.is.instances = i.is.instances[:0]
+       return nil
+}
+func (i *instance) VerifyHostKey(pubkey ssh.PublicKey, _ *ssh.Client) error {
+       if !bytes.Equal(pubkey.Marshal(), i.hostPubKey.Marshal()) {
+               return errors.New("host key mismatch")
+       }
+       return nil
+}
+func (i *instance) sshExecFunc(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       cmd := exec.Command("sh", "-c", strings.TrimPrefix(command, "sudo "))
+       cmd.Stdin = stdin
+       cmd.Stdout = stdout
+       cmd.Stderr = stderr
+       for k, v := range env {
+               cmd.Env = append(cmd.Env, k+"="+v)
+       }
+       // Prevent child process from using our tty.
+       cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
+       err := cmd.Run()
+       if err == nil {
+               return 0
+       } else if err, ok := err.(*exec.ExitError); !ok {
+               return 1
+       } else if code := err.ExitCode(); code < 0 {
+               return 1
+       } else {
+               return uint32(code)
+       }
+}
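The driver signals "at quota" through the cloud.QuotaError interface rather than a sentinel error value, so callers can detect the condition without string matching. A minimal caller-side sketch (hypothetical helper, not part of this diff):

    package example // hypothetical illustration

    import "git.arvados.org/arvados.git/lib/cloud"

    // atQuota reports whether err indicates the instance set is at
    // quota -- for the loopback driver, simply that one instance
    // already exists -- so a caller can back off and retry later
    // instead of treating the error as fatal.
    func atQuota(err error) bool {
        qerr, ok := err.(cloud.QuotaError)
        return ok && qerr.IsQuotaError()
    }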
diff --git a/lib/cloud/loopback/loopback_test.go b/lib/cloud/loopback/loopback_test.go
new file mode 100644 (file)
index 0000000..5c30f5f
--- /dev/null
+++ b/lib/cloud/loopback/loopback_test.go
@@ -0,0 +1,127 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package loopback
+
+import (
+       "crypto/rand"
+       "crypto/rsa"
+       "encoding/json"
+       "testing"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+type suite struct{}
+
+var _ = check.Suite(&suite{})
+
+func (*suite) TestCreateListExecDestroy(c *check.C) {
+       logger := ctxlog.TestLogger(c)
+       is, err := Driver.InstanceSet(json.RawMessage("{}"), "testInstanceSetID", cloud.SharedResourceTags{"sharedTag": "sharedTagValue"}, logger)
+       c.Assert(err, check.IsNil)
+
+       clientRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+       c.Assert(err, check.IsNil)
+       clientSSHKey, err := ssh.NewSignerFromKey(clientRSAKey)
+       c.Assert(err, check.IsNil)
+       clientSSHPubKey, err := ssh.NewPublicKey(clientRSAKey.Public())
+       c.Assert(err, check.IsNil)
+
+       it := arvados.InstanceType{
+               Name:         "localhost",
+               ProviderType: "localhost",
+               RAM:          1002003004,
+               VCPUs:        5,
+       }
+
+       // First call to Create should succeed, and the returned
+       // instance's SSH target address should be available in << 1s.
+       inst, err := is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+       c.Assert(err, check.IsNil)
+       for deadline := time.Now().Add(time.Second); inst.Address() == ""; time.Sleep(time.Second / 100) {
+               if deadline.Before(time.Now()) {
+                       c.Fatal("timed out")
+               }
+       }
+
+       // Another call to Create should fail with a quota error.
+       inst2, err := is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+       c.Check(inst2, check.IsNil)
+       qerr, ok := err.(cloud.QuotaError)
+       if c.Check(ok, check.Equals, true, check.Commentf("expect cloud.QuotaError, got %#v", err)) {
+               c.Check(qerr.IsQuotaError(), check.Equals, true)
+       }
+
+       // Instance list should now have one entry, for the new
+       // instance.
+       list, err := is.Instances(nil)
+       c.Assert(err, check.IsNil)
+       c.Assert(list, check.HasLen, 1)
+       inst = list[0]
+       c.Check(inst.String(), check.Equals, "localhost")
+
+       // Instance's SSH server should execute shell commands.
+       exr := sshexecutor.New(inst)
+       exr.SetSigners(clientSSHKey)
+
+       stdout, stderr, err := exr.Execute(nil, "echo ok", nil)
+       c.Check(err, check.IsNil)
+       c.Check(string(stdout), check.Equals, "ok\n")
+       c.Check(string(stderr), check.Equals, "")
+
+       // SSH server should propagate stderr and non-zero exit
+       // status.
+       stdout, stderr, err = exr.Execute(nil, "echo fail && echo -n fail2 >&2 && false", nil)
+       c.Check(err, check.FitsTypeOf, &ssh.ExitError{})
+       c.Check(string(stdout), check.Equals, "fail\n")
+       c.Check(string(stderr), check.Equals, "fail2")
+
+       // SSH server should strip "sudo" from the front of the
+       // command.
+       withoutsudo, _, err := exr.Execute(nil, "whoami", nil)
+       c.Check(err, check.IsNil)
+       withsudo, _, err := exr.Execute(nil, "sudo whoami", nil)
+       c.Check(err, check.IsNil)
+       c.Check(string(withsudo), check.Equals, string(withoutsudo))
+
+       // SSH server should reject keys other than the one whose
+       // public key we passed to Create.
+       badRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+       c.Assert(err, check.IsNil)
+       badSSHKey, err := ssh.NewSignerFromKey(badRSAKey)
+       c.Assert(err, check.IsNil)
+       // Create a new executor here, otherwise Execute would reuse
+       // the existing connection instead of authenticating with
+       // badRSAKey.
+       exr = sshexecutor.New(inst)
+       exr.SetSigners(badSSHKey)
+       stdout, stderr, err = exr.Execute(nil, "true", nil)
+       c.Check(err, check.ErrorMatches, `.*unable to authenticate.*`)
+
+       // Destroying the instance causes it to disappear from the
+       // list, and allows us to create one more.
+       err = inst.Destroy()
+       c.Check(err, check.IsNil)
+       list, err = is.Instances(nil)
+       c.Assert(err, check.IsNil)
+       c.Assert(list, check.HasLen, 0)
+       _, err = is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+       c.Check(err, check.IsNil)
+       _, err = is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+       c.Check(err, check.NotNil)
+       list, err = is.Instances(nil)
+       c.Assert(err, check.IsNil)
+       c.Assert(list, check.HasLen, 1)
+}
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 893542df184c1c0c2ec43f459833512465e97ecc..d965633055dcb1da4d72548168b1d700f83cd541 100644 (file)
@@ -1281,7 +1281,9 @@ Clusters:
         # need to be detected and cleaned up manually.
         TagKeyPrefix: Arvados
 
-        # Cloud driver: "azure" (Microsoft Azure) or "ec2" (Amazon AWS).
+        # Cloud driver: "azure" (Microsoft Azure), "ec2" (Amazon AWS),
+        # or "loopback" (run containers on the dispatch host, for
+        # testing purposes).
         Driver: ec2
 
         # Cloud-specific driver parameters.
diff --git a/lib/config/load.go b/lib/config/load.go
index 8f8ab2bf27312adb76b6597ef1f51f8e21ecb338..abb3a804baab7cb447f9bc55e4bc39d8e8b376ca 100644 (file)
@@ -16,6 +16,7 @@ import (
        "io/ioutil"
        "os"
        "regexp"
+       "runtime"
        "strconv"
        "strings"
        "time"
@@ -25,6 +26,7 @@ import (
        "github.com/imdario/mergo"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
+       "golang.org/x/sys/unix"
 )
 
 //go:embed config.default.yml
@@ -297,7 +299,10 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
                        ldr.loadOldKeepBalanceConfig,
                )
        }
-       loadFuncs = append(loadFuncs, ldr.setImplicitStorageClasses)
+       loadFuncs = append(loadFuncs,
+               ldr.setImplicitStorageClasses,
+               ldr.setLoopbackInstanceType,
+       )
        for _, f := range loadFuncs {
                err = f(&cfg)
                if err != nil {
@@ -414,6 +419,67 @@ func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
        return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
 }
 
+func (ldr *Loader) setLoopbackInstanceType(cfg *arvados.Config) error {
+       for id, cc := range cfg.Clusters {
+               if !cc.Containers.CloudVMs.Enable || cc.Containers.CloudVMs.Driver != "loopback" {
+                       continue
+               }
+               if len(cc.InstanceTypes) == 1 {
+                       continue
+               }
+               if len(cc.InstanceTypes) > 1 {
+                       return fmt.Errorf("Clusters.%s.InstanceTypes: cannot use multiple InstanceTypes with loopback driver", id)
+               }
+               // No InstanceTypes configured. Fill in implicit
+               // default.
+               hostram, err := getHostRAM()
+               if err != nil {
+                       return err
+               }
+               scratch, err := getFilesystemSize(os.TempDir())
+               if err != nil {
+                       return err
+               }
+               cc.InstanceTypes = arvados.InstanceTypeMap{"localhost": {
+                       Name:            "localhost",
+                       ProviderType:    "localhost",
+                       VCPUs:           runtime.NumCPU(),
+                       RAM:             hostram,
+                       Scratch:         scratch,
+                       IncludedScratch: scratch,
+               }}
+               cfg.Clusters[id] = cc
+       }
+       return nil
+}
+
+func getFilesystemSize(path string) (arvados.ByteSize, error) {
+       var stat unix.Statfs_t
+       err := unix.Statfs(path, &stat)
+       if err != nil {
+               return 0, err
+       }
+       return arvados.ByteSize(stat.Blocks * uint64(stat.Bsize)), nil
+}
+
+var reMemTotal = regexp.MustCompile(`(^|\n)MemTotal: *(\d+) kB\n`)
+
+func getHostRAM() (arvados.ByteSize, error) {
+       buf, err := os.ReadFile("/proc/meminfo")
+       if err != nil {
+               return 0, err
+       }
+       m := reMemTotal.FindSubmatch(buf)
+       if m == nil {
+               return 0, errors.New("error parsing /proc/meminfo: no MemTotal")
+       }
+       kb, err := strconv.ParseInt(string(m[2]), 10, 64)
+       if err != nil {
+               return 0, fmt.Errorf("error parsing /proc/meminfo: %q: %w", m[2], err)
+       }
+       return arvados.ByteSize(kb) * 1024, nil
+}
+
 func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
 cluster:
        for id, cc := range cfg.Clusters {
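To illustrate the /proc/meminfo parsing above, a self-contained sketch with a made-up MemTotal line (the pattern is copied verbatim from reMemTotal):

    package main

    import (
        "fmt"
        "regexp"
    )

    // Same pattern as reMemTotal in load.go.
    var reMemTotal = regexp.MustCompile(`(^|\n)MemTotal: *(\d+) kB\n`)

    func main() {
        // Made-up /proc/meminfo excerpt for illustration.
        buf := []byte("MemFree:   123456 kB\nMemTotal: 16384000 kB\n")
        m := reMemTotal.FindSubmatch(buf)
        fmt.Printf("%s kB\n", m[2]) // "16384000 kB": 16384000 * 1024 = 16777216000 bytes
    }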
diff --git a/lib/config/load_test.go b/lib/config/load_test.go
index 4ae9a513c8c0d7874cfcfaaa0aeb47143ade075a..256e8a3e8cb54e18a155e9414b98773e04f8f47a 100644 (file)
@@ -13,6 +13,7 @@ import (
        "os/exec"
        "reflect"
        "regexp"
+       "runtime"
        "strings"
        "testing"
        "time"
@@ -601,6 +602,55 @@ func (s *LoadSuite) TestListKeys(c *check.C) {
        }
 }
 
+func (s *LoadSuite) TestLoopbackInstanceTypes(c *check.C) {
+       ldr := testLoader(c, `
+Clusters:
+ z1111:
+  Containers:
+   CloudVMs:
+    Enable: true
+    Driver: loopback
+  InstanceTypes:
+   a: {}
+   b: {}
+`, nil)
+       cfg, err := ldr.Load()
+       c.Check(err, check.ErrorMatches, `Clusters\.z1111\.InstanceTypes: cannot use multiple InstanceTypes with loopback driver`)
+
+       ldr = testLoader(c, `
+Clusters:
+ z1111:
+  Containers:
+   CloudVMs:
+    Enable: true
+    Driver: loopback
+`, nil)
+       cfg, err = ldr.Load()
+       c.Assert(err, check.IsNil)
+       cc, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       c.Check(cc.InstanceTypes, check.HasLen, 1)
+       c.Check(cc.InstanceTypes["localhost"].VCPUs, check.Equals, runtime.NumCPU())
+
+       ldr = testLoader(c, `
+Clusters:
+ z1111:
+  Containers:
+   CloudVMs:
+    Enable: true
+    Driver: loopback
+  InstanceTypes:
+   a:
+    VCPUs: 9
+`, nil)
+       cfg, err = ldr.Load()
+       c.Assert(err, check.IsNil)
+       cc, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       c.Check(cc.InstanceTypes, check.HasLen, 1)
+       c.Check(cc.InstanceTypes["a"].VCPUs, check.Equals, 9)
+}
+
 func (s *LoadSuite) TestImplicitStorageClasses(c *check.C) {
        // If StorageClasses and Volumes.*.StorageClasses are all
        // empty, there is a default storage class named "default".
@@ -824,3 +874,16 @@ arvados_config_source_timestamp_seconds{sha256="83aea5d82eb1d53372cd65c936c60acc
 `)
        }
 }
+
+func (s *LoadSuite) TestGetHostRAM(c *check.C) {
+       hostram, err := getHostRAM()
+       c.Check(err, check.IsNil)
+       c.Logf("getHostRAM() == %v", hostram)
+}
+
+func (s *LoadSuite) TestGetFilesystemSize(c *check.C) {
+       path := c.MkDir()
+       size, err := getFilesystemSize(path)
+       c.Check(err, check.IsNil)
+       c.Logf("getFilesystemSize(%q) == %v", path, size)
+}
diff --git a/lib/controller/integration_test.go b/lib/controller/integration_test.go
index 44be17c77ffb1d99e4fbeafcbe2a01dd30a82b60..7bf739f902edcbc0ec8a220590dad40ff0302d93 100644 (file)
@@ -11,6 +11,7 @@ import (
        "encoding/json"
        "fmt"
        "io"
+       "io/fs"
        "io/ioutil"
        "math"
        "net"
@@ -71,6 +72,18 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) {
       Insecure: true
     SystemLogs:
       Format: text
+    Containers:
+      CloudVMs:
+        Enable: true
+        Driver: loopback
+        BootProbeCommand: "rm -f /var/lock/crunch-run-broken"
+        ProbeInterval: 1s
+        PollInterval: 5s
+        SyncInterval: 10s
+        TimeoutIdle: 1s
+        TimeoutBooting: 2s
+      RuntimeEngine: singularity
+      CrunchRunArgumentsList: ["--broken-node-hook", "true"]
     RemoteClusters:
       z1111:
         Host: ` + hostport["z1111"] + `
@@ -1111,3 +1124,101 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) {
        c.Check(err, check.ErrorMatches, `request failed: .* 401 Unauthorized: cannot use a locally issued token to forward a request to our login cluster \(z1111\)`)
        c.Check(err, check.Not(check.ErrorMatches), `(?ms).*127\.0\.0\.11.*`)
 }
+
+func (s *IntegrationSuite) TestRunTrivialContainer(c *check.C) {
+       outcoll := s.runContainer(c, "z1111", map[string]interface{}{
+               "command":             []string{"sh", "-c", "touch \"/out/hello world\" /out/ohai"},
+               "container_image":     "busybox:uclibc",
+               "cwd":                 "/tmp",
+               "environment":         map[string]string{},
+               "mounts":              map[string]arvados.Mount{"/out": {Kind: "tmp", Capacity: 10000}},
+               "output_path":         "/out",
+               "runtime_constraints": arvados.RuntimeConstraints{RAM: 100000000, VCPUs: 1},
+               "priority":            1,
+               "state":               arvados.ContainerRequestStateCommitted,
+       }, 0)
+       c.Check(outcoll.ManifestText, check.Matches, `\. d41d8.* 0:0:hello\\040world 0:0:ohai\n`)
+       c.Check(outcoll.PortableDataHash, check.Equals, "8fa5dee9231a724d7cf377c5a2f4907c+65")
+}
+
+func (s *IntegrationSuite) runContainer(c *check.C, clusterID string, ctrSpec map[string]interface{}, expectExitCode int) arvados.Collection {
+       conn := s.super.Conn(clusterID)
+       rootctx, _, _ := s.super.RootClients(clusterID)
+       _, ac, kc, _ := s.super.UserClients(clusterID, rootctx, c, conn, s.oidcprovider.AuthEmail, true)
+
+       c.Log("[docker load]")
+       out, err := exec.Command("docker", "load", "--input", arvadostest.BusyboxDockerImage(c)).CombinedOutput()
+       c.Logf("[docker load done] %s", out)
+       c.Check(err, check.IsNil)
+
+       c.Log("[arv-keepdocker]")
+       akd := exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc")
+       akd.Env = append(os.Environ(), "ARVADOS_API_HOST="+ac.APIHost, "ARVADOS_API_HOST_INSECURE=1", "ARVADOS_API_TOKEN="+ac.AuthToken)
+       c.Logf("[arv-keepdocker env] %q", akd.Env)
+       out, err = akd.CombinedOutput()
+       c.Logf("[arv-keepdocker done] %s", out)
+       c.Check(err, check.IsNil)
+
+       var cr arvados.ContainerRequest
+       err = ac.RequestAndDecode(&cr, "POST", "/arvados/v1/container_requests", nil, map[string]interface{}{
+               "container_request": ctrSpec,
+       })
+       c.Assert(err, check.IsNil)
+
+       showlogs := func(collectionID string) {
+               var logcoll arvados.Collection
+               err = ac.RequestAndDecode(&logcoll, "GET", "/arvados/v1/collections/"+collectionID, nil, nil)
+               c.Assert(err, check.IsNil)
+               cfs, err := logcoll.FileSystem(ac, kc)
+               c.Assert(err, check.IsNil)
+               fs.WalkDir(arvados.FS(cfs), "/", func(path string, d fs.DirEntry, err error) error {
+                       if d.IsDir() || strings.HasPrefix(path, "/log for container") {
+                               return nil
+                       }
+                       f, err := cfs.Open(path)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       buf, err := ioutil.ReadAll(f)
+                       c.Assert(err, check.IsNil)
+                       c.Logf("=== %s\n%s\n", path, buf)
+                       return nil
+               })
+       }
+
+       var ctr arvados.Container
+       var lastState arvados.ContainerState
+       deadline := time.Now().Add(time.Minute)
+wait:
+       for ; ; lastState = ctr.State {
+               err = ac.RequestAndDecode(&ctr, "GET", "/arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
+               c.Assert(err, check.IsNil)
+               switch ctr.State {
+               case lastState:
+                       if time.Now().After(deadline) {
+                               c.Errorf("timed out, container request state is %q", cr.State)
+                               showlogs(ctr.Log)
+                               c.FailNow()
+                       }
+                       time.Sleep(time.Second / 2)
+               case arvados.ContainerStateComplete:
+                       break wait
+               case arvados.ContainerStateQueued, arvados.ContainerStateLocked, arvados.ContainerStateRunning:
+                       c.Logf("container state changed to %q", ctr.State)
+               default:
+                       c.Errorf("unexpected container state %q", ctr.State)
+                       showlogs(ctr.Log)
+                       c.FailNow()
+               }
+       }
+       c.Check(ctr.ExitCode, check.Equals, 0)
+
+       err = ac.RequestAndDecode(&cr, "GET", "/arvados/v1/container_requests/"+cr.UUID, nil, nil)
+       c.Assert(err, check.IsNil)
+
+       showlogs(cr.LogUUID)
+
+       var outcoll arvados.Collection
+       err = ac.RequestAndDecode(&outcoll, "GET", "/arvados/v1/collections/"+cr.OutputUUID, nil, nil)
+       c.Assert(err, check.IsNil)
+       return outcoll
+}
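TestRunTrivialContainer exercises the dispatch path end to end on the local host: docker load and arv-keepdocker push the busybox image into Keep, a container request is committed through the API, the loopback driver runs the container via the cloud dispatcher, and runContainer polls the container state, dumps logs on failure, and finally returns the output collection so the caller can verify its manifest and portable data hash.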
diff --git a/lib/crunchrun/executor_test.go b/lib/crunchrun/executor_test.go
index 3301a6e63be2d5a9210649268efbcf9b08eff204..5b146a6321789ea2a34b63e8a50156876dcea23d 100644 (file)
@@ -7,43 +7,15 @@ package crunchrun
 import (
        "bytes"
        "io"
-       "io/ioutil"
-       "net/http"
-       "os"
        "strings"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "golang.org/x/net/context"
        . "gopkg.in/check.v1"
 )
 
-func busyboxDockerImage(c *C) string {
-       fnm := "busybox_uclibc.tar"
-       cachedir := c.MkDir()
-       cachefile := cachedir + "/" + fnm
-       if _, err := os.Stat(cachefile); err == nil {
-               return cachefile
-       }
-
-       f, err := ioutil.TempFile(cachedir, "")
-       c.Assert(err, IsNil)
-       defer f.Close()
-       defer os.Remove(f.Name())
-
-       resp, err := http.Get("https://cache.arvados.org/" + fnm)
-       c.Assert(err, IsNil)
-       defer resp.Body.Close()
-       _, err = io.Copy(f, resp.Body)
-       c.Assert(err, IsNil)
-       err = f.Close()
-       c.Assert(err, IsNil)
-       err = os.Rename(f.Name(), cachefile)
-       c.Assert(err, IsNil)
-
-       return cachefile
-}
-
 type nopWriteCloser struct{ io.Writer }
 
 func (nopWriteCloser) Close() error { return nil }
@@ -71,7 +43,7 @@ func (s *executorSuite) SetUpTest(c *C) {
                Stdout:      nopWriteCloser{&s.stdout},
                Stderr:      nopWriteCloser{&s.stderr},
        }
-       err := s.executor.LoadImage("", busyboxDockerImage(c), arvados.Container{}, "", nil)
+       err := s.executor.LoadImage("", arvadostest.BusyboxDockerImage(c), arvados.Container{}, "", nil)
        c.Assert(err, IsNil)
 }
 
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index ec0893728328153074aba4aecebc6d940663f363..ce92a9b8075c083ebfc13d0748c88b890ad7052d 100644 (file)
@@ -51,7 +51,7 @@ func (s *integrationSuite) SetUpSuite(c *C) {
 
        arvadostest.StartKeep(2, true)
 
-       out, err := exec.Command("docker", "load", "--input", busyboxDockerImage(c)).CombinedOutput()
+       out, err := exec.Command("docker", "load", "--input", arvadostest.BusyboxDockerImage(c)).CombinedOutput()
        c.Log(string(out))
        c.Assert(err, IsNil)
        out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
index 5fcc0903f5d1b43d3b58e83d8e6721832985c4ed..93515defb7d8ebb68c1a5800770c203d52aeaa38 100644 (file)
@@ -11,6 +11,7 @@ import (
        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/lib/cloud/azure"
        "git.arvados.org/arvados.git/lib/cloud/ec2"
+       "git.arvados.org/arvados.git/lib/cloud/loopback"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
@@ -21,8 +22,9 @@ import (
 // Clusters.*.Containers.CloudVMs.Driver configuration values
 // correspond to keys in this map.
 var Drivers = map[string]cloud.Driver{
-       "azure": azure.Driver,
-       "ec2":   ec2.Driver,
+       "azure":    azure.Driver,
+       "ec2":      ec2.Driver,
+       "loopback": loopback.Driver,
 }
 
 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
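Registering the loopback driver in this map is the only wiring dispatchcloud needs: as the comment above notes, Clusters.*.Containers.CloudVMs.Driver configuration values are resolved by key lookup here.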
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 37e3fa9882ddec9ad9a0623ad62f9b3d75d94433..66e0bfee910a236b46980f2db4b7c30850b3a759 100644 (file)
@@ -13,6 +13,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       mathrand "math/rand"
        "sort"
        "strings"
        "sync"
@@ -774,6 +775,13 @@ func (wp *Pool) runProbes() {
 
        workers := []cloud.InstanceID{}
        for range probeticker.C {
+               // Add some jitter. Without this, if probeInterval is
+               // a multiple of syncInterval and sync is
+               // instantaneous (as with the loopback driver), the
+               // first few probes race with sync operations and
+               // don't update the workers.
+               time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
@@ -900,7 +908,7 @@ func (wp *Pool) loadRunnerData() error {
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
-       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+       wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
 }
 
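Two details in the pool changes are easy to miss: mathrand.Int63n yields jitter uniform in [0, probeInterval/23), e.g. at most ~435ms for a 10s probe interval, just enough to break the probe/sync lockstep described in the comment; and crunch-run is now staged under /tmp/arvados-crunch-run/ instead of /var/lib/arvados/, presumably so the loopback driver's unprivileged local "instance" can write it without root access.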
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index 9e89d7daafc01d05b770fb065f88049dea231a7e..1c8d62c20ee40571e3f1789451e3b46d148abf4f 100644 (file)
@@ -313,6 +313,10 @@ func (wkr *worker) probeAndUpdate() {
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
+               logger.WithFields(logrus.Fields{
+                       "updated":     updated,
+                       "wkr.updated": wkr.updated,
+               }).Debug("skipping worker state update due to probe/sync race")
                return
        }
 
@@ -387,6 +391,11 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
                wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
                return
        }
+       wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       }).Debug("probe succeeded")
        wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
        ok = true
 
diff --git a/lib/install/init.go b/lib/install/init.go
index 6a52441d716a2781beb53aa6d2a67004e166a7ea..d2fed1dd7ad4fee9fac408da3e2070c913174b19 100644 (file)
@@ -160,6 +160,9 @@ func (initcmd *initCommand) RunCommand(prog string, args []string, stdin io.Read
       {{end}}
     Containers:
       DispatchPrivateKey: {{printf "%q" .GenerateSSHPrivateKey}}
+      CloudVMs:
+        Enable: true
+        Driver: loopback
     ManagementToken: {{printf "%q" ( .RandomHex 50 )}}
     PostgreSQL:
       Connection:
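With this template change, a cluster created by arvados-server init comes up with the loopback driver enabled, so a fresh single-host installation can dispatch containers to its own host without any cloud-specific configuration.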
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 3510a6db048565b019cbdbfaca24d6f5540bf458..d6d75192f01837ee6779fc9c8ea1cea9937c8b7d 100644 (file)
@@ -37,6 +37,7 @@ type Container struct {
        RuntimeAuthScopes         []string               `json:"runtime_auth_scopes"`
        RuntimeToken              string                 `json:"runtime_token"`
        AuthUUID                  string                 `json:"auth_uuid"`
+       Log                       string                 `json:"log"`
 }
 
 // ContainerRequest is an arvados#container_request resource.
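The new Log field (distinct from ContainerRequest's log collection) exposes the container's own live log collection UUID; the controller integration test's showlogs helper reads it (showlogs(ctr.Log)) when a container ends up in an unexpected state before its request has been finalized.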
diff --git a/sdk/go/arvadostest/busybox_image.go b/sdk/go/arvadostest/busybox_image.go
new file mode 100644 (file)
index 0000000..0411491
--- /dev/null
+++ b/sdk/go/arvadostest/busybox_image.go
@@ -0,0 +1,43 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package arvadostest
+
+import (
+       "io"
+       "io/ioutil"
+       "net/http"
+       "os"
+
+       . "gopkg.in/check.v1"
+)
+
+// BusyboxDockerImage downloads the busybox:uclibc docker image
+// (busybox_uclibc.tar) from cache.arvados.org into a temporary file
+// and returns the temporary file name.
+func BusyboxDockerImage(c *C) string {
+       fnm := "busybox_uclibc.tar"
+       cachedir := c.MkDir()
+       cachefile := cachedir + "/" + fnm
+       if _, err := os.Stat(cachefile); err == nil {
+               return cachefile
+       }
+
+       f, err := ioutil.TempFile(cachedir, "")
+       c.Assert(err, IsNil)
+       defer f.Close()
+       defer os.Remove(f.Name())
+
+       resp, err := http.Get("https://cache.arvados.org/" + fnm)
+       c.Assert(err, IsNil)
+       defer resp.Body.Close()
+       _, err = io.Copy(f, resp.Body)
+       c.Assert(err, IsNil)
+       err = f.Close()
+       c.Assert(err, IsNil)
+       err = os.Rename(f.Name(), cachefile)
+       c.Assert(err, IsNil)
+
+       return cachefile
+}
diff --git a/tools/sync-groups/federation_test.go b/tools/sync-groups/federation_test.go
index 330c22be340d78f8362fcb600dc4d936565fdaa1..88923563d4792bb4b72e82c082b0485d26481f97 100644 (file)
@@ -54,6 +54,10 @@ func (s *FederationSuite) SetUpSuite(c *check.C) {
       Insecure: true
     SystemLogs:
       Format: text
+    Containers:
+      CloudVMs:
+        Enable: true
+        Driver: loopback
     RemoteClusters:
       z1111:
         Host: ` + hostport["z1111"] + `