14360: Merge branch 'master'
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 15 Nov 2018 21:24:48 +0000 (16:24 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 15 Nov 2018 21:24:48 +0000 (16:24 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

47 files changed:
.licenseignore
build/run-build-packages.sh
build/run-tests.sh
cmd/arvados-server/cmd.go
cmd/arvados-server/crunch-dispatch-cloud.service [new file with mode: 0644]
lib/cloud/interfaces.go [new file with mode: 0644]
lib/cmd/cmd.go
lib/dispatchcloud/cmd.go [new file with mode: 0644]
lib/dispatchcloud/container/queue.go [new file with mode: 0644]
lib/dispatchcloud/dispatcher.go [new file with mode: 0644]
lib/dispatchcloud/dispatcher_test.go [new file with mode: 0644]
lib/dispatchcloud/driver.go [new file with mode: 0644]
lib/dispatchcloud/instance_set_proxy.go [new file with mode: 0644]
lib/dispatchcloud/logger.go [new file with mode: 0644]
lib/dispatchcloud/node_size.go
lib/dispatchcloud/readme.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/fix_stale_locks.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/gocheck_test.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/interfaces.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/run_queue.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/run_queue_test.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/scheduler.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/sync.go [new file with mode: 0644]
lib/dispatchcloud/ssh_executor/executor.go [new file with mode: 0644]
lib/dispatchcloud/ssh_executor/executor_test.go [new file with mode: 0644]
lib/dispatchcloud/test/doc.go [new file with mode: 0644]
lib/dispatchcloud/test/fixtures.go [new file with mode: 0644]
lib/dispatchcloud/test/lame_instance_set.go [new file with mode: 0644]
lib/dispatchcloud/test/queue.go [new file with mode: 0644]
lib/dispatchcloud/test/ssh_service.go [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_dispatch [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_dispatch.pub [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_vm [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_vm.pub [new file with mode: 0644]
lib/dispatchcloud/test/stub_driver.go [new file with mode: 0644]
lib/dispatchcloud/worker/gocheck_test.go [new file with mode: 0644]
lib/dispatchcloud/worker/pool.go [new file with mode: 0644]
lib/dispatchcloud/worker/pool_test.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go [new file with mode: 0644]
sdk/go/arvados/config.go
sdk/go/arvados/container.go
sdk/go/health/aggregator_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/node_type.go [new file with mode: 0644]
services/crunch-run/background.go [new file with mode: 0644]
services/crunch-run/crunchrun.go
vendor/vendor.json

diff --git a/.licenseignore b/.licenseignore
index 51a1e7cbd2f0aabca972527475630923f1f1ef75..a0127cfa30f09a986b4dad176ae9358e573209fc 100644 (file)
@@ -69,3 +69,4 @@ sdk/R/NAMESPACE
 sdk/R/.Rbuildignore
 sdk/R/ArvadosR.Rproj
 *.Rd
+lib/dispatchcloud/test/sshkey_*
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index fe01e4b2ef2528222dfff7d097729b9b5c773b30..1b486b40b6ba8e3beb7899b39f0bb1892234bb31 100755 (executable)
@@ -294,6 +294,9 @@ package_go_binary cmd/arvados-server arvados-server \
     "Arvados server daemons"
 package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
+# No package until #14325
+#package_go_binary cmd/arvados-server crunch-dispatch-cloud \
+#    "Arvados cluster cloud dispatch"
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index cd44347cbabed54d1d19dc8373383160fde7dc1a..cb44372566f8eb36ec6163f9108c97e464d6fca8 100755 (executable)
@@ -77,6 +77,10 @@ lib/cmd
 lib/controller
 lib/crunchstat
 lib/dispatchcloud
+lib/dispatchcloud/container
+lib/dispatchcloud/scheduler
+lib/dispatchcloud/ssh_executor
+lib/dispatchcloud/worker
 services/api
 services/arv-git-httpd
 services/crunchstat
@@ -926,6 +930,10 @@ gostuff=(
     lib/controller
     lib/crunchstat
     lib/dispatchcloud
+    lib/dispatchcloud/container
+    lib/dispatchcloud/scheduler
+    lib/dispatchcloud/ssh_executor
+    lib/dispatchcloud/worker
     sdk/go/arvados
     sdk/go/arvadosclient
     sdk/go/auth
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df0c3a4c54d296e848532c1d19c124e43..cd15d25dda760a41c427b8bfd4b621fb43e2130a 100644 (file)
@@ -9,6 +9,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/controller"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud"
 )
 
 var (
@@ -18,7 +19,8 @@ var (
                "-version":  cmd.Version(version),
                "--version": cmd.Version(version),
 
-               "controller": controller.Command,
+               "controller":     controller.Command,
+               "dispatch-cloud": dispatchcloud.Command,
        })
 )
 
diff --git a/cmd/arvados-server/crunch-dispatch-cloud.service b/cmd/arvados-server/crunch-dispatch-cloud.service
new file mode 100644 (file)
index 0000000..f8d71c9
--- /dev/null
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/crunch-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644 (file)
index 0000000..e3a0725
--- /dev/null
@@ -0,0 +1,179 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+       "io"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by an InstanceSet when the
+// cloud service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+       // Time before which the caller should expect requests to
+       // fail.
+       EarliestRetry() time.Time
+       error
+}
+
+// A QuotaError should be returned by an InstanceSet when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+       // If true, don't create more instances until some existing
+       // instances are destroyed. If false, don't handle the error
+       // as a quota error.
+       IsQuotaError() bool
+       error
+}
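+
+// For illustration, a minimal sketch of one concrete error type that
+// satisfies both interfaces (the name "exampleCloudError" is
+// hypothetical, not part of this package):
+//
+//             type exampleCloudError struct {
+//                     retryAfter time.Time
+//                     quota      bool
+//             }
+//
+//             func (e exampleCloudError) Error() string            { return "cloud API refused request" }
+//             func (e exampleCloudError) EarliestRetry() time.Time { return e.retryAfter }
+//             func (e exampleCloudError) IsQuotaError() bool       { return e.quota }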
+
+type InstanceSetID string
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// An Executor executes commands on an ExecutorTarget.
+type Executor interface {
+       // Update the set of private keys used to authenticate to
+       // targets.
+       SetSigners(...ssh.Signer)
+
+       // Set the target used for subsequent command executions.
+       SetTarget(ExecutorTarget)
+
+       // Return the current target.
+       Target() ExecutorTarget
+
+       // Execute a shell command and return the resulting stdout and
+       // stderr. stdin can be nil.
+       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+}
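+
+// A typical call sequence (a sketch; exr is an Executor and inst an
+// ExecutorTarget obtained elsewhere):
+//
+//             exr.SetTarget(inst)
+//             stdout, stderr, err := exr.Execute("crunch-run --list", nil)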
+
+// An ExecutorTarget is a remote command execution service.
+type ExecutorTarget interface {
+       // SSH server hostname or IP address, or empty string if
+       // unknown while instance is booting.
+       Address() string
+
+       // Return nil if the given public key matches the instance's
+       // SSH server key. If the provided Dialer is not nil,
+       // VerifyHostKey can use it to make outgoing network
+       // connections from the instance -- e.g., to use the cloud's
+       // "this instance's metadata" API.
+       VerifyHostKey(ssh.PublicKey, *ssh.Client) error
+}
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+       ExecutorTarget
+
+       // ID returns the provider's instance ID. It must be stable
+       // for the life of the instance.
+       ID() InstanceID
+
+       // String typically returns the cloud-provided instance ID.
+       String() string
+
+       // Cloud provider's "instance type" ID. Matches a ProviderType
+       // in the cluster's InstanceTypes configuration.
+       ProviderType() string
+
+       // Get current tags
+       Tags() InstanceTags
+
+       // Replace tags with the given tags
+       SetTags(InstanceTags) error
+
+       // Shut down the node
+       Destroy() error
+}
+
+// An InstanceSet manages a set of VM instances created by an elastic
+// cloud provider like AWS, GCE, or Azure.
+//
+// All public methods of an InstanceSet, and all public methods of the
+// instances it returns, are goroutine safe.
+type InstanceSet interface {
+       // Create a new instance. If supported by the driver, add the
+       // provided public key to /root/.ssh/authorized_keys.
+       //
+       // The returned error should implement RateLimitError and
+       // QuotaError where applicable.
+       Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+       // Return all instances, including ones that are booting or
+       // shutting down. Optionally, filter out nodes that don't have
+       // all of the given InstanceTags (the caller will ignore these
+       // anyway).
+       //
+       // An instance returned by successive calls to Instances() may
+       // -- but does not need to -- be represented by the same
+       // Instance object each time. Thus, the caller is responsible
+       // for de-duplicating the returned instances by comparing the
+       // InstanceIDs returned by the instances' ID() methods.
+       Instances(InstanceTags) ([]Instance, error)
+
+       // Stop any background tasks and release other resources.
+       Stop()
+}
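+
+// Because successive Instances() calls may return distinct objects
+// for the same cloud VM, a caller de-duplicates by instance ID, e.g.
+// (a sketch):
+//
+//             insts, err := is.Instances(nil)
+//             if err != nil {
+//                     // handle error
+//             }
+//             byID := map[InstanceID]Instance{}
+//             for _, inst := range insts {
+//                     byID[inst.ID()] = inst
+//             }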
+
+// A Driver returns an InstanceSet that uses the given InstanceSetID
+// and driver-dependent configuration parameters.
+//
+// The supplied id will be of the form "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+// where each z can be any alphanum. The returned InstanceSet must use
+// this id to tag long-lived cloud resources that it creates, and must
+// assume control of any existing resources that are tagged with the
+// same id. Tagging can be accomplished by including the ID in
+// resource names, using the cloud provider's tagging feature, or any
+// other mechanism. The tags must be visible to another instance of
+// the same driver running on a different host.
+//
+// The returned InstanceSet must ignore existing resources that are
+// visible but not tagged with the given id, except that it should log
+// a summary of such resources -- only once -- when it starts
+// up. Thus, two identically configured InstanceSets running on
+// different hosts with different ids should log about the existence
+// of each other's resources at startup, but will not interfere with
+// each other.
+//
+// Example:
+//
+//     type exampleInstanceSet struct {
+//             ownID     string
+//             AccessKey string
+//     }
+//
+//             type exampleDriver struct{}
+//
+//     func (*exampleDriver) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+//             var is exampleInstanceSet
+//             if err := mapstructure.Decode(config, &is); err != nil {
+//                     return nil, err
+//             }
+//                     is.ownID = string(id)
+//                     return &is, nil
+//             }
+//
+//             var _ = registerCloudDriver("example", &exampleDriver{})
+type Driver interface {
+       InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)
+}
+
+// DriverFunc makes a Driver using the provided function as its
+// InstanceSet method. This is similar to http.HandlerFunc.
+func DriverFunc(fn func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)) Driver {
+       return driverFunc(fn)
+}
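+
+// For example, the exampleDriver above could be reduced to a bare
+// function (a sketch, reusing the hypothetical exampleInstanceSet
+// and mapstructure from the Driver example):
+//
+//             var ExampleDriver = DriverFunc(func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+//                     var is exampleInstanceSet
+//                     if err := mapstructure.Decode(config, &is); err != nil {
+//                             return nil, err
+//                     }
+//                     is.ownID = string(id)
+//                     return &is, nil
+//             })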
+
+type driverFunc func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)
+
+func (df driverFunc) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+       return df(config, id)
+}
diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go
index 8c65cf7acf1b6dd7bc02660464be06ea07cc3daa..9292ef7e5ff5b3afb6012833299d9f89a7ea346c 100644 (file)
@@ -36,8 +36,9 @@ func (v Version) RunCommand(prog string, args []string, stdin io.Reader, stdout,
        return 0
 }
 
-// Multi is a Handler that looks up its first argument in a map, and
-// invokes the resulting Handler with the remaining args.
+// Multi is a Handler that looks up its first argument in a map (after
+// stripping any "arvados-" or "crunch-" prefix), and invokes the
+// resulting Handler with the remaining args.
 //
 // Example:
 //
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644 (file)
index 0000000..92948fb
--- /dev/null
@@ -0,0 +1,19 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "git.curoverse.com/arvados.git/lib/cmd"
+       "git.curoverse.com/arvados.git/lib/service"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
+       d := &dispatcher{Cluster: cluster}
+       go d.Start()
+       return d
+}
diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
new file mode 100644 (file)
index 0000000..432f4d4
--- /dev/null
@@ -0,0 +1,378 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package container
+
+import (
+       "io"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+
+// An APIClient performs Arvados API requests. It is typically an
+// *arvados.Client.
+type APIClient interface {
+       RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
+}
+
+// A QueueEnt is an entry in the queue, consisting of a container
+// record and the instance type that should be used to run it.
+type QueueEnt struct {
+       // The container to run. Only the UUID, State, Priority, and
+       // RuntimeConstraints fields are populated.
+       Container    arvados.Container
+       InstanceType arvados.InstanceType
+}
+
+// String implements fmt.Stringer by returning the queued container's
+// UUID.
+func (c *QueueEnt) String() string {
+       return c.Container.UUID
+}
+
+// A Queue is an interface to an Arvados cluster's container
+// database. It presents only the containers that are eligible to be
+// run by, are already being run by, or have recently been run by the
+// present dispatcher.
+//
+// The Entries, Get, and Forget methods do not block: they return
+// immediately, using cached data.
+//
+// The updating methods (Cancel, Lock, Unlock, Update) do block: they
+// return only after the operation has completed.
+//
+// A Queue's Update method should be called periodically to keep the
+// cache up to date.
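+//
+// A typical setup (a sketch; logger, reg, chooseType, client, and
+// pollInterval are supplied by the caller, as in
+// dispatcher.initialize):
+//
+//     q := NewQueue(logger, reg, chooseType, client)
+//     go func() {
+//             for {
+//                     if err := q.Update(); err != nil {
+//                             logger.Warnf("queue update: %s", err)
+//                     }
+//                     time.Sleep(pollInterval)
+//             }
+//     }()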
+type Queue struct {
+       logger     logrus.FieldLogger
+       reg        *prometheus.Registry
+       chooseType typeChooser
+       client     APIClient
+
+       auth    *arvados.APIClientAuthorization
+       current map[string]QueueEnt
+       updated time.Time
+       mtx     sync.Mutex
+
+       // Methods that modify the Queue (like Lock) add the affected
+       // container UUIDs to dontupdate. When applying a batch of
+       // updates received from the network, anything appearing in
+       // dontupdate is skipped, in case the received update has
+       // already been superseded by the locally initiated change.
+       // When no network update is in progress, this protection is
+       // not needed, and dontupdate is nil.
+       dontupdate map[string]struct{}
+
+       // active notification subscribers (see Subscribe)
+       subscribers map[<-chan struct{}]chan struct{}
+}
+
+// NewQueue returns a new Queue. When a new container appears in the
+// Arvados cluster's queue during Update, chooseType will be called to
+// assign an appropriate arvados.InstanceType for the queue entry.
+func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
+       return &Queue{
+               logger:      logger,
+               reg:         reg,
+               chooseType:  chooseType,
+               client:      client,
+               current:     map[string]QueueEnt{},
+               subscribers: map[<-chan struct{}]chan struct{}{},
+       }
+}
+
+// Subscribe returns a channel that becomes ready to receive when an
+// entry in the Queue is updated.
+//
+//     ch := q.Subscribe()
+//     defer q.Unsubscribe(ch)
+//     for range ch {
+//             // ...
+//     }
+func (cq *Queue) Subscribe() <-chan struct{} {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       ch := make(chan struct{}, 1)
+       cq.subscribers[ch] = ch
+       return ch
+}
+
+// Unsubscribe stops sending updates to the given channel. See
+// Subscribe.
+func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       delete(cq.subscribers, ch)
+}
+
+// Caller must have lock.
+func (cq *Queue) notify() {
+       for _, ch := range cq.subscribers {
+               select {
+               case ch <- struct{}{}:
+               default:
+               }
+       }
+}
+
+// Forget drops the specified container from the cache. It should be
+// called on finalized containers to avoid leaking memory over
+// time. It is a no-op if the indicated container is not in a
+// finalized state.
+func (cq *Queue) Forget(uuid string) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       ctr := cq.current[uuid].Container
+       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+               delete(cq.current, uuid)
+       }
+}
+
+// Get returns the (partial) Container record for the specified
+// container. Like a map lookup, its second return value is false if
+// the specified container is not in the Queue.
+func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       ctr, ok := cq.current[uuid]
+       if !ok {
+               return arvados.Container{}, false
+       }
+       return ctr.Container, true
+}
+
+// Entries returns all cache entries, keyed by container UUID.
+//
+// The returned threshold indicates the maximum age of any cached data
+// returned in the map. This makes it possible for a scheduler to
+// correctly determine the outcome of a remote process that updates
+// container state. It must first wait for the remote process to exit,
+// then wait for the Queue to start and finish its next Update --
+// i.e., it must wait until threshold > timeProcessExited.
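+//
+// For example (a sketch; use and pollInterval are placeholders), to
+// wait until the cache reflects a process that exited at time t:
+//
+//     for {
+//             ents, threshold := q.Entries()
+//             if threshold.After(t) {
+//                     use(ents) // now correctly reflects the outcome
+//                     break
+//             }
+//             time.Sleep(pollInterval)
+//     }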
+func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       entries = make(map[string]QueueEnt, len(cq.current))
+       for uuid, ctr := range cq.current {
+               entries[uuid] = ctr
+       }
+       threshold = cq.updated
+       return
+}
+
+// Update refreshes the cache from the Arvados API. It adds newly
+// queued containers, and updates the state of previously queued
+// containers.
+func (cq *Queue) Update() error {
+       cq.mtx.Lock()
+       cq.dontupdate = map[string]struct{}{}
+       updateStarted := time.Now()
+       cq.mtx.Unlock()
+
+       next, err := cq.poll()
+       if err != nil {
+               return err
+       }
+
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       for uuid, ctr := range next {
+               if _, keep := cq.dontupdate[uuid]; keep {
+                       continue
+               }
+               if cur, ok := cq.current[uuid]; !ok {
+                       cq.addEnt(uuid, *ctr)
+               } else {
+                       cur.Container = *ctr
+                       cq.current[uuid] = cur
+               }
+       }
+       for uuid := range cq.current {
+               if _, keep := cq.dontupdate[uuid]; keep {
+                       continue
+               } else if _, keep = next[uuid]; keep {
+                       continue
+               } else {
+                       delete(cq.current, uuid)
+               }
+       }
+       cq.dontupdate = nil
+       cq.updated = updateStarted
+       cq.notify()
+       return nil
+}
+
+func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
+       it, err := cq.chooseType(&ctr)
+       if err != nil {
+               // FIXME: throttle warnings, cancel after timeout
+               cq.logger.Warnf("cannot run %s: %s", uuid, err)
+               return
+       }
+       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+}
+
+// Lock acquires the dispatch lock for the given container.
+func (cq *Queue) Lock(uuid string) error {
+       return cq.apiUpdate(uuid, "lock")
+}
+
+// Unlock releases the dispatch lock for the given container.
+func (cq *Queue) Unlock(uuid string) error {
+       return cq.apiUpdate(uuid, "unlock")
+}
+
+// Cancel cancels the given container.
+func (cq *Queue) Cancel(uuid string) error {
+       err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+               "container": {"state": arvados.ContainerStateCancelled},
+       })
+       if err != nil {
+               return err
+       }
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       cq.notify()
+       return nil
+}
+
+func (cq *Queue) apiUpdate(uuid, action string) error {
+       var resp arvados.Container
+       err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+       if err != nil {
+               return err
+       }
+
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       if cq.dontupdate != nil {
+               cq.dontupdate[uuid] = struct{}{}
+       }
+       if ent, ok := cq.current[uuid]; !ok {
+               cq.addEnt(uuid, resp)
+       } else {
+               ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+               cq.current[uuid] = ent
+       }
+       cq.notify()
+       return nil
+}
+
+func (cq *Queue) poll() (map[string]*arvados.Container, error) {
+       cq.mtx.Lock()
+       size := len(cq.current)
+       auth := cq.auth
+       cq.mtx.Unlock()
+
+       if auth == nil {
+               auth = &arvados.APIClientAuthorization{}
+               err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
+               if err != nil {
+                       return nil, err
+               }
+               cq.mtx.Lock()
+               cq.auth = auth
+               cq.mtx.Unlock()
+       }
+
+       next := make(map[string]*arvados.Container, size)
+       apply := func(updates []arvados.Container) {
+               for _, upd := range updates {
+                       if next[upd.UUID] == nil {
+                               next[upd.UUID] = &arvados.Container{}
+                       }
+                       *next[upd.UUID] = upd
+               }
+       }
+       selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
+       limitParam := 1000
+
+       mine, err := cq.fetchAll(arvados.ResourceListParams{
+               Select:  selectParam,
+               Order:   "uuid",
+               Limit:   &limitParam,
+               Count:   "none",
+               Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
+       })
+       if err != nil {
+               return nil, err
+       }
+       apply(mine)
+
+       avail, err := cq.fetchAll(arvados.ResourceListParams{
+               Select:  selectParam,
+               Order:   "uuid",
+               Limit:   &limitParam,
+               Count:   "none",
+               Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
+       })
+       if err != nil {
+               return nil, err
+       }
+       apply(avail)
+
+       var missing []string
+       cq.mtx.Lock()
+       for uuid, ent := range cq.current {
+               if next[uuid] == nil &&
+                       ent.Container.State != arvados.ContainerStateCancelled &&
+                       ent.Container.State != arvados.ContainerStateComplete {
+                       missing = append(missing, uuid)
+               }
+       }
+       cq.mtx.Unlock()
+
+       for i, page := 0, 20; i < len(missing); i += page {
+               batch := missing[i:]
+               if len(batch) > page {
+                       batch = batch[:page]
+               }
+               ended, err := cq.fetchAll(arvados.ResourceListParams{
+                       Select:  selectParam,
+                       Order:   "uuid",
+                       Count:   "none",
+                       Filters: []arvados.Filter{{"uuid", "in", batch}},
+               })
+               if err != nil {
+                       return nil, err
+               }
+               apply(ended)
+       }
+       return next, nil
+}
+
+func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+       var results []arvados.Container
+       params := initialParams
+       params.Offset = 0
+       for {
+               // This list variable must be a new one declared
+               // inside the loop: otherwise, items in the API
+               // response would get deep-merged into the items
+               // loaded in previous iterations.
+               var list arvados.ContainerList
+
+               err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+               if err != nil {
+                       return nil, err
+               }
+               if len(list.Items) == 0 {
+                       break
+               }
+
+               results = append(results, list.Items...)
+               if params.Order == "uuid" {
+                       params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
+               } else {
+                       params.Offset += len(list.Items)
+               }
+       }
+       return results, nil
+}
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644 (file)
index 0000000..97aacf6
--- /dev/null
@@ -0,0 +1,185 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       "golang.org/x/crypto/ssh"
+)
+
+const (
+       defaultPollInterval     = time.Second
+       defaultStaleLockTimeout = time.Minute
+)
+
+type pool interface {
+       scheduler.WorkerPool
+       Instances() []worker.InstanceView
+}
+
+type dispatcher struct {
+       Cluster       *arvados.Cluster
+       InstanceSetID cloud.InstanceSetID
+
+       logger      logrus.FieldLogger
+       reg         *prometheus.Registry
+       instanceSet cloud.InstanceSet
+       pool        pool
+       queue       scheduler.ContainerQueue
+       httpHandler http.Handler
+       sshKey      ssh.Signer
+
+       setupOnce sync.Once
+       stop      chan struct{}
+}
+
+// Start starts the dispatcher. Start can be called multiple times
+// with no ill effect.
+func (disp *dispatcher) Start() {
+       disp.setupOnce.Do(disp.setup)
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       disp.Start()
+       disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+       disp.Start()
+       return nil
+}
+
+// Close stops dispatching containers and releases resources. It is
+// typically used in tests.
+func (disp *dispatcher) Close() {
+       disp.Start()
+       select {
+       case disp.stop <- struct{}{}:
+       default:
+       }
+}
+
+// Make a worker.Executor for the given instance.
+func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
+       exr := ssh_executor.New(inst)
+       exr.SetSigners(disp.sshKey)
+       return exr
+}
+
+func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
+       return ChooseInstanceType(disp.Cluster, ctr)
+}
+
+func (disp *dispatcher) setup() {
+       disp.initialize()
+       go disp.run()
+}
+
+func (disp *dispatcher) initialize() {
+       arvClient := arvados.NewClientFromEnv()
+       if disp.InstanceSetID == "" {
+               if strings.HasPrefix(arvClient.AuthToken, "v2/") {
+                       disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+               } else {
+                       // Use some other string unique to this token
+                       // that doesn't reveal the token itself.
+                       disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
+               }
+       }
+       disp.stop = make(chan struct{}, 1)
+       disp.logger = logrus.StandardLogger()
+
+       if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
+               disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
+       } else {
+               disp.sshKey = key
+       }
+
+       instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID)
+       if err != nil {
+               disp.logger.Fatalf("error initializing driver: %s", err)
+       }
+       disp.instanceSet = &instanceSetProxy{instanceSet}
+       disp.reg = prometheus.NewRegistry()
+       disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+       disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
+
+       mux := http.NewServeMux()
+       mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers)
+       mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances)
+       metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
+               ErrorLog: disp.logger,
+       })
+       mux.Handle("/metrics", metricsH)
+       mux.Handle("/metrics.json", metricsH)
+       disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
+}
+
+func (disp *dispatcher) run() {
+       defer disp.instanceSet.Stop()
+
+       staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)
+       if staleLockTimeout == 0 {
+               staleLockTimeout = defaultStaleLockTimeout
+       }
+       pollInterval := time.Duration(disp.Cluster.Dispatch.PollInterval)
+       if pollInterval <= 0 {
+               pollInterval = defaultPollInterval
+       }
+       sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+       sched.Start()
+       defer sched.Stop()
+
+       <-disp.stop
+}
+
+// Management API: all active and queued containers.
+func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
+       if r.Method != "GET" {
+               httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+       var resp struct {
+               Items []container.QueueEnt
+       }
+       qEntries, _ := disp.queue.Entries()
+       for _, ent := range qEntries {
+               resp.Items = append(resp.Items, ent)
+       }
+       json.NewEncoder(w).Encode(resp)
+}
+
+// Management API: all active instances (cloud VMs).
+func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
+       if r.Method != "GET" {
+               httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+       var resp struct {
+               Items []worker.InstanceView
+       }
+       resp.Items = disp.pool.Instances()
+       json.NewEncoder(w).Encode(resp)
+}
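+
+// Both endpoints require the cluster's management token. An example
+// request (a sketch; the address depends on the configured
+// NodeProfile listen address):
+//
+//     curl -sH "Authorization: Bearer $ManagementToken" \
+//          http://localhost:<port>/arvados/v1/dispatch/instances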
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
new file mode 100644 (file)
index 0000000..83797e9
--- /dev/null
@@ -0,0 +1,369 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "math/rand"
+       "net/http/httptest"
+       "os"
+       "regexp"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&DispatcherSuite{})
+
+// fakeCloud provides an exec method that can be used as a
+// test.StubExecFunc. It calls the provided makeVM func when called
+// with a previously unseen instance ID. Calls to exec are passed on
+// to the *fakeVM for the appropriate instance ID.
+type fakeCloud struct {
+       queue      *test.Queue
+       makeVM     func(cloud.Instance) *fakeVM
+       onComplete func(string)
+       onCancel   func(string)
+       vms        map[cloud.InstanceID]*fakeVM
+       sync.Mutex
+}
+
+func (fc *fakeCloud) exec(inst cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       fc.Lock()
+       fvm, ok := fc.vms[inst.ID()]
+       if !ok {
+               if fc.vms == nil {
+                       fc.vms = make(map[cloud.InstanceID]*fakeVM)
+               }
+               fvm = fc.makeVM(inst)
+               fc.vms[inst.ID()] = fvm
+       }
+       fc.Unlock()
+       return fvm.exec(fc.queue, fc.onComplete, fc.onCancel, command, stdin, stdout, stderr)
+}
+
+// fakeVM is a fake VM with configurable delays and failure modes.
+type fakeVM struct {
+       boot                 time.Time
+       broken               time.Time
+       crunchRunMissing     bool
+       crunchRunCrashRate   float64
+       crunchRunDetachDelay time.Duration
+       ctrExit              int
+       running              map[string]bool
+       completed            []string
+       sync.Mutex
+}
+
+func (fvm *fakeVM) exec(queue *test.Queue, onComplete, onCancel func(uuid string), command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
+       if eta := fvm.boot.Sub(time.Now()); eta > 0 {
+               fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
+               return 1
+       }
+       if !fvm.broken.IsZero() && fvm.broken.Before(time.Now()) {
+               fmt.Fprintf(stderr, "cannot fork\n")
+               return 2
+       }
+       if fvm.crunchRunMissing && strings.Contains(command, "crunch-run") {
+               fmt.Fprint(stderr, "crunch-run: command not found\n")
+               return 1
+       }
+       if strings.HasPrefix(command, "crunch-run --detach ") {
+               fvm.Lock()
+               if fvm.running == nil {
+                       fvm.running = map[string]bool{}
+               }
+               fvm.running[uuid] = true
+               fvm.Unlock()
+               time.Sleep(fvm.crunchRunDetachDelay)
+               fmt.Fprintf(stderr, "starting %s\n", uuid)
+               logger := logrus.WithField("ContainerUUID", uuid)
+               logger.Printf("[test] starting crunch-run stub")
+               go func() {
+                       crashluck := rand.Float64()
+                       ctr, ok := queue.Get(uuid)
+                       if !ok {
+                               logger.Print("[test] container not in queue")
+                               return
+                       }
+                       if crashluck > fvm.crunchRunCrashRate/2 {
+                               time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
+                               ctr.State = arvados.ContainerStateRunning
+                               queue.Notify(ctr)
+                       }
+
+                       time.Sleep(time.Duration(rand.Float64()*20) * time.Millisecond)
+                       fvm.Lock()
+                       _, running := fvm.running[uuid]
+                       fvm.Unlock()
+                       if !running {
+                               logger.Print("[test] container was killed")
+                               return
+                       }
+                       // TODO: Check whether the stub instance has
+                       // been destroyed, and if so, don't call
+                       // onComplete. Then "container finished twice"
+                       // can be classified as a bug.
+                       if crashluck < fvm.crunchRunCrashRate {
+                               logger.Print("[test] crashing crunch-run stub")
+                               if onCancel != nil && ctr.State == arvados.ContainerStateRunning {
+                                       onCancel(uuid)
+                               }
+                       } else {
+                               ctr.State = arvados.ContainerStateComplete
+                               ctr.ExitCode = fvm.ctrExit
+                               queue.Notify(ctr)
+                               if onComplete != nil {
+                                       onComplete(uuid)
+                               }
+                       }
+                       logger.Print("[test] exiting crunch-run stub")
+                       fvm.Lock()
+                       defer fvm.Unlock()
+                       delete(fvm.running, uuid)
+               }()
+               return 0
+       }
+       if command == "crunch-run --list" {
+               fvm.Lock()
+               defer fvm.Unlock()
+               for uuid := range fvm.running {
+                       fmt.Fprintf(stdout, "%s\n", uuid)
+               }
+               return 0
+       }
+       if strings.HasPrefix(command, "crunch-run --kill ") {
+               fvm.Lock()
+               defer fvm.Unlock()
+               if fvm.running[uuid] {
+                       delete(fvm.running, uuid)
+               } else {
+                       fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+               }
+               return 0
+       }
+       if command == "true" {
+               return 0
+       }
+       fmt.Fprintf(stderr, "%q: command not found\n", command)
+       return 1
+}
+
+type DispatcherSuite struct {
+       cluster     *arvados.Cluster
+       instanceSet *test.LameInstanceSet
+       stubDriver  *test.StubDriver
+       disp        *dispatcher
+}
+
+func (s *DispatcherSuite) SetUpSuite(c *check.C) {
+       if os.Getenv("ARVADOS_DEBUG") != "" {
+               logrus.StandardLogger().SetLevel(logrus.DebugLevel)
+       }
+}
+
+func (s *DispatcherSuite) SetUpTest(c *check.C) {
+       dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
+       dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
+       c.Assert(err, check.IsNil)
+
+       _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
+       s.stubDriver = &test.StubDriver{
+               Exec: func(inst cloud.Instance, command string, _ io.Reader, _, _ io.Writer) uint32 {
+                       c.Logf("stubDriver SSHExecFunc(%s, %q, ...)", inst, command)
+                       return 1
+               },
+               HostKey:        hostpriv,
+               AuthorizedKeys: []ssh.PublicKey{dispatchpub},
+
+               ErrorRateDestroy: 0.1,
+       }
+
+       s.cluster = &arvados.Cluster{
+               CloudVMs: arvados.CloudVMs{
+                       Driver:          "test",
+                       SyncInterval:    arvados.Duration(10 * time.Millisecond),
+                       TimeoutIdle:     arvados.Duration(30 * time.Millisecond),
+                       TimeoutBooting:  arvados.Duration(30 * time.Millisecond),
+                       TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
+                       TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+               },
+               Dispatch: arvados.Dispatch{
+                       PrivateKey:         dispatchprivraw,
+                       PollInterval:       arvados.Duration(5 * time.Millisecond),
+                       ProbeInterval:      arvados.Duration(5 * time.Millisecond),
+                       StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
+                       MaxProbesPerSecond: 1000,
+               },
+               InstanceTypes: arvados.InstanceTypeMap{
+                       test.InstanceType(1).Name:  test.InstanceType(1),
+                       test.InstanceType(2).Name:  test.InstanceType(2),
+                       test.InstanceType(3).Name:  test.InstanceType(3),
+                       test.InstanceType(4).Name:  test.InstanceType(4),
+                       test.InstanceType(6).Name:  test.InstanceType(6),
+                       test.InstanceType(8).Name:  test.InstanceType(8),
+                       test.InstanceType(16).Name: test.InstanceType(16),
+               },
+               NodeProfiles: map[string]arvados.NodeProfile{
+                       "*": {
+                               Controller:    arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
+                               DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
+                       },
+               },
+       }
+       s.disp = &dispatcher{Cluster: s.cluster}
+       // Test cases can modify s.cluster before calling
+       // initialize(), and then modify private state before calling
+       // go run().
+}
+
+func (s *DispatcherSuite) TearDownTest(c *check.C) {
+       s.disp.Close()
+}
+
+func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
+       drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       queue := &test.Queue{
+               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                       return ChooseInstanceType(s.cluster, ctr)
+               },
+       }
+       for i := 0; i < 200; i++ {
+               queue.Containers = append(queue.Containers, arvados.Container{
+                       UUID:     test.ContainerUUID(i + 1),
+                       State:    arvados.ContainerStateQueued,
+                       Priority: int64(i%20 + 1),
+                       RuntimeConstraints: arvados.RuntimeConstraints{
+                               RAM:   int64(i%3+1) << 30,
+                               VCPUs: i%8 + 1,
+                       },
+               })
+       }
+       s.disp.queue = queue
+
+       var mtx sync.Mutex
+       done := make(chan struct{})
+       waiting := map[string]struct{}{}
+       for _, ctr := range queue.Containers {
+               waiting[ctr.UUID] = struct{}{}
+       }
+       onComplete := func(uuid string) {
+               mtx.Lock()
+               defer mtx.Unlock()
+               if _, ok := waiting[uuid]; !ok {
+                       c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", uuid)
+               }
+               delete(waiting, uuid)
+               if len(waiting) == 0 {
+                       close(done)
+               }
+       }
+       n := 0
+       fc := &fakeCloud{
+               queue: queue,
+               makeVM: func(inst cloud.Instance) *fakeVM {
+                       n++
+                       fvm := &fakeVM{
+                               boot:                 time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond)))),
+                               crunchRunDetachDelay: time.Duration(rand.Int63n(int64(10 * time.Millisecond))),
+                               ctrExit:              int(rand.Uint32() & 0x3),
+                       }
+                       switch n % 7 {
+                       case 0:
+                               fvm.broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
+                       case 1:
+                               fvm.crunchRunMissing = true
+                       default:
+                               fvm.crunchRunCrashRate = 0.1
+                       }
+                       return fvm
+               },
+               onComplete: onComplete,
+               onCancel:   onComplete,
+       }
+       s.stubDriver.Exec = fc.exec
+
+       start := time.Now()
+       go s.disp.run()
+       err := s.disp.CheckHealth()
+       c.Check(err, check.IsNil)
+
+       select {
+       case <-done:
+               c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
+       case <-time.After(10 * time.Second):
+               c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+       }
+
+       deadline := time.Now().Add(time.Second)
+       for range time.NewTicker(10 * time.Millisecond).C {
+               insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
+               c.Check(err, check.IsNil)
+               queue.Update()
+               ents, _ := queue.Entries()
+               if len(ents) == 0 && len(insts) == 0 {
+                       break
+               }
+               if time.Now().After(deadline) {
+                       c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
+               }
+       }
+}
+
+func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
+       s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
+       drivers["test"] = s.stubDriver
+
+       type instance struct {
+               Instance             string
+               WorkerState          string
+               Price                float64
+               LastContainerUUID    string
+               ArvadosInstanceType  string
+               ProviderInstanceType string
+       }
+       type instancesResponse struct {
+               Items []instance
+       }
+       getInstances := func() instancesResponse {
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               var sr instancesResponse
+               err := json.Unmarshal(resp.Body.Bytes(), &sr)
+               c.Check(err, check.IsNil)
+               return sr
+       }
+
+       sr := getInstances()
+       c.Check(len(sr.Items), check.Equals, 0)
+
+       ch := s.disp.pool.Subscribe()
+       defer s.disp.pool.Unsubscribe(ch)
+       err := s.disp.pool.Create(test.InstanceType(1))
+       c.Check(err, check.IsNil)
+       <-ch
+
+       sr = getInstances()
+       c.Assert(len(sr.Items), check.Equals, 1)
+       c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
+       c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
+       c.Check(sr.Items[0].Price, check.Equals, 0.123)
+       c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
+       c.Check(sr.Items[0].ProviderInstanceType, check.Equals, test.InstanceType(1).ProviderType)
+       c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, test.InstanceType(1).Name)
+}
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
new file mode 100644 (file)
index 0000000..295fd61
--- /dev/null
@@ -0,0 +1,22 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var drivers = map[string]cloud.Driver{}
+
+func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID) (cloud.InstanceSet, error) {
+       driver, ok := drivers[cluster.CloudVMs.Driver]
+       if !ok {
+               return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
+       }
+       return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID)
+}
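+
+// Driver packages (and tests) register themselves by adding an entry
+// to the drivers map, e.g. (a sketch; the "example" driver is
+// hypothetical):
+//
+//     func init() {
+//             drivers["example"] = &exampleDriver{}
+//     }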
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
new file mode 100644 (file)
index 0000000..e728b67
--- /dev/null
@@ -0,0 +1,25 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+type instanceSetProxy struct {
+       cloud.InstanceSet
+}
+
+func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
+       // TODO: return if Create failed recently with a RateLimitError or QuotaError
+       return is.InstanceSet.Create(it, id, tags, pk)
+}
+
+func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+       // TODO: return if Instances failed recently with a RateLimitError
+       return is.InstanceSet.Instances(tags)
+}
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644 (file)
index 0000000..90bb6ca
--- /dev/null
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "sync"
+       "time"
+)
+
+type logger interface {
+       Printf(string, ...interface{})
+       Warnf(string, ...interface{})
+       Debugf(string, ...interface{})
+}
+
+var nextSpam = map[string]time.Time{}
+var nextSpamMtx sync.Mutex
+
+func unspam(msg string) bool {
+       nextSpamMtx.Lock()
+       defer nextSpamMtx.Unlock()
+       if nextSpam[msg].Before(time.Now()) {
+               nextSpam[msg] = time.Now().Add(time.Minute)
+               return true
+       }
+       return false
+}
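+
+// Example (a sketch; inst and logger are placeholders): log a
+// recurring condition at most once per minute.
+//
+//     if unspam("boot timeout: " + inst.String()) {
+//             logger.Warnf("boot timeout: %s", inst)
+//     }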
diff --git a/lib/dispatchcloud/node_size.go b/lib/dispatchcloud/node_size.go
index 1c36d6cf5bb770cb447b6f7f177d39c5ff7ef469..e77c862b3668146d244f20b70b268a38a9dd9640 100644 (file)
@@ -6,19 +6,14 @@ package dispatchcloud
 
 import (
        "errors"
-       "log"
-       "os/exec"
        "sort"
-       "strings"
-       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
-var (
-       ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
-       discountConfiguredRAMPercent  = 5
-)
+var ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+
+var discountConfiguredRAMPercent = 5
 
 // ConstraintsNotSatisfiableError includes a list of available instance types
 // to be reported back to the user.
@@ -79,61 +74,3 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
        }
        return
 }
-
-// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
-// type name as a valid feature name, even if no instances of that
-// type have appeared yet.
-//
-// It takes advantage of some SLURM peculiarities:
-//
-// (1) A feature is valid after it has been offered by a node, even if
-// it is no longer offered by any node. So, to make a feature name
-// valid, we can add it to a dummy node ("compute0"), then remove it.
-//
-// (2) To test whether a set of feature names are valid without
-// actually submitting a job, we can call srun --test-only with the
-// desired features.
-//
-// SlurmNodeTypeFeatureKludge does a test-and-fix operation
-// immediately, and then periodically, in case slurm restarts and
-// forgets the list of valid features. It never returns (unless there
-// are no node types configured, in which case it returns
-// immediately), so it should generally be invoked with "go".
-func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
-       if len(cc.InstanceTypes) == 0 {
-               return
-       }
-       var features []string
-       for _, it := range cc.InstanceTypes {
-               features = append(features, "instancetype="+it.Name)
-       }
-       for {
-               slurmKludge(features)
-               time.Sleep(2 * time.Second)
-       }
-}
-
-const slurmDummyNode = "compute0"
-
-func slurmKludge(features []string) {
-       allFeatures := strings.Join(features, ",")
-
-       cmd := exec.Command("sinfo", "--nodes="+slurmDummyNode, "--format=%f", "--noheader")
-       out, err := cmd.CombinedOutput()
-       if err != nil {
-               log.Printf("running %q %q: %s (output was %q)", cmd.Path, cmd.Args, err, out)
-               return
-       }
-       if string(out) == allFeatures+"\n" {
-               // Already configured correctly, nothing to do.
-               return
-       }
-
-       log.Printf("configuring node %q with all node type features", slurmDummyNode)
-       cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+allFeatures)
-       log.Printf("running: %q %q", cmd.Path, cmd.Args)
-       out, err = cmd.CombinedOutput()
-       if err != nil {
-               log.Printf("error: scontrol: %s (output was %q)", err, out)
-       }
-}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644 (file)
index 0000000..c8491fb
--- /dev/null
@@ -0,0 +1,70 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a remote command executor, and a cloud driver.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run the scheduler's stale-lock fixer.
+// 5. Run the scheduler's mapper.
+// 6. Run the scheduler's syncer.
+// 7. Wait for updates to the container queue or worker pool.
+// 8. Repeat from 5.
+//
+//
+// A cloud driver creates new cloud VM instances and gets the latest
+// list of instances. The returned instances are caches/proxies for
+// the provider's metadata and control interfaces (get IP address,
+// update tags, shutdown).
+//
+//
+// A worker pool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// An executor maintains a multiplexed SSH connection to a cloud
+// instance, retrying/reconnecting as needed, so the worker pool can
+// execute commands. It asks the cloud driver's instance to verify its
+// SSH public key once when first connecting, and again later if the
+// key changes.
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// The scheduler's stale-lock fixer waits for any already-locked
+// containers (i.e., locked by a prior dispatcher process) to appear
+// on workers as the worker pool recovers its state. It
+// unlocks/requeues any that still remain when all workers have been
+// recovered or shut down, or when its timer expires.
+//
+//
+// The scheduler's mapper chooses which containers to assign to which
+// idle workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// The scheduler's syncer updates state to Cancelled when a running
+// container process dies without finalizing its entry in the
+// controller database. It also calls the worker pool to kill
+// containers that have priority=0 while locked or running.
+//
+//
+// An instance set proxy wraps a driver's instance set with
+// rate-limiting logic. After the wrapped instance set receives a
+// cloud.RateLimitError, the proxy starts returning errors to callers
+// immediately without calling through to the wrapped instance set.
diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go
new file mode 100644 (file)
index 0000000..9859410
--- /dev/null
@@ -0,0 +1,57 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// fixStaleLocks waits for any already-locked containers (i.e., locked
+// by a prior dispatcher process) to appear on workers as the worker
+// pool recovers its state. It unlocks any that still remain when all
+// workers have been recovered or shut down, or when its timer
+// (sch.staleLockTimeout) expires.
+func (sch *Scheduler) fixStaleLocks() {
+       wp := sch.pool.Subscribe()
+       defer sch.pool.Unsubscribe(wp)
+       timeout := time.NewTimer(sch.staleLockTimeout)
+waiting:
+       for {
+               unlock := false
+               select {
+               case <-wp:
+                       // If all workers have been contacted, unlock
+                       // containers that aren't claimed by any
+                       // worker.
+                       unlock = sch.pool.Workers()[worker.StateUnknown] == 0
+               case <-timeout.C:
+                       // Give up and unlock the containers, even
+                       // though they might be working.
+                       unlock = true
+               }
+
+               running := sch.pool.Running()
+               qEntries, _ := sch.queue.Entries()
+               for uuid, ent := range qEntries {
+                       if ent.Container.State != arvados.ContainerStateLocked {
+                               continue
+                       }
+                       if _, running := running[uuid]; running {
+                               continue
+                       }
+                       if !unlock {
+                               continue waiting
+                       }
+                       err := sch.queue.Unlock(uuid)
+                       if err != nil {
+                               sch.logger.Warnf("Unlock %s: %s", uuid, err)
+                       }
+               }
+               return
+       }
+}
diff --git a/lib/dispatchcloud/scheduler/gocheck_test.go b/lib/dispatchcloud/scheduler/gocheck_test.go
new file mode 100644 (file)
index 0000000..558c60f
--- /dev/null
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "testing"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
new file mode 100644 (file)
index 0000000..467247a
--- /dev/null
@@ -0,0 +1,43 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// A ContainerQueue is a set of containers that need to be started or
+// stopped. Implemented by container.Queue and test stubs.
+type ContainerQueue interface {
+       Entries() (entries map[string]container.QueueEnt, updated time.Time)
+       Lock(uuid string) error
+       Unlock(uuid string) error
+       Cancel(uuid string) error
+       Forget(uuid string)
+       Get(uuid string) (arvados.Container, bool)
+       Subscribe() <-chan struct{}
+       Unsubscribe(<-chan struct{})
+       Update() error
+}
+
+// A WorkerPool asynchronously starts and stops worker VMs, and starts
+// and stops containers on them. Implemented by worker.Pool and test
+// stubs.
+type WorkerPool interface {
+       Running() map[string]time.Time
+       Unallocated() map[arvados.InstanceType]int
+       Workers() map[worker.State]int
+       AtQuota() bool
+       Create(arvados.InstanceType) error
+       Shutdown(arvados.InstanceType) bool
+       StartContainer(arvados.InstanceType, arvados.Container) bool
+       KillContainer(uuid string)
+       Subscribe() <-chan struct{}
+       Unsubscribe(<-chan struct{})
+}
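+
+// As an illustrative aside (not an assertion present in this
+// commit), conformance of the concrete types named above could be
+// checked at compile time with:
+//
+//	var _ ContainerQueue = (*container.Queue)(nil)
+//	var _ WorkerPool = (*worker.Pool)(nil)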
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
new file mode 100644 (file)
index 0000000..7b432ad
--- /dev/null
@@ -0,0 +1,153 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "sort"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+func (sch *Scheduler) runQueue() {
+       unsorted, _ := sch.queue.Entries()
+       sorted := make([]container.QueueEnt, 0, len(unsorted))
+       for _, ent := range unsorted {
+               sorted = append(sorted, ent)
+       }
+       sort.Slice(sorted, func(i, j int) bool {
+               return sorted[i].Container.Priority > sorted[j].Container.Priority
+       })
+
+       running := sch.pool.Running()
+       unalloc := sch.pool.Unallocated()
+
+       sch.logger.WithFields(logrus.Fields{
+               "Containers": len(sorted),
+               "Processes":  len(running),
+       }).Debug("runQueue")
+
+       dontstart := map[arvados.InstanceType]bool{}
+       var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+
+tryrun:
+       for i, ctr := range sorted {
+               ctr, it := ctr.Container, ctr.InstanceType
+               logger := sch.logger.WithFields(logrus.Fields{
+                       "ContainerUUID": ctr.UUID,
+                       "InstanceType":  it.Name,
+               })
+               if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
+                       continue
+               }
+               switch ctr.State {
+               case arvados.ContainerStateQueued:
+                       if unalloc[it] < 1 && sch.pool.AtQuota() {
+                               logger.Debug("not locking: AtQuota and no unalloc workers")
+                               overquota = sorted[i:]
+                               break tryrun
+                       }
+                       sch.bgLock(logger, ctr.UUID)
+                       unalloc[it]--
+               case arvados.ContainerStateLocked:
+                       if unalloc[it] < 1 {
+                               if sch.pool.AtQuota() {
+                                       logger.Debug("not starting: AtQuota and no unalloc workers")
+                                       overquota = sorted[i:]
+                                       break tryrun
+                               }
+                               logger.Info("creating new instance")
+                               err := sch.pool.Create(it)
+                               if err != nil {
+                                       if _, ok := err.(cloud.QuotaError); !ok {
+                                               logger.WithError(err).Warn("error creating worker")
+                                       }
+                                       sch.queue.Unlock(ctr.UUID)
+                                       // Don't let lower-priority
+                                       // containers starve this one
+                                       // by keeping idle
+                                       // workers alive on different
+                                       // instance types.  TODO:
+                                       // avoid getting starved here
+                                       // if instances of a specific
+                                       // type always fail.
+                                       overquota = sorted[i:]
+                                       break tryrun
+                               }
+                               unalloc[it]++
+                       }
+
+                       if dontstart[it] {
+                               // We already tried & failed to start
+                               // a higher-priority container on the
+                               // same instance type. Don't let this
+                               // one sneak in ahead of it.
+                       } else if sch.pool.StartContainer(it, ctr) {
+                               unalloc[it]--
+                       } else {
+                               dontstart[it] = true
+                       }
+               }
+       }
+
+       if len(overquota) > 0 {
+               // Unlock any containers that are unmappable while
+               // we're at quota.
+               for _, ctr := range overquota {
+                       ctr := ctr.Container
+                       if ctr.State == arvados.ContainerStateLocked {
+                               logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
+                               logger.Debug("unlock because pool capacity is used by higher priority containers")
+                               err := sch.queue.Unlock(ctr.UUID)
+                               if err != nil {
+                                       logger.WithError(err).Warn("error unlocking")
+                               }
+                       }
+               }
+               // Shut down idle workers that didn't get any
+               // containers mapped onto them before we hit quota.
+               for it, n := range unalloc {
+                       if n < 1 {
+                               continue
+                       }
+                       sch.pool.Shutdown(it)
+               }
+       }
+}
+
+// Start an API call to lock the given container, and return
+// immediately while waiting for the response in a new goroutine. Do
+// nothing if a lock request is already in progress for this
+// container.
+func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
+       logger.Debug("locking")
+       sch.mtx.Lock()
+       defer sch.mtx.Unlock()
+       if sch.locking[uuid] {
+               logger.Debug("locking in progress, doing nothing")
+               return
+       }
+       sch.locking[uuid] = true
+       go func() {
+               defer func() {
+                       sch.mtx.Lock()
+                       defer sch.mtx.Unlock()
+                       delete(sch.locking, uuid)
+               }()
+               err := sch.queue.Lock(uuid)
+               if err != nil {
+                       logger.WithError(err).Warn("error locking container")
+                       return
+               }
+               ctr, ok := sch.queue.Get(uuid)
+               if !ok {
+                       logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+               } else if ctr.State != arvados.ContainerStateLocked {
+                       logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+               }
+       }()
+}
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
new file mode 100644 (file)
index 0000000..35db705
--- /dev/null
@@ -0,0 +1,309 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "errors"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+var (
+       logger = logrus.StandardLogger()
+
+       // arbitrary example container UUIDs
+       uuids = func() (r []string) {
+               for i := 0; i < 16; i++ {
+                       r = append(r, test.ContainerUUID(i))
+               }
+               return
+       }()
+)
+
+type stubQuotaError struct {
+       error
+}
+
+func (stubQuotaError) IsQuotaError() bool { return true }
+
+type stubPool struct {
+       notify    <-chan struct{}
+       unalloc   map[arvados.InstanceType]int // idle+booting+unknown
+       idle      map[arvados.InstanceType]int
+       running   map[string]time.Time
+       atQuota   bool
+       canCreate int
+       creates   []arvados.InstanceType
+       starts    []string
+       shutdowns int
+}
+
+func (p *stubPool) AtQuota() bool                 { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{}    { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{})   {}
+func (p *stubPool) Running() map[string]time.Time { return p.running }
+func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
+       r := map[arvados.InstanceType]int{}
+       for it, n := range p.unalloc {
+               r[it] = n
+       }
+       return r
+}
+func (p *stubPool) Create(it arvados.InstanceType) error {
+       p.creates = append(p.creates, it)
+       if p.canCreate < 1 {
+               return stubQuotaError{errors.New("quota")}
+       }
+       p.canCreate--
+       p.unalloc[it]++
+       return nil
+}
+func (p *stubPool) KillContainer(uuid string) {
+       p.running[uuid] = time.Now()
+}
+func (p *stubPool) Shutdown(arvados.InstanceType) bool {
+       p.shutdowns++
+       return false
+}
+func (p *stubPool) Workers() map[worker.State]int {
+       return map[worker.State]int{
+               worker.StateBooting: len(p.unalloc) - len(p.idle),
+               worker.StateIdle:    len(p.idle),
+               worker.StateRunning: len(p.running),
+       }
+}
+func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+       p.starts = append(p.starts, ctr.UUID)
+       if p.idle[it] == 0 {
+               return false
+       }
+       p.idle[it]--
+       p.unalloc[it]--
+       p.running[ctr.UUID] = time.Time{}
+       return true
+}
+
+var _ = check.Suite(&SchedulerSuite{})
+
+type SchedulerSuite struct{}
+
+// Assign priority=4 container to idle node. Create a new instance for
+// the priority=3 container. Don't try to start any priority<3
+// containers because priority=3 container didn't start
+// immediately. Don't try to create any other nodes after the failed
+// create.
+func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+       queue := test.Queue{
+               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+               },
+               Containers: []arvados.Container{
+                       {
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(2),
+                               Priority: 2,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(3),
+                               Priority: 3,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(4),
+                               Priority: 4,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(1): 1,
+                       test.InstanceType(2): 2,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(1): 1,
+                       test.InstanceType(2): 2,
+               },
+               running:   map[string]time.Time{},
+               canCreate: 0,
+       }
+       New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
+       c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
+       c.Check(pool.running, check.HasLen, 1)
+       for uuid := range pool.running {
+               c.Check(uuid, check.Equals, uuids[4])
+       }
+}
+
+// Shut down some nodes if Create() fails -- and without even calling
+// Create(), if AtQuota() is true.
+func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+       for quota := 0; quota < 2; quota++ {
+               c.Logf("quota=%d", quota)
+               shouldCreate := []arvados.InstanceType{}
+               for i := 0; i < quota; i++ {
+                       shouldCreate = append(shouldCreate, test.InstanceType(1))
+               }
+               queue := test.Queue{
+                       ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                               return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+                       },
+                       Containers: []arvados.Container{
+                               {
+                                       UUID:     test.ContainerUUID(1),
+                                       Priority: 1,
+                                       State:    arvados.ContainerStateLocked,
+                                       RuntimeConstraints: arvados.RuntimeConstraints{
+                                               VCPUs: 1,
+                                               RAM:   1 << 30,
+                                       },
+                               },
+                       },
+               }
+               queue.Update()
+               pool := stubPool{
+                       atQuota: quota == 0,
+                       unalloc: map[arvados.InstanceType]int{
+                               test.InstanceType(2): 2,
+                       },
+                       idle: map[arvados.InstanceType]int{
+                               test.InstanceType(2): 2,
+                       },
+                       running:   map[string]time.Time{},
+                       creates:   []arvados.InstanceType{},
+                       starts:    []string{},
+                       canCreate: 0,
+               }
+               New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+               c.Check(pool.creates, check.DeepEquals, shouldCreate)
+               c.Check(pool.starts, check.DeepEquals, []string{})
+               c.Check(pool.shutdowns, check.Not(check.Equals), 0)
+       }
+}
+
+// Start lower-priority containers while waiting for new/existing
+// workers to come up for higher-priority containers.
+func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(1): 1,
+                       test.InstanceType(2): 1,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(1): 1,
+                       test.InstanceType(2): 1,
+               },
+               running:   map[string]time.Time{},
+               canCreate: 4,
+       }
+       queue := test.Queue{
+               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+               },
+               Containers: []arvados.Container{
+                       {
+                               // create a new worker
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               // tentatively map to unalloc worker
+                               UUID:     test.ContainerUUID(2),
+                               Priority: 2,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               // start now on idle worker
+                               UUID:     test.ContainerUUID(3),
+                               Priority: 3,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               // create a new worker
+                               UUID:     test.ContainerUUID(4),
+                               Priority: 4,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 2,
+                                       RAM:   2 << 30,
+                               },
+                       },
+                       {
+                               // tentatively map to unalloc worker
+                               UUID:     test.ContainerUUID(5),
+                               Priority: 5,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 2,
+                                       RAM:   2 << 30,
+                               },
+                       },
+                       {
+                               // start now on idle worker
+                               UUID:     test.ContainerUUID(6),
+                               Priority: 6,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 2,
+                                       RAM:   2 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
+       c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
+       running := map[string]bool{}
+       for uuid, t := range pool.running {
+               if t.IsZero() {
+                       running[uuid] = false
+               } else {
+                       running[uuid] = true
+               }
+       }
+       c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
+}
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
new file mode 100644 (file)
index 0000000..9a5fb10
--- /dev/null
@@ -0,0 +1,111 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package scheduler uses a resizable worker pool to execute
+// containers in priority order.
+package scheduler
+
+import (
+       "sync"
+       "time"
+
+       "github.com/Sirupsen/logrus"
+)
+
+// A Scheduler maps queued containers onto unallocated workers in
+// priority order, creating new workers if needed. It locks containers
+// that can be mapped onto existing/pending workers, and starts them
+// if possible.
+//
+// A Scheduler unlocks any containers that are locked but can't be
+// mapped. (For example, this happens when the cloud provider reaches
+// quota/capacity and a previously mappable container's priority is
+// surpassed by a newer container.)
+//
+// If it encounters errors while creating new workers, a Scheduler
+// shuts down idle workers, in case they are consuming quota.
+type Scheduler struct {
+       logger              logrus.FieldLogger
+       queue               ContainerQueue
+       pool                WorkerPool
+       staleLockTimeout    time.Duration
+       queueUpdateInterval time.Duration
+
+       locking map[string]bool
+       mtx     sync.Mutex
+
+       runOnce sync.Once
+       stop    chan struct{}
+}
+
+// New returns a new unstarted Scheduler.
+//
+// Any given queue and pool should not be used by more than one
+// scheduler at a time.
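+//
+// Illustrative usage (the durations are arbitrary examples):
+//
+//	sch := New(logger, queue, pool, 10*time.Second, time.Second)
+//	sch.Start()
+//	defer sch.Stop()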
+func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+       return &Scheduler{
+               logger:              logger,
+               queue:               queue,
+               pool:                pool,
+               staleLockTimeout:    staleLockTimeout,
+               queueUpdateInterval: queueUpdateInterval,
+               stop:                make(chan struct{}),
+               locking:             map[string]bool{},
+       }
+}
+
+// Start starts the scheduler.
+func (sch *Scheduler) Start() {
+       go sch.runOnce.Do(sch.run)
+}
+
+// Stop stops the scheduler. No other method should be called after
+// Stop.
+func (sch *Scheduler) Stop() {
+       close(sch.stop)
+}
+
+func (sch *Scheduler) run() {
+       // Ensure the queue is fetched once before attempting anything.
+       for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
+               sch.logger.Errorf("error updating queue: %s", err)
+               d := sch.queueUpdateInterval / 60
+               sch.logger.Infof("waiting %s before retry", d)
+               time.Sleep(d)
+       }
+
+       // Keep the queue up to date.
+       poll := time.NewTicker(sch.queueUpdateInterval)
+       defer poll.Stop()
+       go func() {
+               for range poll.C {
+                       err := sch.queue.Update()
+                       if err != nil {
+                               sch.logger.Errorf("error updating queue: %s", err)
+                       }
+               }
+       }()
+
+       t0 := time.Now()
+       sch.logger.Infof("FixStaleLocks starting.")
+       sch.fixStaleLocks()
+       sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
+
+       poolNotify := sch.pool.Subscribe()
+       defer sch.pool.Unsubscribe(poolNotify)
+
+       queueNotify := sch.queue.Subscribe()
+       defer sch.queue.Unsubscribe(queueNotify)
+
+       for {
+               sch.runQueue()
+               sch.sync()
+               select {
+               case <-sch.stop:
+                       return
+               case <-queueNotify:
+               case <-poolNotify:
+               }
+       }
+}
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
new file mode 100644 (file)
index 0000000..a85162d
--- /dev/null
@@ -0,0 +1,94 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+// sync resolves discrepancies between the queue and the pool:
+//
+// Lingering crunch-run processes for finalized and unlocked/requeued
+// containers are killed.
+//
+// Locked containers whose crunch-run processes have exited are
+// requeued.
+//
+// Running containers whose crunch-run processes have exited are
+// cancelled.
+func (sch *Scheduler) sync() {
+       running := sch.pool.Running()
+       cancel := func(ent container.QueueEnt, reason string) {
+               uuid := ent.Container.UUID
+               logger := sch.logger.WithField("ContainerUUID", uuid)
+               logger.Infof("cancelling container because %s", reason)
+               err := sch.queue.Cancel(uuid)
+               if err != nil {
+                       logger.WithError(err).Print("error cancelling container")
+               }
+       }
+       kill := func(ent container.QueueEnt) {
+               uuid := ent.Container.UUID
+               logger := sch.logger.WithField("ContainerUUID", uuid)
+               logger.Debugf("killing crunch-run process because state=%q", ent.Container.State)
+               sch.pool.KillContainer(uuid)
+       }
+       qEntries, qUpdated := sch.queue.Entries()
+       for uuid, ent := range qEntries {
+               exited, running := running[uuid]
+               switch ent.Container.State {
+               case arvados.ContainerStateRunning:
+                       if !running {
+                               go cancel(ent, "not running on any worker")
+                       } else if !exited.IsZero() && qUpdated.After(exited) {
+                               go cancel(ent, "state=\"Running\" after crunch-run exited")
+                       }
+               case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
+                       if running {
+                               // Kill crunch-run in case it's stuck;
+                               // nothing it does now will matter
+                               // anyway. If crunch-run has already
+                               // exited and we just haven't found
+                               // out about it yet, the only effect
+                               // of kill() will be to make the
+                               // worker available for the next
+                               // container.
+                               go kill(ent)
+                       } else {
+                               sch.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "State":         ent.Container.State,
+                               }).Info("container finished")
+                               sch.queue.Forget(uuid)
+                       }
+               case arvados.ContainerStateQueued:
+                       if running {
+                               // Can happen if a worker returns from
+                               // a network outage and is still
+                               // preparing to run a container that
+                               // has already been unlocked/requeued.
+                               go kill(ent)
+                       }
+               case arvados.ContainerStateLocked:
+                       if running && !exited.IsZero() && qUpdated.After(exited) {
+                               logger := sch.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "Exited":        time.Since(exited).Seconds(),
+                               })
+                               logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
+                               err := sch.queue.Unlock(uuid)
+                               if err != nil {
+                                       logger.WithError(err).Info("error requeueing container")
+                               }
+                       }
+               default:
+                       sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+               }
+       }
+}
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
new file mode 100644 (file)
index 0000000..b5dba98
--- /dev/null
@@ -0,0 +1,190 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package ssh_executor provides an implementation of pool.Executor
+// using a long-lived multiplexed SSH session.
+package ssh_executor
+
+import (
+       "bytes"
+       "errors"
+       "io"
+       "net"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "golang.org/x/crypto/ssh"
+)
+
+// New returns a new Executor, using the given target.
+func New(t cloud.ExecutorTarget) *Executor {
+       return &Executor{target: t}
+}
+
+// An Executor uses a multiplexed SSH connection to execute shell
+// commands on a remote target. It reconnects automatically after
+// errors.
+//
+// When setting up a connection, the Executor accepts whatever host
+// key is provided by the remote server, then passes the received key
+// and the SSH connection to the target's VerifyHostKey method before
+// executing commands on the connection.
+//
+// A zero Executor must not be used before calling SetTarget.
+//
+// An Executor must not be copied.
+type Executor struct {
+       target  cloud.ExecutorTarget
+       signers []ssh.Signer
+       mtx     sync.RWMutex // controls access to instance after creation
+
+       client      *ssh.Client
+       clientErr   error
+       clientOnce  sync.Once     // initialized private state
+       clientSetup chan bool     // len>0 while client setup is in progress
+       hostKey     ssh.PublicKey // most recent host key that passed verification, if any
+}
+
+// SetSigners updates the set of private keys that will be offered to
+// the target next time the Executor sets up a new connection.
+func (exr *Executor) SetSigners(signers ...ssh.Signer) {
+       exr.mtx.Lock()
+       defer exr.mtx.Unlock()
+       exr.signers = signers
+}
+
+// SetTarget sets the current target. The new target will be used next
+// time a new connection is set up; until then, the Executor will
+// continue to use the existing target.
+//
+// The new target is assumed to represent the same host as the
+// previous target, although its address and host key might differ.
+func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
+       exr.mtx.Lock()
+       defer exr.mtx.Unlock()
+       exr.target = t
+}
+
+// Target returns the current target.
+func (exr *Executor) Target() cloud.ExecutorTarget {
+       exr.mtx.RLock()
+       defer exr.mtx.RUnlock()
+       return exr.target
+}
+
+// Execute runs cmd on the target. If an existing connection is not
+// usable, it sets up a new connection to the current target.
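+//
+// An illustrative call (command and stdin are arbitrary examples):
+//
+//	stdout, stderr, err := exr.Execute("uname -a", nil)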
+func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+       session, err := exr.newSession()
+       if err != nil {
+               return nil, nil, err
+       }
+       defer session.Close()
+       var stdout, stderr bytes.Buffer
+       session.Stdin = stdin
+       session.Stdout = &stdout
+       session.Stderr = &stderr
+       err = session.Run(cmd)
+       return stdout.Bytes(), stderr.Bytes(), err
+}
+
+// Close shuts down any active connections.
+func (exr *Executor) Close() {
+       // Ensure exr is initialized
+       exr.sshClient(false)
+
+       exr.clientSetup <- true
+       if exr.client != nil {
+               defer exr.client.Close()
+       }
+       exr.client, exr.clientErr = nil, errors.New("closed")
+       <-exr.clientSetup
+}
+
+// Create a new SSH session. If session setup fails or the SSH client
+// hasn't been set up yet, set up a new SSH client and try again.
+func (exr *Executor) newSession() (*ssh.Session, error) {
+       try := func(create bool) (*ssh.Session, error) {
+               client, err := exr.sshClient(create)
+               if err != nil {
+                       return nil, err
+               }
+               return client.NewSession()
+       }
+       session, err := try(false)
+       if err != nil {
+               session, err = try(true)
+       }
+       return session, err
+}
+
+// Get the latest SSH client. If another goroutine is in the process
+// of setting one up, wait for it to finish and return its result (or
+// the last successfully set up client, if the new setup fails).
+func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
+       exr.clientOnce.Do(func() {
+               exr.clientSetup = make(chan bool, 1)
+               exr.clientErr = errors.New("client not yet created")
+       })
+       defer func() { <-exr.clientSetup }()
+       select {
+       case exr.clientSetup <- true:
+               if create {
+                       client, err := exr.setupSSHClient()
+                       if err == nil || exr.client == nil {
+                               if exr.client != nil {
+                                       // Hang up the previous
+                                       // (non-working) client
+                                       go exr.client.Close()
+                               }
+                               exr.client, exr.clientErr = client, err
+                       }
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       default:
+               // Another goroutine is doing the above case.  Wait
+               // for it to finish and return whatever it leaves in
+               // exr.client.
+               exr.clientSetup <- true
+       }
+       return exr.client, exr.clientErr
+}
+
+// Create a new SSH client.
+func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
+       target := exr.Target()
+       addr := target.Address()
+       if addr == "" {
+               return nil, errors.New("instance has no address")
+       }
+       var receivedKey ssh.PublicKey
+       client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+               User: "root",
+               Auth: []ssh.AuthMethod{
+                       ssh.PublicKeys(exr.signers...),
+               },
+               HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+                       receivedKey = key
+                       return nil
+               },
+               Timeout: time.Minute,
+       })
+       if err != nil {
+               return nil, err
+       } else if receivedKey == nil {
+               return nil, errors.New("BUG: key was never provided to HostKeyCallback")
+       }
+
+       if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
+               err = target.VerifyHostKey(receivedKey, client)
+               if err != nil {
+                       return nil, err
+               }
+               exr.hostKey = receivedKey
+       }
+       return client, nil
+}
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
new file mode 100644 (file)
index 0000000..8dabfec
--- /dev/null
@@ -0,0 +1,102 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ssh_executor
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+       "sync"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&ExecutorSuite{})
+
+type testTarget struct {
+       test.SSHService
+}
+
+func (*testTarget) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+       return nil
+}
+
+type ExecutorSuite struct{}
+
+func (s *ExecutorSuite) TestExecute(c *check.C) {
+       command := `foo 'bar' "baz"`
+       stdinData := "foobar\nbaz\n"
+       _, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
+       clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
+       for _, exitcode := range []int{0, 1, 2} {
+               srv := &testTarget{
+                       SSHService: test.SSHService{
+                               Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+                                       c.Check(cmd, check.Equals, command)
+                                       var wg sync.WaitGroup
+                                       wg.Add(2)
+                                       go func() {
+                                               io.WriteString(stdout, "stdout\n")
+                                               wg.Done()
+                                       }()
+                                       go func() {
+                                               io.WriteString(stderr, "stderr\n")
+                                               wg.Done()
+                                       }()
+                                       buf, err := ioutil.ReadAll(stdin)
+                                       wg.Wait()
+                                       c.Check(err, check.IsNil)
+                                       if err != nil {
+                                               return 99
+                                       }
+                                       _, err = stdout.Write(buf)
+                                       c.Check(err, check.IsNil)
+                                       return uint32(exitcode)
+                               },
+                               HostKey:        hostpriv,
+                               AuthorizedKeys: []ssh.PublicKey{clientpub},
+                       },
+               }
+               err := srv.Start()
+               c.Check(err, check.IsNil)
+               c.Logf("srv address %q", srv.Address())
+               defer srv.Close()
+
+               exr := New(srv)
+               exr.SetSigners(clientpriv)
+
+               done := make(chan bool)
+               go func() {
+                       stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+                       if exitcode == 0 {
+                               c.Check(err, check.IsNil)
+                       } else {
+                               c.Check(err, check.NotNil)
+                               err, ok := err.(*ssh.ExitError)
+                               c.Assert(ok, check.Equals, true)
+                               c.Check(err.ExitStatus(), check.Equals, exitcode)
+                       }
+                       c.Check(stdout, check.DeepEquals, []byte("stdout\n"+stdinData))
+                       c.Check(stderr, check.DeepEquals, []byte("stderr\n"))
+                       close(done)
+               }()
+
+               timeout := time.NewTimer(time.Second)
+               select {
+               case <-done:
+               case <-timeout.C:
+                       c.Fatal("timed out")
+               }
+       }
+}
diff --git a/lib/dispatchcloud/test/doc.go b/lib/dispatchcloud/test/doc.go
new file mode 100644 (file)
index 0000000..12f3b16
--- /dev/null
@@ -0,0 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package test provides fakes and other tools for testing cloud
+// drivers and other dispatcher modules.
+package test
diff --git a/lib/dispatchcloud/test/fixtures.go b/lib/dispatchcloud/test/fixtures.go
new file mode 100644 (file)
index 0000000..68bdb3d
--- /dev/null
@@ -0,0 +1,28 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// ContainerUUID returns a fake container UUID.
+func ContainerUUID(i int) string {
+       return fmt.Sprintf("zzzzz-dz642-%015d", i)
+}
+
+// InstanceType returns a fake arvados.InstanceType called "type{i}"
+// with i CPUs and i GiB of memory.
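+// For example, InstanceType(2) is "type2" with 2 VCPUs, 2 GiB of
+// RAM, and a price of 0.246.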
+func InstanceType(i int) arvados.InstanceType {
+       return arvados.InstanceType{
+               Name:         fmt.Sprintf("type%d", i),
+               ProviderType: fmt.Sprintf("providertype%d", i),
+               VCPUs:        i,
+               RAM:          arvados.ByteSize(i) << 30,
+               Price:        float64(i) * 0.123,
+       }
+}
diff --git a/lib/dispatchcloud/test/lame_instance_set.go b/lib/dispatchcloud/test/lame_instance_set.go
new file mode 100644 (file)
index 0000000..baab407
--- /dev/null
@@ -0,0 +1,118 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+       "math/rand"
+       "sync"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+// LameInstanceSet creates instances that boot but can't run
+// containers.
+type LameInstanceSet struct {
+       Hold chan bool // set to make(chan bool) to hold operations until Release is called
+
+       mtx       sync.Mutex
+       instances map[*lameInstance]bool
+}
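+
+// Illustrative use in a test (assumed usage, not taken from this
+// commit):
+//
+//	is := &LameInstanceSet{Hold: make(chan bool)}
+//	go is.Create(InstanceType(1), "image", nil, nil)
+//	is.Release(1) // allow the pending Create call to proceed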
+
+// Create returns a new instance.
+func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
+       inst := &lameInstance{
+               p:            p,
+               id:           cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
+               providerType: instType.ProviderType,
+       }
+       inst.SetTags(tags)
+       if p.Hold != nil {
+               p.Hold <- true
+       }
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       if p.instances == nil {
+               p.instances = map[*lameInstance]bool{}
+       }
+       p.instances[inst] = true
+       return inst, nil
+}
+
+// Instances returns the instances that haven't been destroyed.
+func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       var instances []cloud.Instance
+       for i := range p.instances {
+               instances = append(instances, i)
+       }
+       return instances, nil
+}
+
+// Stop is a no-op, but exists to satisfy cloud.InstanceSet.
+func (p *LameInstanceSet) Stop() {
+}
+
+// Release n held calls. Blocks if n calls aren't already
+// waiting. Blocks forever if Hold is nil.
+func (p *LameInstanceSet) Release(n int) {
+       for i := 0; i < n; i++ {
+               <-p.Hold
+       }
+}
+
+type lameInstance struct {
+       p            *LameInstanceSet
+       id           cloud.InstanceID
+       providerType string
+       tags         cloud.InstanceTags
+}
+
+func (inst *lameInstance) ID() cloud.InstanceID {
+       return inst.id
+}
+
+func (inst *lameInstance) String() string {
+       return fmt.Sprint(inst.id)
+}
+
+func (inst *lameInstance) ProviderType() string {
+       return inst.providerType
+}
+
+func (inst *lameInstance) Address() string {
+       return "0.0.0.0:1234"
+}
+
+func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
+       inst.p.mtx.Lock()
+       defer inst.p.mtx.Unlock()
+       inst.tags = cloud.InstanceTags{}
+       for k, v := range tags {
+               inst.tags[k] = v
+       }
+       return nil
+}
+
+func (inst *lameInstance) Destroy() error {
+       if inst.p.Hold != nil {
+               inst.p.Hold <- true
+       }
+       inst.p.mtx.Lock()
+       defer inst.p.mtx.Unlock()
+       delete(inst.p.instances, inst)
+       return nil
+}
+
+func (inst *lameInstance) Tags() cloud.InstanceTags {
+       return inst.tags
+}
+
+func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+       return nil
+}
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
new file mode 100644 (file)
index 0000000..fda04d5
--- /dev/null
@@ -0,0 +1,165 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Queue is a test stub for container.Queue. The caller specifies the
+// initial queue state.
+type Queue struct {
+       // Containers represent the API server database contents.
+       Containers []arvados.Container
+
+       // ChooseType will be called for each entry in Containers. It
+       // must not be nil.
+       ChooseType func(*arvados.Container) (arvados.InstanceType, error)
+
+       entries     map[string]container.QueueEnt
+       updTime     time.Time
+       subscribers map[<-chan struct{}]chan struct{}
+
+       mtx sync.Mutex
+}
+
+// Entries returns the containers that were queued when Update was
+// last called.
+func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       updTime := q.updTime
+       r := map[string]container.QueueEnt{}
+       for uuid, ent := range q.entries {
+               r[uuid] = ent
+       }
+       return r, updTime
+}
+
+// Get returns the container from the cached queue, i.e., as it was
+// when Update was last called -- just like a container.Queue does. If
+// the state has been changed (via Lock, Unlock, or Cancel) since the
+// last Update, the updated state is returned.
+func (q *Queue) Get(uuid string) (arvados.Container, bool) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       ent, ok := q.entries[uuid]
+       return ent.Container, ok
+}
+
+// Forget removes the container from the cached queue without
+// changing its state.
+func (q *Queue) Forget(uuid string) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       delete(q.entries, uuid)
+}
+
+// Lock changes the container's state from Queued to Locked; it
+// returns an error if the current state is not Queued.
+func (q *Queue) Lock(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
+}
+
+// Unlock changes the container's state from Locked back to Queued;
+// it returns an error if the current state is not Locked.
+func (q *Queue) Unlock(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
+}
+
+// Cancel changes the container's state to Cancelled.
+func (q *Queue) Cancel(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
+}
+
+// Subscribe returns a channel that receives a value after each
+// state change or queue update.
+func (q *Queue) Subscribe() <-chan struct{} {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       if q.subscribers == nil {
+               q.subscribers = map[<-chan struct{}]chan struct{}{}
+       }
+       ch := make(chan struct{}, 1)
+       q.subscribers[ch] = ch
+       return ch
+}
+
+// Unsubscribe stops sending notifications to the given channel.
+func (q *Queue) Unsubscribe(ch <-chan struct{}) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       delete(q.subscribers, ch)
+}
+
+func (q *Queue) notify() {
+       for _, ch := range q.subscribers {
+               select {
+               case ch <- struct{}{}:
+               default:
+               }
+       }
+}
+
+// caller must have lock.
+func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
+       ent := q.entries[uuid]
+       if ent.Container.State != from {
+               return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
+       }
+       ent.Container.State = to
+       q.entries[uuid] = ent
+       for i, ctr := range q.Containers {
+               if ctr.UUID == uuid {
+                       q.Containers[i].State = to
+                       break
+               }
+       }
+       q.notify()
+       return nil
+}
+
+// Update rebuilds the current entries from the Containers slice.
+func (q *Queue) Update() error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       updTime := time.Now()
+       upd := map[string]container.QueueEnt{}
+       for _, ctr := range q.Containers {
+               _, exists := q.entries[ctr.UUID]
+               if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
+                       continue
+               }
+               it, _ := q.ChooseType(&ctr)
+               upd[ctr.UUID] = container.QueueEnt{
+                       Container:    ctr,
+                       InstanceType: it,
+               }
+       }
+       q.entries = upd
+       q.updTime = updTime
+       q.notify()
+       return nil
+}
+
+// Notify adds/updates an entry in the Containers slice.  This
+// simulates the effect of an API update from someone other than the
+// dispatcher -- e.g., crunch-run updating state to "Complete" when a
+// container exits.
+//
+// The resulting changes are not exposed through Get() or Entries()
+// until the next call to Update().
+func (q *Queue) Notify(upd arvados.Container) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       for i, ctr := range q.Containers {
+               if ctr.UUID == upd.UUID {
+                       q.Containers[i] = upd
+                       return
+               }
+       }
+       q.Containers = append(q.Containers, upd)
+}
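+
+// Illustrative test pattern (assumed usage): simulate crunch-run
+// finalizing a container, then make the change visible to callers:
+//
+//	ctr, _ := q.Get(uuid)
+//	ctr.State = arvados.ContainerStateComplete
+//	q.Notify(ctr)
+//	q.Update()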
diff --git a/lib/dispatchcloud/test/ssh_service.go b/lib/dispatchcloud/test/ssh_service.go
new file mode 100644 (file)
index 0000000..b1e4e03
--- /dev/null
@@ -0,0 +1,169 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "net"
+       "strings"
+       "sync"
+
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+// LoadTestKey reads and parses the private key in the file fnm and
+// the corresponding public key in fnm+".pub".
+func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) {
+       rawpubkey, err := ioutil.ReadFile(fnm + ".pub")
+       c.Assert(err, check.IsNil)
+       pubkey, _, _, _, err := ssh.ParseAuthorizedKey(rawpubkey)
+       c.Assert(err, check.IsNil)
+       rawprivkey, err := ioutil.ReadFile(fnm)
+       c.Assert(err, check.IsNil)
+       privkey, err := ssh.ParsePrivateKey(rawprivkey)
+       c.Assert(err, check.IsNil)
+       return pubkey, privkey
+}
+
+// An SSHExecFunc handles an "exec" session on a multiplexed SSH
+// connection.
+type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+
+// An SSHService accepts SSH connections on an available TCP port and
+// passes clients' "exec" sessions to the provided SSHExecFunc.
+type SSHService struct {
+       Exec           SSHExecFunc
+       HostKey        ssh.Signer
+       AuthorizedKeys []ssh.PublicKey
+
+       listener net.Listener
+       conn     *ssh.ServerConn
+       setup    sync.Once
+       mtx      sync.Mutex
+       started  chan bool
+       closed   bool
+       err      error
+}
+
+// Address returns the host:port where the SSH server is listening. It
+// returns "" if called before the server is ready to accept
+// connections.
+func (ss *SSHService) Address() string {
+       ss.setup.Do(ss.start)
+       ss.mtx.Lock()
+       ln := ss.listener
+       ss.mtx.Unlock()
+       if ln == nil {
+               return ""
+       }
+       return ln.Addr().String()
+}
+
+// Close shuts down the server and releases resources. Established
+// connections are unaffected.
+func (ss *SSHService) Close() {
+       ss.Start()
+       ss.mtx.Lock()
+       ln := ss.listener
+       ss.closed = true
+       ss.mtx.Unlock()
+       if ln != nil {
+               ln.Close()
+       }
+}
+
+// Start waits until the server is ready to accept connections, and
+// returns an error if the listener could not be set up.
+func (ss *SSHService) Start() error {
+       ss.setup.Do(ss.start)
+       <-ss.started
+       return ss.err
+}
+
+func (ss *SSHService) start() {
+       ss.started = make(chan bool)
+       go ss.run()
+}
+
+func (ss *SSHService) run() {
+       defer close(ss.started)
+       config := &ssh.ServerConfig{
+               PublicKeyCallback: func(c ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) {
+                       for _, ak := range ss.AuthorizedKeys {
+                               if bytes.Equal(ak.Marshal(), pubKey.Marshal()) {
+                                       return &ssh.Permissions{}, nil
+                               }
+                       }
+                       return nil, fmt.Errorf("unknown public key for %q", c.User())
+               },
+       }
+       config.AddHostKey(ss.HostKey)
+
+       listener, err := net.Listen("tcp", ":")
+       if err != nil {
+               ss.err = err
+               return
+       }
+
+       ss.mtx.Lock()
+       ss.listener = listener
+       ss.mtx.Unlock()
+
+       go func() {
+               for {
+                       nConn, err := listener.Accept()
+                       if err != nil && strings.Contains(err.Error(), "use of closed network connection") && ss.closed {
+                               return
+                       } else if err != nil {
+                               log.Printf("accept: %s", err)
+                               return
+                       }
+                       go ss.serveConn(nConn, config)
+               }
+       }()
+}
+
+func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) {
+       defer nConn.Close()
+       conn, newchans, reqs, err := ssh.NewServerConn(nConn, config)
+       if err != nil {
+               log.Printf("ssh.NewServerConn: %s", err)
+               return
+       }
+       defer conn.Close()
+       go ssh.DiscardRequests(reqs)
+       for newch := range newchans {
+               if newch.ChannelType() != "session" {
+                       newch.Reject(ssh.UnknownChannelType, "unknown channel type")
+                       continue
+               }
+               ch, reqs, err := newch.Accept()
+               if err != nil {
+                       log.Printf("accept channel: %s", err)
+                       return
+               }
+               var execReq struct {
+                       Command string
+               }
+               go func() {
+                       for req := range reqs {
+                               if req.Type == "exec" && execReq.Command == "" {
+                                       req.Reply(true, nil)
+                                       ssh.Unmarshal(req.Payload, &execReq)
+                                       go func() {
+                                               var resp struct {
+                                                       Status uint32
+                                               }
+                                               resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+                                               ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
+                                               ch.Close()
+                                       }()
+                               }
+                       }
+               }()
+       }
+}
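How a test might stand up this service and connect to it, from within
the test package (a hedged sketch, assuming the key fixtures added
below, a *check.C named c, and imports of fmt, io,
golang.org/x/crypto/ssh, and gopkg.in/check.v1):

        // Echo server: each "exec" session writes its command back.
        _, hostKey := LoadTestKey(c, "sshkey_vm")
        clientPub, clientPriv := LoadTestKey(c, "sshkey_dispatch")
        srv := &SSHService{
                Exec: func(cmd string, _ io.Reader, stdout, _ io.Writer) uint32 {
                        fmt.Fprint(stdout, cmd)
                        return 0
                },
                HostKey:        hostKey,
                AuthorizedKeys: []ssh.PublicKey{clientPub},
        }
        c.Assert(srv.Start(), check.IsNil)
        client, err := ssh.Dial("tcp", srv.Address(), &ssh.ClientConfig{
                User:            "root",
                Auth:            []ssh.AuthMethod{ssh.PublicKeys(clientPriv)},
                HostKeyCallback: ssh.InsecureIgnoreHostKey(), // test only
        })
        c.Assert(err, check.IsNil)
        defer client.Close()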
diff --git a/lib/dispatchcloud/test/sshkey_dispatch b/lib/dispatchcloud/test/sshkey_dispatch
new file mode 100644 (file)
index 0000000..5584519
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAqYm4XsQHm8sBSZFwUX5VeW1OkGsfoNzcGPG2nzzYRhNhClYZ
+0ABHhUk82HkaC/8l6d/jpYTf42HrK42nNQ0r0Yzs7qw8yZMQioK4Yk+kFyVLF78E
+GRG4pGAWXFs6pUchs/lm8fo9zcda4R3XeqgI+NO+nEERXmdRJa1FhI+Za3/S/+CV
+mg+6O00wZz2+vKmDPptGN4MCKmQOCKsMJts7wSZGyVcTtdNv7jjfr6yPAIOIL8X7
+LtarBCFaK/pD7uWll/Uj7h7D8K48nIZUrvBJJjXL8Sm4LxCNoz3Z83k8J5ZzuDRD
+gRiQe/C085mhO6VL+2fypDLwcKt1tOL8fI81MwIDAQABAoIBACR3tEnmHsDbNOav
+Oxq8cwRQh9K2yDHg8BMJgz/TZa4FIx2HEbxVIw0/iLADtJ+Z/XzGJQCIiWQuvtg6
+exoFQESt7JUWRWkSkj9JCQJUoTY9Vl7APtBpqG7rIEQzd3TvzQcagZNRQZQO6rR7
+p8sBdBSZ72lK8cJ9tM3G7Kor/VNK7KgRZFNhEWnmvEa3qMd4hzDcQ4faOn7C9NZK
+dwJAuJVVfwOLlOORYcyEkvksLaDOK2DsB/p0AaCpfSmThRbBKN5fPXYaKgUdfp3w
+70Hpp27WWymb1cgjyqSH3DY+V/kvid+5QxgxCBRq865jPLn3FFT9bWEVS/0wvJRj
+iMIRrjECgYEA4Ffv9rBJXqVXonNQbbstd2PaprJDXMUy9/UmfHL6pkq1xdBeuM7v
+yf2ocXheA8AahHtIOhtgKqwv/aRhVK0ErYtiSvIk+tXG+dAtj/1ZAKbKiFyxjkZV
+X72BH7cTlR6As5SRRfWM/HaBGEgED391gKsI5PyMdqWWdczT5KfxAksCgYEAwXYE
+ewPmV1GaR5fbh2RupoPnUJPMj36gJCnwls7sGaXDQIpdlq56zfKgrLocGXGgj+8f
+QH7FHTJQO15YCYebtsXWwB3++iG43gVlJlecPAydsap2CCshqNWC5JU5pan0QzsP
+exzNzWqfUPSbTkR2SRaN+MenZo2Y/WqScOAth7kCgYBgVoLujW9EXH5QfXJpXLq+
+jTvE38I7oVcs0bJwOLPYGzcJtlwmwn6IYAwohgbhV2pLv+EZSs42JPEK278MLKxY
+lgVkp60npgunFTWroqDIvdc1TZDVxvA8h9VeODEJlSqxczgbMcIUXBM9yRctTI+5
+7DiKlMUA4kTFW2sWwuOlFwKBgGXvrYS0FVbFJKm8lmvMu5D5x5RpjEu/yNnFT4Pn
+G/iXoz4Kqi2PWh3STl804UF24cd1k94D7hDoReZCW9kJnz67F+C67XMW+bXi2d1O
+JIBvlVfcHb1IHMA9YG7ZQjrMRmx2Xj3ce4RVPgUGHh8ra7gvLjd72/Tpf0doNClN
+ti/hAoGBAMW5D3LhU05LXWmOqpeT4VDgqk4MrTBcstVe7KdVjwzHrVHCAmI927vI
+pjpphWzpC9m3x4OsTNf8m+g6H7f3IiQS0aiFNtduXYlcuT5FHS2fSATTzg5PBon9
+1E6BudOve+WyFyBs7hFWAqWFBdWujAl4Qk5Ek09U2ilFEPE7RTgJ
+-----END RSA PRIVATE KEY-----
diff --git a/lib/dispatchcloud/test/sshkey_dispatch.pub b/lib/dispatchcloud/test/sshkey_dispatch.pub
new file mode 100644 (file)
index 0000000..1d5c1ea
--- /dev/null
@@ -0,0 +1 @@
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCpibhexAebywFJkXBRflV5bU6Qax+g3NwY8bafPNhGE2EKVhnQAEeFSTzYeRoL/yXp3+OlhN/jYesrjac1DSvRjOzurDzJkxCKgrhiT6QXJUsXvwQZEbikYBZcWzqlRyGz+Wbx+j3Nx1rhHdd6qAj4076cQRFeZ1ElrUWEj5lrf9L/4JWaD7o7TTBnPb68qYM+m0Y3gwIqZA4Iqwwm2zvBJkbJVxO102/uON+vrI8Ag4gvxfsu1qsEIVor+kPu5aWX9SPuHsPwrjychlSu8EkmNcvxKbgvEI2jPdnzeTwnlnO4NEOBGJB78LTzmaE7pUv7Z/KkMvBwq3W04vx8jzUz tom@curve
diff --git a/lib/dispatchcloud/test/sshkey_vm b/lib/dispatchcloud/test/sshkey_vm
new file mode 100644 (file)
index 0000000..10b7ed1
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEApIfWk2StZGDtmunumIeXLJ46AQrbHHvuxrSAkQf6+zUwjB2I
+rse7ezBRHWcge9U5EsigixmhUM4ozFLnUQNwC862jbmsjbyA97arG/REECNlUrEB
+HQPYHhai5yyJ89AfjWVxKyINfW0K2HX1R8nl4kdVraAgpohPLh0dGjfwzm/BcXDG
++TxW9zRz0KCs9ZRI6s2MNdv08ahKQ0azk8gRTqMADJmYNWIo3zPQ+fhlwyr6EZJ/
+HFbRtjpajEPMJPwoVPO+Wj6wztfHDYKkPIrIWbhMl6w+tEKdsmygd3Iq94ktLS3X
+AbRCfn4njS2QSlkKFEepkUJWCSSWZgFn6DLm2wIDAQABAoIBAQCb137LxcTnG1h0
+L7isCWKMBKN0cU/xvwIAfOB6f1CfuVXuodrhkpZmrPFoJFKEeQbCX/6RQwmlfGDw
+iGZKOjNbO8V2oLRs3GxcNk4FAG2ny58hoD8puIZwmYhb57gTlMMOL1PuQyb78tkf
+Bzv5b6ermV3yQ4Ypt1solrMGLo6NOZD0oDX9p0Zt9kueIhjzgP0v5//T1F4PGHZK
++sLSsMiu9u6F+PB+Oc6uv0Zee9Lnts/QiWH5f18oEculjwKWFx+JwJWiLffGg2Bl
+vbpmvHFRoRWkHTpgSiLwSUqs0ZUWU9R5h11ROg5L39MLsxQoBvHsPEnP5ssN8jGt
+aH86EZjBAoGBAM+A5B/UjhIn9m05EhDTDRzI92hGhM8f7uAwobbnjvIQyZbWlBwj
+2TmgbJdpTGVbD+iTBIwKQdcFBbWobTCZsNMpghqA/ir4YIAnZ5OX9VQ1Bc+bWE7V
+dPmMVpCgyg+ERAe+79FrYWcI3vhnBpHCsY/9p9pGQIKDzlGTWNF1HJGjAoGBAMr7
+2CTVnFImTgD3E+rH4AAAfkz+cyqfK6BUhli/NifFYZhWCs16r9QCGSORnp4gPhMY
+3mf7VBs9rk123zOMo89eJt3adTgbZ+QIxXeXilGXpbT3w1+CJMaZRrIy80E1tB5/
+KvDZcrZ78o8XWMNUa+9k55ukvgyC24ICAmOIWNlpAoGBALEFvphBF2r52MtZUsYz
+pw4VjKvS7V5eWcW891k4tsRf+frK2NQg6SK2b63EUT5ur2W0dr6ZyY2MZVCSfYRm
+uWmMEchWn389IeZyt3Q8wTize1+foXivtflm9jqwUXFnXzpUc/du6kuiT8YO7pXP
+SPgUZ+xY3pP5qjwBvlYC2PqNAoGAZ1CKMi1bdGC0wT8BLzXuqHGX136HhcEgRmnf
+O5qPaOzJAO2CcBWrGuC6hOUgc+F7VuMIiKpeo8LgTeNcNfO2iNymMbN4iEdCuMlS
+IM3MBD2IhTS6h4lJSKBJYHgYYi+AbylQ5Of4wDMUQYqjjkAQ8/dK/2h5pwqPyXtW
+VezXNEkCgYEAq4S0++y9tjlLn+w9BIkmx3bAVRDQZIzIEwxTh+jpqaUp1J0iyseJ
+71pwqQojGNF6x8GglVXa6bMrETae21WhEeHnWmzlpCWIODsYPUQ+erjDuAWi9eGk
+HLklqSEoLB8pzC6zDqjxDw+CnGERIDSaoaeoWiNKZ95IH1WiEwYjuxU=
+-----END RSA PRIVATE KEY-----
diff --git a/lib/dispatchcloud/test/sshkey_vm.pub b/lib/dispatchcloud/test/sshkey_vm.pub
new file mode 100644 (file)
index 0000000..b9d44c9
--- /dev/null
@@ -0,0 +1 @@
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCkh9aTZK1kYO2a6e6Yh5csnjoBCtsce+7GtICRB/r7NTCMHYiux7t7MFEdZyB71TkSyKCLGaFQzijMUudRA3ALzraNuayNvID3tqsb9EQQI2VSsQEdA9geFqLnLInz0B+NZXErIg19bQrYdfVHyeXiR1WtoCCmiE8uHR0aN/DOb8FxcMb5PFb3NHPQoKz1lEjqzYw12/TxqEpDRrOTyBFOowAMmZg1YijfM9D5+GXDKvoRkn8cVtG2OlqMQ8wk/ChU875aPrDO18cNgqQ8ishZuEyXrD60Qp2ybKB3cir3iS0tLdcBtEJ+fieNLZBKWQoUR6mRQlYJJJZmAWfoMubb tom@curve
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
new file mode 100644 (file)
index 0000000..bfe0f8f
--- /dev/null
@@ -0,0 +1,201 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "crypto/rand"
+       "errors"
+       "fmt"
+       "io"
+       math_rand "math/rand"
+       "sync"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/mitchellh/mapstructure"
+       "golang.org/x/crypto/ssh"
+)
+
+// A StubExecFunc handles an "exec" session on a stubbed cloud
+// instance.
+type StubExecFunc func(instance cloud.Instance, command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+
+// A StubDriver implements cloud.Driver by setting up local SSH
+// servers that pass their command execution requests to the provided
+// StubExecFunc.
+type StubDriver struct {
+       Exec           StubExecFunc
+       HostKey        ssh.Signer
+       AuthorizedKeys []ssh.PublicKey
+
+       ErrorRateDestroy float64
+
+       instanceSets []*StubInstanceSet
+}
+
+// InstanceSet returns a new *StubInstanceSet.
+func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
+       sis := StubInstanceSet{
+               driver:  sd,
+               servers: map[cloud.InstanceID]*stubServer{},
+       }
+       sd.instanceSets = append(sd.instanceSets, &sis)
+       return &sis, mapstructure.Decode(params, &sis)
+}
+
+// InstanceSets returns all instance sets that have been created by
+// the driver. This can be used to test a component that uses the
+// driver but doesn't expose the InstanceSets it has created.
+func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
+       return sd.instanceSets
+}
+
+// A StubInstanceSet manages a set of local stub servers that stand
+// in for cloud VMs.
+type StubInstanceSet struct {
+       driver  *StubDriver
+       servers map[cloud.InstanceID]*stubServer
+       mtx     sync.RWMutex
+       stopped bool
+}
+
+// Create adds a new stub server to the set and returns it as a
+// cloud.Instance.
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       if sis.stopped {
+               return nil, errors.New("StubInstanceSet: Create called after Stop")
+       }
+       ak := sis.driver.AuthorizedKeys
+       if authKey != nil {
+               ak = append([]ssh.PublicKey{authKey}, ak...)
+       }
+       var ss *stubServer
+       ss = &stubServer{
+               sis:          sis,
+               id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
+               tags:         copyTags(tags),
+               providerType: it.ProviderType,
+               SSHService: SSHService{
+                       HostKey:        sis.driver.HostKey,
+                       AuthorizedKeys: ak,
+                       Exec: func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+                               return sis.driver.Exec(ss.Instance(), command, stdin, stdout, stderr)
+                       },
+               },
+       }
+
+       sis.servers[ss.id] = ss
+       return ss.Instance(), nil
+}
+
+func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+       sis.mtx.RLock()
+       defer sis.mtx.RUnlock()
+       var r []cloud.Instance
+       for _, ss := range sis.servers {
+               r = append(r, ss.Instance())
+       }
+       return r, nil
+}
+
+// Stop marks the instance set as stopped; subsequent Create calls
+// will fail. Stop panics if called twice.
+func (sis *StubInstanceSet) Stop() {
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       if sis.stopped {
+               panic("Stop called twice")
+       }
+       sis.stopped = true
+}
+
+type stubServer struct {
+       sis          *StubInstanceSet
+       id           cloud.InstanceID
+       tags         cloud.InstanceTags
+       providerType string
+       SSHService   SSHService
+       sync.Mutex
+}
+
+func (ss *stubServer) Instance() stubInstance {
+       ss.Lock()
+       defer ss.Unlock()
+       return stubInstance{
+               ss:   ss,
+               addr: ss.SSHService.Address(),
+               // We deliberately return a cached/stale copy of the
+               // real tags here, so that (Instance)Tags() sometimes
+               // returns old data after a call to
+               // (Instance)SetTags().  This is permitted by the
+               // driver interface, and this might help remind
+               // callers that they need to tolerate it.
+               tags: copyTags(ss.tags),
+       }
+}
+
+type stubInstance struct {
+       ss   *stubServer
+       addr string
+       tags cloud.InstanceTags
+}
+
+func (si stubInstance) ID() cloud.InstanceID {
+       return si.ss.id
+}
+
+func (si stubInstance) Address() string {
+       return si.addr
+}
+
+func (si stubInstance) Destroy() error {
+       if math_rand.Float64() < si.ss.sis.driver.ErrorRateDestroy {
+               return errors.New("instance could not be destroyed")
+       }
+       si.ss.SSHService.Close()
+       sis := si.ss.sis
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       delete(sis.servers, si.ss.id)
+       return nil
+}
+
+func (si stubInstance) ProviderType() string {
+       return si.ss.providerType
+}
+
+func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
+       tags = copyTags(tags)
+       ss := si.ss
+       go func() {
+               ss.Lock()
+               defer ss.Unlock()
+               ss.tags = tags
+       }()
+       return nil
+}
+
+func (si stubInstance) Tags() cloud.InstanceTags {
+       return si.tags
+}
+
+func (si stubInstance) String() string {
+       return string(si.ss.id)
+}
+
+func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
+       buf := make([]byte, 512)
+       _, err := io.ReadFull(rand.Reader, buf)
+       if err != nil {
+               return err
+       }
+       sig, err := si.ss.sis.driver.HostKey.Sign(rand.Reader, buf)
+       if err != nil {
+               return err
+       }
+       return key.Verify(buf, sig)
+}
+
+func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
+       dst := cloud.InstanceTags{}
+       for k, v := range src {
+               dst[k] = v
+       }
+       return dst
+}
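Wiring the stub driver together (a hedged sketch; the handler, the
hostKey/clientPub values, and the instance type are assumptions for
illustration, continuing the test-package setting above):

        driver := &StubDriver{
                Exec: func(inst cloud.Instance, cmd string, _ io.Reader, stdout, _ io.Writer) uint32 {
                        fmt.Fprintf(stdout, "ran %q on %s\n", cmd, inst)
                        return 0
                },
                HostKey:        hostKey,
                AuthorizedKeys: []ssh.PublicKey{clientPub},
        }
        is, err := driver.InstanceSet(map[string]interface{}{}, "test")
        c.Assert(err, check.IsNil)
        inst, err := is.Create(arvados.InstanceType{Name: "a1s", ProviderType: "a1.small"}, "image-1", nil, nil)
        c.Assert(err, check.IsNil)
        // inst.Address() now points at a local SSH server whose exec
        // requests are routed to the Exec func above.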
diff --git a/lib/dispatchcloud/worker/gocheck_test.go b/lib/dispatchcloud/worker/gocheck_test.go
new file mode 100644 (file)
index 0000000..b4ca66c
--- /dev/null
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "testing"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
new file mode 100644 (file)
index 0000000..37add6d
--- /dev/null
@@ -0,0 +1,658 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "io"
+       "sort"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+       tagKeyInstanceType = "InstanceType"
+       tagKeyHold         = "Hold"
+)
+
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
+       Instance             string
+       Price                float64
+       ArvadosInstanceType  string
+       ProviderInstanceType string
+       LastContainerUUID    string
+       LastBusy             time.Time
+       WorkerState          string
+}
+
+// An Executor executes shell commands on a remote host.
+type Executor interface {
+       // Run cmd on the current target.
+       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+
+       // Use the given target for subsequent operations. The new
+       // target is the same host as the previous target, but it
+       // might return a different address and verify a different
+       // host key.
+       //
+       // SetTarget is called frequently, and in most cases the new
+       // target will behave exactly the same as the old one. An
+       // implementation should optimize accordingly.
+       //
+       // SetTarget must not block on concurrent Execute calls.
+       SetTarget(cloud.ExecutorTarget)
+
+       Close()
+}
+
+const (
+       defaultSyncInterval       = time.Minute
+       defaultProbeInterval      = time.Second * 10
+       defaultMaxProbesPerSecond = 10
+       defaultTimeoutIdle        = time.Minute
+       defaultTimeoutBooting     = time.Minute * 10
+       defaultTimeoutProbe       = time.Minute * 10
+       defaultTimeoutShutdown    = time.Second * 10
+)
+
+func duration(conf arvados.Duration, def time.Duration) time.Duration {
+       if conf > 0 {
+               return time.Duration(conf)
+       }
+       return def
+}
+
+// NewPool creates a Pool of workers backed by instanceSet.
+//
+// New instances are configured and set up according to the given
+// cluster configuration.
+func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+       wp := &Pool{
+               logger:             logger,
+               instanceSet:        instanceSet,
+               newExecutor:        newExecutor,
+               bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
+               imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
+               instanceTypes:      cluster.InstanceTypes,
+               maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
+               probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
+               syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+       }
+       wp.registerMetrics(reg)
+       go func() {
+               wp.setupOnce.Do(wp.setup)
+               go wp.runMetrics()
+               go wp.runProbes()
+               go wp.runSync()
+       }()
+       return wp
+}
+
+// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
+// zero Pool should not be used. Call NewPool to create a new Pool.
+type Pool struct {
+       // configuration
+       logger             logrus.FieldLogger
+       instanceSet        cloud.InstanceSet
+       newExecutor        func(cloud.Instance) Executor
+       bootProbeCommand   string
+       imageID            cloud.ImageID
+       instanceTypes      map[string]arvados.InstanceType
+       syncInterval       time.Duration
+       probeInterval      time.Duration
+       maxProbesPerSecond int
+       timeoutIdle        time.Duration
+       timeoutBooting     time.Duration
+       timeoutProbe       time.Duration
+       timeoutShutdown    time.Duration
+
+       // private state
+       subscribers  map[<-chan struct{}]chan<- struct{}
+       creating     map[arvados.InstanceType]int // goroutines waiting for (InstanceSet)Create to return
+       workers      map[cloud.InstanceID]*worker
+       loaded       bool                 // loaded list of instances from InstanceSet at least once
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       atQuotaUntil time.Time
+       atQuotaErr   cloud.QuotaError
+       stop         chan bool
+       mtx          sync.RWMutex
+       setupOnce    sync.Once
+
+       mInstances         prometheus.Gauge
+       mContainersRunning prometheus.Gauge
+       mVCPUs             prometheus.Gauge
+       mVCPUsInuse        prometheus.Gauge
+       mMemory            prometheus.Gauge
+       mMemoryInuse       prometheus.Gauge
+}
+
+// Subscribe returns a channel that becomes ready whenever a worker's
+// state changes.
+//
+// Example:
+//
+//     ch := wp.Subscribe()
+//     defer wp.Unsubscribe(ch)
+//     for range ch {
+//             // ...try scheduling some work...
+//             if done {
+//                     break
+//             }
+//     }
+func (wp *Pool) Subscribe() <-chan struct{} {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       ch := make(chan struct{}, 1)
+       wp.subscribers[ch] = ch
+       return ch
+}
+
+// Unsubscribe stops sending updates to the given channel.
+func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       delete(wp.subscribers, ch)
+}
+
+// Unallocated returns the number of unallocated (creating + booting +
+// idle + unknown) workers for each instance type.
+//
+// The returned counts should be interpreted as upper bounds, rather
+// than exact counts: they are sometimes artificially high when a
+// newly created instance appears in the driver's Instances() list
+// before the Create() call returns.
+func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       u := map[arvados.InstanceType]int{}
+       for it, c := range wp.creating {
+               u[it] = c
+       }
+       for _, wkr := range wp.workers {
+               if wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown {
+                       u[wkr.instType]++
+               }
+       }
+       return u
+}
+
+// Create creates a new instance with the given type, and adds it to
+// the worker pool. The worker is added immediately; instance creation
+// runs in the background.
+func (wp *Pool) Create(it arvados.InstanceType) error {
+       logger := wp.logger.WithField("InstanceType", it.Name)
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if time.Now().Before(wp.atQuotaUntil) {
+               return wp.atQuotaErr
+       }
+       tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
+       wp.creating[it]++
+       go func() {
+               defer wp.notify()
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               wp.mtx.Lock()
+               defer wp.mtx.Unlock()
+               wp.creating[it]--
+               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                       wp.atQuotaErr = err
+                       wp.atQuotaUntil = time.Now().Add(time.Minute)
+               }
+               if err != nil {
+                       logger.WithError(err).Error("create failed")
+                       return
+               }
+               wp.updateWorker(inst, it, StateBooting)
+       }()
+       return nil
+}
+
+// AtQuota returns true if Create is not expected to work at the
+// moment.
+func (wp *Pool) AtQuota() bool {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       return time.Now().Before(wp.atQuotaUntil)
+}
+
+// Add or update worker attached to the given instance. Use
+// initialState if a new worker is created.
+//
+// The second return value is true if a new worker is created.
+//
+// Caller must have lock.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+       id := inst.ID()
+       if wkr := wp.workers[id]; wkr != nil {
+               wkr.executor.SetTarget(inst)
+               wkr.instance = inst
+               wkr.updated = time.Now()
+               if initialState == StateBooting && wkr.state == StateUnknown {
+                       wkr.state = StateBooting
+               }
+               return wkr, false
+       }
+       if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
+               initialState = StateHold
+       }
+       logger := wp.logger.WithFields(logrus.Fields{
+               "InstanceType": it.Name,
+               "Instance":     inst,
+       })
+       logger.WithField("State", initialState).Info("instance appeared in cloud")
+       now := time.Now()
+       wkr := &worker{
+               mtx:      &wp.mtx,
+               wp:       wp,
+               logger:   logger,
+               executor: wp.newExecutor(inst),
+               state:    initialState,
+               instance: inst,
+               instType: it,
+               probed:   now,
+               busy:     now,
+               updated:  now,
+               running:  make(map[string]struct{}),
+               starting: make(map[string]struct{}),
+               probing:  make(chan struct{}, 1),
+       }
+       wp.workers[id] = wkr
+       return wkr, true
+}
+
+// caller must have lock.
+func (wp *Pool) notifyExited(uuid string, t time.Time) {
+       wp.exited[uuid] = t
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       logger := wp.logger.WithField("InstanceType", it.Name)
+       logger.Info("shutdown requested")
+       for _, tryState := range []State{StateBooting, StateIdle} {
+               // TODO: shutdown the worker with the longest idle
+               // time (Idle) or the earliest create time (Booting)
+               for _, wkr := range wp.workers {
+                       if wkr.state == tryState && wkr.instType == it {
+                               logger.WithField("Instance", wkr.instance).Info("shutting down")
+                               wkr.shutdown()
+                               return true
+                       }
+               }
+       }
+       return false
+}
+
+// Workers returns the current number of workers in each state.
+func (wp *Pool) Workers() map[State]int {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       r := map[State]int{}
+       for _, w := range wp.workers {
+               r[w.state]++
+       }
+       return r
+}
+
+// Running returns the container UUIDs being prepared/run on workers.
+func (wp *Pool) Running() map[string]time.Time {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       r := map[string]time.Time{}
+       for _, wkr := range wp.workers {
+               for uuid := range wkr.running {
+                       r[uuid] = time.Time{}
+               }
+               for uuid := range wkr.starting {
+                       r[uuid] = time.Time{}
+               }
+       }
+       for uuid, exited := range wp.exited {
+               r[uuid] = exited
+       }
+       return r
+}
+
+// StartContainer starts a container on an idle worker immediately if
+// possible, otherwise returns false.
+func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       var wkr *worker
+       for _, w := range wp.workers {
+               if w.instType == it && w.state == StateIdle {
+                       if wkr == nil || w.busy.After(wkr.busy) {
+                               wkr = w
+                       }
+               }
+       }
+       if wkr == nil {
+               return false
+       }
+       wkr.startContainer(ctr)
+       return true
+}
+
+// KillContainer kills the crunch-run process for the given container
+// UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
+func (wp *Pool) KillContainer(uuid string) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
+               return
+       }
+       for _, wkr := range wp.workers {
+               if _, ok := wkr.running[uuid]; ok {
+                       go wp.kill(wkr, uuid)
+                       return
+               }
+       }
+       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
+}
+
+func (wp *Pool) kill(wkr *worker, uuid string) {
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Instance":      wkr.instance,
+       })
+       logger.Debug("killing process")
+       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill "+uuid, nil)
+       if err != nil {
+               logger.WithFields(logrus.Fields{
+                       "stderr": string(stderr),
+                       "stdout": string(stdout),
+                       "error":  err,
+               }).Warn("kill failed")
+               return
+       }
+       logger.Debug("killing process succeeded")
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wkr.running[uuid]; ok {
+               delete(wkr.running, uuid)
+               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+                       wkr.state = StateIdle
+               }
+               wkr.updated = time.Now()
+               go wp.notify()
+       }
+}
+
+func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
+       if reg == nil {
+               reg = prometheus.NewRegistry()
+       }
+       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
+       })
+       reg.MustRegister(wp.mInstances)
+       wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "containers_running",
+               Help:      "Number of containers reported running by cloud VMs.",
+       })
+       reg.MustRegister(wp.mContainersRunning)
+
+       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "vcpus_total",
+               Help:      "Total VCPUs on all cloud VMs.",
+       })
+       reg.MustRegister(wp.mVCPUs)
+       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "vcpus_inuse",
+               Help:      "VCPUs on cloud VMs that are running containers.",
+       })
+       reg.MustRegister(wp.mVCPUsInuse)
+       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "memory_bytes_total",
+               Help:      "Total memory on all cloud VMs.",
+       })
+       reg.MustRegister(wp.mMemory)
+       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "memory_bytes_inuse",
+               Help:      "Memory on cloud VMs that are running containers.",
+       })
+       reg.MustRegister(wp.mMemoryInuse)
+}
+
+func (wp *Pool) runMetrics() {
+       ch := wp.Subscribe()
+       defer wp.Unsubscribe(ch)
+       for range ch {
+               wp.updateMetrics()
+       }
+}
+
+func (wp *Pool) updateMetrics() {
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+
+       var alloc, cpu, cpuInuse, mem, memInuse int64
+       for _, wkr := range wp.workers {
+               cpu += int64(wkr.instType.VCPUs)
+               mem += int64(wkr.instType.RAM)
+               if len(wkr.running)+len(wkr.starting) == 0 {
+                       continue
+               }
+               alloc += int64(len(wkr.running) + len(wkr.starting))
+               cpuInuse += int64(wkr.instType.VCPUs)
+               memInuse += int64(wkr.instType.RAM)
+       }
+       wp.mInstances.Set(float64(len(wp.workers)))
+       wp.mContainersRunning.Set(float64(alloc))
+       wp.mVCPUs.Set(float64(cpu))
+       wp.mMemory.Set(float64(mem))
+       wp.mVCPUsInuse.Set(float64(cpuInuse))
+       wp.mMemoryInuse.Set(float64(memInuse))
+}
+
+func (wp *Pool) runProbes() {
+       maxPPS := wp.maxProbesPerSecond
+       if maxPPS < 1 {
+               maxPPS = defaultMaxProbesPerSecond
+       }
+       limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+       defer limitticker.Stop()
+
+       probeticker := time.NewTicker(wp.probeInterval)
+       defer probeticker.Stop()
+
+       workers := []cloud.InstanceID{}
+       for range probeticker.C {
+               workers = workers[:0]
+               wp.mtx.Lock()
+               for id, wkr := range wp.workers {
+                       if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
+                               continue
+                       }
+                       workers = append(workers, id)
+               }
+               wp.mtx.Unlock()
+
+               for _, id := range workers {
+                       wp.mtx.Lock()
+                       wkr, ok := wp.workers[id]
+                       wp.mtx.Unlock()
+                       if !ok {
+                               // Deleted while we were probing
+                               // others
+                               continue
+                       }
+                       go wkr.ProbeAndUpdate()
+                       select {
+                       case <-wp.stop:
+                               return
+                       case <-limitticker.C:
+                       }
+               }
+       }
+}
+
+func (wp *Pool) runSync() {
+       // sync once immediately, then wait syncInterval, sync again,
+       // etc.
+       timer := time.NewTimer(1)
+       for {
+               select {
+               case <-timer.C:
+                       err := wp.getInstancesAndSync()
+                       if err != nil {
+                               wp.logger.WithError(err).Warn("sync failed")
+                       }
+                       timer.Reset(wp.syncInterval)
+               case <-wp.stop:
+                       wp.logger.Debug("worker.Pool stopped")
+                       return
+               }
+       }
+}
+
+// Stop stops synchronizing with the InstanceSet.
+func (wp *Pool) Stop() {
+       wp.setupOnce.Do(wp.setup)
+       close(wp.stop)
+}
+
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+       var r []InstanceView
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       for _, w := range wp.workers {
+               r = append(r, InstanceView{
+                       Instance:             w.instance.String(),
+                       Price:                w.instType.Price,
+                       ArvadosInstanceType:  w.instType.Name,
+                       ProviderInstanceType: w.instType.ProviderType,
+                       LastContainerUUID:    w.lastUUID,
+                       LastBusy:             w.busy,
+                       WorkerState:          w.state.String(),
+               })
+       }
+       wp.mtx.Unlock()
+       sort.Slice(r, func(i, j int) bool {
+               return strings.Compare(r[i].Instance, r[j].Instance) < 0
+       })
+       return r
+}
+
+func (wp *Pool) setup() {
+       wp.creating = map[arvados.InstanceType]int{}
+       wp.exited = map[string]time.Time{}
+       wp.workers = map[cloud.InstanceID]*worker{}
+       wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+}
+
+func (wp *Pool) notify() {
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for _, send := range wp.subscribers {
+               select {
+               case send <- struct{}{}:
+               default:
+               }
+       }
+}
+
+func (wp *Pool) getInstancesAndSync() error {
+       wp.setupOnce.Do(wp.setup)
+       wp.logger.Debug("getting instance list")
+       threshold := time.Now()
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       if err != nil {
+               return err
+       }
+       wp.sync(threshold, instances)
+       wp.logger.Debug("sync done")
+       return nil
+}
+
+// Add/remove/update workers based on instances, which were obtained
+// from the instanceSet. However, don't clobber any other updates that
+// already happened after threshold.
+func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+       notify := false
+
+       for _, inst := range instances {
+               itTag := inst.Tags()[tagKeyInstanceType]
+               it, ok := wp.instanceTypes[itTag]
+               if !ok {
+                       wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
+                       continue
+               }
+               if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+                       notify = true
+               } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
+                       wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+                       wkr.shutdown()
+               }
+       }
+
+       for id, wkr := range wp.workers {
+               if wkr.updated.After(threshold) {
+                       continue
+               }
+               logger := wp.logger.WithFields(logrus.Fields{
+                       "Instance":    wkr.instance,
+                       "WorkerState": wkr.state,
+               })
+               logger.Info("instance disappeared in cloud")
+               delete(wp.workers, id)
+               go wkr.executor.Close()
+               notify = true
+       }
+
+       if !wp.loaded {
+               wp.loaded = true
+               wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
+       }
+
+       if notify {
+               go wp.notify()
+       }
+}
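The intended call pattern from a scheduler's perspective (a hedged
sketch; logger, instanceSet, newExecutor, cluster, wantType, and ctr
are assumed to come from the surrounding dispatcher):

        wp := NewPool(logger, prometheus.NewRegistry(), instanceSet, newExecutor, cluster)
        ch := wp.Subscribe()
        defer wp.Unsubscribe(ch)
        for range ch {
                if wp.StartContainer(wantType, ctr) {
                        break // container handed off to an idle worker
                }
                if wp.Unallocated()[wantType] == 0 && !wp.AtQuota() {
                        // Create returns immediately; a notification
                        // arrives on ch when the new worker appears.
                        wp.Create(wantType)
                }
        }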
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
new file mode 100644 (file)
index 0000000..f8667e0
--- /dev/null
@@ -0,0 +1,142 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "io"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+const GiB arvados.ByteSize = 1 << 30
+
+var _ = check.Suite(&PoolSuite{})
+
+type lessChecker struct {
+       *check.CheckerInfo
+}
+
+func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
+       return params[0].(int) < params[1].(int), ""
+}
+
+var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
+
+type PoolSuite struct{}
+
+func (suite *PoolSuite) SetUpSuite(c *check.C) {
+       logrus.StandardLogger().SetLevel(logrus.DebugLevel)
+}
+
+func (suite *PoolSuite) TestStartContainer(c *check.C) {
+       // TODO: use an instanceSet stub with an SSH server
+}
+
+func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
+       // TODO: use an instanceSet stub with an SSH server
+}
+
+func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
+       lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
+       type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+       type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+       pool := &Pool{
+               logger:      logrus.StandardLogger(),
+               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               instanceSet: lameInstanceSet,
+               instanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+                       type2.Name: type2,
+               },
+       }
+       notify := pool.Subscribe()
+       defer pool.Unsubscribe(notify)
+       notify2 := pool.Subscribe()
+       defer pool.Unsubscribe(notify2)
+
+       c.Check(pool.Unallocated()[type1], check.Equals, 0)
+       c.Check(pool.Unallocated()[type2], check.Equals, 0)
+       pool.Create(type2)
+       pool.Create(type1)
+       pool.Create(type2)
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(pool.Unallocated()[type2], check.Equals, 2)
+       // Unblock the pending Create calls and (before calling Sync!)
+       // wait for the pool to process the returned instances.
+       go lameInstanceSet.Release(3)
+       suite.wait(c, pool, notify, func() bool {
+               list, err := lameInstanceSet.Instances(nil)
+               return err == nil && len(list) == 3
+       })
+
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(pool.Unallocated()[type2], check.Equals, 2)
+       pool.getInstancesAndSync()
+       // Returned counts can be temporarily higher than 1 and 2 if
+       // poll ran before Create() returned.
+       c.Check(pool.Unallocated()[type1], check.Not(less), 1)
+       c.Check(pool.Unallocated()[type2], check.Not(less), 2)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 2
+       })
+
+       c.Check(pool.Shutdown(type2), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
+       })
+       c.Check(pool.Shutdown(type2), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
+       })
+       c.Check(pool.Shutdown(type2), check.Equals, false)
+       for {
+               // Consume any waiting notifications to ensure the
+               // next one we get is from Shutdown.
+               select {
+               case <-notify:
+                       continue
+               default:
+               }
+               break
+       }
+       c.Check(pool.Shutdown(type1), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+       })
+       select {
+       case <-notify2:
+       case <-time.After(time.Second):
+               c.Error("notify2 did not receive a notification")
+       }
+       go lameInstanceSet.Release(3) // unblock Destroy calls
+}
+
+func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
+       timeout := time.NewTimer(time.Second).C
+       for !ready() {
+               select {
+               case <-notify:
+                       continue
+               case <-timeout:
+               }
+               break
+       }
+       c.Check(ready(), check.Equals, true)
+}
+
+type stubExecutor struct{}
+
+func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+
+func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+       return nil, nil, nil
+}
+
+func (*stubExecutor) Close() {}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
new file mode 100644 (file)
index 0000000..c8e6e86
--- /dev/null
@@ -0,0 +1,315 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "bytes"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+// State indicates whether a worker is available to do work, and (if
+// not) whether/when it is expected to become ready.
+type State int
+
+const (
+       StateUnknown  State = iota // might be running a container already
+       StateBooting               // instance is booting
+       StateIdle                  // instance booted, no containers are running
+       StateRunning               // instance is running one or more containers
+       StateShutdown              // worker has stopped monitoring the instance
+       StateHold                  // running, but not available to run new containers
+)
+
+const (
+       // TODO: configurable
+       maxPingFailTime = 10 * time.Minute
+)
+
+var stateString = map[State]string{
+       StateUnknown:  "unknown",
+       StateBooting:  "booting",
+       StateIdle:     "idle",
+       StateRunning:  "running",
+       StateShutdown: "shutdown",
+       StateHold:     "hold",
+}
+
+// String implements fmt.Stringer.
+func (s State) String() string {
+       return stateString[s]
+}
+
+// MarshalText implements encoding.TextMarshaler so a JSON encoding of
+// map[State]anything uses the state's string representation.
+func (s State) MarshalText() ([]byte, error) {
+       return []byte(stateString[s]), nil
+}
+
+type worker struct {
+       logger   logrus.FieldLogger
+       executor Executor
+       wp       *Pool
+
+       mtx       sync.Locker // must be wp's Locker.
+       state     State
+       instance  cloud.Instance
+       instType  arvados.InstanceType
+       vcpus     int64
+       memory    int64
+       probed    time.Time
+       updated   time.Time
+       busy      time.Time
+       destroyed time.Time
+       lastUUID  string
+       running   map[string]struct{} // remember to update state idle<->running when this changes
+       starting  map[string]struct{} // remember to update state idle<->running when this changes
+       probing   chan struct{}
+}
+
+// caller must have lock.
+func (wkr *worker) startContainer(ctr arvados.Container) {
+       logger := wkr.logger.WithFields(logrus.Fields{
+               "ContainerUUID": ctr.UUID,
+               "Priority":      ctr.Priority,
+       })
+       logger = logger.WithField("Instance", wkr.instance)
+       logger.Debug("starting container")
+       wkr.starting[ctr.UUID] = struct{}{}
+       wkr.state = StateRunning
+       go func() {
+               stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+               wkr.mtx.Lock()
+               defer wkr.mtx.Unlock()
+               now := time.Now()
+               wkr.updated = now
+               wkr.busy = now
+               delete(wkr.starting, ctr.UUID)
+               wkr.running[ctr.UUID] = struct{}{}
+               wkr.lastUUID = ctr.UUID
+               if err != nil {
+                       logger.WithField("stdout", string(stdout)).
+                               WithField("stderr", string(stderr)).
+                               WithError(err).
+                               Error("error starting crunch-run process")
+                       // Leave uuid in wkr.running, though: it's
+                       // possible the error was just a communication
+                       // failure and the process was in fact
+                       // started.  Wait for next probe to find out.
+                       return
+               }
+               logger.Info("crunch-run process started")
+               wkr.lastUUID = ctr.UUID
+       }()
+}
+
+// ProbeAndUpdate conducts appropriate boot/running probes (if any)
+// for the worker's current state. If a previous probe is still
+// running, it does nothing.
+//
+// It should be called in a new goroutine.
+func (wkr *worker) ProbeAndUpdate() {
+       select {
+       case wkr.probing <- struct{}{}:
+               wkr.probeAndUpdate()
+               <-wkr.probing
+       default:
+               wkr.logger.Debug("still waiting for last probe to finish")
+       }
+}
+
+// called from ProbeAndUpdate, which serializes concurrent calls
+func (wkr *worker) probeAndUpdate() {
+       wkr.mtx.Lock()
+       updated := wkr.updated
+       needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
+       needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+       wkr.mtx.Unlock()
+       if !needProbeBooted && !needProbeRunning {
+               return
+       }
+
+       var (
+               ctrUUIDs []string
+               ok       bool
+               stderr   []byte
+       )
+       if needProbeBooted {
+               ok, stderr = wkr.probeBooted()
+               wkr.mtx.Lock()
+               if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+                       wkr.logger.Info("instance booted; will try probeRunning")
+                       needProbeRunning = true
+               }
+               wkr.mtx.Unlock()
+       }
+       if needProbeRunning {
+               ctrUUIDs, ok, stderr = wkr.probeRunning()
+       }
+       logger := wkr.logger.WithField("stderr", string(stderr))
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       if !ok {
+               if wkr.state == StateShutdown && wkr.updated.After(updated) {
+                       // Skip the logging noise if shutdown was
+                       // initiated during probe.
+                       return
+               }
+               dur := time.Since(wkr.probed)
+               logger := logger.WithFields(logrus.Fields{
+                       "Duration": dur,
+                       "State":    wkr.state,
+               })
+               if wkr.state == StateBooting {
+                       logger.Debug("new instance not responding")
+               } else {
+                       logger.Info("instance not responding")
+               }
+               wkr.shutdownIfBroken(dur)
+               return
+       }
+
+       updateTime := time.Now()
+       wkr.probed = updateTime
+
+       if updated != wkr.updated {
+               // Worker was updated after the probe began, so
+               // wkr.running might have a container UUID that was
+               // not yet running when ctrUUIDs was generated. Leave
+               // wkr.running alone and wait for the next probe to
+               // catch up on any changes.
+               return
+       }
+
+       if len(ctrUUIDs) > 0 {
+               wkr.busy = updateTime
+               wkr.lastUUID = ctrUUIDs[0]
+       } else if len(wkr.running) > 0 {
+               // Actual last-busy time was sometime between wkr.busy
+               // and now. Now is the earliest opportunity to take
+               // advantage of the non-busy state, though.
+               wkr.busy = updateTime
+       }
+       running := map[string]struct{}{}
+       changed := false
+       for _, uuid := range ctrUUIDs {
+               running[uuid] = struct{}{}
+               if _, ok := wkr.running[uuid]; !ok {
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if _, ok := running[uuid]; !ok {
+                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+                       wkr.wp.notifyExited(uuid, updateTime)
+                       changed = true
+               }
+       }
+       if wkr.state == StateUnknown || wkr.state == StateBooting {
+               wkr.state = StateIdle
+               changed = true
+       }
+       if changed {
+               wkr.running = running
+               if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+                       wkr.state = StateRunning
+               } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+                       wkr.state = StateIdle
+               }
+               wkr.updated = updateTime
+               go wkr.wp.notify()
+       }
+}
+
+func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+       cmd := "crunch-run --list"
+       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       if err != nil {
+               wkr.logger.WithFields(logrus.Fields{
+                       "Command": cmd,
+                       "stdout":  string(stdout),
+                       "stderr":  string(stderr),
+               }).WithError(err).Warn("probe failed")
+               return nil, false, stderr
+       }
+       stdout = bytes.TrimRight(stdout, "\n")
+       if len(stdout) == 0 {
+               return nil, true, stderr
+       }
+       return strings.Split(string(stdout), "\n"), true, stderr
+}
+
+func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+       cmd := wkr.wp.bootProbeCommand
+       if cmd == "" {
+               cmd = "true"
+       }
+       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       logger := wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       })
+       if err != nil {
+               logger.WithError(err).Debug("boot probe failed")
+               return false, stderr
+       }
+       logger.Info("boot probe succeeded")
+       return true, stderr
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfBroken(dur time.Duration) {
+       if wkr.state == StateHold {
+               return
+       }
+       label, threshold := "", wkr.wp.timeoutProbe
+       if wkr.state == StateBooting {
+               label, threshold = "new ", wkr.wp.timeoutBooting
+       }
+       if dur < threshold {
+               return
+       }
+       wkr.logger.WithFields(logrus.Fields{
+               "Duration": dur,
+               "Since":    wkr.probed,
+               "State":    wkr.state,
+       }).Warnf("%sinstance unresponsive, shutting down", label)
+       wkr.shutdown()
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if wkr.state != StateIdle {
+               return false
+       }
+       age := time.Since(wkr.busy)
+       if age < wkr.wp.timeoutIdle {
+               return false
+       }
+       wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+       wkr.shutdown()
+       return true
+}
+
+// caller must have lock.
+func (wkr *worker) shutdown() {
+       now := time.Now()
+       wkr.updated = now
+       wkr.destroyed = now
+       wkr.state = StateShutdown
+       go func() {
+               err := wkr.instance.Destroy()
+               if err != nil {
+                       wkr.logger.WithError(err).Warn("shutdown failed")
+                       return
+               }
+       }()
+}
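Because State implements encoding.TextMarshaler, a map keyed by State
(such as the result of Pool.Workers) marshals to readable JSON; for
example, in a worker package test with encoding/json imported:

        counts := map[State]int{StateIdle: 3, StateRunning: 2}
        buf, err := json.Marshal(counts)
        // err == nil; buf == []byte(`{"idle":3,"running":2}`)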
index e2e9907d5d115ebb61a53ae873249511b3732a50..bfa86abf6a48a1fcf30eda2618bdfaf28b0a2efb 100644 (file)
@@ -60,6 +60,8 @@ type Cluster struct {
        ManagementToken    string
        NodeProfiles       map[string]NodeProfile
        InstanceTypes      InstanceTypeMap
+       CloudVMs           CloudVMs
+       Dispatch           Dispatch
        HTTPRequestTimeout Duration
        RemoteClusters     map[string]RemoteCluster
        PostgreSQL         PostgreSQL
@@ -95,6 +97,50 @@ type InstanceType struct {
        Preemptible  bool
 }
 
+type Dispatch struct {
+       // PEM encoded SSH key (RSA, DSA, or ECDSA) able to log in to
+       // cloud VMs.
+       PrivateKey []byte
+
+       // Max time for workers to come up before abandoning stale
+       // locks from previous run
+       StaleLockTimeout Duration
+
+       // Interval between queue polls
+       PollInterval Duration
+
+       // Interval between probes to each worker
+       ProbeInterval Duration
+
+       // Maximum total worker probes per second
+       MaxProbesPerSecond int
+}
+
+type CloudVMs struct {
+       // Shell command that exits zero if and only if the VM is
+       // fully booted and ready to run containers, e.g., "mount |
+       // grep /encrypted-tmp"
+       BootProbeCommand string
+
+       // Interval between polls of the cloud provider's instance list
+       SyncInterval Duration
+
+       // Maximum idle time before automatic shutdown
+       TimeoutIdle Duration
+
+       // Maximum booting time before automatic shutdown
+       TimeoutBooting Duration
+
+       // Maximum time with no successful probes before automatic shutdown
+       TimeoutProbe Duration
+
+       // Time after shutdown to retry shutdown
+       TimeoutShutdown Duration
+
+       // ID of the VM image to use when bringing up new instances
+       ImageID string
+
+       // Name of the cloud driver, plus driver-specific
+       // configuration passed through to it verbatim
+       Driver           string
+       DriverParameters map[string]interface{}
+}
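+
+// For illustration, a cluster config might include a section like
+// the following (the YAML layout and all values here are assumptions,
+// not prescribed defaults):
+//
+//     Dispatch:
+//       StaleLockTimeout: 1m
+//       PollInterval: 10s
+//       ProbeInterval: 10s
+//       MaxProbesPerSecond: 64
+//     CloudVMs:
+//       BootProbeCommand: "docker ps -q"
+//       SyncInterval: 1m
+//       TimeoutIdle: 1m
+//       TimeoutBooting: 10m
+//       TimeoutProbe: 10m
+//       TimeoutShutdown: 10s
+//       ImageID: <image-id>
+//       Driver: <driver-name>
+//       DriverParameters: {}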
+
 type InstanceTypeMap map[string]InstanceType
 
 var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
@@ -159,45 +205,48 @@ func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
 }
 
 type NodeProfile struct {
-       Controller  SystemServiceInstance `json:"arvados-controller"`
-       Health      SystemServiceInstance `json:"arvados-health"`
-       Keepbalance SystemServiceInstance `json:"keep-balance"`
-       Keepproxy   SystemServiceInstance `json:"keepproxy"`
-       Keepstore   SystemServiceInstance `json:"keepstore"`
-       Keepweb     SystemServiceInstance `json:"keep-web"`
-       Nodemanager SystemServiceInstance `json:"arvados-node-manager"`
-       RailsAPI    SystemServiceInstance `json:"arvados-api-server"`
-       Websocket   SystemServiceInstance `json:"arvados-ws"`
-       Workbench   SystemServiceInstance `json:"arvados-workbench"`
+       Controller    SystemServiceInstance `json:"arvados-controller"`
+       Health        SystemServiceInstance `json:"arvados-health"`
+       Keepbalance   SystemServiceInstance `json:"keep-balance"`
+       Keepproxy     SystemServiceInstance `json:"keepproxy"`
+       Keepstore     SystemServiceInstance `json:"keepstore"`
+       Keepweb       SystemServiceInstance `json:"keep-web"`
+       Nodemanager   SystemServiceInstance `json:"arvados-node-manager"`
+       DispatchCloud SystemServiceInstance `json:"arvados-dispatch-cloud"`
+       RailsAPI      SystemServiceInstance `json:"arvados-api-server"`
+       Websocket     SystemServiceInstance `json:"arvados-ws"`
+       Workbench     SystemServiceInstance `json:"arvados-workbench"`
 }
 
 type ServiceName string
 
 const (
-       ServiceNameRailsAPI    ServiceName = "arvados-api-server"
-       ServiceNameController  ServiceName = "arvados-controller"
-       ServiceNameNodemanager ServiceName = "arvados-node-manager"
-       ServiceNameWorkbench   ServiceName = "arvados-workbench"
-       ServiceNameWebsocket   ServiceName = "arvados-ws"
-       ServiceNameKeepbalance ServiceName = "keep-balance"
-       ServiceNameKeepweb     ServiceName = "keep-web"
-       ServiceNameKeepproxy   ServiceName = "keepproxy"
-       ServiceNameKeepstore   ServiceName = "keepstore"
+       ServiceNameRailsAPI      ServiceName = "arvados-api-server"
+       ServiceNameController    ServiceName = "arvados-controller"
+       ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+       ServiceNameNodemanager   ServiceName = "arvados-node-manager"
+       ServiceNameWorkbench     ServiceName = "arvados-workbench"
+       ServiceNameWebsocket     ServiceName = "arvados-ws"
+       ServiceNameKeepbalance   ServiceName = "keep-balance"
+       ServiceNameKeepweb       ServiceName = "keep-web"
+       ServiceNameKeepproxy     ServiceName = "keepproxy"
+       ServiceNameKeepstore     ServiceName = "keepstore"
 )
 
 // ServicePorts returns the configured listening address (or "" if
 // disabled) for each service on the node.
 func (np *NodeProfile) ServicePorts() map[ServiceName]string {
        return map[ServiceName]string{
-               ServiceNameRailsAPI:    np.RailsAPI.Listen,
-               ServiceNameController:  np.Controller.Listen,
-               ServiceNameNodemanager: np.Nodemanager.Listen,
-               ServiceNameWorkbench:   np.Workbench.Listen,
-               ServiceNameWebsocket:   np.Websocket.Listen,
-               ServiceNameKeepbalance: np.Keepbalance.Listen,
-               ServiceNameKeepweb:     np.Keepweb.Listen,
-               ServiceNameKeepproxy:   np.Keepproxy.Listen,
-               ServiceNameKeepstore:   np.Keepstore.Listen,
+               ServiceNameRailsAPI:      np.RailsAPI.Listen,
+               ServiceNameController:    np.Controller.Listen,
+               ServiceNameDispatchCloud: np.DispatchCloud.Listen,
+               ServiceNameNodemanager:   np.Nodemanager.Listen,
+               ServiceNameWorkbench:     np.Workbench.Listen,
+               ServiceNameWebsocket:     np.Websocket.Listen,
+               ServiceNameKeepbalance:   np.Keepbalance.Listen,
+               ServiceNameKeepweb:       np.Keepweb.Listen,
+               ServiceNameKeepproxy:     np.Keepproxy.Listen,
+               ServiceNameKeepstore:     np.Keepstore.Listen,
        }
 }
 
index b70b4ac917672f363096a810cd35e3689f5132f9..02a0d76decbad272baee737282b5087a72a33c60 100644 (file)
@@ -18,10 +18,11 @@ type Container struct {
        Mounts               map[string]Mount     `json:"mounts"`
        Output               string               `json:"output"`
        OutputPath           string               `json:"output_path"`
-       Priority             int                  `json:"priority"`
+       Priority             int64                `json:"priority"`
        RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
        State                ContainerState       `json:"state"`
        SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
+       ExitCode             int                  `json:"exit_code"`
 }
 
 // Container is an arvados#container resource.
index cb47c9e6705ea096087199813050e3c3095f4974..122355be987755b161d38a2e46e0bc2cc4f52208 100644 (file)
@@ -107,15 +107,16 @@ func (s *AggregatorSuite) TestHealthy(c *check.C) {
        srv, listen := s.stubServer(&healthyHandler{})
        defer srv.Close()
        s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
-               Controller:  arvados.SystemServiceInstance{Listen: listen},
-               Keepbalance: arvados.SystemServiceInstance{Listen: listen},
-               Keepproxy:   arvados.SystemServiceInstance{Listen: listen},
-               Keepstore:   arvados.SystemServiceInstance{Listen: listen},
-               Keepweb:     arvados.SystemServiceInstance{Listen: listen},
-               Nodemanager: arvados.SystemServiceInstance{Listen: listen},
-               RailsAPI:    arvados.SystemServiceInstance{Listen: listen},
-               Websocket:   arvados.SystemServiceInstance{Listen: listen},
-               Workbench:   arvados.SystemServiceInstance{Listen: listen},
+               Controller:    arvados.SystemServiceInstance{Listen: listen},
+               DispatchCloud: arvados.SystemServiceInstance{Listen: listen},
+               Keepbalance:   arvados.SystemServiceInstance{Listen: listen},
+               Keepproxy:     arvados.SystemServiceInstance{Listen: listen},
+               Keepstore:     arvados.SystemServiceInstance{Listen: listen},
+               Keepweb:       arvados.SystemServiceInstance{Listen: listen},
+               Nodemanager:   arvados.SystemServiceInstance{Listen: listen},
+               RailsAPI:      arvados.SystemServiceInstance{Listen: listen},
+               Websocket:     arvados.SystemServiceInstance{Listen: listen},
+               Workbench:     arvados.SystemServiceInstance{Listen: listen},
        }
        s.handler.ServeHTTP(s.resp, s.req)
        resp := s.checkOK(c)
@@ -132,15 +133,16 @@ func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
        srvU, listenU := s.stubServer(&unhealthyHandler{})
        defer srvU.Close()
        s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
-               Controller:  arvados.SystemServiceInstance{Listen: listenH},
-               Keepbalance: arvados.SystemServiceInstance{Listen: listenH},
-               Keepproxy:   arvados.SystemServiceInstance{Listen: listenH},
-               Keepstore:   arvados.SystemServiceInstance{Listen: listenH},
-               Keepweb:     arvados.SystemServiceInstance{Listen: listenH},
-               Nodemanager: arvados.SystemServiceInstance{Listen: listenH},
-               RailsAPI:    arvados.SystemServiceInstance{Listen: listenH},
-               Websocket:   arvados.SystemServiceInstance{Listen: listenH},
-               Workbench:   arvados.SystemServiceInstance{Listen: listenH},
+               Controller:    arvados.SystemServiceInstance{Listen: listenH},
+               DispatchCloud: arvados.SystemServiceInstance{Listen: listenH},
+               Keepbalance:   arvados.SystemServiceInstance{Listen: listenH},
+               Keepproxy:     arvados.SystemServiceInstance{Listen: listenH},
+               Keepstore:     arvados.SystemServiceInstance{Listen: listenH},
+               Keepweb:       arvados.SystemServiceInstance{Listen: listenH},
+               Nodemanager:   arvados.SystemServiceInstance{Listen: listenH},
+               RailsAPI:      arvados.SystemServiceInstance{Listen: listenH},
+               Websocket:     arvados.SystemServiceInstance{Listen: listenH},
+               Workbench:     arvados.SystemServiceInstance{Listen: listenH},
        }
        s.handler.Config.Clusters["zzzzz"].NodeProfiles["127.0.0.1"] = arvados.NodeProfile{
                Keepstore: arvados.SystemServiceInstance{Listen: listenU},
index 084700d39bfad76b109078f29e81ecf82c40c5be..30e88bf9258c751179b4979730ea72d128bc73fd 100644 (file)
@@ -197,7 +197,7 @@ func (disp *Dispatcher) run() error {
        defer disp.sqCheck.Stop()
 
        if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
-               go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster)
+               go SlurmNodeTypeFeatureKludge(disp.cluster)
        }
 
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
diff --git a/services/crunch-dispatch-slurm/node_type.go b/services/crunch-dispatch-slurm/node_type.go
new file mode 100644 (file)
index 0000000..62a9693
--- /dev/null
@@ -0,0 +1,72 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "log"
+       "os/exec"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
+// type name as a valid feature name, even if no instances of that
+// type have appeared yet.
+//
+// It takes advantage of some SLURM peculiarities:
+//
+// (1) A feature is valid after it has been offered by a node, even if
+// it is no longer offered by any node. So, to make a feature name
+// valid, we can add it to a dummy node ("compute0"), then remove it.
+//
+// (2) To test whether a set of feature names are valid without
+// actually submitting a job, we can call srun --test-only with the
+// desired features.
+//
+// SlurmNodeTypeFeatureKludge does a test-and-fix operation
+// immediately, and then periodically, in case slurm restarts and
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
+func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
+       if len(cc.InstanceTypes) == 0 {
+               return
+       }
+       var features []string
+       for _, it := range cc.InstanceTypes {
+               features = append(features, "instancetype="+it.Name)
+       }
+       for {
+               slurmKludge(features)
+               time.Sleep(2 * time.Second)
+       }
+}
+
+const slurmDummyNode = "compute0"
+
+func slurmKludge(features []string) {
+       allFeatures := strings.Join(features, ",")
+
+       cmd := exec.Command("sinfo", "--nodes="+slurmDummyNode, "--format=%f", "--noheader")
+       out, err := cmd.CombinedOutput()
+       if err != nil {
+               log.Printf("running %q %q: %s (output was %q)", cmd.Path, cmd.Args, err, out)
+               return
+       }
+       if string(out) == allFeatures+"\n" {
+               // Already configured correctly, nothing to do.
+               return
+       }
+
+       log.Printf("configuring node %q with all node type features", slurmDummyNode)
+       cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+allFeatures)
+       log.Printf("running: %q %q", cmd.Path, cmd.Args)
+       out, err = cmd.CombinedOutput()
+       if err != nil {
+               log.Printf("error: scontrol: %s (output was %q)", err, out)
+       }
+}
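+
+// For illustration, with two instance types configured, the commands
+// above amount to (instance type names invented):
+//
+//     sinfo --nodes=compute0 --format=%f --noheader
+//     scontrol update NodeName=compute0 Features=instancetype=small,instancetype=large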
diff --git a/services/crunch-run/background.go b/services/crunch-run/background.go
new file mode 100644 (file)
index 0000000..ee054f0
--- /dev/null
@@ -0,0 +1,197 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "os"
+       "os/exec"
+       "path/filepath"
+       "strings"
+       "syscall"
+       "time"
+)
+
+var (
+       lockdir    = "/var/lock"
+       lockprefix = "crunch-run-"
+       locksuffix = ".lock"
+)
+
+// procinfo is saved in each process's lockfile.
+type procinfo struct {
+       UUID   string
+       PID    int
+       Stdout string
+       Stderr string
+}
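+
+// For illustration, a lockfile written by Detach contains JSON along
+// these lines (all values invented):
+//
+//     {"UUID":"zzzzz-dz642-aaaaaaaaaaaaaaa","PID":12345,
+//      "Stdout":"/tmp/crunch-run-zzzzz-dz642-aaaaaaaaaaaaaaa-stdout-123456789",
+//      "Stderr":"/tmp/crunch-run-zzzzz-dz642-aaaaaaaaaaaaaaa-stderr-123456789"}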
+
+// Detach acquires a lock for the given uuid, and starts the current
+// program as a child process (with -detached prepended to the given
+// arguments so the child knows not to detach again). The lock is
+// passed along to the child process.
+func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
+       return exitcode(stderr, detach(uuid, args, stdout, stderr))
+}
+func detach(uuid string, args []string, stdout, stderr io.Writer) error {
+       lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+       if err != nil {
+               return err
+       }
+       defer lockfile.Close()
+       err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+       if err != nil {
+               return err
+       }
+       lockfile.Truncate(0)
+
+       outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
+       if err != nil {
+               return err
+       }
+       defer outfile.Close()
+       errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
+       if err != nil {
+               os.Remove(outfile.Name())
+               return err
+       }
+       defer errfile.Close()
+
+       cmd := exec.Command(args[0], append([]string{"-detached"}, args[1:]...)...)
+       cmd.Stdout = outfile
+       cmd.Stderr = errfile
+       // Child inherits lockfile.
+       cmd.ExtraFiles = []*os.File{lockfile}
+       // Ensure child isn't interrupted even if we receive signals
+       // from parent (sshd) while sending lockfile content to
+       // caller.
+       cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+       err = cmd.Start()
+       if err != nil {
+               os.Remove(outfile.Name())
+               os.Remove(errfile.Name())
+               return err
+       }
+
+       w := io.MultiWriter(stdout, lockfile)
+       err = json.NewEncoder(w).Encode(procinfo{
+               UUID:   uuid,
+               PID:    cmd.Process.Pid,
+               Stdout: outfile.Name(),
+               Stderr: errfile.Name(),
+       })
+       if err != nil {
+               os.Remove(outfile.Name())
+               os.Remove(errfile.Name())
+               return err
+       }
+       return nil
+}
+
+// KillProcess finds the crunch-run process corresponding to the given
+// uuid, and sends the given signal to it. It then waits up to 1
+// second for the process to die. It returns 0 if the process is
+// successfully killed or didn't exist in the first place.
+func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
+       return exitcode(stderr, kill(uuid, signal, stdout, stderr))
+}
+
+func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
+       path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+       f, err := os.Open(path)
+       if os.IsNotExist(err) {
+               return nil
+       } else if err != nil {
+               return err
+       }
+       defer f.Close()
+
+       var pi procinfo
+       err = json.NewDecoder(f).Decode(&pi)
+       if err != nil {
+               return fmt.Errorf("%s: %s", path, err)
+       }
+
+       if pi.UUID != uuid || pi.PID == 0 {
+               return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
+       }
+
+       proc, err := os.FindProcess(pi.PID)
+       if err != nil {
+               return err
+       }
+
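+       // Send the requested signal, then poll with signal 0 (which
+       // tests for existence without delivering a signal) until the
+       // process disappears or the 1-second deadline passes.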
+       err = proc.Signal(signal)
+       for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
+               err = proc.Signal(syscall.Signal(0))
+       }
+       if err == nil {
+               return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+       }
+       fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
+       return nil
+}
+
+// ListProcesses prints the UUIDs of all active crunch-run processes
+// to stdout.
+func ListProcesses(stdout, stderr io.Writer) int {
+       return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
+                       return err
+               }
+               if info.IsDir() && path != lockdir {
+                       // Walk the lock directory itself, but don't
+                       // descend into subdirectories.
+                       return filepath.SkipDir
+               }
+               if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+                       return nil
+               }
+               if info.Size() == 0 {
+                       // race: process has opened/locked but hasn't yet written pid/uuid
+                       return nil
+               }
+
+               f, err := os.Open(path)
+               if err != nil {
+                       return nil
+               }
+               defer f.Close()
+
+               // TODO: Do this check without risk of disrupting lock
+               // acquisition during races, e.g., by connecting to a
+               // unix socket or checking /proc/$pid/fd/$n ->
+               // lockfile.
+               err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
+               if err == nil {
+                       // lockfile is stale
+                       err := os.Remove(path)
+                       if err != nil {
+                               fmt.Fprintln(stderr, err)
+                       }
+                       return nil
+               }
+
+               var pi procinfo
+               err = json.NewDecoder(f).Decode(&pi)
+               if err != nil {
+                       fmt.Fprintf(stderr, "%s: %s\n", path, err)
+                       return nil
+               }
+               if pi.UUID == "" || pi.PID == 0 {
+                       fmt.Fprintf(stderr, "%s: bogus procinfo: %+v\n", path, pi)
+                       return nil
+               }
+
+               fmt.Fprintln(stdout, pi.UUID)
+               return nil
+       }))
+}
+
+// exitcode returns 0 if err is nil; otherwise it prints err to
+// stderr and returns 1.
+func exitcode(stderr io.Writer, err error) int {
+       if err != nil {
+               fmt.Fprintln(stderr, err)
+               return 1
+       }
+       return 0
+}
index e5a1b94706c781cdc62e805e972c5f46ac0bba24..1c6c58009fac71bd5fa5015a4922821937e61d5a 100644 (file)
@@ -1732,6 +1732,10 @@ func main() {
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+       detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
+       kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for the given UUID")
+       list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
        enableNetwork := flag.String("container-enable-networking", "default",
                `Specify if networking should be enabled for container.  One of 'default', 'always':
        default: only enable networking if container requests it.
@@ -1743,8 +1747,30 @@ func main() {
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+
+       detached := false
+       if len(os.Args) > 1 && os.Args[1] == "-detached" {
+               // This process was invoked by a parent process, which
+               // has passed along its own arguments, including
+               // -detach, after the leading -detached flag.  Strip
+               // the leading -detached flag (it's not recognized by
+               // flag.Parse()) ... and remember not to detach all
+               // over again in this process.
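+               // For example (UUID invented), os.Args
+               //   ["crunch-run", "-detached", "-detach", "zzzzz-dz642-aaaaaaaaaaaaaaa"]
+               // becomes
+               //   ["crunch-run", "-detach", "zzzzz-dz642-aaaaaaaaaaaaaaa"]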
+               os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+               detached = true
+       }
+
        flag.Parse()
 
+       switch {
+       case *detach && !detached:
+               os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+       case *kill >= 0:
+               os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+       case *list:
+               os.Exit(ListProcesses(os.Stdout, os.Stderr))
+       }
+
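+       // Illustrative invocations of the flags above (UUID invented):
+       //
+       //   crunch-run -detach zzzzz-dz642-aaaaaaaaaaaaaaa   # start in background
+       //   crunch-run -list                                 # list detached processes
+       //   crunch-run -kill 15 zzzzz-dz642-aaaaaaaaaaaaaaa  # SIGTERM, wait up to 1s
+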
        // Print version information if requested
        if *getVersion {
                fmt.Printf("crunch-run %s\n", version)
@@ -1752,6 +1778,7 @@ func main() {
        }
 
        log.Printf("crunch-run %s started", version)
+       time.Sleep(*sleep)
 
        containerId := flag.Arg(0)
 
index c0e6fee5dbe63ed053c70fcc1ac0875f13813381..ec296d21d1c25a92c14639c36658c4b0aa10bce5 100644 (file)
                        "revision": "b8bc1bf767474819792c23f32d8286a45736f1c6",
                        "revisionTime": "2016-12-03T19:45:07Z"
                },
+               {
+                       "checksumSHA1": "ewGq4nGalpCQOHcmBTdAEQx1wW0=",
+                       "path": "github.com/mitchellh/mapstructure",
+                       "revision": "bb74f1db0675b241733089d5a1faa5dd8b0ef57b",
+                       "revisionTime": "2018-05-11T14:21:26Z"
+               },
                {
                        "checksumSHA1": "OFNit1Qx2DdWhotfREKodDNUwCM=",
                        "path": "github.com/opencontainers/go-digest",