8460: Merge branch 'master' into 8460-websocket-go
author Tom Clegg <tom@curoverse.com>
Sun, 11 Dec 2016 05:59:55 +0000 (00:59 -0500)
committer Tom Clegg <tom@curoverse.com>
Sun, 11 Dec 2016 05:59:55 +0000 (00:59 -0500)
86 files changed:
README.md
apps/workbench/app/controllers/work_units_controller.rb
apps/workbench/app/models/proxy_work_unit.rb
apps/workbench/app/views/container_requests/_show_inputs.html.erb
apps/workbench/app/views/pipeline_instances/_running_component.html.erb
apps/workbench/app/views/pipeline_instances/_show_components_running.html.erb
apps/workbench/test/integration/container_requests_test.rb
apps/workbench/test/integration/work_units_test.rb
build/run-build-packages-one-target.sh
build/run-build-packages.sh
build/run-tests.sh
doc/_config.yml
doc/_includes/_arvados_cwl_runner.liquid [new file with mode: 0644]
doc/_includes/_register_cwl_workflow.liquid [new file with mode: 0644]
doc/_includes/_what_is_cwl.liquid [new file with mode: 0644]
doc/user/cwl/cwl-runner.html.textile.liquid
doc/user/topics/arv-copy.html.textile.liquid
doc/user/topics/running-workflow-command-line.html.textile.liquid [new file with mode: 0644]
doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid [deleted file]
doc/user/tutorials/tutorial-workflow-workbench.html.textile.liquid [new file with mode: 0644]
doc/user/tutorials/writing-cwl-workflow.html.textile.liquid [new file with mode: 0644]
sdk/cli/bin/crunch-job
sdk/cli/test/test_arv-keep-get.rb
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_submit.py
sdk/go/logger/logger.go [deleted file]
sdk/go/logger/util.go [deleted file]
sdk/go/util/util.go [deleted file]
sdk/python/arvados/_version.py [new file with mode: 0644]
sdk/python/arvados/commands/arv_copy.py
sdk/python/arvados/commands/keepdocker.py
sdk/python/arvados/commands/ls.py
sdk/python/arvados/commands/put.py
sdk/python/arvados/commands/run.py
sdk/python/arvados/commands/ws.py
sdk/python/bin/arv-get
sdk/python/bin/arv-normalize
sdk/python/setup.py
sdk/python/tests/arvados_testutil.py
sdk/python/tests/test_arv_copy.py [new file with mode: 0644]
sdk/python/tests/test_arv_keepdocker.py [new file with mode: 0644]
sdk/python/tests/test_arv_ls.py
sdk/python/tests/test_arv_normalize.py [new file with mode: 0644]
sdk/python/tests/test_arv_put.py
sdk/python/tests/test_arv_run.py [new file with mode: 0644]
sdk/python/tests/test_arv_ws.py
services/api/test/fixtures/pipeline_templates.yml
services/api/test/fixtures/workflows.yml
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/squeue.go
services/crunch-run/crunchrun.go
services/datamanager/collection/collection.go [deleted file]
services/datamanager/collection/collection_test.go [deleted file]
services/datamanager/collection/testing.go [deleted file]
services/datamanager/datamanager.go [deleted file]
services/datamanager/datamanager_test.go [deleted file]
services/datamanager/experimental/datamanager.py [deleted file]
services/datamanager/experimental/datamanager_test.py [deleted file]
services/datamanager/keep/keep.go [deleted file]
services/datamanager/keep/keep_test.go [deleted file]
services/datamanager/loggerutil/loggerutil.go [deleted file]
services/datamanager/summary/canonical_string.go [deleted file]
services/datamanager/summary/file.go [deleted file]
services/datamanager/summary/pull_list.go [deleted file]
services/datamanager/summary/pull_list_test.go [deleted file]
services/datamanager/summary/summary.go [deleted file]
services/datamanager/summary/summary_test.go [deleted file]
services/datamanager/summary/trash_list.go [deleted file]
services/datamanager/summary/trash_list_test.go [deleted file]
services/fuse/arvados_fuse/_version.py [new file with mode: 0644]
services/fuse/arvados_fuse/command.py
services/fuse/setup.py
services/fuse/tests/test_command_args.py
services/keep-balance/balance.go
services/keepstore/s3_volume_test.go
services/nodemanager/arvnodeman/_version.py [new file with mode: 0644]
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/setup.py
services/nodemanager/tests/test_arguments.py [new file with mode: 0644]
services/nodemanager/tests/testutil.py

index cf09171b024716b4a57a412746c7d1a9decbb87e..419ca15957cc4cd2aa1b00048dd61feb2c6375f3 100644 (file)
--- a/README.md
+++ b/README.md
@@ -59,6 +59,7 @@ contributers to Arvados.
 ## Development
 
 [![Build Status](https://ci.curoverse.com/buildStatus/icon?job=run-tests)](https://ci.curoverse.com/job/run-tests/)
+[![Go Report Card](https://goreportcard.com/badge/github.com/curoverse/arvados)](https://goreportcard.com/report/github.com/curoverse/arvados)
 
 The Arvados public bug tracker is located at https://dev.arvados.org/projects/arvados/issues
 
index 3b611aa25b74e28663d9b7ecc2b0647670f066c8..550bdb7e953f7fe47a899cbe674a39ed457a9529 100644 (file)
@@ -57,7 +57,7 @@ class WorkUnitsController < ApplicationController
       workflow = Workflow.find? template_uuid
       if workflow.definition
         begin
-          wf_json = YAML::load(workflow.definition)
+          wf_json = ActiveSupport::HashWithIndifferentAccess.new YAML::load(workflow.definition)
         rescue => e
           logger.error "Error converting definition yaml to json: #{e.message}"
           raise ArgumentError, "Error converting definition yaml to json: #{e.message}"
@@ -77,11 +77,21 @@ class WorkUnitsController < ApplicationController
       attrs['cwd'] = "/var/spool/cwl"
       attrs['output_path'] = "/var/spool/cwl"
 
+      input_defaults = {}
+      if wf_json
+        inputs = get_cwl_inputs(wf_json)
+        inputs.each do |input|
+          if input[:default]
+            input_defaults[cwl_shortname(input[:id])] = input[:default]
+          end
+        end
+      end
+
       # mounts
       mounts = {
         "/var/lib/cwl/cwl.input.json" => {
           "kind" => "json",
-          "content" => {}
+          "content" => input_defaults
         },
         "stdout" => {
           "kind" => "file",
index 48bc3a04bc95dd41915e317449e7287ed4e42bce..771fdac47617fdd8855b8f0c00c10d5625064404 100644 (file)
@@ -189,7 +189,7 @@ class ProxyWorkUnit < WorkUnit
   def cputime
     if state_label != "Queued"
       if started_at
-        (runtime_constraints.andand[:min_nodes] || 1) * ((finished_at || Time.now()) - started_at)
+        (runtime_constraints.andand[:min_nodes] || 1).to_i * ((finished_at || Time.now()) - started_at)
       end
     end
   end
@@ -276,14 +276,14 @@ class ProxyWorkUnit < WorkUnit
       if children.any?
         cpu_time = children.map { |c|
           if c.started_at
-             (c.runtime_constraints.andand[:min_nodes] || 1) * ((c.finished_at || Time.now()) - c.started_at)
+             (c.runtime_constraints.andand[:min_nodes] || 1).to_i * ((c.finished_at || Time.now()) - c.started_at)
           else
             0
           end
         }.reduce(:+) || 0
       else
         if started_at
-          cpu_time = (runtime_constraints.andand[:min_nodes] || 1) * ((finished_at || Time.now()) - started_at)
+          cpu_time = (runtime_constraints.andand[:min_nodes] || 1).to_i * ((finished_at || Time.now()) - started_at)
         end
       end
 
index a6c4bffacd2fc1add6043a16226e28bbae15affa..b2fb245454aae2ead67ca1851ba5f57700678512 100644 (file)
@@ -1,22 +1,30 @@
-<% n_inputs = cwl_inputs_required(@object, get_cwl_inputs(@object.mounts[:"/var/lib/cwl/workflow.json"][:content]), [:mounts, :"/var/lib/cwl/cwl.input.json", :content]) %>
+<%
+n_inputs = if @object.mounts[:"/var/lib/cwl/workflow.json"] && @object.mounts[:"/var/lib/cwl/cwl.input.json"]
+             cwl_inputs_required(@object, get_cwl_inputs(@object.mounts[:"/var/lib/cwl/workflow.json"][:content]), [:mounts, :"/var/lib/cwl/cwl.input.json", :content])
+           else
+             0
+           end
+%>
 
 <% content_for :pi_input_form do %>
 <form role="form" style="width:60%">
   <div class="form-group">
-    <% workflow = @object.mounts[:"/var/lib/cwl/workflow.json"][:content] %>
-    <% inputs = get_cwl_inputs(workflow) %>
-    <% inputs.each do |input| %>
-      <label for="#input-<%= cwl_shortname(input[:id]) %>">
-        <%= input[:label] || cwl_shortname(input[:id]) %>
-      </label>
-      <div>
-        <p class="form-control-static">
-          <%= render_cwl_input @object, input, [:mounts, :"/var/lib/cwl/cwl.input.json", :content] %>
+    <% workflow = @object.mounts[:"/var/lib/cwl/workflow.json"].andand[:content] %>
+    <% if workflow %>
+      <% inputs = get_cwl_inputs(workflow) %>
+      <% inputs.each do |input| %>
+        <label for="#input-<%= cwl_shortname(input[:id]) %>">
+          <%= input[:label] || cwl_shortname(input[:id]) %>
+        </label>
+        <div>
+          <p class="form-control-static">
+            <%= render_cwl_input @object, input, [:mounts, :"/var/lib/cwl/cwl.input.json", :content] %>
+          </p>
+        </div>
+        <p class="help-block">
+          <%= input[:doc] %>
         </p>
-      </div>
-      <p class="help-block">
-        <%= input[:doc] %>
-      </p>
+      <% end %>
     <% end %>
   </div>
 </form>
index ded535ef3ad5109e81a33ea1fd9815cde8ac6905..06ed01ee6efd71282f6c1e647bdd542c717b1569 100644 (file)
@@ -48,7 +48,7 @@
           <div class="col-md-3">
             <% if current_job[:started_at] %>
               <% walltime = ((if current_job[:finished_at] then current_job[:finished_at] else Time.now() end) - current_job[:started_at]) %>
-              <% cputime = (current_job[:runtime_constraints].andand[:min_nodes] || 1) *
+              <% cputime = (current_job[:runtime_constraints].andand[:min_nodes] || 1).to_i *
                            ((current_job[:finished_at] || Time.now()) - current_job[:started_at]) %>
               <%= render_runtime(walltime, false) %>
               <% if cputime > 0 %> / <%= render_runtime(cputime, false) %> (<%= (cputime/walltime).round(1) %>&Cross;)<% end %>
index 4343f2e57b5adbb64dfb0fbabe177b9d7f937b7a..a4eb6ffb2abad2b959c9bfe48718d5f951227b59 100644 (file)
@@ -66,7 +66,7 @@
     <%
         cputime = pipeline_jobs.map { |j|
         if j[:job][:started_at]
-          (j[:job][:runtime_constraints].andand[:min_nodes] || 1) * ((j[:job][:finished_at] || Time.now()) - j[:job][:started_at])
+          (j[:job][:runtime_constraints].andand[:min_nodes] || 1).to_i * ((j[:job][:finished_at] || Time.now()) - j[:job][:started_at])
         else
           0
         end
index df6584ebb6490cedac2fe439a1a77110a9feeb84..bd3a813f72af4b8ea77c35568b6f737a1389237d 100644 (file)
@@ -96,4 +96,17 @@ class ContainerRequestsTest < ActionDispatch::IntegrationTest
     wait_for_ajax
     assert_text 'This container is queued'
   end
+
+  test "Run button enabled when workflow is empty and no inputs are needed" do
+    visit page_with_token("active")
+
+    find('.btn', text: 'Run a process').click
+    within('.modal-dialog') do
+      find('.selectable', text: 'Valid workflow with no definition yaml').click
+      find('.btn', text: 'Next: choose inputs').click
+    end
+
+    assert_text 'This workflow does not need any further inputs'
+    page.assert_selector 'a', text: 'Run'
+  end
 end
index 3f551a012ea62a692b5fdfcf0c99de0208b359bd..5b5848ee7766580003ee10be1c28ba944070bf36 100644 (file)
@@ -109,8 +109,8 @@ class WorkUnitsTest < ActionDispatch::IntegrationTest
   end
 
   [
-    ['Two Part Pipeline Template', 'part-one', 'Provide a value for the following'],
-    ['Workflow with input specifications', 'this workflow has inputs specified', 'Provide a value for the following'],
+    ['Pipeline with default input specifications', 'part-one', 'Provide values for the following'],
+    ['Workflow with default input specifications', 'this workflow has inputs specified', 'Provide a value for the following'],
   ].each do |template_name, preview_txt, process_txt|
     test "run a process using template #{template_name} from dashboard" do
       visit page_with_token('admin')
@@ -131,6 +131,10 @@ class WorkUnitsTest < ActionDispatch::IntegrationTest
       # in the process page now
       assert_text process_txt
       assert_selector 'a', text: template_name
+
+      assert_equal "Set value for ex_string_def", find('div.form-group > div > p.form-control-static > a', text: "hello-testing-123")[:"data-title"]
+
+      page.assert_selector 'a.disabled,button.disabled', text: 'Run'
     end
   end
 
index 60250c9d50b4b07befa49903645ae00cd0b8a4d6..16c7129d9b8efc0747da64f394366680f1ce4fc6 100755 (executable)
@@ -127,7 +127,6 @@ popd
 
 if test -z "$packages" ; then
     packages="arvados-api-server
-        arvados-data-manager
         arvados-docker-cleaner
         arvados-git-httpd
         arvados-node-manager
index 08b2cd204a0046defb3db95741639bd28042008b..f5773e7a959a966fc18e9a8b4671218b1714b8cb 100755 (executable)
@@ -424,8 +424,6 @@ package_go_binary services/crunch-run crunch-run \
     "Supervise a single Crunch container"
 package_go_binary services/crunchstat crunchstat \
     "Gather cpu/memory/network statistics of running Crunch jobs"
-package_go_binary services/datamanager arvados-data-manager \
-    "Ensure block replication levels, report disk usage, and determine which blocks should be deleted when space is needed"
 package_go_binary services/keep-balance keep-balance \
     "Rebalance and garbage-collect data blocks stored in Arvados Keep"
 package_go_binary services/keepproxy keepproxy \
@@ -452,7 +450,7 @@ package_go_binary tools/keep-exercise keep-exercise \
 # 2014-05-15
 cd $WORKSPACE/packages/$TARGET
 rm -rf "$WORKSPACE/sdk/python/build"
-fpm_build $WORKSPACE/sdk/python "${PYTHON2_PKG_PREFIX}-arvados-python-client" 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/sdk/python/arvados_python_client.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados Python SDK" --deb-recommends=git
+fpm_build $WORKSPACE/sdk/python "${PYTHON2_PKG_PREFIX}-arvados-python-client" 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/sdk/python/arvados_python_client.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados Python SDK" --depends "${PYTHON2_PKG_PREFIX}-setuptools" --deb-recommends=git
 
 # cwl-runner
 cd $WORKSPACE/packages/$TARGET
@@ -505,12 +503,12 @@ fi
 # not omit the python- prefix first.
 cd $WORKSPACE/packages/$TARGET
 rm -rf "$WORKSPACE/services/fuse/build"
-fpm_build $WORKSPACE/services/fuse "${PYTHON2_PKG_PREFIX}-arvados-fuse" 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/fuse/arvados_fuse.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Keep FUSE driver"
+fpm_build $WORKSPACE/services/fuse "${PYTHON2_PKG_PREFIX}-arvados-fuse" 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/fuse/arvados_fuse.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Keep FUSE driver" --depends "${PYTHON2_PKG_PREFIX}-setuptools"
 
 # The node manager
 cd $WORKSPACE/packages/$TARGET
 rm -rf "$WORKSPACE/services/nodemanager/build"
-fpm_build $WORKSPACE/services/nodemanager arvados-node-manager 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/nodemanager/arvados_node_manager.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados node manager"
+fpm_build $WORKSPACE/services/nodemanager arvados-node-manager 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/nodemanager/arvados_node_manager.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados node manager" --depends "${PYTHON2_PKG_PREFIX}-setuptools"
 
 # The Docker image cleaner
 cd $WORKSPACE/packages/$TARGET
index efcab595791f6f6f0db88eb23d4445524ca073eb..e0e1ce28524327851e6925d26284530ced1dd5a0 100755 (executable)
@@ -773,10 +773,6 @@ gostuff=(
     sdk/go/keepclient
     services/keep-balance
     services/keepproxy
-    services/datamanager/summary
-    services/datamanager/collection
-    services/datamanager/keep
-    services/datamanager
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
index 96aea34d36173a07c7952480d1fd59dcf3f90409..30df754b7af8394e0472c241aebd59abf37bcbb2 100644 (file)
@@ -28,9 +28,9 @@ navbar:
     - Welcome:
       - user/index.html.textile.liquid
       - user/getting_started/community.html.textile.liquid
-    - Run a pipeline using Workbench:
+    - Run a workflow using Workbench:
       - user/getting_started/workbench.html.textile.liquid
-      - user/tutorials/tutorial-pipeline-workbench.html.textile.liquid
+      - user/tutorials/tutorial-workflow-workbench.html.textile.liquid
     - Access an Arvados virtual machine:
       - user/getting_started/vm-login-with-webshell.html.textile.liquid
       - user/getting_started/ssh-access-unix.html.textile.liquid
@@ -47,13 +47,15 @@ navbar:
       - user/cwl/cwl-runner.html.textile.liquid
       - user/cwl/cwl-style.html.textile.liquid
     - Working on the command line:
+      - user/topics/running-workflow-command-line.html.textile.liquid
       - user/topics/running-pipeline-command-line.html.textile.liquid
       - user/topics/arv-run.html.textile.liquid
     - Working with git repositories:
       - user/tutorials/add-new-repository.html.textile.liquid
       - user/tutorials/git-arvados-guide.html.textile.liquid
-    - Develop an Arvados pipeline:
+    - Develop an Arvados workflow:
       - user/tutorials/intro-crunch.html.textile.liquid
+      - user/tutorials/writing-cwl-workflow.html.textile.liquid
       - user/tutorials/running-external-program.html.textile.liquid
       - user/topics/crunch-tools-overview.html.textile.liquid
       - user/tutorials/tutorial-firstscript.html.textile.liquid
diff --git a/doc/_includes/_arvados_cwl_runner.liquid b/doc/_includes/_arvados_cwl_runner.liquid
new file mode 100644 (file)
index 0000000..9a10bab
--- /dev/null
@@ -0,0 +1,62 @@
+h3. Submit a workflow and wait for results
+
+Use @arvados-cwl-runner@ to submit CWL workflows to Arvados.  After submitting the job, it will wait for the workflow to complete and print out the final result to standard output.
+
+*Note:* Once submitted, the workflow runs entirely on Arvados, so even if you interrupt @arvados-cwl-runner@ or log out, the workflow will continue to run.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner bwa-mem.cwl bwa-mem-input.yml</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-06-30 14:56:36 arvados.arv-run[27002] INFO: Upload local files: "bwa-mem.cwl"
+2016-06-30 14:56:36 arvados.arv-run[27002] INFO: Uploaded to qr1hi-4zz18-h7ljh5u76760ww2
+2016-06-30 14:56:40 arvados.cwl-runner[27002] INFO: Submitted job qr1hi-8i9sb-fm2n3b1w0l6bskg
+2016-06-30 14:56:41 arvados.cwl-runner[27002] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-fm2n3b1w0l6bskg) is Running
+2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-fm2n3b1w0l6bskg) is Complete
+2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
+{
+    "aligned_sam": {
+        "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+        "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
+        "class": "File",
+        "size": 30738986
+    }
+}
+</code></pre>
+</notextile>
+
+h3. Submit a workflow with no waiting
+
+To submit a workflow and exit immediately, use the @--no-wait@ option.  This will submit the workflow to Arvados, print out the UUID of the job that was submitted to standard output, and exit.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --no-wait bwa-mem.cwl bwa-mem-input.yml</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-06-30 15:07:52 arvados.arv-run[12480] INFO: Upload local files: "bwa-mem.cwl"
+2016-06-30 15:07:52 arvados.arv-run[12480] INFO: Uploaded to qr1hi-4zz18-eqnfwrow8aysa9q
+2016-06-30 15:07:52 arvados.cwl-runner[12480] INFO: Submitted job qr1hi-8i9sb-fm2n3b1w0l6bskg
+qr1hi-8i9sb-fm2n3b1w0l6bskg
+</code></pre>
+</notextile>
+
+h3. Run a workflow locally
+
+To run a workflow with local control, use @--local@.  This means that the host where you run @arvados-cwl-runner@ will be responsible for submitting jobs. With @--local@, if you interrupt @arvados-cwl-runner@ or log out, the workflow will be terminated.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --local bwa-mem.cwl bwa-mem-input.yml</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 10:05:19 arvados.cwl-runner[16290] INFO: Pipeline instance qr1hi-d1hrv-92wcu6ldtio74r4
+2016-07-01 10:05:28 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Queued
+2016-07-01 10:05:29 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Running
+2016-07-01 10:05:45 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Complete
+2016-07-01 10:05:46 arvados.cwl-runner[16290] INFO: Overall process status is success
+{
+    "aligned_sam": {
+        "size": 30738986,
+        "path": "keep:15f56bad0aaa7364819bf14ca2a27c63+88/HWI-ST1027_129_D0THKACXX.1_1.sam",
+        "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
+        "class": "File"
+    }
+}
+</code></pre>
+</notextile>
diff --git a/doc/_includes/_register_cwl_workflow.liquid b/doc/_includes/_register_cwl_workflow.liquid
new file mode 100644 (file)
index 0000000..438115b
--- /dev/null
@@ -0,0 +1,21 @@
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to qr1hi-4zz18-7e0hedrmkuyoei3
+2016-07-01 12:21:01 arvados.cwl-runner[15796] INFO: Created template qr1hi-p5p6p-rjleou1dwr167v5
+qr1hi-p5p6p-rjleou1dwr167v5
+</code></pre>
+</notextile>
+
+You can provide a partial input file to set default values for the workflow input parameters:
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
+2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to qr1hi-4zz18-0f91qkovk4ml18o
+2016-07-01 14:09:50 arvados.cwl-runner[3730] INFO: Created template qr1hi-p5p6p-0deqe6nuuyqns2i
+qr1hi-p5p6p-0deqe6nuuyqns2i
+</code></pre>
+</notextile>
diff --git a/doc/_includes/_what_is_cwl.liquid b/doc/_includes/_what_is_cwl.liquid
new file mode 100644 (file)
index 0000000..d7b890d
--- /dev/null
@@ -0,0 +1 @@
+The "Common Workflow Language (CWL)":http://commonwl.org is a multi-vendor open standard for describing analysis tools and workflows that are portable across a variety of platforms.  CWL is the recommended way to develop and run workflows for Arvados.  Arvados supports the "CWL v1.0":http://commonwl.org/v1.0 specification.
index 30069907432097120d9e18ec8054126a1887fbee..ecff47f059e8cc9c7a6ff03aaef7e605d2ba45fc 100644 (file)
@@ -4,13 +4,17 @@ navsection: userguide
 title: Using Common Workflow Language
 ...
 
-The "Common Workflow Language (CWL)":http://commonwl.org is a multi-vendor open standard for describing analysis tools and workflows that are portable across a variety of platforms.  CWL is the recommended way to develop and run workflows for Arvados.  Arvados supports the "CWL v1.0":http://commonwl.org/v1.0 specification.
+{% include 'what_is_cwl' %}
 
 {% include 'tutorial_expectations' %}
 
-h2. Setting up
+h2. Preparing to work with Arvados CWL runner
 
-The @arvados-cwl-runner@ client is installed by default on Arvados shell nodes.  However, if you do not have @arvados-cwl-runner@, you may install it using @pip@:
+h3. arvados-cwl-runner
+
+The @arvados-cwl-runner@ client is installed by default on Arvados shell nodes.
+
+However, if you do not have @arvados-cwl-runner@, you may install it using @pip@:
 
 <notextile>
 <pre><code>~$ <span class="userinput">virtualenv ~/venv</span>
@@ -20,9 +24,11 @@ The @arvados-cwl-runner@ client is installed by default on Arvados shell nodes.
 </code></pre>
 </notextile>
 
-h3. Docker
+h3. Check Docker access
 
-Certain features of @arvados-cwl-runner@ require access to Docker.  You can determine if you have access to Docker by running @docker version@:
+Certain features of @arvados-cwl-runner@ require access to Docker.
+
+You can determine if you have access to Docker by running @docker version@:
 
 <notextile>
 <pre><code>~$ <span class="userinput">docker version</span>
@@ -44,9 +50,9 @@ Server:
 </code></pre>
 </notextile>
 
-If this returns an error, contact the sysadmin of your cluster for assistance.  Alternatively, if you have Docker installed on your local workstation, you may follow the instructions above to install @arvados-cwl-runner@.
+If this returns an error, contact the sysadmin of your cluster for assistance.
 
-h3. Getting the example files
+h3. Get the example files
 
 The tutorial files are located in the documentation section of the Arvados source repository:
 
@@ -72,60 +78,7 @@ If you do not wish to create an account on "https://cloud.curoverse.com":https:/
 
 h2. Submitting a workflow to an Arvados cluster
 
-Use @arvados-cwl-runner@ to submit CWL workflows to Arvados.  After submitting the job, it will wait for the workflow to complete and print out the final result to standard output.  Note that once submitted, the workflow runs entirely on Arvados, so even if you interrupt @arvados-cwl-runner@ or log out, the workflow will continue to run.
-
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner bwa-mem.cwl bwa-mem-input.yml</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-06-30 14:56:36 arvados.arv-run[27002] INFO: Upload local files: "bwa-mem.cwl"
-2016-06-30 14:56:36 arvados.arv-run[27002] INFO: Uploaded to qr1hi-4zz18-h7ljh5u76760ww2
-2016-06-30 14:56:40 arvados.cwl-runner[27002] INFO: Submitted job qr1hi-8i9sb-fm2n3b1w0l6bskg
-2016-06-30 14:56:41 arvados.cwl-runner[27002] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-fm2n3b1w0l6bskg) is Running
-2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-fm2n3b1w0l6bskg) is Complete
-2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
-{
-    "aligned_sam": {
-        "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
-        "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
-        "class": "File",
-        "size": 30738986
-    }
-}
-</code></pre>
-</notextile>
-
-To submit a workflow and exit immediately, use the @--no-wait@ option.  This will print out the uuid of the job that was submitted to standard output.
-
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --no-wait bwa-mem.cwl bwa-mem-input.yml</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-06-30 15:07:52 arvados.arv-run[12480] INFO: Upload local files: "bwa-mem.cwl"
-2016-06-30 15:07:52 arvados.arv-run[12480] INFO: Uploaded to qr1hi-4zz18-eqnfwrow8aysa9q
-2016-06-30 15:07:52 arvados.cwl-runner[12480] INFO: Submitted job qr1hi-8i9sb-fm2n3b1w0l6bskg
-qr1hi-8i9sb-fm2n3b1w0l6bskg
-</code></pre>
-</notextile>
-
-To run a workflow with local control, use @--local@.  This means that the host where you run @arvados-cwl-runner@ will be responsible for submitting jobs. With @--local@, if you interrupt @arvados-cwl-runner@ or log out, the workflow will be terminated.
-
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --local bwa-mem.cwl bwa-mem-input.yml</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-07-01 10:05:19 arvados.cwl-runner[16290] INFO: Pipeline instance qr1hi-d1hrv-92wcu6ldtio74r4
-2016-07-01 10:05:28 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Queued
-2016-07-01 10:05:29 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Running
-2016-07-01 10:05:45 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Complete
-2016-07-01 10:05:46 arvados.cwl-runner[16290] INFO: Overall process status is success
-{
-    "aligned_sam": {
-        "size": 30738986,
-        "path": "keep:15f56bad0aaa7364819bf14ca2a27c63+88/HWI-ST1027_129_D0THKACXX.1_1.sam",
-        "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
-        "class": "File"
-    }
-}
-</code></pre>
-</notextile>
+{% include 'arvados_cwl_runner' %}
 
 h2. Work reuse
 
@@ -145,27 +98,7 @@ h2. Registering a workflow to use in Workbench
 
 Use @--create-workflow@ to register a CWL workflow with Arvados.  This enables you to share workflows with other Arvados users, and run them by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard.
 
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
-2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to qr1hi-4zz18-7e0hedrmkuyoei3
-2016-07-01 12:21:01 arvados.cwl-runner[15796] INFO: Created template qr1hi-p5p6p-rjleou1dwr167v5
-qr1hi-p5p6p-rjleou1dwr167v5
-</code></pre>
-</notextile>
-
-You can provide a partial input file to set default values for the workflow input parameters:
-
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
-2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to qr1hi-4zz18-0f91qkovk4ml18o
-2016-07-01 14:09:50 arvados.cwl-runner[3730] INFO: Created template qr1hi-p5p6p-0deqe6nuuyqns2i
-qr1hi-p5p6p-0deqe6nuuyqns2i
-</code></pre>
-</notextile>
+{% include 'register_cwl_workflow' %}
 
 h2. Making workflows directly executable
 
@@ -226,7 +159,7 @@ arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107,
 
 h2. Developing workflows
 
-For an introduction and and detailed documentation about writing CWL, see the "User Guide":http://commonwl.org/v1.0/UserGuide.html and the "Specification":http://commonwl.org/v1.0 .
+For an introduction and detailed documentation about writing CWL, see the "CWL User Guide":http://commonwl.org/v1.0/UserGuide.html and the "CWL Specification":http://commonwl.org/v1.0 .
 
 To run on Arvados, a workflow should provide a @DockerRequirement@ in the @hints@ section.
 
index ed0a126a41cf6d07f018ff7fe00cad49d5b32fd8..223f2fe311b82ec76c24c258293d84d8e38c92e9 100644 (file)
@@ -4,6 +4,9 @@ navsection: userguide
 title: "Using arv-copy"
 ...
 
+{% include 'crunch1only_begin' %}
+On those sites, the "copy a pipeline template" feature described below is not available. However, the "copy a workflow" feature is not yet implemented.
+{% include 'crunch1only_end' %}
 
 This tutorial describes how to copy Arvados objects from one cluster to another by using @arv-copy@.
 
diff --git a/doc/user/topics/running-workflow-command-line.html.textile.liquid b/doc/user/topics/running-workflow-command-line.html.textile.liquid
new file mode 100644 (file)
index 0000000..f70d3e8
--- /dev/null
@@ -0,0 +1,17 @@
+---
+layout: default
+navsection: userguide
+title: "Running an Arvados workflow"
+...
+
+{% include 'what_is_cwl' %}
+
+{% include 'tutorial_expectations' %}
+
+h2. arvados-cwl-runner
+
+The arvados-cwl-runner tool can be used to submit workflows to an Arvados cluster from the command prompt.
+
+The following examples assume that you have prepared to run the arvados-cwl-runner tool as explained in the "Using Common Workflow Language":{{site.baseurl}}/user/topics/running-workflow-command-line.html.textile.liquid page.
+
+{% include 'arvados_cwl_runner' %}
diff --git a/doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid b/doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid
deleted file mode 100644 (file)
index 37a575c..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
----
-layout: default
-navsection: userguide
-title: "Running a pipeline using Workbench"
-...
-
-{% include 'crunch1only_begin' %}
-On those sites, the details will be slightly different and the example pipeline might not be available.
-{% include 'crunch1only_end' %}
-
-A "pipeline" (sometimes called a "workflow" in other systems) is a sequence of steps that apply various programs or tools to transform input data to output data.  Pipelines are the principal means of performing computation with Arvados.  This tutorial demonstrates how to run a single-stage pipeline to take a small data set of paired-end reads from a sample "exome":https://en.wikipedia.org/wiki/Exome in "FASTQ":https://en.wikipedia.org/wiki/FASTQ_format format and align them to "Chromosome 19":https://en.wikipedia.org/wiki/Chromosome_19_%28human%29 using the "bwa mem":http://bio-bwa.sourceforge.net/ tool, producing a "Sequence Alignment/Map (SAM)":https://samtools.github.io/ file.  This tutorial will introduce the following Arvados features:
-
-<div>
-* How to create a new pipeline from an existing template.
-* How to browse and select input data for the pipeline and submit the pipeline to run on the Arvados cluster.
-* How to access your pipeline results.
-</div>
-
-notextile. <div class="spaced-out">
-
-h3. Steps
-
-# Start from the *Workbench Dashboard*.  You can access the Dashboard by clicking on *<i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard* in the upper left corner of any Workbench page.
-# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button.  This will open a dialog box titled *Choose a pipeline to run*.
-# In the search box, type in *Tutorial align using bwa mem*.
-# Select *<i class="fa fa-fw fa-gear"></i> Tutorial align using bwa mem* and click the <span class="btn btn-sm btn-primary" >Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span> button.  This will create a new pipeline in your *Home* project and will open it. You can now supply the inputs for the pipeline.
-# The first input parameter to the pipeline is *"reference_collection" parameter for run-command script in bwa-mem component*.  Click the <span class="btn btn-sm btn-primary">Choose</span> button beneath that header.  This will open a dialog box titled *Choose a dataset for "reference_collection" parameter for run-command script in bwa-mem component*.
-# Open the *Home <span class="caret"></span>* menu and select *All Projects*. Search for and select *<i class="fa fa-fw fa-archive"></i> Tutorial chromosome 19 reference* and click the <span class="btn btn-sm btn-primary" >OK</span> button.
-# Repeat the previous two steps to set the *"sample" parameter for run-command script in bwa-mem component* parameter to *<i class="fa fa-fw fa-archive"></i> Tutorial sample exome*.
-# Click on the <span class="btn btn-sm btn-primary" >Run <i class="fa fa-fw fa-play"></i></span> button.  The page updates to show you that the pipeline has been submitted to run on the Arvados cluster.
-# After the pipeline starts running, you can track the progress by watching log messages from jobs.  This page refreshes automatically.  You will see a <span class="label label-success">complete</span> label when the pipeline completes successfully.
-# Click on the *Output* link to see the results of the job.  This will load a new page listing the output files from this pipeline.  You'll see the output SAM file from the alignment tool under the *Files* tab.
-# Click on the <span class="btn btn-sm btn-info"><i class="fa fa-download"></i></span> download button to the right of the SAM file to download your results.
-
-notextile. </div>
diff --git a/doc/user/tutorials/tutorial-workflow-workbench.html.textile.liquid b/doc/user/tutorials/tutorial-workflow-workbench.html.textile.liquid
new file mode 100644 (file)
index 0000000..445ce75
--- /dev/null
@@ -0,0 +1,27 @@
+---
+layout: default
+navsection: userguide
+title: "Running a workflow using Workbench"
+...
+
+A "workflow" (sometimes called a "pipeline" in other systems) is a sequence of steps that apply various programs or tools to transform input data to output data.  Workflows are the principal means of performing computation with Arvados.  This tutorial demonstrates how to run a single-stage workflow to take a small data set of paired-end reads from a sample "exome":https://en.wikipedia.org/wiki/Exome in "FASTQ":https://en.wikipedia.org/wiki/FASTQ_format format and align them to "Chromosome 19":https://en.wikipedia.org/wiki/Chromosome_19_%28human%29 using the "bwa mem":http://bio-bwa.sourceforge.net/ tool, producing a "Sequence Alignment/Map (SAM)":https://samtools.github.io/ file.  This tutorial will introduce the following Arvados features:
+
+<div>
+* How to create a new process from an existing workflow.
+* How to browse and select input data for the workflow and submit the process to run on the Arvados cluster.
+* How to access your process results.
+</div>
+
+h3. Steps
+
+# Start from the *Workbench Dashboard*.  You can access the Dashboard by clicking on *<i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard* in the upper left corner of any Workbench page.
+# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button.  This will open a dialog box titled *Choose a pipeline or workflow to run*.
+# In the search box, type in *Tutorial bwa mem cwl*.
+# Select *<i class="fa fa-fw fa-gear"></i> Tutorial bwa mem cwl* and click the <span class="btn btn-sm btn-primary" >Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span> button.  This will create a new process in your *Home* project and will open it. You can now supply the inputs for the process. Please note that all required inputs are populated with default values and you can change them if you prefer.
+# For example, let's see how to change the *"reference" parameter* for this workflow. Click the <span class="btn btn-sm btn-primary">Choose</span> button beneath the *"reference" parameter* header.  This will open a dialog box titled *Choose a dataset for "reference" parameter for cwl-runner in bwa-mem.cwl component*.
+# Open the *Home <span class="caret"></span>* menu and select *All Projects*. Search for and select *<i class="fa fa-fw fa-archive"></i> Tutorial chromosome 19 reference*. You will then see a list of files. Select *<i class="fa fa-fw fa-file"></i> 19-fasta.bwt* and click the <span class="btn btn-sm btn-primary" >OK</span> button.
+# Repeat the previous two steps to set the *"read_p1" parameter for cwl-runner script in bwa-mem.cwl component* and *"read_p2" parameter for cwl-runner script in bwa-mem.cwl component* parameters.
+# Click on the <span class="btn btn-sm btn-primary" >Run <i class="fa fa-fw fa-play"></i></span> button.  The page updates to show you that the process has been submitted to run on the Arvados cluster.
+# After the process starts running, you can track the progress by watching log messages from the component(s).  This page refreshes automatically.  You will see a <span class="label label-success">complete</span> label when the process completes successfully.
+# Click on the *Output* link to see the results of the process.  This will load a new page listing the output files from this process.  You'll see the output SAM file from the alignment tool under the *Files* tab.
+# Click on the <span class="btn btn-sm btn-info"><i class="fa fa-download"></i></span> download button to the right of the SAM file to download your results.
diff --git a/doc/user/tutorials/writing-cwl-workflow.html.textile.liquid b/doc/user/tutorials/writing-cwl-workflow.html.textile.liquid
new file mode 100644 (file)
index 0000000..cd282c8
--- /dev/null
@@ -0,0 +1,28 @@
+---
+layout: default
+navsection: userguide
+title: "Writing a CWL workflow"
+...
+
+{% include 'what_is_cwl' %}
+
+{% include 'tutorial_expectations' %}
+
+h2. Registering a CWL workflow
+
+Use @--create-workflow@ to register a CWL workflow with Arvados.
+
+The following examples assume that you have prepared to run the arvados-cwl-runner tool as explained in the "Using Common Workflow Language":{{site.baseurl}}/user/topics/running-workflow-command-line.html.textile.liquid page.
+
+{% include 'register_cwl_workflow' %}
+
+h2. Running a CWL workflow
+
+h3. Running a workflow at the command prompt
+
+Not yet implemented
+
+h3. Running a workflow using Workbench
+
+The workflow can also be executed using Workbench. Go to the Workbench Dashboard and click the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button and select the desired workflow.
+
index 358743608b1f7f5e796e5d3f3d90e3c9dc6f8cb6..40c9cf325cf11f11d077b6783f0b6fd46b47c74d 100755 (executable)
@@ -864,9 +864,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
         ."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
         ."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
         .q{&& declare -a VOLUMES=() }
-        .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner") ; fi }
-        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt") ; }
-        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt") ; fi };
+        .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
+        .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
+        .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
 
     $command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
     $ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
index d0224aedb01e354408f5a710eb88dff344314a90..04f454369cc6541be477fe3f585c637f45c7aee9 100644 (file)
@@ -20,6 +20,14 @@ class TestArvKeepGet < Minitest::Test
     assert_match /^usage:/, err
   end
 
+  def test_get_version
+    out, err = capture_subprocess_io do
+      assert_arv_get '--version'
+    end
+    assert_empty(out, "STDOUT not expected: '#{out}'")
+    assert_match(/[0-9]+\.[0-9]+\.[0-9]+/, err, "Version information incorrect: '#{err}'")
+  end
+
   def test_help
     out, err = capture_subprocess_io do
       assert_arv_get '-h'
index cacb7b81a0321e9f6da0726fffc04f8c16bee0eb..31272c825ea0f14874a6a13609c9df746ecc8871 100644 (file)
@@ -319,7 +319,8 @@ class ArvCwlRunner(object):
                 tmpl = RunnerTemplate(self, tool, job_order,
                                       kwargs.get("enable_reuse"),
                                       uuid=existing_uuid,
-                                      submit_runner_ram=kwargs.get("submit_runner_ram"))
+                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                      name=kwargs.get("name"))
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return tmpl.uuid
@@ -327,7 +328,8 @@ class ArvCwlRunner(object):
                 return upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
-                                       submit_runner_ram=kwargs.get("submit_runner_ram"))
+                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                       name=kwargs.get("name"))
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -359,17 +361,19 @@ class ArvCwlRunner(object):
                                          **kwargs).next()
                 else:
                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
