Merge branch 'master' into 11850-singlecontainer-max-requirements
author jiayong2 <jiayong@math.mit.edu>
Mon, 26 Feb 2018 20:11:08 +0000 (20:11 +0000)
committer jiayong2 <jiayong@math.mit.edu>
Mon, 26 Feb 2018 20:11:08 +0000 (20:11 +0000)
Arvados-DCO-1.1-Signed-off-by: Jiayong Li <jiayong@math.mit.edu>

80 files changed:
apps/workbench/app/assets/javascripts/components/sessions.js
apps/workbench/app/assets/javascripts/work_unit_log.js
apps/workbench/app/views/container_requests/_show_object_description_cell.html.erb [new file with mode: 0644]
apps/workbench/app/views/container_requests/_state_label.html.erb [new file with mode: 0644]
apps/workbench/app/views/jobs/_show_log.html.erb
apps/workbench/app/views/pipeline_instances/_show_log.html.erb
apps/workbench/app/views/work_units/_show_log.html.erb
apps/workbench/test/controllers/container_requests_controller_test.rb
build/run-tests.sh
doc/_config.yml
doc/sdk/R/index.html.textile.liquid [moved from doc/sdk/R/R.html.textile.liquid with 100% similarity]
doc/sdk/index.html.textile.liquid
lib/dispatchcloud/gocheck_test.go [new file with mode: 0644]
lib/dispatchcloud/node_size.go [new file with mode: 0644]
lib/dispatchcloud/node_size_test.go [new file with mode: 0644]
sdk/R/install_deps.R [new file with mode: 0644]
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/collection_fs_test.go
sdk/go/arvados/config.go
sdk/go/arvados/container.go
sdk/go/httpserver/logger.go
sdk/go/httpserver/responsewriter.go
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/keep.py
sdk/python/tests/test_keep_client.py
services/api/app/controllers/application_controller.rb
services/api/app/controllers/arvados/v1/nodes_controller.rb
services/api/app/models/collection.rb
services/api/app/models/node.rb
services/api/db/migrate/20180216203422_add_storage_classes_to_collections.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/fixtures/collections.yml
services/api/test/functional/arvados/v1/filters_test.rb
services/api/test/functional/arvados/v1/nodes_controller_test.rb
services/api/test/unit/collection_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/squeue.go
services/crunch-dispatch-slurm/usage.go
services/fuse/arvados_fuse/__init__.py
services/fuse/arvados_fuse/fresh.py
services/fuse/arvados_fuse/fusefile.py
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/bufferpool.go
services/keepstore/bufferpool_test.go
services/keepstore/config.go
services/keepstore/config_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/mounts_test.go
services/keepstore/pull_worker.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/trash_worker.go
services/keepstore/volume.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/nodelist.py
services/nodemanager/tests/integration_test.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
services/nodemanager/tests/test_computenode_driver_gce.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_jobqueue.py
services/nodemanager/tests/test_nodelist.py
services/nodemanager/tests/testutil.py
tools/arvbox/lib/arvbox/docker/service/doc/run-service
tools/sync-groups/sync-groups.go
vendor/vendor.json

index e172d3a3f3500c36b3b978b3cf3da79ce7693f01..04ca6ac9268c60c5c8c80e874099549e912a529b 100644 (file)
@@ -61,10 +61,12 @@ window.SessionsTable = {
                                     onclick: db.login.bind(db, session.baseURL),
                                 }, session.listedHost ? 'Enable ':'Log in ', m('span.glyphicon.glyphicon-log-in')))
                             ],
-                            m('td', session.isFromRails ? null : m('button.btn.btn-xs.btn-default', {
-                                uuidPrefix: uuidPrefix,
-                                onclick: m.withAttr('uuidPrefix', db.trash),
-                            }, 'Remove ', m('span.glyphicon.glyphicon-trash'))),
+                            m('td', (session.isFromRails || session.listedHost) ? null :
+                                m('button.btn.btn-xs.btn-default', {
+                                    uuidPrefix: uuidPrefix,
+                                    onclick: m.withAttr('uuidPrefix', db.trash),
+                                }, 'Remove ', m('span.glyphicon.glyphicon-trash'))
+                            ),
                         ])
                     }),
                 ]),
index 4962994cdef7aec94e2d1430454b07131628da1f..c43bae0e3c7e080d01c0fc69d19e2435612777e1 100644 (file)
@@ -57,7 +57,7 @@ $(document).on('arv-log-event', '.arv-log-event-handler-append-logs', function(e
         return;
     }
 
-    wasatbottom = ($(this).scrollTop() + $(this).height() >= this.scrollHeight);
+    wasatbottom = (this.scrollTop + this.clientHeight >= this.scrollHeight);
     if (eventData.prepend) {
         $(this).prepend(txt);
     } else {
diff --git a/apps/workbench/app/views/container_requests/_show_object_description_cell.html.erb b/apps/workbench/app/views/container_requests/_show_object_description_cell.html.erb
new file mode 100644 (file)
index 0000000..2df207a
--- /dev/null
@@ -0,0 +1,8 @@
+<%# Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: AGPL-3.0 %>
+
+<div class="nowrap">
+  <%= object.content_summary %><br />
+  <%= render partial: 'container_requests/state_label', locals: {object: object} %>
+</div>
diff --git a/apps/workbench/app/views/container_requests/_state_label.html.erb b/apps/workbench/app/views/container_requests/_state_label.html.erb
new file mode 100644 (file)
index 0000000..1ddd2b2
--- /dev/null
@@ -0,0 +1,8 @@
+<%# Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: AGPL-3.0 %>
+
+<% wu = object.work_unit object.name %>
+<span class="label label-<%=wu.state_bootstrap_class%>">
+  <%=wu.state_label%>
+</span>
index 821b4bcdf27de248d1b3b29a81e0fb4e18bad04b..e84641d431205cdf816822836071183a61009822 100644 (file)
@@ -9,11 +9,11 @@ SPDX-License-Identifier: AGPL-3.0 %>
      style="display:none"
      data-object-uuid="<%= @object.uuid %>"></div>
 
-<div id="event_log_div"
+<pre id="event_log_div"
      class="arv-log-event-listener arv-log-event-handler-append-logs arv-job-log-window"
      data-object-uuid="<%= @object.uuid %>"
   ><%= @object.stderr_log_lines(Rails.configuration.running_job_log_records_to_fetch).join("\n") %>
-</div>
+</pre>
 
 <%# Applying a long throttle suppresses the auto-refresh of this
     partial that would normally be triggered by arv-log-event. %>
index 9c779bc679dbac3a0833a12b7f0246e7c3146980..24937ba0fdc03f56eabd1562fb6a957872ce5285 100644 (file)
@@ -35,10 +35,10 @@ SPDX-License-Identifier: AGPL-3.0 %>
 <% unless still_logging.empty? %>
   <h4>Logs in progress</h4>
 
-  <div id="event_log_div"
+  <pre id="event_log_div"
        class="arv-log-event-listener arv-log-event-handler-append-logs arv-log-event-subscribe-to-pipeline-job-uuids arv-job-log-window"
        data-object-uuids="<%= @object.stderr_log_object_uuids.join(' ') %>"
-       ><%= @object.stderr_log_lines.join("\n") %></div>
+       ><%= @object.stderr_log_lines.join("\n") %></pre>
 
   <%# Applying a long throttle suppresses the auto-refresh of this
       partial that would normally be triggered by arv-log-event. %>
index 0d86eab50d81d8e4e69ec416777cf77c49629f45..1f643acdc9ff6a1dc042a3de86f6ab9a78357481 100644 (file)
@@ -18,11 +18,11 @@ SPDX-License-Identifier: AGPL-3.0 %>
 <%# Still running, or recently finished and logs are still available from logs table %>
 <%# Show recent logs in terminal window %>
 <h4>Recent logs</h4>
-<div id="event_log_div"
+<pre id="event_log_div"
      class="arv-log-event-listener arv-log-event-handler-append-logs arv-job-log-window"
      data-object-uuids="<%= wu.log_object_uuids.join(' ') %>"
   ><%= live_log_lines %>
-</div>
+</pre>
 
 <%# Applying a long throttle suppresses the auto-refresh of this
     partial that would normally be triggered by arv-log-event. %>
index 261169cd1f954c352aaba6e58e577c9d0a955b4d..89e1506f4bf4163e5818f9eae0d209551e195d75 100644 (file)
@@ -17,7 +17,7 @@ class ContainerRequestsControllerTest < ActionController::TestCase
 
     assert_select "a", {:href=>"/collections/#{container['log']}", :text=>"Download the log"}
     assert_select "a", {:href=>"#{container['log']}/baz"}
-    assert_not_includes @response.body, '<div id="event_log_div"'
+    assert_not_includes @response.body, '<pre id="event_log_div"'
   end
 
   test "visit running container request log tab" do
@@ -30,7 +30,7 @@ class ContainerRequestsControllerTest < ActionController::TestCase
     get :show, {id: cr['uuid'], tab_pane: 'Log'}, session_for(:active)
     assert_response :success
 
-    assert_includes @response.body, '<div id="event_log_div"'
+    assert_includes @response.body, '<pre id="event_log_div"'
     assert_select 'Download the log', false
   end
 
index ec3c4637703abdd4749d6efd00980f854c6ee02f..48b3eab38ac864ab0c66d4a40d17502f60d4cdb3 100755 (executable)
@@ -74,6 +74,7 @@ doc
 lib/cli
 lib/cmd
 lib/crunchstat
+lib/dispatchcloud
 services/api
 services/arv-git-httpd
 services/crunchstat
@@ -780,19 +781,7 @@ do_install sdk/ruby ruby_sdk
 
 install_R_sdk() {
     cd "$WORKSPACE/sdk/R" \
-       && R --quiet --vanilla <<EOF
-options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
-if (!requireNamespace("devtools")) {
-  install.packages("devtools")
-}
-if (!requireNamespace("roxygen2")) {
-  install.packages("roxygen2")
-}
-if (!requireNamespace("pkgdown")) {
-  devtools::install_github("hadley/pkgdown")
-}
-devtools::install_dev_deps()
-EOF
+       && R --quiet --vanilla --file=install_deps.R
 }
 do_install sdk/R R_sdk
 
@@ -888,6 +877,7 @@ gostuff=(
     lib/cli
     lib/cmd
     lib/crunchstat
+    lib/dispatchcloud
     sdk/go/arvados
     sdk/go/arvadosclient
     sdk/go/blockdigest
index 680f940f26b04f3c4fa9885ccd0fa291d59fc7a8..9be14fdd50eff031ff8818038fba9ad383e29177 100644 (file)
@@ -104,7 +104,7 @@ navbar:
       - sdk/go/index.html.textile.liquid
       - sdk/go/example.html.textile.liquid
     - R:
-      - sdk/R/R.html.textile.liquid
+      - sdk/R/index.html.textile.liquid
     - Perl:
       - sdk/perl/index.html.textile.liquid
       - sdk/perl/example.html.textile.liquid
index fbc5ca11b68ae09dfffefde5ddc3274835ceb940..dbfcaedc71742d56cbfbb2facf500b379ec6b196 100644 (file)
@@ -14,7 +14,7 @@ This section documents language bindings for the "Arvados API":{{site.baseurl}}/
 * "Python SDK":{{site.baseurl}}/sdk/python/sdk-python.html
 * "Command line SDK":{{site.baseurl}}/sdk/cli/install.html ("arv")
 * "Go SDK":{{site.baseurl}}/sdk/go/index.html
-* "R SDK":{{site.baseurl}}/sdk/go/index.html
+* "R SDK":{{site.baseurl}}/sdk/R/index.html
 * "Perl SDK":{{site.baseurl}}/sdk/perl/index.html
 * "Ruby SDK":{{site.baseurl}}/sdk/ruby/index.html
 * "Java SDK":{{site.baseurl}}/sdk/java/index.html
diff --git a/lib/dispatchcloud/gocheck_test.go b/lib/dispatchcloud/gocheck_test.go
new file mode 100644 (file)
index 0000000..22f89f0
--- /dev/null
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "testing"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
diff --git a/lib/dispatchcloud/node_size.go b/lib/dispatchcloud/node_size.go
new file mode 100644 (file)
index 0000000..34f83a6
--- /dev/null
@@ -0,0 +1,128 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "bytes"
+       "errors"
+       "log"
+       "os/exec"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+       ErrConstraintsNotSatisfiable  = errors.New("constraints not satisfiable by any configured instance type")
+       ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+       discountConfiguredRAMPercent  = 5
+)
+
+// ChooseInstanceType returns the cheapest available
+// arvados.InstanceType big enough to run ctr.
+func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
+       if len(cc.InstanceTypes) == 0 {
+               err = ErrInstanceTypesNotConfigured
+               return
+       }
+
+       needScratch := int64(0)
+       for _, m := range ctr.Mounts {
+               if m.Kind == "tmp" {
+                       needScratch += m.Capacity
+               }
+       }
+
+       needVCPUs := ctr.RuntimeConstraints.VCPUs
+
+       needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
+       needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
+
+       err = ErrConstraintsNotSatisfiable
+       for _, it := range cc.InstanceTypes {
+               switch {
+               case err == nil && it.Price > best.Price:
+               case it.Scratch < needScratch:
+               case it.RAM < needRAM:
+               case it.VCPUs < needVCPUs:
+               case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
+                       // Equal price, but worse specs
+               default:
+                       // Lower price || (same price && better specs)
+                       best = it
+                       err = nil
+               }
+       }
+       return
+}
+
+// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
+// type name as a valid feature name, even if no instances of that
+// type have appeared yet.
+//
+// It takes advantage of some SLURM peculiarities:
+//
+// (1) A feature is valid after it has been offered by a node, even if
+// it is no longer offered by any node. So, to make a feature name
+// valid, we can add it to a dummy node ("compute0"), then remove it.
+//
+// (2) when srun is given an invalid --gres argument and an invalid
+// --constraint argument, the error message mentions "Invalid feature
+// specification". So, to test whether a feature name is valid without
+// actually submitting a job, we can call srun with the feature name
+// and an invalid --gres argument.
+//
+// SlurmNodeTypeFeatureKludge does a test-and-fix operation
+// immediately, and then periodically, in case slurm restarts and
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
+func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
+       if len(cc.InstanceTypes) == 0 {
+               return
+       }
+       var features []string
+       for _, it := range cc.InstanceTypes {
+               features = append(features, "instancetype="+it.Name)
+       }
+       for {
+               slurmKludge(features)
+               time.Sleep(time.Minute)
+       }
+}
+
+var (
+       slurmDummyNode     = "compute0"
+       slurmErrBadFeature = "Invalid feature"
+       slurmErrBadGres    = "Invalid generic resource"
+)
+
+func slurmKludge(features []string) {
+       cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
+       out, err := cmd.CombinedOutput()
+       switch {
+       case err == nil:
+               log.Printf("warning: guaranteed-to-fail srun command did not fail: %q %q", cmd.Path, cmd.Args)
+               log.Printf("output was: %q", out)
+
+       case bytes.Contains(out, []byte(slurmErrBadFeature)):
+               log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
+               for _, nodeFeatures := range []string{strings.Join(features, ","), ""} {
+                       cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+nodeFeatures)
+                       log.Printf("running: %q %q", cmd.Path, cmd.Args)
+                       out, err := cmd.CombinedOutput()
+                       if err != nil {
+                               log.Printf("error: scontrol: %s (output was %q)", err, out)
+                       }
+               }
+
+       case bytes.Contains(out, []byte(slurmErrBadGres)):
+               // Evidently our node-type feature names are all valid.
+
+       default:
+               log.Printf("warning: expected srun error %q or %q, but output was %q", slurmErrBadFeature, slurmErrBadGres, out)
+       }
+}
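A minimal sketch (not from this commit) of the RAM headroom arithmetic in ChooseInstanceType above: the requested RAM plus Keep cache is scaled up by discountConfiguredRAMPercent before being compared against each instance type's configured RAM, so types within roughly 5% of the raw requirement are rejected. The numbers below are illustrative.

```go
package main

import "fmt"

func main() {
	const discountConfiguredRAMPercent = 5 // same constant as node_size.go

	// Illustrative container requirements: 7 GiB RAM + 1 GiB Keep cache.
	needRAM := int64(7<<30) + int64(1<<30)

	// Same integer arithmetic as ChooseInstanceType: inflate the requirement
	// so that only instance types with ~5% headroom over the request qualify.
	needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)

	fmt.Printf("need %d bytes (%.2f GiB) of configured node RAM\n",
		needRAM, float64(needRAM)/float64(1<<30))
	// An 8 GiB instance type (it.RAM = 8589934592) would be rejected here,
	// since 8589934592 < 9042036412.
}
```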
diff --git a/lib/dispatchcloud/node_size_test.go b/lib/dispatchcloud/node_size_test.go
new file mode 100644 (file)
index 0000000..0c02a0e
--- /dev/null
@@ -0,0 +1,93 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&NodeSizeSuite{})
+
+const GiB = int64(1 << 30)
+
+type NodeSizeSuite struct{}
+
+func (*NodeSizeSuite) TestChooseNotConfigured(c *check.C) {
+       _, err := ChooseInstanceType(&arvados.Cluster{}, &arvados.Container{
+               RuntimeConstraints: arvados.RuntimeConstraints{
+                       RAM:   1234567890,
+                       VCPUs: 2,
+               },
+       })
+       c.Check(err, check.Equals, ErrInstanceTypesNotConfigured)
+}
+
+func (*NodeSizeSuite) TestChooseUnsatisfiable(c *check.C) {
+       checkUnsatisfiable := func(ctr *arvados.Container) {
+               _, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: []arvados.InstanceType{
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small1"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4", Scratch: GiB},
+               }}, ctr)
+               c.Check(err, check.Equals, ErrConstraintsNotSatisfiable)
+       }
+
+       for _, rc := range []arvados.RuntimeConstraints{
+               {RAM: 9876543210, VCPUs: 2},
+               {RAM: 1234567890, VCPUs: 20},
+               {RAM: 1234567890, VCPUs: 2, KeepCacheRAM: 9876543210},
+       } {
+               checkUnsatisfiable(&arvados.Container{RuntimeConstraints: rc})
+       }
+       checkUnsatisfiable(&arvados.Container{
+               Mounts:             map[string]arvados.Mount{"/tmp": {Kind: "tmp", Capacity: 2 * GiB}},
+               RuntimeConstraints: arvados.RuntimeConstraints{RAM: 12345, VCPUs: 1},
+       })
+}
+
+func (*NodeSizeSuite) TestChoose(c *check.C) {
+       for _, menu := range [][]arvados.InstanceType{
+               {
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+               },
+               {
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "goodenough"},
+                       {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+               },
+               {
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Name: "small"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "goodenough"},
+                       {Price: 2.2, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+               },
+               {
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: GiB, Name: "small"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: GiB, Name: "nearly"},
+                       {Price: 3.3, RAM: 4000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "best"},
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Name: "costly"},
+               },
+       } {
+               best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
+                       Mounts: map[string]arvados.Mount{
+                               "/tmp": {Kind: "tmp", Capacity: 2 * GiB},
+                       },
+                       RuntimeConstraints: arvados.RuntimeConstraints{
+                               VCPUs:        2,
+                               RAM:          987654321,
+                               KeepCacheRAM: 123456789,
+                       },
+               })
+               c.Check(err, check.IsNil)
+               c.Check(best.Name, check.Equals, "best")
+               c.Check(best.RAM >= 1234567890, check.Equals, true)
+               c.Check(best.VCPUs >= 2, check.Equals, true)
+               c.Check(best.Scratch >= 2*GiB, check.Equals, true)
+       }
+}
diff --git a/sdk/R/install_deps.R b/sdk/R/install_deps.R
new file mode 100644 (file)
index 0000000..a54a9a2
--- /dev/null
@@ -0,0 +1,18 @@
+options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
+if (!requireNamespace("devtools")) {
+  install.packages("devtools")
+}
+if (!requireNamespace("roxygen2")) {
+  install.packages("roxygen2")
+}
+
+# These install from github so install known-good versions instead of
+# letting any push to master break our build.
+if (!requireNamespace("pkgload")) {
+  devtools::install_github("r-lib/pkgload", ref="7a97de62adf1793c03e73095937e4655baad79c9")
+}
+if (!requireNamespace("pkgdown")) {
+  devtools::install_github("r-lib/pkgdown", ref="897ffbc016549c11c4263cb5d1f6e9f5c99efb45")
+}
+
+devtools::install_dev_deps()
index 71ddd172214c4dac1b20907c8cf5a18bce6c37b2..4701b4d8f13a29a2c1dc8f3bc5558de788a5a1fa 100644 (file)
@@ -405,6 +405,7 @@ class ArvCwlRunner(object):
                         "success")
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+        self.eval_timeout = kwargs.get("eval_timeout")
 
         kwargs["make_fs_access"] = make_fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
@@ -413,6 +414,8 @@ class ArvCwlRunner(object):
         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
 
         if self.work_api == "containers":
+            if self.ignore_docker_for_reuse:
+                raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
index abe67c8fb3c552c7e66925093ae1377b1e26b4e9..a2aaa8d49e176a1795c5a5f1d7c17f4a84b658ad 100644 (file)
@@ -385,6 +385,8 @@ class RunnerContainer(Runner):
         if self.arvrunner.project_uuid:
             command.append("--project-uuid="+self.arvrunner.project_uuid)
 
+        command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
index 69f918ead939c4a56fdbed12449c3cf2fca630ec..0b577b06a2e324dbea743244da955f2661a52bea 100644 (file)
@@ -32,6 +32,7 @@ class CollectionCache(object):
                  min_entries=2):
         self.api_client = api_client
         self.keep_client = keep_client
+        self.num_retries = num_retries
         self.collections = OrderedDict()
         self.lock = threading.Lock()
         self.total = 0
@@ -54,7 +55,8 @@ class CollectionCache(object):
             if pdh not in self.collections:
                 logger.debug("Creating collection reader for %s", pdh)
                 cr = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
-                                                         keep_client=self.keep_client)
+                                                         keep_client=self.keep_client,
+                                                         num_retries=self.num_retries)
                 sz = len(cr.manifest_text()) * 128
                 self.collections[pdh] = (cr, sz)
                 self.total += sz
index 9cabea0794716fa79d88752616d4e7205ae6b4eb..4ab5fb524c8427b21b64c8f1aa15dbbfe10b3cb8 100644 (file)
@@ -234,7 +234,7 @@ def stubs(func):
             'state': 'Committed',
             'owner_uuid': None,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                        '--enable-reuse', '--on-error=continue',
+                        '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
@@ -499,7 +499,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = [
             'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-            '--disable-reuse', '--on-error=continue',
+            '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
 
@@ -522,7 +522,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = [
             'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-            '--disable-reuse', '--on-error=continue',
+            '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
         expect_container["name"] = "submit_wf_no_reuse.cwl"
@@ -557,7 +557,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  '--enable-reuse', '--on-error=stop',
+                                                  '--enable-reuse', '--on-error=stop', '--eval-timeout=20',
                                                   '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -581,7 +581,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  "--output-name="+output_name, '--enable-reuse', '--on-error=continue',
+                                                  "--output-name="+output_name, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                                                   '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["output_name"] = output_name
 
@@ -606,7 +606,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                                        '--enable-reuse', '--on-error=continue',
-                                       "--intermediate-output-ttl=3600",
+                                       "--intermediate-output-ttl=3600", '--eval-timeout=20',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -629,7 +629,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
                                        '--enable-reuse', '--on-error=continue',
-                                       "--trash-intermediate",
+                                       "--trash-intermediate", '--eval-timeout=20',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -653,7 +653,7 @@ class TestSubmit(unittest.TestCase):
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                                  "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue',
+                                                  "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                                                   '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -735,7 +735,7 @@ class TestSubmit(unittest.TestCase):
             'name': 'expect_arvworkflow.cwl#main',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                        '--enable-reuse', '--on-error=continue',
+                        '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                         '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
@@ -851,7 +851,7 @@ class TestSubmit(unittest.TestCase):
             'name': 'a test workflow',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                        '--enable-reuse', '--on-error=continue',
+                        '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
@@ -908,7 +908,30 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["owner_uuid"] = project_uuid
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
-                                       '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+                                       '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid, '--eval-timeout=20',
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+    @stubs
+    def test_submit_container_eval_timeout(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--eval-timeout=60",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                       '--enable-reuse', '--on-error=continue', '--eval-timeout=60.0',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
index bd5d08bcf5e8f278606be6bac2037ce7b9215ecb..5b9d0e2effc153ca5322a8ffea3955301def2000 100644 (file)
@@ -433,7 +433,7 @@ func (s *CollectionFSSuite) TestMkdir(c *check.C) {
        err = s.fs.Remove("foo/bar")
        c.Check(err, check.IsNil)
 
-       // mkdir succeds after the file is deleted
+       // mkdir succeeds after the file is deleted
        err = s.fs.Mkdir("foo/bar", 0755)
        c.Check(err, check.IsNil)
 
index ca0df1fc907b86ed4bec09b3a05c1697a6295cd4..9ed0eacf23e6d753c1b6c2a0f781282c96dde8cc 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package arvados
 
 import (
@@ -48,6 +52,16 @@ type Cluster struct {
        ClusterID       string `json:"-"`
        ManagementToken string
        SystemNodes     map[string]SystemNode
+       InstanceTypes   []InstanceType
+}
+
+type InstanceType struct {
+       Name         string
+       ProviderType string
+       VCPUs        int
+       RAM          int64
+       Scratch      int64
+       Price        float64
 }
 
 // GetThisSystemNode returns a SystemNode for the node we're running
index a541a8dca77fb03b9d6728fd8c9c13c5836414c8..20d007c5c818daf3190b9852e840fd8a2f97d47e 100644 (file)
@@ -41,9 +41,9 @@ type Mount struct {
 // CPU) and network connectivity.
 type RuntimeConstraints struct {
        API          *bool
-       RAM          int `json:"ram"`
-       VCPUs        int `json:"vcpus"`
-       KeepCacheRAM int `json:"keep_cache_ram"`
+       RAM          int64 `json:"ram"`
+       VCPUs        int   `json:"vcpus"`
+       KeepCacheRAM int64 `json:"keep_cache_ram"`
 }
 
 // SchedulingParameters specify a container's scheduling parameters
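The widening of RAM and KeepCacheRAM to int64 above matters because RAM requests routinely exceed 2^31-1 bytes, which a 32-bit int cannot hold, and because ChooseInstanceType compares them against the int64 fields of InstanceType. A small illustration using a stand-in struct (not the arvados package's type); the values are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// runtimeConstraints mirrors the int64 fields adopted in this commit.
type runtimeConstraints struct {
	RAM          int64 `json:"ram"`
	VCPUs        int   `json:"vcpus"`
	KeepCacheRAM int64 `json:"keep_cache_ram"`
}

func main() {
	// 12 GiB request: larger than math.MaxInt32, so a 32-bit integer field
	// would overflow when unmarshalling this on a 32-bit build.
	var rc runtimeConstraints
	_ = json.Unmarshal([]byte(`{"ram": 12884901888, "vcpus": 2, "keep_cache_ram": 268435456}`), &rc)
	fmt.Println(rc.RAM+rc.KeepCacheRAM, "bytes total memory requirement")
}
```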
index decb2ff28b7a650546253aeea09675d70256dbea..569931a3edd732b4fb3d48a09db318622bd08075 100644 (file)
@@ -10,7 +10,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/stats"
-       log "github.com/Sirupsen/logrus"
+       "github.com/Sirupsen/logrus"
 )
 
 type contextKey struct {
@@ -19,13 +19,15 @@ type contextKey struct {
 
 var requestTimeContextKey = contextKey{"requestTime"}
 
+var Logger logrus.FieldLogger = logrus.StandardLogger()
+
 // LogRequests wraps an http.Handler, logging each request and
 // response via logrus.
 func LogRequests(h http.Handler) http.Handler {
        return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
                w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
                req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
-               lgr := log.WithFields(log.Fields{
+               lgr := Logger.WithFields(logrus.Fields{
                        "RequestID":       req.Header.Get("X-Request-Id"),
                        "remoteAddr":      req.RemoteAddr,
                        "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
@@ -39,22 +41,26 @@ func LogRequests(h http.Handler) http.Handler {
        })
 }
 
-func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+func logRequest(w *responseTimer, req *http.Request, lgr *logrus.Entry) {
        lgr.Info("request")
 }
 
-func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+func logResponse(w *responseTimer, req *http.Request, lgr *logrus.Entry) {
        if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
                tDone := time.Now()
-               lgr = lgr.WithFields(log.Fields{
+               lgr = lgr.WithFields(logrus.Fields{
                        "timeTotal":     stats.Duration(tDone.Sub(tStart)),
                        "timeToStatus":  stats.Duration(w.writeTime.Sub(tStart)),
                        "timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
                })
        }
-       lgr.WithFields(log.Fields{
-               "respStatusCode": w.WroteStatus(),
-               "respStatus":     http.StatusText(w.WroteStatus()),
+       respCode := w.WroteStatus()
+       if respCode == 0 {
+               respCode = http.StatusOK
+       }
+       lgr.WithFields(logrus.Fields{
+               "respStatusCode": respCode,
+               "respStatus":     http.StatusText(respCode),
                "respBytes":      w.WroteBodyBytes(),
        }).Info("response")
 }
@@ -65,6 +71,13 @@ type responseTimer struct {
        writeTime time.Time
 }
 
+func (rt *responseTimer) CloseNotify() <-chan bool {
+       if cn, ok := rt.ResponseWriter.(http.CloseNotifier); ok {
+               return cn.CloseNotify()
+       }
+       return nil
+}
+
 func (rt *responseTimer) WriteHeader(code int) {
        if !rt.wrote {
                rt.wrote = true
index f17bc820a98f5c242ffc24122d1176cc5c21a57e..d37822ffe3e5cd0f582a59a3ee45b1d322fed4ac 100644 (file)
@@ -28,6 +28,13 @@ func WrapResponseWriter(orig http.ResponseWriter) ResponseWriter {
        return &responseWriter{ResponseWriter: orig}
 }
 
+func (w *responseWriter) CloseNotify() <-chan bool {
+       if cn, ok := w.ResponseWriter.(http.CloseNotifier); ok {
+               return cn.CloseNotify()
+       }
+       return nil
+}
+
 func (w *responseWriter) WriteHeader(s int) {
        w.wroteStatus = s
        w.ResponseWriter.WriteHeader(s)
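The explicit CloseNotify passthroughs added above are needed because wrapping an http.ResponseWriter in another struct hides optional interfaces: a type assertion on the wrapper fails even when the underlying writer supports close notification. A small, self-contained illustration of that behavior; the types here are invented for the demo, not taken from the Arvados code.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// notifier is a ResponseWriter that supports close notification.
type notifier struct {
	http.ResponseWriter
	ch chan bool
}

func (n notifier) CloseNotify() <-chan bool { return n.ch }

// plainWrapper embeds the ResponseWriter interface and forwards nothing else,
// so CloseNotify is not part of its method set.
type plainWrapper struct{ http.ResponseWriter }

// forwardingWrapper re-exposes CloseNotify the same way responseWriter and
// responseTimer do in this commit.
type forwardingWrapper struct{ http.ResponseWriter }

func (w forwardingWrapper) CloseNotify() <-chan bool {
	if cn, ok := w.ResponseWriter.(http.CloseNotifier); ok {
		return cn.CloseNotify()
	}
	return nil
}

func main() {
	var inner http.ResponseWriter = notifier{httptest.NewRecorder(), make(chan bool)}

	_, ok := interface{}(plainWrapper{inner}).(http.CloseNotifier)
	fmt.Println("plain wrapper keeps CloseNotify:", ok) // false

	_, ok = interface{}(forwardingWrapper{inner}).(http.CloseNotifier)
	fmt.Println("forwarding wrapper keeps CloseNotify:", ok) // true
}
```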
index aa6bdad90bea0551f7043fbdd0889f6dea2ff6a8..f4580f346bbed43a5642e974d54c8e5922c24efd 100644 (file)
@@ -866,6 +866,9 @@ class ArvadosFile(object):
 
     """
 
+    __slots__ = ('parent', 'name', '_writers', '_committed',
+                 '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
     def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
index 4be098d3511656e42a176b5fe46ea0de83355b10..33333ee86558c4b0244917a9ffb2c75645d321fa 100644 (file)
@@ -1531,6 +1531,10 @@ class Collection(RichCollectionBase):
 
         return text
 
+    _token_re = re.compile(r'(\S+)(\s+|$)')
+    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1549,7 +1553,7 @@ class Collection(RichCollectionBase):
         stream_name = None
         state = STREAM_NAME
 
-        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+        for token_and_separator in self._token_re.finditer(manifest_text):
             tok = token_and_separator.group(1)
             sep = token_and_separator.group(2)
 
@@ -1564,7 +1568,7 @@ class Collection(RichCollectionBase):
                 continue
 
             if state == BLOCKS:
-                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                block_locator = self._block_re.match(tok)
                 if block_locator:
                     blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
@@ -1573,7 +1577,7 @@ class Collection(RichCollectionBase):
                     state = SEGMENTS
 
             if state == SEGMENTS:
-                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                file_segment = self._segment_re.match(tok)
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
index 351f7f5dda8a96ebb805fd4d4896380cb3addbb8..e8e95afc7013650c67e753a3f2de4e7ec227fc44 100644 (file)
@@ -541,7 +541,7 @@ class KeepClient(object):
             self._lastheadername = name
             self._headers[name] = value
             # Returning None implies all bytes were written
-    
+
 
     class KeepWriterQueue(queue.Queue):
         def __init__(self, copies):
@@ -552,19 +552,19 @@ class KeepClient(object):
             self.successful_copies_lock = threading.Lock()
             self.pending_tries = copies
             self.pending_tries_notification = threading.Condition()
-        
+
         def write_success(self, response, replicas_nr):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
             with self.pending_tries_notification:
                 self.pending_tries_notification.notify_all()
-        
+
         def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
-        
+
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
@@ -613,25 +613,25 @@ class KeepClient(object):
             for _ in range(num_threads):
                 w = KeepClient.KeepWriterThread(self.queue, data, data_hash, timeout)
                 self.workers.append(w)
-        
+
         def add_task(self, ks, service_root):
             self.queue.put((ks, service_root))
             self.total_task_nr += 1
-        
+
         def done(self):
             return self.queue.successful_copies
-        
+
         def join(self):
             # Start workers
             for worker in self.workers:
                 worker.start()
             # Wait for finished work
             self.queue.join()
-        
+
         def response(self):
             return self.queue.response
-    
-    
+
+
     class KeepWriterThread(threading.Thread):
         TaskFailed = RuntimeError()
 
@@ -996,84 +996,90 @@ class KeepClient(object):
 
         self.get_counter.add(1)
 
-        locator = KeepLocator(loc_s)
-        if method == "GET":
-            slot, first = self.block_cache.reserve_cache(locator.md5sum)
-            if not first:
-                self.hits_counter.add(1)
-                v = slot.get()
-                return v
-
-        self.misses_counter.add(1)
-
-        headers = {
-            'X-Request-Id': (request_id or
-                             (hasattr(self, 'api_client') and self.api_client.request_id) or
-                             arvados.util.new_request_id()),
-        }
-
-        # If the locator has hints specifying a prefix (indicating a
-        # remote keepproxy) or the UUID of a local gateway service,
-        # read data from the indicated service(s) instead of the usual
-        # list of local disk services.
-        hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
-                      for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
-        hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
-                           for hint in locator.hints if (
-                                   hint.startswith('K@') and
-                                   len(hint) == 29 and
-                                   self._gateway_services.get(hint[2:])
-                                   )])
-        # Map root URLs to their KeepService objects.
-        roots_map = {
-            root: self.KeepService(root, self._user_agent_pool,
-                                   upload_counter=self.upload_counter,
-                                   download_counter=self.download_counter,
-                                   headers=headers)
-            for root in hint_roots
-        }
-
-        # See #3147 for a discussion of the loop implementation.  Highlights:
-        # * Refresh the list of Keep services after each failure, in case
-        #   it's being updated.
-        # * Retry until we succeed, we're out of retries, or every available
-        #   service has returned permanent failure.
-        sorted_roots = []
-        roots_map = {}
+        slot = None
         blob = None
-        loop = retry.RetryLoop(num_retries, self._check_loop_result,
-                               backoff_start=2)
-        for tries_left in loop:
-            try:
-                sorted_roots = self.map_new_services(
-                    roots_map, locator,
-                    force_rebuild=(tries_left < num_retries),
-                    need_writable=False,
-                    headers=headers)
-            except Exception as error:
-                loop.save_result(error)
-                continue
+        try:
+            locator = KeepLocator(loc_s)
+            if method == "GET":
+                slot, first = self.block_cache.reserve_cache(locator.md5sum)
+                if not first:
+                    self.hits_counter.add(1)
+                    blob = slot.get()
+                    if blob is None:
+                        raise arvados.errors.KeepReadError(
+                            "failed to read {}".format(loc_s))
+                    return blob
+
+            self.misses_counter.add(1)
+
+            headers = {
+                'X-Request-Id': (request_id or
+                                 (hasattr(self, 'api_client') and self.api_client.request_id) or
+                                 arvados.util.new_request_id()),
+            }
+
+            # If the locator has hints specifying a prefix (indicating a
+            # remote keepproxy) or the UUID of a local gateway service,
+            # read data from the indicated service(s) instead of the usual
+            # list of local disk services.
+            hint_roots = ['http://keep.{}.arvadosapi.com/'.format(hint[2:])
+                          for hint in locator.hints if hint.startswith('K@') and len(hint) == 7]
+            hint_roots.extend([self._gateway_services[hint[2:]]['_service_root']
+                               for hint in locator.hints if (
+                                       hint.startswith('K@') and
+                                       len(hint) == 29 and
+                                       self._gateway_services.get(hint[2:])
+                                       )])
+            # Map root URLs to their KeepService objects.
+            roots_map = {
+                root: self.KeepService(root, self._user_agent_pool,
+                                       upload_counter=self.upload_counter,
+                                       download_counter=self.download_counter,
+                                       headers=headers)
+                for root in hint_roots
+            }
+
+            # See #3147 for a discussion of the loop implementation.  Highlights:
+            # * Refresh the list of Keep services after each failure, in case
+            #   it's being updated.
+            # * Retry until we succeed, we're out of retries, or every available
+            #   service has returned permanent failure.
+            sorted_roots = []
+            roots_map = {}
+            loop = retry.RetryLoop(num_retries, self._check_loop_result,
+                                   backoff_start=2)
+            for tries_left in loop:
+                try:
+                    sorted_roots = self.map_new_services(
+                        roots_map, locator,
+                        force_rebuild=(tries_left < num_retries),
+                        need_writable=False,
+                        headers=headers)
+                except Exception as error:
+                    loop.save_result(error)
+                    continue
 
-            # Query KeepService objects that haven't returned
-            # permanent failure, in our specified shuffle order.
-            services_to_try = [roots_map[root]
-                               for root in sorted_roots
-                               if roots_map[root].usable()]
-            for keep_service in services_to_try:
-                blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
-                if blob is not None:
-                    break
-            loop.save_result((blob, len(services_to_try)))
-
-        # Always cache the result, then return it if we succeeded.
-        if method == "GET":
-            slot.set(blob)
-            self.block_cache.cap_cache()
-        if loop.success():
-            if method == "HEAD":
-                return True
-            else:
-                return blob
+                # Query KeepService objects that haven't returned
+                # permanent failure, in our specified shuffle order.
+                services_to_try = [roots_map[root]
+                                   for root in sorted_roots
+                                   if roots_map[root].usable()]
+                for keep_service in services_to_try:
+                    blob = keep_service.get(locator, method=method, timeout=self.current_timeout(num_retries-tries_left))
+                    if blob is not None:
+                        break
+                loop.save_result((blob, len(services_to_try)))
+
+            # Always cache the result, then return it if we succeeded.
+            if loop.success():
+                if method == "HEAD":
+                    return True
+                else:
+                    return blob
+        finally:
+            if slot is not None:
+                slot.set(blob)
+                self.block_cache.cap_cache()
 
         # Q: Including 403 is necessary for the Keep tests to continue
         # passing, but maybe they should expect KeepReadError instead?
@@ -1144,7 +1150,7 @@ class KeepClient(object):
                 loop.save_result(error)
                 continue
 
-            writer_pool = KeepClient.KeepWriterThreadPool(data=data, 
+            writer_pool = KeepClient.KeepWriterThreadPool(data=data,
                                                         data_hash=data_hash,
                                                         copies=copies - done,
                                                         max_service_replicas=self.max_replicas_per_service,
index e0bb734b21fbf2671c51a4ce22dd5c954432a488..872c93bae25b5480de1cbf91400f716543415700 100644 (file)
@@ -1171,7 +1171,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
 
         def finished(self):
             return False
-    
+
     def setUp(self):
         self.copies = 3
         self.pool = arvados.KeepClient.KeepWriterThreadPool(
@@ -1215,7 +1215,7 @@ class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
             self.pool.add_task(ks, None)
         self.pool.join()
         self.assertEqual(self.pool.done(), self.copies-1)
-    
+
 
 @tutil.skip_sleep
 class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
@@ -1250,3 +1250,27 @@ class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 self.keep_client.put('foo', num_retries=1, copies=2)
         self.assertEqual(2, req_mock.call_count)
+
+class KeepClientAPIErrorTest(unittest.TestCase):
+    def test_api_fail(self):
+        class ApiMock(object):
+            def __getattr__(self, r):
+                if r == "api_token":
+                    return "abc"
+                else:
+                    raise arvados.errors.KeepReadError()
+        keep_client = arvados.KeepClient(api_client=ApiMock(),
+                                             proxy='', local_store='')
+
+        # The bug this is testing for is that if an API (not
+        # keepstore) exception is thrown as part of a get(), the next
+        # attempt to get that same block will result in a deadlock.
+        # This is why there are two get()s in a row.  Unfortunately,
+        # the failure mode for this test is that the test suite
+        # deadlocks, there isn't a good way to avoid that without
+        # adding a special case that has no use except for this test.
+
+        with self.assertRaises(arvados.errors.KeepReadError):
+            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
+        with self.assertRaises(arvados.errors.KeepReadError):
+            keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8+3")
index c94ce89395840452398e8d2b7944cbaf64df3b71..c4f64f6039b3683127d2b5735ae11064446d10cb 100644 (file)
@@ -554,6 +554,10 @@ class ApplicationController < ActionController::Base
     }
   end
 
+  def self._update_requires_parameters
+    {}
+  end
+
   def self._index_requires_parameters
     {
       filters: { type: 'array', required: false },
index 7ee8c2f149e590dea892ee6723fe42b5450b29c3..a2b22ea7f6c55709dff9d745368cbbe46bed5669 100644 (file)
@@ -9,15 +9,38 @@ class Arvados::V1::NodesController < ApplicationController
 
   include DbCurrentTime
 
+  def self._ping_requires_parameters
+    { ping_secret: {required: true} }
+  end
+
+  def self._create_requires_parameters
+    super.merge(
+      { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+  end
+
+  def self._update_requires_parameters
+    super.merge(
+      { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+  end
+
+  def create
+    @object = model_class.new(resource_attrs)
+    @object.assign_slot if params[:assign_slot]
+    @object.save!
+    show
+  end
+
   def update
     if resource_attrs[:job_uuid].is_a? String
       @object.job_readable = readable_job_uuids([resource_attrs[:job_uuid]]).any?
     end
-    super
-  end
-
-  def self._ping_requires_parameters
-    { ping_secret: {required: true} }
+    attrs_to_update = resource_attrs.reject { |k,v|
+      [:kind, :etag, :href].index k
+    }
+    @object.update_attributes!(attrs_to_update)
+    @object.assign_slot if params[:assign_slot]
+    @object.save!
+    show
   end
 
   def ping
index c5fd96eca4747d866795e54ee0931cc4e63ff4f3..a088d48e68f466a6b36ad4d663a031008cc95fd7 100644 (file)
@@ -15,13 +15,18 @@ class Collection < ArvadosModel
   include Trashable
 
   serialize :properties, Hash
+  serialize :storage_classes_desired, Array
+  serialize :storage_classes_confirmed, Array
 
   before_validation :default_empty_manifest
+  before_validation :default_storage_classes, on: :create
   before_validation :check_encoding
   before_validation :check_manifest_validity
   before_validation :check_signatures
   before_validation :strip_signatures_and_update_replication_confirmed
   validate :ensure_pdh_matches_manifest_text
+  validate :ensure_storage_classes_desired_is_not_empty
+  validate :ensure_storage_classes_contain_non_empty_strings
   before_save :set_file_names
 
   api_accessible :user, extend: :common do |t|
@@ -34,6 +39,9 @@ class Collection < ArvadosModel
     t.add :replication_desired
     t.add :replication_confirmed
     t.add :replication_confirmed_at
+    t.add :storage_classes_desired
+    t.add :storage_classes_confirmed
+    t.add :storage_classes_confirmed_at
     t.add :delete_at
     t.add :trash_at
     t.add :is_trashed
@@ -436,7 +444,7 @@ class Collection < ArvadosModel
   end
 
   def self.full_text_searchable_columns
-    super - ["manifest_text"]
+    super - ["manifest_text", "storage_classes_desired", "storage_classes_confirmed"]
   end
 
   def self.where *args
@@ -445,6 +453,17 @@ class Collection < ArvadosModel
   end
 
   protected
+
+  # Although the defaults for these columns is already set up on the schema,
+  # collection creation from an API client seems to ignore them, making the
+  # validation on empty desired storage classes return an error.
+  def default_storage_classes
+    if self.storage_classes_desired.nil? || self.storage_classes_desired.empty?
+      self.storage_classes_desired = ["default"]
+    end
+    self.storage_classes_confirmed ||= []
+  end
+
   def portable_manifest_text
     self.class.munge_manifest_locators(manifest_text) do |match|
       if match[2] # size
@@ -472,12 +491,30 @@ class Collection < ArvadosModel
   end
 
   def ensure_permission_to_save
-    if (not current_user.andand.is_admin and
-        (replication_confirmed_at_changed? or replication_confirmed_changed?) and
-        not (replication_confirmed_at.nil? and replication_confirmed.nil?))
-      raise ArvadosModel::PermissionDeniedError.new("replication_confirmed and replication_confirmed_at attributes cannot be changed, except by setting both to nil")
+    if (not current_user.andand.is_admin)
+      if (replication_confirmed_at_changed? or replication_confirmed_changed?) and
+        not (replication_confirmed_at.nil? and replication_confirmed.nil?)
+        raise ArvadosModel::PermissionDeniedError.new("replication_confirmed and replication_confirmed_at attributes cannot be changed, except by setting both to nil")
+      end
+      if (storage_classes_confirmed_changed? or storage_classes_confirmed_at_changed?) and
+        not (storage_classes_confirmed == [] and storage_classes_confirmed_at.nil?)
+        raise ArvadosModel::PermissionDeniedError.new("storage_classes_confirmed and storage_classes_confirmed_at attributes cannot be changed, except by setting them to [] and nil respectively")
+      end
     end
     super
   end
 
+  def ensure_storage_classes_desired_is_not_empty
+    if self.storage_classes_desired.empty?
+      raise ArvadosModel::InvalidStateTransitionError.new("storage_classes_desired shouldn't be empty")
+    end
+  end
+
+  def ensure_storage_classes_contain_non_empty_strings
+    (self.storage_classes_desired + self.storage_classes_confirmed).each do |c|
+      if !c.is_a?(String) || c == ''
+        raise ArvadosModel::InvalidStateTransitionError.new("storage classes should only be non-empty strings")
+      end
+    end
+  end
 end
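The validations added above amount to two list checks: storage_classes_desired must be non-empty, and every entry in either list must be a non-empty string. A stand-alone sketch of the same rules; the function and package names are chosen purely for illustration, and the string type check is implicit in the []string signature here.

package storageclasses

import "errors"

// Validate mirrors the model-level rules introduced above.
func Validate(desired, confirmed []string) error {
        if len(desired) == 0 {
                return errors.New("storage_classes_desired shouldn't be empty")
        }
        for _, c := range append(append([]string(nil), desired...), confirmed...) {
                if c == "" {
                        return errors.New("storage classes should only be non-empty strings")
                }
        }
        return nil
}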
index bf1b636c52836bd54c75373c29fde51897e4b766..3d8b91b4b62df590edd2c1049a5ea69e17224bef 100644 (file)
@@ -106,27 +106,7 @@ class Node < ArvadosModel
       end
     end
 
-    # Assign slot_number
-    if self.slot_number.nil?
-      while true
-        n = self.class.available_slot_number
-        if n.nil?
-          raise "No available node slots"
-        end
-        self.slot_number = n
-        begin
-          self.save!
-          break
-        rescue ActiveRecord::RecordNotUnique
-          # try again
-        end
-      end
-    end
-
-    # Assign hostname
-    if self.hostname.nil? and Rails.configuration.assign_node_hostname
-      self.hostname = self.class.hostname_for_slot(self.slot_number)
-    end
+    assign_slot
 
     # Record other basic stats
     ['total_cpu_cores', 'total_ram_mb', 'total_scratch_mb'].each do |key|
@@ -140,8 +120,30 @@ class Node < ArvadosModel
     save!
   end
 
+  def assign_slot
+    return if self.slot_number.andand > 0
+    while true
+      self.slot_number = self.class.available_slot_number
+      if self.slot_number.nil?
+        raise "No available node slots"
+      end
+      begin
+        save!
+        return assign_hostname
+      rescue ActiveRecord::RecordNotUnique
+        # try again
+      end
+    end
+  end
+
   protected
 
+  def assign_hostname
+    if self.hostname.nil? and Rails.configuration.assign_node_hostname
+      self.hostname = self.class.hostname_for_slot(self.slot_number)
+    end
+  end
+
   def self.available_slot_number
     # Join the sequence 1..max with the nodes table. Return the first
     # (i.e., smallest) value that doesn't match the slot_number of any
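The refactored assign_slot relies on a claim-and-retry pattern: propose the smallest free slot, try to persist it, and retry if a concurrent node won the unique-constraint race. A minimal sketch of that pattern, with all helper names hypothetical:

package slots

import "errors"

// errSlotTaken stands in for a unique-constraint violation
// (ActiveRecord::RecordNotUnique in the Ruby code above).
var errSlotTaken = errors.New("slot already taken")

// assignSlot keeps proposing the smallest available slot until a save
// attempt sticks; availableSlot and save are assumed caller-supplied helpers.
func assignSlot(availableSlot func() (int, bool), save func(slot int) error) (int, error) {
        for {
                n, ok := availableSlot()
                if !ok {
                        return 0, errors.New("no available node slots")
                }
                switch err := save(n); err {
                case nil:
                        return n, nil
                case errSlotTaken:
                        // Another node claimed this slot first; try again.
                default:
                        return 0, err
                }
        }
}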
diff --git a/services/api/db/migrate/20180216203422_add_storage_classes_to_collections.rb b/services/api/db/migrate/20180216203422_add_storage_classes_to_collections.rb
new file mode 100644 (file)
index 0000000..112c2ba
--- /dev/null
@@ -0,0 +1,17 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddStorageClassesToCollections < ActiveRecord::Migration
+  def up
+    add_column :collections, :storage_classes_desired, :jsonb, :default => ["default"]
+    add_column :collections, :storage_classes_confirmed, :jsonb, :default => []
+    add_column :collections, :storage_classes_confirmed_at, :datetime, :default => nil, :null => true
+  end
+
+  def down
+    remove_column :collections, :storage_classes_desired
+    remove_column :collections, :storage_classes_confirmed
+    remove_column :collections, :storage_classes_confirmed_at
+  end
+end
index 14729d31bc91a558dbead7de381e80f85ffb0cfe..357e95c564f885ff6da725091b0711dac0cbaffe 100644 (file)
@@ -170,7 +170,10 @@ CREATE TABLE collections (
     delete_at timestamp without time zone,
     file_names character varying(8192),
     trash_at timestamp without time zone,
-    is_trashed boolean DEFAULT false NOT NULL
+    is_trashed boolean DEFAULT false NOT NULL,
+    storage_classes_desired jsonb DEFAULT '["default"]'::jsonb,
+    storage_classes_confirmed jsonb DEFAULT '[]'::jsonb,
+    storage_classes_confirmed_at timestamp without time zone
 );
 
 
@@ -3049,3 +3052,5 @@ INSERT INTO schema_migrations (version) VALUES ('20171208203841');
 
 INSERT INTO schema_migrations (version) VALUES ('20171212153352');
 
+INSERT INTO schema_migrations (version) VALUES ('20180216203422');
+
index ea87cca36fb5d266ad1bf7a93ae690e5783f6747..807047e53ab40d07ab28b875b57a0c90a5b22096 100644 (file)
@@ -536,6 +536,48 @@ replication_desired_2_confirmed_2:
   manifest_text: ". acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:3:foo 3:6:bar\n"
   name: replication want=2 have=2
 
+storage_classes_desired_default_unconfirmed:
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2015-02-07 00:21:35.050333515 Z
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  modified_at: 2015-02-07 00:21:35.050189104 Z
+  portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+  storage_classes_desired: ["default"]
+  storage_classes_confirmed_at: ~
+  storage_classes_confirmed: ~
+  updated_at: 2015-02-07 00:21:35.050126576 Z
+  uuid: zzzzz-4zz18-3t236wrz4769tga
+  manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+  name: storage classes want=[default] have=[]
+
+storage_classes_desired_default_confirmed_default:
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2015-02-07 00:21:35.050333515 Z
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  modified_at: 2015-02-07 00:21:35.050189104 Z
+  portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+  storage_classes_desired: ["default"]
+  storage_classes_confirmed_at: 2015-02-07 00:21:35.050126576 Z
+  storage_classes_confirmed: ["default"]
+  updated_at: 2015-02-07 00:21:35.050126576 Z
+  uuid: zzzzz-4zz18-3t236wr12769tga
+  manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+  name: storage classes want=[default] have=[default]
+
+storage_classes_desired_archive_confirmed_default:
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2015-02-07 00:21:35.050333515 Z
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  modified_at: 2015-02-07 00:21:35.050189104 Z
+  portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
+  storage_classes_desired: ["archive"]
+  storage_classes_confirmed_at: ~
+  storage_classes_confirmed: ["default"]
+  updated_at: 2015-02-07 00:21:35.050126576 Z
+  uuid: zzzzz-4zz18-3t236wr12769qqa
+  manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+  name: storage classes want=[archive] have=[default]
+
 collection_with_empty_properties:
   uuid: zzzzz-4zz18-emptyproperties
   portable_data_hash: fa7aeb5140e2848d39b416daeef4ffc5+45
index ef120b1ca86b79ff54b028fc3c1cf6c5a5e6806a..c76b94e4823bc7542a100cdfe6895757744c7347 100644 (file)
@@ -193,7 +193,7 @@ class Arvados::V1::FiltersTest < ActionController::TestCase
     end
   end
 
-  test "jsonb 'exists' and '!=' filter" do
+  test "jsonb hash 'exists' and '!=' filter" do
     @controller = Arvados::V1::CollectionsController.new
     authorize_with :admin
     get :index, {
@@ -208,7 +208,24 @@ class Arvados::V1::FiltersTest < ActionController::TestCase
     assert_includes(found, collections(:collection_with_prop1_other1).uuid)
   end
 
-  test "jsonb alternate form 'exists' and '!=' filter" do
+  test "jsonb array 'exists'" do
+    @controller = Arvados::V1::CollectionsController.new
+    authorize_with :admin
+    get :index, {
+      filters: [ ['storage_classes_confirmed.default', 'exists', true] ]
+    }
+    assert_response :success
+    found = assigns(:objects).collect(&:uuid)
+    assert_equal 2, found.length
+    assert_not_includes(found,
+      collections(:storage_classes_desired_default_unconfirmed).uuid)
+    assert_includes(found,
+      collections(:storage_classes_desired_default_confirmed_default).uuid)
+    assert_includes(found,
+      collections(:storage_classes_desired_archive_confirmed_default).uuid)
+  end
+
+  test "jsonb hash alternate form 'exists' and '!=' filter" do
     @controller = Arvados::V1::CollectionsController.new
     authorize_with :admin
     get :index, {
@@ -223,6 +240,23 @@ class Arvados::V1::FiltersTest < ActionController::TestCase
     assert_includes(found, collections(:collection_with_prop1_other1).uuid)
   end
 
+  test "jsonb array alternate form 'exists' filter" do
+    @controller = Arvados::V1::CollectionsController.new
+    authorize_with :admin
+    get :index, {
+      filters: [ ['storage_classes_confirmed', 'exists', 'default'] ]
+    }
+    assert_response :success
+    found = assigns(:objects).collect(&:uuid)
+    assert_equal 2, found.length
+    assert_not_includes(found,
+      collections(:storage_classes_desired_default_unconfirmed).uuid)
+    assert_includes(found,
+      collections(:storage_classes_desired_default_confirmed_default).uuid)
+    assert_includes(found,
+      collections(:storage_classes_desired_archive_confirmed_default).uuid)
+  end
+
   test "jsonb 'exists' must be boolean" do
     @controller = Arvados::V1::CollectionsController.new
     authorize_with :admin
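Both 'exists' spellings exercised in the tests above ask the same question of a jsonb array column (does storage_classes_confirmed contain "default"?). A sketch of how a client might encode them for a collections listing; the filters=<JSON> query convention is assumed, not shown in this patch.

package main

import (
        "encoding/json"
        "fmt"
        "net/url"
)

func main() {
        // Dotted form: the class name rides on the attribute.
        dotted := [][]interface{}{{"storage_classes_confirmed.default", "exists", true}}
        // Alternate form: same question, class name as the operand.
        alternate := [][]interface{}{{"storage_classes_confirmed", "exists", "default"}}

        for _, filters := range [][][]interface{}{dotted, alternate} {
                buf, _ := json.Marshal(filters)
                q := url.Values{"filters": {string(buf)}}
                fmt.Println("GET /arvados/v1/collections?" + q.Encode())
        }
}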
index f9e5be454e4f8cd3de8b6d1be68134263e63245d..dc8b3acdd7de02c83f0a668426bcf63a078c5111 100644 (file)
@@ -78,6 +78,48 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     assert_not_nil json_response['uuid']
     assert_not_nil json_response['info'].is_a? Hash
     assert_not_nil json_response['info']['ping_secret']
+    assert_nil json_response['slot_number']
+    assert_nil json_response['hostname']
+  end
+
+  test "create node and assign slot" do
+    authorize_with :admin
+    post :create, {node: {}, assign_slot: true}
+    assert_response :success
+    assert_not_nil json_response['uuid']
+    assert_not_nil json_response['info'].is_a? Hash
+    assert_not_nil json_response['info']['ping_secret']
+    assert_operator 0, :<, json_response['slot_number']
+    n = json_response['slot_number']
+    assert_equal "compute#{n}", json_response['hostname']
+
+    node = Node.where(uuid: json_response['uuid']).first
+    assert_equal n, node.slot_number
+    assert_equal "compute#{n}", node.hostname
+  end
+
+  test "update node and assign slot" do
+    authorize_with :admin
+    node = nodes(:new_with_no_hostname)
+    post :update, {id: node.uuid, node: {}, assign_slot: true}
+    assert_response :success
+    assert_operator 0, :<, json_response['slot_number']
+    n = json_response['slot_number']
+    assert_equal "compute#{n}", json_response['hostname']
+
+    node.reload
+    assert_equal n, node.slot_number
+    assert_equal "compute#{n}", node.hostname
+  end
+
+  test "update node and assign slot, don't clobber hostname" do
+    authorize_with :admin
+    node = nodes(:new_with_custom_hostname)
+    post :update, {id: node.uuid, node: {}, assign_slot: true}
+    assert_response :success
+    assert_operator 0, :<, json_response['slot_number']
+    n = json_response['slot_number']
+    assert_equal "custom1", json_response['hostname']
   end
 
   test "ping adds node stats to info" do
index 62e3755a3fb8793d3b05c2766fbc5c5705245315..d425bc63c0e2e24511b446669271a56c11f04c68 100644 (file)
@@ -221,6 +221,81 @@ class CollectionTest < ActiveSupport::TestCase
     end
   end
 
+  test "storage_classes_desired cannot be empty" do
+    act_as_user users(:active) do
+      c = collections(:collection_owned_by_active)
+      c.update_attributes storage_classes_desired: ["hot"]
+      assert_equal ["hot"], c.storage_classes_desired
+      assert_raise ArvadosModel::InvalidStateTransitionError do
+        c.update_attributes storage_classes_desired: []
+      end
+    end
+  end
+
+  test "storage classes lists should only contain non-empty strings" do
+    c = collections(:storage_classes_desired_default_unconfirmed)
+    act_as_user users(:admin) do
+      assert c.update_attributes(storage_classes_desired: ["default", "a_string"],
+                                 storage_classes_confirmed: ["another_string"])
+      [
+        ["storage_classes_desired", ["default", 42]],
+        ["storage_classes_confirmed", [{the_answer: 42}]],
+        ["storage_classes_desired", ["default", ""]],
+        ["storage_classes_confirmed", [""]],
+      ].each do |attr, val|
+        assert_raise ArvadosModel::InvalidStateTransitionError do
+          assert c.update_attributes({attr => val})
+        end
+      end
+    end
+  end
+
+  test "storage_classes_confirmed* can be set by admin user" do
+    c = collections(:storage_classes_desired_default_unconfirmed)
+    act_as_user users(:admin) do
+      assert c.update_attributes(storage_classes_confirmed: ["default"],
+                                 storage_classes_confirmed_at: Time.now)
+    end
+  end
+
+  test "storage_classes_confirmed* cannot be set by non-admin user" do
+    act_as_user users(:active) do
+      c = collections(:storage_classes_desired_default_unconfirmed)
+      # Cannot set just one at a time.
+      assert_raise ArvadosModel::PermissionDeniedError do
+        c.update_attributes storage_classes_confirmed: ["default"]
+      end
+      c.reload
+      assert_raise ArvadosModel::PermissionDeniedError do
+        c.update_attributes storage_classes_confirmed_at: Time.now
+      end
+      # Cannot set both at once, either.
+      c.reload
+      assert_raise ArvadosModel::PermissionDeniedError do
+        assert c.update_attributes(storage_classes_confirmed: ["default"],
+                                   storage_classes_confirmed_at: Time.now)
+      end
+    end
+  end
+
+  test "storage_classes_confirmed* can be cleared (but only together) by non-admin user" do
+    act_as_user users(:active) do
+      c = collections(:storage_classes_desired_default_confirmed_default)
+      # Cannot clear just one at a time.
+      assert_raise ArvadosModel::PermissionDeniedError do
+        c.update_attributes storage_classes_confirmed: []
+      end
+      c.reload
+      assert_raise ArvadosModel::PermissionDeniedError do
+        c.update_attributes storage_classes_confirmed_at: nil
+      end
+      # Can clear both at once.
+      c.reload
+      assert c.update_attributes(storage_classes_confirmed: [],
+                                 storage_classes_confirmed_at: nil)
+    end
+  end
+
   [0, 2, 4, nil].each do |ask|
     test "set replication_desired to #{ask.inspect}" do
       Rails.configuration.default_collection_replication = 2
index ae2ca58421d3f3a58f0a4a0f28cbdc2beb56e6af..f77023697e0f54ccaa12e2e7bc1bf3dd39f71509 100644 (file)
@@ -17,6 +17,7 @@ import (
        "strings"
        "time"
 
+       "git.curoverse.com/arvados.git/lib/dispatchcloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
@@ -24,10 +25,17 @@ import (
        "github.com/coreos/go-systemd/daemon"
 )
 
-var version = "dev"
+var (
+       version           = "dev"
+       defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+)
+
+type Dispatcher struct {
+       *dispatch.Dispatcher
+       cluster *arvados.Cluster
+       sqCheck *SqueueChecker
+       slurm   Slurm
 
-// Config used by crunch-dispatch-slurm
-type Config struct {
        Client arvados.Client
 
        SbatchArguments []string
@@ -39,29 +47,33 @@ type Config struct {
        // Example: []string{"crunch-run", "--cgroup-parent-subsystem=memory"}
        CrunchRunCommand []string
 
+       // Extra RAM to reserve (in bytes) for each SLURM job, in addition
+       // to the amount specified in the container's RuntimeConstraints.
+       ReserveExtraRAM int64
+
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
-
-       slurm Slurm
 }
 
 func main() {
-       theConfig.slurm = &slurmCLI{}
-       err := doMain()
+       disp := &Dispatcher{}
+       err := disp.Run(os.Args[0], os.Args[1:])
        if err != nil {
                log.Fatal(err)
        }
 }
 
-var (
-       theConfig Config
-       sqCheck   = &SqueueChecker{}
-)
-
-const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+func (disp *Dispatcher) Run(prog string, args []string) error {
+       if err := disp.configure(prog, args); err != nil {
+               return err
+       }
+       disp.setup()
+       return disp.run()
+}
 
-func doMain() error {
-       flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+// configure() loads config files. Tests skip this.
+func (disp *Dispatcher) configure(prog string, args []string) error {
+       flags := flag.NewFlagSet(prog, flag.ExitOnError)
        flags.Usage = func() { usage(flags) }
 
        configPath := flags.String(
@@ -77,7 +89,7 @@ func doMain() error {
                false,
                "Print version information and exit.")
        // Parse args; omit the first arg which is the command name
-       flags.Parse(os.Args[1:])
+       flags.Parse(args)
 
        // Print version information if requested
        if *getVersion {
@@ -87,63 +99,84 @@ func doMain() error {
 
        log.Printf("crunch-dispatch-slurm %s started", version)
 
-       err := readConfig(&theConfig, *configPath)
+       err := disp.readConfig(*configPath)
        if err != nil {
                return err
        }
 
-       if theConfig.CrunchRunCommand == nil {
-               theConfig.CrunchRunCommand = []string{"crunch-run"}
+       if disp.CrunchRunCommand == nil {
+               disp.CrunchRunCommand = []string{"crunch-run"}
        }
 
-       if theConfig.PollPeriod == 0 {
-               theConfig.PollPeriod = arvados.Duration(10 * time.Second)
+       if disp.PollPeriod == 0 {
+               disp.PollPeriod = arvados.Duration(10 * time.Second)
        }
 
-       if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
+       if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
                // Copy real configs into env vars so [a]
                // MakeArvadosClient() uses them, and [b] they get
                // propagated to crunch-run via SLURM.
-               os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
-               os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
+               os.Setenv("ARVADOS_API_HOST", disp.Client.APIHost)
+               os.Setenv("ARVADOS_API_TOKEN", disp.Client.AuthToken)
                os.Setenv("ARVADOS_API_HOST_INSECURE", "")
-               if theConfig.Client.Insecure {
+               if disp.Client.Insecure {
                        os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
                }
-               os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
+               os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
                os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
        } else {
                log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
        }
 
        if *dumpConfig {
-               log.Fatal(config.DumpAndExit(theConfig))
+               return config.DumpAndExit(disp)
+       }
+
+       siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if os.IsNotExist(err) {
+               log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
+       } else if err != nil {
+               return fmt.Errorf("error loading config: %s", err)
+       } else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
+               return fmt.Errorf("config error: %s", err)
        }
 
+       return nil
+}
+
+// setup() initializes private fields after configure().
+func (disp *Dispatcher) setup() {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Printf("Error making Arvados client: %v", err)
-               return err
+               log.Fatalf("Error making Arvados client: %v", err)
        }
        arv.Retries = 25
 
-       sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
-       defer sqCheck.Stop()
-
-       dispatcher := &dispatch.Dispatcher{
+       disp.slurm = &slurmCLI{}
+       disp.sqCheck = &SqueueChecker{
+               Period: time.Duration(disp.PollPeriod),
+               Slurm:  disp.slurm,
+       }
+       disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
-               RunContainer:   run,
-               PollPeriod:     time.Duration(theConfig.PollPeriod),
-               MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
+               RunContainer:   disp.runContainer,
+               PollPeriod:     time.Duration(disp.PollPeriod),
+               MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+       }
+}
+
+func (disp *Dispatcher) run() error {
+       defer disp.sqCheck.Stop()
+
+       if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
+               go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster)
        }
 
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
-
-       go checkSqueueForOrphans(dispatcher, sqCheck)
-
-       return dispatcher.Run(context.Background())
+       go disp.checkSqueueForOrphans()
+       return disp.Dispatcher.Run(context.Background())
 }
 
 var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
@@ -153,19 +186,19 @@ var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`
 // jobs started by a previous dispatch process that never released
 // their slurm allocations even though their container states are
 // Cancelled or Complete. See https://dev.arvados.org/issues/10979
-func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
-       for _, uuid := range sqCheck.All() {
+func (disp *Dispatcher) checkSqueueForOrphans() {
+       for _, uuid := range disp.sqCheck.All() {
                if !containerUuidPattern.MatchString(uuid) {
                        continue
                }
-               err := dispatcher.TrackContainer(uuid)
+               err := disp.TrackContainer(uuid)
                if err != nil {
                        log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
                }
        }
 }
 
-func niceness(priority int) int {
+func (disp *Dispatcher) niceness(priority int) int {
        if priority > 1000 {
                priority = 1000
        }
@@ -176,8 +209,8 @@ func niceness(priority int) int {
        return (1000 - priority) * 10
 }
 
-func sbatchArgs(container arvados.Container) []string {
-       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
+func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
+       mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
 
        var disk int64
        for _, m := range container.Mounts {
@@ -188,46 +221,65 @@ func sbatchArgs(container arvados.Container) []string {
        disk = int64(math.Ceil(float64(disk) / float64(1048576)))
 
        var sbatchArgs []string
-       sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
+       sbatchArgs = append(sbatchArgs, disp.SbatchArguments...)
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
-       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
 
-       return sbatchArgs
+       if disp.cluster == nil {
+               // no instance types configured
+       } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+               // ditto
+       } else if err != nil {
+               return nil, err
+       } else {
+               sbatchArgs = append(sbatchArgs, "--constraint=instancetype="+it.Name)
+       }
+
+       return sbatchArgs, nil
 }
 
-func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
+func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
        // append() here avoids modifying crunchRunCommand's
        // underlying array, which is shared with other goroutines.
        crArgs := append([]string(nil), crunchRunCommand...)
        crArgs = append(crArgs, container.UUID)
        crScript := strings.NewReader(execScript(crArgs))
 
-       sqCheck.L.Lock()
-       defer sqCheck.L.Unlock()
+       disp.sqCheck.L.Lock()
+       defer disp.sqCheck.L.Unlock()
 
-       sbArgs := sbatchArgs(container)
+       sbArgs, err := disp.sbatchArgs(container)
+       if err != nil {
+               return err
+       }
        log.Printf("running sbatch %+q", sbArgs)
-       return theConfig.slurm.Batch(crScript, sbArgs)
+       return disp.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
 // already in the queue).  Cancel the slurm job if the container's
 // priority changes to zero or its state indicates it's no longer
 // running.
-func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
-       if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+       if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
                log.Printf("Submitting container %s to slurm", ctr.UUID)
-               if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
-                       text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+               if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
+                       var text string
+                       if err == dispatchcloud.ErrConstraintsNotSatisfiable {
+                               text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
+                               disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+                       } else {
+                               text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+                       }
                        log.Print(text)
 
                        lr := arvadosclient.Dict{"log": arvadosclient.Dict{
@@ -248,7 +300,7 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
        // no point in waiting for further dispatch updates: just
        // clean up and return.
        go func(uuid string) {
-               for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+               for ctx.Err() == nil && disp.sqCheck.HasUUID(uuid) {
                }
                cancel()
        }(ctr.UUID)
@@ -269,56 +321,56 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        return
                case updated, ok := <-status:
                        if !ok {
-                               log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
-                               scancel(ctr)
+                               log.Printf("container %s is done: cancel slurm job", ctr.UUID)
+                               disp.scancel(ctr)
                        } else if updated.Priority == 0 {
-                               log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
-                               scancel(ctr)
+                               log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+                               disp.scancel(ctr)
                        } else {
-                               renice(updated)
+                               disp.renice(updated)
                        }
                }
        }
 }
 
-func scancel(ctr arvados.Container) {
-       sqCheck.L.Lock()
-       err := theConfig.slurm.Cancel(ctr.UUID)
-       sqCheck.L.Unlock()
+func (disp *Dispatcher) scancel(ctr arvados.Container) {
+       disp.sqCheck.L.Lock()
+       err := disp.slurm.Cancel(ctr.UUID)
+       disp.sqCheck.L.Unlock()
 
        if err != nil {
                log.Printf("scancel: %s", err)
                time.Sleep(time.Second)
-       } else if sqCheck.HasUUID(ctr.UUID) {
+       } else if disp.sqCheck.HasUUID(ctr.UUID) {
                log.Printf("container %s is still in squeue after scancel", ctr.UUID)
                time.Sleep(time.Second)
        }
 }
 
-func renice(ctr arvados.Container) {
-       nice := niceness(ctr.Priority)
-       oldnice := sqCheck.GetNiceness(ctr.UUID)
+func (disp *Dispatcher) renice(ctr arvados.Container) {
+       nice := disp.niceness(ctr.Priority)
+       oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
        if nice == oldnice || oldnice == -1 {
                return
        }
        log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
-       sqCheck.L.Lock()
-       err := theConfig.slurm.Renice(ctr.UUID, nice)
-       sqCheck.L.Unlock()
+       disp.sqCheck.L.Lock()
+       err := disp.slurm.Renice(ctr.UUID, nice)
+       disp.sqCheck.L.Unlock()
 
        if err != nil {
                log.Printf("renice: %s", err)
                time.Sleep(time.Second)
                return
        }
-       if sqCheck.HasUUID(ctr.UUID) {
+       if disp.sqCheck.HasUUID(ctr.UUID) {
                log.Printf("container %s has arvados priority %d, slurm nice %d",
-                       ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+                       ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
        }
 }
 
-func readConfig(dst interface{}, path string) error {
-       err := config.LoadFile(dst, path)
+func (disp *Dispatcher) readConfig(path string) error {
+       err := config.LoadFile(disp, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
                log.Printf("Config not specified. Continue with default configuration.")
                err = nil
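A quick cross-check of the sbatch argument arithmetic in sbatchArgs above, using the container from the stubbed tests (RAM 250000000, VCPUs 2, Priority 1, no Keep cache or extra RAM reserved). This is a self-contained sketch, not dispatcher code.

package main

import (
        "fmt"
        "math"
)

func main() {
        var ram, keepCache, reserveExtra int64 = 250000000, 0, 0
        vcpus, priority := 2, 1

        // --mem is the total RAM requirement rounded up to MiB.
        mem := int64(math.Ceil(float64(ram+keepCache+reserveExtra) / 1048576))

        // niceness caps priority at 1000 and maps it onto 0..10000.
        if priority > 1000 {
                priority = 1000
        }
        nice := (1000 - priority) * 10

        fmt.Printf("--mem=%d --cpus-per-task=%d --nice=%d\n", mem, vcpus, nice)
        // Output: --mem=239 --cpus-per-task=2 --nice=9990
}

With the ReserveExtraRAM value from the example configuration in usage.go (268435456 bytes, i.e. 256 MiB), the same container would be submitted with --mem=495 instead.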
index 830976d66bdd9e3dbce855936a241b7b83ec8e7b..9fb5d6627eefa694181f4223d0e6dad17df45881 100644 (file)
@@ -20,6 +20,7 @@ import (
        "testing"
        "time"
 
+       "git.curoverse.com/arvados.git/lib/dispatchcloud"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
@@ -32,39 +33,27 @@ func Test(t *testing.T) {
        TestingT(t)
 }
 
-var _ = Suite(&TestSuite{})
-var _ = Suite(&MockArvadosServerSuite{})
+var _ = Suite(&IntegrationSuite{})
+var _ = Suite(&StubbedSuite{})
 
-type TestSuite struct{}
-type MockArvadosServerSuite struct{}
-
-var initialArgs []string
-
-func (s *TestSuite) SetUpSuite(c *C) {
-       initialArgs = os.Args
-}
-
-func (s *TestSuite) TearDownSuite(c *C) {
+type IntegrationSuite struct {
+       disp  Dispatcher
+       slurm slurmFake
 }
 
-func (s *TestSuite) SetUpTest(c *C) {
-       args := []string{"crunch-dispatch-slurm"}
-       os.Args = args
-
+func (s *IntegrationSuite) SetUpTest(c *C) {
        arvadostest.StartAPI()
        os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
+       s.disp = Dispatcher{}
+       s.disp.setup()
+       s.slurm = slurmFake{}
 }
 
-func (s *TestSuite) TearDownTest(c *C) {
-       os.Args = initialArgs
+func (s *IntegrationSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
        arvadostest.StopAPI()
 }
 
-func (s *MockArvadosServerSuite) TearDownTest(c *C) {
-       arvadostest.ResetEnv()
-}
-
 type slurmFake struct {
        didBatch  [][]string
        didCancel []string
@@ -102,7 +91,7 @@ func (sf *slurmFake) Cancel(name string) error {
        return nil
 }
 
-func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+func (s *IntegrationSuite) integrationTest(c *C,
        expectBatch [][]string,
        runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
        arvadostest.ResetEnv()
@@ -110,11 +99,6 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
        arv, err := arvadosclient.MakeArvadosClient()
        c.Assert(err, IsNil)
 
-       defer func(orig Slurm) {
-               theConfig.slurm = orig
-       }(theConfig.slurm)
-       theConfig.slurm = slurm
-
        // There should be one queued container
        params := arvadosclient.Dict{
                "filters": [][]string{{"state", "=", "Queued"}},
@@ -124,34 +108,35 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
        c.Check(err, IsNil)
        c.Check(len(containers.Items), Equals, 1)
 
-       theConfig.CrunchRunCommand = []string{"echo"}
+       s.disp.CrunchRunCommand = []string{"echo"}
 
        ctx, cancel := context.WithCancel(context.Background())
        doneRun := make(chan struct{})
 
-       dispatcher := dispatch.Dispatcher{
+       s.disp.Dispatcher = &dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Duration(1) * time.Second,
                RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
                        go func() {
                                runContainer(disp, ctr)
-                               slurm.queue = ""
+                               s.slurm.queue = ""
                                doneRun <- struct{}{}
                        }()
-                       run(disp, ctr, status)
+                       s.disp.runContainer(disp, ctr, status)
                        cancel()
                },
        }
 
-       sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
+       s.disp.slurm = &s.slurm
+       s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
 
-       err = dispatcher.Run(ctx)
+       err = s.disp.Dispatcher.Run(ctx)
        <-doneRun
        c.Assert(err, Equals, context.Canceled)
 
-       sqCheck.Stop()
+       s.disp.sqCheck.Stop()
 
-       c.Check(slurm.didBatch, DeepEquals, expectBatch)
+       c.Check(s.slurm.didBatch, DeepEquals, expectBatch)
 
        // There should be no queued containers now
        err = arv.List("containers", params, &containers)
@@ -165,9 +150,9 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
        return container
 }
 
-func (s *TestSuite) TestIntegrationNormal(c *C) {
+func (s *IntegrationSuite) TestNormal(c *C) {
+       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
        container := s.integrationTest(c,
-               &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -177,12 +162,11 @@ func (s *TestSuite) TestIntegrationNormal(c *C) {
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
-func (s *TestSuite) TestIntegrationCancel(c *C) {
-       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+func (s *IntegrationSuite) TestCancel(c *C) {
+       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
        readyToCancel := make(chan bool)
-       slurm.onCancel = func() { <-readyToCancel }
+       s.slurm.onCancel = func() { <-readyToCancel }
        container := s.integrationTest(c,
-               slurm,
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -195,12 +179,12 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
                        close(readyToCancel)
                })
        c.Check(container.State, Equals, arvados.ContainerStateCancelled)
-       c.Check(len(slurm.didCancel) > 1, Equals, true)
-       c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
+       c.Check(len(s.slurm.didCancel) > 1, Equals, true)
+       c.Check(s.slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
 }
 
-func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-       container := s.integrationTest(c, &slurmFake{},
+func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
+       container := s.integrationTest(c,
                [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--mem=%d", 11445),
@@ -215,9 +199,9 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
        c.Check(container.State, Equals, arvados.ContainerStateCancelled)
 }
 
-func (s *TestSuite) TestSbatchFail(c *C) {
+func (s *IntegrationSuite) TestSbatchFail(c *C) {
+       s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
        container := s.integrationTest(c,
-               &slurmFake{errBatch: errors.New("something terrible happened")},
                [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -236,15 +220,42 @@ func (s *TestSuite) TestSbatchFail(c *C) {
        c.Assert(len(ll.Items), Equals, 1)
 }
 
-func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
+func (s *IntegrationSuite) TestChangePriority(c *C) {
+       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       container := s.integrationTest(c, nil,
+               func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
+                       dispatcher.UpdateState(container.UUID, dispatch.Running)
+                       time.Sleep(time.Second)
+                       dispatcher.Arv.Update("containers", container.UUID,
+                               arvadosclient.Dict{
+                                       "container": arvadosclient.Dict{"priority": 600}},
+                               nil)
+                       time.Sleep(time.Second)
+                       dispatcher.UpdateState(container.UUID, dispatch.Complete)
+               })
+       c.Check(container.State, Equals, arvados.ContainerStateComplete)
+       c.Assert(len(s.slurm.didRenice), Not(Equals), 0)
+       c.Check(s.slurm.didRenice[len(s.slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+}
+
+type StubbedSuite struct {
+       disp Dispatcher
+}
+
+func (s *StubbedSuite) SetUpTest(c *C) {
+       s.disp = Dispatcher{}
+       s.disp.setup()
+}
+
+func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-       testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+       s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
-func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
        apiStub := arvadostest.ServerStub{apiStubResponses}
 
        api := httptest.NewServer(&apiStub)
@@ -262,7 +273,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        log.SetOutput(io.MultiWriter(buf, os.Stderr))
        defer log.SetOutput(os.Stderr)
 
-       theConfig.CrunchRunCommand = []string{crunchCmd}
+       s.disp.CrunchRunCommand = []string{crunchCmd}
 
        ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
@@ -274,7 +285,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                                disp.UpdateState(ctr.UUID, dispatch.Running)
                                disp.UpdateState(ctr.UUID, dispatch.Complete)
                        }()
-                       run(disp, ctr, status)
+                       s.disp.runContainer(disp, ctr, status)
                        cancel()
                },
        }
@@ -292,15 +303,12 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }
 
-func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
-       var config Config
-       err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
+func (s *StubbedSuite) TestNoSuchConfigFile(c *C) {
+       err := s.disp.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
        c.Assert(err, NotNil)
 }
 
-func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
-       var config Config
-
+func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) {
        tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
        c.Check(err, IsNil)
        defer os.Remove(tmpfile.Name())
@@ -308,13 +316,11 @@ func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
        _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
        c.Check(err, IsNil)
 
-       err = readConfig(&config, tmpfile.Name())
+       err = s.disp.readConfig(tmpfile.Name())
        c.Assert(err, NotNil)
 }
 
-func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
-       var config Config
-
+func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) {
        tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
        c.Check(err, IsNil)
        defer os.Remove(tmpfile.Name())
@@ -322,14 +328,12 @@ func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
        _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
        c.Check(err, IsNil)
 
-       err = readConfig(&config, tmpfile.Name())
+       err = s.disp.readConfig(tmpfile.Name())
        c.Assert(err, IsNil)
-       c.Check(0, Equals, len(config.SbatchArguments))
+       c.Check(0, Equals, len(s.disp.SbatchArguments))
 }
 
-func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
-       var config Config
-
+func (s *StubbedSuite) TestReadConfig(c *C) {
        tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
        c.Check(err, IsNil)
        defer os.Remove(tmpfile.Name())
@@ -339,66 +343,90 @@ func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
        _, err = tmpfile.Write([]byte(argsS))
        c.Check(err, IsNil)
 
-       err = readConfig(&config, tmpfile.Name())
+       err = s.disp.readConfig(tmpfile.Name())
        c.Assert(err, IsNil)
-       c.Check(3, Equals, len(config.SbatchArguments))
-       c.Check(args, DeepEquals, config.SbatchArguments)
+       c.Check(args, DeepEquals, s.disp.SbatchArguments)
 }
 
-func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
-       testSbatchFuncWithArgs(c, nil)
-}
-
-func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
-       testSbatchFuncWithArgs(c, []string{})
-}
+func (s *StubbedSuite) TestSbatchArgs(c *C) {
+       container := arvados.Container{
+               UUID:               "123",
+               RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
+               Priority:           1,
+       }
 
-func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
-       testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
+       for _, defaults := range [][]string{
+               nil,
+               {},
+               {"--arg1=v1", "--arg2"},
+       } {
+               c.Logf("%#v", defaults)
+               s.disp.SbatchArguments = defaults
+
+               args, err := s.disp.sbatchArgs(container)
+               c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
+               c.Check(err, IsNil)
+       }
 }
 
-func testSbatchFuncWithArgs(c *C, args []string) {
-       defer func() { theConfig.SbatchArguments = nil }()
-       theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
-
+func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
        container := arvados.Container{
                UUID:               "123",
                RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
-               Priority:           1}
+               Priority:           1,
+       }
 
-       var expected []string
-       expected = append(expected, theConfig.SbatchArguments...)
-       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
-       c.Check(sbatchArgs(container), DeepEquals, expected)
+       for _, trial := range []struct {
+               types      []arvados.InstanceType
+               sbatchArgs []string
+               err        error
+       }{
+               // Choose node type => use --constraint arg
+               {
+                       types: []arvados.InstanceType{
+                               {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+                               {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
+                               {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
+                               {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
+                       },
+                       sbatchArgs: []string{"--constraint=instancetype=a1.medium"},
+               },
+               // No node types configured => no slurm constraint
+               {
+                       types:      nil,
+                       sbatchArgs: nil,
+               },
+               // No node type is big enough => error
+               {
+                       types: []arvados.InstanceType{
+                               {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+                       },
+                       err: dispatchcloud.ErrConstraintsNotSatisfiable,
+               },
+       } {
+               c.Logf("%#v", trial)
+               s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
+
+               args, err := s.disp.sbatchArgs(container)
+               c.Check(err, Equals, trial.err)
+               if trial.err == nil {
+                       c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
+               }
+       }
 }
 
-func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
+func (s *StubbedSuite) TestSbatchPartition(c *C) {
        container := arvados.Container{
                UUID:                 "123",
                RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
                SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
-               Priority:             1}
+               Priority:             1,
+       }
 
-       c.Check(sbatchArgs(container), DeepEquals, []string{
+       args, err := s.disp.sbatchArgs(container)
+       c.Check(args, DeepEquals, []string{
                "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
                "--partition=blurb,b2",
        })
-}
-
-func (s *TestSuite) TestIntegrationChangePriority(c *C) {
-       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
-       container := s.integrationTest(c, slurm, nil,
-               func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
-                       dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(time.Second)
-                       dispatcher.Arv.Update("containers", container.UUID,
-                               arvadosclient.Dict{
-                                       "container": arvadosclient.Dict{"priority": 600}},
-                               nil)
-                       time.Sleep(time.Second)
-                       dispatcher.UpdateState(container.UUID, dispatch.Complete)
-               })
-       c.Check(container.State, Equals, arvados.ContainerStateComplete)
-       c.Assert(len(slurm.didRenice), Not(Equals), 0)
-       c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+       c.Check(err, IsNil)
 }
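The new --constraint=instancetype=... argument comes from dispatchcloud.ChooseInstanceType (lib/dispatchcloud/node_size.go, added in this merge). Its exact accounting is not shown here; the test above expects a1.medium rather than a1.small for a 250 MB container, so the real implementation presumably applies some node-level RAM headroom. A deliberately simplified sketch of the cheapest-fit idea, using a local struct that mirrors only the fields from the test table and ignoring that headroom:

package main

import (
        "errors"
        "fmt"
)

// instanceType mirrors the fields used in the test table above.
type instanceType struct {
        Name  string
        Price float64
        RAM   int64
        VCPUs int
}

// chooseCheapest is an illustrative stand-in for dispatchcloud.ChooseInstanceType:
// pick the cheapest type whose RAM and VCPUs cover the container's needs.
// The real implementation also returns typed errors
// (ErrInstanceTypesNotConfigured, ErrConstraintsNotSatisfiable).
func chooseCheapest(types []instanceType, needRAM int64, needVCPUs int) (instanceType, error) {
        best, found := instanceType{}, false
        for _, it := range types {
                if it.RAM < needRAM || it.VCPUs < needVCPUs {
                        continue
                }
                if !found || it.Price < best.Price {
                        best, found = it, true
                }
        }
        if !found {
                return instanceType{}, errors.New("constraints not satisfiable")
        }
        return best, nil
}

func main() {
        types := []instanceType{
                {"a1.tiny", 0.02, 128000000, 1},
                {"a1.small", 0.04, 256000000, 2},
                {"a1.medium", 0.08, 512000000, 4},
                {"a1.large", 0.16, 1024000000, 8},
        }
        it, _ := chooseCheapest(types, 250000000, 2)
        // Prints a1.small here; the real chooser picks a1.medium once RAM headroom is applied.
        fmt.Println(it.Name)
}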
index 5ecfe8ff2fc049201eb27c8e58c7a02eb84cbbb4..adb620ea8d34778f9d3c4d32edd42f518867b5a4 100644 (file)
@@ -22,6 +22,7 @@ type jobPriority struct {
 // command 'squeue'.
 type SqueueChecker struct {
        Period    time.Duration
+       Slurm     Slurm
        uuids     map[string]jobPriority
        startOnce sync.Once
        done      chan struct{}
@@ -77,7 +78,7 @@ func (sqc *SqueueChecker) check() {
        sqc.L.Lock()
        defer sqc.L.Unlock()
 
-       cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
+       cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
        cmd.Stdout, cmd.Stderr = stdout, stderr
        if err := cmd.Run(); err != nil {
index f64c5023952b42d9ed7e4483941b6dfd561f98a1..032d86284d5e0a9fc8a3d712a0283597ec29d765 100644 (file)
@@ -20,7 +20,8 @@ var exampleConfigFile = []byte(`
        },
        "CrunchRunCommand": ["crunch-run"],
        "PollPeriod": "10s",
-       "SbatchArguments": ["--partition=foo", "--exclude=node13"]
+       "SbatchArguments": ["--partition=foo", "--exclude=node13"],
+       "ReserveExtraRAM": 268435456,
     }`)
 
 func usage(fs *flag.FlagSet) {
index 788d475e33c0d094d719503e6b9fc4dba386e1ec..f1e49f5afcffff32143b9033c5f83dddcd0c7c65 100644 (file)
@@ -156,12 +156,30 @@ class InodeCache(object):
 
     def _remove(self, obj, clear):
         if clear:
-            if obj.in_use():
-                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
-                return
+            # Kernel behavior seems to be that if a file is
+            # referenced, its parents remain referenced too. This
+            # means has_ref() exits early when a collection is not
+            # a candidate for eviction.
+            #
+            # By contrast, in_use() doesn't increment references on
+            # parents, so it requires a full tree walk to determine if
+            # a collection is a candidate for eviction.  This takes
+            # .07s for 240000 files, which becomes a major drag when
+            # cap_cache is being called several times a second and
+            # there are multiple non-evictable collections in the
+            # cache.
+            #
+            # So it is important for performance that we do the
+            # has_ref() check first.
+
             if obj.has_ref(True):
                 _logger.debug("InodeCache cannot clear inode %i, still referenced", obj.inode)
                 return
+
+            if obj.in_use():
+                _logger.debug("InodeCache cannot clear inode %i, in use", obj.inode)
+                return
+
             obj.kernel_invalidate()
             _logger.debug("InodeCache sent kernel invalidate inode %i", obj.inode)
             obj.clear()
@@ -202,7 +220,8 @@ class InodeCache(object):
                     if obj not in self._by_uuid[obj.cache_uuid]:
                         self._by_uuid[obj.cache_uuid].append(obj)
             self._total += obj.objsize()
-            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i", obj.inode, obj.objsize(), obj.cache_uuid, self._total)
+            _logger.debug("InodeCache touched inode %i (size %i) (uuid %s) total now %i (%i entries)",
+                          obj.inode, obj.objsize(), obj.cache_uuid, self._total, len(self._entries))
             self.cap_cache()
 
     def touch(self, obj):
index 8b680f0663d25cf423e68251f1a82b8ed7384bc2..2a3a19c54c66005a6f96cd8d1dbd6de3c6345aad 100644 (file)
@@ -59,6 +59,10 @@ class FreshBase(object):
     * Clear the object contents (invalidates the object)
 
     """
+
+    __slots__ = ("_stale", "_poll", "_last_update", "_atime", "_poll_time", "use_count",
+                 "ref_count", "dead", "cache_size", "cache_uuid", "allow_attr_cache")
+
     def __init__(self):
         self._stale = True
         self._poll = False
index 585536176007bdfcc889a47647f85114e6a34fb7..cedb4fb451cdf6fbdaefe0b4caa3a20ef424d69e 100644 (file)
@@ -15,6 +15,8 @@ _logger = logging.getLogger('arvados.arvados_fuse')
 class File(FreshBase):
     """Base for file objects."""
 
+    __slots__ = ("inode", "parent_inode", "_mtime")
+
     def __init__(self, parent_inode, _mtime=0):
         super(File, self).__init__()
         self.inode = None
@@ -46,6 +48,8 @@ class File(FreshBase):
 class FuseArvadosFile(File):
     """Wraps a ArvadosFile."""
 
+    __slots__ = ('arvfile',)
+
     def __init__(self, parent_inode, arvfile, _mtime):
         super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
         self.arvfile = arvfile
index 62c856da340183d0483503e0a2393816acaf0321..f18d82c06b29b7948a90431be686001b1bd9e572 100644 (file)
@@ -21,8 +21,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       log "github.com/Sirupsen/logrus"
-       "github.com/curoverse/azure-sdk-for-go/storage"
+       "github.com/Azure/azure-sdk-for-go/storage"
 )
 
 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
@@ -106,9 +105,18 @@ type AzureBlobVolume struct {
        AzureReplication      int
        ReadOnly              bool
        RequestTimeout        arvados.Duration
+       StorageClasses        []string
 
-       azClient storage.Client
-       bsClient *azureBlobClient
+       azClient  storage.Client
+       container *azureContainer
+}
+
+// singleSender is a single-attempt storage.Sender.
+type singleSender struct{}
+
+// Send performs req exactly once.
+func (*singleSender) Send(c *storage.Client, req *http.Request) (resp *http.Response, err error) {
+       return c.HTTPClient.Do(req)
 }
 
 // Examples implements VolumeWithExamples.
@@ -156,6 +164,7 @@ func (v *AzureBlobVolume) Start() error {
        if err != nil {
                return fmt.Errorf("creating Azure storage client: %s", err)
        }
+       v.azClient.Sender = &singleSender{}
 
        if v.RequestTimeout == 0 {
                v.RequestTimeout = azureDefaultRequestTimeout
@@ -164,15 +173,13 @@ func (v *AzureBlobVolume) Start() error {
                Timeout: time.Duration(v.RequestTimeout),
        }
        bs := v.azClient.GetBlobService()
-       v.bsClient = &azureBlobClient{
-               client: &bs,
+       v.container = &azureContainer{
+               ctr: bs.GetContainerReference(v.ContainerName),
        }
 
-       ok, err := v.bsClient.ContainerExists(v.ContainerName)
-       if err != nil {
+       if ok, err := v.container.Exists(); err != nil {
                return err
-       }
-       if !ok {
+       } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
        return nil
@@ -185,7 +192,7 @@ func (v *AzureBlobVolume) DeviceID() string {
 
 // Return true if expires_at metadata attribute is found on the block
 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
-       metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
+       metadata, err := v.container.GetBlobMetadata(loc)
        if err != nil {
                return false, metadata, v.translateError(err)
        }
@@ -252,7 +259,7 @@ func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int,
        if azureMaxGetBytes < BlockSize {
                // Unfortunately the handler doesn't tell us how long the blob
                // is expected to be, so we have to ask Azure.
-               props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+               props, err := v.container.GetBlobProperties(loc)
                if err != nil {
                        return 0, v.translateError(err)
                }
@@ -294,9 +301,9 @@ func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int,
                        go func() {
                                defer close(gotRdr)
                                if startPos == 0 && endPos == expectSize {
-                                       rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+                                       rdr, err = v.container.GetBlob(loc)
                                } else {
-                                       rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+                                       rdr, err = v.container.GetBlobRange(loc, startPos, endPos-1, nil)
                                }
                        }()
                        select {
@@ -365,7 +372,7 @@ func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte
        gotRdr := make(chan struct{})
        go func() {
                defer close(gotRdr)
-               rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+               rdr, err = v.container.GetBlob(loc)
        }()
        select {
        case <-ctx.Done():
@@ -412,7 +419,7 @@ func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) err
                        body = http.NoBody
                        bufr.Close()
                }
-               errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil)
+               errChan <- v.container.CreateBlockBlobFromReader(loc, len(block), body, nil)
        }()
        select {
        case <-ctx.Done():
@@ -446,7 +453,7 @@ func (v *AzureBlobVolume) Touch(loc string) error {
        }
 
        metadata["touch"] = fmt.Sprintf("%d", time.Now())
-       return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
+       return v.container.SetBlobMetadata(loc, metadata, nil)
 }
 
 // Mtime returns the last-modified property of a block blob.
@@ -459,11 +466,11 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
                return time.Time{}, os.ErrNotExist
        }
 
-       props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+       props, err := v.container.GetBlobProperties(loc)
        if err != nil {
                return time.Time{}, err
        }
-       return time.Parse(time.RFC1123, props.LastModified)
+       return time.Time(props.LastModified), nil
 }
 
 // IndexTo writes a list of Keep blocks that are stored in the
@@ -471,22 +478,19 @@ func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        params := storage.ListBlobsParameters{
                Prefix:  prefix,
-               Include: "metadata",
+               Include: &storage.IncludeBlobDataset{Metadata: true},
        }
        for {
-               resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
+               resp, err := v.container.ListBlobs(params)
                if err != nil {
                        return err
                }
                for _, b := range resp.Blobs {
-                       t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
-                       if err != nil {
-                               return err
-                       }
                        if !v.isKeepBlock(b.Name) {
                                continue
                        }
-                       if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
+                       modtime := time.Time(b.Properties.LastModified)
+                       if b.Properties.ContentLength == 0 && modtime.Add(azureWriteRaceInterval).After(time.Now()) {
                                // A new zero-length blob is probably
                                // just a new non-empty blob that
                                // hasn't committed its data yet (see
@@ -498,7 +502,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                                // Trashed blob; exclude it from response
                                continue
                        }
-                       fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
+                       fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, modtime.UnixNano())
                }
                if resp.NextMarker == "" {
                        return nil
@@ -518,7 +522,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
        // we get the Etag before checking Mtime, and use If-Match to
        // ensure we don't delete data if Put() or Touch() happens
        // between our calls to Mtime() and DeleteBlob().
-       props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
+       props, err := v.container.GetBlobProperties(loc)
        if err != nil {
                return err
        }
@@ -530,16 +534,16 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 
        // If TrashLifetime == 0, just delete it
        if theConfig.TrashLifetime == 0 {
-               return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
-                       "If-Match": props.Etag,
+               return v.container.DeleteBlob(loc, &storage.DeleteBlobOptions{
+                       IfMatch: props.Etag,
                })
        }
 
        // Otherwise, mark as trash
-       return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
+       return v.container.SetBlobMetadata(loc, storage.BlobMetadata{
                "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
-       }, map[string]string{
-               "If-Match": props.Etag,
+       }, &storage.SetBlobMetadataOptions{
+               IfMatch: props.Etag,
        })
 }
 
@@ -547,7 +551,7 @@ func (v *AzureBlobVolume) Trash(loc string) error {
 // Delete the expires_at metadata attribute
 func (v *AzureBlobVolume) Untrash(loc string) error {
        // if expires_at does not exist, return NotFoundError
-       metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
+       metadata, err := v.container.GetBlobMetadata(loc)
        if err != nil {
                return v.translateError(err)
        }
@@ -557,7 +561,7 @@ func (v *AzureBlobVolume) Untrash(loc string) error {
 
        // reset expires_at metadata attribute
        metadata["expires_at"] = ""
-       err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
+       err = v.container.SetBlobMetadata(loc, metadata, nil)
        return v.translateError(err)
 }
 
@@ -587,6 +591,11 @@ func (v *AzureBlobVolume) Replication() int {
        return v.AzureReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *AzureBlobVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // If possible, translate an Azure SDK error to a recognizable error
 // like os.ErrNotExist.
 func (v *AzureBlobVolume) translateError(err error) error {
@@ -612,10 +621,10 @@ func (v *AzureBlobVolume) isKeepBlock(s string) bool {
 func (v *AzureBlobVolume) EmptyTrash() {
        var bytesDeleted, bytesInTrash int64
        var blocksDeleted, blocksInTrash int
-       params := storage.ListBlobsParameters{Include: "metadata"}
+       params := storage.ListBlobsParameters{Include: &storage.IncludeBlobDataset{Metadata: true}}
 
        for {
-               resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
+               resp, err := v.container.ListBlobs(params)
                if err != nil {
                        log.Printf("EmptyTrash: ListBlobs: %v", err)
                        break
@@ -639,8 +648,8 @@ func (v *AzureBlobVolume) EmptyTrash() {
                                continue
                        }
 
-                       err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
-                               "If-Match": b.Properties.Etag,
+                       err = v.container.DeleteBlob(b.Name, &storage.DeleteBlobOptions{
+                               IfMatch: b.Properties.Etag,
                        })
                        if err != nil {
                                log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
@@ -660,7 +669,7 @@ func (v *AzureBlobVolume) EmptyTrash() {
 
 // InternalStats returns bucket I/O and API call counters.
 func (v *AzureBlobVolume) InternalStats() interface{} {
-       return &v.bsClient.stats
+       return &v.container.stats
 }
 
 type azureBlobStats struct {
@@ -688,75 +697,105 @@ func (s *azureBlobStats) TickErr(err error) {
        s.statsTicker.TickErr(err, errType)
 }
 
-// azureBlobClient wraps storage.BlobStorageClient in order to count
-// I/O and API usage stats.
-type azureBlobClient struct {
-       client *storage.BlobStorageClient
-       stats  azureBlobStats
+// azureContainer wraps storage.Container in order to count I/O and
+// API usage stats.
+type azureContainer struct {
+       ctr   *storage.Container
+       stats azureBlobStats
 }
 
-func (c *azureBlobClient) ContainerExists(cname string) (bool, error) {
+func (c *azureContainer) Exists() (bool, error) {
        c.stats.Tick(&c.stats.Ops)
-       ok, err := c.client.ContainerExists(cname)
+       ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
        return ok, err
 }
 
-func (c *azureBlobClient) GetBlobMetadata(cname, bname string) (map[string]string, error) {
+func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
-       m, err := c.client.GetBlobMetadata(cname, bname)
+       b := c.ctr.GetBlobReference(bname)
+       err := b.GetMetadata(nil)
        c.stats.TickErr(err)
-       return m, err
+       return b.Metadata, err
 }
 
-func (c *azureBlobClient) GetBlobProperties(cname, bname string) (*storage.BlobProperties, error) {
+func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
-       p, err := c.client.GetBlobProperties(cname, bname)
+       b := c.ctr.GetBlobReference(bname)
+       err := b.GetProperties(nil)
        c.stats.TickErr(err)
-       return p, err
+       return &b.Properties, err
 }
 
-func (c *azureBlobClient) GetBlob(cname, bname string) (io.ReadCloser, error) {
+func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
-       rdr, err := c.client.GetBlob(cname, bname)
+       b := c.ctr.GetBlobReference(bname)
+       rdr, err := b.Get(nil)
        c.stats.TickErr(err)
        return NewCountingReader(rdr, c.stats.TickInBytes), err
 }
 
-func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[string]string) (io.ReadCloser, error) {
+func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
-       rdr, err := c.client.GetBlobRange(cname, bname, byterange, hdrs)
+       b := c.ctr.GetBlobReference(bname)
+       rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
+               Range: &storage.BlobRange{
+                       Start: uint64(start),
+                       End:   uint64(end),
+               },
+               GetBlobOptions: opts,
+       })
        c.stats.TickErr(err)
        return NewCountingReader(rdr, c.stats.TickInBytes), err
 }
 
-func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
+// If we give it an io.Reader that doesn't also have a Len() int
+// method, the Azure SDK determines data size by copying the data into
+// a new buffer, which is not a good use of memory.
+type readerWithAzureLen struct {
+       io.Reader
+       len int
+}
+
+// Len satisfies the private lener interface in azure-sdk-for-go.
+func (r *readerWithAzureLen) Len() int {
+       return r.len
+}
+
+func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
-               rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+               rdr = &readerWithAzureLen{
+                       Reader: NewCountingReader(rdr, c.stats.TickOutBytes),
+                       len:    size,
+               }
        }
-       err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
+       b := c.ctr.GetBlobReference(bname)
+       err := b.CreateBlockBlobFromReader(rdr, opts)
        c.stats.TickErr(err)
        return err
 }
 
-func (c *azureBlobClient) SetBlobMetadata(cname, bname string, m, hdrs map[string]string) error {
+func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
-       err := c.client.SetBlobMetadata(cname, bname, m, hdrs)
+       b := c.ctr.GetBlobReference(bname)
+       b.Metadata = m
+       err := b.SetMetadata(opts)
        c.stats.TickErr(err)
        return err
 }
 
-func (c *azureBlobClient) ListBlobs(cname string, params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
-       resp, err := c.client.ListBlobs(cname, params)
+       resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
        return resp, err
 }
 
-func (c *azureBlobClient) DeleteBlob(cname, bname string, hdrs map[string]string) error {
+func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
-       err := c.client.DeleteBlob(cname, bname, hdrs)
+       b := c.ctr.GetBlobReference(bname)
+       err := b.Delete(opts)
        c.stats.TickErr(err)
        return err
 }
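
Aside: the readerWithAzureLen wrapper above matters because the SDK's block-blob upload path only avoids an internal buffering copy when the reader it receives exposes a Len() int method. Below is a minimal, self-contained sketch of that pattern; probeSize, readerWithLen, and the sample data are illustrative assumptions, not part of the Azure SDK or keepstore.

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// lener mirrors the optional interface the SDK probes for.
type lener interface{ Len() int }

// probeSize mimics a client that needs the payload size up front: it uses
// Len() when the reader provides it, and otherwise falls back to copying
// the whole stream into memory.
func probeSize(r io.Reader) (int, io.Reader) {
	if l, ok := r.(lener); ok {
		return l.Len(), r
	}
	buf, _ := ioutil.ReadAll(r) // the extra copy the wrapper avoids
	return len(buf), bytes.NewReader(buf)
}

// readerWithLen decorates any io.Reader with a known length, the same way
// readerWithAzureLen does above.
type readerWithLen struct {
	io.Reader
	len int
}

func (r *readerWithLen) Len() int { return r.len }

func main() {
	// strings.Reader already has Len(), so no copy happens.
	n, _ := probeSize(strings.NewReader("hello"))
	fmt.Println(n) // 5

	// Decorating a reader (here with io.LimitReader) hides the underlying
	// Len(); the wrapper reattaches the size explicitly.
	wrapped := &readerWithLen{Reader: io.LimitReader(strings.NewReader("hello"), 5), len: 5}
	n, _ = probeSize(wrapped)
	fmt.Println(n) // 5
}

This is the same reason CreateBlockBlobFromReader above wraps the counting reader before handing it to the SDK.
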
index 06216edcb82aeddc3617c121cecc4e1a6387be4a..60a7911768f009ef6209292d6c1e04b6cccbe6e7 100644 (file)
@@ -26,8 +26,8 @@ import (
        "testing"
        "time"
 
-       log "github.com/Sirupsen/logrus"
-       "github.com/curoverse/azure-sdk-for-go/storage"
+       "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -36,7 +36,7 @@ const (
        // used by Microsoft's Azure emulator: the Azure SDK
        // recognizes that magic string and changes its behavior to
        // cater to the Azure SDK's own test suite.
-       fakeAccountName = "fakeAccountName"
+       fakeAccountName = "fakeaccountname"
        fakeAccountKey  = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
 )
 
@@ -308,7 +308,7 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
                                b := storage.Blob{
                                        Name: hash,
                                        Properties: storage.BlobProperties{
-                                               LastModified:  blob.Mtime.Format(time.RFC1123),
+                                               LastModified:  storage.TimeRFC1123(blob.Mtime),
                                                ContentLength: int64(len(blob.Data)),
                                                Etag:          blob.Etag,
                                        },
@@ -386,7 +386,7 @@ func NewTestableAzureBlobVolume(t TB, readonly bool, replication int) *TestableA
                ReadOnly:         readonly,
                AzureReplication: replication,
                azClient:         azClient,
-               bsClient:         &azureBlobClient{client: &bs},
+               container:        &azureContainer{ctr: bs.GetContainerReference(container)},
        }
 
        return &TestableAzureBlobVolume{
@@ -708,6 +708,18 @@ func (s *StubbedAzureBlobSuite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
 
+func (s *StubbedAzureBlobSuite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Azure
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
index 91417fd52c37a23f940bb78a430cee56a9808c36..d2e7c9ebd3460b75855ca830ab05f6f9f5ab02b9 100644 (file)
@@ -8,8 +8,6 @@ import (
        "sync"
        "sync/atomic"
        "time"
-
-       log "github.com/Sirupsen/logrus"
 )
 
 type bufferPool struct {
index a6479e44378b75f1e9ad186871ac55834a3f4779..21b03edd49b967356c74958b1a1d762851babb60 100644 (file)
@@ -5,8 +5,9 @@
 package main
 
 import (
-       . "gopkg.in/check.v1"
        "time"
+
+       . "gopkg.in/check.v1"
 )
 
 var _ = Suite(&BufferPoolSuite{})
index 0a3ece41ad9d88589513c30a878f172276afd93c..17d6acdb68cca7463b9a7e9e49b9e1d9f3510229 100644 (file)
@@ -9,11 +9,17 @@ import (
        "encoding/json"
        "fmt"
        "io/ioutil"
+       "net/http"
+       "strconv"
        "strings"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       log "github.com/Sirupsen/logrus"
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+       "github.com/Sirupsen/logrus"
+       "github.com/golang/protobuf/jsonpb"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
 type Config struct {
@@ -42,9 +48,23 @@ type Config struct {
        debugLogf       func(string, ...interface{})
 
        ManagementToken string
+
+       metrics
 }
 
-var theConfig = DefaultConfig()
+var (
+       theConfig = DefaultConfig()
+       formatter = map[string]logrus.Formatter{
+               "text": &logrus.TextFormatter{
+                       FullTimestamp:   true,
+                       TimestampFormat: rfc3339NanoFixed,
+               },
+               "json": &logrus.JSONFormatter{
+                       TimestampFormat: rfc3339NanoFixed,
+               },
+       }
+       log = logrus.StandardLogger()
+)
 
 const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
@@ -66,26 +86,19 @@ func DefaultConfig() *Config {
 // fields, and before using the config.
 func (cfg *Config) Start() error {
        if cfg.Debug {
-               log.SetLevel(log.DebugLevel)
+               log.Level = logrus.DebugLevel
                cfg.debugLogf = log.Printf
                cfg.debugLogf("debugging enabled")
        } else {
+               log.Level = logrus.InfoLevel
                cfg.debugLogf = func(string, ...interface{}) {}
        }
 
-       switch strings.ToLower(cfg.LogFormat) {
-       case "text":
-               log.SetFormatter(&log.TextFormatter{
-                       FullTimestamp:   true,
-                       TimestampFormat: rfc3339NanoFixed,
-               })
-       case "json":
-               log.SetFormatter(&log.JSONFormatter{
-                       TimestampFormat: rfc3339NanoFixed,
-               })
-       default:
+       f := formatter[strings.ToLower(cfg.LogFormat)]
+       if f == nil {
                return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
        }
+       log.Formatter = f
 
        if cfg.MaxBuffers < 0 {
                return fmt.Errorf("MaxBuffers must be greater than zero")
@@ -142,15 +155,71 @@ func (cfg *Config) Start() error {
        return nil
 }
 
+type metrics struct {
+       registry     *prometheus.Registry
+       reqDuration  *prometheus.SummaryVec
+       timeToStatus *prometheus.SummaryVec
+       exportProm   http.Handler
+}
+
+func (*metrics) Levels() []logrus.Level {
+       return logrus.AllLevels
+}
+
+func (m *metrics) Fire(ent *logrus.Entry) error {
+       if tts, ok := ent.Data["timeToStatus"].(stats.Duration); !ok {
+       } else if method, ok := ent.Data["reqMethod"].(string); !ok {
+       } else if code, ok := ent.Data["respStatusCode"].(int); !ok {
+       } else {
+               m.timeToStatus.WithLabelValues(strconv.Itoa(code), strings.ToLower(method)).Observe(time.Duration(tts).Seconds())
+       }
+       return nil
+}
+
+func (m *metrics) setup() {
+       m.registry = prometheus.NewRegistry()
+       m.timeToStatus = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+               Name: "time_to_status_seconds",
+               Help: "Summary of request TTFB.",
+       }, []string{"code", "method"})
+       m.reqDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+               Name: "request_duration_seconds",
+               Help: "Summary of request duration.",
+       }, []string{"code", "method"})
+       m.registry.MustRegister(m.timeToStatus)
+       m.registry.MustRegister(m.reqDuration)
+       m.exportProm = promhttp.HandlerFor(m.registry, promhttp.HandlerOpts{
+               ErrorLog: log,
+       })
+       log.AddHook(m)
+}
+
+func (m *metrics) exportJSON(w http.ResponseWriter, req *http.Request) {
+       jm := jsonpb.Marshaler{Indent: "  "}
+       mfs, _ := m.registry.Gather()
+       w.Write([]byte{'['})
+       for i, mf := range mfs {
+               if i > 0 {
+                       w.Write([]byte{','})
+               }
+               jm.Marshal(w, mf)
+       }
+       w.Write([]byte{']'})
+}
+
+func (m *metrics) Instrument(next http.Handler) http.Handler {
+       return promhttp.InstrumentHandlerDuration(m.reqDuration, next)
+}
+
 // VolumeTypes is built up by init() funcs in the source files that
 // define the volume types.
 var VolumeTypes = []func() VolumeWithExamples{}
 
 type VolumeList []Volume
 
-// UnmarshalJSON, given an array of objects, deserializes each object
-// as the volume type indicated by the object's Type field.
-func (vols *VolumeList) UnmarshalJSON(data []byte) error {
+// UnmarshalJSON -- given an array of objects -- deserializes each
+// object as the volume type indicated by the object's Type field.
+func (vl *VolumeList) UnmarshalJSON(data []byte) error {
        typeMap := map[string]func() VolumeWithExamples{}
        for _, factory := range VolumeTypes {
                t := factory().Type()
@@ -183,7 +252,7 @@ func (vols *VolumeList) UnmarshalJSON(data []byte) error {
                if err != nil {
                        return err
                }
-               *vols = append(*vols, vol)
+               *vl = append(*vl, vol)
        }
        return nil
 }
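
Aside: the metrics type above feeds Prometheus in two ways: the Fire hook copies the timeToStatus, reqMethod, and respStatusCode fields from request log entries into time_to_status_seconds, and Instrument wraps the HTTP handler with promhttp.InstrumentHandlerDuration to populate request_duration_seconds. A minimal sketch of the Instrument half follows, using a toy handler and a throwaway registry; these names are assumptions for illustration, not keepstore code.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	reg := prometheus.NewRegistry()
	dur := prometheus.NewSummaryVec(prometheus.SummaryOpts{
		Name: "request_duration_seconds",
		Help: "Summary of request duration.",
	}, []string{"code", "method"})
	reg.MustRegister(dur)

	// InstrumentHandlerDuration times each request and records it with
	// labels taken from the response status code and request method.
	h := promhttp.InstrumentHandlerDuration(dur, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))

	// Exercise the handler once, then gather the recorded samples.
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil))

	mfs, _ := reg.Gather()
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetSummary().GetSampleCount())
	}
}

Gathering from the registry is the same step exportJSON performs above, except keepstore renders each MetricFamily with jsonpb rather than printing it.
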
index 5e55042c28747de26c0f9e1ad1806bcec00ad5c7..d6471e3d4547d45f577131c0a6ecb2f1ca59c52c 100644 (file)
@@ -5,9 +5,10 @@
 package main
 
 import (
-       log "github.com/Sirupsen/logrus"
+       "github.com/Sirupsen/logrus"
 )
 
 func init() {
+       log.Level = logrus.DebugLevel
        theConfig.debugLogf = log.Printf
 }
index daf4fc69ddff2702f0da43fd0af4d4927374bb2c..258604ce59c50176a4d4473fd7dbf87c8d66de9c 100644 (file)
@@ -31,7 +31,6 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       log "github.com/Sirupsen/logrus"
 )
 
 type router struct {
@@ -41,54 +40,63 @@ type router struct {
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter() *router {
-       rest := mux.NewRouter()
-       rtr := &router{Router: rest}
+func MakeRESTRouter() http.Handler {
+       rtr := &router{Router: mux.NewRouter()}
 
-       rest.HandleFunc(
+       rtr.HandleFunc(
                `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
-       rest.HandleFunc(
+       rtr.HandleFunc(
                `/{hash:[0-9a-f]{32}}+{hints}`,
                GetBlockHandler).Methods("GET", "HEAD")
 
-       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
-       rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
+       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
+       rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
        // List all blocks stored here. Privileged client only.
-       rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
+       rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
        // List blocks stored here whose hash has the given prefix.
        // Privileged client only.
-       rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
+       rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
 
        // Internals/debugging info (runtime.MemStats)
-       rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
+       rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
 
        // List volumes: path, device number, bytes used/avail.
-       rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
+       rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
 
        // List mounts: UUID, readonly, tier, device ID, ...
-       rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
-       rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
-       rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
+       rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
+       rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
+       rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
 
        // Replace the current pull queue.
-       rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
+       rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
 
        // Replace the current trash queue.
-       rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
+       rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
        // Untrash moves blocks from trash back into store
-       rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+       rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
 
-       rest.Handle("/_health/{check}", &health.Handler{
+       rtr.Handle("/_health/{check}", &health.Handler{
                Token:  theConfig.ManagementToken,
                Prefix: "/_health/",
        }).Methods("GET")
 
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
-       rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
+       rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
-       return rtr
+       theConfig.metrics.setup()
+
+       rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+
+       mux := http.NewServeMux()
+       mux.Handle("/", theConfig.metrics.Instrument(
+               httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
+       mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
+       mux.Handle("/metrics", theConfig.metrics.exportProm)
+
+       return mux
 }
 
 // BadRequestHandler is a HandleFunc to address bad requests.
index b8a0ffb1cba46777ff1e2d1c745eb8102ea5fa61..03eef7e76b0b897ed2cb70b95f22989b76436123 100644 (file)
@@ -16,9 +16,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
 )
 
@@ -160,9 +158,6 @@ func main() {
 
        // Middleware/handler stack
        router := MakeRESTRouter()
-       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
-       router.limiter = limiter
-       http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
@@ -204,7 +199,7 @@ func main() {
                log.Printf("Error notifying init daemon: %v", err)
        }
        log.Println("listening at", listener.Addr())
-       srv := &http.Server{}
+       srv := &http.Server{Handler: router}
        srv.Serve(listener)
 }
 
index b4544d0f92225e3d17678dba8cd3b52865206b0f..66a212456d51c78e09cc12dc41bdccb17bf953df 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "bytes"
        "context"
        "encoding/json"
        "net/http"
@@ -24,8 +25,10 @@ type MountsSuite struct {
 func (s *MountsSuite) SetUpTest(c *check.C) {
        s.vm = MakeTestVolumeManager(2)
        KeepVM = s.vm
-       s.rtr = MakeRESTRouter()
+       theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
+       theConfig.Start()
+       s.rtr = MakeRESTRouter()
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
@@ -40,14 +43,14 @@ func (s *MountsSuite) TestMounts(c *check.C) {
        vols[0].Put(context.Background(), TestHash, TestBlock)
        vols[1].Put(context.Background(), TestHash2, TestBlock2)
 
-       resp := s.call("GET", "/mounts", "")
+       resp := s.call("GET", "/mounts", "", nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        var mntList []struct {
-               UUID        string
-               DeviceID    string
-               ReadOnly    bool
-               Replication int
-               Tier        int
+               UUID           string
+               DeviceID       string
+               ReadOnly       bool
+               Replication    int
+               StorageClasses []string
        }
        err := json.Unmarshal(resp.Body.Bytes(), &mntList)
        c.Assert(err, check.IsNil)
@@ -58,13 +61,13 @@ func (s *MountsSuite) TestMounts(c *check.C) {
                c.Check(m.DeviceID, check.Equals, "mock-device-id")
                c.Check(m.ReadOnly, check.Equals, false)
                c.Check(m.Replication, check.Equals, 1)
-               c.Check(m.Tier, check.Equals, 1)
+               c.Check(m.StorageClasses, check.DeepEquals, []string{"default"})
        }
        c.Check(mntList[0].UUID, check.Not(check.Equals), mntList[1].UUID)
 
        // Bad auth
        for _, tok := range []string{"", "xyzzy"} {
-               resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks", tok)
+               resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks", tok, nil)
                c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
                c.Check(resp.Body.String(), check.Equals, "Unauthorized\n")
        }
@@ -72,34 +75,74 @@ func (s *MountsSuite) TestMounts(c *check.C) {
        tok := arvadostest.DataManagerToken
 
        // Nonexistent mount UUID
-       resp = s.call("GET", "/mounts/X/blocks", tok)
+       resp = s.call("GET", "/mounts/X/blocks", tok, nil)
        c.Check(resp.Code, check.Equals, http.StatusNotFound)
        c.Check(resp.Body.String(), check.Equals, "mount not found\n")
 
        // Complete index of first mount
-       resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks", tok)
+       resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks", tok, nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        c.Check(resp.Body.String(), check.Matches, TestHash+`\+[0-9]+ [0-9]+\n\n`)
 
        // Partial index of first mount (one block matches prefix)
-       resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks?prefix="+TestHash[:2], tok)
+       resp = s.call("GET", "/mounts/"+mntList[0].UUID+"/blocks?prefix="+TestHash[:2], tok, nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        c.Check(resp.Body.String(), check.Matches, TestHash+`\+[0-9]+ [0-9]+\n\n`)
 
        // Complete index of second mount (note trailing slash)
-       resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/", tok)
+       resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/", tok, nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        c.Check(resp.Body.String(), check.Matches, TestHash2+`\+[0-9]+ [0-9]+\n\n`)
 
        // Partial index of second mount (no blocks match prefix)
-       resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/?prefix="+TestHash[:2], tok)
+       resp = s.call("GET", "/mounts/"+mntList[1].UUID+"/blocks/?prefix="+TestHash[:2], tok, nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        c.Check(resp.Body.String(), check.Equals, "\n")
 }
 
-func (s *MountsSuite) call(method, path, tok string) *httptest.ResponseRecorder {
+func (s *MountsSuite) TestMetrics(c *check.C) {
+       s.call("PUT", "/"+TestHash, "", TestBlock)
+       s.call("PUT", "/"+TestHash2, "", TestBlock2)
+       resp := s.call("GET", "/metrics.json", "", nil)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
+       var j []struct {
+               Name   string
+               Help   string
+               Type   string
+               Metric []struct {
+                       Label []struct {
+                               Name  string
+                               Value string
+                       }
+                       Summary struct {
+                               SampleCount string  `json:"sample_count"`
+                               SampleSum   float64 `json:"sample_sum"`
+                               Quantile    []struct {
+                                       Quantile float64
+                                       Value    float64
+                               }
+                       }
+               }
+       }
+       json.NewDecoder(resp.Body).Decode(&j)
+       found := make(map[string]bool)
+       for _, g := range j {
+               for _, m := range g.Metric {
+                       if len(m.Label) == 2 && m.Label[0].Name == "code" && m.Label[0].Value == "200" && m.Label[1].Name == "method" && m.Label[1].Value == "put" {
+                               c.Check(m.Summary.SampleCount, check.Equals, "2")
+                               c.Check(len(m.Summary.Quantile), check.Not(check.Equals), 0)
+                               c.Check(m.Summary.Quantile[0].Value, check.Not(check.Equals), float64(0))
+                               found[g.Name] = true
+                       }
+               }
+       }
+       c.Check(found["request_duration_seconds"], check.Equals, true)
+       c.Check(found["time_to_status_seconds"], check.Equals, true)
+}
+
+func (s *MountsSuite) call(method, path, tok string, body []byte) *httptest.ResponseRecorder {
        resp := httptest.NewRecorder()
-       req, _ := http.NewRequest(method, path, nil)
+       req, _ := http.NewRequest(method, path, bytes.NewReader(body))
        if tok != "" {
                req.Header.Set("Authorization", "OAuth2 "+tok)
        }
index f821fb548350587528c29bc9fab53528b424eaab..42b5d5889d30984685d43655f66fb3857f98de92 100644 (file)
@@ -13,8 +13,6 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-
-       log "github.com/Sirupsen/logrus"
 )
 
 // RunPullWorker receives PullRequests from pullq, invokes
index 61e69f9096503eb88cf9328af5e85f129dba4226..a60b2fc27e321f553c9784691702282ecb39a6e4 100644 (file)
@@ -23,7 +23,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
-       log "github.com/Sirupsen/logrus"
 )
 
 const (
@@ -153,6 +152,7 @@ type S3Volume struct {
        RaceWindow         arvados.Duration
        ReadOnly           bool
        UnsafeDelete       bool
+       StorageClasses     []string
 
        bucket *s3bucket
 
@@ -687,6 +687,11 @@ func (v *S3Volume) Replication() int {
        return v.S3Replication
 }
 
+// GetStorageClasses implements Volume
+func (v *S3Volume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
 
 func (v *S3Volume) isKeepBlock(s string) bool {
index 3d4a1956230473e264c3b22fc8d5db112b22160f..4081e1e63c4825a08712a93bd552de7818f018d5 100644 (file)
@@ -19,7 +19,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
-       log "github.com/Sirupsen/logrus"
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -436,6 +436,18 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
        return v
 }
 
+func (s *StubbedS3Suite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: S3
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
+
 func (v *TestableS3Volume) Start() error {
        tmp, err := ioutil.TempFile("", "keepstore")
        v.c.Assert(err, check.IsNil)
index 51fbb947917328e30db244ca841441a983d8bec5..cbb831ebc000849c71fedd945a4574ea6f4f1171 100644 (file)
@@ -9,7 +9,6 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       log "github.com/Sirupsen/logrus"
 )
 
 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
index 69802abdd1b5c4e22422293331d6cb0eec371896..1f8fba5d067c2a0731cb05eeebf81cc76bc315b7 100644 (file)
@@ -240,6 +240,9 @@ type Volume interface {
        // Return a globally unique ID of the underlying storage
        // device if possible, otherwise "".
        DeviceID() string
+
+       // Get the storage classes associated with this volume
+       GetStorageClasses() []string
 }
 
 // A VolumeWithExamples provides example configs to display in the
@@ -284,12 +287,12 @@ type VolumeManager interface {
 
 // A VolumeMount is an attachment of a Volume to a VolumeManager.
 type VolumeMount struct {
-       UUID        string
-       DeviceID    string
-       ReadOnly    bool
-       Replication int
-       Tier        int
-       volume      Volume
+       UUID           string
+       DeviceID       string
+       ReadOnly       bool
+       Replication    int
+       StorageClasses []string
+       volume         Volume
 }
 
 // Generate a UUID the way API server would for a "KeepVolumeMount"
@@ -326,13 +329,17 @@ func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
        }
        vm.mountMap = make(map[string]*VolumeMount)
        for _, v := range volumes {
+               sc := v.GetStorageClasses()
+               if len(sc) == 0 {
+                       sc = []string{"default"}
+               }
                mnt := &VolumeMount{
-                       UUID:        (*VolumeMount)(nil).generateUUID(),
-                       DeviceID:    v.DeviceID(),
-                       ReadOnly:    !v.Writable(),
-                       Replication: v.Replication(),
-                       Tier:        1,
-                       volume:      v,
+                       UUID:           (*VolumeMount)(nil).generateUUID(),
+                       DeviceID:       v.DeviceID(),
+                       ReadOnly:       !v.Writable(),
+                       Replication:    v.Replication(),
+                       StorageClasses: sc,
+                       volume:         v,
                }
                vm.iostats[v] = &ioStats{}
                vm.mounts = append(vm.mounts, mnt)
index baed6a71b60ae556a28b0cc3478e147ad77f90af..43ddd090cc1cfd22419e80aa86f1e838ffebd479 100644 (file)
@@ -241,3 +241,7 @@ func (v *MockVolume) Replication() int {
 
 func (v *MockVolume) EmptyTrash() {
 }
+
+func (v *MockVolume) GetStorageClasses() []string {
+       return nil
+}
index da9b110c56e80f1c4572aceb09b65c6dd29e8578..b4f18ad13e6d0c93e0c0b40023b9153d3c7a6d99 100644 (file)
@@ -20,8 +20,6 @@ import (
        "sync"
        "syscall"
        "time"
-
-       log "github.com/Sirupsen/logrus"
 )
 
 type unixVolumeAdder struct {
@@ -112,6 +110,7 @@ type UnixVolume struct {
        ReadOnly             bool
        Serialize            bool
        DirectoryReplication int
+       StorageClasses       []string
 
        // something to lock during IO, typically a sync.Mutex (or nil
        // to skip locking)
@@ -646,6 +645,11 @@ func (v *UnixVolume) Replication() int {
        return v.DirectoryReplication
 }
 
+// GetStorageClasses implements Volume
+func (v *UnixVolume) GetStorageClasses() []string {
+       return v.StorageClasses
+}
+
 // InternalStats returns I/O and filesystem ops counters.
 func (v *UnixVolume) InternalStats() interface{} {
        return &v.os.stats
index ea3d91d98c2e42deed99417c87254af42b8148e8..7f1cd219644ab241f2c0a8a0e2353c8f4c16844f 100644 (file)
@@ -19,6 +19,7 @@ import (
        "testing"
        "time"
 
+       "github.com/ghodss/yaml"
        check "gopkg.in/check.v1"
 )
 
@@ -427,3 +428,15 @@ func (s *UnixVolumeSuite) TestStats(c *check.C) {
        c.Check(err, check.IsNil)
        c.Check(stats(), check.Matches, `.*"FlockOps":2,.*`)
 }
+
+func (s *UnixVolumeSuite) TestConfig(c *check.C) {
+       var cfg Config
+       err := yaml.Unmarshal([]byte(`
+Volumes:
+  - Type: Directory
+    StorageClasses: ["class_a", "class_b"]
+`), &cfg)
+
+       c.Check(err, check.IsNil)
+       c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
+}
index 6c61e32b8db2c10cbef17216e8e3c80c9e7bfa4e..37d7088b7a7c65bc8632e21269f465d6850d50b9 100644 (file)
@@ -113,14 +113,16 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
-        self.arvados_node = self._arvados.nodes().create(body={}).execute()
+        self.arvados_node = self._arvados.nodes().create(
+            body={}, assign_slot=True).execute()
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
-        self.arvados_node = self._clean_arvados_node(
-            node, "Prepared by Node Manager")
+        self._clean_arvados_node(node, "Prepared by Node Manager")
+        self.arvados_node = self._arvados.nodes().update(
+            uuid=node['uuid'], body={}, assign_slot=True).execute()
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._finish_on_exception
@@ -315,7 +317,8 @@ class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
 
     @RetryMixin._retry()
     def sync_node(self, cloud_node, arvados_node):
-        return self._cloud.sync_node(cloud_node, arvados_node)
+        if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
+            return self._cloud.sync_node(cloud_node, arvados_node)
 
 
 class ComputeNodeMonitorActor(config.actor_class):
@@ -326,14 +329,13 @@ class ComputeNodeMonitorActor(config.actor_class):
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+                 timer_actor, update_actor, cloud_client,
                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.tell_proxy()
         self._shutdowns = shutdown_timer
-        self._cloud_node_fqdn = cloud_fqdn_func
         self._timer = timer_actor
         self._update = update_actor
         self._cloud = cloud_client
@@ -486,8 +488,11 @@ class ComputeNodeMonitorActor(config.actor_class):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
-        # If the cloud node's FQDN doesn't match what's in the Arvados node
-        # record, make them match.
+        """Called when the latest Arvados node record is retrieved.
+
+        Calls the updater's sync_node() method.
+
+        """
         # This method is a little unusual in the way it just fires off the
         # request without checking the result or retrying errors.  That's
         # because this update happens every time we reload the Arvados node
@@ -496,7 +501,5 @@ class ComputeNodeMonitorActor(config.actor_class):
         # the logic to throttle those effective retries when there's trouble.
         if arvados_node is not None:
             self.arvados_node = arvados_node
-            if (self._cloud_node_fqdn(self.cloud_node) !=
-                  arvados_node_fqdn(self.arvados_node)):
-                self._update.sync_node(self.cloud_node, self.arvados_node)
+            self._update.sync_node(self.cloud_node, self.arvados_node)
             self._later.consider_shutdown()
index c8883c3ae70f6614b7bd9063030c14e264dfe543..1cf8f4e41d776e5861c41816aff34cf2d98604db 100644 (file)
@@ -8,8 +8,8 @@ from __future__ import absolute_import, print_function
 import subprocess
 import time
 
-from . import \
-    ComputeNodeSetupActor, ComputeNodeMonitorActor
+from . import ComputeNodeMonitorActor
+from . import ComputeNodeSetupActor as SetupActorBase
 from . import ComputeNodeShutdownActor as ShutdownActorBase
 from . import ComputeNodeUpdateActor as UpdateActorBase
 from .. import RetryMixin
@@ -20,16 +20,32 @@ class SlurmMixin(object):
                                   'fail\n', 'fail*\n'])
     SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
 
-    def _set_node_state(self, nodename, state, *args):
-        cmd = ['scontrol', 'update', 'NodeName=' + nodename,
-               'State=' + state]
-        cmd.extend(args)
-        subprocess.check_output(cmd)
+    def _update_slurm_node(self, nodename, updates):
+        cmd = ['scontrol', 'update', 'NodeName=' + nodename] + updates
+        try:
+            subprocess.check_output(cmd)
+        except:
+            self._logger.error(
+                "SLURM update %r failed", cmd, exc_info=True)
+
+    def _update_slurm_size_attrs(self, nodename, size):
+        self._update_slurm_node(nodename, [
+            'Weight=%i' % int(size.price * 1000),
+            'Features=instancetype=' + size.id,
+        ])
 
     def _get_slurm_state(self, nodename):
         return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
 
 
+class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
+    def create_cloud_node(self):
+        hostname = self.arvados_node.get("hostname")
+        if hostname:
+            self._update_slurm_size_attrs(hostname, self.cloud_size)
+        return super(ComputeNodeSetupActor, self).create_cloud_node()
+
+
 class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
     def on_start(self):
         arv_node = self._arvados_node()
@@ -47,7 +63,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if self._nodename:
             if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
                 # Resume from "drng" or "drain"
-                self._set_node_state(self._nodename, 'RESUME')
+                self._update_slurm_node(self._nodename, ['State=RESUME'])
             else:
                 # Node is in a state such as 'idle' or 'alloc' so don't
                 # try to resume it because that will just raise an error.
@@ -59,7 +75,8 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if self.cancel_reason is not None:
             return
         if self._nodename:
-            self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+            self._update_slurm_node(self._nodename, [
+                'State=DRAIN', 'Reason=Node Manager shutdown'])
             self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
             self._later.await_slurm_drain()
         else:
@@ -82,15 +99,20 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
 
     def _destroy_node(self):
         if self._nodename:
-            self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+            self._update_slurm_node(self._nodename, [
+                'State=DOWN', 'Reason=Node Manager shutdown'])
         super(ComputeNodeShutdownActor, self)._destroy_node()
 
 
-class ComputeNodeUpdateActor(UpdateActorBase):
+class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
     def sync_node(self, cloud_node, arvados_node):
-        if arvados_node.get("hostname"):
-            try:
-                subprocess.check_output(['scontrol', 'update', 'NodeName=' + arvados_node["hostname"], 'Weight=%i' % int(cloud_node.size.price * 1000)])
-            except:
-                self._logger.error("Unable to set slurm node weight.", exc_info=True)
-        return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)
+        """Keep SLURM's node properties up to date."""
+        hostname = arvados_node.get("hostname")
+        features = arvados_node.get("slurm_node_features", "").split(",")
+        sizefeature = "instancetype=" + cloud_node.size.id
+        if hostname and sizefeature not in features:
+            # This probably means SLURM has restarted and lost our
+            # dynamically configured node weights and features.
+            self._update_slurm_size_attrs(hostname, cloud_node.size)
+        return super(ComputeNodeUpdateActor, self).sync_node(
+            cloud_node, arvados_node)
index 419557fe288ded9c4c4706bcd47a58b035ce2e65..3f1d575361a461f322e6475fab28b059d973e193 100644 (file)
@@ -38,7 +38,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         super(ComputeNodeDriver, self).__init__(
             auth_kwargs, list_kwargs, create_kwargs,
             driver_class)
-        self._sizes_by_name = {sz.name: sz for sz in self.sizes.itervalues()}
+        self._sizes_by_id = {sz.id: sz for sz in self.sizes.itervalues()}
         self._disktype_links = {dt.name: self._object_link(dt)
                                 for dt in self.real.ex_list_disktypes()}
 
@@ -120,7 +120,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         # and monkeypatch the results when that's the case.
         if nodelist and not hasattr(nodelist[0].size, 'id'):
             for node in nodelist:
-                node.size = self._sizes_by_name[node.size]
+                node.size = self._sizes_by_id[node.size]
         return nodelist
 
     @classmethod
index dd441edb6b70f1df4c8144e818bdd0501c443ffc..73b58bfe65fc0cc871579a22500424d2ec2f87fc 100644 (file)
@@ -167,7 +167,6 @@ class NodeManagerDaemonActor(actor_class):
             cloud_node=cloud_node,
             cloud_node_start_time=start_time,
             shutdown_timer=shutdown_timer,
-            cloud_fqdn_func=self._cloud_driver.node_fqdn,
             update_actor=self._cloud_updater,
             timer_actor=self._timer,
             arvados_node=None,
index 4d2d3e0c0ace3e6ff9db5832d3f8a9dcc4b7ad9a..0360bfc5424913c2f85bafc52d2ceaf29bd41296 100644 (file)
@@ -6,6 +6,7 @@
 from __future__ import absolute_import, print_function
 
 import logging
+import re
 import subprocess
 
 import arvados.util
@@ -74,13 +75,15 @@ class ServerCalculator(object):
             return fallback
 
     def cloud_size_for_constraints(self, constraints):
+        specified_size = constraints.get('instance_type')
         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
         for size in self.cloud_sizes:
-            if size.meets_constraints(**wants):
-                return size
+            if (size.meets_constraints(**wants) and
+                (specified_size is None or size.id == specified_size)):
+                    return size
         return None
 
     def servers_for_queue(self, queue):
@@ -92,8 +95,7 @@ class ServerCalculator(object):
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
                 unsatisfiable_jobs[job['uuid']] = (
-                    'Requirements for a single node exceed the available '
-                    'cloud node size')
+                    "Constraints cannot be satisfied by any node type")
             elif (want_count > self.max_nodes):
                 unsatisfiable_jobs[job['uuid']] = (
                     "Job's min_nodes constraint is greater than the configured "
@@ -152,21 +154,43 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
         queuelist = []
         if self.slurm_queue:
             # cpus, memory, temporary disk space, reason, job name
-            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
             for out in squeue_out.splitlines():
                 try:
-                    cpu, ram, disk, reason, jobname = out.split("|", 4)
-                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
-                        queuelist.append({
-                            "uuid": jobname,
-                            "runtime_constraints": {
-                                "min_cores_per_node": cpu,
-                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                            }
-                        })
+                    cpu, ram, disk, reason, jobname, features = out.split("|", 5)
                 except ValueError:
-                    pass
+                    self._logger.warning("ignored malformed line in squeue output: %r", out)
+                    continue
+                if '-dz642-' not in jobname:
+                    continue
+                if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
+                    continue
+
+                for feature in features.split(','):
+                    m = re.match(r'instancetype=(.*)', feature)
+                    if not m:
+                        continue
+                    instance_type = m.group(1)
+                    # Ignore cpu/ram/scratch requirements, bring up
+                    # the requested node type.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "instance_type": instance_type,
+                        }
+                    })
+                    break
+                else:
+                    # No instance type specified. Choose a node type
+                    # to suit cpu/ram/scratch requirements.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "min_cores_per_node": cpu,
+                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                        }
+                    })
 
         if self.jobs_queue:
             queuelist.extend(self._client.jobs().queue().execute()['items'])
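
The feature matching above leans on Python's for/else: if no `instancetype=...` entry is found among the job's SLURM features, the else branch falls back to cpu/ram/scratch constraints. A self-contained sketch of that per-line decision, using the same `%c|%m|%d|%r|%j|%f` field order as the squeue call above (unlike the actor, this sketch skips the coerce_to_mb() conversion and returns the raw memory/disk strings):

    import re

    def queue_entry_for(line):
        # Returns a queue entry dict, or None for lines the dispatcher ignores.
        try:
            cpu, ram, disk, reason, jobname, features = line.split("|", 5)
        except ValueError:
            return None
        if '-dz642-' not in jobname:
            return None           # not a container job
        if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
            return None           # pending for some other reason
        for feature in features.split(','):
            m = re.match(r'instancetype=(.*)', feature)
            if m:
                # Pinned to a node type: resource numbers are irrelevant.
                return {"uuid": jobname,
                        "runtime_constraints": {"instance_type": m.group(1)}}
        return {"uuid": jobname,
                "runtime_constraints": {"min_cores_per_node": cpu,
                                        "min_ram_mb_per_node": ram,
                                        "min_scratch_mb_per_node": disk}}

    # queue_entry_for("1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|instancetype=z2.test")
    # -> {'uuid': '...', 'runtime_constraints': {'instance_type': 'z2.test'}}
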
index 70ad54d789cff1e34e4f39beb759939b7b2bdf3d..4b9d5b60fb0ce5131d865f4b3d97b0652afb88c8 100644 (file)
@@ -15,8 +15,9 @@ import arvados.util
 class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to poll the Arvados node list.
 
-    This actor regularly polls the list of Arvados node records, and
-    sends it to subscribers.
+    This actor regularly polls the list of Arvados node records,
+    augments it with the latest SLURM node info (`sinfo`), and sends
+    it to subscribers.
     """
 
     def is_common_error(self, exception):
@@ -29,28 +30,32 @@ class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
         nodelist = arvados.util.list_all(self._client.nodes().list)
 
         # node hostname, state, features
-        sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n %t"])
+        sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n|%t|%f"])
         nodestates = {}
+        nodefeatures = {}
         for out in sinfo_out.splitlines():
             try:
-                nodename, state = out.split(" ", 2)
-                if state in ('alloc', 'alloc*',
-                             'comp',  'comp*',
-                             'mix',   'mix*',
-                             'drng',  'drng*'):
-                    nodestates[nodename] = 'busy'
-                elif state in ('idle', 'fail'):
-                    nodestates[nodename] = state
-                else:
-                    nodestates[nodename] = 'down'
+                nodename, state, features = out.split("|", 3)
             except ValueError:
-                pass
+                continue
+            if state in ('alloc', 'alloc*',
+                         'comp',  'comp*',
+                         'mix',   'mix*',
+                         'drng',  'drng*'):
+                nodestates[nodename] = 'busy'
+            elif state in ('idle', 'fail'):
+                nodestates[nodename] = state
+            else:
+                nodestates[nodename] = 'down'
+            if features != "(null)":
+                nodefeatures[nodename] = features
 
         for n in nodelist:
             if n["slot_number"] and n["hostname"] and n["hostname"] in nodestates:
                 n["crunch_worker_state"] = nodestates[n["hostname"]]
             else:
                 n["crunch_worker_state"] = 'down'
+            n["slurm_node_features"] = nodefeatures.get(n["hostname"], "")
 
         return nodelist
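
Condensed, the sinfo handling above boils down to two tables: every node gets a crunch worker state, and nodes whose feature list is not `(null)` also get their feature string recorded. A standalone sketch of that mapping, using the same `%n|%t|%f` format as the new sinfo call (the helper name is illustrative):

    def parse_sinfo(sinfo_out):
        # Returns ({hostname: 'busy'|'idle'|'fail'|'down'}, {hostname: features}).
        nodestates, nodefeatures = {}, {}
        for line in sinfo_out.splitlines():
            try:
                nodename, state, features = line.split("|", 2)
            except ValueError:
                continue
            if state in ('alloc', 'alloc*', 'comp', 'comp*',
                         'mix', 'mix*', 'drng', 'drng*'):
                nodestates[nodename] = 'busy'
            elif state in ('idle', 'fail'):
                nodestates[nodename] = state
            else:
                nodestates[nodename] = 'down'
            if features != "(null)":
                nodefeatures[nodename] = features
        return nodestates, nodefeatures

    # parse_sinfo("compute1|idle|instancetype=a1.test\ncompute2|alloc|(null)")
    # -> ({'compute1': 'idle', 'compute2': 'busy'}, {'compute1': 'instancetype=a1.test'})
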
 
index d5b55540f8e66eea3749d1828d3203ae89521f92..7b8ba391c9ced07de6a39d7b7952695d619b126e 100755 (executable)
@@ -25,9 +25,13 @@ from functools import partial
 import arvados
 import StringIO
 
+formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+
+handler = logging.StreamHandler(sys.stderr)
+handler.setFormatter(formatter)
 logger = logging.getLogger("logger")
 logger.setLevel(logging.INFO)
-logger.addHandler(logging.StreamHandler(sys.stderr))
+logger.addHandler(handler)
 
 detail = logging.getLogger("detail")
 detail.setLevel(logging.INFO)
@@ -35,7 +39,9 @@ if os.environ.get("ANMTEST_LOGLEVEL"):
     detail_content = sys.stderr
 else:
     detail_content = StringIO.StringIO()
-detail.addHandler(logging.StreamHandler(detail_content))
+handler = logging.StreamHandler(detail_content)
+handler.setFormatter(formatter)
+detail.addHandler(handler)
 
 fake_slurm = None
 compute_nodes = None
@@ -52,14 +58,14 @@ def update_script(path, val):
 def set_squeue(g):
     global all_jobs
     update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
-                  "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+                  "\n".join("echo '1|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
     return 0
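
To make the fixture format concrete: the stub written by set_squeue() is a shell one-liner per job, and each echo now carries the six pipe-delimited fields the dispatcher parses, with features last and `(null)` when unset. A hedged illustration of rendering one such stub the same way update_script() does, with a made-up job uuid:

    import os, stat, tempfile

    # Render a fake squeue stub as set_squeue() does above (job uuid is made up).
    all_jobs = {"zzzzz-dz642-000000000000000": "ReqNodeNotAvail"}
    stub = os.path.join(tempfile.mkdtemp(), "squeue")
    with open(stub, "w") as f:
        f.write("#!/bin/sh\n" + "\n".join(
            "echo '1|100|100|%s|%s|(null)'" % (v, k) for k, v in all_jobs.items()))
    os.chmod(stub, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
    # Executing the stub prints:
    #   1|100|100|ReqNodeNotAvail|zzzzz-dz642-000000000000000|(null)
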
 
 def set_queue_unsatisfiable(g):
     global all_jobs, unsatisfiable_job_scancelled
     # Simulate a job requesting a 99 core node.
     update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
-                  "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+                  "\n".join("echo '99|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
     update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
                   "\ntouch %s" % unsatisfiable_job_scancelled)
     return 0
@@ -78,7 +84,7 @@ def job_cancelled(g):
             ['event_type', '=', 'stderr'],
         ]).execute()['items'][0]
     if not re.match(
-            r"Requirements for a single node exceed the available cloud node size",
+            r"Constraints cannot be satisfied",
             log_entry['properties']['text']):
         return 1
     return 0
@@ -88,7 +94,7 @@ def node_paired(g):
     compute_nodes[g.group(1)] = g.group(3)
 
     update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
-                  "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+                  "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
 
     for k,v in all_jobs.items():
         if v == "ReqNodeNotAvail":
@@ -101,7 +107,7 @@ def node_paired(g):
 
 def remaining_jobs(g):
     update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
-                  "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+                  "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
 
     for k,v in all_jobs.items():
         all_jobs[k] = "Running"
@@ -113,7 +119,7 @@ def remaining_jobs(g):
 
 def node_busy(g):
     update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
-                  "\n".join("echo '%s idle'" % (v) for k,v in compute_nodes.items()))
+                  "\n".join("echo '%s|idle|(null)'" % (v) for k,v in compute_nodes.items()))
     return 0
 
 def node_shutdown(g):
index 4b352059e629bbc22c667347812d7046960c0da7..0a2deb8a9cdd70ca7a72f1ef41b067bbe2f00ea4 100644 (file)
@@ -21,9 +21,16 @@ from arvnodeman.computenode.driver import BaseComputeNodeDriver
 from . import testutil
 
 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+    ACTOR_CLASS = dispatch.ComputeNodeSetupActor
+
     def make_mocks(self, arvados_effect=None):
         if arvados_effect is None:
-            arvados_effect = [testutil.arvados_node_mock()]
+            arvados_effect = [testutil.arvados_node_mock(
+                slot_number=None,
+                hostname=None,
+                first_ping_at=None,
+                last_ping_at=None,
+            )]
         self.arvados_effect = arvados_effect
         self.timer = testutil.MockTimer()
         self.api_client = mock.MagicMock(name='api_client')
@@ -35,7 +42,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
     def make_actor(self, arv_node=None):
         if not hasattr(self, 'timer'):
             self.make_mocks(arvados_effect=[arv_node] if arv_node else None)
-        self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+        self.setup_actor = self.ACTOR_CLASS.start(
             self.timer, self.api_client, self.cloud_client,
             testutil.MockSize(1), arv_node).proxy()
 
@@ -56,6 +63,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.assertEqual(self.arvados_effect[-1],
                          self.setup_actor.arvados_node.get(self.TIMEOUT))
         assert(finished.wait(self.TIMEOUT))
+        self.api_client.nodes().create.assert_called_with(body={}, assign_slot=True)
         self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
         self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
         self.assert_node_properties_updated()
@@ -71,7 +79,8 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
                          self.setup_actor.arvados_node.get(self.TIMEOUT))
         assert(finished.wait(self.TIMEOUT))
         self.assert_node_properties_updated()
-        self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
+        self.api_client.nodes().create.assert_called_with(body={}, assign_slot=True)
+        self.assertEqual(3, self.api_client.nodes().update().execute.call_count)
         self.assertEqual(self.cloud_client.create_node(),
                          self.setup_actor.cloud_node.get(self.TIMEOUT))
 
@@ -188,7 +197,7 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
             start_time = time.time()
         monitor_actor = dispatch.ComputeNodeMonitorActor.start(
             self.cloud_node, start_time, self.shutdowns,
-            testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+            self.timer, self.updates, self.cloud_client,
             self.arvados_node)
         self.shutdown_actor = self.ACTOR_CLASS.start(
             self.timer, self.cloud_client, self.arvados_client, monitor_actor,
@@ -326,7 +335,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
             start_time = time.time()
         self.node_actor = dispatch.ComputeNodeMonitorActor.start(
             self.cloud_mock, start_time, self.shutdowns,
-            testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+            self.timer, self.updates, self.cloud_client,
             arv_node, boot_fail_after=300).proxy()
         self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
 
@@ -511,19 +520,10 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(testutil.ip_address_mock(4),
                          current_arvados['ip_address'])
 
-    def test_update_arvados_node_syncs_when_fqdn_mismatch(self):
+    def test_update_arvados_node_calls_sync_node(self):
         self.make_mocks(5)
         self.cloud_mock.extra['testname'] = 'cloudfqdn.zzzzz.arvadosapi.com'
         self.make_actor()
         arv_node = testutil.arvados_node_mock(5)
         self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
         self.assertEqual(1, self.updates.sync_node.call_count)
-
-    def test_update_arvados_node_skips_sync_when_fqdn_match(self):
-        self.make_mocks(6)
-        arv_node = testutil.arvados_node_mock(6)
-        self.cloud_mock.extra['testname'] ='{n[hostname]}.{n[domain]}'.format(
-            n=arv_node)
-        self.make_actor()
-        self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
-        self.assertEqual(0, self.updates.sync_node.call_count)
index 0b6162dfaa64405df53794bb575bfffd2420bbff..b61db5cba1c57918c622d1ee815461ba6fe6de77 100644 (file)
@@ -13,7 +13,10 @@ import mock
 
 import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
 from . import testutil
-from .test_computenode_dispatch import ComputeNodeShutdownActorMixin, ComputeNodeUpdateActorTestCase
+from .test_computenode_dispatch import \
+    ComputeNodeShutdownActorMixin, \
+    ComputeNodeSetupActorTestCase, \
+    ComputeNodeUpdateActorTestCase
 
 @mock.patch('subprocess.check_output')
 class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
@@ -123,4 +126,18 @@ class SLURMComputeNodeUpdateActorTestCase(ComputeNodeUpdateActorTestCase):
         cloud_node = testutil.cloud_node_mock()
         arv_node = testutil.arvados_node_mock()
         self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
-        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000'])
+        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000', 'Features=instancetype=z99.test'])
+
+class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
+    ACTOR_CLASS = slurm_dispatch.ComputeNodeSetupActor
+
+    @mock.patch('subprocess.check_output')
+    def test_update_node_features(self, check_output):
+        # `scontrol update` happens only if the Arvados node record
+        # has a hostname. ComputeNodeSetupActorTestCase.make_mocks
+        # uses mocks with scrubbed hostnames, so we override with the
+        # default testutil.arvados_node_mock.
+        self.make_mocks(arvados_effect=[testutil.arvados_node_mock()])
+        self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
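
The assertions above pin down the new scontrol invocation: besides the weight, sync_node (and now the SLURM setup actor) tags the node with a `Features=instancetype=<size id>` entry, which is what the squeue `%f` parsing in jobqueue.py matches against. A hedged sketch of that call shape (the helper name and the weight argument are illustrative; the weight calculation itself is not shown in this hunk):

    import subprocess

    def tag_slurm_node(hostname, weight, size_id):
        # Mirrors the command asserted above, e.g. tag_slurm_node('compute99', 1000, 'z1.test').
        subprocess.check_output([
            'scontrol', 'update',
            'NodeName=' + hostname,
            'Weight=%i' % weight,
            'Features=instancetype=' + size_id,
        ])
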
index cfc4add63b20992b452fbc59ecb02c02e8ebdaf5..f0942e93785571f8ae4e3cdb7f0c78eb173ee7b6 100644 (file)
@@ -211,7 +211,7 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         # patches that up in listings.
         size = testutil.MockSize(2)
         node = testutil.cloud_node_mock(size=size)
-        node.size = size.name
+        node.size = size.id
         self.driver_mock().list_sizes.return_value = [size]
         self.driver_mock().list_nodes.return_value = [node]
         driver = self.new_driver()
index ebe7408e705b02e2d55b2d757ef5367953f23242..50fa0aa68a4fb26c58cdb6978594805e314a7e31 100644 (file)
@@ -23,12 +23,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
 
     def busywait(self, f):
-        n = 0
-        while not f() and n < 200:
+        for n in xrange(200):
+            ok = f()
+            if ok:
+                return
             time.sleep(.1)
             self.daemon.ping().get(self.TIMEOUT)
-            n += 1
-        self.assertTrue(f())
+        self.assertTrue(ok)  # ok is falsy here (we return as soon as f() succeeds), but not necessarily False
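
The rewritten busywait is a bounded poll: try f() up to 200 times, nudging the daemon between attempts, and fail with the last falsy value if it never succeeds. The same pattern as a generic helper (names and defaults here are illustrative, not part of the test suite):

    import time

    def poll_until(predicate, attempts=200, interval=0.1, between=None):
        # Return the first truthy result; raise if the condition never holds.
        # `between` is an optional callable run after each failed attempt
        # (busywait above uses it to ping the daemon actor).
        for _ in range(attempts):
            result = predicate()
            if result:
                return result
            time.sleep(interval)
            if between is not None:
                between()
        raise AssertionError("condition did not become true after %d attempts" % attempts)
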
 
     def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
index b1d5e002767a000d7487aa82c8ee5bb9c312e320..52232453bd0872cf88da74a7823ae0ee4e951724 100644 (file)
@@ -159,7 +159,7 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
         job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
         container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
-        mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+        mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "|\n"
 
         self.build_monitor([{'items': [{'uuid': job_uuid}]}],
                            self.MockCalculatorUnsatisfiableJobs(), True, True)
@@ -181,8 +181,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_squeue_server_list(self, mock_squeue):
-        mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+        mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -195,8 +195,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_squeue_server_list_suffix(self, mock_squeue):
-        mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+        mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -207,6 +207,16 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
         self.subscriber.assert_called_with([testutil.MockSize(1),
                                             testutil.MockSize(2)])
 
+    @mock.patch("subprocess.check_output")
+    def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
+        mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test\n"""
+        super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
+            [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+                                                                True, True)
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
+        self.subscriber.assert_called_with([testutil.MockSize(2)])
+
     def test_coerce_to_mb(self):
         self.assertEqual(1, jobqueue.JobQueueMonitorActor.coerce_to_mb("1"))
         self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512"))
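
coerce_to_mb normalizes squeue's memory/disk fields, which show up either as bare MiB counts ("1024") or with a binary suffix ("1024M", "2G") as in the fixtures above. A hedged sketch of such a conversion, consistent with the assertions shown here but with the suffix handling and rounding treated as assumptions rather than the actor's actual implementation:

    import math

    def coerce_to_mb(value):
        # "512" -> 512, "1024M" -> 1024, "2G" -> 2048, "512K" -> 1  (assumed rounding: ceil)
        multipliers = {'K': 1.0 / 1024, 'M': 1, 'G': 1024, 'T': 1024 ** 2, 'P': 1024 ** 3}
        suffix = value[-1:].upper()
        if suffix in multipliers:
            return int(math.ceil(float(value[:-1]) * multipliers[suffix]))
        return int(value)
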
index 11f41b8d9ab68458fae21aa6b86685630d8d1a3f..5becd0c2241386e34b6dfef8e57a29b025335a67 100644 (file)
@@ -42,10 +42,15 @@ class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_update_from_sinfo(self, sinfo_mock):
-        sinfo_mock.return_value = "compute99 alloc"
-        node = testutil.arvados_node_mock()
+        sinfo_mock.return_value = """compute1|idle|instancetype=a1.test
+compute2|alloc|(null)
+notarvados12345|idle|(null)
+"""
+        nodeIdle = testutil.arvados_node_mock(node_num=1)
+        nodeBusy = testutil.arvados_node_mock(node_num=2)
+        nodeMissing = testutil.arvados_node_mock(node_num=99)
         self.build_monitor([{
-            'items': [node],
+            'items': [nodeIdle, nodeBusy, nodeMissing],
             'items_available': 1,
             'offset': 0
         }, {
@@ -53,11 +58,18 @@ class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
             'items_available': 1,
             'offset': 1
         }])
-        self.monitor.subscribe_to(node['uuid'],
+        self.monitor.subscribe_to(nodeMissing['uuid'],
                                   self.subscriber).get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
-        self.subscriber.assert_called_with(node)
-        self.assertEqual("busy", node["crunch_worker_state"])
+        self.subscriber.assert_called_with(nodeMissing)
+
+        self.assertEqual("idle", nodeIdle["crunch_worker_state"])
+        self.assertEqual("busy", nodeBusy["crunch_worker_state"])
+        self.assertEqual("down", nodeMissing["crunch_worker_state"])
+
+        self.assertEqual("instancetype=a1.test", nodeIdle["slurm_node_features"])
+        self.assertEqual("", nodeBusy["slurm_node_features"])
+        self.assertEqual("", nodeMissing["slurm_node_features"])
 
 
 class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
index 6e134375bb8aec05bdd71f830e28f277d3cff5b5..555144c4d05d2bc562d9bc2357fa93421f64b35f 100644 (file)
@@ -55,7 +55,7 @@ def cloud_object_mock(name_id, **extra):
 def cloud_node_fqdn(node):
     # We intentionally put the FQDN somewhere goofy to make sure tested code is
     # using this function for lookups.
-    return node.extra.get('testname', 'NoTestName')
+    return node.extra.get('testname', node.name+'.NoTestName.invalid')
 
 def ip_address_mock(last_octet):
     return '10.20.30.{}'.format(last_octet)
@@ -80,7 +80,7 @@ class MockShutdownTimer(object):
 class MockSize(object):
     def __init__(self, factor):
         self.id = 'z{}.test'.format(factor)
-        self.name = self.id
+        self.name = 'test size '+self.id
         self.ram = 128 * factor
         self.disk = factor   # GB
         self.scratch = 1000 * factor # MB
index 97cc79d32fd2d110f6bd879441316cdcfec6adc1..183ff2abfd5e4e162c5b0102c298991adeb33cdf 100755 (executable)
@@ -12,19 +12,7 @@ cd /usr/src/arvados/doc
 run_bundler --without=development
 
 cd /usr/src/arvados/sdk/R
-R --quiet --vanilla <<EOF
-options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
-if (!requireNamespace("devtools")) {
-  install.packages("devtools")
-}
-if (!requireNamespace("roxygen2")) {
-  install.packages("roxygen2")
-}
-if (!requireNamespace("pkgdown")) {
-  devtools::install_github("hadley/pkgdown")
-}
-devtools::install_dev_deps()
-EOF
+R --quiet --vanilla --file=install_deps.R
 
 if test "$1" = "--only-deps" ; then
     exit
index 10569b2e139b89a2904820ab62dab6cbc3a747b5..af7b2e92ebeb0cb60697ac03dacc25e33553782b 100644 (file)
@@ -217,7 +217,7 @@ func SetParentGroup(cfg *ConfigParams) error {
                        return fmt.Errorf("error searching for parent group: %s", err)
                }
                if len(gl.Items) == 0 {
-                       // Default parent group not existant, create one.
+                       // Default parent group does not exist, create it.
                        if cfg.Verbose {
                                log.Println("Default parent group not found, creating...")
                        }
index aeac93e475bb265d1ed7fb7fcfb75870604838e3..a4f750b4c4d0445567ad20da7ac9408eb12a692d 100644 (file)
                        "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
                        "revisionTime": "2017-07-27T13:52:37Z"
                },
+               {
+                       "checksumSHA1": "xHZe/h/tyrqmS9qiR03bLfRv5FI=",
+                       "path": "github.com/Azure/azure-sdk-for-go/storage",
+                       "revision": "f8eeb65a1a1f969696b49aada9d24073f2c2acd1",
+                       "revisionTime": "2018-02-15T19:19:13Z"
+               },
+               {
+                       "checksumSHA1": "PfyfOXsPbGEWmdh54cguqzdwloY=",
+                       "path": "github.com/Azure/azure-sdk-for-go/version",
+                       "revision": "471256ff7c6c93b96131845cef5309d20edd313d",
+                       "revisionTime": "2018-02-14T01:17:07Z"
+               },
+               {
+                       "checksumSHA1": "LQWU/2M2E4L/hVzT9BVW1SkLrpA=",
+                       "path": "github.com/Azure/go-autorest/autorest",
+                       "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+                       "revisionTime": "2017-11-30T17:00:06Z"
+               },
+               {
+                       "checksumSHA1": "nBQ7cdhoeYUur6G6HG97uueoDmE=",
+                       "path": "github.com/Azure/go-autorest/autorest/adal",
+                       "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+                       "revisionTime": "2017-11-30T17:00:06Z"
+               },
+               {
+                       "checksumSHA1": "zXyLmDVpkYkIsL0yinNLoW82IZc=",
+                       "path": "github.com/Azure/go-autorest/autorest/azure",
+                       "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+                       "revisionTime": "2017-11-30T17:00:06Z"
+               },
+               {
+                       "checksumSHA1": "9nXCi9qQsYjxCeajJKWttxgEt0I=",
+                       "path": "github.com/Azure/go-autorest/autorest/date",
+                       "revision": "a91c94d19d5efcb398b3aab64b8766e724aa7442",
+                       "revisionTime": "2017-11-30T17:00:06Z"
+               },
                {
                        "checksumSHA1": "o/3cn04KAiwC7NqNVvmfVTD+hgA=",
                        "path": "github.com/Microsoft/go-winio",
                        "revision": "d682213848ed68c0a260ca37d6dd5ace8423f5ba",
                        "revisionTime": "2017-12-05T20:32:29Z"
                },
+               {
+                       "checksumSHA1": "spyv5/YFBjYyZLZa1U2LBfDR8PM=",
+                       "path": "github.com/beorn7/perks/quantile",
+                       "revision": "4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9",
+                       "revisionTime": "2016-08-04T10:47:26Z"
+               },
                {
                        "checksumSHA1": "+Zz+leZHHC9C0rx8DoRuffSRPso=",
                        "path": "github.com/coreos/go-systemd/daemon",
                        "revisionTime": "2018-01-08T08:51:32Z"
                },
                {
-                       "checksumSHA1": "pAu+do4x7E5SFLfIqJeGwhcOd6E=",
-                       "path": "github.com/curoverse/azure-sdk-for-go/storage",
-                       "revision": "1620af6b32398bfc91827ceae54a8cc1f55df04d",
-                       "revisionTime": "2016-12-14T20:08:43Z"
+                       "checksumSHA1": "+TKtBzv23ywvmmqRiGEjUba4YmI=",
+                       "path": "github.com/dgrijalva/jwt-go",
+                       "revision": "dbeaa9332f19a944acb5736b4456cfcc02140e29",
+                       "revisionTime": "2017-10-19T21:57:19Z"
                },
                {
                        "checksumSHA1": "Gj+xR1VgFKKmFXYOJMnAczC3Znk=",
                        "revision": "160de10b2537169b5ae3e7e221d28269ef40d311",
                        "revisionTime": "2018-01-04T10:21:28Z"
                },
+               {
+                       "checksumSHA1": "iVfdaLxIDjfk2KLP8dCMIbsxZZM=",
+                       "path": "github.com/golang/protobuf/jsonpb",
+                       "revision": "1e59b77b52bf8e4b449a57e6f79f21226d571845",
+                       "revisionTime": "2017-11-13T18:07:20Z"
+               },
+               {
+                       "checksumSHA1": "yqF125xVSkmfLpIVGrLlfE05IUk=",
+                       "path": "github.com/golang/protobuf/proto",
+                       "revision": "1e59b77b52bf8e4b449a57e6f79f21226d571845",
+                       "revisionTime": "2017-11-13T18:07:20Z"
+               },
+               {
+                       "checksumSHA1": "Ylq6kq3KWBy6mu68oyEwenhNMdg=",
+                       "path": "github.com/golang/protobuf/ptypes/struct",
+                       "revision": "1e59b77b52bf8e4b449a57e6f79f21226d571845",
+                       "revisionTime": "2017-11-13T18:07:20Z"
+               },
                {
                        "checksumSHA1": "iIUYZyoanCQQTUaWsu8b+iOSPt4=",
                        "origin": "github.com/docker/docker/vendor/github.com/gorilla/context",
                        "revision": "83612a56d3dd153a94a629cd64925371c9adad78",
                        "revisionTime": "2017-11-26T05:04:59Z"
                },
+               {
+                       "checksumSHA1": "T9E+5mKBQ/BX4wlNxgaPfetxdeI=",
+                       "path": "github.com/marstr/guid",
+                       "revision": "8bdf7d1a087ccc975cf37dd6507da50698fd19ca",
+                       "revisionTime": "2017-04-27T23:51:15Z"
+               },
+               {
+                       "checksumSHA1": "bKMZjd2wPw13VwoE7mBeSv5djFA=",
+                       "path": "github.com/matttproud/golang_protobuf_extensions/pbutil",
+                       "revision": "c12348ce28de40eed0136aa2b644d0ee0650e56c",
+                       "revisionTime": "2016-04-24T11:30:07Z"
+               },
                {
                        "checksumSHA1": "V/quM7+em2ByJbWBLOsEwnY3j/Q=",
                        "path": "github.com/mitchellh/go-homedir",
                        "revision": "e881fd58d78e04cf6d0de1217f8707c8cc2249bc",
                        "revisionTime": "2017-12-16T07:03:16Z"
                },
+               {
+                       "checksumSHA1": "Ajt29IHVbX99PUvzn8Gc/lMCXBY=",
+                       "path": "github.com/prometheus/client_golang/prometheus",
+                       "revision": "9bb6ab929dcbe1c8393cd9ef70387cb69811bd1c",
+                       "revisionTime": "2018-02-03T14:28:15Z"
+               },
+               {
+                       "checksumSHA1": "c3Ui7nnLiJ4CAGWZ8dGuEgqHd8s=",
+                       "path": "github.com/prometheus/client_golang/prometheus/promhttp",
+                       "revision": "9bb6ab929dcbe1c8393cd9ef70387cb69811bd1c",
+                       "revisionTime": "2018-02-03T14:28:15Z"
+               },
+               {
+                       "checksumSHA1": "DvwvOlPNAgRntBzt3b3OSRMS2N4=",
+                       "path": "github.com/prometheus/client_model/go",
+                       "revision": "99fa1f4be8e564e8a6b613da7fa6f46c9edafc6c",
+                       "revisionTime": "2017-11-17T10:05:41Z"
+               },
+               {
+                       "checksumSHA1": "xfnn0THnqNwjwimeTClsxahYrIo=",
+                       "path": "github.com/prometheus/common/expfmt",
+                       "revision": "89604d197083d4781071d3c65855d24ecfb0a563",
+                       "revisionTime": "2018-01-10T21:49:58Z"
+               },
+               {
+                       "checksumSHA1": "GWlM3d2vPYyNATtTFgftS10/A9w=",
+                       "path": "github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg",
+                       "revision": "89604d197083d4781071d3c65855d24ecfb0a563",
+                       "revisionTime": "2018-01-10T21:49:58Z"
+               },
+               {
+                       "checksumSHA1": "YU+/K48IMawQnToO4ETE6a+hhj4=",
+                       "path": "github.com/prometheus/common/model",
+                       "revision": "89604d197083d4781071d3c65855d24ecfb0a563",
+                       "revisionTime": "2018-01-10T21:49:58Z"
+               },
+               {
+                       "checksumSHA1": "lolK0h7LSVERIX8zLyVQ/+7wEyA=",
+                       "path": "github.com/prometheus/procfs",
+                       "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
+                       "revisionTime": "2018-01-25T13:30:57Z"
+               },
+               {
+                       "checksumSHA1": "lv9rIcjbVEGo8AT1UCUZXhXrfQc=",
+                       "path": "github.com/prometheus/procfs/internal/util",
+                       "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
+                       "revisionTime": "2018-01-25T13:30:57Z"
+               },
+               {
+                       "checksumSHA1": "BXJH5h2ri8SU5qC6kkDvTIGCky4=",
+                       "path": "github.com/prometheus/procfs/nfs",
+                       "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
+                       "revisionTime": "2018-01-25T13:30:57Z"
+               },
+               {
+                       "checksumSHA1": "yItvTQLUVqm/ArLEbvEhqG0T5a0=",
+                       "path": "github.com/prometheus/procfs/xfs",
+                       "revision": "cb4147076ac75738c9a7d279075a253c0cc5acbd",
+                       "revisionTime": "2018-01-25T13:30:57Z"
+               },
+               {
+                       "checksumSHA1": "eDQ6f1EsNf+frcRO/9XukSEchm8=",
+                       "path": "github.com/satori/go.uuid",
+                       "revision": "36e9d2ebbde5e3f13ab2e25625fd453271d6522e",
+                       "revisionTime": "2018-01-03T17:44:51Z"
+               },
                {
                        "checksumSHA1": "UwtyqB7CaUWPlw0DVJQvw0IFQZs=",
                        "path": "github.com/sergi/go-diff/diffmatchpatch",