14360: Initial version of dispatch-cloud.
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 25 Oct 2018 05:34:35 +0000 (01:34 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Fri, 26 Oct 2018 04:09:41 +0000 (00:09 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

43 files changed:
.licenseignore
build/run-build-packages.sh
build/run-tests.sh
cmd/arvados-server/cmd.go
cmd/arvados-server/crunch-dispatch-cloud.service [new file with mode: 0644]
lib/cloud/interfaces.go [new file with mode: 0644]
lib/cmd/cmd.go
lib/dispatchcloud/cmd.go [new file with mode: 0644]
lib/dispatchcloud/container/queue.go [new file with mode: 0644]
lib/dispatchcloud/dispatcher.go [new file with mode: 0644]
lib/dispatchcloud/dispatcher_test.go [new file with mode: 0644]
lib/dispatchcloud/driver.go [new file with mode: 0644]
lib/dispatchcloud/instance_set_proxy.go [new file with mode: 0644]
lib/dispatchcloud/logger.go [new file with mode: 0644]
lib/dispatchcloud/node_size.go
lib/dispatchcloud/readme.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/fix_stale_locks.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/gocheck_test.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/interfaces.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/map.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/map_test.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/sync.go [new file with mode: 0644]
lib/dispatchcloud/ssh_executor/executor.go [new file with mode: 0644]
lib/dispatchcloud/ssh_executor/executor_test.go [new file with mode: 0644]
lib/dispatchcloud/test/doc.go [new file with mode: 0644]
lib/dispatchcloud/test/fixtures.go [new file with mode: 0644]
lib/dispatchcloud/test/lame_instance_set.go [new file with mode: 0644]
lib/dispatchcloud/test/queue.go [new file with mode: 0644]
lib/dispatchcloud/test/ssh_service.go [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_dispatch [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_dispatch.pub [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_vm [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_vm.pub [new file with mode: 0644]
lib/dispatchcloud/test/stub_driver.go [new file with mode: 0644]
lib/dispatchcloud/worker/gocheck_test.go [new file with mode: 0644]
lib/dispatchcloud/worker/pool.go [new file with mode: 0644]
lib/dispatchcloud/worker/pool_test.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go [new file with mode: 0644]
sdk/go/arvados/config.go
sdk/go/arvados/container.go
services/crunch-run/background.go [new file with mode: 0644]
services/crunch-run/crunchrun.go
vendor/vendor.json

diff --git a/.licenseignore b/.licenseignore
index 51a1e7cbd2f0aabca972527475630923f1f1ef75..a0127cfa30f09a986b4dad176ae9358e573209fc 100644 (file)
@@ -69,3 +69,4 @@ sdk/R/NAMESPACE
 sdk/R/.Rbuildignore
 sdk/R/ArvadosR.Rproj
 *.Rd
+lib/dispatchcloud/test/sshkey_*
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index fe01e4b2ef2528222dfff7d097729b9b5c773b30..1b486b40b6ba8e3beb7899b39f0bb1892234bb31 100755 (executable)
@@ -294,6 +294,9 @@ package_go_binary cmd/arvados-server arvados-server \
     "Arvados server daemons"
 package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
+# No package until #14325
+#package_go_binary cmd/arvados-server crunch-dispatch-cloud \
+#    "Arvados cluster cloud dispatch"
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 9674ad5d4d28ccaeb0c2904366ab4c7883da37bd..1b42ea21c4cc0ae7ae44c84ad2cb8ecc52325c77 100755 (executable)
@@ -77,6 +77,10 @@ lib/cmd
 lib/controller
 lib/crunchstat
 lib/dispatchcloud
+lib/dispatchcloud/container
+lib/dispatchcloud/scheduler
+lib/dispatchcloud/ssh_executor
+lib/dispatchcloud/worker
 services/api
 services/arv-git-httpd
 services/crunchstat
@@ -926,6 +930,10 @@ gostuff=(
     lib/controller
     lib/crunchstat
     lib/dispatchcloud
+    lib/dispatchcloud/container
+    lib/dispatchcloud/scheduler
+    lib/dispatchcloud/ssh_executor
+    lib/dispatchcloud/worker
     sdk/go/arvados
     sdk/go/arvadosclient
     sdk/go/auth
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df0c3a4c54d296e848532c1d19c124e43..cd15d25dda760a41c427b8bfd4b621fb43e2130a 100644 (file)
@@ -9,6 +9,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/controller"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud"
 )
 
 var (
@@ -18,7 +19,8 @@ var (
                "-version":  cmd.Version(version),
                "--version": cmd.Version(version),
 
-               "controller": controller.Command,
+               "controller":     controller.Command,
+               "dispatch-cloud": dispatchcloud.Command,
        })
 )
 
diff --git a/cmd/arvados-server/crunch-dispatch-cloud.service b/cmd/arvados-server/crunch-dispatch-cloud.service
new file mode 100644 (file)
index 0000000..f8d71c9
--- /dev/null
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/crunch-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644 (file)
index 0000000..e3a0725
--- /dev/null
@@ -0,0 +1,179 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+       "io"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by an InstanceSet when the
+// cloud service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+       // Time before which the caller should expect requests to
+       // fail.
+       EarliestRetry() time.Time
+       error
+}
+
+// A QuotaError should be returned by an InstanceSet when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+       // If true, don't create more instances until some existing
+       // instances are destroyed. If false, don't handle the error
+       // as a quota error.
+       IsQuotaError() bool
+       error
+}
+
+type InstanceSetID string
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// An Executor executes commands on an ExecutorTarget.
+type Executor interface {
+       // Update the set of private keys used to authenticate to
+       // targets.
+       SetSigners(...ssh.Signer)
+
+       // Set the target used for subsequent command executions.
+       SetTarget(ExecutorTarget)
+
+       // Return the current target.
+       Target() ExecutorTarget
+
+       // Execute a shell command and return the resulting stdout and
+       // stderr. stdin can be nil.
+       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+}
+
+// An ExecutorTarget is a remote command execution service.
+type ExecutorTarget interface {
+       // SSH server hostname or IP address, or empty string if
+       // unknown while instance is booting.
+       Address() string
+
+       // Return nil if the given public key matches the instance's
+       // SSH server key. If the provided ssh.Client is not nil,
+       // VerifyHostKey can use it as a dialer to make outgoing
+       // network connections from the instance -- e.g., to use the
+       // cloud's "this instance's metadata" API.
+       VerifyHostKey(ssh.PublicKey, *ssh.Client) error
+}
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+       ExecutorTarget
+
+       // ID returns the provider's instance ID. It must be stable
+       // for the life of the instance.
+       ID() InstanceID
+
+       // String typically returns the cloud-provided instance ID.
+       String() string
+
+       // Cloud provider's "instance type" ID. Matches a ProviderType
+       // in the cluster's InstanceTypes configuration.
+       ProviderType() string
+
+       // Get current tags
+       Tags() InstanceTags
+
+       // Replace tags with the given tags
+       SetTags(InstanceTags) error
+
+       // Shut down the node
+       Destroy() error
+}
+
+// An InstanceSet manages a set of VM instances created by an elastic
+// cloud provider like AWS, GCE, or Azure.
+//
+// All public methods of an InstanceSet, and all public methods of the
+// instances it returns, are goroutine safe.
+type InstanceSet interface {
+       // Create a new instance. If supported by the driver, add the
+       // provided public key to /root/.ssh/authorized_keys.
+       //
+       // The returned error should implement RateLimitError and
+       // QuotaError where applicable.
+       Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+       // Return all instances, including ones that are booting or
+       // shutting down. Optionally, filter out nodes that don't have
+       // all of the given InstanceTags (the caller will ignore these
+       // anyway).
+       //
+       // An instance returned by successive calls to Instances() may
+       // -- but does not need to -- be represented by the same
+       // Instance object each time. Thus, the caller is responsible
+       // for de-duplicating the returned instances by comparing the
+       // InstanceIDs returned by the instances' ID() methods.
+       Instances(InstanceTags) ([]Instance, error)
+
+       // Stop any background tasks and release other resources.
+       Stop()
+}
+
+// A Driver returns an InstanceSet that uses the given InstanceSetID
+// and driver-dependent configuration parameters.
+//
+// The supplied id will be of the form "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+// where each z can be any alphanum. The returned InstanceSet must use
+// this id to tag long-lived cloud resources that it creates, and must
+// assume control of any existing resources that are tagged with the
+// same id. Tagging can be accomplished by including the ID in
+// resource names, using the cloud provider's tagging feature, or any
+// other mechanism. The tags must be visible to another instance of
+// the same driver running on a different host.
+//
+// The returned InstanceSet must ignore existing resources that are
+// visible but not tagged with the given id, except that it should log
+// a summary of such resources -- only once -- when it starts
+// up. Thus, two identically configured InstanceSets running on
+// different hosts with different ids should log about the existence
+// of each other's resources at startup, but will not interfere with
+// each other.
+//
+// Example:
+//
+//     type exampleInstanceSet struct {
+//             ownID     string
+//             AccessKey string
+//     }
+//
+//     type exampleDriver struct {}
+//
+//     func (*exampleDriver) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+//             var is exampleInstanceSet
+//             if err := mapstructure.Decode(config, &is); err != nil {
+//                     return nil, err
+//             }
+//             is.ownID = id
+//             return &is, nil
+//     }
+//
+//     var _ = registerCloudDriver("example", &exampleDriver{})
+type Driver interface {
+       InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)
+}
+
+// DriverFunc makes a Driver using the provided function as its
+// InstanceSet method. This is similar to http.HandlerFunc.
+func DriverFunc(fn func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)) Driver {
+       return driverFunc(fn)
+}
+
+type driverFunc func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)
+
+func (df driverFunc) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+       return df(config, id)
+}
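
A minimal sketch of a driver-side error type satisfying both the QuotaError and RateLimitError interfaces above, together with the consumer-side checks a caller of Create is expected to make. The capacityError type and its fields are hypothetical, not part of this commit.

    package main

    import (
        "fmt"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
    )

    // capacityError is a hypothetical error a driver might return when the
    // provider refuses to create more instances.
    type capacityError struct {
        quota bool
        retry time.Time
    }

    func (e capacityError) Error() string            { return "provider is out of capacity" }
    func (e capacityError) IsQuotaError() bool       { return e.quota }
    func (e capacityError) EarliestRetry() time.Time { return e.retry }

    func main() {
        var err error = capacityError{quota: true, retry: time.Now().Add(time.Minute)}
        if qe, ok := err.(cloud.QuotaError); ok && qe.IsQuotaError() {
            fmt.Println("at quota: wait for an existing instance to shut down before retrying Create")
        }
        if rl, ok := err.(cloud.RateLimitError); ok {
            fmt.Println("rate limited: retry no earlier than", rl.EarliestRetry())
        }
    }
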
diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go
index 8c65cf7acf1b6dd7bc02660464be06ea07cc3daa..9292ef7e5ff5b3afb6012833299d9f89a7ea346c 100644 (file)
@@ -36,8 +36,9 @@ func (v Version) RunCommand(prog string, args []string, stdin io.Reader, stdout,
        return 0
 }
 
-// Multi is a Handler that looks up its first argument in a map, and
-// invokes the resulting Handler with the remaining args.
+// Multi is a Handler that looks up its first argument in a map (after
+// stripping any "arvados-" or "crunch-" prefix), and invokes the
+// resulting Handler with the remaining args.
 //
 // Example:
 //
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644 (file)
index 0000000..a5a11d2
--- /dev/null
@@ -0,0 +1,17 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "git.curoverse.com/arvados.git/lib/cmd"
+       "git.curoverse.com/arvados.git/lib/service"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
+       return &dispatcher{Cluster: cluster}
+}
diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
new file mode 100644 (file)
index 0000000..17a3825
--- /dev/null
@@ -0,0 +1,323 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package container
+
+import (
+       "io"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+
+// An APIClient performs Arvados API requests. It is typically an
+// *arvados.Client.
+type APIClient interface {
+       RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
+}
+
+// A QueueEnt is an entry in the queue, consisting of a container
+// record and the instance type that should be used to run it.
+type QueueEnt struct {
+       // The container to run. Only the UUID, State, Priority, and
+       // RuntimeConstraints fields are populated.
+       Container    arvados.Container
+       InstanceType arvados.InstanceType
+}
+
+// String implements fmt.Stringer by returning the queued container's
+// UUID.
+func (c *QueueEnt) String() string {
+       return c.Container.UUID
+}
+
+// A Queue is an interface to an Arvados cluster's container
+// database. It presents only the containers that are eligible to be
+// run by, are already being run by, or have recently been run by the
+// present dispatcher.
+//
+// The Entries, Get, and Forget methods do not block: they return
+// immediately, using cached data.
+//
+// The updating methods (Cancel, Lock, Unlock, Update) do block: they
+// return only after the operation has completed.
+//
+// A Queue's Update method should be called periodically to keep the
+// cache up to date.
+type Queue struct {
+       logger     logrus.FieldLogger
+       reg        *prometheus.Registry
+       chooseType typeChooser
+       client     APIClient
+
+       auth      *arvados.APIClientAuthorization
+       current   map[string]QueueEnt
+       updated   time.Time
+       mtx       sync.Mutex
+       keeplocal map[string]struct{}
+}
+
+// NewQueue returns a new Queue. When a new container appears in the
+// Arvados cluster's queue during Update, chooseType will be called to
+// assign an appropriate arvados.InstanceType for the queue entry.
+func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
+       return &Queue{
+               logger:     logger,
+               reg:        reg,
+               chooseType: chooseType,
+               client:     client,
+               current:    map[string]QueueEnt{},
+       }
+}
+
+// Forget drops the specified container from the cache. It should be
+// called on finalized containers to avoid leaking memory over
+// time. It is a no-op if the indicated container is not in a
+// finalized state.
+func (cq *Queue) Forget(uuid string) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       ctr := cq.current[uuid].Container
+       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+               delete(cq.current, uuid)
+       }
+}
+
+// Get returns the (partial) Container record for the specified
+// container. Like a map lookup, its second return value is false if
+// the specified container is not in the Queue.
+func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       if ctr, ok := cq.current[uuid]; !ok {
+               return arvados.Container{}, false
+       } else {
+               return ctr.Container, true
+       }
+}
+
+// Entries returns all cache entries, keyed by container UUID.
+//
+// The returned threshold indicates the maximum age of any cached data
+// returned in the map. This makes it possible for a scheduler to
+// determine correctly the outcome of a remote process that updates
+// container state. It must first wait for the remote process to exit,
+// then wait for the Queue to start and finish its next Update --
+// i.e., it must wait until threshold > timeProcessExited.
+func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       entries = make(map[string]QueueEnt, len(cq.current))
+       for uuid, ctr := range cq.current {
+               entries[uuid] = ctr
+       }
+       threshold = cq.updated
+       return
+}
+
+// Update refreshes the cache from the Arvados API. It adds newly
+// queued containers, and updates the state of previously queued
+// containers.
+func (cq *Queue) Update() error {
+       cq.mtx.Lock()
+       cq.keeplocal = map[string]struct{}{}
+       updateStarted := time.Now()
+       cq.mtx.Unlock()
+
+       next, err := cq.poll()
+       if err != nil {
+               return err
+       }
+
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       for uuid, ctr := range next {
+               if _, keep := cq.keeplocal[uuid]; keep {
+                       continue
+               }
+               if cur, ok := cq.current[uuid]; !ok {
+                       cq.addEnt(uuid, *ctr)
+               } else {
+                       cur.Container = *ctr
+                       cq.current[uuid] = cur
+               }
+       }
+       for uuid := range cq.current {
+               if _, keep := cq.keeplocal[uuid]; keep {
+                       continue
+               } else if _, keep = next[uuid]; keep {
+                       continue
+               } else {
+                       delete(cq.current, uuid)
+               }
+       }
+       cq.keeplocal = nil
+       cq.updated = updateStarted
+       return nil
+}
+
+func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
+       it, err := cq.chooseType(&ctr)
+       if err != nil {
+               // FIXME: throttle warnings, cancel after timeout
+               cq.logger.Warnf("cannot run %s: %s", uuid, err)
+               return
+       }
+       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+}
+
+// Lock acquires the dispatch lock for the given container.
+func (cq *Queue) Lock(uuid string) error {
+       return cq.apiUpdate(uuid, "lock")
+}
+
+// Unlock releases the dispatch lock for the given container.
+func (cq *Queue) Unlock(uuid string) error {
+       return cq.apiUpdate(uuid, "unlock")
+}
+
+// Cancel cancels the given container.
+func (cq *Queue) Cancel(uuid string) error {
+       return cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+               "container": {"state": arvados.ContainerStateCancelled},
+       })
+}
+
+func (cq *Queue) apiUpdate(uuid, action string) error {
+       var resp arvados.Container
+       err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+       if err != nil {
+               return err
+       }
+
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       if cq.keeplocal != nil {
+               cq.keeplocal[uuid] = struct{}{}
+       }
+       if ent, ok := cq.current[uuid]; !ok {
+               cq.addEnt(uuid, resp)
+       } else {
+               ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+               cq.current[uuid] = ent
+       }
+       return nil
+}
+
+func (cq *Queue) poll() (map[string]*arvados.Container, error) {
+       cq.mtx.Lock()
+       size := len(cq.current)
+       auth := cq.auth
+       cq.mtx.Unlock()
+
+       if auth == nil {
+               auth = &arvados.APIClientAuthorization{}
+               err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
+               if err != nil {
+                       return nil, err
+               }
+               cq.mtx.Lock()
+               cq.auth = auth
+               cq.mtx.Unlock()
+       }
+
+       next := make(map[string]*arvados.Container, size)
+       apply := func(updates []arvados.Container) {
+               for _, upd := range updates {
+                       if next[upd.UUID] == nil {
+                               next[upd.UUID] = &arvados.Container{}
+                       }
+                       *next[upd.UUID] = upd
+               }
+       }
+       selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
+       limitParam := 1000
+
+       mine, err := cq.fetchAll(arvados.ResourceListParams{
+               Select:  selectParam,
+               Order:   "uuid",
+               Limit:   &limitParam,
+               Count:   "none",
+               Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
+       })
+       if err != nil {
+               return nil, err
+       }
+       apply(mine)
+
+       avail, err := cq.fetchAll(arvados.ResourceListParams{
+               Select:  selectParam,
+               Order:   "uuid",
+               Limit:   &limitParam,
+               Count:   "none",
+               Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
+       })
+       if err != nil {
+               return nil, err
+       }
+       apply(avail)
+
+       var missing []string
+       cq.mtx.Lock()
+       for uuid, ent := range cq.current {
+               if next[uuid] == nil &&
+                       ent.Container.State != arvados.ContainerStateCancelled &&
+                       ent.Container.State != arvados.ContainerStateComplete {
+                       missing = append(missing, uuid)
+               }
+       }
+       cq.mtx.Unlock()
+
+       for i, page := 0, 20; i < len(missing); i += page {
+               batch := missing[i:]
+               if len(batch) > page {
+                       batch = batch[:page]
+               }
+               ended, err := cq.fetchAll(arvados.ResourceListParams{
+                       Select:  selectParam,
+                       Order:   "uuid",
+                       Count:   "none",
+                       Filters: []arvados.Filter{{"uuid", "in", batch}},
+               })
+               if err != nil {
+                       return nil, err
+               }
+               apply(ended)
+       }
+       return next, nil
+}
+
+func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+       var results []arvados.Container
+       params := initialParams
+       params.Offset = 0
+       for {
+               // This list variable must be a new one declared
+               // inside the loop: otherwise, items in the API
+               // response would get deep-merged into the items
+               // loaded in previous iterations.
+               var list arvados.ContainerList
+
+               err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+               if err != nil {
+                       return nil, err
+               }
+               if len(list.Items) == 0 {
+                       break
+               }
+
+               results = append(results, list.Items...)
+               if params.Order == "uuid" {
+                       params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
+               } else {
+                       params.Offset += len(list.Items)
+               }
+       }
+       return results, nil
+}
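
A minimal sketch of driving the Queue from a caller: refresh the cache with Update, then read the entries that were current as of that refresh. The environment-derived client and the fixed "default" instance type are illustrative assumptions; the real wiring is in lib/dispatchcloud/dispatcher.go below.

    package main

    import (
        "fmt"

        "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Sirupsen/logrus"
        "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
        logger := logrus.StandardLogger()
        client := arvados.NewClientFromEnv()
        // Stand-in type chooser; the real dispatcher calls
        // ChooseInstanceType against the cluster configuration.
        chooseType := func(ctr *arvados.Container) (arvados.InstanceType, error) {
            return arvados.InstanceType{Name: "default", VCPUs: 1, RAM: 1 << 30}, nil
        }
        queue := container.NewQueue(logger, prometheus.NewRegistry(), chooseType, client)
        if err := queue.Update(); err != nil {
            logger.Fatal(err)
        }
        entries, threshold := queue.Entries()
        fmt.Printf("%d containers of interest as of %v\n", len(entries), threshold)
    }
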
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644 (file)
index 0000000..e422b39
--- /dev/null
@@ -0,0 +1,199 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       "golang.org/x/crypto/ssh"
+)
+
+const (
+       defaultPollInterval = time.Second
+)
+
+type containerQueue interface {
+       scheduler.ContainerQueue
+       Update() error
+}
+
+type pool interface {
+       scheduler.WorkerPool
+       View() []worker.View
+}
+
+type dispatcher struct {
+       Cluster       *arvados.Cluster
+       InstanceSetID cloud.InstanceSetID
+
+       logger       logrus.FieldLogger
+       reg          *prometheus.Registry
+       instanceSet  cloud.InstanceSet
+       pool         pool
+       queue        containerQueue
+       httpHandler  http.Handler
+       pollInterval time.Duration
+       sshKey       ssh.Signer
+
+       setupOnce sync.Once
+       stop      chan struct{}
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       disp.setupOnce.Do(disp.setup)
+       disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+       disp.setupOnce.Do(disp.setup)
+       return nil
+}
+
+// Stop dispatching containers and release resources. Typically used
+// in tests.
+func (disp *dispatcher) Close() {
+       disp.setupOnce.Do(disp.setup)
+       select {
+       case disp.stop <- struct{}{}:
+       default:
+       }
+}
+
+// Make a worker.Executor for the given instance.
+func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
+       exr := ssh_executor.New(inst)
+       exr.SetSigners(disp.sshKey)
+       return exr
+}
+
+func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
+       return ChooseInstanceType(disp.Cluster, ctr)
+}
+
+func (disp *dispatcher) setup() {
+       disp.initialize()
+       go disp.run()
+}
+
+func (disp *dispatcher) initialize() {
+       arvClient := arvados.NewClientFromEnv()
+       if disp.InstanceSetID == "" {
+               if strings.HasPrefix(arvClient.AuthToken, "v2/") {
+                       disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+               } else {
+                       // Use some other string unique to this token
+                       // that doesn't reveal the token itself.
+                       disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
+               }
+       }
+       disp.stop = make(chan struct{}, 1)
+       disp.logger = logrus.StandardLogger()
+
+       if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
+               disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
+       } else {
+               disp.sshKey = key
+       }
+
+       instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID)
+       if err != nil {
+               disp.logger.Fatalf("error initializing driver: %s", err)
+       }
+       disp.instanceSet = &instanceSetProxy{instanceSet}
+       disp.reg = prometheus.NewRegistry()
+       disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+       disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
+
+       mux := http.NewServeMux()
+       mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers)
+       mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances)
+       metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
+               ErrorLog: disp.logger,
+       })
+       mux.Handle("/metrics", metricsH)
+       mux.Handle("/metrics.json", metricsH)
+       disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
+
+       if d := disp.Cluster.Dispatch.PollInterval; d > 0 {
+               disp.pollInterval = time.Duration(d)
+       } else {
+               disp.pollInterval = defaultPollInterval
+       }
+}
+
+func (disp *dispatcher) run() {
+       defer disp.instanceSet.Stop()
+
+       t0 := time.Now()
+       disp.logger.Infof("FixStaleLocks starting.")
+       scheduler.FixStaleLocks(disp.logger, disp.queue, disp.pool, time.Duration(disp.Cluster.Dispatch.StaleLockTimeout))
+       disp.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
+
+       wp := disp.pool.Subscribe()
+       defer disp.pool.Unsubscribe(wp)
+       poll := time.NewTicker(disp.pollInterval)
+       for {
+               scheduler.Map(disp.logger, disp.queue, disp.pool)
+               scheduler.Sync(disp.logger, disp.queue, disp.pool)
+               select {
+               case <-disp.stop:
+                       return
+               case <-wp:
+               case <-poll.C:
+                       err := disp.queue.Update()
+                       if err != nil {
+                               disp.logger.Errorf("error updating queue: %s", err)
+                       }
+               }
+       }
+}
+
+// Management API: all active and queued containers.
+func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
+       if r.Method != "GET" {
+               httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+       var resp struct {
+               Items []container.QueueEnt
+       }
+       qEntries, _ := disp.queue.Entries()
+       for _, ent := range qEntries {
+               resp.Items = append(resp.Items, ent)
+       }
+       json.NewEncoder(w).Encode(resp)
+}
+
+// Management API: all active instances (cloud VMs).
+func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
+       if r.Method != "GET" {
+               httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+       var resp struct {
+               Items []worker.View
+       }
+       resp.Items = disp.pool.View()
+       json.NewEncoder(w).Encode(resp)
+}
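
The two management endpoints registered above (/arvados/v1/dispatch/containers and /arvados/v1/dispatch/instances) return JSON lists of queue entries and worker views. A sketch of polling the instances endpoint follows; the listen address and token are placeholders, and sending the management token as an Authorization: Bearer header is an assumption about how the auth middleware reads credentials.

    package main

    import (
        "fmt"
        "io/ioutil"
        "net/http"
    )

    func main() {
        // Placeholder address and token: use the dispatcher's configured
        // listen address and the cluster's ManagementToken.
        req, err := http.NewRequest("GET", "http://localhost:9006/arvados/v1/dispatch/instances", nil)
        if err != nil {
            panic(err)
        }
        req.Header.Set("Authorization", "Bearer example-management-token")
        resp, err := http.DefaultClient.Do(req)
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        body, _ := ioutil.ReadAll(resp.Body)
        fmt.Println(resp.Status, string(body))
    }
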
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
new file mode 100644 (file)
index 0000000..e6f536d
--- /dev/null
@@ -0,0 +1,370 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "math/rand"
+       "net/http/httptest"
+       "os"
+       "regexp"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&DispatcherSuite{})
+
+// fakeCloud provides an exec method that can be used as a
+// test.StubExecFunc. It calls the provided makeVM func when called
+// with a previously unseen instance ID. Calls to exec are passed on
+// to the *fakeVM for the appropriate instance ID.
+type fakeCloud struct {
+       queue      *test.Queue
+       makeVM     func(cloud.Instance) *fakeVM
+       onComplete func(string)
+       onCancel   func(string)
+       vms        map[cloud.InstanceID]*fakeVM
+       sync.Mutex
+}
+
+func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       fc.Lock()
+       fvm, ok := fc.vms[inst.ID()]
+       if !ok {
+               if fc.vms == nil {
+                       fc.vms = make(map[cloud.InstanceID]*fakeVM)
+               }
+               fvm = fc.makeVM(inst)
+               fc.vms[inst.ID()] = fvm
+       }
+       fc.Unlock()
+       return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
+}
+
+// fakeVM is a fake VM with configurable delays and failure modes.
+type fakeVM struct {
+       boot                 time.Time
+       broken               time.Time
+       crunchRunMissing     bool
+       crunchRunCrashRate   float64
+       crunchRunDetachDelay time.Duration
+       ctrExit              int
+       running              map[string]bool
+       completed            []string
+       sync.Mutex
+}
+
+func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
+       if eta := fvm.boot.Sub(time.Now()); eta > 0 {
+               fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
+               return 1
+       }
+       if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
+               fmt.Fprintf(stderr, "cannot fork\n")
+               return 2
+       }
+       if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
+               fmt.Fprint(stderr, "crunch-run: command not found\n")
+               return 1
+       }
+       if strings.HasPrefix(command, "crunch-run --detach ") {
+               fvm.Lock()
+               if fvm.running == nil {
+                       fvm.running = map[string]bool{}
+               }
+               fvm.running[uuid] = true
+               fvm.Unlock()
+               time.Sleep(fvm.crunchRunDetachDelay)
+               fmt.Fprintf(stderr, "starting %s\n", uuid)
+               logger := logrus.WithField("ContainerUUID", uuid)
+               logger.Printf("[test] starting crunch-run stub")
+               go func() {
+                       crashluck := rand.Float64()
+                       ctr, ok := queue.Get(uuid)
+                       if !ok {
+                               logger.Print("[test] container not in queue")
+                               return
+                       }
+                       if crashluck > fvm.crunchRunCrashRate/2 {
+                               time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
+                               ctr.State = arvados.ContainerStateRunning
+                               queue.Notify(ctr)
+                       }
+
+                       time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
+                       fvm.Lock()
+                       _, running := fvm.running[uuid]
+                       fvm.Unlock()
+                       if !running {
+                               logger.Print("[test] container was killed")
+                               return
+                       }
+                       if crashluck < fvm.crunchRunCrashRate {
+                               logger.Print("[test] crashing crunch-run stub")
+                               if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
+                                       onCancel(uuid)
+                               }
+                       } else {
+                               ctr.State = arvados.ContainerStateComplete
+                               ctr.ExitCode = fvm.ctrExit
+                               queue.Notify(ctr)
+                               if onComplete != nil {
+                                       onComplete(uuid)
+                               }
+                       }
+                       logger.Print("[test] exiting crunch-run stub")
+                       fvm.Lock()
+                       defer fvm.Unlock()
+                       delete(fvm.running, uuid)
+               }()
+               return 0
+       }
+       if command == "crunch-run --list" {
+               fvm.Lock()
+               defer fvm.Unlock()
+               for uuid := range fvm.running {
+                       fmt.Fprintf(stdout, "%s\n", uuid)
+               }
+               return 0
+       }
+       if strings.HasPrefix(command, "crunch-run --kill ") {
+               fvm.Lock()
+               defer fvm.Unlock()
+               if fvm.running[uuid] {
+                       delete(fvm.running, uuid)
+               } else {
+                       fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+               }
+               return 0
+       }
+       if command == "true" {
+               return 0
+       }
+       fmt.Fprintf(stderr, "%q: command not found", command)
+       return 1
+}
+
+type DispatcherSuite struct {
+       cluster     *arvados.Cluster
+       instanceSet *test.LameInstanceSet
+       stubDriver  *test.StubDriver
+       disp        *dispatcher
+}
+
+func (s *DispatcherSuite) SetUpSuite(c *check.C) {
+       if os.Getenv("ARVADOS_DEBUG") != "" {
+               logrus.StandardLogger().SetLevel(logrus.DebugLevel)
+       }
+}
+
+func (s *DispatcherSuite) SetUpTest(c *check.C) {
+       dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
+       dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
+       c.Assert(err, check.IsNil)
+
+       _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
+       s.stubDriver = &test.StubDriver{
+               Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
+                       c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
+                       return 1
+               },
+               HostKey:        hostpriv,
+               AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+       }
+
+       s.cluster = &arvados.Cluster{
+               CloudVMs: arvados.CloudVMs{
+                       Driver:          "test",
+                       SyncInterval:    arvados.Duration(10 * time.Millisecond),
+                       TimeoutIdle:     arvados.Duration(30 * time.Millisecond),
+                       TimeoutBooting:  arvados.Duration(30 * time.Millisecond),
+                       TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
+                       TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+               },
+               Dispatch: arvados.Dispatch{
+                       PrivateKey:         dispatchprivraw,
+                       PollInterval:       arvados.Duration(5 * time.Millisecond),
+                       ProbeInterval:      arvados.Duration(5 * time.Millisecond),
+                       MaxProbesPerSecond: 1000,
+               },
+               InstanceTypes: arvados.InstanceTypeMap{
+                       test.InstanceType(1).Name:  test.InstanceType(1),
+                       test.InstanceType(2).Name:  test.InstanceType(2),
+                       test.InstanceType(3).Name:  test.InstanceType(3),
+                       test.InstanceType(4).Name:  test.InstanceType(4),
+                       test.InstanceType(6).Name:  test.InstanceType(6),
+                       test.InstanceType(8).Name:  test.InstanceType(8),
+                       test.InstanceType(16).Name: test.InstanceType(16),
+               },
+               NodeProfiles: map[string]arvados.NodeProfile{
+                       "*": {
+                               Controller:    arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
+                               DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
+                       },
+               },
+       }
+       s.disp = &dispatcher{Cluster: s.cluster}
+       // Test cases can modify s.cluster before calling
+       // initialize(), and then modify private state before calling
+       // go run().
+}
+
+func (s *DispatcherSuite) TearDownTest(c *check.C) {
+       s.disp.Close()
+}
+
+func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
+       drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       queue := &test.Queue{
+               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                       return ChooseInstanceType(s.cluster, ctr)
+               },
+       }
+       for i := 0; i < 200; i++ {
+               queue.Containers = append(queue.Containers, arvados.Container{
+                       UUID:     test.ContainerUUID(i + 1),
+                       State:    arvados.ContainerStateQueued,
+                       Priority: int64(i%20 + 1),
+                       RuntimeConstraints: arvados.RuntimeConstraints{
+                               RAM:   int64(i%3+1) << 30,
+                               VCPUs: i%8 + 1,
+                       },
+               })
+       }
+       s.disp.queue = queue
+
+       var mtx sync.Mutex
+       done := make(chan struct{})
+       waiting := map[string]struct{}{}
+       for _, ctr := range queue.Containers {
+               waiting[ctr.UUID] = struct{}{}
+       }
+       onComplete := func(uuid string) {
+               mtx.Lock()
+               defer mtx.Unlock()
+               if _, ok := waiting[uuid]; !ok {
+                       c.Errorf("container completed twice: %s", uuid)
+               }
+               delete(waiting, uuid)
+               if len(waiting) == 0 {
+                       close(done)
+               }
+       }
+       n := 0
+       fc := &fakeCloud{
+               queue: queue,
+               makeVM: func(inst cloud.Instance) *fakeVM {
+                       n++
+                       fvm := &fakeVM{
+                               boot:                 time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
+                               crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
+                               ctrExit:              int(rand.Uint32() & 0x3),
+                       }
+                       switch n % 7 {
+                       case 0:
+                               fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
+                       case 1:
+                               fvm.crunchRunMissing = true
+                       default:
+                               fvm.crunchRunCrashRate = 0.1
+                       }
+                       return fvm
+               },
+               onComplete: onComplete,
+               onCancel:   onComplete,
+       }
+       s.stubDriver.Exec = fc.exec
+
+       start := time.Now()
+       go s.disp.run()
+       err := s.disp.CheckHealth()
+       c.Check(err, check.IsNil)
+
+       select {
+       case <-done:
+               c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
+       case <-time.After(10 * time.Second):
+               c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+       }
+
+       deadline := time.Now().Add(time.Second)
+       for range time.NewTicker(10 * time.Millisecond).C {
+               insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
+               c.Check(err, check.IsNil)
+               queue.Update()
+               ents, _ := queue.Entries()
+               if len(ents) == 0 && len(insts) == 0 {
+                       break
+               }
+               if time.Now().After(deadline) {
+                       c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
+               }
+       }
+}
+
+func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
+       var lameSet test.LameInstanceSet
+       drivers["test"] = cloud.DriverFunc(func(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
+               return &lameSet, nil
+       })
+
+       type instance struct {
+               Instance             string
+               WorkerState          string
+               Price                float64
+               LastContainerUUID    string
+               ArvadosInstanceType  string
+               ProviderInstanceType string
+       }
+       type instancesResponse struct {
+               Items []instance
+       }
+       getInstances := func() instancesResponse {
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               var sr instancesResponse
+               err := json.Unmarshal(resp.Body.Bytes(), &sr)
+               c.Check(err, check.IsNil)
+               return sr
+       }
+
+       sr := getInstances()
+       c.Check(len(sr.Items), check.Equals, 0)
+
+       ch := s.disp.pool.Subscribe()
+       defer s.disp.pool.Unsubscribe(ch)
+       err := s.disp.pool.Create(arvados.InstanceType{
+               Name:         "a1.small-1",
+               ProviderType: "a1.small",
+               VCPUs:        1,
+               RAM:          1 << 30,
+               Price:        0.12,
+       })
+       c.Check(err, check.IsNil)
+       <-ch
+
+       sr = getInstances()
+       c.Assert(len(sr.Items), check.Equals, 1)
+       c.Check(sr.Items[0].Instance, check.Matches, "lame-.*")
+       c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
+       c.Check(sr.Items[0].Price, check.Equals, 0.12)
+       c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
+       c.Check(sr.Items[0].ProviderInstanceType, check.Equals, "a1.small")
+       c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, "a1.small-1")
+}
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
new file mode 100644 (file)
index 0000000..295fd61
--- /dev/null
@@ -0,0 +1,22 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var drivers = map[string]cloud.Driver{}
+
+func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID) (cloud.InstanceSet, error) {
+       driver, ok := drivers[cluster.CloudVMs.Driver]
+       if !ok {
+               return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
+       }
+       return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID)
+}
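
The drivers map is empty in this commit; the test suite below registers a stub under the name "test". A sketch, inside package dispatchcloud, of how a real driver might eventually be registered (the "example" name and no-op implementation are hypothetical):

    package dispatchcloud

    import "git.curoverse.com/arvados.git/lib/cloud"

    func init() {
        // Hypothetical registration; a real driver would build a working
        // cloud.InstanceSet from cluster.CloudVMs.DriverParameters.
        drivers["example"] = cloud.DriverFunc(func(config map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
            return nil, nil
        })
    }
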
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
new file mode 100644 (file)
index 0000000..e728b67
--- /dev/null
@@ -0,0 +1,25 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+type instanceSetProxy struct {
+       cloud.InstanceSet
+}
+
+func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
+       // TODO: return if Create failed recently with a RateLimitError or QuotaError
+       return is.InstanceSet.Create(it, id, tags, pk)
+}
+
+func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+       // TODO: return if Instances failed recently with a RateLimitError
+       return is.InstanceSet.Instances(tags)
+}
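
The TODO comments above, and the provider-proxy paragraph in readme.go below, describe short-circuiting calls after the cloud reports a RateLimitError. One possible shape for that logic, sketched inside package dispatchcloud; the throttlingProxy name and error text are hypothetical.

    package dispatchcloud

    import (
        "errors"
        "sync"
        "time"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "golang.org/x/crypto/ssh"
    )

    // throttlingProxy is a hypothetical variant of instanceSetProxy that
    // refuses to call Create until the EarliestRetry time reported by the
    // most recent cloud.RateLimitError has passed.
    type throttlingProxy struct {
        cloud.InstanceSet
        mtx        sync.Mutex
        retryAfter time.Time
    }

    func (p *throttlingProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
        p.mtx.Lock()
        wait := p.retryAfter
        p.mtx.Unlock()
        if time.Now().Before(wait) {
            return nil, errors.New("provider is rate-limiting requests; skipping Create")
        }
        inst, err := p.InstanceSet.Create(it, id, tags, pk)
        if rl, ok := err.(cloud.RateLimitError); ok {
            p.mtx.Lock()
            p.retryAfter = rl.EarliestRetry()
            p.mtx.Unlock()
        }
        return inst, err
    }
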
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644 (file)
index 0000000..90bb6ca
--- /dev/null
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "sync"
+       "time"
+)
+
+type logger interface {
+       Printf(string, ...interface{})
+       Warnf(string, ...interface{})
+       Debugf(string, ...interface{})
+}
+
+var nextSpam = map[string]time.Time{}
+var nextSpamMtx sync.Mutex
+
+func unspam(msg string) bool {
+       nextSpamMtx.Lock()
+       defer nextSpamMtx.Unlock()
+       if nextSpam[msg].Before(time.Now()) {
+               nextSpam[msg] = time.Now().Add(time.Minute)
+               return true
+       }
+       return false
+}
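
A small sketch of using unspam within the package to keep a recurring warning down to once per minute; the helper name and message are illustrative.

    package dispatchcloud

    import "github.com/Sirupsen/logrus"

    // warnThrottled logs msg at most once per minute, relying on unspam
    // to suppress repeats.
    func warnThrottled(logger logrus.FieldLogger, msg string) {
        if unspam(msg) {
            logger.Warn(msg)
        }
    }
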
diff --git a/lib/dispatchcloud/node_size.go b/lib/dispatchcloud/node_size.go
index 1c36d6cf5bb770cb447b6f7f177d39c5ff7ef469..3bada3baf37590b93a01271bac9baa43f57acf23 100644 (file)
@@ -15,10 +15,9 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
-var (
-       ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
-       discountConfiguredRAMPercent  = 5
-)
+var ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+
+var discountConfiguredRAMPercent = 5
 
 // ConstraintsNotSatisfiableError includes a list of available instance types
 // to be reported back to the user.
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644 (file)
index 0000000..a4b005e
--- /dev/null
@@ -0,0 +1,79 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a cloud provider, a stale-lock fixer, and a syncer.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run a stale-lock fixer.
+// 5. Start a scheduler.
+// 6. Start a syncer.
+//
+//
+// A provider (cloud driver) creates new cloud VM instances and gets
+// the latest list of instances. The returned instances implement
+// proxies to the provider's metadata and control interfaces (get IP
+// address, update tags, shutdown).
+//
+//
+// A workerPool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// A worker maintains a multiplexed SSH connection to a cloud
+// instance, retrying/reconnecting as needed, so the workerPool can
+// execute commands. It asks the provider's instance to verify its SSH
+// public key once when first connecting, and again later if the key
+// changes.
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// A stale-lock fixer waits for any already-locked containers (i.e.,
+// locked by a prior server process) to appear on workers as the
+// worker pool recovers its state. It unlocks/requeues any that still
+// remain when all workers are recovered or shutdown, or its timer
+// expires.
+//
+//
+// A scheduler chooses which containers to assign to which idle
+// workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// A syncer updates state to Cancelled when a running container
+// process dies without finalizing its entry in the controller
+// database. It also calls the worker pool to kill containers that
+// have priority=0 while locked or running.
+//
+//
+// A provider proxy wraps a provider with rate-limiting logic. After
+// the wrapped provider receives a cloud.RateLimitError, the proxy
+// starts returning errors to callers immediately without calling
+// through to the wrapped provider.
+//
+//
+// TBD: Bootstrapping script via SSH, too? Future version.
+//
+// TBD: drain instance, keep instance alive
+// TBD: metrics, diagnostics
+// TBD: why is the dispatch token currently passed to the worker?
+//
+// Metrics: queue size, time each job has spent queued, #idle/busy/booting nodes
+// Timing in each step, and end-to-end
+// Metrics: boot/idle/alloc time and cost
diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go
new file mode 100644 (file)
index 0000000..e9644ae
--- /dev/null
@@ -0,0 +1,57 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+// FixStaleLocks waits for any already-locked containers (i.e., locked
+// by a prior dispatcher process) to appear on workers as the worker
+// pool recovers its state. It unlocks any that still remain when all
+// workers are recovered or shutdown, or its timer expires.
+func FixStaleLocks(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, limit time.Duration) {
+       wp := pool.Subscribe()
+       defer pool.Unsubscribe(wp)
+       timeout := time.NewTimer(limit)
+waiting:
+       for {
+               unlock := false
+               select {
+               case <-wp:
+                       // If all workers have been contacted, unlock
+                       // containers that aren't claimed by any
+                       // worker.
+                       unlock = pool.Workers()[worker.StateUnknown] == 0
+               case <-timeout.C:
+                       // Give up and unlock the containers, even
+                       // though they might be working.
+                       unlock = true
+               }
+
+               running := pool.Running()
+               qEntries, _ := queue.Entries()
+               for uuid, ent := range qEntries {
+                       if ent.Container.State != arvados.ContainerStateLocked {
+                               continue
+                       }
+                       if _, running := running[uuid]; running {
+                               continue
+                       }
+                       if !unlock {
+                               continue waiting
+                       }
+                       err := queue.Unlock(uuid)
+                       if err != nil {
+                               logger.Warnf("Unlock %s: %s", uuid, err)
+                       }
+               }
+               return
+       }
+}
diff --git a/lib/dispatchcloud/scheduler/gocheck_test.go b/lib/dispatchcloud/scheduler/gocheck_test.go
new file mode 100644 (file)
index 0000000..558c60f
--- /dev/null
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "testing"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
new file mode 100644 (file)
index 0000000..bdb8678
--- /dev/null
@@ -0,0 +1,40 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// A ContainerQueue is a set of containers that need to be started or
+// stopped. Implemented by container.Queue and test stubs.
+type ContainerQueue interface {
+       Entries() (entries map[string]container.QueueEnt, updated time.Time)
+       Lock(uuid string) error
+       Unlock(uuid string) error
+       Cancel(uuid string) error
+       Forget(uuid string)
+       Get(uuid string) (arvados.Container, bool)
+}
+
+// A WorkerPool asynchronously starts and stops worker VMs, and starts
+// and stops containers on them. Implemented by worker.Pool and test
+// stubs.
+type WorkerPool interface {
+       Running() map[string]time.Time
+       Unallocated() map[arvados.InstanceType]int
+       Workers() map[worker.State]int
+       AtQuota() bool
+       Create(arvados.InstanceType) error
+       Shutdown(arvados.InstanceType) bool
+       StartContainer(arvados.InstanceType, arvados.Container) bool
+       KillContainer(uuid string)
+       Subscribe() <-chan struct{}
+       Unsubscribe(<-chan struct{})
+}
diff --git a/lib/dispatchcloud/scheduler/map.go b/lib/dispatchcloud/scheduler/map.go
new file mode 100644 (file)
index 0000000..5742c1f
--- /dev/null
@@ -0,0 +1,144 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package scheduler uses a resizable worker pool to execute
+// containers in priority order.
+//
+// Scheduler functions must not be called concurrently using the same
+// queue or pool.
+package scheduler
+
+import (
+       "sort"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+// Map maps queued containers onto unallocated workers in priority
+// order, creating new workers if needed. It locks containers that can
+// be mapped onto existing/pending workers, and starts them if
+// possible.
+//
+// Map unlocks any containers that are locked but can't be
+// mapped. (For example, this happens when the cloud provider reaches
+// quota/capacity and a previously mappable container's priority is
+// surpassed by a newer container.)
+//
+// If it encounters errors while creating new workers, Map shuts down
+// idle workers, in case they are consuming quota.
+//
+// Map should not be called without first calling FixStaleLocks.
+func Map(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
+       unsorted, _ := queue.Entries()
+       sorted := make([]container.QueueEnt, 0, len(unsorted))
+       for _, ent := range unsorted {
+               sorted = append(sorted, ent)
+       }
+       sort.Slice(sorted, func(i, j int) bool {
+               return sorted[i].Container.Priority > sorted[j].Container.Priority
+       })
+
+       running := pool.Running()
+       unalloc := pool.Unallocated()
+
+       logger.WithFields(logrus.Fields{
+               "Containers": len(sorted),
+               "Processes":  len(running),
+       }).Debug("mapping")
+
+       dontstart := map[arvados.InstanceType]bool{}
+       var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+
+       for i, ctr := range sorted {
+               ctr, it := ctr.Container, ctr.InstanceType
+               logger := logger.WithFields(logrus.Fields{
+                       "ContainerUUID": ctr.UUID,
+                       "InstanceType":  it.Name,
+               })
+               if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
+                       continue
+               }
+               if ctr.State == arvados.ContainerStateQueued {
+                       logger.Debugf("locking")
+                       if unalloc[it] < 1 && pool.AtQuota() {
+                               overquota = sorted[i:]
+                               break
+                       }
+                       err := queue.Lock(ctr.UUID)
+                       if err != nil {
+                               logger.WithError(err).Warnf("lock error")
+                               unalloc[it]++
+                               continue
+                       }
+                       var ok bool
+                       ctr, ok = queue.Get(ctr.UUID)
+                       if !ok {
+                               logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+                               continue
+                       }
+                       if ctr.State != arvados.ContainerStateLocked {
+                               logger.Debugf("(race?) container has state=%q after Lock succeeded", ctr.State)
+                       }
+               }
+               if ctr.State != arvados.ContainerStateLocked {
+                       continue
+               }
+               if unalloc[it] < 1 {
+                       logger.Info("creating new instance")
+                       err := pool.Create(it)
+                       if err != nil {
+                               if _, ok := err.(cloud.QuotaError); !ok {
+                                       logger.WithError(err).Warn("error creating worker")
+                               }
+                               queue.Unlock(ctr.UUID)
+                               // Don't let lower-priority containers
+                               // starve this one by keeping idle
+                               // workers alive on different
+                               // instance types.  TODO: avoid
+                               // getting starved here if instances
+                               // of a specific type always fail.
+                               overquota = sorted[i:]
+                               break
+                       }
+                       unalloc[it]++
+               }
+               if dontstart[it] {
+                       // We already tried & failed to start a
+                       // higher-priority container on the same
+                       // instance type. Don't let this one sneak in
+                       // ahead of it.
+               } else if pool.StartContainer(it, ctr) {
+                       unalloc[it]--
+               } else {
+                       dontstart[it] = true
+               }
+       }
+
+       if len(overquota) > 0 {
+               // Unlock any containers that are unmappable while
+               // we're at quota.
+               for _, ctr := range overquota {
+                       ctr := ctr.Container
+                       if ctr.State == arvados.ContainerStateLocked {
+                               logger := logger.WithField("ContainerUUID", ctr.UUID)
+                               logger.Debug("unlock because pool capacity is used by higher priority containers")
+                               err := queue.Unlock(ctr.UUID)
+                               if err != nil {
+                                       logger.WithError(err).Warn("error unlocking")
+                               }
+                       }
+               }
+               // Shut down idle workers that didn't get any
+               // containers mapped onto them before we hit quota.
+               for it, n := range unalloc {
+                       if n < 1 {
+                               continue
+                       }
+                       pool.Shutdown(it)
+               }
+       }
+}
diff --git a/lib/dispatchcloud/scheduler/map_test.go b/lib/dispatchcloud/scheduler/map_test.go
new file mode 100644 (file)
index 0000000..c40b304
--- /dev/null
@@ -0,0 +1,259 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "errors"
+       "fmt"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+var (
+       logger = logrus.StandardLogger()
+
+       // arbitrary example instance types
+       types = func() (r []arvados.InstanceType) {
+               for i := 0; i < 16; i++ {
+                       r = append(r, test.InstanceType(i))
+               }
+               return
+       }()
+
+       // arbitrary example container UUIDs
+       uuids = func() (r []string) {
+               for i := 0; i < 16; i++ {
+                       r = append(r, test.ContainerUUID(i))
+               }
+               return
+       }()
+)
+
+type stubQueue struct {
+       ents map[string]container.QueueEnt
+}
+
+func (q *stubQueue) Entries() (map[string]container.QueueEnt, time.Time) {
+       return q.ents, time.Now()
+}
+func (q *stubQueue) Lock(uuid string) error {
+       return q.setState(uuid, arvados.ContainerStateLocked)
+}
+func (q *stubQueue) Unlock(uuid string) error {
+       return q.setState(uuid, arvados.ContainerStateQueued)
+}
+func (q *stubQueue) Cancel(uuid string) error {
+       return q.setState(uuid, arvados.ContainerStateCancelled)
+}
+func (q *stubQueue) Forget(uuid string) {
+       delete(q.ents, uuid)
+}
+func (q *stubQueue) Get(uuid string) (arvados.Container, bool) {
+       ent, ok := q.ents[uuid]
+       return ent.Container, ok
+}
+func (q *stubQueue) setState(uuid string, state arvados.ContainerState) error {
+       ent, ok := q.ents[uuid]
+       if !ok {
+               return fmt.Errorf("no such ent: %q", uuid)
+       }
+       ent.Container.State = state
+       q.ents[uuid] = ent
+       return nil
+}
+
+type stubQuotaError struct {
+       error
+}
+
+func (stubQuotaError) IsQuotaError() bool { return true }
+
+type stubPool struct {
+       notify    <-chan struct{}
+       unalloc   map[arvados.InstanceType]int // idle+booting+unknown
+       idle      map[arvados.InstanceType]int
+       running   map[string]time.Time
+       atQuota   bool
+       canCreate int
+       creates   []arvados.InstanceType
+       starts    []string
+       shutdowns int
+}
+
+func (p *stubPool) AtQuota() bool               { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{}  { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{}) {}
+func (p *stubPool) Running() map[string]time.Time { return p.running }
+func (p *stubPool) KillContainer(uuid string)     { delete(p.running, uuid) }
+func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
+       r := map[arvados.InstanceType]int{}
+       for it, n := range p.unalloc {
+               r[it] = n
+       }
+       return r
+}
+func (p *stubPool) Create(it arvados.InstanceType) error {
+       p.creates = append(p.creates, it)
+       if p.canCreate < 1 {
+               return stubQuotaError{errors.New("quota")}
+       }
+       p.canCreate--
+       p.unalloc[it]++
+       return nil
+}
+func (p *stubPool) Shutdown(arvados.InstanceType) bool {
+       p.shutdowns++
+       return false
+}
+func (p *stubPool) Workers() map[worker.State]int {
+       return map[worker.State]int{
+               worker.StateBooting: len(p.unalloc) - len(p.idle),
+               worker.StateRunning: len(p.idle) - len(p.running),
+       }
+}
+func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+       p.starts = append(p.starts, ctr.UUID)
+       if p.idle[it] == 0 {
+               return false
+       }
+       p.idle[it]--
+       p.unalloc[it]--
+       p.running[ctr.UUID] = time.Time{}
+       return true
+}
+
+var _ = check.Suite(&SchedulerSuite{})
+
+type SchedulerSuite struct{}
+
+// Map priority=4 container to idle node. Create a new instance for
+// the priority=3 container. Don't try to start any priority<3
+// containers because priority=3 container didn't start
+// immediately. Don't try to create any other nodes after the failed
+// create.
+func (*SchedulerSuite) TestMapIdle(c *check.C) {
+       queue := stubQueue{
+               ents: map[string]container.QueueEnt{
+                       uuids[1]: {
+                               Container:    arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
+                               InstanceType: types[1],
+                       },
+                       uuids[2]: {
+                               Container:    arvados.Container{UUID: uuids[2], Priority: 2, State: arvados.ContainerStateQueued},
+                               InstanceType: types[1],
+                       },
+                       uuids[3]: {
+                               Container:    arvados.Container{UUID: uuids[3], Priority: 3, State: arvados.ContainerStateQueued},
+                               InstanceType: types[1],
+                       },
+                       uuids[4]: {
+                               Container:    arvados.Container{UUID: uuids[4], Priority: 4, State: arvados.ContainerStateQueued},
+                               InstanceType: types[1],
+                       },
+               },
+       }
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       types[1]: 1,
+                       types[2]: 2,
+               },
+               idle: map[arvados.InstanceType]int{
+                       types[1]: 1,
+                       types[2]: 2,
+               },
+               running:   map[string]time.Time{},
+               canCreate: 1,
+       }
+       Map(logger, &queue, &pool)
+       c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[1]})
+       c.Check(pool.starts, check.DeepEquals, []string{uuids[4], uuids[3]})
+       c.Check(pool.running, check.DeepEquals, map[string]time.Time{uuids[4]: {}})
+}
+
+// Shut down some nodes if Create() fails -- and without even calling
+// Create(), if AtQuota() is true.
+func (*SchedulerSuite) TestMapShutdownAtQuota(c *check.C) {
+       for quota := 0; quota < 2; quota++ {
+               shouldCreate := types[1 : 1+quota]
+               queue := stubQueue{
+                       ents: map[string]container.QueueEnt{
+                               uuids[1]: {
+                                       Container:    arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
+                                       InstanceType: types[1],
+                               },
+                       },
+               }
+               pool := stubPool{
+                       atQuota: quota == 0,
+                       unalloc: map[arvados.InstanceType]int{
+                               types[2]: 2,
+                       },
+                       idle: map[arvados.InstanceType]int{
+                               types[2]: 2,
+                       },
+                       running:   map[string]time.Time{},
+                       creates:   []arvados.InstanceType{},
+                       starts:    []string{},
+                       canCreate: 0,
+               }
+               Map(logger, &queue, &pool)
+               c.Check(pool.creates, check.DeepEquals, shouldCreate)
+               c.Check(pool.starts, check.DeepEquals, []string{})
+               c.Check(pool.shutdowns, check.Not(check.Equals), 0)
+       }
+}
+
+// Start lower-priority containers while waiting for new/existing
+// workers to come up for higher-priority containers.
+func (*SchedulerSuite) TestMapStartWhileCreating(c *check.C) {
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       types[1]: 1,
+                       types[2]: 1,
+               },
+               idle: map[arvados.InstanceType]int{
+                       types[1]: 1,
+                       types[2]: 1,
+               },
+               running:   map[string]time.Time{},
+               canCreate: 2,
+       }
+       queue := stubQueue{
+               ents: map[string]container.QueueEnt{
+                       uuids[1]: {
+                               // create a new worker
+                               Container:    arvados.Container{UUID: uuids[1], Priority: 1, State: arvados.ContainerStateQueued},
+                               InstanceType: types[1],
+                       },
+                       uuids[2]: {
+                               // tentatively map to unalloc worker
+                               Container:    arvados.Container{UUID: uuids[2], Priority: 2, State: arvados.ContainerStateQueued},
+                               InstanceType: types[1],
+                       },
+                       uuids[3]: {
+                               // start now on idle worker
+                               Container:    arvados.Container{UUID: uuids[3], Priority: 3, State: arvados.ContainerStateQueued},
+                               InstanceType: types[1],
+                       },
+                       uuids[4]: {
+                               // create a new worker
+                               Container:    arvados.Container{UUID: uuids[4], Priority: 4, State: arvados.ContainerStateQueued},
+                               InstanceType: types[2],
+                       },
+                       uuids[5]: {
+                               // tentatively map to unalloc worker
+                               Container:    arvados.Container{UUID: uuids[5], Priority: 5, State: arvados.ContainerStateQueued},
+                               InstanceType: types[2],
+                       },
+                       uuids[6]: {
+                               // start now on idle worker
+                               Container:    arvados.Container{UUID: uuids[6], Priority: 6, State: arvados.ContainerStateQueued},
+                               InstanceType: types[2],
+                       },
+               },
+       }
+       Map(logger, &queue, &pool)
+       c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[2], types[1]})
+       c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
+       c.Check(pool.running, check.DeepEquals, map[string]time.Time{uuids[3]: {}, uuids[6]: {}})
+}
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
new file mode 100644 (file)
index 0000000..00e2a30
--- /dev/null
@@ -0,0 +1,85 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+// Sync resolves discrepancies between the queue and the pool:
+//
+// Lingering crunch-run processes for finalized and unlocked/requeued
+// containers are killed.
+//
+// Locked containers whose crunch-run processes have exited are
+// requeued.
+//
+// Running containers whose crunch-run processes have exited are
+// cancelled.
+//
+// Sync must not be called concurrently with other calls to Map or
+// Sync using the same queue or pool.
+func Sync(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool) {
+       running := pool.Running()
+       cancel := func(ent container.QueueEnt, reason string) {
+               uuid := ent.Container.UUID
+               logger := logger.WithField("ContainerUUID", uuid)
+               logger.Infof("cancelling container because %s", reason)
+               err := queue.Cancel(uuid)
+               if err != nil {
+                       logger.WithError(err).Print("error cancelling container")
+               }
+       }
+       kill := func(ent container.QueueEnt) {
+               uuid := ent.Container.UUID
+               logger := logger.WithField("ContainerUUID", uuid)
+               logger.Debugf("killing crunch-run process because state=%q", ent.Container.State)
+               pool.KillContainer(uuid)
+       }
+       qEntries, qUpdated := queue.Entries()
+       for uuid, ent := range qEntries {
+               exited, running := running[uuid]
+               switch ent.Container.State {
+               case arvados.ContainerStateRunning:
+                       if !running {
+                               cancel(ent, "not running on any worker")
+                       } else if !exited.IsZero() && qUpdated.After(exited) {
+                               cancel(ent, "state=\"Running\" after crunch-run exited")
+                       }
+               case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
+                       if running {
+                               kill(ent)
+                       } else {
+                               logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "State":         ent.Container.State,
+                               }).Info("container finished")
+                               queue.Forget(uuid)
+                       }
+               case arvados.ContainerStateQueued:
+                       if running {
+                               kill(ent)
+                       }
+               case arvados.ContainerStateLocked:
+                       if running && !exited.IsZero() && qUpdated.After(exited) {
+                               logger = logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "Exited":        time.Since(exited).Seconds(),
+                               })
+                               logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
+                               err := queue.Unlock(uuid)
+                               if err != nil {
+                                       logger.WithError(err).Info("error requeueing container")
+                               }
+                       }
+               default:
+                       logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+               }
+       }
+}
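For orientation, a hedged sketch (not part of this change) of how a dispatcher might drive FixStaleLocks, Sync, and Map together, observing the ordering and no-concurrency rules documented above. The real wiring lives in lib/dispatchcloud/dispatcher.go and may differ.

// Illustrative only -- not part of this change.
package example

import (
	"time"

	"git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
	"github.com/Sirupsen/logrus"
)

// runScheduler drives the scheduler against a queue and pool supplied
// by the caller. FixStaleLocks runs once at startup; after that, Sync
// and Map run serially (never concurrently) each time the worker pool
// reports a state change.
func runScheduler(queue scheduler.ContainerQueue, pool scheduler.WorkerPool) {
	logger := logrus.StandardLogger()
	scheduler.FixStaleLocks(logger, queue, pool, time.Minute)
	notify := pool.Subscribe()
	defer pool.Unsubscribe(notify)
	for range notify {
		scheduler.Sync(logger, queue, pool)
		scheduler.Map(logger, queue, pool)
	}
}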
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
new file mode 100644 (file)
index 0000000..804ae6f
--- /dev/null
@@ -0,0 +1,172 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package ssh_executor provides an implementation of worker.Executor
+// using a long-lived multiplexed SSH session.
+package ssh_executor
+
+import (
+       "bytes"
+       "errors"
+       "io"
+       "net"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "golang.org/x/crypto/ssh"
+)
+
+// New returns a new Executor, using the given target.
+func New(t cloud.ExecutorTarget) *Executor {
+       return &Executor{target: t}
+}
+
+// An Executor uses a multiplexed SSH connection to execute shell
+// commands on a remote target. It reconnects automatically after
+// errors.
+//
+// When setting up a connection, the Executor accepts whatever host
+// key is provided by the remote server, then passes the received key
+// and the SSH connection to the target's VerifyHostKey method before
+// executing commands on the connection.
+//
+// A zero Executor must not be used before calling SetTarget.
+//
+// An Executor must not be copied.
+type Executor struct {
+       target  cloud.ExecutorTarget
+       signers []ssh.Signer
+       mtx     sync.RWMutex // controls access to instance after creation
+
+       client      *ssh.Client
+       clientErr   error
+       clientOnce  sync.Once     // initializes private state (clientSetup, clientErr)
+       clientSetup chan bool     // len>0 while client setup is in progress
+       hostKey     ssh.PublicKey // most recent host key that passed verification, if any
+}
+
+// SetSigners updates the set of private keys that will be offered to
+// the target next time the Executor sets up a new connection.
+func (exr *Executor) SetSigners(signers ...ssh.Signer) {
+       exr.mtx.Lock()
+       defer exr.mtx.Unlock()
+       exr.signers = signers
+}
+
+// SetTarget sets the current target. The new target will be used next
+// time a new connection is set up; until then, the Executor will
+// continue to use the existing target.
+//
+// The new target is assumed to represent the same host as the
+// previous target, although its address and host key might differ.
+func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
+       exr.mtx.Lock()
+       defer exr.mtx.Unlock()
+       exr.target = t
+}
+
+// Target returns the current target.
+func (exr *Executor) Target() cloud.ExecutorTarget {
+       exr.mtx.RLock()
+       defer exr.mtx.RUnlock()
+       return exr.target
+}
+
+// Execute runs cmd on the target. If an existing connection is not
+// usable, it sets up a new connection to the current target.
+func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+       session, err := exr.newSession()
+       if err != nil {
+               return nil, nil, err
+       }
+       defer session.Close()
+       var stdout, stderr bytes.Buffer
+       session.Stdin = stdin
+       session.Stdout = &stdout
+       session.Stderr = &stderr
+       err = session.Run(cmd)
+       return stdout.Bytes(), stderr.Bytes(), err
+}
+
+// Create a new SSH session. If session setup fails or the SSH client
+// hasn't been set up yet, set up a new SSH client and try again.
+func (exr *Executor) newSession() (*ssh.Session, error) {
+       try := func(create bool) (*ssh.Session, error) {
+               client, err := exr.sshClient(create)
+               if err != nil {
+                       return nil, err
+               }
+               return client.NewSession()
+       }
+       session, err := try(false)
+       if err != nil {
+               session, err = try(true)
+       }
+       return session, err
+}
+
+// Get the latest SSH client. If another goroutine is in the process
+// of setting one up, wait for it to finish and return its result (or
+// the last successfully setup client, if it fails).
+func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
+       exr.clientOnce.Do(func() {
+               exr.clientSetup = make(chan bool, 1)
+               exr.clientErr = errors.New("client not yet created")
+       })
+       defer func() { <-exr.clientSetup }()
+       select {
+       case exr.clientSetup <- true:
+               if create {
+                       client, err := exr.setupSSHClient()
+                       if err == nil || exr.client == nil {
+                               exr.client, exr.clientErr = client, err
+                       }
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       default:
+               // Another goroutine is doing the above case.  Wait
+               // for it to finish and return whatever it leaves in
+               // exr.client.
+               exr.clientSetup <- true
+       }
+       return exr.client, exr.clientErr
+}
+
+// Create a new SSH client.
+func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
+       target := exr.Target()
+       addr := target.Address()
+       if addr == "" {
+               return nil, errors.New("instance has no address")
+       }
+       var receivedKey ssh.PublicKey
+       client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+               User: "root",
+               Auth: []ssh.AuthMethod{
+                       ssh.PublicKeys(exr.signers...),
+               },
+               HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+                       receivedKey = key
+                       return nil
+               },
+               Timeout: time.Minute,
+       })
+       if err != nil {
+               return nil, err
+       } else if receivedKey == nil {
+               return nil, errors.New("BUG: key was never provided to HostKeyCallback")
+       }
+
+       if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
+               err = target.VerifyHostKey(receivedKey, client)
+               if err != nil {
+                       return nil, err
+               }
+               exr.hostKey = receivedKey
+       }
+       return client, nil
+}
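A brief, illustrative usage sketch (not part of this change): wiring an Executor to a target and running one command. The target and signer here are placeholders supplied by the caller.

// Illustrative only -- not part of this change.
package example

import (
	"log"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
	"golang.org/x/crypto/ssh"
)

// runOnTarget runs one command on the given target (e.g., a
// cloud.Instance) using an Executor, offering the given private key.
func runOnTarget(target cloud.ExecutorTarget, signer ssh.Signer) {
	exr := ssh_executor.New(target)
	exr.SetSigners(signer)
	stdout, stderr, err := exr.Execute("uname -a", nil)
	if err != nil {
		log.Printf("remote command failed: %s (stderr %q)", err, stderr)
		return
	}
	log.Printf("remote reports: %s", stdout)
	// If the instance's address or host key changes later (e.g.,
	// after the dispatcher refreshes its instance list), call
	// exr.SetTarget(newTarget); the next connection will use it.
}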
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
new file mode 100644 (file)
index 0000000..8dabfec
--- /dev/null
@@ -0,0 +1,102 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ssh_executor
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+       "sync"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&ExecutorSuite{})
+
+type testTarget struct {
+       test.SSHService
+}
+
+func (*testTarget) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+       return nil
+}
+
+type ExecutorSuite struct{}
+
+func (s *ExecutorSuite) TestExecute(c *check.C) {
+       command := `foo 'bar' "baz"`
+       stdinData := "foobar\nbaz\n"
+       _, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
+       clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
+       for _, exitcode := range []int{0, 1, 2} {
+               srv := &testTarget{
+                       SSHService: test.SSHService{
+                               Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+                                       c.Check(cmd, check.Equals, command)
+                                       var wg sync.WaitGroup
+                                       wg.Add(2)
+                                       go func() {
+                                               io.WriteString(stdout, "stdout\n")
+                                               wg.Done()
+                                       }()
+                                       go func() {
+                                               io.WriteString(stderr, "stderr\n")
+                                               wg.Done()
+                                       }()
+                                       buf, err := ioutil.ReadAll(stdin)
+                                       wg.Wait()
+                                       c.Check(err, check.IsNil)
+                                       if err != nil {
+                                               return 99
+                                       }
+                                       _, err = stdout.Write(buf)
+                                       c.Check(err, check.IsNil)
+                                       return uint32(exitcode)
+                               },
+                               HostKey:        hostpriv,
+                               AuthorizedKeys: []ssh.PublicKey{clientpub},
+                       },
+               }
+               err := srv.Start()
+               c.Check(err, check.IsNil)
+               c.Logf("srv address %q", srv.Address())
+               defer srv.Close()
+
+               exr := New(srv)
+               exr.SetSigners(clientpriv)
+
+               done := make(chan bool)
+               go func() {
+                       stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+                       if exitcode == 0 {
+                               c.Check(err, check.IsNil)
+                       } else {
+                               c.Check(err, check.NotNil)
+                               err, ok := err.(*ssh.ExitError)
+                               c.Assert(ok, check.Equals, true)
+                               c.Check(err.ExitStatus(), check.Equals, exitcode)
+                       }
+                       c.Check(stdout, check.DeepEquals, []byte("stdout\n"+stdinData))
+                       c.Check(stderr, check.DeepEquals, []byte("stderr\n"))
+                       close(done)
+               }()
+
+               timeout := time.NewTimer(time.Second)
+               select {
+               case <-done:
+               case <-timeout.C:
+                       c.Fatal("timed out")
+               }
+       }
+}
diff --git a/lib/dispatchcloud/test/doc.go b/lib/dispatchcloud/test/doc.go
new file mode 100644 (file)
index 0000000..12f3b16
--- /dev/null
@@ -0,0 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package test provides fakes and other tools for testing cloud
+// drivers and other dispatcher modules.
+package test
diff --git a/lib/dispatchcloud/test/fixtures.go b/lib/dispatchcloud/test/fixtures.go
new file mode 100644 (file)
index 0000000..7d65ca0
--- /dev/null
@@ -0,0 +1,27 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// ContainerUUID returns a fake container UUID.
+func ContainerUUID(i int) string {
+       return fmt.Sprintf("zzzzz-dz642-%015d", i)
+}
+
+// InstanceType returns a fake arvados.InstanceType called "type{i}"
+// with i CPUs and i GiB of memory.
+func InstanceType(i int) arvados.InstanceType {
+       return arvados.InstanceType{
+               Name:         fmt.Sprintf("type%d", i),
+               ProviderType: fmt.Sprintf("providertype%d", i),
+               VCPUs:        i,
+               RAM:          arvados.ByteSize(i) << 30,
+       }
+}
diff --git a/lib/dispatchcloud/test/lame_instance_set.go b/lib/dispatchcloud/test/lame_instance_set.go
new file mode 100644 (file)
index 0000000..baab407
--- /dev/null
@@ -0,0 +1,118 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+       "math/rand"
+       "sync"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+// LameInstanceSet creates instances that boot but can't run
+// containers.
+type LameInstanceSet struct {
+       Hold chan bool // set to make(chan bool) to hold operations until Release is called
+
+       mtx       sync.Mutex
+       instances map[*lameInstance]bool
+}
+
+// Create returns a new instance.
+func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
+       inst := &lameInstance{
+               p:            p,
+               id:           cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
+               providerType: instType.ProviderType,
+       }
+       inst.SetTags(tags)
+       if p.Hold != nil {
+               p.Hold <- true
+       }
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       if p.instances == nil {
+               p.instances = map[*lameInstance]bool{}
+       }
+       p.instances[inst] = true
+       return inst, nil
+}
+
+// Instances returns the instances that haven't been destroyed.
+func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       var instances []cloud.Instance
+       for i := range p.instances {
+               instances = append(instances, i)
+       }
+       return instances, nil
+}
+
+// Stop is a no-op, but exists to satisfy cloud.InstanceSet.
+func (p *LameInstanceSet) Stop() {
+}
+
+// Release n held calls. Blocks if n calls aren't already
+// waiting. Blocks forever if Hold is nil.
+func (p *LameInstanceSet) Release(n int) {
+       for i := 0; i < n; i++ {
+               <-p.Hold
+       }
+}
+
+type lameInstance struct {
+       p            *LameInstanceSet
+       id           cloud.InstanceID
+       providerType string
+       tags         cloud.InstanceTags
+}
+
+func (inst *lameInstance) ID() cloud.InstanceID {
+       return inst.id
+}
+
+func (inst *lameInstance) String() string {
+       return fmt.Sprint(inst.id)
+}
+
+func (inst *lameInstance) ProviderType() string {
+       return inst.providerType
+}
+
+func (inst *lameInstance) Address() string {
+       return "0.0.0.0:1234"
+}
+
+func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
+       inst.p.mtx.Lock()
+       defer inst.p.mtx.Unlock()
+       inst.tags = cloud.InstanceTags{}
+       for k, v := range tags {
+               inst.tags[k] = v
+       }
+       return nil
+}
+
+func (inst *lameInstance) Destroy() error {
+       if inst.p.Hold != nil {
+               inst.p.Hold <- true
+       }
+       inst.p.mtx.Lock()
+       defer inst.p.mtx.Unlock()
+       delete(inst.p.instances, inst)
+       return nil
+}
+
+func (inst *lameInstance) Tags() cloud.InstanceTags {
+       return inst.tags
+}
+
+func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+       return nil
+}
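An illustrative sketch (not part of this change) of exercising LameInstanceSet directly; with Hold left nil, Create and Destroy return without blocking.

// Illustrative only -- not part of this change.
package example

import (
	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
)

// lameExample creates one fake instance, lists it, and destroys it.
func lameExample() error {
	lis := &test.LameInstanceSet{}
	var img cloud.ImageID // the stub ignores the image ID
	inst, err := lis.Create(test.InstanceType(1), img, cloud.InstanceTags{}, nil)
	if err != nil {
		return err
	}
	if _, err := lis.Instances(nil); err != nil {
		return err
	}
	return inst.Destroy()
}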
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
new file mode 100644 (file)
index 0000000..909f561
--- /dev/null
@@ -0,0 +1,116 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Queue is a test stub for container.Queue. The caller specifies the
+// initial queue state.
+type Queue struct {
+       // Containers represent the API server database contents.
+       Containers []arvados.Container
+
+       // ChooseType will be called for each entry in Containers. It
+       // must not be nil.
+       ChooseType func(*arvados.Container) (arvados.InstanceType, error)
+
+       entries map[string]container.QueueEnt
+       updTime time.Time
+}
+
+// Entries returns the containers that were queued when Update was
+// last called.
+func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
+       updTime := q.updTime
+       r := map[string]container.QueueEnt{}
+       for uuid, ent := range q.entries {
+               r[uuid] = ent
+       }
+       return r, updTime
+}
+
+// Get returns the container from the cached queue, i.e., as it was
+// when Update was last called -- just like a container.Queue does. If
+// the state has been changed (via Lock, Unlock, or Cancel) since the
+// last Update, the updated state is returned.
+func (q *Queue) Get(uuid string) (arvados.Container, bool) {
+       ent, ok := q.entries[uuid]
+       return ent.Container, ok
+}
+
+// Forget removes the container from the cached queue, without
+// changing its state in the Containers slice.
+func (q *Queue) Forget(uuid string) {
+       delete(q.entries, uuid)
+}
+
+// Lock moves the container from Queued to Locked, failing if it is
+// not currently Queued.
+func (q *Queue) Lock(uuid string) error {
+       return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
+}
+
+// Unlock moves the container from Locked back to Queued, failing if
+// it is not currently Locked.
+func (q *Queue) Unlock(uuid string) error {
+       return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
+}
+
+// Cancel sets the container's state to Cancelled.
+func (q *Queue) Cancel(uuid string) error {
+       return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
+}
+
+func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
+       ent := q.entries[uuid]
+       if ent.Container.State != from {
+               return fmt.Errorf("changeState failed: state=%q, expected %q", ent.Container.State, from)
+       }
+       ent.Container.State = to
+       q.entries[uuid] = ent
+       for i, ctr := range q.Containers {
+               if ctr.UUID == uuid {
+                       q.Containers[i].State = to
+                       break
+               }
+       }
+       return nil
+}
+
+// Update rebuilds the current entries from the Containers slice.
+func (q *Queue) Update() error {
+       updTime := time.Now()
+       upd := map[string]container.QueueEnt{}
+       for _, ctr := range q.Containers {
+               _, exists := q.entries[ctr.UUID]
+               if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
+                       continue
+               }
+               it, _ := q.ChooseType(&ctr)
+               upd[ctr.UUID] = container.QueueEnt{
+                       Container:    ctr,
+                       InstanceType: it,
+               }
+       }
+       q.entries = upd
+       q.updTime = updTime
+       return nil
+}
+
+// Notify adds/updates an entry in the Containers slice.  This
+// simulates the effect of an API update from someone other than the
+// dispatcher -- e.g., crunch-run updating state to "Complete" when a
+// container exits.
+//
+// The resulting changes are not exposed through Get() or Entries()
+// until the next call to Update().
+func (q *Queue) Notify(upd arvados.Container) {
+       for i, ctr := range q.Containers {
+               if ctr.UUID == upd.UUID {
+                       q.Containers[i] = upd
+                       return
+               }
+       }
+       q.Containers = append(q.Containers, upd)
+}
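An illustrative sketch (not part of this change) showing how a test might seed this stub queue and drive it the way the scheduler does.

// Illustrative only -- not part of this change.
package example

import (
	"fmt"

	"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
)

// queueExample seeds the stub queue with one queued container and
// locks it, the way a scheduler would before starting it.
func queueExample() error {
	q := &test.Queue{
		ChooseType: func(*arvados.Container) (arvados.InstanceType, error) {
			return test.InstanceType(1), nil
		},
		Containers: []arvados.Container{
			{UUID: test.ContainerUUID(1), State: arvados.ContainerStateQueued, Priority: 1},
		},
	}
	// Entries() reflects the Containers slice only after Update().
	if err := q.Update(); err != nil {
		return err
	}
	if ents, _ := q.Entries(); len(ents) != 1 {
		return fmt.Errorf("expected 1 entry, got %d", len(ents))
	}
	return q.Lock(test.ContainerUUID(1)) // Queued -> Locked
}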
diff --git a/lib/dispatchcloud/test/ssh_service.go b/lib/dispatchcloud/test/ssh_service.go
new file mode 100644 (file)
index 0000000..b1e4e03
--- /dev/null
@@ -0,0 +1,169 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "net"
+       "strings"
+       "sync"
+
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+// LoadTestKey reads a public/private key pair from the files
+// fnm+".pub" and fnm, failing the test if either cannot be loaded.
+func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) {
+       rawpubkey, err := ioutil.ReadFile(fnm + ".pub")
+       c.Assert(err, check.IsNil)
+       pubkey, _, _, _, err := ssh.ParseAuthorizedKey(rawpubkey)
+       c.Assert(err, check.IsNil)
+       rawprivkey, err := ioutil.ReadFile(fnm)
+       c.Assert(err, check.IsNil)
+       privkey, err := ssh.ParsePrivateKey(rawprivkey)
+       c.Assert(err, check.IsNil)
+       return pubkey, privkey
+}
+
+// An SSHExecFunc handles an "exec" session on a multiplexed SSH
+// connection.
+type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+
+// An SSHService accepts SSH connections on an available TCP port and
+// passes clients' "exec" sessions to the provided SSHExecFunc.
+type SSHService struct {
+       Exec           SSHExecFunc
+       HostKey        ssh.Signer
+       AuthorizedKeys []ssh.PublicKey
+
+       listener net.Listener
+       conn     *ssh.ServerConn
+       setup    sync.Once
+       mtx      sync.Mutex
+       started  chan bool
+       closed   bool
+       err      error
+}
+
+// Address returns the host:port where the SSH server is listening. It
+// returns "" if called before the server is ready to accept
+// connections.
+func (ss *SSHService) Address() string {
+       ss.setup.Do(ss.start)
+       ss.mtx.Lock()
+       ln := ss.listener
+       ss.mtx.Unlock()
+       if ln == nil {
+               return ""
+       }
+       return ln.Addr().String()
+}
+
+// Close shuts down the server and releases resources. Established
+// connections are unaffected.
+func (ss *SSHService) Close() {
+       ss.Start()
+       ss.mtx.Lock()
+       ln := ss.listener
+       ss.closed = true
+       ss.mtx.Unlock()
+       if ln != nil {
+               ln.Close()
+       }
+}
+
+// Start returns when the server is ready to accept connections.
+func (ss *SSHService) Start() error {
+       ss.setup.Do(ss.start)
+       <-ss.started
+       return ss.err
+}
+
+func (ss *SSHService) start() {
+       ss.started = make(chan bool)
+       go ss.run()
+}
+
+func (ss *SSHService) run() {
+       defer close(ss.started)
+       config := &ssh.ServerConfig{
+               PublicKeyCallback: func(c ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) {
+                       for _, ak := range ss.AuthorizedKeys {
+                               if bytes.Equal(ak.Marshal(), pubKey.Marshal()) {
+                                       return &ssh.Permissions{}, nil
+                               }
+                       }
+                       return nil, fmt.Errorf("unknown public key for %q", c.User())
+               },
+       }
+       config.AddHostKey(ss.HostKey)
+
+       listener, err := net.Listen("tcp", ":")
+       if err != nil {
+               ss.err = err
+               return
+       }
+
+       ss.mtx.Lock()
+       ss.listener = listener
+       ss.mtx.Unlock()
+
+       go func() {
+               for {
+                       nConn, err := listener.Accept()
+                       if err != nil && strings.Contains(err.Error(), "use of closed network connection") && ss.closed {
+                               return
+                       } else if err != nil {
+                               log.Printf("accept: %s", err)
+                               return
+                       }
+                       go ss.serveConn(nConn, config)
+               }
+       }()
+}
+
+func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) {
+       defer nConn.Close()
+       conn, newchans, reqs, err := ssh.NewServerConn(nConn, config)
+       if err != nil {
+               log.Printf("ssh.NewServerConn: %s", err)
+               return
+       }
+       defer conn.Close()
+       go ssh.DiscardRequests(reqs)
+       for newch := range newchans {
+               if newch.ChannelType() != "session" {
+                       newch.Reject(ssh.UnknownChannelType, "unknown channel type")
+                       continue
+               }
+               ch, reqs, err := newch.Accept()
+               if err != nil {
+                       log.Printf("accept channel: %s", err)
+                       return
+               }
+               var execReq struct {
+                       Command string
+               }
+               go func() {
+                       for req := range reqs {
+                               if req.Type == "exec" && execReq.Command == "" {
+                                       req.Reply(true, nil)
+                                       ssh.Unmarshal(req.Payload, &execReq)
+                                       go func() {
+                                               var resp struct {
+                                                       Status uint32
+                                               }
+                                               resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+                                               ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
+                                               ch.Close()
+                                       }()
+                               }
+                       }
+               }()
+       }
+}
diff --git a/lib/dispatchcloud/test/sshkey_dispatch b/lib/dispatchcloud/test/sshkey_dispatch
new file mode 100644 (file)
index 0000000..5584519
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAqYm4XsQHm8sBSZFwUX5VeW1OkGsfoNzcGPG2nzzYRhNhClYZ
+0ABHhUk82HkaC/8l6d/jpYTf42HrK42nNQ0r0Yzs7qw8yZMQioK4Yk+kFyVLF78E
+GRG4pGAWXFs6pUchs/lm8fo9zcda4R3XeqgI+NO+nEERXmdRJa1FhI+Za3/S/+CV
+mg+6O00wZz2+vKmDPptGN4MCKmQOCKsMJts7wSZGyVcTtdNv7jjfr6yPAIOIL8X7
+LtarBCFaK/pD7uWll/Uj7h7D8K48nIZUrvBJJjXL8Sm4LxCNoz3Z83k8J5ZzuDRD
+gRiQe/C085mhO6VL+2fypDLwcKt1tOL8fI81MwIDAQABAoIBACR3tEnmHsDbNOav
+Oxq8cwRQh9K2yDHg8BMJgz/TZa4FIx2HEbxVIw0/iLADtJ+Z/XzGJQCIiWQuvtg6
+exoFQESt7JUWRWkSkj9JCQJUoTY9Vl7APtBpqG7rIEQzd3TvzQcagZNRQZQO6rR7
+p8sBdBSZ72lK8cJ9tM3G7Kor/VNK7KgRZFNhEWnmvEa3qMd4hzDcQ4faOn7C9NZK
+dwJAuJVVfwOLlOORYcyEkvksLaDOK2DsB/p0AaCpfSmThRbBKN5fPXYaKgUdfp3w
+70Hpp27WWymb1cgjyqSH3DY+V/kvid+5QxgxCBRq865jPLn3FFT9bWEVS/0wvJRj
+iMIRrjECgYEA4Ffv9rBJXqVXonNQbbstd2PaprJDXMUy9/UmfHL6pkq1xdBeuM7v
+yf2ocXheA8AahHtIOhtgKqwv/aRhVK0ErYtiSvIk+tXG+dAtj/1ZAKbKiFyxjkZV
+X72BH7cTlR6As5SRRfWM/HaBGEgED391gKsI5PyMdqWWdczT5KfxAksCgYEAwXYE
+ewPmV1GaR5fbh2RupoPnUJPMj36gJCnwls7sGaXDQIpdlq56zfKgrLocGXGgj+8f
+QH7FHTJQO15YCYebtsXWwB3++iG43gVlJlecPAydsap2CCshqNWC5JU5pan0QzsP
+exzNzWqfUPSbTkR2SRaN+MenZo2Y/WqScOAth7kCgYBgVoLujW9EXH5QfXJpXLq+
+jTvE38I7oVcs0bJwOLPYGzcJtlwmwn6IYAwohgbhV2pLv+EZSs42JPEK278MLKxY
+lgVkp60npgunFTWroqDIvdc1TZDVxvA8h9VeODEJlSqxczgbMcIUXBM9yRctTI+5
+7DiKlMUA4kTFW2sWwuOlFwKBgGXvrYS0FVbFJKm8lmvMu5D5x5RpjEu/yNnFT4Pn
+G/iXoz4Kqi2PWh3STl804UF24cd1k94D7hDoReZCW9kJnz67F+C67XMW+bXi2d1O
+JIBvlVfcHb1IHMA9YG7ZQjrMRmx2Xj3ce4RVPgUGHh8ra7gvLjd72/Tpf0doNClN
+ti/hAoGBAMW5D3LhU05LXWmOqpeT4VDgqk4MrTBcstVe7KdVjwzHrVHCAmI927vI
+pjpphWzpC9m3x4OsTNf8m+g6H7f3IiQS0aiFNtduXYlcuT5FHS2fSATTzg5PBon9
+1E6BudOve+WyFyBs7hFWAqWFBdWujAl4Qk5Ek09U2ilFEPE7RTgJ
+-----END RSA PRIVATE KEY-----
diff --git a/lib/dispatchcloud/test/sshkey_dispatch.pub b/lib/dispatchcloud/test/sshkey_dispatch.pub
new file mode 100644 (file)
index 0000000..1d5c1ea
--- /dev/null
@@ -0,0 +1 @@
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCpibhexAebywFJkXBRflV5bU6Qax+g3NwY8bafPNhGE2EKVhnQAEeFSTzYeRoL/yXp3+OlhN/jYesrjac1DSvRjOzurDzJkxCKgrhiT6QXJUsXvwQZEbikYBZcWzqlRyGz+Wbx+j3Nx1rhHdd6qAj4076cQRFeZ1ElrUWEj5lrf9L/4JWaD7o7TTBnPb68qYM+m0Y3gwIqZA4Iqwwm2zvBJkbJVxO102/uON+vrI8Ag4gvxfsu1qsEIVor+kPu5aWX9SPuHsPwrjychlSu8EkmNcvxKbgvEI2jPdnzeTwnlnO4NEOBGJB78LTzmaE7pUv7Z/KkMvBwq3W04vx8jzUz tom@curve
diff --git a/lib/dispatchcloud/test/sshkey_vm b/lib/dispatchcloud/test/sshkey_vm
new file mode 100644 (file)
index 0000000..10b7ed1
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEApIfWk2StZGDtmunumIeXLJ46AQrbHHvuxrSAkQf6+zUwjB2I
+rse7ezBRHWcge9U5EsigixmhUM4ozFLnUQNwC862jbmsjbyA97arG/REECNlUrEB
+HQPYHhai5yyJ89AfjWVxKyINfW0K2HX1R8nl4kdVraAgpohPLh0dGjfwzm/BcXDG
++TxW9zRz0KCs9ZRI6s2MNdv08ahKQ0azk8gRTqMADJmYNWIo3zPQ+fhlwyr6EZJ/
+HFbRtjpajEPMJPwoVPO+Wj6wztfHDYKkPIrIWbhMl6w+tEKdsmygd3Iq94ktLS3X
+AbRCfn4njS2QSlkKFEepkUJWCSSWZgFn6DLm2wIDAQABAoIBAQCb137LxcTnG1h0
+L7isCWKMBKN0cU/xvwIAfOB6f1CfuVXuodrhkpZmrPFoJFKEeQbCX/6RQwmlfGDw
+iGZKOjNbO8V2oLRs3GxcNk4FAG2ny58hoD8puIZwmYhb57gTlMMOL1PuQyb78tkf
+Bzv5b6ermV3yQ4Ypt1solrMGLo6NOZD0oDX9p0Zt9kueIhjzgP0v5//T1F4PGHZK
++sLSsMiu9u6F+PB+Oc6uv0Zee9Lnts/QiWH5f18oEculjwKWFx+JwJWiLffGg2Bl
+vbpmvHFRoRWkHTpgSiLwSUqs0ZUWU9R5h11ROg5L39MLsxQoBvHsPEnP5ssN8jGt
+aH86EZjBAoGBAM+A5B/UjhIn9m05EhDTDRzI92hGhM8f7uAwobbnjvIQyZbWlBwj
+2TmgbJdpTGVbD+iTBIwKQdcFBbWobTCZsNMpghqA/ir4YIAnZ5OX9VQ1Bc+bWE7V
+dPmMVpCgyg+ERAe+79FrYWcI3vhnBpHCsY/9p9pGQIKDzlGTWNF1HJGjAoGBAMr7
+2CTVnFImTgD3E+rH4AAAfkz+cyqfK6BUhli/NifFYZhWCs16r9QCGSORnp4gPhMY
+3mf7VBs9rk123zOMo89eJt3adTgbZ+QIxXeXilGXpbT3w1+CJMaZRrIy80E1tB5/
+KvDZcrZ78o8XWMNUa+9k55ukvgyC24ICAmOIWNlpAoGBALEFvphBF2r52MtZUsYz
+pw4VjKvS7V5eWcW891k4tsRf+frK2NQg6SK2b63EUT5ur2W0dr6ZyY2MZVCSfYRm
+uWmMEchWn389IeZyt3Q8wTize1+foXivtflm9jqwUXFnXzpUc/du6kuiT8YO7pXP
+SPgUZ+xY3pP5qjwBvlYC2PqNAoGAZ1CKMi1bdGC0wT8BLzXuqHGX136HhcEgRmnf
+O5qPaOzJAO2CcBWrGuC6hOUgc+F7VuMIiKpeo8LgTeNcNfO2iNymMbN4iEdCuMlS
+IM3MBD2IhTS6h4lJSKBJYHgYYi+AbylQ5Of4wDMUQYqjjkAQ8/dK/2h5pwqPyXtW
+VezXNEkCgYEAq4S0++y9tjlLn+w9BIkmx3bAVRDQZIzIEwxTh+jpqaUp1J0iyseJ
+71pwqQojGNF6x8GglVXa6bMrETae21WhEeHnWmzlpCWIODsYPUQ+erjDuAWi9eGk
+HLklqSEoLB8pzC6zDqjxDw+CnGERIDSaoaeoWiNKZ95IH1WiEwYjuxU=
+-----END RSA PRIVATE KEY-----
diff --git a/lib/dispatchcloud/test/sshkey_vm.pub b/lib/dispatchcloud/test/sshkey_vm.pub
new file mode 100644 (file)
index 0000000..b9d44c9
--- /dev/null
@@ -0,0 +1 @@
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCkh9aTZK1kYO2a6e6Yh5csnjoBCtsce+7GtICRB/r7NTCMHYiux7t7MFEdZyB71TkSyKCLGaFQzijMUudRA3ALzraNuayNvID3tqsb9EQQI2VSsQEdA9geFqLnLInz0B+NZXErIg19bQrYdfVHyeXiR1WtoCCmiE8uHR0aN/DOb8FxcMb5PFb3NHPQoKz1lEjqzYw12/TxqEpDRrOTyBFOowAMmZg1YijfM9D5+GXDKvoRkn8cVtG2OlqMQ8wk/ChU875aPrDO18cNgqQ8ishZuEyXrD60Qp2ybKB3cir3iS0tLdcBtEJ+fieNLZBKWQoUR6mRQlYJJJZmAWfoMubb tom@curve
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
new file mode 100644 (file)
index 0000000..53e312e
--- /dev/null
@@ -0,0 +1,195 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "crypto/rand"
+       "errors"
+       "fmt"
+       "io"
+       math_rand "math/rand"
+       "sync"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/mitchellh/mapstructure"
+       "golang.org/x/crypto/ssh"
+)
+
+// A StubExecFunc handles an "exec" request received by a stub
+// instance's SSH server, and returns the command's exit status.
+type StubExecFunc func(instance cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+
+// A StubDriver implements cloud.Driver by setting up local SSH
+// servers that pass their command execution requests to the provided
+// StubExecFunc.
+type StubDriver struct {
+       Exec           StubExecFunc
+       HostKey        ssh.Signer
+       AuthorizedKeys []ssh.PublicKey
+       instanceSets   []*StubInstanceSet
+}
+
+// InstanceSet returns a new *StubInstanceSet.
+func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
+       sis := StubInstanceSet{
+               driver:  sd,
+               servers: map[cloud.InstanceID]*stubServer{},
+       }
+       sd.instanceSets = append(sd.instanceSets, &sis)
+       return &sis, mapstructure.Decode(params, &sis)
+}
+
+// InstanceSets returns all instance sets that have been created by the
+// driver. This can be used to test a component that uses the driver
+// but doesn't expose the InstanceSets it has created.
+func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
+       return sd.instanceSets
+}
+
+// A StubInstanceSet is an InstanceSet implementation whose instances
+// are local SSH servers created by a StubDriver.
+type StubInstanceSet struct {
+       driver  *StubDriver
+       servers map[cloud.InstanceID]*stubServer
+       mtx     sync.RWMutex
+       stopped bool
+}
+
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       if sis.stopped {
+               return nil, errors.New("StubInstanceSet: Create called after Stop")
+       }
+       ak := sis.driver.AuthorizedKeys
+       if authKey != nil {
+               ak = append([]ssh.PublicKey{authKey}, ak...)
+       }
+       var ss *stubServer
+       ss = &stubServer{
+               sis:          sis,
+               id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
+               tags:         copyTags(tags),
+               providerType: it.ProviderType,
+               SSHService: SSHService{
+                       HostKey:        sis.driver.HostKey,
+                       AuthorizedKeys: ak,
+                       Exec: func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+                               return sis.driver.Exec(ss.Instance(), command, stdin, stdout, stderr)
+                       },
+               },
+       }
+
+       sis.servers[ss.id] = ss
+       return ss.Instance(), nil
+}
+
+func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+       sis.mtx.RLock()
+       defer sis.mtx.RUnlock()
+       var r []cloud.Instance
+       for _, ss := range sis.servers {
+               r = append(r, ss.Instance())
+       }
+       return r, nil
+}
+
+func (sis *StubInstanceSet) Stop() {
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       if sis.stopped {
+               panic("Stop called twice")
+       }
+       sis.stopped = true
+}
+
+type stubServer struct {
+       sis          *StubInstanceSet
+       id           cloud.InstanceID
+       tags         cloud.InstanceTags
+       providerType string
+       SSHService   SSHService
+       sync.Mutex
+}
+
+func (ss *stubServer) Instance() stubInstance {
+       ss.Lock()
+       defer ss.Unlock()
+       return stubInstance{
+               ss:   ss,
+               addr: ss.SSHService.Address(),
+               // We deliberately return a cached/stale copy of the
+               // real tags here, so that (Instance)Tags() sometimes
+               // returns old data after a call to
+               // (Instance)SetTags().  This is permitted by the
+               // driver interface, and this might help remind
+               // callers that they need to tolerate it.
+               tags: copyTags(ss.tags),
+       }
+}
+
+type stubInstance struct {
+       ss   *stubServer
+       addr string
+       tags cloud.InstanceTags
+}
+
+func (si stubInstance) ID() cloud.InstanceID {
+       return si.ss.id
+}
+
+func (si stubInstance) Address() string {
+       return si.addr
+}
+
+func (si stubInstance) Destroy() error {
+       si.ss.SSHService.Close()
+       sis := si.ss.sis
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       delete(sis.servers, si.ss.id)
+       return nil
+}
+
+func (si stubInstance) ProviderType() string {
+       return si.ss.providerType
+}
+
+func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
+       tags = copyTags(tags)
+       ss := si.ss
+       go func() {
+               ss.Lock()
+               defer ss.Unlock()
+               ss.tags = tags
+       }()
+       return nil
+}
+
+func (si stubInstance) Tags() cloud.InstanceTags {
+       return si.tags
+}
+
+func (si stubInstance) String() string {
+       return string(si.ss.id)
+}
+
+func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
+       buf := make([]byte, 512)
+       _, err := io.ReadFull(rand.Reader, buf)
+       if err != nil {
+               return err
+       }
+       sig, err := si.ss.sis.driver.HostKey.Sign(rand.Reader, buf)
+       if err != nil {
+               return err
+       }
+       return key.Verify(buf, sig)
+}
+
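+// copyTags returns a copy of src, so the caller and the original
+// owner can update their maps independently.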
+func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
+       dst := cloud.InstanceTags{}
+       for k, v := range src {
+               dst[k] = v
+       }
+       return dst
+}
diff --git a/lib/dispatchcloud/worker/gocheck_test.go b/lib/dispatchcloud/worker/gocheck_test.go
new file mode 100644 (file)
index 0000000..b4ca66c
--- /dev/null
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "testing"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
new file mode 100644 (file)
index 0000000..cf8fac3
--- /dev/null
@@ -0,0 +1,852 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "bytes"
+       "io"
+       "sort"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+// A View shows a worker's current state and recent activity.
+type View struct {
+       Instance             string
+       Price                float64
+       ArvadosInstanceType  string
+       ProviderInstanceType string
+       LastContainerUUID    string
+       Unallocated          time.Time
+       WorkerState          string
+}
+
+// An Executor executes shell commands on a remote host.
+type Executor interface {
+       // Run cmd on the current target.
+       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+
+       // Use the given target for subsequent operations. The new
+       // target is the same host as the previous target, but it
+       // might return a different address and verify a different
+       // host key.
+       //
+       // SetTarget is called frequently, and in most cases the new
+       // target will behave exactly the same as the old one. An
+       // implementation should optimize accordingly.
+       //
+       // SetTarget must not block on concurrent Execute calls.
+       SetTarget(cloud.ExecutorTarget)
+}
+
+const (
+       defaultSyncInterval       = time.Minute
+       defaultProbeInterval      = time.Second * 10
+       defaultMaxProbesPerSecond = 10
+       defaultTimeoutIdle        = time.Minute
+       defaultTimeoutBooting     = time.Minute * 10
+       defaultTimeoutProbe       = time.Minute * 10
+)
+
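+// duration returns conf as a time.Duration if it is a positive
+// value, otherwise it returns the given default.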
+func duration(conf arvados.Duration, def time.Duration) time.Duration {
+       if conf > 0 {
+               return time.Duration(conf)
+       }
+       return def
+}
+
+// NewPool creates a Pool of workers backed by instanceSet.
+//
+// New instances are configured and set up according to the given
+// cluster configuration.
+func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+       wp := &Pool{
+               logger:             logger,
+               instanceSet:        instanceSet,
+               newExecutor:        newExecutor,
+               bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
+               imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
+               instanceTypes:      cluster.InstanceTypes,
+               maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
+               probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
+               syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+       }
+       wp.registerMetrics(reg)
+       go wp.run()
+       return wp
+}
+
+// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
+// zero Pool should not be used. Call NewPool to create a new Pool.
+type Pool struct {
+       // configuration
+       logger             logrus.FieldLogger
+       instanceSet        cloud.InstanceSet
+       newExecutor        func(cloud.Instance) Executor
+       bootProbeCommand   string
+       imageID            cloud.ImageID
+       instanceTypes      map[string]arvados.InstanceType
+       syncInterval       time.Duration
+       probeInterval      time.Duration
+       maxProbesPerSecond int
+       timeoutIdle        time.Duration
+       timeoutBooting     time.Duration
+       timeoutProbe       time.Duration
+
+       // private state
+       subscribers  map[<-chan struct{}]chan<- struct{}
+       creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+       workers      map[cloud.InstanceID]*worker
+       loaded       bool                 // loaded list of instances from InstanceSet at least once
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       atQuotaUntil time.Time
+       stop         chan bool
+       mtx          sync.RWMutex
+       setupOnce    sync.Once
+
+       mInstances         prometheus.Gauge
+       mContainersRunning prometheus.Gauge
+       mVCPUs             prometheus.Gauge
+       mVCPUsInuse        prometheus.Gauge
+       mMemory            prometheus.Gauge
+       mMemoryInuse       prometheus.Gauge
+}
+
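+// A worker tracks a single cloud VM: its instance, executor, probe
+// results, and the containers being started or run on it.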
+type worker struct {
+       state       State
+       instance    cloud.Instance
+       executor    Executor
+       instType    arvados.InstanceType
+       vcpus       int64
+       memory      int64
+       booted      bool
+       probed      time.Time
+       updated     time.Time
+       busy        time.Time
+       unallocated time.Time
+       lastUUID    string
+       running     map[string]struct{}
+       starting    map[string]struct{}
+       probing     chan struct{}
+}
+
+// Subscribe returns a channel that becomes ready whenever a worker's
+// state changes.
+//
+// Example:
+//
+//     ch := wp.Subscribe()
+//     defer wp.Unsubscribe(ch)
+//     for range ch {
+//             // ...try scheduling some work...
+//             if done {
+//                     break
+//             }
+//     }
+func (wp *Pool) Subscribe() <-chan struct{} {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       ch := make(chan struct{}, 1)
+       wp.subscribers[ch] = ch
+       return ch
+}
+
+// Unsubscribe stops sending updates to the given channel.
+func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       delete(wp.subscribers, ch)
+}
+
+// Unallocated returns the number of unallocated (creating + booting +
+// idle + unknown) workers for each instance type.
+func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       u := map[arvados.InstanceType]int{}
+       for it, c := range wp.creating {
+               u[it] = c
+       }
+       for _, wkr := range wp.workers {
+               if len(wkr.running)+len(wkr.starting) == 0 && (wkr.state == StateRunning || wkr.state == StateBooting || wkr.state == StateUnknown) {
+                       u[wkr.instType]++
+               }
+       }
+       return u
+}
+
+// Create a new instance with the given type, and add it to the worker
+// pool. The worker is added immediately; instance creation runs in
+// the background.
+func (wp *Pool) Create(it arvados.InstanceType) error {
+       logger := wp.logger.WithField("InstanceType", it.Name)
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       tags := cloud.InstanceTags{"InstanceType": it.Name}
+       wp.creating[it]++
+       go func() {
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               wp.mtx.Lock()
+               defer wp.mtx.Unlock()
+               wp.creating[it]--
+               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                       wp.atQuotaUntil = time.Now().Add(time.Minute)
+               }
+               if err != nil {
+                       logger.WithError(err).Error("create failed")
+                       go wp.notify()
+                       return
+               }
+               wp.updateWorker(inst, it, StateBooting)
+       }()
+       return nil
+}
+
+// AtQuota returns true if Create is not expected to work at the
+// moment.
+func (wp *Pool) AtQuota() bool {
+       return time.Now().Before(wp.atQuotaUntil)
+}
+
+// Add or update worker attached to the given instance. Use
+// initialState if a new worker is created. Caller must have lock.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) {
+       id := inst.ID()
+       if wp.workers[id] != nil {
+               wp.workers[id].executor.SetTarget(inst)
+               wp.workers[id].instance = inst
+               wp.workers[id].updated = time.Now()
+               if initialState == StateBooting && wp.workers[id].state == StateUnknown {
+                       wp.workers[id].state = StateBooting
+               }
+               return
+       }
+       if initialState == StateUnknown && inst.Tags()["hold"] != "" {
+               initialState = StateHold
+       }
+       wp.logger.WithFields(logrus.Fields{
+               "InstanceType": it.Name,
+               "Instance":     inst,
+               "State":        initialState,
+       }).Info("instance appeared in cloud")
+       wp.workers[id] = &worker{
+               executor:    wp.newExecutor(inst),
+               state:       initialState,
+               instance:    inst,
+               instType:    it,
+               probed:      time.Now(),
+               busy:        time.Now(),
+               updated:     time.Now(),
+               unallocated: time.Now(),
+               running:     make(map[string]struct{}),
+               starting:    make(map[string]struct{}),
+               probing:     make(chan struct{}, 1),
+       }
+       go wp.notify()
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       logger := wp.logger.WithField("InstanceType", it.Name)
+       logger.Info("shutdown requested")
+       for _, tryState := range []State{StateBooting, StateRunning} {
+               // TODO: shutdown the worker with the longest idle
+               // time (Running) or the earliest create time
+               // (Booting)
+               for _, wkr := range wp.workers {
+                       if wkr.state != tryState || len(wkr.running)+len(wkr.starting) > 0 {
+                               continue
+                       }
+                       if wkr.instType != it {
+                               continue
+                       }
+                       logger = logger.WithField("Instance", wkr.instance)
+                       logger.Info("shutting down")
+                       wp.shutdown(wkr, logger)
+                       return true
+               }
+       }
+       return false
+}
+
+// caller must have lock
+func (wp *Pool) shutdown(wkr *worker, logger logrus.FieldLogger) {
+       wkr.updated = time.Now()
+       wkr.state = StateShutdown
+       go func() {
+               err := wkr.instance.Destroy()
+               if err != nil {
+                       logger.WithError(err).Warn("shutdown failed")
+                       return
+               }
+               wp.mtx.Lock()
+               wp.atQuotaUntil = time.Now()
+               wp.mtx.Unlock()
+               wp.notify()
+       }()
+}
+
+// Workers returns the current number of workers in each state.
+func (wp *Pool) Workers() map[State]int {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       r := map[State]int{}
+       for _, w := range wp.workers {
+               r[w.state]++
+       }
+       return r
+}
+
+// Running returns the container UUIDs being prepared/run on workers,
+// mapped to the time their crunch-run process was noticed to have
+// exited (or the zero time if it is still running).
+func (wp *Pool) Running() map[string]time.Time {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       r := map[string]time.Time{}
+       for _, wkr := range wp.workers {
+               for uuid := range wkr.running {
+                       r[uuid] = time.Time{}
+               }
+               for uuid := range wkr.starting {
+                       r[uuid] = time.Time{}
+               }
+       }
+       for uuid, exited := range wp.exited {
+               r[uuid] = exited
+       }
+       return r
+}
+
+// StartContainer starts a container on an idle worker immediately if
+// possible, otherwise returns false.
+func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+       logger := wp.logger.WithFields(logrus.Fields{
+               "InstanceType":  it.Name,
+               "ContainerUUID": ctr.UUID,
+               "Priority":      ctr.Priority,
+       })
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       var wkr *worker
+       for _, w := range wp.workers {
+               if w.instType == it && w.state == StateRunning && len(w.running)+len(w.starting) == 0 {
+                       if wkr == nil || w.busy.After(wkr.busy) {
+                               wkr = w
+                       }
+               }
+       }
+       if wkr == nil {
+               return false
+       }
+       logger = logger.WithField("Instance", wkr.instance)
+       logger.Debug("starting container")
+       wkr.starting[ctr.UUID] = struct{}{}
+       go func() {
+               stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+               wp.mtx.Lock()
+               defer wp.mtx.Unlock()
+               wkr.updated = time.Now()
+               delete(wkr.starting, ctr.UUID)
+               wkr.running[ctr.UUID] = struct{}{}
+               if err != nil {
+                       logger.WithField("stdout", string(stdout)).
+                               WithField("stderr", string(stderr)).
+                               WithError(err).
+                               Error("error starting crunch-run process")
+                       // Leave uuid in wkr.running, though: it's
+                       // possible the error was just a communication
+                       // failure and the process was in fact
+                       // started.  Wait for next probe to find out.
+                       return
+               }
+               logger.Info("crunch-run process started")
+               wkr.lastUUID = ctr.UUID
+       }()
+       return true
+}
+
+// KillContainer kills the crunch-run process for the given container
+// UUID, if it's running on any worker.
+func (wp *Pool) KillContainer(uuid string) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
+               return
+       }
+       for _, wkr := range wp.workers {
+               if _, ok := wkr.running[uuid]; ok {
+                       go wp.kill(wkr, uuid)
+                       return
+               }
+       }
+       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
+}
+
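+// kill invokes "crunch-run --kill" for the given container on the
+// worker's instance, and removes the container from wkr.running if
+// the command succeeds. It blocks on SSH, so it runs in its own
+// goroutine (see KillContainer).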
+func (wp *Pool) kill(wkr *worker, uuid string) {
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Instance":      wkr.instance,
+       })
+       logger.Debug("killing process")
+       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
+       if err != nil {
+               logger.WithFields(logrus.Fields{
+                       "stderr": string(stderr),
+                       "stdout": string(stdout),
+                       "error":  err,
+               }).Warn("kill failed")
+               return
+       }
+       logger.Debug("killing process succeeded")
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wkr.running[uuid]; ok {
+               delete(wkr.running, uuid)
+               wkr.updated = time.Now()
+               go wp.notify()
+       }
+}
+
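+// registerMetrics creates the pool's Prometheus gauges and registers
+// them with reg (or with a throwaway registry if reg is nil).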
+func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
+       if reg == nil {
+               reg = prometheus.NewRegistry()
+       }
+       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
+       })
+       reg.MustRegister(wp.mInstances)
+       wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "containers_running",
+               Help:      "Number of containers reported running by cloud VMs.",
+       })
+       reg.MustRegister(wp.mContainersRunning)
+
+       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "vcpus_total",
+               Help:      "Total VCPUs on all cloud VMs.",
+       })
+       reg.MustRegister(wp.mVCPUs)
+       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "vcpus_inuse",
+               Help:      "VCPUs on cloud VMs that are running containers.",
+       })
+       reg.MustRegister(wp.mVCPUsInuse)
+       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "memory_bytes_total",
+               Help:      "Total memory on all cloud VMs.",
+       })
+       reg.MustRegister(wp.mMemory)
+       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "memory_bytes_inuse",
+               Help:      "Memory on cloud VMs that are running containers.",
+       })
+       reg.MustRegister(wp.mMemoryInuse)
+}
+
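+// updateMetrics recomputes the Prometheus gauges from the current
+// worker list.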
+func (wp *Pool) updateMetrics() {
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+
+       var alloc, cpu, cpuInuse, mem, memInuse int64
+       for _, wkr := range wp.workers {
+               cpu += int64(wkr.instType.VCPUs)
+               mem += int64(wkr.instType.RAM)
+               if len(wkr.running)+len(wkr.starting) == 0 {
+                       continue
+               }
+               alloc += int64(len(wkr.running) + len(wkr.starting))
+               cpuInuse += int64(wkr.instType.VCPUs)
+               memInuse += int64(wkr.instType.RAM)
+       }
+       wp.mInstances.Set(float64(len(wp.workers)))
+       wp.mContainersRunning.Set(float64(alloc))
+       wp.mVCPUs.Set(float64(cpu))
+       wp.mMemory.Set(float64(mem))
+       wp.mVCPUsInuse.Set(float64(cpuInuse))
+       wp.mMemoryInuse.Set(float64(memInuse))
+}
+
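+// run starts the pool's background goroutines (metrics updates and
+// rate-limited worker probes), then loops, syncing the worker list
+// with the InstanceSet every syncInterval until Stop is called.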
+func (wp *Pool) run() {
+       wp.setupOnce.Do(wp.setup)
+
+       go func() {
+               ch := wp.Subscribe()
+               defer wp.Unsubscribe(ch)
+               for range ch {
+                       wp.updateMetrics()
+               }
+       }()
+
+       go func() {
+               maxPPS := wp.maxProbesPerSecond
+               if maxPPS < 1 {
+                       maxPPS = defaultMaxProbesPerSecond
+               }
+               limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+               defer limitticker.Stop()
+
+               probeticker := time.NewTicker(wp.probeInterval)
+               defer probeticker.Stop()
+
+               workers := []cloud.InstanceID{}
+               for range probeticker.C {
+                       workers = workers[:0]
+                       wp.mtx.Lock()
+                       for id, wkr := range wp.workers {
+                               if wkr.state == StateShutdown || wp.autoShutdown(wkr) {
+                                       continue
+                               }
+                               workers = append(workers, id)
+                       }
+                       wp.mtx.Unlock()
+
+                       for _, id := range workers {
+                               wp.mtx.Lock()
+                               wkr, ok := wp.workers[id]
+                               wp.mtx.Unlock()
+                               if !ok || wkr.state == StateShutdown {
+                                       // Deleted/shutdown while we
+                                       // were probing others
+                                       continue
+                               }
+                               select {
+                               case wkr.probing <- struct{}{}:
+                                       go func() {
+                                               wp.probeAndUpdate(wkr)
+                                               <-wkr.probing
+                                       }()
+                               default:
+                                       wp.logger.WithField("Instance", wkr.instance).Debug("still waiting for last probe to finish")
+                               }
+                               select {
+                               case <-wp.stop:
+                                       return
+                               case <-limitticker.C:
+                               }
+                       }
+               }
+       }()
+
+       timer := time.NewTimer(time.Nanosecond)
+       for {
+               err := wp.getInstancesAndSync()
+               if err != nil {
+                       wp.logger.WithError(err).Warn("sync failed")
+               }
+
+               // Reset timer to desired interval, and ignore the
+               // tick that might have already arrived.
+               timer.Stop()
+               select {
+               case <-timer.C:
+               default:
+               }
+               timer.Reset(wp.syncInterval)
+
+               select {
+               case <-timer.C:
+               case <-wp.stop:
+                       wp.logger.Debug("worker.Pool stopped")
+                       return
+               }
+       }
+}
+
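+// autoShutdown initiates shutdown of wkr if it is running no
+// containers and has been unallocated longer than timeoutIdle, and
+// reports whether it did so.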
+// caller must have lock.
+func (wp *Pool) autoShutdown(wkr *worker) bool {
+       if len(wkr.running)+len(wkr.starting) > 0 || wkr.state != StateRunning {
+               return false
+       }
+       age := time.Since(wkr.unallocated)
+       if age < wp.timeoutIdle {
+               return false
+       }
+       logger := wp.logger.WithFields(logrus.Fields{
+               "Age":      age,
+               "Instance": wkr.instance,
+       })
+       logger.Info("shutdown idle worker")
+       wp.shutdown(wkr, logger)
+       return true
+}
+
+// Stop synchronizing with the InstanceSet.
+func (wp *Pool) Stop() {
+       wp.setupOnce.Do(wp.setup)
+       close(wp.stop)
+}
+
+// View reports status information for every worker in the pool.
+func (wp *Pool) View() []View {
+       var r []View
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       for _, w := range wp.workers {
+               r = append(r, View{
+                       Instance:             w.instance.String(),
+                       Price:                w.instType.Price,
+                       ArvadosInstanceType:  w.instType.Name,
+                       ProviderInstanceType: w.instType.ProviderType,
+                       LastContainerUUID:    w.lastUUID,
+                       Unallocated:          w.unallocated,
+                       WorkerState:          w.state.String(),
+               })
+       }
+       wp.mtx.Unlock()
+       sort.Slice(r, func(i, j int) bool {
+               return strings.Compare(r[i].Instance, r[j].Instance) < 0
+       })
+       return r
+}
+
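+// setup initializes the pool's internal maps. It is called exactly
+// once, via setupOnce.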
+func (wp *Pool) setup() {
+       wp.creating = map[arvados.InstanceType]int{}
+       wp.exited = map[string]time.Time{}
+       wp.workers = map[cloud.InstanceID]*worker{}
+       wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+}
+
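+// notify does a non-blocking send on every subscriber channel.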
+func (wp *Pool) notify() {
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for _, send := range wp.subscribers {
+               select {
+               case send <- struct{}{}:
+               default:
+               }
+       }
+}
+
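+// getInstancesAndSync fetches the current instance list from the
+// InstanceSet and reconciles the worker list with it.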
+func (wp *Pool) getInstancesAndSync() error {
+       wp.setupOnce.Do(wp.setup)
+       wp.logger.Debug("getting instance list")
+       threshold := time.Now()
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       if err != nil {
+               return err
+       }
+       wp.sync(threshold, instances)
+       wp.logger.Debug("sync done")
+       return nil
+}
+
+// Add/remove/update workers based on instances, which was obtained
+// from the instanceSet. However, don't clobber any other updates that
+// already happened after threshold.
+func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+
+       for _, inst := range instances {
+               itTag := inst.Tags()["InstanceType"]
+               it, ok := wp.instanceTypes[itTag]
+               if !ok {
+                       wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
+                       continue
+               }
+               wp.updateWorker(inst, it, StateUnknown)
+       }
+
+       for id, wkr := range wp.workers {
+               if wkr.updated.After(threshold) {
+                       continue
+               }
+               logger := wp.logger.WithFields(logrus.Fields{
+                       "Instance":    wkr.instance,
+                       "WorkerState": wkr.state,
+               })
+               logger.Info("instance disappeared in cloud")
+               delete(wp.workers, id)
+               go wp.notify()
+       }
+
+       if !wp.loaded {
+               wp.loaded = true
+               wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
+       }
+}
+
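+// probeAndUpdate probes the worker (boot probe first, then
+// "crunch-run --list") and updates its state and running-container
+// set from the results.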
+// should be called in a new goroutine
+func (wp *Pool) probeAndUpdate(wkr *worker) {
+       logger := wp.logger.WithField("Instance", wkr.instance)
+       wp.mtx.Lock()
+       updated := wkr.updated
+       booted := wkr.booted
+       wp.mtx.Unlock()
+
+       var (
+               ctrUUIDs []string
+               ok       bool
+               stderr   []byte
+       )
+       if !booted {
+               booted, stderr = wp.probeBooted(wkr)
+               wp.mtx.Lock()
+               if booted && !wkr.booted {
+                       wkr.booted = booted
+                       logger.Info("instance booted")
+               } else {
+                       booted = wkr.booted
+               }
+               wp.mtx.Unlock()
+       }
+       if booted {
+               ctrUUIDs, ok, stderr = wp.probeRunning(wkr)
+       }
+       logger = logger.WithField("stderr", string(stderr))
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if !ok {
+               if wkr.state == StateShutdown {
+                       return
+               }
+               dur := time.Since(wkr.probed)
+               logger := logger.WithFields(logrus.Fields{
+                       "Duration": dur,
+                       "State":    wkr.state,
+               })
+               if wkr.state == StateBooting {
+                       logger.Debug("new instance not responding")
+               } else {
+                       logger.Info("instance not responding")
+               }
+
+               if wkr.state == StateHold {
+                       return
+               }
+
+               label, threshold := "", wp.timeoutProbe
+               if wkr.state == StateBooting {
+                       label, threshold = "new ", wp.timeoutBooting
+               }
+               if dur > threshold {
+                       logger.WithField("Since", wkr.probed).Warnf("%sinstance unresponsive, shutting down", label)
+                       wp.shutdown(wkr, logger)
+               }
+               return
+       }
+
+       updateTime := time.Now()
+       wkr.probed = updateTime
+       if len(ctrUUIDs) > 0 {
+               wkr.busy = updateTime
+               wkr.lastUUID = ctrUUIDs[0]
+       }
+       if wkr.state == StateShutdown || wkr.state == StateHold {
+       } else if booted {
+               if wkr.state != StateRunning {
+                       wkr.state = StateRunning
+                       go wp.notify()
+               }
+       } else {
+               wkr.state = StateBooting
+       }
+
+       if updated != wkr.updated {
+               // Worker was updated (e.g., by starting a new
+               // container) after the probe began. Avoid clobbering
+               // those changes with the probe results.
+               return
+       }
+
+       if len(ctrUUIDs) == 0 && len(wkr.running) > 0 {
+               wkr.unallocated = updateTime
+       }
+       running := map[string]struct{}{}
+       changed := false
+       for _, uuid := range ctrUUIDs {
+               running[uuid] = struct{}{}
+               if _, ok := wkr.running[uuid]; !ok {
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if _, ok := running[uuid]; !ok {
+                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+                       wp.exited[uuid] = updateTime
+                       changed = true
+               }
+       }
+       if changed {
+               wkr.running = running
+               wkr.updated = updateTime
+               go wp.notify()
+       }
+}
+
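+// probeRunning runs "crunch-run --list" on the worker and returns
+// the UUIDs of the containers it reports as running.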
+func (wp *Pool) probeRunning(wkr *worker) (running []string, ok bool, stderr []byte) {
+       cmd := "crunch-run --list"
+       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       if err != nil {
+               wp.logger.WithFields(logrus.Fields{
+                       "Instance": wkr.instance,
+                       "Command":  cmd,
+                       "stdout":   string(stdout),
+                       "stderr":   string(stderr),
+               }).WithError(err).Warn("probe failed")
+               return nil, false, stderr
+       }
+       stdout = bytes.TrimRight(stdout, "\n")
+       if len(stdout) == 0 {
+               return nil, true, stderr
+       }
+       return strings.Split(string(stdout), "\n"), true, stderr
+}
+
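+// probeBooted runs the configured boot probe command (or "true" if
+// none is configured) on the worker and reports whether it exited
+// successfully.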
+func (wp *Pool) probeBooted(wkr *worker) (ok bool, stderr []byte) {
+       cmd := wp.bootProbeCommand
+       if cmd == "" {
+               cmd = "true"
+       }
+       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       logger := wp.logger.WithFields(logrus.Fields{
+               "Instance": wkr.instance,
+               "Command":  cmd,
+               "stdout":   string(stdout),
+               "stderr":   string(stderr),
+       })
+       if err != nil {
+               logger.WithError(err).Debug("boot probe failed")
+               return false, stderr
+       }
+       logger.Info("boot probe succeeded")
+       return true, stderr
+}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
new file mode 100644 (file)
index 0000000..cf4bff1
--- /dev/null
@@ -0,0 +1,124 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+const GiB arvados.ByteSize = 1 << 30
+
+var _ = check.Suite(&PoolSuite{})
+
+type PoolSuite struct{}
+
+func (suite *PoolSuite) SetUpSuite(c *check.C) {
+       logrus.StandardLogger().SetLevel(logrus.DebugLevel)
+}
+
+func (suite *PoolSuite) TestStartContainer(c *check.C) {
+       // TODO: use an instanceSet stub with an SSH server
+       c.Fail()
+}
+
+func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
+       // TODO: use an instanceSet stub with an SSH server
+       c.Fail()
+}
+
+func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
+       lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
+       type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+       type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+       pool := &Pool{
+               logger:      logrus.StandardLogger(),
+               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               instanceSet: lameInstanceSet,
+               instanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+                       type2.Name: type2,
+               },
+       }
+       notify := pool.Subscribe()
+       defer pool.Unsubscribe(notify)
+       notify2 := pool.Subscribe()
+       defer pool.Unsubscribe(notify2)
+
+       c.Check(pool.Unallocated()[type1], check.Equals, 0)
+       c.Check(pool.Unallocated()[type2], check.Equals, 0)
+       pool.Create(type2)
+       pool.Create(type1)
+       pool.Create(type2)
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(pool.Unallocated()[type2], check.Equals, 2)
+       // Unblock the pending Create calls and (before calling Sync!)
+       // wait for the pool to process the returned instances.
+       go lameInstanceSet.Release(3)
+       suite.wait(c, pool, notify, func() bool {
+               list, err := lameInstanceSet.Instances(nil)
+               return err == nil && len(list) == 3
+       })
+
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(pool.Unallocated()[type2], check.Equals, 2)
+       pool.getInstancesAndSync()
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(pool.Unallocated()[type2], check.Equals, 2)
+
+       c.Check(pool.Shutdown(type2), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
+       })
+       c.Check(pool.Shutdown(type2), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
+       })
+       c.Check(pool.Shutdown(type2), check.Equals, false)
+       for {
+               // Consume any waiting notifications to ensure the
+               // next one we get is from Shutdown.
+               select {
+               case <-notify:
+                       continue
+               default:
+               }
+               break
+       }
+       c.Check(pool.Shutdown(type1), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+       })
+       select {
+       case <-notify2:
+       case <-time.After(time.Second):
+               c.Error("notify did not receive")
+       }
+       go lameInstanceSet.Release(3) // unblock Destroy calls
+}
+
+func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
+       timeout := time.NewTimer(time.Second).C
+       for !ready() {
+               select {
+               case <-notify:
+                       continue
+               case <-timeout:
+               }
+               break
+       }
+       c.Check(ready(), check.Equals, true)
+}
+
+type stubExecutor struct{}
+
+func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+
+func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) { return nil, nil, nil }
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
new file mode 100644 (file)
index 0000000..7828d4f
--- /dev/null
@@ -0,0 +1,45 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "time"
+)
+
+// State indicates whether a worker is available to do work, and (if
+// not) whether/when it is expected to become ready.
+type State int
+
+const (
+       StateUnknown  State = iota // might be running a container already
+       StateBooting               // instance is booting
+       StateRunning               // instance is running
+       StateShutdown              // worker has stopped monitoring the instance
+       StateHold                  // running, but not available to run new containers
+)
+
+const (
+       // TODO: configurable
+       maxPingFailTime = 10 * time.Minute
+)
+
+var stateString = map[State]string{
+       StateUnknown:  "unknown",
+       StateBooting:  "booting",
+       StateRunning:  "running",
+       StateShutdown: "shutdown",
+       StateHold:     "hold",
+}
+
+// String implements fmt.Stringer.
+func (s State) String() string {
+       return stateString[s]
+}
+
+// MarshalText implements encoding.TextMarshaler so a JSON encoding of
+// map[State]anything uses the state's string representation.
+func (s State) MarshalText() ([]byte, error) {
+       return []byte(stateString[s]), nil
+}
index e2e9907d5d115ebb61a53ae873249511b3732a50..bfa86abf6a48a1fcf30eda2618bdfaf28b0a2efb 100644 (file)
@@ -60,6 +60,8 @@ type Cluster struct {
        ManagementToken    string
        NodeProfiles       map[string]NodeProfile
        InstanceTypes      InstanceTypeMap
+       CloudVMs           CloudVMs
+       Dispatch           Dispatch
        HTTPRequestTimeout Duration
        RemoteClusters     map[string]RemoteCluster
        PostgreSQL         PostgreSQL
@@ -95,6 +97,50 @@ type InstanceType struct {
        Preemptible  bool
 }
 
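+// Dispatch contains configuration for the cloud container
+// dispatcher's own behavior: SSH credentials and polling/probe
+// intervals.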
+type Dispatch struct {
+       // PEM encoded SSH key (RSA, DSA, or ECDSA) able to log in to
+       // cloud VMs.
+       PrivateKey []byte
+
+       // Max time for workers to come up before abandoning stale
+       // locks from previous run
+       StaleLockTimeout Duration
+
+       // Interval between queue polls
+       PollInterval Duration
+
+       // Interval between probes to each worker
+       ProbeInterval Duration
+
+       // Maximum total worker probes per second
+       MaxProbesPerSecond int
+}
+
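+// CloudVMs contains configuration for the cloud VMs used to run
+// containers: boot probing, timeouts, VM image, and the cloud
+// driver.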
+type CloudVMs struct {
+       // Shell command that exits zero IFF the VM is fully booted
+       // and ready to run containers, e.g., "mount | grep
+       // /encrypted-tmp"
+       BootProbeCommand string
+       SyncInterval     Duration
+
+       // Maximum idle time before automatic shutdown
+       TimeoutIdle Duration
+
+       // Maximum booting time before automatic shutdown
+       TimeoutBooting Duration
+
+       // Maximum time with no successful probes before automatic shutdown
+       TimeoutProbe Duration
+
+       // Time after shutdown to retry shutdown
+       TimeoutShutdown Duration
+
+       ImageID string
+
+       Driver           string
+       DriverParameters map[string]interface{}
+}
+
 type InstanceTypeMap map[string]InstanceType
 
 var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
@@ -159,45 +205,48 @@ func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
 }
 
 type NodeProfile struct {
-       Controller  SystemServiceInstance `json:"arvados-controller"`
-       Health      SystemServiceInstance `json:"arvados-health"`
-       Keepbalance SystemServiceInstance `json:"keep-balance"`
-       Keepproxy   SystemServiceInstance `json:"keepproxy"`
-       Keepstore   SystemServiceInstance `json:"keepstore"`
-       Keepweb     SystemServiceInstance `json:"keep-web"`
-       Nodemanager SystemServiceInstance `json:"arvados-node-manager"`
-       RailsAPI    SystemServiceInstance `json:"arvados-api-server"`
-       Websocket   SystemServiceInstance `json:"arvados-ws"`
-       Workbench   SystemServiceInstance `json:"arvados-workbench"`
+       Controller    SystemServiceInstance `json:"arvados-controller"`
+       Health        SystemServiceInstance `json:"arvados-health"`
+       Keepbalance   SystemServiceInstance `json:"keep-balance"`
+       Keepproxy     SystemServiceInstance `json:"keepproxy"`
+       Keepstore     SystemServiceInstance `json:"keepstore"`
+       Keepweb       SystemServiceInstance `json:"keep-web"`
+       Nodemanager   SystemServiceInstance `json:"arvados-node-manager"`
+       DispatchCloud SystemServiceInstance `json:"arvados-dispatch-cloud"`
+       RailsAPI      SystemServiceInstance `json:"arvados-api-server"`
+       Websocket     SystemServiceInstance `json:"arvados-ws"`
+       Workbench     SystemServiceInstance `json:"arvados-workbench"`
 }
 
 type ServiceName string
 
 const (
-       ServiceNameRailsAPI    ServiceName = "arvados-api-server"
-       ServiceNameController  ServiceName = "arvados-controller"
-       ServiceNameNodemanager ServiceName = "arvados-node-manager"
-       ServiceNameWorkbench   ServiceName = "arvados-workbench"
-       ServiceNameWebsocket   ServiceName = "arvados-ws"
-       ServiceNameKeepbalance ServiceName = "keep-balance"
-       ServiceNameKeepweb     ServiceName = "keep-web"
-       ServiceNameKeepproxy   ServiceName = "keepproxy"
-       ServiceNameKeepstore   ServiceName = "keepstore"
+       ServiceNameRailsAPI      ServiceName = "arvados-api-server"
+       ServiceNameController    ServiceName = "arvados-controller"
+       ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+       ServiceNameNodemanager   ServiceName = "arvados-node-manager"
+       ServiceNameWorkbench     ServiceName = "arvados-workbench"
+       ServiceNameWebsocket     ServiceName = "arvados-ws"
+       ServiceNameKeepbalance   ServiceName = "keep-balance"
+       ServiceNameKeepweb       ServiceName = "keep-web"
+       ServiceNameKeepproxy     ServiceName = "keepproxy"
+       ServiceNameKeepstore     ServiceName = "keepstore"
 )
 
 // ServicePorts returns the configured listening address (or "" if
 // disabled) for each service on the node.
 func (np *NodeProfile) ServicePorts() map[ServiceName]string {
        return map[ServiceName]string{
-               ServiceNameRailsAPI:    np.RailsAPI.Listen,
-               ServiceNameController:  np.Controller.Listen,
-               ServiceNameNodemanager: np.Nodemanager.Listen,
-               ServiceNameWorkbench:   np.Workbench.Listen,
-               ServiceNameWebsocket:   np.Websocket.Listen,
-               ServiceNameKeepbalance: np.Keepbalance.Listen,
-               ServiceNameKeepweb:     np.Keepweb.Listen,
-               ServiceNameKeepproxy:   np.Keepproxy.Listen,
-               ServiceNameKeepstore:   np.Keepstore.Listen,
+               ServiceNameRailsAPI:      np.RailsAPI.Listen,
+               ServiceNameController:    np.Controller.Listen,
+               ServiceNameDispatchCloud: np.DispatchCloud.Listen,
+               ServiceNameNodemanager:   np.Nodemanager.Listen,
+               ServiceNameWorkbench:     np.Workbench.Listen,
+               ServiceNameWebsocket:     np.Websocket.Listen,
+               ServiceNameKeepbalance:   np.Keepbalance.Listen,
+               ServiceNameKeepweb:       np.Keepweb.Listen,
+               ServiceNameKeepproxy:     np.Keepproxy.Listen,
+               ServiceNameKeepstore:     np.Keepstore.Listen,
        }
 }
 
index 2622c137030aada559bfc47f5768e7d918b6d816..def4e33cbb5f771cfbe6edbcec0d9704bab518af 100644 (file)
@@ -18,10 +18,11 @@ type Container struct {
        Mounts               map[string]Mount     `json:"mounts"`
        Output               string               `json:"output"`
        OutputPath           string               `json:"output_path"`
-       Priority             int                  `json:"priority"`
+       Priority             int64                `json:"priority"`
        RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
        State                ContainerState       `json:"state"`
        SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
+       ExitCode             int                  `json:"exit_code"`
 }
 
 // Container is an arvados#container resource.
diff --git a/services/crunch-run/background.go b/services/crunch-run/background.go
new file mode 100644 (file)
index 0000000..3dbfcfc
--- /dev/null
@@ -0,0 +1,192 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "os"
+       "os/exec"
+       "path/filepath"
+       "strings"
+       "syscall"
+       "time"
+)
+
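+// Each detached crunch-run process is tracked by a lockfile,
+// lockdir/lockprefix+UUID+locksuffix, which records its PID and
+// output file paths while the process holds an exclusive flock on
+// it.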
+var (
+       lockdir    = "/var/run"
+       lockprefix = "crunch-run-"
+       locksuffix = ".lock"
+)
+
+// procinfo is saved in each process's lockfile.
+type procinfo struct {
+       UUID   string
+       PID    int
+       Stdout string
+       Stderr string
+}
+
+// Detach acquires a lock for the given uuid, and starts the current
+// program as a child process (with -nodetach prepended to the given
+// arguments so the child knows not to detach again). The lock is
+// passed along to the child process.
+func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
+       return exitcode(stderr, detach(uuid, args, stdout, stderr))
+}
+func detach(uuid string, args []string, stdout, stderr io.Writer) error {
+       lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+       if err != nil {
+               return err
+       }
+       defer lockfile.Close()
+       err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+       if err != nil {
+               return err
+       }
+       lockfile.Truncate(0)
+
+       outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
+       if err != nil {
+               return err
+       }
+       defer outfile.Close()
+       errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
+       if err != nil {
+               os.Remove(outfile.Name())
+               return err
+       }
+       defer errfile.Close()
+
+       cmd := exec.Command(args[0], append([]string{"-nodetach"}, args[1:]...)...)
+       cmd.Stdout = outfile
+       cmd.Stderr = errfile
+       cmd.ExtraFiles = []*os.File{lockfile}
+       err = cmd.Start()
+       if err != nil {
+               os.Remove(outfile.Name())
+               os.Remove(errfile.Name())
+               return err
+       }
+
+       w := io.MultiWriter(stdout, lockfile)
+       err = json.NewEncoder(w).Encode(procinfo{
+               UUID:   uuid,
+               PID:    cmd.Process.Pid,
+               Stdout: outfile.Name(),
+               Stderr: errfile.Name(),
+       })
+       if err != nil {
+               os.Remove(outfile.Name())
+               os.Remove(errfile.Name())
+               return err
+       }
+       return nil
+}
+
+// Kill finds the crunch-run process corresponding to the given uuid,
+// and sends the given signal to it. It then waits up to 1 second for
+// the process to die. It returns 0 if the process is successfully
+// killed or didn't exist in the first place.
+func Kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
+       return exitcode(stderr, kill(uuid, signal, stdout, stderr))
+}
+
+func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
+       path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+       f, err := os.Open(path)
+       if os.IsNotExist(err) {
+               return nil
+       } else if err != nil {
+               return err
+       }
+       defer f.Close()
+
+       var pi procinfo
+       err = json.NewDecoder(f).Decode(&pi)
+       if err != nil {
+               return fmt.Errorf("%s: %s\n", path, err)
+       }
+
+       if pi.UUID != uuid || pi.PID == 0 {
+               return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
+       }
+
+       proc, err := os.FindProcess(pi.PID)
+       if err != nil {
+               return err
+       }
+
+       err = proc.Signal(signal)
+       for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
+               err = proc.Signal(syscall.Signal(0))
+       }
+       if err == nil {
+               return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+       }
+       fmt.Fprintln(stderr, "pid %d: %s", pi.PID, err)
+       return nil
+}
+
+// List UUIDs of active crunch-run processes.
+func List(stdout, stderr io.Writer) int {
+       return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
+               if info.IsDir() && path != lockdir {
+                       return filepath.SkipDir
+               }
+               if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+                       return nil
+               }
+               if info.Size() == 0 {
+                       // race: process has opened/locked but hasn't yet written pid/uuid
+                       return nil
+               }
+
+               f, err := os.Open(path)
+               if err != nil {
+                       return nil
+               }
+               defer f.Close()
+
+               // TODO: Do this check without risk of disrupting lock
+               // acquisition during races, e.g., by connecting to a
+               // unix socket or checking /proc/$pid/fd/$n ->
+               // lockfile.
+               err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
+               if err == nil {
+                       // lockfile is stale
+                       err := os.Remove(path)
+                       if err != nil {
+                               fmt.Fprintln(stderr, err)
+                       }
+                       return nil
+               }
+
+               var pi procinfo
+               err = json.NewDecoder(f).Decode(&pi)
+               if err != nil {
+                       fmt.Fprintf(stderr, "%s: %s\n", path, err)
+                       return nil
+               }
+               if pi.UUID == "" || pi.PID == 0 {
+                       fmt.Fprintf(stderr, "%s: bogus procinfo: %+v", path, pi)
+                       return nil
+               }
+
+               fmt.Fprintln(stdout, pi.UUID)
+               return nil
+       }))
+}
+
+// If err is nil, return 0 ("success"); otherwise, print err to stderr
+// and return 1.
+func exitcode(stderr io.Writer, err error) int {
+       if err != nil {
+               fmt.Fprintln(stderr, err)
+               return 1
+       }
+       return 0
+}
index 44560b80a0973e78488b2bf2dbb898877d98891f..74ab77ab0f82c465f39804d979b2358ab53588b0 100644 (file)
@@ -1716,6 +1716,10 @@ func main() {
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+       detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
+       kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+       list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
        enableNetwork := flag.String("container-enable-networking", "default",
                `Specify if networking should be enabled for container.  One of 'default', 'always':
        default: only enable networking if container requests it.
@@ -1727,8 +1731,29 @@ func main() {
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        getVersion := flag.Bool("version", false, "Print version information and exit.")
        checkContainerd := flag.Duration("check-containerd", 60*time.Second, "Periodic check if (docker-)containerd is running (use 0s to disable).")
+
+       detached := false
+       if len(os.Args) > 1 && os.Args[1] == "-detached" {
+               // This process was invoked by a parent process, which
+               // has passed along its own arguments, including
+               // -detach, after the leading -detached flag.  Strip
+               // the leading -detached flag (it's not recognized by
+               // flag.Parse()) ... and remember not to detach all
+               // over again in this process.
+               os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+               detached = true
+       }
        flag.Parse()
 
+       switch {
+       case *detach && !detached:
+               os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+       case *kill >= 0:
+               os.Exit(Kill(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+       case *list:
+               os.Exit(List(os.Stdout, os.Stderr))
+       }
+
        // Print version information if requested
        if *getVersion {
                fmt.Printf("crunch-run %s\n", version)
@@ -1736,6 +1761,7 @@ func main() {
        }
 
        log.Printf("crunch-run %s started", version)
+       time.Sleep(*sleep)
 
        containerId := flag.Arg(0)
 
index aa6b2d773dfb1e47969794dd816a81b179539163..aee25beabdf98e6ccd86bbb816e49ec19d41f781 100644 (file)
                        "revision": "b8bc1bf767474819792c23f32d8286a45736f1c6",
                        "revisionTime": "2016-12-03T19:45:07Z"
                },
+               {
+                       "checksumSHA1": "ewGq4nGalpCQOHcmBTdAEQx1wW0=",
+                       "path": "github.com/mitchellh/mapstructure",
+                       "revision": "bb74f1db0675b241733089d5a1faa5dd8b0ef57b",
+                       "revisionTime": "2018-05-11T14:21:26Z"
+               },
                {
                        "checksumSHA1": "OFNit1Qx2DdWhotfREKodDNUwCM=",
                        "path": "github.com/opencontainers/go-digest",