14807: Merge branch 'master'
author     Tom Clegg <tclegg@veritasgenetics.com>
           Wed, 20 Feb 2019 16:49:18 +0000 (11:49 -0500)
committer  Tom Clegg <tclegg@veritasgenetics.com>
           Wed, 20 Feb 2019 16:49:18 +0000 (11:49 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

35 files changed:
build/run-tests.sh
cmd/arvados-server/arvados-dispatch-cloud.service
lib/cloud/azure/azure.go
lib/cloud/azure/azure_test.go
lib/cloud/interfaces.go
lib/controller/cmd.go
lib/controller/federation_test.go
lib/controller/handler_test.go
lib/controller/server_test.go
lib/dispatchcloud/cmd.go
lib/dispatchcloud/container/queue.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/dispatchcloud/scheduler/fix_stale_locks.go
lib/dispatchcloud/scheduler/run_queue_test.go
lib/dispatchcloud/scheduler/scheduler.go
lib/dispatchcloud/scheduler/sync.go
lib/dispatchcloud/ssh_executor/executor.go
lib/dispatchcloud/ssh_executor/executor_test.go
lib/dispatchcloud/test/logger.go [deleted file]
lib/dispatchcloud/test/ssh_service.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/pool_test.go
lib/dispatchcloud/worker/verify.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go
lib/dispatchcloud/worker/worker_test.go
lib/service/cmd.go
sdk/go/arvados/config.go
sdk/go/ctxlog/log.go
services/api/app/models/container.rb
services/api/test/unit/container_test.rb
services/crunch-run/background.go
services/crunch-run/crunchrun.go
services/ws/session_v0.go

index 9919c3e1751766892b72bcc806170b61f43e5999..caaca1f31e51677c3881dbf82ea9197ff53660c2 100755 (executable)
@@ -537,7 +537,6 @@ export GOPATH
 (
     set -e
     mkdir -p "$GOPATH/src/git.curoverse.com"
-    rmdir -v --parents --ignore-fail-on-non-empty "${temp}/GOPATH"
     if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
         for d in \
             "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
index 5ea5d45e79c4940ee2ef83230988f4641c57c2b8..aa5cc3b4a5d033c3c163b09ad9a4ad411c3237b4 100644 (file)
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: AGPL-3.0
 
 [Unit]
-Description=Arvados cloud dispatch
+Description=arvados-dispatch-cloud
 Documentation=https://doc.arvados.org/
 After=network.target
 AssertPathExists=/etc/arvados/config.yml
index d745e7e54d27473147e3f214a213a4514a596131..8ae8a44811529f598cb8ecee0044919b76bf467e 100644 (file)
@@ -47,8 +47,11 @@ type azureInstanceSetConfig struct {
        StorageAccount               string
        BlobContainer                string
        DeleteDanglingResourcesAfter arvados.Duration
+       AdminUsername                string
 }
 
+const tagKeyInstanceSecret = "InstanceSecret"
+
 type virtualMachinesClientWrapper interface {
        createOrUpdate(ctx context.Context,
                resourceGroupName string,
@@ -311,15 +314,12 @@ func (az *azureInstanceSet) Create(
        instanceType arvados.InstanceType,
        imageID cloud.ImageID,
        newTags cloud.InstanceTags,
+       initCommand cloud.InitCommand,
        publicKey ssh.PublicKey) (cloud.Instance, error) {
 
        az.stopWg.Add(1)
        defer az.stopWg.Done()
 
-       if len(newTags["node-token"]) == 0 {
-               return nil, fmt.Errorf("Must provide tag 'node-token'")
-       }
-
        name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
        if err != nil {
                return nil, err
@@ -336,8 +336,6 @@ func (az *azureInstanceSet) Create(
                tags["dispatch-"+k] = &newstr
        }
 
-       tags["dispatch-instance-type"] = &instanceType.Name
-
        nicParameters := network.Interface{
                Location: &az.azconfig.Location,
                Tags:     tags,
@@ -371,8 +369,7 @@ func (az *azureInstanceSet) Create(
                az.azconfig.BlobContainer,
                name)
 
-       customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
-echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
+       customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
 
        vmParameters := compute.VirtualMachine{
                Location: &az.azconfig.Location,
@@ -406,13 +403,13 @@ echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
                        },
                        OsProfile: &compute.OSProfile{
                                ComputerName:  &name,
-                               AdminUsername: to.StringPtr("crunch"),
+                               AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
                                LinuxConfiguration: &compute.LinuxConfiguration{
                                        DisablePasswordAuthentication: to.BoolPtr(true),
                                        SSH: &compute.SSHConfiguration{
                                                PublicKeys: &[]compute.SSHPublicKey{
-                                                       compute.SSHPublicKey{
-                                                               Path:    to.StringPtr("/home/crunch/.ssh/authorized_keys"),
+                                                       {
+                                                               Path:    to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
                                                                KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
                                                        },
                                                },
@@ -494,8 +491,8 @@ func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
                                if result.Value().Tags["created-at"] != nil {
                                        createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
                                        if err == nil {
-                                               if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
-                                                       az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
+                                               if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
+                                                       az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
                                                        az.deleteNIC <- *result.Value().Name
                                                }
                                        }
@@ -634,63 +631,10 @@ func (ai *azureInstance) Address() string {
        return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
 }
 
-func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
-       ai.provider.stopWg.Add(1)
-       defer ai.provider.stopWg.Done()
-
-       remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
-
-       tags := ai.Tags()
-
-       tg := tags["ssh-pubkey-fingerprint"]
-       if tg != "" {
-               if remoteFingerprint == tg {
-                       return nil
-               }
-               return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
-       }
-
-       nodetokenTag := tags["node-token"]
-       if nodetokenTag == "" {
-               return fmt.Errorf("Missing node token tag")
-       }
-
-       sess, err := client.NewSession()
-       if err != nil {
-               return err
-       }
-
-       nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
-       if err != nil {
-               return err
-       }
-
-       nodetoken := strings.TrimSpace(string(nodetokenbytes))
-
-       expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
-
-       if strings.TrimSpace(nodetoken) != expectedToken {
-               return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
-       }
-
-       sess, err = client.NewSession()
-       if err != nil {
-               return err
-       }
-
-       keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
-       if err != nil {
-               return err
-       }
-
-       sp := strings.Split(string(keyfingerprintbytes), " ")
-
-       if remoteFingerprint != sp[1] {
-               return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
-       }
+func (ai *azureInstance) RemoteUser() string {
+       return ai.provider.azconfig.AdminUsername
+}
 
-       tags["ssh-pubkey-fingerprint"] = sp[1]
-       delete(tags, "node-token")
-       ai.SetTags(tags)
-       return nil
+func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+       return cloud.ErrNotImplemented
 }
index 850a3fb4270fd9c5da7b7829e9f00fa27c630a49..61649c39800d7ad8e31252313285e0e85fe7b16c 100644 (file)
@@ -25,6 +25,7 @@
 //      StorageAccount: example
 //      BlobContainer: vhds
 //      DeleteDanglingResourcesAfter: 20s
+//      AdminUsername: crunch
 
 package azure
 
@@ -50,7 +51,6 @@ import (
        "github.com/Azure/go-autorest/autorest"
        "github.com/Azure/go-autorest/autorest/azure"
        "github.com/Azure/go-autorest/autorest/to"
-       "github.com/jmcvetta/randutil"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
        check "gopkg.in/check.v1"
@@ -160,18 +160,16 @@ func (*AzureInstanceSetSuite) TestCreate(c *check.C) {
        pk, _, _, _, err := ssh.ParseAuthorizedKey(testKey)
        c.Assert(err, check.IsNil)
 
-       nodetoken, err := randutil.String(40, "abcdefghijklmnopqrstuvwxyz0123456789")
-       c.Assert(err, check.IsNil)
-
        inst, err := ap.Create(cluster.InstanceTypes["tiny"],
                img, map[string]string{
-                       "node-token": nodetoken},
-               pk)
+                       "TestTagName": "test tag value",
+               }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
 
        c.Assert(err, check.IsNil)
 
-       tg := inst.Tags()
-       log.Printf("Result %v %v %v", inst.String(), inst.Address(), tg)
+       tags := inst.Tags()
+       c.Check(tags["TestTagName"], check.Equals, "test tag value")
+       c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
 
 }
 
@@ -306,19 +304,22 @@ func (*AzureInstanceSetSuite) TestSSH(c *check.C) {
        c.Assert(err, check.IsNil)
 
        if len(l) > 0 {
-
                sshclient, err := SetupSSHClient(c, l[0])
                c.Assert(err, check.IsNil)
+               defer sshclient.Conn.Close()
 
                sess, err := sshclient.NewSession()
                c.Assert(err, check.IsNil)
-
-               out, err := sess.Output("cat /home/crunch/node-token")
+               defer sess.Close()
+               _, err = sess.Output("find /var/run/test-file -maxdepth 0 -user root -perm 0600")
                c.Assert(err, check.IsNil)
 
-               log.Printf("%v", string(out))
-
-               sshclient.Conn.Close()
+               sess, err = sshclient.NewSession()
+               c.Assert(err, check.IsNil)
+               defer sess.Close()
+               out, err := sess.Output("sudo cat /var/run/test-file")
+               c.Assert(err, check.IsNil)
+               c.Check(string(out), check.Equals, "test-file-data")
        }
 }
 
index 46a2c1682404ec067bc7c332a77c469f0b353d57..792e737a914a1ce7d39d98c05c1a9428e77fb1ff 100644 (file)
@@ -6,6 +6,7 @@ package cloud
 
 import (
        "encoding/json"
+       "errors"
        "io"
        "time"
 
@@ -57,17 +58,25 @@ type Executor interface {
        Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
 }
 
+var ErrNotImplemented = errors.New("not implemented")
+
 // An ExecutorTarget is a remote command execution service.
 type ExecutorTarget interface {
        // SSH server hostname or IP address, or empty string if
        // unknown while instance is booting.
        Address() string
 
+       // Remote username to send during SSH authentication.
+       RemoteUser() string
+
        // Return nil if the given public key matches the instance's
        // SSH server key. If the provided Dialer is not nil,
        // VerifyHostKey can use it to make outgoing network
        // connections from the instance -- e.g., to use the cloud's
        // "this instance's metadata" API.
+       //
+       // Return ErrNotImplemented if no verification mechanism is
+       // available.
        VerifyHostKey(ssh.PublicKey, *ssh.Client) error
 }
 
@@ -102,12 +111,18 @@ type Instance interface {
 // All public methods of an InstanceSet, and all public methods of the
 // instances it returns, are goroutine safe.
 type InstanceSet interface {
-       // Create a new instance. If supported by the driver, add the
+       // Create a new instance with the given type, image, and
+       // initial set of tags. If supported by the driver, add the
        // provided public key to /root/.ssh/authorized_keys.
        //
+       // The given InitCommand should be executed on the newly
+       // created instance. This is optional for a driver whose
+       // instances' VerifyHostKey() method never returns
+       // ErrNotImplemented. InitCommand will be under 1 KiB.
+       //
        // The returned error should implement RateLimitError and
        // QuotaError where applicable.
-       Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+       Create(arvados.InstanceType, ImageID, InstanceTags, InitCommand, ssh.PublicKey) (Instance, error)
 
        // Return all instances, including ones that are booting or
        // shutting down. Optionally, filter out nodes that don't have
@@ -125,6 +140,8 @@ type InstanceSet interface {
        Stop()
 }
 
+type InitCommand string
+
 // A Driver returns an InstanceSet that uses the given InstanceSetID
 // and driver-dependent configuration parameters.
 //
index 94eb2580bd759c9c781dc2346353440124207899..c1d4657ba47b7801b63bad0222e4a2df71f7881d 100644 (file)
@@ -5,6 +5,8 @@
 package controller
 
 import (
+       "context"
+
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/service"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -12,6 +14,6 @@ import (
 
 var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
 
-func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
        return &Handler{Cluster: cluster, NodeProfile: np}
 }
index d49d16a35eee1be2fe3cbe0f765921316fdafa64..62916acd2ac10be14d90d4e02e2703e77949e32b 100644 (file)
@@ -19,6 +19,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/sirupsen/logrus"
@@ -29,7 +30,7 @@ import (
 var _ = check.Suite(&FederationSuite{})
 
 type FederationSuite struct {
-       log *logrus.Logger
+       log logrus.FieldLogger
        // testServer and testHandler are the controller being tested,
        // "zhome".
        testServer  *httpserver.Server
@@ -44,9 +45,7 @@ type FederationSuite struct {
 }
 
 func (s *FederationSuite) SetUpTest(c *check.C) {
-       s.log = logrus.New()
-       s.log.Formatter = &logrus.JSONFormatter{}
-       s.log.Out = &logWriter{c.Log}
+       s.log = ctxlog.TestLogger(c)
 
        s.remoteServer = newServerFromIntegrationTestEnv(c)
        c.Assert(s.remoteServer.Start(), check.IsNil)
index f11228a31350b93f2da70a7b5ab46b8926a47b06..dfe60d90a5f3119909658149b1017f3b782515f3 100644 (file)
@@ -5,6 +5,7 @@
 package controller
 
 import (
+       "context"
        "encoding/json"
        "net/http"
        "net/http/httptest"
@@ -16,6 +17,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        check "gopkg.in/check.v1"
 )
@@ -30,9 +32,13 @@ var _ = check.Suite(&HandlerSuite{})
 type HandlerSuite struct {
        cluster *arvados.Cluster
        handler http.Handler
+       ctx     context.Context
+       cancel  context.CancelFunc
 }
 
 func (s *HandlerSuite) SetUpTest(c *check.C) {
+       s.ctx, s.cancel = context.WithCancel(context.Background())
+       s.ctx = ctxlog.Context(s.ctx, ctxlog.New(os.Stderr, "json", "debug"))
        s.cluster = &arvados.Cluster{
                ClusterID:  "zzzzz",
                PostgreSQL: integrationTestCluster().PostgreSQL,
@@ -44,7 +50,11 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
                },
        }
        node := s.cluster.NodeProfiles["*"]
-       s.handler = newHandler(s.cluster, &node)
+       s.handler = newHandler(s.ctx, s.cluster, &node)
+}
+
+func (s *HandlerSuite) TearDownTest(c *check.C) {
+       s.cancel()
 }
 
 func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
index 95f17e79e6d8370e8c211f928264dff9869189c2..ae89c3d7ea4d073fa44885f193af138f81b85508 100644 (file)
@@ -5,28 +5,16 @@
 package controller
 
 import (
-       "bytes"
        "net/http"
        "os"
        "path/filepath"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
-// logWriter is an io.Writer that writes by calling a "write log"
-// function, typically (*check.C)Log().
-type logWriter struct {
-       logfunc func(...interface{})
-}
-
-func (tl *logWriter) Write(buf []byte) (int, error) {
-       tl.logfunc(string(bytes.TrimRight(buf, "\n")))
-       return len(buf), nil
-}
-
 func integrationTestCluster() *arvados.Cluster {
        cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
        if err != nil {
@@ -42,9 +30,7 @@ func integrationTestCluster() *arvados.Cluster {
 // Return a new unstarted controller server, using the Rails API
 // provided by the integration-testing environment.
 func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
-       log := logrus.New()
-       log.Formatter = &logrus.JSONFormatter{}
-       log.Out = &logWriter{c.Log}
+       log := ctxlog.TestLogger(c)
 
        nodeProfile := arvados.NodeProfile{
                Controller: arvados.SystemServiceInstance{Listen: ":"},
index 92948fb300e703971e59957b4f6f98db176a42ef..7231e839475639c2aa5e6c720091c15b4d4b5ed7 100644 (file)
@@ -5,6 +5,8 @@
 package dispatchcloud
 
 import (
+       "context"
+
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/service"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -12,8 +14,8 @@ import (
 
 var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
 
-func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
-       d := &dispatcher{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
+       d := &dispatcher{Cluster: cluster, Context: ctx}
        go d.Start()
        return d
 }
index 297782c35b9a972668fd7623d251e3411b26389b..bbe47625a893d6874d2c3c415952948f290de74f 100644 (file)
@@ -131,7 +131,7 @@ func (cq *Queue) Forget(uuid string) {
        defer cq.mtx.Unlock()
        ctr := cq.current[uuid].Container
        if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
-               delete(cq.current, uuid)
+               cq.delEnt(uuid, ctr.State)
        }
 }
 
@@ -196,7 +196,7 @@ func (cq *Queue) Update() error {
                        cq.current[uuid] = cur
                }
        }
-       for uuid := range cq.current {
+       for uuid, ent := range cq.current {
                if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
                        // Don't expunge an entry that was
                        // added/updated locally after we started
@@ -207,7 +207,7 @@ func (cq *Queue) Update() error {
                        // the poll response (evidently it's
                        // cancelled, completed, deleted, or taken by
                        // a different dispatcher).
-                       delete(cq.current, uuid)
+                       cq.delEnt(uuid, ent.Container.State)
                }
        }
        cq.dontupdate = nil
@@ -216,6 +216,15 @@ func (cq *Queue) Update() error {
        return nil
 }
 
+// Caller must have lock.
+func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
+       cq.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "State":         state,
+       }).Info("dropping container from queue")
+       delete(cq.current, uuid)
+}
+
 func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
        it, err := cq.chooseType(&ctr)
        if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
@@ -269,6 +278,12 @@ func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
                }()
                return
        }
+       cq.logger.WithFields(logrus.Fields{
+               "ContainerUUID": ctr.UUID,
+               "State":         ctr.State,
+               "Priority":      ctr.Priority,
+               "InstanceType":  it.Name,
+       }).Info("adding container to queue")
        cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
 }
 
index 2d73afcd2624ed1e55041f25eda9e91fe0f8f8ed..adf1028b35fe16ab13afbfcb4f0c91672ec17849 100644 (file)
@@ -5,6 +5,7 @@
 package dispatchcloud
 
 import (
+       "context"
        "crypto/md5"
        "encoding/json"
        "fmt"
@@ -20,6 +21,7 @@ import (
        "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/julienschmidt/httprouter"
        "github.com/prometheus/client_golang/prometheus"
@@ -42,6 +44,7 @@ type pool interface {
 
 type dispatcher struct {
        Cluster       *arvados.Cluster
+       Context       context.Context
        InstanceSetID cloud.InstanceSetID
 
        logger      logrus.FieldLogger
@@ -116,7 +119,7 @@ func (disp *dispatcher) initialize() {
        }
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
-       disp.logger = logrus.StandardLogger()
+       disp.logger = ctxlog.FromContext(disp.Context)
 
        if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
                disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
@@ -130,7 +133,7 @@ func (disp *dispatcher) initialize() {
        }
        disp.instanceSet = instanceSet
        disp.reg = prometheus.NewRegistry()
-       disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+       disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
 
        if disp.Cluster.ManagementToken == "" {
@@ -141,9 +144,9 @@ func (disp *dispatcher) initialize() {
                mux := httprouter.New()
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
                mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
-               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
-               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
-               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
+               mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
                metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
                        ErrorLog: disp.logger,
                })
@@ -166,7 +169,7 @@ func (disp *dispatcher) run() {
        if pollInterval <= 0 {
                pollInterval = defaultPollInterval
        }
-       sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+       sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
        sched.Start()
        defer sched.Stop()
 
@@ -210,8 +213,11 @@ func (disp *dispatcher) apiInstanceRun(w http.ResponseWriter, r *http.Request) {
 }
 
 func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
-       params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
-       id := cloud.InstanceID(params.ByName("instance_id"))
+       id := cloud.InstanceID(r.FormValue("instance_id"))
+       if id == "" {
+               httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+               return
+       }
        err := disp.pool.SetIdleBehavior(id, want)
        if err != nil {
                httpserver.Error(w, err.Error(), http.StatusNotFound)
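---
As a hedged illustration (not part of this commit): with the revised routes, a management client sends instance_id as a POST form parameter instead of a path component. The host, port, and token below are placeholders, and the bearer-token header is an assumption about how the dispatcher's ManagementToken is presented.

    // Illustrative sketch only -- endpoint host/port and token are placeholders.
    package main

    import (
        "fmt"
        "net/http"
        "net/url"
        "strings"
    )

    func main() {
        // instance_id is now read via r.FormValue, so it goes in the request body.
        form := url.Values{"instance_id": {"stub-providertype-0123456789abcdef"}}
        req, err := http.NewRequest("POST",
            "http://localhost:9006/arvados/v1/dispatch/instances/hold",
            strings.NewReader(form.Encode()))
        if err != nil {
            panic(err)
        }
        req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
        // Assumed auth scheme: the cluster's ManagementToken sent as a bearer token.
        req.Header.Set("Authorization", "Bearer xxx-management-token")
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        fmt.Println(resp.Status)
    }
---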
index 0558d79f1a5b95737ab61958e467a7ee73177df9..36b06020748f43f5f4c7bbdefb5302935dedb861 100644 (file)
@@ -5,6 +5,7 @@
 package dispatchcloud
 
 import (
+       "context"
        "encoding/json"
        "io/ioutil"
        "math/rand"
@@ -16,6 +17,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "golang.org/x/crypto/ssh"
        check "gopkg.in/check.v1"
 )
@@ -23,12 +25,16 @@ import (
 var _ = check.Suite(&DispatcherSuite{})
 
 type DispatcherSuite struct {
+       ctx        context.Context
+       cancel     context.CancelFunc
        cluster    *arvados.Cluster
        stubDriver *test.StubDriver
        disp       *dispatcher
 }
 
 func (s *DispatcherSuite) SetUpTest(c *check.C) {
+       s.ctx, s.cancel = context.WithCancel(context.Background())
+       s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
        dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
        dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
        c.Assert(err, check.IsNil)
@@ -73,13 +79,17 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                        },
                },
        }
-       s.disp = &dispatcher{Cluster: s.cluster}
+       s.disp = &dispatcher{
+               Cluster: s.cluster,
+               Context: s.ctx,
+       }
        // Test cases can modify s.cluster before calling
        // initialize(), and then modify private state before calling
        // go run().
 }
 
 func (s *DispatcherSuite) TearDownTest(c *check.C) {
+       s.cancel()
        s.disp.Close()
 }
 
index 4bd27021c675d1c8ce40753d131d0631041ea59c..148b653c2e52305b2ece2255c49d98bf6cb72f50 100644 (file)
@@ -23,7 +23,7 @@ func (sch *Scheduler) fixStaleLocks() {
        var stale []string
        timeout := time.NewTimer(sch.staleLockTimeout)
 waiting:
-       for {
+       for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
                running := sch.pool.Running()
                qEntries, _ := sch.queue.Entries()
 
@@ -43,11 +43,6 @@ waiting:
 
                select {
                case <-wp:
-                       // Stop waiting if all workers have been
-                       // contacted.
-                       if sch.pool.CountWorkers()[worker.StateUnknown] == 0 {
-                               break waiting
-                       }
                case <-timeout.C:
                        // Give up.
                        break waiting
index 7dd6866c0f389e51a393300f9235053159582d51..4296a1364c911fc94d44af28512ecac195b4e5f5 100644 (file)
@@ -5,12 +5,14 @@
 package scheduler
 
 import (
+       "context"
        "sync"
        "time"
 
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        check "gopkg.in/check.v1"
 )
 
@@ -120,6 +122,7 @@ type SchedulerSuite struct{}
 // immediately. Don't try to create any other nodes after the failed
 // create.
 func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        queue := test.Queue{
                ChooseType: chooseType,
                Containers: []arvados.Container{
@@ -174,7 +177,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
                running:   map[string]time.Time{},
                canCreate: 0,
        }
-       New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
        c.Check(pool.running, check.HasLen, 1)
@@ -186,6 +189,7 @@ func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
 // If Create() fails, shutdown some nodes, and don't call Create()
 // again.  Don't call Create() at all if AtQuota() is true.
 func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        for quota := 0; quota < 2; quota++ {
                c.Logf("quota=%d", quota)
                shouldCreate := []arvados.InstanceType{}
@@ -229,7 +233,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
                        starts:    []string{},
                        canCreate: 0,
                }
-               New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+               New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
                c.Check(pool.creates, check.DeepEquals, shouldCreate)
                c.Check(pool.starts, check.DeepEquals, []string{})
                c.Check(pool.shutdowns, check.Not(check.Equals), 0)
@@ -239,6 +243,7 @@ func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
 // Start lower-priority containers while waiting for new/existing
 // workers to come up for higher-priority containers.
 func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+       ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
        pool := stubPool{
                unalloc: map[arvados.InstanceType]int{
                        test.InstanceType(1): 2,
@@ -317,7 +322,7 @@ func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
                },
        }
        queue.Update()
-       New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
        c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
        c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
        running := map[string]bool{}
index 83fc08a9ffdb28c285965ca7a3f6cd41aba4dd7d..97cacee08f3e6c5ecfaf1701833f2bfe06fd4132 100644 (file)
@@ -7,9 +7,11 @@
 package scheduler
 
 import (
+       "context"
        "sync"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/sirupsen/logrus"
 )
 
@@ -44,9 +46,9 @@ type Scheduler struct {
 //
 // Any given queue and pool should not be used by more than one
 // scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
        return &Scheduler{
-               logger:              logger,
+               logger:              ctxlog.FromContext(ctx),
                queue:               queue,
                pool:                pool,
                staleLockTimeout:    staleLockTimeout,
@@ -75,7 +77,10 @@ func (sch *Scheduler) run() {
        // Ensure the queue is fetched once before attempting anything.
        for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
                sch.logger.Errorf("error updating queue: %s", err)
-               d := sch.queueUpdateInterval / 60
+               d := sch.queueUpdateInterval / 10
+               if d < time.Second {
+                       d = time.Second
+               }
                sch.logger.Infof("waiting %s before retry", d)
                time.Sleep(d)
        }
index 47c754e243dab20ae127cd8741c9decb9eea9688..ed78a8daeacac175523b4073684ab64cb08f7697 100644 (file)
@@ -50,7 +50,7 @@ func (sch *Scheduler) sync() {
                        } else if !exited.IsZero() && qUpdated.After(exited) {
                                go cancel(ent, "state=\"Running\" after crunch-run exited")
                        } else if ent.Container.Priority == 0 {
-                               go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
+                               go kill(ent, "priority=0")
                        }
                case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
                        if running {
@@ -82,16 +82,30 @@ func (sch *Scheduler) sync() {
                        if running && !exited.IsZero() && qUpdated.After(exited) {
                                logger := sch.logger.WithFields(logrus.Fields{
                                        "ContainerUUID": uuid,
+                                       "State":         ent.Container.State,
+                                       "Priority":      ent.Container.Priority,
                                        "Exited":        time.Since(exited).Seconds(),
                                })
-                               logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
+                               logger.Info("requeueing locked container after crunch-run exited")
+                               err := sch.queue.Unlock(uuid)
+                               if err != nil {
+                                       logger.WithError(err).Error("error requeueing container")
+                               }
+                       } else if running && exited.IsZero() && ent.Container.Priority == 0 {
+                               go kill(ent, "priority=0")
+                       } else if !running && ent.Container.Priority == 0 {
+                               logger := sch.logger.WithField("ContainerUUID", uuid)
+                               logger.Info("unlocking container because priority=0")
                                err := sch.queue.Unlock(uuid)
                                if err != nil {
-                                       logger.WithError(err).Info("error requeueing container")
+                                       logger.WithError(err).Error("error requeueing container")
                                }
                        }
                default:
-                       sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+                       sch.logger.WithFields(logrus.Fields{
+                               "ContainerUUID": uuid,
+                               "State":         ent.Container.State,
+                       }).Error("BUG: unexpected state")
                }
        }
 }
index d0fb54c54cd932df806e0129a0f92d78cd3a9999..feed1c2a78b82a84821f22eee99e39e960dbd431 100644 (file)
@@ -38,6 +38,7 @@ func New(t cloud.ExecutorTarget) *Executor {
 type Executor struct {
        target     cloud.ExecutorTarget
        targetPort string
+       targetUser string
        signers    []ssh.Signer
        mtx        sync.RWMutex // controls access to instance after creation
 
@@ -182,6 +183,9 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
        if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
                // Target address does not specify a port.  Use
                // targetPort, or "ssh".
+               if h == "" {
+                       h = addr
+               }
                if p = exr.targetPort; p == "" {
                        p = "ssh"
                }
@@ -189,7 +193,7 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
        }
        var receivedKey ssh.PublicKey
        client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
-               User: "root",
+               User: target.RemoteUser(),
                Auth: []ssh.AuthMethod{
                        ssh.PublicKeys(exr.signers...),
                },
index f8565b4a710705c1367a54f9a954a753ff1d2715..e7c023586b4bb3c09ac8968c35c2cc3f1ed01ee2 100644 (file)
@@ -73,6 +73,7 @@ func (s *ExecutorSuite) TestBadHostKey(c *check.C) {
                                return 0
                        },
                        HostKey:        hostpriv,
+                       AuthorizedUser: "username",
                        AuthorizedKeys: []ssh.PublicKey{clientpub},
                },
        }
@@ -121,6 +122,7 @@ func (s *ExecutorSuite) TestExecute(c *check.C) {
                                        return uint32(exitcode)
                                },
                                HostKey:        hostpriv,
+                               AuthorizedUser: "username",
                                AuthorizedKeys: []ssh.PublicKey{clientpub},
                        },
                }
diff --git a/lib/dispatchcloud/test/logger.go b/lib/dispatchcloud/test/logger.go
deleted file mode 100644 (file)
index a59eeb6..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package test
-
-import (
-       "os"
-
-       "github.com/sirupsen/logrus"
-)
-
-func Logger() logrus.FieldLogger {
-       logger := logrus.StandardLogger()
-       if os.Getenv("ARVADOS_DEBUG") != "" {
-               logger.SetLevel(logrus.DebugLevel)
-       }
-       return logger
-}
index ed5995f4c5f0faa84356c01f2777d1e0a366cbc1..5fab9572a953858fcc0f9aff7abc4886f5a54c8d 100644 (file)
@@ -39,6 +39,7 @@ type SSHExecFunc func(env map[string]string, command string, stdin io.Reader, st
 type SSHService struct {
        Exec           SSHExecFunc
        HostKey        ssh.Signer
+       AuthorizedUser string
        AuthorizedKeys []ssh.PublicKey
 
        listener net.Listener
@@ -64,6 +65,11 @@ func (ss *SSHService) Address() string {
        return ln.Addr().String()
 }
 
+// RemoteUser returns the username that will be accepted.
+func (ss *SSHService) RemoteUser() string {
+       return ss.AuthorizedUser
+}
+
 // Close shuts down the server and releases resources. Established
 // connections are unaffected.
 func (ss *SSHService) Close() {
index fbab30b175d674ceec0b18d2b1726dfd930df1d7..5873e492213b86f58eaa98850c5c00c073cd2aee 100644 (file)
@@ -10,6 +10,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "io/ioutil"
        math_rand "math/rand"
        "regexp"
        "strings"
@@ -61,6 +62,7 @@ func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID
        }
        sis := StubInstanceSet{
                driver:  sd,
+               logger:  logger,
                servers: map[cloud.InstanceID]*StubVM{},
        }
        sd.instanceSets = append(sd.instanceSets, &sis)
@@ -90,6 +92,7 @@ func (sd *StubDriver) ReleaseCloudOps(n int) {
 
 type StubInstanceSet struct {
        driver  *StubDriver
+       logger  logrus.FieldLogger
        servers map[cloud.InstanceID]*StubVM
        mtx     sync.RWMutex
        stopped bool
@@ -98,7 +101,7 @@ type StubInstanceSet struct {
        allowInstancesCall time.Time
 }
 
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
        if sis.driver.HoldCloudOps {
                sis.driver.holdCloudOps <- true
        }
@@ -122,9 +125,11 @@ func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID,
                id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
                tags:         copyTags(tags),
                providerType: it.ProviderType,
+               initCommand:  cmd,
        }
        svm.SSHService = SSHService{
                HostKey:        sis.driver.HostKey,
+               AuthorizedUser: "root",
                AuthorizedKeys: ak,
                Exec:           svm.Exec,
        }
@@ -182,6 +187,7 @@ type StubVM struct {
        sis          *StubInstanceSet
        id           cloud.InstanceID
        tags         cloud.InstanceTags
+       initCommand  cloud.InitCommand
        providerType string
        SSHService   SSHService
        running      map[string]bool
@@ -205,6 +211,11 @@ func (svm *StubVM) Instance() stubInstance {
 }
 
 func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       stdinData, err := ioutil.ReadAll(stdin)
+       if err != nil {
+               fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
+               return 1
+       }
        queue := svm.sis.driver.Queue
        uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
        if eta := svm.Boot.Sub(time.Now()); eta > 0 {
@@ -219,10 +230,16 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                fmt.Fprint(stderr, "crunch-run: command not found\n")
                return 1
        }
-       if strings.HasPrefix(command, "crunch-run --detach ") {
+       if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
+               var stdinKV map[string]string
+               err := json.Unmarshal(stdinData, &stdinKV)
+               if err != nil {
+                       fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
+                       return 1
+               }
                for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
-                       if env[name] == "" {
-                               fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+                       if stdinKV[name] == "" {
+                               fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
                                return 1
                        }
                }
@@ -234,7 +251,7 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                svm.Unlock()
                time.Sleep(svm.CrunchRunDetachDelay)
                fmt.Fprintf(stderr, "starting %s\n", uuid)
-               logger := logrus.WithFields(logrus.Fields{
+               logger := svm.sis.logger.WithFields(logrus.Fields{
                        "Instance":      svm.id,
                        "ContainerUUID": uuid,
                })
@@ -319,6 +336,10 @@ func (si stubInstance) Address() string {
        return si.addr
 }
 
+func (si stubInstance) RemoteUser() string {
+       return si.svm.SSHService.AuthorizedUser
+}
+
 func (si stubInstance) Destroy() error {
        sis := si.svm.sis
        if sis.driver.HoldCloudOps {
index e6b50629892e62c250ebcf05ba5bb226daec1ce1..14f6a3efced3815f11b19b6e08612ead4326e4f6 100644 (file)
@@ -5,7 +5,9 @@
 package worker
 
 import (
+       "crypto/rand"
        "errors"
+       "fmt"
        "io"
        "sort"
        "strings"
@@ -16,16 +18,19 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
 )
 
 const (
-       tagKeyInstanceType = "InstanceType"
-       tagKeyIdleBehavior = "IdleBehavior"
+       tagKeyInstanceType   = "InstanceType"
+       tagKeyIdleBehavior   = "IdleBehavior"
+       tagKeyInstanceSecret = "InstanceSecret"
 )
 
 // An InstanceView shows a worker's current state and recent activity.
 type InstanceView struct {
        Instance             cloud.InstanceID `json:"instance"`
+       Address              string           `json:"address"`
        Price                float64          `json:"price"`
        ArvadosInstanceType  string           `json:"arvados_instance_type"`
        ProviderInstanceType string           `json:"provider_instance_type"`
@@ -84,7 +89,7 @@ func duration(conf arvados.Duration, def time.Duration) time.Duration {
 //
 // New instances are configured and set up according to the given
 // cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
        wp := &Pool{
                logger:             logger,
                arvClient:          arvClient,
@@ -100,6 +105,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
                timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
                timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               installPublicKey:   installPublicKey,
                stop:               make(chan bool),
        }
        wp.registerMetrics(reg)
@@ -130,6 +136,7 @@ type Pool struct {
        timeoutBooting     time.Duration
        timeoutProbe       time.Duration
        timeoutShutdown    time.Duration
+       installPublicKey   ssh.PublicKey
 
        // private state
        subscribers  map[<-chan struct{}]chan<- struct{}
@@ -146,13 +153,11 @@ type Pool struct {
        throttleCreate    throttle
        throttleInstances throttle
 
-       mInstances         prometheus.Gauge
-       mInstancesPrice    prometheus.Gauge
        mContainersRunning prometheus.Gauge
-       mVCPUs             prometheus.Gauge
-       mVCPUsInuse        prometheus.Gauge
-       mMemory            prometheus.Gauge
-       mMemoryInuse       prometheus.Gauge
+       mInstances         *prometheus.GaugeVec
+       mInstancesPrice    *prometheus.GaugeVec
+       mVCPUs             *prometheus.GaugeVec
+       mMemory            *prometheus.GaugeVec
 }
 
 // Subscribe returns a buffered channel that becomes ready after any
@@ -254,15 +259,18 @@ func (wp *Pool) Create(it arvados.InstanceType) bool {
        if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
                return false
        }
-       tags := cloud.InstanceTags{
-               tagKeyInstanceType: it.Name,
-               tagKeyIdleBehavior: string(IdleBehaviorRun),
-       }
        now := time.Now()
        wp.creating[it] = append(wp.creating[it], now)
        go func() {
                defer wp.notify()
-               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               secret := randomHex(instanceSecretLength)
+               tags := cloud.InstanceTags{
+                       tagKeyInstanceType:   it.Name,
+                       tagKeyIdleBehavior:   string(IdleBehaviorRun),
+                       tagKeyInstanceSecret: secret,
+               }
+               initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
                wp.mtx.Lock()
                defer wp.mtx.Unlock()
                // Remove our timestamp marker from wp.creating
@@ -318,6 +326,7 @@ func (wp *Pool) SetIdleBehavior(id cloud.InstanceID, idleBehavior IdleBehavior)
 //
 // Caller must have lock.
 func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+       inst = tagVerifier{inst}
        id := inst.ID()
        if wkr := wp.workers[id]; wkr != nil {
                wkr.executor.SetTarget(inst)
@@ -343,7 +352,8 @@ func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initi
 
        logger := wp.logger.WithFields(logrus.Fields{
                "InstanceType": it.Name,
-               "Instance":     inst,
+               "Instance":     inst.ID(),
+               "Address":      inst.Address(),
        })
        logger.WithFields(logrus.Fields{
                "State":        initialState,
@@ -482,10 +492,14 @@ func (wp *Pool) KillContainer(uuid string) {
 func (wp *Pool) kill(wkr *worker, uuid string) {
        logger := wp.logger.WithFields(logrus.Fields{
                "ContainerUUID": uuid,
-               "Instance":      wkr.instance,
+               "Instance":      wkr.instance.ID(),
        })
        logger.Debug("killing process")
-       stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
+       cmd := "crunch-run --kill 15 " + uuid
+       if u := wkr.instance.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
+       stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                logger.WithFields(logrus.Fields{
                        "stderr": string(stderr),
@@ -511,20 +525,6 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
        if reg == nil {
                reg = prometheus.NewRegistry()
        }
-       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_total",
-               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstances)
-       wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "instances_price_total",
-               Help:      "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
-       })
-       reg.MustRegister(wp.mInstancesPrice)
        wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
@@ -532,35 +532,34 @@ func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
                Help:      "Number of containers reported running by cloud VMs.",
        })
        reg.MustRegister(wp.mContainersRunning)
-
-       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstances)
+       wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_price",
+               Help:      "Price of cloud VMs.",
+       }, []string{"category"})
+       reg.MustRegister(wp.mInstancesPrice)
+       wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "vcpus_total",
                Help:      "Total VCPUs on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mVCPUs)
-       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "vcpus_inuse",
-               Help:      "VCPUs on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mVCPUsInuse)
-       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+       wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
                Namespace: "arvados",
                Subsystem: "dispatchcloud",
                Name:      "memory_bytes_total",
                Help:      "Total memory on all cloud VMs.",
-       })
+       }, []string{"category"})
        reg.MustRegister(wp.mMemory)
-       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
-               Namespace: "arvados",
-               Subsystem: "dispatchcloud",
-               Name:      "memory_bytes_inuse",
-               Help:      "Memory on cloud VMs that are running containers.",
-       })
-       reg.MustRegister(wp.mMemoryInuse)
 }
 
 func (wp *Pool) runMetrics() {
@@ -575,26 +574,38 @@ func (wp *Pool) updateMetrics() {
        wp.mtx.RLock()
        defer wp.mtx.RUnlock()
 
-       var price float64
-       var alloc, cpu, cpuInuse, mem, memInuse int64
+       instances := map[string]int64{}
+       price := map[string]float64{}
+       cpu := map[string]int64{}
+       mem := map[string]int64{}
+       var running int64
        for _, wkr := range wp.workers {
-               price += wkr.instType.Price
-               cpu += int64(wkr.instType.VCPUs)
-               mem += int64(wkr.instType.RAM)
-               if len(wkr.running)+len(wkr.starting) == 0 {
-                       continue
+               var cat string
+               switch {
+               case len(wkr.running)+len(wkr.starting) > 0:
+                       cat = "inuse"
+               case wkr.idleBehavior == IdleBehaviorHold:
+                       cat = "hold"
+               case wkr.state == StateBooting:
+                       cat = "booting"
+               case wkr.state == StateUnknown:
+                       cat = "unknown"
+               default:
+                       cat = "idle"
                }
-               alloc += int64(len(wkr.running) + len(wkr.starting))
-               cpuInuse += int64(wkr.instType.VCPUs)
-               memInuse += int64(wkr.instType.RAM)
-       }
-       wp.mInstances.Set(float64(len(wp.workers)))
-       wp.mInstancesPrice.Set(price)
-       wp.mContainersRunning.Set(float64(alloc))
-       wp.mVCPUs.Set(float64(cpu))
-       wp.mMemory.Set(float64(mem))
-       wp.mVCPUsInuse.Set(float64(cpuInuse))
-       wp.mMemoryInuse.Set(float64(memInuse))
+               instances[cat]++
+               price[cat] += wkr.instType.Price
+               cpu[cat] += int64(wkr.instType.VCPUs)
+               mem[cat] += int64(wkr.instType.RAM)
+               running += int64(len(wkr.running) + len(wkr.starting))
+       }
+       for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+               wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+               wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+               wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+               wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+       }
+       wp.mContainersRunning.Set(float64(running))
 }
 
 func (wp *Pool) runProbes() {
@@ -673,6 +684,7 @@ func (wp *Pool) Instances() []InstanceView {
        for _, w := range wp.workers {
                r = append(r, InstanceView{
                        Instance:             w.instance.ID(),
+                       Address:              w.instance.Address(),
                        Price:                w.instType.Price,
                        ArvadosInstanceType:  w.instType.Name,
                        ProviderInstanceType: w.instType.ProviderType,
@@ -753,7 +765,7 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                        continue
                }
                logger := wp.logger.WithFields(logrus.Fields{
-                       "Instance":    wkr.instance,
+                       "Instance":    wkr.instance.ID(),
                        "WorkerState": wkr.state,
                })
                logger.Info("instance disappeared in cloud")
@@ -771,3 +783,14 @@ func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
                go wp.notify()
        }
 }
+
+// randomHex returns a string of n random hexadecimal digits (n*4
+// random bits). n must be even.
+func randomHex(n int) string {
+       buf := make([]byte, n/2)
+       _, err := rand.Read(buf)
+       if err != nil {
+               panic(err)
+       }
+       return fmt.Sprintf("%x", buf)
+}
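
randomHex supplies the per-instance secret that the new verify.go (below) later checks over SSH. The call sites are not in this hunk; the following is only a hedged sketch of the presumed flow, reusing identifiers from this package (createWithSecret itself and the exact init-command contents are assumptions, not code from the patch):

    func createWithSecret(is cloud.InstanceSet, it arvados.InstanceType, img cloud.ImageID, pubKey ssh.PublicKey) (cloud.Instance, error) {
            secret := randomHex(instanceSecretLength)
            tags := cloud.InstanceTags{tagKeyInstanceSecret: secret}
            // Assumption: a boot-time init command writes the same secret
            // where tagVerifier expects to read it back.
            init := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
            return is.Create(it, img, tags, init, pubKey)
    }
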
index 3b66eeb417e7fb310768d950b50b7ad6c7066f0c..da9e650b8121889511886e9b16dc8eb827fcee14 100644 (file)
@@ -12,6 +12,7 @@ import (
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
@@ -62,7 +63,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       logger := test.Logger()
+       logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
        is, err := driver.InstanceSet(nil, "", logger)
        c.Assert(err, check.IsNil)
@@ -90,7 +91,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                },
        }
 
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
        pool.Create(type1)
@@ -108,7 +109,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
        c.Log("------- starting new pool, waiting to recover state")
 
-       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
        notify2 := pool2.Subscribe()
        defer pool2.Unsubscribe(notify2)
        waitForIdle(pool2, notify2)
@@ -124,7 +125,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-       logger := test.Logger()
+       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
        instanceSet, err := driver.InstanceSet(nil, "", logger)
        c.Assert(err, check.IsNil)
diff --git a/lib/dispatchcloud/worker/verify.go b/lib/dispatchcloud/worker/verify.go
new file mode 100644 (file)
index 0000000..e22c85d
--- /dev/null
@@ -0,0 +1,56 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "golang.org/x/crypto/ssh"
+)
+
+var (
+       errBadInstanceSecret = errors.New("bad instance secret")
+
+       // filename on instance, as given to shell (quoted accordingly)
+       instanceSecretFilename = "/var/run/arvados-instance-secret"
+       instanceSecretLength   = 40 // hex digits
+)
+
+type tagVerifier struct {
+       cloud.Instance
+}
+
+func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
+       expectSecret := tv.Instance.Tags()[tagKeyInstanceSecret]
+       if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || expectSecret == "" {
+               // If the wrapped instance can verify the key itself,
+               // or we have no secret to check, return its decision.
+               return err
+       }
+       session, err := client.NewSession()
+       if err != nil {
+               return err
+       }
+       defer session.Close()
+       var stdout, stderr bytes.Buffer
+       session.Stdin = bytes.NewBuffer(nil)
+       session.Stdout = &stdout
+       session.Stderr = &stderr
+       cmd := fmt.Sprintf("cat %s", instanceSecretFilename)
+       if u := tv.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
+       err = session.Run(cmd)
+       if err != nil {
+               return err
+       }
+       if stdout.String() != expectSecret {
+               return errBadInstanceSecret
+       }
+       return nil
+}
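
The secret check above only runs when the wrapped driver's own VerifyHostKey returns cloud.ErrNotImplemented. A hedged sketch of what a driver with no out-of-band host-key channel would do (exampleInstance is a made-up type for illustration, not a driver in this tree):

    func (inst *exampleInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
            // No trusted channel for host keys on this provider; defer to
            // the dispatcher's instance-secret check in tagVerifier.
            return cloud.ErrNotImplemented
    }
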
index a24747267615b9b0d0d0c8851271e92bdb85087c..9be9f41f43b7ef51cbb1d1257e4ac39f642472aa 100644 (file)
@@ -6,6 +6,7 @@ package worker
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "strings"
        "sync"
@@ -13,6 +14,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
        "github.com/sirupsen/logrus"
 )
 
@@ -96,7 +98,7 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                "ContainerUUID": ctr.UUID,
                "Priority":      ctr.Priority,
        })
-       logger = logger.WithField("Instance", wkr.instance)
+       logger = logger.WithField("Instance", wkr.instance.ID())
        logger.Debug("starting container")
        wkr.starting[ctr.UUID] = struct{}{}
        wkr.state = StateRunning
@@ -105,7 +107,19 @@ func (wkr *worker) startContainer(ctr arvados.Container) {
                        "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                        "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                }
-               stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+               if wkr.wp.arvClient.Insecure {
+                       env["ARVADOS_API_HOST_INSECURE"] = "1"
+               }
+               envJSON, err := json.Marshal(env)
+               if err != nil {
+                       panic(err)
+               }
+               stdin := bytes.NewBuffer(envJSON)
+               cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+               if u := wkr.instance.RemoteUser(); u != "root" {
+                       cmd = "sudo " + cmd
+               }
+               stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
                wkr.mtx.Lock()
                defer wkr.mtx.Unlock()
                now := time.Now()
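
Two details in the block above: the environment travels as a JSON document on crunch-run's stdin (rather than as SSH environment requests, which OpenSSH normally filters through AcceptEnv), and the command is wrapped in sudo when the image's remote user is not root. A hedged sketch of reproducing the same request by hand over an *ssh.Client (sshClient, the UUID and the credential values are placeholders; assumed imports: bytes, encoding/json, golang.org/x/crypto/ssh):

    env, _ := json.Marshal(map[string]string{
            "ARVADOS_API_HOST":  "zzzzz.example.com",
            "ARVADOS_API_TOKEN": "placeholder-token",
    })
    session, err := sshClient.NewSession()
    if err != nil {
            return err
    }
    defer session.Close()
    session.Stdin = bytes.NewReader(env)
    err = session.Run("sudo crunch-run --detach --stdin-env 'zzzzz-dz642-000000000000000'")
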
@@ -325,6 +339,9 @@ func (wkr *worker) probeAndUpdate() {
 
 func (wkr *worker) probeRunning() (running []string, ok bool) {
        cmd := "crunch-run --list"
+       if u := wkr.instance.RemoteUser(); u != "root" {
+               cmd = "sudo " + cmd
+       }
        stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
        if err != nil {
                wkr.logger.WithFields(logrus.Fields{
@@ -400,7 +417,7 @@ func (wkr *worker) shutdownIfIdle() bool {
 
        wkr.logger.WithFields(logrus.Fields{
                "State":        wkr.state,
-               "Age":          age,
+               "IdleDuration": stats.Duration(age),
                "IdleBehavior": wkr.idleBehavior,
        }).Info("shutdown idle worker")
        wkr.shutdown()
@@ -427,22 +444,24 @@ func (wkr *worker) shutdown() {
 // match. Caller must have lock.
 func (wkr *worker) saveTags() {
        instance := wkr.instance
-       have := instance.Tags()
-       want := cloud.InstanceTags{
+       tags := instance.Tags()
+       update := cloud.InstanceTags{
                tagKeyInstanceType: wkr.instType.Name,
                tagKeyIdleBehavior: string(wkr.idleBehavior),
        }
-       go func() {
-               for k, v := range want {
-                       if v == have[k] {
-                               continue
-                       }
-                       err := instance.SetTags(want)
+       save := false
+       for k, v := range update {
+               if tags[k] != v {
+                       tags[k] = v
+                       save = true
+               }
+       }
+       if save {
+               go func() {
+                       err := instance.SetTags(tags)
                        if err != nil {
-                               wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+                               wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
                        }
-                       break
-
-               }
-       }()
+               }()
+       }
 }
index 2eb5255b87fb174410b05d2e1c9d8e66800187e3..3bc33b62c9fee896f107278a564b859d1448366e 100644 (file)
@@ -12,6 +12,7 @@ import (
        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        check "gopkg.in/check.v1"
 )
 
@@ -20,13 +21,13 @@ var _ = check.Suite(&WorkerSuite{})
 type WorkerSuite struct{}
 
 func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
-       logger := test.Logger()
+       logger := ctxlog.TestLogger(c)
        bootTimeout := time.Minute
        probeTimeout := time.Second
 
        is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
        c.Assert(err, check.IsNil)
-       inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+       inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
        c.Assert(err, check.IsNil)
 
        type trialT struct {
index 7f6b0236cb9571cd3ca30420cb6d41af6d787bd3..d99af0eea15428054fd5adc16596ca89b1de7820 100644 (file)
@@ -6,6 +6,7 @@
 package service
 
 import (
+       "context"
        "flag"
        "fmt"
        "io"
@@ -14,6 +15,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/coreos/go-systemd/daemon"
        "github.com/sirupsen/logrus"
@@ -24,7 +26,7 @@ type Handler interface {
        CheckHealth() error
 }
 
-type NewHandlerFunc func(*arvados.Cluster, *arvados.NodeProfile) Handler
+type NewHandlerFunc func(context.Context, *arvados.Cluster, *arvados.NodeProfile) Handler
 
 type command struct {
        newHandler NewHandlerFunc
@@ -45,11 +47,7 @@ func Command(svcName arvados.ServiceName, newHandler NewHandlerFunc) cmd.Handler
 }
 
 func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
-       log := logrus.New()
-       log.Formatter = &logrus.JSONFormatter{
-               TimestampFormat: rfc3339NanoFixed,
-       }
-       log.Out = stderr
+       log := ctxlog.New(stderr, "json", "info")
 
        var err error
        defer func() {
@@ -76,6 +74,10 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
        if err != nil {
                return 1
        }
+       log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
+               "PID": os.Getpid(),
+       })
+       ctx := ctxlog.Context(context.Background(), log)
        profileName := *nodeProfile
        if profileName == "" {
                profileName = os.Getenv("ARVADOS_NODE_PROFILE")
@@ -89,7 +91,7 @@ func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout
                err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
                return 1
        }
-       handler := c.newHandler(cluster, profile)
+       handler := c.newHandler(ctx, cluster, profile)
        if err = handler.CheckHealth(); err != nil {
                return 1
        }
index 636cf350c151b150c59a96c45c312ea7af6d45c2..c2154d0f29cd1dbb6decad7962036dd9073bd24e 100644 (file)
@@ -66,6 +66,12 @@ type Cluster struct {
        RemoteClusters     map[string]RemoteCluster
        PostgreSQL         PostgreSQL
        RequestLimits      RequestLimits
+       Logging            Logging
+}
+
+type Logging struct {
+       Level  string
+       Format string
 }
 
 type PostgreSQL struct {
@@ -168,6 +174,9 @@ func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
                        if _, ok := (*it)[t.Name]; ok {
                                return errDuplicateInstanceTypeName
                        }
+                       if t.ProviderType == "" {
+                               t.ProviderType = t.Name
+                       }
                        (*it)[t.Name] = t
                }
                return nil
@@ -177,10 +186,14 @@ func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
        if err != nil {
                return err
        }
-       // Fill in Name field using hash key.
+       // Fill in Name field (and ProviderType field, if not
+       // specified) using hash key.
        *it = InstanceTypeMap(hash)
        for name, t := range *it {
                t.Name = name
+               if t.ProviderType == "" {
+                       t.ProviderType = name
+               }
                (*it)[name] = t
        }
        return nil
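
The net effect of the two hunks above: an instance type that omits ProviderType gets its Name (the map key) as the provider type. A standalone illustration of the same defaulting logic (the struct below is a cut-down stand-in for arvados.InstanceType, not the real type):

    package main

    import (
            "encoding/json"
            "fmt"
    )

    type instanceType struct {
            Name         string
            ProviderType string
            VCPUs        int
    }

    func main() {
            raw := []byte(`{"small": {"VCPUs": 2}, "big": {"VCPUs": 16, "ProviderType": "m5.4xlarge"}}`)
            var byName map[string]instanceType
            if err := json.Unmarshal(raw, &byName); err != nil {
                    panic(err)
            }
            for name, t := range byName {
                    t.Name = name
                    if t.ProviderType == "" {
                            t.ProviderType = name // same fallback as UnmarshalJSON above
                    }
                    byName[name] = t
            }
            fmt.Printf("%+v\n%+v\n", byName["small"], byName["big"])
    }
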
index 45e4efdbeff2c5e5e507e92c94c85c8a189d8263..e66eeadee1e1fc8d6b50cd3e10fa59e8a5a66a80 100644 (file)
@@ -5,9 +5,13 @@
 package ctxlog
 
 import (
+       "bytes"
        "context"
+       "io"
+       "os"
 
        "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
 )
 
 var (
@@ -19,45 +23,87 @@ const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
 // Context returns a new child context such that FromContext(child)
 // returns the given logger.
-func Context(ctx context.Context, logger *logrus.Entry) context.Context {
+func Context(ctx context.Context, logger logrus.FieldLogger) context.Context {
        return context.WithValue(ctx, loggerCtxKey, logger)
 }
 
 // FromContext returns the logger suitable for the given context -- the one
 // attached by contextWithLogger() if applicable, otherwise the
 // top-level logger with no fields/values.
-func FromContext(ctx context.Context) *logrus.Entry {
+func FromContext(ctx context.Context) logrus.FieldLogger {
        if ctx != nil {
-               if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
+               if logger, ok := ctx.Value(loggerCtxKey).(logrus.FieldLogger); ok {
                        return logger
                }
        }
        return rootLogger.WithFields(nil)
 }
 
+// New returns a new logger with the indicated format and
+// level.
+func New(out io.Writer, format, level string) logrus.FieldLogger {
+       logger := logrus.New()
+       logger.Out = out
+       setFormat(logger, format)
+       setLevel(logger, level)
+       return logger
+}
+
+func TestLogger(c *check.C) logrus.FieldLogger {
+       logger := logrus.New()
+       logger.Out = &logWriter{c.Log}
+       setFormat(logger, "text")
+       if d := os.Getenv("ARVADOS_DEBUG"); d != "0" && d != "" {
+               setLevel(logger, "debug")
+       } else {
+               setLevel(logger, "info")
+       }
+       return logger
+}
+
 // SetLevel sets the current logging level. See logrus for level
 // names.
 func SetLevel(level string) {
-       lvl, err := logrus.ParseLevel(level)
-       if err != nil {
-               logrus.Fatal(err)
+       setLevel(rootLogger, level)
+}
+
+func setLevel(logger *logrus.Logger, level string) {
+       if level == "" {
+       } else if lvl, err := logrus.ParseLevel(level); err != nil {
+               logrus.WithField("Level", level).Fatal("unknown log level")
+       } else {
+               logger.Level = lvl
        }
-       rootLogger.Level = lvl
 }
 
 // SetFormat sets the current logging format to "json" or "text".
 func SetFormat(format string) {
+       setFormat(rootLogger, format)
+}
+
+func setFormat(logger *logrus.Logger, format string) {
        switch format {
        case "text":
-               rootLogger.Formatter = &logrus.TextFormatter{
+               logger.Formatter = &logrus.TextFormatter{
                        FullTimestamp:   true,
                        TimestampFormat: rfc3339NanoFixed,
                }
-       case "json":
-               rootLogger.Formatter = &logrus.JSONFormatter{
+       case "json", "":
+               logger.Formatter = &logrus.JSONFormatter{
                        TimestampFormat: rfc3339NanoFixed,
                }
        default:
-               logrus.WithField("LogFormat", format).Fatal("unknown log format")
+               logrus.WithField("Format", format).Fatal("unknown log format")
        }
 }
+
+// logWriter is an io.Writer that writes by calling a "write log"
+// function, typically (*check.C).Log().
+type logWriter struct {
+       logfunc func(...interface{})
+}
+
+func (tl *logWriter) Write(buf []byte) (int, error) {
+       tl.logfunc(string(bytes.TrimRight(buf, "\n")))
+       return len(buf), nil
+}
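
For callers that cannot use this package, New is only a thin convenience over logrus. An equivalent direct setup with the same formatter settings used by setFormat (the field name is illustrative):

    package main

    import (
            "os"

            "github.com/sirupsen/logrus"
    )

    func main() {
            logger := logrus.New()
            logger.Out = os.Stderr
            logger.Formatter = &logrus.JSONFormatter{
                    TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
            }
            logger.Level = logrus.InfoLevel
            logger.WithField("PID", os.Getpid()).Info("service starting")
    }
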
index 3d1e6491150624bcbcbad9aa95a5e4dc202dd390..abcfdbd296b3ab71cf7e8466e7c9279076f2c93f 100644 (file)
@@ -382,6 +382,9 @@ class Container < ArvadosModel
     else
       kwargs = {}
     end
+    if users_list.select { |u| u.is_admin }.any?
+      return super
+    end
     Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
   end
 
index 5750d5ebbd0394d99bdce8ae2a038bb1d799cd77..178135ead87098b23874b3eeb607437458ee2eb0 100644 (file)
@@ -723,6 +723,14 @@ class ContainerTest < ActiveSupport::TestCase
     assert_equal 1, Container.readable_by(users(:active)).where(state: "Queued").count
   end
 
+  test "Containers with no matching request are readable by admin" do
+    uuids = Container.includes('container_requests').where(container_requests: {uuid: nil}).collect(&:uuid)
+    assert_not_empty uuids
+    assert_empty Container.readable_by(users(:active)).where(uuid: uuids)
+    assert_not_empty Container.readable_by(users(:admin)).where(uuid: uuids)
+    assert_equal uuids.count, Container.readable_by(users(:admin)).where(uuid: uuids).count
+  end
+
   test "Container locked cancel" do
     set_user_from_auth :active
     c, _ = minimal_new
index a50853837085f6b7a6fd89bb61eba381dc9f6098..b3c530e69013e91dfb7599c677347ecef3856d78 100644 (file)
@@ -8,7 +8,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "io/ioutil"
        "os"
        "os/exec"
        "path/filepath"
@@ -25,16 +24,17 @@ var (
 
 // procinfo is saved in each process's lockfile.
 type procinfo struct {
-       UUID   string
-       PID    int
-       Stdout string
-       Stderr string
+       UUID string
+       PID  int
 }
 
 // Detach acquires a lock for the given uuid, and starts the current
 // program as a child process (with -no-detach prepended to the given
 // arguments so the child knows not to detach again). The lock is
 // passed along to the child process.
+//
+// Stdout and stderr in the child process are sent to the systemd
+// journal using the systemd-cat program.
 func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
        return exitcode(stderr, detach(uuid, args, stdout, stderr))
 }
@@ -49,14 +49,15 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
                        return nil, err
                }
                defer dirlock.Close()
-               lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+               lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+               lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
                if err != nil {
-                       return nil, err
+                       return nil, fmt.Errorf("open %s: %s", lockfilename, err)
                }
                err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
                if err != nil {
                        lockfile.Close()
-                       return nil, err
+                       return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
                }
                return lockfile, nil
        }()
@@ -66,21 +67,7 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
        defer lockfile.Close()
        lockfile.Truncate(0)
 
-       outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
-       if err != nil {
-               return err
-       }
-       defer outfile.Close()
-       errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
-       if err != nil {
-               os.Remove(outfile.Name())
-               return err
-       }
-       defer errfile.Close()
-
-       cmd := exec.Command(args[0], append([]string{"-no-detach"}, args[1:]...)...)
-       cmd.Stdout = outfile
-       cmd.Stderr = errfile
+       cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
        // Child inherits lockfile.
        cmd.ExtraFiles = []*os.File{lockfile}
        // Ensure child isn't interrupted even if we receive signals
@@ -89,24 +76,14 @@ func detach(uuid string, args []string, stdout, stderr io.Writer) error {
        cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
        err = cmd.Start()
        if err != nil {
-               os.Remove(outfile.Name())
-               os.Remove(errfile.Name())
-               return err
+               return fmt.Errorf("exec %s: %s", cmd.Path, err)
        }
 
        w := io.MultiWriter(stdout, lockfile)
-       err = json.NewEncoder(w).Encode(procinfo{
-               UUID:   uuid,
-               PID:    cmd.Process.Pid,
-               Stdout: outfile.Name(),
-               Stderr: errfile.Name(),
+       return json.NewEncoder(w).Encode(procinfo{
+               UUID: uuid,
+               PID:  cmd.Process.Pid,
        })
-       if err != nil {
-               os.Remove(outfile.Name())
-               os.Remove(errfile.Name())
-               return err
-       }
-       return nil
 }
 
 // KillProcess finds the crunch-run process corresponding to the given
@@ -123,14 +100,14 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
        if os.IsNotExist(err) {
                return nil
        } else if err != nil {
-               return err
+               return fmt.Errorf("open %s: %s", path, err)
        }
        defer f.Close()
 
        var pi procinfo
        err = json.NewDecoder(f).Decode(&pi)
        if err != nil {
-               return fmt.Errorf("%s: %s\n", path, err)
+               return fmt.Errorf("decode %s: %s", path, err)
        }
 
        if pi.UUID != uuid || pi.PID == 0 {
@@ -139,7 +116,7 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
 
        proc, err := os.FindProcess(pi.PID)
        if err != nil {
-               return err
+               return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
        }
 
        err = proc.Signal(signal)
@@ -147,16 +124,19 @@ func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
                err = proc.Signal(syscall.Signal(0))
        }
        if err == nil {
-               return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+               return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
        }
-       fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
+       fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
        return nil
 }
 
 // List UUIDs of active crunch-run processes.
 func ListProcesses(stdout, stderr io.Writer) int {
-       return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
-               if info.IsDir() {
+       // filepath.Walk does not follow symlinks, so we must walk
+       // lockdir+"/." in case lockdir itself is a symlink.
+       walkdir := lockdir + "/."
+       return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
+               if info.IsDir() && path != walkdir {
                        return filepath.SkipDir
                }
                if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
@@ -186,7 +166,7 @@ func ListProcesses(stdout, stderr io.Writer) int {
                        err := os.Remove(path)
                        dirlock.Close()
                        if err != nil {
-                               fmt.Fprintln(stderr, err)
+                               fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
                        }
                        return nil
                }
@@ -224,14 +204,15 @@ func exitcode(stderr io.Writer, err error) int {
 //
 // Caller releases the lock by closing the returned file.
 func lockall() (*os.File, error) {
-       f, err := os.OpenFile(filepath.Join(lockdir, lockprefix+"all"+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+       lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
+       f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
        if err != nil {
-               return nil, err
+               return nil, fmt.Errorf("open %s: %s", lockfile, err)
        }
        err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
        if err != nil {
                f.Close()
-               return nil, err
+               return nil, fmt.Errorf("lock %s: %s", lockfile, err)
        }
        return f, nil
 }
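
Because the detached child now runs under systemd-cat, its output lands in the systemd journal instead of temp files, and can be read back with journalctl -t crunch-run. A minimal runnable illustration of the same launch pattern (assumes systemd-cat is installed and a journal is running; the echo payload is only for demonstration):

    package main

    import "os/exec"

    func main() {
            // Stdout/stderr of the child go to the journal under the
            // "crunch-run" identifier.
            cmd := exec.Command("systemd-cat", "--identifier=crunch-run", "/bin/echo", "hello from the journal")
            if err := cmd.Run(); err != nil {
                    panic(err)
            }
    }
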
index 2b9a119581dfd7c4f3245b1e57317ae95155f5b9..0576337aa13c280841187db3a7aea2dcf4af65c0 100644 (file)
@@ -1737,6 +1737,7 @@ func main() {
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
        detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
        sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1766,6 +1767,13 @@ func main() {
 
        flag.Parse()
 
+       if *stdinEnv && !ignoreDetachFlag {
+               // Load env vars on stdin if asked (but not in a
+               // detached child process, in which case stdin is
+               // /dev/null).
+               loadEnv(os.Stdin)
+       }
+
        switch {
        case *detach && !ignoreDetachFlag:
                os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
@@ -1856,3 +1864,21 @@ func main() {
                log.Fatalf("%s: %v", containerId, runerr)
        }
 }
+
+func loadEnv(rdr io.Reader) {
+       buf, err := ioutil.ReadAll(rdr)
+       if err != nil {
+               log.Fatalf("read stdin: %s", err)
+       }
+       var env map[string]string
+       err = json.Unmarshal(buf, &env)
+       if err != nil {
+               log.Fatalf("decode stdin: %s", err)
+       }
+       for k, v := range env {
+               err = os.Setenv(k, v)
+               if err != nil {
+                       log.Fatalf("setenv(%q): %s", k, err)
+               }
+       }
+}
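
The --stdin-env message is a single JSON object mapping variable names to values. The dispatcher side (worker.go, earlier in this change) sends ARVADOS_API_HOST, ARVADOS_API_TOKEN and, on insecure clusters, ARVADOS_API_HOST_INSECURE; for example (placeholder values):

    {"ARVADOS_API_HOST": "zzzzz.example.com", "ARVADOS_API_TOKEN": "placeholder-token", "ARVADOS_API_HOST_INSECURE": "1"}
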
index 7bd1498158304ea3ab8a969c0c90129241ab1028..63bdb49e5b6ad8675b26077deba8e29ead2e5b0f 100644 (file)
@@ -44,7 +44,7 @@ type v0session struct {
        permChecker   permChecker
        subscriptions []v0subscribe
        lastMsgID     uint64
-       log           *logrus.Entry
+       log           logrus.FieldLogger
        mtx           sync.Mutex
        setupOnce     sync.Once
 }