-                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
+                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                                name=kwargs.get("name"))
             else:
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
-                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
+                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                      name=kwargs.get("name"))
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
                     "owner_uuid": self.project_uuid,
-                    "name": shortname(tool.tool["id"]),
+                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
@@ -542,6 +546,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
                         default=1024)
 
+    parser.add_argument("--name", type=str,
+                        help="Name to use for workflow execution instance.",
+                        default=None)
+
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
@@ -565,6 +573,10 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     job_order_object = None
     arvargs = parser.parse_args(args)
 
+    if arvargs.version:
+        print versionstring()
+        return
+
     if arvargs.update_workflow:
         if arvargs.update_workflow.find('-7fd4e-') == 5:
             want_api = 'containers'
index 93f84f8fd173e1ffa3fd41b1632396e3148ab178..e34145a143558bcedb64bd19b9cf1f5955e21640 100644 (file)
@@ -281,7 +281,7 @@ class RunnerJob(Runner):
         self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
             body={
                 "owner_uuid": self.arvrunner.project_uuid,
-                "name": shortname(self.tool.tool["id"]),
+                "name": self.name,
                 "components": {"cwl-runner": job_spec },
                 "state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
         logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
@@ -309,7 +309,8 @@ class RunnerTemplate(object):
         'string': 'text',
     }
 
-    def __init__(self, runner, tool, job_order, enable_reuse, uuid, submit_runner_ram=0):
+    def __init__(self, runner, tool, job_order, enable_reuse, uuid,
+                 submit_runner_ram=0, name=None):
         self.runner = runner
         self.tool = tool
         self.job = RunnerJob(
@@ -319,7 +320,8 @@ class RunnerTemplate(object):
             enable_reuse=enable_reuse,
             output_name=None,
             output_tags=None,
-            submit_runner_ram=submit_runner_ram)
+            submit_runner_ram=submit_runner_ram,
+            name=name)
         self.uuid = uuid
 
     def pipeline_component_spec(self):
index a9c3bd5b40a63eed3708c3f73327afc0cd16dcc7..703bb47d8e85cd24b38db6f943f6a1306da0e140 100644 (file)
@@ -18,7 +18,8 @@ from .perf import Perf
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, submit_runner_ram=0):
+def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+                    submit_runner_ram=0, name=None):
     upload_docker(arvRunner, tool)
 
     document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
@@ -33,7 +34,9 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, submit_
         if sn in job_order:
             inp["default"] = job_order[sn]
 
-    name = os.path.basename(tool.tool["id"])
+    if not name:
+        name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
+
     upload_dependencies(arvRunner, name, document_loader,
                         packed, uri, False)
 
@@ -41,7 +44,7 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, submit_
 
     body = {
         "workflow": {
-            "name": tool.tool.get("label", name),
+            "name": name,
             "description": tool.tool.get("doc", ""),
             "definition":yaml.safe_dump(packed)
         }}
index d7858cf5e0039fe12fd413bff9812a0342a99cd8..2d13e6640b1c57b3ccfb22c9c3c9bac79b6ecda8 100644 (file)
@@ -162,7 +162,8 @@ def arvados_jobs_image(arvrunner):
 
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse,
-                 output_name, output_tags, submit_runner_ram=0):
+                 output_name, output_tags, submit_runner_ram=0,
+                 name=None):
         self.arvrunner = runner
         self.tool = tool
         self.job_order = job_order
@@ -172,6 +173,8 @@ class Runner(object):
         self.final_output = None
         self.output_name = output_name
         self.output_tags = output_tags
+        self.name = name
+
         if submit_runner_ram:
             self.submit_runner_ram = submit_runner_ram
         else:
@@ -184,7 +187,8 @@ class Runner(object):
         pass
 
     def arvados_job_spec(self, *args, **kwargs):
-        self.name = os.path.basename(self.tool.tool["id"])
+        if self.name is None:
+            self.name = os.path.basename(self.tool.tool["id"])
         workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
         adjustDirObjs(self.job_order, trim_listing)
         return workflowmapper
index 7751644f51795e60271759287290381919f3dc4d..7a8ec3a0dd3c51743720076f67c230d6af4706c8 100644 (file)
@@ -49,7 +49,8 @@ setup(name='arvados-cwl-runner',
       # when updating the cwltool version pin.
       install_requires=[
           'cwltool==1.0.20161128202906',
-          'arvados-python-client>=0.1.20160826210445'
+          'arvados-python-client>=0.1.20160826210445',
+          'setuptools'
       ],
       data_files=[
           ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
index dea04b3222ea13186f38c9c309ca0bf6fe9ba893..d917aef57a8ea2378b43193e7ff01f066a73f1d2 100644 (file)
@@ -325,6 +325,25 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_pipeline_uuid + '\n')
 
+
+    @mock.patch("time.sleep")
+    @stubs
+    def test_submit_pipeline_name(self, stubs, tm):
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--debug", "--name=hello job 123",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.expect_pipeline_instance["name"] = "hello job 123"
+
+        expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+        stubs.api.pipeline_instances().create.assert_called_with(
+            body=expect_pipeline)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_pipeline_uuid + '\n')
+
     @mock.patch("time.sleep")
     @stubs
     def test_submit_output_tags(self, stubs, tm):
@@ -489,6 +508,26 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
+    @stubs
+    def test_submit_container_name(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--name=hello container 123",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        stubs.expect_container_spec["name"] = "hello container 123"
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        stubs.api.container_requests().create.assert_called_with(
+            body=expect_container)
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
     @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
     @mock.patch("cwltool.docker.get_image")
     @mock.patch("arvados.api")
@@ -533,23 +572,9 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner))
 
 class TestCreateTemplate(unittest.TestCase):
