15370: Add dispatchcloud loopback driver.
author    Tom Clegg <tom@curii.com>
          Wed, 4 May 2022 18:52:22 +0000 (14:52 -0400)
committer Tom Clegg <tom@curii.com>
          Fri, 13 May 2022 15:20:32 +0000 (11:20 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>
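
The loopback driver runs containers on the dispatch host itself,
using a local SSH service, so the full dispatch/boot/probe/run cycle
can be exercised without a cloud account. It allows at most one
instance at a time, and the config loader fills in a default
"localhost" InstanceType sized to the host's CPUs, RAM, and scratch
space. A minimal config sketch (matching the stanza added to
lib/install/init.go below):

    Containers:
      CloudVMs:
        Enable: true
        Driver: loopback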

15 files changed:
lib/boot/passenger.go
lib/boot/supervisor.go
lib/cloud/loopback/loopback.go [new file with mode: 0644]
lib/cloud/loopback/loopback_test.go [new file with mode: 0644]
lib/config/config.default.yml
lib/config/load.go
lib/config/load_test.go
lib/controller/integration_test.go
lib/crunchrun/executor_test.go
lib/dispatchcloud/driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/worker.go
lib/install/init.go
sdk/go/arvados/container.go
sdk/go/arvadostest/busybox_image.go [new file with mode: 0644]

index 410befab90793e81fac336bbc84bea3d21ad7a94..f86f1f930398f48a133cc33fe4752333246b73cb 100644 (file)
@@ -28,8 +28,8 @@ var railsEnv = []string{
 // Install a Rails application's dependencies, including phusion
 // passenger.
 type installPassenger struct {
-       src       string
-       varlibdir string
+       src       string // path to app in source tree
+       varlibdir string // path to app (relative to /var/lib/arvados) in OS package: "railsapi" or "workbench1"
        depends   []supervisedTask
 }
 
@@ -52,6 +52,11 @@ func (runner installPassenger) Run(ctx context.Context, fail func(error), super
 
        appdir := runner.src
        if super.ClusterType == "test" {
+               // In the multi-cluster test setup, if we run multiple
+               // Rails instances directly from the source tree, they
+               // step on one another's files in {source}/tmp,
+               // {source}/log, etc. So instead we copy the source
+               // directory into a temp dir and run the Rails app
+               // from there.
                appdir = filepath.Join(super.tempdir, runner.varlibdir)
                err = super.RunProgram(ctx, super.tempdir, runOptions{}, "mkdir", "-p", appdir)
                if err != nil {
@@ -112,7 +117,7 @@ func (runner installPassenger) Run(ctx context.Context, fail func(error), super
 
 type runPassenger struct {
        src       string // path to app in source tree
-       varlibdir string // path to app (relative to /var/lib/arvados) in OS package
+       varlibdir string // path to app (relative to /var/lib/arvados) in OS package: "railsapi" or "workbench1"
        svc       arvados.Service
        depends   []supervisedTask
 }
index 5ec1b22448b68a4c50dd97545cba4c46ca9e6f80..d69cc9f18c522449f5d5b3df1999cc1b270ed16c 100644 (file)
@@ -387,10 +387,14 @@ func (super *Supervisor) runCluster() error {
        }
        if super.ClusterType != "test" {
                tasks = append(tasks,
-                       runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
                        runServiceCommand{name: "keep-balance", svc: super.cluster.Services.Keepbalance},
                )
        }
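+       // Run dispatch-cloud whenever a cloud driver is enabled;
+       // with the loopback driver this now includes test clusters.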
+       if super.cluster.Containers.CloudVMs.Enable {
+               tasks = append(tasks,
+                       runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
+               )
+       }
        super.tasksReady = map[string]chan bool{}
        for _, task := range tasks {
                super.tasksReady[task.String()] = make(chan bool)
@@ -824,9 +828,6 @@ func (super *Supervisor) autofillConfig() error {
                &super.cluster.Services.Workbench1,
                &super.cluster.Services.Workbench2,
        } {
-               if svc == &super.cluster.Services.DispatchCloud && super.ClusterType == "test" {
-                       continue
-               }
                if svc.ExternalURL.Host == "" {
                        port, err := nextPort(defaultExtHost)
                        if err != nil {
diff --git a/lib/cloud/loopback/loopback.go b/lib/cloud/loopback/loopback.go
new file mode 100644 (file)
index 0000000..6ad4f87
--- /dev/null
@@ -0,0 +1,164 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package loopback
+
+import (
+       "bytes"
+       "crypto/rand"
+       "crypto/rsa"
+       "encoding/json"
+       "errors"
+       "io"
+       "os/exec"
+       "os/user"
+       "strings"
+       "sync"
+       "syscall"
+
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/test"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
+)
+
+// Driver is the loopback implementation of the cloud.Driver interface.
+var Driver = cloud.DriverFunc(newInstanceSet)
+
+var (
+       errUnimplemented = errors.New("function not implemented by loopback driver")
+       errQuota         = quotaError("loopback driver is always at quota")
+)
+
+type quotaError string
+
+func (e quotaError) IsQuotaError() bool { return true }
+func (e quotaError) Error() string      { return string(e) }
+
+type instanceSet struct {
+       instanceSetID cloud.InstanceSetID
+       logger        logrus.FieldLogger
+       instances     []*instance
+       mtx           sync.Mutex
+}
+
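+// newInstanceSet returns a new, empty instanceSet. The
+// driver-specific config (json.RawMessage) is ignored.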
+func newInstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+       is := &instanceSet{
+               instanceSetID: instanceSetID,
+               logger:        logger,
+       }
+       return is, nil
+}
+
+func (is *instanceSet) Create(it arvados.InstanceType, _ cloud.ImageID, tags cloud.InstanceTags, _ cloud.InitCommand, pubkey ssh.PublicKey) (cloud.Instance, error) {
+       is.mtx.Lock()
+       defer is.mtx.Unlock()
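+       // The loopback driver supports at most one instance at a
+       // time; a second Create call reports a quota error.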
+       if len(is.instances) > 0 {
+               return nil, errQuota
+       }
+       u, err := user.Current()
+       if err != nil {
+               return nil, err
+       }
+       hostRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+       if err != nil {
+               return nil, err
+       }
+       hostKey, err := ssh.NewSignerFromKey(hostRSAKey)
+       if err != nil {
+               return nil, err
+       }
+       hostPubKey, err := ssh.NewPublicKey(hostRSAKey.Public())
+       if err != nil {
+               return nil, err
+       }
+       inst := &instance{
+               is:           is,
+               instanceType: it,
+               adminUser:    u.Username,
+               tags:         tags,
+               hostPubKey:   hostPubKey,
+               sshService: test.SSHService{
+                       HostKey:        hostKey,
+                       AuthorizedUser: u.Username,
+                       AuthorizedKeys: []ssh.PublicKey{pubkey},
+               },
+       }
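+       // Commands arriving over SSH are run directly on this host
+       // by sshExecFunc below.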
+       inst.sshService.Exec = inst.sshExecFunc
+       go inst.sshService.Start()
+       is.instances = []*instance{inst}
+       return inst, nil
+}
+
+func (is *instanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+       is.mtx.Lock()
+       defer is.mtx.Unlock()
+       var ret []cloud.Instance
+       for _, inst := range is.instances {
+               ret = append(ret, inst)
+       }
+       return ret, nil
+}
+
+func (is *instanceSet) Stop() {
+       is.mtx.Lock()
+       defer is.mtx.Unlock()
+       for _, inst := range is.instances {
+               inst.sshService.Close()
+       }
+}
+
+type instance struct {
+       is           *instanceSet
+       instanceType arvados.InstanceType
+       adminUser    string
+       tags         cloud.InstanceTags
+       hostPubKey   ssh.PublicKey
+       sshService   test.SSHService
+}
+
+func (i *instance) ID() cloud.InstanceID     { return cloud.InstanceID(i.instanceType.ProviderType) }
+func (i *instance) String() string           { return i.instanceType.ProviderType }
+func (i *instance) ProviderType() string     { return i.instanceType.ProviderType }
+func (i *instance) Address() string          { return i.sshService.Address() }
+func (i *instance) RemoteUser() string       { return i.adminUser }
+func (i *instance) Tags() cloud.InstanceTags { return i.tags }
+func (i *instance) SetTags(tags cloud.InstanceTags) error {
+       i.tags = tags
+       return nil
+}
+func (i *instance) Destroy() error {
+       i.is.mtx.Lock()
+       defer i.is.mtx.Unlock()
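+       // At most one instance exists, so just empty the slice.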
+       i.is.instances = i.is.instances[:0]
+       return nil
+}
+func (i *instance) VerifyHostKey(pubkey ssh.PublicKey, _ *ssh.Client) error {
+       if !bytes.Equal(pubkey.Marshal(), i.hostPubKey.Marshal()) {
+               return errors.New("host key mismatch")
+       }
+       return nil
+}
+func (i *instance) sshExecFunc(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
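+       // The driver runs commands as the current (non-root) user,
+       // so strip a leading "sudo " rather than escalating.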
+       cmd := exec.Command("sh", "-c", strings.TrimPrefix(command, "sudo "))
+       cmd.Stdin = stdin
+       cmd.Stdout = stdout
+       cmd.Stderr = stderr
+       for k, v := range env {
+               cmd.Env = append(cmd.Env, k+"="+v)
+       }
+       // Prevent child process from using our tty.
+       cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
+       err := cmd.Run()
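+       // Map the result to an SSH exit status: 0 on success, the
+       // command's exit code when available, or 1 for signals and
+       // other errors.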
+       if err == nil {
+               return 0
+       } else if err, ok := err.(*exec.ExitError); !ok {
+               return 1
+       } else if code := err.ExitCode(); code < 0 {
+               return 1
+       } else {
+               return uint32(code)
+       }
+}
diff --git a/lib/cloud/loopback/loopback_test.go b/lib/cloud/loopback/loopback_test.go
new file mode 100644 (file)
index 0000000..5c30f5f
--- /dev/null
@@ -0,0 +1,127 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package loopback
+
+import (
+       "crypto/rand"
+       "crypto/rsa"
+       "encoding/json"
+       "testing"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+type suite struct{}
+
+var _ = check.Suite(&suite{})
+
+func (*suite) TestCreateListExecDestroy(c *check.C) {
+       logger := ctxlog.TestLogger(c)
+       is, err := Driver.InstanceSet(json.RawMessage("{}"), "testInstanceSetID", cloud.SharedResourceTags{"sharedTag": "sharedTagValue"}, logger)
+       c.Assert(err, check.IsNil)
+
+       clientRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+       c.Assert(err, check.IsNil)
+       clientSSHKey, err := ssh.NewSignerFromKey(clientRSAKey)
+       c.Assert(err, check.IsNil)
+       clientSSHPubKey, err := ssh.NewPublicKey(clientRSAKey.Public())
+       c.Assert(err, check.IsNil)
+
+       it := arvados.InstanceType{
+               Name:         "localhost",
+               ProviderType: "localhost",
+               RAM:          1002003004,
+               VCPUs:        5,
+       }
+
+       // First call to Create should succeed, and the returned
+       // instance's SSH target address should be available in << 1s.
+       inst, err := is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+       c.Assert(err, check.IsNil)
+       for deadline := time.Now().Add(time.Second); inst.Address() == ""; time.Sleep(time.Second / 100) {
+               if deadline.Before(time.Now()) {
+                       c.Fatal("timed out")
+               }
+       }
+
+       // Another call to Create should fail with a quota error.
+       inst2, err := is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+       c.Check(inst2, check.IsNil)
+       qerr, ok := err.(cloud.QuotaError)
+       if c.Check(ok, check.Equals, true, check.Commentf("expect cloud.QuotaError, got %#v", err)) {
+               c.Check(qerr.IsQuotaError(), check.Equals, true)
+       }
+
+       // Instance list should now have one entry, for the new
+       // instance.
+       list, err := is.Instances(nil)
+       c.Assert(err, check.IsNil)
+       c.Assert(list, check.HasLen, 1)
+       inst = list[0]
+       c.Check(inst.String(), check.Equals, "localhost")
+
+       // Instance's SSH server should execute shell commands.
+       exr := sshexecutor.New(inst)
+       exr.SetSigners(clientSSHKey)
+
+       stdout, stderr, err := exr.Execute(nil, "echo ok", nil)
+       c.Check(err, check.IsNil)
+       c.Check(string(stdout), check.Equals, "ok\n")
+       c.Check(string(stderr), check.Equals, "")
+
+       // SSH server should propagate stderr and non-zero exit
+       // status.
+       stdout, stderr, err = exr.Execute(nil, "echo fail && echo -n fail2 >&2 && false", nil)
+       c.Check(err, check.FitsTypeOf, &ssh.ExitError{})
+       c.Check(string(stdout), check.Equals, "fail\n")
+       c.Check(string(stderr), check.Equals, "fail2")
+
+       // SSH server should strip "sudo" from the front of the
+       // command.
+       withoutsudo, _, err := exr.Execute(nil, "whoami", nil)
+       c.Check(err, check.IsNil)
+       withsudo, _, err := exr.Execute(nil, "sudo whoami", nil)
+       c.Check(err, check.IsNil)
+       c.Check(string(withsudo), check.Equals, string(withoutsudo))
+
+       // SSH server should reject keys other than the one whose
+       // public key we passed to Create.
+       badRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+       c.Assert(err, check.IsNil)
+       badSSHKey, err := ssh.NewSignerFromKey(badRSAKey)
+       c.Assert(err, check.IsNil)
+       // Create a new executor here, otherwise Execute would reuse
+       // the existing connection instead of authenticating with
+       // badRSAKey.
+       exr = sshexecutor.New(inst)
+       exr.SetSigners(badSSHKey)
+       stdout, stderr, err = exr.Execute(nil, "true", nil)
+       c.Check(err, check.ErrorMatches, `.*unable to authenticate.*`)
+
+       // Destroying the instance causes it to disappear from the
+       // list, and allows us to create one more.
+       err = inst.Destroy()
+       c.Check(err, check.IsNil)
+       list, err = is.Instances(nil)
+       c.Assert(err, check.IsNil)
+       c.Assert(list, check.HasLen, 0)
+       _, err = is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+       c.Check(err, check.IsNil)
+       _, err = is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+       c.Check(err, check.NotNil)
+       list, err = is.Instances(nil)
+       c.Assert(err, check.IsNil)
+       c.Assert(list, check.HasLen, 1)
+}
index e60880c21735de9dc0d1f89da4aefabe43d2474e..b09c1ecb9f0ce8fc7fa8096544d2195f35d640bc 100644 (file)
@@ -1278,7 +1278,9 @@ Clusters:
         # need to be detected and cleaned up manually.
         TagKeyPrefix: Arvados
 
-        # Cloud driver: "azure" (Microsoft Azure) or "ec2" (Amazon AWS).
+        # Cloud driver: "azure" (Microsoft Azure), "ec2" (Amazon AWS),
+        # or "loopback" (run containers on the dispatch host, for
+        # testing purposes).
         Driver: ec2
 
         # Cloud-specific driver parameters.
index 8f8ab2bf27312adb76b6597ef1f51f8e21ecb338..abb3a804baab7cb447f9bc55e4bc39d8e8b376ca 100644 (file)
@@ -16,6 +16,7 @@ import (
        "io/ioutil"
        "os"
        "regexp"
+       "runtime"
        "strconv"
        "strings"
        "time"
@@ -25,6 +26,7 @@ import (
        "github.com/imdario/mergo"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
+       "golang.org/x/sys/unix"
 )
 
 //go:embed config.default.yml
@@ -297,7 +299,10 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
                        ldr.loadOldKeepBalanceConfig,
                )
        }
-       loadFuncs = append(loadFuncs, ldr.setImplicitStorageClasses)
+       loadFuncs = append(loadFuncs,
+               ldr.setImplicitStorageClasses,
+               ldr.setLoopbackInstanceType,
+       )
        for _, f := range loadFuncs {
                err = f(&cfg)
                if err != nil {
@@ -414,6 +419,67 @@ func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
        return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
 }
 
+func (ldr *Loader) setLoopbackInstanceType(cfg *arvados.Config) error {
+       for id, cc := range cfg.Clusters {
+               if !cc.Containers.CloudVMs.Enable || cc.Containers.CloudVMs.Driver != "loopback" {
+                       continue
+               }
+               if len(cc.InstanceTypes) == 1 {
+                       continue
+               }
+               if len(cc.InstanceTypes) > 1 {
+                       return fmt.Errorf("Clusters.%s.InstanceTypes: cannot use multiple InstanceTypes with loopback driver", id)
+               }
+               // No InstanceTypes configured. Fill in implicit
+               // default.
+               hostram, err := getHostRAM()
+               if err != nil {
+                       return err
+               }
+               scratch, err := getFilesystemSize(os.TempDir())
+               if err != nil {
+                       return err
+               }
+               cc.InstanceTypes = arvados.InstanceTypeMap{"localhost": {
+                       Name:            "localhost",
+                       ProviderType:    "localhost",
+                       VCPUs:           runtime.NumCPU(),
+                       RAM:             hostram,
+                       Scratch:         scratch,
+                       IncludedScratch: scratch,
+               }}
+               cfg.Clusters[id] = cc
+       }
+       return nil
+}
+
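+// getFilesystemSize returns the size of the filesystem containing
+// path, used as the default scratch size for the loopback instance
+// type.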
+func getFilesystemSize(path string) (arvados.ByteSize, error) {
+       var stat unix.Statfs_t
+       err := unix.Statfs(path, &stat)
+       if err != nil {
+               return 0, err
+       }
+       return arvados.ByteSize(stat.Blocks * uint64(stat.Bsize)), nil
+}
+
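+// reMemTotal extracts the size in kB from the MemTotal line of
+// /proc/meminfo, e.g., "MemTotal:       16384516 kB".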
+var reMemTotal = regexp.MustCompile(`(^|\n)MemTotal: *(\d+) kB\n`)
+
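+// getHostRAM returns the host's total memory as reported by
+// /proc/meminfo, used as the default RAM size for the loopback
+// instance type.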
+func getHostRAM() (arvados.ByteSize, error) {
+       buf, err := os.ReadFile("/proc/meminfo")
+       if err != nil {
+               return 0, err
+       }
+       m := reMemTotal.FindSubmatch(buf)
+       if m == nil {
+               return 0, errors.New("error parsing /proc/meminfo: no MemTotal")
+       }
+       kb, err := strconv.ParseInt(string(m[2]), 10, 64)
+       if err != nil {
+               return 0, fmt.Errorf("error parsing /proc/meminfo: %q: %w", m[2], err)
+       }
+       return arvados.ByteSize(kb) * 1024, nil
+}
+
 func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
 cluster:
        for id, cc := range cfg.Clusters {
index 4ae9a513c8c0d7874cfcfaaa0aeb47143ade075a..256e8a3e8cb54e18a155e9414b98773e04f8f47a 100644 (file)
@@ -13,6 +13,7 @@ import (
        "os/exec"
        "reflect"
        "regexp"
+       "runtime"
        "strings"
        "testing"
        "time"
@@ -601,6 +602,55 @@ func (s *LoadSuite) TestListKeys(c *check.C) {
        }
 }
 
+func (s *LoadSuite) TestLoopbackInstanceTypes(c *check.C) {
+       ldr := testLoader(c, `
+Clusters:
+ z1111:
+  Containers:
+   CloudVMs:
+    Enable: true
+    Driver: loopback
+  InstanceTypes:
+   a: {}
+   b: {}
+`, nil)
+       cfg, err := ldr.Load()
+       c.Check(err, check.ErrorMatches, `Clusters\.z1111\.InstanceTypes: cannot use multiple InstanceTypes with loopback driver`)
+
+       ldr = testLoader(c, `
+Clusters:
+ z1111:
+  Containers:
+   CloudVMs:
+    Enable: true
+    Driver: loopback
+`, nil)
+       cfg, err = ldr.Load()
+       c.Assert(err, check.IsNil)
+       cc, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       c.Check(cc.InstanceTypes, check.HasLen, 1)
+       c.Check(cc.InstanceTypes["localhost"].VCPUs, check.Equals, runtime.NumCPU())
+
+       ldr = testLoader(c, `
+Clusters:
+ z1111:
+  Containers:
+   CloudVMs:
+    Enable: true
+    Driver: loopback
+  InstanceTypes:
+   a:
+    VCPUs: 9
+`, nil)
+       cfg, err = ldr.Load()
+       c.Assert(err, check.IsNil)
+       cc, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       c.Check(cc.InstanceTypes, check.HasLen, 1)
+       c.Check(cc.InstanceTypes["a"].VCPUs, check.Equals, 9)
+}
+
 func (s *LoadSuite) TestImplicitStorageClasses(c *check.C) {
        // If StorageClasses and Volumes.*.StorageClasses are all
        // empty, there is a default storage class named "default".
@@ -824,3 +874,16 @@ arvados_config_source_timestamp_seconds{sha256="83aea5d82eb1d53372cd65c936c60acc
 `)
        }
 }
+
+func (s *LoadSuite) TestGetHostRAM(c *check.C) {
+       hostram, err := getHostRAM()
+       c.Check(err, check.IsNil)
+       c.Logf("getHostRAM() == %v", hostram)
+}
+
+func (s *LoadSuite) TestGetFilesystemSize(c *check.C) {
+       path := c.MkDir()
+       size, err := getFilesystemSize(path)
+       c.Check(err, check.IsNil)
+       c.Logf("getFilesystemSize(%q) == %v", path, size)
+}
index 44be17c77ffb1d99e4fbeafcbe2a01dd30a82b60..d65bb3149e6f0dd425e80d21f53131c5c5813465 100644 (file)
@@ -11,6 +11,7 @@ import (
        "encoding/json"
        "fmt"
        "io"
+       "io/fs"
        "io/ioutil"
        "math"
        "net"
@@ -71,6 +72,17 @@ func (s *IntegrationSuite) SetUpSuite(c *check.C) {
       Insecure: true
     SystemLogs:
       Format: text
+    Containers:
+      CloudVMs:
+        Enable: true
+        Driver: loopback
+        BootProbeCommand: id
+        ProbeInterval: 1s
+        PollInterval: 5s
+        SyncInterval: 10s
+        TimeoutIdle: 1s
+        TimeoutBooting: 2s
+      RuntimeEngine: singularity
     RemoteClusters:
       z1111:
         Host: ` + hostport["z1111"] + `
@@ -1111,3 +1123,96 @@ func (s *IntegrationSuite) TestForwardRuntimeTokenToLoginCluster(c *check.C) {
        c.Check(err, check.ErrorMatches, `request failed: .* 401 Unauthorized: cannot use a locally issued token to forward a request to our login cluster \(z1111\)`)
        c.Check(err, check.Not(check.ErrorMatches), `(?ms).*127\.0\.0\.11.*`)
 }
+
+func (s *IntegrationSuite) TestRunTrivialContainer(c *check.C) {
+       conn1 := s.super.Conn("z1111")
+       rootctx1, _, _ := s.super.RootClients("z1111")
+       _, ac1, kc1, _ := s.super.UserClients("z1111", rootctx1, c, conn1, s.oidcprovider.AuthEmail, true)
+
+       c.Log("[docker load]")
+       out, err := exec.Command("docker", "load", "--input", arvadostest.BusyboxDockerImage(c)).CombinedOutput()
+       c.Logf("[docker load done] %s", out)
+       c.Check(err, check.IsNil)
+
+       c.Log("[arv-keepdocker]")
+       akd := exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc")
+       akd.Env = append(os.Environ(), "ARVADOS_API_HOST="+ac1.APIHost, "ARVADOS_API_HOST_INSECURE=1", "ARVADOS_API_TOKEN="+ac1.AuthToken)
+       c.Logf("[arv-keepdocker env] %q", akd.Env)
+       out, err = akd.CombinedOutput()
+       c.Logf("[arv-keepdocker done] %s", out)
+       c.Check(err, check.IsNil)
+
+       var cr arvados.ContainerRequest
+       err = ac1.RequestAndDecode(&cr, "POST", "/arvados/v1/container_requests", nil, map[string]interface{}{
+               "container_request": map[string]interface{}{
+                       "command":             []string{"touch", "/out/hello_world"},
+                       "container_image":     "busybox:uclibc",
+                       "cwd":                 "/tmp",
+                       "environment":         map[string]string{},
+                       "mounts":              map[string]arvados.Mount{"/out": {Kind: "tmp", Capacity: 10000}},
+                       "output_path":         "/out",
+                       "runtime_constraints": arvados.RuntimeConstraints{RAM: 100000000, VCPUs: 1},
+                       "priority":            1,
+                       "state":               arvados.ContainerRequestStateCommitted,
+               },
+       })
+       c.Assert(err, check.IsNil)
+
+       showlogs := func(collectionID string) {
+               var logcoll arvados.Collection
+               err = ac1.RequestAndDecode(&logcoll, "GET", "/arvados/v1/collections/"+collectionID, nil, nil)
+               c.Assert(err, check.IsNil)
+               cfs, err := logcoll.FileSystem(ac1, kc1)
+               c.Assert(err, check.IsNil)
+               fs.WalkDir(arvados.FS(cfs), "/", func(path string, d fs.DirEntry, err error) error {
+                       if d.IsDir() || strings.HasPrefix(path, "/log for container") {
+                               return nil
+                       }
+                       f, err := cfs.Open(path)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       buf, err := ioutil.ReadAll(f)
+                       c.Assert(err, check.IsNil)
+                       c.Logf("=== %s\n%s\n", path, buf)
+                       return nil
+               })
+       }
+
+       var ctr arvados.Container
+       var lastState arvados.ContainerState
+       deadline := time.Now().Add(time.Minute)
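+       // Poll until the container reaches a terminal state, logging
+       // each state change; on timeout or an unexpected state, dump
+       // the container logs and fail.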
+wait:
+       for ; ; lastState = ctr.State {
+               err = ac1.RequestAndDecode(&ctr, "GET", "/arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
+               c.Assert(err, check.IsNil)
+               switch ctr.State {
+               case lastState:
+                       if time.Now().After(deadline) {
+                               c.Errorf("timed out, container state is %q", ctr.State)
+                               showlogs(ctr.Log)
+                               c.FailNow()
+                       }
+                       time.Sleep(time.Second / 2)
+               case arvados.ContainerStateComplete:
+                       break wait
+               case arvados.ContainerStateQueued, arvados.ContainerStateLocked, arvados.ContainerStateRunning:
+                       c.Logf("container state changed to %q", ctr.State)
+               default:
+                       c.Errorf("unexpected container state %q", ctr.State)
+                       showlogs(ctr.Log)
+                       c.FailNow()
+               }
+       }
+       c.Check(ctr.ExitCode, check.Equals, 0)
+
+       err = ac1.RequestAndDecode(&cr, "GET", "/arvados/v1/container_requests/"+cr.UUID, nil, nil)
+       c.Assert(err, check.IsNil)
+
+       showlogs(cr.LogUUID)
+
+       var outcoll arvados.Collection
+       err = ac1.RequestAndDecode(&outcoll, "GET", "/arvados/v1/collections/"+cr.OutputUUID, nil, nil)
+       c.Assert(err, check.IsNil)
+       c.Check(outcoll.ManifestText, check.Matches, `\. d41d8.* 0:0:hello_world\n`)
+       c.Check(outcoll.PortableDataHash, check.Equals, "dac08d558cfb6c9536f604ca89e3c002+53")
+}
index 99af0530ff35dd55e2163c8726e09699b06d5852..1833fc8ac42b5d931c8118c1273cf52335a96824 100644 (file)
@@ -7,43 +7,15 @@ package crunchrun
 import (
        "bytes"
        "io"
-       "io/ioutil"
-       "net/http"
-       "os"
        "strings"
        "time"
 
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "golang.org/x/net/context"
        . "gopkg.in/check.v1"
 )
 
-func busyboxDockerImage(c *C) string {
-       fnm := "busybox_uclibc.tar"
-       cachedir := c.MkDir()
-       cachefile := cachedir + "/" + fnm
-       if _, err := os.Stat(cachefile); err == nil {
-               return cachefile
-       }
-
-       f, err := ioutil.TempFile(cachedir, "")
-       c.Assert(err, IsNil)
-       defer f.Close()
-       defer os.Remove(f.Name())
-
-       resp, err := http.Get("https://cache.arvados.org/" + fnm)
-       c.Assert(err, IsNil)
-       defer resp.Body.Close()
-       _, err = io.Copy(f, resp.Body)
-       c.Assert(err, IsNil)
-       err = f.Close()
-       c.Assert(err, IsNil)
-       err = os.Rename(f.Name(), cachefile)
-       c.Assert(err, IsNil)
-
-       return cachefile
-}
-
 type nopWriteCloser struct{ io.Writer }
 
 func (nopWriteCloser) Close() error { return nil }
@@ -71,7 +43,7 @@ func (s *executorSuite) SetUpTest(c *C) {
                Stdout:      nopWriteCloser{&s.stdout},
                Stderr:      nopWriteCloser{&s.stderr},
        }
-       err := s.executor.LoadImage("", busyboxDockerImage(c), arvados.Container{}, "", nil)
+       err := s.executor.LoadImage("", arvadostest.BusyboxDockerImage(c), arvados.Container{}, "", nil)
        c.Assert(err, IsNil)
 }
 
index 5fcc0903f5d1b43d3b58e83d8e6721832985c4ed..93515defb7d8ebb68c1a5800770c203d52aeaa38 100644 (file)
@@ -11,6 +11,7 @@ import (
        "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/lib/cloud/azure"
        "git.arvados.org/arvados.git/lib/cloud/ec2"
+       "git.arvados.org/arvados.git/lib/cloud/loopback"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
@@ -21,8 +22,9 @@ import (
 // Clusters.*.Containers.CloudVMs.Driver configuration values
 // correspond to keys in this map.
 var Drivers = map[string]cloud.Driver{
-       "azure": azure.Driver,
-       "ec2":   ec2.Driver,
+       "azure":    azure.Driver,
+       "ec2":      ec2.Driver,
+       "loopback": loopback.Driver,
 }
 
 func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
index 37e3fa9882ddec9ad9a0623ad62f9b3d75d94433..66e0bfee910a236b46980f2db4b7c30850b3a759 100644 (file)
@@ -13,6 +13,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       mathrand "math/rand"
        "sort"
        "strings"
        "sync"
@@ -774,6 +775,13 @@ func (wp *Pool) runProbes() {
 
        workers := []cloud.InstanceID{}
        for range probeticker.C {
+               // Add some jitter. Without this, if probeInterval is
+               // a multiple of syncInterval and sync is
+               // instantaneous (as with the loopback driver), the
+               // first few probes race with sync operations and
+               // don't update the workers.
+               time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
                workers = workers[:0]
                wp.mtx.Lock()
                for id, wkr := range wp.workers {
@@ -900,7 +908,7 @@ func (wp *Pool) loadRunnerData() error {
        }
        wp.runnerData = buf
        wp.runnerMD5 = md5.Sum(buf)
-       wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
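+       // Note /tmp is writable without root privileges, which
+       // matters when the dispatcher itself runs the containers
+       // (loopback driver).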
+       wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
        return nil
 }
 
index 9e89d7daafc01d05b770fb065f88049dea231a7e..1c8d62c20ee40571e3f1789451e3b46d148abf4f 100644 (file)
@@ -313,6 +313,10 @@ func (wkr *worker) probeAndUpdate() {
                // not yet running when ctrUUIDs was generated. Leave
                // wkr.running alone and wait for the next probe to
                // catch up on any changes.
+               logger.WithFields(logrus.Fields{
+                       "updated":     updated,
+                       "wkr.updated": wkr.updated,
+               }).Debug("skipping worker state update due to probe/sync race")
                return
        }
 
@@ -387,6 +391,11 @@ func (wkr *worker) probeRunning() (running []string, reportsBroken, ok bool) {
                wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
                return
        }
+       wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       }).Debug("probe succeeded")
        wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
        ok = true
 
index 6a52441d716a2781beb53aa6d2a67004e166a7ea..d2fed1dd7ad4fee9fac408da3e2070c913174b19 100644 (file)
@@ -160,6 +160,9 @@ func (initcmd *initCommand) RunCommand(prog string, args []string, stdin io.Read
       {{end}}
     Containers:
       DispatchPrivateKey: {{printf "%q" .GenerateSSHPrivateKey}}
+      CloudVMs:
+        Enable: true
+        Driver: loopback
     ManagementToken: {{printf "%q" ( .RandomHex 50 )}}
     PostgreSQL:
       Connection:
index 3510a6db048565b019cbdbfaca24d6f5540bf458..d6d75192f01837ee6779fc9c8ea1cea9937c8b7d 100644 (file)
@@ -37,6 +37,7 @@ type Container struct {
        RuntimeAuthScopes         []string               `json:"runtime_auth_scopes"`
        RuntimeToken              string                 `json:"runtime_token"`
        AuthUUID                  string                 `json:"auth_uuid"`
+       Log                       string                 `json:"log"`
 }
 
 // ContainerRequest is an arvados#container_request resource.
diff --git a/sdk/go/arvadostest/busybox_image.go b/sdk/go/arvadostest/busybox_image.go
new file mode 100644 (file)
index 0000000..0411491
--- /dev/null
@@ -0,0 +1,43 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package arvadostest
+
+import (
+       "io"
+       "io/ioutil"
+       "net/http"
+       "os"
+
+       . "gopkg.in/check.v1"
+)
+
+// BusyboxDockerImage downloads the busybox:uclibc docker image
+// tarball (busybox_uclibc.tar) from cache.arvados.org into a
+// temporary directory and returns the path of the tar file.
+func BusyboxDockerImage(c *C) string {
+       fnm := "busybox_uclibc.tar"
+       cachedir := c.MkDir()
+       cachefile := cachedir + "/" + fnm
+       if _, err := os.Stat(cachefile); err == nil {
+               return cachefile
+       }
+
+       f, err := ioutil.TempFile(cachedir, "")
+       c.Assert(err, IsNil)
+       defer f.Close()
+       defer os.Remove(f.Name())
+
+       resp, err := http.Get("https://cache.arvados.org/" + fnm)
+       c.Assert(err, IsNil)
+       defer resp.Body.Close()
+       _, err = io.Copy(f, resp.Body)
+       c.Assert(err, IsNil)
+       err = f.Close()
+       c.Assert(err, IsNil)
+       err = os.Rename(f.Name(), cachefile)
+       c.Assert(err, IsNil)
+
+       return cachefile
+}