(
set -e
mkdir -p "$GOPATH/src/git.curoverse.com"
- rmdir -v --parents --ignore-fail-on-non-empty "${temp}/GOPATH"
if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
for d in \
"$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
# SPDX-License-Identifier: AGPL-3.0
[Unit]
-Description=Arvados cloud dispatch
+Description=arvados-dispatch-cloud
Documentation=https://doc.arvados.org/
After=network.target
AssertPathExists=/etc/arvados/config.yml
StorageAccount string
BlobContainer string
DeleteDanglingResourcesAfter arvados.Duration
+ AdminUsername string
}
+const tagKeyInstanceSecret = "InstanceSecret"
+
type virtualMachinesClientWrapper interface {
createOrUpdate(ctx context.Context,
resourceGroupName string,
instanceType arvados.InstanceType,
imageID cloud.ImageID,
newTags cloud.InstanceTags,
+ initCommand cloud.InitCommand,
publicKey ssh.PublicKey) (cloud.Instance, error) {
az.stopWg.Add(1)
defer az.stopWg.Done()
- if len(newTags["node-token"]) == 0 {
- return nil, fmt.Errorf("Must provide tag 'node-token'")
- }
-
name, err := randutil.String(15, "abcdefghijklmnopqrstuvwxyz0123456789")
if err != nil {
return nil, err
tags["dispatch-"+k] = &newstr
}
- tags["dispatch-instance-type"] = &instanceType.Name
-
nicParameters := network.Interface{
Location: &az.azconfig.Location,
Tags: tags,
az.azconfig.BlobContainer,
name)
- customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
-echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
+ customData := base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))
vmParameters := compute.VirtualMachine{
Location: &az.azconfig.Location,
},
OsProfile: &compute.OSProfile{
ComputerName: &name,
- AdminUsername: to.StringPtr("crunch"),
+ AdminUsername: to.StringPtr(az.azconfig.AdminUsername),
LinuxConfiguration: &compute.LinuxConfiguration{
DisablePasswordAuthentication: to.BoolPtr(true),
SSH: &compute.SSHConfiguration{
PublicKeys: &[]compute.SSHPublicKey{
- compute.SSHPublicKey{
- Path: to.StringPtr("/home/crunch/.ssh/authorized_keys"),
+ {
+ Path: to.StringPtr("/home/" + az.azconfig.AdminUsername + "/.ssh/authorized_keys"),
KeyData: to.StringPtr(string(ssh.MarshalAuthorizedKey(publicKey))),
},
},
if result.Value().Tags["created-at"] != nil {
createdAt, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
if err == nil {
- if timestamp.Sub(createdAt).Seconds() > az.azconfig.DeleteDanglingResourcesAfter.Duration().Seconds() {
- az.logger.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
+ if timestamp.Sub(createdAt) > az.azconfig.DeleteDanglingResourcesAfter.Duration() {
+ az.logger.Printf("Will delete %v because it is older than %s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
az.deleteNIC <- *result.Value().Name
}
}
return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
}
-func (ai *azureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
- ai.provider.stopWg.Add(1)
- defer ai.provider.stopWg.Done()
-
- remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
-
- tags := ai.Tags()
-
- tg := tags["ssh-pubkey-fingerprint"]
- if tg != "" {
- if remoteFingerprint == tg {
- return nil
- }
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", tg, remoteFingerprint)
- }
-
- nodetokenTag := tags["node-token"]
- if nodetokenTag == "" {
- return fmt.Errorf("Missing node token tag")
- }
-
- sess, err := client.NewSession()
- if err != nil {
- return err
- }
-
- nodetokenbytes, err := sess.Output("cat /home/crunch/node-token")
- if err != nil {
- return err
- }
-
- nodetoken := strings.TrimSpace(string(nodetokenbytes))
-
- expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
-
- if strings.TrimSpace(nodetoken) != expectedToken {
- return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
- }
-
- sess, err = client.NewSession()
- if err != nil {
- return err
- }
-
- keyfingerprintbytes, err := sess.Output("ssh-keygen -E sha256 -l -f /etc/ssh/ssh_host_rsa_key.pub")
- if err != nil {
- return err
- }
-
- sp := strings.Split(string(keyfingerprintbytes), " ")
-
- if remoteFingerprint != sp[1] {
- return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
- }
+func (ai *azureInstance) RemoteUser() string {
+ return ai.provider.azconfig.AdminUsername
+}
- tags["ssh-pubkey-fingerprint"] = sp[1]
- delete(tags, "node-token")
- ai.SetTags(tags)
- return nil
+func (ai *azureInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+ return cloud.ErrNotImplemented
}
// StorageAccount: example
// BlobContainer: vhds
// DeleteDanglingResourcesAfter: 20s
+// AdminUsername: crunch
package azure
"github.com/Azure/go-autorest/autorest"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/go-autorest/autorest/to"
- "github.com/jmcvetta/randutil"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
pk, _, _, _, err := ssh.ParseAuthorizedKey(testKey)
c.Assert(err, check.IsNil)
- nodetoken, err := randutil.String(40, "abcdefghijklmnopqrstuvwxyz0123456789")
- c.Assert(err, check.IsNil)
-
inst, err := ap.Create(cluster.InstanceTypes["tiny"],
img, map[string]string{
- "node-token": nodetoken},
- pk)
+ "TestTagName": "test tag value",
+ }, "umask 0600; echo -n test-file-data >/var/run/test-file", pk)
c.Assert(err, check.IsNil)
- tg := inst.Tags()
- log.Printf("Result %v %v %v", inst.String(), inst.Address(), tg)
+ tags := inst.Tags()
+ c.Check(tags["TestTagName"], check.Equals, "test tag value")
+ c.Logf("inst.String()=%v Address()=%v Tags()=%v", inst.String(), inst.Address(), tags)
}
c.Assert(err, check.IsNil)
if len(l) > 0 {
-
sshclient, err := SetupSSHClient(c, l[0])
c.Assert(err, check.IsNil)
+ defer sshclient.Conn.Close()
sess, err := sshclient.NewSession()
c.Assert(err, check.IsNil)
-
- out, err := sess.Output("cat /home/crunch/node-token")
+ defer sess.Close()
+ _, err = sess.Output("find /var/run/test-file -maxdepth 0 -user root -perm 0600")
c.Assert(err, check.IsNil)
- log.Printf("%v", string(out))
-
- sshclient.Conn.Close()
+ sess, err = sshclient.NewSession()
+ c.Assert(err, check.IsNil)
+ defer sess.Close()
+ out, err := sess.Output("sudo cat /var/run/test-file")
+ c.Assert(err, check.IsNil)
+ c.Check(string(out), check.Equals, "test-file-data")
}
}
import (
"encoding/json"
+ "errors"
"io"
"time"
Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
}
+var ErrNotImplemented = errors.New("not implemented")
+
// An ExecutorTarget is a remote command execution service.
type ExecutorTarget interface {
// SSH server hostname or IP address, or empty string if
// unknown while instance is booting.
Address() string
+ // Remote username to send during SSH authentication.
+ RemoteUser() string
+
// Return nil if the given public key matches the instance's
// SSH server key. If the provided Dialer is not nil,
// VerifyHostKey can use it to make outgoing network
// connections from the instance -- e.g., to use the cloud's
// "this instance's metadata" API.
+ //
+ // Return ErrNotImplemented if no verification mechanism is
+ // available.
VerifyHostKey(ssh.PublicKey, *ssh.Client) error
}
// All public methods of an InstanceSet, and all public methods of the
// instances it returns, are goroutine safe.
type InstanceSet interface {
- // Create a new instance. If supported by the driver, add the
+ // Create a new instance with the given type, image, and
+ // initial set of tags. If supported by the driver, add the
// provided public key to /root/.ssh/authorized_keys.
//
+ // The given InitCommand should be executed on the newly
+ // created instance. This is optional for a driver whose
+ // instances' VerifyHostKey() method never returns
+ // ErrNotImplemented. InitCommand will be under 1 KiB.
+ //
// The returned error should implement RateLimitError and
// QuotaError where applicable.
- Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+ Create(arvados.InstanceType, ImageID, InstanceTags, InitCommand, ssh.PublicKey) (Instance, error)
// Return all instances, including ones that are booting or
// shutting down. Optionally, filter out nodes that don't have
Stop()
}
+type InitCommand string
+
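For orientation, a hedged sketch (the helper name and literal tag keys are illustrative; they mirror the worker-pool change later in this diff) of how a caller can pair an InstanceSecret tag with an InitCommand that writes the same secret to a file at boot, so the dispatcher can verify the instance over SSH when the driver's VerifyHostKey returns ErrNotImplemented:

package example

import (
	"fmt"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
)

// makeCreateArgs is a hypothetical helper, not part of this change. The
// "InstanceType" and "InstanceSecret" keys and the secret-file path mirror
// the worker package's tag keys and instanceSecretFilename.
func makeCreateArgs(it arvados.InstanceType, secret string) (cloud.InstanceTags, cloud.InitCommand) {
	tags := cloud.InstanceTags{
		"InstanceType":   it.Name,
		"InstanceSecret": secret,
	}
	initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >/var/run/arvados-instance-secret", secret))
	return tags, initCmd
}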
// A Driver returns an InstanceSet that uses the given InstanceSetID
// and driver-dependent configuration parameters.
//
package controller
import (
+ "context"
+
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
-func newHandler(cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
+func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile) service.Handler {
return &Handler{Cluster: cluster, NodeProfile: np}
}
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/sirupsen/logrus"
var _ = check.Suite(&FederationSuite{})
type FederationSuite struct {
- log *logrus.Logger
+ log logrus.FieldLogger
// testServer and testHandler are the controller being tested,
// "zhome".
testServer *httpserver.Server
}
func (s *FederationSuite) SetUpTest(c *check.C) {
- s.log = logrus.New()
- s.log.Formatter = &logrus.JSONFormatter{}
- s.log.Out = &logWriter{c.Log}
+ s.log = ctxlog.TestLogger(c)
s.remoteServer = newServerFromIntegrationTestEnv(c)
c.Assert(s.remoteServer.Start(), check.IsNil)
package controller
import (
+ "context"
"encoding/json"
"net/http"
"net/http/httptest"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
check "gopkg.in/check.v1"
)
type HandlerSuite struct {
cluster *arvados.Cluster
handler http.Handler
+ ctx context.Context
+ cancel context.CancelFunc
}
func (s *HandlerSuite) SetUpTest(c *check.C) {
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.New(os.Stderr, "json", "debug"))
s.cluster = &arvados.Cluster{
ClusterID: "zzzzz",
PostgreSQL: integrationTestCluster().PostgreSQL,
},
}
node := s.cluster.NodeProfiles["*"]
- s.handler = newHandler(s.cluster, &node)
+ s.handler = newHandler(s.ctx, s.cluster, &node)
+}
+
+func (s *HandlerSuite) TearDownTest(c *check.C) {
+ s.cancel()
}
func (s *HandlerSuite) TestProxyDiscoveryDoc(c *check.C) {
package controller
import (
- "bytes"
"net/http"
"os"
"path/filepath"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
- "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
-// logWriter is an io.Writer that writes by calling a "write log"
-// function, typically (*check.C)Log().
-type logWriter struct {
- logfunc func(...interface{})
-}
-
-func (tl *logWriter) Write(buf []byte) (int, error) {
- tl.logfunc(string(bytes.TrimRight(buf, "\n")))
- return len(buf), nil
-}
-
func integrationTestCluster() *arvados.Cluster {
cfg, err := arvados.GetConfig(filepath.Join(os.Getenv("WORKSPACE"), "tmp", "arvados.yml"))
if err != nil {
// Return a new unstarted controller server, using the Rails API
// provided by the integration-testing environment.
func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
- log := logrus.New()
- log.Formatter = &logrus.JSONFormatter{}
- log.Out = &logWriter{c.Log}
+ log := ctxlog.TestLogger(c)
nodeProfile := arvados.NodeProfile{
Controller: arvados.SystemServiceInstance{Listen: ":"},
package dispatchcloud
import (
+ "context"
+
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
-func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
- d := &dispatcher{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
+ d := &dispatcher{Cluster: cluster, Context: ctx}
go d.Start()
return d
}
defer cq.mtx.Unlock()
ctr := cq.current[uuid].Container
if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
- delete(cq.current, uuid)
+ cq.delEnt(uuid, ctr.State)
}
}
cq.current[uuid] = cur
}
}
- for uuid := range cq.current {
+ for uuid, ent := range cq.current {
if _, dontupdate := cq.dontupdate[uuid]; dontupdate {
// Don't expunge an entry that was
// added/updated locally after we started
// the poll response (evidently it's
// cancelled, completed, deleted, or taken by
// a different dispatcher).
- delete(cq.current, uuid)
+ cq.delEnt(uuid, ent.Container.State)
}
}
cq.dontupdate = nil
return nil
}
+// Caller must have lock.
+func (cq *Queue) delEnt(uuid string, state arvados.ContainerState) {
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": state,
+ }).Info("dropping container from queue")
+ delete(cq.current, uuid)
+}
+
func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
it, err := cq.chooseType(&ctr)
if err != nil && (ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked) {
}()
return
}
+ cq.logger.WithFields(logrus.Fields{
+ "ContainerUUID": ctr.UUID,
+ "State": ctr.State,
+ "Priority": ctr.Priority,
+ "InstanceType": it.Name,
+ }).Info("adding container to queue")
cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
}
package dispatchcloud
import (
+ "context"
"crypto/md5"
"encoding/json"
"fmt"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
type dispatcher struct {
Cluster *arvados.Cluster
+ Context context.Context
InstanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- disp.logger = logrus.StandardLogger()
+ disp.logger = ctxlog.FromContext(disp.Context)
if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
if disp.Cluster.ManagementToken == "" {
mux := httprouter.New()
mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+ sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
sched.Start()
defer sched.Stop()
}
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
- params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
- id := cloud.InstanceID(params.ByName("instance_id"))
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
err := disp.pool.SetIdleBehavior(id, want)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusNotFound)
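Because the instance ID is now passed as a request parameter rather than a path component, a client calls the management endpoints roughly as sketched below; the base URL and Bearer-token header are assumptions for illustration, not something established by this diff:

package example

import (
	"fmt"
	"net/http"
	"net/url"
)

// holdInstance asks arvados-dispatch-cloud to put one instance into the
// "hold" idle behavior. The instance_id form value replaces the old
// /instances/:instance_id/hold path parameter.
func holdInstance(baseURL, mgmtToken, instanceID string) error {
	u := baseURL + "/arvados/v1/dispatch/instances/hold?instance_id=" + url.QueryEscape(instanceID)
	req, err := http.NewRequest("POST", u, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Authorization", "Bearer "+mgmtToken) // assumed auth scheme
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("hold request failed: %s", resp.Status)
	}
	return nil
}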
package dispatchcloud
import (
+ "context"
"encoding/json"
"io/ioutil"
"math/rand"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"golang.org/x/crypto/ssh"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&DispatcherSuite{})
type DispatcherSuite struct {
+ ctx context.Context
+ cancel context.CancelFunc
cluster *arvados.Cluster
stubDriver *test.StubDriver
disp *dispatcher
}
func (s *DispatcherSuite) SetUpTest(c *check.C) {
+ s.ctx, s.cancel = context.WithCancel(context.Background())
+ s.ctx = ctxlog.Context(s.ctx, ctxlog.TestLogger(c))
dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
c.Assert(err, check.IsNil)
},
},
}
- s.disp = &dispatcher{Cluster: s.cluster}
+ s.disp = &dispatcher{
+ Cluster: s.cluster,
+ Context: s.ctx,
+ }
// Test cases can modify s.cluster before calling
// initialize(), and then modify private state before calling
// go run().
}
func (s *DispatcherSuite) TearDownTest(c *check.C) {
+ s.cancel()
s.disp.Close()
}
var stale []string
timeout := time.NewTimer(sch.staleLockTimeout)
waiting:
- for {
+ for sch.pool.CountWorkers()[worker.StateUnknown] > 0 {
running := sch.pool.Running()
qEntries, _ := sch.queue.Entries()
select {
case <-wp:
- // Stop waiting if all workers have been
- // contacted.
- if sch.pool.CountWorkers()[worker.StateUnknown] == 0 {
- break waiting
- }
case <-timeout.C:
// Give up.
break waiting
package scheduler
import (
+ "context"
"sync"
"time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
// immediately. Don't try to create any other nodes after the failed
// create.
func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
queue := test.Queue{
ChooseType: chooseType,
Containers: []arvados.Container{
running: map[string]time.Time{},
canCreate: 0,
}
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
c.Check(pool.running, check.HasLen, 1)
// If Create() fails, shutdown some nodes, and don't call Create()
// again. Don't call Create() at all if AtQuota() is true.
func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
for quota := 0; quota < 2; quota++ {
c.Logf("quota=%d", quota)
shouldCreate := []arvados.InstanceType{}
starts: []string{},
canCreate: 0,
}
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, shouldCreate)
c.Check(pool.starts, check.DeepEquals, []string{})
c.Check(pool.shutdowns, check.Not(check.Equals), 0)
// Start lower-priority containers while waiting for new/existing
// workers to come up for higher-priority containers.
func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
pool := stubPool{
unalloc: map[arvados.InstanceType]int{
test.InstanceType(1): 2,
},
}
queue.Update()
- New(test.Logger(), &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+ New(ctx, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
running := map[string]bool{}
package scheduler
import (
+ "context"
"sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/sirupsen/logrus"
)
//
// Any given queue and pool should not be used by more than one
// scheduler at a time.
-func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+func New(ctx context.Context, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
return &Scheduler{
- logger: logger,
+ logger: ctxlog.FromContext(ctx),
queue: queue,
pool: pool,
staleLockTimeout: staleLockTimeout,
// Ensure the queue is fetched once before attempting anything.
for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
sch.logger.Errorf("error updating queue: %s", err)
- d := sch.queueUpdateInterval / 60
+ d := sch.queueUpdateInterval / 10
+ if d < time.Second {
+ d = time.Second
+ }
sch.logger.Infof("waiting %s before retry", d)
time.Sleep(d)
}
} else if !exited.IsZero() && qUpdated.After(exited) {
go cancel(ent, "state=\"Running\" after crunch-run exited")
} else if ent.Container.Priority == 0 {
- go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
+ go kill(ent, "priority=0")
}
case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
if running {
if running && !exited.IsZero() && qUpdated.After(exited) {
logger := sch.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
+ "State": ent.Container.State,
+ "Priority": ent.Container.Priority,
"Exited": time.Since(exited).Seconds(),
})
- logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
+ logger.Info("requeueing locked container after crunch-run exited")
+ err := sch.queue.Unlock(uuid)
+ if err != nil {
+ logger.WithError(err).Error("error requeueing container")
+ }
+ } else if running && exited.IsZero() && ent.Container.Priority == 0 {
+ go kill(ent, "priority=0")
+ } else if !running && ent.Container.Priority == 0 {
+ logger := sch.logger.WithField("ContainerUUID", uuid)
+ logger.Info("unlocking container because priority=0")
err := sch.queue.Unlock(uuid)
if err != nil {
- logger.WithError(err).Info("error requeueing container")
+ logger.WithError(err).Error("error requeueing container")
}
}
default:
- sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ }).Error("BUG: unexpected state")
}
}
}
type Executor struct {
target cloud.ExecutorTarget
targetPort string
+ targetUser string
signers []ssh.Signer
mtx sync.RWMutex // controls access to instance after creation
if h, p, err := net.SplitHostPort(addr); err != nil || p == "" {
// Target address does not specify a port. Use
// targetPort, or "ssh".
+ if h == "" {
+ h = addr
+ }
if p = exr.targetPort; p == "" {
p = "ssh"
}
}
var receivedKey ssh.PublicKey
client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
- User: "root",
+ User: target.RemoteUser(),
Auth: []ssh.AuthMethod{
ssh.PublicKeys(exr.signers...),
},
return 0
},
HostKey: hostpriv,
+ AuthorizedUser: "username",
AuthorizedKeys: []ssh.PublicKey{clientpub},
},
}
return uint32(exitcode)
},
HostKey: hostpriv,
+ AuthorizedUser: "username",
AuthorizedKeys: []ssh.PublicKey{clientpub},
},
}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package test
-
-import (
- "os"
-
- "github.com/sirupsen/logrus"
-)
-
-func Logger() logrus.FieldLogger {
- logger := logrus.StandardLogger()
- if os.Getenv("ARVADOS_DEBUG") != "" {
- logger.SetLevel(logrus.DebugLevel)
- }
- return logger
-}
type SSHService struct {
Exec SSHExecFunc
HostKey ssh.Signer
+ AuthorizedUser string
AuthorizedKeys []ssh.PublicKey
listener net.Listener
return ln.Addr().String()
}
+// RemoteUser returns the username that will be accepted.
+func (ss *SSHService) RemoteUser() string {
+ return ss.AuthorizedUser
+}
+
// Close shuts down the server and releases resources. Established
// connections are unaffected.
func (ss *SSHService) Close() {
"errors"
"fmt"
"io"
+ "io/ioutil"
math_rand "math/rand"
"regexp"
"strings"
}
sis := StubInstanceSet{
driver: sd,
+ logger: logger,
servers: map[cloud.InstanceID]*StubVM{},
}
sd.instanceSets = append(sd.instanceSets, &sis)
type StubInstanceSet struct {
driver *StubDriver
+ logger logrus.FieldLogger
servers map[cloud.InstanceID]*StubVM
mtx sync.RWMutex
stopped bool
allowInstancesCall time.Time
}
-func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, cmd cloud.InitCommand, authKey ssh.PublicKey) (cloud.Instance, error) {
if sis.driver.HoldCloudOps {
sis.driver.holdCloudOps <- true
}
id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
tags: copyTags(tags),
providerType: it.ProviderType,
+ initCommand: cmd,
}
svm.SSHService = SSHService{
HostKey: sis.driver.HostKey,
+ AuthorizedUser: "root",
AuthorizedKeys: ak,
Exec: svm.Exec,
}
sis *StubInstanceSet
id cloud.InstanceID
tags cloud.InstanceTags
+ initCommand cloud.InitCommand
providerType string
SSHService SSHService
running map[string]bool
}
func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+ stdinData, err := ioutil.ReadAll(stdin)
+ if err != nil {
+ fmt.Fprintf(stderr, "error reading stdin: %s\n", err)
+ return 1
+ }
queue := svm.sis.driver.Queue
uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
if eta := svm.Boot.Sub(time.Now()); eta > 0 {
fmt.Fprint(stderr, "crunch-run: command not found\n")
return 1
}
- if strings.HasPrefix(command, "crunch-run --detach ") {
+ if strings.HasPrefix(command, "crunch-run --detach --stdin-env ") {
+ var stdinKV map[string]string
+ err := json.Unmarshal(stdinData, &stdinKV)
+ if err != nil {
+ fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
+ return 1
+ }
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
- if env[name] == "" {
- fmt.Fprintf(stderr, "%s missing from environment %q\n", name, env)
+ if stdinKV[name] == "" {
+ fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdin)
return 1
}
}
svm.Unlock()
time.Sleep(svm.CrunchRunDetachDelay)
fmt.Fprintf(stderr, "starting %s\n", uuid)
- logger := logrus.WithFields(logrus.Fields{
+ logger := svm.sis.logger.WithFields(logrus.Fields{
"Instance": svm.id,
"ContainerUUID": uuid,
})
return si.addr
}
+func (si stubInstance) RemoteUser() string {
+ return si.svm.SSHService.AuthorizedUser
+}
+
func (si stubInstance) Destroy() error {
sis := si.svm.sis
if sis.driver.HoldCloudOps {
package worker
import (
+ "crypto/rand"
"errors"
+ "fmt"
"io"
"sort"
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
+ "golang.org/x/crypto/ssh"
)
const (
- tagKeyInstanceType = "InstanceType"
- tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceType = "InstanceType"
+ tagKeyIdleBehavior = "IdleBehavior"
+ tagKeyInstanceSecret = "InstanceSecret"
)
// An InstanceView shows a worker's current state and recent activity.
type InstanceView struct {
Instance cloud.InstanceID `json:"instance"`
+ Address string `json:"address"`
Price float64 `json:"price"`
ArvadosInstanceType string `json:"arvados_instance_type"`
ProviderInstanceType string `json:"provider_instance_type"`
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ installPublicKey: installPublicKey,
stop: make(chan bool),
}
wp.registerMetrics(reg)
timeoutBooting time.Duration
timeoutProbe time.Duration
timeoutShutdown time.Duration
+ installPublicKey ssh.PublicKey
// private state
subscribers map[<-chan struct{}]chan<- struct{}
throttleCreate throttle
throttleInstances throttle
- mInstances prometheus.Gauge
- mInstancesPrice prometheus.Gauge
mContainersRunning prometheus.Gauge
- mVCPUs prometheus.Gauge
- mVCPUsInuse prometheus.Gauge
- mMemory prometheus.Gauge
- mMemoryInuse prometheus.Gauge
+ mInstances *prometheus.GaugeVec
+ mInstancesPrice *prometheus.GaugeVec
+ mVCPUs *prometheus.GaugeVec
+ mMemory *prometheus.GaugeVec
}
// Subscribe returns a buffered channel that becomes ready after any
if time.Now().Before(wp.atQuotaUntil) || wp.throttleCreate.Error() != nil {
return false
}
- tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
- }
now := time.Now()
wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+ secret := randomHex(instanceSecretLength)
+ tags := cloud.InstanceTags{
+ tagKeyInstanceType: it.Name,
+ tagKeyIdleBehavior: string(IdleBehaviorRun),
+ tagKeyInstanceSecret: secret,
+ }
+ initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
+ inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// Remove our timestamp marker from wp.creating
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+ inst = tagVerifier{inst}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
logger := wp.logger.WithFields(logrus.Fields{
"InstanceType": it.Name,
- "Instance": inst,
+ "Instance": inst.ID(),
+ "Address": inst.Address(),
})
logger.WithFields(logrus.Fields{
"State": initialState,
func (wp *Pool) kill(wkr *worker, uuid string) {
logger := wp.logger.WithFields(logrus.Fields{
"ContainerUUID": uuid,
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
})
logger.Debug("killing process")
- stdout, stderr, err := wkr.executor.Execute(nil, "crunch-run --kill 15 "+uuid, nil)
+ cmd := "crunch-run --kill 15 " + uuid
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
logger.WithFields(logrus.Fields{
"stderr": string(stderr),
if reg == nil {
reg = prometheus.NewRegistry()
}
- wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_total",
- Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstances)
- wp.mInstancesPrice = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "instances_price_total",
- Help: "Sum of prices of all cloud VMs including pending, booting, running, held, and shutting down.",
- })
- reg.MustRegister(wp.mInstancesPrice)
wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Help: "Number of containers reported running by cloud VMs.",
})
reg.MustRegister(wp.mContainersRunning)
-
- wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mInstances = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_total",
+ Help: "Number of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstances)
+ wp.mInstancesPrice = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "dispatchcloud",
+ Name: "instances_price",
+ Help: "Price of cloud VMs.",
+ }, []string{"category"})
+ reg.MustRegister(wp.mInstancesPrice)
+ wp.mVCPUs = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "vcpus_total",
Help: "Total VCPUs on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mVCPUs)
- wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "vcpus_inuse",
- Help: "VCPUs on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mVCPUsInuse)
- wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+ wp.mMemory = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "arvados",
Subsystem: "dispatchcloud",
Name: "memory_bytes_total",
Help: "Total memory on all cloud VMs.",
- })
+ }, []string{"category"})
reg.MustRegister(wp.mMemory)
- wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
- Namespace: "arvados",
- Subsystem: "dispatchcloud",
- Name: "memory_bytes_inuse",
- Help: "Memory on cloud VMs that are running containers.",
- })
- reg.MustRegister(wp.mMemoryInuse)
}
func (wp *Pool) runMetrics() {
wp.mtx.RLock()
defer wp.mtx.RUnlock()
- var price float64
- var alloc, cpu, cpuInuse, mem, memInuse int64
+ instances := map[string]int64{}
+ price := map[string]float64{}
+ cpu := map[string]int64{}
+ mem := map[string]int64{}
+ var running int64
for _, wkr := range wp.workers {
- price += wkr.instType.Price
- cpu += int64(wkr.instType.VCPUs)
- mem += int64(wkr.instType.RAM)
- if len(wkr.running)+len(wkr.starting) == 0 {
- continue
+ var cat string
+ switch {
+ case len(wkr.running)+len(wkr.starting) > 0:
+ cat = "inuse"
+ case wkr.idleBehavior == IdleBehaviorHold:
+ cat = "hold"
+ case wkr.state == StateBooting:
+ cat = "booting"
+ case wkr.state == StateUnknown:
+ cat = "unknown"
+ default:
+ cat = "idle"
}
- alloc += int64(len(wkr.running) + len(wkr.starting))
- cpuInuse += int64(wkr.instType.VCPUs)
- memInuse += int64(wkr.instType.RAM)
- }
- wp.mInstances.Set(float64(len(wp.workers)))
- wp.mInstancesPrice.Set(price)
- wp.mContainersRunning.Set(float64(alloc))
- wp.mVCPUs.Set(float64(cpu))
- wp.mMemory.Set(float64(mem))
- wp.mVCPUsInuse.Set(float64(cpuInuse))
- wp.mMemoryInuse.Set(float64(memInuse))
+ instances[cat]++
+ price[cat] += wkr.instType.Price
+ cpu[cat] += int64(wkr.instType.VCPUs)
+ mem[cat] += int64(wkr.instType.RAM)
+ running += int64(len(wkr.running) + len(wkr.starting))
+ }
+ for _, cat := range []string{"inuse", "hold", "booting", "unknown", "idle"} {
+ wp.mInstances.WithLabelValues(cat).Set(float64(instances[cat]))
+ wp.mInstancesPrice.WithLabelValues(cat).Set(price[cat])
+ wp.mVCPUs.WithLabelValues(cat).Set(float64(cpu[cat]))
+ wp.mMemory.WithLabelValues(cat).Set(float64(mem[cat]))
+ }
+ wp.mContainersRunning.Set(float64(running))
}
func (wp *Pool) runProbes() {
for _, w := range wp.workers {
r = append(r, InstanceView{
Instance: w.instance.ID(),
+ Address: w.instance.Address(),
Price: w.instType.Price,
ArvadosInstanceType: w.instType.Name,
ProviderInstanceType: w.instType.ProviderType,
continue
}
logger := wp.logger.WithFields(logrus.Fields{
- "Instance": wkr.instance,
+ "Instance": wkr.instance.ID(),
"WorkerState": wkr.state,
})
logger.Info("instance disappeared in cloud")
go wp.notify()
}
}
+
+// Return a random string of n hexadecimal digits (n*4 random bits). n
+// must be even.
+func randomHex(n int) string {
+ buf := make([]byte, n/2)
+ _, err := rand.Read(buf)
+ if err != nil {
+ panic(err)
+ }
+ return fmt.Sprintf("%x", buf)
+}
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
}
}
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
is, err := driver.InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
},
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+ pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, cluster)
+ pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
instanceSet, err := driver.InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+
+ "git.curoverse.com/arvados.git/lib/cloud"
+ "golang.org/x/crypto/ssh"
+)
+
+var (
+ errBadInstanceSecret = errors.New("bad instance secret")
+
+ // filename on instance, as given to shell (quoted accordingly)
+ instanceSecretFilename = "/var/run/arvados-instance-secret"
+ instanceSecretLength = 40 // hex digits
+)
+
+type tagVerifier struct {
+ cloud.Instance
+}
+
+func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
+ expectSecret := tv.Instance.Tags()[tagKeyInstanceSecret]
+ if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || expectSecret == "" {
+ // If the wrapped instance indicates it has a way to
+ // verify the key, return that decision.
+ return err
+ }
+ session, err := client.NewSession()
+ if err != nil {
+ return err
+ }
+ defer session.Close()
+ var stdout, stderr bytes.Buffer
+ session.Stdin = bytes.NewBuffer(nil)
+ session.Stdout = &stdout
+ session.Stderr = &stderr
+ cmd := fmt.Sprintf("cat %s", instanceSecretFilename)
+ if u := tv.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ err = session.Run(cmd)
+ if err != nil {
+ return err
+ }
+ if stdout.String() != expectSecret {
+ return errBadInstanceSecret
+ }
+ return nil
+}
import (
"bytes"
+ "encoding/json"
"fmt"
"strings"
"sync"
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/stats"
"github.com/sirupsen/logrus"
)
"ContainerUUID": ctr.UUID,
"Priority": ctr.Priority,
})
- logger = logger.WithField("Instance", wkr.instance)
+ logger = logger.WithField("Instance", wkr.instance.ID())
logger.Debug("starting container")
wkr.starting[ctr.UUID] = struct{}{}
wkr.state = StateRunning
"ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
}
- stdout, stderr, err := wkr.executor.Execute(env, "crunch-run --detach '"+ctr.UUID+"'", nil)
+ if wkr.wp.arvClient.Insecure {
+ env["ARVADOS_API_HOST_INSECURE"] = "1"
+ }
+ envJSON, err := json.Marshal(env)
+ if err != nil {
+ panic(err)
+ }
+ stdin := bytes.NewBuffer(envJSON)
+ cmd := "crunch-run --detach --stdin-env '" + ctr.UUID + "'"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
+ stdout, stderr, err := wkr.executor.Execute(nil, cmd, stdin)
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
now := time.Now()
func (wkr *worker) probeRunning() (running []string, ok bool) {
cmd := "crunch-run --list"
+ if u := wkr.instance.RemoteUser(); u != "root" {
+ cmd = "sudo " + cmd
+ }
stdout, stderr, err := wkr.executor.Execute(nil, cmd, nil)
if err != nil {
wkr.logger.WithFields(logrus.Fields{
wkr.logger.WithFields(logrus.Fields{
"State": wkr.state,
- "Age": age,
+ "IdleDuration": stats.Duration(age),
"IdleBehavior": wkr.idleBehavior,
}).Info("shutdown idle worker")
wkr.shutdown()
// match. Caller must have lock.
func (wkr *worker) saveTags() {
instance := wkr.instance
- have := instance.Tags()
- want := cloud.InstanceTags{
+ tags := instance.Tags()
+ update := cloud.InstanceTags{
tagKeyInstanceType: wkr.instType.Name,
tagKeyIdleBehavior: string(wkr.idleBehavior),
}
- go func() {
- for k, v := range want {
- if v == have[k] {
- continue
- }
- err := instance.SetTags(want)
+ save := false
+ for k, v := range update {
+ if tags[k] != v {
+ tags[k] = v
+ save = true
+ }
+ }
+ if save {
+ go func() {
+ err := instance.SetTags(tags)
if err != nil {
- wkr.wp.logger.WithField("Instance", instance).WithError(err).Warnf("error updating tags")
+ wkr.wp.logger.WithField("Instance", instance.ID()).WithError(err).Warnf("error updating tags")
}
- break
-
- }
- }()
+ }()
+ }
}
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
type WorkerSuite struct{}
func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
- logger := test.Logger()
+ logger := ctxlog.TestLogger(c)
bootTimeout := time.Minute
probeTimeout := time.Second
is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
c.Assert(err, check.IsNil)
- inst, err := is.Create(arvados.InstanceType{}, "", nil, nil)
+ inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
type trialT struct {
package service
import (
+ "context"
"flag"
"fmt"
"io"
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/coreos/go-systemd/daemon"
"github.com/sirupsen/logrus"
CheckHealth() error
}
-type NewHandlerFunc func(*arvados.Cluster, *arvados.NodeProfile) Handler
+type NewHandlerFunc func(context.Context, *arvados.Cluster, *arvados.NodeProfile) Handler
type command struct {
newHandler NewHandlerFunc
}
func (c *command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- log := logrus.New()
- log.Formatter = &logrus.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- }
- log.Out = stderr
+ log := ctxlog.New(stderr, "json", "info")
var err error
defer func() {
if err != nil {
return 1
}
+ log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
+ "PID": os.Getpid(),
+ })
+ ctx := ctxlog.Context(context.Background(), log)
profileName := *nodeProfile
if profileName == "" {
profileName = os.Getenv("ARVADOS_NODE_PROFILE")
err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
return 1
}
- handler := c.newHandler(cluster, profile)
+ handler := c.newHandler(ctx, cluster, profile)
if err = handler.CheckHealth(); err != nil {
return 1
}
RemoteClusters map[string]RemoteCluster
PostgreSQL PostgreSQL
RequestLimits RequestLimits
+ Logging Logging
+}
+
+type Logging struct {
+ Level string
+ Format string
}
type PostgreSQL struct {
if _, ok := (*it)[t.Name]; ok {
return errDuplicateInstanceTypeName
}
+ if t.ProviderType == "" {
+ t.ProviderType = t.Name
+ }
(*it)[t.Name] = t
}
return nil
if err != nil {
return err
}
- // Fill in Name field using hash key.
+ // Fill in Name field (and ProviderType field, if not
+ // specified) using hash key.
*it = InstanceTypeMap(hash)
for name, t := range *it {
t.Name = name
+ if t.ProviderType == "" {
+ t.ProviderType = name
+ }
(*it)[name] = t
}
return nil
package ctxlog
import (
+ "bytes"
"context"
+ "io"
+ "os"
"github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
)
var (
// Context returns a new child context such that FromContext(child)
// returns the given logger.
-func Context(ctx context.Context, logger *logrus.Entry) context.Context {
+func Context(ctx context.Context, logger logrus.FieldLogger) context.Context {
return context.WithValue(ctx, loggerCtxKey, logger)
}
// FromContext returns the logger suitable for the given context -- the one
// attached by contextWithLogger() if applicable, otherwise the
// top-level logger with no fields/values.
-func FromContext(ctx context.Context) *logrus.Entry {
+func FromContext(ctx context.Context) logrus.FieldLogger {
if ctx != nil {
- if logger, ok := ctx.Value(loggerCtxKey).(*logrus.Entry); ok {
+ if logger, ok := ctx.Value(loggerCtxKey).(logrus.FieldLogger); ok {
return logger
}
}
return rootLogger.WithFields(nil)
}
+// New returns a new logger with the indicated format and
+// level.
+func New(out io.Writer, format, level string) logrus.FieldLogger {
+ logger := logrus.New()
+ logger.Out = out
+ setFormat(logger, format)
+ setLevel(logger, level)
+ return logger
+}
+
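+// TestLogger returns a FieldLogger that writes each log line via c.Log, at
+// debug level when the ARVADOS_DEBUG environment variable is set to a
+// non-empty value other than "0", and at info level otherwise.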
+func TestLogger(c *check.C) logrus.FieldLogger {
+ logger := logrus.New()
+ logger.Out = &logWriter{c.Log}
+ setFormat(logger, "text")
+ if d := os.Getenv("ARVADOS_DEBUG"); d != "0" && d != "" {
+ setLevel(logger, "debug")
+ } else {
+ setLevel(logger, "info")
+ }
+ return logger
+}
+
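A brief usage sketch of the helpers above (the field name and values are illustrative); this is the pattern service.Command and the dispatcher tests in this diff follow:

package example

import (
	"context"
	"os"

	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
)

func logViaContext() {
	// Build a logger, attach it to a context, and retrieve it downstream.
	logger := ctxlog.New(os.Stderr, "json", "debug")
	ctx := ctxlog.Context(context.Background(), logger)
	ctxlog.FromContext(ctx).WithField("Component", "example").Info("hello")
}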
// SetLevel sets the current logging level. See logrus for level
// names.
func SetLevel(level string) {
- lvl, err := logrus.ParseLevel(level)
- if err != nil {
- logrus.Fatal(err)
+ setLevel(rootLogger, level)
+}
+
+func setLevel(logger *logrus.Logger, level string) {
+ if level == "" {
+ } else if lvl, err := logrus.ParseLevel(level); err != nil {
+ logrus.WithField("Level", level).Fatal("unknown log level")
+ } else {
+ logger.Level = lvl
}
- rootLogger.Level = lvl
}
// SetFormat sets the current logging format to "json" or "text".
func SetFormat(format string) {
+ setFormat(rootLogger, format)
+}
+
+func setFormat(logger *logrus.Logger, format string) {
switch format {
case "text":
- rootLogger.Formatter = &logrus.TextFormatter{
+ logger.Formatter = &logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: rfc3339NanoFixed,
}
- case "json":
- rootLogger.Formatter = &logrus.JSONFormatter{
+ case "json", "":
+ logger.Formatter = &logrus.JSONFormatter{
TimestampFormat: rfc3339NanoFixed,
}
default:
- logrus.WithField("LogFormat", format).Fatal("unknown log format")
+ logrus.WithField("Format", format).Fatal("unknown log format")
}
}
+
+// logWriter is an io.Writer that writes by calling a "write log"
+// function, typically (*check.C)Log().
+type logWriter struct {
+ logfunc func(...interface{})
+}
+
+func (tl *logWriter) Write(buf []byte) (int, error) {
+ tl.logfunc(string(bytes.TrimRight(buf, "\n")))
+ return len(buf), nil
+}
else
kwargs = {}
end
+ if users_list.select { |u| u.is_admin }.any?
+ return super
+ end
Container.where(ContainerRequest.readable_by(*users_list).where("containers.uuid = container_requests.container_uuid").exists)
end
assert_equal 1, Container.readable_by(users(:active)).where(state: "Queued").count
end
+ test "Containers with no matching request are readable by admin" do
+ uuids = Container.includes('container_requests').where(container_requests: {uuid: nil}).collect(&:uuid)
+ assert_not_empty uuids
+ assert_empty Container.readable_by(users(:active)).where(uuid: uuids)
+ assert_not_empty Container.readable_by(users(:admin)).where(uuid: uuids)
+ assert_equal uuids.count, Container.readable_by(users(:admin)).where(uuid: uuids).count
+ end
+
test "Container locked cancel" do
set_user_from_auth :active
c, _ = minimal_new
"encoding/json"
"fmt"
"io"
- "io/ioutil"
"os"
"os/exec"
"path/filepath"
// procinfo is saved in each process's lockfile.
type procinfo struct {
- UUID string
- PID int
- Stdout string
- Stderr string
+ UUID string
+ PID int
}
// Detach acquires a lock for the given uuid, and starts the current
// program as a child process (with -no-detach prepended to the given
// arguments so the child knows not to detach again). The lock is
// passed along to the child process.
+//
+// Stdout and stderr in the child process are sent to the systemd
+// journal using the systemd-cat program.
func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
return exitcode(stderr, detach(uuid, args, stdout, stderr))
}
return nil, err
}
defer dirlock.Close()
- lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+ lockfilename := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+ lockfile, err := os.OpenFile(lockfilename, os.O_CREATE|os.O_RDWR, 0700)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("open %s: %s", lockfilename, err)
}
err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
lockfile.Close()
- return nil, err
+ return nil, fmt.Errorf("lock %s: %s", lockfilename, err)
}
return lockfile, nil
}()
defer lockfile.Close()
lockfile.Truncate(0)
- outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
- if err != nil {
- return err
- }
- defer outfile.Close()
- errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
- if err != nil {
- os.Remove(outfile.Name())
- return err
- }
- defer errfile.Close()
-
- cmd := exec.Command(args[0], append([]string{"-no-detach"}, args[1:]...)...)
- cmd.Stdout = outfile
- cmd.Stderr = errfile
+ cmd := exec.Command("systemd-cat", append([]string{"--identifier=crunch-run", args[0], "-no-detach"}, args[1:]...)...)
// Child inherits lockfile.
cmd.ExtraFiles = []*os.File{lockfile}
// Ensure child isn't interrupted even if we receive signals
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
err = cmd.Start()
if err != nil {
- os.Remove(outfile.Name())
- os.Remove(errfile.Name())
- return err
+ return fmt.Errorf("exec %s: %s", cmd.Path, err)
}
w := io.MultiWriter(stdout, lockfile)
- err = json.NewEncoder(w).Encode(procinfo{
- UUID: uuid,
- PID: cmd.Process.Pid,
- Stdout: outfile.Name(),
- Stderr: errfile.Name(),
+ return json.NewEncoder(w).Encode(procinfo{
+ UUID: uuid,
+ PID: cmd.Process.Pid,
})
- if err != nil {
- os.Remove(outfile.Name())
- os.Remove(errfile.Name())
- return err
- }
- return nil
}
// KillProcess finds the crunch-run process corresponding to the given
if os.IsNotExist(err) {
return nil
} else if err != nil {
- return err
+ return fmt.Errorf("open %s: %s", path, err)
}
defer f.Close()
var pi procinfo
err = json.NewDecoder(f).Decode(&pi)
if err != nil {
- return fmt.Errorf("%s: %s\n", path, err)
+ return fmt.Errorf("decode %s: %s\n", path, err)
}
if pi.UUID != uuid || pi.PID == 0 {
proc, err := os.FindProcess(pi.PID)
if err != nil {
- return err
+ return fmt.Errorf("%s: find process %d: %s", uuid, pi.PID, err)
}
err = proc.Signal(signal)
err = proc.Signal(syscall.Signal(0))
}
if err == nil {
- return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+ return fmt.Errorf("%s: pid %d: sent signal %d (%s) but process is still alive", uuid, pi.PID, signal, signal)
}
- fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
+ fmt.Fprintf(stderr, "%s: pid %d: %s\n", uuid, pi.PID, err)
return nil
}
// List UUIDs of active crunch-run processes.
func ListProcesses(stdout, stderr io.Writer) int {
- return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
- if info.IsDir() {
+ // filepath.Walk does not follow symlinks, so we must walk
+ // lockdir+"/." in case lockdir itself is a symlink.
+ walkdir := lockdir + "/."
+ return exitcode(stderr, filepath.Walk(walkdir, func(path string, info os.FileInfo, err error) error {
+ if info.IsDir() && path != walkdir {
return filepath.SkipDir
}
if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
err := os.Remove(path)
dirlock.Close()
if err != nil {
- fmt.Fprintln(stderr, err)
+ fmt.Fprintf(stderr, "unlink %s: %s\n", f.Name(), err)
}
return nil
}
//
// Caller releases the lock by closing the returned file.
func lockall() (*os.File, error) {
- f, err := os.OpenFile(filepath.Join(lockdir, lockprefix+"all"+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+ lockfile := filepath.Join(lockdir, lockprefix+"all"+locksuffix)
+ f, err := os.OpenFile(lockfile, os.O_CREATE|os.O_RDWR, 0700)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("open %s: %s", lockfile, err)
}
err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
if err != nil {
f.Close()
- return nil, err
+ return nil, fmt.Errorf("lock %s: %s", lockfile, err)
}
return f, nil
}
cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+ stdinEnv := flag.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
flag.Parse()
+ if *stdinEnv && !ignoreDetachFlag {
+ // Load env vars on stdin if asked (but not in a
+ // detached child process, in which case stdin is
+ // /dev/null).
+ loadEnv(os.Stdin)
+ }
+
switch {
case *detach && !ignoreDetachFlag:
os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
log.Fatalf("%s: %v", containerId, runerr)
}
}
+
+func loadEnv(rdr io.Reader) {
+ buf, err := ioutil.ReadAll(rdr)
+ if err != nil {
+ log.Fatalf("read stdin: %s", err)
+ }
+ var env map[string]string
+ err = json.Unmarshal(buf, &env)
+ if err != nil {
+ log.Fatalf("decode stdin: %s", err)
+ }
+ for k, v := range env {
+ err = os.Setenv(k, v)
+ if err != nil {
+ log.Fatalf("setenv(%q): %s", k, err)
+ }
+ }
+}
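For reference, a hedged sketch of the stdin-env message format: the dispatcher marshals the API host and token into a JSON object and pipes it to "crunch-run --detach --stdin-env <uuid>". The values below are made up, and the decode step is equivalent to loadEnv above:

package example

import (
	"encoding/json"
	"log"
	"os"
	"strings"
)

func applyStdinEnv() {
	// Example message; ARVADOS_API_HOST_INSECURE=1 is also included when
	// the dispatcher's API client is configured with Insecure.
	stdin := strings.NewReader(`{"ARVADOS_API_HOST":"zzzzz.example.org","ARVADOS_API_TOKEN":"examplesecret"}`)
	var env map[string]string
	if err := json.NewDecoder(stdin).Decode(&env); err != nil {
		log.Fatalf("decode stdin: %s", err)
	}
	for k, v := range env {
		os.Setenv(k, v)
	}
}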
permChecker permChecker
subscriptions []v0subscribe
lastMsgID uint64
- log *logrus.Entry
+ log logrus.FieldLogger
mtx sync.Mutex
setupOnce sync.Once
}