-    @stubs
-    def test_create(self, stubs):
-        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
-        capture_stdout = cStringIO.StringIO()
-
-        exited = arvados_cwl.main(
-            ["--create-workflow", "--debug",
-             "--project-uuid", project_uuid,
-             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
-            capture_stdout, sys.stderr, api_client=stubs.api)
-        self.assertEqual(exited, 0)
-
-        stubs.api.pipeline_instances().create.refute_called()
-        stubs.api.jobs().create.refute_called()
+    existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
 
-        expect_component = copy.deepcopy(stubs.expect_job_spec)
+    def _adjust_script_params(self, expect_component):
         expect_component['script_parameters']['x'] = {
             'dataclass': 'File',
             'required': True,
@@ -567,6 +592,26 @@ class TestCreateTemplate(unittest.TestCase):
             'required': True,
             'type': 'Directory',
         }
+
+    @stubs
+    def test_create(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+        capture_stdout = cStringIO.StringIO()
+
+        exited = arvados_cwl.main(
+            ["--create-workflow", "--debug",
+             "--api=jobs",
+             "--project-uuid", project_uuid,
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.api.pipeline_instances().create.refute_called()
+        stubs.api.jobs().create.refute_called()
+
+        expect_component = copy.deepcopy(stubs.expect_job_spec)
+        self._adjust_script_params(expect_component)
         expect_template = {
             "components": {
                 "submit_wf.cwl": expect_component,
@@ -581,6 +626,76 @@ class TestCreateTemplate(unittest.TestCase):
                          stubs.expect_pipeline_template_uuid + '\n')
 
 
+    @stubs
+    def test_create_name(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+        capture_stdout = cStringIO.StringIO()
+
+        exited = arvados_cwl.main(
+            ["--create-workflow", "--debug",
+             "--project-uuid", project_uuid,
+             "--api=jobs",
+             "--name", "testing 123",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.api.pipeline_instances().create.refute_called()
+        stubs.api.jobs().create.refute_called()
+
+        expect_component = copy.deepcopy(stubs.expect_job_spec)
+        self._adjust_script_params(expect_component)
+        expect_template = {
+            "components": {
+                "testing 123": expect_component,
+            },
+            "name": "testing 123",
+            "owner_uuid": project_uuid,
+        }
+        stubs.api.pipeline_templates().create.assert_called_with(
+            body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
+
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_pipeline_template_uuid + '\n')
+
+
+    @stubs
+    def test_update_name(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+        capture_stdout = cStringIO.StringIO()
+
+        exited = arvados_cwl.main(
+            ["--update-workflow", self.existing_template_uuid,
+             "--debug",
+             "--project-uuid", project_uuid,
+             "--api=jobs",
+             "--name", "testing 123",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.api.pipeline_instances().create.refute_called()
+        stubs.api.jobs().create.refute_called()
+
+        expect_component = copy.deepcopy(stubs.expect_job_spec)
+        self._adjust_script_params(expect_component)
+        expect_template = {
+            "components": {
+                "testing 123": expect_component,
+            },
+            "name": "testing 123",
+            "owner_uuid": project_uuid,
+        }
+        stubs.api.pipeline_templates().create.refute_called()
+        stubs.api.pipeline_templates().update.assert_called_with(
+            body=JsonDiffMatcher(expect_template), uuid=self.existing_template_uuid)
+
+        self.assertEqual(capture_stdout.getvalue(),
+                         self.existing_template_uuid + '\n')
+
+
 class TestCreateWorkflow(unittest.TestCase):
     existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
     expect_workflow = open("tests/wf/expect_packed.cwl").read()
@@ -616,6 +731,39 @@ class TestCreateWorkflow(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_workflow_uuid + '\n')
 
+
+    @stubs
+    def test_create_name(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+        capture_stdout = cStringIO.StringIO()
+
+        exited = arvados_cwl.main(
+            ["--create-workflow", "--debug",
+             "--api=containers",
+             "--project-uuid", project_uuid,
+             "--name", "testing 123",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        stubs.api.pipeline_templates().create.refute_called()
+        stubs.api.container_requests().create.refute_called()
+
+        body = {
+            "workflow": {
+                "owner_uuid": project_uuid,
+                "name": "testing 123",
+                "description": "",
+                "definition": self.expect_workflow,
+            }
+        }
+        stubs.api.workflows().create.assert_called_with(
+            body=JsonDiffMatcher(body))
+
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_workflow_uuid + '\n')
+
     @stubs
     def test_incompatible_api(self, stubs):
         capture_stderr = cStringIO.StringIO()
@@ -658,6 +806,31 @@ class TestCreateWorkflow(unittest.TestCase):
                          self.existing_workflow_uuid + '\n')
 
 
+    @stubs
+    def test_update_name(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+
+        exited = arvados_cwl.main(
+            ["--update-workflow", self.existing_workflow_uuid,
+             "--debug", "--name", "testing 123",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        body = {
+            "workflow": {
+                "name": "testing 123",
+                "description": "",
+                "definition": self.expect_workflow,
+            }
+        }
+        stubs.api.workflows().update.assert_called_with(
+            uuid=self.existing_workflow_uuid,
+            body=JsonDiffMatcher(body))
+        self.assertEqual(capture_stdout.getvalue(),
+                         self.existing_workflow_uuid + '\n')
+
+
 class TestTemplateInputs(unittest.TestCase):
     expect_template = {
         "components": {
@@ -709,7 +882,7 @@ class TestTemplateInputs(unittest.TestCase):
     @stubs
     def test_inputs_empty(self, stubs):
         exited = arvados_cwl.main(
-            ["--create-template", "--no-wait",
+            ["--create-template",
              "tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
             cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
@@ -720,7 +893,7 @@ class TestTemplateInputs(unittest.TestCase):
     @stubs
     def test_inputs(self, stubs):
         exited = arvados_cwl.main(
-            ["--create-template", "--no-wait",
+            ["--create-template",
              "tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
             cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go
deleted file mode 100644 (file)
index 6dd7fb3..0000000
+++ /dev/null
@@ -1,204 +0,0 @@
-// Logger periodically writes a log to the Arvados SDK.
-//
-// This package is useful for maintaining a log object that is updated
-// over time. This log object will be periodically written to the log,
-// as specified by WriteInterval in the Params.
-//
-// This package is safe for concurrent use as long as:
-// The maps passed to a LogMutator are not accessed outside of the
-// LogMutator
-//
-// Usage:
-// arvLogger := logger.NewLogger(params)
-// arvLogger.Update(func(properties map[string]interface{},
-//     entry map[string]interface{}) {
-//   // Modifiy properties and entry however you want
-//   // properties is a shortcut for entry["properties"].(map[string]interface{})
-//   // properties can take any (valid) values you want to give it,
-//   // entry will only take the fields listed at
-//   // http://doc.arvados.org/api/schema/Log.html
-//   // Valid values for properties are anything that can be json
-//   // encoded (i.e. will not error if you call json.Marshal() on it.
-// })
-package logger
-
-import (
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "log"
-       "time"
-)
-
-const (
-       startSuffix              = "-start"
-       partialSuffix            = "-partial"
-       finalSuffix              = "-final"
-       numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
-)
-
-type LoggerParams struct {
-       Client          *arvadosclient.ArvadosClient // The client we use to write log entries
-       EventTypePrefix string                       // The prefix we use for the event type in the log entry
-       WriteInterval   time.Duration                // Wait at least this long between log writes
-}
-
-// A LogMutator is a function which modifies the log entry.
-// It takes two maps as arguments, properties is the first and entry
-// is the second
-// properties is a shortcut for entry["properties"].(map[string]interface{})
-// properties can take any values you want to give it.
-// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
-// properties and entry are only safe to access inside the LogMutator,
-// they should not be stored anywhere, otherwise you'll risk
-// concurrent access.
-type LogMutator func(map[string]interface{}, map[string]interface{})
-
-// A Logger is used to build up a log entry over time and write every
-// version of it.
-type Logger struct {
-       // The data we write
-       data       map[string]interface{} // The entire map that we give to the api
-       entry      map[string]interface{} // Convenience shortcut into data
-       properties map[string]interface{} // Convenience shortcut into data
-
-       params LoggerParams // Parameters we were given
-
-       // Variables to coordinate updating and writing.
-       modified    bool            // Has this data been modified since the last write?
-       workToDo    chan LogMutator // Work to do in the worker thread.
-       writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
-       hasWritten  bool            // Whether we've written at all yet.
-       noMoreWork  chan bool       // Signals that we're done writing.
-
-       writeHooks []LogMutator // Mutators we call before each write.
-}
-
-// Create a new logger based on the specified parameters.
-func NewLogger(params LoggerParams) (l *Logger, err error) {
-       // sanity check parameters
-       if &params.Client == nil {
-               err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()")
-               return
-       }
-       if params.EventTypePrefix == "" {
-               err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()")
-               return
-       }
-
-       l = &Logger{
-               data:        make(map[string]interface{}),
-               entry:       make(map[string]interface{}),
-               properties:  make(map[string]interface{}),
-               params:      params,
-               workToDo:    make(chan LogMutator, 10),
-               writeTicker: time.NewTicker(params.WriteInterval),
-               noMoreWork:  make(chan bool, numberNoMoreWorkMessages)}
-
-       l.data["log"] = l.entry
-       l.entry["properties"] = l.properties
-
-       // Start the worker goroutine.
-       go l.work()
-
-       return l, nil
-}
-
-// Exported functions will be called from other goroutines, therefore
-// all they are allowed to do is enqueue work to be done in the worker
-// goroutine.
-
-// Enqueues an update. This will happen in another goroutine after
-// this method returns.
-func (l *Logger) Update(mutator LogMutator) {
-       l.workToDo <- mutator
-}
-
-// Similar to Update(), but writes the log entry as soon as possible
-// (ignoring MinimumWriteInterval) and blocks until the entry has been
-// written. This is useful if you know that you're about to quit
-// (e.g. if you discovered a fatal error, or you're finished), since
-// go will not wait for timers (including the pending write timer) to
-// go off before exiting.
-func (l *Logger) FinalUpdate(mutator LogMutator) {
-       // TODO(misha): Consider not accepting any future updates somehow,
-       // since they won't get written if they come in after this.
-
-       // Stop the periodic write ticker. We'll perform the final write
-       // before returning from this function.
-       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
-               l.writeTicker.Stop()
-       }
-
-       // Apply the final update
-       l.workToDo <- mutator
-
-       // Perform the final write and signal that we can return.
-       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
-               l.write(true)
-               for i := 0; i < numberNoMoreWorkMessages; {
-                       l.noMoreWork <- true
-               }
-       }
-
-       // Wait until we've performed the write.
-       <-l.noMoreWork
-}
-
-// Adds a hook which will be called every time this logger writes an entry.
-func (l *Logger) AddWriteHook(hook LogMutator) {
-       // We do the work in a LogMutator so that it happens in the worker
-       // goroutine.
-       l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
-               l.writeHooks = append(l.writeHooks, hook)
-       }
-}
-
-// The worker loop
-func (l *Logger) work() {
-       for {
-               select {
-               case <-l.writeTicker.C:
-                       if l.modified {
-                               l.write(false)
-                               l.modified = false
-                       }
-               case mutator := <-l.workToDo:
-                       mutator(l.properties, l.entry)
-                       l.modified = true
-               case <-l.noMoreWork:
-                       return
-               }
-       }
-}
-
-// Actually writes the log entry.
-func (l *Logger) write(isFinal bool) {
-
-       // Run all our hooks
-       for _, hook := range l.writeHooks {
-               hook(l.properties, l.entry)
-       }
-
-       // Update the event type.
-       if isFinal {
-               l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
-       } else if l.hasWritten {
-               l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
-       } else {
-               l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
-       }
-       l.hasWritten = true
-
-       // Write the log entry.
-       // This is a network write and will take a while, which is bad
-       // because we're blocking all the other work on this goroutine.
-       //
-       // TODO(misha): Consider rewriting this so that we can encode l.data
-       // into a string, and then perform the actual write in another
-       // routine. This will be tricky and will require support in the
-       // client.
-       err := l.params.Client.Create("logs", l.data, nil)
-       if err != nil {
-               log.Printf("Received error writing %v: %v", l.data, err)
-       }
-}
diff --git a/sdk/go/logger/util.go b/sdk/go/logger/util.go
deleted file mode 100644 (file)
index 6425aca..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-// Helper methods for interacting with Logger.
-package logger
-
-// Retrieves the map[string]interface{} stored at parent[key] if it
-// exists, otherwise it makes it and stores it there.
-// This is useful for logger because you may not know if a map you
-// need has already been created.
-func GetOrCreateMap(
-       parent map[string]interface{},
-       key string) (child map[string]interface{}) {
-       read, exists := parent[key]
-       if exists {
-               child = read.(map[string]interface{})
-
-       } else {
-               child = make(map[string]interface{})
-               parent[key] = child
-       }
-       return
-}
diff --git a/sdk/go/util/util.go b/sdk/go/util/util.go
deleted file mode 100644 (file)
index ac510de..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-/* Helper methods for dealing with responses from API Server. */
-
-package util
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-)
-
-func UserIsAdmin(arv *arvadosclient.ArvadosClient) (is_admin bool, err error) {
-       type user struct {
-               IsAdmin bool `json:"is_admin"`
-       }
-       var u user
-       err = arv.Call("GET", "users", "", "current", nil, &u)
-       return u.IsAdmin, err
-}
-
-// Returns the total count of a particular type of resource
-//
-//   resource - the arvados resource to count
-// return
-//   count - the number of items of type resource the api server reports, if no error
-//   err - error accessing the resource, or nil if no error
-func NumberItemsAvailable(client *arvadosclient.ArvadosClient, resource string) (count int, err error) {
-       var response struct {
-               ItemsAvailable int `json:"items_available"`
-       }
-       sdkParams := arvadosclient.Dict{"limit": 0}
-       err = client.List(resource, sdkParams, &response)
-       if err == nil {
-               count = response.ItemsAvailable
-       }
-       return
-}
diff --git a/sdk/python/arvados/_version.py b/sdk/python/arvados/_version.py
new file mode 100644 (file)
index 0000000..d823afc
--- /dev/null
@@ -0,0 +1,3 @@
+import pkg_resources
+
+__version__ = pkg_resources.require('arvados-python-client')[0].version
index badbd668d951c46dd882b2468940463a17610728..1f72635406e4ecf62f7fa1245334cce96ae7b215 100755 (executable)
@@ -35,6 +35,7 @@ import arvados.commands._util as arv_cmd
 import arvados.commands.keepdocker
 
 from arvados.api import OrderedJsonModel
+from arvados._version import __version__
 
 COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
 
@@ -61,6 +62,9 @@ src_owner_uuid = None
 def main():
     copy_opts = argparse.ArgumentParser(add_help=False)
 
+    copy_opts.add_argument(
+        '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
+        help='Print version and exit.')
     copy_opts.add_argument(
         '-v', '--verbose', dest='verbose', action='store_true',
         help='Verbose output.')
index 9310f066219ae3063153e4a4393ecba771b7c6ff..3a0b64c38f4f543d5b52c8f96f4b38b03a4d741a 100644 (file)
@@ -21,6 +21,8 @@ import arvados.commands._util as arv_cmd
 import arvados.commands.put as arv_put
 import ciso8601
 
+from arvados._version import __version__
+
 EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
 STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
 
@@ -28,6 +30,9 @@ DockerImage = collections.namedtuple(
     'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
 
 keepdocker_parser = argparse.ArgumentParser(add_help=False)
+keepdocker_parser.add_argument(
+    '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
+    help='Print version and exit.')
 keepdocker_parser.add_argument(
     '-f', '--force', action='store_true', default=False,
     help="Re-upload the image even if it already exists on the server")
index e87244d7d12426d060bcb8a724445406448eb160..a2f2e542754f7e2e44edbd5673cf36d2c5d130af 100755 (executable)
@@ -3,10 +3,13 @@
 from __future__ import print_function
 
 import argparse
+import sys
 
 import arvados
 import arvados.commands._util as arv_cmd
 
+from arvados._version import __version__
+
 def parse_args(args):
     parser = argparse.ArgumentParser(
         description='List contents of a manifest',
@@ -16,6 +19,9 @@ def parse_args(args):
                         help="""Collection UUID or locator""")
     parser.add_argument('-s', action='store_true',
                         help="""List file sizes, in KiB.""")
+    parser.add_argument('--version', action='version',
+                        version="%s %s" % (sys.argv[0], __version__),
+                        help='Print version and exit.')
 
     return parser.parse_args(args)
 
index 34cef6725500e6cb5cd12ec317aa1be8eeb4bd80..e3b41b26d370abbd37210ae477ea25002b66c781 100644 (file)
@@ -23,6 +23,7 @@ import threading
 import copy
 import logging
 from apiclient import errors as apiclient_errors
+from arvados._version import __version__
 
 import arvados.commands._util as arv_cmd
 
@@ -31,6 +32,9 @@ api_client = None
 
 upload_opts = argparse.ArgumentParser(add_help=False)
 
+upload_opts.add_argument('--version', action='version',
+                         version="%s %s" % (sys.argv[0], __version__),
+                         help='Print version and exit.')
 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                          help="""
 Local file or directory. Default: read from standard input.
index 54df452394e47bc7b44437bf580a3af2dc17b36e..8403327b44342befc23b7e1a3650213378f70b86 100644 (file)
@@ -11,22 +11,38 @@ import put
 import time
 import subprocess
 import logging
+import sys
 import arvados.commands._util as arv_cmd
 
+from arvados._version import __version__
+
 logger = logging.getLogger('arvados.arv-run')
 logger.setLevel(logging.INFO)
 
 arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
-arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
-arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-run-pipeline-instance")
-arvrun_parser.add_argument('--docker-image', type=str, help="Docker image to use, otherwise use instance default.")
-arvrun_parser.add_argument('--ignore-rcode', action="store_true", help="Commands that return non-zero return codes should not be considered failed.")
-arvrun_parser.add_argument('--no-reuse', action="store_true", help="Do not reuse past jobs.")
-arvrun_parser.add_argument('--no-wait', action="store_true", help="Do not wait and display logs after submitting command, just exit.")
-arvrun_parser.add_argument('--project-uuid', type=str, help="Parent project of the pipeline")
-arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git repository passed to arv-crunch-job when using --local")
-arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of component, default 'arvados'")
-arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of component, default 'master'")
+arvrun_parser.add_argument('--dry-run', action="store_true",
+                           help="Print out the pipeline that would be submitted and exit")
+arvrun_parser.add_argument('--local', action="store_true",
+                           help="Run locally using arv-run-pipeline-instance")
+arvrun_parser.add_argument('--docker-image', type=str,
+                           help="Docker image to use, otherwise use instance default.")
+arvrun_parser.add_argument('--ignore-rcode', action="store_true",
+                           help="Commands that return non-zero return codes should not be considered failed.")
+arvrun_parser.add_argument('--no-reuse', action="store_true",
+                           help="Do not reuse past jobs.")
+arvrun_parser.add_argument('--no-wait', action="store_true",
+                           help="Do not wait and display logs after submitting command, just exit.")
+arvrun_parser.add_argument('--project-uuid', type=str,
+                           help="Parent project of the pipeline")
+arvrun_parser.add_argument('--git-dir', type=str, default="",
+                           help="Git repository passed to arv-crunch-job when using --local")
+arvrun_parser.add_argument('--repository', type=str, default="arvados",
+                           help="repository field of component, default 'arvados'")
+arvrun_parser.add_argument('--script-version', type=str, default="master",
+                           help="script_version field of component, default 'master'")
+arvrun_parser.add_argument('--version', action='version',
+                           version="%s %s" % (sys.argv[0], __version__),
+                           help='Print version and exit.')
 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
 
 class ArvFile(object):
index f6dee177d9a6b1e5a69e44d1edefd396280f0ed7..72ef1befed85ffd4d8b883270ebefa0a3bcd3dac 100644 (file)
@@ -6,12 +6,16 @@ import argparse
 import arvados
 import json
 from arvados.events import subscribe
+from arvados._version import __version__
 import signal
 
 def main(arguments=None):
     logger = logging.getLogger('arvados.arv-ws')
 
     parser = argparse.ArgumentParser()
+    parser.add_argument('--version', action='version',
+                        version="%s %s" % (sys.argv[0], __version__),
+                        help='Print version and exit.')
     parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
     parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
     parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
index 60d4bec3b95c429643d7df4a600f72754954809a..f91b3977090da7c6f8b30844635174d122e67ba2 100755 (executable)
@@ -11,6 +11,8 @@ import logging
 import arvados
 import arvados.commands._util as arv_cmd
 
+from arvados._version import __version__
+
 logger = logging.getLogger('arvados.arv-get')
 
 def abort(msg, code=1):
@@ -20,6 +22,9 @@ def abort(msg, code=1):
 parser = argparse.ArgumentParser(
     description='Copy data from Keep to a local file or pipe.',
     parents=[arv_cmd.retry_opt])
+parser.add_argument('--version', action='version',
+                    version="%s %s" % (sys.argv[0], __version__),
+                    help='Print version and exit.')
 parser.add_argument('locator', type=str,
                     help="""
 Collection locator, optionally with a file path or prefix.
index b059d79459278e4859cdd7221183f2b863a3e73b..05a055e10855066588a707d539b2f250ea527be6 100755 (executable)
@@ -7,16 +7,22 @@ import re
 import string
 import sys
 
+import arvados
+from arvados._version import __version__
+
 parser = argparse.ArgumentParser(
     description='Read manifest on standard input and put normalized manifest on standard output.')
 
-parser.add_argument('--extract', type=str, help="The file to extract from the input manifest")
-parser.add_argument('--strip', action='store_true', help="Strip authorization tokens")
+parser.add_argument('--extract', type=str,
+                    help="The file to extract from the input manifest")
+parser.add_argument('--strip', action='store_true',
+                    help="Strip authorization tokens")
+parser.add_argument('--version', action='version',
+                    version="%s %s" % (sys.argv[0], __version__),
+                    help='Print version and exit.')
 
 args = parser.parse_args()
 
-import arvados
-
 r = sys.stdin.read()
 
 cr = arvados.CollectionReader(r)
index e0aae9625eb54d82eb4ee983696487079fa0d441..9d7d2481fdbb69c1635f932e7171663be9739e14 100644 (file)
@@ -51,6 +51,7 @@ setup(name='arvados-python-client',
           'httplib2',
           'pycurl >=7.19.5.1, <7.21.5',
           'python-gflags<3.0',
+          'setuptools',
           'ws4py'
       ],
       test_suite='tests',
index 71c9b178e7525808508babf86a383a37b4ab4ba6..dae3dd3b7b19c923ff53381e9f3ebef8c5abae49 100644 (file)
@@ -1,6 +1,7 @@
 #!/usr/bin/env python
 
 import arvados
+import contextlib
 import errno
 import hashlib
 import httplib
@@ -11,6 +12,7 @@ import os
 import pycurl
 import Queue
 import shutil
+import sys
 import tempfile
 import unittest
 
@@ -50,6 +52,17 @@ def mock_api_responses(api_client, body, codes, headers={}):
 def str_keep_locator(s):
     return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
 
+@contextlib.contextmanager
+def redirected_streams(stdout=None, stderr=None):
+    orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
+    orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
+    try:
+        yield
+    finally:
+        sys.stdout = orig_stdout
+        sys.stderr = orig_stderr
+
+
 class FakeCurl:
     @classmethod
     def make(cls, code, body='', headers={}):
diff --git a/sdk/python/tests/test_arv_copy.py b/sdk/python/tests/test_arv_copy.py
new file mode 100644 (file)
index 0000000..e291ee0
--- /dev/null
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import os
+import sys
+import tempfile
+import unittest
+
+import arvados.commands.arv_copy as arv_copy
+import arvados_testutil as tutil
+
+class ArvCopyTestCase(unittest.TestCase):
+    def run_copy(self, args):
+        sys.argv = ['arv-copy'] + args
+        return arv_copy.main()
+
+    def test_unsupported_arg(self):
+        with self.assertRaises(SystemExit):
+            self.run_copy(['-x=unknown'])
+
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with tutil.redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.run_copy(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
diff --git a/sdk/python/tests/test_arv_keepdocker.py b/sdk/python/tests/test_arv_keepdocker.py
new file mode 100644 (file)
index 0000000..bb94db5
--- /dev/null
@@ -0,0 +1,30 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import os
+import sys
+import tempfile
+import unittest
+
+import arvados.commands.keepdocker as arv_keepdocker
+import arvados_testutil as tutil
+
+
+class ArvKeepdockerTestCase(unittest.TestCase):
+    def run_arv_keepdocker(self, args):
+        sys.argv = ['arv-keepdocker'] + args
+        return arv_keepdocker.main()
+
+    def test_unsupported_arg(self):
+        with self.assertRaises(SystemExit):
+            self.run_arv_keepdocker(['-x=unknown'])
+
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with tutil.redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.run_arv_keepdocker(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
index 664b57fc00a57cef068e352232d55d0dfa548a58..5064f07d722ee77efc0c8a4f733eaf86d02b8b39 100644 (file)
@@ -2,15 +2,17 @@
 # -*- coding: utf-8 -*-
 
 import io
+import os
 import random
-
+import sys
 import mock
+import tempfile
 
 import arvados.errors as arv_error
 import arvados.commands.ls as arv_ls
 import run_test_server
 
-from arvados_testutil import str_keep_locator
+from arvados_testutil import str_keep_locator, redirected_streams
 
 class ArvLsTestCase(run_test_server.TestCaseWithServers):
     FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
@@ -78,3 +80,12 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers):
             arv_error.NotFoundError)
         self.assertNotEqual(0, self.run_ls([self.FAKE_UUID], api_client))
         self.assertNotEqual('', self.stderr.getvalue())
+
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.run_ls(['--version'], None)
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
diff --git a/sdk/python/tests/test_arv_normalize.py b/sdk/python/tests/test_arv_normalize.py
new file mode 100644 (file)
index 0000000..8bce7e3
--- /dev/null
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import subprocess
+import sys
+import tempfile
+import unittest
+
+
+class ArvNormalizeTestCase(unittest.TestCase):
+    def run_arv_normalize(self, args=[]):
+        p = subprocess.Popen([sys.executable, 'bin/arv-normalize'] + args,
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE)
+        (stdout, stderr) = p.communicate()
+        return p.returncode, stdout, stderr
+
+    def test_unsupported_arg(self):
+        returncode, out, err = self.run_arv_normalize(['-x=unknown'])
+        self.assertNotEqual(0, returncode)
+
+    def test_version_argument(self):
+        returncode, out, err = self.run_arv_normalize(['--version'])
+        self.assertEqual(0, returncode)
+        self.assertEqual('', out)
+        self.assertNotEqual('', err)
+        self.assertRegexpMatches(err, "[0-9]+\.[0-9]+\.[0-9]+")
index 7a0120c02814d00b27e81dd41fbb50e51ef2855c..f35e4c725c1ebfb74062b44b4c4c4d5477684f9a 100644 (file)
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 
 import apiclient
+import io
 import mock
 import os
 import pwd
@@ -408,6 +409,15 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
                 delattr(self, outbuf)
         super(ArvadosPutTest, self).tearDown()
 
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with tutil.redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.call_main_with_args(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
     def test_simple_file_put(self):
         self.call_main_on_test_file()
 
diff --git a/sdk/python/tests/test_arv_run.py b/sdk/python/tests/test_arv_run.py
new file mode 100644 (file)
index 0000000..3d04d27
--- /dev/null
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import os
+import sys
+import tempfile
+import unittest
+
+import arvados.commands.run as arv_run
+import arvados_testutil as tutil
+
+class ArvRunTestCase(unittest.TestCase):
+    def run_arv_run(self, args):
+        sys.argv = ['arv-run'] + args
+        return arv_run.main()
+
+    def test_unsupported_arg(self):
+        with self.assertRaises(SystemExit):
+            self.run_arv_run(['-x=unknown'])
+
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with tutil.redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.run_arv_run(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
index 5a018273a4d0c8aa6b35970cbb151227083e0a47..2a85e04e87c06067bd7d83773295cf049f747852 100644 (file)
@@ -1,8 +1,14 @@
 #!/usr/bin/env python
 
+import io
+import os
+import sys
+import tempfile
 import unittest
+
 import arvados.errors as arv_error
 import arvados.commands.ws as arv_ws
+import arvados_testutil as tutil
 
 class ArvWsTestCase(unittest.TestCase):
     def run_ws(self, args):
@@ -11,3 +17,12 @@ class ArvWsTestCase(unittest.TestCase):
     def test_unsupported_arg(self):
         with self.assertRaises(SystemExit):
             self.run_ws(['-x=unknown'])
+
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with tutil.redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.run_ws(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
index cbd82de9241101a72cc1c263903b9a403a8234fa..49503c971236af12a936085d92084a79e2dfc0ba 100644 (file)
@@ -243,3 +243,25 @@ template_in_asubproject_with_same_name_as_one_in_active_user_home:
         dataclass: Collection
         title: "Foo/bar pair"
         description: "Provide a collection containing at least two files."
+
+workflow_with_input_defaults:
+  uuid: zzzzz-p5p6p-aox0k0ofxrystg2
+  owner_uuid: zzzzz-j7d0g-v955i6s2oi1cbso
+  created_at: 2014-04-14 12:35:04 -0400
+  updated_at: 2014-04-14 12:35:04 -0400
+  modified_at: 2014-04-14 12:35:04 -0400
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  name: Pipeline with default input specifications
+  components:
+    part-one:
+      script: foo
+      script_version: master
+      script_parameters:
+        ex_string:
+          required: true
+          dataclass: string
+        ex_string_def:
+          required: true
+          dataclass: string
+          default: hello-testing-123
\ No newline at end of file
index 4badf9e175f21c1fb521befa6d6c6bbe8024af5b..f79320e907801cae499cdcbf809a4602194656f4 100644 (file)
@@ -44,3 +44,22 @@ workflow_with_input_specifications:
       inputBinding:
         position: 1
     outputs: []
+
+workflow_with_input_defaults:
+  uuid: zzzzz-7fd4e-validwithinput2
+  owner_uuid: zzzzz-j7d0g-zhxawtyetzwc5f0
+  name: Workflow with default input specifications
+  description: this workflow has inputs specified
+  created_at: <%= 1.minute.ago.to_s(:db) %>
+  definition: |
+    cwlVersion: v1.0
+    class: CommandLineTool
+    baseCommand:
+    - echo
+    inputs:
+    - type: string
+      id: ex_string
+    - type: string
+      id: ex_string_def
+      default: hello-testing-123
+    outputs: []
index 3c4f281912842a0ceedb6df409aa61e80fa38fa2..e768b509cd6f2c69bb529d9e9a90e2d923e422ce 100644 (file)
@@ -195,6 +195,7 @@ func submit(dispatcher *dispatch.Dispatcher,
                b, _ := ioutil.ReadAll(stdoutReader)
                stdoutReader.Close()
                stdoutChan <- b
+               close(stdoutChan)
        }()
 
        stderrChan := make(chan []byte)
@@ -202,6 +203,7 @@ func submit(dispatcher *dispatch.Dispatcher,
                b, _ := ioutil.ReadAll(stderrReader)
                stderrReader.Close()
                stderrChan <- b
+               close(stderrChan)
        }()
 
        // Send a tiny script on stdin to execute the crunch-run command
@@ -209,13 +211,10 @@ func submit(dispatcher *dispatch.Dispatcher,
        io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
        stdinWriter.Close()
 
-       err = cmd.Wait()
-
        stdoutMsg := <-stdoutChan
        stderrmsg := <-stderrChan
 
-       close(stdoutChan)
-       close(stderrChan)
+       err = cmd.Wait()
 
        if err != nil {
                submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
@@ -302,12 +301,13 @@ func run(dispatcher *dispatch.Dispatcher,
 
                                // Mutex between squeue sync and running sbatch or scancel.
                                squeueUpdater.SlurmLock.Lock()
-                               err := scancelCmd(container).Run()
+                               cmd := scancelCmd(container)
+                               msg, err := cmd.CombinedOutput()
                                squeueUpdater.SlurmLock.Unlock()
 
                                if err != nil {
-                                       log.Printf("Error stopping container %s with scancel: %v",
-                                               container.UUID, err)
+                                       log.Printf("Error stopping container %s with %v %v: %v %v",
+                                               container.UUID, cmd.Path, cmd.Args, err, string(msg))
                                        if squeueUpdater.CheckSqueue(container.UUID) {
                                                log.Printf("Container %s is still in squeue after scancel.",
                                                        container.UUID)
index fbea48e548a59f78718cb0afa419b5a84a1cd89b..40461031e214486f1dbed9feeda6aae0d97fb76c 100644 (file)
@@ -81,7 +81,8 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
                return exec.Command("echo")
        }
 
-       container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+       container := s.integrationTest(c,
+               func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
                []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -134,7 +135,7 @@ func (s *TestSuite) integrationTest(c *C,
        }(squeueCmd)
        squeueCmd = newSqueueCmd
 
-       // There should be no queued containers now
+       // There should be one queued container
        params := arvadosclient.Dict{
                "filters": [][]string{{"state", "=", "Queued"}},
        }
index 61decde61c4bd61d0a92e96bde20ff0c82780f57..45d06c8c1e27f12f2bc6e83ca262ab2ff7f08a53 100644 (file)
@@ -2,6 +2,8 @@ package main
 
 import (
        "bufio"
+       "io"
+       "io/ioutil"
        "log"
        "os/exec"
        "sync"
@@ -45,31 +47,49 @@ func (squeue *Squeue) RunSqueue() {
                log.Printf("Error creating stdout pipe for squeue: %v", err)
                return
        }
+
+       stderrReader, err := cmd.StderrPipe()
+       if err != nil {
+               log.Printf("Error creating stderr pipe for squeue: %v", err)
+               return
+       }
+
        err = cmd.Start()
        if err != nil {
                log.Printf("Error running squeue: %v", err)
                return
        }
+
+       stderrChan := make(chan []byte)
+       go func() {
+               b, _ := ioutil.ReadAll(stderrReader)
+               stderrChan <- b
+               close(stderrChan)
+       }()
+
        scanner := bufio.NewScanner(sq)
        for scanner.Scan() {
                newSqueueContents = append(newSqueueContents, scanner.Text())
        }
-       if err := scanner.Err(); err != nil {
-               cmd.Wait()
-               log.Printf("Error reading from squeue pipe: %v", err)
-               return
-       }
+       io.Copy(ioutil.Discard, sq)
+
+       stderrmsg := <-stderrChan
 
        err = cmd.Wait()
+
+       if scanner.Err() != nil {
+               log.Printf("Error reading from squeue pipe: %v", err)
+       }
        if err != nil {
-               log.Printf("Error running squeue: %v", err)
-               return
+               log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
        }
 
-       squeue.squeueCond.L.Lock()
-       squeue.squeueContents = newSqueueContents
-       squeue.squeueCond.Broadcast()
-       squeue.squeueCond.L.Unlock()
+       if scanner.Err() == nil && err == nil {
+               squeue.squeueCond.L.Lock()
+               squeue.squeueContents = newSqueueContents
+               squeue.squeueCond.Broadcast()
+               squeue.squeueCond.L.Unlock()
+       }
 }
 
 // CheckSqueue checks if a given container UUID is in the slurm queue.  This
index ade40c6b03a4d4a98812172aab31da5173453c4e..2e475c72e64842b15aa6c7dee88446bc0056b802 100644 (file)
@@ -800,6 +800,7 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr(err)
 
                if runner.finalState == "Queued" {
+                       runner.CrunchLog.Close()
                        runner.UpdateContainerFinal()
                        return
                }
@@ -832,6 +833,7 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
+               runner.finalState = "Cancelled"
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -839,6 +841,7 @@ func (runner *ContainerRunner) Run() (err error) {
        // set up FUSE mount and binds
        err = runner.SetupMounts()
        if err != nil {
+               runner.finalState = "Cancelled"
                err = fmt.Errorf("While setting up mounts: %v", err)
                return
        }
diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
deleted file mode 100644 (file)
index 05e7a5f..0000000
+++ /dev/null
@@ -1,408 +0,0 @@
-// Deals with parsing Collection responses from API Server.
-
-package collection
-
-import (
-       "flag"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "git.curoverse.com/arvados.git/sdk/go/util"
-       "log"
-       "os"
-       "runtime/pprof"
-       "time"
-)
-
-var (
-       HeapProfileFilename string
-)
-
-// Collection representation
-type Collection struct {
-       UUID              string
-       OwnerUUID         string
-       ReplicationLevel  int
-       BlockDigestToSize map[blockdigest.BlockDigest]int
-       TotalSize         int
-}
-
-// ReadCollections holds information about collections from API server
-type ReadCollections struct {
-       ReadAllCollections        bool
-       UUIDToCollection          map[string]Collection
-       OwnerToCollectionSize     map[string]int
-       BlockToDesiredReplication map[blockdigest.DigestWithSize]int
-       CollectionUUIDToIndex     map[string]int
-       CollectionIndexToUUID     []string
-       BlockToCollectionIndices  map[blockdigest.DigestWithSize][]int
-}
-
-// GetCollectionsParams params
-type GetCollectionsParams struct {
-       Client    *arvadosclient.ArvadosClient
-       Logger    *logger.Logger
-       BatchSize int
-}
-
-// SdkCollectionInfo holds collection info from api
-type SdkCollectionInfo struct {
-       UUID               string    `json:"uuid"`
-       OwnerUUID          string    `json:"owner_uuid"`
-       ReplicationDesired int       `json:"replication_desired"`
-       ModifiedAt         time.Time `json:"modified_at"`
-       ManifestText       string    `json:"manifest_text"`
-}
-
-// SdkCollectionList lists collections from api
-type SdkCollectionList struct {
-       ItemsAvailable int                 `json:"items_available"`
-       Items          []SdkCollectionInfo `json:"items"`
-}
-
-func init() {
-       flag.StringVar(&HeapProfileFilename,
-               "heap-profile",
-               "",
-               "File to write the heap profiles to. Leave blank to skip profiling.")
-}
-
-// WriteHeapProfile writes the heap profile to a file for later review.
-// Since a file is expected to only contain a single heap profile this
-// function overwrites the previously written profile, so it is safe
-// to call multiple times in a single run.
-// Otherwise we would see cumulative numbers as explained here:
-// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() error {
-       if HeapProfileFilename != "" {
-               heapProfile, err := os.Create(HeapProfileFilename)
-               if err != nil {
-                       return err
-               }
-
-               defer heapProfile.Close()
-
-               err = pprof.WriteHeapProfile(heapProfile)
-               return err
-       }
-
-       return nil
-}
-
-// GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
-       results, err = GetCollections(params)
-       if err != nil {
-               return
-       }
-
-       results.Summarize(params.Logger)
-
-       log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
-       log.Printf("Read and processed %d collections",
-               len(results.UUIDToCollection))
-
-       // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
-       // lots of behaviors can become warnings (and obviously we can't
-       // write anything).
-       // if !readCollections.ReadAllCollections {
-       //      log.Fatalf("Did not read all collections")
-       // }
-
-       return
-}
-
-// GetCollections gets collections from api
-func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
-       if &params.Client == nil {
-               err = fmt.Errorf("params.Client passed to GetCollections() should " +
-                       "contain a valid ArvadosClient, but instead it is nil.")
-               return
-       }
-
-       fieldsWanted := []string{"manifest_text",
-               "owner_uuid",
-               "uuid",
-               "replication_desired",
-               "modified_at"}
-
-       sdkParams := arvadosclient.Dict{
-               "select":  fieldsWanted,
-               "order":   []string{"modified_at ASC", "uuid ASC"},
-               "filters": [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
-               "offset":  0}
-
-       if params.BatchSize > 0 {
-               sdkParams["limit"] = params.BatchSize
-       }
-
-       var defaultReplicationLevel int
-       {
-               var value interface{}
-               value, err = params.Client.Discovery("defaultCollectionReplication")
-               if err != nil {
-                       return
-               }
-
-               defaultReplicationLevel = int(value.(float64))
-               if defaultReplicationLevel <= 0 {
-                       err = fmt.Errorf("Default collection replication returned by arvados SDK "+
-                               "should be a positive integer but instead it was %d.",
-                               defaultReplicationLevel)
-                       return
-               }
-       }
-
-       initialNumberOfCollectionsAvailable, err :=
-               util.NumberItemsAvailable(params.Client, "collections")
-       if err != nil {
-               return
-       }
-       // Include a 1% margin for collections added while we're reading so
-       // that we don't have to grow the map in most cases.
-       maxExpectedCollections := int(
-               float64(initialNumberOfCollectionsAvailable) * 1.01)
-       results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
-
-       if params.Logger != nil {
-               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
-                       collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
-                       collectionInfo["batch_size"] = params.BatchSize
-                       collectionInfo["default_replication_level"] = defaultReplicationLevel
-               })
-       }
-
-       // These values are just for getting the loop to run the first time,
-       // afterwards they'll be set to real values.
-       remainingCollections := 1
-       var totalCollections int
-       var previousTotalCollections int
-       for remainingCollections > 0 {
-               // We're still finding new collections
-
-               // Write the heap profile for examining memory usage
-               err = WriteHeapProfile()
-               if err != nil {
-                       return
-               }
-
-               // Get next batch of collections.
-               var collections SdkCollectionList
-               err = params.Client.List("collections", sdkParams, &collections)
-               if err != nil {
-                       return
-               }
-               batchCollections := len(collections.Items)
-
-               // We must always have at least one collection in the batch
-               if batchCollections < 1 {
-                       err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
-                       return
-               }
-
-               // Update count of remaining collections
-               remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
-
-               // Process collection and update our date filter.
-               latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
-                       collections.Items,
-                       defaultReplicationLevel,
-                       results.UUIDToCollection)
-               if err != nil {
-                       return results, err
-               }
-               if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
-                       sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
-                       sdkParams["offset"] = 0
-               } else {
-                       sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
-               }
-
-               // update counts
-               previousTotalCollections = totalCollections
-               totalCollections = len(results.UUIDToCollection)
-
-               log.Printf("%d collections read, %d (%d new) in last batch, "+
-                       "%d remaining, "+
-                       "%s latest modified date, %.0f %d %d avg,max,total manifest size",
-                       totalCollections,
-                       batchCollections,
-                       totalCollections-previousTotalCollections,
-                       remainingCollections,
-                       sdkParams["filters"].([][]string)[0][2],
-                       float32(totalManifestSize)/float32(totalCollections),
-                       maxManifestSize, totalManifestSize)
-
-               if params.Logger != nil {
-                       params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               collectionInfo := logger.GetOrCreateMap(p, "collection_info")
-                               collectionInfo["collections_read"] = totalCollections
-                               collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
-                               collectionInfo["total_manifest_size"] = totalManifestSize
-                               collectionInfo["max_manifest_size"] = maxManifestSize
-                       })
-               }
-       }
-
-       // Make one final API request to verify that we have processed all collections available up to the latest modification date
-       var collections SdkCollectionList
-       sdkParams["filters"].([][]string)[0][1] = "<="
-       sdkParams["limit"] = 0
-       err = params.Client.List("collections", sdkParams, &collections)
-       if err != nil {
-               return
-       }
-       finalNumberOfCollectionsAvailable, err :=
-               util.NumberItemsAvailable(params.Client, "collections")
-       if err != nil {
-               return
-       }
-       if totalCollections < finalNumberOfCollectionsAvailable {
-               err = fmt.Errorf("API server indicates a total of %d collections "+
-                       "available up to %v, but we only retrieved %d. "+
-                       "Refusing to continue as this could indicate an "+
-                       "otherwise undetected failure.",
-                       finalNumberOfCollectionsAvailable,
-                       sdkParams["filters"].([][]string)[0][2],
-                       totalCollections)
-               return
-       }
-
-       // Write the heap profile for examining memory usage
-       err = WriteHeapProfile()
-
-       return
-}
-
-// StrCopy returns a newly allocated string.
-// It is useful to copy slices so that the garbage collector can reuse
-// the memory of the longer strings they came from.
-func StrCopy(s string) string {
-       return string([]byte(s))
-}
-
-// ProcessCollections read from api server
-func ProcessCollections(arvLogger *logger.Logger,
-       receivedCollections []SdkCollectionInfo,
-       defaultReplicationLevel int,
-       UUIDToCollection map[string]Collection,
-) (
-       latestModificationDate time.Time,
-       maxManifestSize, totalManifestSize uint64,
-       err error,
-) {
-       for _, sdkCollection := range receivedCollections {
-               collection := Collection{UUID: StrCopy(sdkCollection.UUID),
-                       OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
-                       ReplicationLevel:  sdkCollection.ReplicationDesired,
-                       BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
-
-               if sdkCollection.ModifiedAt.IsZero() {
-                       err = fmt.Errorf(
-                               "Arvados SDK collection returned with unexpected zero "+
-                                       "modification date. This probably means that either we failed to "+
-                                       "parse the modification date or the API server has changed how "+
-                                       "it returns modification dates: %+v",
-                               collection)
-                       return
-               }
-
-               if sdkCollection.ModifiedAt.After(latestModificationDate) {
-                       latestModificationDate = sdkCollection.ModifiedAt
-               }
-
-               if collection.ReplicationLevel == 0 {
-                       collection.ReplicationLevel = defaultReplicationLevel
-               }
-
-               manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
-               manifestSize := uint64(len(sdkCollection.ManifestText))
-
-               if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
-                       totalManifestSize += manifestSize
-               }
-               if manifestSize > maxManifestSize {
-                       maxManifestSize = manifestSize
-               }
-
-               blockChannel := manifest.BlockIterWithDuplicates()
-               for block := range blockChannel {
-                       if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
-                               log.Printf(
-                                       "Collection %s contains multiple sizes (%d and %d) for block %s",
-                                       collection.UUID,
-                                       storedSize,
-                                       block.Size,
-                                       block.Digest)
-                       }
-                       collection.BlockDigestToSize[block.Digest] = block.Size
-               }
-               if manifest.Err != nil {
-                       err = manifest.Err
-                       return
-               }
-
-               collection.TotalSize = 0
-               for _, size := range collection.BlockDigestToSize {
-                       collection.TotalSize += size
-               }
-               UUIDToCollection[collection.UUID] = collection
-
-               // Clear out all the manifest strings that we don't need anymore.
-               // These hopefully form the bulk of our memory usage.
-               manifest.Text = ""
-               sdkCollection.ManifestText = ""
-       }
-
-       return
-}
-
-// Summarize the collections read
-func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
-       readCollections.OwnerToCollectionSize = make(map[string]int)
-       readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
-       numCollections := len(readCollections.UUIDToCollection)
-       readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
-       readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
-       readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
-
-       for _, coll := range readCollections.UUIDToCollection {
-               collectionIndex := len(readCollections.CollectionIndexToUUID)
-               readCollections.CollectionIndexToUUID =
-                       append(readCollections.CollectionIndexToUUID, coll.UUID)
-               readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
-
-               readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
-                       readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
-
-               for block, size := range coll.BlockDigestToSize {
-                       locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
-                       readCollections.BlockToCollectionIndices[locator] =
-                               append(readCollections.BlockToCollectionIndices[locator],
-                                       collectionIndex)
-                       storedReplication := readCollections.BlockToDesiredReplication[locator]
-                       if coll.ReplicationLevel > storedReplication {
-                               readCollections.BlockToDesiredReplication[locator] =
-                                       coll.ReplicationLevel
-                       }
-               }
-       }
-
-       if arvLogger != nil {
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       collectionInfo := logger.GetOrCreateMap(p, "collection_info")
-                       // Since maps are shallow copied, we run a risk of concurrent
-                       // updates here. By copying results.OwnerToCollectionSize into
-                       // the log, we're assuming that it won't be updated.
-                       collectionInfo["owner_to_collection_size"] =
-                               readCollections.OwnerToCollectionSize
-                       collectionInfo["distinct_blocks_named"] =
-                               len(readCollections.BlockToDesiredReplication)
-               })
-       }
-
-       return
-}
diff --git a/services/datamanager/collection/collection_test.go b/services/datamanager/collection/collection_test.go
deleted file mode 100644 (file)
index 1bf6a89..0000000
+++ /dev/null
@@ -1,202 +0,0 @@
-package collection
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       . "gopkg.in/check.v1"
-       "net/http"
-       "net/http/httptest"
-       "testing"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) {
-       TestingT(t)
-}
-
-type MySuite struct{}
-
-var _ = Suite(&MySuite{})
-
-// This captures the result we expect from
-// ReadCollections.Summarize().  Because CollectionUUIDToIndex is
-// indeterminate, we replace BlockToCollectionIndices with
-// BlockToCollectionUuids.
-type ExpectedSummary struct {
-       OwnerToCollectionSize     map[string]int
-       BlockToDesiredReplication map[blockdigest.DigestWithSize]int
-       BlockToCollectionUuids    map[blockdigest.DigestWithSize][]string
-}
-
-func CompareSummarizedReadCollections(c *C,
-       summarized ReadCollections,
-       expected ExpectedSummary) {
-
-       c.Assert(summarized.OwnerToCollectionSize, DeepEquals,
-               expected.OwnerToCollectionSize)
-
-       c.Assert(summarized.BlockToDesiredReplication, DeepEquals,
-               expected.BlockToDesiredReplication)
-
-       summarizedBlockToCollectionUuids :=
-               make(map[blockdigest.DigestWithSize]map[string]struct{})
-       for digest, indices := range summarized.BlockToCollectionIndices {
-               uuidSet := make(map[string]struct{})
-               summarizedBlockToCollectionUuids[digest] = uuidSet
-               for _, index := range indices {
-                       uuidSet[summarized.CollectionIndexToUUID[index]] = struct{}{}
-               }
-       }
-
-       expectedBlockToCollectionUuids :=
-               make(map[blockdigest.DigestWithSize]map[string]struct{})
-       for digest, uuidSlice := range expected.BlockToCollectionUuids {
-               uuidSet := make(map[string]struct{})
-               expectedBlockToCollectionUuids[digest] = uuidSet
-               for _, uuid := range uuidSlice {
-                       uuidSet[uuid] = struct{}{}
-               }
-       }
-
-       c.Assert(summarizedBlockToCollectionUuids, DeepEquals,
-               expectedBlockToCollectionUuids)
-}
-
-func (s *MySuite) TestSummarizeSimple(checker *C) {
-       rc := MakeTestReadCollections([]TestCollectionSpec{{
-               ReplicationLevel: 5,
-               Blocks:           []int{1, 2},
-       }})
-
-       rc.Summarize(nil)
-
-       c := rc.UUIDToCollection["col0"]
-
-       blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
-       blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
-
-       expected := ExpectedSummary{
-               OwnerToCollectionSize:     map[string]int{c.OwnerUUID: c.TotalSize},
-               BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{blockDigest1: 5, blockDigest2: 5},
-               BlockToCollectionUuids:    map[blockdigest.DigestWithSize][]string{blockDigest1: {c.UUID}, blockDigest2: {c.UUID}},
-       }
-
-       CompareSummarizedReadCollections(checker, rc, expected)
-}
-
-func (s *MySuite) TestSummarizeOverlapping(checker *C) {
-       rc := MakeTestReadCollections([]TestCollectionSpec{
-               {
-                       ReplicationLevel: 5,
-                       Blocks:           []int{1, 2},
-               },
-               {
-                       ReplicationLevel: 8,
-                       Blocks:           []int{2, 3},
-               },
-       })
-
-       rc.Summarize(nil)
-
-       c0 := rc.UUIDToCollection["col0"]
-       c1 := rc.UUIDToCollection["col1"]
-
-       blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
-       blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
-       blockDigest3 := blockdigest.MakeTestDigestWithSize(3)
-
-       expected := ExpectedSummary{
-               OwnerToCollectionSize: map[string]int{
-                       c0.OwnerUUID: c0.TotalSize,
-                       c1.OwnerUUID: c1.TotalSize,
-               },
-               BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{
-                       blockDigest1: 5,
-                       blockDigest2: 8,
-                       blockDigest3: 8,
-               },
-               BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{
-                       blockDigest1: {c0.UUID},
-                       blockDigest2: {c0.UUID, c1.UUID},
-                       blockDigest3: {c1.UUID},
-               },
-       }
-
-       CompareSummarizedReadCollections(checker, rc, expected)
-}
-
-type APITestData struct {
-       // path and response map
-       responses map[string]arvadostest.StubResponse
-
-       // expected error, if any
-       expectedError string
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_DiscoveryError(c *C) {
-       testGetCollectionsAndSummarize(c,
-               APITestData{
-                       responses:     make(map[string]arvadostest.StubResponse),
-                       expectedError: "arvados API server error: 500.*",
-               })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_ApiErrorGetCollections(c *C) {
-       respMap := make(map[string]arvadostest.StubResponse)
-       respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
-       respMap["/arvados/v1/collections"] = arvadostest.StubResponse{-1, ``}
-
-       testGetCollectionsAndSummarize(c,
-               APITestData{
-                       responses:     respMap,
-                       expectedError: "arvados API server error: 302.*",
-               })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadStreamName(c *C) {
-       respMap := make(map[string]arvadostest.StubResponse)
-       respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
-       respMap["/arvados/v1/collections"] = arvadostest.StubResponse{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"badstreamname"}]}`}
-
-       testGetCollectionsAndSummarize(c,
-               APITestData{
-                       responses:     respMap,
-                       expectedError: "Invalid stream name: badstreamname",
-               })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadFileToken(c *C) {
-       respMap := make(map[string]arvadostest.StubResponse)
-       respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
-       respMap["/arvados/v1/collections"] = arvadostest.StubResponse{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"./goodstream acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt file2.txt"}]}`}
-
-       testGetCollectionsAndSummarize(c,
-               APITestData{
-                       responses:     respMap,
-                       expectedError: "Invalid file token: file2.txt",
-               })
-}
-
-func testGetCollectionsAndSummarize(c *C, testData APITestData) {
-       apiStub := arvadostest.ServerStub{testData.responses}
-
-       api := httptest.NewServer(&apiStub)
-       defer api.Close()
-
-       arv := &arvadosclient.ArvadosClient{
-               Scheme:    "http",
-               ApiServer: api.URL[7:],
-               ApiToken:  "abc123",
-               Client:    &http.Client{Transport: &http.Transport{}},
-       }
-
-       // GetCollectionsAndSummarize
-       _, err := GetCollectionsAndSummarize(GetCollectionsParams{arv, nil, 10})
-
-       if testData.expectedError == "" {
-               c.Assert(err, IsNil)
-       } else {
-               c.Assert(err, ErrorMatches, testData.expectedError)
-       }
-}
diff --git a/services/datamanager/collection/testing.go b/services/datamanager/collection/testing.go
deleted file mode 100644 (file)
index 2238433..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-// Code used for testing only.
-
-package collection
-
-import (
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-)
-
-// TestCollectionSpec with test blocks and desired replication level
-type TestCollectionSpec struct {
-       // The desired replication level
-       ReplicationLevel int
-       // Blocks this contains, represented by ints. Ints repeated will
-       // still only represent one block
-       Blocks []int
-}
-
-// MakeTestReadCollections creates a ReadCollections object for testing
-// based on the give specs. Only the ReadAllCollections and UUIDToCollection
-// fields are populated. To populate other fields call rc.Summarize().
-func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) {
-       rc = ReadCollections{
-               ReadAllCollections: true,
-               UUIDToCollection:   map[string]Collection{},
-       }
-
-       for i, spec := range specs {
-               c := Collection{
-                       UUID:              fmt.Sprintf("col%d", i),
-                       OwnerUUID:         fmt.Sprintf("owner%d", i),
-                       ReplicationLevel:  spec.ReplicationLevel,
-                       BlockDigestToSize: map[blockdigest.BlockDigest]int{},
-               }
-               rc.UUIDToCollection[c.UUID] = c
-               for _, j := range spec.Blocks {
-                       c.BlockDigestToSize[blockdigest.MakeTestBlockDigest(j)] = j
-               }
-               // We compute the size in a separate loop because the value
-               // computed in the above loop would be invalid if c.Blocks
-               // contained duplicates.
-               for _, size := range c.BlockDigestToSize {
-                       c.TotalSize += size
-               }
-       }
-       return
-}
-
-// CollectionIndicesForTesting returns a slice giving the collection
-// index of each collection that was passed in to MakeTestReadCollections.
-// rc.Summarize() must be called before this method, since Summarize()
-// assigns an index to each collection.
-func (rc ReadCollections) CollectionIndicesForTesting() (indices []int) {
-       // TODO(misha): Assert that rc.Summarize() has been called.
-       numCollections := len(rc.CollectionIndexToUUID)
-       indices = make([]int, numCollections)
-       for i := 0; i < numCollections; i++ {
-               indices[i] = rc.CollectionUUIDToIndex[fmt.Sprintf("col%d", i)]
-       }
-       return
-}
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
deleted file mode 100644 (file)
index 5250d17..0000000
+++ /dev/null
@@ -1,220 +0,0 @@
-/* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
-
-package main
-
-import (
-       "errors"
-       "flag"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/sdk/go/util"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
-       "git.curoverse.com/arvados.git/services/datamanager/summary"
-       "log"
-       "time"
-)
-
-var (
-       logEventTypePrefix  string
-       logFrequencySeconds int
-       minutesBetweenRuns  int
-       collectionBatchSize int
-       dryRun              bool
-)
-
-func init() {
-       flag.StringVar(&logEventTypePrefix,
-               "log-event-type-prefix",
-               "experimental-data-manager",
-               "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
-       flag.IntVar(&logFrequencySeconds,
-               "log-frequency-seconds",
-               20,
-               "How frequently we'll write log entries in seconds.")
-       flag.IntVar(&minutesBetweenRuns,
-               "minutes-between-runs",
-               0,
-               "How many minutes we wait between data manager runs. 0 means run once and exit.")
-       flag.IntVar(&collectionBatchSize,
-               "collection-batch-size",
-               1000,
-               "How many collections to request in each batch.")
-       flag.BoolVar(&dryRun,
-               "dry-run",
-               false,
-               "Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
-}
-
-func main() {
-       flag.Parse()
-
-       if minutesBetweenRuns == 0 {
-               arv, err := arvadosclient.MakeArvadosClient()
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
-               }
-               err = singlerun(arv)
-               if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
-               }
-       } else {
-               waitTime := time.Minute * time.Duration(minutesBetweenRuns)
-               for {
-                       log.Println("Beginning Run")
-                       arv, err := arvadosclient.MakeArvadosClient()
-                       if err != nil {
-                               loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
-                       }
-                       err = singlerun(arv)
-                       if err != nil {
-                               log.Printf("singlerun: %v", err)
-                       }
-                       log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
-                       time.Sleep(waitTime)
-               }
-       }
-}
-
-var arvLogger *logger.Logger
-
-func singlerun(arv *arvadosclient.ArvadosClient) error {
-       var err error
-       if isAdmin, err := util.UserIsAdmin(arv); err != nil {
-               return errors.New("Error verifying admin token: " + err.Error())
-       } else if !isAdmin {
-               return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
-       }
-
-       if logEventTypePrefix != "" {
-               arvLogger, err = logger.NewLogger(logger.LoggerParams{
-                       Client:          arv,
-                       EventTypePrefix: logEventTypePrefix,
-                       WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
-       }
-
-       loggerutil.LogRunInfo(arvLogger)
-       if arvLogger != nil {
-               arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
-       }
-
-       var (
-               dataFetcher     summary.DataFetcher
-               readCollections collection.ReadCollections
-               keepServerInfo  keep.ReadServers
-       )
-
-       if summary.ShouldReadData() {
-               dataFetcher = summary.ReadData
-       } else {
-               dataFetcher = BuildDataFetcher(arv)
-       }
-
-       err = dataFetcher(arvLogger, &readCollections, &keepServerInfo)
-       if err != nil {
-               return err
-       }
-
-       err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
-       if err != nil {
-               return err
-       }
-
-       buckets := summary.BucketReplication(readCollections, keepServerInfo)
-       bucketCounts := buckets.Counts()
-
-       replicationSummary := buckets.SummarizeBuckets(readCollections)
-       replicationCounts := replicationSummary.ComputeCounts()
-
-       log.Printf("Blocks In Collections: %d, "+
-               "\nBlocks In Keep: %d.",
-               len(readCollections.BlockToDesiredReplication),
-               len(keepServerInfo.BlockToServers))
-       log.Println(replicationCounts.PrettyPrint())
-
-       log.Printf("Blocks Histogram:")
-       for _, rlbss := range bucketCounts {
-               log.Printf("%+v: %10d",
-                       rlbss.Levels,
-                       rlbss.Count)
-       }
-
-       kc, err := keepclient.MakeKeepClient(arv)
-       if err != nil {
-               return fmt.Errorf("Error setting up keep client %v", err.Error())
-       }
-
-       // Log that we're finished. We force the recording, since go will
-       // not wait for the write timer before exiting.
-       if arvLogger != nil {
-               defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
-                       summaryInfo := logger.GetOrCreateMap(p, "summary_info")
-                       summaryInfo["block_replication_counts"] = bucketCounts
-                       summaryInfo["replication_summary"] = replicationCounts
-                       p["summary_info"] = summaryInfo
-
-                       p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
-               })
-       }
-
-       pullServers := summary.ComputePullServers(kc,
-               &keepServerInfo,
-               readCollections.BlockToDesiredReplication,
-               replicationSummary.UnderReplicatedBlocks)
-
-       pullLists := summary.BuildPullLists(pullServers)
-
-       trashLists, trashErr := summary.BuildTrashLists(kc,
-               &keepServerInfo,
-               replicationSummary.KeepBlocksNotInCollections)
-
-       err = summary.WritePullLists(arvLogger, pullLists, dryRun)
-       if err != nil {
-               return err
-       }
-
-       if trashErr != nil {
-               return err
-       }
-       keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)
-
-       return nil
-}
-
-// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
-func BuildDataFetcher(arv *arvadosclient.ArvadosClient) summary.DataFetcher {
-       return func(
-               arvLogger *logger.Logger,
-               readCollections *collection.ReadCollections,
-               keepServerInfo *keep.ReadServers,
-       ) error {
-               collDone := make(chan struct{})
-               var collErr error
-               go func() {
-                       *readCollections, collErr = collection.GetCollectionsAndSummarize(
-                               collection.GetCollectionsParams{
-                                       Client:    arv,
-                                       Logger:    arvLogger,
-                                       BatchSize: collectionBatchSize})
-                       collDone <- struct{}{}
-               }()
-
-               var keepErr error
-               *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
-                       keep.GetKeepServersParams{
-                               Client: arv,
-                               Logger: arvLogger,
-                               Limit:  1000})
-
-               <-collDone
-
-               // Return a nil error only if both parts succeeded.
-               if collErr != nil {
-                       return collErr
-               }
-               return keepErr
-       }
-}
diff --git a/services/datamanager/datamanager_test.go b/services/datamanager/datamanager_test.go
deleted file mode 100644 (file)
index 7a8fff5..0000000
+++ /dev/null
@@ -1,732 +0,0 @@
-package main
-
-import (
-       "encoding/json"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/summary"
-       "io/ioutil"
-       "net/http"
-       "os"
-       "os/exec"
-       "path"
-       "regexp"
-       "strings"
-       "testing"
-       "time"
-)
-
-var arv *arvadosclient.ArvadosClient
-var keepClient *keepclient.KeepClient
-var keepServers []string
-
-func SetupDataManagerTest(t *testing.T) {
-       os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
-
-       // start api and keep servers
-       arvadostest.ResetEnv()
-       arvadostest.StartAPI()
-       arvadostest.StartKeep(2, false)
-
-       var err error
-       arv, err = arvadosclient.MakeArvadosClient()
-       if err != nil {
-               t.Fatalf("Error making arvados client: %s", err)
-       }
-       arv.ApiToken = arvadostest.DataManagerToken
-
-       // keep client
-       keepClient = &keepclient.KeepClient{
-               Arvados:       arv,
-               Want_replicas: 2,
-               Client:        &http.Client{},
-       }
-
-       // discover keep services
-       if err = keepClient.DiscoverKeepServers(); err != nil {
-               t.Fatalf("Error discovering keep services: %s", err)
-       }
-       keepServers = []string{}
-       for _, host := range keepClient.LocalRoots() {
-               keepServers = append(keepServers, host)
-       }
-}
-
-func TearDownDataManagerTest(t *testing.T) {
-       arvadostest.StopKeep(2)
-       arvadostest.StopAPI()
-       summary.WriteDataTo = ""
-       collection.HeapProfileFilename = ""
-}
-
-func putBlock(t *testing.T, data string) string {
-       locator, _, err := keepClient.PutB([]byte(data))
-       if err != nil {
-               t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
-       }
-       if locator == "" {
-               t.Fatalf("No locator found after putting test data")
-       }
-
-       splits := strings.Split(locator, "+")
-       return splits[0] + "+" + splits[1]
-}
-
-func getBlock(t *testing.T, locator string, data string) {
-       reader, blocklen, _, err := keepClient.Get(locator)
-       if err != nil {
-               t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
-       }
-       if reader == nil {
-               t.Fatalf("No reader found after putting test data")
-       }
-       if blocklen != int64(len(data)) {
-               t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
-       }
-
-       all, err := ioutil.ReadAll(reader)
-       if string(all) != data {
-               t.Fatalf("Data read %s did not match expected data %s", string(all), data)
-       }
-}
-
-// Create a collection using arv-put
-func createCollection(t *testing.T, data string) string {
-       tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
-       defer os.Remove(tempfile.Name())
-
-       _, err = tempfile.Write([]byte(data))
-       if err != nil {
-               t.Fatalf("Error writing to tempfile %v", err)
-       }
-
-       // arv-put
-       output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
-       if err != nil {
-               t.Fatalf("Error running arv-put %s", err)
-       }
-
-       uuid := string(output[0:27]) // trim terminating char
-       return uuid
-}
-
-// Get collection locator
-var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
-
-func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
-       manifest := getCollection(t, uuid)["manifest_text"].(string)
-
-       locator := strings.Split(manifest, " ")[1]
-       match := locatorMatcher.FindStringSubmatch(locator)
-       if match == nil {
-               t.Fatalf("No locator found in collection manifest %s", manifest)
-       }
-
-       return match[1] + "+" + match[2]
-}
-
-func switchToken(t string) func() {
-       orig := arv.ApiToken
-       restore := func() {
-               arv.ApiToken = orig
-       }
-       arv.ApiToken = t
-       return restore
-}
-
-func getCollection(t *testing.T, uuid string) Dict {
-       defer switchToken(arvadostest.AdminToken)()
-
-       getback := make(Dict)
-       err := arv.Get("collections", uuid, nil, &getback)
-       if err != nil {
-               t.Fatalf("Error getting collection %s", err)
-       }
-       if getback["uuid"] != uuid {
-               t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
-       }
-
-       return getback
-}
-
-func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
-       defer switchToken(arvadostest.AdminToken)()
-
-       err := arv.Update("collections", uuid, arvadosclient.Dict{
-               "collection": arvadosclient.Dict{
-                       paramName: paramValue,
-               },
-       }, &arvadosclient.Dict{})
-
-       if err != nil {
-               t.Fatalf("Error updating collection %s", err)
-       }
-}
-
-type Dict map[string]interface{}
-
-func deleteCollection(t *testing.T, uuid string) {
-       defer switchToken(arvadostest.AdminToken)()
-
-       getback := make(Dict)
-       err := arv.Delete("collections", uuid, nil, &getback)
-       if err != nil {
-               t.Fatalf("Error deleting collection %s", err)
-       }
-       if getback["uuid"] != uuid {
-               t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
-       }
-}
-
-func dataManagerSingleRun(t *testing.T) {
-       err := singlerun(arv)
-       if err != nil {
-               t.Fatalf("Error during singlerun %s", err)
-       }
-}
-
-func getBlockIndexesForServer(t *testing.T, i int) []string {
-       var indexes []string
-
-       path := keepServers[i] + "/index"
-       client := http.Client{}
-       req, err := http.NewRequest("GET", path, nil)
-       req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
-       req.Header.Add("Content-Type", "application/octet-stream")
-       resp, err := client.Do(req)
-       defer resp.Body.Close()
-
-       if err != nil {
-               t.Fatalf("Error during %s %s", path, err)
-       }
-
-       body, err := ioutil.ReadAll(resp.Body)
-       if err != nil {
-               t.Fatalf("Error reading response from %s %s", path, err)
-       }
-
-       lines := strings.Split(string(body), "\n")
-       for _, line := range lines {
-               indexes = append(indexes, strings.Split(line, " ")...)
-       }
-
-       return indexes
-}
-
-func getBlockIndexes(t *testing.T) [][]string {
-       var indexes [][]string
-
-       for i := 0; i < len(keepServers); i++ {
-               indexes = append(indexes, getBlockIndexesForServer(t, i))
-       }
-       return indexes
-}
-
-func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
-       blocks := getBlockIndexes(t)
-
-       for _, block := range notExpected {
-               for _, idx := range blocks {
-                       if valueInArray(block, idx) {
-                               t.Fatalf("Found unexpected block %s", block)
-                       }
-               }
-       }
-
-       for _, block := range expected {
-               nFound := 0
-               for _, idx := range blocks {
-                       if valueInArray(block, idx) {
-                               nFound++
-                       }
-               }
-               if nFound < minReplication {
-                       t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
-               }
-       }
-}
-
-func valueInArray(value string, list []string) bool {
-       for _, v := range list {
-               if value == v {
-                       return true
-               }
-       }
-       return false
-}
-
-// Test env uses two keep volumes. The volume names can be found by reading the files
-// ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
-//
-// The keep volumes are of the dir structure: volumeN/subdir/locator
-func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
-       // First get rid of any size hints in the locators
-       var trimmedBlockLocators []string
-       for _, block := range oldUnusedBlockLocators {
-               trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
-       }
-
-       // Get the working dir so that we can read keep{n}.volume files
-       wd, err := os.Getwd()
-       if err != nil {
-               t.Fatalf("Error getting working dir %s", err)
-       }
-
-       // Now cycle through the two keep volumes
-       oldTime := time.Now().AddDate(0, -2, 0)
-       for i := 0; i < 2; i++ {
-               filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
-               volumeDir, err := ioutil.ReadFile(filename)
-               if err != nil {
-                       t.Fatalf("Error reading keep volume file %s %s", filename, err)
-               }
-
-               // Read the keep volume dir structure
-               volumeContents, err := ioutil.ReadDir(string(volumeDir))
-               if err != nil {
-                       t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
-               }
-
-               // Read each subdir for each of the keep volume dir
-               for _, subdir := range volumeContents {
-                       subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
-                       subdirContents, err := ioutil.ReadDir(string(subdirName))
-                       if err != nil {
-                               t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
-                       }
-
-                       // Now we got to the files. The files are names are the block locators
-                       for _, fileInfo := range subdirContents {
-                               blockName := fileInfo.Name()
-                               myname := fmt.Sprintf("%s/%s", subdirName, blockName)
-                               if valueInArray(blockName, trimmedBlockLocators) {
-                                       err = os.Chtimes(myname, oldTime, oldTime)
-                               }
-                       }
-               }
-       }
-}
-
-func getStatus(t *testing.T, path string) interface{} {
-       client := http.Client{}
-       req, err := http.NewRequest("GET", path, nil)
-       req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
-       req.Header.Add("Content-Type", "application/octet-stream")
-       resp, err := client.Do(req)
-       if err != nil {
-               t.Fatalf("Error during %s %s", path, err)
-       }
-       defer resp.Body.Close()
-
-       var s interface{}
-       json.NewDecoder(resp.Body).Decode(&s)
-
-       return s
-}
-
-// Wait until PullQueue and TrashQueue are empty on all keepServers.
-func waitUntilQueuesFinishWork(t *testing.T) {
-       for _, ks := range keepServers {
-               for done := false; !done; {
-                       time.Sleep(100 * time.Millisecond)
-                       s := getStatus(t, ks+"/status.json")
-                       for _, qName := range []string{"PullQueue", "TrashQueue"} {
-                               qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
-                               if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
-                                       done = true
-                               }
-                       }
-               }
-       }
-}
-
-// Create some blocks and backdate some of them.
-// Also create some collections and delete some of them.
-// Verify block indexes.
-func TestPutAndGetBlocks(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       // Put some blocks which will be backdated later on
-       // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
-       // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
-       var oldUnusedBlockLocators []string
-       oldUnusedBlockData := "this block will have older mtime"
-       for i := 0; i < 5; i++ {
-               oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
-       }
-       for i := 0; i < 5; i++ {
-               getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
-       }
-
-       // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
-       oldUsedBlockData := "this collection block will have older mtime"
-       oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
-       getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
-
-       // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
-       // Hence, even though unreferenced, these should not be deleted when datamanager runs.
-       var newBlockLocators []string
-       newBlockData := "this block is newer"
-       for i := 0; i < 5; i++ {
-               newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
-       }
-       for i := 0; i < 5; i++ {
-               getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
-       }
-
-       // Create a collection that would be deleted later on
-       toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
-       toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
-
-       // Create another collection that has the same data as the one of the old blocks
-       oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
-       oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
-       if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
-               t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
-       }
-
-       // Create another collection whose replication level will be changed
-       replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
-       replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
-
-       // Create two collections with same data; one will be deleted later on
-       dataForTwoCollections := "one of these collections will be deleted"
-       oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
-       oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
-       secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
-       secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
-       if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
-               t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
-       }
-
-       // create collection with empty manifest text
-       emptyBlockLocator := putBlock(t, "")
-       emptyCollection := createCollection(t, "")
-
-       // Verify blocks before doing any backdating / deleting.
-       var expected []string
-       expected = append(expected, oldUnusedBlockLocators...)
-       expected = append(expected, newBlockLocators...)
-       expected = append(expected, toBeDeletedCollectionLocator)
-       expected = append(expected, replicationCollectionLocator)
-       expected = append(expected, oneOfTwoWithSameDataLocator)
-       expected = append(expected, secondOfTwoWithSameDataLocator)
-       expected = append(expected, emptyBlockLocator)
-
-       verifyBlocks(t, nil, expected, 2)
-
-       // Run datamanager in singlerun mode
-       dataManagerSingleRun(t)
-       waitUntilQueuesFinishWork(t)
-
-       verifyBlocks(t, nil, expected, 2)
-
-       // Backdate the to-be old blocks and delete the collections
-       backdateBlocks(t, oldUnusedBlockLocators)
-       deleteCollection(t, toBeDeletedCollectionUUID)
-       deleteCollection(t, secondOfTwoWithSameDataUUID)
-       backdateBlocks(t, []string{emptyBlockLocator})
-       deleteCollection(t, emptyCollection)
-
-       // Run data manager again
-       dataManagerSingleRun(t)
-       waitUntilQueuesFinishWork(t)
-
-       // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
-       expected = expected[:0]
-       expected = append(expected, oldUsedBlockLocator)
-       expected = append(expected, newBlockLocators...)
-       expected = append(expected, toBeDeletedCollectionLocator)
-       expected = append(expected, oneOfTwoWithSameDataLocator)
-       expected = append(expected, secondOfTwoWithSameDataLocator)
-       expected = append(expected, emptyBlockLocator) // even when unreferenced, this remains
-
-       verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
-
-       // Reduce desired replication on replicationCollectionUUID
-       // collection, and verify that Data Manager does not reduce
-       // actual replication any further than that. (It might not
-       // reduce actual replication at all; that's OK for this test.)
-
-       // Reduce desired replication level.
-       updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
-       collection := getCollection(t, replicationCollectionUUID)
-       if collection["replication_desired"].(interface{}) != float64(1) {
-               t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
-       }
-
-       // Verify data is currently overreplicated.
-       verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
-
-       // Run data manager again
-       dataManagerSingleRun(t)
-       waitUntilQueuesFinishWork(t)
-
-       // Verify data is not underreplicated.
-       verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
-
-       // Verify *other* collections' data is not underreplicated.
-       verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
-}
-
-func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       for i := 0; i < 10; i++ {
-               err := singlerun(arv)
-               if err != nil {
-                       t.Fatalf("Got an error during datamanager singlerun: %v", err)
-               }
-       }
-}
-
-func TestGetStatusRepeatedly(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       for i := 0; i < 10; i++ {
-               for j := 0; j < 2; j++ {
-                       s := getStatus(t, keepServers[j]+"/status.json")
-
-                       var pullQueueStatus interface{}
-                       pullQueueStatus = s.(map[string]interface{})["PullQueue"]
-                       var trashQueueStatus interface{}
-                       trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
-
-                       if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
-                               pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
-                               trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
-                               trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
-                               t.Fatalf("PullQueue and TrashQueue status not found")
-                       }
-
-                       time.Sleep(100 * time.Millisecond)
-               }
-       }
-}
-
-func TestRunDatamanagerWithBogusServer(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       arv.ApiServer = "bogus-server"
-
-       err := singlerun(arv)
-       if err == nil {
-               t.Fatalf("Expected error during singlerun with bogus server")
-       }
-}
-
-func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       arv.ApiToken = arvadostest.ActiveToken
-
-       err := singlerun(arv)
-       if err == nil {
-               t.Fatalf("Expected error during singlerun as non-admin user")
-       }
-}
-
-func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
-       testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
-}
-
-func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
-       badpath, err := arvadostest.CreateBadPath()
-       if err != nil {
-               t.Fatalf(err.Error())
-       }
-       defer func() {
-               err = arvadostest.DestroyBadPath(badpath)
-               if err != nil {
-                       t.Fatalf(err.Error())
-               }
-       }()
-       testOldBlocksNotDeletedOnDataManagerError(t, path.Join(badpath, "writetofile"), "", true, true)
-}
-
-func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
-       badpath, err := arvadostest.CreateBadPath()
-       if err != nil {
-               t.Fatalf(err.Error())
-       }
-       defer func() {
-               err = arvadostest.DestroyBadPath(badpath)
-               if err != nil {
-                       t.Fatalf(err.Error())
-               }
-       }()
-       testOldBlocksNotDeletedOnDataManagerError(t, "", path.Join(badpath, "heapprofilefile"), true, true)
-}
-
-// Create some blocks and backdate some of them.
-// Run datamanager while producing an error condition.
-// Verify that the blocks are hence not deleted.
-func testOldBlocksNotDeletedOnDataManagerError(t *testing.T, writeDataTo string, heapProfileFile string, expectError bool, expectOldBlocks bool) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       // Put some blocks and backdate them.
-       var oldUnusedBlockLocators []string
-       oldUnusedBlockData := "this block will have older mtime"
-       for i := 0; i < 5; i++ {
-               oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
-       }
-       backdateBlocks(t, oldUnusedBlockLocators)
-
-       // Run data manager
-       summary.WriteDataTo = writeDataTo
-       collection.HeapProfileFilename = heapProfileFile
-
-       err := singlerun(arv)
-       if !expectError {
-               if err != nil {
-                       t.Fatalf("Got an error during datamanager singlerun: %v", err)
-               }
-       } else {
-               if err == nil {
-                       t.Fatalf("Expected error during datamanager singlerun")
-               }
-       }
-       waitUntilQueuesFinishWork(t)
-
-       // Get block indexes and verify that all backdated blocks are not/deleted as expected
-       if expectOldBlocks {
-               verifyBlocks(t, nil, oldUnusedBlockLocators, 2)
-       } else {
-               verifyBlocks(t, oldUnusedBlockLocators, nil, 2)
-       }
-}
-
-// Create a collection with multiple streams and blocks
-func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, numBlocks int) (string, []string) {
-       defer switchToken(arvadostest.AdminToken)()
-
-       manifest := ""
-       locators := make(map[string]bool)
-       for s := 0; s < numStreams; s++ {
-               manifest += fmt.Sprintf("./stream%d ", s)
-               for b := 0; b < numBlocks; b++ {
-                       locator, _, err := keepClient.PutB([]byte(fmt.Sprintf("%s in stream %d and block %d", data, s, b)))
-                       if err != nil {
-                               t.Fatalf("Error creating block %d in stream %d: %v", b, s, err)
-                       }
-                       locators[strings.Split(locator, "+A")[0]] = true
-                       manifest += locator + " "
-               }
-               manifest += "0:1:dummyfile.txt\n"
-       }
-
-       collection := make(Dict)
-       err := arv.Create("collections",
-               arvadosclient.Dict{"collection": arvadosclient.Dict{"manifest_text": manifest}},
-               &collection)
-
-       if err != nil {
-               t.Fatalf("Error creating collection %v", err)
-       }
-
-       var locs []string
-       for k := range locators {
-               locs = append(locs, k)
-       }
-
-       return collection["uuid"].(string), locs
-}
-
-// Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
-// Also, create stray block and backdate it.
-// After datamanager run: expect blocks from the collection, but not the stray block.
-func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
-       testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
-}
-
-// Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
-// keepstore of a service type other than "disk". Only the "disk" type services
-// will be indexed by datamanager and hence should work the same way.
-func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
-       testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
-}
-
-// Test datamanager with dry-run. Expect no block to be deleted.
-func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
-       testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
-}
-
-func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
-       defer TearDownDataManagerTest(t)
-       SetupDataManagerTest(t)
-
-       // create collection whose blocks will be backdated
-       collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
-       if collectionWithOldBlocks == "" {
-               t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
-       }
-       if len(oldBlocks) != numStreams*numBlocks {
-               t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
-       }
-
-       // create a stray block that will be backdated
-       strayOldBlock := putBlock(t, "this stray block is old")
-
-       expected := []string{strayOldBlock}
-       expected = append(expected, oldBlocks...)
-       verifyBlocks(t, nil, expected, 2)
-
-       // Backdate old blocks; but the collection still references these blocks
-       backdateBlocks(t, oldBlocks)
-
-       // also backdate the stray old block
-       backdateBlocks(t, []string{strayOldBlock})
-
-       // If requested, create an extra keepserver with the given type
-       // This should be ignored during indexing and hence not change the datamanager outcome
-       var extraKeepServerUUID string
-       if createExtraKeepServerWithType != "" {
-               extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
-               defer deleteExtraKeepServer(extraKeepServerUUID)
-       }
-
-       // run datamanager
-       dryRun = isDryRun
-       dataManagerSingleRun(t)
-
-       if dryRun {
-               // verify that all blocks, including strayOldBlock, are still to be found
-               verifyBlocks(t, nil, expected, 2)
-       } else {
-               // verify that strayOldBlock is not to be found, but the collections blocks are still there
-               verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
-       }
-}
-
-// Add one more keepstore with the given service type
-func addExtraKeepServer(t *testing.T, serviceType string) string {
-       defer switchToken(arvadostest.AdminToken)()
-
-       extraKeepService := make(arvadosclient.Dict)
-       err := arv.Create("keep_services",
-               arvadosclient.Dict{"keep_service": arvadosclient.Dict{
-                       "service_host":     "localhost",
-                       "service_port":     "21321",
-                       "service_ssl_flag": false,
-                       "service_type":     serviceType}},
-               &extraKeepService)
-       if err != nil {
-               t.Fatal(err)
-       }
-
-       return extraKeepService["uuid"].(string)
-}
-
-func deleteExtraKeepServer(uuid string) {
-       defer switchToken(arvadostest.AdminToken)()
-       arv.Delete("keep_services", uuid, nil, nil)
-}
diff --git a/services/datamanager/experimental/datamanager.py b/services/datamanager/experimental/datamanager.py
deleted file mode 100755 (executable)
index 8207bdc..0000000
+++ /dev/null
@@ -1,887 +0,0 @@
-#! /usr/bin/env python
-
-import arvados
-
-import argparse
-import cgi
-import csv
-import json
-import logging
-import math
-import pprint
-import re
-import threading
-import urllib2
-
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from collections import defaultdict, Counter
-from functools import partial
-from operator import itemgetter
-from SocketServer import ThreadingMixIn
-
-arv = arvados.api('v1')
-
-# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
-byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
-def fileSizeFormat(value):
-  exponent = 0 if value == 0 else int(math.log(value, 1024))
-  return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
-                         byteunits[exponent])
-
-def percentageFloor(x):
-  """ Returns a float which is the input rounded down to the neared 0.01.
-
-e.g. precentageFloor(0.941354) = 0.94
-"""
-  return math.floor(x*100) / 100.0
-
-
-def byteSizeFromValidUuid(valid_uuid):
-  return int(valid_uuid.split('+')[1])
-
-class maxdict(dict):
-  """A dictionary that holds the largest value entered for each key."""
-  def addValue(self, key, value):
-    dict.__setitem__(self, key, max(dict.get(self, key), value))
-  def addValues(self, kv_pairs):
-    for key,value in kv_pairs:
-      self.addValue(key, value)
-  def addDict(self, d):
-    self.addValues(d.items())
-
-class CollectionInfo:
-  DEFAULT_PERSISTER_REPLICATION_LEVEL=2
-  all_by_uuid = {}
-
-  def __init__(self, uuid):
-    if CollectionInfo.all_by_uuid.has_key(uuid):
-      raise ValueError('Collection for uuid "%s" already exists.' % uuid)
-    self.uuid = uuid
-    self.block_uuids = set()  # uuids of keep blocks in this collection
-    self.reader_uuids = set()  # uuids of users who can read this collection
-    self.persister_uuids = set()  # uuids of users who want this collection saved
-    # map from user uuid to replication level they desire
-    self.persister_replication = maxdict()
-
-    # The whole api response in case we need anything else later.
-    self.api_response = []
-    CollectionInfo.all_by_uuid[uuid] = self
-
-  def byteSize(self):
-    return sum(map(byteSizeFromValidUuid, self.block_uuids))
-
-  def __str__(self):
-    return ('CollectionInfo uuid: %s\n'
-            '               %d block(s) containing %s\n'
-            '               reader_uuids: %s\n'
-            '               persister_replication: %s' %
-            (self.uuid,
-             len(self.block_uuids),
-             fileSizeFormat(self.byteSize()),
-             pprint.pformat(self.reader_uuids, indent = 15),
-             pprint.pformat(self.persister_replication, indent = 15)))
-
-  @staticmethod
-  def get(uuid):
-    if not CollectionInfo.all_by_uuid.has_key(uuid):
-      CollectionInfo(uuid)
-    return CollectionInfo.all_by_uuid[uuid]
-
-
-def extractUuid(candidate):
-  """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
-  match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
-  return match and match.group(1)
-
-def checkUserIsAdmin():
-  current_user = arv.users().current().execute()
-
-  if not current_user['is_admin']:
-    log.warning('Current user %s (%s - %s) does not have '
-                'admin access and will not see much of the data.',
-                current_user['full_name'],
-                current_user['email'],
-                current_user['uuid'])
-    if args.require_admin_user:
-      log.critical('Exiting, rerun with --no-require-admin-user '
-                   'if you wish to continue.')
-      exit(1)
-
-def buildCollectionsList():
-  if args.uuid:
-    return [args.uuid,]
-  else:
-    collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
-
-    print ('Returned %d of %d collections.' %
-           (len(collections_list_response['items']),
-            collections_list_response['items_available']))
-
-    return [item['uuid'] for item in collections_list_response['items']]
-
-
-def readCollections(collection_uuids):
-  for collection_uuid in collection_uuids:
-    collection_block_uuids = set()
-    collection_response = arv.collections().get(uuid=collection_uuid).execute()
-    collection_info = CollectionInfo.get(collection_uuid)
-    collection_info.api_response = collection_response
-    manifest_lines = collection_response['manifest_text'].split('\n')
-
-    if args.verbose:
-      print 'Manifest text for %s:' % collection_uuid
-      pprint.pprint(manifest_lines)
-
-    for manifest_line in manifest_lines:
-      if manifest_line:
-        manifest_tokens = manifest_line.split(' ')
-        if args.verbose:
-          print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
-        stream_name = manifest_tokens[0]
-
-        line_block_uuids = set(filter(None,
-                                      [extractUuid(candidate)
-                                       for candidate in manifest_tokens[1:]]))
-        collection_info.block_uuids.update(line_block_uuids)
-
-        # file_tokens = [token
-        #                for token in manifest_tokens[1:]
-        #                if extractUuid(token) is None]
-
-        # # Sort file tokens by start position in case they aren't already
-        # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))
-
-        # if args.verbose:
-        #   print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
-        #   print 'file_tokens: ' + pprint.pformat(file_tokens)
-
-
-def readLinks():
-  link_classes = set()
-
-  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
-    # TODO(misha): We may not be seing all the links, but since items
-    # available does not return an accurate number, I don't knos how
-    # to confirm that we saw all of them.
-    collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
-    link_classes.update([link['link_class'] for link in collection_links_response['items']])
-    for link in collection_links_response['items']:
-      if link['link_class'] == 'permission':
-        collection_info.reader_uuids.add(link['tail_uuid'])
-      elif link['link_class'] == 'resources':
-        replication_level = link['properties'].get(
-          'replication',
-          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
-        collection_info.persister_replication.addValue(
-          link['tail_uuid'],
-          replication_level)
-        collection_info.persister_uuids.add(link['tail_uuid'])
-
-  print 'Found the following link classes:'
-  pprint.pprint(link_classes)
-
-def reportMostPopularCollections():
-  most_popular_collections = sorted(
-    CollectionInfo.all_by_uuid.values(),
-    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
-    reverse=True)[:10]
-
-  print 'Most popular Collections:'
-  for collection_info in most_popular_collections:
-    print collection_info
-
-
-def buildMaps():
-  for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
-    # Add the block holding the manifest itself for all calculations
-    block_uuids = collection_info.block_uuids.union([collection_uuid,])
-    for block_uuid in block_uuids:
-      block_to_collections[block_uuid].add(collection_uuid)
-      block_to_readers[block_uuid].update(collection_info.reader_uuids)
-      block_to_persisters[block_uuid].update(collection_info.persister_uuids)
-      block_to_persister_replication[block_uuid].addDict(
-        collection_info.persister_replication)
-    for reader_uuid in collection_info.reader_uuids:
-      reader_to_collections[reader_uuid].add(collection_uuid)
-      reader_to_blocks[reader_uuid].update(block_uuids)
-    for persister_uuid in collection_info.persister_uuids:
-      persister_to_collections[persister_uuid].add(collection_uuid)
-      persister_to_blocks[persister_uuid].update(block_uuids)
-
-
-def itemsByValueLength(original):
-  return sorted(original.items(),
-                key=lambda item:len(item[1]),
-                reverse=True)
-
-
-def reportBusiestUsers():
-  busiest_readers = itemsByValueLength(reader_to_collections)
-  print 'The busiest readers are:'
-  for reader,collections in busiest_readers:
-    print '%s reading %d collections.' % (reader, len(collections))
-  busiest_persisters = itemsByValueLength(persister_to_collections)
-  print 'The busiest persisters are:'
-  for persister,collections in busiest_persisters:
-    print '%s reading %d collections.' % (persister, len(collections))
-
-
-def blockDiskUsage(block_uuid):
-  """Returns the disk usage of a block given its uuid.
-
-  Will return 0 before reading the contents of the keep servers.
-  """
-  return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]
-
-def blockPersistedUsage(user_uuid, block_uuid):
-  return (byteSizeFromValidUuid(block_uuid) *
-          block_to_persister_replication[block_uuid].get(user_uuid, 0))
-
-memo_computeWeightedReplicationCosts = {}
-def computeWeightedReplicationCosts(replication_levels):
-  """Computes the relative cost of varied replication levels.
-
-  replication_levels: a tuple of integers representing the desired
-  replication level. If n users want a replication level of x then x
-  should appear n times in replication_levels.
-
-  Returns a dictionary from replication level to cost.
-
-  The basic thinking is that the cost of replicating at level x should
-  be shared by everyone who wants replication of level x or higher.
-
-  For example, if we have two users who want 1 copy, one user who
-  wants 3 copies and two users who want 6 copies:
-  the input would be [1, 1, 3, 6, 6] (or any permutation)
-
-  The cost of the first copy is shared by all 5 users, so they each
-  pay 1 copy / 5 users = 0.2.
-  The cost of the second and third copies shared by 3 users, so they
-  each pay 2 copies / 3 users = 0.67 (plus the above costs)
-  The cost of the fourth, fifth and sixth copies is shared by two
-  users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)
-
-  Here are some other examples:
-  computeWeightedReplicationCosts([1,]) -> {1:1.0}
-  computeWeightedReplicationCosts([2,]) -> {2:2.0}
-  computeWeightedReplicationCosts([1,1]) -> {1:0.5}
-  computeWeightedReplicationCosts([2,2]) -> {1:1.0}
-  computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
-  computeWeightedReplicationCosts([1,3]) -> {1:0.5,2:2.5}
-  computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
-  """
-  replication_level_counts = sorted(Counter(replication_levels).items())
-
-  memo_key = str(replication_level_counts)
-
-  if not memo_key in memo_computeWeightedReplicationCosts:
-    last_level = 0
-    current_cost = 0
-    total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
-    cost_for_level = {}
-    for replication_level, count in replication_level_counts:
-      copies_added = replication_level - last_level
-      # compute marginal cost from last level and add it to the last cost
-      current_cost += copies_added / total_interested
-      cost_for_level[replication_level] = current_cost
-      # update invariants
-      last_level = replication_level
-      total_interested -= count
-    memo_computeWeightedReplicationCosts[memo_key] = cost_for_level
-
-  return memo_computeWeightedReplicationCosts[memo_key]
-
-def blockPersistedWeightedUsage(user_uuid, block_uuid):
-  persister_replication_for_block = block_to_persister_replication[block_uuid]
-  user_replication = persister_replication_for_block[user_uuid]
-  return (
-    byteSizeFromValidUuid(block_uuid) *
-    computeWeightedReplicationCosts(
-      persister_replication_for_block.values())[user_replication])
-
-
-def computeUserStorageUsage():
-  for user, blocks in reader_to_blocks.items():
-    user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
-        byteSizeFromValidUuid,
-        blocks))
-    user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
-        lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
-                                 len(block_to_readers[block_uuid])),
-        blocks))
-  for user, blocks in persister_to_blocks.items():
-    user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
-        partial(blockPersistedUsage, user),
-        blocks))
-    user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
-        partial(blockPersistedWeightedUsage, user),
-        blocks))
-
-def printUserStorageUsage():
-  print ('user: unweighted readable block size, weighted readable block size, '
-         'unweighted persisted block size, weighted persisted block size:')
-  for user, usage in user_to_usage.items():
-    print ('%s: %s %s %s %s' %
-           (user,
-            fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
-            fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
-            fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
-            fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
-
-def logUserStorageUsage():
-  for user, usage in user_to_usage.items():
-    body = {}
-    # user could actually represent a user or a group. We don't set
-    # the object_type field since we don't know which we have.
-    body['object_uuid'] = user
-    body['event_type'] = args.user_storage_log_event_type
-    properties = {}
-    properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
-    properties['read_collections_weighted_bytes'] = (
-      usage[WEIGHTED_READ_SIZE_COL])
-    properties['persisted_collections_total_bytes'] = (
-      usage[UNWEIGHTED_PERSIST_SIZE_COL])
-    properties['persisted_collections_weighted_bytes'] = (
-      usage[WEIGHTED_PERSIST_SIZE_COL])
-    body['properties'] = properties
-    # TODO(misha): Confirm that this will throw an exception if it
-    # fails to create the log entry.
-    arv.logs().create(body=body).execute()
-
-def getKeepServers():
-  response = arv.keep_disks().list().execute()
-  return [[keep_server['service_host'], keep_server['service_port']]
-          for keep_server in response['items']]
-
-
-def getKeepBlocks(keep_servers):
-  blocks = []
-  for host,port in keep_servers:
-    response = urllib2.urlopen('http://%s:%d/index' % (host, port))
-    server_blocks = [line.split(' ')
-                     for line in response.read().split('\n')
-                     if line]
-    server_blocks = [(block_id, int(mtime))
-                     for block_id, mtime in server_blocks]
-    blocks.append(server_blocks)
-  return blocks
-
-def getKeepStats(keep_servers):
-  MOUNT_COLUMN = 5
-  TOTAL_COLUMN = 1
-  FREE_COLUMN = 3
-  DISK_BLOCK_SIZE = 1024
-  stats = []
-  for host,port in keep_servers:
-    response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
-
-    parsed_json = json.load(response)
-    df_entries = [line.split()
-                  for line in parsed_json['df'].split('\n')
-                  if line]
-    keep_volumes = [columns
-                    for columns in df_entries
-                    if 'keep' in columns[MOUNT_COLUMN]]
-    total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
-                                                  keep_volumes)))
-    free_space =  DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
-                                                  keep_volumes)))
-    stats.append([total_space, free_space])
-  return stats
-
-
-def computeReplication(keep_blocks):
-  for server_blocks in keep_blocks:
-    for block_uuid, _ in server_blocks:
-      block_to_replication[block_uuid] += 1
-  log.debug('Seeing the following replication levels among blocks: %s',
-            str(set(block_to_replication.values())))
-
-
-def computeGarbageCollectionCandidates():
-  for server_blocks in keep_blocks:
-    block_to_latest_mtime.addValues(server_blocks)
-  empty_set = set()
-  garbage_collection_priority = sorted(
-    [(block,mtime)
-     for block,mtime in block_to_latest_mtime.items()
-     if len(block_to_persisters.get(block,empty_set)) == 0],
-    key = itemgetter(1))
-  global garbage_collection_report
-  garbage_collection_report = []
-  cumulative_disk_size = 0
-  for block,mtime in garbage_collection_priority:
-    disk_size = blockDiskUsage(block)
-    cumulative_disk_size += disk_size
-    garbage_collection_report.append(
-      (block,
-       mtime,
-       disk_size,
-       cumulative_disk_size,
-       float(free_keep_space + cumulative_disk_size)/total_keep_space))
-
-  print 'The oldest Garbage Collection Candidates: '
-  pprint.pprint(garbage_collection_report[:20])
-
-
-def outputGarbageCollectionReport(filename):
-  with open(filename, 'wb') as csvfile:
-    gcwriter = csv.writer(csvfile)
-    gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
-                       'cumulative size', 'disk free'])
-    for line in garbage_collection_report:
-      gcwriter.writerow(line)
-
-def computeGarbageCollectionHistogram():
-  # TODO(misha): Modify this to allow users to specify the number of
-  # histogram buckets through a flag.
-  histogram = []
-  last_percentage = -1
-  for _,mtime,_,_,disk_free in garbage_collection_report:
-    curr_percentage = percentageFloor(disk_free)
-    if curr_percentage > last_percentage:
-      histogram.append( (mtime, curr_percentage) )
-    last_percentage = curr_percentage
-
-  log.info('Garbage collection histogram is: %s', histogram)
-
-  return histogram
-
-
-def logGarbageCollectionHistogram():
-  body = {}
-  # TODO(misha): Decide whether we should specify an object_uuid in
-  # the body and if so, which uuid to use.
-  body['event_type'] = args.block_age_free_space_histogram_log_event_type
-  properties = {}
-  properties['histogram'] = garbage_collection_histogram
-  body['properties'] = properties
-  # TODO(misha): Confirm that this will throw an exception if it
-  # fails to create the log entry.
-  arv.logs().create(body=body).execute()
-
-
-def detectReplicationProblems():
-  blocks_not_in_any_collections.update(
-    set(block_to_replication.keys()).difference(block_to_collections.keys()))
-  underreplicated_persisted_blocks.update(
-    [uuid
-     for uuid, persister_replication in block_to_persister_replication.items()
-     if len(persister_replication) > 0 and
-     block_to_replication[uuid] < max(persister_replication.values())])
-  overreplicated_persisted_blocks.update(
-    [uuid
-     for uuid, persister_replication in block_to_persister_replication.items()
-     if len(persister_replication) > 0 and
-     block_to_replication[uuid] > max(persister_replication.values())])
-
-  log.info('Found %d blocks not in any collections, e.g. %s...',
-           len(blocks_not_in_any_collections),
-           ','.join(list(blocks_not_in_any_collections)[:5]))
-  log.info('Found %d underreplicated blocks, e.g. %s...',
-           len(underreplicated_persisted_blocks),
-           ','.join(list(underreplicated_persisted_blocks)[:5]))
-  log.info('Found %d overreplicated blocks, e.g. %s...',
-           len(overreplicated_persisted_blocks),
-           ','.join(list(overreplicated_persisted_blocks)[:5]))
-
-  # TODO:
-  #  Read blocks sorted by mtime
-  #  Cache window vs % free space
-  #  Collections which candidates will appear in
-  #  Youngest underreplicated read blocks that appear in collections.
-  #  Report Collections that have blocks which are missing from (or
-  #   underreplicated in) keep.
-
-
-# This is the main flow here
-
-parser = argparse.ArgumentParser(description='Report on keep disks.')
-"""The command line argument parser we use.
-
-We only use it in the __main__ block, but leave it outside the block
-in case another package wants to use it or customize it by specifying
-it as a parent to their commandline parser.
-"""
-parser.add_argument('-m',
-                    '--max-api-results',
-                    type=int,
-                    default=5000,
-                    help=('The max results to get at once.'))
-parser.add_argument('-p',
-                    '--port',
-                    type=int,
-                    default=9090,
-                    help=('The port number to serve on. 0 means no server.'))
-parser.add_argument('-v',
-                    '--verbose',
-                    help='increase output verbosity',
-                    action='store_true')
-parser.add_argument('-u',
-                    '--uuid',
-                    help='uuid of specific collection to process')
-parser.add_argument('--require-admin-user',
-                    action='store_true',
-                    default=True,
-                    help='Fail if the user is not an admin [default]')
-parser.add_argument('--no-require-admin-user',
-                    dest='require_admin_user',
-                    action='store_false',
-                    help=('Allow users without admin permissions with '
-                          'only a warning.'))
-parser.add_argument('--log-to-workbench',
-                    action='store_true',
-                    default=False,
-                    help='Log findings to workbench')
-parser.add_argument('--no-log-to-workbench',
-                    dest='log_to_workbench',
-                    action='store_false',
-                    help='Don\'t log findings to workbench [default]')
-parser.add_argument('--user-storage-log-event-type',
-                    default='user-storage-report',
-                    help=('The event type to set when logging user '
-                          'storage usage to workbench.'))
-parser.add_argument('--block-age-free-space-histogram-log-event-type',
-                    default='block-age-free-space-histogram',
-                    help=('The event type to set when logging user '
-                          'storage usage to workbench.'))
-parser.add_argument('--garbage-collection-file',
-                    default='',
-                    help=('The file to write a garbage collection report, or '
-                          'leave empty for no report.'))
-
-args = None
-
-# TODO(misha): Think about moving some of this to the __main__ block.
-log = logging.getLogger('arvados.services.datamanager')
-stderr_handler = logging.StreamHandler()
-log.setLevel(logging.INFO)
-stderr_handler.setFormatter(
-  logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
-log.addHandler(stderr_handler)
-
-# Global Data - don't try this at home
-collection_uuids = []
-
-# These maps all map from uuids to a set of uuids
-block_to_collections = defaultdict(set)  # keep blocks
-reader_to_collections = defaultdict(set)  # collection(s) for which the user has read access
-persister_to_collections = defaultdict(set)  # collection(s) which the user has persisted
-block_to_readers = defaultdict(set)
-block_to_persisters = defaultdict(set)
-block_to_persister_replication = defaultdict(maxdict)
-reader_to_blocks = defaultdict(set)
-persister_to_blocks = defaultdict(set)
-
-UNWEIGHTED_READ_SIZE_COL = 0
-WEIGHTED_READ_SIZE_COL = 1
-UNWEIGHTED_PERSIST_SIZE_COL = 2
-WEIGHTED_PERSIST_SIZE_COL = 3
-NUM_COLS = 4
-user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
-
-keep_servers = []
-keep_blocks = []
-keep_stats = []
-total_keep_space = 0
-free_keep_space =  0
-
-block_to_replication = defaultdict(lambda: 0)
-block_to_latest_mtime = maxdict()
-
-garbage_collection_report = []
-"""A list of non-persisted blocks, sorted by increasing mtime
-
-Each entry is of the form (block uuid, latest mtime, disk size,
-cumulative size)
-
-* block uuid: The id of the block we want to delete
-* latest mtime: The latest mtime of the block across all keep servers.
-* disk size: The total disk space used by this block (block size
-multiplied by current replication level)
-* cumulative disk size: The sum of this block's disk size and all the
-blocks listed above it
-* disk free: The proportion of our disk space that would be free if we
-deleted this block and all the above. So this is (free disk space +
-cumulative disk size) / total disk capacity
-"""
-
-garbage_collection_histogram = []
-""" Shows the tradeoff of keep block age vs keep disk free space.
-
-Each entry is of the form (mtime, Disk Proportion).
-
-An entry of the form (1388747781, 0.52) means that if we deleted the
-oldest non-presisted blocks until we had 52% of the disk free, then
-all blocks with an mtime greater than 1388747781 would be preserved.
-"""
-
-# Stuff to report on
-blocks_not_in_any_collections = set()
-underreplicated_persisted_blocks = set()
-overreplicated_persisted_blocks = set()
-
-all_data_loaded = False
-
-def loadAllData():
-  checkUserIsAdmin()
-
-  log.info('Building Collection List')
-  global collection_uuids
-  collection_uuids = filter(None, [extractUuid(candidate)
-                                   for candidate in buildCollectionsList()])
-
-  log.info('Reading Collections')
-  readCollections(collection_uuids)
-
-  if args.verbose:
-    pprint.pprint(CollectionInfo.all_by_uuid)
-
-  log.info('Reading Links')
-  readLinks()
-
-  reportMostPopularCollections()
-
-  log.info('Building Maps')
-  buildMaps()
-
-  reportBusiestUsers()
-
-  log.info('Getting Keep Servers')
-  global keep_servers
-  keep_servers = getKeepServers()
-
-  print keep_servers
-
-  log.info('Getting Blocks from each Keep Server.')
-  global keep_blocks
-  keep_blocks = getKeepBlocks(keep_servers)
-
-  log.info('Getting Stats from each Keep Server.')
-  global keep_stats, total_keep_space, free_keep_space
-  keep_stats = getKeepStats(keep_servers)
-
-  total_keep_space = sum(map(itemgetter(0), keep_stats))
-  free_keep_space = sum(map(itemgetter(1), keep_stats))
-
-  # TODO(misha): Delete this hack when the keep servers are fixed!
-  # This hack deals with the fact that keep servers report each other's disks.
-  total_keep_space /= len(keep_stats)
-  free_keep_space /= len(keep_stats)
-
-  log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
-           (fileSizeFormat(total_keep_space),
-            fileSizeFormat(free_keep_space),
-            100*free_keep_space/total_keep_space))
-
-  computeReplication(keep_blocks)
-
-  log.info('average replication level is %f',
-           (float(sum(block_to_replication.values())) /
-            len(block_to_replication)))
-
-  computeGarbageCollectionCandidates()
-
-  if args.garbage_collection_file:
-    log.info('Writing garbage Collection report to %s',
-             args.garbage_collection_file)
-    outputGarbageCollectionReport(args.garbage_collection_file)
-
-  global garbage_collection_histogram
-  garbage_collection_histogram = computeGarbageCollectionHistogram()
-
-  if args.log_to_workbench:
-    logGarbageCollectionHistogram()
-
-  detectReplicationProblems()
-
-  computeUserStorageUsage()
-  printUserStorageUsage()
-  if args.log_to_workbench:
-    logUserStorageUsage()
-
-  global all_data_loaded
-  all_data_loaded = True
-
-
-class DataManagerHandler(BaseHTTPRequestHandler):
-  USER_PATH = 'user'
-  COLLECTION_PATH = 'collection'
-  BLOCK_PATH = 'block'
-
-  def userLink(self, uuid):
-    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
-            {'uuid': uuid,
-             'path': DataManagerHandler.USER_PATH})
-
-  def collectionLink(self, uuid):
-    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
-            {'uuid': uuid,
-             'path': DataManagerHandler.COLLECTION_PATH})
-
-  def blockLink(self, uuid):
-    return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
-            {'uuid': uuid,
-             'path': DataManagerHandler.BLOCK_PATH})
-
-  def writeTop(self, title):
-    self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)
-
-  def writeBottom(self):
-    self.wfile.write('</BODY></HTML>\n')
-
-  def writeHomePage(self):
-    self.send_response(200)
-    self.end_headers()
-    self.writeTop('Home')
-    self.wfile.write('<TABLE>')
-    self.wfile.write('<TR><TH>user'
-                     '<TH>unweighted readable block size'
-                     '<TH>weighted readable block size'
-                     '<TH>unweighted persisted block size'
-                     '<TH>weighted persisted block size</TR>\n')
-    for user, usage in user_to_usage.items():
-      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
-                       (self.userLink(user),
-                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
-                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
-                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
-                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
-    self.wfile.write('</TABLE>\n')
-    self.writeBottom()
-
-  def userExists(self, uuid):
-    # Currently this will return false for a user who exists but
-    # doesn't appear on any manifests.
-    # TODO(misha): Figure out if we need to fix this.
-    return user_to_usage.has_key(uuid)
-
-  def writeUserPage(self, uuid):
-    if not self.userExists(uuid):
-      self.send_error(404,
-                      'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
-    else:
-      # Here we assume that since a user exists, they don't need to be
-      # html escaped.
-      self.send_response(200)
-      self.end_headers()
-      self.writeTop('User %s' % uuid)
-      self.wfile.write('<TABLE>')
-      self.wfile.write('<TR><TH>user'
-                       '<TH>unweighted readable block size'
-                       '<TH>weighted readable block size'
-                       '<TH>unweighted persisted block size'
-                       '<TH>weighted persisted block size</TR>\n')
-      usage = user_to_usage[uuid]
-      self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
-                       (self.userLink(uuid),
-                        fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
-                        fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
-                        fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
-                        fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
-      self.wfile.write('</TABLE>\n')
-      self.wfile.write('<P>Persisting Collections: %s\n' %
-                       ', '.join(map(self.collectionLink,
-                                     persister_to_collections[uuid])))
-      self.wfile.write('<P>Reading Collections: %s\n' %
-                       ', '.join(map(self.collectionLink,
-                                     reader_to_collections[uuid])))
-      self.writeBottom()
-
-  def collectionExists(self, uuid):
-    return CollectionInfo.all_by_uuid.has_key(uuid)
-
-  def writeCollectionPage(self, uuid):
-    if not self.collectionExists(uuid):
-      self.send_error(404,
-                      'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
-    else:
-      collection = CollectionInfo.get(uuid)
-      # Here we assume that since a collection exists, its id doesn't
-      # need to be html escaped.
-      self.send_response(200)
-      self.end_headers()
-      self.writeTop('Collection %s' % uuid)
-      self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
-      self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
-                       fileSizeFormat(collection.byteSize()))
-      self.wfile.write('<P>Readers: %s\n' %
-                       ', '.join(map(self.userLink, collection.reader_uuids)))
-
-      if len(collection.persister_replication) == 0:
-        self.wfile.write('<P>No persisters\n')
-      else:
-        replication_to_users = defaultdict(set)
-        for user,replication in collection.persister_replication.items():
-          replication_to_users[replication].add(user)
-        replication_levels = sorted(replication_to_users.keys())
-
-        self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
-                         'out at %dx replication:\n' %
-                         (len(collection.persister_replication),
-                          len(replication_levels),
-                          replication_levels[-1]))
-
-        # TODO(misha): This code is used twice, let's move it to a method.
-        self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
-                         '<TH>'.join(['Replication Level ' + str(x)
-                                      for x in replication_levels]))
-        self.wfile.write('<TR>\n')
-        for replication_level in replication_levels:
-          users = replication_to_users[replication_level]
-          self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
-              map(self.userLink, users)))
-        self.wfile.write('</TR></TABLE>\n')
-
-      replication_to_blocks = defaultdict(set)
-      for block in collection.block_uuids:
-        replication_to_blocks[block_to_replication[block]].add(block)
-      replication_levels = sorted(replication_to_blocks.keys())
-      self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
-                       (len(collection.block_uuids), len(replication_levels)))
-      self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
-                       '<TH>'.join(['Replication Level ' + str(x)
-                                    for x in replication_levels]))
-      self.wfile.write('<TR>\n')
-      for replication_level in replication_levels:
-        blocks = replication_to_blocks[replication_level]
-        self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
-      self.wfile.write('</TR></TABLE>\n')
-
-
-  def do_GET(self):
-    if not all_data_loaded:
-      self.send_error(503,
-                      'Sorry, but I am still loading all the data I need.')
-    else:
-      # Removing leading '/' and process request path
-      split_path = self.path[1:].split('/')
-      request_type = split_path[0]
-      log.debug('path (%s) split as %s with request_type %s' % (self.path,
-                                                                split_path,
-                                                                request_type))
-      if request_type == '':
-        self.writeHomePage()
-      elif request_type == DataManagerHandler.USER_PATH:
-        self.writeUserPage(split_path[1])
-      elif request_type == DataManagerHandler.COLLECTION_PATH:
-        self.writeCollectionPage(split_path[1])
-      else:
-        self.send_error(404, 'Unrecognized request path.')
-    return
-
-class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
-  """Handle requests in a separate thread."""
-
-
-if __name__ == '__main__':
-  args = parser.parse_args()
-
-  if args.port == 0:
-    loadAllData()
-  else:
-    loader = threading.Thread(target = loadAllData, name = 'loader')
-    loader.start()
-
-    server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
-    server.serve_forever()
diff --git a/services/datamanager/experimental/datamanager_test.py b/services/datamanager/experimental/datamanager_test.py
deleted file mode 100755 (executable)
index 0842c16..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-#! /usr/bin/env python
-
-import datamanager
-import unittest
-
-class TestComputeWeightedReplicationCosts(unittest.TestCase):
-  def test_obvious(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,]),
-                     {1:1.0})
-
-  def test_simple(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([2,]),
-                     {2:2.0})
-
-  def test_even_split(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1]),
-                     {1:0.5})
-
-  def test_even_split_bigger(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([2,2]),
-                     {2:1.0})
-
-  def test_uneven_split(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,2]),
-                     {1:0.5, 2:1.5})
-
-  def test_uneven_split_bigger(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3]),
-                     {1:0.5, 3:2.5})
-
-  def test_uneven_split_jumble(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3,6,6,10]),
-                     {1:0.2, 3:0.7, 6:1.7, 10:5.7})
-
-  def test_documentation_example(self):
-    self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1,3,6,6]),
-                     {1:0.2, 3: 0.2 + 2.0 / 3, 6: 0.2 + 2.0 / 3 + 1.5})
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
deleted file mode 100644 (file)
index 39d2d5b..0000000
+++ /dev/null
@@ -1,551 +0,0 @@
-/* Deals with getting Keep Server blocks from API Server and Keep Servers. */
-
-package keep
-
-import (
-       "bufio"
-       "encoding/json"
-       "errors"
-       "flag"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "io"
-       "io/ioutil"
-       "log"
-       "net/http"
-       "strconv"
-       "strings"
-       "time"
-)
-
-// ServerAddress struct
-type ServerAddress struct {
-       SSL         bool   `json:"service_ssl_flag"`
-       Host        string `json:"service_host"`
-       Port        int    `json:"service_port"`
-       UUID        string `json:"uuid"`
-       ServiceType string `json:"service_type"`
-}
-
-// BlockInfo is info about a particular block returned by the server
-type BlockInfo struct {
-       Digest blockdigest.DigestWithSize
-       Mtime  int64 // TODO(misha): Replace this with a timestamp.
-}
-
-// BlockServerInfo is info about a specified block given by a server
-type BlockServerInfo struct {
-       ServerIndex int
-       Mtime       int64 // TODO(misha): Replace this with a timestamp.
-}
-
-// ServerContents struct
-type ServerContents struct {
-       BlockDigestToInfo map[blockdigest.DigestWithSize]BlockInfo
-}
-
-// ServerResponse struct
-type ServerResponse struct {
-       Address  ServerAddress
-       Contents ServerContents
-       Err      error
-}
-
-// ReadServers struct
-type ReadServers struct {
-       ReadAllServers           bool
-       KeepServerIndexToAddress []ServerAddress
-       KeepServerAddressToIndex map[ServerAddress]int
-       ServerToContents         map[ServerAddress]ServerContents
-       BlockToServers           map[blockdigest.DigestWithSize][]BlockServerInfo
-       BlockReplicationCounts   map[int]int
-}
-
-// GetKeepServersParams struct
-type GetKeepServersParams struct {
-       Client *arvadosclient.ArvadosClient
-       Logger *logger.Logger
-       Limit  int
-}
-
-// ServiceList consists of the addresses of all the available kee servers
-type ServiceList struct {
-       ItemsAvailable int             `json:"items_available"`
-       KeepServers    []ServerAddress `json:"items"`
-}
-
-var serviceType string
-
-func init() {
-       flag.StringVar(&serviceType,
-               "service-type",
-               "disk",
-               "Operate only on keep_services with the specified service_type, ignoring all others.")
-}
-
-// String
-// TODO(misha): Change this to include the UUID as well.
-func (s ServerAddress) String() string {
-       return s.URL()
-}
-
-// URL of the keep server
-func (s ServerAddress) URL() string {
-       if s.SSL {
-               return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
-       }
-       return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
-}
-
-// GetKeepServersAndSummarize gets keep servers from api
-func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers, err error) {
-       results, err = GetKeepServers(params)
-       if err != nil {
-               return
-       }
-       log.Printf("Returned %d keep disks", len(results.ServerToContents))
-
-       results.Summarize(params.Logger)
-       log.Printf("Replication level distribution: %v",
-               results.BlockReplicationCounts)
-
-       return
-}
-
-// GetKeepServers from api server
-func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
-       sdkParams := arvadosclient.Dict{
-               "filters": [][]string{{"service_type", "!=", "proxy"}},
-       }
-       if params.Limit > 0 {
-               sdkParams["limit"] = params.Limit
-       }
-
-       var sdkResponse ServiceList
-       err = params.Client.List("keep_services", sdkParams, &sdkResponse)
-
-       if err != nil {
-               return
-       }
-
-       var keepServers []ServerAddress
-       for _, server := range sdkResponse.KeepServers {
-               if server.ServiceType == serviceType {
-                       keepServers = append(keepServers, server)
-               } else {
-                       log.Printf("Skipping keep_service %q because its service_type %q does not match -service-type=%q", server, server.ServiceType, serviceType)
-               }
-       }
-
-       if len(keepServers) == 0 {
-               return results, fmt.Errorf("Found no keepservices with the service type %v", serviceType)
-       }
-
-       if params.Logger != nil {
-               params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
-                       keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
-                       keepInfo["keep_servers"] = sdkResponse.KeepServers
-                       keepInfo["indexable_keep_servers"] = keepServers
-               })
-       }
-
-       log.Printf("Received keep services list: %+v", sdkResponse)
-
-       if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
-               return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse)
-       }
-
-       results.KeepServerIndexToAddress = keepServers
-       results.KeepServerAddressToIndex = make(map[ServerAddress]int)
-       for i, address := range results.KeepServerIndexToAddress {
-               results.KeepServerAddressToIndex[address] = i
-       }
-
-       log.Printf("Got Server Addresses: %v", results)
-
-       // Send off all the index requests concurrently
-       responseChan := make(chan ServerResponse)
-       for _, keepServer := range results.KeepServerIndexToAddress {
-               // The above keepsServer variable is reused for each iteration, so
-               // it would be shared across all goroutines. This would result in
-               // us querying one server n times instead of n different servers
-               // as we intended. To avoid this we add it as an explicit
-               // parameter which gets copied. This bug and solution is described
-               // in https://golang.org/doc/effective_go.html#channels
-               go func(keepServer ServerAddress) {
-                       responseChan <- GetServerContents(params.Logger,
-                               keepServer,
-                               params.Client)
-               }(keepServer)
-       }
-
-       results.ServerToContents = make(map[ServerAddress]ServerContents)
-       results.BlockToServers = make(map[blockdigest.DigestWithSize][]BlockServerInfo)
-
-       // Read all the responses
-       for i := range results.KeepServerIndexToAddress {
-               _ = i // Here to prevent go from complaining.
-               response := <-responseChan
-
-               // Check if there were any errors during GetServerContents
-               if response.Err != nil {
-                       return results, response.Err
-               }
-
-               log.Printf("Received channel response from %v containing %d files",
-                       response.Address,
-                       len(response.Contents.BlockDigestToInfo))
-               results.ServerToContents[response.Address] = response.Contents
-               serverIndex := results.KeepServerAddressToIndex[response.Address]
-               for _, blockInfo := range response.Contents.BlockDigestToInfo {
-                       results.BlockToServers[blockInfo.Digest] = append(
-                               results.BlockToServers[blockInfo.Digest],
-                               BlockServerInfo{ServerIndex: serverIndex,
-                                       Mtime: blockInfo.Mtime})
-               }
-       }
-       return
-}
-
-// GetServerContents of the keep server
-func GetServerContents(arvLogger *logger.Logger,
-       keepServer ServerAddress,
-       arv *arvadosclient.ArvadosClient) (response ServerResponse) {
-
-       err := GetServerStatus(arvLogger, keepServer, arv)
-       if err != nil {
-               response.Err = err
-               return
-       }
-
-       req, err := CreateIndexRequest(arvLogger, keepServer, arv)
-       if err != nil {
-               response.Err = err
-               return
-       }
-
-       resp, err := arv.Client.Do(req)
-       if err != nil {
-               response.Err = err
-               return
-       }
-
-       response, err = ReadServerResponse(arvLogger, keepServer, resp)
-       if err != nil {
-               response.Err = err
-               return
-       }
-
-       return
-}
-
-// GetServerStatus get keep server status by invoking /status.json
-func GetServerStatus(arvLogger *logger.Logger,
-       keepServer ServerAddress,
-       arv *arvadosclient.ArvadosClient) error {
-       url := fmt.Sprintf("http://%s:%d/status.json",
-               keepServer.Host,
-               keepServer.Port)
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := make(map[string]interface{})
-                       serverInfo["status_request_sent_at"] = now
-                       serverInfo["host"] = keepServer.Host
-                       serverInfo["port"] = keepServer.Port
-
-                       keepInfo[keepServer.UUID] = serverInfo
-               })
-       }
-
-       resp, err := arv.Client.Get(url)
-       if err != nil {
-               return fmt.Errorf("Error getting keep status from %s: %v", url, err)
-       } else if resp.StatusCode != 200 {
-               return fmt.Errorf("Received error code %d in response to request "+
-                       "for %s status: %s",
-                       resp.StatusCode, url, resp.Status)
-       }
-
-       var keepStatus map[string]interface{}
-       decoder := json.NewDecoder(resp.Body)
-       decoder.UseNumber()
-       err = decoder.Decode(&keepStatus)
-       if err != nil {
-               return fmt.Errorf("Error decoding keep status from %s: %v", url, err)
-       }
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-                       serverInfo["status_response_processed_at"] = now
-                       serverInfo["status"] = keepStatus
-               })
-       }
-
-       return nil
-}
-
-// CreateIndexRequest to the keep server
-func CreateIndexRequest(arvLogger *logger.Logger,
-       keepServer ServerAddress,
-       arv *arvadosclient.ArvadosClient) (req *http.Request, err error) {
-       url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
-       log.Println("About to fetch keep server contents from " + url)
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-                       serverInfo["index_request_sent_at"] = now
-               })
-       }
-
-       req, err = http.NewRequest("GET", url, nil)
-       if err != nil {
-               return req, fmt.Errorf("Error building http request for %s: %v", url, err)
-       }
-
-       req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
-       return req, err
-}
-
-// ReadServerResponse reads reasponse from keep server
-func ReadServerResponse(arvLogger *logger.Logger,
-       keepServer ServerAddress,
-       resp *http.Response) (response ServerResponse, err error) {
-
-       if resp.StatusCode != 200 {
-               return response, fmt.Errorf("Received error code %d in response to index request for %s: %s",
-                       resp.StatusCode, keepServer.String(), resp.Status)
-       }
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-                       serverInfo["index_response_received_at"] = now
-               })
-       }
-
-       response.Address = keepServer
-       response.Contents.BlockDigestToInfo =
-               make(map[blockdigest.DigestWithSize]BlockInfo)
-       reader := bufio.NewReader(resp.Body)
-       numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
-       for {
-               numLines++
-               line, err := reader.ReadString('\n')
-               if err == io.EOF {
-                       return response, fmt.Errorf("Index from %s truncated at line %d",
-                               keepServer.String(), numLines)
-               } else if err != nil {
-                       return response, fmt.Errorf("Error reading index response from %s at line %d: %v",
-                               keepServer.String(), numLines, err)
-               }
-               if line == "\n" {
-                       if _, err := reader.Peek(1); err == nil {
-                               extra, _ := reader.ReadString('\n')
-                               return response, fmt.Errorf("Index from %s had trailing data at line %d after EOF marker: %s",
-                                       keepServer.String(), numLines+1, extra)
-                       } else if err != io.EOF {
-                               return response, fmt.Errorf("Index from %s had read error after EOF marker at line %d: %v",
-                                       keepServer.String(), numLines, err)
-                       }
-                       numLines--
-                       break
-               }
-               blockInfo, err := parseBlockInfoFromIndexLine(line)
-               if err != nil {
-                       return response, fmt.Errorf("Error parsing BlockInfo from index line "+
-                               "received from %s: %v",
-                               keepServer.String(),
-                               err)
-               }
-
-               if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
-                       // This server returned multiple lines containing the same block digest.
-                       numDuplicates++
-                       // Keep the block that's newer.
-                       if storedBlock.Mtime < blockInfo.Mtime {
-                               response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
-                       }
-               } else {
-                       response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
-               }
-       }
-
-       log.Printf("%s index contained %d lines with %d duplicates with "+
-               "%d size disagreements",
-               keepServer.String(),
-               numLines,
-               numDuplicates,
-               numSizeDisagreements)
-
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-
-                       serverInfo["processing_finished_at"] = now
-                       serverInfo["lines_received"] = numLines
-                       serverInfo["duplicates_seen"] = numDuplicates
-                       serverInfo["size_disagreements_seen"] = numSizeDisagreements
-               })
-       }
-       resp.Body.Close()
-       return
-}
-
-func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err error) {
-       tokens := strings.Fields(indexLine)
-       if len(tokens) != 2 {
-               err = fmt.Errorf("Expected 2 tokens per line but received a "+
-                       "line containing %#q instead.",
-                       tokens)
-       }
-
-       var locator blockdigest.BlockLocator
-       if locator, err = blockdigest.ParseBlockLocator(tokens[0]); err != nil {
-               err = fmt.Errorf("%v Received error while parsing line \"%#q\"",
-                       err, indexLine)
-               return
-       }
-       if len(locator.Hints) > 0 {
-               err = fmt.Errorf("Block locator in index line should not contain hints "+
-                       "but it does: %#q",
-                       locator)
-               return
-       }
-
-       var ns int64
-       ns, err = strconv.ParseInt(tokens[1], 10, 64)
-       if err != nil {
-               return
-       }
-       if ns < 1e12 {
-               // An old version of keepstore is giving us timestamps
-               // in seconds instead of nanoseconds. (This threshold
-               // correctly handles all times between 1970-01-02 and
-               // 33658-09-27.)
-               ns = ns * 1e9
-       }
-       blockInfo.Mtime = ns
-       blockInfo.Digest = blockdigest.DigestWithSize{
-               Digest: locator.Digest,
-               Size:   uint32(locator.Size),
-       }
-       return
-}
-
-// Summarize results from keep server
-func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
-       readServers.BlockReplicationCounts = make(map[int]int)
-       for _, infos := range readServers.BlockToServers {
-               replication := len(infos)
-               readServers.BlockReplicationCounts[replication]++
-       }
-
-       if arvLogger != nil {
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                       keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
-               })
-       }
-}
-
-// TrashRequest struct
-type TrashRequest struct {
-       Locator    string `json:"locator"`
-       BlockMtime int64  `json:"block_mtime"`
-}
-
-// TrashList is an array of TrashRequest objects
-type TrashList []TrashRequest
-
-// SendTrashLists to trash queue
-func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList, dryRun bool) (errs []error) {
-       count := 0
-       barrier := make(chan error)
-
-       client := kc.Client
-
-       for url, v := range spl {
-               if arvLogger != nil {
-                       // We need a local variable because Update doesn't call our mutator func until later,
-                       // when our list variable might have been reused by the next loop iteration.
-                       url := url
-                       trashLen := len(v)
-                       arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               trashListInfo := logger.GetOrCreateMap(p, "trash_list_len")
-                               trashListInfo[url] = trashLen
-                       })
-               }
-
-               if dryRun {
-                       log.Printf("dry run, not sending trash list to service %s with %d blocks", url, len(v))
-                       continue
-               }
-
-               count++
-               log.Printf("Sending trash list to %v", url)
-
-               go (func(url string, v TrashList) {
-                       pipeReader, pipeWriter := io.Pipe()
-                       go (func() {
-                               enc := json.NewEncoder(pipeWriter)
-                               enc.Encode(v)
-                               pipeWriter.Close()
-                       })()
-
-                       req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
-                       if err != nil {
-                               log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
-                               barrier <- err
-                               return
-                       }
-
-                       req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
-
-                       // Make the request
-                       var resp *http.Response
-                       if resp, err = client.Do(req); err != nil {
-                               log.Printf("Error sending trash list to %v error: %v", url, err.Error())
-                               barrier <- err
-                               return
-                       }
-
-                       log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
-
-                       io.Copy(ioutil.Discard, resp.Body)
-                       resp.Body.Close()
-
-                       if resp.StatusCode != 200 {
-                               barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode))
-                       } else {
-                               barrier <- nil
-                       }
-               })(url, v)
-       }
-
-       for i := 0; i < count; i++ {
-               b := <-barrier
-               if b != nil {
-                       errs = append(errs, b)
-               }
-       }
-
-       return errs
-}
diff --git a/services/datamanager/keep/keep_test.go b/services/datamanager/keep/keep_test.go
deleted file mode 100644 (file)
index ca8797e..0000000
+++ /dev/null
@@ -1,278 +0,0 @@
-package keep
-
-import (
-       "encoding/json"
-       "fmt"
-       "net"
-       "net/http"
-       "net/http/httptest"
-       "net/url"
-       "strconv"
-       "strings"
-       "testing"
-
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-
-       . "gopkg.in/check.v1"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) {
-       TestingT(t)
-}
-
-type KeepSuite struct{}
-
-var _ = Suite(&KeepSuite{})
-
-type TestHandler struct {
-       request TrashList
-}
-
-func (ts *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
-       r := json.NewDecoder(req.Body)
-       r.Decode(&ts.request)
-}
-
-func (s *KeepSuite) TestSendTrashLists(c *C) {
-       th := TestHandler{}
-       server := httptest.NewServer(&th)
-       defer server.Close()
-
-       tl := map[string]TrashList{
-               server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
-
-       arv := &arvadosclient.ArvadosClient{ApiToken: "abc123"}
-       kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
-       kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
-               map[string]string{"xxxx": server.URL},
-               map[string]string{})
-
-       err := SendTrashLists(nil, &kc, tl, false)
-
-       c.Check(err, IsNil)
-
-       c.Check(th.request,
-               DeepEquals,
-               tl[server.URL])
-
-}
-
-type TestHandlerError struct {
-}
-
-func (tse *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
-       http.Error(writer, "I'm a teapot", 418)
-}
-
-func sendTrashListError(c *C, server *httptest.Server) {
-       tl := map[string]TrashList{
-               server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
-
-       arv := &arvadosclient.ArvadosClient{ApiToken: "abc123"}
-       kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
-       kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
-               map[string]string{"xxxx": server.URL},
-               map[string]string{})
-
-       err := SendTrashLists(nil, &kc, tl, false)
-
-       c.Check(err, NotNil)
-       c.Check(err[0], NotNil)
-}
-
-func (s *KeepSuite) TestSendTrashListErrorResponse(c *C) {
-       server := httptest.NewServer(&TestHandlerError{})
-       sendTrashListError(c, server)
-       defer server.Close()
-}
-
-func (s *KeepSuite) TestSendTrashListUnreachable(c *C) {
-       sendTrashListError(c, httptest.NewUnstartedServer(&TestHandler{}))
-}
-
-type APITestData struct {
-       numServers int
-       serverType string
-       statusCode int
-}
-
-func (s *KeepSuite) TestGetKeepServers_UnsupportedServiceType(c *C) {
-       testGetKeepServersFromAPI(c, APITestData{1, "notadisk", 200}, "Found no keepservices with the service type disk")
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReceivedTooFewServers(c *C) {
-       testGetKeepServersFromAPI(c, APITestData{2, "disk", 200}, "Did not receive all available keep servers")
-}
-
-func (s *KeepSuite) TestGetKeepServers_ServerError(c *C) {
-       testGetKeepServersFromAPI(c, APITestData{-1, "disk", -1}, "arvados API server error")
-}
-
-func testGetKeepServersFromAPI(c *C, testData APITestData, expectedError string) {
-       keepServers := ServiceList{
-               ItemsAvailable: testData.numServers,
-               KeepServers: []ServerAddress{{
-                       SSL:         false,
-                       Host:        "example.com",
-                       Port:        12345,
-                       UUID:        "abcdefg",
-                       ServiceType: testData.serverType,
-               }},
-       }
-
-       ksJSON, _ := json.Marshal(keepServers)
-       apiStubResponses := make(map[string]arvadostest.StubResponse)
-       apiStubResponses["/arvados/v1/keep_services"] = arvadostest.StubResponse{testData.statusCode, string(ksJSON)}
-       apiStub := arvadostest.ServerStub{apiStubResponses}
-
-       api := httptest.NewServer(&apiStub)
-       defer api.Close()
-
-       arv := &arvadosclient.ArvadosClient{
-               Scheme:    "http",
-               ApiServer: api.URL[7:],
-               ApiToken:  "abc123",
-               Client:    &http.Client{Transport: &http.Transport{}},
-       }
-
-       kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
-       kc.SetServiceRoots(map[string]string{"xxxx": "http://example.com:23456"},
-               map[string]string{"xxxx": "http://example.com:23456"},
-               map[string]string{})
-
-       params := GetKeepServersParams{
-               Client: arv,
-               Logger: nil,
-               Limit:  10,
-       }
-
-       _, err := GetKeepServersAndSummarize(params)
-       c.Assert(err, NotNil)
-       c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*", expectedError))
-}
-
-type KeepServerTestData struct {
-       // handle /status.json
-       statusStatusCode int
-
-       // handle /index
-       indexStatusCode   int
-       indexResponseBody string
-
-       // expected error, if any
-       expectedError string
-}
-
-func (s *KeepSuite) TestGetKeepServers_ErrorGettingKeepServerStatus(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{500, 200, "ok",
-               ".*http://.* 500 Internal Server Error"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_GettingIndex(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, -1, "notok",
-               ".*redirect-loop.*"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ErrorReadServerResponse(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 500, "notok",
-               ".*http://.* 500 Internal Server Error"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseTuncatedAtLineOne(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200,
-               "notterminatedwithnewline", "Index from http://.* truncated at line 1"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_InvalidBlockLocatorPattern(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200, "testing\n",
-               "Error parsing BlockInfo from index line.*"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseEmpty(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200, "\n", ""})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseWithTwoBlocks(c *C) {
-       testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200,
-               "51752ba076e461ec9ec1d27400a08548+20 1447526361\na048cc05c02ba1ee43ad071274b9e547+52 1447526362\n\n", ""})
-}
-
-func testGetKeepServersAndSummarize(c *C, testData KeepServerTestData) {
-       ksStubResponses := make(map[string]arvadostest.StubResponse)
-       ksStubResponses["/status.json"] = arvadostest.StubResponse{testData.statusStatusCode, string(`{}`)}
-       ksStubResponses["/index"] = arvadostest.StubResponse{testData.indexStatusCode, testData.indexResponseBody}
-       ksStub := arvadostest.ServerStub{ksStubResponses}
-       ks := httptest.NewServer(&ksStub)
-       defer ks.Close()
-
-       ksURL, err := url.Parse(ks.URL)
-       c.Check(err, IsNil)
-       ksHost, port, err := net.SplitHostPort(ksURL.Host)
-       ksPort, err := strconv.Atoi(port)
-       c.Check(err, IsNil)
-
-       servers_list := ServiceList{
-               ItemsAvailable: 1,
-               KeepServers: []ServerAddress{{
-                       SSL:         false,
-                       Host:        ksHost,
-                       Port:        ksPort,
-                       UUID:        "abcdefg",
-                       ServiceType: "disk",
-               }},
-       }
-       ksJSON, _ := json.Marshal(servers_list)
-       apiStubResponses := make(map[string]arvadostest.StubResponse)
-       apiStubResponses["/arvados/v1/keep_services"] = arvadostest.StubResponse{200, string(ksJSON)}
-       apiStub := arvadostest.ServerStub{apiStubResponses}
-
-       api := httptest.NewServer(&apiStub)
-       defer api.Close()
-
-       arv := &arvadosclient.ArvadosClient{
-               Scheme:    "http",
-               ApiServer: api.URL[7:],
-               ApiToken:  "abc123",
-               Client:    &http.Client{Transport: &http.Transport{}},
-       }
-
-       kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
-       kc.SetServiceRoots(map[string]string{"xxxx": ks.URL},
-               map[string]string{"xxxx": ks.URL},
-               map[string]string{})
-
-       params := GetKeepServersParams{
-               Client: arv,
-               Logger: nil,
-               Limit:  10,
-       }
-
-       // GetKeepServersAndSummarize
-       results, err := GetKeepServersAndSummarize(params)
-
-       if testData.expectedError == "" {
-               c.Assert(err, IsNil)
-               c.Assert(results, NotNil)
-
-               blockToServers := results.BlockToServers
-
-               blockLocators := strings.Split(testData.indexResponseBody, "\n")
-               for _, loc := range blockLocators {
-                       locator := strings.Split(loc, " ")[0]
-                       if locator != "" {
-                               blockLocator, err := blockdigest.ParseBlockLocator(locator)
-                               c.Assert(err, IsNil)
-
-                               blockDigestWithSize := blockdigest.DigestWithSize{blockLocator.Digest, uint32(blockLocator.Size)}
-                               blockServerInfo := blockToServers[blockDigestWithSize]
-                               c.Assert(blockServerInfo[0].Mtime, NotNil)
-                       }
-               }
-       } else {
-               c.Assert(err, ErrorMatches, testData.expectedError)
-       }
-}
diff --git a/services/datamanager/loggerutil/loggerutil.go b/services/datamanager/loggerutil/loggerutil.go
deleted file mode 100644 (file)
index 8111425..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-/* Datamanager-specific logging methods. */
-
-package loggerutil
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "log"
-       "os"
-       "runtime"
-       "time"
-)
-
-// Useful to call at the beginning of execution to log info about the
-// current run.
-func LogRunInfo(arvLogger *logger.Logger) {
-       if arvLogger != nil {
-               now := time.Now()
-               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       runInfo := logger.GetOrCreateMap(p, "run_info")
-                       runInfo["started_at"] = now
-                       runInfo["args"] = os.Args
-                       hostname, err := os.Hostname()
-                       if err != nil {
-                               runInfo["hostname_error"] = err.Error()
-                       } else {
-                               runInfo["hostname"] = hostname
-                       }
-                       runInfo["pid"] = os.Getpid()
-               })
-       }
-}
-
-// A LogMutator that records the current memory usage. This is most useful as a logger write hook.
-func LogMemoryAlloc(p map[string]interface{}, e map[string]interface{}) {
-       runInfo := logger.GetOrCreateMap(p, "run_info")
-       var memStats runtime.MemStats
-       runtime.ReadMemStats(&memStats)
-       runInfo["memory_bytes_in_use"] = memStats.Alloc
-       runInfo["memory_bytes_reserved"] = memStats.Sys
-}
-
-func FatalWithMessage(arvLogger *logger.Logger, message string) {
-       if arvLogger != nil {
-               arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
-                       p["FATAL"] = message
-                       runInfo := logger.GetOrCreateMap(p, "run_info")
-                       runInfo["finished_at"] = time.Now()
-               })
-       }
-
-       log.Fatalf(message)
-}
diff --git a/services/datamanager/summary/canonical_string.go b/services/datamanager/summary/canonical_string.go
deleted file mode 100644 (file)
index 152314c..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-/* Ensures that we only have one copy of each unique string. This is
-/* not designed for concurrent access. */
-
-package summary
-
-// This code should probably be moved somewhere more universal.
-
-// CanonicalString struct
-type CanonicalString struct {
-       m map[string]string
-}
-
-// Get a CanonicalString
-func (cs *CanonicalString) Get(s string) (r string) {
-       if cs.m == nil {
-               cs.m = make(map[string]string)
-       }
-       value, found := cs.m[s]
-       if found {
-               return value
-       }
-
-       // s may be a substring of a much larger string.
-       // If we store s, it will prevent that larger string from getting
-       // garbage collected.
-       // If this is something you worry about you should change this code
-       // to make an explict copy of s using a byte array.
-       cs.m[s] = s
-       return s
-}
diff --git a/services/datamanager/summary/file.go b/services/datamanager/summary/file.go
deleted file mode 100644 (file)
index 6e463d7..0000000
+++ /dev/null
@@ -1,115 +0,0 @@
-// Handles writing data to and reading data from disk to speed up development.
-
-package summary
-
-import (
-       "encoding/gob"
-       "flag"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "log"
-       "os"
-)
-
-// Used to locally cache data read from servers to reduce execution
-// time when developing. Not for use in production.
-type serializedData struct {
-       ReadCollections collection.ReadCollections
-       KeepServerInfo  keep.ReadServers
-}
-
-var (
-       WriteDataTo  string
-       readDataFrom string
-)
-
-// DataFetcher to fetch data from keep servers
-type DataFetcher func(arvLogger *logger.Logger,
-       readCollections *collection.ReadCollections,
-       keepServerInfo *keep.ReadServers) error
-
-func init() {
-       flag.StringVar(&WriteDataTo,
-               "write-data-to",
-               "",
-               "Write summary of data received to this file. Used for development only.")
-       flag.StringVar(&readDataFrom,
-               "read-data-from",
-               "",
-               "Avoid network i/o and read summary data from this file instead. Used for development only.")
-}
-
-// MaybeWriteData writes data we've read to a file.
-//
-// This is useful for development, so that we don't need to read all
-// our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func MaybeWriteData(arvLogger *logger.Logger,
-       readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) error {
-       if WriteDataTo == "" {
-               return nil
-       }
-       summaryFile, err := os.Create(WriteDataTo)
-       if err != nil {
-               return err
-       }
-       defer summaryFile.Close()
-
-       enc := gob.NewEncoder(summaryFile)
-       data := serializedData{
-               ReadCollections: readCollections,
-               KeepServerInfo:  keepServerInfo}
-       err = enc.Encode(data)
-       if err != nil {
-               return err
-       }
-       log.Printf("Wrote summary data to: %s", WriteDataTo)
-       return nil
-}
-
-// ShouldReadData should not be used outside of development
-func ShouldReadData() bool {
-       return readDataFrom != ""
-}
-
-// ReadData reads data that we've written to a file.
-//
-// This is useful for development, so that we don't need to read all
-// our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func ReadData(arvLogger *logger.Logger,
-       readCollections *collection.ReadCollections,
-       keepServerInfo *keep.ReadServers) error {
-       if readDataFrom == "" {
-               return fmt.Errorf("ReadData() called with empty filename.")
-       }
-       summaryFile, err := os.Open(readDataFrom)
-       if err != nil {
-               return err
-       }
-       defer summaryFile.Close()
-
-       dec := gob.NewDecoder(summaryFile)
-       data := serializedData{}
-       err = dec.Decode(&data)
-       if err != nil {
-               return err
-       }
-
-       // re-summarize data, so that we can update our summarizing
-       // functions without needing to do all our network i/o
-       data.ReadCollections.Summarize(arvLogger)
-       data.KeepServerInfo.Summarize(arvLogger)
-
-       *readCollections = data.ReadCollections
-       *keepServerInfo = data.KeepServerInfo
-       log.Printf("Read summary data from: %s", readDataFrom)
-       return nil
-}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
deleted file mode 100644 (file)
index d7fb3eb..0000000
+++ /dev/null
@@ -1,215 +0,0 @@
-// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
-
-package summary
-
-import (
-       "encoding/json"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "log"
-       "os"
-       "strings"
-)
-
-// Locator is a block digest
-type Locator blockdigest.DigestWithSize
-
-// MarshalJSON encoding
-func (l Locator) MarshalJSON() ([]byte, error) {
-       return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
-}
-
-// PullRequest represents one entry in the Pull List
-type PullRequest struct {
-       Locator Locator  `json:"locator"`
-       Servers []string `json:"servers"`
-}
-
-// PullList for a particular server
-type PullList []PullRequest
-
-// PullListByLocator implements sort.Interface for PullList based on
-// the Digest.
-type PullListByLocator PullList
-
-func (a PullListByLocator) Len() int      { return len(a) }
-func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a PullListByLocator) Less(i, j int) bool {
-       di, dj := a[i].Locator.Digest, a[j].Locator.Digest
-       if di.H < dj.H {
-               return true
-       } else if di.H == dj.H {
-               if di.L < dj.L {
-                       return true
-               } else if di.L == dj.L {
-                       return a[i].Locator.Size < a[j].Locator.Size
-               }
-       }
-       return false
-}
-
-// PullServers struct
-// For a given under-replicated block, this structure represents which
-// servers should pull the specified block and which servers they can
-// pull it from.
-type PullServers struct {
-       To   []string // Servers that should pull the specified block
-       From []string // Servers that already contain the specified block
-}
-
-// ComputePullServers creates a map from block locator to PullServers
-// with one entry for each under-replicated block.
-//
-// This method ignores zero-replica blocks since there are no servers
-// to pull them from, so callers should feel free to omit them, but
-// this function will ignore them if they are provided.
-func ComputePullServers(kc *keepclient.KeepClient,
-       keepServerInfo *keep.ReadServers,
-       blockToDesiredReplication map[blockdigest.DigestWithSize]int,
-       underReplicated BlockSet) (m map[Locator]PullServers) {
-       m = map[Locator]PullServers{}
-       // We use CanonicalString to avoid filling memory with duplicate
-       // copies of the same string.
-       var cs CanonicalString
-
-       // Servers that are writeable
-       writableServers := map[string]struct{}{}
-       for _, url := range kc.WritableLocalRoots() {
-               writableServers[cs.Get(url)] = struct{}{}
-       }
-
-       for block := range underReplicated {
-               serversStoringBlock := keepServerInfo.BlockToServers[block]
-               numCopies := len(serversStoringBlock)
-               numCopiesMissing := blockToDesiredReplication[block] - numCopies
-               if numCopiesMissing > 0 {
-                       // We expect this to always be true, since the block was listed
-                       // in underReplicated.
-
-                       if numCopies > 0 {
-                               // Not much we can do with blocks with no copies.
-
-                               // A server's host-port string appears as a key in this map
-                               // iff it contains the block.
-                               serverHasBlock := map[string]struct{}{}
-                               for _, info := range serversStoringBlock {
-                                       sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
-                                       serverHasBlock[cs.Get(sa.URL())] = struct{}{}
-                               }
-
-                               roots := keepclient.NewRootSorter(kc.LocalRoots(),
-                                       block.String()).GetSortedRoots()
-
-                               l := Locator(block)
-                               m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
-                                       roots, numCopiesMissing)
-                       }
-               }
-       }
-       return m
-}
-
-// CreatePullServers creates a pull list in which the To and From
-// fields preserve the ordering of sorted servers and the contents
-// are all canonical strings.
-func CreatePullServers(cs CanonicalString,
-       serverHasBlock map[string]struct{},
-       writableServers map[string]struct{},
-       sortedServers []string,
-       maxToFields int) (ps PullServers) {
-
-       ps = PullServers{
-               To:   make([]string, 0, maxToFields),
-               From: make([]string, 0, len(serverHasBlock)),
-       }
-
-       for _, host := range sortedServers {
-               // Strip the protocol portion of the url.
-               // Use the canonical copy of the string to avoid memory waste.
-               server := cs.Get(host)
-               _, hasBlock := serverHasBlock[server]
-               if hasBlock {
-                       // The from field should include the protocol.
-                       ps.From = append(ps.From, cs.Get(host))
-               } else if len(ps.To) < maxToFields {
-                       _, writable := writableServers[host]
-                       if writable {
-                               ps.To = append(ps.To, server)
-                       }
-               }
-       }
-
-       return
-}
-
-// RemoveProtocolPrefix strips the protocol prefix from a url.
-func RemoveProtocolPrefix(url string) string {
-       return url[(strings.LastIndex(url, "/") + 1):]
-}
-
-// BuildPullLists produces a PullList for each keep server.
-func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
-       spl = map[string]PullList{}
-       // We don't worry about canonicalizing our strings here, because we
-       // assume lps was created by ComputePullServers() which already
-       // canonicalized the strings for us.
-       for locator, pullServers := range lps {
-               for _, destination := range pullServers.To {
-                       pullList, pullListExists := spl[destination]
-                       if !pullListExists {
-                               pullList = PullList{}
-                       }
-                       spl[destination] = append(pullList,
-                               PullRequest{Locator: locator, Servers: pullServers.From})
-               }
-       }
-       return
-}
-
-// WritePullLists writes each pull list to a file.
-// The filename is based on the hostname.
-//
-// This is just a hack for prototyping, it is not expected to be used
-// in production.
-func WritePullLists(arvLogger *logger.Logger,
-       pullLists map[string]PullList,
-       dryRun bool) error {
-       r := strings.NewReplacer(":", ".")
-
-       for host, list := range pullLists {
-               if arvLogger != nil {
-                       // We need a local variable because Update doesn't call our mutator func until later,
-                       // when our list variable might have been reused by the next loop iteration.
-                       host := host
-                       listLen := len(list)
-                       arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               pullListInfo := logger.GetOrCreateMap(p, "pull_list_len")
-                               pullListInfo[host] = listLen
-                       })
-               }
-
-               if dryRun {
-                       log.Print("dry run, not sending pull list to service %s with %d blocks", host, len(list))
-                       continue
-               }
-
-               filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
-               pullListFile, err := os.Create(filename)
-               if err != nil {
-                       return err
-               }
-               defer pullListFile.Close()
-
-               enc := json.NewEncoder(pullListFile)
-               err = enc.Encode(list)
-               if err != nil {
-                       return err
-               }
-               log.Printf("Wrote pull list to %s.", filename)
-       }
-
-       return nil
-}
diff --git a/services/datamanager/summary/pull_list_test.go b/services/datamanager/summary/pull_list_test.go
deleted file mode 100644 (file)
index 60b495c..0000000
+++ /dev/null
@@ -1,272 +0,0 @@
-package summary
-
-import (
-       "encoding/json"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       . "gopkg.in/check.v1"
-       "sort"
-       "testing"
-)
-
-// Gocheck boilerplate
-func TestPullLists(t *testing.T) {
-       TestingT(t)
-}
-
-type PullSuite struct{}
-
-var _ = Suite(&PullSuite{})
-
-// Helper method to declare string sets more succinctly
-// Could be placed somewhere more general.
-func stringSet(slice ...string) (m map[string]struct{}) {
-       m = map[string]struct{}{}
-       for _, s := range slice {
-               m[s] = struct{}{}
-       }
-       return
-}
-
-func (s *PullSuite) TestPullListPrintsJSONCorrectly(c *C) {
-       pl := PullList{PullRequest{
-               Locator: Locator(blockdigest.MakeTestDigestSpecifySize(0xBadBeef, 56789)),
-               Servers: []string{"keep0.qr1hi.arvadosapi.com:25107",
-                       "keep1.qr1hi.arvadosapi.com:25108"}}}
-
-       b, err := json.Marshal(pl)
-       c.Assert(err, IsNil)
-       expectedOutput := `[{"locator":"0000000000000000000000000badbeef+56789",` +
-               `"servers":["keep0.qr1hi.arvadosapi.com:25107",` +
-               `"keep1.qr1hi.arvadosapi.com:25108"]}]`
-       c.Check(string(b), Equals, expectedOutput)
-}
-
-func (s *PullSuite) TestCreatePullServers(c *C) {
-       var cs CanonicalString
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet(),
-                       stringSet(),
-                       []string{},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{}, From: []string{}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet(),
-                       []string{},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{}, From: []string{}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep0:25107"),
-                       []string{"https://keep0:25107"},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{}, From: []string{"https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{"https://keep3:25110", "https://keep2:25109"},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
-                       5),
-               DeepEquals,
-               PullServers{To: []string{"https://keep3:25110"},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
-                       1),
-               DeepEquals,
-               PullServers{To: []string{"https://keep3:25110"},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep2:25109",
-                               "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109",
-                               "https://keep1:25108", "https://keep0:25107"},
-                       1),
-               DeepEquals,
-               PullServers{To: []string{"https://keep3:25110"},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
-       c.Check(
-               CreatePullServers(cs,
-                       stringSet("https://keep0:25107", "https://keep1:25108"),
-                       stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
-                       []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
-                       0),
-               DeepEquals,
-               PullServers{To: []string{},
-                       From: []string{"https://keep1:25108", "https://keep0:25107"}})
-}
-
-// Checks whether two pull list maps are equal. Since pull lists are
-// ordered arbitrarily, we need to sort them by digest before
-// comparing them for deep equality.
-type pullListMapEqualsChecker struct {
-       *CheckerInfo
-}
-
-func (c *pullListMapEqualsChecker) Check(params []interface{}, names []string) (result bool, error string) {
-       obtained, ok := params[0].(map[string]PullList)
-       if !ok {
-               return false, "First parameter is not a PullList map"
-       }
-       expected, ok := params[1].(map[string]PullList)
-       if !ok {
-               return false, "Second parameter is not a PullList map"
-       }
-
-       for _, v := range obtained {
-               sort.Sort(PullListByLocator(v))
-       }
-       for _, v := range expected {
-               sort.Sort(PullListByLocator(v))
-       }
-
-       return DeepEquals.Check(params, names)
-}
-
-var PullListMapEquals Checker = &pullListMapEqualsChecker{&CheckerInfo{
-       Name:   "PullListMapEquals",
-       Params: []string{"obtained", "expected"},
-}}
-
-func (s *PullSuite) TestBuildPullLists(c *C) {
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{}),
-               PullListMapEquals,
-               map[string]PullList{})
-
-       locator1 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xBadBeef)}
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{}, From: []string{}}}),
-               PullListMapEquals,
-               map[string]PullList{})
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{}, From: []string{"f1", "f2"}}}),
-               PullListMapEquals,
-               map[string]PullList{})
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}}}),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}}})
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{"t1"}, From: []string{}}}),
-               PullListMapEquals,
-               map[string]PullList{"t1": {
-                       PullRequest{locator1, []string{}}}})
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {
-                               To:   []string{"t1", "t2"},
-                               From: []string{"f1", "f2"},
-                       }}),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
-                       "t2": {PullRequest{locator1, []string{"f1", "f2"}}},
-               })
-
-       locator2 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xCabbed)}
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}},
-                       locator2: {To: []string{"t2"}, From: []string{"f3", "f4"}}}),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
-                       "t2": {PullRequest{locator2, []string{"f3", "f4"}}},
-               })
-
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {
-                               To:   []string{"t1"},
-                               From: []string{"f1", "f2"}},
-                       locator2: {
-                               To:   []string{"t2", "t1"},
-                               From: []string{"f3", "f4"}},
-               }),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {
-                               PullRequest{locator1, []string{"f1", "f2"}},
-                               PullRequest{locator2, []string{"f3", "f4"}},
-                       },
-                       "t2": {
-                               PullRequest{locator2, []string{"f3", "f4"}},
-                       },
-               })
-
-       locator3 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xDeadBeef)}
-       locator4 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xFedBeef)}
-       c.Check(
-               BuildPullLists(map[Locator]PullServers{
-                       locator1: {
-                               To:   []string{"t1"},
-                               From: []string{"f1", "f2"}},
-                       locator2: {
-                               To:   []string{"t2", "t1"},
-                               From: []string{"f3", "f4"}},
-                       locator3: {
-                               To:   []string{"t3", "t2", "t1"},
-                               From: []string{"f4", "f5"}},
-                       locator4: {
-                               To:   []string{"t4", "t3", "t2", "t1"},
-                               From: []string{"f1", "f5"}},
-               }),
-               PullListMapEquals,
-               map[string]PullList{
-                       "t1": {
-                               PullRequest{locator1, []string{"f1", "f2"}},
-                               PullRequest{locator2, []string{"f3", "f4"}},
-                               PullRequest{locator3, []string{"f4", "f5"}},
-                               PullRequest{locator4, []string{"f1", "f5"}},
-                       },
-                       "t2": {
-                               PullRequest{locator2, []string{"f3", "f4"}},
-                               PullRequest{locator3, []string{"f4", "f5"}},
-                               PullRequest{locator4, []string{"f1", "f5"}},
-                       },
-                       "t3": {
-                               PullRequest{locator3, []string{"f4", "f5"}},
-                               PullRequest{locator4, []string{"f1", "f5"}},
-                       },
-                       "t4": {
-                               PullRequest{locator4, []string{"f1", "f5"}},
-                       },
-               })
-}
diff --git a/services/datamanager/summary/summary.go b/services/datamanager/summary/summary.go
deleted file mode 100644 (file)
index 9fb0316..0000000
+++ /dev/null
@@ -1,277 +0,0 @@
-// Summarizes Collection Data and Keep Server Contents.
-
-package summary
-
-// TODO(misha): Check size of blocks as well as their digest.
-
-import (
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "sort"
-)
-
-// BlockSet is a map of blocks
-type BlockSet map[blockdigest.DigestWithSize]struct{}
-
-// Insert adds a single block to the set.
-func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
-       bs[digest] = struct{}{}
-}
-
-// Union adds a set of blocks to the set.
-func (bs BlockSet) Union(obs BlockSet) {
-       for k, v := range obs {
-               bs[k] = v
-       }
-}
-
-// CollectionIndexSet is used to save space. To convert to and from
-// the uuid, use collection.ReadCollections' fields
-// CollectionIndexToUUID and CollectionUUIDToIndex.
-type CollectionIndexSet map[int]struct{}
-
-// Insert adds a single collection to the set. The collection is specified by
-// its index.
-func (cis CollectionIndexSet) Insert(collectionIndex int) {
-       cis[collectionIndex] = struct{}{}
-}
-
-// ToCollectionIndexSet gets block to collection indices
-func (bs BlockSet) ToCollectionIndexSet(
-       readCollections collection.ReadCollections,
-       collectionIndexSet *CollectionIndexSet) {
-       for block := range bs {
-               for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
-                       collectionIndexSet.Insert(collectionIndex)
-               }
-       }
-}
-
-// ReplicationLevels struct
-// Keeps track of the requested and actual replication levels.
-// Currently this is only used for blocks but could easily be used for
-// collections as well.
-type ReplicationLevels struct {
-       // The requested replication level.
-       // For Blocks this is the maximum replication level among all the
-       // collections this block belongs to.
-       Requested int
-
-       // The actual number of keep servers this is on.
-       Actual int
-}
-
-// ReplicationLevelBlockSetMap maps from replication levels to their blocks.
-type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
-
-// ReplicationLevelBlockCount is an individual entry from ReplicationLevelBlockSetMap
-// which only reports the number of blocks, not which blocks.
-type ReplicationLevelBlockCount struct {
-       Levels ReplicationLevels
-       Count  int
-}
-
-// ReplicationLevelBlockSetSlice is an ordered list of ReplicationLevelBlockCount useful for reporting.
-type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
-
-// ReplicationSummary sturct
-type ReplicationSummary struct {
-       CollectionBlocksNotInKeep  BlockSet
-       UnderReplicatedBlocks      BlockSet
-       OverReplicatedBlocks       BlockSet
-       CorrectlyReplicatedBlocks  BlockSet
-       KeepBlocksNotInCollections BlockSet
-
-       CollectionsNotFullyInKeep      CollectionIndexSet
-       UnderReplicatedCollections     CollectionIndexSet
-       OverReplicatedCollections      CollectionIndexSet
-       CorrectlyReplicatedCollections CollectionIndexSet
-}
-
-// ReplicationSummaryCounts struct counts the elements in each set in ReplicationSummary.
-type ReplicationSummaryCounts struct {
-       CollectionBlocksNotInKeep      int
-       UnderReplicatedBlocks          int
-       OverReplicatedBlocks           int
-       CorrectlyReplicatedBlocks      int
-       KeepBlocksNotInCollections     int
-       CollectionsNotFullyInKeep      int
-       UnderReplicatedCollections     int
-       OverReplicatedCollections      int
-       CorrectlyReplicatedCollections int
-}
-
-// GetOrCreate gets the BlockSet for a given set of ReplicationLevels,
-// creating it if it doesn't already exist.
-func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
-       repLevels ReplicationLevels) (bs BlockSet) {
-       bs, exists := rlbs[repLevels]
-       if !exists {
-               bs = make(BlockSet)
-               rlbs[repLevels] = bs
-       }
-       return
-}
-
-// Insert adds a block to the set for a given replication level.
-func (rlbs ReplicationLevelBlockSetMap) Insert(
-       repLevels ReplicationLevels,
-       block blockdigest.DigestWithSize) {
-       rlbs.GetOrCreate(repLevels).Insert(block)
-}
-
-// Union adds a set of blocks to the set for a given replication level.
-func (rlbs ReplicationLevelBlockSetMap) Union(
-       repLevels ReplicationLevels,
-       bs BlockSet) {
-       rlbs.GetOrCreate(repLevels).Union(bs)
-}
-
-// Counts outputs a sorted list of ReplicationLevelBlockCounts.
-func (rlbs ReplicationLevelBlockSetMap) Counts() (
-       sorted ReplicationLevelBlockSetSlice) {
-       sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
-       i := 0
-       for levels, set := range rlbs {
-               sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)}
-               i++
-       }
-       sort.Sort(sorted)
-       return
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Len() int {
-       return len(rlbss)
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
-       return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested ||
-               (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested &&
-                       rlbss[i].Levels.Actual < rlbss[j].Levels.Actual)
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
-       rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
-}
-
-// ComputeCounts returns ReplicationSummaryCounts
-func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
-       // TODO(misha): Consider rewriting this method to iterate through
-       // the fields using reflection, instead of explictily listing the
-       // fields as we do now.
-       rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
-       rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
-       rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
-       rsc.CorrectlyReplicatedBlocks = len(rs.CorrectlyReplicatedBlocks)
-       rsc.KeepBlocksNotInCollections = len(rs.KeepBlocksNotInCollections)
-       rsc.CollectionsNotFullyInKeep = len(rs.CollectionsNotFullyInKeep)
-       rsc.UnderReplicatedCollections = len(rs.UnderReplicatedCollections)
-       rsc.OverReplicatedCollections = len(rs.OverReplicatedCollections)
-       rsc.CorrectlyReplicatedCollections = len(rs.CorrectlyReplicatedCollections)
-       return rsc
-}
-
-// PrettyPrint ReplicationSummaryCounts
-func (rsc ReplicationSummaryCounts) PrettyPrint() string {
-       return fmt.Sprintf("Replication Block Counts:"+
-               "\n Missing From Keep: %d, "+
-               "\n Under Replicated: %d, "+
-               "\n Over Replicated: %d, "+
-               "\n Replicated Just Right: %d, "+
-               "\n Not In Any Collection: %d. "+
-               "\nReplication Collection Counts:"+
-               "\n Missing From Keep: %d, "+
-               "\n Under Replicated: %d, "+
-               "\n Over Replicated: %d, "+
-               "\n Replicated Just Right: %d.",
-               rsc.CollectionBlocksNotInKeep,
-               rsc.UnderReplicatedBlocks,
-               rsc.OverReplicatedBlocks,
-               rsc.CorrectlyReplicatedBlocks,
-               rsc.KeepBlocksNotInCollections,
-               rsc.CollectionsNotFullyInKeep,
-               rsc.UnderReplicatedCollections,
-               rsc.OverReplicatedCollections,
-               rsc.CorrectlyReplicatedCollections)
-}
-
-// BucketReplication returns ReplicationLevelBlockSetMap
-func BucketReplication(readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) {
-       rlbs = make(ReplicationLevelBlockSetMap)
-
-       for block, requestedReplication := range readCollections.BlockToDesiredReplication {
-               rlbs.Insert(
-                       ReplicationLevels{
-                               Requested: requestedReplication,
-                               Actual:    len(keepServerInfo.BlockToServers[block])},
-                       block)
-       }
-
-       for block, servers := range keepServerInfo.BlockToServers {
-               if 0 == readCollections.BlockToDesiredReplication[block] {
-                       rlbs.Insert(
-                               ReplicationLevels{Requested: 0, Actual: len(servers)},
-                               block)
-               }
-       }
-       return
-}
-
-// SummarizeBuckets reads collections and summarizes
-func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets(
-       readCollections collection.ReadCollections) (
-       rs ReplicationSummary) {
-       rs.CollectionBlocksNotInKeep = make(BlockSet)
-       rs.UnderReplicatedBlocks = make(BlockSet)
-       rs.OverReplicatedBlocks = make(BlockSet)
-       rs.CorrectlyReplicatedBlocks = make(BlockSet)
-       rs.KeepBlocksNotInCollections = make(BlockSet)
-
-       rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
-       rs.UnderReplicatedCollections = make(CollectionIndexSet)
-       rs.OverReplicatedCollections = make(CollectionIndexSet)
-       rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
-
-       for levels, bs := range rlbs {
-               if levels.Actual == 0 {
-                       rs.CollectionBlocksNotInKeep.Union(bs)
-               } else if levels.Requested == 0 {
-                       rs.KeepBlocksNotInCollections.Union(bs)
-               } else if levels.Actual < levels.Requested {
-                       rs.UnderReplicatedBlocks.Union(bs)
-               } else if levels.Actual > levels.Requested {
-                       rs.OverReplicatedBlocks.Union(bs)
-               } else {
-                       rs.CorrectlyReplicatedBlocks.Union(bs)
-               }
-       }
-
-       rs.CollectionBlocksNotInKeep.ToCollectionIndexSet(readCollections,
-               &rs.CollectionsNotFullyInKeep)
-       // Since different collections can specify different replication
-       // levels, the fact that a block is under-replicated does not imply
-       // that all collections that it belongs to are under-replicated, but
-       // we'll ignore that for now.
-       // TODO(misha): Fix this and report the correct set of collections.
-       rs.UnderReplicatedBlocks.ToCollectionIndexSet(readCollections,
-               &rs.UnderReplicatedCollections)
-       rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
-               &rs.OverReplicatedCollections)
-
-       for i := range readCollections.CollectionIndexToUUID {
-               if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
-               } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
-               } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
-               } else {
-                       rs.CorrectlyReplicatedCollections.Insert(i)
-               }
-       }
-
-       return
-}
diff --git a/services/datamanager/summary/summary_test.go b/services/datamanager/summary/summary_test.go
deleted file mode 100644 (file)
index 8268404..0000000
+++ /dev/null
@@ -1,220 +0,0 @@
-package summary
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/services/datamanager/collection"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "reflect"
-       "sort"
-       "testing"
-)
-
-func BlockSetFromSlice(digests []int) (bs BlockSet) {
-       bs = make(BlockSet)
-       for _, digest := range digests {
-               bs.Insert(blockdigest.MakeTestDigestWithSize(digest))
-       }
-       return
-}
-
-func CollectionIndexSetFromSlice(indices []int) (cis CollectionIndexSet) {
-       cis = make(CollectionIndexSet)
-       for _, index := range indices {
-               cis.Insert(index)
-       }
-       return
-}
-
-func (cis CollectionIndexSet) ToSlice() (ints []int) {
-       ints = make([]int, len(cis))
-       i := 0
-       for collectionIndex := range cis {
-               ints[i] = collectionIndex
-               i++
-       }
-       sort.Ints(ints)
-       return
-}
-
-// Helper method to meet interface expected by older tests.
-func SummarizeReplication(readCollections collection.ReadCollections,
-       keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
-       return BucketReplication(readCollections, keepServerInfo).
-               SummarizeBuckets(readCollections)
-}
-
-// Takes a map from block digest to replication level and represents
-// it in a keep.ReadServers structure.
-func SpecifyReplication(digestToReplication map[int]int) (rs keep.ReadServers) {
-       rs.BlockToServers = make(map[blockdigest.DigestWithSize][]keep.BlockServerInfo)
-       for digest, replication := range digestToReplication {
-               rs.BlockToServers[blockdigest.MakeTestDigestWithSize(digest)] =
-                       make([]keep.BlockServerInfo, replication)
-       }
-       return
-}
-
-// Verifies that
-// blocks.ToCollectionIndexSet(rc.BlockToCollectionIndices) returns
-// expectedCollections.
-func VerifyToCollectionIndexSet(
-       t *testing.T,
-       blocks []int,
-       blockToCollectionIndices map[int][]int,
-       expectedCollections []int) {
-
-       expected := CollectionIndexSetFromSlice(expectedCollections)
-
-       rc := collection.ReadCollections{
-               BlockToCollectionIndices: map[blockdigest.DigestWithSize][]int{},
-       }
-       for digest, indices := range blockToCollectionIndices {
-               rc.BlockToCollectionIndices[blockdigest.MakeTestDigestWithSize(digest)] = indices
-       }
-
-       returned := make(CollectionIndexSet)
-       BlockSetFromSlice(blocks).ToCollectionIndexSet(rc, &returned)
-
-       if !reflect.DeepEqual(returned, expected) {
-               t.Errorf("Expected %v.ToCollectionIndexSet(%v) to return \n %v \n but instead received \n %v",
-                       blocks,
-                       blockToCollectionIndices,
-                       expectedCollections,
-                       returned.ToSlice())
-       }
-}
-
-func TestToCollectionIndexSet(t *testing.T) {
-       VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{6: {0}}, []int{0})
-       VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1}}, []int{1})
-       VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1, 9}}, []int{1, 9})
-       VerifyToCollectionIndexSet(t, []int{5, 6},
-               map[int][]int{5: {2, 3}, 6: {3, 4}},
-               []int{2, 3, 4})
-       VerifyToCollectionIndexSet(t, []int{5, 6},
-               map[int][]int{5: {8}, 6: {4}},
-               []int{4, 8})
-       VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{5: {0}}, []int{})
-}
-
-func TestSimpleSummary(t *testing.T) {
-       rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               {ReplicationLevel: 1, Blocks: []int{1, 2}},
-       })
-       rc.Summarize(nil)
-       cIndex := rc.CollectionIndicesForTesting()
-
-       keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1})
-
-       expectedSummary := ReplicationSummary{
-               CollectionBlocksNotInKeep:  BlockSet{},
-               UnderReplicatedBlocks:      BlockSet{},
-               OverReplicatedBlocks:       BlockSet{},
-               CorrectlyReplicatedBlocks:  BlockSetFromSlice([]int{1, 2}),
-               KeepBlocksNotInCollections: BlockSet{},
-
-               CollectionsNotFullyInKeep:      CollectionIndexSet{},
-               UnderReplicatedCollections:     CollectionIndexSet{},
-               OverReplicatedCollections:      CollectionIndexSet{},
-               CorrectlyReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[0]}),
-       }
-
-       returnedSummary := SummarizeReplication(rc, keepInfo)
-
-       if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v", expectedSummary, returnedSummary)
-       }
-}
-
-func TestMissingBlock(t *testing.T) {
-       rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               {ReplicationLevel: 1, Blocks: []int{1, 2}},
-       })
-       rc.Summarize(nil)
-       cIndex := rc.CollectionIndicesForTesting()
-
-       keepInfo := SpecifyReplication(map[int]int{1: 1})
-
-       expectedSummary := ReplicationSummary{
-               CollectionBlocksNotInKeep:  BlockSetFromSlice([]int{2}),
-               UnderReplicatedBlocks:      BlockSet{},
-               OverReplicatedBlocks:       BlockSet{},
-               CorrectlyReplicatedBlocks:  BlockSetFromSlice([]int{1}),
-               KeepBlocksNotInCollections: BlockSet{},
-
-               CollectionsNotFullyInKeep:      CollectionIndexSetFromSlice([]int{cIndex[0]}),
-               UnderReplicatedCollections:     CollectionIndexSet{},
-               OverReplicatedCollections:      CollectionIndexSet{},
-               CorrectlyReplicatedCollections: CollectionIndexSet{},
-       }
-
-       returnedSummary := SummarizeReplication(rc, keepInfo)
-
-       if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v",
-                       expectedSummary,
-                       returnedSummary)
-       }
-}
-
-func TestUnderAndOverReplicatedBlocks(t *testing.T) {
-       rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               {ReplicationLevel: 2, Blocks: []int{1, 2}},
-       })
-       rc.Summarize(nil)
-       cIndex := rc.CollectionIndicesForTesting()
-
-       keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 3})
-
-       expectedSummary := ReplicationSummary{
-               CollectionBlocksNotInKeep:  BlockSet{},
-               UnderReplicatedBlocks:      BlockSetFromSlice([]int{1}),
-               OverReplicatedBlocks:       BlockSetFromSlice([]int{2}),
-               CorrectlyReplicatedBlocks:  BlockSet{},
-               KeepBlocksNotInCollections: BlockSet{},
-
-               CollectionsNotFullyInKeep:      CollectionIndexSet{},
-               UnderReplicatedCollections:     CollectionIndexSetFromSlice([]int{cIndex[0]}),
-               OverReplicatedCollections:      CollectionIndexSetFromSlice([]int{cIndex[0]}),
-               CorrectlyReplicatedCollections: CollectionIndexSet{},
-       }
-
-       returnedSummary := SummarizeReplication(rc, keepInfo)
-
-       if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v",
-                       expectedSummary,
-                       returnedSummary)
-       }
-}
-
-func TestMixedReplication(t *testing.T) {
-       rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
-               {ReplicationLevel: 1, Blocks: []int{1, 2}},
-               {ReplicationLevel: 1, Blocks: []int{3, 4}},
-               {ReplicationLevel: 2, Blocks: []int{5, 6}},
-       })
-       rc.Summarize(nil)
-       cIndex := rc.CollectionIndicesForTesting()
-
-       keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1, 3: 1, 5: 1, 6: 3, 7: 2})
-
-       expectedSummary := ReplicationSummary{
-               CollectionBlocksNotInKeep:  BlockSetFromSlice([]int{4}),
-               UnderReplicatedBlocks:      BlockSetFromSlice([]int{5}),
-               OverReplicatedBlocks:       BlockSetFromSlice([]int{6}),
-               CorrectlyReplicatedBlocks:  BlockSetFromSlice([]int{1, 2, 3}),
-               KeepBlocksNotInCollections: BlockSetFromSlice([]int{7}),
-
-               CollectionsNotFullyInKeep:      CollectionIndexSetFromSlice([]int{cIndex[1]}),
-               UnderReplicatedCollections:     CollectionIndexSetFromSlice([]int{cIndex[2]}),
-               OverReplicatedCollections:      CollectionIndexSetFromSlice([]int{cIndex[2]}),
-               CorrectlyReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[0]}),
-       }
-
-       returnedSummary := SummarizeReplication(rc, keepInfo)
-
-       if !reflect.DeepEqual(returnedSummary, expectedSummary) {
-               t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUUID, rc.BlockToCollectionIndices)
-       }
-}
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
deleted file mode 100644 (file)
index 3e4d387..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-// Code for generating trash lists
-
-package summary
-
-import (
-       "errors"
-       "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "time"
-)
-
-// BuildTrashLists builds list of blocks to be sent to trash queue
-func BuildTrashLists(kc *keepclient.KeepClient,
-       keepServerInfo *keep.ReadServers,
-       keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
-
-       // Servers that are writeable
-       writableServers := map[string]struct{}{}
-       for _, url := range kc.WritableLocalRoots() {
-               writableServers[url] = struct{}{}
-       }
-
-       _ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
-       if err != nil {
-               return nil, errors.New(fmt.Sprintf("Failed to get blobSignatureTtl, can't build trash lists: %v", err))
-       }
-
-       ttl := int64(_ttl.(float64))
-
-       // expire unreferenced blocks more than "ttl" seconds old.
-       expiry := time.Now().UTC().UnixNano() - ttl*1e9
-
-       return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
-}
-
-func buildTrashListsInternal(writableServers map[string]struct{},
-       keepServerInfo *keep.ReadServers,
-       expiry int64,
-       keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
-
-       m = make(map[string]keep.TrashList)
-
-       for block := range keepBlocksNotInCollections {
-               for _, blockOnServer := range keepServerInfo.BlockToServers[block] {
-                       if blockOnServer.Mtime >= expiry {
-                               continue
-                       }
-
-                       // block is older than expire cutoff
-                       srv := keepServerInfo.KeepServerIndexToAddress[blockOnServer.ServerIndex].String()
-
-                       if _, writable := writableServers[srv]; !writable {
-                               continue
-                       }
-
-                       m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: blockOnServer.Mtime})
-               }
-       }
-       return
-
-}
diff --git a/services/datamanager/summary/trash_list_test.go b/services/datamanager/summary/trash_list_test.go
deleted file mode 100644 (file)
index 3626904..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-package summary
-
-import (
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "git.curoverse.com/arvados.git/services/datamanager/keep"
-       . "gopkg.in/check.v1"
-       "testing"
-)
-
-// Gocheck boilerplate
-func TestTrash(t *testing.T) {
-       TestingT(t)
-}
-
-type TrashSuite struct{}
-
-var _ = Suite(&TrashSuite{})
-
-func (s *TrashSuite) TestBuildTrashLists(c *C) {
-       var sv0 = keep.ServerAddress{Host: "keep0.example.com", Port: 80}
-       var sv1 = keep.ServerAddress{Host: "keep1.example.com", Port: 80}
-
-       var block0 = blockdigest.MakeTestDigestWithSize(0xdeadbeef)
-       var block1 = blockdigest.MakeTestDigestWithSize(0xfedbeef)
-
-       var keepServerInfo = keep.ReadServers{
-               KeepServerIndexToAddress: []keep.ServerAddress{sv0, sv1},
-               BlockToServers: map[blockdigest.DigestWithSize][]keep.BlockServerInfo{
-                       block0: {
-                               {0, 99},
-                               {1, 101}},
-                       block1: {
-                               {0, 99},
-                               {1, 101}}}}
-
-       // only block0 is in delete set
-       var bs = make(BlockSet)
-       bs[block0] = struct{}{}
-
-       // Test trash list where only sv0 is on writable list.
-       c.Check(buildTrashListsInternal(
-               map[string]struct{}{
-                       sv0.URL(): {}},
-               &keepServerInfo,
-               110,
-               bs),
-               DeepEquals,
-               map[string]keep.TrashList{
-                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
-
-       // Test trash list where both sv0 and sv1 are on writable list.
-       c.Check(buildTrashListsInternal(
-               map[string]struct{}{
-                       sv0.URL(): {},
-                       sv1.URL(): {}},
-               &keepServerInfo,
-               110,
-               bs),
-               DeepEquals,
-               map[string]keep.TrashList{
-                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}},
-                       "http://keep1.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
-
-       // Test trash list where only block on sv0 is expired
-       c.Check(buildTrashListsInternal(
-               map[string]struct{}{
-                       sv0.URL(): {},
-                       sv1.URL(): {}},
-               &keepServerInfo,
-               100,
-               bs),
-               DeepEquals,
-               map[string]keep.TrashList{
-                       "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
-
-}
diff --git a/services/fuse/arvados_fuse/_version.py b/services/fuse/arvados_fuse/_version.py
new file mode 100644 (file)
index 0000000..837d4b9
--- /dev/null
@@ -0,0 +1,3 @@
+import pkg_resources
+
+# Version string of the 'arvados_fuse' distribution: the .version of the
+# first entry that pkg_resources.require() returns for that requirement.
+__version__ = pkg_resources.require('arvados_fuse')[0].version
index 3f89732bea25dcd1ca546fbef126227e9e0a9256..f2948f9e45f295b43544615ad75c764c35856053 100644 (file)
@@ -13,6 +13,7 @@ import time
 import arvados.commands._util as arv_cmd
 from arvados_fuse import crunchstat
 from arvados_fuse import *
+from arvados_fuse._version import __version__
 
 class ArgumentParser(argparse.ArgumentParser):
     def __init__(self):
@@ -24,6 +25,9 @@ class ArgumentParser(argparse.ArgumentParser):
     mountpoint before --exec, or mark the end of your --exec arguments
     with "--".
             """)
+        self.add_argument('--version', action='version',
+                          version="%s %s" % (sys.argv[0], __version__),
+                          help='Print version and exit.')
         self.add_argument('mountpoint', type=str, help="""Mount point.""")
         self.add_argument('--allow-other', action='store_true',
                             help="""Let other users read the mount""")
index d7e1a8afb302b26ae582bc5a3a5aaecc9514ae7c..9e282caf49919972b3fefe60001c603ae8176305 100644 (file)
@@ -40,7 +40,8 @@ setup(name='arvados_fuse',
         'arvados-python-client >= 0.1.20151118035730',
         'llfuse==0.41.1',
         'python-daemon',
-        'ciso8601'
+        'ciso8601',
+        'setuptools'
         ],
       test_suite='tests',
       tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
index e8488d7ff967179423f3732c8e6e56b05194ed58..57b4a37826d6c4b4a73a22c6c51716021367f22b 100644 (file)
@@ -3,6 +3,7 @@ import arvados_fuse
 import arvados_fuse.command
 import contextlib
 import functools
+import io
 import json
 import llfuse
 import logging
@@ -48,6 +49,14 @@ class MountArgsTest(unittest.TestCase):
             ent = ent[p]
         return ent
 
+    @contextlib.contextmanager
+    def stderrMatches(self, stderr):
+        """Context manager that swaps sys.stderr for ``stderr``.
+
+        The previous sys.stderr is put back when the block exits, whether
+        or not it raised.  Despite the name, no matching is done here.
+        """
+        saved_stderr = sys.stderr
+        sys.stderr = stderr
+        try:
+            yield
+        finally:
+            sys.stderr = saved_stderr
+
     def check_ent_type(self, cls, *path):
         ent = self.lookup(self.mnt, *path)
         self.assertEqual(ent.__class__, cls)
@@ -170,6 +179,13 @@ class MountArgsTest(unittest.TestCase):
                          run_test_server.fixture('users')['active']['uuid'])
         self.assertEqual(True, self.mnt.listen_for_events)
 
+    def test_version_argument(self):
+        """--version must exit and write a dotted version number to stderr."""
+        err = io.BytesIO()
+        # Redirect through the context manager so sys.stderr is restored
+        # even when one of the assertions below fails.
+        with self.stderrMatches(err):
+            with self.assertRaises(SystemExit):
+                arvados_fuse.command.ArgumentParser().parse_args(['--version'])
+        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
+
     @noexit
     @mock.patch('arvados.events.subscribe')
     def test_disable_event_listening(self, mock_subscribe):
index 8fc06c3534b76054cecbfdb1116007579952bcb1..9389f19ed801cf1ee840642d2078b70de8aa9e50 100644 (file)
@@ -246,7 +246,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                        }
                        if len(errs) > 0 {
                                // Some other goroutine encountered an
-                               // error -- any futher effort here
+                               // error -- any further effort here
                                // will be wasted.
                                return
                        }
index 6389d503dfc5ccee32471d308bf81ee64b012135..c43b85b1c588700ca41378432363ea41545e5dd0 100644 (file)
@@ -7,6 +7,8 @@ import (
        "encoding/json"
        "fmt"
        "io/ioutil"
+       "net/http"
+       "net/http/httptest"
        "os"
        "time"
 
@@ -112,6 +114,94 @@ func (s *StubbedS3Suite) TestStats(c *check.C) {
        c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
 }
 
+// blockingHandler is an http.Handler stub for tests. For each request
+// it sends the request on requested (if non-nil), then waits for a
+// receive on unblock (if non-nil) to complete, and finally responds
+// with 404.
+type blockingHandler struct {
+       // requested receives every incoming request; nil disables this.
+       requested chan *http.Request
+       // unblock holds the response until it yields a value or is
+       // closed; nil disables the wait.
+       unblock   chan struct{}
+}
+
+// ServeHTTP announces the request on h.requested, waits on h.unblock,
+// and then answers with a 404. A nil channel skips the corresponding
+// step.
+func (h *blockingHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
+       if notify := h.requested; notify != nil {
+               notify <- req
+       }
+       if gate := h.unblock; gate != nil {
+               <-gate
+       }
+       http.Error(rw, "nothing here", http.StatusNotFound)
+}
+
+// TestGetContextCancel runs Get through testContextCancel, which
+// expects context.Canceled once the context is cancelled while the
+// request is in flight.
+func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
+       const locator = "acbd18db4cc2f85cedef654fccc4a4d8"
+       dst := make([]byte, 3)
+
+       getBlock := func(ctx context.Context, v *TestableS3Volume) error {
+               _, err := v.Get(ctx, locator, dst)
+               return err
+       }
+       s.testContextCancel(c, getBlock)
+}
+
+// TestCompareContextCancel runs Compare through testContextCancel,
+// which expects context.Canceled once the context is cancelled while
+// the request is in flight.
+func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
+       const locator = "acbd18db4cc2f85cedef654fccc4a4d8"
+       expect := []byte("bar")
+
+       compareBlock := func(ctx context.Context, v *TestableS3Volume) error {
+               return v.Compare(ctx, locator, expect)
+       }
+       s.testContextCancel(c, compareBlock)
+}
+
+// TestPutContextCancel runs Put through testContextCancel, which
+// expects context.Canceled once the context is cancelled while the
+// request is in flight.
+func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
+       const locator = "acbd18db4cc2f85cedef654fccc4a4d8"
+       data := []byte("foo")
+
+       putBlock := func(ctx context.Context, v *TestableS3Volume) error {
+               return v.Put(ctx, locator, data)
+       }
+       s.testContextCancel(c, putBlock)
+}
+
+// testContextCancel verifies that a volume operation aborts with
+// context.Canceled when its context is cancelled mid-request.
+//
+// It points a copy of a test volume at a stub HTTP server whose
+// handler blocks, runs testFunc against that volume in a goroutine,
+// waits until the stub server has received a request, cancels the
+// context, and then expects testFunc to return context.Canceled.
+// Both waits share a single 10 second timeout.
+func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
+       handler := &blockingHandler{}
+       srv := httptest.NewServer(handler)
+       defer srv.Close()
+
+       v := s.newTestableVolume(c, 5*time.Minute, false, 2)
+       vol := *v.S3Volume
+       vol.Endpoint = srv.URL
+       // Set c as well: TestableS3Volume.Start reports failures
+       // through v.c, which must not be left nil.
+       v = &TestableS3Volume{S3Volume: &vol, c: c}
+       v.Start()
+
+       ctx, cancel := context.WithCancel(context.Background())
+       // Make sure the context is released on every exit path,
+       // including the c.Fatal calls below.
+       defer cancel()
+
+       // From here on the handler reports every request on
+       // requested and holds it until unblock is closed.
+       handler.requested = make(chan *http.Request)
+       handler.unblock = make(chan struct{})
+       defer close(handler.unblock)
+
+       doneFunc := make(chan struct{})
+       go func() {
+               err := testFunc(ctx, v)
+               c.Check(err, check.Equals, context.Canceled)
+               close(doneFunc)
+       }()
+
+       timeout := time.After(10 * time.Second)
+
+       // Wait for the stub server to receive a request, meaning
+       // testFunc is waiting for an s3 operation.
+       select {
+       case <-timeout:
+               c.Fatal("timed out waiting for test func to call our handler")
+       case <-doneFunc:
+               c.Fatal("test func finished without even calling our handler!")
+       case <-handler.requested:
+       }
+
+       cancel()
+
+       // testFunc must now return promptly.
+       select {
+       case <-timeout:
+               c.Fatal("timed out")
+       case <-doneFunc:
+       }
+}
+
 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
        defer func(tl, bs arvados.Duration) {
                theConfig.TrashLifetime = tl
@@ -320,18 +410,9 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
        srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
        c.Assert(err, check.IsNil)
 
-       tmp, err := ioutil.TempFile("", "keepstore")
-       c.Assert(err, check.IsNil)
-       defer os.Remove(tmp.Name())
-       _, err = tmp.Write([]byte("xxx\n"))
-       c.Assert(err, check.IsNil)
-       c.Assert(tmp.Close(), check.IsNil)
-
        v := &TestableS3Volume{
                S3Volume: &S3Volume{
                        Bucket:             TestBucketName,
-                       AccessKeyFile:      tmp.Name(),
-                       SecretKeyFile:      tmp.Name(),
                        Endpoint:           srv.URL(),
                        Region:             "test-region-1",
                        LocationConstraint: true,
@@ -341,15 +422,31 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
                        ReadOnly:           readonly,
                        IndexPageSize:      1000,
                },
+               c:           c,
                server:      srv,
                serverClock: clock,
        }
-       c.Assert(v.Start(), check.IsNil)
+       v.Start()
        err = v.bucket.PutBucket(s3.ACL("private"))
        c.Assert(err, check.IsNil)
        return v
 }
 
+// Start writes a temporary file, uses it as both the access key file
+// and the secret key file, and starts the embedded S3Volume. Failures
+// are reported through v.c assertions, so the returned error is
+// always nil.
+func (v *TestableS3Volume) Start() error {
+       keyFile, err := ioutil.TempFile("", "keepstore")
+       v.c.Assert(err, check.IsNil)
+       keyPath := keyFile.Name()
+       // The file is removed again as soon as this method returns.
+       defer os.Remove(keyPath)
+       _, err = keyFile.Write([]byte("xxx\n"))
+       v.c.Assert(err, check.IsNil)
+       err = keyFile.Close()
+       v.c.Assert(err, check.IsNil)
+
+       v.S3Volume.AccessKeyFile = keyPath
+       v.S3Volume.SecretKeyFile = keyPath
+
+       err = v.S3Volume.Start()
+       v.c.Assert(err, check.IsNil)
+       return nil
+}
+
 // PutRaw skips the ContentMD5 test
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
        err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
diff --git a/services/nodemanager/arvnodeman/_version.py b/services/nodemanager/arvnodeman/_version.py
new file mode 100644 (file)
index 0000000..9a29cc1
--- /dev/null
@@ -0,0 +1,3 @@
+import pkg_resources
+
+# Version string of the 'arvados-node-manager' distribution: the .version
+# of the first entry that pkg_resources.require() returns for that
+# requirement.
+__version__ = pkg_resources.require('arvados-node-manager')[0].version
index b853f00a6728693cce4b855021e18bb35c869087..1c6d214fe8818e9dd49e94b413daa6609096a4c8 100644 (file)
@@ -31,6 +31,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         create_kwargs = create_kwargs.copy()
         create_kwargs.setdefault('external_ip', None)
         create_kwargs.setdefault('ex_metadata', {})
+        self._project = auth_kwargs.get("project")
         super(ComputeNodeDriver, self).__init__(
             auth_kwargs, list_kwargs, create_kwargs,
             driver_class)
@@ -44,7 +45,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
 
     def _init_image(self, image_name):
         return 'image', self.search_for(
-            image_name, 'list_images', self._name_key)
+            image_name, 'list_images', self._name_key, ex_project=self._project)
 
     def _init_network(self, network_name):
         return 'ex_network', self.search_for(
index 1be7e46387ff6c5bfe38d4e4805694fb7986cfa7..87ce48769e30235c8407f65bb73508633f97f916 100644 (file)
@@ -17,6 +17,7 @@ from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
 from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
 from .timedcallback import TimedCallBackActor
+from ._version import __version__
 
 node_daemon = None
 
@@ -28,6 +29,10 @@ def parse_cli(args):
     parser = argparse.ArgumentParser(
         prog='arvados-node-manager',
         description="Dynamically allocate Arvados cloud compute nodes")
+    parser.add_argument(
+        '--version', action='version',
+        version="%s %s" % (sys.argv[0], __version__),
+        help='Print version and exit.')
     parser.add_argument(
         '--foreground', action='store_true', default=False,
         help="Run in the foreground.  Don't daemonize.")
index 3d838e49b443750be9608eec67738bbdb9b679f2..c30108f44bb65a487945e665e7f2afae91528c00 100644 (file)
@@ -33,6 +33,7 @@ setup(name='arvados-node-manager',
         'arvados-python-client>=0.1.20150206225333',
         'pykka',
         'python-daemon',
+        'setuptools'
         ],
       dependency_links = [
           "https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev4.zip"
diff --git a/services/nodemanager/tests/test_arguments.py b/services/nodemanager/tests/test_arguments.py
new file mode 100644 (file)
index 0000000..f98309a
--- /dev/null
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+
+import io
+import os
+import sys
+import tempfile
+import unittest
+
+import arvnodeman.launcher as nodeman
+from . import testutil
+
+class ArvNodemArgumentsTestCase(unittest.TestCase):
+    """Command-line argument handling of the node manager launcher."""
+
+    def run_nodeman(self, args):
+        """Call the launcher's main() with ``args`` and return its result."""
+        return nodeman.main(args)
+
+    def test_unsupported_arg(self):
+        # An option the parser does not know must end in SystemExit.
+        self.assertRaises(SystemExit, self.run_nodeman, ['-x=unknown'])
+
+    def test_version_argument(self):
+        # --version must exit, leave stdout empty and put a dotted
+        # version number on stderr.
+        captured_out = io.BytesIO()
+        captured_err = io.BytesIO()
+        with testutil.redirected_streams(stdout=captured_out,
+                                         stderr=captured_err):
+            self.assertRaises(SystemExit, self.run_nodeman, ['--version'])
+        self.assertEqual(captured_out.getvalue(), '')
+        self.assertRegexpMatches(captured_err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
index 1b6aab3cafed16cfc0960d1a39a32d669fe53ffb..15337c4120173b6a7f2ca6b21f0924bd571f2326 100644 (file)
@@ -2,13 +2,15 @@
 
 from __future__ import absolute_import, print_function
 
+import contextlib
 import datetime
+import mock
+import pykka
+import sys
 import threading
 import time
 
 import libcloud.common.types as cloud_types
-import mock
-import pykka
 
 from . import pykka_timeout
 
@@ -55,6 +57,17 @@ def cloud_node_fqdn(node):
 def ip_address_mock(last_octet):
     return '10.20.30.{}'.format(last_octet)
 
+@contextlib.contextmanager
+def redirected_streams(stdout=None, stderr=None):
+    """Temporarily replace sys.stdout and/or sys.stderr.
+
+    Each argument left as None keeps the corresponding stream as it is.
+    Both streams are restored when the ``with`` block exits, even if it
+    raises.
+    """
+    orig_stdout, orig_stderr = sys.stdout, sys.stderr
+    # Compare against None rather than relying on truthiness, so that a
+    # replacement stream that happens to be falsy (e.g. one defining
+    # __len__ and currently empty) is still installed.
+    if stdout is not None:
+        sys.stdout = stdout
+    if stderr is not None:
+        sys.stderr = stderr
+    try:
+        yield
+    finally:
+        sys.stdout = orig_stdout
+        sys.stderr = orig_stderr
+
+
 class MockShutdownTimer(object):
     def _set_state(self, is_open, next_opening):
         self.window_open = lambda: is_open