go_ldflags() {
version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}
- echo "-X git.arvados.org/arvados.git/lib/cmd.version=${version} -X main.version=${version}"
+ echo "-X git.arvados.org/arvados.git/lib/cmd.version=${version} -X main.version=${version} -s -w"
}
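For context: the -X flags only take effect if the named package declares a matching package-level string variable, and -s -w strip the symbol table and DWARF data from the built binaries. A minimal sketch of the assumed Go-side pattern (the variable name is taken from the -X flag above; the real declarations live in the packages named there):

package main

// version is replaced at link time, e.g. via:
//   go build -ldflags "$(go_ldflags)" ...
var version = "dev"

func main() {
	println("version:", version)
}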
do_test_once() {
// Install a Rails application's dependencies, including Phusion
// Passenger.
type installPassenger struct {
- src string
- depends []supervisedTask
+ src string // path to app in source tree
+ varlibdir string // path to app (relative to /var/lib/arvados) in OS package: "railsapi" or "workbench1"
+ depends []supervisedTask
}
func (runner installPassenger) String() string {
passengerInstallMutex.Lock()
defer passengerInstallMutex.Unlock()
+ appdir := runner.src
+ if super.ClusterType == "test" {
+ // In the multi-cluster test setup, if we run multiple
+ // Rails instances directly from the source tree, they
+ // step on one another's files in {source}/tmp, log,
+ // etc. So instead we copy the source directory into a
+ // temp dir and run the Rails app from there.
+ appdir = filepath.Join(super.tempdir, runner.varlibdir)
+ err = super.RunProgram(ctx, super.tempdir, runOptions{}, "mkdir", "-p", appdir)
+ if err != nil {
+ return err
+ }
+ err = super.RunProgram(ctx, filepath.Join(super.SourcePath, runner.src), runOptions{}, "rsync",
+ "-a", "--no-owner", "--no-group", "--delete-after", "--delete-excluded",
+ "--exclude", "/coverage",
+ "--exclude", "/log",
+ "--exclude", "/node_modules",
+ "--exclude", "/tmp",
+ "--exclude", "/public/assets",
+ "--exclude", "/vendor",
+ "--exclude", "/config/environments",
+ "./",
+ appdir+"/")
+ if err != nil {
+ return err
+ }
+ }
+
var buf bytes.Buffer
- err = super.RunProgram(ctx, runner.src, runOptions{output: &buf}, "gem", "list", "--details", "bundler")
+ err = super.RunProgram(ctx, appdir, runOptions{output: &buf}, "gem", "list", "--details", "bundler")
if err != nil {
return err
}
for _, version := range []string{"2.2.19"} {
if !strings.Contains(buf.String(), "("+version+")") {
- err = super.RunProgram(ctx, runner.src, runOptions{}, "gem", "install", "--user", "--conservative", "--no-document", "bundler:2.2.19")
+ err = super.RunProgram(ctx, appdir, runOptions{}, "gem", "install", "--user", "--conservative", "--no-document", "bundler:2.2.19")
if err != nil {
return err
}
break
}
}
- err = super.RunProgram(ctx, runner.src, runOptions{}, "bundle", "install", "--jobs", "4", "--path", filepath.Join(os.Getenv("HOME"), ".gem"))
+ err = super.RunProgram(ctx, appdir, runOptions{}, "bundle", "install", "--jobs", "4", "--path", filepath.Join(os.Getenv("HOME"), ".gem"))
if err != nil {
return err
}
- err = super.RunProgram(ctx, runner.src, runOptions{}, "bundle", "exec", "passenger-config", "build-native-support")
+ err = super.RunProgram(ctx, appdir, runOptions{}, "bundle", "exec", "passenger-config", "build-native-support")
if err != nil {
return err
}
- err = super.RunProgram(ctx, runner.src, runOptions{}, "bundle", "exec", "passenger-config", "install-standalone-runtime")
+ err = super.RunProgram(ctx, appdir, runOptions{}, "bundle", "exec", "passenger-config", "install-standalone-runtime")
if err != nil {
return err
}
- err = super.RunProgram(ctx, runner.src, runOptions{}, "bundle", "exec", "passenger-config", "validate-install")
+ err = super.RunProgram(ctx, appdir, runOptions{}, "bundle", "exec", "passenger-config", "validate-install")
if err != nil && !strings.Contains(err.Error(), "exit status 2") {
// Exit code 2 indicates there were warnings (like
// "other passenger installations have been detected",
type runPassenger struct {
src string // path to app in source tree
- varlibdir string // path to app (relative to /var/lib/arvados) in OS package
+ varlibdir string // path to app (relative to /var/lib/arvados) in OS package: "railsapi" or "workbench1"
svc arvados.Service
depends []supervisedTask
}
return fmt.Errorf("bug: no internalPort for %q: %v (%#v)", runner, err, runner.svc)
}
var appdir string
- if super.ClusterType == "production" {
+ switch super.ClusterType {
+ case "production":
appdir = "/var/lib/arvados/" + runner.varlibdir
- } else {
+ case "test":
+ appdir = filepath.Join(super.tempdir, runner.varlibdir)
+ default:
appdir = runner.src
}
loglevel := "4"
runServiceCommand{name: "keepstore", svc: super.cluster.Services.Keepstore},
runServiceCommand{name: "keep-web", svc: super.cluster.Services.WebDAV},
runServiceCommand{name: "ws", svc: super.cluster.Services.Websocket, depends: []supervisedTask{seedDatabase{}}},
- installPassenger{src: "services/api"},
- runPassenger{src: "services/api", varlibdir: "railsapi", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, seedDatabase{}, installPassenger{src: "services/api"}}},
+ installPassenger{src: "services/api", varlibdir: "railsapi"},
+ runPassenger{src: "services/api", varlibdir: "railsapi", svc: super.cluster.Services.RailsAPI, depends: []supervisedTask{createCertificates{}, seedDatabase{}, installPassenger{src: "services/api", varlibdir: "railsapi"}}},
seedDatabase{},
}
if !super.NoWorkbench1 {
tasks = append(tasks,
- installPassenger{src: "apps/workbench", depends: []supervisedTask{seedDatabase{}}}, // dependency ensures workbench doesn't delay api install/startup
- runPassenger{src: "apps/workbench", varlibdir: "workbench1", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench"}}},
+ installPassenger{src: "apps/workbench", varlibdir: "workbench1", depends: []supervisedTask{seedDatabase{}}}, // dependency ensures workbench doesn't delay api install/startup
+ runPassenger{src: "apps/workbench", varlibdir: "workbench1", svc: super.cluster.Services.Workbench1, depends: []supervisedTask{installPassenger{src: "apps/workbench", varlibdir: "workbench1"}}},
)
}
if !super.NoWorkbench2 {
}
if super.ClusterType != "test" {
tasks = append(tasks,
- runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
runServiceCommand{name: "keep-balance", svc: super.cluster.Services.Keepbalance},
)
}
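+ // Run dispatch-cloud whenever cloud VMs are enabled (this now includes
+ // "test" clusters using the loopback driver), rather than keying off
+ // ClusterType.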
+ if super.cluster.Containers.CloudVMs.Enable {
+ tasks = append(tasks,
+ runServiceCommand{name: "dispatch-cloud", svc: super.cluster.Services.DispatchCloud},
+ )
+ }
super.tasksReady = map[string]chan bool{}
for _, task := range tasks {
super.tasksReady[task.String()] = make(chan bool)
&super.cluster.Services.Workbench1,
&super.cluster.Services.Workbench2,
} {
- if svc == &super.cluster.Services.DispatchCloud && super.ClusterType == "test" {
- continue
- }
if svc.ExternalURL.Host == "" {
port, err := nextPort(defaultExtHost)
if err != nil {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package loopback
+
+import (
+ "bytes"
+ "crypto/rand"
+ "crypto/rsa"
+ "encoding/json"
+ "errors"
+ "io"
+ "os/exec"
+ "os/user"
+ "strings"
+ "sync"
+ "syscall"
+
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/test"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
+)
+
+// Driver is the loopback implementation of the cloud.Driver interface.
+var Driver = cloud.DriverFunc(newInstanceSet)
+
+var (
+ errUnimplemented = errors.New("function not implemented by loopback driver")
+ errQuota = quotaError("loopback driver is always at quota")
+)
+
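+// quotaError implements the cloud.QuotaError interface, so callers can
+// recognize that Create failed only because the driver's single-instance
+// limit was reached (see Create below).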
+type quotaError string
+
+func (e quotaError) IsQuotaError() bool { return true }
+func (e quotaError) Error() string { return string(e) }
+
+type instanceSet struct {
+ instanceSetID cloud.InstanceSetID
+ logger logrus.FieldLogger
+ instances []*instance
+ mtx sync.Mutex
+}
+
+func newInstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+ is := &instanceSet{
+ instanceSetID: instanceSetID,
+ logger: logger,
+ }
+ return is, nil
+}
+
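+// Create returns a new loopback "instance": an in-process SSH server that
+// executes commands on the local host as the current user. At most one
+// instance can exist at a time; additional Create calls return a quota
+// error until the existing instance is destroyed.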
+func (is *instanceSet) Create(it arvados.InstanceType, _ cloud.ImageID, tags cloud.InstanceTags, _ cloud.InitCommand, pubkey ssh.PublicKey) (cloud.Instance, error) {
+ is.mtx.Lock()
+ defer is.mtx.Unlock()
+ if len(is.instances) > 0 {
+ return nil, errQuota
+ }
+ u, err := user.Current()
+ if err != nil {
+ return nil, err
+ }
+ hostRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+ if err != nil {
+ return nil, err
+ }
+ hostKey, err := ssh.NewSignerFromKey(hostRSAKey)
+ if err != nil {
+ return nil, err
+ }
+ hostPubKey, err := ssh.NewPublicKey(hostRSAKey.Public())
+ if err != nil {
+ return nil, err
+ }
+ inst := &instance{
+ is: is,
+ instanceType: it,
+ adminUser: u.Username,
+ tags: tags,
+ hostPubKey: hostPubKey,
+ sshService: test.SSHService{
+ HostKey: hostKey,
+ AuthorizedUser: u.Username,
+ AuthorizedKeys: []ssh.PublicKey{pubkey},
+ },
+ }
+ inst.sshService.Exec = inst.sshExecFunc
+ go inst.sshService.Start()
+ is.instances = []*instance{inst}
+ return inst, nil
+}
+
+func (is *instanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+ is.mtx.Lock()
+ defer is.mtx.Unlock()
+ var ret []cloud.Instance
+ for _, inst := range is.instances {
+ ret = append(ret, inst)
+ }
+ return ret, nil
+}
+
+func (is *instanceSet) Stop() {
+ is.mtx.Lock()
+ defer is.mtx.Unlock()
+ for _, inst := range is.instances {
+ inst.sshService.Close()
+ }
+}
+
+type instance struct {
+ is *instanceSet
+ instanceType arvados.InstanceType
+ adminUser string
+ tags cloud.InstanceTags
+ hostPubKey ssh.PublicKey
+ sshService test.SSHService
+}
+
+func (i *instance) ID() cloud.InstanceID { return cloud.InstanceID(i.instanceType.ProviderType) }
+func (i *instance) String() string { return i.instanceType.ProviderType }
+func (i *instance) ProviderType() string { return i.instanceType.ProviderType }
+func (i *instance) Address() string { return i.sshService.Address() }
+func (i *instance) RemoteUser() string { return i.adminUser }
+func (i *instance) Tags() cloud.InstanceTags { return i.tags }
+func (i *instance) SetTags(tags cloud.InstanceTags) error {
+ i.tags = tags
+ return nil
+}
+func (i *instance) Destroy() error {
+ i.is.mtx.Lock()
+ defer i.is.mtx.Unlock()
+ i.is.instances = i.is.instances[:0]
+ return nil
+}
+func (i *instance) VerifyHostKey(pubkey ssh.PublicKey, _ *ssh.Client) error {
+ if !bytes.Equal(pubkey.Marshal(), i.hostPubKey.Marshal()) {
+ return errors.New("host key mismatch")
+ }
+ return nil
+}
+func (i *instance) sshExecFunc(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
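+ // Strip any leading "sudo " (commands from the dispatcher may be
+ // prefixed with it) and run the command locally with "sh -c" as the
+ // current user.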
+ cmd := exec.Command("sh", "-c", strings.TrimPrefix(command, "sudo "))
+ cmd.Stdin = stdin
+ cmd.Stdout = stdout
+ cmd.Stderr = stderr
+ for k, v := range env {
+ cmd.Env = append(cmd.Env, k+"="+v)
+ }
+ // Prevent child process from using our tty.
+ cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}
+ err := cmd.Run()
+ if err == nil {
+ return 0
+ } else if err, ok := err.(*exec.ExitError); !ok {
+ return 1
+ } else if code := err.ExitCode(); code < 0 {
+ return 1
+ } else {
+ return uint32(code)
+ }
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package loopback
+
+import (
+ "crypto/rand"
+ "crypto/rsa"
+ "encoding/json"
+ "testing"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "golang.org/x/crypto/ssh"
+ check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+type suite struct{}
+
+var _ = check.Suite(&suite{})
+
+func (*suite) TestCreateListExecDestroy(c *check.C) {
+ logger := ctxlog.TestLogger(c)
+ is, err := Driver.InstanceSet(json.RawMessage("{}"), "testInstanceSetID", cloud.SharedResourceTags{"sharedTag": "sharedTagValue"}, logger)
+ c.Assert(err, check.IsNil)
+
+ clientRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+ c.Assert(err, check.IsNil)
+ clientSSHKey, err := ssh.NewSignerFromKey(clientRSAKey)
+ c.Assert(err, check.IsNil)
+ clientSSHPubKey, err := ssh.NewPublicKey(clientRSAKey.Public())
+ c.Assert(err, check.IsNil)
+
+ it := arvados.InstanceType{
+ Name: "localhost",
+ ProviderType: "localhost",
+ RAM: 1002003004,
+ VCPUs: 5,
+ }
+
+ // First call to Create should succeed, and the returned
+ // instance's SSH target address should be available in << 1s.
+ inst, err := is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+ c.Assert(err, check.IsNil)
+ for deadline := time.Now().Add(time.Second); inst.Address() == ""; time.Sleep(time.Second / 100) {
+ if deadline.Before(time.Now()) {
+ c.Fatal("timed out")
+ }
+ }
+
+ // Another call to Create should fail with a quota error.
+ inst2, err := is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+ c.Check(inst2, check.IsNil)
+ qerr, ok := err.(cloud.QuotaError)
+ if c.Check(ok, check.Equals, true, check.Commentf("expect cloud.QuotaError, got %#v", err)) {
+ c.Check(qerr.IsQuotaError(), check.Equals, true)
+ }
+
+ // Instance list should now have one entry, for the new
+ // instance.
+ list, err := is.Instances(nil)
+ c.Assert(err, check.IsNil)
+ c.Assert(list, check.HasLen, 1)
+ inst = list[0]
+ c.Check(inst.String(), check.Equals, "localhost")
+
+ // Instance's SSH server should execute shell commands.
+ exr := sshexecutor.New(inst)
+ exr.SetSigners(clientSSHKey)
+
+ stdout, stderr, err := exr.Execute(nil, "echo ok", nil)
+ c.Check(err, check.IsNil)
+ c.Check(string(stdout), check.Equals, "ok\n")
+ c.Check(string(stderr), check.Equals, "")
+
+ // SSH server should propagate stderr and non-zero exit
+ // status.
+ stdout, stderr, err = exr.Execute(nil, "echo fail && echo -n fail2 >&2 && false", nil)
+ c.Check(err, check.FitsTypeOf, &ssh.ExitError{})
+ c.Check(string(stdout), check.Equals, "fail\n")
+ c.Check(string(stderr), check.Equals, "fail2")
+
+ // SSH server should strip "sudo" from the front of the
+ // command.
+ withoutsudo, _, err := exr.Execute(nil, "whoami", nil)
+ c.Check(err, check.IsNil)
+ withsudo, _, err := exr.Execute(nil, "sudo whoami", nil)
+ c.Check(err, check.IsNil)
+ c.Check(string(withsudo), check.Equals, string(withoutsudo))
+
+ // SSH server should reject keys other than the one whose
+ // public key we passed to Create.
+ badRSAKey, err := rsa.GenerateKey(rand.Reader, 1024)
+ c.Assert(err, check.IsNil)
+ badSSHKey, err := ssh.NewSignerFromKey(badRSAKey)
+ c.Assert(err, check.IsNil)
+ // Create a new executor here, otherwise Execute would reuse
+ // the existing connection instead of authenticating with
+ // badRSAKey.
+ exr = sshexecutor.New(inst)
+ exr.SetSigners(badSSHKey)
+ stdout, stderr, err = exr.Execute(nil, "true", nil)
+ c.Check(err, check.ErrorMatches, `.*unable to authenticate.*`)
+
+ // Destroying the instance causes it to disappear from the
+ // list, and allows us to create one more.
+ err = inst.Destroy()
+ c.Check(err, check.IsNil)
+ list, err = is.Instances(nil)
+ c.Assert(err, check.IsNil)
+ c.Assert(list, check.HasLen, 0)
+ _, err = is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+ c.Check(err, check.IsNil)
+ _, err = is.Create(it, "testImageID", cloud.InstanceTags{"instanceTag": "instanceTagValue"}, "testInitCommand", clientSSHPubKey)
+ c.Check(err, check.NotNil)
+ list, err = is.Instances(nil)
+ c.Assert(err, check.IsNil)
+ c.Assert(list, check.HasLen, 1)
+}
# need to be detected and cleaned up manually.
TagKeyPrefix: Arvados
- # Cloud driver: "azure" (Microsoft Azure) or "ec2" (Amazon AWS).
+ # Cloud driver: "azure" (Microsoft Azure), "ec2" (Amazon AWS),
+ # or "loopback" (run containers on dispatch host for testing
+ # purposes).
Driver: ec2
# Cloud-specific driver parameters.
"io/ioutil"
"os"
"regexp"
+ "runtime"
"strconv"
"strings"
"time"
"github.com/imdario/mergo"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "golang.org/x/sys/unix"
)
//go:embed config.default.yml
ldr.loadOldKeepBalanceConfig,
)
}
- loadFuncs = append(loadFuncs, ldr.setImplicitStorageClasses)
+ loadFuncs = append(loadFuncs,
+ ldr.setImplicitStorageClasses,
+ ldr.setLoopbackInstanceType,
+ )
for _, f := range loadFuncs {
err = f(&cfg)
if err != nil {
return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
}
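+// setLoopbackInstanceType makes the loopback driver usable without explicit
+// InstanceTypes configuration: if CloudVMs.Driver is "loopback" and no
+// InstanceTypes are configured, it adds a single "localhost" type sized to
+// the local host (CPU count, total RAM, and the size of the filesystem
+// containing os.TempDir()). Configuring more than one InstanceType with the
+// loopback driver is an error.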
+func (ldr *Loader) setLoopbackInstanceType(cfg *arvados.Config) error {
+ for id, cc := range cfg.Clusters {
+ if !cc.Containers.CloudVMs.Enable || cc.Containers.CloudVMs.Driver != "loopback" {
+ continue
+ }
+ if len(cc.InstanceTypes) == 1 {
+ continue
+ }
+ if len(cc.InstanceTypes) > 1 {
+ return fmt.Errorf("Clusters.%s.InstanceTypes: cannot use multiple InstanceTypes with loopback driver", id)
+ }
+ // No InstanceTypes configured. Fill in implicit
+ // default.
+ hostram, err := getHostRAM()
+ if err != nil {
+ return err
+ }
+ scratch, err := getFilesystemSize(os.TempDir())
+ if err != nil {
+ return err
+ }
+ cc.InstanceTypes = arvados.InstanceTypeMap{"localhost": {
+ Name: "localhost",
+ ProviderType: "localhost",
+ VCPUs: runtime.NumCPU(),
+ RAM: hostram,
+ Scratch: scratch,
+ IncludedScratch: scratch,
+ }}
+ cfg.Clusters[id] = cc
+ }
+ return nil
+}
+
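+// getFilesystemSize returns the total size of the filesystem containing
+// path, as reported by statfs(2).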
+func getFilesystemSize(path string) (arvados.ByteSize, error) {
+ var stat unix.Statfs_t
+ err := unix.Statfs(path, &stat)
+ if err != nil {
+ return 0, err
+ }
+ return arvados.ByteSize(stat.Blocks * uint64(stat.Bsize)), nil
+}
+
+var reMemTotal = regexp.MustCompile(`(^|\n)MemTotal: *(\d+) kB\n`)
+
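+// getHostRAM returns the total system memory reported by /proc/meminfo,
+// e.g. a line like "MemTotal:       16314336 kB" yields 16314336 KiB as an
+// arvados.ByteSize.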
+func getHostRAM() (arvados.ByteSize, error) {
+ buf, err := os.ReadFile("/proc/meminfo")
+ if err != nil {
+ return 0, err
+ }
+ m := reMemTotal.FindSubmatch(buf)
+ if m == nil {
+ return 0, errors.New("error parsing /proc/meminfo: no MemTotal")
+ }
+ kb, err := strconv.ParseInt(string(m[2]), 10, 64)
+ if err != nil {
+ return 0, fmt.Errorf("error parsing /proc/meminfo: %q: %w", m[2], err)
+ }
+ return arvados.ByteSize(kb) * 1024, nil
+}
+
func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
cluster:
for id, cc := range cfg.Clusters {
"os/exec"
"reflect"
"regexp"
+ "runtime"
"strings"
"testing"
"time"
}
}
+func (s *LoadSuite) TestLoopbackInstanceTypes(c *check.C) {
+ ldr := testLoader(c, `
+Clusters:
+ z1111:
+ Containers:
+ CloudVMs:
+ Enable: true
+ Driver: loopback
+ InstanceTypes:
+ a: {}
+ b: {}
+`, nil)
+ cfg, err := ldr.Load()
+ c.Check(err, check.ErrorMatches, `Clusters\.z1111\.InstanceTypes: cannot use multiple InstanceTypes with loopback driver`)
+
+ ldr = testLoader(c, `
+Clusters:
+ z1111:
+ Containers:
+ CloudVMs:
+ Enable: true
+ Driver: loopback
+`, nil)
+ cfg, err = ldr.Load()
+ c.Assert(err, check.IsNil)
+ cc, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ c.Check(cc.InstanceTypes, check.HasLen, 1)
+ c.Check(cc.InstanceTypes["localhost"].VCPUs, check.Equals, runtime.NumCPU())
+
+ ldr = testLoader(c, `
+Clusters:
+ z1111:
+ Containers:
+ CloudVMs:
+ Enable: true
+ Driver: loopback
+ InstanceTypes:
+ a:
+ VCPUs: 9
+`, nil)
+ cfg, err = ldr.Load()
+ c.Assert(err, check.IsNil)
+ cc, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ c.Check(cc.InstanceTypes, check.HasLen, 1)
+ c.Check(cc.InstanceTypes["a"].VCPUs, check.Equals, 9)
+}
+
func (s *LoadSuite) TestWarnUnusedLocalKeep(c *check.C) {
var logbuf bytes.Buffer
_, err := testLoader(c, `
`)
}
}
+
+func (s *LoadSuite) TestGetHostRAM(c *check.C) {
+ hostram, err := getHostRAM()
+ c.Check(err, check.IsNil)
+ c.Logf("getHostRAM() == %v", hostram)
+}
+
+func (s *LoadSuite) TestGetFilesystemSize(c *check.C) {
+ path := c.MkDir()
+ size, err := getFilesystemSize(path)
+ c.Check(err, check.IsNil)
+ c.Logf("getFilesystemSize(%q) == %v", path, size)
+}
"encoding/json"
"fmt"
"io"
+ "io/fs"
"io/ioutil"
"math"
"net"
Insecure: true
SystemLogs:
Format: text
+ Containers:
+ CloudVMs:
+ Enable: true
+ Driver: loopback
+ BootProbeCommand: "rm -f /var/lock/crunch-run-broken"
+ ProbeInterval: 1s
+ PollInterval: 5s
+ SyncInterval: 10s
+ TimeoutIdle: 1s
+ TimeoutBooting: 2s
+ RuntimeEngine: singularity
+ CrunchRunArgumentsList: ["--broken-node-hook", "true"]
RemoteClusters:
z1111:
Host: ` + hostport["z1111"] + `
c.Check(err, check.ErrorMatches, `request failed: .* 401 Unauthorized: cannot use a locally issued token to forward a request to our login cluster \(z1111\)`)
c.Check(err, check.Not(check.ErrorMatches), `(?ms).*127\.0\.0\.11.*`)
}
+
+func (s *IntegrationSuite) TestRunTrivialContainer(c *check.C) {
+ outcoll := s.runContainer(c, "z1111", map[string]interface{}{
+ "command": []string{"sh", "-c", "touch \"/out/hello world\" /out/ohai"},
+ "container_image": "busybox:uclibc",
+ "cwd": "/tmp",
+ "environment": map[string]string{},
+ "mounts": map[string]arvados.Mount{"/out": {Kind: "tmp", Capacity: 10000}},
+ "output_path": "/out",
+ "runtime_constraints": arvados.RuntimeConstraints{RAM: 100000000, VCPUs: 1},
+ "priority": 1,
+ "state": arvados.ContainerRequestStateCommitted,
+ }, 0)
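+ // The expected manifest lists the two empty output files: "\040" is the
+ // manifest encoding of the space in "hello world", and d41d8... is the
+ // MD5 of the zero-length data block both files reference.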
+ c.Check(outcoll.ManifestText, check.Matches, `\. d41d8.* 0:0:hello\\040world 0:0:ohai\n`)
+ c.Check(outcoll.PortableDataHash, check.Equals, "8fa5dee9231a724d7cf377c5a2f4907c+65")
+}
+
+func (s *IntegrationSuite) runContainer(c *check.C, clusterID string, ctrSpec map[string]interface{}, expectExitCode int) arvados.Collection {
+ conn := s.super.Conn(clusterID)
+ rootctx, _, _ := s.super.RootClients(clusterID)
+ _, ac, kc, _ := s.super.UserClients(clusterID, rootctx, c, conn, s.oidcprovider.AuthEmail, true)
+
+ c.Log("[docker load]")
+ out, err := exec.Command("docker", "load", "--input", arvadostest.BusyboxDockerImage(c)).CombinedOutput()
+ c.Logf("[docker load done] %s", out)
+ c.Check(err, check.IsNil)
+
+ c.Log("[arv-keepdocker]")
+ akd := exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc")
+ akd.Env = append(os.Environ(), "ARVADOS_API_HOST="+ac.APIHost, "ARVADOS_API_HOST_INSECURE=1", "ARVADOS_API_TOKEN="+ac.AuthToken)
+ c.Logf("[arv-keepdocker env] %q", akd.Env)
+ out, err = akd.CombinedOutput()
+ c.Logf("[arv-keepdocker done] %s", out)
+ c.Check(err, check.IsNil)
+
+ var cr arvados.ContainerRequest
+ err = ac.RequestAndDecode(&cr, "POST", "/arvados/v1/container_requests", nil, map[string]interface{}{
+ "container_request": ctrSpec,
+ })
+ c.Assert(err, check.IsNil)
+
+ showlogs := func(collectionID string) {
+ var logcoll arvados.Collection
+ err = ac.RequestAndDecode(&logcoll, "GET", "/arvados/v1/collections/"+collectionID, nil, nil)
+ c.Assert(err, check.IsNil)
+ cfs, err := logcoll.FileSystem(ac, kc)
+ c.Assert(err, check.IsNil)
+ fs.WalkDir(arvados.FS(cfs), "/", func(path string, d fs.DirEntry, err error) error {
+ if d.IsDir() || strings.HasPrefix(path, "/log for container") {
+ return nil
+ }
+ f, err := cfs.Open(path)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ buf, err := ioutil.ReadAll(f)
+ c.Assert(err, check.IsNil)
+ c.Logf("=== %s\n%s\n", path, buf)
+ return nil
+ })
+ }
+
+ var ctr arvados.Container
+ var lastState arvados.ContainerState
+ deadline := time.Now().Add(time.Minute)
+wait:
+ for ; ; lastState = ctr.State {
+ err = ac.RequestAndDecode(&ctr, "GET", "/arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
+ c.Assert(err, check.IsNil)
+ switch ctr.State {
+ case lastState:
+ if time.Now().After(deadline) {
+ c.Errorf("timed out, container request state is %q", cr.State)
+ showlogs(ctr.Log)
+ c.FailNow()
+ }
+ time.Sleep(time.Second / 2)
+ case arvados.ContainerStateComplete:
+ break wait
+ case arvados.ContainerStateQueued, arvados.ContainerStateLocked, arvados.ContainerStateRunning:
+ c.Logf("container state changed to %q", ctr.State)
+ default:
+ c.Errorf("unexpected container state %q", ctr.State)
+ showlogs(ctr.Log)
+ c.FailNow()
+ }
+ }
+ c.Check(ctr.ExitCode, check.Equals, 0)
+
+ err = ac.RequestAndDecode(&cr, "GET", "/arvados/v1/container_requests/"+cr.UUID, nil, nil)
+ c.Assert(err, check.IsNil)
+
+ showlogs(cr.LogUUID)
+
+ var outcoll arvados.Collection
+ err = ac.RequestAndDecode(&outcoll, "GET", "/arvados/v1/collections/"+cr.OutputUUID, nil, nil)
+ c.Assert(err, check.IsNil)
+ return outcoll
+}
"bytes"
"fmt"
"io"
- "io/ioutil"
"net"
"net/http"
"os"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
"golang.org/x/net/context"
. "gopkg.in/check.v1"
)
-func busyboxDockerImage(c *C) string {
- fnm := "busybox_uclibc.tar"
- cachedir := c.MkDir()
- cachefile := cachedir + "/" + fnm
- if _, err := os.Stat(cachefile); err == nil {
- return cachefile
- }
-
- f, err := ioutil.TempFile(cachedir, "")
- c.Assert(err, IsNil)
- defer f.Close()
- defer os.Remove(f.Name())
-
- resp, err := http.Get("https://cache.arvados.org/" + fnm)
- c.Assert(err, IsNil)
- defer resp.Body.Close()
- _, err = io.Copy(f, resp.Body)
- c.Assert(err, IsNil)
- err = f.Close()
- c.Assert(err, IsNil)
- err = os.Rename(f.Name(), cachefile)
- c.Assert(err, IsNil)
-
- return cachefile
-}
-
type nopWriteCloser struct{ io.Writer }
func (nopWriteCloser) Close() error { return nil }
Stdout: nopWriteCloser{&s.stdout},
Stderr: nopWriteCloser{&s.stderr},
}
- err := s.executor.LoadImage("", busyboxDockerImage(c), arvados.Container{}, "", nil)
+ err := s.executor.LoadImage("", arvadostest.BusyboxDockerImage(c), arvados.Container{}, "", nil)
c.Assert(err, IsNil)
}
arvadostest.StartKeep(2, true)
- out, err := exec.Command("docker", "load", "--input", busyboxDockerImage(c)).CombinedOutput()
+ out, err := exec.Command("docker", "load", "--input", arvadostest.BusyboxDockerImage(c)).CombinedOutput()
c.Log(string(out))
c.Assert(err, IsNil)
out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
"git.arvados.org/arvados.git/lib/cloud"
"git.arvados.org/arvados.git/lib/cloud/azure"
"git.arvados.org/arvados.git/lib/cloud/ec2"
+ "git.arvados.org/arvados.git/lib/cloud/loopback"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
// Clusters.*.Containers.CloudVMs.Driver configuration values
// correspond to keys in this map.
var Drivers = map[string]cloud.Driver{
- "azure": azure.Driver,
- "ec2": ec2.Driver,
+ "azure": azure.Driver,
+ "ec2": ec2.Driver,
+ "loopback": loopback.Driver,
}
func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger, reg *prometheus.Registry) (cloud.InstanceSet, error) {
"fmt"
"io"
"io/ioutil"
+ mathrand "math/rand"
"sort"
"strings"
"sync"
workers := []cloud.InstanceID{}
for range probeticker.C {
+ // Add some jitter. Without this, if probeInterval is
+ // a multiple of syncInterval and sync is
+ // instantaneous (as with the loopback driver), the
+ // first few probes race with sync operations and
+ // don't update the workers.
+ time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
workers = workers[:0]
wp.mtx.Lock()
for id, wkr := range wp.workers {
}
wp.runnerData = buf
wp.runnerMD5 = md5.Sum(buf)
- wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+ wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
return nil
}
// not yet running when ctrUUIDs was generated. Leave
// wkr.running alone and wait for the next probe to
// catch up on any changes.
+ logger.WithFields(logrus.Fields{
+ "updated": updated,
+ "wkr.updated": wkr.updated,
+ }).Debug("skipping worker state update due to probe/sync race")
return
}
wkr.wp.mRunProbeDuration.WithLabelValues("fail").Observe(time.Now().Sub(before).Seconds())
return
}
+ wkr.logger.WithFields(logrus.Fields{
+ "Command": cmd,
+ "stdout": string(stdout),
+ "stderr": string(stderr),
+ }).Debug("probe succeeded")
wkr.wp.mRunProbeDuration.WithLabelValues("success").Observe(time.Now().Sub(before).Seconds())
ok = true
{{end}}
Containers:
DispatchPrivateKey: {{printf "%q" .GenerateSSHPrivateKey}}
+ CloudVMs:
+ Enable: true
+ Driver: loopback
ManagementToken: {{printf "%q" ( .RandomHex 50 )}}
PostgreSQL:
Connection:
RuntimeAuthScopes []string `json:"runtime_auth_scopes"`
RuntimeToken string `json:"runtime_token"`
AuthUUID string `json:"auth_uuid"`
+ Log string `json:"log"`
}
// ContainerRequest is an arvados#container_request resource.
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package arvadostest
+
+import (
+ "io"
+ "io/ioutil"
+ "net/http"
+ "os"
+
+ . "gopkg.in/check.v1"
+)
+
+// BusyboxDockerImage downloads the busybox:uclibc Docker image
+// tarball (busybox_uclibc.tar) from cache.arvados.org into a
+// temporary directory and returns the path to the downloaded file.
+func BusyboxDockerImage(c *C) string {
+ fnm := "busybox_uclibc.tar"
+ cachedir := c.MkDir()
+ cachefile := cachedir + "/" + fnm
+ if _, err := os.Stat(cachefile); err == nil {
+ return cachefile
+ }
+
+ f, err := ioutil.TempFile(cachedir, "")
+ c.Assert(err, IsNil)
+ defer f.Close()
+ defer os.Remove(f.Name())
+
+ resp, err := http.Get("https://cache.arvados.org/" + fnm)
+ c.Assert(err, IsNil)
+ defer resp.Body.Close()
+ _, err = io.Copy(f, resp.Body)
+ c.Assert(err, IsNil)
+ err = f.Close()
+ c.Assert(err, IsNil)
+ err = os.Rename(f.Name(), cachefile)
+ c.Assert(err, IsNil)
+
+ return cachefile
+}
Insecure: true
SystemLogs:
Format: text
+ Containers:
+ CloudVMs:
+ Enable: true
+ Driver: loopback
RemoteClusters:
z1111:
Host: ` + hostport["z1111"] + `