From: Tom Clegg Date: Thu, 20 Dec 2018 18:58:26 +0000 (-0500) Subject: 14360: Merge branch 'master' into 14360-dispatch-cloud X-Git-Tag: 1.4.0~180^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/fe33cfe407912df46ae34b5419ebdc84650829f6?hp=eabe449b9a17e14eb2ba67f6e5cb8ce1dafc8005 14360: Merge branch 'master' into 14360-dispatch-cloud Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/.licenseignore b/.licenseignore index 83c81b2fc2..06519a98e8 100644 --- a/.licenseignore +++ b/.licenseignore @@ -71,4 +71,5 @@ sdk/R/NAMESPACE sdk/R/.Rbuildignore sdk/R/ArvadosR.Rproj *.Rd +lib/dispatchcloud/test/sshkey_* *.asc diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh index 73e2e62863..f316c563bd 100755 --- a/build/run-build-packages.sh +++ b/build/run-build-packages.sh @@ -294,6 +294,9 @@ package_go_binary cmd/arvados-server arvados-server \ "Arvados server daemons" package_go_binary cmd/arvados-server arvados-controller \ "Arvados cluster controller daemon" +# No package until #14325 +#package_go_binary cmd/arvados-server crunch-dispatch-cloud \ +# "Arvados cluster cloud dispatch" package_go_binary sdk/go/crunchrunner crunchrunner \ "Crunchrunner executes a command inside a container and uploads the output" package_go_binary services/arv-git-httpd arvados-git-httpd \ diff --git a/build/run-tests.sh b/build/run-tests.sh index cd44347cba..cb44372566 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -77,6 +77,10 @@ lib/cmd lib/controller lib/crunchstat lib/dispatchcloud +lib/dispatchcloud/container +lib/dispatchcloud/scheduler +lib/dispatchcloud/ssh_executor +lib/dispatchcloud/worker services/api services/arv-git-httpd services/crunchstat @@ -926,6 +930,10 @@ gostuff=( lib/controller lib/crunchstat lib/dispatchcloud + lib/dispatchcloud/container + lib/dispatchcloud/scheduler + lib/dispatchcloud/ssh_executor + lib/dispatchcloud/worker sdk/go/arvados sdk/go/arvadosclient sdk/go/auth diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go index 1af3745df0..cd15d25dda 100644 --- a/cmd/arvados-server/cmd.go +++ b/cmd/arvados-server/cmd.go @@ -9,6 +9,7 @@ import ( "git.curoverse.com/arvados.git/lib/cmd" "git.curoverse.com/arvados.git/lib/controller" + "git.curoverse.com/arvados.git/lib/dispatchcloud" ) var ( @@ -18,7 +19,8 @@ var ( "-version": cmd.Version(version), "--version": cmd.Version(version), - "controller": controller.Command, + "controller": controller.Command, + "dispatch-cloud": dispatchcloud.Command, }) ) diff --git a/cmd/arvados-server/crunch-dispatch-cloud.service b/cmd/arvados-server/crunch-dispatch-cloud.service new file mode 100644 index 0000000000..f8d71c9753 --- /dev/null +++ b/cmd/arvados-server/crunch-dispatch-cloud.service @@ -0,0 +1,28 @@ +# Copyright (C) The Arvados Authors. All rights reserved. 
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/crunch-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644
index 0000000000..e3a072582b
--- /dev/null
+++ b/lib/cloud/interfaces.go
@@ -0,0 +1,179 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+	"io"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by an InstanceSet when the
+// cloud service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+	// Time before which the caller should expect requests to
+	// fail.
+	EarliestRetry() time.Time
+	error
+}
+
+// A QuotaError should be returned by an InstanceSet when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+	// If true, don't create more instances until some existing
+	// instances are destroyed. If false, don't handle the error
+	// as a quota error.
+	IsQuotaError() bool
+	error
+}
+
+type InstanceSetID string
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// An Executor executes commands on an ExecutorTarget.
+type Executor interface {
+	// Update the set of private keys used to authenticate to
+	// targets.
+	SetSigners(...ssh.Signer)
+
+	// Set the target used for subsequent command executions.
+	SetTarget(ExecutorTarget)
+
+	// Return the current target.
+	Target() ExecutorTarget
+
+	// Execute a shell command and return the resulting stdout and
+	// stderr. stdin can be nil.
+	Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+}
+
+// An ExecutorTarget is a remote command execution service.
+type ExecutorTarget interface {
+	// SSH server hostname or IP address, or empty string if
+	// unknown while instance is booting.
+	Address() string
+
+	// Return nil if the given public key matches the instance's
+	// SSH server key. If the provided *ssh.Client is not nil,
+	// VerifyHostKey can use it to make outgoing network
+	// connections from the instance -- e.g., to use the cloud's
+	// "this instance's metadata" API.
+	VerifyHostKey(ssh.PublicKey, *ssh.Client) error
+}
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+	ExecutorTarget
+
+	// ID returns the provider's instance ID. It must be stable
+	// for the life of the instance.
+	ID() InstanceID
+
+	// String typically returns the cloud-provided instance ID.
+	String() string
+
+	// Cloud provider's "instance type" ID. Matches a ProviderType
+	// in the cluster's InstanceTypes configuration.
+ ProviderType() string + + // Get current tags + Tags() InstanceTags + + // Replace tags with the given tags + SetTags(InstanceTags) error + + // Shut down the node + Destroy() error +} + +// An InstanceSet manages a set of VM instances created by an elastic +// cloud provider like AWS, GCE, or Azure. +// +// All public methods of an InstanceSet, and all public methods of the +// instances it returns, are goroutine safe. +type InstanceSet interface { + // Create a new instance. If supported by the driver, add the + // provided public key to /root/.ssh/authorized_keys. + // + // The returned error should implement RateLimitError and + // QuotaError where applicable. + Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error) + + // Return all instances, including ones that are booting or + // shutting down. Optionally, filter out nodes that don't have + // all of the given InstanceTags (the caller will ignore these + // anyway). + // + // An instance returned by successive calls to Instances() may + // -- but does not need to -- be represented by the same + // Instance object each time. Thus, the caller is responsible + // for de-duplicating the returned instances by comparing the + // InstanceIDs returned by the instances' ID() methods. + Instances(InstanceTags) ([]Instance, error) + + // Stop any background tasks and release other resources. + Stop() +} + +// A Driver returns an InstanceSet that uses the given InstanceSetID +// and driver-dependent configuration parameters. +// +// The supplied id will be of the form "zzzzz-zzzzz-zzzzzzzzzzzzzzz" +// where each z can be any alphanum. The returned InstanceSet must use +// this id to tag long-lived cloud resources that it creates, and must +// assume control of any existing resources that are tagged with the +// same id. Tagging can be accomplished by including the ID in +// resource names, using the cloud provider's tagging feature, or any +// other mechanism. The tags must be visible to another instance of +// the same driver running on a different host. +// +// The returned InstanceSet must ignore existing resources that are +// visible but not tagged with the given id, except that it should log +// a summary of such resources -- only once -- when it starts +// up. Thus, two identically configured InstanceSets running on +// different hosts with different ids should log about the existence +// of each other's resources at startup, but will not interfere with +// each other. +// +// Example: +// +// type exampleInstanceSet struct { +// ownID string +// AccessKey string +// } +// +// type exampleDriver struct {} +// +// func (*exampleDriver) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) { +// var is exampleInstanceSet +// if err := mapstructure.Decode(config, &is); err != nil { +// return nil, err +// } +// is.ownID = id +// return &is, nil +// } +// +// var _ = registerCloudDriver("example", &exampleDriver{}) +type Driver interface { + InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) +} + +// DriverFunc makes a Driver using the provided function as its +// InstanceSet method. This is similar to http.HandlerFunc. 
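+//
+// For example, a stateless driver can be written as a bare function
+// (a sketch, reusing the exampleInstanceSet config fields shown
+// above; the mapstructure decoding is assumed):
+//
+//	var ExampleDriver = DriverFunc(func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+//		is := exampleInstanceSet{ownID: string(id)}
+//		if err := mapstructure.Decode(config, &is); err != nil {
+//			return nil, err
+//		}
+//		return &is, nil
+//	})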
+func DriverFunc(fn func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)) Driver { + return driverFunc(fn) +} + +type driverFunc func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) + +func (df driverFunc) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) { + return df(config, id) +} diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go index 8c65cf7acf..9292ef7e5f 100644 --- a/lib/cmd/cmd.go +++ b/lib/cmd/cmd.go @@ -36,8 +36,9 @@ func (v Version) RunCommand(prog string, args []string, stdin io.Reader, stdout, return 0 } -// Multi is a Handler that looks up its first argument in a map, and -// invokes the resulting Handler with the remaining args. +// Multi is a Handler that looks up its first argument in a map (after +// stripping any "arvados-" or "crunch-" prefix), and invokes the +// resulting Handler with the remaining args. // // Example: // diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go new file mode 100644 index 0000000000..92948fb300 --- /dev/null +++ b/lib/dispatchcloud/cmd.go @@ -0,0 +1,19 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package dispatchcloud + +import ( + "git.curoverse.com/arvados.git/lib/cmd" + "git.curoverse.com/arvados.git/lib/service" + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler) + +func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler { + d := &dispatcher{Cluster: cluster} + go d.Start() + return d +} diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go new file mode 100644 index 0000000000..432f4d4884 --- /dev/null +++ b/lib/dispatchcloud/container/queue.go @@ -0,0 +1,378 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package container + +import ( + "io" + "sync" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" + "github.com/prometheus/client_golang/prometheus" +) + +type typeChooser func(*arvados.Container) (arvados.InstanceType, error) + +// An APIClient performs Arvados API requests. It is typically an +// *arvados.Client. +type APIClient interface { + RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error +} + +// A QueueEnt is an entry in the queue, consisting of a container +// record and the instance type that should be used to run it. +type QueueEnt struct { + // The container to run. Only the UUID, State, Priority, and + // RuntimeConstraints fields are populated. + Container arvados.Container + InstanceType arvados.InstanceType +} + +// String implements fmt.Stringer by returning the queued container's +// UUID. +func (c *QueueEnt) String() string { + return c.Container.UUID +} + +// A Queue is an interface to an Arvados cluster's container +// database. It presents only the containers that are eligible to be +// run by, are already being run by, or have recently been run by the +// present dispatcher. +// +// The Entries, Get, and Forget methods do not block: they return +// immediately, using cached data. +// +// The updating methods (Cancel, Lock, Unlock, Update) do block: they +// return only after the operation has completed. +// +// A Queue's Update method should be called periodically to keep the +// cache up to date. 
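+//
+// For example (a sketch; pollInterval and logger are assumed to be
+// supplied by the caller):
+//
+//	for range time.Tick(pollInterval) {
+//		if err := q.Update(); err != nil {
+//			logger.Warnf("queue update failed: %s", err)
+//		}
+//	}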
+type Queue struct { + logger logrus.FieldLogger + reg *prometheus.Registry + chooseType typeChooser + client APIClient + + auth *arvados.APIClientAuthorization + current map[string]QueueEnt + updated time.Time + mtx sync.Mutex + + // Methods that modify the Queue (like Lock) add the affected + // container UUIDs to dontupdate. When applying a batch of + // updates received from the network, anything appearing in + // dontupdate is skipped, in case the received update has + // already been superseded by the locally initiated change. + // When no network update is in progress, this protection is + // not needed, and dontupdate is nil. + dontupdate map[string]struct{} + + // active notification subscribers (see Subscribe) + subscribers map[<-chan struct{}]chan struct{} +} + +// NewQueue returns a new Queue. When a new container appears in the +// Arvados cluster's queue during Update, chooseType will be called to +// assign an appropriate arvados.InstanceType for the queue entry. +func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue { + return &Queue{ + logger: logger, + reg: reg, + chooseType: chooseType, + client: client, + current: map[string]QueueEnt{}, + subscribers: map[<-chan struct{}]chan struct{}{}, + } +} + +// Subscribe returns a channel that becomes ready to receive when an +// entry in the Queue is updated. +// +// ch := q.Subscribe() +// defer q.Unsubscribe(ch) +// for range ch { +// // ... +// } +func (cq *Queue) Subscribe() <-chan struct{} { + cq.mtx.Lock() + defer cq.mtx.Unlock() + ch := make(chan struct{}, 1) + cq.subscribers[ch] = ch + return ch +} + +// Unsubscribe stops sending updates to the given channel. See +// Subscribe. +func (cq *Queue) Unsubscribe(ch <-chan struct{}) { + cq.mtx.Lock() + defer cq.mtx.Unlock() + delete(cq.subscribers, ch) +} + +// Caller must have lock. +func (cq *Queue) notify() { + for _, ch := range cq.subscribers { + select { + case ch <- struct{}{}: + default: + } + } +} + +// Forget drops the specified container from the cache. It should be +// called on finalized containers to avoid leaking memory over +// time. It is a no-op if the indicated container is not in a +// finalized state. +func (cq *Queue) Forget(uuid string) { + cq.mtx.Lock() + defer cq.mtx.Unlock() + ctr := cq.current[uuid].Container + if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled { + delete(cq.current, uuid) + } +} + +// Get returns the (partial) Container record for the specified +// container. Like a map lookup, its second return value is false if +// the specified container is not in the Queue. +func (cq *Queue) Get(uuid string) (arvados.Container, bool) { + cq.mtx.Lock() + defer cq.mtx.Unlock() + if ctr, ok := cq.current[uuid]; !ok { + return arvados.Container{}, false + } else { + return ctr.Container, true + } +} + +// Entries returns all cache entries, keyed by container UUID. +// +// The returned threshold indicates the maximum age of any cached data +// returned in the map. This makes it possible for a scheduler to +// determine correctly the outcome of a remote process that updates +// container state. It must first wait for the remote process to exit, +// then wait for the Queue to start and finish its next Update -- +// i.e., it must wait until threshold > timeProcessExited. 
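+//
+// For example (a sketch; timeProcessExited comes from the caller's
+// own bookkeeping of the remote process):
+//
+//	ents, threshold := q.Entries()
+//	if threshold.After(timeProcessExited) {
+//		// ents reflects the final state of the process.
+//	}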
+func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) { + cq.mtx.Lock() + defer cq.mtx.Unlock() + entries = make(map[string]QueueEnt, len(cq.current)) + for uuid, ctr := range cq.current { + entries[uuid] = ctr + } + threshold = cq.updated + return +} + +// Update refreshes the cache from the Arvados API. It adds newly +// queued containers, and updates the state of previously queued +// containers. +func (cq *Queue) Update() error { + cq.mtx.Lock() + cq.dontupdate = map[string]struct{}{} + updateStarted := time.Now() + cq.mtx.Unlock() + + next, err := cq.poll() + if err != nil { + return err + } + + cq.mtx.Lock() + defer cq.mtx.Unlock() + for uuid, ctr := range next { + if _, keep := cq.dontupdate[uuid]; keep { + continue + } + if cur, ok := cq.current[uuid]; !ok { + cq.addEnt(uuid, *ctr) + } else { + cur.Container = *ctr + cq.current[uuid] = cur + } + } + for uuid := range cq.current { + if _, keep := cq.dontupdate[uuid]; keep { + continue + } else if _, keep = next[uuid]; keep { + continue + } else { + delete(cq.current, uuid) + } + } + cq.dontupdate = nil + cq.updated = updateStarted + cq.notify() + return nil +} + +func (cq *Queue) addEnt(uuid string, ctr arvados.Container) { + it, err := cq.chooseType(&ctr) + if err != nil { + // FIXME: throttle warnings, cancel after timeout + cq.logger.Warnf("cannot run %s", &ctr) + return + } + cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it} +} + +// Lock acquires the dispatch lock for the given container. +func (cq *Queue) Lock(uuid string) error { + return cq.apiUpdate(uuid, "lock") +} + +// Unlock releases the dispatch lock for the given container. +func (cq *Queue) Unlock(uuid string) error { + return cq.apiUpdate(uuid, "unlock") +} + +// Cancel cancels the given container. 
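+// Unlike Lock and Unlock, Cancel does not update the cached
+// container record directly; the new state is picked up by the next
+// Update.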
+func (cq *Queue) Cancel(uuid string) error { + err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{ + "container": {"state": arvados.ContainerStateCancelled}, + }) + if err != nil { + return err + } + cq.mtx.Lock() + defer cq.mtx.Unlock() + cq.notify() + return nil +} + +func (cq *Queue) apiUpdate(uuid, action string) error { + var resp arvados.Container + err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil) + if err != nil { + return err + } + + cq.mtx.Lock() + defer cq.mtx.Unlock() + if cq.dontupdate != nil { + cq.dontupdate[uuid] = struct{}{} + } + if ent, ok := cq.current[uuid]; !ok { + cq.addEnt(uuid, resp) + } else { + ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID + cq.current[uuid] = ent + } + cq.notify() + return nil +} + +func (cq *Queue) poll() (map[string]*arvados.Container, error) { + cq.mtx.Lock() + size := len(cq.current) + auth := cq.auth + cq.mtx.Unlock() + + if auth == nil { + auth = &arvados.APIClientAuthorization{} + err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil) + if err != nil { + return nil, err + } + cq.mtx.Lock() + cq.auth = auth + cq.mtx.Unlock() + } + + next := make(map[string]*arvados.Container, size) + apply := func(updates []arvados.Container) { + for _, upd := range updates { + if next[upd.UUID] == nil { + next[upd.UUID] = &arvados.Container{} + } + *next[upd.UUID] = upd + } + } + selectParam := []string{"uuid", "state", "priority", "runtime_constraints"} + limitParam := 1000 + + mine, err := cq.fetchAll(arvados.ResourceListParams{ + Select: selectParam, + Order: "uuid", + Limit: &limitParam, + Count: "none", + Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}}, + }) + if err != nil { + return nil, err + } + apply(mine) + + avail, err := cq.fetchAll(arvados.ResourceListParams{ + Select: selectParam, + Order: "uuid", + Limit: &limitParam, + Count: "none", + Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}}, + }) + if err != nil { + return nil, err + } + apply(avail) + + var missing []string + cq.mtx.Lock() + for uuid, ent := range cq.current { + if next[uuid] == nil && + ent.Container.State != arvados.ContainerStateCancelled && + ent.Container.State != arvados.ContainerStateComplete { + missing = append(missing, uuid) + } + } + cq.mtx.Unlock() + + for i, page := 0, 20; i < len(missing); i += page { + batch := missing[i:] + if len(batch) > page { + batch = batch[:page] + } + ended, err := cq.fetchAll(arvados.ResourceListParams{ + Select: selectParam, + Order: "uuid", + Count: "none", + Filters: []arvados.Filter{{"uuid", "in", batch}}, + }) + if err != nil { + return nil, err + } + apply(ended) + } + return next, nil +} + +func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) { + var results []arvados.Container + params := initialParams + params.Offset = 0 + for { + // This list variable must be a new one declared + // inside the loop: otherwise, items in the API + // response would get deep-merged into the items + // loaded in previous iterations. + var list arvados.ContainerList + + err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params) + if err != nil { + return nil, err + } + if len(list.Items) == 0 { + break + } + + results = append(results, list.Items...) 
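+		// When the results are ordered by UUID, page through
+		// the remainder with a "uuid > last seen" filter
+		// rather than an offset, so rows that move while we
+		// paginate are not skipped or duplicated.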
+		if params.Order == "uuid" {
+			params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
+		} else {
+			params.Offset += len(list.Items)
+		}
+	}
+	return results, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644
index 0000000000..81ad0ed3fa
--- /dev/null
+++ b/lib/dispatchcloud/dispatcher.go
@@ -0,0 +1,197 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+	"crypto/md5"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/lib/cloud"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
+	"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/auth"
+	"git.curoverse.com/arvados.git/sdk/go/httpserver"
+	"github.com/Sirupsen/logrus"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	"golang.org/x/crypto/ssh"
+)
+
+const (
+	defaultPollInterval     = time.Second
+	defaultStaleLockTimeout = time.Minute
+)
+
+type pool interface {
+	scheduler.WorkerPool
+	Instances() []worker.InstanceView
+	Stop()
+}
+
+type dispatcher struct {
+	Cluster       *arvados.Cluster
+	InstanceSetID cloud.InstanceSetID
+
+	logger      logrus.FieldLogger
+	reg         *prometheus.Registry
+	instanceSet cloud.InstanceSet
+	pool        pool
+	queue       scheduler.ContainerQueue
+	httpHandler http.Handler
+	sshKey      ssh.Signer
+
+	setupOnce sync.Once
+	stop      chan struct{}
+	stopped   chan struct{}
+}
+
+// Start starts the dispatcher. Start can be called multiple times
+// with no ill effect.
+func (disp *dispatcher) Start() {
+	disp.setupOnce.Do(disp.setup)
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	disp.Start()
+	disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+	disp.Start()
+	return nil
+}
+
+// Close stops dispatching containers and releases resources. It is
+// typically used in tests.
+func (disp *dispatcher) Close() {
+	disp.Start()
+	select {
+	case disp.stop <- struct{}{}:
+	default:
+	}
+	<-disp.stopped
+}
+
+// newExecutor makes a worker.Executor for the given instance.
+func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
+	exr := ssh_executor.New(inst)
+	exr.SetSigners(disp.sshKey)
+	return exr
+}
+
+func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
+	return ChooseInstanceType(disp.Cluster, ctr)
+}
+
+func (disp *dispatcher) setup() {
+	disp.initialize()
+	go disp.run()
+}
+
+func (disp *dispatcher) initialize() {
+	arvClient := arvados.NewClientFromEnv()
+	if disp.InstanceSetID == "" {
+		if strings.HasPrefix(arvClient.AuthToken, "v2/") {
+			disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+		} else {
+			// Use some other string unique to this token
+			// that doesn't reveal the token itself.
+ disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken)))) + } + } + disp.stop = make(chan struct{}, 1) + disp.stopped = make(chan struct{}) + disp.logger = logrus.StandardLogger() + + if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil { + disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err) + } else { + disp.sshKey = key + } + + instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID) + if err != nil { + disp.logger.Fatalf("error initializing driver: %s", err) + } + disp.instanceSet = &instanceSetProxy{instanceSet} + disp.reg = prometheus.NewRegistry() + disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster) + disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient) + + if disp.Cluster.ManagementToken == "" { + disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "Management API authentication is not configured", http.StatusForbidden) + }) + } else { + mux := http.NewServeMux() + mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers) + mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances) + metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{ + ErrorLog: disp.logger, + }) + mux.Handle("/metrics", metricsH) + mux.Handle("/metrics.json", metricsH) + disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux) + } +} + +func (disp *dispatcher) run() { + defer close(disp.stopped) + defer disp.instanceSet.Stop() + defer disp.pool.Stop() + + staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout) + if staleLockTimeout == 0 { + staleLockTimeout = defaultStaleLockTimeout + } + pollInterval := time.Duration(disp.Cluster.Dispatch.PollInterval) + if pollInterval <= 0 { + pollInterval = defaultPollInterval + } + sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval) + sched.Start() + defer sched.Stop() + + <-disp.stop +} + +// Management API: all active and queued containers. +func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + var resp struct { + Items []container.QueueEnt + } + qEntries, _ := disp.queue.Entries() + for _, ent := range qEntries { + resp.Items = append(resp.Items, ent) + } + json.NewEncoder(w).Encode(resp) +} + +// Management API: all active instances (cloud VMs). +func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) { + if r.Method != "GET" { + httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + var resp struct { + Items []worker.InstanceView + } + resp.Items = disp.pool.Instances() + json.NewEncoder(w).Encode(resp) +} diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go new file mode 100644 index 0000000000..33823a828d --- /dev/null +++ b/lib/dispatchcloud/dispatcher_test.go @@ -0,0 +1,269 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package dispatchcloud + +import ( + "encoding/json" + "io/ioutil" + "math/rand" + "net/http" + "net/http/httptest" + "os" + "sync" + "time" + + "git.curoverse.com/arvados.git/lib/dispatchcloud/test" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" + "golang.org/x/crypto/ssh" + check "gopkg.in/check.v1" +) + +var _ = check.Suite(&DispatcherSuite{}) + +type DispatcherSuite struct { + cluster *arvados.Cluster + instanceSet *test.LameInstanceSet + stubDriver *test.StubDriver + disp *dispatcher +} + +func (s *DispatcherSuite) SetUpSuite(c *check.C) { + if os.Getenv("ARVADOS_DEBUG") != "" { + logrus.StandardLogger().SetLevel(logrus.DebugLevel) + } +} + +func (s *DispatcherSuite) SetUpTest(c *check.C) { + dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch") + dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch") + c.Assert(err, check.IsNil) + + _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm") + s.stubDriver = &test.StubDriver{ + HostKey: hostpriv, + AuthorizedKeys: []ssh.PublicKey{dispatchpub}, + ErrorRateDestroy: 0.1, + } + + s.cluster = &arvados.Cluster{ + CloudVMs: arvados.CloudVMs{ + Driver: "test", + SyncInterval: arvados.Duration(10 * time.Millisecond), + TimeoutIdle: arvados.Duration(30 * time.Millisecond), + TimeoutBooting: arvados.Duration(30 * time.Millisecond), + TimeoutProbe: arvados.Duration(15 * time.Millisecond), + TimeoutShutdown: arvados.Duration(5 * time.Millisecond), + }, + Dispatch: arvados.Dispatch{ + PrivateKey: dispatchprivraw, + PollInterval: arvados.Duration(5 * time.Millisecond), + ProbeInterval: arvados.Duration(5 * time.Millisecond), + StaleLockTimeout: arvados.Duration(5 * time.Millisecond), + MaxProbesPerSecond: 1000, + }, + InstanceTypes: arvados.InstanceTypeMap{ + test.InstanceType(1).Name: test.InstanceType(1), + test.InstanceType(2).Name: test.InstanceType(2), + test.InstanceType(3).Name: test.InstanceType(3), + test.InstanceType(4).Name: test.InstanceType(4), + test.InstanceType(6).Name: test.InstanceType(6), + test.InstanceType(8).Name: test.InstanceType(8), + test.InstanceType(16).Name: test.InstanceType(16), + }, + NodeProfiles: map[string]arvados.NodeProfile{ + "*": { + Controller: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")}, + DispatchCloud: arvados.SystemServiceInstance{Listen: ":"}, + }, + }, + } + s.disp = &dispatcher{Cluster: s.cluster} + // Test cases can modify s.cluster before calling + // initialize(), and then modify private state before calling + // go run(). +} + +func (s *DispatcherSuite) TearDownTest(c *check.C) { + s.disp.Close() +} + +// DispatchToStubDriver checks that the dispatcher wires everything +// together effectively. It uses a real scheduler and worker pool with +// a fake queue and cloud driver. The fake cloud driver injects +// artificial errors in order to exercise a variety of code paths. 
+func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) { + drivers["test"] = s.stubDriver + s.disp.setupOnce.Do(s.disp.initialize) + queue := &test.Queue{ + ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + return ChooseInstanceType(s.cluster, ctr) + }, + } + for i := 0; i < 200; i++ { + queue.Containers = append(queue.Containers, arvados.Container{ + UUID: test.ContainerUUID(i + 1), + State: arvados.ContainerStateQueued, + Priority: int64(i%20 + 1), + RuntimeConstraints: arvados.RuntimeConstraints{ + RAM: int64(i%3+1) << 30, + VCPUs: i%8 + 1, + }, + }) + } + s.disp.queue = queue + + var mtx sync.Mutex + done := make(chan struct{}) + waiting := map[string]struct{}{} + for _, ctr := range queue.Containers { + waiting[ctr.UUID] = struct{}{} + } + executeContainer := func(ctr arvados.Container) int { + mtx.Lock() + defer mtx.Unlock() + if _, ok := waiting[ctr.UUID]; !ok { + c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID) + return 1 + } + delete(waiting, ctr.UUID) + if len(waiting) == 0 { + close(done) + } + return int(rand.Uint32() & 0x3) + } + n := 0 + s.stubDriver.Queue = queue + s.stubDriver.SetupVM = func(stubvm *test.StubVM) { + n++ + stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))) + stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond))) + stubvm.ExecuteContainer = executeContainer + switch n % 7 { + case 0: + stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond) + case 1: + stubvm.CrunchRunMissing = true + default: + stubvm.CrunchRunCrashRate = 0.1 + } + } + + start := time.Now() + go s.disp.run() + err := s.disp.CheckHealth() + c.Check(err, check.IsNil) + + select { + case <-done: + c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start)) + case <-time.After(10 * time.Second): + c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting) + } + + deadline := time.Now().Add(time.Second) + for range time.NewTicker(10 * time.Millisecond).C { + insts, err := s.stubDriver.InstanceSets()[0].Instances(nil) + c.Check(err, check.IsNil) + queue.Update() + ents, _ := queue.Entries() + if len(ents) == 0 && len(insts) == 0 { + break + } + if time.Now().After(deadline) { + c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts) + } + } +} + +func (s *DispatcherSuite) TestAPIPermissions(c *check.C) { + s.cluster.ManagementToken = "abcdefgh" + drivers["test"] = s.stubDriver + s.disp.setupOnce.Do(s.disp.initialize) + s.disp.queue = &test.Queue{} + go s.disp.run() + + for _, token := range []string{"abc", ""} { + req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil) + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + resp := httptest.NewRecorder() + s.disp.ServeHTTP(resp, req) + if token == "" { + c.Check(resp.Code, check.Equals, http.StatusUnauthorized) + } else { + c.Check(resp.Code, check.Equals, http.StatusForbidden) + } + } +} + +func (s *DispatcherSuite) TestAPIDisabled(c *check.C) { + s.cluster.ManagementToken = "" + drivers["test"] = s.stubDriver + s.disp.setupOnce.Do(s.disp.initialize) + s.disp.queue = &test.Queue{} + go s.disp.run() + + for _, token := range []string{"abc", ""} { + req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil) + if token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + resp := 
httptest.NewRecorder() + s.disp.ServeHTTP(resp, req) + c.Check(resp.Code, check.Equals, http.StatusForbidden) + } +} + +func (s *DispatcherSuite) TestInstancesAPI(c *check.C) { + s.cluster.ManagementToken = "abcdefgh" + s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second) + drivers["test"] = s.stubDriver + s.disp.setupOnce.Do(s.disp.initialize) + s.disp.queue = &test.Queue{} + go s.disp.run() + + type instance struct { + Instance string + WorkerState string + Price float64 + LastContainerUUID string + ArvadosInstanceType string + ProviderInstanceType string + } + type instancesResponse struct { + Items []instance + } + getInstances := func() instancesResponse { + req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil) + req.Header.Set("Authorization", "Bearer abcdefgh") + resp := httptest.NewRecorder() + s.disp.ServeHTTP(resp, req) + var sr instancesResponse + c.Check(resp.Code, check.Equals, http.StatusOK) + err := json.Unmarshal(resp.Body.Bytes(), &sr) + c.Check(err, check.IsNil) + return sr + } + + sr := getInstances() + c.Check(len(sr.Items), check.Equals, 0) + + ch := s.disp.pool.Subscribe() + defer s.disp.pool.Unsubscribe(ch) + err := s.disp.pool.Create(test.InstanceType(1)) + c.Check(err, check.IsNil) + <-ch + + sr = getInstances() + c.Assert(len(sr.Items), check.Equals, 1) + c.Check(sr.Items[0].Instance, check.Matches, "stub.*") + c.Check(sr.Items[0].WorkerState, check.Equals, "booting") + c.Check(sr.Items[0].Price, check.Equals, 0.123) + c.Check(sr.Items[0].LastContainerUUID, check.Equals, "") + c.Check(sr.Items[0].ProviderInstanceType, check.Equals, test.InstanceType(1).ProviderType) + c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, test.InstanceType(1).Name) +} diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go new file mode 100644 index 0000000000..295fd6105b --- /dev/null +++ b/lib/dispatchcloud/driver.go @@ -0,0 +1,22 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package dispatchcloud + +import ( + "fmt" + + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +var drivers = map[string]cloud.Driver{} + +func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID) (cloud.InstanceSet, error) { + driver, ok := drivers[cluster.CloudVMs.Driver] + if !ok { + return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver) + } + return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID) +} diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go new file mode 100644 index 0000000000..e728b67cd2 --- /dev/null +++ b/lib/dispatchcloud/instance_set_proxy.go @@ -0,0 +1,25 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package dispatchcloud + +import ( + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "golang.org/x/crypto/ssh" +) + +type instanceSetProxy struct { + cloud.InstanceSet +} + +func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) { + // TODO: return if Create failed recently with a RateLimitError or QuotaError + return is.InstanceSet.Create(it, id, tags, pk) +} + +func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) { + // TODO: return if Instances failed recently with a RateLimitError + return is.InstanceSet.Instances(tags) +} diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go new file mode 100644 index 0000000000..90bb6ca686 --- /dev/null +++ b/lib/dispatchcloud/logger.go @@ -0,0 +1,29 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package dispatchcloud + +import ( + "sync" + "time" +) + +type logger interface { + Printf(string, ...interface{}) + Warnf(string, ...interface{}) + Debugf(string, ...interface{}) +} + +var nextSpam = map[string]time.Time{} +var nextSpamMtx sync.Mutex + +func unspam(msg string) bool { + nextSpamMtx.Lock() + defer nextSpamMtx.Unlock() + if nextSpam[msg].Before(time.Now()) { + nextSpam[msg] = time.Now().Add(time.Minute) + return true + } + return false +} diff --git a/lib/dispatchcloud/node_size.go b/lib/dispatchcloud/node_size.go index 339e042c1a..d7f4585619 100644 --- a/lib/dispatchcloud/node_size.go +++ b/lib/dispatchcloud/node_size.go @@ -6,21 +6,16 @@ package dispatchcloud import ( "errors" - "log" - "os/exec" "regexp" "sort" "strconv" - "strings" - "time" "git.curoverse.com/arvados.git/sdk/go/arvados" ) -var ( - ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types") - discountConfiguredRAMPercent = 5 -) +var ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types") + +var discountConfiguredRAMPercent = 5 // ConstraintsNotSatisfiableError includes a list of available instance types // to be reported back to the user. @@ -38,12 +33,10 @@ var pdhRegexp = regexp.MustCompile(`^[0-9a-f]{32}\+(\d+)$`) func estimateDockerImageSize(collectionPDH string) int64 { m := pdhRegexp.FindStringSubmatch(collectionPDH) if m == nil { - log.Printf("estimateDockerImageSize: '%v' did not match pdhRegexp, returning 0", collectionPDH) return 0 } n, err := strconv.ParseInt(m[1], 10, 64) if err != nil || n < 122 { - log.Printf("estimateDockerImageSize: short manifest %v or error (%v), returning 0", n, err) return 0 } // To avoid having to fetch the collection, take advantage of @@ -137,61 +130,3 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad } return } - -// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance -// type name as a valid feature name, even if no instances of that -// type have appeared yet. -// -// It takes advantage of some SLURM peculiarities: -// -// (1) A feature is valid after it has been offered by a node, even if -// it is no longer offered by any node. So, to make a feature name -// valid, we can add it to a dummy node ("compute0"), then remove it. -// -// (2) To test whether a set of feature names are valid without -// actually submitting a job, we can call srun --test-only with the -// desired features. 
-// -// SlurmNodeTypeFeatureKludge does a test-and-fix operation -// immediately, and then periodically, in case slurm restarts and -// forgets the list of valid features. It never returns (unless there -// are no node types configured, in which case it returns -// immediately), so it should generally be invoked with "go". -func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) { - if len(cc.InstanceTypes) == 0 { - return - } - var features []string - for _, it := range cc.InstanceTypes { - features = append(features, "instancetype="+it.Name) - } - for { - slurmKludge(features) - time.Sleep(2 * time.Second) - } -} - -const slurmDummyNode = "compute0" - -func slurmKludge(features []string) { - allFeatures := strings.Join(features, ",") - - cmd := exec.Command("sinfo", "--nodes="+slurmDummyNode, "--format=%f", "--noheader") - out, err := cmd.CombinedOutput() - if err != nil { - log.Printf("running %q %q: %s (output was %q)", cmd.Path, cmd.Args, err, out) - return - } - if string(out) == allFeatures+"\n" { - // Already configured correctly, nothing to do. - return - } - - log.Printf("configuring node %q with all node type features", slurmDummyNode) - cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+allFeatures) - log.Printf("running: %q %q", cmd.Path, cmd.Args) - out, err = cmd.CombinedOutput() - if err != nil { - log.Printf("error: scontrol: %s (output was %q)", err, out) - } -} diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go new file mode 100644 index 0000000000..c8491fb1df --- /dev/null +++ b/lib/dispatchcloud/readme.go @@ -0,0 +1,70 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package dispatchcloud + +// A dispatcher comprises a container queue, a scheduler, a worker +// pool, a remote command executor, and a cloud driver. +// 1. Choose a provider. +// 2. Start a worker pool. +// 3. Start a container queue. +// 4. Run the scheduler's stale-lock fixer. +// 5. Run the scheduler's mapper. +// 6. Run the scheduler's syncer. +// 7. Wait for updates to the container queue or worker pool. +// 8. Repeat from 5. +// +// +// A cloud driver creates new cloud VM instances and gets the latest +// list of instances. The returned instances are caches/proxies for +// the provider's metadata and control interfaces (get IP address, +// update tags, shutdown). +// +// +// A worker pool tracks workers' instance types and readiness states +// (available to do work now, booting, suffering a temporary network +// outage, shutting down). It loads internal state from the cloud +// provider's list of instances at startup, and syncs periodically +// after that. +// +// +// An executor maintains a multiplexed SSH connection to a cloud +// instance, retrying/reconnecting as needed, so the worker pool can +// execute commands. It asks the cloud driver's instance to verify its +// SSH public key once when first connecting, and again later if the +// key changes. +// +// +// A container queue tracks the known state (according to +// arvados-controller) of each container of interest -- i.e., queued, +// or locked/running using our own dispatch token. It also proxies the +// dispatcher's lock/unlock/cancel requests to the controller. It +// handles concurrent refresh and update operations without exposing +// out-of-order updates to its callers. (It drops any new information +// that might have originated before its own most recent +// lock/unlock/cancel operation.) 
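+//
+// (For example, if an Update response still shows a container as
+// Queued after the dispatcher has locked it, the stale Queued state
+// is dropped instead of overwriting the locally recorded Locked
+// state.)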
+// +// +// The scheduler's stale-lock fixer waits for any already-locked +// containers (i.e., locked by a prior dispatcher process) to appear +// on workers as the worker pool recovers its state. It +// unlocks/requeues any that still remain when all workers are +// recovered or shutdown, or its timer expires. +// +// +// The scheduler's mapper chooses which containers to assign to which +// idle workers, and decides what to do when there are not enough idle +// workers (including shutting down some idle nodes). +// +// +// The scheduler's syncer updates state to Cancelled when a running +// container process dies without finalizing its entry in the +// controller database. It also calls the worker pool to kill +// containers that have priority=0 while locked or running. +// +// +// An instance set proxy wraps a driver's instance set with +// rate-limiting logic. After the wrapped instance set receives a +// cloud.RateLimitError, the proxy starts returning errors to callers +// immediately without calling through to the wrapped instance set. diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go new file mode 100644 index 0000000000..264f9e4ec6 --- /dev/null +++ b/lib/dispatchcloud/scheduler/fix_stale_locks.go @@ -0,0 +1,57 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package scheduler + +import ( + "time" + + "git.curoverse.com/arvados.git/lib/dispatchcloud/worker" + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// fixStaleLocks waits for any already-locked containers (i.e., locked +// by a prior dispatcher process) to appear on workers as the worker +// pool recovers its state. It unlocks any that still remain when all +// workers are recovered or shutdown, or its timer +// (sch.staleLockTimeout) expires. +func (sch *Scheduler) fixStaleLocks() { + wp := sch.pool.Subscribe() + defer sch.pool.Unsubscribe(wp) + timeout := time.NewTimer(sch.staleLockTimeout) +waiting: + for { + unlock := false + select { + case <-wp: + // If all workers have been contacted, unlock + // containers that aren't claimed by any + // worker. + unlock = sch.pool.CountWorkers()[worker.StateUnknown] == 0 + case <-timeout.C: + // Give up and unlock the containers, even + // though they might be working. + unlock = true + } + + running := sch.pool.Running() + qEntries, _ := sch.queue.Entries() + for uuid, ent := range qEntries { + if ent.Container.State != arvados.ContainerStateLocked { + continue + } + if _, running := running[uuid]; running { + continue + } + if !unlock { + continue waiting + } + err := sch.queue.Unlock(uuid) + if err != nil { + sch.logger.Warnf("Unlock %s: %s", uuid, err) + } + } + return + } +} diff --git a/lib/dispatchcloud/scheduler/gocheck_test.go b/lib/dispatchcloud/scheduler/gocheck_test.go new file mode 100644 index 0000000000..558c60f73d --- /dev/null +++ b/lib/dispatchcloud/scheduler/gocheck_test.go @@ -0,0 +1,16 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package scheduler + +import ( + "testing" + + check "gopkg.in/check.v1" +) + +// Gocheck boilerplate +func Test(t *testing.T) { + check.TestingT(t) +} diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go new file mode 100644 index 0000000000..59700c3935 --- /dev/null +++ b/lib/dispatchcloud/scheduler/interfaces.go @@ -0,0 +1,43 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package scheduler + +import ( + "time" + + "git.curoverse.com/arvados.git/lib/dispatchcloud/container" + "git.curoverse.com/arvados.git/lib/dispatchcloud/worker" + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// A ContainerQueue is a set of containers that need to be started or +// stopped. Implemented by container.Queue and test stubs. +type ContainerQueue interface { + Entries() (entries map[string]container.QueueEnt, updated time.Time) + Lock(uuid string) error + Unlock(uuid string) error + Cancel(uuid string) error + Forget(uuid string) + Get(uuid string) (arvados.Container, bool) + Subscribe() <-chan struct{} + Unsubscribe(<-chan struct{}) + Update() error +} + +// A WorkerPool asynchronously starts and stops worker VMs, and starts +// and stops containers on them. Implemented by worker.Pool and test +// stubs. +type WorkerPool interface { + Running() map[string]time.Time + Unallocated() map[arvados.InstanceType]int + CountWorkers() map[worker.State]int + AtQuota() bool + Create(arvados.InstanceType) error + Shutdown(arvados.InstanceType) bool + StartContainer(arvados.InstanceType, arvados.Container) bool + KillContainer(uuid string) + Subscribe() <-chan struct{} + Unsubscribe(<-chan struct{}) +} diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go new file mode 100644 index 0000000000..ece8e3d989 --- /dev/null +++ b/lib/dispatchcloud/scheduler/run_queue.go @@ -0,0 +1,165 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package scheduler + +import ( + "sort" + + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/lib/dispatchcloud/container" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" +) + +func (sch *Scheduler) runQueue() { + unsorted, _ := sch.queue.Entries() + sorted := make([]container.QueueEnt, 0, len(unsorted)) + for _, ent := range unsorted { + sorted = append(sorted, ent) + } + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Container.Priority > sorted[j].Container.Priority + }) + + running := sch.pool.Running() + unalloc := sch.pool.Unallocated() + + sch.logger.WithFields(logrus.Fields{ + "Containers": len(sorted), + "Processes": len(running), + }).Debug("runQueue") + + dontstart := map[arvados.InstanceType]bool{} + var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota + +tryrun: + for i, ctr := range sorted { + ctr, it := ctr.Container, ctr.InstanceType + logger := sch.logger.WithFields(logrus.Fields{ + "ContainerUUID": ctr.UUID, + "InstanceType": it.Name, + }) + if _, running := running[ctr.UUID]; running || ctr.Priority < 1 { + continue + } + switch ctr.State { + case arvados.ContainerStateQueued: + if unalloc[it] < 1 && sch.pool.AtQuota() { + logger.Debug("not locking: AtQuota and no unalloc workers") + overquota = sorted[i:] + break tryrun + } + sch.bgLock(logger, ctr.UUID) + unalloc[it]-- + case arvados.ContainerStateLocked: + if unalloc[it] > 0 { + unalloc[it]-- + } else if sch.pool.AtQuota() { + logger.Debug("not starting: AtQuota and no unalloc workers") + overquota = sorted[i:] + break tryrun + } else { + logger.Info("creating new instance") + err := sch.pool.Create(it) + if err != nil { + if _, ok := err.(cloud.QuotaError); !ok { + logger.WithError(err).Warn("error creating worker") + } + sch.queue.Unlock(ctr.UUID) + // Don't let lower-priority + // containers starve this one + // by using keeping idle 
+ // workers alive on different + // instance types. TODO: + // avoid getting starved here + // if instances of a specific + // type always fail. + overquota = sorted[i:] + break tryrun + } + } + + if dontstart[it] { + // We already tried & failed to start + // a higher-priority container on the + // same instance type. Don't let this + // one sneak in ahead of it. + } else if sch.pool.StartContainer(it, ctr) { + // Success. + } else { + dontstart[it] = true + } + } + } + + if len(overquota) > 0 { + // Unlock any containers that are unmappable while + // we're at quota. + for _, ctr := range overquota { + ctr := ctr.Container + if ctr.State == arvados.ContainerStateLocked { + logger := sch.logger.WithField("ContainerUUID", ctr.UUID) + logger.Debug("unlock because pool capacity is used by higher priority containers") + err := sch.queue.Unlock(ctr.UUID) + if err != nil { + logger.WithError(err).Warn("error unlocking") + } + } + } + // Shut down idle workers that didn't get any + // containers mapped onto them before we hit quota. + for it, n := range unalloc { + if n < 1 { + continue + } + sch.pool.Shutdown(it) + } + } +} + +// Start an API call to lock the given container, and return +// immediately while waiting for the response in a new goroutine. Do +// nothing if a lock request is already in progress for this +// container. +func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) { + logger.Debug("locking") + sch.mtx.Lock() + defer sch.mtx.Unlock() + if sch.locking[uuid] { + logger.Debug("locking in progress, doing nothing") + return + } + if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued { + // This happens if the container has been cancelled or + // locked since runQueue called sch.queue.Entries(), + // possibly by a bgLock() call from a previous + // runQueue iteration. In any case, we will respond + // appropriately on the next runQueue iteration, which + // will have already been triggered by the queue + // update. + logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing") + return + } + sch.locking[uuid] = true + go func() { + defer func() { + sch.mtx.Lock() + defer sch.mtx.Unlock() + delete(sch.locking, uuid) + }() + err := sch.queue.Lock(uuid) + if err != nil { + logger.WithError(err).Warn("error locking container") + return + } + logger.Debug("lock succeeded") + ctr, ok := sch.queue.Get(uuid) + if !ok { + logger.Error("(BUG?) container disappeared from queue after Lock succeeded") + } else if ctr.State != arvados.ContainerStateLocked { + logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State) + } + }() +} diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go new file mode 100644 index 0000000000..be13e1c345 --- /dev/null +++ b/lib/dispatchcloud/scheduler/run_queue_test.go @@ -0,0 +1,318 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package scheduler + +import ( + "errors" + "time" + + "git.curoverse.com/arvados.git/lib/dispatchcloud/test" + "git.curoverse.com/arvados.git/lib/dispatchcloud/worker" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" + check "gopkg.in/check.v1" +) + +var ( + logger = logrus.StandardLogger() + + // arbitrary example container UUIDs + uuids = func() (r []string) { + for i := 0; i < 16; i++ { + r = append(r, test.ContainerUUID(i)) + } + return + }() +) + +type stubQuotaError struct { + error +} + +func (stubQuotaError) IsQuotaError() bool { return true } + +type stubPool struct { + notify <-chan struct{} + unalloc map[arvados.InstanceType]int // idle+booting+unknown + idle map[arvados.InstanceType]int + running map[string]time.Time + atQuota bool + canCreate int + creates []arvados.InstanceType + starts []string + shutdowns int +} + +func (p *stubPool) AtQuota() bool { return p.atQuota } +func (p *stubPool) Subscribe() <-chan struct{} { return p.notify } +func (p *stubPool) Unsubscribe(<-chan struct{}) {} +func (p *stubPool) Running() map[string]time.Time { return p.running } +func (p *stubPool) Unallocated() map[arvados.InstanceType]int { + r := map[arvados.InstanceType]int{} + for it, n := range p.unalloc { + r[it] = n + } + return r +} +func (p *stubPool) Create(it arvados.InstanceType) error { + p.creates = append(p.creates, it) + if p.canCreate < 1 { + return stubQuotaError{errors.New("quota")} + } + p.canCreate-- + p.unalloc[it]++ + return nil +} +func (p *stubPool) KillContainer(uuid string) { + p.running[uuid] = time.Now() +} +func (p *stubPool) Shutdown(arvados.InstanceType) bool { + p.shutdowns++ + return false +} +func (p *stubPool) CountWorkers() map[worker.State]int { + return map[worker.State]int{ + worker.StateBooting: len(p.unalloc) - len(p.idle), + worker.StateIdle: len(p.idle), + worker.StateRunning: len(p.running), + } +} +func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool { + p.starts = append(p.starts, ctr.UUID) + if p.idle[it] == 0 { + return false + } + p.idle[it]-- + p.unalloc[it]-- + p.running[ctr.UUID] = time.Time{} + return true +} + +var _ = check.Suite(&SchedulerSuite{}) + +type SchedulerSuite struct{} + +// Assign priority=4 container to idle node. Create a new instance for +// the priority=3 container. Don't try to start any priority<3 +// containers because priority=3 container didn't start +// immediately. Don't try to create any other nodes after the failed +// create. 
+func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) { + queue := test.Queue{ + ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil + }, + Containers: []arvados.Container{ + { + UUID: test.ContainerUUID(1), + Priority: 1, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, + }, + { + UUID: test.ContainerUUID(2), + Priority: 2, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, + }, + { + UUID: test.ContainerUUID(3), + Priority: 3, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, + }, + { + UUID: test.ContainerUUID(4), + Priority: 4, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, + }, + }, + } + queue.Update() + pool := stubPool{ + unalloc: map[arvados.InstanceType]int{ + test.InstanceType(1): 1, + test.InstanceType(2): 2, + }, + idle: map[arvados.InstanceType]int{ + test.InstanceType(1): 1, + test.InstanceType(2): 2, + }, + running: map[string]time.Time{}, + canCreate: 0, + } + New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue() + c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)}) + c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)}) + c.Check(pool.running, check.HasLen, 1) + for uuid := range pool.running { + c.Check(uuid, check.Equals, uuids[4]) + } +} + +// If Create() fails, shutdown some nodes, and don't call Create() +// again. Don't call Create() at all if AtQuota() is true. +func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) { + for quota := 0; quota < 2; quota++ { + c.Logf("quota=%d", quota) + shouldCreate := []arvados.InstanceType{} + for i := 0; i < quota; i++ { + shouldCreate = append(shouldCreate, test.InstanceType(3)) + } + queue := test.Queue{ + ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil + }, + Containers: []arvados.Container{ + { + UUID: test.ContainerUUID(2), + Priority: 2, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 2, + RAM: 2 << 30, + }, + }, + { + UUID: test.ContainerUUID(3), + Priority: 3, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 3, + RAM: 3 << 30, + }, + }, + }, + } + queue.Update() + pool := stubPool{ + atQuota: quota == 0, + unalloc: map[arvados.InstanceType]int{ + test.InstanceType(2): 2, + }, + idle: map[arvados.InstanceType]int{ + test.InstanceType(2): 2, + }, + running: map[string]time.Time{}, + creates: []arvados.InstanceType{}, + starts: []string{}, + canCreate: 0, + } + New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue() + c.Check(pool.creates, check.DeepEquals, shouldCreate) + c.Check(pool.starts, check.DeepEquals, []string{}) + c.Check(pool.shutdowns, check.Not(check.Equals), 0) + } +} + +// Start lower-priority containers while waiting for new/existing +// workers to come up for higher-priority containers. 
+func (*SchedulerSuite) TestStartWhileCreating(c *check.C) { + pool := stubPool{ + unalloc: map[arvados.InstanceType]int{ + test.InstanceType(1): 2, + test.InstanceType(2): 2, + }, + idle: map[arvados.InstanceType]int{ + test.InstanceType(1): 1, + test.InstanceType(2): 1, + }, + running: map[string]time.Time{}, + canCreate: 4, + } + queue := test.Queue{ + ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) { + return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil + }, + Containers: []arvados.Container{ + { + // create a new worker + UUID: test.ContainerUUID(1), + Priority: 1, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, + }, + { + // tentatively map to unalloc worker + UUID: test.ContainerUUID(2), + Priority: 2, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, + }, + { + // start now on idle worker + UUID: test.ContainerUUID(3), + Priority: 3, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 1, + RAM: 1 << 30, + }, + }, + { + // create a new worker + UUID: test.ContainerUUID(4), + Priority: 4, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 2, + RAM: 2 << 30, + }, + }, + { + // tentatively map to unalloc worker + UUID: test.ContainerUUID(5), + Priority: 5, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 2, + RAM: 2 << 30, + }, + }, + { + // start now on idle worker + UUID: test.ContainerUUID(6), + Priority: 6, + State: arvados.ContainerStateLocked, + RuntimeConstraints: arvados.RuntimeConstraints{ + VCPUs: 2, + RAM: 2 << 30, + }, + }, + }, + } + queue.Update() + New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue() + c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)}) + c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]}) + running := map[string]bool{} + for uuid, t := range pool.running { + if t.IsZero() { + running[uuid] = false + } else { + running[uuid] = true + } + } + c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false}) +} diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go new file mode 100644 index 0000000000..3971a5319d --- /dev/null +++ b/lib/dispatchcloud/scheduler/scheduler.go @@ -0,0 +1,116 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +// Package scheduler uses a resizable worker pool to execute +// containers in priority order. +package scheduler + +import ( + "sync" + "time" + + "github.com/Sirupsen/logrus" +) + +// A Scheduler maps queued containers onto unallocated workers in +// priority order, creating new workers if needed. It locks containers +// that can be mapped onto existing/pending workers, and starts them +// if possible. +// +// A Scheduler unlocks any containers that are locked but can't be +// mapped. (For example, this happens when the cloud provider reaches +// quota/capacity and a previously mappable container's priority is +// surpassed by a newer container.) +// +// If it encounters errors while creating new workers, a Scheduler +// shuts down idle workers, in case they are consuming quota. 
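A minimal wiring sketch of the Scheduler described above, using the New/Start/Stop functions defined just below. This is illustrative only: queue and pool are assumed to satisfy the package's ContainerQueue and WorkerPool interfaces (as the test.Queue and stubPool in the tests above do), and the timeout values are arbitrary examples, not defaults from this commit.

package main

import (
	"time"

	"git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
	"github.com/Sirupsen/logrus"
)

func runScheduler(queue scheduler.ContainerQueue, pool scheduler.WorkerPool) {
	// staleLockTimeout=time.Minute, queueUpdateInterval=10s are
	// illustrative choices.
	sch := scheduler.New(logrus.StandardLogger(), queue, pool, time.Minute, 10*time.Second)
	sch.Start() // scheduling loop runs in a background goroutine
	// ... dispatcher keeps running ...
	sch.Stop() // blocks until the loop has exited; call at most once
}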
+type Scheduler struct { + logger logrus.FieldLogger + queue ContainerQueue + pool WorkerPool + staleLockTimeout time.Duration + queueUpdateInterval time.Duration + + locking map[string]bool + mtx sync.Mutex + + runOnce sync.Once + stop chan struct{} + stopped chan struct{} +} + +// New returns a new unstarted Scheduler. +// +// Any given queue and pool should not be used by more than one +// scheduler at a time. +func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler { + return &Scheduler{ + logger: logger, + queue: queue, + pool: pool, + staleLockTimeout: staleLockTimeout, + queueUpdateInterval: queueUpdateInterval, + stop: make(chan struct{}), + stopped: make(chan struct{}), + locking: map[string]bool{}, + } +} + +// Start starts the scheduler. +func (sch *Scheduler) Start() { + go sch.runOnce.Do(sch.run) +} + +// Stop stops the scheduler. No other method should be called after +// Stop. +func (sch *Scheduler) Stop() { + close(sch.stop) + <-sch.stopped +} + +func (sch *Scheduler) run() { + defer close(sch.stopped) + + // Ensure the queue is fetched once before attempting anything. + for err := sch.queue.Update(); err != nil; err = sch.queue.Update() { + sch.logger.Errorf("error updating queue: %s", err) + d := sch.queueUpdateInterval / 60 + sch.logger.Infof("waiting %s before retry", d) + time.Sleep(d) + } + + // Keep the queue up to date. + poll := time.NewTicker(sch.queueUpdateInterval) + defer poll.Stop() + go func() { + for range poll.C { + err := sch.queue.Update() + if err != nil { + sch.logger.Errorf("error updating queue: %s", err) + } + } + }() + + t0 := time.Now() + sch.logger.Infof("FixStaleLocks starting.") + sch.fixStaleLocks() + sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0)) + + poolNotify := sch.pool.Subscribe() + defer sch.pool.Unsubscribe(poolNotify) + + queueNotify := sch.queue.Subscribe() + defer sch.queue.Unsubscribe(queueNotify) + + for { + sch.runQueue() + sch.sync() + select { + case <-sch.stop: + return + case <-queueNotify: + case <-poolNotify: + } + } +} diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go new file mode 100644 index 0000000000..4c55b3c0d8 --- /dev/null +++ b/lib/dispatchcloud/scheduler/sync.go @@ -0,0 +1,97 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package scheduler + +import ( + "fmt" + "time" + + "git.curoverse.com/arvados.git/lib/dispatchcloud/container" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" +) + +// sync resolves discrepancies between the queue and the pool: +// +// Lingering crunch-run processes for finalized and unlocked/requeued +// containers are killed. +// +// Locked containers whose crunch-run processes have exited are +// requeued. +// +// Running containers whose crunch-run processes have exited are +// cancelled. 
+func (sch *Scheduler) sync() { + running := sch.pool.Running() + cancel := func(ent container.QueueEnt, reason string) { + uuid := ent.Container.UUID + logger := sch.logger.WithField("ContainerUUID", uuid) + logger.Infof("cancelling container because %s", reason) + err := sch.queue.Cancel(uuid) + if err != nil { + logger.WithError(err).Print("error cancelling container") + } + } + kill := func(ent container.QueueEnt, reason string) { + uuid := ent.Container.UUID + logger := sch.logger.WithField("ContainerUUID", uuid) + logger.Debugf("killing crunch-run process because %s", reason) + sch.pool.KillContainer(uuid) + } + qEntries, qUpdated := sch.queue.Entries() + for uuid, ent := range qEntries { + exited, running := running[uuid] + switch ent.Container.State { + case arvados.ContainerStateRunning: + if !running { + go cancel(ent, "not running on any worker") + } else if !exited.IsZero() && qUpdated.After(exited) { + go cancel(ent, "state=\"Running\" after crunch-run exited") + } else if ent.Container.Priority == 0 { + go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority)) + } + case arvados.ContainerStateComplete, arvados.ContainerStateCancelled: + if running { + // Kill crunch-run in case it's stuck; + // nothing it does now will matter + // anyway. If crunch-run has already + // exited and we just haven't found + // out about it yet, the only effect + // of kill() will be to make the + // worker available for the next + // container. + go kill(ent, fmt.Sprintf("state=%q", ent.Container.State)) + } else { + sch.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "State": ent.Container.State, + }).Info("container finished") + sch.queue.Forget(uuid) + } + case arvados.ContainerStateQueued: + if running { + // Can happen if a worker returns from + // a network outage and is still + // preparing to run a container that + // has already been unlocked/requeued. + go kill(ent, fmt.Sprintf("state=%q", ent.Container.State)) + } + case arvados.ContainerStateLocked: + if running && !exited.IsZero() && qUpdated.After(exited) { + logger := sch.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "Exited": time.Since(exited).Seconds(), + }) + logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State) + err := sch.queue.Unlock(uuid) + if err != nil { + logger.WithError(err).Info("error requeueing container") + } + } + default: + sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State) + } + } +} diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go new file mode 100644 index 0000000000..b5dba9870d --- /dev/null +++ b/lib/dispatchcloud/ssh_executor/executor.go @@ -0,0 +1,190 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +// Package ssh_executor provides an implementation of pool.Executor +// using a long-lived multiplexed SSH session. +package ssh_executor + +import ( + "bytes" + "errors" + "io" + "net" + "sync" + "time" + + "git.curoverse.com/arvados.git/lib/cloud" + "golang.org/x/crypto/ssh" +) + +// New returns a new Executor, using the given target. +func New(t cloud.ExecutorTarget) *Executor { + return &Executor{target: t} +} + +// An Executor uses a multiplexed SSH connection to execute shell +// commands on a remote target. It reconnects automatically after +// errors. 
+// +// When setting up a connection, the Executor accepts whatever host +// key is provided by the remote server, then passes the received key +// and the SSH connection to the target's VerifyHostKey method before +// executing commands on the connection. +// +// A zero Executor must not be used before calling SetTarget. +// +// An Executor must not be copied. +type Executor struct { + target cloud.ExecutorTarget + signers []ssh.Signer + mtx sync.RWMutex // controls access to instance after creation + + client *ssh.Client + clientErr error + clientOnce sync.Once // initialized private state + clientSetup chan bool // len>0 while client setup is in progress + hostKey ssh.PublicKey // most recent host key that passed verification, if any +} + +// SetSigners updates the set of private keys that will be offered to +// the target next time the Executor sets up a new connection. +func (exr *Executor) SetSigners(signers ...ssh.Signer) { + exr.mtx.Lock() + defer exr.mtx.Unlock() + exr.signers = signers +} + +// SetTarget sets the current target. The new target will be used next +// time a new connection is set up; until then, the Executor will +// continue to use the existing target. +// +// The new target is assumed to represent the same host as the +// previous target, although its address and host key might differ. +func (exr *Executor) SetTarget(t cloud.ExecutorTarget) { + exr.mtx.Lock() + defer exr.mtx.Unlock() + exr.target = t +} + +// Target returns the current target. +func (exr *Executor) Target() cloud.ExecutorTarget { + exr.mtx.RLock() + defer exr.mtx.RUnlock() + return exr.target +} + +// Execute runs cmd on the target. If an existing connection is not +// usable, it sets up a new connection to the current target. +func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) { + session, err := exr.newSession() + if err != nil { + return nil, nil, err + } + defer session.Close() + var stdout, stderr bytes.Buffer + session.Stdin = stdin + session.Stdout = &stdout + session.Stderr = &stderr + err = session.Run(cmd) + return stdout.Bytes(), stderr.Bytes(), err +} + +// Close shuts down any active connections. +func (exr *Executor) Close() { + // Ensure exr is initialized + exr.sshClient(false) + + exr.clientSetup <- true + if exr.client != nil { + defer exr.client.Close() + } + exr.client, exr.clientErr = nil, errors.New("closed") + <-exr.clientSetup +} + +// Create a new SSH session. If session setup fails or the SSH client +// hasn't been setup yet, setup a new SSH client and try again. +func (exr *Executor) newSession() (*ssh.Session, error) { + try := func(create bool) (*ssh.Session, error) { + client, err := exr.sshClient(create) + if err != nil { + return nil, err + } + return client.NewSession() + } + session, err := try(false) + if err != nil { + session, err = try(true) + } + return session, err +} + +// Get the latest SSH client. If another goroutine is in the process +// of setting one up, wait for it to finish and return its result (or +// the last successfully setup client, if it fails). 
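A usage sketch of the Executor API above (New, SetSigners, Execute, Close). Here inst is assumed to be any cloud.ExecutorTarget, such as an Instance returned by a driver, and signer is a private key parsed elsewhere (e.g. with ssh.ParsePrivateKey); the command is just an example.

package main

import (
	"log"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
	"golang.org/x/crypto/ssh"
)

func listRemote(inst cloud.ExecutorTarget, signer ssh.Signer) {
	exr := ssh_executor.New(inst)
	exr.SetSigners(signer)
	defer exr.Close()
	// The first Execute dials and verifies the host key; later
	// calls reuse the multiplexed connection.
	stdout, stderr, err := exr.Execute("crunch-run --list", nil)
	if err != nil {
		log.Printf("remote command failed: %s (stderr %q)", err, stderr)
		return
	}
	log.Printf("running containers: %q", stdout)
}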
+func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
+	exr.clientOnce.Do(func() {
+		exr.clientSetup = make(chan bool, 1)
+		exr.clientErr = errors.New("client not yet created")
+	})
+	defer func() { <-exr.clientSetup }()
+	select {
+	case exr.clientSetup <- true:
+		if create {
+			client, err := exr.setupSSHClient()
+			if err == nil || exr.client == nil {
+				if exr.client != nil {
+					// Hang up the previous
+					// (non-working) client
+					go exr.client.Close()
+				}
+				exr.client, exr.clientErr = client, err
+			}
+			if err != nil {
+				return nil, err
+			}
+		}
+	default:
+		// Another goroutine is doing the above case. Wait
+		// for it to finish and return whatever it leaves in
+		// exr.client.
+		exr.clientSetup <- true
+	}
+	return exr.client, exr.clientErr
+}
+
+// Create a new SSH client.
+func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
+	target := exr.Target()
+	addr := target.Address()
+	if addr == "" {
+		return nil, errors.New("instance has no address")
+	}
+	var receivedKey ssh.PublicKey
+	client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+		User: "root",
+		Auth: []ssh.AuthMethod{
+			ssh.PublicKeys(exr.signers...),
+		},
+		HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+			receivedKey = key
+			return nil
+		},
+		Timeout: time.Minute,
+	})
+	if err != nil {
+		return nil, err
+	} else if receivedKey == nil {
+		return nil, errors.New("BUG: key was never provided to HostKeyCallback")
+	}
+
+	if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
+		err = target.VerifyHostKey(receivedKey, client)
+		if err != nil {
+			return nil, err
+		}
+		exr.hostKey = receivedKey
+	}
+	return client, nil
+}
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
new file mode 100644
index 0000000000..8dabfecad8
--- /dev/null
+++ b/lib/dispatchcloud/ssh_executor/executor_test.go
@@ -0,0 +1,102 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+// +// SPDX-License-Identifier: AGPL-3.0 + +package ssh_executor + +import ( + "bytes" + "io" + "io/ioutil" + "sync" + "testing" + "time" + + "git.curoverse.com/arvados.git/lib/dispatchcloud/test" + "golang.org/x/crypto/ssh" + check "gopkg.in/check.v1" +) + +// Gocheck boilerplate +func Test(t *testing.T) { + check.TestingT(t) +} + +var _ = check.Suite(&ExecutorSuite{}) + +type testTarget struct { + test.SSHService +} + +func (*testTarget) VerifyHostKey(ssh.PublicKey, *ssh.Client) error { + return nil +} + +type ExecutorSuite struct{} + +func (s *ExecutorSuite) TestExecute(c *check.C) { + command := `foo 'bar' "baz"` + stdinData := "foobar\nbaz\n" + _, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm") + clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch") + for _, exitcode := range []int{0, 1, 2} { + srv := &testTarget{ + SSHService: test.SSHService{ + Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 { + c.Check(cmd, check.Equals, command) + var wg sync.WaitGroup + wg.Add(2) + go func() { + io.WriteString(stdout, "stdout\n") + wg.Done() + }() + go func() { + io.WriteString(stderr, "stderr\n") + wg.Done() + }() + buf, err := ioutil.ReadAll(stdin) + wg.Wait() + c.Check(err, check.IsNil) + if err != nil { + return 99 + } + _, err = stdout.Write(buf) + c.Check(err, check.IsNil) + return uint32(exitcode) + }, + HostKey: hostpriv, + AuthorizedKeys: []ssh.PublicKey{clientpub}, + }, + } + err := srv.Start() + c.Check(err, check.IsNil) + c.Logf("srv address %q", srv.Address()) + defer srv.Close() + + exr := New(srv) + exr.SetSigners(clientpriv) + + done := make(chan bool) + go func() { + stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData)) + if exitcode == 0 { + c.Check(err, check.IsNil) + } else { + c.Check(err, check.NotNil) + err, ok := err.(*ssh.ExitError) + c.Assert(ok, check.Equals, true) + c.Check(err.ExitStatus(), check.Equals, exitcode) + } + c.Check(stdout, check.DeepEquals, []byte("stdout\n"+stdinData)) + c.Check(stderr, check.DeepEquals, []byte("stderr\n")) + close(done) + }() + + timeout := time.NewTimer(time.Second) + select { + case <-done: + case <-timeout.C: + c.Fatal("timed out") + } + } +} diff --git a/lib/dispatchcloud/test/doc.go b/lib/dispatchcloud/test/doc.go new file mode 100644 index 0000000000..12f3b16b27 --- /dev/null +++ b/lib/dispatchcloud/test/doc.go @@ -0,0 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +// Package test provides fakes and other tools for testing cloud +// drivers and other dispatcher modules. +package test diff --git a/lib/dispatchcloud/test/fixtures.go b/lib/dispatchcloud/test/fixtures.go new file mode 100644 index 0000000000..68bdb3db42 --- /dev/null +++ b/lib/dispatchcloud/test/fixtures.go @@ -0,0 +1,28 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package test + +import ( + "fmt" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// ContainerUUID returns a fake container UUID. +func ContainerUUID(i int) string { + return fmt.Sprintf("zzzzz-dz642-%015d", i) +} + +// InstanceType returns a fake arvados.InstanceType called "type{i}" +// with i CPUs and i GiB of memory. 
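For example, per the definition just below, InstanceType(2) evaluates to:

arvados.InstanceType{
	Name:         "type2",
	ProviderType: "providertype2",
	VCPUs:        2,
	RAM:          arvados.ByteSize(2) << 30, // 2 GiB
	Price:        0.246,                     // float64(2) * 0.123
}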
+func InstanceType(i int) arvados.InstanceType { + return arvados.InstanceType{ + Name: fmt.Sprintf("type%d", i), + ProviderType: fmt.Sprintf("providertype%d", i), + VCPUs: i, + RAM: arvados.ByteSize(i) << 30, + Price: float64(i) * 0.123, + } +} diff --git a/lib/dispatchcloud/test/lame_instance_set.go b/lib/dispatchcloud/test/lame_instance_set.go new file mode 100644 index 0000000000..baab407a7d --- /dev/null +++ b/lib/dispatchcloud/test/lame_instance_set.go @@ -0,0 +1,118 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package test + +import ( + "fmt" + "math/rand" + "sync" + + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "golang.org/x/crypto/ssh" +) + +// LameInstanceSet creates instances that boot but can't run +// containers. +type LameInstanceSet struct { + Hold chan bool // set to make(chan bool) to hold operations until Release is called + + mtx sync.Mutex + instances map[*lameInstance]bool +} + +// Create returns a new instance. +func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) { + inst := &lameInstance{ + p: p, + id: cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())), + providerType: instType.ProviderType, + } + inst.SetTags(tags) + if p.Hold != nil { + p.Hold <- true + } + p.mtx.Lock() + defer p.mtx.Unlock() + if p.instances == nil { + p.instances = map[*lameInstance]bool{} + } + p.instances[inst] = true + return inst, nil +} + +// Instances returns the instances that haven't been destroyed. +func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) { + p.mtx.Lock() + defer p.mtx.Unlock() + var instances []cloud.Instance + for i := range p.instances { + instances = append(instances, i) + } + return instances, nil +} + +// Stop is a no-op, but exists to satisfy cloud.InstanceSet. +func (p *LameInstanceSet) Stop() { +} + +// Release n held calls. Blocks if n calls aren't already +// waiting. Blocks forever if Hold is nil. +func (p *LameInstanceSet) Release(n int) { + for i := 0; i < n; i++ { + <-p.Hold + } +} + +type lameInstance struct { + p *LameInstanceSet + id cloud.InstanceID + providerType string + tags cloud.InstanceTags +} + +func (inst *lameInstance) ID() cloud.InstanceID { + return inst.id +} + +func (inst *lameInstance) String() string { + return fmt.Sprint(inst.id) +} + +func (inst *lameInstance) ProviderType() string { + return inst.providerType +} + +func (inst *lameInstance) Address() string { + return "0.0.0.0:1234" +} + +func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error { + inst.p.mtx.Lock() + defer inst.p.mtx.Unlock() + inst.tags = cloud.InstanceTags{} + for k, v := range tags { + inst.tags[k] = v + } + return nil +} + +func (inst *lameInstance) Destroy() error { + if inst.p.Hold != nil { + inst.p.Hold <- true + } + inst.p.mtx.Lock() + defer inst.p.mtx.Unlock() + delete(inst.p.instances, inst) + return nil +} + +func (inst *lameInstance) Tags() cloud.InstanceTags { + return inst.tags +} + +func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error { + return nil +} diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go new file mode 100644 index 0000000000..e18a2b5362 --- /dev/null +++ b/lib/dispatchcloud/test/queue.go @@ -0,0 +1,171 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package test + +import ( + "fmt" + "sync" + "time" + + "git.curoverse.com/arvados.git/lib/dispatchcloud/container" + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// Queue is a test stub for container.Queue. The caller specifies the +// initial queue state. +type Queue struct { + // Containers represent the API server database contents. + Containers []arvados.Container + + // ChooseType will be called for each entry in Containers. It + // must not be nil. + ChooseType func(*arvados.Container) (arvados.InstanceType, error) + + entries map[string]container.QueueEnt + updTime time.Time + subscribers map[<-chan struct{}]chan struct{} + + mtx sync.Mutex +} + +// Entries returns the containers that were queued when Update was +// last called. +func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) { + q.mtx.Lock() + defer q.mtx.Unlock() + updTime := q.updTime + r := map[string]container.QueueEnt{} + for uuid, ent := range q.entries { + r[uuid] = ent + } + return r, updTime +} + +// Get returns the container from the cached queue, i.e., as it was +// when Update was last called -- just like a container.Queue does. If +// the state has been changed (via Lock, Unlock, or Cancel) since the +// last Update, the updated state is returned. +func (q *Queue) Get(uuid string) (arvados.Container, bool) { + q.mtx.Lock() + defer q.mtx.Unlock() + ent, ok := q.entries[uuid] + return ent.Container, ok +} + +func (q *Queue) Forget(uuid string) { + q.mtx.Lock() + defer q.mtx.Unlock() + delete(q.entries, uuid) +} + +func (q *Queue) Lock(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() + return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked) +} + +func (q *Queue) Unlock(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() + return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued) +} + +func (q *Queue) Cancel(uuid string) error { + q.mtx.Lock() + defer q.mtx.Unlock() + return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled) +} + +func (q *Queue) Subscribe() <-chan struct{} { + q.mtx.Lock() + defer q.mtx.Unlock() + if q.subscribers == nil { + q.subscribers = map[<-chan struct{}]chan struct{}{} + } + ch := make(chan struct{}, 1) + q.subscribers[ch] = ch + return ch +} + +func (q *Queue) Unsubscribe(ch <-chan struct{}) { + q.mtx.Lock() + defer q.mtx.Unlock() + delete(q.subscribers, ch) +} + +// caller must have lock. +func (q *Queue) notify() { + for _, ch := range q.subscribers { + select { + case ch <- struct{}{}: + default: + } + } +} + +// caller must have lock. +func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error { + ent := q.entries[uuid] + if ent.Container.State != from { + return fmt.Errorf("changeState failed: state=%q", ent.Container.State) + } + ent.Container.State = to + q.entries[uuid] = ent + for i, ctr := range q.Containers { + if ctr.UUID == uuid { + q.Containers[i].State = to + break + } + } + q.notify() + return nil +} + +// Update rebuilds the current entries from the Containers slice. 
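Typical use, following the scheduler tests earlier in this commit: fill in Containers and ChooseType, snapshot with the Update method defined below, then read the snapshot. The container fields shown are an arbitrary example.

queue := test.Queue{
	ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
		return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
	},
	Containers: []arvados.Container{{
		UUID:               test.ContainerUUID(1),
		Priority:           1,
		State:              arvados.ContainerStateQueued,
		RuntimeConstraints: arvados.RuntimeConstraints{VCPUs: 1, RAM: 1 << 30},
	}},
}
queue.Update()                   // snapshot the Containers slice
ents, updated := queue.Entries() // snapshot contents and its timestamp
fmt.Printf("%d entries as of %v\n", len(ents), updated)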
+func (q *Queue) Update() error { + q.mtx.Lock() + defer q.mtx.Unlock() + updTime := time.Now() + upd := map[string]container.QueueEnt{} + for _, ctr := range q.Containers { + _, exists := q.entries[ctr.UUID] + if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) { + continue + } + if ent, ok := upd[ctr.UUID]; ok { + ent.Container = ctr + upd[ctr.UUID] = ent + } else { + it, _ := q.ChooseType(&ctr) + upd[ctr.UUID] = container.QueueEnt{ + Container: ctr, + InstanceType: it, + } + } + } + q.entries = upd + q.updTime = updTime + q.notify() + return nil +} + +// Notify adds/updates an entry in the Containers slice. This +// simulates the effect of an API update from someone other than the +// dispatcher -- e.g., crunch-run updating state to "Complete" when a +// container exits. +// +// The resulting changes are not exposed through Get() or Entries() +// until the next call to Update(). +func (q *Queue) Notify(upd arvados.Container) { + q.mtx.Lock() + defer q.mtx.Unlock() + for i, ctr := range q.Containers { + if ctr.UUID == upd.UUID { + q.Containers[i] = upd + return + } + } + q.Containers = append(q.Containers, upd) +} diff --git a/lib/dispatchcloud/test/ssh_service.go b/lib/dispatchcloud/test/ssh_service.go new file mode 100644 index 0000000000..b1e4e03b12 --- /dev/null +++ b/lib/dispatchcloud/test/ssh_service.go @@ -0,0 +1,169 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package test + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "strings" + "sync" + + "golang.org/x/crypto/ssh" + check "gopkg.in/check.v1" +) + +func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) { + rawpubkey, err := ioutil.ReadFile(fnm + ".pub") + c.Assert(err, check.IsNil) + pubkey, _, _, _, err := ssh.ParseAuthorizedKey(rawpubkey) + c.Assert(err, check.IsNil) + rawprivkey, err := ioutil.ReadFile(fnm) + c.Assert(err, check.IsNil) + privkey, err := ssh.ParsePrivateKey(rawprivkey) + c.Assert(err, check.IsNil) + return pubkey, privkey +} + +// An SSHExecFunc handles an "exec" session on a multiplexed SSH +// connection. +type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 + +// An SSHService accepts SSH connections on an available TCP port and +// passes clients' "exec" sessions to the provided SSHExecFunc. +type SSHService struct { + Exec SSHExecFunc + HostKey ssh.Signer + AuthorizedKeys []ssh.PublicKey + + listener net.Listener + conn *ssh.ServerConn + setup sync.Once + mtx sync.Mutex + started chan bool + closed bool + err error +} + +// Address returns the host:port where the SSH server is listening. It +// returns "" if called before the server is ready to accept +// connections. +func (ss *SSHService) Address() string { + ss.setup.Do(ss.start) + ss.mtx.Lock() + ln := ss.listener + ss.mtx.Unlock() + if ln == nil { + return "" + } + return ln.Addr().String() +} + +// Close shuts down the server and releases resources. Established +// connections are unaffected. +func (ss *SSHService) Close() { + ss.Start() + ss.mtx.Lock() + ln := ss.listener + ss.closed = true + ss.mtx.Unlock() + if ln != nil { + ln.Close() + } +} + +// Start returns when the server is ready to accept connections. 
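Standalone use of SSHService, mirroring executor_test.go above; c is assumed to be the *check.C of the calling test, and the Exec handler here is a trivial example.

_, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
clientpub, _ := test.LoadTestKey(c, "../test/sshkey_dispatch")
srv := &test.SSHService{
	Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
		fmt.Fprintf(stdout, "received: %s\n", cmd)
		return 0 // reported to the client as the exit status
	},
	HostKey:        hostpriv,
	AuthorizedKeys: []ssh.PublicKey{clientpub},
}
c.Assert(srv.Start(), check.IsNil)
defer srv.Close()
c.Logf("listening at %s", srv.Address()) // "host:port", ready to dial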
+func (ss *SSHService) Start() error { + ss.setup.Do(ss.start) + <-ss.started + return ss.err +} + +func (ss *SSHService) start() { + ss.started = make(chan bool) + go ss.run() +} + +func (ss *SSHService) run() { + defer close(ss.started) + config := &ssh.ServerConfig{ + PublicKeyCallback: func(c ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) { + for _, ak := range ss.AuthorizedKeys { + if bytes.Equal(ak.Marshal(), pubKey.Marshal()) { + return &ssh.Permissions{}, nil + } + } + return nil, fmt.Errorf("unknown public key for %q", c.User()) + }, + } + config.AddHostKey(ss.HostKey) + + listener, err := net.Listen("tcp", ":") + if err != nil { + ss.err = err + return + } + + ss.mtx.Lock() + ss.listener = listener + ss.mtx.Unlock() + + go func() { + for { + nConn, err := listener.Accept() + if err != nil && strings.Contains(err.Error(), "use of closed network connection") && ss.closed { + return + } else if err != nil { + log.Printf("accept: %s", err) + return + } + go ss.serveConn(nConn, config) + } + }() +} + +func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) { + defer nConn.Close() + conn, newchans, reqs, err := ssh.NewServerConn(nConn, config) + if err != nil { + log.Printf("ssh.NewServerConn: %s", err) + return + } + defer conn.Close() + go ssh.DiscardRequests(reqs) + for newch := range newchans { + if newch.ChannelType() != "session" { + newch.Reject(ssh.UnknownChannelType, "unknown channel type") + continue + } + ch, reqs, err := newch.Accept() + if err != nil { + log.Printf("accept channel: %s", err) + return + } + var execReq struct { + Command string + } + go func() { + for req := range reqs { + if req.Type == "exec" && execReq.Command == "" { + req.Reply(true, nil) + ssh.Unmarshal(req.Payload, &execReq) + go func() { + var resp struct { + Status uint32 + } + resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr()) + ch.SendRequest("exit-status", false, ssh.Marshal(&resp)) + ch.Close() + }() + } + } + }() + } +} diff --git a/lib/dispatchcloud/test/sshkey_dispatch b/lib/dispatchcloud/test/sshkey_dispatch new file mode 100644 index 0000000000..5584519c78 --- /dev/null +++ b/lib/dispatchcloud/test/sshkey_dispatch @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAqYm4XsQHm8sBSZFwUX5VeW1OkGsfoNzcGPG2nzzYRhNhClYZ +0ABHhUk82HkaC/8l6d/jpYTf42HrK42nNQ0r0Yzs7qw8yZMQioK4Yk+kFyVLF78E +GRG4pGAWXFs6pUchs/lm8fo9zcda4R3XeqgI+NO+nEERXmdRJa1FhI+Za3/S/+CV +mg+6O00wZz2+vKmDPptGN4MCKmQOCKsMJts7wSZGyVcTtdNv7jjfr6yPAIOIL8X7 +LtarBCFaK/pD7uWll/Uj7h7D8K48nIZUrvBJJjXL8Sm4LxCNoz3Z83k8J5ZzuDRD +gRiQe/C085mhO6VL+2fypDLwcKt1tOL8fI81MwIDAQABAoIBACR3tEnmHsDbNOav +Oxq8cwRQh9K2yDHg8BMJgz/TZa4FIx2HEbxVIw0/iLADtJ+Z/XzGJQCIiWQuvtg6 +exoFQESt7JUWRWkSkj9JCQJUoTY9Vl7APtBpqG7rIEQzd3TvzQcagZNRQZQO6rR7 +p8sBdBSZ72lK8cJ9tM3G7Kor/VNK7KgRZFNhEWnmvEa3qMd4hzDcQ4faOn7C9NZK +dwJAuJVVfwOLlOORYcyEkvksLaDOK2DsB/p0AaCpfSmThRbBKN5fPXYaKgUdfp3w +70Hpp27WWymb1cgjyqSH3DY+V/kvid+5QxgxCBRq865jPLn3FFT9bWEVS/0wvJRj +iMIRrjECgYEA4Ffv9rBJXqVXonNQbbstd2PaprJDXMUy9/UmfHL6pkq1xdBeuM7v +yf2ocXheA8AahHtIOhtgKqwv/aRhVK0ErYtiSvIk+tXG+dAtj/1ZAKbKiFyxjkZV +X72BH7cTlR6As5SRRfWM/HaBGEgED391gKsI5PyMdqWWdczT5KfxAksCgYEAwXYE +ewPmV1GaR5fbh2RupoPnUJPMj36gJCnwls7sGaXDQIpdlq56zfKgrLocGXGgj+8f +QH7FHTJQO15YCYebtsXWwB3++iG43gVlJlecPAydsap2CCshqNWC5JU5pan0QzsP +exzNzWqfUPSbTkR2SRaN+MenZo2Y/WqScOAth7kCgYBgVoLujW9EXH5QfXJpXLq+ +jTvE38I7oVcs0bJwOLPYGzcJtlwmwn6IYAwohgbhV2pLv+EZSs42JPEK278MLKxY +lgVkp60npgunFTWroqDIvdc1TZDVxvA8h9VeODEJlSqxczgbMcIUXBM9yRctTI+5 
+7DiKlMUA4kTFW2sWwuOlFwKBgGXvrYS0FVbFJKm8lmvMu5D5x5RpjEu/yNnFT4Pn +G/iXoz4Kqi2PWh3STl804UF24cd1k94D7hDoReZCW9kJnz67F+C67XMW+bXi2d1O +JIBvlVfcHb1IHMA9YG7ZQjrMRmx2Xj3ce4RVPgUGHh8ra7gvLjd72/Tpf0doNClN +ti/hAoGBAMW5D3LhU05LXWmOqpeT4VDgqk4MrTBcstVe7KdVjwzHrVHCAmI927vI +pjpphWzpC9m3x4OsTNf8m+g6H7f3IiQS0aiFNtduXYlcuT5FHS2fSATTzg5PBon9 +1E6BudOve+WyFyBs7hFWAqWFBdWujAl4Qk5Ek09U2ilFEPE7RTgJ +-----END RSA PRIVATE KEY----- diff --git a/lib/dispatchcloud/test/sshkey_dispatch.pub b/lib/dispatchcloud/test/sshkey_dispatch.pub new file mode 100644 index 0000000000..1d5c1ea1b1 --- /dev/null +++ b/lib/dispatchcloud/test/sshkey_dispatch.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCpibhexAebywFJkXBRflV5bU6Qax+g3NwY8bafPNhGE2EKVhnQAEeFSTzYeRoL/yXp3+OlhN/jYesrjac1DSvRjOzurDzJkxCKgrhiT6QXJUsXvwQZEbikYBZcWzqlRyGz+Wbx+j3Nx1rhHdd6qAj4076cQRFeZ1ElrUWEj5lrf9L/4JWaD7o7TTBnPb68qYM+m0Y3gwIqZA4Iqwwm2zvBJkbJVxO102/uON+vrI8Ag4gvxfsu1qsEIVor+kPu5aWX9SPuHsPwrjychlSu8EkmNcvxKbgvEI2jPdnzeTwnlnO4NEOBGJB78LTzmaE7pUv7Z/KkMvBwq3W04vx8jzUz tom@curve diff --git a/lib/dispatchcloud/test/sshkey_vm b/lib/dispatchcloud/test/sshkey_vm new file mode 100644 index 0000000000..10b7ed1bc4 --- /dev/null +++ b/lib/dispatchcloud/test/sshkey_vm @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpQIBAAKCAQEApIfWk2StZGDtmunumIeXLJ46AQrbHHvuxrSAkQf6+zUwjB2I +rse7ezBRHWcge9U5EsigixmhUM4ozFLnUQNwC862jbmsjbyA97arG/REECNlUrEB +HQPYHhai5yyJ89AfjWVxKyINfW0K2HX1R8nl4kdVraAgpohPLh0dGjfwzm/BcXDG ++TxW9zRz0KCs9ZRI6s2MNdv08ahKQ0azk8gRTqMADJmYNWIo3zPQ+fhlwyr6EZJ/ +HFbRtjpajEPMJPwoVPO+Wj6wztfHDYKkPIrIWbhMl6w+tEKdsmygd3Iq94ktLS3X +AbRCfn4njS2QSlkKFEepkUJWCSSWZgFn6DLm2wIDAQABAoIBAQCb137LxcTnG1h0 +L7isCWKMBKN0cU/xvwIAfOB6f1CfuVXuodrhkpZmrPFoJFKEeQbCX/6RQwmlfGDw +iGZKOjNbO8V2oLRs3GxcNk4FAG2ny58hoD8puIZwmYhb57gTlMMOL1PuQyb78tkf +Bzv5b6ermV3yQ4Ypt1solrMGLo6NOZD0oDX9p0Zt9kueIhjzgP0v5//T1F4PGHZK ++sLSsMiu9u6F+PB+Oc6uv0Zee9Lnts/QiWH5f18oEculjwKWFx+JwJWiLffGg2Bl +vbpmvHFRoRWkHTpgSiLwSUqs0ZUWU9R5h11ROg5L39MLsxQoBvHsPEnP5ssN8jGt +aH86EZjBAoGBAM+A5B/UjhIn9m05EhDTDRzI92hGhM8f7uAwobbnjvIQyZbWlBwj +2TmgbJdpTGVbD+iTBIwKQdcFBbWobTCZsNMpghqA/ir4YIAnZ5OX9VQ1Bc+bWE7V +dPmMVpCgyg+ERAe+79FrYWcI3vhnBpHCsY/9p9pGQIKDzlGTWNF1HJGjAoGBAMr7 +2CTVnFImTgD3E+rH4AAAfkz+cyqfK6BUhli/NifFYZhWCs16r9QCGSORnp4gPhMY +3mf7VBs9rk123zOMo89eJt3adTgbZ+QIxXeXilGXpbT3w1+CJMaZRrIy80E1tB5/ +KvDZcrZ78o8XWMNUa+9k55ukvgyC24ICAmOIWNlpAoGBALEFvphBF2r52MtZUsYz +pw4VjKvS7V5eWcW891k4tsRf+frK2NQg6SK2b63EUT5ur2W0dr6ZyY2MZVCSfYRm +uWmMEchWn389IeZyt3Q8wTize1+foXivtflm9jqwUXFnXzpUc/du6kuiT8YO7pXP +SPgUZ+xY3pP5qjwBvlYC2PqNAoGAZ1CKMi1bdGC0wT8BLzXuqHGX136HhcEgRmnf +O5qPaOzJAO2CcBWrGuC6hOUgc+F7VuMIiKpeo8LgTeNcNfO2iNymMbN4iEdCuMlS +IM3MBD2IhTS6h4lJSKBJYHgYYi+AbylQ5Of4wDMUQYqjjkAQ8/dK/2h5pwqPyXtW +VezXNEkCgYEAq4S0++y9tjlLn+w9BIkmx3bAVRDQZIzIEwxTh+jpqaUp1J0iyseJ +71pwqQojGNF6x8GglVXa6bMrETae21WhEeHnWmzlpCWIODsYPUQ+erjDuAWi9eGk +HLklqSEoLB8pzC6zDqjxDw+CnGERIDSaoaeoWiNKZ95IH1WiEwYjuxU= +-----END RSA PRIVATE KEY----- diff --git a/lib/dispatchcloud/test/sshkey_vm.pub b/lib/dispatchcloud/test/sshkey_vm.pub new file mode 100644 index 0000000000..b9d44c946c --- /dev/null +++ b/lib/dispatchcloud/test/sshkey_vm.pub @@ -0,0 +1 @@ +ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCkh9aTZK1kYO2a6e6Yh5csnjoBCtsce+7GtICRB/r7NTCMHYiux7t7MFEdZyB71TkSyKCLGaFQzijMUudRA3ALzraNuayNvID3tqsb9EQQI2VSsQEdA9geFqLnLInz0B+NZXErIg19bQrYdfVHyeXiR1WtoCCmiE8uHR0aN/DOb8FxcMb5PFb3NHPQoKz1lEjqzYw12/TxqEpDRrOTyBFOowAMmZg1YijfM9D5+GXDKvoRkn8cVtG2OlqMQ8wk/ChU875aPrDO18cNgqQ8ishZuEyXrD60Qp2ybKB3cir3iS0tLdcBtEJ+fieNLZBKWQoUR6mRQlYJJJZmAWfoMubb tom@curve diff --git 
a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go new file mode 100644 index 0000000000..8bdfaa947d --- /dev/null +++ b/lib/dispatchcloud/test/stub_driver.go @@ -0,0 +1,318 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package test + +import ( + "crypto/rand" + "errors" + "fmt" + "io" + math_rand "math/rand" + "regexp" + "strings" + "sync" + "time" + + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" + "github.com/mitchellh/mapstructure" + "golang.org/x/crypto/ssh" +) + +// A StubDriver implements cloud.Driver by setting up local SSH +// servers that do fake command executions. +type StubDriver struct { + HostKey ssh.Signer + AuthorizedKeys []ssh.PublicKey + + // SetupVM, if set, is called upon creation of each new + // StubVM. This is the caller's opportunity to customize the + // VM's error rate and other behaviors. + SetupVM func(*StubVM) + + // StubVM's fake crunch-run uses this Queue to read and update + // container state. + Queue *Queue + + // Frequency of artificially introduced errors on calls to + // Destroy. 0=always succeed, 1=always fail. + ErrorRateDestroy float64 + + instanceSets []*StubInstanceSet +} + +// InstanceSet returns a new *StubInstanceSet. +func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) { + sis := StubInstanceSet{ + driver: sd, + servers: map[cloud.InstanceID]*StubVM{}, + } + sd.instanceSets = append(sd.instanceSets, &sis) + return &sis, mapstructure.Decode(params, &sis) +} + +// InstanceSets returns all instances that have been created by the +// driver. This can be used to test a component that uses the driver +// but doesn't expose the InstanceSets it has created. +func (sd *StubDriver) InstanceSets() []*StubInstanceSet { + return sd.instanceSets +} + +type StubInstanceSet struct { + driver *StubDriver + servers map[cloud.InstanceID]*StubVM + mtx sync.RWMutex + stopped bool +} + +func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) { + sis.mtx.Lock() + defer sis.mtx.Unlock() + if sis.stopped { + return nil, errors.New("StubInstanceSet: Create called after Stop") + } + ak := sis.driver.AuthorizedKeys + if authKey != nil { + ak = append([]ssh.PublicKey{authKey}, ak...) + } + svm := &StubVM{ + sis: sis, + id: cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())), + tags: copyTags(tags), + providerType: it.ProviderType, + } + svm.SSHService = SSHService{ + HostKey: sis.driver.HostKey, + AuthorizedKeys: ak, + Exec: svm.Exec, + } + if setup := sis.driver.SetupVM; setup != nil { + setup(svm) + } + sis.servers[svm.id] = svm + return svm.Instance(), nil +} + +func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) { + sis.mtx.RLock() + defer sis.mtx.RUnlock() + var r []cloud.Instance + for _, ss := range sis.servers { + r = append(r, ss.Instance()) + } + return r, nil +} + +func (sis *StubInstanceSet) Stop() { + sis.mtx.Lock() + defer sis.mtx.Unlock() + if sis.stopped { + panic("Stop called twice") + } + sis.stopped = true +} + +// StubVM is a fake server that runs an SSH service. It represents a +// VM running in a fake cloud. +// +// Note this is distinct from a stubInstance, which is a snapshot of +// the VM's metadata. 
Like a VM in a real cloud, a StubVM keeps +// running (and might change IP addresses, shut down, etc.) without +// updating any stubInstances that have been returned to callers. +type StubVM struct { + Boot time.Time + Broken time.Time + CrunchRunMissing bool + CrunchRunCrashRate float64 + CrunchRunDetachDelay time.Duration + ExecuteContainer func(arvados.Container) int + + sis *StubInstanceSet + id cloud.InstanceID + tags cloud.InstanceTags + providerType string + SSHService SSHService + running map[string]bool + sync.Mutex +} + +func (svm *StubVM) Instance() stubInstance { + svm.Lock() + defer svm.Unlock() + return stubInstance{ + svm: svm, + addr: svm.SSHService.Address(), + // We deliberately return a cached/stale copy of the + // real tags here, so that (Instance)Tags() sometimes + // returns old data after a call to + // (Instance)SetTags(). This is permitted by the + // driver interface, and this might help remind + // callers that they need to tolerate it. + tags: copyTags(svm.tags), + } +} + +func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 { + queue := svm.sis.driver.Queue + uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command) + if eta := svm.Boot.Sub(time.Now()); eta > 0 { + fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta) + return 1 + } + if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) { + fmt.Fprintf(stderr, "cannot fork\n") + return 2 + } + if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") { + fmt.Fprint(stderr, "crunch-run: command not found\n") + return 1 + } + if strings.HasPrefix(command, "crunch-run --detach ") { + svm.Lock() + if svm.running == nil { + svm.running = map[string]bool{} + } + svm.running[uuid] = true + svm.Unlock() + time.Sleep(svm.CrunchRunDetachDelay) + fmt.Fprintf(stderr, "starting %s\n", uuid) + logger := logrus.WithField("ContainerUUID", uuid) + logger.Printf("[test] starting crunch-run stub") + go func() { + crashluck := math_rand.Float64() + ctr, ok := queue.Get(uuid) + if !ok { + logger.Print("[test] container not in queue") + return + } + if crashluck > svm.CrunchRunCrashRate/2 { + time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond) + ctr.State = arvados.ContainerStateRunning + queue.Notify(ctr) + } + + time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond) + svm.Lock() + _, running := svm.running[uuid] + svm.Unlock() + if !running { + logger.Print("[test] container was killed") + return + } + if svm.ExecuteContainer != nil { + ctr.ExitCode = svm.ExecuteContainer(ctr) + } + // TODO: Check whether the stub instance has + // been destroyed, and if so, don't call + // queue.Notify. Then "container finished + // twice" can be classified as a bug. 
+ if crashluck < svm.CrunchRunCrashRate { + logger.Print("[test] crashing crunch-run stub") + } else { + ctr.State = arvados.ContainerStateComplete + queue.Notify(ctr) + } + logger.Print("[test] exiting crunch-run stub") + svm.Lock() + defer svm.Unlock() + delete(svm.running, uuid) + }() + return 0 + } + if command == "crunch-run --list" { + svm.Lock() + defer svm.Unlock() + for uuid := range svm.running { + fmt.Fprintf(stdout, "%s\n", uuid) + } + return 0 + } + if strings.HasPrefix(command, "crunch-run --kill ") { + svm.Lock() + defer svm.Unlock() + if svm.running[uuid] { + delete(svm.running, uuid) + } else { + fmt.Fprintf(stderr, "%s: container is not running\n", uuid) + } + return 0 + } + if command == "true" { + return 0 + } + fmt.Fprintf(stderr, "%q: command not found", command) + return 1 +} + +type stubInstance struct { + svm *StubVM + addr string + tags cloud.InstanceTags +} + +func (si stubInstance) ID() cloud.InstanceID { + return si.svm.id +} + +func (si stubInstance) Address() string { + return si.addr +} + +func (si stubInstance) Destroy() error { + if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy { + return errors.New("instance could not be destroyed") + } + si.svm.SSHService.Close() + sis := si.svm.sis + sis.mtx.Lock() + defer sis.mtx.Unlock() + delete(sis.servers, si.svm.id) + return nil +} + +func (si stubInstance) ProviderType() string { + return si.svm.providerType +} + +func (si stubInstance) SetTags(tags cloud.InstanceTags) error { + tags = copyTags(tags) + svm := si.svm + go func() { + svm.Lock() + defer svm.Unlock() + svm.tags = tags + }() + return nil +} + +func (si stubInstance) Tags() cloud.InstanceTags { + return si.tags +} + +func (si stubInstance) String() string { + return string(si.svm.id) +} + +func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error { + buf := make([]byte, 512) + _, err := io.ReadFull(rand.Reader, buf) + if err != nil { + return err + } + sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf) + if err != nil { + return err + } + return key.Verify(buf, sig) +} + +func copyTags(src cloud.InstanceTags) cloud.InstanceTags { + dst := cloud.InstanceTags{} + for k, v := range src { + dst[k] = v + } + return dst +} diff --git a/lib/dispatchcloud/worker/gocheck_test.go b/lib/dispatchcloud/worker/gocheck_test.go new file mode 100644 index 0000000000..b4ca66c97c --- /dev/null +++ b/lib/dispatchcloud/worker/gocheck_test.go @@ -0,0 +1,16 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package worker + +import ( + "testing" + + check "gopkg.in/check.v1" +) + +// Gocheck boilerplate +func Test(t *testing.T) { + check.TestingT(t) +} diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go new file mode 100644 index 0000000000..ff5f762c1d --- /dev/null +++ b/lib/dispatchcloud/worker/pool.go @@ -0,0 +1,684 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package worker + +import ( + "io" + "sort" + "strings" + "sync" + "time" + + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + tagKeyInstanceType = "InstanceType" + tagKeyHold = "Hold" +) + +// An InstanceView shows a worker's current state and recent activity. 
+type InstanceView struct { + Instance string + Price float64 + ArvadosInstanceType string + ProviderInstanceType string + LastContainerUUID string + LastBusy time.Time + WorkerState string +} + +// An Executor executes shell commands on a remote host. +type Executor interface { + // Run cmd on the current target. + Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error) + + // Use the given target for subsequent operations. The new + // target is the same host as the previous target, but it + // might return a different address and verify a different + // host key. + // + // SetTarget is called frequently, and in most cases the new + // target will behave exactly the same as the old one. An + // implementation should optimize accordingly. + // + // SetTarget must not block on concurrent Execute calls. + SetTarget(cloud.ExecutorTarget) + + Close() +} + +const ( + defaultSyncInterval = time.Minute + defaultProbeInterval = time.Second * 10 + defaultMaxProbesPerSecond = 10 + defaultTimeoutIdle = time.Minute + defaultTimeoutBooting = time.Minute * 10 + defaultTimeoutProbe = time.Minute * 10 + defaultTimeoutShutdown = time.Second * 10 +) + +func duration(conf arvados.Duration, def time.Duration) time.Duration { + if conf > 0 { + return time.Duration(conf) + } else { + return def + } +} + +// NewPool creates a Pool of workers backed by instanceSet. +// +// New instances are configured and set up according to the given +// cluster configuration. +func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool { + wp := &Pool{ + logger: logger, + instanceSet: instanceSet, + newExecutor: newExecutor, + bootProbeCommand: cluster.CloudVMs.BootProbeCommand, + imageID: cloud.ImageID(cluster.CloudVMs.ImageID), + instanceTypes: cluster.InstanceTypes, + maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond, + probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval), + syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval), + timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle), + timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting), + timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe), + timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown), + stop: make(chan bool), + } + wp.registerMetrics(reg) + go func() { + wp.setupOnce.Do(wp.setup) + go wp.runMetrics() + go wp.runProbes() + go wp.runSync() + }() + return wp +} + +// Pool is a resizable worker pool backed by a cloud.InstanceSet. A +// zero Pool should not be used. Call NewPool to create a new Pool. 
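A sketch of how a dispatcher might wire up a Pool with the NewPool constructor above; instanceSet is assumed to be any cloud.InstanceSet (e.g. from test.StubDriver or a real driver) and cluster the loaded *arvados.Cluster configuration. Subscribe/Unsubscribe are defined below.

pool := worker.NewPool(
	logrus.StandardLogger(),
	prometheus.NewRegistry(),
	instanceSet,
	func(inst cloud.Instance) worker.Executor { return ssh_executor.New(inst) },
	cluster,
)
ch := pool.Subscribe()
defer pool.Unsubscribe(ch)
for range ch {
	// react to worker state changes, e.g. run a scheduling pass
}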
+type Pool struct { + // configuration + logger logrus.FieldLogger + instanceSet cloud.InstanceSet + newExecutor func(cloud.Instance) Executor + bootProbeCommand string + imageID cloud.ImageID + instanceTypes map[string]arvados.InstanceType + syncInterval time.Duration + probeInterval time.Duration + maxProbesPerSecond int + timeoutIdle time.Duration + timeoutBooting time.Duration + timeoutProbe time.Duration + timeoutShutdown time.Duration + + // private state + subscribers map[<-chan struct{}]chan<- struct{} + creating map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls + workers map[cloud.InstanceID]*worker + loaded bool // loaded list of instances from InstanceSet at least once + exited map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called + atQuotaUntil time.Time + atQuotaErr cloud.QuotaError + stop chan bool + mtx sync.RWMutex + setupOnce sync.Once + + mInstances prometheus.Gauge + mContainersRunning prometheus.Gauge + mVCPUs prometheus.Gauge + mVCPUsInuse prometheus.Gauge + mMemory prometheus.Gauge + mMemoryInuse prometheus.Gauge +} + +// Subscribe returns a channel that becomes ready whenever a worker's +// state changes. +// +// Example: +// +// ch := wp.Subscribe() +// defer wp.Unsubscribe(ch) +// for range ch { +// // ...try scheduling some work... +// if done { +// break +// } +// } +func (wp *Pool) Subscribe() <-chan struct{} { + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() + ch := make(chan struct{}, 1) + wp.subscribers[ch] = ch + return ch +} + +// Unsubscribe stops sending updates to the given channel. +func (wp *Pool) Unsubscribe(ch <-chan struct{}) { + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() + delete(wp.subscribers, ch) +} + +// Unallocated returns the number of unallocated (creating + booting + +// idle + unknown) workers for each instance type. +func (wp *Pool) Unallocated() map[arvados.InstanceType]int { + wp.setupOnce.Do(wp.setup) + wp.mtx.RLock() + defer wp.mtx.RUnlock() + unalloc := map[arvados.InstanceType]int{} + creating := map[arvados.InstanceType]int{} + for it, times := range wp.creating { + creating[it] = len(times) + } + for _, wkr := range wp.workers { + if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) { + continue + } + it := wkr.instType + unalloc[it]++ + if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) { + // If up to N new workers appear in + // Instances() while we are waiting for N + // Create() calls to complete, we assume we're + // just seeing a race between Instances() and + // Create() responses. + // + // The other common reason why nodes have + // state==Unknown is that they appeared at + // startup, before any Create calls. They + // don't match the above timing condition, so + // we never mistakenly attribute them to + // pending Create calls. + creating[it]-- + } + } + for it, c := range creating { + unalloc[it] += c + } + return unalloc +} + +// Create a new instance with the given type, and add it to the worker +// pool. The worker is added immediately; instance creation runs in +// the background. 
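Caller-side pattern for Create, as used by the scheduler's runQueue earlier in this commit: check Unallocated and AtQuota before asking for a new instance of type it.

unalloc := pool.Unallocated()
if unalloc[it] < 1 && !pool.AtQuota() {
	// Create returns right away; the new worker counts toward
	// Unallocated() immediately and boots in the background.
	if err := pool.Create(it); err != nil {
		// e.g. a cloud.QuotaError; the pool remembers it, and
		// AtQuota() reports true for the next minute.
		log.Printf("Create(%s): %s", it.Name, err)
	}
}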
+func (wp *Pool) Create(it arvados.InstanceType) error { + logger := wp.logger.WithField("InstanceType", it.Name) + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() + if time.Now().Before(wp.atQuotaUntil) { + return wp.atQuotaErr + } + tags := cloud.InstanceTags{tagKeyInstanceType: it.Name} + now := time.Now() + wp.creating[it] = append(wp.creating[it], now) + go func() { + defer wp.notify() + inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil) + wp.mtx.Lock() + defer wp.mtx.Unlock() + // Remove our timestamp marker from wp.creating + for i, t := range wp.creating[it] { + if t == now { + copy(wp.creating[it][i:], wp.creating[it][i+1:]) + wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1] + break + } + } + if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() { + wp.atQuotaErr = err + wp.atQuotaUntil = time.Now().Add(time.Minute) + } + if err != nil { + logger.WithError(err).Error("create failed") + return + } + wp.updateWorker(inst, it, StateBooting) + }() + return nil +} + +// AtQuota returns true if Create is not expected to work at the +// moment. +func (wp *Pool) AtQuota() bool { + wp.mtx.Lock() + defer wp.mtx.Unlock() + return time.Now().Before(wp.atQuotaUntil) +} + +// Add or update worker attached to the given instance. Use +// initialState if a new worker is created. +// +// The second return value is true if a new worker is created. +// +// Caller must have lock. +func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) { + id := inst.ID() + if wkr := wp.workers[id]; wkr != nil { + wkr.executor.SetTarget(inst) + wkr.instance = inst + wkr.updated = time.Now() + if initialState == StateBooting && wkr.state == StateUnknown { + wkr.state = StateBooting + } + return wkr, false + } + if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" { + initialState = StateHold + } + logger := wp.logger.WithFields(logrus.Fields{ + "InstanceType": it.Name, + "Instance": inst, + }) + logger.WithField("State", initialState).Infof("instance appeared in cloud") + now := time.Now() + wkr := &worker{ + mtx: &wp.mtx, + wp: wp, + logger: logger, + executor: wp.newExecutor(inst), + state: initialState, + instance: inst, + instType: it, + appeared: now, + probed: now, + busy: now, + updated: now, + running: make(map[string]struct{}), + starting: make(map[string]struct{}), + probing: make(chan struct{}, 1), + } + wp.workers[id] = wkr + return wkr, true +} + +// caller must have lock. +func (wp *Pool) notifyExited(uuid string, t time.Time) { + wp.exited[uuid] = t +} + +// Shutdown shuts down a worker with the given type, or returns false +// if all workers with the given type are busy. +func (wp *Pool) Shutdown(it arvados.InstanceType) bool { + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() + logger := wp.logger.WithField("InstanceType", it.Name) + logger.Info("shutdown requested") + for _, tryState := range []State{StateBooting, StateIdle} { + // TODO: shutdown the worker with the longest idle + // time (Idle) or the earliest create time (Booting) + for _, wkr := range wp.workers { + if wkr.state == tryState && wkr.instType == it { + logger.WithField("Instance", wkr.instance).Info("shutting down") + wkr.shutdown() + return true + } + } + } + return false +} + +// CountWorkers returns the current number of workers in each state. 
+func (wp *Pool) CountWorkers() map[State]int { + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() + r := map[State]int{} + for _, w := range wp.workers { + r[w.state]++ + } + return r +} + +// Running returns the container UUIDs being prepared/run on workers. +func (wp *Pool) Running() map[string]time.Time { + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() + r := map[string]time.Time{} + for _, wkr := range wp.workers { + for uuid := range wkr.running { + r[uuid] = time.Time{} + } + for uuid := range wkr.starting { + r[uuid] = time.Time{} + } + } + for uuid, exited := range wp.exited { + r[uuid] = exited + } + return r +} + +// StartContainer starts a container on an idle worker immediately if +// possible, otherwise returns false. +func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool { + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + defer wp.mtx.Unlock() + var wkr *worker + for _, w := range wp.workers { + if w.instType == it && w.state == StateIdle { + if wkr == nil || w.busy.After(wkr.busy) { + wkr = w + } + } + } + if wkr == nil { + return false + } + wkr.startContainer(ctr) + return true +} + +// KillContainer kills the crunch-run process for the given container +// UUID, if it's running on any worker. +// +// KillContainer returns immediately; the act of killing the container +// takes some time, and runs in the background. +func (wp *Pool) KillContainer(uuid string) { + wp.mtx.Lock() + defer wp.mtx.Unlock() + if _, ok := wp.exited[uuid]; ok { + wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process") + delete(wp.exited, uuid) + return + } + for _, wkr := range wp.workers { + if _, ok := wkr.running[uuid]; ok { + go wp.kill(wkr, uuid) + return + } + } + wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared") +} + +func (wp *Pool) kill(wkr *worker, uuid string) { + logger := wp.logger.WithFields(logrus.Fields{ + "ContainerUUID": uuid, + "Instance": wkr.instance, + }) + logger.Debug("killing process") + stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil) + if err != nil { + logger.WithFields(logrus.Fields{ + "stderr": string(stderr), + "stdout": string(stdout), + "error": err, + }).Warn("kill failed") + return + } + logger.Debug("killing process succeeded") + wp.mtx.Lock() + defer wp.mtx.Unlock() + if _, ok := wkr.running[uuid]; ok { + delete(wkr.running, uuid) + if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 { + wkr.state = StateIdle + } + wkr.updated = time.Now() + go wp.notify() + } +} + +func (wp *Pool) registerMetrics(reg *prometheus.Registry) { + if reg == nil { + reg = prometheus.NewRegistry() + } + wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "instances_total", + Help: "Number of cloud VMs including pending, booting, running, held, and shutting down.", + }) + reg.MustRegister(wp.mInstances) + wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "containers_running", + Help: "Number of containers reported running by cloud VMs.", + }) + reg.MustRegister(wp.mContainersRunning) + + wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "vcpus_total", + Help: "Total VCPUs on all cloud VMs.", + }) + reg.MustRegister(wp.mVCPUs) + wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{ 
+ Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "vcpus_inuse", + Help: "VCPUs on cloud VMs that are running containers.", + }) + reg.MustRegister(wp.mVCPUsInuse) + wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "memory_bytes_total", + Help: "Total memory on all cloud VMs.", + }) + reg.MustRegister(wp.mMemory) + wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "arvados", + Subsystem: "dispatchcloud", + Name: "memory_bytes_inuse", + Help: "Memory on cloud VMs that are running containers.", + }) + reg.MustRegister(wp.mMemoryInuse) +} + +func (wp *Pool) runMetrics() { + ch := wp.Subscribe() + defer wp.Unsubscribe(ch) + for range ch { + wp.updateMetrics() + } +} + +func (wp *Pool) updateMetrics() { + wp.mtx.RLock() + defer wp.mtx.RUnlock() + + var alloc, cpu, cpuInuse, mem, memInuse int64 + for _, wkr := range wp.workers { + cpu += int64(wkr.instType.VCPUs) + mem += int64(wkr.instType.RAM) + if len(wkr.running)+len(wkr.starting) == 0 { + continue + } + alloc += int64(len(wkr.running) + len(wkr.starting)) + cpuInuse += int64(wkr.instType.VCPUs) + memInuse += int64(wkr.instType.RAM) + } + wp.mInstances.Set(float64(len(wp.workers))) + wp.mContainersRunning.Set(float64(alloc)) + wp.mVCPUs.Set(float64(cpu)) + wp.mMemory.Set(float64(mem)) + wp.mVCPUsInuse.Set(float64(cpuInuse)) + wp.mMemoryInuse.Set(float64(memInuse)) +} + +func (wp *Pool) runProbes() { + maxPPS := wp.maxProbesPerSecond + if maxPPS < 1 { + maxPPS = defaultMaxProbesPerSecond + } + limitticker := time.NewTicker(time.Second / time.Duration(maxPPS)) + defer limitticker.Stop() + + probeticker := time.NewTicker(wp.probeInterval) + defer probeticker.Stop() + + workers := []cloud.InstanceID{} + for range probeticker.C { + workers = workers[:0] + wp.mtx.Lock() + for id, wkr := range wp.workers { + if wkr.state == StateShutdown || wkr.shutdownIfIdle() { + continue + } + workers = append(workers, id) + } + wp.mtx.Unlock() + + for _, id := range workers { + wp.mtx.Lock() + wkr, ok := wp.workers[id] + wp.mtx.Unlock() + if !ok { + // Deleted while we were probing + // others + continue + } + go wkr.ProbeAndUpdate() + select { + case <-wp.stop: + return + case <-limitticker.C: + } + } + } +} + +func (wp *Pool) runSync() { + // sync once immediately, then wait syncInterval, sync again, + // etc. + timer := time.NewTimer(1) + for { + select { + case <-timer.C: + err := wp.getInstancesAndSync() + if err != nil { + wp.logger.WithError(err).Warn("sync failed") + } + timer.Reset(wp.syncInterval) + case <-wp.stop: + wp.logger.Debug("worker.Pool stopped") + return + } + } +} + +// Stop synchronizing with the InstanceSet. +func (wp *Pool) Stop() { + wp.setupOnce.Do(wp.setup) + close(wp.stop) +} + +// Instances returns an InstanceView for each worker in the pool, +// summarizing its current state and recent activity. 
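+//
+// A hypothetical management API handler might encode the result
+// directly as a JSON response:
+//
+//	json.NewEncoder(w).Encode(wp.Instances())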
+func (wp *Pool) Instances() []InstanceView { + var r []InstanceView + wp.setupOnce.Do(wp.setup) + wp.mtx.Lock() + for _, w := range wp.workers { + r = append(r, InstanceView{ + Instance: w.instance.String(), + Price: w.instType.Price, + ArvadosInstanceType: w.instType.Name, + ProviderInstanceType: w.instType.ProviderType, + LastContainerUUID: w.lastUUID, + LastBusy: w.busy, + WorkerState: w.state.String(), + }) + } + wp.mtx.Unlock() + sort.Slice(r, func(i, j int) bool { + return strings.Compare(r[i].Instance, r[j].Instance) < 0 + }) + return r +} + +func (wp *Pool) setup() { + wp.creating = map[arvados.InstanceType][]time.Time{} + wp.exited = map[string]time.Time{} + wp.workers = map[cloud.InstanceID]*worker{} + wp.subscribers = map[<-chan struct{}]chan<- struct{}{} +} + +func (wp *Pool) notify() { + wp.mtx.RLock() + defer wp.mtx.RUnlock() + for _, send := range wp.subscribers { + select { + case send <- struct{}{}: + default: + } + } +} + +func (wp *Pool) getInstancesAndSync() error { + wp.setupOnce.Do(wp.setup) + wp.logger.Debug("getting instance list") + threshold := time.Now() + instances, err := wp.instanceSet.Instances(cloud.InstanceTags{}) + if err != nil { + return err + } + wp.sync(threshold, instances) + wp.logger.Debug("sync done") + return nil +} + +// Add/remove/update workers based on instances, which was obtained +// from the instanceSet. However, don't clobber any other updates that +// already happened after threshold. +func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) { + wp.mtx.Lock() + defer wp.mtx.Unlock() + wp.logger.WithField("Instances", len(instances)).Debug("sync instances") + notify := false + + for _, inst := range instances { + itTag := inst.Tags()[tagKeyInstanceType] + it, ok := wp.instanceTypes[itTag] + if !ok { + wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag) + continue + } + if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew { + notify = true + } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown { + wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying") + wkr.shutdown() + } + } + + for id, wkr := range wp.workers { + if wkr.updated.After(threshold) { + continue + } + logger := wp.logger.WithFields(logrus.Fields{ + "Instance": wkr.instance, + "WorkerState": wkr.state, + }) + logger.Info("instance disappeared in cloud") + delete(wp.workers, id) + go wkr.executor.Close() + notify = true + } + + if !wp.loaded { + wp.loaded = true + wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list") + } + + if notify { + go wp.notify() + } +} diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go new file mode 100644 index 0000000000..3867e2c63e --- /dev/null +++ b/lib/dispatchcloud/worker/pool_test.go @@ -0,0 +1,135 @@ +// Copyright (C) The Arvados Authors. All rights reserved. 
+// +// SPDX-License-Identifier: AGPL-3.0 + +package worker + +import ( + "io" + "time" + + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/lib/dispatchcloud/test" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" + check "gopkg.in/check.v1" +) + +const GiB arvados.ByteSize = 1 << 30 + +var _ = check.Suite(&PoolSuite{}) + +type lessChecker struct { + *check.CheckerInfo +} + +func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) { + return params[0].(int) < params[1].(int), "" +} + +var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}} + +type PoolSuite struct{} + +func (suite *PoolSuite) SetUpSuite(c *check.C) { + logrus.StandardLogger().SetLevel(logrus.DebugLevel) +} + +func (suite *PoolSuite) TestStartContainer(c *check.C) { + // TODO: use an instanceSet stub with an SSH server +} + +func (suite *PoolSuite) TestVerifyHostKey(c *check.C) { + // TODO: use an instanceSet stub with an SSH server +} + +func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) { + lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)} + type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01} + type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02} + pool := &Pool{ + logger: logrus.StandardLogger(), + newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} }, + instanceSet: lameInstanceSet, + instanceTypes: arvados.InstanceTypeMap{ + type1.Name: type1, + type2.Name: type2, + }, + } + notify := pool.Subscribe() + defer pool.Unsubscribe(notify) + notify2 := pool.Subscribe() + defer pool.Unsubscribe(notify2) + + c.Check(pool.Unallocated()[type1], check.Equals, 0) + c.Check(pool.Unallocated()[type2], check.Equals, 0) + pool.Create(type2) + pool.Create(type1) + pool.Create(type2) + c.Check(pool.Unallocated()[type1], check.Equals, 1) + c.Check(pool.Unallocated()[type2], check.Equals, 2) + + // Unblock the pending Create calls. + go lameInstanceSet.Release(3) + + // Wait for each instance to either return from its Create + // call, or show up in a poll. + suite.wait(c, pool, notify, func() bool { + pool.mtx.RLock() + defer pool.mtx.RUnlock() + return len(pool.workers) == 3 + }) + + c.Check(pool.Shutdown(type2), check.Equals, true) + suite.wait(c, pool, notify, func() bool { + return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1 + }) + c.Check(pool.Shutdown(type2), check.Equals, true) + suite.wait(c, pool, notify, func() bool { + return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0 + }) + c.Check(pool.Shutdown(type2), check.Equals, false) + for { + // Consume any waiting notifications to ensure the + // next one we get is from Shutdown. 
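+		// (A non-blocking receive with a default case
+		// drains the buffered channel without waiting
+		// for new events.)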
+ select { + case <-notify: + continue + default: + } + break + } + c.Check(pool.Shutdown(type1), check.Equals, true) + suite.wait(c, pool, notify, func() bool { + return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0 + }) + select { + case <-notify2: + case <-time.After(time.Second): + c.Error("notify did not receive") + } + go lameInstanceSet.Release(3) // unblock Destroy calls +} + +func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) { + timeout := time.NewTimer(time.Second).C + for !ready() { + select { + case <-notify: + continue + case <-timeout: + } + break + } + c.Check(ready(), check.Equals, true) +} + +type stubExecutor struct{} + +func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {} + +func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) { + return nil, nil, nil +} + +func (*stubExecutor) Close() {} diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go new file mode 100644 index 0000000000..c26186309a --- /dev/null +++ b/lib/dispatchcloud/worker/worker.go @@ -0,0 +1,320 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package worker + +import ( + "bytes" + "strings" + "sync" + "time" + + "git.curoverse.com/arvados.git/lib/cloud" + "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/Sirupsen/logrus" +) + +// State indicates whether a worker is available to do work, and (if +// not) whether/when it is expected to become ready. +type State int + +const ( + StateUnknown State = iota // might be running a container already + StateBooting // instance is booting + StateIdle // instance booted, no containers are running + StateRunning // instance is running one or more containers + StateShutdown // worker has stopped monitoring the instance + StateHold // running, but not available to run new containers +) + +const ( + // TODO: configurable + maxPingFailTime = 10 * time.Minute +) + +var stateString = map[State]string{ + StateUnknown: "unknown", + StateBooting: "booting", + StateIdle: "idle", + StateRunning: "running", + StateShutdown: "shutdown", + StateHold: "hold", +} + +// String implements fmt.Stringer. +func (s State) String() string { + return stateString[s] +} + +// MarshalText implements encoding.TextMarshaler so a JSON encoding of +// map[State]anything uses the state's string representation. +func (s State) MarshalText() ([]byte, error) { + return []byte(stateString[s]), nil +} + +type worker struct { + logger logrus.FieldLogger + executor Executor + wp *Pool + + mtx sync.Locker // must be wp's Locker. + state State + instance cloud.Instance + instType arvados.InstanceType + vcpus int64 + memory int64 + appeared time.Time + probed time.Time + updated time.Time + busy time.Time + destroyed time.Time + lastUUID string + running map[string]struct{} // remember to update state idle<->running when this changes + starting map[string]struct{} // remember to update state idle<->running when this changes + probing chan struct{} +} + +// caller must have lock. 
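+//
+// startContainer marks the container as starting, then invokes
+// "crunch-run --detach" on the instance in a background goroutine;
+// the next probe determines whether the process really started.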
+func (wkr *worker) startContainer(ctr arvados.Container) {
+	logger := wkr.logger.WithFields(logrus.Fields{
+		"ContainerUUID": ctr.UUID,
+		"Priority":      ctr.Priority,
+	})
+	logger = logger.WithField("Instance", wkr.instance)
+	logger.Debug("starting container")
+	wkr.starting[ctr.UUID] = struct{}{}
+	wkr.state = StateRunning
+	go func() {
+		stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+		wkr.mtx.Lock()
+		defer wkr.mtx.Unlock()
+		now := time.Now()
+		wkr.updated = now
+		wkr.busy = now
+		delete(wkr.starting, ctr.UUID)
+		wkr.running[ctr.UUID] = struct{}{}
+		wkr.lastUUID = ctr.UUID
+		if err != nil {
+			logger.WithField("stdout", string(stdout)).
+				WithField("stderr", string(stderr)).
+				WithError(err).
+				Error("error starting crunch-run process")
+			// Leave uuid in wkr.running, though: it's
+			// possible the error was just a communication
+			// failure and the process was in fact
+			// started. Wait for next probe to find out.
+			return
+		}
+		logger.Info("crunch-run process started")
+		wkr.lastUUID = ctr.UUID
+	}()
+}
+
+// ProbeAndUpdate conducts appropriate boot/running probes (if any)
+// for the worker's current state. If a previous probe is still
+// running, it does nothing.
+//
+// It should be called in a new goroutine.
+func (wkr *worker) ProbeAndUpdate() {
+	select {
+	case wkr.probing <- struct{}{}:
+		wkr.probeAndUpdate()
+		<-wkr.probing
+	default:
+		wkr.logger.Debug("still waiting for last probe to finish")
+	}
+}
+
+// should be called in a new goroutine
+func (wkr *worker) probeAndUpdate() {
+	wkr.mtx.Lock()
+	updated := wkr.updated
+	needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
+	needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+	wkr.mtx.Unlock()
+	if !needProbeBooted && !needProbeRunning {
+		return
+	}
+
+	var (
+		ctrUUIDs []string
+		ok       bool
+		stderr   []byte
+	)
+	if needProbeBooted {
+		ok, stderr = wkr.probeBooted()
+		wkr.mtx.Lock()
+		if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+			wkr.logger.Info("instance booted; will try probeRunning")
+			needProbeRunning = true
+		}
+		wkr.mtx.Unlock()
+	}
+	if needProbeRunning {
+		ctrUUIDs, ok, stderr = wkr.probeRunning()
+	}
+	logger := wkr.logger.WithField("stderr", string(stderr))
+	wkr.mtx.Lock()
+	defer wkr.mtx.Unlock()
+	if !ok {
+		if wkr.state == StateShutdown && wkr.updated.After(updated) {
+			// Skip the logging noise if shutdown was
+			// initiated during probe.
+			return
+		}
+		dur := time.Since(wkr.probed)
+		logger := logger.WithFields(logrus.Fields{
+			"Duration": dur,
+			"State":    wkr.state,
+		})
+		if wkr.state == StateBooting && !needProbeRunning {
+			// If we know the instance has never passed a
+			// boot probe, it's not noteworthy that it
+			// hasn't passed this probe.
+			logger.Debug("new instance not responding")
+		} else {
+			logger.Info("instance not responding")
+		}
+		wkr.shutdownIfBroken(dur)
+		return
+	}
+
+	updateTime := time.Now()
+	wkr.probed = updateTime
+
+	if updated != wkr.updated {
+		// Worker was updated after the probe began, so
+		// wkr.running might have a container UUID that was
+		// not yet running when ctrUUIDs was generated. Leave
+		// wkr.running alone and wait for the next probe to
+		// catch up on any changes.
+		return
+	}
+
+	if len(ctrUUIDs) > 0 {
+		wkr.busy = updateTime
+		wkr.lastUUID = ctrUUIDs[0]
+	} else if len(wkr.running) > 0 {
+		// Actual last-busy time was sometime between wkr.busy
+		// and now. Now is the earliest opportunity to take
+		// advantage of the non-busy state, though.
+ wkr.busy = updateTime + } + running := map[string]struct{}{} + changed := false + for _, uuid := range ctrUUIDs { + running[uuid] = struct{}{} + if _, ok := wkr.running[uuid]; !ok { + changed = true + } + } + for uuid := range wkr.running { + if _, ok := running[uuid]; !ok { + logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended") + wkr.wp.notifyExited(uuid, updateTime) + changed = true + } + } + if wkr.state == StateUnknown || wkr.state == StateBooting { + wkr.state = StateIdle + changed = true + } + if changed { + wkr.running = running + if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 { + wkr.state = StateRunning + } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 { + wkr.state = StateIdle + } + wkr.updated = updateTime + go wkr.wp.notify() + } +} + +func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) { + cmd := "crunch-run --list" + stdout, stderr, err := wkr.executor.Execute(cmd, nil) + if err != nil { + wkr.logger.WithFields(logrus.Fields{ + "Command": cmd, + "stdout": string(stdout), + "stderr": string(stderr), + }).WithError(err).Warn("probe failed") + return nil, false, stderr + } + stdout = bytes.TrimRight(stdout, "\n") + if len(stdout) == 0 { + return nil, true, stderr + } + return strings.Split(string(stdout), "\n"), true, stderr +} + +func (wkr *worker) probeBooted() (ok bool, stderr []byte) { + cmd := wkr.wp.bootProbeCommand + if cmd == "" { + cmd = "true" + } + stdout, stderr, err := wkr.executor.Execute(cmd, nil) + logger := wkr.logger.WithFields(logrus.Fields{ + "Command": cmd, + "stdout": string(stdout), + "stderr": string(stderr), + }) + if err != nil { + logger.WithError(err).Debug("boot probe failed") + return false, stderr + } + logger.Info("boot probe succeeded") + return true, stderr +} + +// caller must have lock. +func (wkr *worker) shutdownIfBroken(dur time.Duration) { + if wkr.state == StateHold { + return + } + label, threshold := "", wkr.wp.timeoutProbe + if wkr.state == StateBooting { + label, threshold = "new ", wkr.wp.timeoutBooting + } + if dur < threshold { + return + } + wkr.logger.WithFields(logrus.Fields{ + "Duration": dur, + "Since": wkr.probed, + "State": wkr.state, + }).Warnf("%sinstance unresponsive, shutting down", label) + wkr.shutdown() +} + +// caller must have lock. +func (wkr *worker) shutdownIfIdle() bool { + if wkr.state != StateIdle { + return false + } + age := time.Since(wkr.busy) + if age < wkr.wp.timeoutIdle { + return false + } + wkr.logger.WithField("Age", age).Info("shutdown idle worker") + wkr.shutdown() + return true +} + +// caller must have lock +func (wkr *worker) shutdown() { + now := time.Now() + wkr.updated = now + wkr.destroyed = now + wkr.state = StateShutdown + go wkr.wp.notify() + go func() { + err := wkr.instance.Destroy() + if err != nil { + wkr.logger.WithError(err).Warn("shutdown failed") + return + } + }() +} diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index e2e9907d5d..bfa86abf6a 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -60,6 +60,8 @@ type Cluster struct { ManagementToken string NodeProfiles map[string]NodeProfile InstanceTypes InstanceTypeMap + CloudVMs CloudVMs + Dispatch Dispatch HTTPRequestTimeout Duration RemoteClusters map[string]RemoteCluster PostgreSQL PostgreSQL @@ -95,6 +97,50 @@ type InstanceType struct { Preemptible bool } +type Dispatch struct { + // PEM encoded SSH key (RSA, DSA, or ECDSA) able to log in to + // cloud VMs. 
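+	// (A hypothetical example: the contents of a key file
+	// generated with "ssh-keygen -t rsa -m PEM".)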
+ PrivateKey []byte + + // Max time for workers to come up before abandoning stale + // locks from previous run + StaleLockTimeout Duration + + // Interval between queue polls + PollInterval Duration + + // Interval between probes to each worker + ProbeInterval Duration + + // Maximum total worker probes per second + MaxProbesPerSecond int +} + +type CloudVMs struct { + // Shell command that exits zero IFF the VM is fully booted + // and ready to run containers, e.g., "mount | grep + // /encrypted-tmp" + BootProbeCommand string + SyncInterval Duration + + // Maximum idle time before automatic shutdown + TimeoutIdle Duration + + // Maximum booting time before automatic shutdown + TimeoutBooting Duration + + // Maximum time with no successful probes before automatic shutdown + TimeoutProbe Duration + + // Time after shutdown to retry shutdown + TimeoutShutdown Duration + + ImageID string + + Driver string + DriverParameters map[string]interface{} +} + type InstanceTypeMap map[string]InstanceType var errDuplicateInstanceTypeName = errors.New("duplicate instance type name") @@ -159,45 +205,48 @@ func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) { } type NodeProfile struct { - Controller SystemServiceInstance `json:"arvados-controller"` - Health SystemServiceInstance `json:"arvados-health"` - Keepbalance SystemServiceInstance `json:"keep-balance"` - Keepproxy SystemServiceInstance `json:"keepproxy"` - Keepstore SystemServiceInstance `json:"keepstore"` - Keepweb SystemServiceInstance `json:"keep-web"` - Nodemanager SystemServiceInstance `json:"arvados-node-manager"` - RailsAPI SystemServiceInstance `json:"arvados-api-server"` - Websocket SystemServiceInstance `json:"arvados-ws"` - Workbench SystemServiceInstance `json:"arvados-workbench"` + Controller SystemServiceInstance `json:"arvados-controller"` + Health SystemServiceInstance `json:"arvados-health"` + Keepbalance SystemServiceInstance `json:"keep-balance"` + Keepproxy SystemServiceInstance `json:"keepproxy"` + Keepstore SystemServiceInstance `json:"keepstore"` + Keepweb SystemServiceInstance `json:"keep-web"` + Nodemanager SystemServiceInstance `json:"arvados-node-manager"` + DispatchCloud SystemServiceInstance `json:"arvados-dispatch-cloud"` + RailsAPI SystemServiceInstance `json:"arvados-api-server"` + Websocket SystemServiceInstance `json:"arvados-ws"` + Workbench SystemServiceInstance `json:"arvados-workbench"` } type ServiceName string const ( - ServiceNameRailsAPI ServiceName = "arvados-api-server" - ServiceNameController ServiceName = "arvados-controller" - ServiceNameNodemanager ServiceName = "arvados-node-manager" - ServiceNameWorkbench ServiceName = "arvados-workbench" - ServiceNameWebsocket ServiceName = "arvados-ws" - ServiceNameKeepbalance ServiceName = "keep-balance" - ServiceNameKeepweb ServiceName = "keep-web" - ServiceNameKeepproxy ServiceName = "keepproxy" - ServiceNameKeepstore ServiceName = "keepstore" + ServiceNameRailsAPI ServiceName = "arvados-api-server" + ServiceNameController ServiceName = "arvados-controller" + ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud" + ServiceNameNodemanager ServiceName = "arvados-node-manager" + ServiceNameWorkbench ServiceName = "arvados-workbench" + ServiceNameWebsocket ServiceName = "arvados-ws" + ServiceNameKeepbalance ServiceName = "keep-balance" + ServiceNameKeepweb ServiceName = "keep-web" + ServiceNameKeepproxy ServiceName = "keepproxy" + ServiceNameKeepstore ServiceName = "keepstore" ) // ServicePorts returns the configured listening address (or 
"" if // disabled) for each service on the node. func (np *NodeProfile) ServicePorts() map[ServiceName]string { return map[ServiceName]string{ - ServiceNameRailsAPI: np.RailsAPI.Listen, - ServiceNameController: np.Controller.Listen, - ServiceNameNodemanager: np.Nodemanager.Listen, - ServiceNameWorkbench: np.Workbench.Listen, - ServiceNameWebsocket: np.Websocket.Listen, - ServiceNameKeepbalance: np.Keepbalance.Listen, - ServiceNameKeepweb: np.Keepweb.Listen, - ServiceNameKeepproxy: np.Keepproxy.Listen, - ServiceNameKeepstore: np.Keepstore.Listen, + ServiceNameRailsAPI: np.RailsAPI.Listen, + ServiceNameController: np.Controller.Listen, + ServiceNameDispatchCloud: np.DispatchCloud.Listen, + ServiceNameNodemanager: np.Nodemanager.Listen, + ServiceNameWorkbench: np.Workbench.Listen, + ServiceNameWebsocket: np.Websocket.Listen, + ServiceNameKeepbalance: np.Keepbalance.Listen, + ServiceNameKeepweb: np.Keepweb.Listen, + ServiceNameKeepproxy: np.Keepproxy.Listen, + ServiceNameKeepstore: np.Keepstore.Listen, } } diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go index b70b4ac917..02a0d76dec 100644 --- a/sdk/go/arvados/container.go +++ b/sdk/go/arvados/container.go @@ -18,10 +18,11 @@ type Container struct { Mounts map[string]Mount `json:"mounts"` Output string `json:"output"` OutputPath string `json:"output_path"` - Priority int `json:"priority"` + Priority int64 `json:"priority"` RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"` State ContainerState `json:"state"` SchedulingParameters SchedulingParameters `json:"scheduling_parameters"` + ExitCode int `json:"exit_code"` } // Container is an arvados#container resource. diff --git a/sdk/go/health/aggregator_test.go b/sdk/go/health/aggregator_test.go index cb47c9e670..122355be98 100644 --- a/sdk/go/health/aggregator_test.go +++ b/sdk/go/health/aggregator_test.go @@ -107,15 +107,16 @@ func (s *AggregatorSuite) TestHealthy(c *check.C) { srv, listen := s.stubServer(&healthyHandler{}) defer srv.Close() s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{ - Controller: arvados.SystemServiceInstance{Listen: listen}, - Keepbalance: arvados.SystemServiceInstance{Listen: listen}, - Keepproxy: arvados.SystemServiceInstance{Listen: listen}, - Keepstore: arvados.SystemServiceInstance{Listen: listen}, - Keepweb: arvados.SystemServiceInstance{Listen: listen}, - Nodemanager: arvados.SystemServiceInstance{Listen: listen}, - RailsAPI: arvados.SystemServiceInstance{Listen: listen}, - Websocket: arvados.SystemServiceInstance{Listen: listen}, - Workbench: arvados.SystemServiceInstance{Listen: listen}, + Controller: arvados.SystemServiceInstance{Listen: listen}, + DispatchCloud: arvados.SystemServiceInstance{Listen: listen}, + Keepbalance: arvados.SystemServiceInstance{Listen: listen}, + Keepproxy: arvados.SystemServiceInstance{Listen: listen}, + Keepstore: arvados.SystemServiceInstance{Listen: listen}, + Keepweb: arvados.SystemServiceInstance{Listen: listen}, + Nodemanager: arvados.SystemServiceInstance{Listen: listen}, + RailsAPI: arvados.SystemServiceInstance{Listen: listen}, + Websocket: arvados.SystemServiceInstance{Listen: listen}, + Workbench: arvados.SystemServiceInstance{Listen: listen}, } s.handler.ServeHTTP(s.resp, s.req) resp := s.checkOK(c) @@ -132,15 +133,16 @@ func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) { srvU, listenU := s.stubServer(&unhealthyHandler{}) defer srvU.Close() s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{ - Controller: 
arvados.SystemServiceInstance{Listen: listenH}, - Keepbalance: arvados.SystemServiceInstance{Listen: listenH}, - Keepproxy: arvados.SystemServiceInstance{Listen: listenH}, - Keepstore: arvados.SystemServiceInstance{Listen: listenH}, - Keepweb: arvados.SystemServiceInstance{Listen: listenH}, - Nodemanager: arvados.SystemServiceInstance{Listen: listenH}, - RailsAPI: arvados.SystemServiceInstance{Listen: listenH}, - Websocket: arvados.SystemServiceInstance{Listen: listenH}, - Workbench: arvados.SystemServiceInstance{Listen: listenH}, + Controller: arvados.SystemServiceInstance{Listen: listenH}, + DispatchCloud: arvados.SystemServiceInstance{Listen: listenH}, + Keepbalance: arvados.SystemServiceInstance{Listen: listenH}, + Keepproxy: arvados.SystemServiceInstance{Listen: listenH}, + Keepstore: arvados.SystemServiceInstance{Listen: listenH}, + Keepweb: arvados.SystemServiceInstance{Listen: listenH}, + Nodemanager: arvados.SystemServiceInstance{Listen: listenH}, + RailsAPI: arvados.SystemServiceInstance{Listen: listenH}, + Websocket: arvados.SystemServiceInstance{Listen: listenH}, + Workbench: arvados.SystemServiceInstance{Listen: listenH}, } s.handler.Config.Clusters["zzzzz"].NodeProfiles["127.0.0.1"] = arvados.NodeProfile{ Keepstore: arvados.SystemServiceInstance{Listen: listenU}, diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index 29fad32bd1..092524d806 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -197,7 +197,7 @@ func (disp *Dispatcher) run() error { defer disp.sqCheck.Stop() if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 { - go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster) + go SlurmNodeTypeFeatureKludge(disp.cluster) } if _, err := daemon.SdNotify(false, "READY=1"); err != nil { diff --git a/services/crunch-dispatch-slurm/node_type.go b/services/crunch-dispatch-slurm/node_type.go new file mode 100644 index 0000000000..62a96932f3 --- /dev/null +++ b/services/crunch-dispatch-slurm/node_type.go @@ -0,0 +1,72 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package main + +import ( + "log" + "os/exec" + "strings" + "time" + + "git.curoverse.com/arvados.git/sdk/go/arvados" +) + +// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance +// type name as a valid feature name, even if no instances of that +// type have appeared yet. +// +// It takes advantage of some SLURM peculiarities: +// +// (1) A feature is valid after it has been offered by a node, even if +// it is no longer offered by any node. So, to make a feature name +// valid, we can add it to a dummy node ("compute0"), then remove it. +// +// (2) To test whether a set of feature names are valid without +// actually submitting a job, we can call srun --test-only with the +// desired features. +// +// SlurmNodeTypeFeatureKludge does a test-and-fix operation +// immediately, and then periodically, in case slurm restarts and +// forgets the list of valid features. It never returns (unless there +// are no node types configured, in which case it returns +// immediately), so it should generally be invoked with "go". 
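+//
+// For example, the slurm dispatcher starts it with:
+//
+//	go SlurmNodeTypeFeatureKludge(disp.cluster)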
+func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
+	if len(cc.InstanceTypes) == 0 {
+		return
+	}
+	var features []string
+	for _, it := range cc.InstanceTypes {
+		features = append(features, "instancetype="+it.Name)
+	}
+	for {
+		slurmKludge(features)
+		time.Sleep(2 * time.Second)
+	}
+}
+
+const slurmDummyNode = "compute0"
+
+func slurmKludge(features []string) {
+	allFeatures := strings.Join(features, ",")
+
+	cmd := exec.Command("sinfo", "--nodes="+slurmDummyNode, "--format=%f", "--noheader")
+	out, err := cmd.CombinedOutput()
+	if err != nil {
+		log.Printf("running %q %q: %s (output was %q)", cmd.Path, cmd.Args, err, out)
+		return
+	}
+	if string(out) == allFeatures+"\n" {
+		// Already configured correctly, nothing to do.
+		return
+	}
+
+	log.Printf("configuring node %q with all node type features", slurmDummyNode)
+	cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+allFeatures)
+	log.Printf("running: %q %q", cmd.Path, cmd.Args)
+	out, err = cmd.CombinedOutput()
+	if err != nil {
+		log.Printf("error: scontrol: %s (output was %q)", err, out)
+	}
+}
diff --git a/services/crunch-run/background.go b/services/crunch-run/background.go
new file mode 100644
index 0000000000..a508538370
--- /dev/null
+++ b/services/crunch-run/background.go
@@ -0,0 +1,237 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"syscall"
+	"time"
+)
+
+var (
+	lockdir    = "/var/lock"
+	lockprefix = "crunch-run-"
+	locksuffix = ".lock"
+)
+
+// procinfo is saved in each process's lockfile.
+type procinfo struct {
+	UUID   string
+	PID    int
+	Stdout string
+	Stderr string
+}
+
+// Detach acquires a lock for the given uuid, and starts the current
+// program as a child process (with -no-detach prepended to the given
+// arguments so the child knows not to detach again). The lock is
+// passed along to the child process.
+func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
+	return exitcode(stderr, detach(uuid, args, stdout, stderr))
+}
+func detach(uuid string, args []string, stdout, stderr io.Writer) error {
+	lockfile, err := func() (*os.File, error) {
+		// We must hold the dir-level lock between
+		// opening/creating the lockfile and acquiring LOCK_EX
+		// on it, to avoid racing with ListProcesses's
+		// alive-checking and garbage collection.
+		dirlock, err := lockall()
+		if err != nil {
+			return nil, err
+		}
+		defer dirlock.Close()
+		lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+		if err != nil {
+			return nil, err
+		}
+		err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+		if err != nil {
+			lockfile.Close()
+			return nil, err
+		}
+		return lockfile, nil
+	}()
+	if err != nil {
+		return err
+	}
+	defer lockfile.Close()
+	lockfile.Truncate(0)
+
+	outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
+	if err != nil {
+		return err
+	}
+	defer outfile.Close()
+	errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
+	if err != nil {
+		os.Remove(outfile.Name())
+		return err
+	}
+	defer errfile.Close()
+
+	cmd := exec.Command(args[0], append([]string{"-no-detach"}, args[1:]...)...)
+	cmd.Stdout = outfile
+	cmd.Stderr = errfile
+	// Child inherits lockfile.
+	cmd.ExtraFiles = []*os.File{lockfile}
+	// Ensure child isn't interrupted even if we receive signals
+	// from parent (sshd) while sending lockfile content to
+	// caller.
+	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+	err = cmd.Start()
+	if err != nil {
+		os.Remove(outfile.Name())
+		os.Remove(errfile.Name())
+		return err
+	}
+
+	w := io.MultiWriter(stdout, lockfile)
+	err = json.NewEncoder(w).Encode(procinfo{
+		UUID:   uuid,
+		PID:    cmd.Process.Pid,
+		Stdout: outfile.Name(),
+		Stderr: errfile.Name(),
+	})
+	if err != nil {
+		os.Remove(outfile.Name())
+		os.Remove(errfile.Name())
+		return err
+	}
+	return nil
+}
+
+// KillProcess finds the crunch-run process corresponding to the given
+// uuid, and sends the given signal to it. It then waits up to 1
+// second for the process to die. It returns 0 if the process is
+// successfully killed or didn't exist in the first place.
+func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
+	return exitcode(stderr, kill(uuid, signal, stdout, stderr))
+}
+
+func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
+	path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+	f, err := os.Open(path)
+	if os.IsNotExist(err) {
+		return nil
+	} else if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	var pi procinfo
+	err = json.NewDecoder(f).Decode(&pi)
+	if err != nil {
+		return fmt.Errorf("%s: %s\n", path, err)
+	}
+
+	if pi.UUID != uuid || pi.PID == 0 {
+		return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
+	}
+
+	proc, err := os.FindProcess(pi.PID)
+	if err != nil {
+		return err
+	}
+
+	err = proc.Signal(signal)
+	for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
+		err = proc.Signal(syscall.Signal(0))
+	}
+	if err == nil {
+		return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+	}
+	fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
+	return nil
+}
+
+// List UUIDs of active crunch-run processes.
+func ListProcesses(stdout, stderr io.Writer) int {
+	return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
+		if info.IsDir() && path != lockdir {
+			// Skip subdirectories. (Returning SkipDir
+			// for lockdir itself would abort the whole
+			// walk.)
+			return filepath.SkipDir
+		}
+		if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+			return nil
+		}
+		if info.Size() == 0 {
+			// race: process has opened/locked but hasn't yet written pid/uuid
+			return nil
+		}
+
+		f, err := os.Open(path)
+		if err != nil {
+			return nil
+		}
+		defer f.Close()
+
+		// Ensure other processes don't acquire this lockfile
+		// after we have decided it is abandoned but before we
+		// have deleted it.
+		dirlock, err := lockall()
+		if err != nil {
+			return err
+		}
+		err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
+		if err == nil {
+			// lockfile is stale
+			err := os.Remove(path)
+			dirlock.Close()
+			if err != nil {
+				fmt.Fprintln(stderr, err)
+			}
+			return nil
+		}
+		dirlock.Close()
+
+		var pi procinfo
+		err = json.NewDecoder(f).Decode(&pi)
+		if err != nil {
+			fmt.Fprintf(stderr, "%s: %s\n", path, err)
+			return nil
+		}
+		if pi.UUID == "" || pi.PID == 0 {
+			fmt.Fprintf(stderr, "%s: bogus procinfo: %+v", path, pi)
+			return nil
+		}
+
+		fmt.Fprintln(stdout, pi.UUID)
+		return nil
+	}))
+}
+
+// If err is nil, return 0 ("success"); otherwise, print err to stderr
+// and return 1.
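+//
+// This keeps the exported entry points above one-liners, e.g.
+// Detach is implemented as:
+//
+//	return exitcode(stderr, detach(uuid, args, stdout, stderr))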
+func exitcode(stderr io.Writer, err error) int { + if err != nil { + fmt.Fprintln(stderr, err) + return 1 + } + return 0 +} + +// Acquire a dir-level lock. Must be held while creating or deleting +// container-specific lockfiles, to avoid races during the intervals +// when those container-specific lockfiles are open but not locked. +// +// Caller releases the lock by closing the returned file. +func lockall() (*os.File, error) { + f, err := os.OpenFile(filepath.Join(lockdir, lockprefix+"all"+locksuffix), os.O_CREATE|os.O_RDWR, 0700) + if err != nil { + return nil, err + } + err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX) + if err != nil { + f.Close() + return nil, err + } + return f, nil +} diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 7d933632c9..2b9a119581 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -1736,6 +1736,10 @@ func main() { cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)") cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container") caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates") + detach := flag.Bool("detach", false, "Detach from parent process and run in the background") + sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)") + kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") + list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes") enableNetwork := flag.String("container-enable-networking", "default", `Specify if networking should be enabled for container. One of 'default', 'always': default: only enable networking if container requests it. @@ -1747,8 +1751,30 @@ func main() { memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container") getVersion := flag.Bool("version", false, "Print version information and exit.") flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") + + ignoreDetachFlag := false + if len(os.Args) > 1 && os.Args[1] == "-no-detach" { + // This process was invoked by a parent process, which + // has passed along its own arguments, including + // -detach, after the leading -no-detach flag. Strip + // the leading -no-detach flag (it's not recognized by + // flag.Parse()) and ignore the -detach flag that + // comes later. + os.Args = append([]string{os.Args[0]}, os.Args[2:]...) 
+ ignoreDetachFlag = true + } + flag.Parse() + switch { + case *detach && !ignoreDetachFlag: + os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr)) + case *kill >= 0: + os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr)) + case *list: + os.Exit(ListProcesses(os.Stdout, os.Stderr)) + } + // Print version information if requested if *getVersion { fmt.Printf("crunch-run %s\n", version) @@ -1756,6 +1782,7 @@ func main() { } log.Printf("crunch-run %s started", version) + time.Sleep(*sleep) containerId := flag.Arg(0) diff --git a/vendor/vendor.json b/vendor/vendor.json index c0e6fee5db..ec296d21d1 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -336,6 +336,12 @@ "revision": "b8bc1bf767474819792c23f32d8286a45736f1c6", "revisionTime": "2016-12-03T19:45:07Z" }, + { + "checksumSHA1": "ewGq4nGalpCQOHcmBTdAEQx1wW0=", + "path": "github.com/mitchellh/mapstructure", + "revision": "bb74f1db0675b241733089d5a1faa5dd8b0ef57b", + "revisionTime": "2018-05-11T14:21:26Z" + }, { "checksumSHA1": "OFNit1Qx2DdWhotfREKodDNUwCM=", "path": "github.com/opencontainers/go-digest",