Merge branch 'master' into 14539-pysdk-empty-dir
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Mon, 14 Jan 2019 17:16:32 +0000 (14:16 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Mon, 14 Jan 2019 17:16:32 +0000 (14:16 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

78 files changed:
.licenseignore
apps/workbench/app/helpers/application_helper.rb
apps/workbench/app/views/users/_virtual_machines.html.erb
apps/workbench/package-build.version [new file with mode: 0644]
apps/workbench/test/the.patch [new file with mode: 0644]
build/run-build-packages.sh
build/run-tests.sh
cmd/arvados-server/cmd.go
cmd/arvados-server/crunch-dispatch-cloud.service [new file with mode: 0644]
doc/README.textile
doc/Rakefile
doc/_config.yml
doc/user/getting_started/ssh-access-unix.html.textile.liquid
lib/cloud/interfaces.go [new file with mode: 0644]
lib/cmd/cmd.go
lib/dispatchcloud/cmd.go [new file with mode: 0644]
lib/dispatchcloud/container/queue.go [new file with mode: 0644]
lib/dispatchcloud/dispatcher.go [new file with mode: 0644]
lib/dispatchcloud/dispatcher_test.go [new file with mode: 0644]
lib/dispatchcloud/driver.go [new file with mode: 0644]
lib/dispatchcloud/instance_set_proxy.go [new file with mode: 0644]
lib/dispatchcloud/logger.go [new file with mode: 0644]
lib/dispatchcloud/node_size.go
lib/dispatchcloud/readme.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/fix_stale_locks.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/gocheck_test.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/interfaces.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/run_queue.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/run_queue_test.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/scheduler.go [new file with mode: 0644]
lib/dispatchcloud/scheduler/sync.go [new file with mode: 0644]
lib/dispatchcloud/ssh_executor/executor.go [new file with mode: 0644]
lib/dispatchcloud/ssh_executor/executor_test.go [new file with mode: 0644]
lib/dispatchcloud/test/doc.go [new file with mode: 0644]
lib/dispatchcloud/test/fixtures.go [new file with mode: 0644]
lib/dispatchcloud/test/lame_instance_set.go [new file with mode: 0644]
lib/dispatchcloud/test/queue.go [new file with mode: 0644]
lib/dispatchcloud/test/ssh_service.go [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_dispatch [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_dispatch.pub [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_vm [new file with mode: 0644]
lib/dispatchcloud/test/sshkey_vm.pub [new file with mode: 0644]
lib/dispatchcloud/test/stub_driver.go [new file with mode: 0644]
lib/dispatchcloud/worker/gocheck_test.go [new file with mode: 0644]
lib/dispatchcloud/worker/pool.go [new file with mode: 0644]
lib/dispatchcloud/worker/pool_test.go [new file with mode: 0644]
lib/dispatchcloud/worker/worker.go [new file with mode: 0644]
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/tests/test_pathmapper.py
sdk/go/arvados/config.go
sdk/go/arvados/container.go
sdk/go/health/aggregator_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/node_type.go [new file with mode: 0644]
services/crunch-run/background.go [new file with mode: 0644]
services/crunch-run/crunchrun.go
tools/arvbox/bin/arvbox
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/arvbox/lib/arvbox/docker/Dockerfile.demo
tools/arvbox/lib/arvbox/docker/api-setup.sh
tools/arvbox/lib/arvbox/docker/common.sh
tools/arvbox/lib/arvbox/docker/service/certificate/log/main/.gitstub [new file with mode: 0644]
tools/arvbox/lib/arvbox/docker/service/certificate/log/run [new symlink]
tools/arvbox/lib/arvbox/docker/service/certificate/run [new file with mode: 0755]
tools/arvbox/lib/arvbox/docker/service/composer/run
tools/arvbox/lib/arvbox/docker/service/gitolite/run-service
tools/arvbox/lib/arvbox/docker/service/nginx/run-service
tools/arvbox/lib/arvbox/docker/service/ready/run-service
tools/arvbox/lib/arvbox/docker/service/ssh/run
tools/arvbox/lib/arvbox/docker/service/sso/run-service
tools/arvbox/lib/arvbox/docker/service/websockets/run-service
tools/arvbox/lib/arvbox/docker/service/workbench/run
tools/arvbox/lib/arvbox/docker/service/workbench/run-service
tools/arvbox/lib/arvbox/docker/service/workbench2/log/main/.gitstub [new file with mode: 0644]
tools/arvbox/lib/arvbox/docker/service/workbench2/log/run [new symlink]
tools/arvbox/lib/arvbox/docker/service/workbench2/run [new file with mode: 0755]
tools/arvbox/lib/arvbox/docker/service/workbench2/run-service [new file with mode: 0755]
vendor/vendor.json

diff --git a/.licenseignore b/.licenseignore
index 83c81b2fc21ca1db86206d82c2f50cae297b9b9c..06519a98e8bc45afcebdad584198a6b6bb47bf71 100644 (file)
@@ -71,4 +71,5 @@ sdk/R/NAMESPACE
 sdk/R/.Rbuildignore
 sdk/R/ArvadosR.Rproj
 *.Rd
+lib/dispatchcloud/test/sshkey_*
 *.asc
diff --git a/apps/workbench/app/helpers/application_helper.rb b/apps/workbench/app/helpers/application_helper.rb
index c4a801d68b0a645fe7c10de9cdee91f642ed4ab7..15bf77fa094f188e5b3f8be980c5c57e3d73bcfe 100644 (file)
@@ -15,6 +15,10 @@ module ApplicationHelper
     Rails.configuration.arvados_v1_base.gsub /https?:\/\/|\/arvados\/v1/,''
   end
 
+  def current_uuid_prefix
+    current_api_host[0..4]
+  end
+
   def render_markup(markup)
     allowed_tags = Rails::Html::Sanitizer.white_list_sanitizer.allowed_tags + %w(table tbody th tr td col colgroup caption thead tfoot)
     sanitize(raw(RedCloth.new(markup.to_s).to_html(:refs_arvados, :textile)), tags: allowed_tags) if markup
diff --git a/apps/workbench/app/views/users/_virtual_machines.html.erb b/apps/workbench/app/views/users/_virtual_machines.html.erb
index 928f50f0b6f50fb0fab0701b7e8f61e7940847fd..c891b0c594af329b9f0a7790217596b11b0109fc 100644 (file)
@@ -85,7 +85,7 @@ SPDX-License-Identifier: AGPL-3.0 %>
             <td style="word-break:break-all;">
               <% if @my_vm_logins[vm[:uuid]] %>
                 <% @my_vm_logins[vm[:uuid]].each do |login| %>
-                  <code>ssh&nbsp;<%= login %>@<%= vm[:hostname] %>.arvados</code>
+                  <code>ssh&nbsp;<%= login %>@<%= vm[:hostname] %>.<%= current_uuid_prefix || 'xyzzy' %></code>
                 <% end %>
               <% end %>
             </td>
@@ -106,7 +106,7 @@ SPDX-License-Identifier: AGPL-3.0 %>
 </div>
 </div>
   <p>In order to access virtual machines using SSH, <%= link_to ssh_keys_user_path(current_user) do%> add an SSH key to your account<%end%> and add a section like this to your SSH configuration file ( <i>~/.ssh/config</i>):</p>
-    <pre>Host *.arvados
+    <pre>Host *.<%= current_uuid_prefix || 'xyzzy' %>
       TCPKeepAlive yes
       ServerAliveInterval 60
       ProxyCommand ssh -p2222 turnout@switchyard.<%= current_api_host || 'xyzzy.arvadosapi.com' %> -x -a $SSH_PROXY_FLAGS %h
diff --git a/apps/workbench/package-build.version b/apps/workbench/package-build.version
new file mode 100644 (file)
index 0000000..41eb2c7
--- /dev/null
@@ -0,0 +1 @@
+1.2.1.20181126194329
diff --git a/apps/workbench/test/the.patch b/apps/workbench/test/the.patch
new file mode 100644 (file)
index 0000000..5a55679
--- /dev/null
@@ -0,0 +1,3 @@
++    echo -n 'geckodriver: '
++    which geckodriver || fatal "No geckodriver. Unable to find Mozilla geckodriver. Please download the server from https://github.com/mozilla/geckodriver/releases and place it somewhere on your PATH. More info at https://developer.mozilla.org/en-US/docs/Mozilla/QA/Marionette/WebDriver."
+
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index 73e2e628631bbc71ea84388b37e6ae4d60917734..f316c563bd53e1ea6ddac44ca0928c6b299d8ffe 100755 (executable)
@@ -294,6 +294,9 @@ package_go_binary cmd/arvados-server arvados-server \
     "Arvados server daemons"
 package_go_binary cmd/arvados-server arvados-controller \
     "Arvados cluster controller daemon"
+# No package until #14325
+#package_go_binary cmd/arvados-server crunch-dispatch-cloud \
+#    "Arvados cluster cloud dispatch"
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 50c578269d007bfa3c4e4533bd57e0c3318ac166..c81376404cfae23224cca9d828527ebd3a2e3a61 100755 (executable)
@@ -77,6 +77,10 @@ lib/cmd
 lib/controller
 lib/crunchstat
 lib/dispatchcloud
+lib/dispatchcloud/container
+lib/dispatchcloud/scheduler
+lib/dispatchcloud/ssh_executor
+lib/dispatchcloud/worker
 services/api
 services/arv-git-httpd
 services/crunchstat
@@ -926,6 +930,10 @@ gostuff=(
     lib/controller
     lib/crunchstat
     lib/dispatchcloud
+    lib/dispatchcloud/container
+    lib/dispatchcloud/scheduler
+    lib/dispatchcloud/ssh_executor
+    lib/dispatchcloud/worker
     sdk/go/arvados
     sdk/go/arvadosclient
     sdk/go/auth
diff --git a/cmd/arvados-server/cmd.go b/cmd/arvados-server/cmd.go
index 1af3745df0c3a4c54d296e848532c1d19c124e43..cd15d25dda760a41c427b8bfd4b621fb43e2130a 100644 (file)
@@ -9,6 +9,7 @@ import (
 
        "git.curoverse.com/arvados.git/lib/cmd"
        "git.curoverse.com/arvados.git/lib/controller"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud"
 )
 
 var (
@@ -18,7 +19,8 @@ var (
                "-version":  cmd.Version(version),
                "--version": cmd.Version(version),
 
-               "controller": controller.Command,
+               "controller":     controller.Command,
+               "dispatch-cloud": dispatchcloud.Command,
        })
 )
 
diff --git a/cmd/arvados-server/crunch-dispatch-cloud.service b/cmd/arvados-server/crunch-dispatch-cloud.service
new file mode 100644 (file)
index 0000000..f8d71c9
--- /dev/null
@@ -0,0 +1,28 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+[Unit]
+Description=Arvados cloud dispatch
+Documentation=https://doc.arvados.org/
+After=network.target
+AssertPathExists=/etc/arvados/config.yml
+
+# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
+StartLimitInterval=0
+
+# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
+StartLimitIntervalSec=0
+
+[Service]
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
+ExecStart=/usr/bin/crunch-dispatch-cloud
+Restart=always
+RestartSec=1
+
+# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
+StartLimitInterval=0
+
+[Install]
+WantedBy=multi-user.target
diff --git a/doc/README.textile b/doc/README.textile
index 5059a07be5bff4140e329e891f12b26776b3ebaf..75a30e9ef2ade8cfa04d8dec2e6275750a41047d 100644 (file)
@@ -55,7 +55,7 @@ h2. Publish HTML pages inside Workbench
 
 (or some other web site)
 
-You can set @baseurl@ (the URL prefix for all internal links), @arvados_api_host@ and @arvados_workbench_host@ without changing @_config.yml@:
+You can set @baseurl@ (the URL prefix for all internal links), @arvados_cluster_uuid@, @arvados_api_host@ and @arvados_workbench_host@ without changing @_config.yml@:
 
 <pre>
 arvados/doc$ rake generate baseurl=/doc arvados_api_host=xyzzy.arvadosapi.com
diff --git a/doc/Rakefile b/doc/Rakefile
index 079f7da27f46b52721849ae9539d6bbe4921dac0..9deca3a28cf8cc8c6911097aee68f01426d86177 100644 (file)
@@ -7,7 +7,7 @@ require "rubygems"
 require "colorize"
 
 task :generate => [ :realclean, 'sdk/python/arvados/index.html', 'sdk/R/arvados/index.html' ] do
-  vars = ['baseurl', 'arvados_api_host', 'arvados_workbench_host']
+  vars = ['baseurl', 'arvados_cluster_uuid', 'arvados_api_host', 'arvados_workbench_host']
   vars.each do |v|
     if ENV[v]
       website.config.h[v] = ENV[v]
diff --git a/doc/_config.yml b/doc/_config.yml
index 94c95399662057d9f40d6733d9504419e8e2ed7f..1e17d047062efd8fbf324edcb57979ef83b740df 100644 (file)
@@ -12,6 +12,7 @@
 
 baseurl:
 arvados_api_host: localhost
+arvados_cluster_uuid: local
 arvados_workbench_host: http://localhost
 
 exclude: ["Rakefile", "tmp", "vendor"]
diff --git a/doc/user/getting_started/ssh-access-unix.html.textile.liquid b/doc/user/getting_started/ssh-access-unix.html.textile.liquid
index aeeb37579bcf13d3ef79943baf39ca22f77b5cf9..284d0a1f04aca0117e54737cffba8586c6a57188 100644 (file)
@@ -84,18 +84,17 @@ h3. Connecting to the virtual machine
 
 Use the following command to connect to the _shell_ VM instance as _you_.  Replace *<code>you@shell</code>* at the end of the following command with your *login* and *hostname* from Workbench:
 
-notextile. <pre><code>$ <span class="userinput">ssh -o "ProxyCommand ssh -a -x -p2222 turnout@switchyard.{{ site.arvados_api_host }} <b>shell</b>" -A -x <b>you@shell</b></span></code></pre>
+notextile. <pre><code>$ <span class="userinput">ssh -o "ProxyCommand ssh -p2222 turnout@switchyard.{{ site.arvados_api_host }} -x -a <b>shell</b>" -x <b>you@shell</b></span></code></pre>
 
 This command does several things at once. You usually cannot log in directly to virtual machines over the public Internet.  Instead, you log into a "switchyard" server and then tell the switchyard which virtual machine you want to connect to.
 
 * @-o "ProxyCommand ..."@ configures SSH to run the specified command to create a proxy and route your connection through it.
-* @-a@ tells SSH not to forward your ssh-agent credentials to the switchyard.
-* @-x@ tells SSH not to forward your X session to the switchyard.
 * @-p2222@ specifies that the switchyard is running on non-standard port 2222.
 * <code>turnout@switchyard.{{ site.arvados_api_host }}</code> specifies the user (@turnout@) and hostname (@switchyard.{{ site.arvados_api_host }}@) of the switchyard server that will proxy our connection to the VM.
+* @-x@ tells SSH not to forward your X session to the switchyard.
+* @-a@ tells SSH not to forward your ssh-agent credentials to the switchyard.
 * *@shell@* is the name of the VM that we want to connect to.  This is sent to the switchyard server as if it were an SSH command, and the switchyard server connects to the VM on our behalf.
 * After the ProxyCommand section, we repeat @-x@ to disable X session forwarding to the virtual machine.
-* @-A@ specifies that we want to forward access to @ssh-agent@ to the VM.
 * Finally, *<code>you@shell</code>* specifies your login name and repeats the hostname of the VM.  The username can be found in the *logins* column in the VMs Workbench page, discussed in the previous section.
 
 You should now be able to log into the Arvados VM and "check your environment.":check-environment.html
@@ -105,16 +104,16 @@ h3. Configuration (recommended)
 The command line above is cumbersome, but you can configure SSH to remember many of these settings.  Add this text to the file @.ssh/config@ in your home directory (create a new file if @.ssh/config@ doesn't exist):
 
 <notextile>
-<pre><code class="userinput">Host *.arvados
-  ProxyCommand ssh -a -x -p2222 turnout@switchyard.{{ site.arvados_api_host }} $SSH_PROXY_FLAGS %h
+<pre><code class="userinput">Host *.{{ site.arvados_cluster_uuid }}
+  TCPKeepAlive yes
+  ServerAliveInterval 60
+  ProxyCommand ssh -p2222 turnout@switchyard.{{ site.arvados_api_host }} -x -a $SSH_PROXY_FLAGS %h
   User <b>you</b>
-  ForwardAgent yes
-  ForwardX11 no
 </code></pre>
 </notextile>
 
-This will recognize any host ending in ".arvados" and automatically apply the proxy, user and forwarding settings from the configuration file, allowing you to log in with a much simpler command:
+This will recognize any host ending in ".{{ site.arvados_cluster_uuid }}" and automatically apply the proxy, user and forwarding settings from the configuration file, allowing you to log in with a much simpler command:
 
-notextile. <pre><code>$ <span class="userinput">ssh <b>shell</b>.arvados</span></code></pre>
+notextile. <pre><code>$ <span class="userinput">ssh <b>shell</b>.{{ site.arvados_cluster_uuid }}</span></code></pre>
 
 You should now be able to log into the Arvados VM and "check your environment.":check-environment.html
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
new file mode 100644 (file)
index 0000000..e3a0725
--- /dev/null
@@ -0,0 +1,179 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package cloud
+
+import (
+       "io"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+// A RateLimitError should be returned by an InstanceSet when the
+// cloud service indicates it is rejecting all API calls for some time
+// interval.
+type RateLimitError interface {
+       // Time before which the caller should expect requests to
+       // fail.
+       EarliestRetry() time.Time
+       error
+}
+
+// A QuotaError should be returned by an InstanceSet when the cloud
+// service indicates the account cannot create more VMs than already
+// exist.
+type QuotaError interface {
+       // If true, don't create more instances until some existing
+       // instances are destroyed. If false, don't handle the error
+       // as a quota error.
+       IsQuotaError() bool
+       error
+}
+
+type InstanceSetID string
+type InstanceTags map[string]string
+type InstanceID string
+type ImageID string
+
+// An Executor executes commands on an ExecutorTarget.
+type Executor interface {
+       // Update the set of private keys used to authenticate to
+       // targets.
+       SetSigners(...ssh.Signer)
+
+       // Set the target used for subsequent command executions.
+       SetTarget(ExecutorTarget)
+
+       // Return the current target.
+       Target() ExecutorTarget
+
+       // Execute a shell command and return the resulting stdout and
+       // stderr. stdin can be nil.
+       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+}
+
+// An ExecutorTarget is a remote command execution service.
+type ExecutorTarget interface {
+       // SSH server hostname or IP address, or empty string if
+       // unknown while instance is booting.
+       Address() string
+
+       // Return nil if the given public key matches the instance's
+       // SSH server key. If the provided Dialer is not nil,
+       // VerifyHostKey can use it to make outgoing network
+       // connections from the instance -- e.g., to use the cloud's
+       // "this instance's metadata" API.
+       VerifyHostKey(ssh.PublicKey, *ssh.Client) error
+}
+
+// Instance is implemented by the provider-specific instance types.
+type Instance interface {
+       ExecutorTarget
+
+       // ID returns the provider's instance ID. It must be stable
+       // for the life of the instance.
+       ID() InstanceID
+
+       // String typically returns the cloud-provided instance ID.
+       String() string
+
+       // Cloud provider's "instance type" ID. Matches a ProviderType
+       // in the cluster's InstanceTypes configuration.
+       ProviderType() string
+
+       // Get current tags
+       Tags() InstanceTags
+
+       // Replace tags with the given tags
+       SetTags(InstanceTags) error
+
+       // Shut down the node
+       Destroy() error
+}
+
+// An InstanceSet manages a set of VM instances created by an elastic
+// cloud provider like AWS, GCE, or Azure.
+//
+// All public methods of an InstanceSet, and all public methods of the
+// instances it returns, are goroutine safe.
+type InstanceSet interface {
+       // Create a new instance. If supported by the driver, add the
+       // provided public key to /root/.ssh/authorized_keys.
+       //
+       // The returned error should implement RateLimitError and
+       // QuotaError where applicable.
+       Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+
+       // Return all instances, including ones that are booting or
+       // shutting down. Optionally, filter out nodes that don't have
+       // all of the given InstanceTags (the caller will ignore these
+       // anyway).
+       //
+       // An instance returned by successive calls to Instances() may
+       // -- but does not need to -- be represented by the same
+       // Instance object each time. Thus, the caller is responsible
+       // for de-duplicating the returned instances by comparing the
+       // InstanceIDs returned by the instances' ID() methods.
+       Instances(InstanceTags) ([]Instance, error)
+
+       // Stop any background tasks and release other resources.
+       Stop()
+}
+
+// A Driver returns an InstanceSet that uses the given InstanceSetID
+// and driver-dependent configuration parameters.
+//
+// The supplied id will be of the form "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+// where each z can be any alphanum. The returned InstanceSet must use
+// this id to tag long-lived cloud resources that it creates, and must
+// assume control of any existing resources that are tagged with the
+// same id. Tagging can be accomplished by including the ID in
+// resource names, using the cloud provider's tagging feature, or any
+// other mechanism. The tags must be visible to another instance of
+// the same driver running on a different host.
+//
+// The returned InstanceSet must ignore existing resources that are
+// visible but not tagged with the given id, except that it should log
+// a summary of such resources -- only once -- when it starts
+// up. Thus, two identically configured InstanceSets running on
+// different hosts with different ids should log about the existence
+// of each other's resources at startup, but will not interfere with
+// each other.
+//
+// Example:
+//
+//     type exampleInstanceSet struct {
+//             ownID     string
+//             AccessKey string
+//     }
+//
+//     type exampleDriver struct {}
+//
+//     func (*exampleDriver) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+//             var is exampleInstanceSet
+//             if err := mapstructure.Decode(config, &is); err != nil {
+//                     return nil, err
+//             }
+//             is.ownID = id
+//             return &is, nil
+//     }
+//
+//     var _ = registerCloudDriver("example", &exampleDriver{})
+type Driver interface {
+       InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)
+}
+
+// DriverFunc makes a Driver using the provided function as its
+// InstanceSet method. This is similar to http.HandlerFunc.
+func DriverFunc(fn func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)) Driver {
+       return driverFunc(fn)
+}
+
+type driverFunc func(config map[string]interface{}, id InstanceSetID) (InstanceSet, error)
+
+func (df driverFunc) InstanceSet(config map[string]interface{}, id InstanceSetID) (InstanceSet, error) {
+       return df(config, id)
+}
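
For reference, a minimal sketch (not part of this commit) of how a cloud driver could be packaged using the DriverFunc adapter above. The package name, the exampleInstanceSet type, and its no-op behavior are assumptions; only the cloud interfaces and their signatures come from the file above.

    // Hypothetical driver built on cloud.DriverFunc. Its InstanceSet
    // satisfies the interface above but never creates real instances.
    package exampledriver

    import (
            "errors"

            "git.curoverse.com/arvados.git/lib/cloud"
            "git.curoverse.com/arvados.git/sdk/go/arvados"
            "golang.org/x/crypto/ssh"
    )

    type exampleInstanceSet struct{ id cloud.InstanceSetID }

    func (is *exampleInstanceSet) Create(arvados.InstanceType, cloud.ImageID, cloud.InstanceTags, ssh.PublicKey) (cloud.Instance, error) {
            return nil, errors.New("exampledriver: Create not implemented")
    }

    func (is *exampleInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
            return nil, nil
    }

    func (is *exampleInstanceSet) Stop() {}

    // Driver wraps a plain function, the same way http.HandlerFunc
    // wraps an HTTP handler function.
    var Driver = cloud.DriverFunc(func(config map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
            return &exampleInstanceSet{id: id}, nil
    })

A dispatcher would select such a driver by a configured name; in this commit that lookup happens through the drivers map in lib/dispatchcloud/driver.go below.
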
diff --git a/lib/cmd/cmd.go b/lib/cmd/cmd.go
index 8c65cf7acf1b6dd7bc02660464be06ea07cc3daa..9292ef7e5ff5b3afb6012833299d9f89a7ea346c 100644 (file)
@@ -36,8 +36,9 @@ func (v Version) RunCommand(prog string, args []string, stdin io.Reader, stdout,
        return 0
 }
 
-// Multi is a Handler that looks up its first argument in a map, and
-// invokes the resulting Handler with the remaining args.
+// Multi is a Handler that looks up its first argument in a map (after
+// stripping any "arvados-" or "crunch-" prefix), and invokes the
+// resulting Handler with the remaining args.
 //
 // Example:
 //
diff --git a/lib/dispatchcloud/cmd.go b/lib/dispatchcloud/cmd.go
new file mode 100644 (file)
index 0000000..92948fb
--- /dev/null
@@ -0,0 +1,19 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "git.curoverse.com/arvados.git/lib/cmd"
+       "git.curoverse.com/arvados.git/lib/service"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
+
+func newHandler(cluster *arvados.Cluster, _ *arvados.NodeProfile) service.Handler {
+       d := &dispatcher{Cluster: cluster}
+       go d.Start()
+       return d
+}
diff --git a/lib/dispatchcloud/container/queue.go b/lib/dispatchcloud/container/queue.go
new file mode 100644 (file)
index 0000000..432f4d4
--- /dev/null
@@ -0,0 +1,378 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package container
+
+import (
+       "io"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+type typeChooser func(*arvados.Container) (arvados.InstanceType, error)
+
+// An APIClient performs Arvados API requests. It is typically an
+// *arvados.Client.
+type APIClient interface {
+       RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error
+}
+
+// A QueueEnt is an entry in the queue, consisting of a container
+// record and the instance type that should be used to run it.
+type QueueEnt struct {
+       // The container to run. Only the UUID, State, Priority, and
+       // RuntimeConstraints fields are populated.
+       Container    arvados.Container
+       InstanceType arvados.InstanceType
+}
+
+// String implements fmt.Stringer by returning the queued container's
+// UUID.
+func (c *QueueEnt) String() string {
+       return c.Container.UUID
+}
+
+// A Queue is an interface to an Arvados cluster's container
+// database. It presents only the containers that are eligible to be
+// run by, are already being run by, or have recently been run by the
+// present dispatcher.
+//
+// The Entries, Get, and Forget methods do not block: they return
+// immediately, using cached data.
+//
+// The updating methods (Cancel, Lock, Unlock, Update) do block: they
+// return only after the operation has completed.
+//
+// A Queue's Update method should be called periodically to keep the
+// cache up to date.
+type Queue struct {
+       logger     logrus.FieldLogger
+       reg        *prometheus.Registry
+       chooseType typeChooser
+       client     APIClient
+
+       auth    *arvados.APIClientAuthorization
+       current map[string]QueueEnt
+       updated time.Time
+       mtx     sync.Mutex
+
+       // Methods that modify the Queue (like Lock) add the affected
+       // container UUIDs to dontupdate. When applying a batch of
+       // updates received from the network, anything appearing in
+       // dontupdate is skipped, in case the received update has
+       // already been superseded by the locally initiated change.
+       // When no network update is in progress, this protection is
+       // not needed, and dontupdate is nil.
+       dontupdate map[string]struct{}
+
+       // active notification subscribers (see Subscribe)
+       subscribers map[<-chan struct{}]chan struct{}
+}
+
+// NewQueue returns a new Queue. When a new container appears in the
+// Arvados cluster's queue during Update, chooseType will be called to
+// assign an appropriate arvados.InstanceType for the queue entry.
+func NewQueue(logger logrus.FieldLogger, reg *prometheus.Registry, chooseType typeChooser, client APIClient) *Queue {
+       return &Queue{
+               logger:      logger,
+               reg:         reg,
+               chooseType:  chooseType,
+               client:      client,
+               current:     map[string]QueueEnt{},
+               subscribers: map[<-chan struct{}]chan struct{}{},
+       }
+}
+
+// Subscribe returns a channel that becomes ready to receive when an
+// entry in the Queue is updated.
+//
+//     ch := q.Subscribe()
+//     defer q.Unsubscribe(ch)
+//     for range ch {
+//             // ...
+//     }
+func (cq *Queue) Subscribe() <-chan struct{} {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       ch := make(chan struct{}, 1)
+       cq.subscribers[ch] = ch
+       return ch
+}
+
+// Unsubscribe stops sending updates to the given channel. See
+// Subscribe.
+func (cq *Queue) Unsubscribe(ch <-chan struct{}) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       delete(cq.subscribers, ch)
+}
+
+// Caller must have lock.
+func (cq *Queue) notify() {
+       for _, ch := range cq.subscribers {
+               select {
+               case ch <- struct{}{}:
+               default:
+               }
+       }
+}
+
+// Forget drops the specified container from the cache. It should be
+// called on finalized containers to avoid leaking memory over
+// time. It is a no-op if the indicated container is not in a
+// finalized state.
+func (cq *Queue) Forget(uuid string) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       ctr := cq.current[uuid].Container
+       if ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled {
+               delete(cq.current, uuid)
+       }
+}
+
+// Get returns the (partial) Container record for the specified
+// container. Like a map lookup, its second return value is false if
+// the specified container is not in the Queue.
+func (cq *Queue) Get(uuid string) (arvados.Container, bool) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       if ctr, ok := cq.current[uuid]; !ok {
+               return arvados.Container{}, false
+       } else {
+               return ctr.Container, true
+       }
+}
+
+// Entries returns all cache entries, keyed by container UUID.
+//
+// The returned threshold indicates the maximum age of any cached data
+// returned in the map. This makes it possible for a scheduler to
+// determine correctly the outcome of a remote process that updates
+// container state. It must first wait for the remote process to exit,
+// then wait for the Queue to start and finish its next Update --
+// i.e., it must wait until threshold > timeProcessExited.
+func (cq *Queue) Entries() (entries map[string]QueueEnt, threshold time.Time) {
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       entries = make(map[string]QueueEnt, len(cq.current))
+       for uuid, ctr := range cq.current {
+               entries[uuid] = ctr
+       }
+       threshold = cq.updated
+       return
+}
+
+// Update refreshes the cache from the Arvados API. It adds newly
+// queued containers, and updates the state of previously queued
+// containers.
+func (cq *Queue) Update() error {
+       cq.mtx.Lock()
+       cq.dontupdate = map[string]struct{}{}
+       updateStarted := time.Now()
+       cq.mtx.Unlock()
+
+       next, err := cq.poll()
+       if err != nil {
+               return err
+       }
+
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       for uuid, ctr := range next {
+               if _, keep := cq.dontupdate[uuid]; keep {
+                       continue
+               }
+               if cur, ok := cq.current[uuid]; !ok {
+                       cq.addEnt(uuid, *ctr)
+               } else {
+                       cur.Container = *ctr
+                       cq.current[uuid] = cur
+               }
+       }
+       for uuid := range cq.current {
+               if _, keep := cq.dontupdate[uuid]; keep {
+                       continue
+               } else if _, keep = next[uuid]; keep {
+                       continue
+               } else {
+                       delete(cq.current, uuid)
+               }
+       }
+       cq.dontupdate = nil
+       cq.updated = updateStarted
+       cq.notify()
+       return nil
+}
+
+func (cq *Queue) addEnt(uuid string, ctr arvados.Container) {
+       it, err := cq.chooseType(&ctr)
+       if err != nil {
+               // FIXME: throttle warnings, cancel after timeout
+               cq.logger.Warnf("cannot run %s", &ctr)
+               return
+       }
+       cq.current[uuid] = QueueEnt{Container: ctr, InstanceType: it}
+}
+
+// Lock acquires the dispatch lock for the given container.
+func (cq *Queue) Lock(uuid string) error {
+       return cq.apiUpdate(uuid, "lock")
+}
+
+// Unlock releases the dispatch lock for the given container.
+func (cq *Queue) Unlock(uuid string) error {
+       return cq.apiUpdate(uuid, "unlock")
+}
+
+// Cancel cancels the given container.
+func (cq *Queue) Cancel(uuid string) error {
+       err := cq.client.RequestAndDecode(nil, "PUT", "arvados/v1/containers/"+uuid, nil, map[string]map[string]interface{}{
+               "container": {"state": arvados.ContainerStateCancelled},
+       })
+       if err != nil {
+               return err
+       }
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       cq.notify()
+       return nil
+}
+
+func (cq *Queue) apiUpdate(uuid, action string) error {
+       var resp arvados.Container
+       err := cq.client.RequestAndDecode(&resp, "POST", "arvados/v1/containers/"+uuid+"/"+action, nil, nil)
+       if err != nil {
+               return err
+       }
+
+       cq.mtx.Lock()
+       defer cq.mtx.Unlock()
+       if cq.dontupdate != nil {
+               cq.dontupdate[uuid] = struct{}{}
+       }
+       if ent, ok := cq.current[uuid]; !ok {
+               cq.addEnt(uuid, resp)
+       } else {
+               ent.Container.State, ent.Container.Priority, ent.Container.LockedByUUID = resp.State, resp.Priority, resp.LockedByUUID
+               cq.current[uuid] = ent
+       }
+       cq.notify()
+       return nil
+}
+
+func (cq *Queue) poll() (map[string]*arvados.Container, error) {
+       cq.mtx.Lock()
+       size := len(cq.current)
+       auth := cq.auth
+       cq.mtx.Unlock()
+
+       if auth == nil {
+               auth = &arvados.APIClientAuthorization{}
+               err := cq.client.RequestAndDecode(auth, "GET", "arvados/v1/api_client_authorizations/current", nil, nil)
+               if err != nil {
+                       return nil, err
+               }
+               cq.mtx.Lock()
+               cq.auth = auth
+               cq.mtx.Unlock()
+       }
+
+       next := make(map[string]*arvados.Container, size)
+       apply := func(updates []arvados.Container) {
+               for _, upd := range updates {
+                       if next[upd.UUID] == nil {
+                               next[upd.UUID] = &arvados.Container{}
+                       }
+                       *next[upd.UUID] = upd
+               }
+       }
+       selectParam := []string{"uuid", "state", "priority", "runtime_constraints"}
+       limitParam := 1000
+
+       mine, err := cq.fetchAll(arvados.ResourceListParams{
+               Select:  selectParam,
+               Order:   "uuid",
+               Limit:   &limitParam,
+               Count:   "none",
+               Filters: []arvados.Filter{{"locked_by_uuid", "=", auth.UUID}},
+       })
+       if err != nil {
+               return nil, err
+       }
+       apply(mine)
+
+       avail, err := cq.fetchAll(arvados.ResourceListParams{
+               Select:  selectParam,
+               Order:   "uuid",
+               Limit:   &limitParam,
+               Count:   "none",
+               Filters: []arvados.Filter{{"state", "=", arvados.ContainerStateQueued}, {"priority", ">", "0"}},
+       })
+       if err != nil {
+               return nil, err
+       }
+       apply(avail)
+
+       var missing []string
+       cq.mtx.Lock()
+       for uuid, ent := range cq.current {
+               if next[uuid] == nil &&
+                       ent.Container.State != arvados.ContainerStateCancelled &&
+                       ent.Container.State != arvados.ContainerStateComplete {
+                       missing = append(missing, uuid)
+               }
+       }
+       cq.mtx.Unlock()
+
+       for i, page := 0, 20; i < len(missing); i += page {
+               batch := missing[i:]
+               if len(batch) > page {
+                       batch = batch[:page]
+               }
+               ended, err := cq.fetchAll(arvados.ResourceListParams{
+                       Select:  selectParam,
+                       Order:   "uuid",
+                       Count:   "none",
+                       Filters: []arvados.Filter{{"uuid", "in", batch}},
+               })
+               if err != nil {
+                       return nil, err
+               }
+               apply(ended)
+       }
+       return next, nil
+}
+
+func (cq *Queue) fetchAll(initialParams arvados.ResourceListParams) ([]arvados.Container, error) {
+       var results []arvados.Container
+       params := initialParams
+       params.Offset = 0
+       for {
+               // This list variable must be a new one declared
+               // inside the loop: otherwise, items in the API
+               // response would get deep-merged into the items
+               // loaded in previous iterations.
+               var list arvados.ContainerList
+
+               err := cq.client.RequestAndDecode(&list, "GET", "arvados/v1/containers", nil, params)
+               if err != nil {
+                       return nil, err
+               }
+               if len(list.Items) == 0 {
+                       break
+               }
+
+               results = append(results, list.Items...)
+               if len(params.Order) == 1 && params.Order == "uuid" {
+                       params.Filters = append(initialParams.Filters, arvados.Filter{"uuid", ">", list.Items[len(list.Items)-1].UUID})
+               } else {
+                       params.Offset += len(list.Items)
+               }
+       }
+       return results, nil
+}
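
The Queue above is meant to be driven by periodic Update calls, with Subscribe providing change notifications between polls. A hedged sketch of that consumer loop (not part of this commit); the one-second ticker and the fixed "example" instance type are assumptions, everything else uses only the API defined above.

    // Hypothetical consumer of container.Queue: refresh the cache on a
    // timer and react to change notifications.
    package main

    import (
            "time"

            "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
            "git.curoverse.com/arvados.git/sdk/go/arvados"
            "github.com/Sirupsen/logrus"
            "github.com/prometheus/client_golang/prometheus"
    )

    func main() {
            logger := logrus.StandardLogger()
            client := arvados.NewClientFromEnv()
            chooseType := func(ctr *arvados.Container) (arvados.InstanceType, error) {
                    // Assumption: a single fixed type is enough for this sketch.
                    return arvados.InstanceType{Name: "example"}, nil
            }
            cq := container.NewQueue(logger, prometheus.NewRegistry(), chooseType, client)

            ch := cq.Subscribe()
            defer cq.Unsubscribe(ch)
            go func() {
                    for range time.NewTicker(time.Second).C {
                            if err := cq.Update(); err != nil {
                                    logger.Warnf("queue update failed: %s", err)
                            }
                    }
            }()
            for range ch {
                    ents, threshold := cq.Entries()
                    logger.Printf("%d queued/locked containers as of %s", len(ents), threshold)
            }
    }
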
diff --git a/lib/dispatchcloud/dispatcher.go b/lib/dispatchcloud/dispatcher.go
new file mode 100644 (file)
index 0000000..81ad0ed
--- /dev/null
@@ -0,0 +1,197 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       "golang.org/x/crypto/ssh"
+)
+
+const (
+       defaultPollInterval     = time.Second
+       defaultStaleLockTimeout = time.Minute
+)
+
+type pool interface {
+       scheduler.WorkerPool
+       Instances() []worker.InstanceView
+       Stop()
+}
+
+type dispatcher struct {
+       Cluster       *arvados.Cluster
+       InstanceSetID cloud.InstanceSetID
+
+       logger      logrus.FieldLogger
+       reg         *prometheus.Registry
+       instanceSet cloud.InstanceSet
+       pool        pool
+       queue       scheduler.ContainerQueue
+       httpHandler http.Handler
+       sshKey      ssh.Signer
+
+       setupOnce sync.Once
+       stop      chan struct{}
+       stopped   chan struct{}
+}
+
+// Start starts the dispatcher. Start can be called multiple times
+// with no ill effect.
+func (disp *dispatcher) Start() {
+       disp.setupOnce.Do(disp.setup)
+}
+
+// ServeHTTP implements service.Handler.
+func (disp *dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       disp.Start()
+       disp.httpHandler.ServeHTTP(w, r)
+}
+
+// CheckHealth implements service.Handler.
+func (disp *dispatcher) CheckHealth() error {
+       disp.Start()
+       return nil
+}
+
+// Stop dispatching containers and release resources. Typically used
+// in tests.
+func (disp *dispatcher) Close() {
+       disp.Start()
+       select {
+       case disp.stop <- struct{}{}:
+       default:
+       }
+       <-disp.stopped
+}
+
+// Make a worker.Executor for the given instance.
+func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
+       exr := ssh_executor.New(inst)
+       exr.SetSigners(disp.sshKey)
+       return exr
+}
+
+func (disp *dispatcher) typeChooser(ctr *arvados.Container) (arvados.InstanceType, error) {
+       return ChooseInstanceType(disp.Cluster, ctr)
+}
+
+func (disp *dispatcher) setup() {
+       disp.initialize()
+       go disp.run()
+}
+
+func (disp *dispatcher) initialize() {
+       arvClient := arvados.NewClientFromEnv()
+       if disp.InstanceSetID == "" {
+               if strings.HasPrefix(arvClient.AuthToken, "v2/") {
+                       disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+               } else {
+                       // Use some other string unique to this token
+                       // that doesn't reveal the token itself.
+                       disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
+               }
+       }
+       disp.stop = make(chan struct{}, 1)
+       disp.stopped = make(chan struct{})
+       disp.logger = logrus.StandardLogger()
+
+       if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
+               disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
+       } else {
+               disp.sshKey = key
+       }
+
+       instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID)
+       if err != nil {
+               disp.logger.Fatalf("error initializing driver: %s", err)
+       }
+       disp.instanceSet = &instanceSetProxy{instanceSet}
+       disp.reg = prometheus.NewRegistry()
+       disp.pool = worker.NewPool(disp.logger, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+       disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
+
+       if disp.Cluster.ManagementToken == "" {
+               disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+                       http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+               })
+       } else {
+               mux := http.NewServeMux()
+               mux.HandleFunc("/arvados/v1/dispatch/containers", disp.apiContainers)
+               mux.HandleFunc("/arvados/v1/dispatch/instances", disp.apiInstances)
+               metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
+                       ErrorLog: disp.logger,
+               })
+               mux.Handle("/metrics", metricsH)
+               mux.Handle("/metrics.json", metricsH)
+               disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
+       }
+}
+
+func (disp *dispatcher) run() {
+       defer close(disp.stopped)
+       defer disp.instanceSet.Stop()
+       defer disp.pool.Stop()
+
+       staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)
+       if staleLockTimeout == 0 {
+               staleLockTimeout = defaultStaleLockTimeout
+       }
+       pollInterval := time.Duration(disp.Cluster.Dispatch.PollInterval)
+       if pollInterval <= 0 {
+               pollInterval = defaultPollInterval
+       }
+       sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+       sched.Start()
+       defer sched.Stop()
+
+       <-disp.stop
+}
+
+// Management API: all active and queued containers.
+func (disp *dispatcher) apiContainers(w http.ResponseWriter, r *http.Request) {
+       if r.Method != "GET" {
+               httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+       var resp struct {
+               Items []container.QueueEnt
+       }
+       qEntries, _ := disp.queue.Entries()
+       for _, ent := range qEntries {
+               resp.Items = append(resp.Items, ent)
+       }
+       json.NewEncoder(w).Encode(resp)
+}
+
+// Management API: all active instances (cloud VMs).
+func (disp *dispatcher) apiInstances(w http.ResponseWriter, r *http.Request) {
+       if r.Method != "GET" {
+               httpserver.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+               return
+       }
+       var resp struct {
+               Items []worker.InstanceView
+       }
+       resp.Items = disp.pool.Instances()
+       json.NewEncoder(w).Encode(resp)
+}
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
new file mode 100644 (file)
index 0000000..33823a8
--- /dev/null
@@ -0,0 +1,269 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "encoding/json"
+       "io/ioutil"
+       "math/rand"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&DispatcherSuite{})
+
+type DispatcherSuite struct {
+       cluster     *arvados.Cluster
+       instanceSet *test.LameInstanceSet
+       stubDriver  *test.StubDriver
+       disp        *dispatcher
+}
+
+func (s *DispatcherSuite) SetUpSuite(c *check.C) {
+       if os.Getenv("ARVADOS_DEBUG") != "" {
+               logrus.StandardLogger().SetLevel(logrus.DebugLevel)
+       }
+}
+
+func (s *DispatcherSuite) SetUpTest(c *check.C) {
+       dispatchpub, _ := test.LoadTestKey(c, "test/sshkey_dispatch")
+       dispatchprivraw, err := ioutil.ReadFile("test/sshkey_dispatch")
+       c.Assert(err, check.IsNil)
+
+       _, hostpriv := test.LoadTestKey(c, "test/sshkey_vm")
+       s.stubDriver = &test.StubDriver{
+               HostKey:          hostpriv,
+               AuthorizedKeys:   []ssh.PublicKey{dispatchpub},
+               ErrorRateDestroy: 0.1,
+       }
+
+       s.cluster = &arvados.Cluster{
+               CloudVMs: arvados.CloudVMs{
+                       Driver:          "test",
+                       SyncInterval:    arvados.Duration(10 * time.Millisecond),
+                       TimeoutIdle:     arvados.Duration(30 * time.Millisecond),
+                       TimeoutBooting:  arvados.Duration(30 * time.Millisecond),
+                       TimeoutProbe:    arvados.Duration(15 * time.Millisecond),
+                       TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+               },
+               Dispatch: arvados.Dispatch{
+                       PrivateKey:         dispatchprivraw,
+                       PollInterval:       arvados.Duration(5 * time.Millisecond),
+                       ProbeInterval:      arvados.Duration(5 * time.Millisecond),
+                       StaleLockTimeout:   arvados.Duration(5 * time.Millisecond),
+                       MaxProbesPerSecond: 1000,
+               },
+               InstanceTypes: arvados.InstanceTypeMap{
+                       test.InstanceType(1).Name:  test.InstanceType(1),
+                       test.InstanceType(2).Name:  test.InstanceType(2),
+                       test.InstanceType(3).Name:  test.InstanceType(3),
+                       test.InstanceType(4).Name:  test.InstanceType(4),
+                       test.InstanceType(6).Name:  test.InstanceType(6),
+                       test.InstanceType(8).Name:  test.InstanceType(8),
+                       test.InstanceType(16).Name: test.InstanceType(16),
+               },
+               NodeProfiles: map[string]arvados.NodeProfile{
+                       "*": {
+                               Controller:    arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
+                               DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
+                       },
+               },
+       }
+       s.disp = &dispatcher{Cluster: s.cluster}
+       // Test cases can modify s.cluster before calling
+       // initialize(), and then modify private state before calling
+       // go run().
+}
+
+func (s *DispatcherSuite) TearDownTest(c *check.C) {
+       s.disp.Close()
+}
+
+// DispatchToStubDriver checks that the dispatcher wires everything
+// together effectively. It uses a real scheduler and worker pool with
+// a fake queue and cloud driver. The fake cloud driver injects
+// artificial errors in order to exercise a variety of code paths.
+func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
+       drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       queue := &test.Queue{
+               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                       return ChooseInstanceType(s.cluster, ctr)
+               },
+       }
+       for i := 0; i < 200; i++ {
+               queue.Containers = append(queue.Containers, arvados.Container{
+                       UUID:     test.ContainerUUID(i + 1),
+                       State:    arvados.ContainerStateQueued,
+                       Priority: int64(i%20 + 1),
+                       RuntimeConstraints: arvados.RuntimeConstraints{
+                               RAM:   int64(i%3+1) << 30,
+                               VCPUs: i%8 + 1,
+                       },
+               })
+       }
+       s.disp.queue = queue
+
+       var mtx sync.Mutex
+       done := make(chan struct{})
+       waiting := map[string]struct{}{}
+       for _, ctr := range queue.Containers {
+               waiting[ctr.UUID] = struct{}{}
+       }
+       executeContainer := func(ctr arvados.Container) int {
+               mtx.Lock()
+               defer mtx.Unlock()
+               if _, ok := waiting[ctr.UUID]; !ok {
+                       c.Logf("container completed twice: %s -- perhaps completed after stub instance was killed?", ctr.UUID)
+                       return 1
+               }
+               delete(waiting, ctr.UUID)
+               if len(waiting) == 0 {
+                       close(done)
+               }
+               return int(rand.Uint32() & 0x3)
+       }
+       n := 0
+       s.stubDriver.Queue = queue
+       s.stubDriver.SetupVM = func(stubvm *test.StubVM) {
+               n++
+               stubvm.Boot = time.Now().Add(time.Duration(rand.Int63n(int64(5 * time.Millisecond))))
+               stubvm.CrunchRunDetachDelay = time.Duration(rand.Int63n(int64(10 * time.Millisecond)))
+               stubvm.ExecuteContainer = executeContainer
+               switch n % 7 {
+               case 0:
+                       stubvm.Broken = time.Now().Add(time.Duration(rand.Int63n(90)) * time.Millisecond)
+               case 1:
+                       stubvm.CrunchRunMissing = true
+               default:
+                       stubvm.CrunchRunCrashRate = 0.1
+               }
+       }
+
+       start := time.Now()
+       go s.disp.run()
+       err := s.disp.CheckHealth()
+       c.Check(err, check.IsNil)
+
+       select {
+       case <-done:
+               c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
+       case <-time.After(10 * time.Second):
+               c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+       }
+
+       deadline := time.Now().Add(time.Second)
+       for range time.NewTicker(10 * time.Millisecond).C {
+               insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
+               c.Check(err, check.IsNil)
+               queue.Update()
+               ents, _ := queue.Entries()
+               if len(ents) == 0 && len(insts) == 0 {
+                       break
+               }
+               if time.Now().After(deadline) {
+                       c.Fatalf("timed out with %d containers (%v), %d instances (%+v)", len(ents), ents, len(insts), insts)
+               }
+       }
+}
+
+func (s *DispatcherSuite) TestAPIPermissions(c *check.C) {
+       s.cluster.ManagementToken = "abcdefgh"
+       drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
+
+       for _, token := range []string{"abc", ""} {
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+               if token != "" {
+                       req.Header.Set("Authorization", "Bearer "+token)
+               }
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               if token == "" {
+                       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+               } else {
+                       c.Check(resp.Code, check.Equals, http.StatusForbidden)
+               }
+       }
+}
+
+func (s *DispatcherSuite) TestAPIDisabled(c *check.C) {
+       s.cluster.ManagementToken = ""
+       drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
+
+       for _, token := range []string{"abc", ""} {
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+               if token != "" {
+                       req.Header.Set("Authorization", "Bearer "+token)
+               }
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               c.Check(resp.Code, check.Equals, http.StatusForbidden)
+       }
+}
+
+func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
+       s.cluster.ManagementToken = "abcdefgh"
+       s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
+       drivers["test"] = s.stubDriver
+       s.disp.setupOnce.Do(s.disp.initialize)
+       s.disp.queue = &test.Queue{}
+       go s.disp.run()
+
+       type instance struct {
+               Instance             string
+               WorkerState          string
+               Price                float64
+               LastContainerUUID    string
+               ArvadosInstanceType  string
+               ProviderInstanceType string
+       }
+       type instancesResponse struct {
+               Items []instance
+       }
+       getInstances := func() instancesResponse {
+               req := httptest.NewRequest("GET", "/arvados/v1/dispatch/instances", nil)
+               req.Header.Set("Authorization", "Bearer abcdefgh")
+               resp := httptest.NewRecorder()
+               s.disp.ServeHTTP(resp, req)
+               var sr instancesResponse
+               c.Check(resp.Code, check.Equals, http.StatusOK)
+               err := json.Unmarshal(resp.Body.Bytes(), &sr)
+               c.Check(err, check.IsNil)
+               return sr
+       }
+
+       sr := getInstances()
+       c.Check(len(sr.Items), check.Equals, 0)
+
+       ch := s.disp.pool.Subscribe()
+       defer s.disp.pool.Unsubscribe(ch)
+       err := s.disp.pool.Create(test.InstanceType(1))
+       c.Check(err, check.IsNil)
+       <-ch
+
+       sr = getInstances()
+       c.Assert(len(sr.Items), check.Equals, 1)
+       c.Check(sr.Items[0].Instance, check.Matches, "stub.*")
+       c.Check(sr.Items[0].WorkerState, check.Equals, "booting")
+       c.Check(sr.Items[0].Price, check.Equals, 0.123)
+       c.Check(sr.Items[0].LastContainerUUID, check.Equals, "")
+       c.Check(sr.Items[0].ProviderInstanceType, check.Equals, test.InstanceType(1).ProviderType)
+       c.Check(sr.Items[0].ArvadosInstanceType, check.Equals, test.InstanceType(1).Name)
+}
diff --git a/lib/dispatchcloud/driver.go b/lib/dispatchcloud/driver.go
new file mode 100644 (file)
index 0000000..295fd61
--- /dev/null
@@ -0,0 +1,22 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var drivers = map[string]cloud.Driver{}
+
+func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID) (cloud.InstanceSet, error) {
+       driver, ok := drivers[cluster.CloudVMs.Driver]
+       if !ok {
+               return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
+       }
+       return driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID)
+}
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
new file mode 100644 (file)
index 0000000..e728b67
--- /dev/null
@@ -0,0 +1,25 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+type instanceSetProxy struct {
+       cloud.InstanceSet
+}
+
+func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
+       // TODO: return if Create failed recently with a RateLimitError or QuotaError
+       return is.InstanceSet.Create(it, id, tags, pk)
+}
+
+func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+       // TODO: return if Instances failed recently with a RateLimitError
+       return is.InstanceSet.Instances(tags)
+}
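
The TODO comments above anticipate callers distinguishing rate-limit and quota failures using the error interfaces from lib/cloud/interfaces.go earlier in this commit. A hedged sketch of what that type-assertion logic could look like (not part of this commit); the function name and messages are made up.

    package exampleuse

    import (
            "fmt"

            "git.curoverse.com/arvados.git/lib/cloud"
            "git.curoverse.com/arvados.git/sdk/go/arvados"
            "golang.org/x/crypto/ssh"
    )

    // createOrExplain calls InstanceSet.Create and classifies the error
    // using the cloud.RateLimitError and cloud.QuotaError interfaces.
    func createOrExplain(is cloud.InstanceSet, it arvados.InstanceType, img cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
            inst, err := is.Create(it, img, tags, pk)
            if err == nil {
                    return inst, nil
            }
            if rl, ok := err.(cloud.RateLimitError); ok {
                    return nil, fmt.Errorf("cloud API rate limited, retry after %s: %s", rl.EarliestRetry(), err)
            }
            if qe, ok := err.(cloud.QuotaError); ok && qe.IsQuotaError() {
                    return nil, fmt.Errorf("cloud quota reached; wait for existing instances to be destroyed: %s", err)
            }
            return nil, err
    }
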
diff --git a/lib/dispatchcloud/logger.go b/lib/dispatchcloud/logger.go
new file mode 100644 (file)
index 0000000..90bb6ca
--- /dev/null
@@ -0,0 +1,29 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "sync"
+       "time"
+)
+
+type logger interface {
+       Printf(string, ...interface{})
+       Warnf(string, ...interface{})
+       Debugf(string, ...interface{})
+}
+
+var nextSpam = map[string]time.Time{}
+var nextSpamMtx sync.Mutex
+
+func unspam(msg string) bool {
+       nextSpamMtx.Lock()
+       defer nextSpamMtx.Unlock()
+       if nextSpam[msg].Before(time.Now()) {
+               nextSpam[msg] = time.Now().Add(time.Minute)
+               return true
+       }
+       return false
+}
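
A short in-package usage sketch for unspam (not part of this change): the helper name is hypothetical, while the once-per-minute throttling behaviour comes from unspam itself.

// warnThrottled logs identical warnings at most once per minute, using
// unspam to decide whether this occurrence should be emitted. Assumes it
// lives in package dispatchcloud next to the definitions above.
func warnThrottled(lgr logger, msg string) {
        if unspam(msg) {
                lgr.Warnf("%s", msg)
        }
}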
index 339e042c1aa0d15cb3f6cf1994c6146bf34de11d..d7f4585619417904a1125bc05d54d58499199179 100644 (file)
@@ -6,21 +6,16 @@ package dispatchcloud
 
 import (
        "errors"
-       "log"
-       "os/exec"
        "regexp"
        "sort"
        "strconv"
-       "strings"
-       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
-var (
-       ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
-       discountConfiguredRAMPercent  = 5
-)
+var ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+
+var discountConfiguredRAMPercent = 5
 
 // ConstraintsNotSatisfiableError includes a list of available instance types
 // to be reported back to the user.
@@ -38,12 +33,10 @@ var pdhRegexp = regexp.MustCompile(`^[0-9a-f]{32}\+(\d+)$`)
 func estimateDockerImageSize(collectionPDH string) int64 {
        m := pdhRegexp.FindStringSubmatch(collectionPDH)
        if m == nil {
-               log.Printf("estimateDockerImageSize: '%v' did not match pdhRegexp, returning 0", collectionPDH)
                return 0
        }
        n, err := strconv.ParseInt(m[1], 10, 64)
        if err != nil || n < 122 {
-               log.Printf("estimateDockerImageSize: short manifest %v or error (%v), returning 0", n, err)
                return 0
        }
        // To avoid having to fetch the collection, take advantage of
@@ -137,61 +130,3 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
        }
        return
 }
-
-// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
-// type name as a valid feature name, even if no instances of that
-// type have appeared yet.
-//
-// It takes advantage of some SLURM peculiarities:
-//
-// (1) A feature is valid after it has been offered by a node, even if
-// it is no longer offered by any node. So, to make a feature name
-// valid, we can add it to a dummy node ("compute0"), then remove it.
-//
-// (2) To test whether a set of feature names are valid without
-// actually submitting a job, we can call srun --test-only with the
-// desired features.
-//
-// SlurmNodeTypeFeatureKludge does a test-and-fix operation
-// immediately, and then periodically, in case slurm restarts and
-// forgets the list of valid features. It never returns (unless there
-// are no node types configured, in which case it returns
-// immediately), so it should generally be invoked with "go".
-func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
-       if len(cc.InstanceTypes) == 0 {
-               return
-       }
-       var features []string
-       for _, it := range cc.InstanceTypes {
-               features = append(features, "instancetype="+it.Name)
-       }
-       for {
-               slurmKludge(features)
-               time.Sleep(2 * time.Second)
-       }
-}
-
-const slurmDummyNode = "compute0"
-
-func slurmKludge(features []string) {
-       allFeatures := strings.Join(features, ",")
-
-       cmd := exec.Command("sinfo", "--nodes="+slurmDummyNode, "--format=%f", "--noheader")
-       out, err := cmd.CombinedOutput()
-       if err != nil {
-               log.Printf("running %q %q: %s (output was %q)", cmd.Path, cmd.Args, err, out)
-               return
-       }
-       if string(out) == allFeatures+"\n" {
-               // Already configured correctly, nothing to do.
-               return
-       }
-
-       log.Printf("configuring node %q with all node type features", slurmDummyNode)
-       cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+allFeatures)
-       log.Printf("running: %q %q", cmd.Path, cmd.Args)
-       out, err = cmd.CombinedOutput()
-       if err != nil {
-               log.Printf("error: scontrol: %s (output was %q)", err, out)
-       }
-}
diff --git a/lib/dispatchcloud/readme.go b/lib/dispatchcloud/readme.go
new file mode 100644 (file)
index 0000000..c8491fb
--- /dev/null
@@ -0,0 +1,70 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+// A dispatcher comprises a container queue, a scheduler, a worker
+// pool, a remote command executor, and a cloud driver.
+// 1. Choose a provider.
+// 2. Start a worker pool.
+// 3. Start a container queue.
+// 4. Run the scheduler's stale-lock fixer.
+// 5. Run the scheduler's mapper.
+// 6. Run the scheduler's syncer.
+// 7. Wait for updates to the container queue or worker pool.
+// 8. Repeat from 5.
+//
+//
+// A cloud driver creates new cloud VM instances and gets the latest
+// list of instances. The returned instances are caches/proxies for
+// the provider's metadata and control interfaces (get IP address,
+// update tags, shutdown).
+//
+//
+// A worker pool tracks workers' instance types and readiness states
+// (available to do work now, booting, suffering a temporary network
+// outage, shutting down). It loads internal state from the cloud
+// provider's list of instances at startup, and syncs periodically
+// after that.
+//
+//
+// An executor maintains a multiplexed SSH connection to a cloud
+// instance, retrying/reconnecting as needed, so the worker pool can
+// execute commands. It asks the cloud driver's instance to verify its
+// SSH public key once when first connecting, and again later if the
+// key changes.
+//
+//
+// A container queue tracks the known state (according to
+// arvados-controller) of each container of interest -- i.e., queued,
+// or locked/running using our own dispatch token. It also proxies the
+// dispatcher's lock/unlock/cancel requests to the controller. It
+// handles concurrent refresh and update operations without exposing
+// out-of-order updates to its callers. (It drops any new information
+// that might have originated before its own most recent
+// lock/unlock/cancel operation.)
+//
+//
+// The scheduler's stale-lock fixer waits for any already-locked
+// containers (i.e., locked by a prior dispatcher process) to appear
+// on workers as the worker pool recovers its state. It
+// unlocks/requeues any that still remain when all workers are
+// recovered or shut down, or its timer expires.
+//
+//
+// The scheduler's mapper chooses which containers to assign to which
+// idle workers, and decides what to do when there are not enough idle
+// workers (including shutting down some idle nodes).
+//
+//
+// The scheduler's syncer updates state to Cancelled when a running
+// container process dies without finalizing its entry in the
+// controller database. It also calls the worker pool to kill
+// containers that have priority=0 while locked or running.
+//
+//
+// An instance set proxy wraps a driver's instance set with
+// rate-limiting logic. After the wrapped instance set receives a
+// cloud.RateLimitError, the proxy starts returning errors to callers
+// immediately without calling through to the wrapped instance set.
diff --git a/lib/dispatchcloud/scheduler/fix_stale_locks.go b/lib/dispatchcloud/scheduler/fix_stale_locks.go
new file mode 100644 (file)
index 0000000..264f9e4
--- /dev/null
@@ -0,0 +1,57 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// fixStaleLocks waits for any already-locked containers (i.e., locked
+// by a prior dispatcher process) to appear on workers as the worker
+// pool recovers its state. It unlocks any that still remain when all
+// workers are recovered or shut down, or its timer
+// (sch.staleLockTimeout) expires.
+func (sch *Scheduler) fixStaleLocks() {
+       wp := sch.pool.Subscribe()
+       defer sch.pool.Unsubscribe(wp)
+       timeout := time.NewTimer(sch.staleLockTimeout)
+waiting:
+       for {
+               unlock := false
+               select {
+               case <-wp:
+                       // If all workers have been contacted, unlock
+                       // containers that aren't claimed by any
+                       // worker.
+                       unlock = sch.pool.CountWorkers()[worker.StateUnknown] == 0
+               case <-timeout.C:
+                       // Give up and unlock the containers, even
+                       // though they might be working.
+                       unlock = true
+               }
+
+               running := sch.pool.Running()
+               qEntries, _ := sch.queue.Entries()
+               for uuid, ent := range qEntries {
+                       if ent.Container.State != arvados.ContainerStateLocked {
+                               continue
+                       }
+                       if _, running := running[uuid]; running {
+                               continue
+                       }
+                       if !unlock {
+                               continue waiting
+                       }
+                       err := sch.queue.Unlock(uuid)
+                       if err != nil {
+                               sch.logger.Warnf("Unlock %s: %s", uuid, err)
+                       }
+               }
+               return
+       }
+}
diff --git a/lib/dispatchcloud/scheduler/gocheck_test.go b/lib/dispatchcloud/scheduler/gocheck_test.go
new file mode 100644 (file)
index 0000000..558c60f
--- /dev/null
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "testing"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/scheduler/interfaces.go b/lib/dispatchcloud/scheduler/interfaces.go
new file mode 100644 (file)
index 0000000..59700c3
--- /dev/null
@@ -0,0 +1,43 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// A ContainerQueue is a set of containers that need to be started or
+// stopped. Implemented by container.Queue and test stubs.
+type ContainerQueue interface {
+       Entries() (entries map[string]container.QueueEnt, updated time.Time)
+       Lock(uuid string) error
+       Unlock(uuid string) error
+       Cancel(uuid string) error
+       Forget(uuid string)
+       Get(uuid string) (arvados.Container, bool)
+       Subscribe() <-chan struct{}
+       Unsubscribe(<-chan struct{})
+       Update() error
+}
+
+// A WorkerPool asynchronously starts and stops worker VMs, and starts
+// and stops containers on them. Implemented by worker.Pool and test
+// stubs.
+type WorkerPool interface {
+       Running() map[string]time.Time
+       Unallocated() map[arvados.InstanceType]int
+       CountWorkers() map[worker.State]int
+       AtQuota() bool
+       Create(arvados.InstanceType) error
+       Shutdown(arvados.InstanceType) bool
+       StartContainer(arvados.InstanceType, arvados.Container) bool
+       KillContainer(uuid string)
+       Subscribe() <-chan struct{}
+       Unsubscribe(<-chan struct{})
+}
diff --git a/lib/dispatchcloud/scheduler/run_queue.go b/lib/dispatchcloud/scheduler/run_queue.go
new file mode 100644 (file)
index 0000000..ece8e3d
--- /dev/null
@@ -0,0 +1,165 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "sort"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+func (sch *Scheduler) runQueue() {
+       unsorted, _ := sch.queue.Entries()
+       sorted := make([]container.QueueEnt, 0, len(unsorted))
+       for _, ent := range unsorted {
+               sorted = append(sorted, ent)
+       }
+       sort.Slice(sorted, func(i, j int) bool {
+               return sorted[i].Container.Priority > sorted[j].Container.Priority
+       })
+
+       running := sch.pool.Running()
+       unalloc := sch.pool.Unallocated()
+
+       sch.logger.WithFields(logrus.Fields{
+               "Containers": len(sorted),
+               "Processes":  len(running),
+       }).Debug("runQueue")
+
+       dontstart := map[arvados.InstanceType]bool{}
+       var overquota []container.QueueEnt // entries that are unmappable because of worker pool quota
+
+tryrun:
+       for i, ctr := range sorted {
+               ctr, it := ctr.Container, ctr.InstanceType
+               logger := sch.logger.WithFields(logrus.Fields{
+                       "ContainerUUID": ctr.UUID,
+                       "InstanceType":  it.Name,
+               })
+               if _, running := running[ctr.UUID]; running || ctr.Priority < 1 {
+                       continue
+               }
+               switch ctr.State {
+               case arvados.ContainerStateQueued:
+                       if unalloc[it] < 1 && sch.pool.AtQuota() {
+                               logger.Debug("not locking: AtQuota and no unalloc workers")
+                               overquota = sorted[i:]
+                               break tryrun
+                       }
+                       sch.bgLock(logger, ctr.UUID)
+                       unalloc[it]--
+               case arvados.ContainerStateLocked:
+                       if unalloc[it] > 0 {
+                               unalloc[it]--
+                       } else if sch.pool.AtQuota() {
+                               logger.Debug("not starting: AtQuota and no unalloc workers")
+                               overquota = sorted[i:]
+                               break tryrun
+                       } else {
+                               logger.Info("creating new instance")
+                               err := sch.pool.Create(it)
+                               if err != nil {
+                                       if _, ok := err.(cloud.QuotaError); !ok {
+                                               logger.WithError(err).Warn("error creating worker")
+                                       }
+                                       sch.queue.Unlock(ctr.UUID)
+                                       // Don't let lower-priority
+                                       // containers starve this one
+                                       // by keeping idle
+                                       // workers alive on different
+                                       // instance types.  TODO:
+                                       // avoid getting starved here
+                                       // if instances of a specific
+                                       // type always fail.
+                                       overquota = sorted[i:]
+                                       break tryrun
+                               }
+                       }
+
+                       if dontstart[it] {
+                               // We already tried & failed to start
+                               // a higher-priority container on the
+                               // same instance type. Don't let this
+                               // one sneak in ahead of it.
+                       } else if sch.pool.StartContainer(it, ctr) {
+                               // Success.
+                       } else {
+                               dontstart[it] = true
+                       }
+               }
+       }
+
+       if len(overquota) > 0 {
+               // Unlock any containers that are unmappable while
+               // we're at quota.
+               for _, ctr := range overquota {
+                       ctr := ctr.Container
+                       if ctr.State == arvados.ContainerStateLocked {
+                               logger := sch.logger.WithField("ContainerUUID", ctr.UUID)
+                               logger.Debug("unlock because pool capacity is used by higher priority containers")
+                               err := sch.queue.Unlock(ctr.UUID)
+                               if err != nil {
+                                       logger.WithError(err).Warn("error unlocking")
+                               }
+                       }
+               }
+               // Shut down idle workers that didn't get any
+               // containers mapped onto them before we hit quota.
+               for it, n := range unalloc {
+                       if n < 1 {
+                               continue
+                       }
+                       sch.pool.Shutdown(it)
+               }
+       }
+}
+
+// Start an API call to lock the given container, and return
+// immediately while waiting for the response in a new goroutine. Do
+// nothing if a lock request is already in progress for this
+// container.
+func (sch *Scheduler) bgLock(logger logrus.FieldLogger, uuid string) {
+       logger.Debug("locking")
+       sch.mtx.Lock()
+       defer sch.mtx.Unlock()
+       if sch.locking[uuid] {
+               logger.Debug("locking in progress, doing nothing")
+               return
+       }
+       if ctr, ok := sch.queue.Get(uuid); !ok || ctr.State != arvados.ContainerStateQueued {
+               // This happens if the container has been cancelled or
+               // locked since runQueue called sch.queue.Entries(),
+               // possibly by a bgLock() call from a previous
+               // runQueue iteration. In any case, we will respond
+               // appropriately on the next runQueue iteration, which
+               // will have already been triggered by the queue
+               // update.
+               logger.WithField("State", ctr.State).Debug("container no longer queued by the time we decided to lock it, doing nothing")
+               return
+       }
+       sch.locking[uuid] = true
+       go func() {
+               defer func() {
+                       sch.mtx.Lock()
+                       defer sch.mtx.Unlock()
+                       delete(sch.locking, uuid)
+               }()
+               err := sch.queue.Lock(uuid)
+               if err != nil {
+                       logger.WithError(err).Warn("error locking container")
+                       return
+               }
+               logger.Debug("lock succeeded")
+               ctr, ok := sch.queue.Get(uuid)
+               if !ok {
+                       logger.Error("(BUG?) container disappeared from queue after Lock succeeded")
+               } else if ctr.State != arvados.ContainerStateLocked {
+                       logger.Warnf("(race?) container has state=%q after Lock succeeded", ctr.State)
+               }
+       }()
+}
diff --git a/lib/dispatchcloud/scheduler/run_queue_test.go b/lib/dispatchcloud/scheduler/run_queue_test.go
new file mode 100644 (file)
index 0000000..be13e1c
--- /dev/null
@@ -0,0 +1,318 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "errors"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+var (
+       logger = logrus.StandardLogger()
+
+       // arbitrary example container UUIDs
+       uuids = func() (r []string) {
+               for i := 0; i < 16; i++ {
+                       r = append(r, test.ContainerUUID(i))
+               }
+               return
+       }()
+)
+
+type stubQuotaError struct {
+       error
+}
+
+func (stubQuotaError) IsQuotaError() bool { return true }
+
+type stubPool struct {
+       notify    <-chan struct{}
+       unalloc   map[arvados.InstanceType]int // idle+booting+unknown
+       idle      map[arvados.InstanceType]int
+       running   map[string]time.Time
+       atQuota   bool
+       canCreate int
+       creates   []arvados.InstanceType
+       starts    []string
+       shutdowns int
+}
+
+func (p *stubPool) AtQuota() bool                 { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{}    { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{})   {}
+func (p *stubPool) Running() map[string]time.Time { return p.running }
+func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
+       r := map[arvados.InstanceType]int{}
+       for it, n := range p.unalloc {
+               r[it] = n
+       }
+       return r
+}
+func (p *stubPool) Create(it arvados.InstanceType) error {
+       p.creates = append(p.creates, it)
+       if p.canCreate < 1 {
+               return stubQuotaError{errors.New("quota")}
+       }
+       p.canCreate--
+       p.unalloc[it]++
+       return nil
+}
+func (p *stubPool) KillContainer(uuid string) {
+       p.running[uuid] = time.Now()
+}
+func (p *stubPool) Shutdown(arvados.InstanceType) bool {
+       p.shutdowns++
+       return false
+}
+func (p *stubPool) CountWorkers() map[worker.State]int {
+       return map[worker.State]int{
+               worker.StateBooting: len(p.unalloc) - len(p.idle),
+               worker.StateIdle:    len(p.idle),
+               worker.StateRunning: len(p.running),
+       }
+}
+func (p *stubPool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+       p.starts = append(p.starts, ctr.UUID)
+       if p.idle[it] == 0 {
+               return false
+       }
+       p.idle[it]--
+       p.unalloc[it]--
+       p.running[ctr.UUID] = time.Time{}
+       return true
+}
+
+var _ = check.Suite(&SchedulerSuite{})
+
+type SchedulerSuite struct{}
+
+// Assign priority=4 container to idle node. Create a new instance for
+// the priority=3 container. Don't try to start any priority<3
+// containers because the priority=3 container didn't start
+// immediately. Don't try to create any other nodes after the failed
+// create.
+func (*SchedulerSuite) TestUseIdleWorkers(c *check.C) {
+       queue := test.Queue{
+               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+               },
+               Containers: []arvados.Container{
+                       {
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(2),
+                               Priority: 2,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(3),
+                               Priority: 3,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               UUID:     test.ContainerUUID(4),
+                               Priority: 4,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(1): 1,
+                       test.InstanceType(2): 2,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(1): 1,
+                       test.InstanceType(2): 2,
+               },
+               running:   map[string]time.Time{},
+               canCreate: 0,
+       }
+       New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(1)})
+       c.Check(pool.starts, check.DeepEquals, []string{test.ContainerUUID(4)})
+       c.Check(pool.running, check.HasLen, 1)
+       for uuid := range pool.running {
+               c.Check(uuid, check.Equals, uuids[4])
+       }
+}
+
+// If Create() fails, shut down some nodes, and don't call Create()
+// again.  Don't call Create() at all if AtQuota() is true.
+func (*SchedulerSuite) TestShutdownAtQuota(c *check.C) {
+       for quota := 0; quota < 2; quota++ {
+               c.Logf("quota=%d", quota)
+               shouldCreate := []arvados.InstanceType{}
+               for i := 0; i < quota; i++ {
+                       shouldCreate = append(shouldCreate, test.InstanceType(3))
+               }
+               queue := test.Queue{
+                       ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                               return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+                       },
+                       Containers: []arvados.Container{
+                               {
+                                       UUID:     test.ContainerUUID(2),
+                                       Priority: 2,
+                                       State:    arvados.ContainerStateLocked,
+                                       RuntimeConstraints: arvados.RuntimeConstraints{
+                                               VCPUs: 2,
+                                               RAM:   2 << 30,
+                                       },
+                               },
+                               {
+                                       UUID:     test.ContainerUUID(3),
+                                       Priority: 3,
+                                       State:    arvados.ContainerStateLocked,
+                                       RuntimeConstraints: arvados.RuntimeConstraints{
+                                               VCPUs: 3,
+                                               RAM:   3 << 30,
+                                       },
+                               },
+                       },
+               }
+               queue.Update()
+               pool := stubPool{
+                       atQuota: quota == 0,
+                       unalloc: map[arvados.InstanceType]int{
+                               test.InstanceType(2): 2,
+                       },
+                       idle: map[arvados.InstanceType]int{
+                               test.InstanceType(2): 2,
+                       },
+                       running:   map[string]time.Time{},
+                       creates:   []arvados.InstanceType{},
+                       starts:    []string{},
+                       canCreate: 0,
+               }
+               New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+               c.Check(pool.creates, check.DeepEquals, shouldCreate)
+               c.Check(pool.starts, check.DeepEquals, []string{})
+               c.Check(pool.shutdowns, check.Not(check.Equals), 0)
+       }
+}
+
+// Start lower-priority containers while waiting for new/existing
+// workers to come up for higher-priority containers.
+func (*SchedulerSuite) TestStartWhileCreating(c *check.C) {
+       pool := stubPool{
+               unalloc: map[arvados.InstanceType]int{
+                       test.InstanceType(1): 2,
+                       test.InstanceType(2): 2,
+               },
+               idle: map[arvados.InstanceType]int{
+                       test.InstanceType(1): 1,
+                       test.InstanceType(2): 1,
+               },
+               running:   map[string]time.Time{},
+               canCreate: 4,
+       }
+       queue := test.Queue{
+               ChooseType: func(ctr *arvados.Container) (arvados.InstanceType, error) {
+                       return test.InstanceType(ctr.RuntimeConstraints.VCPUs), nil
+               },
+               Containers: []arvados.Container{
+                       {
+                               // create a new worker
+                               UUID:     test.ContainerUUID(1),
+                               Priority: 1,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               // tentatively map to unalloc worker
+                               UUID:     test.ContainerUUID(2),
+                               Priority: 2,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               // start now on idle worker
+                               UUID:     test.ContainerUUID(3),
+                               Priority: 3,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 1,
+                                       RAM:   1 << 30,
+                               },
+                       },
+                       {
+                               // create a new worker
+                               UUID:     test.ContainerUUID(4),
+                               Priority: 4,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 2,
+                                       RAM:   2 << 30,
+                               },
+                       },
+                       {
+                               // tentatively map to unalloc worker
+                               UUID:     test.ContainerUUID(5),
+                               Priority: 5,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 2,
+                                       RAM:   2 << 30,
+                               },
+                       },
+                       {
+                               // start now on idle worker
+                               UUID:     test.ContainerUUID(6),
+                               Priority: 6,
+                               State:    arvados.ContainerStateLocked,
+                               RuntimeConstraints: arvados.RuntimeConstraints{
+                                       VCPUs: 2,
+                                       RAM:   2 << 30,
+                               },
+                       },
+               },
+       }
+       queue.Update()
+       New(logger, &queue, &pool, time.Millisecond, time.Millisecond).runQueue()
+       c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{test.InstanceType(2), test.InstanceType(1)})
+       c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
+       running := map[string]bool{}
+       for uuid, t := range pool.running {
+               if t.IsZero() {
+                       running[uuid] = false
+               } else {
+                       running[uuid] = true
+               }
+       }
+       c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
+}
diff --git a/lib/dispatchcloud/scheduler/scheduler.go b/lib/dispatchcloud/scheduler/scheduler.go
new file mode 100644 (file)
index 0000000..3971a53
--- /dev/null
@@ -0,0 +1,116 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package scheduler uses a resizable worker pool to execute
+// containers in priority order.
+package scheduler
+
+import (
+       "sync"
+       "time"
+
+       "github.com/Sirupsen/logrus"
+)
+
+// A Scheduler maps queued containers onto unallocated workers in
+// priority order, creating new workers if needed. It locks containers
+// that can be mapped onto existing/pending workers, and starts them
+// if possible.
+//
+// A Scheduler unlocks any containers that are locked but can't be
+// mapped. (For example, this happens when the cloud provider reaches
+// quota/capacity and a previously mappable container's priority is
+// surpassed by a newer container.)
+//
+// If it encounters errors while creating new workers, a Scheduler
+// shuts down idle workers, in case they are consuming quota.
+type Scheduler struct {
+       logger              logrus.FieldLogger
+       queue               ContainerQueue
+       pool                WorkerPool
+       staleLockTimeout    time.Duration
+       queueUpdateInterval time.Duration
+
+       locking map[string]bool
+       mtx     sync.Mutex
+
+       runOnce sync.Once
+       stop    chan struct{}
+       stopped chan struct{}
+}
+
+// New returns a new unstarted Scheduler.
+//
+// Any given queue and pool should not be used by more than one
+// scheduler at a time.
+func New(logger logrus.FieldLogger, queue ContainerQueue, pool WorkerPool, staleLockTimeout, queueUpdateInterval time.Duration) *Scheduler {
+       return &Scheduler{
+               logger:              logger,
+               queue:               queue,
+               pool:                pool,
+               staleLockTimeout:    staleLockTimeout,
+               queueUpdateInterval: queueUpdateInterval,
+               stop:                make(chan struct{}),
+               stopped:             make(chan struct{}),
+               locking:             map[string]bool{},
+       }
+}
+
+// Start starts the scheduler.
+func (sch *Scheduler) Start() {
+       go sch.runOnce.Do(sch.run)
+}
+
+// Stop stops the scheduler. No other method should be called after
+// Stop.
+func (sch *Scheduler) Stop() {
+       close(sch.stop)
+       <-sch.stopped
+}
+
+func (sch *Scheduler) run() {
+       defer close(sch.stopped)
+
+       // Ensure the queue is fetched once before attempting anything.
+       for err := sch.queue.Update(); err != nil; err = sch.queue.Update() {
+               sch.logger.Errorf("error updating queue: %s", err)
+               d := sch.queueUpdateInterval / 60
+               sch.logger.Infof("waiting %s before retry", d)
+               time.Sleep(d)
+       }
+
+       // Keep the queue up to date.
+       poll := time.NewTicker(sch.queueUpdateInterval)
+       defer poll.Stop()
+       go func() {
+               for range poll.C {
+                       err := sch.queue.Update()
+                       if err != nil {
+                               sch.logger.Errorf("error updating queue: %s", err)
+                       }
+               }
+       }()
+
+       t0 := time.Now()
+       sch.logger.Infof("FixStaleLocks starting.")
+       sch.fixStaleLocks()
+       sch.logger.Infof("FixStaleLocks finished (%s), starting scheduling.", time.Since(t0))
+
+       poolNotify := sch.pool.Subscribe()
+       defer sch.pool.Unsubscribe(poolNotify)
+
+       queueNotify := sch.queue.Subscribe()
+       defer sch.queue.Unsubscribe(queueNotify)
+
+       for {
+               sch.runQueue()
+               sch.sync()
+               select {
+               case <-sch.stop:
+                       return
+               case <-queueNotify:
+               case <-poolNotify:
+               }
+       }
+}
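
Putting the pieces together, a usage sketch (not part of this change): the queue and pool arguments are assumed to be a container.Queue and worker.Pool, or the test stubs defined elsewhere in this patch, and the timeout values are illustrative rather than the dispatcher's defaults.

package example

import (
        "time"

        "git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
        "github.com/Sirupsen/logrus"
)

// runScheduler wires an existing container queue and worker pool into a
// scheduler and runs it until the stop channel is closed.
func runScheduler(queue scheduler.ContainerQueue, pool scheduler.WorkerPool, stop <-chan struct{}) {
        sch := scheduler.New(logrus.StandardLogger(), queue, pool,
                time.Minute,    // staleLockTimeout
                10*time.Second, // queueUpdateInterval
        )
        sch.Start()
        defer sch.Stop()
        <-stop
}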
diff --git a/lib/dispatchcloud/scheduler/sync.go b/lib/dispatchcloud/scheduler/sync.go
new file mode 100644 (file)
index 0000000..4c55b3c
--- /dev/null
@@ -0,0 +1,97 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+       "fmt"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+// sync resolves discrepancies between the queue and the pool:
+//
+// Lingering crunch-run processes for finalized and unlocked/requeued
+// containers are killed.
+//
+// Locked containers whose crunch-run processes have exited are
+// requeued.
+//
+// Running containers whose crunch-run processes have exited are
+// cancelled.
+func (sch *Scheduler) sync() {
+       running := sch.pool.Running()
+       cancel := func(ent container.QueueEnt, reason string) {
+               uuid := ent.Container.UUID
+               logger := sch.logger.WithField("ContainerUUID", uuid)
+               logger.Infof("cancelling container because %s", reason)
+               err := sch.queue.Cancel(uuid)
+               if err != nil {
+                       logger.WithError(err).Print("error cancelling container")
+               }
+       }
+       kill := func(ent container.QueueEnt, reason string) {
+               uuid := ent.Container.UUID
+               logger := sch.logger.WithField("ContainerUUID", uuid)
+               logger.Debugf("killing crunch-run process because %s", reason)
+               sch.pool.KillContainer(uuid)
+       }
+       qEntries, qUpdated := sch.queue.Entries()
+       for uuid, ent := range qEntries {
+               exited, running := running[uuid]
+               switch ent.Container.State {
+               case arvados.ContainerStateRunning:
+                       if !running {
+                               go cancel(ent, "not running on any worker")
+                       } else if !exited.IsZero() && qUpdated.After(exited) {
+                               go cancel(ent, "state=\"Running\" after crunch-run exited")
+                       } else if ent.Container.Priority == 0 {
+                               go kill(ent, fmt.Sprintf("priority=%d", ent.Container.Priority))
+                       }
+               case arvados.ContainerStateComplete, arvados.ContainerStateCancelled:
+                       if running {
+                               // Kill crunch-run in case it's stuck;
+                               // nothing it does now will matter
+                               // anyway. If crunch-run has already
+                               // exited and we just haven't found
+                               // out about it yet, the only effect
+                               // of kill() will be to make the
+                               // worker available for the next
+                               // container.
+                               go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                       } else {
+                               sch.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "State":         ent.Container.State,
+                               }).Info("container finished")
+                               sch.queue.Forget(uuid)
+                       }
+               case arvados.ContainerStateQueued:
+                       if running {
+                               // Can happen if a worker returns from
+                               // a network outage and is still
+                               // preparing to run a container that
+                               // has already been unlocked/requeued.
+                               go kill(ent, fmt.Sprintf("state=%q", ent.Container.State))
+                       }
+               case arvados.ContainerStateLocked:
+                       if running && !exited.IsZero() && qUpdated.After(exited) {
+                               logger := sch.logger.WithFields(logrus.Fields{
+                                       "ContainerUUID": uuid,
+                                       "Exited":        time.Since(exited).Seconds(),
+                               })
+                               logger.Infof("requeueing container because state=%q after crunch-run exited", ent.Container.State)
+                               err := sch.queue.Unlock(uuid)
+                               if err != nil {
+                                       logger.WithError(err).Info("error requeueing container")
+                               }
+                       }
+               default:
+                       sch.logger.WithField("ContainerUUID", uuid).Errorf("BUG: unexpected state %q", ent.Container.State)
+               }
+       }
+}
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
new file mode 100644 (file)
index 0000000..b5dba98
--- /dev/null
@@ -0,0 +1,190 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package ssh_executor provides an implementation of pool.Executor
+// using a long-lived multiplexed SSH session.
+package ssh_executor
+
+import (
+       "bytes"
+       "errors"
+       "io"
+       "net"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "golang.org/x/crypto/ssh"
+)
+
+// New returns a new Executor, using the given target.
+func New(t cloud.ExecutorTarget) *Executor {
+       return &Executor{target: t}
+}
+
+// An Executor uses a multiplexed SSH connection to execute shell
+// commands on a remote target. It reconnects automatically after
+// errors.
+//
+// When setting up a connection, the Executor accepts whatever host
+// key is provided by the remote server, then passes the received key
+// and the SSH connection to the target's VerifyHostKey method before
+// executing commands on the connection.
+//
+// A zero Executor must not be used before calling SetTarget.
+//
+// An Executor must not be copied.
+type Executor struct {
+       target  cloud.ExecutorTarget
+       signers []ssh.Signer
+       mtx     sync.RWMutex // controls access to instance after creation
+
+       client      *ssh.Client
+       clientErr   error
+       clientOnce  sync.Once     // initialized private state
+       clientSetup chan bool     // len>0 while client setup is in progress
+       hostKey     ssh.PublicKey // most recent host key that passed verification, if any
+}
+
+// SetSigners updates the set of private keys that will be offered to
+// the target next time the Executor sets up a new connection.
+func (exr *Executor) SetSigners(signers ...ssh.Signer) {
+       exr.mtx.Lock()
+       defer exr.mtx.Unlock()
+       exr.signers = signers
+}
+
+// SetTarget sets the current target. The new target will be used next
+// time a new connection is set up; until then, the Executor will
+// continue to use the existing target.
+//
+// The new target is assumed to represent the same host as the
+// previous target, although its address and host key might differ.
+func (exr *Executor) SetTarget(t cloud.ExecutorTarget) {
+       exr.mtx.Lock()
+       defer exr.mtx.Unlock()
+       exr.target = t
+}
+
+// Target returns the current target.
+func (exr *Executor) Target() cloud.ExecutorTarget {
+       exr.mtx.RLock()
+       defer exr.mtx.RUnlock()
+       return exr.target
+}
+
+// Execute runs cmd on the target. If an existing connection is not
+// usable, it sets up a new connection to the current target.
+func (exr *Executor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+       session, err := exr.newSession()
+       if err != nil {
+               return nil, nil, err
+       }
+       defer session.Close()
+       var stdout, stderr bytes.Buffer
+       session.Stdin = stdin
+       session.Stdout = &stdout
+       session.Stderr = &stderr
+       err = session.Run(cmd)
+       return stdout.Bytes(), stderr.Bytes(), err
+}
+
+// Close shuts down any active connections.
+func (exr *Executor) Close() {
+       // Ensure exr is initialized
+       exr.sshClient(false)
+
+       exr.clientSetup <- true
+       if exr.client != nil {
+               defer exr.client.Close()
+       }
+       exr.client, exr.clientErr = nil, errors.New("closed")
+       <-exr.clientSetup
+}
+
+// Create a new SSH session. If session setup fails or the SSH client
+// hasn't been set up yet, set up a new SSH client and try again.
+func (exr *Executor) newSession() (*ssh.Session, error) {
+       try := func(create bool) (*ssh.Session, error) {
+               client, err := exr.sshClient(create)
+               if err != nil {
+                       return nil, err
+               }
+               return client.NewSession()
+       }
+       session, err := try(false)
+       if err != nil {
+               session, err = try(true)
+       }
+       return session, err
+}
+
+// Get the latest SSH client. If another goroutine is in the process
+// of setting one up, wait for it to finish and return its result (or
+// the last successfully setup client, if it fails).
+func (exr *Executor) sshClient(create bool) (*ssh.Client, error) {
+       exr.clientOnce.Do(func() {
+               exr.clientSetup = make(chan bool, 1)
+               exr.clientErr = errors.New("client not yet created")
+       })
+       defer func() { <-exr.clientSetup }()
+       select {
+       case exr.clientSetup <- true:
+               if create {
+                       client, err := exr.setupSSHClient()
+                       if err == nil || exr.client == nil {
+                               if exr.client != nil {
+                                       // Hang up the previous
+                                       // (non-working) client
+                                       go exr.client.Close()
+                               }
+                               exr.client, exr.clientErr = client, err
+                       }
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       default:
+               // Another goroutine is doing the above case.  Wait
+               // for it to finish and return whatever it leaves in
+               // exr.client.
+               exr.clientSetup <- true
+       }
+       return exr.client, exr.clientErr
+}
+
+// Create a new SSH client.
+func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
+       target := exr.Target()
+       addr := target.Address()
+       if addr == "" {
+               return nil, errors.New("instance has no address")
+       }
+       var receivedKey ssh.PublicKey
+       client, err := ssh.Dial("tcp", addr, &ssh.ClientConfig{
+               User: "root",
+               Auth: []ssh.AuthMethod{
+                       ssh.PublicKeys(exr.signers...),
+               },
+               HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
+                       receivedKey = key
+                       return nil
+               },
+               Timeout: time.Minute,
+       })
+       if err != nil {
+               return nil, err
+       } else if receivedKey == nil {
+               return nil, errors.New("BUG: key was never provided to HostKeyCallback")
+       }
+
+       if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
+               err = target.VerifyHostKey(receivedKey, client)
+               if err != nil {
+                       return nil, err
+               }
+               exr.hostKey = receivedKey
+       }
+       return client, nil
+}
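
For a quick illustration of this API before the test below, a hedged sketch (not part of this change): the target is assumed to satisfy cloud.ExecutorTarget, as the dispatcher's cloud instances are intended to, and to authorize the given dispatcher key; "uptime" is just an example command.

package example

import (
        "fmt"

        "git.curoverse.com/arvados.git/lib/cloud"
        "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
        "golang.org/x/crypto/ssh"
)

// runUptime executes a single command on the target over the executor's
// multiplexed SSH connection and prints its stdout.
func runUptime(target cloud.ExecutorTarget, key ssh.Signer) error {
        exr := ssh_executor.New(target)
        defer exr.Close()
        exr.SetSigners(key)
        stdout, stderr, err := exr.Execute("uptime", nil)
        if err != nil {
                return fmt.Errorf("uptime failed: %s (stderr %q)", err, stderr)
        }
        fmt.Printf("%s", stdout)
        return nil
}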
diff --git a/lib/dispatchcloud/ssh_executor/executor_test.go b/lib/dispatchcloud/ssh_executor/executor_test.go
new file mode 100644 (file)
index 0000000..8dabfec
--- /dev/null
@@ -0,0 +1,102 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package ssh_executor
+
+import (
+       "bytes"
+       "io"
+       "io/ioutil"
+       "sync"
+       "testing"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&ExecutorSuite{})
+
+type testTarget struct {
+       test.SSHService
+}
+
+func (*testTarget) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+       return nil
+}
+
+type ExecutorSuite struct{}
+
+func (s *ExecutorSuite) TestExecute(c *check.C) {
+       command := `foo 'bar' "baz"`
+       stdinData := "foobar\nbaz\n"
+       _, hostpriv := test.LoadTestKey(c, "../test/sshkey_vm")
+       clientpub, clientpriv := test.LoadTestKey(c, "../test/sshkey_dispatch")
+       for _, exitcode := range []int{0, 1, 2} {
+               srv := &testTarget{
+                       SSHService: test.SSHService{
+                               Exec: func(cmd string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+                                       c.Check(cmd, check.Equals, command)
+                                       var wg sync.WaitGroup
+                                       wg.Add(2)
+                                       go func() {
+                                               io.WriteString(stdout, "stdout\n")
+                                               wg.Done()
+                                       }()
+                                       go func() {
+                                               io.WriteString(stderr, "stderr\n")
+                                               wg.Done()
+                                       }()
+                                       buf, err := ioutil.ReadAll(stdin)
+                                       wg.Wait()
+                                       c.Check(err, check.IsNil)
+                                       if err != nil {
+                                               return 99
+                                       }
+                                       _, err = stdout.Write(buf)
+                                       c.Check(err, check.IsNil)
+                                       return uint32(exitcode)
+                               },
+                               HostKey:        hostpriv,
+                               AuthorizedKeys: []ssh.PublicKey{clientpub},
+                       },
+               }
+               err := srv.Start()
+               c.Check(err, check.IsNil)
+               c.Logf("srv address %q", srv.Address())
+               defer srv.Close()
+
+               exr := New(srv)
+               exr.SetSigners(clientpriv)
+
+               done := make(chan bool)
+               go func() {
+                       stdout, stderr, err := exr.Execute(command, bytes.NewBufferString(stdinData))
+                       if exitcode == 0 {
+                               c.Check(err, check.IsNil)
+                       } else {
+                               c.Check(err, check.NotNil)
+                               err, ok := err.(*ssh.ExitError)
+                               c.Assert(ok, check.Equals, true)
+                               c.Check(err.ExitStatus(), check.Equals, exitcode)
+                       }
+                       c.Check(stdout, check.DeepEquals, []byte("stdout\n"+stdinData))
+                       c.Check(stderr, check.DeepEquals, []byte("stderr\n"))
+                       close(done)
+               }()
+
+               timeout := time.NewTimer(time.Second)
+               select {
+               case <-done:
+               case <-timeout.C:
+                       c.Fatal("timed out")
+               }
+       }
+}
diff --git a/lib/dispatchcloud/test/doc.go b/lib/dispatchcloud/test/doc.go
new file mode 100644 (file)
index 0000000..12f3b16
--- /dev/null
@@ -0,0 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+// Package test provides fakes and other tools for testing cloud
+// drivers and other dispatcher modules.
+package test
diff --git a/lib/dispatchcloud/test/fixtures.go b/lib/dispatchcloud/test/fixtures.go
new file mode 100644 (file)
index 0000000..68bdb3d
--- /dev/null
@@ -0,0 +1,28 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// ContainerUUID returns a fake container UUID.
+func ContainerUUID(i int) string {
+       return fmt.Sprintf("zzzzz-dz642-%015d", i)
+}
+
+// InstanceType returns a fake arvados.InstanceType called "type{i}"
+// with i VCPUs, i GiB of RAM, and a price of i * 0.123.
+func InstanceType(i int) arvados.InstanceType {
+       return arvados.InstanceType{
+               Name:         fmt.Sprintf("type%d", i),
+               ProviderType: fmt.Sprintf("providertype%d", i),
+               VCPUs:        i,
+               RAM:          arvados.ByteSize(i) << 30,
+               Price:        float64(i) * 0.123,
+       }
+}
diff --git a/lib/dispatchcloud/test/lame_instance_set.go b/lib/dispatchcloud/test/lame_instance_set.go
new file mode 100644 (file)
index 0000000..baab407
--- /dev/null
@@ -0,0 +1,118 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+       "math/rand"
+       "sync"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "golang.org/x/crypto/ssh"
+)
+
+// LameInstanceSet creates instances that boot but can't run
+// containers.
+type LameInstanceSet struct {
+       Hold chan bool // set to make(chan bool) to hold operations until Release is called
+
+       mtx       sync.Mutex
+       instances map[*lameInstance]bool
+}
+
+// Create returns a new instance.
+func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
+       inst := &lameInstance{
+               p:            p,
+               id:           cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
+               providerType: instType.ProviderType,
+       }
+       inst.SetTags(tags)
+       if p.Hold != nil {
+               p.Hold <- true
+       }
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       if p.instances == nil {
+               p.instances = map[*lameInstance]bool{}
+       }
+       p.instances[inst] = true
+       return inst, nil
+}
+
+// Instances returns the instances that haven't been destroyed.
+func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       var instances []cloud.Instance
+       for i := range p.instances {
+               instances = append(instances, i)
+       }
+       return instances, nil
+}
+
+// Stop is a no-op, but exists to satisfy cloud.InstanceSet.
+func (p *LameInstanceSet) Stop() {
+}
+
+// Release n held calls. Blocks if n calls aren't already
+// waiting. Blocks forever if Hold is nil.
+func (p *LameInstanceSet) Release(n int) {
+       for i := 0; i < n; i++ {
+               <-p.Hold
+       }
+}
+
+type lameInstance struct {
+       p            *LameInstanceSet
+       id           cloud.InstanceID
+       providerType string
+       tags         cloud.InstanceTags
+}
+
+func (inst *lameInstance) ID() cloud.InstanceID {
+       return inst.id
+}
+
+func (inst *lameInstance) String() string {
+       return fmt.Sprint(inst.id)
+}
+
+func (inst *lameInstance) ProviderType() string {
+       return inst.providerType
+}
+
+func (inst *lameInstance) Address() string {
+       return "0.0.0.0:1234"
+}
+
+func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
+       inst.p.mtx.Lock()
+       defer inst.p.mtx.Unlock()
+       inst.tags = cloud.InstanceTags{}
+       for k, v := range tags {
+               inst.tags[k] = v
+       }
+       return nil
+}
+
+func (inst *lameInstance) Destroy() error {
+       if inst.p.Hold != nil {
+               inst.p.Hold <- true
+       }
+       inst.p.mtx.Lock()
+       defer inst.p.mtx.Unlock()
+       delete(inst.p.instances, inst)
+       return nil
+}
+
+func (inst *lameInstance) Tags() cloud.InstanceTags {
+       return inst.tags
+}
+
+func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
+       return nil
+}
diff --git a/lib/dispatchcloud/test/queue.go b/lib/dispatchcloud/test/queue.go
new file mode 100644 (file)
index 0000000..e18a2b5
--- /dev/null
@@ -0,0 +1,171 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "fmt"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// Queue is a test stub for container.Queue. The caller specifies the
+// initial queue state.
+type Queue struct {
+       // Containers represent the API server database contents.
+       Containers []arvados.Container
+
+       // ChooseType will be called for each entry in Containers. It
+       // must not be nil.
+       ChooseType func(*arvados.Container) (arvados.InstanceType, error)
+
+       entries     map[string]container.QueueEnt
+       updTime     time.Time
+       subscribers map[<-chan struct{}]chan struct{}
+
+       mtx sync.Mutex
+}
+
+// Entries returns the containers that were queued when Update was
+// last called.
+func (q *Queue) Entries() (map[string]container.QueueEnt, time.Time) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       updTime := q.updTime
+       r := map[string]container.QueueEnt{}
+       for uuid, ent := range q.entries {
+               r[uuid] = ent
+       }
+       return r, updTime
+}
+
+// Get returns the container from the cached queue, i.e., as it was
+// when Update was last called -- just like a container.Queue does. If
+// the state has been changed (via Lock, Unlock, or Cancel) since the
+// last Update, the updated state is returned.
+func (q *Queue) Get(uuid string) (arvados.Container, bool) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       ent, ok := q.entries[uuid]
+       return ent.Container, ok
+}
+
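+// Forget removes the container from the cached queue. It does not
+// change the Containers slice.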
+func (q *Queue) Forget(uuid string) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       delete(q.entries, uuid)
+}
+
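+// Lock changes the container's state from Queued to Locked. It
+// returns an error if the current state is not Queued.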
+func (q *Queue) Lock(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       return q.changeState(uuid, arvados.ContainerStateQueued, arvados.ContainerStateLocked)
+}
+
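+// Unlock changes the container's state from Locked back to Queued.
+// It returns an error if the current state is not Locked.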
+func (q *Queue) Unlock(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       return q.changeState(uuid, arvados.ContainerStateLocked, arvados.ContainerStateQueued)
+}
+
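+// Cancel sets the container's state to Cancelled, whatever its
+// current state is.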
+func (q *Queue) Cancel(uuid string) error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       return q.changeState(uuid, q.entries[uuid].Container.State, arvados.ContainerStateCancelled)
+}
+
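+// Subscribe returns a buffered channel that receives a value each
+// time the cached queue changes, i.e., on Lock, Unlock, Cancel, and
+// Update.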
+func (q *Queue) Subscribe() <-chan struct{} {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       if q.subscribers == nil {
+               q.subscribers = map[<-chan struct{}]chan struct{}{}
+       }
+       ch := make(chan struct{}, 1)
+       q.subscribers[ch] = ch
+       return ch
+}
+
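+// Unsubscribe stops sending notifications to the given channel.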
+func (q *Queue) Unsubscribe(ch <-chan struct{}) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       delete(q.subscribers, ch)
+}
+
+// caller must have lock.
+func (q *Queue) notify() {
+       for _, ch := range q.subscribers {
+               select {
+               case ch <- struct{}{}:
+               default:
+               }
+       }
+}
+
+// caller must have lock.
+func (q *Queue) changeState(uuid string, from, to arvados.ContainerState) error {
+       ent := q.entries[uuid]
+       if ent.Container.State != from {
+               return fmt.Errorf("changeState failed: state=%q", ent.Container.State)
+       }
+       ent.Container.State = to
+       q.entries[uuid] = ent
+       for i, ctr := range q.Containers {
+               if ctr.UUID == uuid {
+                       q.Containers[i].State = to
+                       break
+               }
+       }
+       q.notify()
+       return nil
+}
+
+// Update rebuilds the current entries from the Containers slice.
+func (q *Queue) Update() error {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       updTime := time.Now()
+       upd := map[string]container.QueueEnt{}
+       for _, ctr := range q.Containers {
+               _, exists := q.entries[ctr.UUID]
+               if !exists && (ctr.State == arvados.ContainerStateComplete || ctr.State == arvados.ContainerStateCancelled) {
+                       continue
+               }
+               if ent, ok := upd[ctr.UUID]; ok {
+                       ent.Container = ctr
+                       upd[ctr.UUID] = ent
+               } else {
+                       it, _ := q.ChooseType(&ctr)
+                       upd[ctr.UUID] = container.QueueEnt{
+                               Container:    ctr,
+                               InstanceType: it,
+                       }
+               }
+       }
+       q.entries = upd
+       q.updTime = updTime
+       q.notify()
+       return nil
+}
+
+// Notify adds/updates an entry in the Containers slice.  This
+// simulates the effect of an API update from someone other than the
+// dispatcher -- e.g., crunch-run updating state to "Complete" when a
+// container exits.
+//
+// The resulting changes are not exposed through Get() or Entries()
+// until the next call to Update().
+func (q *Queue) Notify(upd arvados.Container) {
+       q.mtx.Lock()
+       defer q.mtx.Unlock()
+       for i, ctr := range q.Containers {
+               if ctr.UUID == upd.UUID {
+                       q.Containers[i] = upd
+                       return
+               }
+       }
+       q.Containers = append(q.Containers, upd)
+}
diff --git a/lib/dispatchcloud/test/ssh_service.go b/lib/dispatchcloud/test/ssh_service.go
new file mode 100644 (file)
index 0000000..b1e4e03
--- /dev/null
@@ -0,0 +1,169 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "bytes"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "net"
+       "strings"
+       "sync"
+
+       "golang.org/x/crypto/ssh"
+       check "gopkg.in/check.v1"
+)
+
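+// LoadTestKey returns the public and private keys stored in the
+// files fnm+".pub" and fnm. It fails the test if either file cannot
+// be read or parsed.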
+func LoadTestKey(c *check.C, fnm string) (ssh.PublicKey, ssh.Signer) {
+       rawpubkey, err := ioutil.ReadFile(fnm + ".pub")
+       c.Assert(err, check.IsNil)
+       pubkey, _, _, _, err := ssh.ParseAuthorizedKey(rawpubkey)
+       c.Assert(err, check.IsNil)
+       rawprivkey, err := ioutil.ReadFile(fnm)
+       c.Assert(err, check.IsNil)
+       privkey, err := ssh.ParsePrivateKey(rawprivkey)
+       c.Assert(err, check.IsNil)
+       return pubkey, privkey
+}
+
+// An SSHExecFunc handles an "exec" session on a multiplexed SSH
+// connection.
+type SSHExecFunc func(command string, stdin io.Reader, stdout, stderr io.Writer) uint32
+
+// An SSHService accepts SSH connections on an available TCP port and
+// passes clients' "exec" sessions to the provided SSHExecFunc.
+type SSHService struct {
+       Exec           SSHExecFunc
+       HostKey        ssh.Signer
+       AuthorizedKeys []ssh.PublicKey
+
+       listener net.Listener
+       conn     *ssh.ServerConn
+       setup    sync.Once
+       mtx      sync.Mutex
+       started  chan bool
+       closed   bool
+       err      error
+}
+
+// Address returns the host:port where the SSH server is listening. It
+// returns "" if called before the server is ready to accept
+// connections.
+func (ss *SSHService) Address() string {
+       ss.setup.Do(ss.start)
+       ss.mtx.Lock()
+       ln := ss.listener
+       ss.mtx.Unlock()
+       if ln == nil {
+               return ""
+       }
+       return ln.Addr().String()
+}
+
+// Close shuts down the server and releases resources. Established
+// connections are unaffected.
+func (ss *SSHService) Close() {
+       ss.Start()
+       ss.mtx.Lock()
+       ln := ss.listener
+       ss.closed = true
+       ss.mtx.Unlock()
+       if ln != nil {
+               ln.Close()
+       }
+}
+
+// Start returns when the server is ready to accept connections.
+func (ss *SSHService) Start() error {
+       ss.setup.Do(ss.start)
+       <-ss.started
+       return ss.err
+}
+
+func (ss *SSHService) start() {
+       ss.started = make(chan bool)
+       go ss.run()
+}
+
+func (ss *SSHService) run() {
+       defer close(ss.started)
+       config := &ssh.ServerConfig{
+               PublicKeyCallback: func(c ssh.ConnMetadata, pubKey ssh.PublicKey) (*ssh.Permissions, error) {
+                       for _, ak := range ss.AuthorizedKeys {
+                               if bytes.Equal(ak.Marshal(), pubKey.Marshal()) {
+                                       return &ssh.Permissions{}, nil
+                               }
+                       }
+                       return nil, fmt.Errorf("unknown public key for %q", c.User())
+               },
+       }
+       config.AddHostKey(ss.HostKey)
+
+       listener, err := net.Listen("tcp", ":")
+       if err != nil {
+               ss.err = err
+               return
+       }
+
+       ss.mtx.Lock()
+       ss.listener = listener
+       ss.mtx.Unlock()
+
+       go func() {
+               for {
+                       nConn, err := listener.Accept()
+                       if err != nil && strings.Contains(err.Error(), "use of closed network connection") && ss.closed {
+                               return
+                       } else if err != nil {
+                               log.Printf("accept: %s", err)
+                               return
+                       }
+                       go ss.serveConn(nConn, config)
+               }
+       }()
+}
+
+func (ss *SSHService) serveConn(nConn net.Conn, config *ssh.ServerConfig) {
+       defer nConn.Close()
+       conn, newchans, reqs, err := ssh.NewServerConn(nConn, config)
+       if err != nil {
+               log.Printf("ssh.NewServerConn: %s", err)
+               return
+       }
+       defer conn.Close()
+       go ssh.DiscardRequests(reqs)
+       for newch := range newchans {
+               if newch.ChannelType() != "session" {
+                       newch.Reject(ssh.UnknownChannelType, "unknown channel type")
+                       continue
+               }
+               ch, reqs, err := newch.Accept()
+               if err != nil {
+                       log.Printf("accept channel: %s", err)
+                       return
+               }
+               var execReq struct {
+                       Command string
+               }
+               go func() {
+                       for req := range reqs {
+                               if req.Type == "exec" && execReq.Command == "" {
+                                       req.Reply(true, nil)
+                                       ssh.Unmarshal(req.Payload, &execReq)
+                                       go func() {
+                                               var resp struct {
+                                                       Status uint32
+                                               }
+                                               resp.Status = ss.Exec(execReq.Command, ch, ch, ch.Stderr())
+                                               ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
+                                               ch.Close()
+                                       }()
+                               }
+                       }
+               }()
+       }
+}
diff --git a/lib/dispatchcloud/test/sshkey_dispatch b/lib/dispatchcloud/test/sshkey_dispatch
new file mode 100644 (file)
index 0000000..5584519
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEowIBAAKCAQEAqYm4XsQHm8sBSZFwUX5VeW1OkGsfoNzcGPG2nzzYRhNhClYZ
+0ABHhUk82HkaC/8l6d/jpYTf42HrK42nNQ0r0Yzs7qw8yZMQioK4Yk+kFyVLF78E
+GRG4pGAWXFs6pUchs/lm8fo9zcda4R3XeqgI+NO+nEERXmdRJa1FhI+Za3/S/+CV
+mg+6O00wZz2+vKmDPptGN4MCKmQOCKsMJts7wSZGyVcTtdNv7jjfr6yPAIOIL8X7
+LtarBCFaK/pD7uWll/Uj7h7D8K48nIZUrvBJJjXL8Sm4LxCNoz3Z83k8J5ZzuDRD
+gRiQe/C085mhO6VL+2fypDLwcKt1tOL8fI81MwIDAQABAoIBACR3tEnmHsDbNOav
+Oxq8cwRQh9K2yDHg8BMJgz/TZa4FIx2HEbxVIw0/iLADtJ+Z/XzGJQCIiWQuvtg6
+exoFQESt7JUWRWkSkj9JCQJUoTY9Vl7APtBpqG7rIEQzd3TvzQcagZNRQZQO6rR7
+p8sBdBSZ72lK8cJ9tM3G7Kor/VNK7KgRZFNhEWnmvEa3qMd4hzDcQ4faOn7C9NZK
+dwJAuJVVfwOLlOORYcyEkvksLaDOK2DsB/p0AaCpfSmThRbBKN5fPXYaKgUdfp3w
+70Hpp27WWymb1cgjyqSH3DY+V/kvid+5QxgxCBRq865jPLn3FFT9bWEVS/0wvJRj
+iMIRrjECgYEA4Ffv9rBJXqVXonNQbbstd2PaprJDXMUy9/UmfHL6pkq1xdBeuM7v
+yf2ocXheA8AahHtIOhtgKqwv/aRhVK0ErYtiSvIk+tXG+dAtj/1ZAKbKiFyxjkZV
+X72BH7cTlR6As5SRRfWM/HaBGEgED391gKsI5PyMdqWWdczT5KfxAksCgYEAwXYE
+ewPmV1GaR5fbh2RupoPnUJPMj36gJCnwls7sGaXDQIpdlq56zfKgrLocGXGgj+8f
+QH7FHTJQO15YCYebtsXWwB3++iG43gVlJlecPAydsap2CCshqNWC5JU5pan0QzsP
+exzNzWqfUPSbTkR2SRaN+MenZo2Y/WqScOAth7kCgYBgVoLujW9EXH5QfXJpXLq+
+jTvE38I7oVcs0bJwOLPYGzcJtlwmwn6IYAwohgbhV2pLv+EZSs42JPEK278MLKxY
+lgVkp60npgunFTWroqDIvdc1TZDVxvA8h9VeODEJlSqxczgbMcIUXBM9yRctTI+5
+7DiKlMUA4kTFW2sWwuOlFwKBgGXvrYS0FVbFJKm8lmvMu5D5x5RpjEu/yNnFT4Pn
+G/iXoz4Kqi2PWh3STl804UF24cd1k94D7hDoReZCW9kJnz67F+C67XMW+bXi2d1O
+JIBvlVfcHb1IHMA9YG7ZQjrMRmx2Xj3ce4RVPgUGHh8ra7gvLjd72/Tpf0doNClN
+ti/hAoGBAMW5D3LhU05LXWmOqpeT4VDgqk4MrTBcstVe7KdVjwzHrVHCAmI927vI
+pjpphWzpC9m3x4OsTNf8m+g6H7f3IiQS0aiFNtduXYlcuT5FHS2fSATTzg5PBon9
+1E6BudOve+WyFyBs7hFWAqWFBdWujAl4Qk5Ek09U2ilFEPE7RTgJ
+-----END RSA PRIVATE KEY-----
diff --git a/lib/dispatchcloud/test/sshkey_dispatch.pub b/lib/dispatchcloud/test/sshkey_dispatch.pub
new file mode 100644 (file)
index 0000000..1d5c1ea
--- /dev/null
@@ -0,0 +1 @@
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCpibhexAebywFJkXBRflV5bU6Qax+g3NwY8bafPNhGE2EKVhnQAEeFSTzYeRoL/yXp3+OlhN/jYesrjac1DSvRjOzurDzJkxCKgrhiT6QXJUsXvwQZEbikYBZcWzqlRyGz+Wbx+j3Nx1rhHdd6qAj4076cQRFeZ1ElrUWEj5lrf9L/4JWaD7o7TTBnPb68qYM+m0Y3gwIqZA4Iqwwm2zvBJkbJVxO102/uON+vrI8Ag4gvxfsu1qsEIVor+kPu5aWX9SPuHsPwrjychlSu8EkmNcvxKbgvEI2jPdnzeTwnlnO4NEOBGJB78LTzmaE7pUv7Z/KkMvBwq3W04vx8jzUz tom@curve
diff --git a/lib/dispatchcloud/test/sshkey_vm b/lib/dispatchcloud/test/sshkey_vm
new file mode 100644 (file)
index 0000000..10b7ed1
--- /dev/null
@@ -0,0 +1,27 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpQIBAAKCAQEApIfWk2StZGDtmunumIeXLJ46AQrbHHvuxrSAkQf6+zUwjB2I
+rse7ezBRHWcge9U5EsigixmhUM4ozFLnUQNwC862jbmsjbyA97arG/REECNlUrEB
+HQPYHhai5yyJ89AfjWVxKyINfW0K2HX1R8nl4kdVraAgpohPLh0dGjfwzm/BcXDG
++TxW9zRz0KCs9ZRI6s2MNdv08ahKQ0azk8gRTqMADJmYNWIo3zPQ+fhlwyr6EZJ/
+HFbRtjpajEPMJPwoVPO+Wj6wztfHDYKkPIrIWbhMl6w+tEKdsmygd3Iq94ktLS3X
+AbRCfn4njS2QSlkKFEepkUJWCSSWZgFn6DLm2wIDAQABAoIBAQCb137LxcTnG1h0
+L7isCWKMBKN0cU/xvwIAfOB6f1CfuVXuodrhkpZmrPFoJFKEeQbCX/6RQwmlfGDw
+iGZKOjNbO8V2oLRs3GxcNk4FAG2ny58hoD8puIZwmYhb57gTlMMOL1PuQyb78tkf
+Bzv5b6ermV3yQ4Ypt1solrMGLo6NOZD0oDX9p0Zt9kueIhjzgP0v5//T1F4PGHZK
++sLSsMiu9u6F+PB+Oc6uv0Zee9Lnts/QiWH5f18oEculjwKWFx+JwJWiLffGg2Bl
+vbpmvHFRoRWkHTpgSiLwSUqs0ZUWU9R5h11ROg5L39MLsxQoBvHsPEnP5ssN8jGt
+aH86EZjBAoGBAM+A5B/UjhIn9m05EhDTDRzI92hGhM8f7uAwobbnjvIQyZbWlBwj
+2TmgbJdpTGVbD+iTBIwKQdcFBbWobTCZsNMpghqA/ir4YIAnZ5OX9VQ1Bc+bWE7V
+dPmMVpCgyg+ERAe+79FrYWcI3vhnBpHCsY/9p9pGQIKDzlGTWNF1HJGjAoGBAMr7
+2CTVnFImTgD3E+rH4AAAfkz+cyqfK6BUhli/NifFYZhWCs16r9QCGSORnp4gPhMY
+3mf7VBs9rk123zOMo89eJt3adTgbZ+QIxXeXilGXpbT3w1+CJMaZRrIy80E1tB5/
+KvDZcrZ78o8XWMNUa+9k55ukvgyC24ICAmOIWNlpAoGBALEFvphBF2r52MtZUsYz
+pw4VjKvS7V5eWcW891k4tsRf+frK2NQg6SK2b63EUT5ur2W0dr6ZyY2MZVCSfYRm
+uWmMEchWn389IeZyt3Q8wTize1+foXivtflm9jqwUXFnXzpUc/du6kuiT8YO7pXP
+SPgUZ+xY3pP5qjwBvlYC2PqNAoGAZ1CKMi1bdGC0wT8BLzXuqHGX136HhcEgRmnf
+O5qPaOzJAO2CcBWrGuC6hOUgc+F7VuMIiKpeo8LgTeNcNfO2iNymMbN4iEdCuMlS
+IM3MBD2IhTS6h4lJSKBJYHgYYi+AbylQ5Of4wDMUQYqjjkAQ8/dK/2h5pwqPyXtW
+VezXNEkCgYEAq4S0++y9tjlLn+w9BIkmx3bAVRDQZIzIEwxTh+jpqaUp1J0iyseJ
+71pwqQojGNF6x8GglVXa6bMrETae21WhEeHnWmzlpCWIODsYPUQ+erjDuAWi9eGk
+HLklqSEoLB8pzC6zDqjxDw+CnGERIDSaoaeoWiNKZ95IH1WiEwYjuxU=
+-----END RSA PRIVATE KEY-----
diff --git a/lib/dispatchcloud/test/sshkey_vm.pub b/lib/dispatchcloud/test/sshkey_vm.pub
new file mode 100644 (file)
index 0000000..b9d44c9
--- /dev/null
@@ -0,0 +1 @@
+ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQCkh9aTZK1kYO2a6e6Yh5csnjoBCtsce+7GtICRB/r7NTCMHYiux7t7MFEdZyB71TkSyKCLGaFQzijMUudRA3ALzraNuayNvID3tqsb9EQQI2VSsQEdA9geFqLnLInz0B+NZXErIg19bQrYdfVHyeXiR1WtoCCmiE8uHR0aN/DOb8FxcMb5PFb3NHPQoKz1lEjqzYw12/TxqEpDRrOTyBFOowAMmZg1YijfM9D5+GXDKvoRkn8cVtG2OlqMQ8wk/ChU875aPrDO18cNgqQ8ishZuEyXrD60Qp2ybKB3cir3iS0tLdcBtEJ+fieNLZBKWQoUR6mRQlYJJJZmAWfoMubb tom@curve
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
new file mode 100644 (file)
index 0000000..8bdfaa9
--- /dev/null
@@ -0,0 +1,318 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package test
+
+import (
+       "crypto/rand"
+       "errors"
+       "fmt"
+       "io"
+       math_rand "math/rand"
+       "regexp"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "github.com/mitchellh/mapstructure"
+       "golang.org/x/crypto/ssh"
+)
+
+// A StubDriver implements cloud.Driver by setting up local SSH
+// servers that do fake command executions.
+type StubDriver struct {
+       HostKey        ssh.Signer
+       AuthorizedKeys []ssh.PublicKey
+
+       // SetupVM, if set, is called upon creation of each new
+       // StubVM. This is the caller's opportunity to customize the
+       // VM's error rate and other behaviors.
+       SetupVM func(*StubVM)
+
+       // StubVM's fake crunch-run uses this Queue to read and update
+       // container state.
+       Queue *Queue
+
+       // Frequency of artificially introduced errors on calls to
+       // Destroy. 0=always succeed, 1=always fail.
+       ErrorRateDestroy float64
+
+       instanceSets []*StubInstanceSet
+}
+
+// InstanceSet returns a new *StubInstanceSet.
+func (sd *StubDriver) InstanceSet(params map[string]interface{}, id cloud.InstanceSetID) (cloud.InstanceSet, error) {
+       sis := StubInstanceSet{
+               driver:  sd,
+               servers: map[cloud.InstanceID]*StubVM{},
+       }
+       sd.instanceSets = append(sd.instanceSets, &sis)
+       return &sis, mapstructure.Decode(params, &sis)
+}
+
+// InstanceSets returns all instance sets that have been created by
+// the driver. This can be used to test a component that uses the
+// driver but doesn't expose the instance sets it has created.
+func (sd *StubDriver) InstanceSets() []*StubInstanceSet {
+       return sd.instanceSets
+}
+
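+// A StubInstanceSet is a fake cloud.InstanceSet that keeps track of
+// the StubVMs created through it.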
+type StubInstanceSet struct {
+       driver  *StubDriver
+       servers map[cloud.InstanceID]*StubVM
+       mtx     sync.RWMutex
+       stopped bool
+}
+
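+// Create adds a new StubVM (with its own in-process SSH service) to
+// the set and returns a stubInstance snapshot of it. It returns an
+// error if Stop has already been called.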
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       if sis.stopped {
+               return nil, errors.New("StubInstanceSet: Create called after Stop")
+       }
+       ak := sis.driver.AuthorizedKeys
+       if authKey != nil {
+               ak = append([]ssh.PublicKey{authKey}, ak...)
+       }
+       svm := &StubVM{
+               sis:          sis,
+               id:           cloud.InstanceID(fmt.Sprintf("stub-%s-%x", it.ProviderType, math_rand.Int63())),
+               tags:         copyTags(tags),
+               providerType: it.ProviderType,
+       }
+       svm.SSHService = SSHService{
+               HostKey:        sis.driver.HostKey,
+               AuthorizedKeys: ak,
+               Exec:           svm.Exec,
+       }
+       if setup := sis.driver.SetupVM; setup != nil {
+               setup(svm)
+       }
+       sis.servers[svm.id] = svm
+       return svm.Instance(), nil
+}
+
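+// Instances returns a stubInstance snapshot of each VM that has not
+// been destroyed.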
+func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
+       sis.mtx.RLock()
+       defer sis.mtx.RUnlock()
+       var r []cloud.Instance
+       for _, ss := range sis.servers {
+               r = append(r, ss.Instance())
+       }
+       return r, nil
+}
+
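+// Stop marks the set as stopped; subsequent Create calls will fail.
+// Calling Stop a second time panics.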
+func (sis *StubInstanceSet) Stop() {
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       if sis.stopped {
+               panic("Stop called twice")
+       }
+       sis.stopped = true
+}
+
+// StubVM is a fake server that runs an SSH service. It represents a
+// VM running in a fake cloud.
+//
+// Note this is distinct from a stubInstance, which is a snapshot of
+// the VM's metadata. Like a VM in a real cloud, a StubVM keeps
+// running (and might change IP addresses, shut down, etc.)  without
+// updating any stubInstances that have been returned to callers.
+type StubVM struct {
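+       // The following fields can be set -- e.g., from a
+       // StubDriver.SetupVM hook -- to simulate boot delays, broken
+       // nodes, and misbehaving crunch-run processes.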
+       Boot                 time.Time
+       Broken               time.Time
+       CrunchRunMissing     bool
+       CrunchRunCrashRate   float64
+       CrunchRunDetachDelay time.Duration
+       ExecuteContainer     func(arvados.Container) int
+
+       sis          *StubInstanceSet
+       id           cloud.InstanceID
+       tags         cloud.InstanceTags
+       providerType string
+       SSHService   SSHService
+       running      map[string]bool
+       sync.Mutex
+}
+
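+// Instance returns a point-in-time snapshot of the VM, comparable to
+// an entry in a real cloud provider's instance list.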
+func (svm *StubVM) Instance() stubInstance {
+       svm.Lock()
+       defer svm.Unlock()
+       return stubInstance{
+               svm:  svm,
+               addr: svm.SSHService.Address(),
+               // We deliberately return a cached/stale copy of the
+               // real tags here, so that (Instance)Tags() sometimes
+               // returns old data after a call to
+               // (Instance)SetTags().  This is permitted by the
+               // driver interface, and this might help remind
+               // callers that they need to tolerate it.
+               tags: copyTags(svm.tags),
+       }
+}
+
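+// Exec implements SSHExecFunc. It emulates a few crunch-run command
+// lines ("--detach", "--list", "--kill") and reports failures
+// according to the VM's configured behaviors (Boot time, Broken
+// time, crash rate, etc.).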
+func (svm *StubVM) Exec(command string, stdin io.Reader, stdout, stderr io.Writer) uint32 {
+       queue := svm.sis.driver.Queue
+       uuid := regexp.MustCompile(`.{5}-dz642-.{15}`).FindString(command)
+       if eta := svm.Boot.Sub(time.Now()); eta > 0 {
+               fmt.Fprintf(stderr, "stub is booting, ETA %s\n", eta)
+               return 1
+       }
+       if !svm.Broken.IsZero() && svm.Broken.Before(time.Now()) {
+               fmt.Fprintf(stderr, "cannot fork\n")
+               return 2
+       }
+       if svm.CrunchRunMissing && strings.Contains(command, "crunch-run") {
+               fmt.Fprint(stderr, "crunch-run: command not found\n")
+               return 1
+       }
+       if strings.HasPrefix(command, "crunch-run --detach ") {
+               svm.Lock()
+               if svm.running == nil {
+                       svm.running = map[string]bool{}
+               }
+               svm.running[uuid] = true
+               svm.Unlock()
+               time.Sleep(svm.CrunchRunDetachDelay)
+               fmt.Fprintf(stderr, "starting %s\n", uuid)
+               logger := logrus.WithField("ContainerUUID", uuid)
+               logger.Printf("[test] starting crunch-run stub")
+               go func() {
+                       crashluck := math_rand.Float64()
+                       ctr, ok := queue.Get(uuid)
+                       if !ok {
+                               logger.Print("[test] container not in queue")
+                               return
+                       }
+                       if crashluck > svm.CrunchRunCrashRate/2 {
+                               time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+                               ctr.State = arvados.ContainerStateRunning
+                               queue.Notify(ctr)
+                       }
+
+                       time.Sleep(time.Duration(math_rand.Float64()*20) * time.Millisecond)
+                       svm.Lock()
+                       _, running := svm.running[uuid]
+                       svm.Unlock()
+                       if !running {
+                               logger.Print("[test] container was killed")
+                               return
+                       }
+                       if svm.ExecuteContainer != nil {
+                               ctr.ExitCode = svm.ExecuteContainer(ctr)
+                       }
+                       // TODO: Check whether the stub instance has
+                       // been destroyed, and if so, don't call
+                       // queue.Notify. Then "container finished
+                       // twice" can be classified as a bug.
+                       if crashluck < svm.CrunchRunCrashRate {
+                               logger.Print("[test] crashing crunch-run stub")
+                       } else {
+                               ctr.State = arvados.ContainerStateComplete
+                               queue.Notify(ctr)
+                       }
+                       logger.Print("[test] exiting crunch-run stub")
+                       svm.Lock()
+                       defer svm.Unlock()
+                       delete(svm.running, uuid)
+               }()
+               return 0
+       }
+       if command == "crunch-run --list" {
+               svm.Lock()
+               defer svm.Unlock()
+               for uuid := range svm.running {
+                       fmt.Fprintf(stdout, "%s\n", uuid)
+               }
+               return 0
+       }
+       if strings.HasPrefix(command, "crunch-run --kill ") {
+               svm.Lock()
+               defer svm.Unlock()
+               if svm.running[uuid] {
+                       delete(svm.running, uuid)
+               } else {
+                       fmt.Fprintf(stderr, "%s: container is not running\n", uuid)
+               }
+               return 0
+       }
+       if command == "true" {
+               return 0
+       }
+       fmt.Fprintf(stderr, "%q: command not found\n", command)
+       return 1
+}
+
+type stubInstance struct {
+       svm  *StubVM
+       addr string
+       tags cloud.InstanceTags
+}
+
+func (si stubInstance) ID() cloud.InstanceID {
+       return si.svm.id
+}
+
+func (si stubInstance) Address() string {
+       return si.addr
+}
+
+func (si stubInstance) Destroy() error {
+       if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
+               return errors.New("instance could not be destroyed")
+       }
+       si.svm.SSHService.Close()
+       sis := si.svm.sis
+       sis.mtx.Lock()
+       defer sis.mtx.Unlock()
+       delete(sis.servers, si.svm.id)
+       return nil
+}
+
+func (si stubInstance) ProviderType() string {
+       return si.svm.providerType
+}
+
+func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
+       tags = copyTags(tags)
+       svm := si.svm
+       go func() {
+               svm.Lock()
+               defer svm.Unlock()
+               svm.tags = tags
+       }()
+       return nil
+}
+
+func (si stubInstance) Tags() cloud.InstanceTags {
+       return si.tags
+}
+
+func (si stubInstance) String() string {
+       return string(si.svm.id)
+}
+
+func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
+       buf := make([]byte, 512)
+       _, err := io.ReadFull(rand.Reader, buf)
+       if err != nil {
+               return err
+       }
+       sig, err := si.svm.sis.driver.HostKey.Sign(rand.Reader, buf)
+       if err != nil {
+               return err
+       }
+       return key.Verify(buf, sig)
+}
+
+func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
+       dst := cloud.InstanceTags{}
+       for k, v := range src {
+               dst[k] = v
+       }
+       return dst
+}
diff --git a/lib/dispatchcloud/worker/gocheck_test.go b/lib/dispatchcloud/worker/gocheck_test.go
new file mode 100644 (file)
index 0000000..b4ca66c
--- /dev/null
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "testing"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
new file mode 100644 (file)
index 0000000..ff5f762
--- /dev/null
@@ -0,0 +1,684 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "io"
+       "sort"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+       tagKeyInstanceType = "InstanceType"
+       tagKeyHold         = "Hold"
+)
+
+// An InstanceView shows a worker's current state and recent activity.
+type InstanceView struct {
+       Instance             string
+       Price                float64
+       ArvadosInstanceType  string
+       ProviderInstanceType string
+       LastContainerUUID    string
+       LastBusy             time.Time
+       WorkerState          string
+}
+
+// An Executor executes shell commands on a remote host.
+type Executor interface {
+       // Run cmd on the current target.
+       Execute(cmd string, stdin io.Reader) (stdout, stderr []byte, err error)
+
+       // Use the given target for subsequent operations. The new
+       // target is the same host as the previous target, but it
+       // might return a different address and verify a different
+       // host key.
+       //
+       // SetTarget is called frequently, and in most cases the new
+       // target will behave exactly the same as the old one. An
+       // implementation should optimize accordingly.
+       //
+       // SetTarget must not block on concurrent Execute calls.
+       SetTarget(cloud.ExecutorTarget)
+
+       Close()
+}
+
+const (
+       defaultSyncInterval       = time.Minute
+       defaultProbeInterval      = time.Second * 10
+       defaultMaxProbesPerSecond = 10
+       defaultTimeoutIdle        = time.Minute
+       defaultTimeoutBooting     = time.Minute * 10
+       defaultTimeoutProbe       = time.Minute * 10
+       defaultTimeoutShutdown    = time.Second * 10
+)
+
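+// duration returns conf as a time.Duration if it is positive,
+// otherwise def.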
+func duration(conf arvados.Duration, def time.Duration) time.Duration {
+       if conf > 0 {
+               return time.Duration(conf)
+       } else {
+               return def
+       }
+}
+
+// NewPool creates a Pool of workers backed by instanceSet.
+//
+// New instances are configured and set up according to the given
+// cluster configuration.
+func NewPool(logger logrus.FieldLogger, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, cluster *arvados.Cluster) *Pool {
+       wp := &Pool{
+               logger:             logger,
+               instanceSet:        instanceSet,
+               newExecutor:        newExecutor,
+               bootProbeCommand:   cluster.CloudVMs.BootProbeCommand,
+               imageID:            cloud.ImageID(cluster.CloudVMs.ImageID),
+               instanceTypes:      cluster.InstanceTypes,
+               maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
+               probeInterval:      duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
+               syncInterval:       duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
+               timeoutIdle:        duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+               timeoutBooting:     duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+               timeoutProbe:       duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+               timeoutShutdown:    duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+               stop:               make(chan bool),
+       }
+       wp.registerMetrics(reg)
+       go func() {
+               wp.setupOnce.Do(wp.setup)
+               go wp.runMetrics()
+               go wp.runProbes()
+               go wp.runSync()
+       }()
+       return wp
+}
+
+// Pool is a resizable worker pool backed by a cloud.InstanceSet. A
+// zero Pool should not be used. Call NewPool to create a new Pool.
+type Pool struct {
+       // configuration
+       logger             logrus.FieldLogger
+       instanceSet        cloud.InstanceSet
+       newExecutor        func(cloud.Instance) Executor
+       bootProbeCommand   string
+       imageID            cloud.ImageID
+       instanceTypes      map[string]arvados.InstanceType
+       syncInterval       time.Duration
+       probeInterval      time.Duration
+       maxProbesPerSecond int
+       timeoutIdle        time.Duration
+       timeoutBooting     time.Duration
+       timeoutProbe       time.Duration
+       timeoutShutdown    time.Duration
+
+       // private state
+       subscribers  map[<-chan struct{}]chan<- struct{}
+       creating     map[arvados.InstanceType][]time.Time // start times of unfinished (InstanceSet)Create calls
+       workers      map[cloud.InstanceID]*worker
+       loaded       bool                 // loaded list of instances from InstanceSet at least once
+       exited       map[string]time.Time // containers whose crunch-run proc has exited, but KillContainer has not been called
+       atQuotaUntil time.Time
+       atQuotaErr   cloud.QuotaError
+       stop         chan bool
+       mtx          sync.RWMutex
+       setupOnce    sync.Once
+
+       mInstances         prometheus.Gauge
+       mContainersRunning prometheus.Gauge
+       mVCPUs             prometheus.Gauge
+       mVCPUsInuse        prometheus.Gauge
+       mMemory            prometheus.Gauge
+       mMemoryInuse       prometheus.Gauge
+}
+
+// Subscribe returns a channel that becomes ready whenever a worker's
+// state changes.
+//
+// Example:
+//
+//     ch := wp.Subscribe()
+//     defer wp.Unsubscribe(ch)
+//     for range ch {
+//             // ...try scheduling some work...
+//             if done {
+//                     break
+//             }
+//     }
+func (wp *Pool) Subscribe() <-chan struct{} {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       ch := make(chan struct{}, 1)
+       wp.subscribers[ch] = ch
+       return ch
+}
+
+// Unsubscribe stops sending updates to the given channel.
+func (wp *Pool) Unsubscribe(ch <-chan struct{}) {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       delete(wp.subscribers, ch)
+}
+
+// Unallocated returns the number of unallocated (creating + booting +
+// idle + unknown) workers for each instance type.
+func (wp *Pool) Unallocated() map[arvados.InstanceType]int {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       unalloc := map[arvados.InstanceType]int{}
+       creating := map[arvados.InstanceType]int{}
+       for it, times := range wp.creating {
+               creating[it] = len(times)
+       }
+       for _, wkr := range wp.workers {
+               if !(wkr.state == StateIdle || wkr.state == StateBooting || wkr.state == StateUnknown) {
+                       continue
+               }
+               it := wkr.instType
+               unalloc[it]++
+               if wkr.state == StateUnknown && creating[it] > 0 && wkr.appeared.After(wp.creating[it][0]) {
+                       // If up to N new workers appear in
+                       // Instances() while we are waiting for N
+                       // Create() calls to complete, we assume we're
+                       // just seeing a race between Instances() and
+                       // Create() responses.
+                       //
+                       // The other common reason why nodes have
+                       // state==Unknown is that they appeared at
+                       // startup, before any Create calls. They
+                       // don't match the above timing condition, so
+                       // we never mistakenly attribute them to
+                       // pending Create calls.
+                       creating[it]--
+               }
+       }
+       for it, c := range creating {
+               unalloc[it] += c
+       }
+       return unalloc
+}
+
+// Create a new instance with the given type, and add it to the worker
+// pool. The pool's unallocated count for the type is incremented
+// immediately; the cloud API call runs in the background.
+func (wp *Pool) Create(it arvados.InstanceType) error {
+       logger := wp.logger.WithField("InstanceType", it.Name)
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if time.Now().Before(wp.atQuotaUntil) {
+               return wp.atQuotaErr
+       }
+       tags := cloud.InstanceTags{tagKeyInstanceType: it.Name}
+       now := time.Now()
+       wp.creating[it] = append(wp.creating[it], now)
+       go func() {
+               defer wp.notify()
+               inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
+               wp.mtx.Lock()
+               defer wp.mtx.Unlock()
+               // Remove our timestamp marker from wp.creating
+               for i, t := range wp.creating[it] {
+                       if t == now {
+                               copy(wp.creating[it][i:], wp.creating[it][i+1:])
+                               wp.creating[it] = wp.creating[it][:len(wp.creating[it])-1]
+                               break
+                       }
+               }
+               if err, ok := err.(cloud.QuotaError); ok && err.IsQuotaError() {
+                       wp.atQuotaErr = err
+                       wp.atQuotaUntil = time.Now().Add(time.Minute)
+               }
+               if err != nil {
+                       logger.WithError(err).Error("create failed")
+                       return
+               }
+               wp.updateWorker(inst, it, StateBooting)
+       }()
+       return nil
+}
+
+// AtQuota returns true if Create is not expected to work at the
+// moment.
+func (wp *Pool) AtQuota() bool {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       return time.Now().Before(wp.atQuotaUntil)
+}
+
+// Add or update worker attached to the given instance. Use
+// initialState if a new worker is created.
+//
+// The second return value is true if a new worker is created.
+//
+// Caller must have lock.
+func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType, initialState State) (*worker, bool) {
+       id := inst.ID()
+       if wkr := wp.workers[id]; wkr != nil {
+               wkr.executor.SetTarget(inst)
+               wkr.instance = inst
+               wkr.updated = time.Now()
+               if initialState == StateBooting && wkr.state == StateUnknown {
+                       wkr.state = StateBooting
+               }
+               return wkr, false
+       }
+       if initialState == StateUnknown && inst.Tags()[tagKeyHold] != "" {
+               initialState = StateHold
+       }
+       logger := wp.logger.WithFields(logrus.Fields{
+               "InstanceType": it.Name,
+               "Instance":     inst,
+       })
+       logger.WithField("State", initialState).Infof("instance appeared in cloud")
+       now := time.Now()
+       wkr := &worker{
+               mtx:      &wp.mtx,
+               wp:       wp,
+               logger:   logger,
+               executor: wp.newExecutor(inst),
+               state:    initialState,
+               instance: inst,
+               instType: it,
+               appeared: now,
+               probed:   now,
+               busy:     now,
+               updated:  now,
+               running:  make(map[string]struct{}),
+               starting: make(map[string]struct{}),
+               probing:  make(chan struct{}, 1),
+       }
+       wp.workers[id] = wkr
+       return wkr, true
+}
+
+// caller must have lock.
+func (wp *Pool) notifyExited(uuid string, t time.Time) {
+       wp.exited[uuid] = t
+}
+
+// Shutdown shuts down a worker with the given type, or returns false
+// if all workers with the given type are busy.
+func (wp *Pool) Shutdown(it arvados.InstanceType) bool {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       logger := wp.logger.WithField("InstanceType", it.Name)
+       logger.Info("shutdown requested")
+       for _, tryState := range []State{StateBooting, StateIdle} {
+               // TODO: shutdown the worker with the longest idle
+               // time (Idle) or the earliest create time (Booting)
+               for _, wkr := range wp.workers {
+                       if wkr.state == tryState && wkr.instType == it {
+                               logger.WithField("Instance", wkr.instance).Info("shutting down")
+                               wkr.shutdown()
+                               return true
+                       }
+               }
+       }
+       return false
+}
+
+// CountWorkers returns the current number of workers in each state.
+func (wp *Pool) CountWorkers() map[State]int {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       r := map[State]int{}
+       for _, w := range wp.workers {
+               r[w.state]++
+       }
+       return r
+}
+
+// Running returns the container UUIDs being prepared/run on workers.
+func (wp *Pool) Running() map[string]time.Time {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       r := map[string]time.Time{}
+       for _, wkr := range wp.workers {
+               for uuid := range wkr.running {
+                       r[uuid] = time.Time{}
+               }
+               for uuid := range wkr.starting {
+                       r[uuid] = time.Time{}
+               }
+       }
+       for uuid, exited := range wp.exited {
+               r[uuid] = exited
+       }
+       return r
+}
+
+// StartContainer starts a container on an idle worker immediately if
+// possible, otherwise returns false.
+func (wp *Pool) StartContainer(it arvados.InstanceType, ctr arvados.Container) bool {
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       var wkr *worker
+       for _, w := range wp.workers {
+               if w.instType == it && w.state == StateIdle {
+                       if wkr == nil || w.busy.After(wkr.busy) {
+                               wkr = w
+                       }
+               }
+       }
+       if wkr == nil {
+               return false
+       }
+       wkr.startContainer(ctr)
+       return true
+}
+
+// KillContainer kills the crunch-run process for the given container
+// UUID, if it's running on any worker.
+//
+// KillContainer returns immediately; the act of killing the container
+// takes some time, and runs in the background.
+func (wp *Pool) KillContainer(uuid string) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wp.exited[uuid]; ok {
+               wp.logger.WithField("ContainerUUID", uuid).Debug("clearing placeholder for exited crunch-run process")
+               delete(wp.exited, uuid)
+               return
+       }
+       for _, wkr := range wp.workers {
+               if _, ok := wkr.running[uuid]; ok {
+                       go wp.kill(wkr, uuid)
+                       return
+               }
+       }
+       wp.logger.WithField("ContainerUUID", uuid).Debug("cannot kill: already disappeared")
+}
+
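+// kill runs "crunch-run --kill" on the worker's instance and, if the
+// command succeeds, removes the container from the worker's running
+// set.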
+func (wp *Pool) kill(wkr *worker, uuid string) {
+       logger := wp.logger.WithFields(logrus.Fields{
+               "ContainerUUID": uuid,
+               "Instance":      wkr.instance,
+       })
+       logger.Debug("killing process")
+       stdout, stderr, err := wkr.executor.Execute("crunch-run --kill 15 "+uuid, nil)
+       if err != nil {
+               logger.WithFields(logrus.Fields{
+                       "stderr": string(stderr),
+                       "stdout": string(stdout),
+                       "error":  err,
+               }).Warn("kill failed")
+               return
+       }
+       logger.Debug("killing process succeeded")
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       if _, ok := wkr.running[uuid]; ok {
+               delete(wkr.running, uuid)
+               if wkr.state == StateRunning && len(wkr.running)+len(wkr.starting) == 0 {
+                       wkr.state = StateIdle
+               }
+               wkr.updated = time.Now()
+               go wp.notify()
+       }
+}
+
+func (wp *Pool) registerMetrics(reg *prometheus.Registry) {
+       if reg == nil {
+               reg = prometheus.NewRegistry()
+       }
+       wp.mInstances = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "instances_total",
+               Help:      "Number of cloud VMs including pending, booting, running, held, and shutting down.",
+       })
+       reg.MustRegister(wp.mInstances)
+       wp.mContainersRunning = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "containers_running",
+               Help:      "Number of containers reported running by cloud VMs.",
+       })
+       reg.MustRegister(wp.mContainersRunning)
+
+       wp.mVCPUs = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "vcpus_total",
+               Help:      "Total VCPUs on all cloud VMs.",
+       })
+       reg.MustRegister(wp.mVCPUs)
+       wp.mVCPUsInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "vcpus_inuse",
+               Help:      "VCPUs on cloud VMs that are running containers.",
+       })
+       reg.MustRegister(wp.mVCPUsInuse)
+       wp.mMemory = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "memory_bytes_total",
+               Help:      "Total memory on all cloud VMs.",
+       })
+       reg.MustRegister(wp.mMemory)
+       wp.mMemoryInuse = prometheus.NewGauge(prometheus.GaugeOpts{
+               Namespace: "arvados",
+               Subsystem: "dispatchcloud",
+               Name:      "memory_bytes_inuse",
+               Help:      "Memory on cloud VMs that are running containers.",
+       })
+       reg.MustRegister(wp.mMemoryInuse)
+}
+
+func (wp *Pool) runMetrics() {
+       ch := wp.Subscribe()
+       defer wp.Unsubscribe(ch)
+       for range ch {
+               wp.updateMetrics()
+       }
+}
+
+func (wp *Pool) updateMetrics() {
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+
+       var alloc, cpu, cpuInuse, mem, memInuse int64
+       for _, wkr := range wp.workers {
+               cpu += int64(wkr.instType.VCPUs)
+               mem += int64(wkr.instType.RAM)
+               if len(wkr.running)+len(wkr.starting) == 0 {
+                       continue
+               }
+               alloc += int64(len(wkr.running) + len(wkr.starting))
+               cpuInuse += int64(wkr.instType.VCPUs)
+               memInuse += int64(wkr.instType.RAM)
+       }
+       wp.mInstances.Set(float64(len(wp.workers)))
+       wp.mContainersRunning.Set(float64(alloc))
+       wp.mVCPUs.Set(float64(cpu))
+       wp.mMemory.Set(float64(mem))
+       wp.mVCPUsInuse.Set(float64(cpuInuse))
+       wp.mMemoryInuse.Set(float64(memInuse))
+}
+
+func (wp *Pool) runProbes() {
+       maxPPS := wp.maxProbesPerSecond
+       if maxPPS < 1 {
+               maxPPS = defaultMaxProbesPerSecond
+       }
+       limitticker := time.NewTicker(time.Second / time.Duration(maxPPS))
+       defer limitticker.Stop()
+
+       probeticker := time.NewTicker(wp.probeInterval)
+       defer probeticker.Stop()
+
+       workers := []cloud.InstanceID{}
+       for range probeticker.C {
+               workers = workers[:0]
+               wp.mtx.Lock()
+               for id, wkr := range wp.workers {
+                       if wkr.state == StateShutdown || wkr.shutdownIfIdle() {
+                               continue
+                       }
+                       workers = append(workers, id)
+               }
+               wp.mtx.Unlock()
+
+               for _, id := range workers {
+                       wp.mtx.Lock()
+                       wkr, ok := wp.workers[id]
+                       wp.mtx.Unlock()
+                       if !ok {
+                               // Deleted while we were probing
+                               // others
+                               continue
+                       }
+                       go wkr.ProbeAndUpdate()
+                       select {
+                       case <-wp.stop:
+                               return
+                       case <-limitticker.C:
+                       }
+               }
+       }
+}
+
+func (wp *Pool) runSync() {
+       // sync once immediately, then wait syncInterval, sync again,
+       // etc.
+       timer := time.NewTimer(1)
+       for {
+               select {
+               case <-timer.C:
+                       err := wp.getInstancesAndSync()
+                       if err != nil {
+                               wp.logger.WithError(err).Warn("sync failed")
+                       }
+                       timer.Reset(wp.syncInterval)
+               case <-wp.stop:
+                       wp.logger.Debug("worker.Pool stopped")
+                       return
+               }
+       }
+}
+
+// Stop synchronizing with the InstanceSet.
+func (wp *Pool) Stop() {
+       wp.setupOnce.Do(wp.setup)
+       close(wp.stop)
+}
+
+// Instances returns an InstanceView for each worker in the pool,
+// summarizing its current state and recent activity.
+func (wp *Pool) Instances() []InstanceView {
+       var r []InstanceView
+       wp.setupOnce.Do(wp.setup)
+       wp.mtx.Lock()
+       for _, w := range wp.workers {
+               r = append(r, InstanceView{
+                       Instance:             w.instance.String(),
+                       Price:                w.instType.Price,
+                       ArvadosInstanceType:  w.instType.Name,
+                       ProviderInstanceType: w.instType.ProviderType,
+                       LastContainerUUID:    w.lastUUID,
+                       LastBusy:             w.busy,
+                       WorkerState:          w.state.String(),
+               })
+       }
+       wp.mtx.Unlock()
+       sort.Slice(r, func(i, j int) bool {
+               return strings.Compare(r[i].Instance, r[j].Instance) < 0
+       })
+       return r
+}
+
+func (wp *Pool) setup() {
+       wp.creating = map[arvados.InstanceType][]time.Time{}
+       wp.exited = map[string]time.Time{}
+       wp.workers = map[cloud.InstanceID]*worker{}
+       wp.subscribers = map[<-chan struct{}]chan<- struct{}{}
+}
+
+func (wp *Pool) notify() {
+       wp.mtx.RLock()
+       defer wp.mtx.RUnlock()
+       for _, send := range wp.subscribers {
+               select {
+               case send <- struct{}{}:
+               default:
+               }
+       }
+}
+
+func (wp *Pool) getInstancesAndSync() error {
+       wp.setupOnce.Do(wp.setup)
+       wp.logger.Debug("getting instance list")
+       threshold := time.Now()
+       instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+       if err != nil {
+               return err
+       }
+       wp.sync(threshold, instances)
+       wp.logger.Debug("sync done")
+       return nil
+}
+
+// Add, remove, and update workers based on instances, which were
+// obtained from the instanceSet. However, don't clobber any other
+// updates that already happened after threshold.
+func (wp *Pool) sync(threshold time.Time, instances []cloud.Instance) {
+       wp.mtx.Lock()
+       defer wp.mtx.Unlock()
+       wp.logger.WithField("Instances", len(instances)).Debug("sync instances")
+       notify := false
+
+       for _, inst := range instances {
+               itTag := inst.Tags()[tagKeyInstanceType]
+               it, ok := wp.instanceTypes[itTag]
+               if !ok {
+                       wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
+                       continue
+               }
+               if wkr, isNew := wp.updateWorker(inst, it, StateUnknown); isNew {
+                       notify = true
+               } else if wkr.state == StateShutdown && time.Since(wkr.destroyed) > wp.timeoutShutdown {
+                       wp.logger.WithField("Instance", inst).Info("worker still listed after shutdown; retrying")
+                       wkr.shutdown()
+               }
+       }
+
+       for id, wkr := range wp.workers {
+               if wkr.updated.After(threshold) {
+                       continue
+               }
+               logger := wp.logger.WithFields(logrus.Fields{
+                       "Instance":    wkr.instance,
+                       "WorkerState": wkr.state,
+               })
+               logger.Info("instance disappeared in cloud")
+               delete(wp.workers, id)
+               go wkr.executor.Close()
+               notify = true
+       }
+
+       if !wp.loaded {
+               wp.loaded = true
+               wp.logger.WithField("N", len(wp.workers)).Info("loaded initial instance list")
+       }
+
+       if notify {
+               go wp.notify()
+       }
+}
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
new file mode 100644 (file)
index 0000000..3867e2c
--- /dev/null
@@ -0,0 +1,135 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "io"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+const GiB arvados.ByteSize = 1 << 30
+
+var _ = check.Suite(&PoolSuite{})
+
+type lessChecker struct {
+       *check.CheckerInfo
+}
+
+func (*lessChecker) Check(params []interface{}, names []string) (result bool, error string) {
+       return params[0].(int) < params[1].(int), ""
+}
+
+var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
+
+type PoolSuite struct{}
+
+func (suite *PoolSuite) SetUpSuite(c *check.C) {
+       logrus.StandardLogger().SetLevel(logrus.DebugLevel)
+}
+
+func (suite *PoolSuite) TestStartContainer(c *check.C) {
+       // TODO: use an instanceSet stub with an SSH server
+}
+
+func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
+       // TODO: use an instanceSet stub with an SSH server
+}
+
+func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
+       lameInstanceSet := &test.LameInstanceSet{Hold: make(chan bool)}
+       type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
+       type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
+       pool := &Pool{
+               logger:      logrus.StandardLogger(),
+               newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               instanceSet: lameInstanceSet,
+               instanceTypes: arvados.InstanceTypeMap{
+                       type1.Name: type1,
+                       type2.Name: type2,
+               },
+       }
+       notify := pool.Subscribe()
+       defer pool.Unsubscribe(notify)
+       notify2 := pool.Subscribe()
+       defer pool.Unsubscribe(notify2)
+
+       c.Check(pool.Unallocated()[type1], check.Equals, 0)
+       c.Check(pool.Unallocated()[type2], check.Equals, 0)
+       pool.Create(type2)
+       pool.Create(type1)
+       pool.Create(type2)
+       c.Check(pool.Unallocated()[type1], check.Equals, 1)
+       c.Check(pool.Unallocated()[type2], check.Equals, 2)
+
+       // Unblock the pending Create calls.
+       go lameInstanceSet.Release(3)
+
+       // Wait for each instance to either return from its Create
+       // call, or show up in a poll.
+       suite.wait(c, pool, notify, func() bool {
+               pool.mtx.RLock()
+               defer pool.mtx.RUnlock()
+               return len(pool.workers) == 3
+       })
+
+       c.Check(pool.Shutdown(type2), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 1
+       })
+       c.Check(pool.Shutdown(type2), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 1 && pool.Unallocated()[type2] == 0
+       })
+       c.Check(pool.Shutdown(type2), check.Equals, false)
+       for {
+               // Consume any waiting notifications to ensure the
+               // next one we get is from Shutdown.
+               select {
+               case <-notify:
+                       continue
+               default:
+               }
+               break
+       }
+       c.Check(pool.Shutdown(type1), check.Equals, true)
+       suite.wait(c, pool, notify, func() bool {
+               return pool.Unallocated()[type1] == 0 && pool.Unallocated()[type2] == 0
+       })
+       select {
+       case <-notify2:
+       case <-time.After(time.Second):
+               c.Error("timed out waiting for notify2")
+       }
+       go lameInstanceSet.Release(3) // unblock Destroy calls
+}
+
+func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
+       timeout := time.NewTimer(time.Second).C
+       for !ready() {
+               select {
+               case <-notify:
+                       continue
+               case <-timeout:
+               }
+               break
+       }
+       c.Check(ready(), check.Equals, true)
+}
+
+type stubExecutor struct{}
+
+func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
+
+func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+       return nil, nil, nil
+}
+
+func (*stubExecutor) Close() {}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
new file mode 100644 (file)
index 0000000..c261863
--- /dev/null
@@ -0,0 +1,320 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package worker
+
+import (
+       "bytes"
+       "strings"
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/lib/cloud"
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
+)
+
+// State indicates whether a worker is available to do work, and (if
+// not) whether/when it is expected to become ready.
+type State int
+
+const (
+       StateUnknown  State = iota // might be running a container already
+       StateBooting               // instance is booting
+       StateIdle                  // instance booted, no containers are running
+       StateRunning               // instance is running one or more containers
+       StateShutdown              // worker has stopped monitoring the instance
+       StateHold                  // running, but not available to run new containers
+)
+
+const (
+       // TODO: configurable
+       maxPingFailTime = 10 * time.Minute
+)
+
+var stateString = map[State]string{
+       StateUnknown:  "unknown",
+       StateBooting:  "booting",
+       StateIdle:     "idle",
+       StateRunning:  "running",
+       StateShutdown: "shutdown",
+       StateHold:     "hold",
+}
+
+// String implements fmt.Stringer.
+func (s State) String() string {
+       return stateString[s]
+}
+
+// MarshalText implements encoding.TextMarshaler so a JSON encoding of
+// map[State]anything uses the state's string representation.
+func (s State) MarshalText() ([]byte, error) {
+       return []byte(stateString[s]), nil
+}
+
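As a quick illustration (a hypothetical snippet, not part of this patch), defining MarshalText means a map keyed by State marshals with readable keys; encoding/json resolves and sorts TextMarshaler keys by their marshaled text:

    // Hypothetical example test alongside this package; assumes
    // imports "encoding/json" and "fmt".
    func ExampleState_MarshalText() {
            unalloc := map[State]int{StateIdle: 2, StateBooting: 1}
            buf, _ := json.Marshal(unalloc)
            fmt.Println(string(buf))
            // Output: {"booting":1,"idle":2}
    }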
+type worker struct {
+       logger   logrus.FieldLogger
+       executor Executor
+       wp       *Pool
+
+       mtx       sync.Locker // must be wp's Locker.
+       state     State
+       instance  cloud.Instance
+       instType  arvados.InstanceType
+       vcpus     int64
+       memory    int64
+       appeared  time.Time
+       probed    time.Time
+       updated   time.Time
+       busy      time.Time
+       destroyed time.Time
+       lastUUID  string
+       running   map[string]struct{} // remember to update state idle<->running when this changes
+       starting  map[string]struct{} // remember to update state idle<->running when this changes
+       probing   chan struct{}
+}
+
+// caller must have lock.
+func (wkr *worker) startContainer(ctr arvados.Container) {
+       logger := wkr.logger.WithFields(logrus.Fields{
+               "ContainerUUID": ctr.UUID,
+               "Priority":      ctr.Priority,
+       })
+       logger = logger.WithField("Instance", wkr.instance)
+       logger.Debug("starting container")
+       wkr.starting[ctr.UUID] = struct{}{}
+       wkr.state = StateRunning
+       go func() {
+               stdout, stderr, err := wkr.executor.Execute("crunch-run --detach '"+ctr.UUID+"'", nil)
+               wkr.mtx.Lock()
+               defer wkr.mtx.Unlock()
+               now := time.Now()
+               wkr.updated = now
+               wkr.busy = now
+               delete(wkr.starting, ctr.UUID)
+               wkr.running[ctr.UUID] = struct{}{}
+               wkr.lastUUID = ctr.UUID
+               if err != nil {
+                       logger.WithField("stdout", string(stdout)).
+                               WithField("stderr", string(stderr)).
+                               WithError(err).
+                               Error("error starting crunch-run process")
+                       // Leave uuid in wkr.running, though: it's
+                       // possible the error was just a communication
+                       // failure and the process was in fact
+                       // started.  Wait for next probe to find out.
+                       return
+               }
+               logger.Info("crunch-run process started")
+               wkr.lastUUID = ctr.UUID
+       }()
+}
+
+// ProbeAndUpdate conducts appropriate boot/running probes (if any)
+// for the worker's current state. If a previous probe is still
+// running, it does nothing.
+//
+// It should be called in a new goroutine.
+func (wkr *worker) ProbeAndUpdate() {
+       select {
+       case wkr.probing <- struct{}{}:
+               wkr.probeAndUpdate()
+               <-wkr.probing
+       default:
+               wkr.logger.Debug("still waiting for last probe to finish")
+       }
+}
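The select/default on wkr.probing above is a small try-lock: a channel with capacity 1 admits at most one probe at a time, and any extra caller falls through to the default case and returns. A self-contained sketch of the same idiom (hypothetical names; it assumes the probing channel is created with make(chan struct{}, 1) where the worker is constructed, which is outside this hunk):

    // tryOnce runs fn only if no other call is already in flight.
    // gate must be created as make(chan struct{}, 1).
    func tryOnce(gate chan struct{}, fn func()) bool {
            select {
            case gate <- struct{}{}:
                    defer func() { <-gate }()
                    fn()
                    return true
            default:
                    return false // another call is still running
            }
    }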
+
+// should be called in a new goroutine
+func (wkr *worker) probeAndUpdate() {
+       wkr.mtx.Lock()
+       updated := wkr.updated
+       needProbeRunning := wkr.state == StateRunning || wkr.state == StateIdle
+       needProbeBooted := wkr.state == StateUnknown || wkr.state == StateBooting
+       wkr.mtx.Unlock()
+       if !needProbeBooted && !needProbeRunning {
+               return
+       }
+
+       var (
+               ctrUUIDs []string
+               ok       bool
+               stderr   []byte
+       )
+       if needProbeBooted {
+               ok, stderr = wkr.probeBooted()
+               wkr.mtx.Lock()
+               if ok || wkr.state == StateRunning || wkr.state == StateIdle {
+                       wkr.logger.Info("instance booted; will try probeRunning")
+                       needProbeRunning = true
+               }
+               wkr.mtx.Unlock()
+       }
+       if needProbeRunning {
+               ctrUUIDs, ok, stderr = wkr.probeRunning()
+       }
+       logger := wkr.logger.WithField("stderr", string(stderr))
+       wkr.mtx.Lock()
+       defer wkr.mtx.Unlock()
+       if !ok {
+               if wkr.state == StateShutdown && wkr.updated.After(updated) {
+                       // Skip the logging noise if shutdown was
+                       // initiated during probe.
+                       return
+               }
+               dur := time.Since(wkr.probed)
+               logger := logger.WithFields(logrus.Fields{
+                       "Duration": dur,
+                       "State":    wkr.state,
+               })
+               if wkr.state == StateBooting && !needProbeRunning {
+                       // If we know the instance has never passed a
+                       // boot probe, it's not noteworthy that it
+                       // hasn't passed this probe.
+                       logger.Debug("new instance not responding")
+               } else {
+                       logger.Info("instance not responding")
+               }
+               wkr.shutdownIfBroken(dur)
+               return
+       }
+
+       updateTime := time.Now()
+       wkr.probed = updateTime
+
+       if updated != wkr.updated {
+               // Worker was updated after the probe began, so
+               // wkr.running might have a container UUID that was
+               // not yet running when ctrUUIDs was generated. Leave
+               // wkr.running alone and wait for the next probe to
+               // catch up on any changes.
+               return
+       }
+
+       if len(ctrUUIDs) > 0 {
+               wkr.busy = updateTime
+               wkr.lastUUID = ctrUUIDs[0]
+       } else if len(wkr.running) > 0 {
+               // Actual last-busy time was sometime between wkr.busy
+               // and now. Now is the earliest opportunity to take
+               // advantage of the non-busy state, though.
+               wkr.busy = updateTime
+       }
+       running := map[string]struct{}{}
+       changed := false
+       for _, uuid := range ctrUUIDs {
+               running[uuid] = struct{}{}
+               if _, ok := wkr.running[uuid]; !ok {
+                       changed = true
+               }
+       }
+       for uuid := range wkr.running {
+               if _, ok := running[uuid]; !ok {
+                       logger.WithField("ContainerUUID", uuid).Info("crunch-run process ended")
+                       wkr.wp.notifyExited(uuid, updateTime)
+                       changed = true
+               }
+       }
+       if wkr.state == StateUnknown || wkr.state == StateBooting {
+               wkr.state = StateIdle
+               changed = true
+       }
+       if changed {
+               wkr.running = running
+               if wkr.state == StateIdle && len(wkr.starting)+len(wkr.running) > 0 {
+                       wkr.state = StateRunning
+               } else if wkr.state == StateRunning && len(wkr.starting)+len(wkr.running) == 0 {
+                       wkr.state = StateIdle
+               }
+               wkr.updated = updateTime
+               go wkr.wp.notify()
+       }
+}
+
+func (wkr *worker) probeRunning() (running []string, ok bool, stderr []byte) {
+       cmd := "crunch-run --list"
+       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       if err != nil {
+               wkr.logger.WithFields(logrus.Fields{
+                       "Command": cmd,
+                       "stdout":  string(stdout),
+                       "stderr":  string(stderr),
+               }).WithError(err).Warn("probe failed")
+               return nil, false, stderr
+       }
+       stdout = bytes.TrimRight(stdout, "\n")
+       if len(stdout) == 0 {
+               return nil, true, stderr
+       }
+       return strings.Split(string(stdout), "\n"), true, stderr
+}
+
+func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
+       cmd := wkr.wp.bootProbeCommand
+       if cmd == "" {
+               cmd = "true"
+       }
+       stdout, stderr, err := wkr.executor.Execute(cmd, nil)
+       logger := wkr.logger.WithFields(logrus.Fields{
+               "Command": cmd,
+               "stdout":  string(stdout),
+               "stderr":  string(stderr),
+       })
+       if err != nil {
+               logger.WithError(err).Debug("boot probe failed")
+               return false, stderr
+       }
+       logger.Info("boot probe succeeded")
+       return true, stderr
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfBroken(dur time.Duration) {
+       if wkr.state == StateHold {
+               return
+       }
+       label, threshold := "", wkr.wp.timeoutProbe
+       if wkr.state == StateBooting {
+               label, threshold = "new ", wkr.wp.timeoutBooting
+       }
+       if dur < threshold {
+               return
+       }
+       wkr.logger.WithFields(logrus.Fields{
+               "Duration": dur,
+               "Since":    wkr.probed,
+               "State":    wkr.state,
+       }).Warnf("%sinstance unresponsive, shutting down", label)
+       wkr.shutdown()
+}
+
+// caller must have lock.
+func (wkr *worker) shutdownIfIdle() bool {
+       if wkr.state != StateIdle {
+               return false
+       }
+       age := time.Since(wkr.busy)
+       if age < wkr.wp.timeoutIdle {
+               return false
+       }
+       wkr.logger.WithField("Age", age).Info("shutdown idle worker")
+       wkr.shutdown()
+       return true
+}
+
+// caller must have lock
+func (wkr *worker) shutdown() {
+       now := time.Now()
+       wkr.updated = now
+       wkr.destroyed = now
+       wkr.state = StateShutdown
+       go wkr.wp.notify()
+       go func() {
+               err := wkr.instance.Destroy()
+               if err != nil {
+                       wkr.logger.WithError(err).Warn("shutdown failed")
+                       return
+               }
+       }()
+}
index 26c85d300ddcb17c8038d31c4d0f8cd1d39aabc9..0b2a22788e6f98537b0f5a3437a2d540a57d47ee 100644 (file)
@@ -119,6 +119,39 @@ class ArvPathMapper(PathMapper):
         else:
             raise SourceLine(obj, "location", WorkflowException).makeError("Don't know what to do with '%s'" % obj["location"])
 
+    def needs_new_collection(self, srcobj, prefix=""):
+        """Check if files need to be staged into a new collection.
+
+        If all the files are in the same collection and in the same
+        paths they would be staged to, return False.  Otherwise, a new
+        collection is needed with files copied/created in the
+        appropriate places.
+        """
+
+        loc = srcobj["location"]
+        if loc.startswith("_:"):
+            return True
+        if prefix:
+            if loc != prefix+srcobj["basename"]:
+                return True
+        else:
+            i = loc.rfind("/")
+            if i > -1:
+                prefix = loc[:i+1]
+            else:
+                prefix = loc+"/"
+        if srcobj["class"] == "File" and loc not in self._pathmap:
+            return True
+        for s in srcobj.get("secondaryFiles", []):
+            if self.needs_new_collection(s, prefix):
+                return True
+        if srcobj.get("listing"):
+            prefix = "%s%s/" % (prefix, srcobj["basename"])
+            for l in srcobj["listing"]:
+                if self.needs_new_collection(l, prefix):
+                    return True
+        return False
+
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
         uploadfiles = set()
@@ -169,6 +202,13 @@ class ArvPathMapper(PathMapper):
             elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
 
+                # If all secondary files/directories are located in
+                # the same collection as the primary file, and their
+                # paths and names are consistent with staging, don't
+                # create a new collection.
+                if not self.needs_new_collection(srcobj):
+                    continue
+
                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries                                                  )
index fb3c257d93e1be9cac211defc97d3282100ccdbc..b78e89012ad62c5f952476da0553b2d26dac5fd3 100644 (file)
@@ -102,3 +102,132 @@ class TestPathmap(unittest.TestCase):
                 "class": "File",
                 "location": "file:tests/hw.py"
             }], "", "/test/%s", "/test/%s/%s")
+
+    def test_needs_new_collection(self):
+        arvrunner = arvados_cwl.executor.ArvCwlExecutor(self.api)
+
+        # Plain file.  Don't need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py"
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        self.assertFalse(p.needs_new_collection(a))
+
+        # A file that isn't in the pathmap (for some reason).  Need a new collection.
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        self.assertTrue(p.needs_new_collection(a))
+
+        # A file with a secondary file in the same collection.  Don't need
+        # a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "File",
+                "location": "keep:99999999999999999999999999999991+99/hw.pyc",
+                "basename": "hw.pyc"
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.pyc"] = True
+        self.assertFalse(p.needs_new_collection(a))
+
+        # Secondary file is in a different collection from the
+        # primary.  Need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "File",
+                "location": "keep:99999999999999999999999999999992+99/hw.pyc",
+                "basename": "hw.pyc"
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999992+99/hw.pyc"] = True
+        self.assertTrue(p.needs_new_collection(a))
+
+        # Secondary file would be staged under a different name than
+        # the one in its location.  Need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "File",
+                "location": "keep:99999999999999999999999999999991+99/hw.pyc",
+                "basename": "hw.other"
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.pyc"] = True
+        self.assertTrue(p.needs_new_collection(a))
+
+        # Secondary file is a directory.  Don't need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "Directory",
+                "location": "keep:99999999999999999999999999999991+99/hw",
+                "basename": "hw",
+                "listing": [{
+                    "class": "File",
+                    "location": "keep:99999999999999999999999999999991+99/hw/h2",
+                    "basename": "h2"
+                }]
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw/h2"] = True
+        self.assertFalse(p.needs_new_collection(a))
+
+        # Secondary file is a renamed directory.  Need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "Directory",
+                "location": "keep:99999999999999999999999999999991+99/hw",
+                "basename": "wh",
+                "listing": [{
+                    "class": "File",
+                    "location": "keep:99999999999999999999999999999991+99/hw/h2",
+                    "basename": "h2"
+                }]
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw"] = True
+        p._pathmap["keep:99999999999999999999999999999991+99/hw/h2"] = True
+        self.assertTrue(p.needs_new_collection(a))
+
+        # Secondary file is a file literal.  Need a new collection.
+        a = {
+            "class": "File",
+            "location": "keep:99999999999999999999999999999991+99/hw.py",
+            "basename": "hw.py",
+            "secondaryFiles": [{
+                "class": "File",
+                "location": "_:123",
+                "basename": "hw.pyc",
+                "contents": "123"
+            }]
+        }
+        p = ArvPathMapper(arvrunner, [], "", "%s", "%s/%s")
+        p._pathmap["keep:99999999999999999999999999999991+99/hw.py"] = True
+        p._pathmap["_:123"] = True
+        self.assertTrue(p.needs_new_collection(a))
index e2e9907d5d115ebb61a53ae873249511b3732a50..bfa86abf6a48a1fcf30eda2618bdfaf28b0a2efb 100644 (file)
@@ -60,6 +60,8 @@ type Cluster struct {
        ManagementToken    string
        NodeProfiles       map[string]NodeProfile
        InstanceTypes      InstanceTypeMap
+       CloudVMs           CloudVMs
+       Dispatch           Dispatch
        HTTPRequestTimeout Duration
        RemoteClusters     map[string]RemoteCluster
        PostgreSQL         PostgreSQL
@@ -95,6 +97,50 @@ type InstanceType struct {
        Preemptible  bool
 }
 
+type Dispatch struct {
+       // PEM encoded SSH key (RSA, DSA, or ECDSA) able to log in to
+       // cloud VMs.
+       PrivateKey []byte
+
+       // Max time for workers to come up before abandoning stale
+       // locks from a previous run
+       StaleLockTimeout Duration
+
+       // Interval between queue polls
+       PollInterval Duration
+
+       // Interval between probes to each worker
+       ProbeInterval Duration
+
+       // Maximum total worker probes per second
+       MaxProbesPerSecond int
+}
+
+type CloudVMs struct {
+       // Shell command that exits zero IFF the VM is fully booted
+       // and ready to run containers, e.g., "mount | grep
+       // /encrypted-tmp"
+       BootProbeCommand string
+       SyncInterval     Duration
+
+       // Maximum idle time before automatic shutdown
+       TimeoutIdle Duration
+
+       // Maximum booting time before automatic shutdown
+       TimeoutBooting Duration
+
+       // Maximum time with no successful probes before automatic shutdown
+       TimeoutProbe Duration
+
+       // Time after shutdown to retry shutdown
+       TimeoutShutdown Duration
+
+       ImageID string
+
+       Driver           string
+       DriverParameters map[string]interface{}
+}
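For orientation, a hedged sketch of how these new sections might be filled in from Go code; the field names come from the structs above, but every value here is an illustrative placeholder rather than a recommended default:

    // Hypothetical helper in this package; assumes import "time".
    func exampleDispatchConfig() (Dispatch, CloudVMs) {
            d := Dispatch{
                    StaleLockTimeout:   Duration(time.Minute),
                    PollInterval:       Duration(10 * time.Second),
                    ProbeInterval:      Duration(10 * time.Second),
                    MaxProbesPerSecond: 10,
            }
            vm := CloudVMs{
                    BootProbeCommand: "docker ps -q", // placeholder probe
                    SyncInterval:     Duration(time.Minute),
                    TimeoutIdle:      Duration(time.Minute),
                    TimeoutBooting:   Duration(10 * time.Minute),
                    TimeoutProbe:     Duration(10 * time.Minute),
                    TimeoutShutdown:  Duration(10 * time.Second),
                    ImageID:          "img-placeholder", // placeholder image
                    Driver:           "ExampleDriver",   // placeholder driver name
            }
            return d, vm
    }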
+
 type InstanceTypeMap map[string]InstanceType
 
 var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
@@ -159,45 +205,48 @@ func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
 }
 
 type NodeProfile struct {
-       Controller  SystemServiceInstance `json:"arvados-controller"`
-       Health      SystemServiceInstance `json:"arvados-health"`
-       Keepbalance SystemServiceInstance `json:"keep-balance"`
-       Keepproxy   SystemServiceInstance `json:"keepproxy"`
-       Keepstore   SystemServiceInstance `json:"keepstore"`
-       Keepweb     SystemServiceInstance `json:"keep-web"`
-       Nodemanager SystemServiceInstance `json:"arvados-node-manager"`
-       RailsAPI    SystemServiceInstance `json:"arvados-api-server"`
-       Websocket   SystemServiceInstance `json:"arvados-ws"`
-       Workbench   SystemServiceInstance `json:"arvados-workbench"`
+       Controller    SystemServiceInstance `json:"arvados-controller"`
+       Health        SystemServiceInstance `json:"arvados-health"`
+       Keepbalance   SystemServiceInstance `json:"keep-balance"`
+       Keepproxy     SystemServiceInstance `json:"keepproxy"`
+       Keepstore     SystemServiceInstance `json:"keepstore"`
+       Keepweb       SystemServiceInstance `json:"keep-web"`
+       Nodemanager   SystemServiceInstance `json:"arvados-node-manager"`
+       DispatchCloud SystemServiceInstance `json:"arvados-dispatch-cloud"`
+       RailsAPI      SystemServiceInstance `json:"arvados-api-server"`
+       Websocket     SystemServiceInstance `json:"arvados-ws"`
+       Workbench     SystemServiceInstance `json:"arvados-workbench"`
 }
 
 type ServiceName string
 
 const (
-       ServiceNameRailsAPI    ServiceName = "arvados-api-server"
-       ServiceNameController  ServiceName = "arvados-controller"
-       ServiceNameNodemanager ServiceName = "arvados-node-manager"
-       ServiceNameWorkbench   ServiceName = "arvados-workbench"
-       ServiceNameWebsocket   ServiceName = "arvados-ws"
-       ServiceNameKeepbalance ServiceName = "keep-balance"
-       ServiceNameKeepweb     ServiceName = "keep-web"
-       ServiceNameKeepproxy   ServiceName = "keepproxy"
-       ServiceNameKeepstore   ServiceName = "keepstore"
+       ServiceNameRailsAPI      ServiceName = "arvados-api-server"
+       ServiceNameController    ServiceName = "arvados-controller"
+       ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+       ServiceNameNodemanager   ServiceName = "arvados-node-manager"
+       ServiceNameWorkbench     ServiceName = "arvados-workbench"
+       ServiceNameWebsocket     ServiceName = "arvados-ws"
+       ServiceNameKeepbalance   ServiceName = "keep-balance"
+       ServiceNameKeepweb       ServiceName = "keep-web"
+       ServiceNameKeepproxy     ServiceName = "keepproxy"
+       ServiceNameKeepstore     ServiceName = "keepstore"
 )
 
 // ServicePorts returns the configured listening address (or "" if
 // disabled) for each service on the node.
 func (np *NodeProfile) ServicePorts() map[ServiceName]string {
        return map[ServiceName]string{
-               ServiceNameRailsAPI:    np.RailsAPI.Listen,
-               ServiceNameController:  np.Controller.Listen,
-               ServiceNameNodemanager: np.Nodemanager.Listen,
-               ServiceNameWorkbench:   np.Workbench.Listen,
-               ServiceNameWebsocket:   np.Websocket.Listen,
-               ServiceNameKeepbalance: np.Keepbalance.Listen,
-               ServiceNameKeepweb:     np.Keepweb.Listen,
-               ServiceNameKeepproxy:   np.Keepproxy.Listen,
-               ServiceNameKeepstore:   np.Keepstore.Listen,
+               ServiceNameRailsAPI:      np.RailsAPI.Listen,
+               ServiceNameController:    np.Controller.Listen,
+               ServiceNameDispatchCloud: np.DispatchCloud.Listen,
+               ServiceNameNodemanager:   np.Nodemanager.Listen,
+               ServiceNameWorkbench:     np.Workbench.Listen,
+               ServiceNameWebsocket:     np.Websocket.Listen,
+               ServiceNameKeepbalance:   np.Keepbalance.Listen,
+               ServiceNameKeepweb:       np.Keepweb.Listen,
+               ServiceNameKeepproxy:     np.Keepproxy.Listen,
+               ServiceNameKeepstore:     np.Keepstore.Listen,
        }
 }
 
index b70b4ac917672f363096a810cd35e3689f5132f9..02a0d76decbad272baee737282b5087a72a33c60 100644 (file)
@@ -18,10 +18,11 @@ type Container struct {
        Mounts               map[string]Mount     `json:"mounts"`
        Output               string               `json:"output"`
        OutputPath           string               `json:"output_path"`
-       Priority             int                  `json:"priority"`
+       Priority             int64                `json:"priority"`
        RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
        State                ContainerState       `json:"state"`
        SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
+       ExitCode             int                  `json:"exit_code"`
 }
 
 // Container is an arvados#container resource.
index cb47c9e6705ea096087199813050e3c3095f4974..122355be987755b161d38a2e46e0bc2cc4f52208 100644 (file)
@@ -107,15 +107,16 @@ func (s *AggregatorSuite) TestHealthy(c *check.C) {
        srv, listen := s.stubServer(&healthyHandler{})
        defer srv.Close()
        s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
-               Controller:  arvados.SystemServiceInstance{Listen: listen},
-               Keepbalance: arvados.SystemServiceInstance{Listen: listen},
-               Keepproxy:   arvados.SystemServiceInstance{Listen: listen},
-               Keepstore:   arvados.SystemServiceInstance{Listen: listen},
-               Keepweb:     arvados.SystemServiceInstance{Listen: listen},
-               Nodemanager: arvados.SystemServiceInstance{Listen: listen},
-               RailsAPI:    arvados.SystemServiceInstance{Listen: listen},
-               Websocket:   arvados.SystemServiceInstance{Listen: listen},
-               Workbench:   arvados.SystemServiceInstance{Listen: listen},
+               Controller:    arvados.SystemServiceInstance{Listen: listen},
+               DispatchCloud: arvados.SystemServiceInstance{Listen: listen},
+               Keepbalance:   arvados.SystemServiceInstance{Listen: listen},
+               Keepproxy:     arvados.SystemServiceInstance{Listen: listen},
+               Keepstore:     arvados.SystemServiceInstance{Listen: listen},
+               Keepweb:       arvados.SystemServiceInstance{Listen: listen},
+               Nodemanager:   arvados.SystemServiceInstance{Listen: listen},
+               RailsAPI:      arvados.SystemServiceInstance{Listen: listen},
+               Websocket:     arvados.SystemServiceInstance{Listen: listen},
+               Workbench:     arvados.SystemServiceInstance{Listen: listen},
        }
        s.handler.ServeHTTP(s.resp, s.req)
        resp := s.checkOK(c)
@@ -132,15 +133,16 @@ func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
        srvU, listenU := s.stubServer(&unhealthyHandler{})
        defer srvU.Close()
        s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
-               Controller:  arvados.SystemServiceInstance{Listen: listenH},
-               Keepbalance: arvados.SystemServiceInstance{Listen: listenH},
-               Keepproxy:   arvados.SystemServiceInstance{Listen: listenH},
-               Keepstore:   arvados.SystemServiceInstance{Listen: listenH},
-               Keepweb:     arvados.SystemServiceInstance{Listen: listenH},
-               Nodemanager: arvados.SystemServiceInstance{Listen: listenH},
-               RailsAPI:    arvados.SystemServiceInstance{Listen: listenH},
-               Websocket:   arvados.SystemServiceInstance{Listen: listenH},
-               Workbench:   arvados.SystemServiceInstance{Listen: listenH},
+               Controller:    arvados.SystemServiceInstance{Listen: listenH},
+               DispatchCloud: arvados.SystemServiceInstance{Listen: listenH},
+               Keepbalance:   arvados.SystemServiceInstance{Listen: listenH},
+               Keepproxy:     arvados.SystemServiceInstance{Listen: listenH},
+               Keepstore:     arvados.SystemServiceInstance{Listen: listenH},
+               Keepweb:       arvados.SystemServiceInstance{Listen: listenH},
+               Nodemanager:   arvados.SystemServiceInstance{Listen: listenH},
+               RailsAPI:      arvados.SystemServiceInstance{Listen: listenH},
+               Websocket:     arvados.SystemServiceInstance{Listen: listenH},
+               Workbench:     arvados.SystemServiceInstance{Listen: listenH},
        }
        s.handler.Config.Clusters["zzzzz"].NodeProfiles["127.0.0.1"] = arvados.NodeProfile{
                Keepstore: arvados.SystemServiceInstance{Listen: listenU},
index 29fad32bd150749d1e1bcb1df696359adbde0a0f..092524d8063b7ea818f99f1fe20f37957c8a2c15 100644 (file)
@@ -197,7 +197,7 @@ func (disp *Dispatcher) run() error {
        defer disp.sqCheck.Stop()
 
        if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
-               go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster)
+               go SlurmNodeTypeFeatureKludge(disp.cluster)
        }
 
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
diff --git a/services/crunch-dispatch-slurm/node_type.go b/services/crunch-dispatch-slurm/node_type.go
new file mode 100644 (file)
index 0000000..62a9693
--- /dev/null
@@ -0,0 +1,72 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "log"
+       "os/exec"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
+// type name as a valid feature name, even if no instances of that
+// type have appeared yet.
+//
+// It takes advantage of some SLURM peculiarities:
+//
+// (1) A feature is valid after it has been offered by a node, even if
+// it is no longer offered by any node. So, to make a feature name
+// valid, we can add it to a dummy node ("compute0"), then remove it.
+//
+// (2) To test whether a set of feature names are valid without
+// actually submitting a job, we can call srun --test-only with the
+// desired features.
+//
+// SlurmNodeTypeFeatureKludge does a test-and-fix operation
+// immediately, and then periodically, in case slurm restarts and
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
+func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
+       if len(cc.InstanceTypes) == 0 {
+               return
+       }
+       var features []string
+       for _, it := range cc.InstanceTypes {
+               features = append(features, "instancetype="+it.Name)
+       }
+       for {
+               slurmKludge(features)
+               time.Sleep(2 * time.Second)
+       }
+}
+
+const slurmDummyNode = "compute0"
+
+func slurmKludge(features []string) {
+       allFeatures := strings.Join(features, ",")
+
+       cmd := exec.Command("sinfo", "--nodes="+slurmDummyNode, "--format=%f", "--noheader")
+       out, err := cmd.CombinedOutput()
+       if err != nil {
+               log.Printf("running %q %q: %s (output was %q)", cmd.Path, cmd.Args, err, out)
+               return
+       }
+       if string(out) == allFeatures+"\n" {
+               // Already configured correctly, nothing to do.
+               return
+       }
+
+       log.Printf("configuring node %q with all node type features", slurmDummyNode)
+       cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+allFeatures)
+       log.Printf("running: %q %q", cmd.Path, cmd.Args)
+       out, err = cmd.CombinedOutput()
+       if err != nil {
+               log.Printf("error: scontrol: %s (output was %q)", err, out)
+       }
+}
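slurmKludge above relies on peculiarity (1) only: it checks the dummy node's feature list with sinfo and repairs it with scontrol. For reference, a hedged sketch of the srun --test-only check mentioned in point (2) of the doc comment (hypothetical helper, not part of this patch; it assumes srun is on PATH and exits nonzero when the constraint names an unknown feature):

    // featuresValid reports whether srun accepts the given feature
    // names in a --constraint expression, without submitting a job.
    func featuresValid(features []string) bool {
            cmd := exec.Command("srun", "--test-only", "--constraint="+strings.Join(features, "&"), "true")
            out, err := cmd.CombinedOutput()
            if err != nil {
                    log.Printf("srun --test-only: %s (output was %q)", err, out)
                    return false
            }
            return true
    }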
diff --git a/services/crunch-run/background.go b/services/crunch-run/background.go
new file mode 100644 (file)
index 0000000..a508538
--- /dev/null
@@ -0,0 +1,237 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "os"
+       "os/exec"
+       "path/filepath"
+       "strings"
+       "syscall"
+       "time"
+)
+
+var (
+       lockdir    = "/var/lock"
+       lockprefix = "crunch-run-"
+       locksuffix = ".lock"
+)
+
+// procinfo is saved in each process's lockfile.
+type procinfo struct {
+       UUID   string
+       PID    int
+       Stdout string
+       Stderr string
+}
+
+// Detach acquires a lock for the given uuid, and starts the current
+// program as a child process (with -no-detach prepended to the given
+// arguments so the child knows not to detach again). The lock is
+// passed along to the child process.
+func Detach(uuid string, args []string, stdout, stderr io.Writer) int {
+       return exitcode(stderr, detach(uuid, args, stdout, stderr))
+}
+func detach(uuid string, args []string, stdout, stderr io.Writer) error {
+       lockfile, err := func() (*os.File, error) {
+               // We must hold the dir-level lock between
+               // opening/creating the lockfile and acquiring LOCK_EX
+               // on it, to avoid racing with the alive-checking and
+               // garbage collection done by ListProcesses.
+               dirlock, err := lockall()
+               if err != nil {
+                       return nil, err
+               }
+               defer dirlock.Close()
+               lockfile, err := os.OpenFile(filepath.Join(lockdir, lockprefix+uuid+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+               if err != nil {
+                       return nil, err
+               }
+               err = syscall.Flock(int(lockfile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+               if err != nil {
+                       lockfile.Close()
+                       return nil, err
+               }
+               return lockfile, nil
+       }()
+       if err != nil {
+               return err
+       }
+       defer lockfile.Close()
+       lockfile.Truncate(0)
+
+       outfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stdout-")
+       if err != nil {
+               return err
+       }
+       defer outfile.Close()
+       errfile, err := ioutil.TempFile("", "crunch-run-"+uuid+"-stderr-")
+       if err != nil {
+               os.Remove(outfile.Name())
+               return err
+       }
+       defer errfile.Close()
+
+       cmd := exec.Command(args[0], append([]string{"-no-detach"}, args[1:]...)...)
+       cmd.Stdout = outfile
+       cmd.Stderr = errfile
+       // Child inherits lockfile.
+       cmd.ExtraFiles = []*os.File{lockfile}
+       // Ensure child isn't interrupted even if we receive signals
+       // from parent (sshd) while sending lockfile content to
+       // caller.
+       cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+       err = cmd.Start()
+       if err != nil {
+               os.Remove(outfile.Name())
+               os.Remove(errfile.Name())
+               return err
+       }
+
+       w := io.MultiWriter(stdout, lockfile)
+       err = json.NewEncoder(w).Encode(procinfo{
+               UUID:   uuid,
+               PID:    cmd.Process.Pid,
+               Stdout: outfile.Name(),
+               Stderr: errfile.Name(),
+       })
+       if err != nil {
+               os.Remove(outfile.Name())
+               os.Remove(errfile.Name())
+               return err
+       }
+       return nil
+}
+
+// KillProcess finds the crunch-run process corresponding to the given
+// uuid, and sends the given signal to it. It then waits up to 1
+// second for the process to die. It returns 0 if the process is
+// successfully killed or didn't exist in the first place.
+func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
+       return exitcode(stderr, kill(uuid, signal, stdout, stderr))
+}
+
+func kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) error {
+       path := filepath.Join(lockdir, lockprefix+uuid+locksuffix)
+       f, err := os.Open(path)
+       if os.IsNotExist(err) {
+               return nil
+       } else if err != nil {
+               return err
+       }
+       defer f.Close()
+
+       var pi procinfo
+       err = json.NewDecoder(f).Decode(&pi)
+       if err != nil {
+               return fmt.Errorf("%s: %s", path, err)
+       }
+
+       if pi.UUID != uuid || pi.PID == 0 {
+               return fmt.Errorf("%s: bogus procinfo: %+v", path, pi)
+       }
+
+       proc, err := os.FindProcess(pi.PID)
+       if err != nil {
+               return err
+       }
+
+       err = proc.Signal(signal)
+       for deadline := time.Now().Add(time.Second); err == nil && time.Now().Before(deadline); time.Sleep(time.Second / 100) {
+               err = proc.Signal(syscall.Signal(0))
+       }
+       if err == nil {
+               return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
+       }
+       fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
+       return nil
+}
+
+// ListProcesses writes the UUIDs of active crunch-run processes to
+// stdout, one per line.
+func ListProcesses(stdout, stderr io.Writer) int {
+       return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
+               if info.IsDir() && path != lockdir {
+                       // Skip subdirectories; returning SkipDir for the
+                       // walk root itself would end the walk immediately.
+                       return filepath.SkipDir
+               }
+               if name := info.Name(); !strings.HasPrefix(name, lockprefix) || !strings.HasSuffix(name, locksuffix) {
+                       return nil
+               }
+               if info.Size() == 0 {
+                       // race: process has opened/locked but hasn't yet written pid/uuid
+                       return nil
+               }
+
+               f, err := os.Open(path)
+               if err != nil {
+                       return nil
+               }
+               defer f.Close()
+
+               // Ensure other processes don't acquire this lockfile
+               // after we have decided it is abandoned but before we
+               // have deleted it.
+               dirlock, err := lockall()
+               if err != nil {
+                       return err
+               }
+               err = syscall.Flock(int(f.Fd()), syscall.LOCK_SH|syscall.LOCK_NB)
+               if err == nil {
+                       // lockfile is stale
+                       err := os.Remove(path)
+                       dirlock.Close()
+                       if err != nil {
+                               fmt.Fprintln(stderr, err)
+                       }
+                       return nil
+               }
+               dirlock.Close()
+
+               var pi procinfo
+               err = json.NewDecoder(f).Decode(&pi)
+               if err != nil {
+                       fmt.Fprintf(stderr, "%s: %s\n", path, err)
+                       return nil
+               }
+               if pi.UUID == "" || pi.PID == 0 {
+                       fmt.Fprintf(stderr, "%s: bogus procinfo: %+v\n", path, pi)
+                       return nil
+               }
+
+               fmt.Fprintln(stdout, pi.UUID)
+               return nil
+       }))
+}
+
+// If err is nil, return 0 ("success"); otherwise, print err to stderr
+// and return 1.
+func exitcode(stderr io.Writer, err error) int {
+       if err != nil {
+               fmt.Fprintln(stderr, err)
+               return 1
+       }
+       return 0
+}
+
+// Acquire a dir-level lock. Must be held while creating or deleting
+// container-specific lockfiles, to avoid races during the intervals
+// when those container-specific lockfiles are open but not locked.
+//
+// Caller releases the lock by closing the returned file.
+func lockall() (*os.File, error) {
+       f, err := os.OpenFile(filepath.Join(lockdir, lockprefix+"all"+locksuffix), os.O_CREATE|os.O_RDWR, 0700)
+       if err != nil {
+               return nil, err
+       }
+       err = syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
+       if err != nil {
+               f.Close()
+               return nil, err
+       }
+       return f, nil
+}
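On the dispatcher side, the JSON that detach writes to stdout (and into the lockfile) is what a caller would parse to learn the child's PID and log paths. A hedged sketch of such a consumer (hypothetical; the struct mirrors procinfo above, the stdout bytes would come from an Execute call like the one in worker.startContainer, and it assumes imports "bytes" and "encoding/json"):

    // Hypothetical mirror of procinfo for use by a caller.
    type detachedProc struct {
            UUID   string
            PID    int
            Stdout string
            Stderr string
    }

    // parseDetachOutput decodes the procinfo JSON emitted by Detach.
    func parseDetachOutput(stdout []byte) (detachedProc, error) {
            var pi detachedProc
            err := json.Unmarshal(bytes.TrimSpace(stdout), &pi)
            return pi, err
    }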
index 7d933632c97c5511b290b72de1f7e1c3e0879159..2b9a119581dfd7c4f3245b1e57317ae95155f5b9 100644 (file)
@@ -1736,6 +1736,10 @@ func main() {
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
+       detach := flag.Bool("detach", false, "Detach from parent process and run in the background")
+       sleep := flag.Duration("sleep", 0, "Delay before starting (testing use only)")
+       kill := flag.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
+       list := flag.Bool("list", false, "List UUIDs of existing crunch-run processes")
        enableNetwork := flag.String("container-enable-networking", "default",
                `Specify if networking should be enabled for container.  One of 'default', 'always':
        default: only enable networking if container requests it.
@@ -1747,8 +1751,30 @@ func main() {
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
        getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+
+       ignoreDetachFlag := false
+       if len(os.Args) > 1 && os.Args[1] == "-no-detach" {
+               // This process was invoked by a parent process, which
+               // has passed along its own arguments, including
+               // -detach, after the leading -no-detach flag.  Strip
+               // the leading -no-detach flag (it's not recognized by
+               // flag.Parse()) and ignore the -detach flag that
+               // comes later.
+               os.Args = append([]string{os.Args[0]}, os.Args[2:]...)
+               ignoreDetachFlag = true
+       }
+
        flag.Parse()
 
+       switch {
+       case *detach && !ignoreDetachFlag:
+               os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
+       case *kill >= 0:
+               os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+       case *list:
+               os.Exit(ListProcesses(os.Stdout, os.Stderr))
+       }
+
        // Print version information if requested
        if *getVersion {
                fmt.Printf("crunch-run %s\n", version)
@@ -1756,6 +1782,7 @@ func main() {
        }
 
        log.Printf("crunch-run %s started", version)
+       time.Sleep(*sleep)
 
        containerId := flag.Arg(0)
 
index 69fc2cedee7c41b6a637e1b7e1f920cb8c17e244..a258b8e2329eeb2aa1c6b6707678e89d750849df 100755 (executable)
@@ -50,6 +50,10 @@ if test -z "$COMPOSER_ROOT" ; then
     COMPOSER_ROOT="$ARVBOX_DATA/composer"
 fi
 
+if test -z "$WORKBENCH2_ROOT" ; then
+    WORKBENCH2_ROOT="$ARVBOX_DATA/workbench2"
+fi
+
 PG_DATA="$ARVBOX_DATA/postgres"
 VAR_DATA="$ARVBOX_DATA/var"
 PASSENGER="$ARVBOX_DATA/passenger"
@@ -99,9 +103,10 @@ wait_for_arvbox() {
     docker logs -f $ARVBOX_CONTAINER > $FF &
     LOGPID=$!
     while read line ; do
-        echo $line
-        if echo $line | grep "Workbench is running at" >/dev/null ; then
+        if echo $line | grep "ok: down: ready:" >/dev/null ; then
             kill $LOGPID
+        else
+            echo $line
         fi
     done < $FF
     rm $FF
@@ -158,7 +163,8 @@ run() {
         echo $localip > $iptemp
         chmod og+r $iptemp
         PUBLIC="--volume=$iptemp:/var/run/localip_override
-              --publish=80:80
+              --publish=443:443
+              --publish=3001:3001
               --publish=8000:8000
               --publish=8900:8900
               --publish=9001:9001
@@ -205,6 +211,9 @@ run() {
         if ! test -d "$COMPOSER_ROOT" ; then
             git clone https://github.com/curoverse/composer.git "$COMPOSER_ROOT"
         fi
+        if ! test -d "$WORKBENCH2_ROOT" ; then
+            git clone https://github.com/curoverse/arvados-workbench2.git "$WORKBENCH2_ROOT"
+        fi
 
         if test "$CONFIG" = test ; then
 
@@ -218,6 +227,7 @@ run() {
                        "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
                        "--volume=$SSO_ROOT:/usr/src/sso:rw" \
                        "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
+                       "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
                        "--volume=$PG_DATA:/var/lib/postgresql:rw" \
                        "--volume=$VAR_DATA:/var/lib/arvados:rw" \
                        "--volume=$PASSENGER:/var/lib/passenger:rw" \
@@ -261,6 +271,7 @@ run() {
                    "--volume=$ARVADOS_ROOT:/usr/src/arvados:rw" \
                    "--volume=$SSO_ROOT:/usr/src/sso:rw" \
                    "--volume=$COMPOSER_ROOT:/usr/src/composer:rw" \
+                   "--volume=$WORKBENCH2_ROOT:/usr/src/workbench2:rw" \
                    "--volume=$PG_DATA:/var/lib/postgresql:rw" \
                    "--volume=$VAR_DATA:/var/lib/arvados:rw" \
                    "--volume=$PASSENGER:/var/lib/passenger:rw" \
@@ -274,6 +285,7 @@ run() {
             updateconf
             wait_for_arvbox
             echo "The Arvados source code is checked out at: $ARVADOS_ROOT"
+           echo "The Arvados testing root certificate is $VAR_DATA/root-cert.pem"
         else
             echo "Unknown configuration '$CONFIG'"
         fi
index 4f915946f9e402e680c4add0331d4e1c18130b33..1c1ad17814b5e4b8f12ce51450afee5dcf8da42a 100644 (file)
@@ -84,7 +84,11 @@ ENV NODEVERSION v6.11.4
 RUN curl -L -f https://nodejs.org/dist/${NODEVERSION}/node-${NODEVERSION}-linux-x64.tar.xz | tar -C /usr/local -xJf - && \
     ln -s ../node-${NODEVERSION}-linux-x64/bin/node ../node-${NODEVERSION}-linux-x64/bin/npm /usr/local/bin
 
+# Set UTF-8 locale
 RUN echo en_US.UTF-8 UTF-8 > /etc/locale.gen && locale-gen
+ENV LANG en_US.UTF-8
+ENV LANGUAGE en_US:en
+ENV LC_ALL en_US.UTF-8
 
 ARG arvados_version
 RUN echo arvados_version is git commit $arvados_version
index 7cb51edfdc61ab6c9b67b29353eeb2f79e8774eb..dbfa3f1124e167ccb4a5898897d36fb1bfa6534d 100644 (file)
@@ -6,6 +6,7 @@ FROM arvados/arvbox-base
 ARG arvados_version
 ARG sso_version=master
 ARG composer_version=master
+ARG workbench2_version=master
 
 RUN cd /usr/src && \
     git clone --no-checkout https://github.com/curoverse/arvados.git && \
@@ -13,7 +14,9 @@ RUN cd /usr/src && \
     git clone --no-checkout https://github.com/curoverse/sso-devise-omniauth-provider.git sso && \
     git -C sso checkout ${sso_version} && \
     git clone --no-checkout https://github.com/curoverse/composer.git && \
-    git -C composer checkout ${composer_version}
+    git -C composer checkout ${composer_version} && \
+    git clone --no-checkout https://github.com/curoverse/arvados-workbench2.git workbench2 && \
+    git -C workbench2 checkout ${workbench2_version}
 
 ADD service/ /var/lib/arvbox/service
 RUN ln -sf /var/lib/arvbox/service /etc
@@ -25,6 +28,7 @@ RUN echo "production" > /var/lib/arvados/workbench_rails_env
 RUN chown -R 1000:1000 /usr/src && /usr/local/lib/arvbox/createusers.sh
 
 RUN sudo -u arvbox /var/lib/arvbox/service/composer/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/workbench2/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/sso/run-service --only-deps
 RUN sudo -u arvbox /var/lib/arvbox/service/api/run-service --only-deps
index 6dd6a65695559a1e0024a0d2af4693632bf6da2e..0f283830f5b4e62fec3f59d761bdfb6704163e4e 100755 (executable)
@@ -38,9 +38,6 @@ if ! test -s /var/lib/arvados/management_token ; then
 fi
 management_token=$(cat /var/lib/arvados/management_token)
 
-# self signed key will be created by SSO server script.
-test -s /var/lib/arvados/self-signed.key
-
 sso_app_secret=$(cat /var/lib/arvados/sso_app_secret)
 
 if test -s /var/lib/arvados/vm-uuid ; then
@@ -58,9 +55,9 @@ $RAILS_ENV:
   sso_app_secret: $sso_app_secret
   sso_app_id: arvados-server
   sso_provider_url: "https://$localip:${services[sso]}"
-  sso_insecure: true
-  workbench_address: "http://$localip/"
-  websocket_address: "ws://$localip:${services[websockets]}/websocket"
+  sso_insecure: false
+  workbench_address: "https://$localip/"
+  websocket_address: "wss://$localip:${services[websockets-ssl]}/websocket"
   git_repo_ssh_base: "git@$localip:"
   git_repo_https_base: "http://$localip:${services[arv-git-httpd]}/"
   new_users_are_active: true
@@ -70,7 +67,7 @@ $RAILS_ENV:
   auto_setup_new_users_with_repository: true
   default_collection_replication: 1
   docker_image_formats: ["v2"]
-  keep_web_service_url: http://$localip:${services[keep-web]}/
+  keep_web_service_url: https://$localip:${services[keep-web-ssl]}/
   ManagementToken: $management_token
 EOF
 
index a82a964ea9c2f7cec5f16fd474664e89acc2a45c..56d0fa01351c20e02039ca0d801dcf3e8ca10cbd 100644 (file)
@@ -19,20 +19,24 @@ fi
 
 declare -A services
 services=(
-  [workbench]=80
+  [workbench]=443
+  [workbench2]=3000
+  [workbench2-ssl]=3001
   [api]=8004
   [controller]=8003
   [controller-ssl]=8000
   [sso]=8900
   [composer]=4200
   [arv-git-httpd]=9001
-  [keep-web]=9002
+  [keep-web]=9003
+  [keep-web-ssl]=9002
   [keepproxy]=25100
   [keepstore0]=25107
   [keepstore1]=25108
   [ssh]=22
   [doc]=8001
-  [websockets]=8002
+  [websockets]=8005
+  [websockets-ssl]=8002
 )
 
 if test "$(id arvbox -u 2>/dev/null)" = 0 ; then
diff --git a/tools/arvbox/lib/arvbox/docker/service/certificate/log/main/.gitstub b/tools/arvbox/lib/arvbox/docker/service/certificate/log/main/.gitstub
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/tools/arvbox/lib/arvbox/docker/service/certificate/log/run b/tools/arvbox/lib/arvbox/docker/service/certificate/log/run
new file mode 120000 (symlink)
index 0000000..d6aef4a
--- /dev/null
@@ -0,0 +1 @@
+/usr/local/lib/arvbox/logger
\ No newline at end of file
diff --git a/tools/arvbox/lib/arvbox/docker/service/certificate/run b/tools/arvbox/lib/arvbox/docker/service/certificate/run
new file mode 100755 (executable)
index 0000000..2b802f2
--- /dev/null
@@ -0,0 +1,81 @@
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+if test ! -s /var/lib/arvados/root-cert.pem ; then
+    # req           signing request sub-command
+    # -new          new certificate request
+    # -nodes        "no des" don't encrypt key
+    # -sha256       use SHA-256 as the signing digest
+    # -x509         generate self-signed certificate
+    # -subj         certificate subject
+    # -extensions   use the x509_ext section (CA basicConstraints/keyUsage)
+    # -config       certificate generation configuration plus the x509_ext section
+    # -out          certificate output
+    # -keyout       private key output
+    # -days         certificate lifetime
+    openssl req \
+           -new \
+           -nodes \
+           -sha256 \
+           -x509 \
+           -subj "/C=US/ST=MA/O=Arvados testing/OU=arvbox/CN=arvbox testing root CA for ${uuid_prefix}" \
+           -extensions x509_ext \
+           -config <(cat /etc/ssl/openssl.cnf \
+                         <(printf "\n[x509_ext]\nbasicConstraints=critical,CA:true,pathlen:0\nkeyUsage=critical,keyCertSign,cRLSign")) \
+            -out /var/lib/arvados/root-cert.pem \
+            -keyout /var/lib/arvados/root-cert.key \
+            -days 365
+    chown arvbox:arvbox /var/lib/arvados/root-cert.*
+fi
+
+if test ! -s /var/lib/arvados/server-cert-${localip}.pem ; then
+    # req           signing request sub-command
+    # -new          new certificate request
+    # -nodes        "no des" don't encrypt key
+    # -sha256       use SHA-256 as the signing digest
+    # -subj         certificate subject
+    # -reqexts      certificate request extension for subjectAltName
+    # -extensions   certificate request extension for subjectAltName
+    # -config       certificate generation configuration plus subjectAltName
+    # -out          certificate output
+    # -keyout       private key output
+    # -days         certificate lifetime
+    openssl req \
+           -new \
+           -nodes \
+           -sha256 \
+           -subj "/C=US/ST=MA/O=Arvados testing for ${uuid_prefix}/OU=arvbox/CN=localhost" \
+           -reqexts x509_ext \
+           -extensions x509_ext \
+           -config <(cat /etc/ssl/openssl.cnf \
+                         <(printf "\n[x509_ext]\nkeyUsage=critical,digitalSignature,keyEncipherment\nsubjectAltName=DNS:localhost,IP:$localip")) \
+            -out /var/lib/arvados/server-cert-${localip}.csr \
+            -keyout /var/lib/arvados/server-cert-${localip}.key \
+            -days 365
+
+    openssl x509 \
+           -req \
+           -in /var/lib/arvados/server-cert-${localip}.csr \
+           -CA /var/lib/arvados/root-cert.pem \
+           -CAkey /var/lib/arvados/root-cert.key \
+           -out /var/lib/arvados/server-cert-${localip}.pem \
+           -days 365 \
+           -set_serial $RANDOM$RANDOM \
+           -extfile <(cat /etc/ssl/openssl.cnf \
+                         <(printf "\n[x509_ext]\nkeyUsage=critical,digitalSignature,keyEncipherment\nsubjectAltName=DNS:localhost,IP:$localip")) \
+           -extensions x509_ext
+
+    chown arvbox:arvbox /var/lib/arvados/server-cert-${localip}.*
+fi
+
+cp /var/lib/arvados/root-cert.pem /usr/local/share/ca-certificates/arvados-testing-cert.crt
+update-ca-certificates
+
+sv stop certificate
\ No newline at end of file
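As a quick check of the certificate service above, the generated chain can be inspected from inside the arvbox container. This is a minimal sketch, assuming the default paths used by the script and that /usr/local/lib/arvbox/common.sh has been sourced so $localip is set:

    # the server certificate should chain to the arvbox root CA
    openssl verify -CAfile /var/lib/arvados/root-cert.pem \
        /var/lib/arvados/server-cert-${localip}.pem
    # the subjectAltName should cover DNS:localhost and IP:$localip
    openssl x509 -noout -text -in /var/lib/arvados/server-cert-${localip}.pem |
        grep -A1 'Subject Alternative Name'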
index cd2f86a27e8f96c8a8a37d1a7726a6d88c79dbf6..50a8ce1204bc9c9dacbe76055112fa871716987c 100755 (executable)
@@ -5,4 +5,4 @@
 
 set -e
 
-/usr/local/lib/arvbox/runsu.sh $0-service $1
+exec /usr/local/lib/arvbox/runsu.sh $0-service $1
index eea0e120b29917d31f25016da47e94394804a8c5..6055efc4791e93978ac806f2f3111d7e15c758bb 100755 (executable)
@@ -114,7 +114,7 @@ $RAILS_ENV:
   gitolite_tmp: /var/lib/arvados/git
   arvados_api_host: $localip:${services[controller-ssl]}
   arvados_api_token: "$ARVADOS_API_TOKEN"
-  arvados_api_host_insecure: true
+  arvados_api_host_insecure: false
   gitolite_arvados_git_user_key: "$git_user_key"
 EOF
 
index a55660eb8ab1cd7448c00db4da19fc2632dda473..cf72ed2c2c97fc4364d5148fbf08d7b77807fcc3 100755 (executable)
@@ -37,8 +37,8 @@ http {
   server {
     listen *:${services[controller-ssl]} ssl default_server;
     server_name controller;
-    ssl_certificate "/var/lib/arvados/self-signed.pem";
-    ssl_certificate_key "/var/lib/arvados/self-signed.key";
+    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
     location  / {
       proxy_pass http://controller;
       proxy_set_header Host \$http_host;
@@ -47,6 +47,71 @@ http {
       proxy_redirect off;
     }
   }
+
+  upstream arvados-ws {
+    server localhost:${services[websockets]};
+  }
+  server {
+    listen *:${services[websockets-ssl]} ssl default_server;
+    server_name websockets;
+
+    proxy_connect_timeout 90s;
+    proxy_read_timeout    300s;
+
+    ssl                   on;
+    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+
+    location / {
+      proxy_pass          http://arvados-ws;
+      proxy_set_header    Upgrade         \$http_upgrade;
+      proxy_set_header    Connection      "upgrade";
+      proxy_set_header Host \$http_host;
+      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+    }
+  }
+
+  upstream workbench2 {
+    server localhost:${services[workbench2]};
+  }
+  server {
+    listen *:${services[workbench2-ssl]} ssl default_server;
+    server_name workbench2;
+    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+    location  / {
+      proxy_pass http://workbench2;
+      proxy_set_header Host \$http_host;
+      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+      proxy_set_header X-Forwarded-Proto https;
+      proxy_redirect off;
+    }
+    location  /sockjs-node {
+      proxy_pass http://workbench2;
+      proxy_set_header    Upgrade         \$http_upgrade;
+      proxy_set_header    Connection      "upgrade";
+      proxy_set_header Host \$http_host;
+      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+    }
+  }
+
+  upstream keep-web {
+    server localhost:${services[keep-web]};
+  }
+  server {
+    listen *:${services[keep-web-ssl]} ssl default_server;
+    server_name keep-web;
+    ssl_certificate "/var/lib/arvados/server-cert-${localip}.pem";
+    ssl_certificate_key "/var/lib/arvados/server-cert-${localip}.key";
+    location  / {
+      proxy_pass http://keep-web;
+      proxy_set_header Host \$http_host;
+      proxy_set_header X-Forwarded-For \$proxy_add_x_forwarded_for;
+      proxy_set_header X-Forwarded-Proto https;
+      proxy_redirect off;
+    }
+  }
+
 }
 
 EOF
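The nginx configuration above terminates TLS for the websockets, workbench2 and keep-web backends using the per-IP server certificate. A minimal sketch of checking that each proxied port presents that certificate, run from inside the arvbox container with common.sh sourced, and assuming the default port map from the services table (controller-ssl 8000, websockets-ssl 8002, keep-web-ssl 9002, workbench2-ssl 3001):

    for port in 8000 8002 9002 3001; do
        echo | openssl s_client -connect "$localip:$port" \
            -CAfile /var/lib/arvados/root-cert.pem 2>/dev/null |
            grep -E 'subject|Verify return code'   # expect "Verify return code: 0 (ok)"
    done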
index 7766fb7ec77b687c7339bfe04ca9d15677ac089a..470d10537556ab797b95edb1042b06411703f820 100755 (executable)
@@ -90,6 +90,7 @@ fi
 
 echo
 echo "Your Arvados-in-a-box is ready!"
-echo "Workbench is running at http://$localip"
+echo "Workbench is running at https://$localip"
+echo "Workbench2 is running at https://$localip:${services[workbench2-ssl]}"
 
 rm -r /tmp/arvbox-ready
index 74946f074728e4eb844f093824fed46fd54032a3..b1aedaaf3880273e16f107abfa9338ac183be25e 100755 (executable)
@@ -10,4 +10,4 @@ if ! test -d /var/run/sshd ; then
    mkdir /var/run/sshd
    chmod 0755 /var/run/sshd
 fi
-/usr/sbin/sshd -D
+exec /usr/sbin/sshd -D
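The two exec changes above (the runsu.sh wrapper and sshd) make the daemon replace its wrapper shell, so runit's sv supervises and signals the real process rather than an intermediate shell. A rough way to confirm this inside the container, assuming the runit service is named ssh as in arvbox:

    # the PID reported by runit should now be sshd itself, not a wrapper shell
    pid=$(sv status ssh | sed -n 's/.*(pid \([0-9]*\)).*/\1/p')
    ps -o comm= -p "$pid"   # expect: sshd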
index 28140594926be5381737bd85adef390d5fb6f209..af49d4b3c0f829618f6572b800b5eb85597fc779 100755 (executable)
@@ -35,9 +35,7 @@ if ! test -s /var/lib/arvados/sso_secret_token ; then
 fi
 secret_token=$(cat /var/lib/arvados/sso_secret_token)
 
-if ! test -s /var/lib/arvados/self-signed.key ; then
-  openssl req -new -x509 -nodes -out /var/lib/arvados/self-signed.pem -keyout /var/lib/arvados/self-signed.key -days 365 -subj '/CN=localhost'
-fi
+test -s /var/lib/arvados/server-cert-${localip}.pem
 
 cat >config/application.yml <<EOF
 $RAILS_ENV:
@@ -92,5 +90,5 @@ if test "$1" = "--only-setup" ; then
 fi
 
 exec bundle exec passenger start --port=${services[sso]} \
-     --ssl --ssl-certificate=/var/lib/arvados/self-signed.pem \
-     --ssl-certificate-key=/var/lib/arvados/self-signed.key
+     --ssl --ssl-certificate=/var/lib/arvados/server-cert-${localip}.pem \
+     --ssl-certificate-key=/var/lib/arvados/server-cert-${localip}.key
index ebdf266c6b0a981710fa598f87968a2022047149..cc330324743a4814bb4c9fee4e4a22e7b1a287de 100755 (executable)
@@ -28,13 +28,13 @@ database_pw=$(cat /var/lib/arvados/api_database_pw)
 cat >/var/lib/arvados/arvados-ws.yml <<EOF
 Client:
   APIHost: $localip:${services[controller-ssl]}
-  Insecure: true
+  Insecure: false
 Postgres:
   dbname: arvados_$RAILS_ENV
   user: arvados
   password: $database_pw
   host: localhost
-Listen: :8002
+Listen: localhost:${services[websockets]}
 EOF
 
 exec /usr/local/bin/arvados-ws -config /var/lib/arvados/arvados-ws.yml
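For reference, a sketch of the arvados-ws.yml that the heredoc above produces, assuming RAILS_ENV=development, an example $localip of 192.168.5.2, the default port map (controller-ssl 8000, websockets 8005) and a placeholder database password; the websocket server now listens only on localhost and is reached through the nginx proxy on the websockets-ssl port:

    Client:
      APIHost: 192.168.5.2:8000
      Insecure: false
    Postgres:
      dbname: arvados_development
      user: arvados
      password: exampledbpassword
      host: localhost
    Listen: localhost:8005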
index 5615884f75c25e6d9be859f6181d464d4bfefab2..e65801b447a6819ce4be7f112f2dbbe5aa6e39a9 100755 (executable)
@@ -23,5 +23,7 @@ fi
 
 if test "$1" != "--only-deps" ; then
     exec bundle exec passenger start --port=${services[workbench]} \
+        --ssl --ssl-certificate=/var/lib/arvados/server-cert-${localip}.pem \
+        --ssl-certificate-key=/var/lib/arvados/server-cert-${localip}.key \
          --user arvbox
 fi
index 366096ace7a24b28f7286f24d13d941bde368846..68c87233f0001b25a05e38917a3b1356fa49822c 100755 (executable)
@@ -33,18 +33,14 @@ if ! test -s /var/lib/arvados/workbench_secret_token ; then
 fi
 secret_token=$(cat /var/lib/arvados/workbench_secret_token)
 
-if ! test -s self-signed.key ; then
-  openssl req -new -x509 -nodes -out self-signed.pem -keyout self-signed.key -days 365 -subj '/CN=localhost'
-fi
-
 cat >config/application.yml <<EOF
 $RAILS_ENV:
   secret_token: $secret_token
   arvados_login_base: https://$localip:${services[controller-ssl]}/login
   arvados_v1_base: https://$localip:${services[controller-ssl]}/arvados/v1
-  arvados_insecure_https: true
-  keep_web_download_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
-  keep_web_url: http://$localip:${services[keep-web]}/c=%{uuid_or_pdh}
+  arvados_insecure_https: false
+  keep_web_download_url: https://$localip:${services[keep-web-ssl]}/c=%{uuid_or_pdh}
+  keep_web_url: https://$localip:${services[keep-web-ssl]}/c=%{uuid_or_pdh}
   arvados_docsite: http://$localip:${services[doc]}/
   force_ssl: false
   composer_url: http://$localip:${services[composer]}
diff --git a/tools/arvbox/lib/arvbox/docker/service/workbench2/log/main/.gitstub b/tools/arvbox/lib/arvbox/docker/service/workbench2/log/main/.gitstub
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/tools/arvbox/lib/arvbox/docker/service/workbench2/log/run b/tools/arvbox/lib/arvbox/docker/service/workbench2/log/run
new file mode 120000 (symlink)
index 0000000..d6aef4a
--- /dev/null
@@ -0,0 +1 @@
+/usr/local/lib/arvbox/logger
\ No newline at end of file
diff --git a/tools/arvbox/lib/arvbox/docker/service/workbench2/run b/tools/arvbox/lib/arvbox/docker/service/workbench2/run
new file mode 100755 (executable)
index 0000000..50a8ce1
--- /dev/null
@@ -0,0 +1,8 @@
+#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+set -e
+
+exec /usr/local/lib/arvbox/runsu.sh $0-service $1
diff --git a/tools/arvbox/lib/arvbox/docker/service/workbench2/run-service b/tools/arvbox/lib/arvbox/docker/service/workbench2/run-service
new file mode 100755 (executable)
index 0000000..2dbef4a
--- /dev/null
@@ -0,0 +1,33 @@
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cd /usr/src/workbench2
+
+npm -d install --prefix /usr/local --global yarn
+
+yarn install
+
+if test "$1" = "--only-deps" ; then
+    exit
+fi
+
+cat <<EOF > /usr/src/workbench2/public/config.json
+{
+  "API_HOST": "${localip}:${services[controller-ssl]}",
+  "VOCABULARY_URL": "vocabulary-example.json",
+  "FILE_VIEWERS_CONFIG_URL": "file-viewers-example.json"
+}
+EOF
+
+export HTTPS=false
+# Can't use "yarn start", need to run the dev server script
+# directly so that the TERM signal from "sv restart" gets to the
+# right process.
+exec node node_modules/react-scripts-ts/scripts/start.js
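Once the workbench2 dev server is running behind the workbench2-ssl proxy, the generated config.json should be reachable through nginx, which confirms the API_HOST wiring. A minimal sketch, assuming the default port map above (workbench2-ssl on 3001); -k is used only if the arvbox root CA is not yet trusted by the client:

    curl -sk "https://$localip:3001/config.json"
    # expect a JSON body with API_HOST set to $localip:8000 (controller-ssl)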
index c0e6fee5dbe63ed053c70fcc1ac0875f13813381..ec296d21d1c25a92c14639c36658c4b0aa10bce5 100644 (file)
                        "revision": "b8bc1bf767474819792c23f32d8286a45736f1c6",
                        "revisionTime": "2016-12-03T19:45:07Z"
                },
+               {
+                       "checksumSHA1": "ewGq4nGalpCQOHcmBTdAEQx1wW0=",
+                       "path": "github.com/mitchellh/mapstructure",
+                       "revision": "bb74f1db0675b241733089d5a1faa5dd8b0ef57b",
+                       "revisionTime": "2018-05-11T14:21:26Z"
+               },
                {
                        "checksumSHA1": "OFNit1Qx2DdWhotfREKodDNUwCM=",
                        "path": "github.com/opencontainers/go-digest",