Merge branch 'master' into 9369-arv-cwl-docs
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 15 Jul 2016 17:11:49 +0000 (13:11 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 15 Jul 2016 17:11:49 +0000 (13:11 -0400)
86 files changed:
apps/workbench/app/assets/javascripts/infinite_scroll.js
apps/workbench/app/controllers/application_controller.rb
apps/workbench/app/controllers/container_requests_controller.rb
apps/workbench/app/controllers/containers_controller.rb
apps/workbench/app/controllers/jobs_controller.rb
apps/workbench/app/controllers/projects_controller.rb
apps/workbench/app/models/container_request.rb
apps/workbench/app/views/projects/_show_jobs_and_pipelines.html.erb [deleted file]
apps/workbench/app/views/projects/_show_pipelines_and_processes.html.erb [new file with mode: 0644]
apps/workbench/test/controllers/projects_controller_test.rb
apps/workbench/test/integration/anonymous_access_test.rb
apps/workbench/test/integration/download_test.rb
apps/workbench/test/integration/pipeline_instances_test.rb
apps/workbench/test/integration/projects_test.rb
build/run-build-packages.sh
build/run-tests.sh
crunch_scripts/cwl-runner
doc/api/methods/groups.html.textile.liquid
doc/install/install-keepstore.html.textile.liquid
docker/jobs/Dockerfile
lib/crunchstat/crunchstat.go [new file with mode: 0644]
lib/crunchstat/crunchstat_test.go [new file with mode: 0644]
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/test_with_arvbox.sh
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/tool/submit_tool.cwl
sdk/cwl/tests/wf/inputs_test.cwl
sdk/cwl/tests/wf/submit_wf.cwl
sdk/go/arvados/client_test.go
sdk/go/arvados/keep_service.go
sdk/go/crunchrunner/crunchrunner.go
sdk/go/crunchrunner/crunchrunner_test.go
sdk/go/keepclient/collectionreader.go
sdk/python/arvados/commands/run.py
sdk/python/arvados/keep.py
sdk/python/tests/run_test_server.py
services/api/app/controllers/arvados/v1/groups_controller.rb
services/api/lib/eventbus.rb
services/api/test/fixtures/container_requests.yml
services/api/test/fixtures/groups.yml
services/api/test/fixtures/jobs.yml
services/api/test/fixtures/pipeline_instances.yml
services/api/test/functional/arvados/v1/groups_controller_test.rb
services/api/test/integration/websocket_test.rb
services/api/test/websocket_runner.rb
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-local/crunch-dispatch-local_test.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/crunch-run/logging.go
services/crunch-run/logging_test.go
services/crunchstat/crunchstat.go
services/crunchstat/crunchstat_test.go
services/datamanager/collection/collection.go
services/datamanager/collection/collection_test.go
services/datamanager/keep/keep.go
services/datamanager/keep/keep_test.go
services/datamanager/summary/pull_list_test.go
services/datamanager/summary/summary_test.go
services/datamanager/summary/trash_list.go
services/datamanager/summary/trash_list_test.go
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/balance_test.go
services/keep-balance/integration_test.go
services/keep-balance/main.go
services/keepstore/azure_blob_volume.go
services/keepstore/keepstore.go
services/keepstore/s3_volume.go
services/keepstore/trash_worker.go
services/keepstore/trash_worker_test.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_unix.go
services/keepstore/work_queue_test.go
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/tests/test_computenode_dispatch.py
tools/arvbox/bin/arvbox

diff --git a/apps/workbench/app/assets/javascripts/infinite_scroll.js b/apps/workbench/app/assets/javascripts/infinite_scroll.js
index 047858c5a0e3a9811408de40442f24605994868d..a0c9efc5231e5b2d27a66e6af6e1489770b12009 100644 (file)
@@ -151,7 +151,8 @@ function mergeInfiniteContentParams($container) {
     // For example, filterable.js writes filters in
     // infiniteContentParamsFilterable ("search for text foo")
     // without worrying about clobbering the filters set up by the
-    // tab pane ("only show jobs and pipelines in this tab").
+    // tab pane ("only show container requests and pipeline instances
+    // in this tab").
     $.each($container.data(), function(datakey, datavalue) {
         // Note: We attach these data to DOM elements using
         // <element data-foo-bar="baz">. We store/retrieve them
diff --git a/apps/workbench/app/controllers/application_controller.rb b/apps/workbench/app/controllers/application_controller.rb
index 0ed629403c41fb3b10ca545af37200515f711fee..648cae85a67006dc46e2410f3b7e3afc99637f8e 100644 (file)
@@ -115,7 +115,7 @@ class ApplicationController < ActionController::Base
   # Column names should always be qualified by a table name and a direction is optional, defaulting to asc
   # (e.g. "collections.name" or "collections.name desc").
   # If a column name is specified, that table will be sorted by that column.
-  # If there are objects from different models that will be shown (such as in Jobs and Pipelines tab),
+  # If there are objects from different models that will be shown (such as in Pipelines and processes tab),
   # then a sort column name can optionally be specified for each model, passed as an comma-separated list (e.g. "jobs.script, pipeline_instances.name")
   # Currently only one sort column name and direction can be specified for each model.
   def load_filters_and_paging_params
diff --git a/apps/workbench/app/controllers/container_requests_controller.rb b/apps/workbench/app/controllers/container_requests_controller.rb
index 4a32cd8171c53ffa64d17a1e4640abb7ca837bf6..f5a68fec27eb00f64e1d027d29c3a4079dd0687b 100644 (file)
@@ -1,4 +1,9 @@
 class ContainerRequestsController < ApplicationController
+  skip_around_filter :require_thread_api_token, if: proc { |ctrl|
+    Rails.configuration.anonymous_user_token and
+    'show' == ctrl.action_name
+  }
+
   def show_pane_list
     %w(Status Log Advanced)
   end
diff --git a/apps/workbench/app/controllers/containers_controller.rb b/apps/workbench/app/controllers/containers_controller.rb
index 86582dff4fe85ce5073f9f3a8e8851680028b9f0..1df2c3acb0f5bcba19562c57b8f794c641375c88 100644 (file)
@@ -1,4 +1,9 @@
 class ContainersController < ApplicationController
+  skip_around_filter :require_thread_api_token, if: proc { |ctrl|
+    Rails.configuration.anonymous_user_token and
+    'show' == ctrl.action_name
+  }
+
   def show_pane_list
     %w(Status Log Advanced)
   end
diff --git a/apps/workbench/app/controllers/jobs_controller.rb b/apps/workbench/app/controllers/jobs_controller.rb
index 398417734c71c34f2aaac71fbf700eaf4d5f50d1..f18a79d646c4a0a1dc774e52f0c2d4da1c8f9346 100644 (file)
@@ -61,14 +61,9 @@ class JobsController < ApplicationController
   end
 
   def logs
-    @logs = Log.select(%w(event_type object_uuid event_at properties))
-               .order('event_at DESC')
-               .filter([["event_type",  "=", "stderr"],
-                        ["object_uuid", "in", [@object.uuid]]])
-               .limit(500)
-               .results
-               .to_a
-               .map{ |e| e.serializable_hash.merge({ 'prepend' => true }) }
+    @logs = @object.
+      stderr_log_query(Rails.configuration.running_job_log_records_to_fetch).
+      map { |e| e.serializable_hash.merge({ 'prepend' => true }) }
     respond_to do |format|
       format.json { render json: @logs }
     end
diff --git a/apps/workbench/app/controllers/projects_controller.rb b/apps/workbench/app/controllers/projects_controller.rb
index e49ed1fab65f38b6631c0298f8ba508feacd9087..3674e314a835742dc8071770fe771e41dcef7352 100644 (file)
@@ -63,8 +63,8 @@ class ProjectsController < ApplicationController
       }
     pane_list <<
       {
-        :name => 'Jobs_and_pipelines',
-        :filters => [%w(uuid is_a) + [%w(arvados#job arvados#pipelineInstance)]]
+        :name => 'Pipelines_and_processes',
+        :filters => [%w(uuid is_a) + [%w(arvados#containerRequest arvados#pipelineInstance)]]
       }
     pane_list <<
       {
@@ -213,9 +213,13 @@ class ProjectsController < ApplicationController
       @name_link_for = {}
       kind_filters.each do |attr,op,val|
         (val.is_a?(Array) ? val : [val]).each do |type|
+          filters = @filters - kind_filters + [['uuid', 'is_a', type]]
+          if type == 'arvados#containerRequest'
+            filters = filters + [['container_requests.requesting_container_uuid', '=', nil]]
+          end
           objects = @object.contents(order: @order,
                                      limit: @limit,
-                                     filters: (@filters - kind_filters + [['uuid', 'is_a', type]]),
+                                     filters: filters,
                                     )
           objects.each do |object|
             @name_link_for[object.andand.uuid] = objects.links_for(object, 'name').first
diff --git a/apps/workbench/app/models/container_request.rb b/apps/workbench/app/models/container_request.rb
index 62d8bff042c16dec335f746ff6f0991e5e37250e..0148de51f7459a678d49547fe4f24a10e6bc27e9 100644 (file)
@@ -7,6 +7,10 @@ class ContainerRequest < ArvadosBase
     [ 'description' ]
   end
 
+  def self.goes_in_projects?
+    true
+  end
+
   def work_unit(label=nil)
     ContainerWorkUnit.new(self, label)
   end
diff --git a/apps/workbench/app/views/projects/_show_jobs_and_pipelines.html.erb b/apps/workbench/app/views/projects/_show_jobs_and_pipelines.html.erb
deleted file mode 100644 (file)
index 3637ef4..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-<%= render_pane 'tab_contents', to_string: true, locals: {
-        limit: 50,
-           filters: [['uuid', 'is_a', ["arvados#job", "arvados#pipelineInstance"]]],
-           sortable_columns: { 'name' => 'jobs.script, pipeline_instances.name', 'description' => 'jobs.description, pipeline_instances.description' }
-    }.merge(local_assigns) %>
diff --git a/apps/workbench/app/views/projects/_show_pipelines_and_processes.html.erb b/apps/workbench/app/views/projects/_show_pipelines_and_processes.html.erb
new file mode 100644 (file)
index 0000000..1ee3070
--- /dev/null
@@ -0,0 +1,5 @@
+<%= render_pane 'tab_contents', to_string: true, locals: {
+      limit: 50,
+      filters: [['uuid', 'is_a', ["arvados#containerRequest", "arvados#pipelineInstance"]]],
+      sortable_columns: { 'name' => 'container_requests.name, pipeline_instances.name', 'description' => 'container_requests.description, pipeline_instances.description' }
+    }.merge(local_assigns) %>
diff --git a/apps/workbench/test/controllers/projects_controller_test.rb b/apps/workbench/test/controllers/projects_controller_test.rb
index 58914a84ac87b5b0949f07d634a826226a2b64af..c0519bcedfd6457ed3aca4608cc6e14289a9c473 100644 (file)
@@ -421,7 +421,7 @@ class ProjectsControllerTest < ActionController::TestCase
 
   [
     ["active", 5, ["aproject", "asubproject"], "anonymously_accessible_project"],
-    ["user1_with_load", 2, ["project_with_10_collections"], "project_with_2_pipelines_and_60_jobs"],
+    ["user1_with_load", 2, ["project_with_10_collections"], "project_with_2_pipelines_and_60_crs"],
     ["admin", 5, ["anonymously_accessible_project", "subproject_in_anonymous_accessible_project"], "aproject"],
   ].each do |user, page_size, tree_segment, unexpected|
     test "build my projects tree for #{user} user and verify #{unexpected} is omitted" do
diff --git a/apps/workbench/test/integration/anonymous_access_test.rb b/apps/workbench/test/integration/anonymous_access_test.rb
index d58a0315ee54595c2e3d4d10e09c4a822d2e93ae..6e28e4efb4525363fdf3fb1184348b08d3a19647 100644 (file)
@@ -68,7 +68,7 @@ class AnonymousAccessTest < ActionDispatch::IntegrationTest
 
     assert_selector 'a', text: 'Description'
     assert_selector 'a', text: 'Data collections'
-    assert_selector 'a', text: 'Jobs and pipelines'
+    assert_selector 'a', text: 'Pipelines and processes'
     assert_selector 'a', text: 'Pipeline templates'
     assert_selector 'a', text: 'Subprojects'
     assert_selector 'a', text: 'Advanced'
@@ -123,39 +123,35 @@ class AnonymousAccessTest < ActionDispatch::IntegrationTest
   end
 
   [
-    'running_job',
-    'completed_job',
+    'running anonymously accessible cr',
     'pipelineInstance'
-  ].each do |type|
-    test "anonymous user accesses jobs and pipelines tab in shared project and clicks on #{type}" do
+  ].each do |proc|
+    test "anonymous user accesses pipelines and processes tab in shared project and clicks on '#{proc}'" do
       visit PUBLIC_PROJECT
       click_link 'Data collections'
       assert_text 'GNU General Public License'
 
-      click_link 'Jobs and pipelines'
+      click_link 'Pipelines and processes'
       assert_text 'Pipeline in publicly accessible project'
 
-      # click on the specified job
-      if type.include? 'job'
-        verify_job_row type
-      else
+      if proc.include? 'pipeline'
         verify_pipeline_instance_row
+      else
+        verify_container_request_row proc
       end
     end
   end
 
-  def verify_job_row look_for
+  def verify_container_request_row look_for
     within first('tr', text: look_for) do
       click_link 'Show'
     end
     assert_text 'Public Projects Unrestricted public data'
-    assert_text 'script_version'
+    assert_text 'command'
 
     assert_text 'zzzzz-tpzed-xurymjxw79nv3jz' # modified by user
     assert_no_selector 'a', text: 'zzzzz-tpzed-xurymjxw79nv3jz'
-    assert_no_selector 'a', text: 'Move job'
     assert_no_selector 'button', text: 'Cancel'
-    assert_no_selector 'button', text: 'Re-run job'
   end
 
   def verify_pipeline_instance_row
diff --git a/apps/workbench/test/integration/download_test.rb b/apps/workbench/test/integration/download_test.rb
index 8a16fb8a66b547ae704cc2791f06f330a5268bc9..9359475a41a889dda8f2f63965434d3457bd89ab 100644 (file)
@@ -65,13 +65,13 @@ class DownloadTest < ActionDispatch::IntegrationTest
     within "#collection_files" do
       find('[title~=Download]').click
     end
-    wait_for_download 'w a z', 'w a z'
+    wait_for_download 'w a z', 'w a z', timeout: 6
   end
 
-  def wait_for_download filename, expect_data
+  def wait_for_download filename, expect_data, timeout: 3
     data = nil
     tries = 0
-    while tries < 20
+    while tries < timeout*10
       sleep 0.1
       tries += 1
       data = File.read(DownloadHelper.path.join filename) rescue nil
diff --git a/apps/workbench/test/integration/pipeline_instances_test.rb b/apps/workbench/test/integration/pipeline_instances_test.rb
index 2ab8beb294ab8f2ae99e1c6866d2ad7efbcb0822..3d8cbf0b630ee9ddcd7db17ccc4398c645db2b0c 100644 (file)
@@ -82,7 +82,7 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
       wait_for_ajax
     end
 
-    click_link 'Jobs and pipelines'
+    click_link 'Pipelines and processes'
     find('tr[data-kind="arvados#pipelineInstance"]', text: '(none)').
       find('a', text: 'Show').
       click
diff --git a/apps/workbench/test/integration/projects_test.rb b/apps/workbench/test/integration/projects_test.rb
index 01e84b1c0219d19551122356006f7081b0d42629..1c18a436fdbb3fcbaab3de74e9a350ce3e87bbab 100644 (file)
@@ -514,23 +514,23 @@ class ProjectsTest < ActionDispatch::IntegrationTest
 
   [
     ['project_with_10_pipelines', 10, 0],
-    ['project_with_2_pipelines_and_60_jobs', 2, 60],
+    ['project_with_2_pipelines_and_60_crs', 2, 60],
     ['project_with_25_pipelines', 25, 0],
-  ].each do |project_name, num_pipelines, num_jobs|
-    test "scroll pipeline instances tab for #{project_name} with #{num_pipelines} pipelines and #{num_jobs} jobs" do
-      item_list_parameter = "Jobs_and_pipelines"
+  ].each do |project_name, num_pipelines, num_crs|
+    test "scroll pipeline instances tab for #{project_name} with #{num_pipelines} pipelines and #{num_crs} container requests" do
+      item_list_parameter = "Pipelines_and_processes"
       scroll_setup project_name,
-                   num_pipelines + num_jobs,
+                   num_pipelines + num_crs,
                    item_list_parameter
       # check the general scrolling and the pipelines
       scroll_items_check num_pipelines,
                          "pipeline_",
                          item_list_parameter,
                          'tr[data-kind="arvados#pipelineInstance"]'
-      # Check job count separately
-      jobs_found = page.all('tr[data-kind="arvados#job"]')
-      found_job_count = jobs_found.count
-      assert_equal num_jobs, found_job_count, 'Did not find expected number of jobs'
+      # Check container request count separately
+      crs_found = page.all('tr[data-kind="arvados#containerRequest"]')
+      found_cr_count = crs_found.count
+      assert_equal num_crs, found_cr_count, 'Did not find expected number of container requests'
     end
   end
 
@@ -618,8 +618,8 @@ class ProjectsTest < ActionDispatch::IntegrationTest
       assert_no_selector 'li.disabled', text: 'Copy selected'
     end
 
-    # Go to Jobs and pipelines tab and assert none selected
-    click_link 'Jobs and pipelines'
+    # Go to Pipelines and processes tab and assert none selected
+    click_link 'Pipelines and processes'
     wait_for_ajax
 
     # Since this is the first visit to this tab, all selection options should be disabled
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index 783d3d6f957cb1fa5b3e167df6c3f5cedd1dfecd..1763ff3ba348ad47897e6e1601642990681a19f7 100755 (executable)
@@ -459,14 +459,15 @@ fpm_build $WORKSPACE/sdk/cwl "${PYTHON2_PKG_PREFIX}-arvados-cwl-runner" 'Curover
 # So we build this thing separately.
 #
 # Ward, 2016-03-17
-fpm_build schema_salad "" "" python 1.12.20160610104117
+fpm_build schema_salad "" "" python 1.14.20160708181155
 
 # And schema_salad now depends on ruamel-yaml, which apparently has a braindead setup.py that requires special arguments to build (otherwise, it aborts with 'error: you have to install with "pip install ."'). Sigh.
 # Ward, 2016-05-26
-fpm_build ruamel.yaml "" "" python "" --python-setup-py-arguments "--single-version-externally-managed"
+# ...and schema_salad 1.12.20160610104117 doesn't work with ruamel-yaml > 0.11.11.
+fpm_build ruamel.yaml "" "" python 0.11.11 --python-setup-py-arguments "--single-version-externally-managed"
 
 # And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20160630171631
+fpm_build cwltool "" "" python 1.0.20160714182449
 
 # FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
 fpm_build rdflib-jsonld "" "" python 0.3.0
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 30a80f527afabcee038350a963a077268b02aaa2..2e8641a5f335dcfb963c45730103b8222c771346 100755 (executable)
@@ -714,6 +714,7 @@ gostuff=(
     sdk/go/manifest
     sdk/go/streamer
     sdk/go/crunchrunner
+    lib/crunchstat
     services/arv-git-httpd
     services/crunchstat
     services/keep-web
diff --git a/crunch_scripts/cwl-runner b/crunch_scripts/cwl-runner
index c786fc10b1715e1c5912e13ee98011aa94dceab3..2a1873a84e6925f9c1edf6057dc2b01a392d25ff 100755 (executable)
@@ -19,7 +19,7 @@ import os
 import json
 import argparse
 from arvados.api import OrderedJsonModel
-from cwltool.process import adjustFiles
+from cwltool.process import adjustFileObjs
 from cwltool.load_tool import load_tool
 
 # Print package versions
@@ -32,20 +32,23 @@ try:
 
     def keeppath(v):
         if arvados.util.keep_locator_pattern.match(v):
-            return "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
+            return "keep:%s" % v
         else:
             return v
 
-    job_order_object["cwl:tool"] = keeppath(job_order_object["cwl:tool"])
+    def keeppathObj(v):
+        v["location"] = keeppath(v["location"])
+
+    job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])
 
     for k,v in job_order_object.items():
         if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
             job_order_object[k] = {
                 "class": "File",
-                "path": keeppath(v)
+                "location": "keep:%s" % v
             }
 
-    adjustFiles(job_order_object, keeppath)
+    adjustFileObjs(job_order_object, keeppathObj)
 
     runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
 
@@ -63,15 +66,16 @@ try:
     outputObj = runner.arvExecutor(t, job_order_object, **vars(args))
 
     files = {}
-    def capture(path):
+    def capture(fileobj):
+        path = fileobj["location"]
         sp = path.split("/")
         col = sp[0][5:]
         if col not in files:
             files[col] = set()
         files[col].add("/".join(sp[1:]))
-        return path
+        fileobj["location"] = path
 
-    adjustFiles(outputObj, capture)
+    adjustFileObjs(outputObj, capture)
 
     final = arvados.collection.Collection()
 
@@ -80,10 +84,10 @@ try:
             for f in c:
                 final.copy(f, f, c, True)
 
-    def makeRelative(path):
-        return "/".join(path.split("/")[1:])
+    def makeRelative(fileobj):
+        fileobj["location"] = "/".join(fileobj["location"].split("/")[1:])
 
-    adjustFiles(outputObj, makeRelative)
+    adjustFileObjs(outputObj, makeRelative)
 
     with final.open("cwl.output.json", "w") as f:
         json.dump(outputObj, f, indent=4)
diff --git a/doc/api/methods/groups.html.textile.liquid b/doc/api/methods/groups.html.textile.liquid
index 9f20a88a9519d09eb5d7fe040c93706379bc089d..cd9633db427aa1807d4a600f6533225f543e4b34 100644 (file)
@@ -29,6 +29,8 @@ table(table table-bordered table-condensed).
 
 Note: Because adding access tokens to manifests can be computationally expensive, the @manifest_text@ field is not included in listed collections.  If you need it, request a "list of collections":{{site.baseurl}}/api/methods/collections.html with the filter @["owner_uuid", "=", GROUP_UUID]@, and @"manifest_text"@ listed in the select parameter.
 
+Note: Use filters with the attribute format @<item type>.<field name>@ to filter items of a specific type. For example: @["pipeline_instances.state", "=", "Complete"]@ to filter @pipeline_instances@ where @state@ is @Complete@. All other types of items owned by this group will be unimpacted by this filter and will still be included.
+
 h2. create
 
 Create a new Group.
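For the filter note added above, a request against the group contents endpoint might look like the following standard-library Go sketch. Host, token, and group UUID are placeholders read from the environment; the @/arvados/v1/groups/{uuid}/contents@ path and JSON-encoded @filters@ parameter are assumed to match the API documented on this page.

// Sketch only: exercise the attribute-format filter described in the note
// above. ARVADOS_API_HOST, ARVADOS_API_TOKEN and GROUP_UUID are placeholders.
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
	"os"
)

func main() {
	// Only pipeline instances in state "Complete" are matched; items of
	// other types owned by the group are unaffected by this filter.
	filters, _ := json.Marshal([][]interface{}{
		{"pipeline_instances.state", "=", "Complete"},
	})
	u := url.URL{
		Scheme:   "https",
		Host:     os.Getenv("ARVADOS_API_HOST"),
		Path:     "/arvados/v1/groups/" + os.Getenv("GROUP_UUID") + "/contents",
		RawQuery: url.Values{"filters": {string(filters)}}.Encode(),
	}
	req, err := http.NewRequest("GET", u.String(), nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "OAuth2 "+os.Getenv("ARVADOS_API_TOKEN"))
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	var body map[string]interface{}
	if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
		panic(err)
	}
	fmt.Println("items available:", body["items_available"])
}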
diff --git a/doc/install/install-keepstore.html.textile.liquid b/doc/install/install-keepstore.html.textile.liquid
index 6548422f4f8d0492cfac61a25257c365f238bcde..102a3f470ee661d076b14b7c209eacdee525e415 100644 (file)
@@ -35,28 +35,66 @@ Verify that Keepstore is functional:
 
 <notextile>
 <pre><code>~$ <span class="userinput">keepstore -h</span>
-2015/05/08 13:41:16 keepstore starting, pid 2565
+2016/07/01 14:06:21 keepstore starting, pid 32339
 Usage of ./keepstore:
-  -azure-storage-account-key-file="": File containing the account key used for subsequent --azure-storage-container-volume arguments.
-  -azure-storage-account-name="": Azure storage account name used for subsequent --azure-storage-container-volume arguments.
-  -azure-storage-container-volume=[]: Use the given container as a storage volume. Can be given multiple times.
-  -azure-storage-replication=3: Replication level to report to clients when data is stored in an Azure container.
-  -blob-signature-ttl=1209600: Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. See services/api/config/application.default.yml.
-  -blob-signing-key-file="": File containing the secret key for generating and verifying blob permission signatures.
-  -data-manager-token-file="": File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
-  -enforce-permissions=false: Enforce permission signatures on requests.
-  -listen=":25107": Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.
-  -max-buffers=128: Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.
+  -azure-max-get-bytes int
+       Maximum bytes to request in a single GET request. If smaller than 67108864, use multiple concurrent range requests to retrieve a block. (default 67108864)
+  -azure-storage-account-key-file string
+       File containing the account key used for subsequent --azure-storage-container-volume arguments.
+  -azure-storage-account-name string
+       Azure storage account name used for subsequent --azure-storage-container-volume arguments.
+  -azure-storage-container-volume value
+       Use the given container as a storage volume. Can be given multiple times. (default [])
+  -azure-storage-replication int
+       Replication level to report to clients when data is stored in an Azure container. (default 3)
+  -blob-signature-ttl int
+       Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. See services/api/config/application.default.yml. (default 1209600)
+  -blob-signing-key-file string
+       File containing the secret key for generating and verifying blob permission signatures.
+  -data-manager-token-file string
+       File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
+  -enforce-permissions
+       Enforce permission signatures on requests.
+  -listen string
+       Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces. (default ":25107")
+  -max-buffers int
+       Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released. (default 128)
   -max-requests int
-   Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
-  -never-delete=false: If set, nothing will be deleted. HTTP 405 will be returned for valid DELETE requests.
-  -permission-key-file="": Synonym for -blob-signing-key-file.
-  -permission-ttl=0: Synonym for -blob-signature-ttl.
-  -pid="": Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.
-  -readonly=false: Do not write, delete, or touch anything on the following volumes.
-  -serialize=false: Serialize read and write operations on the following volumes.
-  -volume=[]: Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named "keep" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.
-  -volumes=[]: Deprecated synonym for -volume.
+       Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
+  -never-delete
+       If true, nothing will be deleted. Warning: the relevant features in keepstore and data manager have not been extensively tested. You should leave this option alone unless you can afford to lose data. (default true)
+  -permission-key-file string
+       Synonym for -blob-signing-key-file.
+  -permission-ttl int
+       Synonym for -blob-signature-ttl.
+  -pid fuser -k pidfile
+       Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so fuser -k pidfile is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.
+  -readonly
+       Do not write, delete, or touch anything on the following volumes.
+  -s3-access-key-file string
+       File containing the access key used for subsequent -s3-bucket-volume arguments.
+  -s3-bucket-volume value
+       Use the given bucket as a storage volume. Can be given multiple times. (default [])
+  -s3-endpoint string
+       Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use "https://storage.googleapis.com".
+  -s3-region string
+       AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are ["ap-southeast-1" "eu-west-1" "us-gov-west-1" "sa-east-1" "cn-north-1" "ap-northeast-1" "ap-southeast-2" "eu-central-1" "us-east-1" "us-west-1" "us-west-2"].
+  -s3-replication int
+       Replication level reported to clients for subsequent -s3-bucket-volume arguments. (default 2)
+  -s3-secret-key-file string
+       File containing the secret key used for subsequent -s3-bucket-volume arguments.
+  -s3-unsafe-delete
+       EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.
+  -serialize
+       Serialize read and write operations on the following volumes.
+  -trash-check-interval duration
+       Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day. (default 24h0m0s)
+  -trash-lifetime duration
+       Time duration after a block is trashed during which it can be recovered using an /untrash request
+  -volume value
+       Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named "keep" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead. (default [])
+  -volumes value
+       Deprecated synonym for -volume. (default [])
 </code></pre>
 </notextile>
 
diff --git a/docker/jobs/Dockerfile b/docker/jobs/Dockerfile
index d80c3a882defe43676476df144401eee64d97728..e1e7e87c5e53d0c297ec6d2e3ad0870890f402ff 100644 (file)
@@ -11,7 +11,7 @@ RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3
 ARG COMMIT=latest
 RUN echo $COMMIT && apt-get update -q
 
-RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
+RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libgnutls28-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
 
 # Install dependencies and set up system.
 RUN /usr/sbin/adduser --disabled-password \
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
new file mode 100644 (file)
index 0000000..03cfa7d
--- /dev/null
@@ -0,0 +1,439 @@
+// Package crunchstat reports resource usage (CPU, memory, disk,
+// network) for a cgroup.
+package crunchstat
+
+import (
+       "bufio"
+       "bytes"
+       "errors"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "os"
+       "strconv"
+       "strings"
+       "time"
+)
+
+// This magically allows us to look up userHz via _SC_CLK_TCK:
+
+/*
+#include <unistd.h>
+#include <sys/types.h>
+#include <pwd.h>
+#include <stdlib.h>
+*/
+import "C"
+
+// A Reporter gathers statistics for a cgroup and writes them to a
+// log.Logger.
+type Reporter struct {
+       // CID of the container to monitor. If empty, read the CID
+       // from CIDFile (first waiting until a non-empty file appears
+       // at CIDFile). If CIDFile is also empty, report host
+       // statistics.
+       CID string
+
+       // Path to a file we can read CID from.
+       CIDFile string
+
+       // Where cgroup accounting files live on this system, e.g.,
+       // "/sys/fs/cgroup".
+       CgroupRoot string
+
+       // Parent cgroup, e.g., "docker".
+       CgroupParent string
+
+       // Interval between samples. Must be positive.
+       PollPeriod time.Duration
+
+       // Where to write statistics. Must not be nil.
+       Logger *log.Logger
+
+       reportedStatFile map[string]string
+       lastNetSample    map[string]ioSample
+       lastDiskSample   map[string]ioSample
+       lastCPUSample    cpuSample
+
+       done chan struct{}
+}
+
+// Start starts monitoring in a new goroutine, and returns
+// immediately.
+//
+// The monitoring goroutine waits for a non-empty CIDFile to appear
+// (unless CID is non-empty). Then it waits for the accounting files
+// to appear for the monitored container. Then it collects and reports
+// statistics until Stop is called.
+//
+// Callers should not call Start more than once.
+//
+// Callers should not modify public data fields after calling Start.
+func (r *Reporter) Start() {
+       r.done = make(chan struct{})
+       go r.run()
+}
+
+// Stop reporting. Do not call more than once, or before calling
+// Start.
+//
+// Nothing will be logged after Stop returns.
+func (r *Reporter) Stop() {
+       close(r.done)
+}
+
+func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
+       content, err := ioutil.ReadAll(in)
+       if err != nil {
+               r.Logger.Print(err)
+       }
+       return content, err
+}
+
+// Open the cgroup stats file in /sys/fs corresponding to the target
+// cgroup, and return an io.ReadCloser. If no stats file is available,
+// return nil.
+//
+// Log the file that was opened, if it isn't the same file opened on
+// the last openStatFile for this stat.
+//
+// Log "not available" if no file is found and either this stat has
+// been available in the past, or verbose==true.
+//
+// TODO: Instead of trying all options, choose a process in the
+// container, and read /proc/PID/cgroup to determine the appropriate
+// cgroup root for the given statgroup. (This will avoid falling back
+// to host-level stats during container setup and teardown.)
+func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
+       var paths []string
+       if r.CID != "" {
+               // Collect container's stats
+               paths = []string{
+                       fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
+                       fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
+               }
+       } else {
+               // Collect this host's stats
+               paths = []string{
+                       fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
+                       fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
+               }
+       }
+       var path string
+       var file *os.File
+       var err error
+       for _, path = range paths {
+               file, err = os.Open(path)
+               if err == nil {
+                       break
+               } else {
+                       path = ""
+               }
+       }
+       if pathWas := r.reportedStatFile[stat]; pathWas != path {
+               // Log whenever we start using a new/different cgroup
+               // stat file for a given statistic. This typically
+               // happens 1 to 3 times per statistic, depending on
+               // whether we happen to collect stats [a] before any
+               // processes have been created in the container and
+               // [b] after all contained processes have exited.
+               if path == "" && verbose {
+                       r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
+               } else if pathWas != "" {
+                       r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
+               } else {
+                       r.Logger.Printf("notice: reading stats from %s\n", path)
+               }
+               r.reportedStatFile[stat] = path
+       }
+       return file, err
+}
+
+func (r *Reporter) getContainerNetStats() (io.Reader, error) {
+       procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
+       if err != nil {
+               return nil, err
+       }
+       defer procsFile.Close()
+       reader := bufio.NewScanner(procsFile)
+       for reader.Scan() {
+               taskPid := reader.Text()
+               statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
+               stats, err := ioutil.ReadFile(statsFilename)
+               if err != nil {
+                       r.Logger.Print(err)
+                       continue
+               }
+               return strings.NewReader(string(stats)), nil
+       }
+       return nil, errors.New("Could not read stats for any proc in container")
+}
+
+type ioSample struct {
+       sampleTime time.Time
+       txBytes    int64
+       rxBytes    int64
+}
+
+func (r *Reporter) doBlkIOStats() {
+       c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
+       if err != nil {
+               return
+       }
+       defer c.Close()
+       b := bufio.NewScanner(c)
+       var sampleTime = time.Now()
+       newSamples := make(map[string]ioSample)
+       for b.Scan() {
+               var device, op string
+               var val int64
+               if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
+                       continue
+               }
+               var thisSample ioSample
+               var ok bool
+               if thisSample, ok = newSamples[device]; !ok {
+                       thisSample = ioSample{sampleTime, -1, -1}
+               }
+               switch op {
+               case "Read":
+                       thisSample.rxBytes = val
+               case "Write":
+                       thisSample.txBytes = val
+               }
+               newSamples[device] = thisSample
+       }
+       for dev, sample := range newSamples {
+               if sample.txBytes < 0 || sample.rxBytes < 0 {
+                       continue
+               }
+               delta := ""
+               if prev, ok := r.lastDiskSample[dev]; ok {
+                       delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
+                               sample.sampleTime.Sub(prev.sampleTime).Seconds(),
+                               sample.txBytes-prev.txBytes,
+                               sample.rxBytes-prev.rxBytes)
+               }
+               r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
+               r.lastDiskSample[dev] = sample
+       }
+}
+
+type memSample struct {
+       sampleTime time.Time
+       memStat    map[string]int64
+}
+
+func (r *Reporter) doMemoryStats() {
+       c, err := r.openStatFile("memory", "memory.stat", true)
+       if err != nil {
+               return
+       }
+       defer c.Close()
+       b := bufio.NewScanner(c)
+       thisSample := memSample{time.Now(), make(map[string]int64)}
+       wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
+       for b.Scan() {
+               var stat string
+               var val int64
+               if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
+                       continue
+               }
+               thisSample.memStat[stat] = val
+       }
+       var outstat bytes.Buffer
+       for _, key := range wantStats {
+               if val, ok := thisSample.memStat[key]; ok {
+                       outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+               }
+       }
+       r.Logger.Printf("mem%s\n", outstat.String())
+}
+
+func (r *Reporter) doNetworkStats() {
+       sampleTime := time.Now()
+       stats, err := r.getContainerNetStats()
+       if err != nil {
+               return
+       }
+
+       scanner := bufio.NewScanner(stats)
+       for scanner.Scan() {
+               var ifName string
+               var rx, tx int64
+               words := strings.Fields(scanner.Text())
+               if len(words) != 17 {
+                       // Skip lines with wrong format
+                       continue
+               }
+               ifName = strings.TrimRight(words[0], ":")
+               if ifName == "lo" || ifName == "" {
+                       // Skip loopback interface and lines with wrong format
+                       continue
+               }
+               if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
+                       continue
+               }
+               if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
+                       continue
+               }
+               nextSample := ioSample{}
+               nextSample.sampleTime = sampleTime
+               nextSample.txBytes = tx
+               nextSample.rxBytes = rx
+               var delta string
+               if prev, ok := r.lastNetSample[ifName]; ok {
+                       interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
+                       delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
+                               interval,
+                               tx-prev.txBytes,
+                               rx-prev.rxBytes)
+               }
+               r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
+               r.lastNetSample[ifName] = nextSample
+       }
+}
+
+type cpuSample struct {
+       hasData    bool // to distinguish the zero value from real data
+       sampleTime time.Time
+       user       float64
+       sys        float64
+       cpus       int64
+}
+
+// Return the number of CPUs available in the container. Return 0 if
+// we can't figure out the real number of CPUs.
+func (r *Reporter) getCPUCount() int64 {
+       cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
+       if err != nil {
+               return 0
+       }
+       defer cpusetFile.Close()
+       b, err := r.readAllOrWarn(cpusetFile)
+       if err != nil {
+               return 0
+       }
+       sp := strings.Split(string(b), ",")
+       cpus := int64(0)
+       for _, v := range sp {
+               var min, max int64
+               n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
+               if n == 2 {
+                       cpus += (max - min) + 1
+               } else {
+                       cpus++
+               }
+       }
+       return cpus
+}
+
+func (r *Reporter) doCPUStats() {
+       statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
+       if err != nil {
+               return
+       }
+       defer statFile.Close()
+       b, err := r.readAllOrWarn(statFile)
+       if err != nil {
+               return
+       }
+
+       var userTicks, sysTicks int64
+       fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
+       userHz := float64(C.sysconf(C._SC_CLK_TCK))
+       nextSample := cpuSample{
+               hasData:    true,
+               sampleTime: time.Now(),
+               user:       float64(userTicks) / userHz,
+               sys:        float64(sysTicks) / userHz,
+               cpus:       r.getCPUCount(),
+       }
+
+       delta := ""
+       if r.lastCPUSample.hasData {
+               delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
+                       nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
+                       nextSample.user-r.lastCPUSample.user,
+                       nextSample.sys-r.lastCPUSample.sys)
+       }
+       r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
+               nextSample.user, nextSample.sys, nextSample.cpus, delta)
+       r.lastCPUSample = nextSample
+}
+
+// Report stats periodically until we learn (via r.done) that someone
+// called Stop.
+func (r *Reporter) run() {
+       r.reportedStatFile = make(map[string]string)
+
+       if !r.waitForCIDFile() || !r.waitForCgroup() {
+               return
+       }
+
+       r.lastNetSample = make(map[string]ioSample)
+       r.lastDiskSample = make(map[string]ioSample)
+
+       ticker := time.NewTicker(r.PollPeriod)
+       for {
+               r.doMemoryStats()
+               r.doCPUStats()
+               r.doBlkIOStats()
+               r.doNetworkStats()
+               select {
+               case <-r.done:
+                       return
+               case <-ticker.C:
+               }
+       }
+}
+
+// If CID is empty, wait for it to appear in CIDFile. Return true if
+// we get it before we learn (via r.done) that someone called Stop.
+func (r *Reporter) waitForCIDFile() bool {
+       if r.CID != "" || r.CIDFile == "" {
+               return true
+       }
+
+       ticker := time.NewTicker(100 * time.Millisecond)
+       defer ticker.Stop()
+       for {
+               cid, err := ioutil.ReadFile(r.CIDFile)
+               if err == nil && len(cid) > 0 {
+                       r.CID = string(cid)
+                       return true
+               }
+               select {
+               case <-ticker.C:
+               case <-r.done:
+                       r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+                       return false
+               }
+       }
+}
+
+// Wait for the cgroup stats files to appear in cgroup_root. Return
+// true if they appear before r.done indicates someone called Stop. If
+// they don't appear within one poll interval, log a warning and keep
+// waiting.
+func (r *Reporter) waitForCgroup() bool {
+       ticker := time.NewTicker(100 * time.Millisecond)
+       defer ticker.Stop()
+       warningTimer := time.After(r.PollPeriod)
+       for {
+               c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
+               if err == nil {
+                       c.Close()
+                       return true
+               }
+               select {
+               case <-ticker.C:
+               case <-warningTimer:
+                       r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+               case <-r.done:
+                       r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+                       return false
+               }
+       }
+}
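The new lib/crunchstat package is driven through the Reporter type added above: fill in the cgroup/CID fields, call Start, and call Stop when the monitored work finishes. A minimal usage sketch follows; the import path and the CID file location are assumptions for illustration, not part of this commit.

// Sketch only: how a caller might wire up the new Reporter.
package main

import (
	"log"
	"os"
	"time"

	"git.curoverse.com/arvados.git/lib/crunchstat" // assumed import path
)

func main() {
	r := crunchstat.Reporter{
		CIDFile:      "/tmp/example.cid", // hypothetical: file written by `docker run --cidfile`
		CgroupRoot:   "/sys/fs/cgroup",
		CgroupParent: "docker",
		PollPeriod:   10 * time.Second, // must be positive
		Logger:       log.New(os.Stderr, "crunchstat: ", 0),
	}
	r.Start() // samples in a background goroutine until Stop is called

	// ... run the monitored container/work here ...
	time.Sleep(time.Minute)

	r.Stop() // nothing is logged after Stop returns
}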
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
new file mode 100644 (file)
index 0000000..697f235
--- /dev/null
@@ -0,0 +1,62 @@
+package crunchstat
+
+import (
+       "bufio"
+       "io"
+       "log"
+       "os"
+       "regexp"
+       "testing"
+)
+
+func bufLogger() (*log.Logger, *bufio.Reader) {
+       r, w := io.Pipe()
+       logger := log.New(w, "", 0)
+       return logger, bufio.NewReader(r)
+}
+
+func TestReadAllOrWarnFail(t *testing.T) {
+       logger, rcv := bufLogger()
+       rep := Reporter{Logger: logger}
+
+       done := make(chan bool)
+       var msg []byte
+       var err error
+       go func() {
+               msg, err = rcv.ReadBytes('\n')
+               close(done)
+       }()
+       {
+               // The special file /proc/self/mem can be opened for
+               // reading, but reading from byte 0 returns an error.
+               f, err := os.Open("/proc/self/mem")
+               if err != nil {
+                       t.Fatalf("Opening /proc/self/mem: %s", err)
+               }
+               if x, err := rep.readAllOrWarn(f); err == nil {
+                       t.Fatalf("Expected error, got %v", x)
+               }
+       }
+       <-done
+       if err != nil {
+               t.Fatal(err)
+       } else if matched, err := regexp.MatchString("^read /proc/self/mem: .*", string(msg)); err != nil || !matched {
+               t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+       }
+}
+
+func TestReadAllOrWarnSuccess(t *testing.T) {
+       rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+
+       f, err := os.Open("./crunchstat_test.go")
+       if err != nil {
+               t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+       }
+       data, err := rep.readAllOrWarn(f)
+       if err != nil {
+               t.Fatalf("got error %s", err)
+       }
+       if matched, err := regexp.MatchString("^package crunchstat\n", string(data)); err != nil || !matched {
+               t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+       }
+}
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 962a690d6813cb96e14f06c7d5fb430d5d5c8e98..cf968809b9016d1df1ab408bd75b4192b4ad1eb5 100644 (file)
@@ -110,6 +110,7 @@ class ArvCwlRunner(object):
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
+        self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)
 
         if kwargs.get("create_template"):
             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
@@ -119,16 +120,20 @@ class ArvCwlRunner(object):
 
         self.debug = kwargs.get("debug")
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
-        self.fs_access = CollectionFsAccess(kwargs["basedir"])
 
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+        kwargs["use_container"] = True
+        kwargs["tmpdir_prefix"] = "tmp"
+        kwargs["on_error"] = "continue"
 
         if self.work_api == "containers":
             kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
+            kwargs["docker_outdir"] = "$(task.outdir)"
             kwargs["tmpdir"] = "$(task.tmpdir)"
 
         runnerjob = None
@@ -169,7 +174,6 @@ class ArvCwlRunner(object):
                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
             jobiter = tool.job(job_order,
                                self.output_callback,
-                               docker_outdir="$(task.outdir)",
                                **kwargs)
 
         try:
@@ -211,6 +215,9 @@ class ArvCwlRunner(object):
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
 
+        if self.final_status != "success":
+            raise WorkflowException("Workflow failed.")
+
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
@@ -303,6 +310,7 @@ def main(args, stdout, stderr, api_client=None):
         return 1
 
     arvargs.conformance_test = None
+    arvargs.use_container = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index 9bf93e7c5661420727ff01ad3da9e16aaec40271..73ba8d52089049fad65db3a0ded1da9a63472759 100644 (file)
@@ -3,7 +3,8 @@ import json
 import os
 
 from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
+from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.pathmapper import adjustFiles
 
 import arvados.collection
 
@@ -41,14 +42,24 @@ class ArvadosContainer(object):
             }
         }
 
+        dirs = set()
         for f in self.pathmapper.files():
-            _, p = self.pathmapper.mapper(f)
-            mounts[p] = {
-                "kind": "collection",
-                "portable_data_hash": p[6:]
-            }
+            _, p, tp = self.pathmapper.mapper(f)
+            if tp == "Directory" and '/' not in p[6:]:
+                mounts[p] = {
+                    "kind": "collection",
+                    "portable_data_hash": p[6:]
+                }
+                dirs.add(p[6:])
+        for f in self.pathmapper.files():
+            _, p, tp = self.pathmapper.mapper(f)
+            if p[6:].split("/")[0] not in dirs:
+                mounts[p] = {
+                    "kind": "collection",
+                    "portable_data_hash": p[6:]
+                }
 
-        if self.generatefiles:
+        if self.generatefiles["listing"]:
             raise UnsupportedRequirement("Generate files not supported")
 
         container_request["environment"] = {"TMPDIR": "/tmp"}
@@ -58,6 +69,9 @@ class ArvadosContainer(object):
         if self.stdin:
             raise UnsupportedRequirement("Stdin redirection currently not suppported")
 
+        if self.stderr:
+            raise UnsupportedRequirement("Stderr redirection currently not suppported")
+
         if self.stdout:
             mounts["stdout"] = {"kind": "file",
                                 "path": "%s/%s" % (self.outdir, self.stdout)}
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index f129dfa80436451b0e0fc04dd0aef93df60f3417..00355973bd6c6e1416fa3e96480c96771bdc3227 100644 (file)
@@ -4,7 +4,7 @@ import copy
 
 from cwltool.process import get_feature, shortname
 from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.draft2tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 
@@ -12,6 +12,7 @@ import arvados.collection
 
 from .arvdocker import arv_docker_get_image
 from .runner import Runner
+from .pathmapper import InitialWorkDirPathMapper
 from . import done
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -34,30 +35,42 @@ class ArvadosJob(object):
         }
         runtime_constraints = {}
 
-        if self.generatefiles:
+        if self.generatefiles["listing"]:
             vwd = arvados.collection.Collection()
             script_parameters["task.vwd"] = {}
-            for t in self.generatefiles:
-                if isinstance(self.generatefiles[t], dict):
-                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
-                    vwd.copy(rest, t, source_collection=src)
-                else:
-                    with vwd.open(t, "w") as f:
-                        f.write(self.generatefiles[t].encode('utf-8'))
+            generatemapper = InitialWorkDirPathMapper([self.generatefiles], "", "",
+                                        separateDirs=False)
+            for f, p in generatemapper.items():
+                if p.type == "CreateFile":
+                    with vwd.open(p.target, "w") as n:
+                        n.write(p.resolved.encode("utf-8"))
             vwd.save_new()
-            for t in self.generatefiles:
-                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+            for f, p in generatemapper.items():
+                if p.type == "File":
+                    script_parameters["task.vwd"][p.target] = p.resolved
+                if p.type == "CreateFile":
+                    script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
 
         script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
         if self.environment:
             script_parameters["task.env"].update(self.environment)
 
         if self.stdin:
-            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+            script_parameters["task.stdin"] = self.stdin
 
         if self.stdout:
             script_parameters["task.stdout"] = self.stdout
 
+        if self.stderr:
+            script_parameters["task.stderr"] = self.stderr
+
+        if self.successCodes:
+            script_parameters["task.successCodes"] = self.successCodes
+        if self.temporaryFailCodes:
+            script_parameters["task.temporaryFailCodes"] = self.temporaryFailCodes
+        if self.permanentFailCodes:
+            script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
+
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
@@ -284,7 +297,7 @@ class RunnerTemplate(object):
 
             # Title and description...
             title = param.pop('label', '')
-            descr = param.pop('description', '').rstrip('\n')
+            descr = param.pop('doc', '').rstrip('\n')
             if title:
                 param['title'] = title
             if descr:
@@ -297,8 +310,8 @@ class RunnerTemplate(object):
                 pass
             elif not isinstance(value, dict):
                 param['value'] = value
-            elif param.get('dataclass') == 'File' and value.get('path'):
-                param['value'] = value['path']
+            elif param.get('dataclass') == 'File' and value.get('location'):
+                param['value'] = value['location']
 
             spec['script_parameters'][param_id] = param
         spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py
index a2dffa675a164e296dc88e966f02de273f2f69c4..7107ba0a301ff9a80cf03386f78817ec021b2fd9 100644 (file)
@@ -17,7 +17,8 @@ class ArvadosCommandTool(CommandLineTool):
         elif self.work_api == "jobs":
             return ArvadosJob(self.arvrunner)
 
-    def makePathMapper(self, reffiles, **kwargs):
+    def makePathMapper(self, reffiles, stagedir, **kwargs):
+        # type: (List[Any], unicode, **Any) -> PathMapper
         if self.work_api == "containers":
             return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
                                  "/keep/%s",
@@ -32,6 +33,8 @@ class ArvadosCommandTool(CommandLineTool):
     def job(self, joborder, output_callback, **kwargs):
         if self.work_api == "containers":
             kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["docker_outdir"] = "/var/spool/cwl"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
+            kwargs["docker_outdir"] = "$(task.outdir)"
         return super(ArvadosCommandTool, self).job(joborder, output_callback, **kwargs)
index 28b0feeff46fd74d6a3bb0a10ffbca48f912fb28..d2d38b00c35a89f2a1a1f179a0468b821e48670a 100644 (file)
@@ -2,15 +2,18 @@ import fnmatch
 import os
 
 import cwltool.process
+from cwltool.pathmapper import abspath
 
 import arvados.util
 import arvados.collection
+import arvados.arvfile
 
 class CollectionFsAccess(cwltool.process.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 
-    def __init__(self, basedir):
+    def __init__(self, basedir, api_client=None):
         super(CollectionFsAccess, self).__init__(basedir)
+        self.api_client = api_client
         self.collections = {}
 
     def get_collection(self, path):
@@ -18,7 +21,7 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
         if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
             pdh = p[0][5:]
             if pdh not in self.collections:
-                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
+                self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client)
             return (self.collections[pdh], "/".join(p[1:]))
         else:
             return (None, path)
@@ -47,6 +50,8 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
 
     def glob(self, pattern):
         collection, rest = self.get_collection(pattern)
+        if collection and not rest:
+            return [pattern]
         patternsegments = rest.split("/")
         return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
 
@@ -55,11 +60,47 @@ class CollectionFsAccess(cwltool.process.StdFsAccess):
         if collection:
             return collection.open(rest, mode)
         else:
-            return open(self._abs(fn), mode)
+            return super(CollectionFsAccess, self).open(self._abs(fn), mode)
 
     def exists(self, fn):
         collection, rest = self.get_collection(fn)
         if collection:
             return collection.exists(rest)
         else:
-            return os.path.exists(self._abs(fn))
+            return super(CollectionFsAccess, self).exists(fn)
+
+    def isfile(self, fn):  # type: (unicode) -> bool
+        collection, rest = self.get_collection(fn)
+        if collection:
+            if rest:
+                return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
+            else:
+                return False
+        else:
+            return super(CollectionFsAccess, self).isfile(fn)
+
+    def isdir(self, fn):  # type: (unicode) -> bool
+        collection, rest = self.get_collection(fn)
+        if collection:
+            if rest:
+                return isinstance(collection.find(rest), arvados.collection.Collection)
+            else:
+                return True
+        else:
+            return super(CollectionFsAccess, self).isdir(fn)
+
+    def listdir(self, fn):  # type: (unicode) -> List[unicode]
+        collection, rest = self.get_collection(fn)
+        if rest:
+            dir = collection.find(rest)
+        else:
+            dir = collection
+        if collection:
+            return [abspath(l, fn) for l in dir.keys()]
+        else:
+            return super(CollectionFsAccess, self).listdir(fn)
+
+    def join(self, path, *paths): # type: (unicode, *unicode) -> unicode
+        if paths and paths[-1].startswith("keep:") and arvados.util.keep_locator_pattern.match(paths[-1][5:]):
+            return paths[-1]
+        return os.path.join(path, *paths)
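
CollectionFsAccess routes any "keep:" locator through a cached CollectionReader and lets everything else fall through to the ordinary filesystem. A hedged usage sketch, assuming a configured Arvados client and a real portable data hash (the locator below is a placeholder):

    import arvados
    from arvados_cwl.fsaccess import CollectionFsAccess

    fs = CollectionFsAccess("/tmp", api_client=arvados.api("v1"))
    loc = "keep:99999999999999999999999999999992+99/blub.txt"
    if fs.exists(loc) and fs.isfile(loc):
        with fs.open(loc, "r") as f:
            print(f.read())
    # Paths without a "keep:" prefix use the plain filesystem methods:
    print(fs.isdir("/tmp"))
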
index 9538a9176f4c7bd876376452a3da8ea38dd21bfb..fb4ae5ad45ffdb3331dc5151915cb71987d241b5 100644 (file)
 import re
+import logging
+import uuid
 
 import arvados.commands.run
 import arvados.collection
-import cwltool.pathmapper
 
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
+from cwltool.pathmapper import PathMapper, MapperEnt, abspath
+from cwltool.workflow import WorkflowException
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+class ArvPathMapper(PathMapper):
     """Convert container-local paths to and from Keep collection ids."""
 
+    pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
+    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.+)?$')
+
     def __init__(self, arvrunner, referenced_files, input_basedir,
                  collection_pattern, file_pattern, name=None, **kwargs):
-        self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = set()
-
-        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+        self.arvrunner = arvrunner
+        self.input_basedir = input_basedir
+        self.collection_pattern = collection_pattern
+        self.file_pattern = file_pattern
+        self.name = name
+        super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
 
-        for src in referenced_files:
-            if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, collection_pattern % src[5:])
+    def visit(self, srcobj, uploadfiles):
+        src = srcobj["location"]
+        if srcobj["class"] == "File":
             if "#" in src:
                 src = src[:src.index("#")]
+            if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
+                self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
             if src not in self._pathmap:
-                ab = cwltool.pathmapper.abspath(src, input_basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
-                if kwargs.get("conformance_test"):
-                    self._pathmap[src] = (src, ab)
-                elif isinstance(st, arvados.commands.run.UploadFile):
+                # Local filesystem reference: it may need to be uploaded,
+                # or it may already be available via the Keep mount.
+                ab = abspath(src, self.input_basedir)
+                st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+                if isinstance(st, arvados.commands.run.UploadFile):
                     uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
-                    self._pathmap[src] = (ab, st.fn)
+                    self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+                elif src.startswith("_:") and "contents" in srcobj:
+                    pass
                 else:
-                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+                    raise WorkflowException("Input file path '%s' is invalid" % st)
+            if "secondaryFiles" in srcobj:
+                for l in srcobj["secondaryFiles"]:
+                    self.visit(l, uploadfiles)
+        elif srcobj["class"] == "Directory":
+            if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
+                self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "Directory")
+            else:
+                for l in srcobj["listing"]:
+                    self.visit(l, uploadfiles)
+
+    def addentry(self, obj, c, path, subdirs):
+        if obj["location"] in self._pathmap:
+            src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
+            c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
+            for l in obj.get("secondaryFiles", []):
+                self.addentry(l, c, path, subdirs)
+        elif obj["class"] == "Directory":
+            for l in obj["listing"]:
+                self.addentry(l, c, path + "/" + obj["basename"], subdirs)
+            subdirs.append((obj["location"], path + "/" + obj["basename"]))
+        elif obj["location"].startswith("_:") and "contents" in obj:
+            with c.open(path + "/" + obj["basename"], "w") as f:
+                f.write(obj["contents"].encode("utf-8"))
+        else:
+            raise WorkflowException("Don't know what to do with '%s'" % obj["location"])
+
+    def setup(self, referenced_files, basedir):
+        # type: (List[Any], unicode) -> None
+        self._pathmap = self.arvrunner.get_uploaded()
+        uploadfiles = set()
+
+        for srcobj in referenced_files:
+            self.visit(srcobj, uploadfiles)
 
         if uploadfiles:
             arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
-                                             arvrunner.api,
-                                             dry_run=kwargs.get("dry_run"),
-                                             num_retries=3,
-                                             fnPattern=file_pattern,
-                                             name=name,
-                                             project=arvrunner.project_uuid)
+                                             self.arvrunner.api,
+                                             dry_run=False,
+                                             num_retries=self.arvrunner.num_retries,
+                                             fnPattern=self.file_pattern,
+                                             name=self.name,
+                                             project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            arvrunner.add_uploaded(src, (ab, st.fn))
-            self._pathmap[src] = (ab, st.fn)
+            self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
+            self.arvrunner.add_uploaded(src, self._pathmap[src])
+
+        for srcobj in referenced_files:
+            if srcobj["class"] == "Directory":
+                if srcobj["location"] not in self._pathmap:
+                    c = arvados.collection.Collection(api_client=self.arvrunner.api,
+                                                      num_retries=self.arvrunner.num_retries)
+                    subdirs = []
+                    for l in srcobj["listing"]:
+                        self.addentry(l, c, ".", subdirs)
+
+                    check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
+                    if not check["items"]:
+                        c.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+                    ab = self.collection_pattern % c.portable_data_hash()
+                    self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "Directory")
+                    for loc, sub in subdirs:
+                        ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+                        self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+            elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
+                (srcobj["location"].startswith("_:") and "contents" in srcobj)):
+
+                c = arvados.collection.Collection(api_client=self.arvrunner.api,
+                                                  num_retries=self.arvrunner.num_retries)
+                subdirs = []
+                self.addentry(srcobj, c, ".", subdirs)
+
+                check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
+                if not check["items"]:
+                    c.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+                ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
+                self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "File")
+                if srcobj.get("secondaryFiles"):
+                    ab = self.collection_pattern % c.portable_data_hash()
+                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt(ab, ab, "Directory")
+                for loc, sub in subdirs:
+                    ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+                    self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
 
         self.keepdir = None
 
@@ -53,3 +140,17 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
             return (target, "keep:" + target[len(self.keepdir)+1:])
         else:
             return super(ArvPathMapper, self).reversemap(target)
+
+class InitialWorkDirPathMapper(PathMapper):
+    def setup(self, referenced_files, basedir):
+        # type: (List[Any], unicode) -> None
+
+        # Go through each file and set the target to its own directory along
+        # with any secondary files.
+        stagedir = self.stagedir
+        for fob in referenced_files:
+            self.visit(fob, stagedir, basedir)
+
+        for path, (ab, tgt, type) in self._pathmap.items():
+            if type in ("File", "Directory") and ab.startswith("keep:"):
+                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
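
After setup() runs, _pathmap maps every source location to a MapperEnt whose target is what the job actually sees; InitialWorkDirPathMapper then rewrites "keep:" sources into "$(task.keep)/..." targets. A sketch of the resulting shape, with a stand-in namedtuple and placeholder locators:

    from collections import namedtuple

    # Stand-in for cwltool.pathmapper.MapperEnt (resolved, target, type).
    MapperEnt = namedtuple("MapperEnt", ["resolved", "target", "type"])

    pathmap = {
        # A file already in Keep is referenced in place...
        "keep:99999999999999999999999999999992+99/blub.txt":
            MapperEnt("keep:99999999999999999999999999999992+99/blub.txt",
                      "/keep/99999999999999999999999999999992+99/blub.txt", "File"),
        # ...while a local file is uploaded and re-pointed at its new locator.
        "file:///home/me/blorp.txt":
            MapperEnt("keep:99999999999999999999999999999994+99/blorp.txt",
                      "$(file 99999999999999999999999999999994+99/blorp.txt)", "File"),
    }
    for src, ent in pathmap.items():
        print("%s -> %s" % (src, ent.target))
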
index 629b1042bb75400b9e8c6b05dacd65e3876362fc..d7d5d2b32d620f96683294757e27d0f3396307f1 100644 (file)
@@ -3,11 +3,14 @@ import urlparse
 from functools import partial
 import logging
 import json
+import re
 
+import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
 from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import arvados.collection
 
@@ -16,6 +19,8 @@ from .pathmapper import ArvPathMapper
 
 logger = logging.getLogger('arvados.cwl-runner')
 
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner
@@ -41,15 +46,14 @@ class Runner(object):
     def arvados_job_spec(self, *args, **kwargs):
         self.upload_docker(self.tool)
 
-        workflowfiles = set()
-        jobfiles = set()
-        workflowfiles.add(self.tool.tool["id"])
+        workflowfiles = []
+        jobfiles = []
+        workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
 
         self.name = os.path.basename(self.tool.tool["id"])
 
         def visitFiles(files, path):
-            files.add(path)
-            return path
+            files.append(path)
 
         document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
         loaded = set()
@@ -63,10 +67,15 @@ class Runner(object):
 
         sc = scandeps(uri, workflowobj,
                       set(("$import", "run")),
-                      set(("$include", "$schemas", "path")),
+                      set(("$include", "$schemas", "path", "location")),
                       loadref)
-        adjustFiles(sc, partial(visitFiles, workflowfiles))
-        adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+        adjustFileObjs(sc, partial(visitFiles, workflowfiles))
+        adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
+        adjustDirObjs(sc, partial(visitFiles, workflowfiles))
+        adjustDirObjs(self.job_order, partial(visitFiles, jobfiles))
+
+        normalizeFilesDirs(jobfiles)
+        normalizeFilesDirs(workflowfiles)
 
         keepprefix = kwargs.get("keepprefix", "")
         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
@@ -81,7 +90,10 @@ class Runner(object):
                                   name=os.path.basename(self.job_order.get("id", "#")),
                                   **kwargs)
 
-        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+        def setloc(p):
+            p["location"] = jobmapper.mapper(p["location"])[1]
+        adjustFileObjs(self.job_order, setloc)
+        adjustDirObjs(self.job_order, setloc)
 
         if "id" in self.job_order:
             del self.job_order["id"]
@@ -109,12 +121,12 @@ class Runner(object):
                 outc = arvados.collection.Collection(record["output"])
                 with outc.open("cwl.output.json") as f:
                     outputs = json.load(f)
-                def keepify(path):
+                def keepify(fileobj):
+                    path = fileobj["location"]
                     if not path.startswith("keep:"):
-                        return "keep:%s/%s" % (record["output"], path)
-                    else:
-                        return path
-                adjustFiles(outputs, keepify)
+                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+                adjustFileObjs(outputs, keepify)
+                adjustDirObjs(outputs, keepify)
             except Exception as e:
                 logger.error("While getting final output object: %s", e)
             self.arvrunner.output_callback(outputs, processStatus)
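
The keepify callback above rewrites any output location that is not already a Keep reference so it points into the job's output collection. A minimal sketch with plain dicts instead of cwltool's adjustFileObjs/adjustDirObjs helpers (the hash is a placeholder):

    record_output = "99999999999999999999999999999993+99"

    def keepify(fileobj):
        path = fileobj["location"]
        if not path.startswith("keep:"):
            fileobj["location"] = "keep:%s/%s" % (record_output, path)

    outputs = {"result": {"class": "File", "location": "out.txt"}}
    keepify(outputs["result"])
    print(outputs["result"]["location"])
    # keep:99999999999999999999999999999993+99/out.txt
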
index 131350d5f9ef58aae50757c37e88988b7a96fd1a..36ad22b4baefa0f6ab28349e8688b6627718ff35 100644 (file)
@@ -29,8 +29,10 @@ setup(name='arvados-cwl-runner',
           'bin/cwl-runner',
           'bin/arvados-cwl-runner'
       ],
+      # Make sure to update arvados/build/run-build-packages.sh as well
+      # when updating the cwltool version pin.
       install_requires=[
-          'cwltool==1.0.20160630171631',
+          'cwltool==1.0.20160714182449',
           'arvados-python-client>=0.1.20160322001610'
       ],
       data_files=[
index 5501e2964557fa5346d32ab798aa3e4a2381abb8..29109a4e6a61309b7968b7b81b5bb0eeca3499ea 100755 (executable)
@@ -1,5 +1,7 @@
 #!/bin/sh
 
+set -x
+
 if ! which arvbox >/dev/null ; then
     export PATH=$PATH:$(readlink -f $(dirname $0)/../../tools/arvbox/bin)
 fi
@@ -8,6 +10,7 @@ reset_container=1
 leave_running=0
 config=dev
 docker_pull=1
+tag=""
 
 while test -n "$1" ; do
     arg="$1"
@@ -28,8 +31,12 @@ while test -n "$1" ; do
             docker_pull=0
             shift
             ;;
+        --tag)
+            tag=$2
+            shift ; shift
+            ;;
         -h|--help)
-            echo "$0 [--no-reset-container] [--leave-running] [--no-docker-pull] [--config dev|localdemo]"
+            echo "$0 [--no-reset-container] [--leave-running] [--no-docker-pull] [--config dev|localdemo] [--tag docker_tag]"
             exit
             ;;
         *)
@@ -46,7 +53,7 @@ if test $reset_container = 1 ; then
     arvbox reset -f
 fi
 
-arvbox start $config
+arvbox start $config $tag
 
 arvbox pipe <<EOF
 set -eu -o pipefail
@@ -69,7 +76,7 @@ export ARVADOS_API_HOST_INSECURE=1
 export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
 
 if test $docker_pull = 1 ; then
-  arv-keepdocker --pull arvados/jobs
+  arv-keepdocker --pull arvados/jobs $tag
 fi
 
 cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
index 6f056ab0c4fcd1d2f33ac4106fb51442a0022e19..2dff3e8f9dbe422a7ca6e95037384cc4618c920b 100644 (file)
@@ -41,17 +41,24 @@ def stubs(func):
         stubs.api.collections().create().execute.side_effect = ({
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
             "portable_data_hash": "99999999999999999999999999999991+99",
+            "manifest_text": ""
         }, {
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
             "portable_data_hash": "99999999999999999999999999999992+99",
+            "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"
         },
         {
             "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4",
             "portable_data_hash": "99999999999999999999999999999994+99",
             "manifest_text": ""
-        })
+        },
+        {
+            "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz5",
+            "portable_data_hash": "99999999999999999999999999999995+99",
+            "manifest_text": ""
+        })
         stubs.api.collections().get().execute.return_value = {
-            "portable_data_hash": "99999999999999999999999999999993+99"}
+            "portable_data_hash": "99999999999999999999999999999993+99", "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"}
 
         stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         stubs.api.jobs().create().execute.return_value = {
@@ -76,7 +83,8 @@ def stubs(func):
             },
             'script_parameters': {
                 'x': {
-                    'path': '99999999999999999999999999999992+99/blorp.txt',
+                    'basename': 'blorp.txt',
+                    'location': '99999999999999999999999999999994+99/blorp.txt',
                     'class': 'File'
                 },
                 'cwl:tool':
@@ -103,7 +111,7 @@ def stubs(func):
                     'kind': 'file'
                 },
                 '/var/lib/cwl/job/cwl.input.json': {
-                    'portable_data_hash': '33be5c865fe12e1e4788d2f1bc627f7a+60/cwl.input.json',
+                    'portable_data_hash': '765fda0d9897729ff467a4609879c00a+60/cwl.input.json',
                     'kind': 'collection'
                 }
             },
@@ -138,13 +146,19 @@ class TestSubmit(unittest.TestCase):
             mock.call(),
             mock.call(body={
                 'manifest_text':
-                './tool a3954c369b8924d40547ec8cf5f6a7f4+449 '
-                '0:16:blub.txt 16:433:submit_tool.cwl\n./wf '
-                'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n',
+                './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
+                '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
+                '4d31c5fefd087faf67ca8db0111af36c+353 0:353:submit_wf.cwl\n',
                 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
                 'name': 'submit_wf.cwl',
             }, ensure_unique_name=True),
             mock.call().execute(),
+            mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
+                            '0:0:blub.txt 0:0:submit_tool.cwl\n',
+                            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                            'name': 'New collection'},
+                      ensure_unique_name=True),
+            mock.call().execute(num_retries=4),
             mock.call(body={
                 'manifest_text':
                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
@@ -191,13 +205,19 @@ class TestSubmit(unittest.TestCase):
             mock.call(),
             mock.call(body={
                 'manifest_text':
-                './tool a3954c369b8924d40547ec8cf5f6a7f4+449 '
-                '0:16:blub.txt 16:433:submit_tool.cwl\n./wf '
-                'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n',
+                './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
+                '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
+                '4d31c5fefd087faf67ca8db0111af36c+353 0:353:submit_wf.cwl\n',
                 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
                 'name': 'submit_wf.cwl',
             }, ensure_unique_name=True),
             mock.call().execute(),
+            mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
+                            '0:0:blub.txt 0:0:submit_tool.cwl\n',
+                            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                            'name': 'New collection'},
+                      ensure_unique_name=True),
+            mock.call().execute(num_retries=4),
             mock.call(body={
                 'manifest_text':
                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
@@ -222,7 +242,7 @@ class TestCreateTemplate(unittest.TestCase):
         capture_stdout = cStringIO.StringIO()
 
         exited = arvados_cwl.main(
-            ["--create-template", "--no-wait",
+            ["--create-template", "--no-wait", "--debug",
              "--project-uuid", project_uuid,
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             capture_stdout, sys.stderr, api_client=stubs.api)
@@ -236,7 +256,7 @@ class TestCreateTemplate(unittest.TestCase):
             'dataclass': 'File',
             'required': True,
             'type': 'File',
-            'value': '99999999999999999999999999999992+99/blorp.txt',
+            'value': '99999999999999999999999999999994+99/blorp.txt',
         }
         expect_template = {
             "components": {
@@ -327,7 +347,7 @@ class TestTemplateInputs(unittest.TestCase):
         expect_template["owner_uuid"] = stubs.fake_user_uuid
         params = expect_template[
             "components"]["inputs_test.cwl"]["script_parameters"]
-        params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
+        params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
         params["floatInput"]["value"] = 1.234
         params["boolInput"]["value"] = True
 
index e9fa423ee664bf1c3513d9d8ed80c01616ee2c71..19df1de2086b70b462579b8c46dfa02cfe995dc4 100644 (file)
@@ -4,7 +4,7 @@
 # value blub.txt) and uploading to Keep works as intended.
 
 class: CommandLineTool
-cwlVersion: draft-3
+cwlVersion: v1.0
 requirements:
   - class: DockerRequirement
     dockerPull: debian:8
@@ -13,7 +13,7 @@ inputs:
     type: File
     default:
       class: File
-      path: blub.txt
+      location: blub.txt
     inputBinding:
       position: 1
 outputs: []
index ec43207c696aa8916f89981f4fc71d01d79a9773..5fea4fdddfe9058745f2ded5d070c324d9b6a4e0 100644 (file)
@@ -2,12 +2,12 @@
 # various input types as script_parameters in pipeline templates.
 
 class: Workflow
-cwlVersion: draft-3
+cwlVersion: v1.0
 inputs:
   - id: "#fileInput"
     type: File
     label: It's a file; we expect to find some characters in it.
-    description: |
+    doc: |
       If there were anything further to say, it would be said here,
       or here.
   - id: "#boolInput"
@@ -22,7 +22,7 @@ inputs:
 outputs: []
 steps:
   - id: step1
-    inputs:
-      - { id: x, source: "#x" }
-    outputs: []
+    in:
+      - { id: x, source: "#fileInput" }
+    out: []
     run: ../tool/submit_tool.cwl
index 36db603cc6040ed181986b9322dc50e4d4634b32..874c72c50f538b918c975b617c6ade3f1f680f72 100644 (file)
@@ -4,14 +4,14 @@
 # (e.g. submit_tool.cwl) and uploading to Keep works as intended.
 
 class: Workflow
-cwlVersion: draft-3
+cwlVersion: v1.0
 inputs:
   - id: x
     type: File
 outputs: []
 steps:
   - id: step1
-    inputs:
+    in:
       - { id: x, source: "#x" }
-    outputs: []
+    out: []
     run: ../tool/submit_tool.cwl
index 422ad9037acc45ed6a2a39b9f8be5ab9bedcf3c8..5011aa81f689c0bf1d6098c123bcaeee14ba6c41 100644 (file)
@@ -117,7 +117,7 @@ func TestAnythingToValues(t *testing.T) {
                        },
                },
                {
-                       in: map[string]interface{}{"foo": map[string]interface{}{"bar":1.234}},
+                       in: map[string]interface{}{"foo": map[string]interface{}{"bar": 1.234}},
                        ok: func(out url.Values) bool {
                                return out.Get("foo") == `{"bar":1.234}`
                        },
index 4af1b7910f6f3b111583ad91fa5416ef520b4ac5..b29748a2247342a2497a4d4018e41da5174e471e 100644 (file)
@@ -30,6 +30,7 @@ type KeepServiceList struct {
 // us about a stored block.
 type KeepServiceIndexEntry struct {
        SizedDigest
+       // Time of last write, in nanoseconds since Unix epoch
        Mtime int64
 }
 
@@ -108,6 +109,14 @@ func (s *KeepService) Index(c *Client, prefix string) ([]KeepServiceIndexEntry,
                if err != nil {
                        return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
                }
+               if mtime < 1e12 {
+                       // An old version of keepstore is giving us
+                       // timestamps in seconds instead of
+                       // nanoseconds. (This threshold correctly
+                       // handles all times between 1970-01-02 and
+                       // 33658-09-27.)
+                       mtime = mtime * 1e9
+               }
                entries = append(entries, KeepServiceIndexEntry{
                        SizedDigest: SizedDigest(fields[0]),
                        Mtime:       mtime,
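
The mtime check above compensates for keepstore versions that reported index timestamps in seconds rather than nanoseconds. The same heuristic as a small Python sketch:

    def normalize_mtime(mtime):
        # Values below 1e12 cannot be recent nanosecond timestamps, so treat
        # them as seconds and scale up.
        if mtime < 10**12:
            mtime *= 10**9
        return mtime

    print(normalize_mtime(1468000000))            # old keepstore: seconds
    print(normalize_mtime(1468000000000000000))   # new keepstore: nanoseconds
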
index 14c75afff282cbfd6fc389f0d81678cadb502260..040d7c20be43ab3edd5d0a4415362c35578b9af9 100644 (file)
@@ -20,6 +20,7 @@ type TaskDef struct {
        Env                map[string]string `json:"task.env"`
        Stdin              string            `json:"task.stdin"`
        Stdout             string            `json:"task.stdout"`
+       Stderr             string            `json:"task.stderr"`
        Vwd                map[string]string `json:"task.vwd"`
        SuccessCodes       []int             `json:"task.successCodes"`
        PermanentFailCodes []int             `json:"task.permanentFailCodes"`
@@ -80,13 +81,13 @@ func checkOutputFilename(outdir, fn string) error {
        return nil
 }
 
-func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
        if taskp.Vwd != nil {
                for k, v := range taskp.Vwd {
                        v = substitute(v, replacements)
                        err = checkOutputFilename(outdir, k)
                        if err != nil {
-                               return "", "", err
+                               return "", "", "", err
                        }
                        os.Symlink(v, outdir+"/"+k)
                }
@@ -97,26 +98,39 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
                stdin = substitute(taskp.Stdin, replacements)
                cmd.Stdin, err = os.Open(stdin)
                if err != nil {
-                       return "", "", err
+                       return "", "", "", err
                }
        }
 
        if taskp.Stdout != "" {
                err = checkOutputFilename(outdir, taskp.Stdout)
                if err != nil {
-                       return "", "", err
+                       return "", "", "", err
                }
                // Set up stdout redirection
                stdout = outdir + "/" + taskp.Stdout
                cmd.Stdout, err = os.Create(stdout)
                if err != nil {
-                       return "", "", err
+                       return "", "", "", err
                }
        } else {
                cmd.Stdout = os.Stdout
        }
 
-       cmd.Stderr = os.Stderr
+       if taskp.Stderr != "" {
+               err = checkOutputFilename(outdir, taskp.Stderr)
+               if err != nil {
+                       return "", "", "", err
+               }
+               // Set up stderr redirection
+               stderr = outdir + "/" + taskp.Stderr
+               cmd.Stderr, err = os.Create(stderr)
+               if err != nil {
+                       return "", "", "", err
+               }
+       } else {
+               cmd.Stderr = os.Stderr
+       }
 
        if taskp.Env != nil {
                // Set up subprocess environment
@@ -126,7 +140,7 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[
                        cmd.Env = append(cmd.Env, k+"="+v)
                }
        }
-       return stdin, stdout, nil
+       return stdin, stdout, stderr, nil
 }
 
 // Set up signal handlers.  Go sends signal notifications to a "signal
@@ -227,8 +241,8 @@ func runner(api IArvadosClient,
 
        cmd.Dir = outdir
 
-       var stdin, stdout string
-       stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+       var stdin, stdout, stderr string
+       stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
        if err != nil {
                return err
        }
@@ -240,7 +254,10 @@ func runner(api IArvadosClient,
        if stdout != "" {
                stdout = " > " + stdout
        }
-       log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+       if stderr != "" {
+               stderr = " 2> " + stderr
+       }
+       log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
 
        var caughtSignal os.Signal
        sigChan := setupSignals(cmd)
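
setupCommand now returns a stderr path alongside stdin and stdout and redirects it into the output directory when task.stderr is set. A rough Python analogue of just the redirection logic, under the assumption that the output filenames have already been validated:

    import subprocess

    def run_task(command, outdir, stdout=None, stderr=None):
        kwargs = {"cwd": outdir}
        if stdout:
            kwargs["stdout"] = open("%s/%s" % (outdir, stdout), "w")
        if stderr:
            kwargs["stderr"] = open("%s/%s" % (outdir, stderr), "w")
        return subprocess.call(command, **kwargs)

    # e.g. run_task(["echo", "foo"], "/tmp", stdout="output.txt", stderr="err.txt")
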
index 52d5c1a64e5181fe78aecf891057dc89b18bc836..9805412d13fd5fb53d6809eee9a7e9d379ef74eb 100644 (file)
@@ -53,7 +53,7 @@ func (s *TestSuite) TestSimpleRun(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"echo", "foo"}}}}},
                Task{Sequence: 0})
        c.Check(err, IsNil)
@@ -89,8 +89,8 @@ func (s *TestSuite) TestSimpleRunSubtask(c *C) {
                tmpdir,
                "",
                Job{Script_parameters: Tasks{[]TaskDef{
-                       TaskDef{Command: []string{"echo", "bar"}},
-                       TaskDef{Command: []string{"echo", "foo"}}}}},
+                       {Command: []string{"echo", "bar"}},
+                       {Command: []string{"echo", "foo"}}}}},
                Task{Parameters: TaskDef{
                        Command: []string{"echo", "foo"},
                        Stdout:  "output.txt"},
@@ -118,7 +118,7 @@ func (s *TestSuite) TestRedirect(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"cat"},
                        Stdout:  "output.txt",
                        Stdin:   tmpfile.Name()}}}},
@@ -140,7 +140,7 @@ func (s *TestSuite) TestEnv(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"/bin/sh", "-c", "echo $BAR"},
                        Stdout:  "output.txt",
                        Env:     map[string]string{"BAR": "foo"}}}}},
@@ -161,7 +161,7 @@ func (s *TestSuite) TestEnvSubstitute(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "foo\n",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"/bin/sh", "-c", "echo $BAR"},
                        Stdout:  "output.txt",
                        Env:     map[string]string{"BAR": "$(task.keep)"}}}}},
@@ -182,7 +182,7 @@ func (s *TestSuite) TestEnvReplace(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"/bin/sh", "-c", "echo $PATH"},
                        Stdout:  "output.txt",
                        Env:     map[string]string{"PATH": "foo"}}}}},
@@ -211,12 +211,12 @@ func (t SubtaskTestClient) Update(resourceType string, uuid string, parameters a
 func (s *TestSuite) TestScheduleSubtask(c *C) {
 
        api := SubtaskTestClient{c, []Task{
-               Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+               {Job_uuid: "zzzz-8i9sb-111111111111111",
                        Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
                        Sequence:                 1,
                        Parameters: TaskDef{
                                Command: []string{"echo", "bar"}}},
-               Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+               {Job_uuid: "zzzz-8i9sb-111111111111111",
                        Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
                        Sequence:                 1,
                        Parameters: TaskDef{
@@ -234,8 +234,8 @@ func (s *TestSuite) TestScheduleSubtask(c *C) {
                tmpdir,
                "",
                Job{Script_parameters: Tasks{[]TaskDef{
-                       TaskDef{Command: []string{"echo", "bar"}},
-                       TaskDef{Command: []string{"echo", "foo"}}}}},
+                       {Command: []string{"echo", "bar"}},
+                       {Command: []string{"echo", "foo"}}}}},
                Task{Sequence: 0})
        c.Check(err, IsNil)
 
@@ -252,7 +252,7 @@ func (s *TestSuite) TestRunFail(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
                Task{Sequence: 0})
        c.Check(err, FitsTypeOf, PermFail{})
@@ -269,7 +269,7 @@ func (s *TestSuite) TestRunSuccessCode(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command:      []string{"/bin/sh", "-c", "exit 1"},
                        SuccessCodes: []int{0, 1}}}}},
                Task{Sequence: 0})
@@ -287,7 +287,7 @@ func (s *TestSuite) TestRunFailCode(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command:            []string{"/bin/sh", "-c", "exit 0"},
                        PermanentFailCodes: []int{0, 1}}}}},
                Task{Sequence: 0})
@@ -305,7 +305,7 @@ func (s *TestSuite) TestRunTempFailCode(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command:            []string{"/bin/sh", "-c", "exit 1"},
                        TemporaryFailCodes: []int{1}}}}},
                Task{Sequence: 0})
@@ -329,7 +329,7 @@ func (s *TestSuite) TestVwd(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"ls", "output.txt"},
                        Vwd: map[string]string{
                                "output.txt": tmpfile.Name()}}}}},
@@ -361,7 +361,7 @@ func (s *TestSuite) TestSubstitutionStdin(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                keepmount,
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"cat"},
                        Stdout:  "output.txt",
                        Stdin:   "$(task.keep)/file1.txt"}}}},
@@ -389,7 +389,7 @@ func (s *TestSuite) TestSubstitutionCommandLine(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                keepmount,
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"cat", "$(task.keep)/file1.txt"},
                        Stdout:  "output.txt"}}}},
                Task{Sequence: 0})
@@ -417,7 +417,7 @@ func (s *TestSuite) TestSignal(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"sleep", "4"}}}}},
                Task{Sequence: 0})
        c.Check(err, FitsTypeOf, PermFail{})
@@ -437,7 +437,7 @@ func (s *TestSuite) TestQuoting(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+               Job{Script_parameters: Tasks{[]TaskDef{{
                        Command: []string{"echo", "foo"},
                        Stdout:  "s ub:dir/:e vi\nl"}}}},
                Task{Sequence: 0})
index bed60f499562a36c4585018932860fe35df34701..33bb58710e0c94e1cfa562b8bd1c56afff62a4d7 100644 (file)
@@ -243,7 +243,7 @@ GET:
        // In case we exited the above loop early: before returning,
        // drain the toGet channel so its sender doesn't sit around
        // blocking forever.
-       for _ = range r.toGet {
+       for range r.toGet {
        }
 }
 
index 5d29c45117acd71e924838bb9b758af77d8e9b91..54df452394e47bc7b44437bf580a3af2dc17b36e 100644 (file)
@@ -171,6 +171,7 @@ def uploadfiles(files, api, dry_run=False, num_retries=0, project=None, fnPatter
         pdh = item["portable_data_hash"]
 
     for c in files:
+        c.keepref = "%s/%s" % (pdh, c.fn)
         c.fn = fnPattern % (pdh, c.fn)
 
     os.chdir(orgdir)
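
With this change each uploaded file carries two forms of its new location: keepref is the bare "pdh/name" pair that the CWL path mapper can prefix itself, while fn remains the fnPattern-formatted string. A small illustration with placeholder values:

    pdh = "99999999999999999999999999999994+99"
    name = "blorp.txt"
    fnPattern = "$(file %s/%s)"

    keepref = "%s/%s" % (pdh, name)   # 9999...94+99/blorp.txt
    fn = fnPattern % (pdh, name)      # $(file 9999...94+99/blorp.txt)
    print(keepref)
    print(fn)
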
index 778b90912cb48adf3ce0f77979abcc93d7a83925..80633b7c37593b72d29b7053d312262f54607a12 100644 (file)
@@ -9,6 +9,7 @@ import Queue
 import re
 import socket
 import ssl
+import sys
 import threading
 import timer
 
@@ -22,6 +23,17 @@ _logger = logging.getLogger('arvados.keep')
 global_client_object = None
 
 
+# Monkey-patch TCP keepalive constants that are missing on macOS. Values sourced from:
+# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
+if sys.platform == 'darwin':
+    if not hasattr(socket, 'TCP_KEEPALIVE'):
+        socket.TCP_KEEPALIVE = 0x010
+    if not hasattr(socket, 'TCP_KEEPINTVL'):
+        socket.TCP_KEEPINTVL = 0x101
+    if not hasattr(socket, 'TCP_KEEPCNT'):
+        socket.TCP_KEEPCNT = 0x102
+
+
 class KeepLocator(object):
     EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
     HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
@@ -299,7 +311,9 @@ class KeepClient(object):
             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
             s = socket.socket(family, socktype, protocol)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
-            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+            # macOS does not define TCP_KEEPIDLE; this check avoids an "invalid protocol" error.
+            if hasattr(socket, 'TCP_KEEPIDLE'):
+                s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
             s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
             return s
 
index 155bcedc62c4dbcbfb03f4be8488bd5993f4efbc..8f2d4406fe1c50cc4f4f3529367e970525300dcc 100644 (file)
@@ -192,7 +192,7 @@ def _fifo2stderr(label):
             raise
     os.mkfifo(fifo, 0700)
     subprocess.Popen(
-        ['sed', '-e', 's/^/['+label+'] /', fifo],
+        ['stdbuf', '-i0', '-oL', '-eL', 'sed', '-e', 's/^/['+label+'] /', fifo],
         stdout=sys.stderr)
     return fifo
 
index eae6dca8c0332ae820fbedbb3965f3112453dfb9..a1bfb8bc5ec6620a27d9e8da2cf87a9d885ee049 100644 (file)
@@ -61,10 +61,21 @@ class Arvados::V1::GroupsController < ApplicationController
     request_orders = @orders.clone
     @orders = []
 
-    [Group,
-     Job, PipelineInstance, PipelineTemplate,
+    request_filters = @filters
+
+    klasses = [Group,
+     Job, PipelineInstance, PipelineTemplate, ContainerRequest,
      Collection,
-     Human, Specimen, Trait].each do |klass|
+     Human, Specimen, Trait]
+
+    table_names = klasses.map(&:table_name)
+    request_filters.each do |col, op, val|
+      if col.index('.') && !table_names.include?(col.split('.', 2)[0])
+        raise ArgumentError.new("Invalid attribute '#{col}' in filter")
+      end
+    end
+
+    klasses.each do |klass|
       # If the currently requested orders specifically match the
       # table_name for the current klass, apply that order.
       # Otherwise, order by recency.
@@ -81,6 +92,16 @@ class Arvados::V1::GroupsController < ApplicationController
         where_conds[:group_class] = "project"
       end
 
+      @filters = request_filters.map do |col, op, val|
+        if !col.index('.')
+          [col, op, val]
+        elsif (col = col.split('.', 2))[0] == klass.table_name
+          [col[1], op, val]
+        else
+          nil
+        end
+      end.compact
+
       @objects = klass.readable_by(*@read_users).
         order(request_order).where(where_conds)
       @limit = limit_all - all_objects.count
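
The contents endpoint now accepts filters qualified by table name and applies each one only to the matching class, rejecting unknown prefixes with a 422. A hedged example of calling it from the Python SDK (assumes ARVADOS_API_HOST/ARVADOS_API_TOKEN are set; the project UUID is a placeholder):

    import arvados

    api = arvados.api("v1")
    resp = api.groups().contents(
        uuid="zzzzz-j7d0g-xxxxxxxxxxxxxxx",
        filters=[["container_requests.requesting_container_uuid", "=", None]],
    ).execute()
    print(len(resp["items"]))
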
index aaeebdccf0cd52e8d0fb38aa38ce9d2bf436b626..e7f2bb13108e327d533a673caae31bdb8e7f09e6 100644 (file)
@@ -93,8 +93,8 @@ class EventBus
     begin
       # Must have at least one filter set up to receive events
       if ws.filters.length > 0
-        # Start with log rows readable by user, sorted in ascending order
-        logs = Log.readable_by(ws.user).order("id asc")
+        # Start with log rows readable by user
+        logs = Log.readable_by(ws.user)
 
         cond_id = nil
         cond_out = []
@@ -132,11 +132,21 @@ class EventBus
           logs = logs.where(cond_id, *param_out)
         end
 
-        # Execute query and actually send the matching log rows
-        logs.each do |l|
+        # Execute query and actually send the matching log rows. Load
+        # the full log records only when we're ready to send them,
+        # though: otherwise, (1) postgres has to build the whole
+        # result set and return it to us before we can send the first
+        # event, and (2) we store lots of records in memory while
+        # waiting to spool them out to the client. Both of these are
+        # troublesome when log records are large (e.g., a collection
+        # update contains both old and new manifest_text).
+        #
+        # Note: find_each implies order('id asc'), which is what we
+        # want.
+        logs.select(:id).find_each do |l|
           if not ws.sent_ids.include?(l.id)
             # only send if not a duplicate
-            ws.send(l.as_api_response.to_json)
+            ws.send(Log.find(l.id).as_api_response.to_json)
           end
           if not ws.last_log_id.nil?
             # record ids only when sending "catchup" messages, not notifies
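
The pattern adopted above is worth calling out: stream only ids from the database, then load each full log record just before sending it, so large rows (such as collection updates carrying old and new manifest_text) are never all held in memory at once. A language-agnostic sketch:

    def send_matching_logs(fetch_ids, fetch_record, send, already_sent):
        for log_id in fetch_ids():        # e.g. SELECT id ... ORDER BY id ASC, in batches
            if log_id in already_sent:
                continue                  # skip duplicates
            send(fetch_record(log_id))    # load the heavy payload lazily

    def send(record):
        print(record)

    # In-memory stand-ins for the database and the websocket connection:
    records = {1: "small", 2: "huge manifest...", 3: "small"}
    send_matching_logs(lambda: sorted(records), records.get, send, {2})
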
index 1e3d773550579b03a188d2ea129928cd457cf291..04746d3abb9bcb281707b3a88fa948458a60f938 100644 (file)
@@ -110,3 +110,36 @@ cr_for_requester2:
   output_path: test
   command: ["echo", "hello"]
   requesting_container_uuid: zzzzz-dz642-requestercntnr1
+
+running_anonymous_accessible:
+  uuid: zzzzz-xvhdp-runninganonaccs
+  owner_uuid: zzzzz-j7d0g-zhxawtyetzwc5f0
+  name: running anonymously accessible cr
+  state: Committed
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  modified_at: 2016-01-11 11:11:11.111111111 Z
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  container_image: test
+  cwd: test
+  output_path: test
+  command: ["echo", "hello"]
+  container_uuid: zzzzz-dz642-runningcontain2
+
+# Test Helper trims the rest of the file
+
+# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
+
+# container requests in project_with_2_pipelines_and_60_crs
+<% for i in 1..60 do %>
+cr_<%=i%>_of_60:
+  uuid: zzzzz-xvhdp-oneof60crs<%= i.to_s.rjust(5, '0') %>
+  created_at: <%= ((i+5)/5).hour.ago.to_s(:db) %>
+  owner_uuid: zzzzz-j7d0g-nnncrspipelines
+  name: cr-<%= i.to_s %>
+  output_path: test
+  command: ["echo", "hello"]
+<% end %>
+
+# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
index 4029846484d41a79acc2443246bae76a8c526fa3..b90a25ced816e2a19fdeb5d70e5b5fa6c4a2f7a7 100644 (file)
@@ -196,15 +196,15 @@ project_with_10_pipelines:
   description: project with 10 pipelines
   group_class: project
 
-project_with_2_pipelines_and_60_jobs:
-  uuid: zzzzz-j7d0g-nnjobspipelines
+project_with_2_pipelines_and_60_crs:
+  uuid: zzzzz-j7d0g-nnncrspipelines
   owner_uuid: zzzzz-tpzed-user1withloadab
   created_at: 2014-04-21 15:37:48 -0400
   modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
   modified_by_user_uuid: zzzzz-tpzed-user1withloadab
   modified_at: 2014-04-21 15:37:48 -0400
   updated_at: 2014-04-21 15:37:48 -0400
-  name: project with 2 pipelines and 60 jobs
+  name: project with 2 pipelines and 60 crs
   description: This will result in two pages in the display
   group_class: project
 
index d0c22d305954a2e832d3e8c4dac43725a982db26..95cb967ffc1a9c71b418e2949b063ed9f419d4c1 100644 (file)
@@ -527,19 +527,3 @@ running_job_with_components:
   components:
     component1: zzzzz-8i9sb-jyq01m7in1jlofj
     component2: zzzzz-d1hrv-partdonepipelin
-
-# Test Helper trims the rest of the file
-
-# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
-
-# jobs in project_with_2_pipelines_and_60_jobs
-<% for i in 1..60 do %>
-job_<%=i%>_of_60:
-  uuid: zzzzz-8i9sb-oneof100jobs<%= i.to_s.rjust(3, '0') %>
-  created_at: <%= ((i+5)/5).minute.ago.to_s(:db) %>
-  owner_uuid: zzzzz-j7d0g-nnjobspipelines
-  script_version: 7def43a4d3f20789dda4700f703b5514cc3ed250
-  state: Complete
-<% end %>
-
-# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
index 04a200ddb08d38304926d8babeafe181f7d1752e..34dbe9603bcc9c53ec6fd15f7c78a4a082643dfc 100644 (file)
@@ -445,13 +445,13 @@ pipeline_<%=i%>_of_10:
           title: foo instance input
 <% end %>
 
-# pipelines in project_with_2_pipelines_and_100_jobs
+# pipelines in project_with_2_pipelines_and_60_crs
 <% for i in 1..2 do %>
-pipeline_<%=i%>_of_2_pipelines_and_100_jobs:
+pipeline_<%=i%>_of_2_pipelines_and_60_crs:
   name: pipeline_<%= i %>
   state: New
   uuid: zzzzz-d1hrv-abcgneyn6brx<%= i.to_s.rjust(3, '0') %>
-  owner_uuid: zzzzz-j7d0g-nnjobspipelines
+  owner_uuid: zzzzz-j7d0g-nnncrspipelines
   created_at: <%= i.minute.ago.to_s(:db) %>
   components:
     foo:
index 00846795b4d7f7501964d0b888ba87739ce6c9d7..10534a70610a8188d35863992f2810ac29195937 100644 (file)
@@ -423,4 +423,29 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
     end
     assert_equal true, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
   end
+
+  [
+    [['owner_uuid', '!=', 'zzzzz-tpzed-xurymjxw79nv3jz'], 200,
+        'zzzzz-d1hrv-subprojpipeline', 'zzzzz-d1hrv-1xfj6xkicf2muk2'],
+    [["pipeline_instances.state", "not in", ["Complete", "Failed"]], 200,
+        'zzzzz-d1hrv-1xfj6xkicf2muk2', 'zzzzz-d1hrv-i3e77t9z5y8j9cc'],
+    [['container_requests.requesting_container_uuid', '=', nil], 200,
+        'zzzzz-xvhdp-cr4queuedcontnr', 'zzzzz-xvhdp-cr4requestercn2'],
+    [['container_requests.no_such_column', '=', nil], 422],
+    [['container_requests.', '=', nil], 422],
+    [['.requesting_container_uuid', '=', nil], 422],
+    [['no_such_table.uuid', '!=', 'zzzzz-tpzed-xurymjxw79nv3jz'], 422],
+  ].each do |filter, expect_code, expect_uuid, not_expect_uuid|
+    test "get contents with '#{filter}' filter" do
+      authorize_with :active
+      get :contents, filters: [filter], format: :json
+      assert_response expect_code
+      if expect_code == 200
+        assert_not_empty json_response['items']
+        item_uuids = json_response['items'].collect {|item| item['uuid']}
+        assert_includes(item_uuids, expect_uuid)
+        assert_not_includes(item_uuids, not_expect_uuid)
+      end
+    end
+  end
 end
index 98ae103d1a464fbb550ba46f8c3669736eae3981..0c99fcc4e646f0a4228944d81a2cc6e6c9cc0ba0 100644 (file)
@@ -23,9 +23,9 @@ class WebsocketTest < ActionDispatch::IntegrationTest
 
     EM.run {
       if token
-        ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket?api_token=#{api_client_authorizations(token).api_token}")
+        ws = Faye::WebSocket::Client.new("ws://localhost:#{WEBSOCKET_PORT}/websocket?api_token=#{api_client_authorizations(token).api_token}")
       else
-        ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket")
+        ws = Faye::WebSocket::Client.new("ws://localhost:#{WEBSOCKET_PORT}/websocket")
       end
 
       ws.on :open do |event|
index 65af8ce2bd9a732e1e7d2ace6772f618670cc5dc..be32a0f299d0b396b867c0ff9943fc3234da7ce3 100644 (file)
@@ -1,7 +1,12 @@
 require 'bundler'
+require 'socket'
 
 $ARV_API_SERVER_DIR = File.expand_path('../..', __FILE__)
-SERVER_PID_PATH = 'tmp/pids/passenger.3002.pid'
+
+s = TCPServer.new('0.0.0.0', 0)
+WEBSOCKET_PORT = s.addr[1]
+s.close
+SERVER_PID_PATH = "tmp/pids/passenger.#{WEBSOCKET_PORT}.pid"
 
 class WebsocketTestRunner < MiniTest::Unit
   def _system(*cmd)
@@ -15,7 +20,7 @@ class WebsocketTestRunner < MiniTest::Unit
   def _run(args=[])
     server_pid = Dir.chdir($ARV_API_SERVER_DIR) do |apidir|
       # Only passenger seems to be able to run the websockets server successfully.
-      _system('passenger', 'start', '-d', '-p3002')
+      _system('passenger', 'start', '-d', "-p#{WEBSOCKET_PORT}")
       timeout = Time.now.tv_sec + 10
       begin
         sleep 0.2
@@ -35,7 +40,7 @@ class WebsocketTestRunner < MiniTest::Unit
       super(args)
     ensure
       Dir.chdir($ARV_API_SERVER_DIR) do
-        _system('passenger', 'stop', '-p3002')
+        _system('passenger', 'stop', "-p#{WEBSOCKET_PORT}")
       end
       # DatabaseCleaner leaves the database empty. Prefer to leave it full.
       dc = DatabaseController.new
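
Instead of a hard-coded port 3002, the test runner now binds a throwaway TCP socket to port 0 and reuses whatever port the kernel assigned, which avoids collisions when several test runs share a host. The same trick in Python, for comparison:

    import socket

    s = socket.socket()
    s.bind(("0.0.0.0", 0))            # port 0 means "pick any free port"
    websocket_port = s.getsockname()[1]
    s.close()
    print(websocket_port)
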
index 936a9088ed0c3d3affe6c3e0f9555d9e230d0c99..0ca765185119c152dd11870641c15f905042311e 100644 (file)
@@ -168,7 +168,7 @@ func run(dispatcher *dispatch.Dispatcher,
        }
 
        // drain any subsequent status changes
-       for _ = range status {
+       for range status {
        }
 
        log.Printf("Finalized container %v", uuid)
index 9628bf2f0aac3beb8ccc58768d1498fc3371a9a2..17f9d671a7fdcb084005ea34cdd6c134cd2e3524 100644 (file)
@@ -88,7 +88,7 @@ func (s *TestSuite) TestIntegration(c *C) {
 
        // There should be no queued containers now
        params := arvadosclient.Dict{
-               "filters": [][]string{[]string{"state", "=", "Queued"}},
+               "filters": [][]string{{"state", "=", "Queued"}},
        }
        var containers arvados.ContainerList
        err = arv.List("containers", params, &containers)
index 4bfff6a5f0ccfe15a5a5e452f4536c01693df976..46df5281ef6a81ceb1deabf371c1a8bf7b5c7fe2 100644 (file)
@@ -72,7 +72,7 @@ func doMain() error {
 // sbatchCmd
 func sbatchFunc(container arvados.Container) *exec.Cmd {
        memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
-       return exec.Command("sbatch", "--share", "--parsable",
+       return exec.Command("sbatch", "--share",
                fmt.Sprintf("--job-name=%s", container.UUID),
                fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
                fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs),
@@ -90,9 +90,7 @@ var scancelCmd = scancelFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher,
-       container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) {
-       submitErr = nil
-
+       container arvados.Container, crunchRunCommand string) (submitErr error) {
        defer func() {
                // If we didn't get as far as submitting a slurm job,
                // unlock the container and return it to the queue.
@@ -171,9 +169,7 @@ func submit(dispatcher *dispatch.Dispatcher,
                return
        }
 
-       // If everything worked out, got the jobid on stdout
-       jobid = strings.TrimSpace(string(stdoutMsg))
-
+       log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
        return
 }
 
@@ -194,7 +190,7 @@ func monitorSubmitOrCancel(dispatcher *dispatch.Dispatcher, container arvados.Co
 
                        log.Printf("About to submit queued container %v", container.UUID)
 
-                       if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+                       if err := submit(dispatcher, container, *crunchRunCommand); err != nil {
                                log.Printf("Error submitting container %s to slurm: %v",
                                        container.UUID, err)
                                // maybe sbatch is broken, put it back to queued
index b72ad9fa9dea802bd30a9aa70d84c817493cec0f..ede767c82add9eb1dc9fa75f8bbb6ef917cdf0ac 100644 (file)
@@ -95,7 +95,7 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
 }
 
 func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-       container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
+       container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share",
                fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                fmt.Sprintf("--mem-per-cpu=%d", 2862),
                fmt.Sprintf("--cpus-per-task=%d", 4),
@@ -136,7 +136,7 @@ func (s *TestSuite) integrationTest(c *C,
 
        // There should be no queued containers now
        params := arvadosclient.Dict{
-               "filters": [][]string{[]string{"state", "=", "Queued"}},
+               "filters": [][]string{{"state", "=", "Queued"}},
        }
        var containers arvados.ContainerList
        err = arv.List("containers", params, &containers)
index 7da1beb20a4d5e4986eec2f8643d1ae99edeea2f..32d524abca2f59689e56efe59b526d9da8f37181 100644 (file)
@@ -5,6 +5,7 @@ import (
        "errors"
        "flag"
        "fmt"
+       "git.curoverse.com/arvados.git/lib/crunchstat"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -91,6 +92,12 @@ type ContainerRunner struct {
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
+
+       statLogger   io.WriteCloser
+       statReporter *crunchstat.Reporter
+       statInterval time.Duration
+       cgroupRoot   string
+       cgroupParent string
 }
 
 // SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -102,7 +109,7 @@ func (runner *ContainerRunner) SetupSignals() {
        signal.Notify(runner.SigChan, syscall.SIGQUIT)
 
        go func(sig <-chan os.Signal) {
-               for _ = range sig {
+               for range sig {
                        if !runner.Cancelled {
                                runner.CancelLock.Lock()
                                runner.Cancelled = true
@@ -366,6 +373,14 @@ func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
                                runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
                        }
 
+                       if runner.statReporter != nil {
+                               runner.statReporter.Stop()
+                               closeerr = runner.statLogger.Close()
+                               if closeerr != nil {
+                                       runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
+                               }
+                       }
+
                        runner.loggingDone <- true
                        close(runner.loggingDone)
                        return
@@ -373,6 +388,18 @@ func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
        }
 }
 
+func (runner *ContainerRunner) StartCrunchstat() {
+       runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
+       runner.statReporter = &crunchstat.Reporter{
+               CID:          runner.ContainerID,
+               Logger:       log.New(runner.statLogger, "", 0),
+               CgroupParent: runner.cgroupParent,
+               CgroupRoot:   runner.cgroupRoot,
+               PollPeriod:   runner.statInterval,
+       }
+       runner.statReporter.Start()
+}
+
 // AttachLogs connects the docker container stdout and stderr logs to the
 // Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
@@ -752,6 +779,8 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
+       runner.StartCrunchstat()
+
        if runner.IsCancelled() {
                return
        }
@@ -792,6 +821,9 @@ func NewContainerRunner(api IArvadosClient,
 }
 
 func main() {
+       statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
+       cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+       cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup")
        flag.Parse()
 
        containerId := flag.Arg(0)
@@ -816,6 +848,9 @@ func main() {
        }
 
        cr := NewContainerRunner(api, kc, docker, containerId)
+       cr.statInterval = *statInterval
+       cr.cgroupRoot = *cgroupRoot
+       cr.cgroupParent = *cgroupParent
 
        err = cr.Run()
        if err != nil {
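
Note: crunch-run now starts an in-process resource-usage reporter from the new lib/crunchstat package, the same code the standalone crunchstat command is rewritten around further down. A minimal standalone sketch of the Reporter wiring, using only the fields and methods visible in this diff (CID, Logger, CgroupRoot, CgroupParent, PollPeriod, Start, Stop); the container id below is a placeholder and the cgroup paths mirror crunch-run's new flag defaults:

package main

import (
	"log"
	"os"
	"time"

	"git.curoverse.com/arvados.git/lib/crunchstat"
)

func main() {
	reporter := &crunchstat.Reporter{
		CID:          "0123456789ab", // placeholder docker container id
		Logger:       log.New(os.Stderr, "crunchstat: ", 0),
		CgroupRoot:   "/sys/fs/cgroup", // -cgroup-root default
		CgroupParent: "docker",         // -cgroup-parent default
		PollPeriod:   10 * time.Second, // -crunchstat-interval default
	}
	reporter.Start()
	defer reporter.Stop()

	// ... the monitored workload would run here ...
	time.Sleep(time.Second)
}
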
index 9880230ce8785a3d61d4c836b52f1091b65274dc..d95ff086312c4552e534f9e0ca9803e85e053231 100644 (file)
@@ -14,7 +14,6 @@ import (
        . "gopkg.in/check.v1"
        "io"
        "io/ioutil"
-       "log"
        "os"
        "os/exec"
        "sort"
@@ -139,7 +138,7 @@ func (client *ArvTestClient) Create(resourceType string,
        client.Mutex.Lock()
        defer client.Mutex.Unlock()
 
-       client.Calls += 1
+       client.Calls++
        client.Content = append(client.Content, parameters)
 
        if resourceType == "logs" {
@@ -192,7 +191,7 @@ func (client *ArvTestClient) Get(resourceType string, uuid string, parameters ar
 func (client *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
        client.Mutex.Lock()
        defer client.Mutex.Unlock()
-       client.Calls += 1
+       client.Calls++
        client.Content = append(client.Content, parameters)
        if resourceType == "containers" {
                if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
@@ -206,7 +205,7 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
 // parameters match jpath/string. E.g., CalledWith(c, "foo.bar",
 // "baz") returns parameters with parameters["foo"]["bar"]=="baz". If
 // no call matches, it returns nil.
-func (client *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
+func (client *ArvTestClient) CalledWith(jpath string, expect interface{}) arvadosclient.Dict {
 call:
        for _, content := range client.Content {
                var v interface{} = content
@@ -217,7 +216,7 @@ call:
                                v = dict[k]
                        }
                }
-               if v, ok := v.(string); ok && v == expect {
+               if v == expect {
                        return content
                }
        }
@@ -518,6 +517,7 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
 
        api = &ArvTestClient{Container: rec}
        cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr.statInterval = 100 * time.Millisecond
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
 
@@ -553,14 +553,45 @@ func (s *TestSuite) TestFullRunHello(c *C) {
                t.finish <- dockerclient.WaitResult{}
        })
 
-       c.Check(api.Calls, Equals, 7)
-       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
 
 }
 
+func (s *TestSuite) TestCrunchstat(c *C) {
+       api, _ := FullRunHelper(c, `{
+               "command": ["sleep", "1"],
+               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "cwd": ".",
+               "environment": {},
+               "mounts": {"/tmp": {"kind": "tmp"} },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {}
+       }`, func(t *TestDockerClient) {
+               time.Sleep(time.Second)
+               t.logWriter.Close()
+               t.finish <- dockerclient.WaitResult{}
+       })
+
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+
+       // We didn't actually start a container, so crunchstat didn't
+       // find accounting files and therefore didn't log any stats.
+       // It should have logged a "can't find accounting files"
+       // message after one poll interval, though, so we can confirm
+       // it's alive:
+       c.Assert(api.Logs["crunchstat"], NotNil)
+       c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files have not appeared after 100ms.*`)
+
+       // The "files never appeared" log assures us that we called
+       // (*crunchstat.Reporter)Stop(), and that we set it up with
+       // the correct container ID "abcde":
+       c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files never appeared for abcde\n`)
+}
+
 func (s *TestSuite) TestFullRunStderr(c *C) {
        api, _ := FullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
@@ -578,10 +609,10 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 1}
        })
 
-       c.Assert(api.Calls, Equals, 8)
-       c.Check(api.Content[7]["container"].(arvadosclient.Dict)["log"], NotNil)
-       c.Check(api.Content[7]["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
-       c.Check(api.Content[7]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+       final := api.CalledWith("container.state", "Complete")
+       c.Assert(final, NotNil)
+       c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+       c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
        c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
@@ -603,12 +634,9 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Calls, Equals, 7)
-       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
-       log.Print(api.Logs["stdout"].String())
-
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       c.Log(api.Logs["stdout"])
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
 }
 
@@ -628,10 +656,8 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Calls, Equals, 7)
-       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
 }
 
@@ -682,9 +708,8 @@ func (s *TestSuite) TestCancel(c *C) {
                }
        }
 
-       c.Assert(api.Calls, Equals, 6)
-       c.Check(api.Content[5]["container"].(arvadosclient.Dict)["log"], IsNil)
-       c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
+       c.Check(api.CalledWith("container.log", nil), NotNil)
+       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
 
 }
@@ -705,10 +730,8 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Calls, Equals, 7)
-       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
 }
 
@@ -787,16 +810,16 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 }
 
 func (s *TestSuite) TestStdout(c *C) {
-       helperRecord := `{`
-       helperRecord += `"command": ["/bin/sh", "-c", "echo $FROBIZ"],`
-       helperRecord += `"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",`
-       helperRecord += `"cwd": "/bin",`
-       helperRecord += `"environment": {"FROBIZ": "bilbo"},`
-       helperRecord += `"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },`
-       helperRecord += `"output_path": "/tmp",`
-       helperRecord += `"priority": 1,`
-       helperRecord += `"runtime_constraints": {}`
-       helperRecord += `}`
+       helperRecord := `{
+               "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "cwd": "/bin",
+               "environment": {"FROBIZ": "bilbo"},
+               "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {}
+       }`
 
        api, _ := FullRunHelper(c, helperRecord, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
@@ -804,10 +827,9 @@ func (s *TestSuite) TestStdout(c *C) {
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Assert(api.Calls, Equals, 6)
-       c.Check(api.Content[5]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
-       c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-       c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), Not(IsNil))
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
 }
 
 // Used by the TestStdoutWithWrongPath*()
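
Note: the assertions in these tests lean on the reworked CalledWith helper, which walks a dotted path such as "container.exit_code" through the recorded request parameters and compares the result with ==, so non-string expectations like 0 or nil can now be matched. A self-contained sketch of the same dotted-path lookup technique (the lookup helper below is illustrative, not part of the test suite):

package main

import (
	"fmt"
	"strings"
)

// lookup walks a dotted path like "container.exit_code" through nested
// map[string]interface{} values, returning nil if any step is missing.
func lookup(params map[string]interface{}, jpath string) interface{} {
	var v interface{} = params
	for _, k := range strings.Split(jpath, ".") {
		m, ok := v.(map[string]interface{})
		if !ok {
			return nil
		}
		v = m[k]
	}
	return v
}

func main() {
	call := map[string]interface{}{
		"container": map[string]interface{}{"state": "Complete", "exit_code": 0},
	}
	fmt.Println(lookup(call, "container.exit_code") == 0)      // true
	fmt.Println(lookup(call, "container.state") == "Complete") // true
}
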
index 20928dbef769b0d4dd419ec0f8693541c93ba369..db9d101b581b560314374792421c134f918c47b0 100644 (file)
@@ -38,17 +38,17 @@ type ThrottledLogger struct {
        Immediate *log.Logger
 }
 
-// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
-// because the RFC3339Nano format isn't fixed width.
-const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
+// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
+const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
 
-// RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed
-func RFC3339Timestamp(now time.Time) string {
-       return now.Format(RFC3339Fixed)
+// RFC3339Timestamp formats t as RFC3339NanoFixed.
+func RFC3339Timestamp(t time.Time) string {
+       return t.Format(RFC3339NanoFixed)
 }
 
-// Write to the internal buffer.  Prepend a timestamp to each line of the input
-// data.
+// Write prepends a timestamp to each line of the input data and
+// appends to the internal buffer. Each line is also logged to
+// tl.Immediate, if tl.Immediate is not nil.
 func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
        tl.Mutex.Lock()
        if tl.buf == nil {
@@ -58,13 +58,20 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
 
        now := tl.Timestamper(time.Now().UTC())
        sc := bufio.NewScanner(bytes.NewBuffer(p))
-       for sc.Scan() {
-               _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
+       for err == nil && sc.Scan() {
+               out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
                if tl.Immediate != nil {
-                       tl.Immediate.Printf("%s %s\n", now, sc.Text())
+                       tl.Immediate.Print(out[:len(out)-1])
                }
+               _, err = io.WriteString(tl.buf, out)
        }
-       return len(p), err
+       if err == nil {
+               err = sc.Err()
+               if err == nil {
+                       n = len(p)
+               }
+       }
+       return
 }
 
 // Periodically check the current buffer; if not empty, send it on the
@@ -158,17 +165,18 @@ func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool) {
 // (b) batches log messages and only calls the underlying Writer at most once
 // per second.
 func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
-       alw := &ThrottledLogger{}
-       alw.flusherDone = make(chan bool)
-       alw.writer = writer
-       alw.Logger = log.New(alw, "", 0)
-       alw.Timestamper = RFC3339Timestamp
-       go alw.flusher()
-       return alw
+       tl := &ThrottledLogger{}
+       tl.flusherDone = make(chan bool)
+       tl.writer = writer
+       tl.Logger = log.New(tl, "", 0)
+       tl.Timestamper = RFC3339Timestamp
+       go tl.flusher()
+       return tl
 }
 
-// ArvLogWriter implements a writer that writes to each of a WriteCloser
-// (typically CollectionFileWriter) and creates an API server log entry.
+// ArvLogWriter is an io.WriteCloser that processes each write by
+// writing it through to another io.WriteCloser (typically a
+// CollectionFileWriter) and creating an Arvados log entry.
 type ArvLogWriter struct {
        ArvClient     IArvadosClient
        UUID          string
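
Note: RFC3339NanoFixed keeps nine fractional digits so every timestamp has the same width, whereas Go's time.RFC3339Nano trims trailing zeros and produces variable-width output. A short illustration (the sample instant is arbitrary):

package main

import (
	"fmt"
	"time"
)

const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

func main() {
	t := time.Date(2016, time.July, 15, 17, 11, 49, 500000000, time.UTC)
	fmt.Println(t.Format(time.RFC3339Nano)) // 2016-07-15T17:11:49.5Z (width varies)
	fmt.Println(t.Format(RFC3339NanoFixed)) // 2016-07-15T17:11:49.500000000Z (fixed width)
}
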
index bb3123a1025a810f0165219967161b6977d8f889..ceb8ca87b00ba25a0a9dc1ac2f2f6ca591cdd0b8 100644 (file)
@@ -16,7 +16,11 @@ type TestTimestamper struct {
 
 func (this *TestTimestamper) Timestamp(t time.Time) string {
        this.count += 1
-       return fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count)
+       t, err := time.ParseInLocation(time.RFC3339Nano, fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count), t.Location())
+       if err != nil {
+               panic(err)
+       }
+       return RFC3339Timestamp(t)
 }
 
 // Gocheck boilerplate
index 6bce3258d9857808f24e677ce5374f0f2de61a23..cae95fdd9d6cfd30110764e4ea7c87188c0ed6aa 100644 (file)
@@ -2,485 +2,122 @@ package main
 
 import (
        "bufio"
-       "bytes"
-       "errors"
        "flag"
-       "fmt"
        "io"
-       "io/ioutil"
        "log"
        "os"
        "os/exec"
        "os/signal"
-       "strconv"
-       "strings"
        "syscall"
        "time"
-)
 
-/*
-#include <unistd.h>
-#include <sys/types.h>
-#include <pwd.h>
-#include <stdlib.h>
-*/
-import "C"
+       "git.curoverse.com/arvados.git/lib/crunchstat"
+)
 
-// The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
+const MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
 
-type Cgroup struct {
-       root   string
-       parent string
-       cid    string
-}
+func main() {
+       reporter := crunchstat.Reporter{
+               Logger: log.New(os.Stderr, "crunchstat: ", 0),
+       }
 
-var childLog = log.New(os.Stderr, "", 0)
-var statLog = log.New(os.Stderr, "crunchstat: ", 0)
+       flag.StringVar(&reporter.CgroupRoot, "cgroup-root", "", "Root of cgroup tree")
+       flag.StringVar(&reporter.CgroupParent, "cgroup-parent", "", "Name of container parent under cgroup")
+       flag.StringVar(&reporter.CIDFile, "cgroup-cid", "", "Path to container id file")
+       pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds")
 
-const (
-       MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
-)
+       flag.Parse()
 
-func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
-       reader := bufio.NewReaderSize(in, MaxLogLine)
-       var prefix string
-       for {
-               line, isPrefix, err := reader.ReadLine()
-               if err == io.EOF {
-                       break
-               } else if err != nil {
-                       statLog.Fatal("error reading child stderr:", err)
-               }
-               var suffix string
-               if isPrefix {
-                       suffix = "[...]"
-               }
-               childLog.Print(prefix, string(line), suffix)
-               // Set up prefix for following line
-               if isPrefix {
-                       prefix = "[...]"
-               } else {
-                       prefix = ""
-               }
+       if reporter.CgroupRoot == "" {
+               reporter.Logger.Fatal("error: must provide -cgroup-root")
        }
-       done <- true
-       in.Close()
-}
+       reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
 
-func ReadAllOrWarn(in *os.File) ([]byte, error) {
-       content, err := ioutil.ReadAll(in)
-       if err != nil {
-               statLog.Printf("error reading %s: %s\n", in.Name(), err)
-       }
-       return content, err
-}
+       reporter.Start()
+       err := runCommand(flag.Args(), reporter.Logger)
+       reporter.Stop()
 
-var reportedStatFile = map[string]string{}
+       if err, ok := err.(*exec.ExitError); ok {
+               // The program has exited with an exit code != 0
 
-// Open the cgroup stats file in /sys/fs corresponding to the target
-// cgroup, and return an *os.File. If no stats file is available,
-// return nil.
-//
-// TODO: Instead of trying all options, choose a process in the
-// container, and read /proc/PID/cgroup to determine the appropriate
-// cgroup root for the given statgroup. (This will avoid falling back
-// to host-level stats during container setup and teardown.)
-func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) {
-       var paths []string
-       if cgroup.cid != "" {
-               // Collect container's stats
-               paths = []string{
-                       fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
-                       fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
-               }
-       } else {
-               // Collect this host's stats
-               paths = []string{
-                       fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
-                       fmt.Sprintf("%s/%s", cgroup.root, stat),
-               }
-       }
-       var path string
-       var file *os.File
-       var err error
-       for _, path = range paths {
-               file, err = os.Open(path)
-               if err == nil {
-                       break
+               // This works on both Unix and Windows. Although
+               // package syscall is generally platform dependent,
+               // WaitStatus is defined for both Unix and Windows and
+               // in both cases has an ExitStatus() method with the
+               // same signature.
+               if status, ok := err.Sys().(syscall.WaitStatus); ok {
+                       os.Exit(status.ExitStatus())
                } else {
-                       path = ""
+                       reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
                }
+       } else if err != nil {
+               reporter.Logger.Fatalln("error in cmd.Wait:", err)
        }
-       if pathWas, ok := reportedStatFile[stat]; !ok || pathWas != path {
-               // Log whenever we start using a new/different cgroup
-               // stat file for a given statistic. This typically
-               // happens 1 to 3 times per statistic, depending on
-               // whether we happen to collect stats [a] before any
-               // processes have been created in the container and
-               // [b] after all contained processes have exited.
-               if path == "" {
-                       statLog.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
-               } else if ok {
-                       statLog.Printf("notice: stats moved from %s to %s\n", reportedStatFile[stat], path)
-               } else {
-                       statLog.Printf("notice: reading stats from %s\n", path)
-               }
-               reportedStatFile[stat] = path
-       }
-       return file, err
 }
 
-func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
-       procsFile, err := OpenStatFile(cgroup, "cpuacct", "cgroup.procs")
-       if err != nil {
-               return nil, err
-       }
-       defer procsFile.Close()
-       reader := bufio.NewScanner(procsFile)
-       for reader.Scan() {
-               taskPid := reader.Text()
-               statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
-               stats, err := ioutil.ReadFile(statsFilename)
-               if err != nil {
-                       statLog.Printf("error reading %s: %s\n", statsFilename, err)
-                       continue
-               }
-               return strings.NewReader(string(stats)), nil
-       }
-       return nil, errors.New("Could not read stats for any proc in container")
-}
+func runCommand(argv []string, logger *log.Logger) error {
+       cmd := exec.Command(argv[0], argv[1:]...)
 
-type IoSample struct {
-       sampleTime time.Time
-       txBytes    int64
-       rxBytes    int64
-}
+       logger.Println("Running", argv)
 
-func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
-       c, err := OpenStatFile(cgroup, "blkio", "blkio.io_service_bytes")
-       if err != nil {
-               return
-       }
-       defer c.Close()
-       b := bufio.NewScanner(c)
-       var sampleTime = time.Now()
-       newSamples := make(map[string]IoSample)
-       for b.Scan() {
-               var device, op string
-               var val int64
-               if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
-                       continue
-               }
-               var thisSample IoSample
-               var ok bool
-               if thisSample, ok = newSamples[device]; !ok {
-                       thisSample = IoSample{sampleTime, -1, -1}
-               }
-               switch op {
-               case "Read":
-                       thisSample.rxBytes = val
-               case "Write":
-                       thisSample.txBytes = val
-               }
-               newSamples[device] = thisSample
-       }
-       for dev, sample := range newSamples {
-               if sample.txBytes < 0 || sample.rxBytes < 0 {
-                       continue
-               }
-               delta := ""
-               if prev, ok := lastSample[dev]; ok {
-                       delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
-                               sample.sampleTime.Sub(prev.sampleTime).Seconds(),
-                               sample.txBytes-prev.txBytes,
-                               sample.rxBytes-prev.rxBytes)
-               }
-               statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
-               lastSample[dev] = sample
-       }
-}
-
-type MemSample struct {
-       sampleTime time.Time
-       memStat    map[string]int64
-}
+       // Child process will use our stdin and stdout pipes
+       // (we close our copies below)
+       cmd.Stdin = os.Stdin
+       cmd.Stdout = os.Stdout
 
-func DoMemoryStats(cgroup Cgroup) {
-       c, err := OpenStatFile(cgroup, "memory", "memory.stat")
-       if err != nil {
-               return
-       }
-       defer c.Close()
-       b := bufio.NewScanner(c)
-       thisSample := MemSample{time.Now(), make(map[string]int64)}
-       wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
-       for b.Scan() {
-               var stat string
-               var val int64
-               if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
-                       continue
-               }
-               thisSample.memStat[stat] = val
-       }
-       var outstat bytes.Buffer
-       for _, key := range wantStats {
-               if val, ok := thisSample.memStat[key]; ok {
-                       outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+       // Forward SIGINT and SIGTERM to child process
+       sigChan := make(chan os.Signal, 1)
+       go func(sig <-chan os.Signal) {
+               catch := <-sig
+               if cmd.Process != nil {
+                       cmd.Process.Signal(catch)
                }
-       }
-       statLog.Printf("mem%s\n", outstat.String())
-}
+               logger.Println("notice: caught signal:", catch)
+       }(sigChan)
+       signal.Notify(sigChan, syscall.SIGTERM)
+       signal.Notify(sigChan, syscall.SIGINT)
 
-func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
-       sampleTime := time.Now()
-       stats, err := GetContainerNetStats(cgroup)
+       // Funnel stderr through our pipe to the logger
+       stderr_pipe, err := cmd.StderrPipe()
        if err != nil {
-               return
+               logger.Fatalln("error in StderrPipe:", err)
        }
 
-       scanner := bufio.NewScanner(stats)
-       for scanner.Scan() {
-               var ifName string
-               var rx, tx int64
-               words := strings.Fields(scanner.Text())
-               if len(words) != 17 {
-                       // Skip lines with wrong format
-                       continue
-               }
-               ifName = strings.TrimRight(words[0], ":")
-               if ifName == "lo" || ifName == "" {
-                       // Skip loopback interface and lines with wrong format
-                       continue
-               }
-               if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
-                       continue
-               }
-               if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
-                       continue
-               }
-               nextSample := IoSample{}
-               nextSample.sampleTime = sampleTime
-               nextSample.txBytes = tx
-               nextSample.rxBytes = rx
-               var delta string
-               if prev, ok := lastSample[ifName]; ok {
-                       interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
-                       delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
-                               interval,
-                               tx-prev.txBytes,
-                               rx-prev.rxBytes)
-               }
-               statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
-               lastSample[ifName] = nextSample
+       // Run subprocess
+       if err := cmd.Start(); err != nil {
+               logger.Fatalln("error in cmd.Start:", err)
        }
-}
 
-type CpuSample struct {
-       hasData    bool // to distinguish the zero value from real data
-       sampleTime time.Time
-       user       float64
-       sys        float64
-       cpus       int64
-}
+       // Close stdin/stdout in this (parent) process
+       os.Stdin.Close()
+       os.Stdout.Close()
 
-// Return the number of CPUs available in the container. Return 0 if
-// we can't figure out the real number of CPUs.
-func GetCpuCount(cgroup Cgroup) int64 {
-       cpusetFile, err := OpenStatFile(cgroup, "cpuset", "cpuset.cpus")
-       if err != nil {
-               return 0
-       }
-       defer cpusetFile.Close()
-       b, err := ReadAllOrWarn(cpusetFile)
-       sp := strings.Split(string(b), ",")
-       cpus := int64(0)
-       for _, v := range sp {
-               var min, max int64
-               n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
-               if n == 2 {
-                       cpus += (max - min) + 1
-               } else {
-                       cpus += 1
-               }
-       }
-       return cpus
-}
+       copyPipeToChildLog(stderr_pipe, log.New(os.Stderr, "", 0))
 
-func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
-       statFile, err := OpenStatFile(cgroup, "cpuacct", "cpuacct.stat")
-       if err != nil {
-               return
-       }
-       defer statFile.Close()
-       b, err := ReadAllOrWarn(statFile)
-       if err != nil {
-               return
-       }
-
-       nextSample := CpuSample{true, time.Now(), 0, 0, GetCpuCount(cgroup)}
-       var userTicks, sysTicks int64
-       fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
-       user_hz := float64(C.sysconf(C._SC_CLK_TCK))
-       nextSample.user = float64(userTicks) / user_hz
-       nextSample.sys = float64(sysTicks) / user_hz
-
-       delta := ""
-       if lastSample.hasData {
-               delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
-                       nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds(),
-                       nextSample.user-lastSample.user,
-                       nextSample.sys-lastSample.sys)
-       }
-       statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
-               nextSample.user, nextSample.sys, nextSample.cpus, delta)
-       *lastSample = nextSample
+       return cmd.Wait()
 }
 
-func PollCgroupStats(cgroup Cgroup, poll int64, stop_poll_chan <-chan bool) {
-       var lastNetSample = map[string]IoSample{}
-       var lastDiskSample = map[string]IoSample{}
-       var lastCpuSample = CpuSample{}
-
-       poll_chan := make(chan bool, 1)
-       go func() {
-               // Send periodic poll events.
-               poll_chan <- true
-               for {
-                       time.Sleep(time.Duration(poll) * time.Millisecond)
-                       poll_chan <- true
-               }
-       }()
+func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+       reader := bufio.NewReaderSize(in, MaxLogLine)
+       var prefix string
        for {
-               select {
-               case <-stop_poll_chan:
-                       return
-               case <-poll_chan:
-                       // Emit stats, then select again.
-               }
-               DoMemoryStats(cgroup)
-               DoCpuStats(cgroup, &lastCpuSample)
-               DoBlkIoStats(cgroup, lastDiskSample)
-               DoNetworkStats(cgroup, lastNetSample)
-       }
-}
-
-func run(logger *log.Logger) error {
-
-       var (
-               cgroup_root    string
-               cgroup_parent  string
-               cgroup_cidfile string
-               wait           int64
-               poll           int64
-       )
-
-       flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
-       flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
-       flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
-       flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
-       flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
-
-       flag.Parse()
-
-       if cgroup_root == "" {
-               statLog.Fatal("error: must provide -cgroup-root")
-       }
-
-       finish_chan := make(chan bool)
-       defer close(finish_chan)
-
-       var cmd *exec.Cmd
-
-       if len(flag.Args()) > 0 {
-               // Set up subprocess
-               cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
-
-               childLog.Println("Running", flag.Args())
-
-               // Child process will use our stdin and stdout pipes
-               // (we close our copies below)
-               cmd.Stdin = os.Stdin
-               cmd.Stdout = os.Stdout
-
-               // Forward SIGINT and SIGTERM to inner process
-               sigChan := make(chan os.Signal, 1)
-               go func(sig <-chan os.Signal) {
-                       catch := <-sig
-                       if cmd.Process != nil {
-                               cmd.Process.Signal(catch)
-                       }
-                       statLog.Println("notice: caught signal:", catch)
-               }(sigChan)
-               signal.Notify(sigChan, syscall.SIGTERM)
-               signal.Notify(sigChan, syscall.SIGINT)
-
-               // Funnel stderr through our channel
-               stderr_pipe, err := cmd.StderrPipe()
-               if err != nil {
-                       statLog.Fatalln("error in StderrPipe:", err)
-               }
-               go CopyPipeToChildLog(stderr_pipe, finish_chan)
-
-               // Run subprocess
-               if err := cmd.Start(); err != nil {
-                       statLog.Fatalln("error in cmd.Start:", err)
-               }
-
-               // Close stdin/stdout in this (parent) process
-               os.Stdin.Close()
-               os.Stdout.Close()
-       }
-
-       // Read the cid file
-       var container_id string
-       if cgroup_cidfile != "" {
-               // wait up to 'wait' seconds for the cid file to appear
-               ok := false
-               var i time.Duration
-               for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
-                       cid, err := ioutil.ReadFile(cgroup_cidfile)
-                       if err == nil && len(cid) > 0 {
-                               ok = true
-                               container_id = string(cid)
-                               break
-                       }
-                       time.Sleep(100 * time.Millisecond)
+               line, isPrefix, err := reader.ReadLine()
+               if err == io.EOF {
+                       break
+               } else if err != nil {
+                       logger.Fatal("error reading child stderr:", err)
                }
-               if !ok {
-                       statLog.Println("error reading cid file:", cgroup_cidfile)
+               var suffix string
+               if isPrefix {
+                       suffix = "[...]"
                }
-       }
-
-       stop_poll_chan := make(chan bool, 1)
-       cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
-       go PollCgroupStats(cgroup, poll, stop_poll_chan)
-
-       // When the child exits, tell the polling goroutine to stop.
-       defer func() { stop_poll_chan <- true }()
-
-       // Wait for CopyPipeToChan to consume child's stderr pipe
-       <-finish_chan
-
-       return cmd.Wait()
-}
-
-func main() {
-       logger := log.New(os.Stderr, "crunchstat: ", 0)
-       if err := run(logger); err != nil {
-               if exiterr, ok := err.(*exec.ExitError); ok {
-                       // The program has exited with an exit code != 0
-
-                       // This works on both Unix and
-                       // Windows. Although package syscall is
-                       // generally platform dependent, WaitStatus is
-                       // defined for both Unix and Windows and in
-                       // both cases has an ExitStatus() method with
-                       // the same signature.
-                       if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
-                               os.Exit(status.ExitStatus())
-                       }
+               logger.Print(prefix, string(line), suffix)
+               // Set up prefix for following line
+               if isPrefix {
+                       prefix = "[...]"
                } else {
-                       statLog.Fatalln("error in cmd.Wait:", err)
+                       prefix = ""
                }
        }
+       in.Close()
 }
index 69f31afbc9589ce6cd6c9de2a731d5093e2c80cd..fe3b56d25876fd832d3596abe3db8e40852ebbf7 100644 (file)
@@ -6,56 +6,21 @@ import (
        "io"
        "log"
        "math/rand"
-       "os"
-       "regexp"
        "testing"
        "time"
 )
 
-func TestReadAllOrWarnFail(t *testing.T) {
-       rcv := captureLogs()
-       defer uncaptureLogs()
-       go func() {
-               // The special file /proc/self/mem can be opened for
-               // reading, but reading from byte 0 returns an error.
-               f, err := os.Open("/proc/self/mem")
-               if err != nil {
-                       t.Fatalf("Opening /proc/self/mem: %s", err)
-               }
-               if x, err := ReadAllOrWarn(f); err == nil {
-                       t.Fatalf("Expected error, got %v", x)
-               }
-       }()
-       if msg, err := rcv.ReadBytes('\n'); err != nil {
-               t.Fatal(err)
-       } else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
-               t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
-       }
-}
-
-func TestReadAllOrWarnSuccess(t *testing.T) {
-       f, err := os.Open("./crunchstat_test.go")
-       if err != nil {
-               t.Fatalf("Opening ./crunchstat_test.go: %s", err)
-       }
-       data, err := ReadAllOrWarn(f)
-       if err != nil {
-               t.Fatalf("got error %s", err)
-       }
-       if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
-               t.Fatalf("data failed regexp: %s", err)
-       }
-}
-
 // Test that CopyPipeToChildLog works even on lines longer than
 // bufio.MaxScanTokenSize.
 func TestCopyPipeToChildLogLongLines(t *testing.T) {
-       rcv := captureLogs()
-       defer uncaptureLogs()
+       logger, logBuf := bufLogger()
 
-       control := make(chan bool)
        pipeIn, pipeOut := io.Pipe()
-       go CopyPipeToChildLog(pipeIn, control)
+       copied := make(chan bool)
+       go func() {
+               copyPipeToChildLog(pipeIn, logger)
+               close(copied)
+       }()
 
        sentBytes := make([]byte, bufio.MaxScanTokenSize+MaxLogLine+(1<<22))
        go func() {
@@ -72,14 +37,14 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
                pipeOut.Close()
        }()
 
-       if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+       if before, err := logBuf.ReadBytes('\n'); err != nil || string(before) != "before\n" {
                t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
        }
 
        var receivedBytes []byte
        done := false
        for !done {
-               line, err := rcv.ReadBytes('\n')
+               line, err := logBuf.ReadBytes('\n')
                if err != nil {
                        t.Fatal(err)
                }
@@ -89,7 +54,7 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
                        }
                        line = line[5:]
                }
-               if len(line) >= 6 && string(line[len(line)-6:len(line)]) == "[...]\n" {
+               if len(line) >= 6 && string(line[len(line)-6:]) == "[...]\n" {
                        line = line[:len(line)-6]
                } else {
                        done = true
@@ -100,27 +65,20 @@ func TestCopyPipeToChildLogLongLines(t *testing.T) {
                t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
        }
 
-       if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+       if after, err := logBuf.ReadBytes('\n'); err != nil || string(after) != "after\n" {
                t.Fatalf("\"after\n\" not received (got \"%s\", %s)", after, err)
        }
 
        select {
        case <-time.After(time.Second):
                t.Fatal("Timeout")
-       case <-control:
+       case <-copied:
                // Done.
        }
 }
 
-func captureLogs() *bufio.Reader {
-       // Send childLog to our bufio reader instead of stderr
-       stderrIn, stderrOut := io.Pipe()
-       childLog = log.New(stderrOut, "", 0)
-       statLog = log.New(stderrOut, "crunchstat: ", 0)
-       return bufio.NewReader(stderrIn)
-}
-
-func uncaptureLogs() {
-       childLog = log.New(os.Stderr, "", 0)
-       statLog = log.New(os.Stderr, "crunchstat: ", 0)
+func bufLogger() (*log.Logger, *bufio.Reader) {
+       r, w := io.Pipe()
+       logger := log.New(w, "", 0)
+       return logger, bufio.NewReader(r)
 }
index 55b3f61c4e5ee32bcff3fab7082fda7334f08be4..5fcacffb7819e2eeee23475801fa98b729eeb969 100644 (file)
@@ -49,11 +49,11 @@ type GetCollectionsParams struct {
 
 // SdkCollectionInfo holds collection info from api
 type SdkCollectionInfo struct {
-       UUID                 string    `json:"uuid"`
-       OwnerUUID            string    `json:"owner_uuid"`
-       ReplicationDesired   int       `json:"replication_desired"`
-       ModifiedAt           time.Time `json:"modified_at"`
-       ManifestText         string    `json:"manifest_text"`
+       UUID               string    `json:"uuid"`
+       OwnerUUID          string    `json:"owner_uuid"`
+       ReplicationDesired int       `json:"replication_desired"`
+       ModifiedAt         time.Time `json:"modified_at"`
+       ManifestText       string    `json:"manifest_text"`
 }
 
 // SdkCollectionList lists collections from api
@@ -131,7 +131,7 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections, err e
        sdkParams := arvadosclient.Dict{
                "select":  fieldsWanted,
                "order":   []string{"modified_at ASC", "uuid ASC"},
-               "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+               "filters": [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
                "offset":  0}
 
        if params.BatchSize > 0 {
index 47ab5fa4a8a6793f9712ebb4ba1ff5a9aea503aa..b23ef2cf0e4d658677e34847b73be12872653b21 100644 (file)
@@ -64,7 +64,7 @@ func CompareSummarizedReadCollections(c *C,
 }
 
 func (s *MySuite) TestSummarizeSimple(checker *C) {
-       rc := MakeTestReadCollections([]TestCollectionSpec{TestCollectionSpec{
+       rc := MakeTestReadCollections([]TestCollectionSpec{{
                ReplicationLevel: 5,
                Blocks:           []int{1, 2},
        }})
@@ -79,7 +79,7 @@ func (s *MySuite) TestSummarizeSimple(checker *C) {
        expected := ExpectedSummary{
                OwnerToCollectionSize:     map[string]int{c.OwnerUUID: c.TotalSize},
                BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{blockDigest1: 5, blockDigest2: 5},
-               BlockToCollectionUuids:    map[blockdigest.DigestWithSize][]string{blockDigest1: []string{c.UUID}, blockDigest2: []string{c.UUID}},
+               BlockToCollectionUuids:    map[blockdigest.DigestWithSize][]string{blockDigest1: {c.UUID}, blockDigest2: {c.UUID}},
        }
 
        CompareSummarizedReadCollections(checker, rc, expected)
@@ -87,11 +87,11 @@ func (s *MySuite) TestSummarizeSimple(checker *C) {
 
 func (s *MySuite) TestSummarizeOverlapping(checker *C) {
        rc := MakeTestReadCollections([]TestCollectionSpec{
-               TestCollectionSpec{
+               {
                        ReplicationLevel: 5,
                        Blocks:           []int{1, 2},
                },
-               TestCollectionSpec{
+               {
                        ReplicationLevel: 8,
                        Blocks:           []int{2, 3},
                },
@@ -117,9 +117,9 @@ func (s *MySuite) TestSummarizeOverlapping(checker *C) {
                        blockDigest3: 8,
                },
                BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{
-                       blockDigest1: []string{c0.UUID},
-                       blockDigest2: []string{c0.UUID, c1.UUID},
-                       blockDigest3: []string{c1.UUID},
+                       blockDigest1: {c0.UUID},
+                       blockDigest2: {c0.UUID, c1.UUID},
+                       blockDigest3: {c1.UUID},
                },
        }
 
index 206a9c43fd4878babf0d9a5340a68b787b15b71a..651c869ef0780a91ce10c03ed9756b81f9a5ca6f 100644 (file)
@@ -118,7 +118,7 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer
 // GetKeepServers from api server
 func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
        sdkParams := arvadosclient.Dict{
-               "filters": [][]string{[]string{"service_type", "!=", "proxy"}},
+               "filters": [][]string{{"service_type", "!=", "proxy"}},
        }
        if params.Limit > 0 {
                sdkParams["limit"] = params.Limit
@@ -430,13 +430,23 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err
                return
        }
 
-       blockInfo.Mtime, err = strconv.ParseInt(tokens[1], 10, 64)
+       var ns int64
+       ns, err = strconv.ParseInt(tokens[1], 10, 64)
        if err != nil {
                return
        }
-       blockInfo.Digest =
-               blockdigest.DigestWithSize{Digest: locator.Digest,
-                       Size: uint32(locator.Size)}
+       if ns < 1e12 {
+               // An old version of keepstore is giving us timestamps
+               // in seconds instead of nanoseconds. (This threshold
+               // correctly handles all times between 1970-01-02 and
+               // 33658-09-27.)
+               ns = ns * 1e9
+       }
+       blockInfo.Mtime = ns
+       blockInfo.Digest = blockdigest.DigestWithSize{
+               Digest: locator.Digest,
+               Size:   uint32(locator.Size),
+       }
        return
 }
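
Note: the 1e12 cutoff above separates legacy keepstore index timestamps given in whole seconds from current ones given in nanoseconds; any seconds value before the year 33658 stays below 10^12, while any nanosecond value after 1970-01-01 00:16:40 UTC is at or above it, so both encodings are classified correctly for realistic dates. A brief sketch of the same normalization, with an illustrative mid-2016 timestamp:

package main

import "fmt"

// normalizeMtime converts a keepstore index timestamp to nanoseconds,
// scaling values that look like whole seconds (the same heuristic as
// parseBlockInfoFromIndexLine above).
func normalizeMtime(ns int64) int64 {
	if ns < 1e12 {
		ns = ns * 1e9
	}
	return ns
}

func main() {
	fmt.Println(normalizeMtime(1468602709))          // seconds in, 1468602709000000000 out
	fmt.Println(normalizeMtime(1468602709000000000)) // already nanoseconds, unchanged
}
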
 
index 79ff3f8f0763b1c2452e63afe9b8d553fbab84b1..66988498481bf848d0ace840abb6bf838e9f7cf9 100644 (file)
@@ -43,7 +43,7 @@ func (s *KeepSuite) TestSendTrashLists(c *C) {
        defer server.Close()
 
        tl := map[string]TrashList{
-               server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+               server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
 
        arv := arvadosclient.ArvadosClient{ApiToken: "abc123"}
        kc := keepclient.KeepClient{Arvados: &arv, Client: &http.Client{}}
@@ -70,7 +70,7 @@ func (tse *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Req
 
 func sendTrashListError(c *C, server *httptest.Server) {
        tl := map[string]TrashList{
-               server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+               server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
 
        arv := arvadosclient.ArvadosClient{ApiToken: "abc123"}
        kc := keepclient.KeepClient{Arvados: &arv, Client: &http.Client{}}
index e2050c2b1ebefbc42bf950fc1ad30121d63b9c84..60b495c41a89799a34d40d7bce649d0af0c9a5fb 100644 (file)
@@ -164,69 +164,69 @@ func (s *PullSuite) TestBuildPullLists(c *C) {
        locator1 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xBadBeef)}
        c.Check(
                BuildPullLists(map[Locator]PullServers{
-                       locator1: PullServers{To: []string{}, From: []string{}}}),
+                       locator1: {To: []string{}, From: []string{}}}),
                PullListMapEquals,
                map[string]PullList{})
 
        c.Check(
                BuildPullLists(map[Locator]PullServers{
-                       locator1: PullServers{To: []string{}, From: []string{"f1", "f2"}}}),
+                       locator1: {To: []string{}, From: []string{"f1", "f2"}}}),
                PullListMapEquals,
                map[string]PullList{})
 
        c.Check(
                BuildPullLists(map[Locator]PullServers{
-                       locator1: PullServers{To: []string{"t1"}, From: []string{"f1", "f2"}}}),
+                       locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}}}),
                PullListMapEquals,
                map[string]PullList{
-                       "t1": PullList{PullRequest{locator1, []string{"f1", "f2"}}}})
+                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}}})
 
        c.Check(
                BuildPullLists(map[Locator]PullServers{
-                       locator1: PullServers{To: []string{"t1"}, From: []string{}}}),
+                       locator1: {To: []string{"t1"}, From: []string{}}}),
                PullListMapEquals,
-               map[string]PullList{"t1": PullList{
+               map[string]PullList{"t1": {
                        PullRequest{locator1, []string{}}}})
 
        c.Check(
                BuildPullLists(map[Locator]PullServers{
-                       locator1: PullServers{
+                       locator1: {
                                To:   []string{"t1", "t2"},
                                From: []string{"f1", "f2"},
                        }}),
                PullListMapEquals,
                map[string]PullList{
-                       "t1": PullList{PullRequest{locator1, []string{"f1", "f2"}}},
-                       "t2": PullList{PullRequest{locator1, []string{"f1", "f2"}}},
+                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
+                       "t2": {PullRequest{locator1, []string{"f1", "f2"}}},
                })
 
        locator2 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xCabbed)}
        c.Check(
                BuildPullLists(map[Locator]PullServers{
-                       locator1: PullServers{To: []string{"t1"}, From: []string{"f1", "f2"}},
-                       locator2: PullServers{To: []string{"t2"}, From: []string{"f3", "f4"}}}),
+                       locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}},
+                       locator2: {To: []string{"t2"}, From: []string{"f3", "f4"}}}),
                PullListMapEquals,
                map[string]PullList{
-                       "t1": PullList{PullRequest{locator1, []string{"f1", "f2"}}},
-                       "t2": PullList{PullRequest{locator2, []string{"f3", "f4"}}},
+                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
+                       "t2": {PullRequest{locator2, []string{"f3", "f4"}}},
                })
 
        c.Check(
                BuildPullLists(map[Locator]PullServers{
-                       locator1: PullServers{
+                       locator1: {
                                To:   []string{"t1"},
                                From: []string{"f1", "f2"}},
-                       locator2: PullServers{
+                       locator2: {
                                To:   []string{"t2", "t1"},
                                From: []string{"f3", "f4"}},
                }),
                PullListMapEquals,
                map[string]PullList{
-                       "t1": PullList{
+                       "t1": {
                                PullRequest{locator1, []string{"f1", "f2"}},
                                PullRequest{locator2, []string{"f3", "f4"}},
                        },
-                       "t2": PullList{
+                       "t2": {
                                PullRequest{locator2, []string{"f3", "f4"}},
                        },
                })
@@ -235,37 +235,37 @@ func (s *PullSuite) TestBuildPullLists(c *C) {
        locator4 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xFedBeef)}
        c.Check(
                BuildPullLists(map[Locator]PullServers{
-                       locator1: PullServers{
+                       locator1: {
                                To:   []string{"t1"},
                                From: []string{"f1", "f2"}},
-                       locator2: PullServers{
+                       locator2: {
                                To:   []string{"t2", "t1"},
                                From: []string{"f3", "f4"}},
-                       locator3: PullServers{
+                       locator3: {
                                To:   []string{"t3", "t2", "t1"},
                                From: []string{"f4", "f5"}},
-                       locator4: PullServers{
+                       locator4: {
                                To:   []string{"t4", "t3", "t2", "t1"},
                                From: []string{"f1", "f5"}},
                }),
                PullListMapEquals,
                map[string]PullList{
-                       "t1": PullList{
+                       "t1": {
                                PullRequest{locator1, []string{"f1", "f2"}},
                                PullRequest{locator2, []string{"f3", "f4"}},
                                PullRequest{locator3, []string{"f4", "f5"}},
                                PullRequest{locator4, []string{"f1", "f5"}},
                        },
-                       "t2": PullList{
+                       "t2": {
                                PullRequest{locator2, []string{"f3", "f4"}},
                                PullRequest{locator3, []string{"f4", "f5"}},
                                PullRequest{locator4, []string{"f1", "f5"}},
                        },
-                       "t3": PullList{
+                       "t3": {
                                PullRequest{locator3, []string{"f4", "f5"}},
                                PullRequest{locator4, []string{"f1", "f5"}},
                        },
-                       "t4": PullList{
+                       "t4": {
                                PullRequest{locator4, []string{"f1", "f5"}},
                        },
                })
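Every expectation above follows one rule: a locator's pull request is fanned out, with its full From list, to each server named in its To list, so locator4 (To: t4, t3, t2, t1) lands in all four lists while locator1 (To: t1) appears only in t1's. A minimal sketch of that fan-out, assuming the PullServers/PullRequest shapes used in these fixtures rather than the package's actual implementation:

        pullLists := map[string]PullList{}
        for locator, servers := range pullServers {
                for _, to := range servers.To {
                        // Each destination gets the locator plus the complete
                        // list of candidate source servers.
                        pullLists[to] = append(pullLists[to], PullRequest{locator, servers.From})
                }
        }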
index cc4eb92560b26b385378ffa6d947abb2bc9f0168..82684041275ff602236823b68da5ef2fab6714cf 100644 (file)
@@ -85,21 +85,21 @@ func VerifyToCollectionIndexSet(
 }
 
 func TestToCollectionIndexSet(t *testing.T) {
-       VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{6: []int{0}}, []int{0})
-       VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: []int{1}}, []int{1})
-       VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: []int{1, 9}}, []int{1, 9})
+       VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{6: {0}}, []int{0})
+       VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1}}, []int{1})
+       VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1, 9}}, []int{1, 9})
        VerifyToCollectionIndexSet(t, []int{5, 6},
-               map[int][]int{5: []int{2, 3}, 6: []int{3, 4}},
+               map[int][]int{5: {2, 3}, 6: {3, 4}},
                []int{2, 3, 4})
        VerifyToCollectionIndexSet(t, []int{5, 6},
-               map[int][]int{5: []int{8}, 6: []int{4}},
+               map[int][]int{5: {8}, 6: {4}},
                []int{4, 8})
-       VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{5: []int{0}}, []int{})
+       VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{5: {0}}, []int{})
 }
 
 func TestSimpleSummary(t *testing.T) {
        rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
+               {ReplicationLevel: 1, Blocks: []int{1, 2}},
        })
        rc.Summarize(nil)
        cIndex := rc.CollectionIndicesForTesting()
@@ -128,7 +128,7 @@ func TestSimpleSummary(t *testing.T) {
 
 func TestMissingBlock(t *testing.T) {
        rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
+               {ReplicationLevel: 1, Blocks: []int{1, 2}},
        })
        rc.Summarize(nil)
        cIndex := rc.CollectionIndicesForTesting()
@@ -159,7 +159,7 @@ func TestMissingBlock(t *testing.T) {
 
 func TestUnderAndOverReplicatedBlocks(t *testing.T) {
        rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               collection.TestCollectionSpec{ReplicationLevel: 2, Blocks: []int{1, 2}},
+               {ReplicationLevel: 2, Blocks: []int{1, 2}},
        })
        rc.Summarize(nil)
        cIndex := rc.CollectionIndicesForTesting()
@@ -190,9 +190,9 @@ func TestUnderAndOverReplicatedBlocks(t *testing.T) {
 
 func TestMixedReplication(t *testing.T) {
        rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
-               collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{3, 4}},
-               collection.TestCollectionSpec{ReplicationLevel: 2, Blocks: []int{5, 6}},
+               {ReplicationLevel: 1, Blocks: []int{1, 2}},
+               {ReplicationLevel: 1, Blocks: []int{3, 4}},
+               {ReplicationLevel: 2, Blocks: []int{5, 6}},
        })
        rc.Summarize(nil)
        cIndex := rc.CollectionIndicesForTesting()
index b6ceacecde2b8e2ffe810deea9e3777aade06625..3e4d387b62e2c4ba3c7d039a7114bd5ad222d2da 100644 (file)
@@ -29,7 +29,7 @@ func BuildTrashLists(kc *keepclient.KeepClient,
        ttl := int64(_ttl.(float64))
 
        // expire unreferenced blocks more than "ttl" seconds old.
-       expiry := time.Now().UTC().Unix() - ttl
+       expiry := time.Now().UTC().UnixNano() - ttl*1e9
 
        return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
 }
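Block mtimes are now compared in nanoseconds, so the ttl (still given in seconds) has to be scaled before computing the cutoff. A minimal worked sketch of that conversion, with an illustrative ttl value:

        // ttl is in seconds; mtimes and the cutoff are nanoseconds since
        // the Unix epoch, so multiply by 1e9 (ns per second) first.
        ttl := int64(1209600) // e.g. two weeks
        expiry := time.Now().UTC().UnixNano() - ttl*1e9
        // Any unreferenced block whose stored mtime is older than expiry
        // is eligible for a trash request.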
index 555211fe0275e9a42b49625557f8d505999b9c2d..3626904f3309743f08c6f23a5b1185e6ccd5b886 100644 (file)
@@ -26,12 +26,12 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
        var keepServerInfo = keep.ReadServers{
                KeepServerIndexToAddress: []keep.ServerAddress{sv0, sv1},
                BlockToServers: map[blockdigest.DigestWithSize][]keep.BlockServerInfo{
-                       block0: []keep.BlockServerInfo{
-                               keep.BlockServerInfo{0, 99},
-                               keep.BlockServerInfo{1, 101}},
-                       block1: []keep.BlockServerInfo{
-                               keep.BlockServerInfo{0, 99},
-                               keep.BlockServerInfo{1, 101}}}}
+                       block0: {
+                               {0, 99},
+                               {1, 101}},
+                       block1: {
+                               {0, 99},
+                               {1, 101}}}}
 
        // only block0 is in delete set
        var bs = make(BlockSet)
@@ -40,37 +40,37 @@ func (s *TrashSuite) TestBuildTrashLists(c *C) {
        // Test trash list where only sv0 is on writable list.
        c.Check(buildTrashListsInternal(
                map[string]struct{}{
-                       sv0.URL(): struct{}{}},
+                       sv0.URL(): {}},
                &keepServerInfo,
                110,
                bs),
                DeepEquals,
                map[string]keep.TrashList{
-                       "http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
+                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
 
        // Test trash list where both sv0 and sv1 are on writable list.
        c.Check(buildTrashListsInternal(
                map[string]struct{}{
-                       sv0.URL(): struct{}{},
-                       sv1.URL(): struct{}{}},
+                       sv0.URL(): {},
+                       sv1.URL(): {}},
                &keepServerInfo,
                110,
                bs),
                DeepEquals,
                map[string]keep.TrashList{
-                       "http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}},
-                       "http://keep1.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
+                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}},
+                       "http://keep1.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
 
        // Test trash list where only block on sv0 is expired
        c.Check(buildTrashListsInternal(
                map[string]struct{}{
-                       sv0.URL(): struct{}{},
-                       sv1.URL(): struct{}{}},
+                       sv0.URL(): {},
+                       sv1.URL(): {}},
                &keepServerInfo,
                100,
                bs),
                DeepEquals,
                map[string]keep.TrashList{
-                       "http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
+                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
 
 }
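The three checks differ only in the writable set and the expiry cutoff. Block0's replicas carry mtimes 99 (on sv0) and 101 (on sv1): with a cutoff of 110 both replicas are old enough, so every writable server gets a trash request; with a cutoff of 100 only the sv0 copy (mtime 99) qualifies, which is why keep1 drops out of the last expected map.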
index 2d1a59e8909bc250b2ca995775496f1839adf9f9..25b474b9ca0a475f81cc08db02ef007e9fe009f0 100644 (file)
@@ -6,6 +6,7 @@ import (
        "math"
        "os"
        "runtime"
+       "sort"
        "strings"
        "sync"
        "time"
@@ -50,11 +51,17 @@ type Balancer struct {
 }
 
 // Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
 //
-//   err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Run should only be called once on a given Balancer object.
+//
+// Typical usage:
+//
+//   runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+       nextRunOptions = runOptions
+
        bal.Dumper = runOptions.Dumper
        bal.Logger = runOptions.Logger
        if bal.Logger == nil {
@@ -75,10 +82,20 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
        if err = bal.CheckSanityEarly(&config.Client); err != nil {
                return
        }
-       if runOptions.CommitTrash {
+       rs := bal.rendezvousState()
+       if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+               if runOptions.SafeRendezvousState != "" {
+                       bal.logf("notice: KeepServices list has changed since last run")
+               }
+               bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
                if err = bal.ClearTrashLists(&config.Client); err != nil {
                        return
                }
+               // The current rendezvous state becomes "safe" (i.e.,
+               // OK to compute changes for that state without
+               // clearing existing trash lists) only now, after we
+               // succeed in clearing existing trash lists.
+               nextRunOptions.SafeRendezvousState = rs
        }
        if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
                return
@@ -158,6 +175,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
        return nil
 }
 
+// rendezvousState returns a fingerprint (e.g., a sorted list of
+// UUID+host+port) of the current set of keep services.
+func (bal *Balancer) rendezvousState() string {
+       srvs := make([]string, 0, len(bal.KeepServices))
+       for _, srv := range bal.KeepServices {
+               srvs = append(srvs, srv.String())
+       }
+       sort.Strings(srvs)
+       return strings.Join(srvs, "; ")
+}
+
 // ClearTrashLists sends an empty trash list to each keep
 // service. Calling this before GetCurrentState avoids races.
 //
@@ -199,7 +227,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                return err
        }
        bal.DefaultReplication = dd.DefaultCollectionReplication
-       bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+       bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
 
        errs := make(chan error, 2+len(bal.KeepServices))
        wg := sync.WaitGroup{}
@@ -619,7 +647,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
                }(srv)
        }
        var lastErr error
-       for _ = range bal.KeepServices {
+       for range bal.KeepServices {
                if err := <-errs; err != nil {
                        bal.logf("%v", err)
                        lastErr = err
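With Run now returning updated options, a long-running caller can carry SafeRendezvousState from one balance operation into the next, clearing trash lists again only when the keep service set changes. A minimal sketch of such a loop (it mirrors the RunForever change later in this commit; the sleep interval is illustrative):

        runOptions := RunOptions{CommitPulls: true, CommitTrash: true}
        for {
                var err error
                // Run returns the options to use next time; SafeRendezvousState
                // is only filled in after existing trash lists have been
                // cleared for the current rendezvous order.
                runOptions, err = (&Balancer{}).Run(config, runOptions)
                if err != nil {
                        log.Print("run failed: ", err)
                }
                time.Sleep(time.Minute)
        }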
index a138d911a3352a6edf261c6295110d1091b3d98a..edc88aa26286bac6f131a3c890b1c3fb36c75ec6 100644 (file)
@@ -236,7 +236,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       err := (&Balancer{}).Run(s.config, opts)
+       _, err := (&Balancer{}).Run(s.config, opts)
        c.Check(err, check.ErrorMatches, "received zero collections")
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -254,7 +254,7 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
        s.stub.serveFourDiskKeepServices()
        indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
-       err := (&Balancer{}).Run(s.config, opts)
+       _, err := (&Balancer{}).Run(s.config, opts)
        c.Check(err, check.IsNil)
        c.Check(indexReqs.Count(), check.Equals, 0)
        c.Check(trashReqs.Count(), check.Equals, 0)
@@ -271,7 +271,7 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        s.stub.serveFourDiskKeepServices()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       err := (&Balancer{}).Run(s.config, opts)
+       _, err := (&Balancer{}).Run(s.config, opts)
        c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -289,7 +289,7 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       err := (&Balancer{}).Run(s.config, opts)
+       _, err := (&Balancer{}).Run(s.config, opts)
        c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -308,7 +308,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        var bal Balancer
-       err := bal.Run(s.config, opts)
+       _, err := bal.Run(s.config, opts)
        c.Check(err, check.IsNil)
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -332,7 +332,7 @@ func (s *runSuite) TestCommit(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        var bal Balancer
-       err := bal.Run(s.config, opts)
+       _, err := bal.Run(s.config, opts)
        c.Check(err, check.IsNil)
        c.Check(trashReqs.Count(), check.Equals, 8)
        c.Check(pullReqs.Count(), check.Equals, 4)
@@ -362,13 +362,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
        s.config.RunPeriod = arvados.Duration(time.Millisecond)
        go RunForever(s.config, opts, stop)
 
-       // Each run should send 4 clear trash lists + 4 pull lists + 4
-       // trash lists. We should complete four runs in much less than
+       // Each run should send 4 pull lists + 4 trash lists. The
+       // first run should also send 4 empty trash lists at
+       // startup. We should complete all four runs in much less than
        // a second.
        for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
                time.Sleep(time.Millisecond)
        }
        stop <- true
        c.Check(pullReqs.Count() >= 16, check.Equals, true)
-       c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count())
+       c.Check(trashReqs.Count(), check.Equals, pullReqs.Count() + 4)
 }
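Worked through, the exit condition corresponds to at least four complete runs against the four stubbed keepstores: 4 runs x 4 servers = 16 pull lists, and each run also sends 4 trash lists, plus the 4 empty lists sent once at startup to clear old state, hence trashReqs.Count() == pullReqs.Count() + 4.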
index 682a5fb070cf0ab7e8a2b0fff0bb92a750622e06..b93939c0526d3c1f8bb7da93fe1f9915ad74c6cc 100644 (file)
@@ -76,7 +76,7 @@ func (bal *balancerSuite) SetUpTest(c *check.C) {
                bal.KeepServices[srv.UUID] = srv
        }
 
-       bal.MinMtime = time.Now().Unix() - bal.signatureTTL
+       bal.MinMtime = time.Now().UnixNano() - bal.signatureTTL*1e9
 }
 
 func (bal *balancerSuite) TestPerfect(c *check.C) {
@@ -240,7 +240,7 @@ func (bal *balancerSuite) srvList(knownBlockID int, order slots) (srvs []*KeepSe
 // replList is like srvList but returns an "existing replicas" slice,
 // suitable for a BlockState test fixture.
 func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
-       mtime := time.Now().Unix() - bal.signatureTTL - 86400
+       mtime := time.Now().UnixNano() - (bal.signatureTTL+86400)*1e9
        for _, srv := range bal.srvList(knownBlockID, order) {
                repls = append(repls, Replica{srv, mtime})
                mtime++
index b090614607ceed2a6e2bb7e66354644843186a34..0793889259eedb8c2abde3c4d22e9929281c678b 100644 (file)
@@ -78,8 +78,10 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                        CommitTrash: true,
                        Logger:      log.New(logBuf, "", log.LstdFlags),
                }
-               err := (&Balancer{}).Run(s.config, opts)
+               nextOpts, err := (&Balancer{}).Run(s.config, opts)
                c.Check(err, check.IsNil)
+               c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
+               c.Check(nextOpts.CommitPulls, check.Equals, true)
                if iter == 0 {
                        c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
                        c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
index 364bb3ffd3f6b437ec1d46edf52223539fb1161b..da4fb62a6ac8f0772e550c5eb2a103de7904ad7e 100644 (file)
@@ -51,6 +51,12 @@ type RunOptions struct {
        CommitTrash bool
        Logger      *log.Logger
        Dumper      *log.Logger
+
+       // SafeRendezvousState is the rendezvous state from the most
+       // recent balance operation, or "" if unknown. If it changes
+       // from one run to the next, we need to watch out for races.
+       // See (*Balancer).ClearTrashLists.
+       SafeRendezvousState string
 }
 
 var debugf = func(string, ...interface{}) {}
@@ -98,7 +104,7 @@ func main() {
        if err != nil {
                // (don't run)
        } else if runOptions.Once {
-               err = (&Balancer{}).Run(config, runOptions)
+               _, err = (&Balancer{}).Run(config, runOptions)
        } else {
                err = RunForever(config, runOptions, nil)
        }
@@ -138,7 +144,9 @@ func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) e
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               err := (&Balancer{}).Run(config, runOptions)
+               bal := &Balancer{}
+               var err error
+               runOptions, err = bal.Run(config, runOptions)
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
index 99da2a3a3de35de90be820b1a5285e5b592004d7..48cb02647cfd098cdc67796ba992ac5cba327bde 100644 (file)
@@ -350,7 +350,7 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
                                // Trashed blob; exclude it from response
                                continue
                        }
-                       fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
+                       fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
                }
                if resp.NextMarker == "" {
                        return nil
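Every volume's IndexTo now reports the trailing timestamp field in nanoseconds since the epoch rather than whole seconds. A minimal sketch of writing and reading one index row under that convention (variable names are illustrative):

        // Index rows have the form "<hash>+<size> <mtime>".
        row := fmt.Sprintf("%s+%d %d\n", hash, size, modTime.UnixNano())

        // A consumer recovers a time.Time via ParseInt and time.Unix:
        fields := strings.Fields(row)
        ns, err := strconv.ParseInt(fields[1], 10, 64)
        if err == nil {
                mtime := time.Unix(0, ns) // nanosecond-resolution timestamp
                _ = mtime
        }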
index 819d52fe0adecd71670ab89d57f1967b64368b4a..00f7b3ce150e2171365644ebc9ea34c2921d0aa9 100644 (file)
@@ -197,8 +197,8 @@ func main() {
        flag.IntVar(
                &permissionTTLSec,
                "blob-signature-ttl",
-               int(time.Duration(2*7*24*time.Hour).Seconds()),
-               "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
+               2*7*24*3600,
+               "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
                        "See services/api/config/application.default.yml.")
        flag.BoolVar(
                &flagSerializeIO,
index 80a7c89f2ed4f6669566711c40c4d0a59940e439..98e12034f84922d0c6bb7c94ceeeda68888a9561 100644 (file)
@@ -86,7 +86,7 @@ func (s *s3VolumeAdder) Set(bucketName string) error {
 }
 
 func s3regions() (okList []string) {
-       for r, _ := range aws.Regions {
+       for r := range aws.Regions {
                okList = append(okList, r)
        }
        return
@@ -249,7 +249,7 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                        if !v.isKeepBlock(key.Key) {
                                continue
                        }
-                       fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
+                       fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.UnixNano())
                }
                if !listResp.IsTruncated {
                        break
index 62f63d57c8edb655b5078ebf637ce6d0ed0475bb..d11bc05192246a75e8ba4c95bd544b0712279ff6 100644 (file)
@@ -22,7 +22,7 @@ func RunTrashWorker(trashq *WorkQueue) {
 
 // TrashItem deletes the indicated block from every writable volume.
 func TrashItem(trashRequest TrashRequest) {
-       reqMtime := time.Unix(trashRequest.BlockMtime, 0)
+       reqMtime := time.Unix(0, trashRequest.BlockMtime)
        if time.Since(reqMtime) < blobSignatureTTL {
                log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
                        time.Since(reqMtime),
@@ -39,8 +39,8 @@ func TrashItem(trashRequest TrashRequest) {
                        log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
                        continue
                }
-               if trashRequest.BlockMtime != mtime.Unix() {
-                       log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
+               if trashRequest.BlockMtime != mtime.UnixNano() {
+                       log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
                        continue
                }
 
index d111caeac8e5b571202502e0aea63f07816365ba..94798d95acfd85216ad60982b71282d84530ef7d 100644 (file)
@@ -236,7 +236,7 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        // Create TrashRequest for the test
        trashRequest := TrashRequest{
                Locator:    testData.DeleteLocator,
-               BlockMtime: oldBlockTime.Unix(),
+               BlockMtime: oldBlockTime.UnixNano(),
        }
 
        // Run trash worker and put the trashRequest on trashq
index f8fe0d0ebce719c6c823fe9caa9fcce12324eb49..4291c6cd1f3964f06a095214d2e7308d35ea93d4 100644 (file)
@@ -7,6 +7,7 @@ import (
        "os"
        "regexp"
        "sort"
+       "strconv"
        "strings"
        "time"
 
@@ -355,10 +356,22 @@ func testIndexTo(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
+       // minMtime and maxMtime are the minimum and maximum
+       // acceptable values the index can report for our test
+       // blocks. 1-second precision is acceptable.
+       minMtime := time.Now().UTC().UnixNano()
+       minMtime -= minMtime % 1e9
+
        v.PutRaw(TestHash, TestBlock)
        v.PutRaw(TestHash2, TestBlock2)
        v.PutRaw(TestHash3, TestBlock3)
 
+       maxMtime := time.Now().UTC().UnixNano()
+       if maxMtime%1e9 > 0 {
+               maxMtime -= maxMtime % 1e9
+               maxMtime += 1e9
+       }
+
        // Blocks whose names aren't Keep hashes should be omitted from
        // index
        v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
@@ -371,15 +384,21 @@ func testIndexTo(t TB, factory TestableVolumeFactory) {
        indexRows := strings.Split(string(buf.Bytes()), "\n")
        sort.Strings(indexRows)
        sortedIndex := strings.Join(indexRows, "\n")
-       m, err := regexp.MatchString(
-               `^\n`+TestHash+`\+\d+ \d+\n`+
-                       TestHash3+`\+\d+ \d+\n`+
-                       TestHash2+`\+\d+ \d+$`,
-               sortedIndex)
-       if err != nil {
-               t.Error(err)
-       } else if !m {
+       m := regexp.MustCompile(
+               `^\n` + TestHash + `\+\d+ (\d+)\n` +
+                       TestHash3 + `\+\d+ \d+\n` +
+                       TestHash2 + `\+\d+ \d+$`,
+       ).FindStringSubmatch(sortedIndex)
+       if m == nil {
                t.Errorf("Got index %q for empty prefix", sortedIndex)
+       } else {
+               mtime, err := strconv.ParseInt(m[1], 10, 64)
+               if err != nil {
+                       t.Error(err)
+               } else if mtime < minMtime || mtime > maxMtime {
+                       t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
+                               mtime, minMtime, maxMtime)
+               }
        }
 
        for _, prefix := range []string{"f", "f15", "f15ac"} {
@@ -396,7 +415,7 @@ func testIndexTo(t TB, factory TestableVolumeFactory) {
 
        for _, prefix := range []string{"zero", "zip", "zilch"} {
                buf = new(bytes.Buffer)
-               v.IndexTo(prefix, buf)
+               err := v.IndexTo(prefix, buf)
                if err != nil {
                        t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
                } else if buf.Len() != 0 {
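The min/maxMtime bounds added above make "1-second precision is acceptable" concrete: minMtime is the pre-write clock truncated down to a whole second (still expressed in nanoseconds) and maxMtime is the post-write clock rounded up to the next whole second, so the parsed index timestamp passes whether a volume stores full nanosecond mtimes or truncates them to seconds.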
index 7aff85e59a4357acb1e27ce5386756feb96fa0e1..90189dc36cacab73276a322656755b32a580c909 100644 (file)
@@ -138,9 +138,8 @@ func (v *UnixVolume) Touch(loc string) error {
                return e
        }
        defer unlockfile(f)
-       now := time.Now().Unix()
-       utime := syscall.Utimbuf{now, now}
-       return syscall.Utime(p, &utime)
+       ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
 }
 
 // Mtime returns the stored timestamp for the given locator.
@@ -353,7 +352,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        _, err = fmt.Fprint(w,
                                name,
                                "+", fileInfo[0].Size(),
-                               " ", fileInfo[0].ModTime().Unix(),
+                               " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
                }
                blockdir.Close()
index 74c67f2dd0a6c1ee69748d24c162d95b5c98b16a..6b31795293ebd38eaa3837316fe001519c91b072 100644 (file)
@@ -98,7 +98,7 @@ func TestWorkQueueDoneness(t *testing.T) {
        gate := make(chan struct{})
        go func() {
                <-gate
-               for _ = range b.NextItem {
+               for range b.NextItem {
                        <-gate
                        time.Sleep(time.Millisecond)
                        b.DoneItem <- struct{}{}
index db799bc16b806beb09313fb56bbcee8e88f4a00a..c78f1c6b8d63160c40e7e57d9b29f57d07e59dcf 100644 (file)
@@ -211,8 +211,10 @@ class BaseComputeNodeDriver(RetryMixin):
         # libcloud compute drivers typically raise bare Exceptions to
         # represent API errors.  Return True for any exception that is
         # exactly an Exception, or a better-known higher-level exception.
-        if (exception is BaseHTTPError and
-            self.message and self.message.startswith("InvalidInstanceID.NotFound")):
+        if (type(exception) is BaseHTTPError and
+            exception.message and
+            (exception.message.startswith("InvalidInstanceID.NotFound") or
+             exception.message.startswith("InstanceLimitExceeded"))):
             return True
         return (isinstance(exception, cls.CLOUD_ERRORS) or
                 type(exception) is Exception)
index 227b5e5f3471ba4cf2e484461cd2c651f26a96e1..c3774c1b7afd8fa3c53f36ace1444daab0a22d81 100644 (file)
@@ -11,7 +11,10 @@ import mock
 import pykka
 import threading
 
+from libcloud.common.exceptions import BaseHTTPError
+
 import arvnodeman.computenode.dispatch as dispatch
+from arvnodeman.computenode.driver import BaseComputeNodeDriver
 from . import testutil
 
 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
@@ -25,6 +28,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.api_client.nodes().update().execute.side_effect = arvados_effect
         self.cloud_client = mock.MagicMock(name='cloud_client')
         self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
+        self.cloud_client.is_cloud_exception = BaseComputeNodeDriver.is_cloud_exception
 
     def make_actor(self, arv_node=None):
         if not hasattr(self, 'timer'):
@@ -86,6 +90,28 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.make_actor()
         self.wait_for_assignment(self.setup_actor, 'cloud_node')
 
+    def test_unknown_basehttperror_not_retried(self):
+        self.make_mocks()
+        self.cloud_client.create_node.side_effect = [
+            BaseHTTPError(400, "Unknown"),
+            self.cloud_client.create_node.return_value,
+            ]
+        self.make_actor()
+        finished = threading.Event()
+        self.setup_actor.subscribe(lambda _: finished.set())
+        assert(finished.wait(self.TIMEOUT))
+        self.assertEqual(0, self.cloud_client.post_create_node.call_count)
+
+    def test_known_basehttperror_retried(self):
+        self.make_mocks()
+        self.cloud_client.create_node.side_effect = [
+            BaseHTTPError(400, "InstanceLimitExceeded"),
+            self.cloud_client.create_node.return_value,
+            ]
+        self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        self.assertEqual(1, self.cloud_client.post_create_node.call_count)
+
     def test_failed_post_create_retried(self):
         self.make_mocks()
         self.cloud_client.post_create_node.side_effect = [
index 842f612fac089aed33ed9ffcd68d650955dad5f1..2ebe13c895eb16a775ca247b028a60cdc6eba59c 100755 (executable)
@@ -101,6 +101,9 @@ wait_for_arvbox() {
 }
 
 run() {
+    CONFIG=$1
+    TAG=$2
+
     if docker ps -a --filter "status=running" | grep -E "$ARVBOX_CONTAINER$" -q ; then
         echo "Container $ARVBOX_CONTAINER is already running"
         exit 0
@@ -110,8 +113,13 @@ run() {
         echo "Container $ARVBOX_CONTAINER already exists but is not running; use restart or rebuild"
         exit 1
     fi
+
+    if test ! -z "$TAG"
+    then
+        TAG=":$TAG"
+    fi
 
-    if echo "$1" | grep '^public' ; then
+    if echo "$CONFIG" | grep '^public' ; then
         if test -n "$ARVBOX_PUBLISH_IP" ; then
             localip=$ARVBOX_PUBLISH_IP
         else
@@ -136,7 +144,7 @@ run() {
         PUBLIC=""
     fi
 
-    if echo "$1" | grep 'demo$' ; then
+    if echo "$CONFIG" | grep 'demo$' ; then
         if test -d "$ARVBOX_DATA" ; then
             echo "It looks like you already have a development container named $ARVBOX_CONTAINER."
             echo "Set ARVBOX_CONTAINER to set a different name for your demo container"
@@ -153,7 +161,7 @@ run() {
                --privileged \
                --volumes-from $ARVBOX_CONTAINER-data \
                $PUBLIC \
-               arvados/arvbox-demo
+               arvados/arvbox-demo$TAG
         updateconf
         wait_for_arvbox
     else
@@ -167,8 +175,7 @@ run() {
             git clone https://github.com/curoverse/sso-devise-omniauth-provider.git "$SSO_ROOT"
         fi
 
-        if test "$1" = test ; then
-            shift
+        if test "$CONFIG" = test ; then
 
             mkdir -p $VAR_DATA/test
 
@@ -184,7 +191,7 @@ run() {
                    "--volume=$GEMS:/var/lib/gems:rw" \
                    "--volume=$PIPCACHE:/var/lib/pip:rw" \
                    "--volume=$GOSTUFF:/var/lib/gopath:rw" \
-                   arvados/arvbox-dev \
+                   arvados/arvbox-dev$TAG \
                    /usr/local/bin/runsvinit -svdir=/etc/test-service
 
             docker exec -ti \
@@ -210,7 +217,7 @@ run() {
                    WORKSPACE=/usr/src/arvados \
                    GEM_HOME=/var/lib/gems \
                    "$@"
-        elif echo "$1" | grep 'dev$' ; then
+        elif echo "$CONFIG" | grep 'dev$' ; then
             docker run \
                    --detach \
                    --name=$ARVBOX_CONTAINER \
@@ -224,12 +231,12 @@ run() {
                    "--volume=$PIPCACHE:/var/lib/pip:rw" \
                    "--volume=$GOSTUFF:/var/lib/gopath:rw" \
                    $PUBLIC \
-                   arvados/arvbox-dev
+                   arvados/arvbox-dev$TAG
             updateconf
             wait_for_arvbox
             echo "The Arvados source code is checked out at: $ARVADOS_ROOT"
         else
-            echo "Unknown configuration '$1'"
+            echo "Unknown configuration '$CONFIG'"
         fi
     fi
 }
@@ -426,7 +433,7 @@ case "$subcmd" in
         echo
         echo "build   <config>      build arvbox Docker image"
         echo "rebuild <config>      build arvbox Docker image, no layer cache"
-        echo "start|run <config>  start $ARVBOX_CONTAINER container"
+        echo "start|run <config> [tag]  start $ARVBOX_CONTAINER container"
         echo "open       open arvbox workbench in a web browser"
         echo "shell      enter arvbox shell"
         echo "ip         print arvbox docker container ip address"