## Development
[![Build Status](https://ci.curoverse.com/buildStatus/icon?job=run-tests)](https://ci.curoverse.com/job/run-tests/)
+[![Go Report Card](https://goreportcard.com/badge/github.com/curoverse/arvados)](https://goreportcard.com/report/github.com/curoverse/arvados)
The Arvados public bug tracker is located at https://dev.arvados.org/projects/arvados/issues
workflow = Workflow.find? template_uuid
if workflow.definition
begin
- wf_json = YAML::load(workflow.definition)
+ wf_json = ActiveSupport::HashWithIndifferentAccess.new YAML::load(workflow.definition)
rescue => e
logger.error "Error converting definition yaml to json: #{e.message}"
raise ArgumentError, "Error converting definition yaml to json: #{e.message}"
attrs['cwd'] = "/var/spool/cwl"
attrs['output_path'] = "/var/spool/cwl"
+ input_defaults = {}
+ if wf_json
+ inputs = get_cwl_inputs(wf_json)
+ inputs.each do |input|
+ if input[:default]
+ input_defaults[cwl_shortname(input[:id])] = input[:default]
+ end
+ end
+ end
+
# mounts
mounts = {
"/var/lib/cwl/cwl.input.json" => {
"kind" => "json",
- "content" => {}
+ "content" => input_defaults
},
"stdout" => {
"kind" => "file",
def cputime
if state_label != "Queued"
if started_at
- (runtime_constraints.andand[:min_nodes] || 1) * ((finished_at || Time.now()) - started_at)
+ (runtime_constraints.andand[:min_nodes] || 1).to_i * ((finished_at || Time.now()) - started_at)
end
end
end
if children.any?
cpu_time = children.map { |c|
if c.started_at
- (c.runtime_constraints.andand[:min_nodes] || 1) * ((c.finished_at || Time.now()) - c.started_at)
+ (c.runtime_constraints.andand[:min_nodes] || 1).to_i * ((c.finished_at || Time.now()) - c.started_at)
else
0
end
}.reduce(:+) || 0
else
if started_at
- cpu_time = (runtime_constraints.andand[:min_nodes] || 1) * ((finished_at || Time.now()) - started_at)
+ cpu_time = (runtime_constraints.andand[:min_nodes] || 1).to_i * ((finished_at || Time.now()) - started_at)
end
end
-<% n_inputs = cwl_inputs_required(@object, get_cwl_inputs(@object.mounts[:"/var/lib/cwl/workflow.json"][:content]), [:mounts, :"/var/lib/cwl/cwl.input.json", :content]) %>
+<%
+n_inputs = if @object.mounts[:"/var/lib/cwl/workflow.json"] && @object.mounts[:"/var/lib/cwl/cwl.input.json"]
+ cwl_inputs_required(@object, get_cwl_inputs(@object.mounts[:"/var/lib/cwl/workflow.json"][:content]), [:mounts, :"/var/lib/cwl/cwl.input.json", :content])
+ else
+ 0
+ end
+%>
<% content_for :pi_input_form do %>
<form role="form" style="width:60%">
<div class="form-group">
- <% workflow = @object.mounts[:"/var/lib/cwl/workflow.json"][:content] %>
- <% inputs = get_cwl_inputs(workflow) %>
- <% inputs.each do |input| %>
- <label for="#input-<%= cwl_shortname(input[:id]) %>">
- <%= input[:label] || cwl_shortname(input[:id]) %>
- </label>
- <div>
- <p class="form-control-static">
- <%= render_cwl_input @object, input, [:mounts, :"/var/lib/cwl/cwl.input.json", :content] %>
+ <% workflow = @object.mounts[:"/var/lib/cwl/workflow.json"].andand[:content] %>
+ <% if workflow %>
+ <% inputs = get_cwl_inputs(workflow) %>
+ <% inputs.each do |input| %>
+ <label for="#input-<%= cwl_shortname(input[:id]) %>">
+ <%= input[:label] || cwl_shortname(input[:id]) %>
+ </label>
+ <div>
+ <p class="form-control-static">
+ <%= render_cwl_input @object, input, [:mounts, :"/var/lib/cwl/cwl.input.json", :content] %>
+ </p>
+ </div>
+ <p class="help-block">
+ <%= input[:doc] %>
</p>
- </div>
- <p class="help-block">
- <%= input[:doc] %>
- </p>
+ <% end %>
<% end %>
</div>
</form>
<div class="col-md-3">
<% if current_job[:started_at] %>
<% walltime = ((if current_job[:finished_at] then current_job[:finished_at] else Time.now() end) - current_job[:started_at]) %>
- <% cputime = (current_job[:runtime_constraints].andand[:min_nodes] || 1) *
+ <% cputime = (current_job[:runtime_constraints].andand[:min_nodes] || 1).to_i *
((current_job[:finished_at] || Time.now()) - current_job[:started_at]) %>
<%= render_runtime(walltime, false) %>
<% if cputime > 0 %> / <%= render_runtime(cputime, false) %> (<%= (cputime/walltime).round(1) %>⨯)<% end %>
<%
cputime = pipeline_jobs.map { |j|
if j[:job][:started_at]
- (j[:job][:runtime_constraints].andand[:min_nodes] || 1) * ((j[:job][:finished_at] || Time.now()) - j[:job][:started_at])
+ (j[:job][:runtime_constraints].andand[:min_nodes] || 1).to_i * ((j[:job][:finished_at] || Time.now()) - j[:job][:started_at])
else
0
end
wait_for_ajax
assert_text 'This container is queued'
end
+
+ test "Run button enabled when workflow is empty and no inputs are needed" do
+ visit page_with_token("active")
+
+ find('.btn', text: 'Run a process').click
+ within('.modal-dialog') do
+ find('.selectable', text: 'Valid workflow with no definition yaml').click
+ find('.btn', text: 'Next: choose inputs').click
+ end
+
+ assert_text 'This workflow does not need any further inputs'
+ page.assert_selector 'a', text: 'Run'
+ end
end
end
[
- ['Two Part Pipeline Template', 'part-one', 'Provide a value for the following'],
- ['Workflow with input specifications', 'this workflow has inputs specified', 'Provide a value for the following'],
+ ['Pipeline with default input specifications', 'part-one', 'Provide values for the following'],
+ ['Workflow with default input specifications', 'this workflow has inputs specified', 'Provide a value for the following'],
].each do |template_name, preview_txt, process_txt|
test "run a process using template #{template_name} from dashboard" do
visit page_with_token('admin')
# in the process page now
assert_text process_txt
assert_selector 'a', text: template_name
+
+ assert_equal "Set value for ex_string_def", find('div.form-group > div > p.form-control-static > a', text: "hello-testing-123")[:"data-title"]
+
+ page.assert_selector 'a.disabled,button.disabled', text: 'Run'
end
end
if test -z "$packages" ; then
packages="arvados-api-server
- arvados-data-manager
arvados-docker-cleaner
arvados-git-httpd
arvados-node-manager
"Supervise a single Crunch container"
package_go_binary services/crunchstat crunchstat \
"Gather cpu/memory/network statistics of running Crunch jobs"
-package_go_binary services/datamanager arvados-data-manager \
- "Ensure block replication levels, report disk usage, and determine which blocks should be deleted when space is needed"
package_go_binary services/keep-balance keep-balance \
"Rebalance and garbage-collect data blocks stored in Arvados Keep"
package_go_binary services/keepproxy keepproxy \
# 2014-05-15
cd $WORKSPACE/packages/$TARGET
rm -rf "$WORKSPACE/sdk/python/build"
-fpm_build $WORKSPACE/sdk/python "${PYTHON2_PKG_PREFIX}-arvados-python-client" 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/sdk/python/arvados_python_client.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados Python SDK" --deb-recommends=git
+fpm_build $WORKSPACE/sdk/python "${PYTHON2_PKG_PREFIX}-arvados-python-client" 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/sdk/python/arvados_python_client.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados Python SDK" --depends "${PYTHON2_PKG_PREFIX}-setuptools" --deb-recommends=git
# cwl-runner
cd $WORKSPACE/packages/$TARGET
# not omit the python- prefix first.
cd $WORKSPACE/packages/$TARGET
rm -rf "$WORKSPACE/services/fuse/build"
-fpm_build $WORKSPACE/services/fuse "${PYTHON2_PKG_PREFIX}-arvados-fuse" 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/fuse/arvados_fuse.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Keep FUSE driver"
+fpm_build $WORKSPACE/services/fuse "${PYTHON2_PKG_PREFIX}-arvados-fuse" 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/fuse/arvados_fuse.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Keep FUSE driver" --depends "${PYTHON2_PKG_PREFIX}-setuptools"
# The node manager
cd $WORKSPACE/packages/$TARGET
rm -rf "$WORKSPACE/services/nodemanager/build"
-fpm_build $WORKSPACE/services/nodemanager arvados-node-manager 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/nodemanager/arvados_node_manager.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados node manager"
+fpm_build $WORKSPACE/services/nodemanager arvados-node-manager 'Curoverse, Inc.' 'python' "$(awk '($1 == "Version:"){print $2}' $WORKSPACE/services/nodemanager/arvados_node_manager.egg-info/PKG-INFO)" "--url=https://arvados.org" "--description=The Arvados node manager" --depends "${PYTHON2_PKG_PREFIX}-setuptools"
# The Docker image cleaner
cd $WORKSPACE/packages/$TARGET
sdk/go/keepclient
services/keep-balance
services/keepproxy
- services/datamanager/summary
- services/datamanager/collection
- services/datamanager/keep
- services/datamanager
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/crunch-run
- Welcome:
- user/index.html.textile.liquid
- user/getting_started/community.html.textile.liquid
- - Run a pipeline using Workbench:
+ - Run a workflow using Workbench:
- user/getting_started/workbench.html.textile.liquid
- - user/tutorials/tutorial-pipeline-workbench.html.textile.liquid
+ - user/tutorials/tutorial-workflow-workbench.html.textile.liquid
- Access an Arvados virtual machine:
- user/getting_started/vm-login-with-webshell.html.textile.liquid
- user/getting_started/ssh-access-unix.html.textile.liquid
- user/cwl/cwl-runner.html.textile.liquid
- user/cwl/cwl-style.html.textile.liquid
- Working on the command line:
+ - user/topics/running-workflow-command-line.html.textile.liquid
- user/topics/running-pipeline-command-line.html.textile.liquid
- user/topics/arv-run.html.textile.liquid
- Working with git repositories:
- user/tutorials/add-new-repository.html.textile.liquid
- user/tutorials/git-arvados-guide.html.textile.liquid
- - Develop an Arvados pipeline:
+ - Develop an Arvados workflow:
- user/tutorials/intro-crunch.html.textile.liquid
+ - user/tutorials/writing-cwl-workflow.html.textile.liquid
- user/tutorials/running-external-program.html.textile.liquid
- user/topics/crunch-tools-overview.html.textile.liquid
- user/tutorials/tutorial-firstscript.html.textile.liquid
--- /dev/null
+h3. Submit a workflow and wait for results
+
+Use @arvados-cwl-runner@ to submit CWL workflows to Arvados. After submitting the workflow, @arvados-cwl-runner@ will wait for it to complete and print the final result to standard output.
+
+*Note:* Once submitted, the workflow runs entirely on Arvados, so even if you interrupt @arvados-cwl-runner@ or log out, the workflow will continue to run.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner bwa-mem.cwl bwa-mem-input.yml</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-06-30 14:56:36 arvados.arv-run[27002] INFO: Upload local files: "bwa-mem.cwl"
+2016-06-30 14:56:36 arvados.arv-run[27002] INFO: Uploaded to qr1hi-4zz18-h7ljh5u76760ww2
+2016-06-30 14:56:40 arvados.cwl-runner[27002] INFO: Submitted job qr1hi-8i9sb-fm2n3b1w0l6bskg
+2016-06-30 14:56:41 arvados.cwl-runner[27002] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-fm2n3b1w0l6bskg) is Running
+2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-fm2n3b1w0l6bskg) is Complete
+2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
+{
+ "aligned_sam": {
+ "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
+ "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
+ "class": "File",
+ "size": 30738986
+ }
+}
+</code></pre>
+</notextile>
+
+h3. Submit a workflow with no waiting
+
+To submit a workflow and exit immediately, use the @--no-wait@ option. This will submit the workflow to Arvados, print the UUID of the submitted job to standard output, and exit.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --no-wait bwa-mem.cwl bwa-mem-input.yml</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-06-30 15:07:52 arvados.arv-run[12480] INFO: Upload local files: "bwa-mem.cwl"
+2016-06-30 15:07:52 arvados.arv-run[12480] INFO: Uploaded to qr1hi-4zz18-eqnfwrow8aysa9q
+2016-06-30 15:07:52 arvados.cwl-runner[12480] INFO: Submitted job qr1hi-8i9sb-fm2n3b1w0l6bskg
+qr1hi-8i9sb-fm2n3b1w0l6bskg
+</code></pre>
+</notextile>
+
+h3. Run a workflow locally
+
+To run a workflow with local control, use @--local@. This means that the host where you run @arvados-cwl-runner@ will be responsible for submitting jobs. With @--local@, if you interrupt @arvados-cwl-runner@ or log out, the workflow will be terminated.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --local bwa-mem.cwl bwa-mem-input.yml</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 10:05:19 arvados.cwl-runner[16290] INFO: Pipeline instance qr1hi-d1hrv-92wcu6ldtio74r4
+2016-07-01 10:05:28 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Queued
+2016-07-01 10:05:29 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Running
+2016-07-01 10:05:45 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Complete
+2016-07-01 10:05:46 arvados.cwl-runner[16290] INFO: Overall process status is success
+{
+ "aligned_sam": {
+ "size": 30738986,
+ "path": "keep:15f56bad0aaa7364819bf14ca2a27c63+88/HWI-ST1027_129_D0THKACXX.1_1.sam",
+ "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
+ "class": "File"
+ }
+}
+</code></pre>
+</notextile>
--- /dev/null
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to qr1hi-4zz18-7e0hedrmkuyoei3
+2016-07-01 12:21:01 arvados.cwl-runner[15796] INFO: Created template qr1hi-p5p6p-rjleou1dwr167v5
+qr1hi-p5p6p-rjleou1dwr167v5
+</code></pre>
+</notextile>
+
+You can provide a partial input file to set default values for the workflow input parameters:
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
+2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to qr1hi-4zz18-0f91qkovk4ml18o
+2016-07-01 14:09:50 arvados.cwl-runner[3730] INFO: Created template qr1hi-p5p6p-0deqe6nuuyqns2i
+qr1hi-p5p6p-0deqe6nuuyqns2i
+</code></pre>
+</notextile>
--- /dev/null
+The "Common Workflow Language (CWL)":http://commonwl.org is a multi-vendor open standard for describing analysis tools and workflows that are portable across a variety of platforms. CWL is the recommended way to develop and run workflows for Arvados. Arvados supports the "CWL v1.0":http://commonwl.org/v1.0 specification.
title: Using Common Workflow Language
...
-The "Common Workflow Language (CWL)":http://commonwl.org is a multi-vendor open standard for describing analysis tools and workflows that are portable across a variety of platforms. CWL is the recommended way to develop and run workflows for Arvados. Arvados supports the "CWL v1.0":http://commonwl.org/v1.0 specification.
+{% include 'what_is_cwl' %}
{% include 'tutorial_expectations' %}
-h2. Setting up
+h2. Preparing to work with Arvados CWL runner
-The @arvados-cwl-runner@ client is installed by default on Arvados shell nodes. However, if you do not have @arvados-cwl-runner@, you may install it using @pip@:
+h3. arvados-cwl-runner
+
+The @arvados-cwl-runner@ client is installed by default on Arvados shell nodes.
+
+However, if you do not have @arvados-cwl-runner@, you may install it using @pip@:
<notextile>
<pre><code>~$ <span class="userinput">virtualenv ~/venv</span>
</code></pre>
</notextile>
-h3. Docker
+h3. Check Docker access
-Certain features of @arvados-cwl-runner@ require access to Docker. You can determine if you have access to Docker by running @docker version@:
+Certain features of @arvados-cwl-runner@ require access to Docker.
+
+You can determine if you have access to Docker by running @docker version@:
<notextile>
<pre><code>~$ <span class="userinput">docker version</span>
</code></pre>
</notextile>
-If this returns an error, contact the sysadmin of your cluster for assistance. Alternatively, if you have Docker installed on your local workstation, you may follow the instructions above to install @arvados-cwl-runner@.
+If this returns an error, contact the sysadmin of your cluster for assistance.
-h3. Getting the example files
+h3. Get the example files
The tutorial files are located in the documentation section of the Arvados source repository:
h2. Submitting a workflow to an Arvados cluster
-Use @arvados-cwl-runner@ to submit CWL workflows to Arvados. After submitting the job, it will wait for the workflow to complete and print out the final result to standard output. Note that once submitted, the workflow runs entirely on Arvados, so even if you interrupt @arvados-cwl-runner@ or log out, the workflow will continue to run.
-
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner bwa-mem.cwl bwa-mem-input.yml</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-06-30 14:56:36 arvados.arv-run[27002] INFO: Upload local files: "bwa-mem.cwl"
-2016-06-30 14:56:36 arvados.arv-run[27002] INFO: Uploaded to qr1hi-4zz18-h7ljh5u76760ww2
-2016-06-30 14:56:40 arvados.cwl-runner[27002] INFO: Submitted job qr1hi-8i9sb-fm2n3b1w0l6bskg
-2016-06-30 14:56:41 arvados.cwl-runner[27002] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-fm2n3b1w0l6bskg) is Running
-2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-fm2n3b1w0l6bskg) is Complete
-2016-06-30 14:57:12 arvados.cwl-runner[27002] INFO: Overall process status is success
-{
- "aligned_sam": {
- "path": "keep:54325254b226664960de07b3b9482349+154/HWI-ST1027_129_D0THKACXX.1_1.sam",
- "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
- "class": "File",
- "size": 30738986
- }
-}
-</code></pre>
-</notextile>
-
-To submit a workflow and exit immediately, use the @--no-wait@ option. This will print out the uuid of the job that was submitted to standard output.
-
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --no-wait bwa-mem.cwl bwa-mem-input.yml</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-06-30 15:07:52 arvados.arv-run[12480] INFO: Upload local files: "bwa-mem.cwl"
-2016-06-30 15:07:52 arvados.arv-run[12480] INFO: Uploaded to qr1hi-4zz18-eqnfwrow8aysa9q
-2016-06-30 15:07:52 arvados.cwl-runner[12480] INFO: Submitted job qr1hi-8i9sb-fm2n3b1w0l6bskg
-qr1hi-8i9sb-fm2n3b1w0l6bskg
-</code></pre>
-</notextile>
-
-To run a workflow with local control, use @--local@. This means that the host where you run @arvados-cwl-runner@ will be responsible for submitting jobs. With @--local@, if you interrupt @arvados-cwl-runner@ or log out, the workflow will be terminated.
-
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --local bwa-mem.cwl bwa-mem-input.yml</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-07-01 10:05:19 arvados.cwl-runner[16290] INFO: Pipeline instance qr1hi-d1hrv-92wcu6ldtio74r4
-2016-07-01 10:05:28 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Queued
-2016-07-01 10:05:29 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Running
-2016-07-01 10:05:45 arvados.cwl-runner[16290] INFO: Job bwa-mem.cwl (qr1hi-8i9sb-2nzzfbuf9zjrj4g) is Complete
-2016-07-01 10:05:46 arvados.cwl-runner[16290] INFO: Overall process status is success
-{
- "aligned_sam": {
- "size": 30738986,
- "path": "keep:15f56bad0aaa7364819bf14ca2a27c63+88/HWI-ST1027_129_D0THKACXX.1_1.sam",
- "checksum": "sha1$0dc46a3126d0b5d4ce213b5f0e86e2d05a54755a",
- "class": "File"
- }
-}
-</code></pre>
-</notextile>
+{% include 'arvados_cwl_runner' %}
h2. Work reuse
Use @--create-workflow@ to register a CWL workflow with Arvados. This enables you to share workflows with other Arvados users, and run them by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard.
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
-2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to qr1hi-4zz18-7e0hedrmkuyoei3
-2016-07-01 12:21:01 arvados.cwl-runner[15796] INFO: Created template qr1hi-p5p6p-rjleou1dwr167v5
-qr1hi-p5p6p-rjleou1dwr167v5
-</code></pre>
-</notextile>
-
-You can provide a partial input file to set default values for the workflow input parameters:
-
-<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
-arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
-2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
-2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to qr1hi-4zz18-0f91qkovk4ml18o
-2016-07-01 14:09:50 arvados.cwl-runner[3730] INFO: Created template qr1hi-p5p6p-0deqe6nuuyqns2i
-qr1hi-p5p6p-0deqe6nuuyqns2i
-</code></pre>
-</notextile>
+{% include 'register_cwl_workflow' %}
h2. Making workflows directly executable
h2. Developing workflows
-For an introduction and and detailed documentation about writing CWL, see the "User Guide":http://commonwl.org/v1.0/UserGuide.html and the "Specification":http://commonwl.org/v1.0 .
+For an introduction and detailed documentation about writing CWL, see the "CWL User Guide":http://commonwl.org/v1.0/UserGuide.html and the "CWL Specification":http://commonwl.org/v1.0 .
To run on Arvados, a workflow should provide a @DockerRequirement@ in the @hints@ section.
title: "Using arv-copy"
...
+{% include 'crunch1only_begin' %}
+On those sites, the "copy a pipeline template" feature described below is not available. Additionally, the "copy a workflow" feature is not yet implemented.
+{% include 'crunch1only_end' %}
This tutorial describes how to copy Arvados objects from one cluster to another by using @arv-copy@.
--- /dev/null
+---
+layout: default
+navsection: userguide
+title: "Running an Arvados workflow"
+...
+
+{% include 'what_is_cwl' %}
+
+{% include 'tutorial_expectations' %}
+
+h2. arvados-cwl-runner
+
+The arvados-cwl-runner tool can be used to submit workflows to an Arvados cluster from the command prompt.
+
+The following examples assume that you have prepared to run the arvados-cwl-runner tool as explained in the "Using Common Workflow Language":{{site.baseurl}}/user/topics/running-workflow-command-line.html.textile.liquid page.
+
+{% include 'arvados_cwl_runner' %}
+++ /dev/null
----
-layout: default
-navsection: userguide
-title: "Running a pipeline using Workbench"
-...
-
-{% include 'crunch1only_begin' %}
-On those sites, the details will be slightly different and the example pipeline might not be available.
-{% include 'crunch1only_end' %}
-
-A "pipeline" (sometimes called a "workflow" in other systems) is a sequence of steps that apply various programs or tools to transform input data to output data. Pipelines are the principal means of performing computation with Arvados. This tutorial demonstrates how to run a single-stage pipeline to take a small data set of paired-end reads from a sample "exome":https://en.wikipedia.org/wiki/Exome in "FASTQ":https://en.wikipedia.org/wiki/FASTQ_format format and align them to "Chromosome 19":https://en.wikipedia.org/wiki/Chromosome_19_%28human%29 using the "bwa mem":http://bio-bwa.sourceforge.net/ tool, producing a "Sequence Alignment/Map (SAM)":https://samtools.github.io/ file. This tutorial will introduce the following Arvados features:
-
-<div>
-* How to create a new pipeline from an existing template.
-* How to browse and select input data for the pipeline and submit the pipeline to run on the Arvados cluster.
-* How to access your pipeline results.
-</div>
-
-notextile. <div class="spaced-out">
-
-h3. Steps
-
-# Start from the *Workbench Dashboard*. You can access the Dashboard by clicking on *<i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard* in the upper left corner of any Workbench page.
-# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button. This will open a dialog box titled *Choose a pipeline to run*.
-# In the search box, type in *Tutorial align using bwa mem*.
-# Select *<i class="fa fa-fw fa-gear"></i> Tutorial align using bwa mem* and click the <span class="btn btn-sm btn-primary" >Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span> button. This will create a new pipeline in your *Home* project and will open it. You can now supply the inputs for the pipeline.
-# The first input parameter to the pipeline is *"reference_collection" parameter for run-command script in bwa-mem component*. Click the <span class="btn btn-sm btn-primary">Choose</span> button beneath that header. This will open a dialog box titled *Choose a dataset for "reference_collection" parameter for run-command script in bwa-mem component*.
-# Open the *Home <span class="caret"></span>* menu and select *All Projects*. Search for and select *<i class="fa fa-fw fa-archive"></i> Tutorial chromosome 19 reference* and click the <span class="btn btn-sm btn-primary" >OK</span> button.
-# Repeat the previous two steps to set the *"sample" parameter for run-command script in bwa-mem component* parameter to *<i class="fa fa-fw fa-archive"></i> Tutorial sample exome*.
-# Click on the <span class="btn btn-sm btn-primary" >Run <i class="fa fa-fw fa-play"></i></span> button. The page updates to show you that the pipeline has been submitted to run on the Arvados cluster.
-# After the pipeline starts running, you can track the progress by watching log messages from jobs. This page refreshes automatically. You will see a <span class="label label-success">complete</span> label when the pipeline completes successfully.
-# Click on the *Output* link to see the results of the job. This will load a new page listing the output files from this pipeline. You'll see the output SAM file from the alignment tool under the *Files* tab.
-# Click on the <span class="btn btn-sm btn-info"><i class="fa fa-download"></i></span> download button to the right of the SAM file to download your results.
-
-notextile. </div>
--- /dev/null
+---
+layout: default
+navsection: userguide
+title: "Running a workflow using Workbench"
+...
+
+A "workflow" (sometimes called a "pipeline" in other systems) is a sequence of steps that apply various programs or tools to transform input data to output data. Workflows are the principal means of performing computation with Arvados. This tutorial demonstrates how to run a single-stage workflow to take a small data set of paired-end reads from a sample "exome":https://en.wikipedia.org/wiki/Exome in "FASTQ":https://en.wikipedia.org/wiki/FASTQ_format format and align them to "Chromosome 19":https://en.wikipedia.org/wiki/Chromosome_19_%28human%29 using the "bwa mem":http://bio-bwa.sourceforge.net/ tool, producing a "Sequence Alignment/Map (SAM)":https://samtools.github.io/ file. This tutorial will introduce the following Arvados features:
+
+<div>
+* How to create a new process from an existing workflow.
+* How to browse and select input data for the workflow and submit the process to run on the Arvados cluster.
+* How to access your process results.
+</div>
+
+h3. Steps
+
+# Start from the *Workbench Dashboard*. You can access the Dashboard by clicking on *<i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard* in the upper left corner of any Workbench page.
+# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button. This will open a dialog box titled *Choose a pipeline or workflow to run*.
+# In the search box, type in *Tutorial bwa mem cwl*.
+# Select *<i class="fa fa-fw fa-gear"></i> Tutorial bwa mem cwl* and click the <span class="btn btn-sm btn-primary" >Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span> button. This will create a new process in your *Home* project and will open it. You can now supply the inputs for the process. Please note that all required inputs are populated with default values and you can change them if you prefer.
+# For example, let's see how to change *"reference" parameter* for this workflow. Click the <span class="btn btn-sm btn-primary">Choose</span> button beneath the *"reference" parameter* header. This will open a dialog box titled *Choose a dataset for "reference" parameter for cwl-runner in bwa-mem.cwl component*.
+# Open the *Home <span class="caret"></span>* menu and select *All Projects*. Search for and select *<i class="fa fa-fw fa-archive"></i> Tutorial chromosome 19 reference*. You will then see a list of files. Select *<i class="fa fa-fw fa-file"></i> 19-fasta.bwt* and click the <span class="btn btn-sm btn-primary" >OK</span> button.
+# Repeat the previous two steps to set the *"read_p1" parameter for cwl-runner script in bwa-mem.cwl component* and *"read_p2" parameter for cwl-runner script in bwa-mem.cwl component* parameters.
+# Click on the <span class="btn btn-sm btn-primary" >Run <i class="fa fa-fw fa-play"></i></span> button. The page updates to show you that the process has been submitted to run on the Arvados cluster.
+# After the process starts running, you can track the progress by watching log messages from the component(s). This page refreshes automatically. You will see a <span class="label label-success">complete</span> label when the process completes successfully.
+# Click on the *Output* link to see the results of the process. This will load a new page listing the output files from this process. You'll see the output SAM file from the alignment tool under the *Files* tab.
+# Click on the <span class="btn btn-sm btn-info"><i class="fa fa-download"></i></span> download button to the right of the SAM file to download your results.
--- /dev/null
+---
+layout: default
+navsection: userguide
+title: "Writing a CWL workflow"
+...
+
+{% include 'what_is_cwl' %}
+
+{% include 'tutorial_expectations' %}
+
+h2. Registering a CWL workflow
+
+Use @--create-workflow@ to register a CWL workflow with Arvados.
+
+The following examples assume that you have prepared to run the arvados-cwl-runner tool as explained in the "Using Common Workflow Language":{{site.baseurl}}/user/topics/running-workflow-command-line.html.textile.liquid page.
+
+{% include 'register_cwl_workflow' %}
+
+h2. Running a CWL workflow
+
+h3. Running a workflow at command prompt
+
+Not yet implemented
+
+h3. Running a workflow using Workbench
+
+The workflow can also be executed using Workbench. Go to the Workbench Dashboard and click the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button and select the desired workflow.
+
."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
.q{&& declare -a VOLUMES=() }
- .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner") ; fi }
- .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt") ; }
- .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt") ; fi };
+ .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner:ro") ; fi }
+ .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt:ro") ; }
+ .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt:ro") ; fi };
$command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
$ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
assert_match /^usage:/, err
end
+ def test_get_version
+ out, err = capture_subprocess_io do
+ assert_arv_get '--version'
+ end
+ assert_empty(out, "STDOUT not expected: '#{out}'")
+ assert_match(/[0-9]+\.[0-9]+\.[0-9]+/, err, "Version information incorrect: '#{err}'")
+ end
+
def test_help
out, err = capture_subprocess_io do
assert_arv_get '-h'
tmpl = RunnerTemplate(self, tool, job_order,
kwargs.get("enable_reuse"),
uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"))
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs.get("name"))
tmpl.save()
# cwltool.main will write our return value to stdout.
return tmpl.uuid
return upload_workflow(self, tool, job_order,
self.project_uuid,
uuid=existing_uuid,
- submit_runner_ram=kwargs.get("submit_runner_ram"))
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs.get("name"))
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
**kwargs).next()
else:
runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
- self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
+ self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs.get("name"))
else:
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
- self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"))
+ self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs.get("name"))
if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
# Create pipeline for local run
self.pipeline = self.api.pipeline_instances().create(
body={
"owner_uuid": self.project_uuid,
- "name": shortname(tool.tool["id"]),
+ "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
"components": {},
"state": "RunningOnClient"}).execute(num_retries=self.num_retries)
logger.info("Pipeline instance %s", self.pipeline["uuid"])
help="RAM (in MiB) required for the workflow runner job (default 1024)",
default=1024)
+ parser.add_argument("--name", type=str,
+ help="Name to use for workflow execution instance.",
+ default=None)
+
parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
job_order_object = None
arvargs = parser.parse_args(args)
+ if arvargs.version:
+ print versionstring()
+ return
+
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'
self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().create(
body={
"owner_uuid": self.arvrunner.project_uuid,
- "name": shortname(self.tool.tool["id"]),
+ "name": self.name,
"components": {"cwl-runner": job_spec },
"state": "RunningOnServer"}).execute(num_retries=self.arvrunner.num_retries)
logger.info("Created pipeline %s", self.arvrunner.pipeline["uuid"])
'string': 'text',
}
- def __init__(self, runner, tool, job_order, enable_reuse, uuid, submit_runner_ram=0):
+ def __init__(self, runner, tool, job_order, enable_reuse, uuid,
+ submit_runner_ram=0, name=None):
self.runner = runner
self.tool = tool
self.job = RunnerJob(
enable_reuse=enable_reuse,
output_name=None,
output_tags=None,
- submit_runner_ram=submit_runner_ram)
+ submit_runner_ram=submit_runner_ram,
+ name=name)
self.uuid = uuid
def pipeline_component_spec(self):
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None, submit_runner_ram=0):
+def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+ submit_runner_ram=0, name=None):
upload_docker(arvRunner, tool)
document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
if sn in job_order:
inp["default"] = job_order[sn]
- name = os.path.basename(tool.tool["id"])
+ if not name:
+ name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
+
upload_dependencies(arvRunner, name, document_loader,
packed, uri, False)
body = {
"workflow": {
- "name": tool.tool.get("label", name),
+ "name": name,
"description": tool.tool.get("doc", ""),
"definition":yaml.safe_dump(packed)
}}
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse,
- output_name, output_tags, submit_runner_ram=0):
+ output_name, output_tags, submit_runner_ram=0,
+ name=None):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
self.final_output = None
self.output_name = output_name
self.output_tags = output_tags
+ self.name = name
+
if submit_runner_ram:
self.submit_runner_ram = submit_runner_ram
else:
pass
def arvados_job_spec(self, *args, **kwargs):
- self.name = os.path.basename(self.tool.tool["id"])
+ if self.name is None:
+ self.name = os.path.basename(self.tool.tool["id"])
workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
adjustDirObjs(self.job_order, trim_listing)
return workflowmapper
# when updating the cwltool version pin.
install_requires=[
'cwltool==1.0.20161128202906',
- 'arvados-python-client>=0.1.20160826210445'
+ 'arvados-python-client>=0.1.20160826210445',
+ 'setuptools'
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
+
+ @mock.patch("time.sleep")
+ @stubs
+ def test_submit_pipeline_name(self, stubs, tm):
+ capture_stdout = cStringIO.StringIO()
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--debug", "--name=hello job 123",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.expect_pipeline_instance["name"] = "hello job 123"
+
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=expect_pipeline)
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_pipeline_uuid + '\n')
+
@mock.patch("time.sleep")
@stubs
def test_submit_output_tags(self, stubs, tm):
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_container_name(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--name=hello container 123",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ stubs.expect_container_spec["name"] = "hello container 123"
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ stubs.api.container_requests().create.assert_called_with(
+ body=expect_container)
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
@mock.patch("arvados.commands.keepdocker.find_one_image_hash")
@mock.patch("cwltool.docker.get_image")
@mock.patch("arvados.api")
self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner))
class TestCreateTemplate(unittest.TestCase):
- @stubs
- def test_create(self, stubs):
- project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
-
- capture_stdout = cStringIO.StringIO()
-
- exited = arvados_cwl.main(
- ["--create-workflow", "--debug",
- "--project-uuid", project_uuid,
- "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
- capture_stdout, sys.stderr, api_client=stubs.api)
- self.assertEqual(exited, 0)
-
- stubs.api.pipeline_instances().create.refute_called()
- stubs.api.jobs().create.refute_called()
+ existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
- expect_component = copy.deepcopy(stubs.expect_job_spec)
+ def _adjust_script_params(self, expect_component):
expect_component['script_parameters']['x'] = {
'dataclass': 'File',
'required': True,
'required': True,
'type': 'Directory',
}
+
+ @stubs
+ def test_create(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--create-workflow", "--debug",
+ "--api=jobs",
+ "--project-uuid", project_uuid,
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.api.pipeline_instances().create.refute_called()
+ stubs.api.jobs().create.refute_called()
+
+ expect_component = copy.deepcopy(stubs.expect_job_spec)
+ self._adjust_script_params(expect_component)
expect_template = {
"components": {
"submit_wf.cwl": expect_component,
stubs.expect_pipeline_template_uuid + '\n')
+ @stubs
+ def test_create_name(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--create-workflow", "--debug",
+ "--project-uuid", project_uuid,
+ "--api=jobs",
+ "--name", "testing 123",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.api.pipeline_instances().create.refute_called()
+ stubs.api.jobs().create.refute_called()
+
+ expect_component = copy.deepcopy(stubs.expect_job_spec)
+ self._adjust_script_params(expect_component)
+ expect_template = {
+ "components": {
+ "testing 123": expect_component,
+ },
+ "name": "testing 123",
+ "owner_uuid": project_uuid,
+ }
+ stubs.api.pipeline_templates().create.assert_called_with(
+ body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
+
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_pipeline_template_uuid + '\n')
+
+
+ @stubs
+ def test_update_name(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--update-workflow", self.existing_template_uuid,
+ "--debug",
+ "--project-uuid", project_uuid,
+ "--api=jobs",
+ "--name", "testing 123",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.api.pipeline_instances().create.refute_called()
+ stubs.api.jobs().create.refute_called()
+
+ expect_component = copy.deepcopy(stubs.expect_job_spec)
+ self._adjust_script_params(expect_component)
+ expect_template = {
+ "components": {
+ "testing 123": expect_component,
+ },
+ "name": "testing 123",
+ "owner_uuid": project_uuid,
+ }
+ stubs.api.pipeline_templates().create.refute_called()
+ stubs.api.pipeline_templates().update.assert_called_with(
+ body=JsonDiffMatcher(expect_template), uuid=self.existing_template_uuid)
+
+ self.assertEqual(capture_stdout.getvalue(),
+ self.existing_template_uuid + '\n')
+
+
class TestCreateWorkflow(unittest.TestCase):
existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
expect_workflow = open("tests/wf/expect_packed.cwl").read()
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_workflow_uuid + '\n')
+
+ @stubs
+ def test_create_name(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--create-workflow", "--debug",
+ "--api=containers",
+ "--project-uuid", project_uuid,
+ "--name", "testing 123",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.api.pipeline_templates().create.refute_called()
+ stubs.api.container_requests().create.refute_called()
+
+ body = {
+ "workflow": {
+ "owner_uuid": project_uuid,
+ "name": "testing 123",
+ "description": "",
+ "definition": self.expect_workflow,
+ }
+ }
+ stubs.api.workflows().create.assert_called_with(
+ body=JsonDiffMatcher(body))
+
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_workflow_uuid + '\n')
+
@stubs
def test_incompatible_api(self, stubs):
capture_stderr = cStringIO.StringIO()
self.existing_workflow_uuid + '\n')
+ @stubs
+ def test_update_name(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--update-workflow", self.existing_workflow_uuid,
+ "--debug", "--name", "testing 123",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ body = {
+ "workflow": {
+ "name": "testing 123",
+ "description": "",
+ "definition": self.expect_workflow,
+ }
+ }
+ stubs.api.workflows().update.assert_called_with(
+ uuid=self.existing_workflow_uuid,
+ body=JsonDiffMatcher(body))
+ self.assertEqual(capture_stdout.getvalue(),
+ self.existing_workflow_uuid + '\n')
+
+
class TestTemplateInputs(unittest.TestCase):
expect_template = {
"components": {
@stubs
def test_inputs_empty(self, stubs):
exited = arvados_cwl.main(
- ["--create-template", "--no-wait",
+ ["--create-template",
"tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
@stubs
def test_inputs(self, stubs):
exited = arvados_cwl.main(
- ["--create-template", "--no-wait",
+ ["--create-template",
"tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
+++ /dev/null
-// Logger periodically writes a log to the Arvados SDK.
-//
-// This package is useful for maintaining a log object that is updated
-// over time. This log object will be periodically written to the log,
-// as specified by WriteInterval in the Params.
-//
-// This package is safe for concurrent use as long as:
-// The maps passed to a LogMutator are not accessed outside of the
-// LogMutator
-//
-// Usage:
-// arvLogger := logger.NewLogger(params)
-// arvLogger.Update(func(properties map[string]interface{},
-// entry map[string]interface{}) {
-// // Modifiy properties and entry however you want
-// // properties is a shortcut for entry["properties"].(map[string]interface{})
-// // properties can take any (valid) values you want to give it,
-// // entry will only take the fields listed at
-// // http://doc.arvados.org/api/schema/Log.html
-// // Valid values for properties are anything that can be json
-// // encoded (i.e. will not error if you call json.Marshal() on it).
-// })
-package logger
-
-import (
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "log"
- "time"
-)
-
-const (
- startSuffix = "-start"
- partialSuffix = "-partial"
- finalSuffix = "-final"
- numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
-)
-
-type LoggerParams struct {
- Client *arvadosclient.ArvadosClient // The client we use to write log entries
- EventTypePrefix string // The prefix we use for the event type in the log entry
- WriteInterval time.Duration // Wait at least this long between log writes
-}
-
-// A LogMutator is a function which modifies the log entry.
-// It takes two maps as arguments, properties is the first and entry
-// is the second
-// properties is a shortcut for entry["properties"].(map[string]interface{})
-// properties can take any values you want to give it.
-// entry will only take the fields listed at http://doc.arvados.org/api/schema/Log.html
-// properties and entry are only safe to access inside the LogMutator,
-// they should not be stored anywhere, otherwise you'll risk
-// concurrent access.
-type LogMutator func(map[string]interface{}, map[string]interface{})
-
-// A Logger is used to build up a log entry over time and write every
-// version of it.
-type Logger struct {
- // The data we write
- data map[string]interface{} // The entire map that we give to the api
- entry map[string]interface{} // Convenience shortcut into data
- properties map[string]interface{} // Convenience shortcut into data
-
- params LoggerParams // Parameters we were given
-
- // Variables to coordinate updating and writing.
- modified bool // Has this data been modified since the last write?
- workToDo chan LogMutator // Work to do in the worker thread.
- writeTicker *time.Ticker // On each tick we write the log data to arvados, if it has been modified.
- hasWritten bool // Whether we've written at all yet.
- noMoreWork chan bool // Signals that we're done writing.
-
- writeHooks []LogMutator // Mutators we call before each write.
-}
-
-// Create a new logger based on the specified parameters.
-func NewLogger(params LoggerParams) (l *Logger, err error) {
- // sanity check parameters
- if &params.Client == nil {
- err = fmt.Errorf("Nil arvados client in LoggerParams passed in to NewLogger()")
- return
- }
- if params.EventTypePrefix == "" {
- err = fmt.Errorf("Empty event type prefix in LoggerParams passed in to NewLogger()")
- return
- }
-
- l = &Logger{
- data: make(map[string]interface{}),
- entry: make(map[string]interface{}),
- properties: make(map[string]interface{}),
- params: params,
- workToDo: make(chan LogMutator, 10),
- writeTicker: time.NewTicker(params.WriteInterval),
- noMoreWork: make(chan bool, numberNoMoreWorkMessages)}
-
- l.data["log"] = l.entry
- l.entry["properties"] = l.properties
-
- // Start the worker goroutine.
- go l.work()
-
- return l, nil
-}
-
-// Exported functions will be called from other goroutines, therefore
-// all they are allowed to do is enqueue work to be done in the worker
-// goroutine.
-
-// Enqueues an update. This will happen in another goroutine after
-// this method returns.
-func (l *Logger) Update(mutator LogMutator) {
- l.workToDo <- mutator
-}
-
-// Similar to Update(), but writes the log entry as soon as possible
-// (ignoring MinimumWriteInterval) and blocks until the entry has been
-// written. This is useful if you know that you're about to quit
-// (e.g. if you discovered a fatal error, or you're finished), since
-// go will not wait for timers (including the pending write timer) to
-// go off before exiting.
-func (l *Logger) FinalUpdate(mutator LogMutator) {
- // TODO(misha): Consider not accepting any future updates somehow,
- // since they won't get written if they come in after this.
-
- // Stop the periodic write ticker. We'll perform the final write
- // before returning from this function.
- l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
- l.writeTicker.Stop()
- }
-
- // Apply the final update
- l.workToDo <- mutator
-
- // Perform the final write and signal that we can return.
- l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
- l.write(true)
- for i := 0; i < numberNoMoreWorkMessages; {
- l.noMoreWork <- true
- }
- }
-
- // Wait until we've performed the write.
- <-l.noMoreWork
-}
-
-// Adds a hook which will be called every time this logger writes an entry.
-func (l *Logger) AddWriteHook(hook LogMutator) {
- // We do the work in a LogMutator so that it happens in the worker
- // goroutine.
- l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
- l.writeHooks = append(l.writeHooks, hook)
- }
-}
-
-// The worker loop
-func (l *Logger) work() {
- for {
- select {
- case <-l.writeTicker.C:
- if l.modified {
- l.write(false)
- l.modified = false
- }
- case mutator := <-l.workToDo:
- mutator(l.properties, l.entry)
- l.modified = true
- case <-l.noMoreWork:
- return
- }
- }
-}
-
-// Actually writes the log entry.
-func (l *Logger) write(isFinal bool) {
-
- // Run all our hooks
- for _, hook := range l.writeHooks {
- hook(l.properties, l.entry)
- }
-
- // Update the event type.
- if isFinal {
- l.entry["event_type"] = l.params.EventTypePrefix + finalSuffix
- } else if l.hasWritten {
- l.entry["event_type"] = l.params.EventTypePrefix + partialSuffix
- } else {
- l.entry["event_type"] = l.params.EventTypePrefix + startSuffix
- }
- l.hasWritten = true
-
- // Write the log entry.
- // This is a network write and will take a while, which is bad
- // because we're blocking all the other work on this goroutine.
- //
- // TODO(misha): Consider rewriting this so that we can encode l.data
- // into a string, and then perform the actual write in another
- // routine. This will be tricky and will require support in the
- // client.
- err := l.params.Client.Create("logs", l.data, nil)
- if err != nil {
- log.Printf("Received error writing %v: %v", l.data, err)
- }
-}
+++ /dev/null
-// Helper methods for interacting with Logger.
-package logger
-
-// Retrieves the map[string]interface{} stored at parent[key] if it
-// exists, otherwise it makes it and stores it there.
-// This is useful for logger because you may not know if a map you
-// need has already been created.
-func GetOrCreateMap(
- parent map[string]interface{},
- key string) (child map[string]interface{}) {
- read, exists := parent[key]
- if exists {
- child = read.(map[string]interface{})
-
- } else {
- child = make(map[string]interface{})
- parent[key] = child
- }
- return
-}
+++ /dev/null
-/* Helper methods for dealing with responses from API Server. */
-
-package util
-
-import (
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-)
-
-func UserIsAdmin(arv *arvadosclient.ArvadosClient) (is_admin bool, err error) {
- type user struct {
- IsAdmin bool `json:"is_admin"`
- }
- var u user
- err = arv.Call("GET", "users", "", "current", nil, &u)
- return u.IsAdmin, err
-}
-
-// Returns the total count of a particular type of resource
-//
-// resource - the arvados resource to count
-// return
-// count - the number of items of type resource the api server reports, if no error
-// err - error accessing the resource, or nil if no error
-func NumberItemsAvailable(client *arvadosclient.ArvadosClient, resource string) (count int, err error) {
- var response struct {
- ItemsAvailable int `json:"items_available"`
- }
- sdkParams := arvadosclient.Dict{"limit": 0}
- err = client.List(resource, sdkParams, &response)
- if err == nil {
- count = response.ItemsAvailable
- }
- return
-}
--- /dev/null
+import pkg_resources
+
+__version__ = pkg_resources.require('arvados-python-client')[0].version
import arvados.commands.keepdocker
from arvados.api import OrderedJsonModel
+from arvados._version import __version__
COMMIT_HASH_RE = re.compile(r'^[0-9a-f]{1,40}$')
def main():
copy_opts = argparse.ArgumentParser(add_help=False)
+ copy_opts.add_argument(
+ '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
copy_opts.add_argument(
'-v', '--verbose', dest='verbose', action='store_true',
help='Verbose output.')
import arvados.commands.put as arv_put
import ciso8601
+from arvados._version import __version__
+
EARLIEST_DATETIME = datetime.datetime(datetime.MINYEAR, 1, 1, 0, 0, 0)
STAT_CACHE_ERRORS = (IOError, OSError, ValueError)
'DockerImage', ['repo', 'tag', 'hash', 'created', 'vsize'])
keepdocker_parser = argparse.ArgumentParser(add_help=False)
+keepdocker_parser.add_argument(
+ '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
keepdocker_parser.add_argument(
'-f', '--force', action='store_true', default=False,
help="Re-upload the image even if it already exists on the server")
from __future__ import print_function
import argparse
+import sys
import arvados
import arvados.commands._util as arv_cmd
+from arvados._version import __version__
+
def parse_args(args):
parser = argparse.ArgumentParser(
description='List contents of a manifest',
help="""Collection UUID or locator""")
parser.add_argument('-s', action='store_true',
help="""List file sizes, in KiB.""")
+ parser.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
return parser.parse_args(args)
import copy
import logging
from apiclient import errors as apiclient_errors
+from arvados._version import __version__
import arvados.commands._util as arv_cmd
upload_opts = argparse.ArgumentParser(add_help=False)
+upload_opts.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
help="""
Local file or directory. Default: read from standard input.
import time
import subprocess
import logging
+import sys
import arvados.commands._util as arv_cmd
+from arvados._version import __version__
+
logger = logging.getLogger('arvados.arv-run')
logger.setLevel(logging.INFO)
arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
-arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
-arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-run-pipeline-instance")
-arvrun_parser.add_argument('--docker-image', type=str, help="Docker image to use, otherwise use instance default.")
-arvrun_parser.add_argument('--ignore-rcode', action="store_true", help="Commands that return non-zero return codes should not be considered failed.")
-arvrun_parser.add_argument('--no-reuse', action="store_true", help="Do not reuse past jobs.")
-arvrun_parser.add_argument('--no-wait', action="store_true", help="Do not wait and display logs after submitting command, just exit.")
-arvrun_parser.add_argument('--project-uuid', type=str, help="Parent project of the pipeline")
-arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git repository passed to arv-crunch-job when using --local")
-arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of component, default 'arvados'")
-arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of component, default 'master'")
+arvrun_parser.add_argument('--dry-run', action="store_true",
+ help="Print out the pipeline that would be submitted and exit")
+arvrun_parser.add_argument('--local', action="store_true",
+ help="Run locally using arv-run-pipeline-instance")
+arvrun_parser.add_argument('--docker-image', type=str,
+ help="Docker image to use, otherwise use instance default.")
+arvrun_parser.add_argument('--ignore-rcode', action="store_true",
+ help="Commands that return non-zero return codes should not be considered failed.")
+arvrun_parser.add_argument('--no-reuse', action="store_true",
+ help="Do not reuse past jobs.")
+arvrun_parser.add_argument('--no-wait', action="store_true",
+ help="Do not wait and display logs after submitting command, just exit.")
+arvrun_parser.add_argument('--project-uuid', type=str,
+ help="Parent project of the pipeline")
+arvrun_parser.add_argument('--git-dir', type=str, default="",
+ help="Git repository passed to arv-crunch-job when using --local")
+arvrun_parser.add_argument('--repository', type=str, default="arvados",
+ help="repository field of component, default 'arvados'")
+arvrun_parser.add_argument('--script-version', type=str, default="master",
+ help="script_version field of component, default 'master'")
+arvrun_parser.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
class ArvFile(object):
import arvados
import json
from arvados.events import subscribe
+from arvados._version import __version__
import signal
def main(arguments=None):
logger = logging.getLogger('arvados.arv-ws')
parser = argparse.ArgumentParser()
+ parser.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
parser.add_argument('-u', '--uuid', type=str, default="", help="Filter events on object_uuid")
parser.add_argument('-f', '--filters', type=str, default="", help="Arvados query filter to apply to log events (JSON encoded)")
parser.add_argument('-s', '--start-time', type=str, default="", help="Arvados query filter to fetch log events created at or after this time. This will be server time in UTC. Allowed format: YYYY-MM-DD or YYYY-MM-DD hh:mm:ss")
import arvados
import arvados.commands._util as arv_cmd
+from arvados._version import __version__
+
logger = logging.getLogger('arvados.arv-get')
def abort(msg, code=1):
parser = argparse.ArgumentParser(
description='Copy data from Keep to a local file or pipe.',
parents=[arv_cmd.retry_opt])
+parser.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
parser.add_argument('locator', type=str,
help="""
Collection locator, optionally with a file path or prefix.
import string
import sys
+import arvados
+from arvados._version import __version__
+
parser = argparse.ArgumentParser(
description='Read manifest on standard input and put normalized manifest on standard output.')
-parser.add_argument('--extract', type=str, help="The file to extract from the input manifest")
-parser.add_argument('--strip', action='store_true', help="Strip authorization tokens")
+parser.add_argument('--extract', type=str,
+ help="The file to extract from the input manifest")
+parser.add_argument('--strip', action='store_true',
+ help="Strip authorization tokens")
+parser.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
args = parser.parse_args()
-import arvados
-
r = sys.stdin.read()
cr = arvados.CollectionReader(r)
'httplib2',
'pycurl >=7.19.5.1, <7.21.5',
'python-gflags<3.0',
+ 'setuptools',
'ws4py'
],
test_suite='tests',
#!/usr/bin/env python
import arvados
+import contextlib
import errno
import hashlib
import httplib
import pycurl
import Queue
import shutil
+import sys
import tempfile
import unittest
def str_keep_locator(s):
return '{}+{}'.format(hashlib.md5(s).hexdigest(), len(s))
+@contextlib.contextmanager
+def redirected_streams(stdout=None, stderr=None):
+ orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
+ orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
+ try:
+ yield
+ finally:
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
+
+
class FakeCurl:
@classmethod
def make(cls, code, body='', headers={}):
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import os
+import sys
+import tempfile
+import unittest
+
+import arvados.commands.arv_copy as arv_copy
+import arvados_testutil as tutil
+
+class ArvCopyTestCase(unittest.TestCase):
+ def run_copy(self, args):
+ sys.argv = ['arv-copy'] + args
+ return arv_copy.main()
+
+ def test_unsupported_arg(self):
+ with self.assertRaises(SystemExit):
+ self.run_copy(['-x=unknown'])
+
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with tutil.redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.run_copy(['--version'])
+ self.assertEqual(out.getvalue(), '')
+ self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import os
+import sys
+import tempfile
+import unittest
+
+import arvados.commands.keepdocker as arv_keepdocker
+import arvados_testutil as tutil
+
+
+class ArvKeepdockerTestCase(unittest.TestCase):
+ def run_arv_keepdocker(self, args):
+ sys.argv = ['arv-keepdocker'] + args
+ return arv_keepdocker.main()
+
+ def test_unsupported_arg(self):
+ with self.assertRaises(SystemExit):
+ self.run_arv_keepdocker(['-x=unknown'])
+
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with tutil.redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.run_arv_keepdocker(['--version'])
+ self.assertEqual(out.getvalue(), '')
+ self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
# -*- coding: utf-8 -*-
import io
+import os
import random
-
+import sys
import mock
+import tempfile
import arvados.errors as arv_error
import arvados.commands.ls as arv_ls
import run_test_server
-from arvados_testutil import str_keep_locator
+from arvados_testutil import str_keep_locator, redirected_streams
class ArvLsTestCase(run_test_server.TestCaseWithServers):
FAKE_UUID = 'zzzzz-4zz18-12345abcde12345'
arv_error.NotFoundError)
self.assertNotEqual(0, self.run_ls([self.FAKE_UUID], api_client))
self.assertNotEqual('', self.stderr.getvalue())
+
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.run_ls(['--version'], None)
+ self.assertEqual(out.getvalue(), '')
+ self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import subprocess
+import sys
+import tempfile
+import unittest
+
+
+class ArvNormalizeTestCase(unittest.TestCase):
+ def run_arv_normalize(self, args=[]):
+ p = subprocess.Popen([sys.executable, 'bin/arv-normalize'] + args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ (stdout, stderr) = p.communicate()
+ return p.returncode, stdout, stderr
+
+ def test_unsupported_arg(self):
+ returncode, out, err = self.run_arv_normalize(['-x=unknown'])
+ self.assertNotEqual(0, returncode)
+
+ def test_version_argument(self):
+ returncode, out, err = self.run_arv_normalize(['--version'])
+ self.assertEqual(0, returncode)
+ self.assertEqual('', out)
+ self.assertNotEqual('', err)
+ self.assertRegexpMatches(err, "[0-9]+\.[0-9]+\.[0-9]+")
# -*- coding: utf-8 -*-
import apiclient
+import io
import mock
import os
import pwd
delattr(self, outbuf)
super(ArvadosPutTest, self).tearDown()
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with tutil.redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.call_main_with_args(['--version'])
+ self.assertEqual(out.getvalue(), '')
+ self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
def test_simple_file_put(self):
self.call_main_on_test_file()
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import os
+import sys
+import tempfile
+import unittest
+
+import arvados.commands.run as arv_run
+import arvados_testutil as tutil
+
+class ArvRunTestCase(unittest.TestCase):
+ def run_arv_run(self, args):
+ sys.argv = ['arv-run'] + args
+ return arv_run.main()
+
+ def test_unsupported_arg(self):
+ with self.assertRaises(SystemExit):
+ self.run_arv_run(['-x=unknown'])
+
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with tutil.redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.run_arv_run(['--version'])
+ self.assertEqual(out.getvalue(), '')
+ self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
#!/usr/bin/env python
+import io
+import os
+import sys
+import tempfile
import unittest
+
import arvados.errors as arv_error
import arvados.commands.ws as arv_ws
+import arvados_testutil as tutil
class ArvWsTestCase(unittest.TestCase):
def run_ws(self, args):
def test_unsupported_arg(self):
with self.assertRaises(SystemExit):
self.run_ws(['-x=unknown'])
+
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with tutil.redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.run_ws(['--version'])
+ self.assertEqual(out.getvalue(), '')
+ self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
dataclass: Collection
title: "Foo/bar pair"
description: "Provide a collection containing at least two files."
+
+workflow_with_input_defaults:
+ uuid: zzzzz-p5p6p-aox0k0ofxrystg2
+ owner_uuid: zzzzz-j7d0g-v955i6s2oi1cbso
+ created_at: 2014-04-14 12:35:04 -0400
+ updated_at: 2014-04-14 12:35:04 -0400
+ modified_at: 2014-04-14 12:35:04 -0400
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ name: Pipeline with default input specifications
+ components:
+ part-one:
+ script: foo
+ script_version: master
+ script_parameters:
+ ex_string:
+ required: true
+ dataclass: string
+ ex_string_def:
+ required: true
+ dataclass: string
+ default: hello-testing-123
\ No newline at end of file
inputBinding:
position: 1
outputs: []
+
+workflow_with_input_defaults:
+ uuid: zzzzz-7fd4e-validwithinput2
+ owner_uuid: zzzzz-j7d0g-zhxawtyetzwc5f0
+ name: Workflow with default input specifications
+ description: this workflow has inputs specified
+ created_at: <%= 1.minute.ago.to_s(:db) %>
+ definition: |
+ cwlVersion: v1.0
+ class: CommandLineTool
+ baseCommand:
+ - echo
+ inputs:
+ - type: string
+ id: ex_string
+ - type: string
+ id: ex_string_def
+ default: hello-testing-123
+ outputs: []
b, _ := ioutil.ReadAll(stdoutReader)
stdoutReader.Close()
stdoutChan <- b
+ close(stdoutChan)
}()
stderrChan := make(chan []byte)
b, _ := ioutil.ReadAll(stderrReader)
stderrReader.Close()
stderrChan <- b
+ close(stderrChan)
}()
// Send a tiny script on stdin to execute the crunch-run command
io.WriteString(stdinWriter, execScript(append(crunchRunCommand, container.UUID)))
stdinWriter.Close()
- err = cmd.Wait()
-
stdoutMsg := <-stdoutChan
stderrmsg := <-stderrChan
- close(stdoutChan)
- close(stderrChan)
+ err = cmd.Wait()
if err != nil {
submitErr = fmt.Errorf("Container submission failed: %v: %v (stderr: %q)", cmd.Args, err, stderrmsg)
// Mutex between squeue sync and running sbatch or scancel.
squeueUpdater.SlurmLock.Lock()
- err := scancelCmd(container).Run()
+ cmd := scancelCmd(container)
+ msg, err := cmd.CombinedOutput()
squeueUpdater.SlurmLock.Unlock()
if err != nil {
- log.Printf("Error stopping container %s with scancel: %v",
- container.UUID, err)
+ log.Printf("Error stopping container %s with %v %v: %v %v",
+ container.UUID, cmd.Path, cmd.Args, err, string(msg))
if squeueUpdater.CheckSqueue(container.UUID) {
log.Printf("Container %s is still in squeue after scancel.",
container.UUID)
return exec.Command("echo")
}
- container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
+ container := s.integrationTest(c,
+ func() *exec.Cmd { return exec.Command("echo", "zzzzz-dz642-queuedcontainer") },
[]string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
}(squeueCmd)
squeueCmd = newSqueueCmd
- // There should be no queued containers now
+ // There should be one queued container
params := arvadosclient.Dict{
"filters": [][]string{{"state", "=", "Queued"}},
}
import (
"bufio"
+ "io"
+ "io/ioutil"
"log"
"os/exec"
"sync"
log.Printf("Error creating stdout pipe for squeue: %v", err)
return
}
+
+ stderrReader, err := cmd.StderrPipe()
+ if err != nil {
+ log.Printf("Error creating stderr pipe for squeue: %v", err)
+ return
+ }
+
err = cmd.Start()
if err != nil {
log.Printf("Error running squeue: %v", err)
return
}
+
+ stderrChan := make(chan []byte)
+ go func() {
+ b, _ := ioutil.ReadAll(stderrReader)
+ stderrChan <- b
+ close(stderrChan)
+ }()
+
scanner := bufio.NewScanner(sq)
for scanner.Scan() {
newSqueueContents = append(newSqueueContents, scanner.Text())
}
- if err := scanner.Err(); err != nil {
- cmd.Wait()
- log.Printf("Error reading from squeue pipe: %v", err)
- return
- }
+ io.Copy(ioutil.Discard, sq)
+
+ stderrmsg := <-stderrChan
err = cmd.Wait()
+
+ if scanner.Err() != nil {
+ log.Printf("Error reading from squeue pipe: %v", err)
+ }
if err != nil {
- log.Printf("Error running squeue: %v", err)
- return
+ log.Printf("Error running %v %v: %v %q", cmd.Path, cmd.Args, err, string(stderrmsg))
}
- squeue.squeueCond.L.Lock()
- squeue.squeueContents = newSqueueContents
- squeue.squeueCond.Broadcast()
- squeue.squeueCond.L.Unlock()
+ if scanner.Err() == nil && err == nil {
+ squeue.squeueCond.L.Lock()
+ squeue.squeueContents = newSqueueContents
+ squeue.squeueCond.Broadcast()
+ squeue.squeueCond.L.Unlock()
+ }
}
// CheckSqueue checks if a given container UUID is in the slurm queue. This
checkErr(err)
if runner.finalState == "Queued" {
+ runner.CrunchLog.Close()
runner.UpdateContainerFinal()
return
}
// check for and/or load image
err = runner.LoadImage()
if err != nil {
+ runner.finalState = "Cancelled"
err = fmt.Errorf("While loading container image: %v", err)
return
}
// set up FUSE mount and binds
err = runner.SetupMounts()
if err != nil {
+ runner.finalState = "Cancelled"
err = fmt.Errorf("While setting up mounts: %v", err)
return
}
+++ /dev/null
-// Deals with parsing Collection responses from API Server.
-
-package collection
-
-import (
- "flag"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/sdk/go/logger"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
- "git.curoverse.com/arvados.git/sdk/go/util"
- "log"
- "os"
- "runtime/pprof"
- "time"
-)
-
-var (
- HeapProfileFilename string
-)
-
-// Collection representation
-type Collection struct {
- UUID string
- OwnerUUID string
- ReplicationLevel int
- BlockDigestToSize map[blockdigest.BlockDigest]int
- TotalSize int
-}
-
-// ReadCollections holds information about collections from API server
-type ReadCollections struct {
- ReadAllCollections bool
- UUIDToCollection map[string]Collection
- OwnerToCollectionSize map[string]int
- BlockToDesiredReplication map[blockdigest.DigestWithSize]int
- CollectionUUIDToIndex map[string]int
- CollectionIndexToUUID []string
- BlockToCollectionIndices map[blockdigest.DigestWithSize][]int
-}
-
-// GetCollectionsParams params
-type GetCollectionsParams struct {
- Client *arvadosclient.ArvadosClient
- Logger *logger.Logger
- BatchSize int
-}
-
-// SdkCollectionInfo holds collection info from api
-type SdkCollectionInfo struct {
- UUID string `json:"uuid"`
- OwnerUUID string `json:"owner_uuid"`
- ReplicationDesired int `json:"replication_desired"`
- ModifiedAt time.Time `json:"modified_at"`
- ManifestText string `json:"manifest_text"`
-}
-
-// SdkCollectionList lists collections from api
-type SdkCollectionList struct {
- ItemsAvailable int `json:"items_available"`
- Items []SdkCollectionInfo `json:"items"`
-}
-
-func init() {
- flag.StringVar(&HeapProfileFilename,
- "heap-profile",
- "",
- "File to write the heap profiles to. Leave blank to skip profiling.")
-}
-
-// WriteHeapProfile writes the heap profile to a file for later review.
-// Since a file is expected to only contain a single heap profile this
-// function overwrites the previously written profile, so it is safe
-// to call multiple times in a single run.
-// Otherwise we would see cumulative numbers as explained here:
-// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() error {
- if HeapProfileFilename != "" {
- heapProfile, err := os.Create(HeapProfileFilename)
- if err != nil {
- return err
- }
-
- defer heapProfile.Close()
-
- err = pprof.WriteHeapProfile(heapProfile)
- return err
- }
-
- return nil
-}
-
-// GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections, err error) {
- results, err = GetCollections(params)
- if err != nil {
- return
- }
-
- results.Summarize(params.Logger)
-
- log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
- log.Printf("Read and processed %d collections",
- len(results.UUIDToCollection))
-
- // TODO(misha): Add a "readonly" flag. If we're in readonly mode,
- // lots of behaviors can become warnings (and obviously we can't
- // write anything).
- // if !readCollections.ReadAllCollections {
- // log.Fatalf("Did not read all collections")
- // }
-
- return
-}
-
-// GetCollections gets collections from api
-func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
-	if &params.Client == nil {
- err = fmt.Errorf("params.Client passed to GetCollections() should " +
- "contain a valid ArvadosClient, but instead it is nil.")
- return
- }
-
- fieldsWanted := []string{"manifest_text",
- "owner_uuid",
- "uuid",
- "replication_desired",
- "modified_at"}
-
- sdkParams := arvadosclient.Dict{
- "select": fieldsWanted,
- "order": []string{"modified_at ASC", "uuid ASC"},
- "filters": [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
- "offset": 0}
-
- if params.BatchSize > 0 {
- sdkParams["limit"] = params.BatchSize
- }
-
- var defaultReplicationLevel int
- {
- var value interface{}
- value, err = params.Client.Discovery("defaultCollectionReplication")
- if err != nil {
- return
- }
-
- defaultReplicationLevel = int(value.(float64))
- if defaultReplicationLevel <= 0 {
- err = fmt.Errorf("Default collection replication returned by arvados SDK "+
- "should be a positive integer but instead it was %d.",
- defaultReplicationLevel)
- return
- }
- }
-
- initialNumberOfCollectionsAvailable, err :=
- util.NumberItemsAvailable(params.Client, "collections")
- if err != nil {
- return
- }
- // Include a 1% margin for collections added while we're reading so
- // that we don't have to grow the map in most cases.
- maxExpectedCollections := int(
- float64(initialNumberOfCollectionsAvailable) * 1.01)
- results.UUIDToCollection = make(map[string]Collection, maxExpectedCollections)
-
- if params.Logger != nil {
- params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := logger.GetOrCreateMap(p, "collection_info")
- collectionInfo["num_collections_at_start"] = initialNumberOfCollectionsAvailable
- collectionInfo["batch_size"] = params.BatchSize
- collectionInfo["default_replication_level"] = defaultReplicationLevel
- })
- }
-
- // These values are just for getting the loop to run the first time,
- // afterwards they'll be set to real values.
- remainingCollections := 1
- var totalCollections int
- var previousTotalCollections int
- for remainingCollections > 0 {
- // We're still finding new collections
-
- // Write the heap profile for examining memory usage
- err = WriteHeapProfile()
- if err != nil {
- return
- }
-
- // Get next batch of collections.
- var collections SdkCollectionList
- err = params.Client.List("collections", sdkParams, &collections)
- if err != nil {
- return
- }
- batchCollections := len(collections.Items)
-
- // We must always have at least one collection in the batch
- if batchCollections < 1 {
- err = fmt.Errorf("API query returned no collections for %+v", sdkParams)
- return
- }
-
- // Update count of remaining collections
- remainingCollections = collections.ItemsAvailable - sdkParams["offset"].(int) - batchCollections
-
- // Process collection and update our date filter.
- latestModificationDate, maxManifestSize, totalManifestSize, err := ProcessCollections(params.Logger,
- collections.Items,
- defaultReplicationLevel,
- results.UUIDToCollection)
- if err != nil {
- return results, err
- }
- if sdkParams["filters"].([][]string)[0][2] != latestModificationDate.Format(time.RFC3339) {
- sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
- sdkParams["offset"] = 0
- } else {
- sdkParams["offset"] = sdkParams["offset"].(int) + batchCollections
- }
-
- // update counts
- previousTotalCollections = totalCollections
- totalCollections = len(results.UUIDToCollection)
-
- log.Printf("%d collections read, %d (%d new) in last batch, "+
- "%d remaining, "+
- "%s latest modified date, %.0f %d %d avg,max,total manifest size",
- totalCollections,
- batchCollections,
- totalCollections-previousTotalCollections,
- remainingCollections,
- sdkParams["filters"].([][]string)[0][2],
- float32(totalManifestSize)/float32(totalCollections),
- maxManifestSize, totalManifestSize)
-
- if params.Logger != nil {
- params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := logger.GetOrCreateMap(p, "collection_info")
- collectionInfo["collections_read"] = totalCollections
- collectionInfo["latest_modified_date_seen"] = sdkParams["filters"].([][]string)[0][2]
- collectionInfo["total_manifest_size"] = totalManifestSize
- collectionInfo["max_manifest_size"] = maxManifestSize
- })
- }
- }
-
- // Make one final API request to verify that we have processed all collections available up to the latest modification date
- var collections SdkCollectionList
- sdkParams["filters"].([][]string)[0][1] = "<="
- sdkParams["limit"] = 0
- err = params.Client.List("collections", sdkParams, &collections)
- if err != nil {
- return
- }
- finalNumberOfCollectionsAvailable, err :=
- util.NumberItemsAvailable(params.Client, "collections")
- if err != nil {
- return
- }
- if totalCollections < finalNumberOfCollectionsAvailable {
- err = fmt.Errorf("API server indicates a total of %d collections "+
- "available up to %v, but we only retrieved %d. "+
- "Refusing to continue as this could indicate an "+
- "otherwise undetected failure.",
- finalNumberOfCollectionsAvailable,
- sdkParams["filters"].([][]string)[0][2],
- totalCollections)
- return
- }
-
- // Write the heap profile for examining memory usage
- err = WriteHeapProfile()
-
- return
-}
-
-// StrCopy returns a newly allocated string.
-// It is useful to copy slices so that the garbage collector can reuse
-// the memory of the longer strings they came from.
-func StrCopy(s string) string {
- return string([]byte(s))
-}
-
-// ProcessCollections read from api server
-func ProcessCollections(arvLogger *logger.Logger,
- receivedCollections []SdkCollectionInfo,
- defaultReplicationLevel int,
- UUIDToCollection map[string]Collection,
-) (
- latestModificationDate time.Time,
- maxManifestSize, totalManifestSize uint64,
- err error,
-) {
- for _, sdkCollection := range receivedCollections {
- collection := Collection{UUID: StrCopy(sdkCollection.UUID),
- OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
- ReplicationLevel: sdkCollection.ReplicationDesired,
- BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
-
- if sdkCollection.ModifiedAt.IsZero() {
- err = fmt.Errorf(
- "Arvados SDK collection returned with unexpected zero "+
- "modification date. This probably means that either we failed to "+
- "parse the modification date or the API server has changed how "+
- "it returns modification dates: %+v",
- collection)
- return
- }
-
- if sdkCollection.ModifiedAt.After(latestModificationDate) {
- latestModificationDate = sdkCollection.ModifiedAt
- }
-
- if collection.ReplicationLevel == 0 {
- collection.ReplicationLevel = defaultReplicationLevel
- }
-
- manifest := manifest.Manifest{Text: sdkCollection.ManifestText}
- manifestSize := uint64(len(sdkCollection.ManifestText))
-
- if _, alreadySeen := UUIDToCollection[collection.UUID]; !alreadySeen {
- totalManifestSize += manifestSize
- }
- if manifestSize > maxManifestSize {
- maxManifestSize = manifestSize
- }
-
- blockChannel := manifest.BlockIterWithDuplicates()
- for block := range blockChannel {
- if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
- log.Printf(
- "Collection %s contains multiple sizes (%d and %d) for block %s",
- collection.UUID,
- storedSize,
- block.Size,
- block.Digest)
- }
- collection.BlockDigestToSize[block.Digest] = block.Size
- }
- if manifest.Err != nil {
- err = manifest.Err
- return
- }
-
- collection.TotalSize = 0
- for _, size := range collection.BlockDigestToSize {
- collection.TotalSize += size
- }
- UUIDToCollection[collection.UUID] = collection
-
- // Clear out all the manifest strings that we don't need anymore.
- // These hopefully form the bulk of our memory usage.
- manifest.Text = ""
- sdkCollection.ManifestText = ""
- }
-
- return
-}
-
-// Summarize the collections read
-func (readCollections *ReadCollections) Summarize(arvLogger *logger.Logger) {
- readCollections.OwnerToCollectionSize = make(map[string]int)
- readCollections.BlockToDesiredReplication = make(map[blockdigest.DigestWithSize]int)
- numCollections := len(readCollections.UUIDToCollection)
- readCollections.CollectionUUIDToIndex = make(map[string]int, numCollections)
- readCollections.CollectionIndexToUUID = make([]string, 0, numCollections)
- readCollections.BlockToCollectionIndices = make(map[blockdigest.DigestWithSize][]int)
-
- for _, coll := range readCollections.UUIDToCollection {
- collectionIndex := len(readCollections.CollectionIndexToUUID)
- readCollections.CollectionIndexToUUID =
- append(readCollections.CollectionIndexToUUID, coll.UUID)
- readCollections.CollectionUUIDToIndex[coll.UUID] = collectionIndex
-
- readCollections.OwnerToCollectionSize[coll.OwnerUUID] =
- readCollections.OwnerToCollectionSize[coll.OwnerUUID] + coll.TotalSize
-
- for block, size := range coll.BlockDigestToSize {
- locator := blockdigest.DigestWithSize{Digest: block, Size: uint32(size)}
- readCollections.BlockToCollectionIndices[locator] =
- append(readCollections.BlockToCollectionIndices[locator],
- collectionIndex)
- storedReplication := readCollections.BlockToDesiredReplication[locator]
- if coll.ReplicationLevel > storedReplication {
- readCollections.BlockToDesiredReplication[locator] =
- coll.ReplicationLevel
- }
- }
- }
-
- if arvLogger != nil {
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- collectionInfo := logger.GetOrCreateMap(p, "collection_info")
- // Since maps are shallow copied, we run a risk of concurrent
- // updates here. By copying results.OwnerToCollectionSize into
- // the log, we're assuming that it won't be updated.
- collectionInfo["owner_to_collection_size"] =
- readCollections.OwnerToCollectionSize
- collectionInfo["distinct_blocks_named"] =
- len(readCollections.BlockToDesiredReplication)
- })
- }
-
- return
-}
+++ /dev/null
-package collection
-
-import (
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- . "gopkg.in/check.v1"
- "net/http"
- "net/http/httptest"
- "testing"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) {
- TestingT(t)
-}
-
-type MySuite struct{}
-
-var _ = Suite(&MySuite{})
-
-// This captures the result we expect from
-// ReadCollections.Summarize(). Because CollectionUUIDToIndex is
-// indeterminate, we replace BlockToCollectionIndices with
-// BlockToCollectionUuids.
-type ExpectedSummary struct {
- OwnerToCollectionSize map[string]int
- BlockToDesiredReplication map[blockdigest.DigestWithSize]int
- BlockToCollectionUuids map[blockdigest.DigestWithSize][]string
-}
-
-func CompareSummarizedReadCollections(c *C,
- summarized ReadCollections,
- expected ExpectedSummary) {
-
- c.Assert(summarized.OwnerToCollectionSize, DeepEquals,
- expected.OwnerToCollectionSize)
-
- c.Assert(summarized.BlockToDesiredReplication, DeepEquals,
- expected.BlockToDesiredReplication)
-
- summarizedBlockToCollectionUuids :=
- make(map[blockdigest.DigestWithSize]map[string]struct{})
- for digest, indices := range summarized.BlockToCollectionIndices {
- uuidSet := make(map[string]struct{})
- summarizedBlockToCollectionUuids[digest] = uuidSet
- for _, index := range indices {
- uuidSet[summarized.CollectionIndexToUUID[index]] = struct{}{}
- }
- }
-
- expectedBlockToCollectionUuids :=
- make(map[blockdigest.DigestWithSize]map[string]struct{})
- for digest, uuidSlice := range expected.BlockToCollectionUuids {
- uuidSet := make(map[string]struct{})
- expectedBlockToCollectionUuids[digest] = uuidSet
- for _, uuid := range uuidSlice {
- uuidSet[uuid] = struct{}{}
- }
- }
-
- c.Assert(summarizedBlockToCollectionUuids, DeepEquals,
- expectedBlockToCollectionUuids)
-}
-
-func (s *MySuite) TestSummarizeSimple(checker *C) {
- rc := MakeTestReadCollections([]TestCollectionSpec{{
- ReplicationLevel: 5,
- Blocks: []int{1, 2},
- }})
-
- rc.Summarize(nil)
-
- c := rc.UUIDToCollection["col0"]
-
- blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
- blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
-
- expected := ExpectedSummary{
- OwnerToCollectionSize: map[string]int{c.OwnerUUID: c.TotalSize},
- BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{blockDigest1: 5, blockDigest2: 5},
- BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{blockDigest1: {c.UUID}, blockDigest2: {c.UUID}},
- }
-
- CompareSummarizedReadCollections(checker, rc, expected)
-}
-
-func (s *MySuite) TestSummarizeOverlapping(checker *C) {
- rc := MakeTestReadCollections([]TestCollectionSpec{
- {
- ReplicationLevel: 5,
- Blocks: []int{1, 2},
- },
- {
- ReplicationLevel: 8,
- Blocks: []int{2, 3},
- },
- })
-
- rc.Summarize(nil)
-
- c0 := rc.UUIDToCollection["col0"]
- c1 := rc.UUIDToCollection["col1"]
-
- blockDigest1 := blockdigest.MakeTestDigestWithSize(1)
- blockDigest2 := blockdigest.MakeTestDigestWithSize(2)
- blockDigest3 := blockdigest.MakeTestDigestWithSize(3)
-
- expected := ExpectedSummary{
- OwnerToCollectionSize: map[string]int{
- c0.OwnerUUID: c0.TotalSize,
- c1.OwnerUUID: c1.TotalSize,
- },
- BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{
- blockDigest1: 5,
- blockDigest2: 8,
- blockDigest3: 8,
- },
- BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{
- blockDigest1: {c0.UUID},
- blockDigest2: {c0.UUID, c1.UUID},
- blockDigest3: {c1.UUID},
- },
- }
-
- CompareSummarizedReadCollections(checker, rc, expected)
-}
-
-type APITestData struct {
- // path and response map
- responses map[string]arvadostest.StubResponse
-
- // expected error, if any
- expectedError string
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_DiscoveryError(c *C) {
- testGetCollectionsAndSummarize(c,
- APITestData{
- responses: make(map[string]arvadostest.StubResponse),
- expectedError: "arvados API server error: 500.*",
- })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_ApiErrorGetCollections(c *C) {
- respMap := make(map[string]arvadostest.StubResponse)
- respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
- respMap["/arvados/v1/collections"] = arvadostest.StubResponse{-1, ``}
-
- testGetCollectionsAndSummarize(c,
- APITestData{
- responses: respMap,
- expectedError: "arvados API server error: 302.*",
- })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadStreamName(c *C) {
- respMap := make(map[string]arvadostest.StubResponse)
- respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
- respMap["/arvados/v1/collections"] = arvadostest.StubResponse{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"badstreamname"}]}`}
-
- testGetCollectionsAndSummarize(c,
- APITestData{
- responses: respMap,
- expectedError: "Invalid stream name: badstreamname",
- })
-}
-
-func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadFileToken(c *C) {
- respMap := make(map[string]arvadostest.StubResponse)
- respMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StubResponse{200, `{"defaultCollectionReplication":2}`}
- respMap["/arvados/v1/collections"] = arvadostest.StubResponse{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"./goodstream acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt file2.txt"}]}`}
-
- testGetCollectionsAndSummarize(c,
- APITestData{
- responses: respMap,
- expectedError: "Invalid file token: file2.txt",
- })
-}
-
-func testGetCollectionsAndSummarize(c *C, testData APITestData) {
- apiStub := arvadostest.ServerStub{testData.responses}
-
- api := httptest.NewServer(&apiStub)
- defer api.Close()
-
- arv := &arvadosclient.ArvadosClient{
- Scheme: "http",
- ApiServer: api.URL[7:],
- ApiToken: "abc123",
- Client: &http.Client{Transport: &http.Transport{}},
- }
-
- // GetCollectionsAndSummarize
- _, err := GetCollectionsAndSummarize(GetCollectionsParams{arv, nil, 10})
-
- if testData.expectedError == "" {
- c.Assert(err, IsNil)
- } else {
- c.Assert(err, ErrorMatches, testData.expectedError)
- }
-}
+++ /dev/null
-// Code used for testing only.
-
-package collection
-
-import (
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-)
-
-// TestCollectionSpec with test blocks and desired replication level
-type TestCollectionSpec struct {
- // The desired replication level
- ReplicationLevel int
- // Blocks this contains, represented by ints. Ints repeated will
- // still only represent one block
- Blocks []int
-}
-
-// MakeTestReadCollections creates a ReadCollections object for testing
-// based on the give specs. Only the ReadAllCollections and UUIDToCollection
-// fields are populated. To populate other fields call rc.Summarize().
-func MakeTestReadCollections(specs []TestCollectionSpec) (rc ReadCollections) {
- rc = ReadCollections{
- ReadAllCollections: true,
- UUIDToCollection: map[string]Collection{},
- }
-
- for i, spec := range specs {
- c := Collection{
- UUID: fmt.Sprintf("col%d", i),
- OwnerUUID: fmt.Sprintf("owner%d", i),
- ReplicationLevel: spec.ReplicationLevel,
- BlockDigestToSize: map[blockdigest.BlockDigest]int{},
- }
- rc.UUIDToCollection[c.UUID] = c
- for _, j := range spec.Blocks {
- c.BlockDigestToSize[blockdigest.MakeTestBlockDigest(j)] = j
- }
- // We compute the size in a separate loop because the value
- // computed in the above loop would be invalid if c.Blocks
- // contained duplicates.
- for _, size := range c.BlockDigestToSize {
- c.TotalSize += size
- }
- }
- return
-}
-
-// CollectionIndicesForTesting returns a slice giving the collection
-// index of each collection that was passed in to MakeTestReadCollections.
-// rc.Summarize() must be called before this method, since Summarize()
-// assigns an index to each collection.
-func (rc ReadCollections) CollectionIndicesForTesting() (indices []int) {
- // TODO(misha): Assert that rc.Summarize() has been called.
- numCollections := len(rc.CollectionIndexToUUID)
- indices = make([]int, numCollections)
- for i := 0; i < numCollections; i++ {
- indices[i] = rc.CollectionUUIDToIndex[fmt.Sprintf("col%d", i)]
- }
- return
-}
+++ /dev/null
-/* Keep Datamanager. Responsible for checking on and reporting on Keep Storage */
-
-package main
-
-import (
- "errors"
- "flag"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/logger"
- "git.curoverse.com/arvados.git/sdk/go/util"
- "git.curoverse.com/arvados.git/services/datamanager/collection"
- "git.curoverse.com/arvados.git/services/datamanager/keep"
- "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
- "git.curoverse.com/arvados.git/services/datamanager/summary"
- "log"
- "time"
-)
-
-var (
- logEventTypePrefix string
- logFrequencySeconds int
- minutesBetweenRuns int
- collectionBatchSize int
- dryRun bool
-)
-
-func init() {
- flag.StringVar(&logEventTypePrefix,
- "log-event-type-prefix",
- "experimental-data-manager",
- "Prefix to use in the event_type of our arvados log entries. Set to empty to turn off logging")
- flag.IntVar(&logFrequencySeconds,
- "log-frequency-seconds",
- 20,
- "How frequently we'll write log entries in seconds.")
- flag.IntVar(&minutesBetweenRuns,
- "minutes-between-runs",
- 0,
- "How many minutes we wait between data manager runs. 0 means run once and exit.")
- flag.IntVar(&collectionBatchSize,
- "collection-batch-size",
- 1000,
- "How many collections to request in each batch.")
- flag.BoolVar(&dryRun,
- "dry-run",
- false,
- "Perform a dry run. Log how many blocks would be deleted/moved, but do not issue any changes to keepstore.")
-}
-
-func main() {
- flag.Parse()
-
- if minutesBetweenRuns == 0 {
- arv, err := arvadosclient.MakeArvadosClient()
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
- }
- err = singlerun(arv)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("singlerun: %v", err))
- }
- } else {
- waitTime := time.Minute * time.Duration(minutesBetweenRuns)
- for {
- log.Println("Beginning Run")
- arv, err := arvadosclient.MakeArvadosClient()
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error making arvados client: %v", err))
- }
- err = singlerun(arv)
- if err != nil {
- log.Printf("singlerun: %v", err)
- }
- log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
- time.Sleep(waitTime)
- }
- }
-}
-
-var arvLogger *logger.Logger
-
-func singlerun(arv *arvadosclient.ArvadosClient) error {
- var err error
- if isAdmin, err := util.UserIsAdmin(arv); err != nil {
- return errors.New("Error verifying admin token: " + err.Error())
- } else if !isAdmin {
- return errors.New("Current user is not an admin. Datamanager requires a privileged token.")
- }
-
- if logEventTypePrefix != "" {
- arvLogger, err = logger.NewLogger(logger.LoggerParams{
- Client: arv,
- EventTypePrefix: logEventTypePrefix,
- WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
- }
-
- loggerutil.LogRunInfo(arvLogger)
- if arvLogger != nil {
- arvLogger.AddWriteHook(loggerutil.LogMemoryAlloc)
- }
-
- var (
- dataFetcher summary.DataFetcher
- readCollections collection.ReadCollections
- keepServerInfo keep.ReadServers
- )
-
- if summary.ShouldReadData() {
- dataFetcher = summary.ReadData
- } else {
- dataFetcher = BuildDataFetcher(arv)
- }
-
- err = dataFetcher(arvLogger, &readCollections, &keepServerInfo)
- if err != nil {
- return err
- }
-
- err = summary.MaybeWriteData(arvLogger, readCollections, keepServerInfo)
- if err != nil {
- return err
- }
-
- buckets := summary.BucketReplication(readCollections, keepServerInfo)
- bucketCounts := buckets.Counts()
-
- replicationSummary := buckets.SummarizeBuckets(readCollections)
- replicationCounts := replicationSummary.ComputeCounts()
-
- log.Printf("Blocks In Collections: %d, "+
- "\nBlocks In Keep: %d.",
- len(readCollections.BlockToDesiredReplication),
- len(keepServerInfo.BlockToServers))
- log.Println(replicationCounts.PrettyPrint())
-
- log.Printf("Blocks Histogram:")
- for _, rlbss := range bucketCounts {
- log.Printf("%+v: %10d",
- rlbss.Levels,
- rlbss.Count)
- }
-
- kc, err := keepclient.MakeKeepClient(arv)
- if err != nil {
- return fmt.Errorf("Error setting up keep client %v", err.Error())
- }
-
- // Log that we're finished. We force the recording, since go will
- // not wait for the write timer before exiting.
- if arvLogger != nil {
- defer arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
- summaryInfo := logger.GetOrCreateMap(p, "summary_info")
- summaryInfo["block_replication_counts"] = bucketCounts
- summaryInfo["replication_summary"] = replicationCounts
- p["summary_info"] = summaryInfo
-
- p["run_info"].(map[string]interface{})["finished_at"] = time.Now()
- })
- }
-
- pullServers := summary.ComputePullServers(kc,
- &keepServerInfo,
- readCollections.BlockToDesiredReplication,
- replicationSummary.UnderReplicatedBlocks)
-
- pullLists := summary.BuildPullLists(pullServers)
-
- trashLists, trashErr := summary.BuildTrashLists(kc,
- &keepServerInfo,
- replicationSummary.KeepBlocksNotInCollections)
-
- err = summary.WritePullLists(arvLogger, pullLists, dryRun)
- if err != nil {
- return err
- }
-
- if trashErr != nil {
- return err
- }
- keep.SendTrashLists(arvLogger, kc, trashLists, dryRun)
-
- return nil
-}
-
-// BuildDataFetcher returns a data fetcher that fetches data from remote servers.
-func BuildDataFetcher(arv *arvadosclient.ArvadosClient) summary.DataFetcher {
- return func(
- arvLogger *logger.Logger,
- readCollections *collection.ReadCollections,
- keepServerInfo *keep.ReadServers,
- ) error {
- collDone := make(chan struct{})
- var collErr error
- go func() {
- *readCollections, collErr = collection.GetCollectionsAndSummarize(
- collection.GetCollectionsParams{
- Client: arv,
- Logger: arvLogger,
- BatchSize: collectionBatchSize})
- collDone <- struct{}{}
- }()
-
- var keepErr error
- *keepServerInfo, keepErr = keep.GetKeepServersAndSummarize(
- keep.GetKeepServersParams{
- Client: arv,
- Logger: arvLogger,
- Limit: 1000})
-
- <-collDone
-
- // Return a nil error only if both parts succeeded.
- if collErr != nil {
- return collErr
- }
- return keepErr
- }
-}
+++ /dev/null
-package main
-
-import (
- "encoding/json"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/services/datamanager/collection"
- "git.curoverse.com/arvados.git/services/datamanager/summary"
- "io/ioutil"
- "net/http"
- "os"
- "os/exec"
- "path"
- "regexp"
- "strings"
- "testing"
- "time"
-)
-
-var arv *arvadosclient.ArvadosClient
-var keepClient *keepclient.KeepClient
-var keepServers []string
-
-func SetupDataManagerTest(t *testing.T) {
- os.Setenv("ARVADOS_API_HOST_INSECURE", "true")
-
- // start api and keep servers
- arvadostest.ResetEnv()
- arvadostest.StartAPI()
- arvadostest.StartKeep(2, false)
-
- var err error
- arv, err = arvadosclient.MakeArvadosClient()
- if err != nil {
- t.Fatalf("Error making arvados client: %s", err)
- }
- arv.ApiToken = arvadostest.DataManagerToken
-
- // keep client
- keepClient = &keepclient.KeepClient{
- Arvados: arv,
- Want_replicas: 2,
- Client: &http.Client{},
- }
-
- // discover keep services
- if err = keepClient.DiscoverKeepServers(); err != nil {
- t.Fatalf("Error discovering keep services: %s", err)
- }
- keepServers = []string{}
- for _, host := range keepClient.LocalRoots() {
- keepServers = append(keepServers, host)
- }
-}
-
-func TearDownDataManagerTest(t *testing.T) {
- arvadostest.StopKeep(2)
- arvadostest.StopAPI()
- summary.WriteDataTo = ""
- collection.HeapProfileFilename = ""
-}
-
-func putBlock(t *testing.T, data string) string {
- locator, _, err := keepClient.PutB([]byte(data))
- if err != nil {
- t.Fatalf("Error putting test data for %s %s %v", data, locator, err)
- }
- if locator == "" {
- t.Fatalf("No locator found after putting test data")
- }
-
- splits := strings.Split(locator, "+")
- return splits[0] + "+" + splits[1]
-}
-
-func getBlock(t *testing.T, locator string, data string) {
- reader, blocklen, _, err := keepClient.Get(locator)
- if err != nil {
- t.Fatalf("Error getting test data in setup for %s %s %v", data, locator, err)
- }
- if reader == nil {
- t.Fatalf("No reader found after putting test data")
- }
- if blocklen != int64(len(data)) {
- t.Fatalf("blocklen %d did not match data len %d", blocklen, len(data))
- }
-
- all, err := ioutil.ReadAll(reader)
- if string(all) != data {
- t.Fatalf("Data read %s did not match expected data %s", string(all), data)
- }
-}
-
-// Create a collection using arv-put
-func createCollection(t *testing.T, data string) string {
- tempfile, err := ioutil.TempFile(os.TempDir(), "temp-test-file")
- defer os.Remove(tempfile.Name())
-
- _, err = tempfile.Write([]byte(data))
- if err != nil {
- t.Fatalf("Error writing to tempfile %v", err)
- }
-
- // arv-put
- output, err := exec.Command("arv-put", "--use-filename", "test.txt", tempfile.Name()).Output()
- if err != nil {
- t.Fatalf("Error running arv-put %s", err)
- }
-
- uuid := string(output[0:27]) // trim terminating char
- return uuid
-}
-
-// Get collection locator
-var locatorMatcher = regexp.MustCompile(`^([0-9a-f]{32})\+(\d*)(.*)$`)
-
-func getFirstLocatorFromCollection(t *testing.T, uuid string) string {
- manifest := getCollection(t, uuid)["manifest_text"].(string)
-
- locator := strings.Split(manifest, " ")[1]
- match := locatorMatcher.FindStringSubmatch(locator)
- if match == nil {
- t.Fatalf("No locator found in collection manifest %s", manifest)
- }
-
- return match[1] + "+" + match[2]
-}
-
-func switchToken(t string) func() {
- orig := arv.ApiToken
- restore := func() {
- arv.ApiToken = orig
- }
- arv.ApiToken = t
- return restore
-}
-
-func getCollection(t *testing.T, uuid string) Dict {
- defer switchToken(arvadostest.AdminToken)()
-
- getback := make(Dict)
- err := arv.Get("collections", uuid, nil, &getback)
- if err != nil {
- t.Fatalf("Error getting collection %s", err)
- }
- if getback["uuid"] != uuid {
- t.Fatalf("Get collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
- }
-
- return getback
-}
-
-func updateCollection(t *testing.T, uuid string, paramName string, paramValue string) {
- defer switchToken(arvadostest.AdminToken)()
-
- err := arv.Update("collections", uuid, arvadosclient.Dict{
- "collection": arvadosclient.Dict{
- paramName: paramValue,
- },
- }, &arvadosclient.Dict{})
-
- if err != nil {
- t.Fatalf("Error updating collection %s", err)
- }
-}
-
-type Dict map[string]interface{}
-
-func deleteCollection(t *testing.T, uuid string) {
- defer switchToken(arvadostest.AdminToken)()
-
- getback := make(Dict)
- err := arv.Delete("collections", uuid, nil, &getback)
- if err != nil {
- t.Fatalf("Error deleting collection %s", err)
- }
- if getback["uuid"] != uuid {
- t.Fatalf("Delete collection uuid did not match original: $s, result: $s", uuid, getback["uuid"])
- }
-}
-
-func dataManagerSingleRun(t *testing.T) {
- err := singlerun(arv)
- if err != nil {
- t.Fatalf("Error during singlerun %s", err)
- }
-}
-
-func getBlockIndexesForServer(t *testing.T, i int) []string {
- var indexes []string
-
- path := keepServers[i] + "/index"
- client := http.Client{}
- req, err := http.NewRequest("GET", path, nil)
- req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
- req.Header.Add("Content-Type", "application/octet-stream")
- resp, err := client.Do(req)
- defer resp.Body.Close()
-
- if err != nil {
- t.Fatalf("Error during %s %s", path, err)
- }
-
- body, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- t.Fatalf("Error reading response from %s %s", path, err)
- }
-
- lines := strings.Split(string(body), "\n")
- for _, line := range lines {
- indexes = append(indexes, strings.Split(line, " ")...)
- }
-
- return indexes
-}
-
-func getBlockIndexes(t *testing.T) [][]string {
- var indexes [][]string
-
- for i := 0; i < len(keepServers); i++ {
- indexes = append(indexes, getBlockIndexesForServer(t, i))
- }
- return indexes
-}
-
-func verifyBlocks(t *testing.T, notExpected []string, expected []string, minReplication int) {
- blocks := getBlockIndexes(t)
-
- for _, block := range notExpected {
- for _, idx := range blocks {
- if valueInArray(block, idx) {
- t.Fatalf("Found unexpected block %s", block)
- }
- }
- }
-
- for _, block := range expected {
- nFound := 0
- for _, idx := range blocks {
- if valueInArray(block, idx) {
- nFound++
- }
- }
- if nFound < minReplication {
- t.Fatalf("Found %d replicas of block %s, expected >= %d", nFound, block, minReplication)
- }
- }
-}
-
-func valueInArray(value string, list []string) bool {
- for _, v := range list {
- if value == v {
- return true
- }
- }
- return false
-}
-
-// Test env uses two keep volumes. The volume names can be found by reading the files
-// ARVADOS_HOME/tmp/keep0.volume and ARVADOS_HOME/tmp/keep1.volume
-//
-// The keep volumes are of the dir structure: volumeN/subdir/locator
-func backdateBlocks(t *testing.T, oldUnusedBlockLocators []string) {
- // First get rid of any size hints in the locators
- var trimmedBlockLocators []string
- for _, block := range oldUnusedBlockLocators {
- trimmedBlockLocators = append(trimmedBlockLocators, strings.Split(block, "+")[0])
- }
-
- // Get the working dir so that we can read keep{n}.volume files
- wd, err := os.Getwd()
- if err != nil {
- t.Fatalf("Error getting working dir %s", err)
- }
-
- // Now cycle through the two keep volumes
- oldTime := time.Now().AddDate(0, -2, 0)
- for i := 0; i < 2; i++ {
- filename := fmt.Sprintf("%s/../../tmp/keep%d.volume", wd, i)
- volumeDir, err := ioutil.ReadFile(filename)
- if err != nil {
- t.Fatalf("Error reading keep volume file %s %s", filename, err)
- }
-
- // Read the keep volume dir structure
- volumeContents, err := ioutil.ReadDir(string(volumeDir))
- if err != nil {
- t.Fatalf("Error reading keep dir %s %s", string(volumeDir), err)
- }
-
- // Read each subdir for each of the keep volume dir
- for _, subdir := range volumeContents {
- subdirName := fmt.Sprintf("%s/%s", volumeDir, subdir.Name())
- subdirContents, err := ioutil.ReadDir(string(subdirName))
- if err != nil {
- t.Fatalf("Error reading keep dir %s %s", string(subdirName), err)
- }
-
- // Now we got to the files. The files are names are the block locators
- for _, fileInfo := range subdirContents {
- blockName := fileInfo.Name()
- myname := fmt.Sprintf("%s/%s", subdirName, blockName)
- if valueInArray(blockName, trimmedBlockLocators) {
- err = os.Chtimes(myname, oldTime, oldTime)
- }
- }
- }
- }
-}
-
-func getStatus(t *testing.T, path string) interface{} {
- client := http.Client{}
- req, err := http.NewRequest("GET", path, nil)
- req.Header.Add("Authorization", "OAuth2 "+arvadostest.DataManagerToken)
- req.Header.Add("Content-Type", "application/octet-stream")
- resp, err := client.Do(req)
- if err != nil {
- t.Fatalf("Error during %s %s", path, err)
- }
- defer resp.Body.Close()
-
- var s interface{}
- json.NewDecoder(resp.Body).Decode(&s)
-
- return s
-}
-
-// Wait until PullQueue and TrashQueue are empty on all keepServers.
-func waitUntilQueuesFinishWork(t *testing.T) {
- for _, ks := range keepServers {
- for done := false; !done; {
- time.Sleep(100 * time.Millisecond)
- s := getStatus(t, ks+"/status.json")
- for _, qName := range []string{"PullQueue", "TrashQueue"} {
- qStatus := s.(map[string]interface{})[qName].(map[string]interface{})
- if qStatus["Queued"].(float64)+qStatus["InProgress"].(float64) == 0 {
- done = true
- }
- }
- }
- }
-}
-
-// Create some blocks and backdate some of them.
-// Also create some collections and delete some of them.
-// Verify block indexes.
-func TestPutAndGetBlocks(t *testing.T) {
- defer TearDownDataManagerTest(t)
- SetupDataManagerTest(t)
-
- // Put some blocks which will be backdated later on
- // The first one will also be used in a collection and hence should not be deleted when datamanager runs.
- // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
- var oldUnusedBlockLocators []string
- oldUnusedBlockData := "this block will have older mtime"
- for i := 0; i < 5; i++ {
- oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
- }
- for i := 0; i < 5; i++ {
- getBlock(t, oldUnusedBlockLocators[i], fmt.Sprintf("%s%d", oldUnusedBlockData, i))
- }
-
- // The rest will be old and unreferenced and hence should be deleted when datamanager runs.
- oldUsedBlockData := "this collection block will have older mtime"
- oldUsedBlockLocator := putBlock(t, oldUsedBlockData)
- getBlock(t, oldUsedBlockLocator, oldUsedBlockData)
-
- // Put some more blocks which will not be backdated; hence they are still new, but not in any collection.
- // Hence, even though unreferenced, these should not be deleted when datamanager runs.
- var newBlockLocators []string
- newBlockData := "this block is newer"
- for i := 0; i < 5; i++ {
- newBlockLocators = append(newBlockLocators, putBlock(t, fmt.Sprintf("%s%d", newBlockData, i)))
- }
- for i := 0; i < 5; i++ {
- getBlock(t, newBlockLocators[i], fmt.Sprintf("%s%d", newBlockData, i))
- }
-
- // Create a collection that would be deleted later on
- toBeDeletedCollectionUUID := createCollection(t, "some data for collection creation")
- toBeDeletedCollectionLocator := getFirstLocatorFromCollection(t, toBeDeletedCollectionUUID)
-
- // Create another collection that has the same data as the one of the old blocks
- oldUsedBlockCollectionUUID := createCollection(t, oldUsedBlockData)
- oldUsedBlockCollectionLocator := getFirstLocatorFromCollection(t, oldUsedBlockCollectionUUID)
- if oldUsedBlockCollectionLocator != oldUsedBlockLocator {
- t.Fatalf("Locator of the collection with the same data as old block is different %s", oldUsedBlockCollectionLocator)
- }
-
- // Create another collection whose replication level will be changed
- replicationCollectionUUID := createCollection(t, "replication level on this collection will be reduced")
- replicationCollectionLocator := getFirstLocatorFromCollection(t, replicationCollectionUUID)
-
- // Create two collections with same data; one will be deleted later on
- dataForTwoCollections := "one of these collections will be deleted"
- oneOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
- oneOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, oneOfTwoWithSameDataUUID)
- secondOfTwoWithSameDataUUID := createCollection(t, dataForTwoCollections)
- secondOfTwoWithSameDataLocator := getFirstLocatorFromCollection(t, secondOfTwoWithSameDataUUID)
- if oneOfTwoWithSameDataLocator != secondOfTwoWithSameDataLocator {
- t.Fatalf("Locators for both these collections expected to be same: %s %s", oneOfTwoWithSameDataLocator, secondOfTwoWithSameDataLocator)
- }
-
- // create collection with empty manifest text
- emptyBlockLocator := putBlock(t, "")
- emptyCollection := createCollection(t, "")
-
- // Verify blocks before doing any backdating / deleting.
- var expected []string
- expected = append(expected, oldUnusedBlockLocators...)
- expected = append(expected, newBlockLocators...)
- expected = append(expected, toBeDeletedCollectionLocator)
- expected = append(expected, replicationCollectionLocator)
- expected = append(expected, oneOfTwoWithSameDataLocator)
- expected = append(expected, secondOfTwoWithSameDataLocator)
- expected = append(expected, emptyBlockLocator)
-
- verifyBlocks(t, nil, expected, 2)
-
- // Run datamanager in singlerun mode
- dataManagerSingleRun(t)
- waitUntilQueuesFinishWork(t)
-
- verifyBlocks(t, nil, expected, 2)
-
- // Backdate the to-be old blocks and delete the collections
- backdateBlocks(t, oldUnusedBlockLocators)
- deleteCollection(t, toBeDeletedCollectionUUID)
- deleteCollection(t, secondOfTwoWithSameDataUUID)
- backdateBlocks(t, []string{emptyBlockLocator})
- deleteCollection(t, emptyCollection)
-
- // Run data manager again
- dataManagerSingleRun(t)
- waitUntilQueuesFinishWork(t)
-
- // Get block indexes and verify that all backdated blocks except the first one used in collection are not included.
- expected = expected[:0]
- expected = append(expected, oldUsedBlockLocator)
- expected = append(expected, newBlockLocators...)
- expected = append(expected, toBeDeletedCollectionLocator)
- expected = append(expected, oneOfTwoWithSameDataLocator)
- expected = append(expected, secondOfTwoWithSameDataLocator)
- expected = append(expected, emptyBlockLocator) // even when unreferenced, this remains
-
- verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
-
- // Reduce desired replication on replicationCollectionUUID
- // collection, and verify that Data Manager does not reduce
- // actual replication any further than that. (It might not
- // reduce actual replication at all; that's OK for this test.)
-
- // Reduce desired replication level.
- updateCollection(t, replicationCollectionUUID, "replication_desired", "1")
- collection := getCollection(t, replicationCollectionUUID)
- if collection["replication_desired"].(interface{}) != float64(1) {
- t.Fatalf("After update replication_desired is not 1; instead it is %v", collection["replication_desired"])
- }
-
- // Verify data is currently overreplicated.
- verifyBlocks(t, nil, []string{replicationCollectionLocator}, 2)
-
- // Run data manager again
- dataManagerSingleRun(t)
- waitUntilQueuesFinishWork(t)
-
- // Verify data is not underreplicated.
- verifyBlocks(t, nil, []string{replicationCollectionLocator}, 1)
-
- // Verify *other* collections' data is not underreplicated.
- verifyBlocks(t, oldUnusedBlockLocators, expected, 2)
-}
-
-func TestDatamanagerSingleRunRepeatedly(t *testing.T) {
- defer TearDownDataManagerTest(t)
- SetupDataManagerTest(t)
-
- for i := 0; i < 10; i++ {
- err := singlerun(arv)
- if err != nil {
- t.Fatalf("Got an error during datamanager singlerun: %v", err)
- }
- }
-}
-
-func TestGetStatusRepeatedly(t *testing.T) {
- defer TearDownDataManagerTest(t)
- SetupDataManagerTest(t)
-
- for i := 0; i < 10; i++ {
- for j := 0; j < 2; j++ {
- s := getStatus(t, keepServers[j]+"/status.json")
-
- var pullQueueStatus interface{}
- pullQueueStatus = s.(map[string]interface{})["PullQueue"]
- var trashQueueStatus interface{}
- trashQueueStatus = s.(map[string]interface{})["TrashQueue"]
-
- if pullQueueStatus.(map[string]interface{})["Queued"] == nil ||
- pullQueueStatus.(map[string]interface{})["InProgress"] == nil ||
- trashQueueStatus.(map[string]interface{})["Queued"] == nil ||
- trashQueueStatus.(map[string]interface{})["InProgress"] == nil {
- t.Fatalf("PullQueue and TrashQueue status not found")
- }
-
- time.Sleep(100 * time.Millisecond)
- }
- }
-}
-
-func TestRunDatamanagerWithBogusServer(t *testing.T) {
- defer TearDownDataManagerTest(t)
- SetupDataManagerTest(t)
-
- arv.ApiServer = "bogus-server"
-
- err := singlerun(arv)
- if err == nil {
- t.Fatalf("Expected error during singlerun with bogus server")
- }
-}
-
-func TestRunDatamanagerAsNonAdminUser(t *testing.T) {
- defer TearDownDataManagerTest(t)
- SetupDataManagerTest(t)
-
- arv.ApiToken = arvadostest.ActiveToken
-
- err := singlerun(arv)
- if err == nil {
- t.Fatalf("Expected error during singlerun as non-admin user")
- }
-}
-
-func TestPutAndGetBlocks_NoErrorDuringSingleRun(t *testing.T) {
- testOldBlocksNotDeletedOnDataManagerError(t, "", "", false, false)
-}
-
-func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadWriteTo(t *testing.T) {
- badpath, err := arvadostest.CreateBadPath()
- if err != nil {
- t.Fatalf(err.Error())
- }
- defer func() {
- err = arvadostest.DestroyBadPath(badpath)
- if err != nil {
- t.Fatalf(err.Error())
- }
- }()
- testOldBlocksNotDeletedOnDataManagerError(t, path.Join(badpath, "writetofile"), "", true, true)
-}
-
-func TestPutAndGetBlocks_ErrorDuringGetCollectionsBadHeapProfileFilename(t *testing.T) {
- badpath, err := arvadostest.CreateBadPath()
- if err != nil {
- t.Fatalf(err.Error())
- }
- defer func() {
- err = arvadostest.DestroyBadPath(badpath)
- if err != nil {
- t.Fatalf(err.Error())
- }
- }()
- testOldBlocksNotDeletedOnDataManagerError(t, "", path.Join(badpath, "heapprofilefile"), true, true)
-}
-
-// Create some blocks and backdate some of them.
-// Run datamanager while producing an error condition.
-// Verify that the blocks are hence not deleted.
-func testOldBlocksNotDeletedOnDataManagerError(t *testing.T, writeDataTo string, heapProfileFile string, expectError bool, expectOldBlocks bool) {
- defer TearDownDataManagerTest(t)
- SetupDataManagerTest(t)
-
- // Put some blocks and backdate them.
- var oldUnusedBlockLocators []string
- oldUnusedBlockData := "this block will have older mtime"
- for i := 0; i < 5; i++ {
- oldUnusedBlockLocators = append(oldUnusedBlockLocators, putBlock(t, fmt.Sprintf("%s%d", oldUnusedBlockData, i)))
- }
- backdateBlocks(t, oldUnusedBlockLocators)
-
- // Run data manager
- summary.WriteDataTo = writeDataTo
- collection.HeapProfileFilename = heapProfileFile
-
- err := singlerun(arv)
- if !expectError {
- if err != nil {
- t.Fatalf("Got an error during datamanager singlerun: %v", err)
- }
- } else {
- if err == nil {
- t.Fatalf("Expected error during datamanager singlerun")
- }
- }
- waitUntilQueuesFinishWork(t)
-
- // Get block indexes and verify that all backdated blocks are not/deleted as expected
- if expectOldBlocks {
- verifyBlocks(t, nil, oldUnusedBlockLocators, 2)
- } else {
- verifyBlocks(t, oldUnusedBlockLocators, nil, 2)
- }
-}
-
-// Create a collection with multiple streams and blocks
-func createMultiStreamBlockCollection(t *testing.T, data string, numStreams, numBlocks int) (string, []string) {
- defer switchToken(arvadostest.AdminToken)()
-
- manifest := ""
- locators := make(map[string]bool)
- for s := 0; s < numStreams; s++ {
- manifest += fmt.Sprintf("./stream%d ", s)
- for b := 0; b < numBlocks; b++ {
- locator, _, err := keepClient.PutB([]byte(fmt.Sprintf("%s in stream %d and block %d", data, s, b)))
- if err != nil {
- t.Fatalf("Error creating block %d in stream %d: %v", b, s, err)
- }
- locators[strings.Split(locator, "+A")[0]] = true
- manifest += locator + " "
- }
- manifest += "0:1:dummyfile.txt\n"
- }
-
- collection := make(Dict)
- err := arv.Create("collections",
- arvadosclient.Dict{"collection": arvadosclient.Dict{"manifest_text": manifest}},
- &collection)
-
- if err != nil {
- t.Fatalf("Error creating collection %v", err)
- }
-
- var locs []string
- for k := range locators {
- locs = append(locs, k)
- }
-
- return collection["uuid"].(string), locs
-}
-
-// Create collection with multiple streams and blocks; backdate the blocks and but do not delete the collection.
-// Also, create stray block and backdate it.
-// After datamanager run: expect blocks from the collection, but not the stray block.
-func TestManifestWithMultipleStreamsAndBlocks(t *testing.T) {
- testManifestWithMultipleStreamsAndBlocks(t, 100, 10, "", false)
-}
-
-// Same test as TestManifestWithMultipleStreamsAndBlocks with an additional
-// keepstore of a service type other than "disk". Only the "disk" type services
-// will be indexed by datamanager and hence should work the same way.
-func TestManifestWithMultipleStreamsAndBlocks_WithOneUnsupportedKeepServer(t *testing.T) {
- testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "testblobstore", false)
-}
-
-// Test datamanager with dry-run. Expect no block to be deleted.
-func TestManifestWithMultipleStreamsAndBlocks_DryRun(t *testing.T) {
- testManifestWithMultipleStreamsAndBlocks(t, 2, 2, "", true)
-}
-
-func testManifestWithMultipleStreamsAndBlocks(t *testing.T, numStreams, numBlocks int, createExtraKeepServerWithType string, isDryRun bool) {
- defer TearDownDataManagerTest(t)
- SetupDataManagerTest(t)
-
- // create collection whose blocks will be backdated
- collectionWithOldBlocks, oldBlocks := createMultiStreamBlockCollection(t, "old block", numStreams, numBlocks)
- if collectionWithOldBlocks == "" {
- t.Fatalf("Failed to create collection with %d blocks", numStreams*numBlocks)
- }
- if len(oldBlocks) != numStreams*numBlocks {
- t.Fatalf("Not all blocks are created: expected %v, found %v", 1000, len(oldBlocks))
- }
-
- // create a stray block that will be backdated
- strayOldBlock := putBlock(t, "this stray block is old")
-
- expected := []string{strayOldBlock}
- expected = append(expected, oldBlocks...)
- verifyBlocks(t, nil, expected, 2)
-
- // Backdate old blocks; but the collection still references these blocks
- backdateBlocks(t, oldBlocks)
-
- // also backdate the stray old block
- backdateBlocks(t, []string{strayOldBlock})
-
- // If requested, create an extra keepserver with the given type
- // This should be ignored during indexing and hence not change the datamanager outcome
- var extraKeepServerUUID string
- if createExtraKeepServerWithType != "" {
- extraKeepServerUUID = addExtraKeepServer(t, createExtraKeepServerWithType)
- defer deleteExtraKeepServer(extraKeepServerUUID)
- }
-
- // run datamanager
- dryRun = isDryRun
- dataManagerSingleRun(t)
-
- if dryRun {
- // verify that all blocks, including strayOldBlock, are still to be found
- verifyBlocks(t, nil, expected, 2)
- } else {
- // verify that strayOldBlock is not to be found, but the collections blocks are still there
- verifyBlocks(t, []string{strayOldBlock}, oldBlocks, 2)
- }
-}
-
-// Add one more keepstore with the given service type
-func addExtraKeepServer(t *testing.T, serviceType string) string {
- defer switchToken(arvadostest.AdminToken)()
-
- extraKeepService := make(arvadosclient.Dict)
- err := arv.Create("keep_services",
- arvadosclient.Dict{"keep_service": arvadosclient.Dict{
- "service_host": "localhost",
- "service_port": "21321",
- "service_ssl_flag": false,
- "service_type": serviceType}},
- &extraKeepService)
- if err != nil {
- t.Fatal(err)
- }
-
- return extraKeepService["uuid"].(string)
-}
-
-func deleteExtraKeepServer(uuid string) {
- defer switchToken(arvadostest.AdminToken)()
- arv.Delete("keep_services", uuid, nil, nil)
-}
+++ /dev/null
-#! /usr/bin/env python
-
-import arvados
-
-import argparse
-import cgi
-import csv
-import json
-import logging
-import math
-import pprint
-import re
-import threading
-import urllib2
-
-from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
-from collections import defaultdict, Counter
-from functools import partial
-from operator import itemgetter
-from SocketServer import ThreadingMixIn
-
-arv = arvados.api('v1')
-
-# Adapted from http://stackoverflow.com/questions/4180980/formatting-data-quantity-capacity-as-string
-byteunits = ('B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB')
-def fileSizeFormat(value):
- exponent = 0 if value == 0 else int(math.log(value, 1024))
- return "%7.2f %-3s" % (float(value) / pow(1024, exponent),
- byteunits[exponent])
-
-def percentageFloor(x):
- """ Returns a float which is the input rounded down to the neared 0.01.
-
-e.g. precentageFloor(0.941354) = 0.94
-"""
- return math.floor(x*100) / 100.0
-
-
-def byteSizeFromValidUuid(valid_uuid):
- return int(valid_uuid.split('+')[1])
-
-class maxdict(dict):
- """A dictionary that holds the largest value entered for each key."""
- def addValue(self, key, value):
- dict.__setitem__(self, key, max(dict.get(self, key), value))
- def addValues(self, kv_pairs):
- for key,value in kv_pairs:
- self.addValue(key, value)
- def addDict(self, d):
- self.addValues(d.items())
-
-class CollectionInfo:
- DEFAULT_PERSISTER_REPLICATION_LEVEL=2
- all_by_uuid = {}
-
- def __init__(self, uuid):
- if CollectionInfo.all_by_uuid.has_key(uuid):
- raise ValueError('Collection for uuid "%s" already exists.' % uuid)
- self.uuid = uuid
- self.block_uuids = set() # uuids of keep blocks in this collection
- self.reader_uuids = set() # uuids of users who can read this collection
- self.persister_uuids = set() # uuids of users who want this collection saved
- # map from user uuid to replication level they desire
- self.persister_replication = maxdict()
-
- # The whole api response in case we need anything else later.
- self.api_response = []
- CollectionInfo.all_by_uuid[uuid] = self
-
- def byteSize(self):
- return sum(map(byteSizeFromValidUuid, self.block_uuids))
-
- def __str__(self):
- return ('CollectionInfo uuid: %s\n'
- ' %d block(s) containing %s\n'
- ' reader_uuids: %s\n'
- ' persister_replication: %s' %
- (self.uuid,
- len(self.block_uuids),
- fileSizeFormat(self.byteSize()),
- pprint.pformat(self.reader_uuids, indent = 15),
- pprint.pformat(self.persister_replication, indent = 15)))
-
- @staticmethod
- def get(uuid):
- if not CollectionInfo.all_by_uuid.has_key(uuid):
- CollectionInfo(uuid)
- return CollectionInfo.all_by_uuid[uuid]
-
-
-def extractUuid(candidate):
- """ Returns a canonical (hash+size) uuid from a valid uuid, or None if candidate is not a valid uuid."""
- match = re.match('([0-9a-fA-F]{32}\+[0-9]+)(\+[^+]+)*$', candidate)
- return match and match.group(1)
-
-def checkUserIsAdmin():
- current_user = arv.users().current().execute()
-
- if not current_user['is_admin']:
- log.warning('Current user %s (%s - %s) does not have '
- 'admin access and will not see much of the data.',
- current_user['full_name'],
- current_user['email'],
- current_user['uuid'])
- if args.require_admin_user:
- log.critical('Exiting, rerun with --no-require-admin-user '
- 'if you wish to continue.')
- exit(1)
-
-def buildCollectionsList():
- if args.uuid:
- return [args.uuid,]
- else:
- collections_list_response = arv.collections().list(limit=args.max_api_results).execute()
-
- print ('Returned %d of %d collections.' %
- (len(collections_list_response['items']),
- collections_list_response['items_available']))
-
- return [item['uuid'] for item in collections_list_response['items']]
-
-
-def readCollections(collection_uuids):
- for collection_uuid in collection_uuids:
- collection_block_uuids = set()
- collection_response = arv.collections().get(uuid=collection_uuid).execute()
- collection_info = CollectionInfo.get(collection_uuid)
- collection_info.api_response = collection_response
- manifest_lines = collection_response['manifest_text'].split('\n')
-
- if args.verbose:
- print 'Manifest text for %s:' % collection_uuid
- pprint.pprint(manifest_lines)
-
- for manifest_line in manifest_lines:
- if manifest_line:
- manifest_tokens = manifest_line.split(' ')
- if args.verbose:
- print 'manifest tokens: ' + pprint.pformat(manifest_tokens)
- stream_name = manifest_tokens[0]
-
- line_block_uuids = set(filter(None,
- [extractUuid(candidate)
- for candidate in manifest_tokens[1:]]))
- collection_info.block_uuids.update(line_block_uuids)
-
- # file_tokens = [token
- # for token in manifest_tokens[1:]
- # if extractUuid(token) is None]
-
- # # Sort file tokens by start position in case they aren't already
- # file_tokens.sort(key=lambda file_token: int(file_token.split(':')[0]))
-
- # if args.verbose:
- # print 'line_block_uuids: ' + pprint.pformat(line_block_uuids)
- # print 'file_tokens: ' + pprint.pformat(file_tokens)
-
-
-def readLinks():
- link_classes = set()
-
- for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
- # TODO(misha): We may not be seing all the links, but since items
- # available does not return an accurate number, I don't knos how
- # to confirm that we saw all of them.
- collection_links_response = arv.links().list(where={'head_uuid':collection_uuid}).execute()
- link_classes.update([link['link_class'] for link in collection_links_response['items']])
- for link in collection_links_response['items']:
- if link['link_class'] == 'permission':
- collection_info.reader_uuids.add(link['tail_uuid'])
- elif link['link_class'] == 'resources':
- replication_level = link['properties'].get(
- 'replication',
- CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
- collection_info.persister_replication.addValue(
- link['tail_uuid'],
- replication_level)
- collection_info.persister_uuids.add(link['tail_uuid'])
-
- print 'Found the following link classes:'
- pprint.pprint(link_classes)
-
-def reportMostPopularCollections():
- most_popular_collections = sorted(
- CollectionInfo.all_by_uuid.values(),
- key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
- reverse=True)[:10]
-
- print 'Most popular Collections:'
- for collection_info in most_popular_collections:
- print collection_info
-
-
-def buildMaps():
- for collection_uuid,collection_info in CollectionInfo.all_by_uuid.items():
- # Add the block holding the manifest itself for all calculations
- block_uuids = collection_info.block_uuids.union([collection_uuid,])
- for block_uuid in block_uuids:
- block_to_collections[block_uuid].add(collection_uuid)
- block_to_readers[block_uuid].update(collection_info.reader_uuids)
- block_to_persisters[block_uuid].update(collection_info.persister_uuids)
- block_to_persister_replication[block_uuid].addDict(
- collection_info.persister_replication)
- for reader_uuid in collection_info.reader_uuids:
- reader_to_collections[reader_uuid].add(collection_uuid)
- reader_to_blocks[reader_uuid].update(block_uuids)
- for persister_uuid in collection_info.persister_uuids:
- persister_to_collections[persister_uuid].add(collection_uuid)
- persister_to_blocks[persister_uuid].update(block_uuids)
-
-
-def itemsByValueLength(original):
- return sorted(original.items(),
- key=lambda item:len(item[1]),
- reverse=True)
-
-
-def reportBusiestUsers():
- busiest_readers = itemsByValueLength(reader_to_collections)
- print 'The busiest readers are:'
- for reader,collections in busiest_readers:
- print '%s reading %d collections.' % (reader, len(collections))
- busiest_persisters = itemsByValueLength(persister_to_collections)
- print 'The busiest persisters are:'
- for persister,collections in busiest_persisters:
- print '%s reading %d collections.' % (persister, len(collections))
-
-
-def blockDiskUsage(block_uuid):
- """Returns the disk usage of a block given its uuid.
-
- Will return 0 before reading the contents of the keep servers.
- """
- return byteSizeFromValidUuid(block_uuid) * block_to_replication[block_uuid]
-
-def blockPersistedUsage(user_uuid, block_uuid):
- return (byteSizeFromValidUuid(block_uuid) *
- block_to_persister_replication[block_uuid].get(user_uuid, 0))
-
-memo_computeWeightedReplicationCosts = {}
-def computeWeightedReplicationCosts(replication_levels):
- """Computes the relative cost of varied replication levels.
-
- replication_levels: a tuple of integers representing the desired
- replication level. If n users want a replication level of x then x
- should appear n times in replication_levels.
-
- Returns a dictionary from replication level to cost.
-
- The basic thinking is that the cost of replicating at level x should
- be shared by everyone who wants replication of level x or higher.
-
- For example, if we have two users who want 1 copy, one user who
- wants 3 copies and two users who want 6 copies:
- the input would be [1, 1, 3, 6, 6] (or any permutation)
-
- The cost of the first copy is shared by all 5 users, so they each
- pay 1 copy / 5 users = 0.2.
- The cost of the second and third copies shared by 3 users, so they
- each pay 2 copies / 3 users = 0.67 (plus the above costs)
- The cost of the fourth, fifth and sixth copies is shared by two
- users, so they each pay 3 copies / 2 users = 1.5 (plus the above costs)
-
- Here are some other examples:
- computeWeightedReplicationCosts([1,]) -> {1:1.0}
- computeWeightedReplicationCosts([2,]) -> {2:2.0}
- computeWeightedReplicationCosts([1,1]) -> {1:0.5}
- computeWeightedReplicationCosts([2,2]) -> {1:1.0}
- computeWeightedReplicationCosts([1,2]) -> {1:0.5,2:1.5}
- computeWeightedReplicationCosts([1,3]) -> {1:0.5,2:2.5}
- computeWeightedReplicationCosts([1,3,6,6,10]) -> {1:0.2,3:0.7,6:1.7,10:5.7}
- """
- replication_level_counts = sorted(Counter(replication_levels).items())
-
- memo_key = str(replication_level_counts)
-
- if not memo_key in memo_computeWeightedReplicationCosts:
- last_level = 0
- current_cost = 0
- total_interested = float(sum(map(itemgetter(1), replication_level_counts)))
- cost_for_level = {}
- for replication_level, count in replication_level_counts:
- copies_added = replication_level - last_level
- # compute marginal cost from last level and add it to the last cost
- current_cost += copies_added / total_interested
- cost_for_level[replication_level] = current_cost
- # update invariants
- last_level = replication_level
- total_interested -= count
- memo_computeWeightedReplicationCosts[memo_key] = cost_for_level
-
- return memo_computeWeightedReplicationCosts[memo_key]
-
-def blockPersistedWeightedUsage(user_uuid, block_uuid):
- persister_replication_for_block = block_to_persister_replication[block_uuid]
- user_replication = persister_replication_for_block[user_uuid]
- return (
- byteSizeFromValidUuid(block_uuid) *
- computeWeightedReplicationCosts(
- persister_replication_for_block.values())[user_replication])
-
-
-def computeUserStorageUsage():
- for user, blocks in reader_to_blocks.items():
- user_to_usage[user][UNWEIGHTED_READ_SIZE_COL] = sum(map(
- byteSizeFromValidUuid,
- blocks))
- user_to_usage[user][WEIGHTED_READ_SIZE_COL] = sum(map(
- lambda block_uuid:(float(byteSizeFromValidUuid(block_uuid))/
- len(block_to_readers[block_uuid])),
- blocks))
- for user, blocks in persister_to_blocks.items():
- user_to_usage[user][UNWEIGHTED_PERSIST_SIZE_COL] = sum(map(
- partial(blockPersistedUsage, user),
- blocks))
- user_to_usage[user][WEIGHTED_PERSIST_SIZE_COL] = sum(map(
- partial(blockPersistedWeightedUsage, user),
- blocks))
-
-def printUserStorageUsage():
- print ('user: unweighted readable block size, weighted readable block size, '
- 'unweighted persisted block size, weighted persisted block size:')
- for user, usage in user_to_usage.items():
- print ('%s: %s %s %s %s' %
- (user,
- fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
- fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
- fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
- fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
-
-def logUserStorageUsage():
- for user, usage in user_to_usage.items():
- body = {}
- # user could actually represent a user or a group. We don't set
- # the object_type field since we don't know which we have.
- body['object_uuid'] = user
- body['event_type'] = args.user_storage_log_event_type
- properties = {}
- properties['read_collections_total_bytes'] = usage[UNWEIGHTED_READ_SIZE_COL]
- properties['read_collections_weighted_bytes'] = (
- usage[WEIGHTED_READ_SIZE_COL])
- properties['persisted_collections_total_bytes'] = (
- usage[UNWEIGHTED_PERSIST_SIZE_COL])
- properties['persisted_collections_weighted_bytes'] = (
- usage[WEIGHTED_PERSIST_SIZE_COL])
- body['properties'] = properties
- # TODO(misha): Confirm that this will throw an exception if it
- # fails to create the log entry.
- arv.logs().create(body=body).execute()
-
-def getKeepServers():
- response = arv.keep_disks().list().execute()
- return [[keep_server['service_host'], keep_server['service_port']]
- for keep_server in response['items']]
-
-
-def getKeepBlocks(keep_servers):
- blocks = []
- for host,port in keep_servers:
- response = urllib2.urlopen('http://%s:%d/index' % (host, port))
- server_blocks = [line.split(' ')
- for line in response.read().split('\n')
- if line]
- server_blocks = [(block_id, int(mtime))
- for block_id, mtime in server_blocks]
- blocks.append(server_blocks)
- return blocks
-
-def getKeepStats(keep_servers):
- MOUNT_COLUMN = 5
- TOTAL_COLUMN = 1
- FREE_COLUMN = 3
- DISK_BLOCK_SIZE = 1024
- stats = []
- for host,port in keep_servers:
- response = urllib2.urlopen('http://%s:%d/status.json' % (host, port))
-
- parsed_json = json.load(response)
- df_entries = [line.split()
- for line in parsed_json['df'].split('\n')
- if line]
- keep_volumes = [columns
- for columns in df_entries
- if 'keep' in columns[MOUNT_COLUMN]]
- total_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(TOTAL_COLUMN),
- keep_volumes)))
- free_space = DISK_BLOCK_SIZE*sum(map(int,map(itemgetter(FREE_COLUMN),
- keep_volumes)))
- stats.append([total_space, free_space])
- return stats
-
-
-def computeReplication(keep_blocks):
- for server_blocks in keep_blocks:
- for block_uuid, _ in server_blocks:
- block_to_replication[block_uuid] += 1
- log.debug('Seeing the following replication levels among blocks: %s',
- str(set(block_to_replication.values())))
-
-
-def computeGarbageCollectionCandidates():
- for server_blocks in keep_blocks:
- block_to_latest_mtime.addValues(server_blocks)
- empty_set = set()
- garbage_collection_priority = sorted(
- [(block,mtime)
- for block,mtime in block_to_latest_mtime.items()
- if len(block_to_persisters.get(block,empty_set)) == 0],
- key = itemgetter(1))
- global garbage_collection_report
- garbage_collection_report = []
- cumulative_disk_size = 0
- for block,mtime in garbage_collection_priority:
- disk_size = blockDiskUsage(block)
- cumulative_disk_size += disk_size
- garbage_collection_report.append(
- (block,
- mtime,
- disk_size,
- cumulative_disk_size,
- float(free_keep_space + cumulative_disk_size)/total_keep_space))
-
- print 'The oldest Garbage Collection Candidates: '
- pprint.pprint(garbage_collection_report[:20])
-
-
-def outputGarbageCollectionReport(filename):
- with open(filename, 'wb') as csvfile:
- gcwriter = csv.writer(csvfile)
- gcwriter.writerow(['block uuid', 'latest mtime', 'disk size',
- 'cumulative size', 'disk free'])
- for line in garbage_collection_report:
- gcwriter.writerow(line)
-
-def computeGarbageCollectionHistogram():
- # TODO(misha): Modify this to allow users to specify the number of
- # histogram buckets through a flag.
- histogram = []
- last_percentage = -1
- for _,mtime,_,_,disk_free in garbage_collection_report:
- curr_percentage = percentageFloor(disk_free)
- if curr_percentage > last_percentage:
- histogram.append( (mtime, curr_percentage) )
- last_percentage = curr_percentage
-
- log.info('Garbage collection histogram is: %s', histogram)
-
- return histogram
-
-
-def logGarbageCollectionHistogram():
- body = {}
- # TODO(misha): Decide whether we should specify an object_uuid in
- # the body and if so, which uuid to use.
- body['event_type'] = args.block_age_free_space_histogram_log_event_type
- properties = {}
- properties['histogram'] = garbage_collection_histogram
- body['properties'] = properties
- # TODO(misha): Confirm that this will throw an exception if it
- # fails to create the log entry.
- arv.logs().create(body=body).execute()
-
-
-def detectReplicationProblems():
- blocks_not_in_any_collections.update(
- set(block_to_replication.keys()).difference(block_to_collections.keys()))
- underreplicated_persisted_blocks.update(
- [uuid
- for uuid, persister_replication in block_to_persister_replication.items()
- if len(persister_replication) > 0 and
- block_to_replication[uuid] < max(persister_replication.values())])
- overreplicated_persisted_blocks.update(
- [uuid
- for uuid, persister_replication in block_to_persister_replication.items()
- if len(persister_replication) > 0 and
- block_to_replication[uuid] > max(persister_replication.values())])
-
- log.info('Found %d blocks not in any collections, e.g. %s...',
- len(blocks_not_in_any_collections),
- ','.join(list(blocks_not_in_any_collections)[:5]))
- log.info('Found %d underreplicated blocks, e.g. %s...',
- len(underreplicated_persisted_blocks),
- ','.join(list(underreplicated_persisted_blocks)[:5]))
- log.info('Found %d overreplicated blocks, e.g. %s...',
- len(overreplicated_persisted_blocks),
- ','.join(list(overreplicated_persisted_blocks)[:5]))
-
- # TODO:
- # Read blocks sorted by mtime
- # Cache window vs % free space
- # Collections which candidates will appear in
- # Youngest underreplicated read blocks that appear in collections.
- # Report Collections that have blocks which are missing from (or
- # underreplicated in) keep.
-
-
-# This is the main flow here
-
-parser = argparse.ArgumentParser(description='Report on keep disks.')
-"""The command line argument parser we use.
-
-We only use it in the __main__ block, but leave it outside the block
-in case another package wants to use it or customize it by specifying
-it as a parent to their commandline parser.
-"""
-parser.add_argument('-m',
- '--max-api-results',
- type=int,
- default=5000,
- help=('The max results to get at once.'))
-parser.add_argument('-p',
- '--port',
- type=int,
- default=9090,
- help=('The port number to serve on. 0 means no server.'))
-parser.add_argument('-v',
- '--verbose',
- help='increase output verbosity',
- action='store_true')
-parser.add_argument('-u',
- '--uuid',
- help='uuid of specific collection to process')
-parser.add_argument('--require-admin-user',
- action='store_true',
- default=True,
- help='Fail if the user is not an admin [default]')
-parser.add_argument('--no-require-admin-user',
- dest='require_admin_user',
- action='store_false',
- help=('Allow users without admin permissions with '
- 'only a warning.'))
-parser.add_argument('--log-to-workbench',
- action='store_true',
- default=False,
- help='Log findings to workbench')
-parser.add_argument('--no-log-to-workbench',
- dest='log_to_workbench',
- action='store_false',
- help='Don\'t log findings to workbench [default]')
-parser.add_argument('--user-storage-log-event-type',
- default='user-storage-report',
- help=('The event type to set when logging user '
- 'storage usage to workbench.'))
-parser.add_argument('--block-age-free-space-histogram-log-event-type',
- default='block-age-free-space-histogram',
- help=('The event type to set when logging user '
- 'storage usage to workbench.'))
-parser.add_argument('--garbage-collection-file',
- default='',
- help=('The file to write a garbage collection report, or '
- 'leave empty for no report.'))
-
-args = None
-
-# TODO(misha): Think about moving some of this to the __main__ block.
-log = logging.getLogger('arvados.services.datamanager')
-stderr_handler = logging.StreamHandler()
-log.setLevel(logging.INFO)
-stderr_handler.setFormatter(
- logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
-log.addHandler(stderr_handler)
-
-# Global Data - don't try this at home
-collection_uuids = []
-
-# These maps all map from uuids to a set of uuids
-block_to_collections = defaultdict(set) # keep blocks
-reader_to_collections = defaultdict(set) # collection(s) for which the user has read access
-persister_to_collections = defaultdict(set) # collection(s) which the user has persisted
-block_to_readers = defaultdict(set)
-block_to_persisters = defaultdict(set)
-block_to_persister_replication = defaultdict(maxdict)
-reader_to_blocks = defaultdict(set)
-persister_to_blocks = defaultdict(set)
-
-UNWEIGHTED_READ_SIZE_COL = 0
-WEIGHTED_READ_SIZE_COL = 1
-UNWEIGHTED_PERSIST_SIZE_COL = 2
-WEIGHTED_PERSIST_SIZE_COL = 3
-NUM_COLS = 4
-user_to_usage = defaultdict(lambda : [0,]*NUM_COLS)
-
-keep_servers = []
-keep_blocks = []
-keep_stats = []
-total_keep_space = 0
-free_keep_space = 0
-
-block_to_replication = defaultdict(lambda: 0)
-block_to_latest_mtime = maxdict()
-
-garbage_collection_report = []
-"""A list of non-persisted blocks, sorted by increasing mtime
-
-Each entry is of the form (block uuid, latest mtime, disk size,
-cumulative size)
-
-* block uuid: The id of the block we want to delete
-* latest mtime: The latest mtime of the block across all keep servers.
-* disk size: The total disk space used by this block (block size
-multiplied by current replication level)
-* cumulative disk size: The sum of this block's disk size and all the
-blocks listed above it
-* disk free: The proportion of our disk space that would be free if we
-deleted this block and all the above. So this is (free disk space +
-cumulative disk size) / total disk capacity
-"""
-
-garbage_collection_histogram = []
-""" Shows the tradeoff of keep block age vs keep disk free space.
-
-Each entry is of the form (mtime, Disk Proportion).
-
-An entry of the form (1388747781, 0.52) means that if we deleted the
-oldest non-presisted blocks until we had 52% of the disk free, then
-all blocks with an mtime greater than 1388747781 would be preserved.
-"""
-
-# Stuff to report on
-blocks_not_in_any_collections = set()
-underreplicated_persisted_blocks = set()
-overreplicated_persisted_blocks = set()
-
-all_data_loaded = False
-
-def loadAllData():
- checkUserIsAdmin()
-
- log.info('Building Collection List')
- global collection_uuids
- collection_uuids = filter(None, [extractUuid(candidate)
- for candidate in buildCollectionsList()])
-
- log.info('Reading Collections')
- readCollections(collection_uuids)
-
- if args.verbose:
- pprint.pprint(CollectionInfo.all_by_uuid)
-
- log.info('Reading Links')
- readLinks()
-
- reportMostPopularCollections()
-
- log.info('Building Maps')
- buildMaps()
-
- reportBusiestUsers()
-
- log.info('Getting Keep Servers')
- global keep_servers
- keep_servers = getKeepServers()
-
- print keep_servers
-
- log.info('Getting Blocks from each Keep Server.')
- global keep_blocks
- keep_blocks = getKeepBlocks(keep_servers)
-
- log.info('Getting Stats from each Keep Server.')
- global keep_stats, total_keep_space, free_keep_space
- keep_stats = getKeepStats(keep_servers)
-
- total_keep_space = sum(map(itemgetter(0), keep_stats))
- free_keep_space = sum(map(itemgetter(1), keep_stats))
-
- # TODO(misha): Delete this hack when the keep servers are fixed!
- # This hack deals with the fact that keep servers report each other's disks.
- total_keep_space /= len(keep_stats)
- free_keep_space /= len(keep_stats)
-
- log.info('Total disk space: %s, Free disk space: %s (%d%%).' %
- (fileSizeFormat(total_keep_space),
- fileSizeFormat(free_keep_space),
- 100*free_keep_space/total_keep_space))
-
- computeReplication(keep_blocks)
-
- log.info('average replication level is %f',
- (float(sum(block_to_replication.values())) /
- len(block_to_replication)))
-
- computeGarbageCollectionCandidates()
-
- if args.garbage_collection_file:
- log.info('Writing garbage Collection report to %s',
- args.garbage_collection_file)
- outputGarbageCollectionReport(args.garbage_collection_file)
-
- global garbage_collection_histogram
- garbage_collection_histogram = computeGarbageCollectionHistogram()
-
- if args.log_to_workbench:
- logGarbageCollectionHistogram()
-
- detectReplicationProblems()
-
- computeUserStorageUsage()
- printUserStorageUsage()
- if args.log_to_workbench:
- logUserStorageUsage()
-
- global all_data_loaded
- all_data_loaded = True
-
-
-class DataManagerHandler(BaseHTTPRequestHandler):
- USER_PATH = 'user'
- COLLECTION_PATH = 'collection'
- BLOCK_PATH = 'block'
-
- def userLink(self, uuid):
- return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
- {'uuid': uuid,
- 'path': DataManagerHandler.USER_PATH})
-
- def collectionLink(self, uuid):
- return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
- {'uuid': uuid,
- 'path': DataManagerHandler.COLLECTION_PATH})
-
- def blockLink(self, uuid):
- return ('<A HREF="/%(path)s/%(uuid)s">%(uuid)s</A>' %
- {'uuid': uuid,
- 'path': DataManagerHandler.BLOCK_PATH})
-
- def writeTop(self, title):
- self.wfile.write('<HTML><HEAD><TITLE>%s</TITLE></HEAD>\n<BODY>' % title)
-
- def writeBottom(self):
- self.wfile.write('</BODY></HTML>\n')
-
- def writeHomePage(self):
- self.send_response(200)
- self.end_headers()
- self.writeTop('Home')
- self.wfile.write('<TABLE>')
- self.wfile.write('<TR><TH>user'
- '<TH>unweighted readable block size'
- '<TH>weighted readable block size'
- '<TH>unweighted persisted block size'
- '<TH>weighted persisted block size</TR>\n')
- for user, usage in user_to_usage.items():
- self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
- (self.userLink(user),
- fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
- fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
- fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
- fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
- self.wfile.write('</TABLE>\n')
- self.writeBottom()
-
- def userExists(self, uuid):
- # Currently this will return false for a user who exists but
- # doesn't appear on any manifests.
- # TODO(misha): Figure out if we need to fix this.
- return user_to_usage.has_key(uuid)
-
- def writeUserPage(self, uuid):
- if not self.userExists(uuid):
- self.send_error(404,
- 'User (%s) Not Found.' % cgi.escape(uuid, quote=False))
- else:
- # Here we assume that since a user exists, they don't need to be
- # html escaped.
- self.send_response(200)
- self.end_headers()
- self.writeTop('User %s' % uuid)
- self.wfile.write('<TABLE>')
- self.wfile.write('<TR><TH>user'
- '<TH>unweighted readable block size'
- '<TH>weighted readable block size'
- '<TH>unweighted persisted block size'
- '<TH>weighted persisted block size</TR>\n')
- usage = user_to_usage[uuid]
- self.wfile.write('<TR><TD>%s<TD>%s<TD>%s<TD>%s<TD>%s</TR>\n' %
- (self.userLink(uuid),
- fileSizeFormat(usage[UNWEIGHTED_READ_SIZE_COL]),
- fileSizeFormat(usage[WEIGHTED_READ_SIZE_COL]),
- fileSizeFormat(usage[UNWEIGHTED_PERSIST_SIZE_COL]),
- fileSizeFormat(usage[WEIGHTED_PERSIST_SIZE_COL])))
- self.wfile.write('</TABLE>\n')
- self.wfile.write('<P>Persisting Collections: %s\n' %
- ', '.join(map(self.collectionLink,
- persister_to_collections[uuid])))
- self.wfile.write('<P>Reading Collections: %s\n' %
- ', '.join(map(self.collectionLink,
- reader_to_collections[uuid])))
- self.writeBottom()
-
- def collectionExists(self, uuid):
- return CollectionInfo.all_by_uuid.has_key(uuid)
-
- def writeCollectionPage(self, uuid):
- if not self.collectionExists(uuid):
- self.send_error(404,
- 'Collection (%s) Not Found.' % cgi.escape(uuid, quote=False))
- else:
- collection = CollectionInfo.get(uuid)
- # Here we assume that since a collection exists, its id doesn't
- # need to be html escaped.
- self.send_response(200)
- self.end_headers()
- self.writeTop('Collection %s' % uuid)
- self.wfile.write('<H1>Collection %s</H1>\n' % uuid)
- self.wfile.write('<P>Total size %s (not factoring in replication).\n' %
- fileSizeFormat(collection.byteSize()))
- self.wfile.write('<P>Readers: %s\n' %
- ', '.join(map(self.userLink, collection.reader_uuids)))
-
- if len(collection.persister_replication) == 0:
- self.wfile.write('<P>No persisters\n')
- else:
- replication_to_users = defaultdict(set)
- for user,replication in collection.persister_replication.items():
- replication_to_users[replication].add(user)
- replication_levels = sorted(replication_to_users.keys())
-
- self.wfile.write('<P>%d persisters in %d replication level(s) maxing '
- 'out at %dx replication:\n' %
- (len(collection.persister_replication),
- len(replication_levels),
- replication_levels[-1]))
-
- # TODO(misha): This code is used twice, let's move it to a method.
- self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
- '<TH>'.join(['Replication Level ' + str(x)
- for x in replication_levels]))
- self.wfile.write('<TR>\n')
- for replication_level in replication_levels:
- users = replication_to_users[replication_level]
- self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
- map(self.userLink, users)))
- self.wfile.write('</TR></TABLE>\n')
-
- replication_to_blocks = defaultdict(set)
- for block in collection.block_uuids:
- replication_to_blocks[block_to_replication[block]].add(block)
- replication_levels = sorted(replication_to_blocks.keys())
- self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
- (len(collection.block_uuids), len(replication_levels)))
- self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
- '<TH>'.join(['Replication Level ' + str(x)
- for x in replication_levels]))
- self.wfile.write('<TR>\n')
- for replication_level in replication_levels:
- blocks = replication_to_blocks[replication_level]
- self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(blocks))
- self.wfile.write('</TR></TABLE>\n')
-
-
- def do_GET(self):
- if not all_data_loaded:
- self.send_error(503,
- 'Sorry, but I am still loading all the data I need.')
- else:
- # Removing leading '/' and process request path
- split_path = self.path[1:].split('/')
- request_type = split_path[0]
- log.debug('path (%s) split as %s with request_type %s' % (self.path,
- split_path,
- request_type))
- if request_type == '':
- self.writeHomePage()
- elif request_type == DataManagerHandler.USER_PATH:
- self.writeUserPage(split_path[1])
- elif request_type == DataManagerHandler.COLLECTION_PATH:
- self.writeCollectionPage(split_path[1])
- else:
- self.send_error(404, 'Unrecognized request path.')
- return
-
-class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
- """Handle requests in a separate thread."""
-
-
-if __name__ == '__main__':
- args = parser.parse_args()
-
- if args.port == 0:
- loadAllData()
- else:
- loader = threading.Thread(target = loadAllData, name = 'loader')
- loader.start()
-
- server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
- server.serve_forever()
+++ /dev/null
-#! /usr/bin/env python
-
-import datamanager
-import unittest
-
-class TestComputeWeightedReplicationCosts(unittest.TestCase):
- def test_obvious(self):
- self.assertEqual(datamanager.computeWeightedReplicationCosts([1,]),
- {1:1.0})
-
- def test_simple(self):
- self.assertEqual(datamanager.computeWeightedReplicationCosts([2,]),
- {2:2.0})
-
- def test_even_split(self):
- self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1]),
- {1:0.5})
-
- def test_even_split_bigger(self):
- self.assertEqual(datamanager.computeWeightedReplicationCosts([2,2]),
- {2:1.0})
-
- def test_uneven_split(self):
- self.assertEqual(datamanager.computeWeightedReplicationCosts([1,2]),
- {1:0.5, 2:1.5})
-
- def test_uneven_split_bigger(self):
- self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3]),
- {1:0.5, 3:2.5})
-
- def test_uneven_split_jumble(self):
- self.assertEqual(datamanager.computeWeightedReplicationCosts([1,3,6,6,10]),
- {1:0.2, 3:0.7, 6:1.7, 10:5.7})
-
- def test_documentation_example(self):
- self.assertEqual(datamanager.computeWeightedReplicationCosts([1,1,3,6,6]),
- {1:0.2, 3: 0.2 + 2.0 / 3, 6: 0.2 + 2.0 / 3 + 1.5})
-
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-/* Deals with getting Keep Server blocks from API Server and Keep Servers. */
-
-package keep
-
-import (
- "bufio"
- "encoding/json"
- "errors"
- "flag"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/logger"
- "io"
- "io/ioutil"
- "log"
- "net/http"
- "strconv"
- "strings"
- "time"
-)
-
-// ServerAddress struct
-type ServerAddress struct {
- SSL bool `json:"service_ssl_flag"`
- Host string `json:"service_host"`
- Port int `json:"service_port"`
- UUID string `json:"uuid"`
- ServiceType string `json:"service_type"`
-}
-
-// BlockInfo is info about a particular block returned by the server
-type BlockInfo struct {
- Digest blockdigest.DigestWithSize
- Mtime int64 // TODO(misha): Replace this with a timestamp.
-}
-
-// BlockServerInfo is info about a specified block given by a server
-type BlockServerInfo struct {
- ServerIndex int
- Mtime int64 // TODO(misha): Replace this with a timestamp.
-}
-
-// ServerContents struct
-type ServerContents struct {
- BlockDigestToInfo map[blockdigest.DigestWithSize]BlockInfo
-}
-
-// ServerResponse struct
-type ServerResponse struct {
- Address ServerAddress
- Contents ServerContents
- Err error
-}
-
-// ReadServers struct
-type ReadServers struct {
- ReadAllServers bool
- KeepServerIndexToAddress []ServerAddress
- KeepServerAddressToIndex map[ServerAddress]int
- ServerToContents map[ServerAddress]ServerContents
- BlockToServers map[blockdigest.DigestWithSize][]BlockServerInfo
- BlockReplicationCounts map[int]int
-}
-
-// GetKeepServersParams struct
-type GetKeepServersParams struct {
- Client *arvadosclient.ArvadosClient
- Logger *logger.Logger
- Limit int
-}
-
-// ServiceList consists of the addresses of all the available kee servers
-type ServiceList struct {
- ItemsAvailable int `json:"items_available"`
- KeepServers []ServerAddress `json:"items"`
-}
-
-var serviceType string
-
-func init() {
- flag.StringVar(&serviceType,
- "service-type",
- "disk",
- "Operate only on keep_services with the specified service_type, ignoring all others.")
-}
-
-// String
-// TODO(misha): Change this to include the UUID as well.
-func (s ServerAddress) String() string {
- return s.URL()
-}
-
-// URL of the keep server
-func (s ServerAddress) URL() string {
- if s.SSL {
- return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
- }
- return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
-}
-
-// GetKeepServersAndSummarize gets keep servers from api
-func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers, err error) {
- results, err = GetKeepServers(params)
- if err != nil {
- return
- }
- log.Printf("Returned %d keep disks", len(results.ServerToContents))
-
- results.Summarize(params.Logger)
- log.Printf("Replication level distribution: %v",
- results.BlockReplicationCounts)
-
- return
-}
-
-// GetKeepServers from api server
-func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
- sdkParams := arvadosclient.Dict{
- "filters": [][]string{{"service_type", "!=", "proxy"}},
- }
- if params.Limit > 0 {
- sdkParams["limit"] = params.Limit
- }
-
- var sdkResponse ServiceList
- err = params.Client.List("keep_services", sdkParams, &sdkResponse)
-
- if err != nil {
- return
- }
-
- var keepServers []ServerAddress
- for _, server := range sdkResponse.KeepServers {
- if server.ServiceType == serviceType {
- keepServers = append(keepServers, server)
- } else {
- log.Printf("Skipping keep_service %q because its service_type %q does not match -service-type=%q", server, server.ServiceType, serviceType)
- }
- }
-
- if len(keepServers) == 0 {
- return results, fmt.Errorf("Found no keepservices with the service type %v", serviceType)
- }
-
- if params.Logger != nil {
- params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := logger.GetOrCreateMap(p, "keep_info")
- keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
- keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
- keepInfo["keep_servers"] = sdkResponse.KeepServers
- keepInfo["indexable_keep_servers"] = keepServers
- })
- }
-
- log.Printf("Received keep services list: %+v", sdkResponse)
-
- if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
- return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse)
- }
-
- results.KeepServerIndexToAddress = keepServers
- results.KeepServerAddressToIndex = make(map[ServerAddress]int)
- for i, address := range results.KeepServerIndexToAddress {
- results.KeepServerAddressToIndex[address] = i
- }
-
- log.Printf("Got Server Addresses: %v", results)
-
- // Send off all the index requests concurrently
- responseChan := make(chan ServerResponse)
- for _, keepServer := range results.KeepServerIndexToAddress {
- // The above keepsServer variable is reused for each iteration, so
- // it would be shared across all goroutines. This would result in
- // us querying one server n times instead of n different servers
- // as we intended. To avoid this we add it as an explicit
- // parameter which gets copied. This bug and solution is described
- // in https://golang.org/doc/effective_go.html#channels
- go func(keepServer ServerAddress) {
- responseChan <- GetServerContents(params.Logger,
- keepServer,
- params.Client)
- }(keepServer)
- }
-
- results.ServerToContents = make(map[ServerAddress]ServerContents)
- results.BlockToServers = make(map[blockdigest.DigestWithSize][]BlockServerInfo)
-
- // Read all the responses
- for i := range results.KeepServerIndexToAddress {
- _ = i // Here to prevent go from complaining.
- response := <-responseChan
-
- // Check if there were any errors during GetServerContents
- if response.Err != nil {
- return results, response.Err
- }
-
- log.Printf("Received channel response from %v containing %d files",
- response.Address,
- len(response.Contents.BlockDigestToInfo))
- results.ServerToContents[response.Address] = response.Contents
- serverIndex := results.KeepServerAddressToIndex[response.Address]
- for _, blockInfo := range response.Contents.BlockDigestToInfo {
- results.BlockToServers[blockInfo.Digest] = append(
- results.BlockToServers[blockInfo.Digest],
- BlockServerInfo{ServerIndex: serverIndex,
- Mtime: blockInfo.Mtime})
- }
- }
- return
-}
-
-// GetServerContents of the keep server
-func GetServerContents(arvLogger *logger.Logger,
- keepServer ServerAddress,
- arv *arvadosclient.ArvadosClient) (response ServerResponse) {
-
- err := GetServerStatus(arvLogger, keepServer, arv)
- if err != nil {
- response.Err = err
- return
- }
-
- req, err := CreateIndexRequest(arvLogger, keepServer, arv)
- if err != nil {
- response.Err = err
- return
- }
-
- resp, err := arv.Client.Do(req)
- if err != nil {
- response.Err = err
- return
- }
-
- response, err = ReadServerResponse(arvLogger, keepServer, resp)
- if err != nil {
- response.Err = err
- return
- }
-
- return
-}
-
-// GetServerStatus get keep server status by invoking /status.json
-func GetServerStatus(arvLogger *logger.Logger,
- keepServer ServerAddress,
- arv *arvadosclient.ArvadosClient) error {
- url := fmt.Sprintf("http://%s:%d/status.json",
- keepServer.Host,
- keepServer.Port)
-
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := make(map[string]interface{})
- serverInfo["status_request_sent_at"] = now
- serverInfo["host"] = keepServer.Host
- serverInfo["port"] = keepServer.Port
-
- keepInfo[keepServer.UUID] = serverInfo
- })
- }
-
- resp, err := arv.Client.Get(url)
- if err != nil {
- return fmt.Errorf("Error getting keep status from %s: %v", url, err)
- } else if resp.StatusCode != 200 {
- return fmt.Errorf("Received error code %d in response to request "+
- "for %s status: %s",
- resp.StatusCode, url, resp.Status)
- }
-
- var keepStatus map[string]interface{}
- decoder := json.NewDecoder(resp.Body)
- decoder.UseNumber()
- err = decoder.Decode(&keepStatus)
- if err != nil {
- return fmt.Errorf("Error decoding keep status from %s: %v", url, err)
- }
-
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
- serverInfo["status_response_processed_at"] = now
- serverInfo["status"] = keepStatus
- })
- }
-
- return nil
-}
-
-// CreateIndexRequest to the keep server
-func CreateIndexRequest(arvLogger *logger.Logger,
- keepServer ServerAddress,
- arv *arvadosclient.ArvadosClient) (req *http.Request, err error) {
- url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
- log.Println("About to fetch keep server contents from " + url)
-
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
- serverInfo["index_request_sent_at"] = now
- })
- }
-
- req, err = http.NewRequest("GET", url, nil)
- if err != nil {
- return req, fmt.Errorf("Error building http request for %s: %v", url, err)
- }
-
- req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
- return req, err
-}
-
-// ReadServerResponse reads reasponse from keep server
-func ReadServerResponse(arvLogger *logger.Logger,
- keepServer ServerAddress,
- resp *http.Response) (response ServerResponse, err error) {
-
- if resp.StatusCode != 200 {
- return response, fmt.Errorf("Received error code %d in response to index request for %s: %s",
- resp.StatusCode, keepServer.String(), resp.Status)
- }
-
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
- serverInfo["index_response_received_at"] = now
- })
- }
-
- response.Address = keepServer
- response.Contents.BlockDigestToInfo =
- make(map[blockdigest.DigestWithSize]BlockInfo)
- reader := bufio.NewReader(resp.Body)
- numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
- for {
- numLines++
- line, err := reader.ReadString('\n')
- if err == io.EOF {
- return response, fmt.Errorf("Index from %s truncated at line %d",
- keepServer.String(), numLines)
- } else if err != nil {
- return response, fmt.Errorf("Error reading index response from %s at line %d: %v",
- keepServer.String(), numLines, err)
- }
- if line == "\n" {
- if _, err := reader.Peek(1); err == nil {
- extra, _ := reader.ReadString('\n')
- return response, fmt.Errorf("Index from %s had trailing data at line %d after EOF marker: %s",
- keepServer.String(), numLines+1, extra)
- } else if err != io.EOF {
- return response, fmt.Errorf("Index from %s had read error after EOF marker at line %d: %v",
- keepServer.String(), numLines, err)
- }
- numLines--
- break
- }
- blockInfo, err := parseBlockInfoFromIndexLine(line)
- if err != nil {
- return response, fmt.Errorf("Error parsing BlockInfo from index line "+
- "received from %s: %v",
- keepServer.String(),
- err)
- }
-
- if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
- // This server returned multiple lines containing the same block digest.
- numDuplicates++
- // Keep the block that's newer.
- if storedBlock.Mtime < blockInfo.Mtime {
- response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
- }
- } else {
- response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
- }
- }
-
- log.Printf("%s index contained %d lines with %d duplicates with "+
- "%d size disagreements",
- keepServer.String(),
- numLines,
- numDuplicates,
- numSizeDisagreements)
-
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.UUID].(map[string]interface{})
-
- serverInfo["processing_finished_at"] = now
- serverInfo["lines_received"] = numLines
- serverInfo["duplicates_seen"] = numDuplicates
- serverInfo["size_disagreements_seen"] = numSizeDisagreements
- })
- }
- resp.Body.Close()
- return
-}
-
-func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err error) {
- tokens := strings.Fields(indexLine)
- if len(tokens) != 2 {
- err = fmt.Errorf("Expected 2 tokens per line but received a "+
- "line containing %#q instead.",
- tokens)
- }
-
- var locator blockdigest.BlockLocator
- if locator, err = blockdigest.ParseBlockLocator(tokens[0]); err != nil {
- err = fmt.Errorf("%v Received error while parsing line \"%#q\"",
- err, indexLine)
- return
- }
- if len(locator.Hints) > 0 {
- err = fmt.Errorf("Block locator in index line should not contain hints "+
- "but it does: %#q",
- locator)
- return
- }
-
- var ns int64
- ns, err = strconv.ParseInt(tokens[1], 10, 64)
- if err != nil {
- return
- }
- if ns < 1e12 {
- // An old version of keepstore is giving us timestamps
- // in seconds instead of nanoseconds. (This threshold
- // correctly handles all times between 1970-01-02 and
- // 33658-09-27.)
- ns = ns * 1e9
- }
- blockInfo.Mtime = ns
- blockInfo.Digest = blockdigest.DigestWithSize{
- Digest: locator.Digest,
- Size: uint32(locator.Size),
- }
- return
-}
-
-// Summarize results from keep server
-func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
- readServers.BlockReplicationCounts = make(map[int]int)
- for _, infos := range readServers.BlockToServers {
- replication := len(infos)
- readServers.BlockReplicationCounts[replication]++
- }
-
- if arvLogger != nil {
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := logger.GetOrCreateMap(p, "keep_info")
- keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
- })
- }
-}
-
-// TrashRequest struct
-type TrashRequest struct {
- Locator string `json:"locator"`
- BlockMtime int64 `json:"block_mtime"`
-}
-
-// TrashList is an array of TrashRequest objects
-type TrashList []TrashRequest
-
-// SendTrashLists to trash queue
-func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList, dryRun bool) (errs []error) {
- count := 0
- barrier := make(chan error)
-
- client := kc.Client
-
- for url, v := range spl {
- if arvLogger != nil {
- // We need a local variable because Update doesn't call our mutator func until later,
- // when our list variable might have been reused by the next loop iteration.
- url := url
- trashLen := len(v)
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- trashListInfo := logger.GetOrCreateMap(p, "trash_list_len")
- trashListInfo[url] = trashLen
- })
- }
-
- if dryRun {
- log.Printf("dry run, not sending trash list to service %s with %d blocks", url, len(v))
- continue
- }
-
- count++
- log.Printf("Sending trash list to %v", url)
-
- go (func(url string, v TrashList) {
- pipeReader, pipeWriter := io.Pipe()
- go (func() {
- enc := json.NewEncoder(pipeWriter)
- enc.Encode(v)
- pipeWriter.Close()
- })()
-
- req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
- if err != nil {
- log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
- barrier <- err
- return
- }
-
- req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
-
- // Make the request
- var resp *http.Response
- if resp, err = client.Do(req); err != nil {
- log.Printf("Error sending trash list to %v error: %v", url, err.Error())
- barrier <- err
- return
- }
-
- log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
-
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
-
- if resp.StatusCode != 200 {
- barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode))
- } else {
- barrier <- nil
- }
- })(url, v)
- }
-
- for i := 0; i < count; i++ {
- b := <-barrier
- if b != nil {
- errs = append(errs, b)
- }
- }
-
- return errs
-}
+++ /dev/null
-package keep
-
-import (
- "encoding/json"
- "fmt"
- "net"
- "net/http"
- "net/http/httptest"
- "net/url"
- "strconv"
- "strings"
- "testing"
-
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
-
- . "gopkg.in/check.v1"
-)
-
-// Gocheck boilerplate
-func Test(t *testing.T) {
- TestingT(t)
-}
-
-type KeepSuite struct{}
-
-var _ = Suite(&KeepSuite{})
-
-type TestHandler struct {
- request TrashList
-}
-
-func (ts *TestHandler) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
- r := json.NewDecoder(req.Body)
- r.Decode(&ts.request)
-}
-
-func (s *KeepSuite) TestSendTrashLists(c *C) {
- th := TestHandler{}
- server := httptest.NewServer(&th)
- defer server.Close()
-
- tl := map[string]TrashList{
- server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
-
- arv := &arvadosclient.ArvadosClient{ApiToken: "abc123"}
- kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
- kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
- map[string]string{"xxxx": server.URL},
- map[string]string{})
-
- err := SendTrashLists(nil, &kc, tl, false)
-
- c.Check(err, IsNil)
-
- c.Check(th.request,
- DeepEquals,
- tl[server.URL])
-
-}
-
-type TestHandlerError struct {
-}
-
-func (tse *TestHandlerError) ServeHTTP(writer http.ResponseWriter, req *http.Request) {
- http.Error(writer, "I'm a teapot", 418)
-}
-
-func sendTrashListError(c *C, server *httptest.Server) {
- tl := map[string]TrashList{
- server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
-
- arv := &arvadosclient.ArvadosClient{ApiToken: "abc123"}
- kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
- kc.SetServiceRoots(map[string]string{"xxxx": server.URL},
- map[string]string{"xxxx": server.URL},
- map[string]string{})
-
- err := SendTrashLists(nil, &kc, tl, false)
-
- c.Check(err, NotNil)
- c.Check(err[0], NotNil)
-}
-
-func (s *KeepSuite) TestSendTrashListErrorResponse(c *C) {
- server := httptest.NewServer(&TestHandlerError{})
- sendTrashListError(c, server)
- defer server.Close()
-}
-
-func (s *KeepSuite) TestSendTrashListUnreachable(c *C) {
- sendTrashListError(c, httptest.NewUnstartedServer(&TestHandler{}))
-}
-
-type APITestData struct {
- numServers int
- serverType string
- statusCode int
-}
-
-func (s *KeepSuite) TestGetKeepServers_UnsupportedServiceType(c *C) {
- testGetKeepServersFromAPI(c, APITestData{1, "notadisk", 200}, "Found no keepservices with the service type disk")
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReceivedTooFewServers(c *C) {
- testGetKeepServersFromAPI(c, APITestData{2, "disk", 200}, "Did not receive all available keep servers")
-}
-
-func (s *KeepSuite) TestGetKeepServers_ServerError(c *C) {
- testGetKeepServersFromAPI(c, APITestData{-1, "disk", -1}, "arvados API server error")
-}
-
-func testGetKeepServersFromAPI(c *C, testData APITestData, expectedError string) {
- keepServers := ServiceList{
- ItemsAvailable: testData.numServers,
- KeepServers: []ServerAddress{{
- SSL: false,
- Host: "example.com",
- Port: 12345,
- UUID: "abcdefg",
- ServiceType: testData.serverType,
- }},
- }
-
- ksJSON, _ := json.Marshal(keepServers)
- apiStubResponses := make(map[string]arvadostest.StubResponse)
- apiStubResponses["/arvados/v1/keep_services"] = arvadostest.StubResponse{testData.statusCode, string(ksJSON)}
- apiStub := arvadostest.ServerStub{apiStubResponses}
-
- api := httptest.NewServer(&apiStub)
- defer api.Close()
-
- arv := &arvadosclient.ArvadosClient{
- Scheme: "http",
- ApiServer: api.URL[7:],
- ApiToken: "abc123",
- Client: &http.Client{Transport: &http.Transport{}},
- }
-
- kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
- kc.SetServiceRoots(map[string]string{"xxxx": "http://example.com:23456"},
- map[string]string{"xxxx": "http://example.com:23456"},
- map[string]string{})
-
- params := GetKeepServersParams{
- Client: arv,
- Logger: nil,
- Limit: 10,
- }
-
- _, err := GetKeepServersAndSummarize(params)
- c.Assert(err, NotNil)
- c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*", expectedError))
-}
-
-type KeepServerTestData struct {
- // handle /status.json
- statusStatusCode int
-
- // handle /index
- indexStatusCode int
- indexResponseBody string
-
- // expected error, if any
- expectedError string
-}
-
-func (s *KeepSuite) TestGetKeepServers_ErrorGettingKeepServerStatus(c *C) {
- testGetKeepServersAndSummarize(c, KeepServerTestData{500, 200, "ok",
- ".*http://.* 500 Internal Server Error"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_GettingIndex(c *C) {
- testGetKeepServersAndSummarize(c, KeepServerTestData{200, -1, "notok",
- ".*redirect-loop.*"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ErrorReadServerResponse(c *C) {
- testGetKeepServersAndSummarize(c, KeepServerTestData{200, 500, "notok",
- ".*http://.* 500 Internal Server Error"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseTuncatedAtLineOne(c *C) {
- testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200,
- "notterminatedwithnewline", "Index from http://.* truncated at line 1"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_InvalidBlockLocatorPattern(c *C) {
- testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200, "testing\n",
- "Error parsing BlockInfo from index line.*"})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseEmpty(c *C) {
- testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200, "\n", ""})
-}
-
-func (s *KeepSuite) TestGetKeepServers_ReadServerResponseWithTwoBlocks(c *C) {
- testGetKeepServersAndSummarize(c, KeepServerTestData{200, 200,
- "51752ba076e461ec9ec1d27400a08548+20 1447526361\na048cc05c02ba1ee43ad071274b9e547+52 1447526362\n\n", ""})
-}
-
-func testGetKeepServersAndSummarize(c *C, testData KeepServerTestData) {
- ksStubResponses := make(map[string]arvadostest.StubResponse)
- ksStubResponses["/status.json"] = arvadostest.StubResponse{testData.statusStatusCode, string(`{}`)}
- ksStubResponses["/index"] = arvadostest.StubResponse{testData.indexStatusCode, testData.indexResponseBody}
- ksStub := arvadostest.ServerStub{ksStubResponses}
- ks := httptest.NewServer(&ksStub)
- defer ks.Close()
-
- ksURL, err := url.Parse(ks.URL)
- c.Check(err, IsNil)
- ksHost, port, err := net.SplitHostPort(ksURL.Host)
- ksPort, err := strconv.Atoi(port)
- c.Check(err, IsNil)
-
- servers_list := ServiceList{
- ItemsAvailable: 1,
- KeepServers: []ServerAddress{{
- SSL: false,
- Host: ksHost,
- Port: ksPort,
- UUID: "abcdefg",
- ServiceType: "disk",
- }},
- }
- ksJSON, _ := json.Marshal(servers_list)
- apiStubResponses := make(map[string]arvadostest.StubResponse)
- apiStubResponses["/arvados/v1/keep_services"] = arvadostest.StubResponse{200, string(ksJSON)}
- apiStub := arvadostest.ServerStub{apiStubResponses}
-
- api := httptest.NewServer(&apiStub)
- defer api.Close()
-
- arv := &arvadosclient.ArvadosClient{
- Scheme: "http",
- ApiServer: api.URL[7:],
- ApiToken: "abc123",
- Client: &http.Client{Transport: &http.Transport{}},
- }
-
- kc := keepclient.KeepClient{Arvados: arv, Client: &http.Client{}}
- kc.SetServiceRoots(map[string]string{"xxxx": ks.URL},
- map[string]string{"xxxx": ks.URL},
- map[string]string{})
-
- params := GetKeepServersParams{
- Client: arv,
- Logger: nil,
- Limit: 10,
- }
-
- // GetKeepServersAndSummarize
- results, err := GetKeepServersAndSummarize(params)
-
- if testData.expectedError == "" {
- c.Assert(err, IsNil)
- c.Assert(results, NotNil)
-
- blockToServers := results.BlockToServers
-
- blockLocators := strings.Split(testData.indexResponseBody, "\n")
- for _, loc := range blockLocators {
- locator := strings.Split(loc, " ")[0]
- if locator != "" {
- blockLocator, err := blockdigest.ParseBlockLocator(locator)
- c.Assert(err, IsNil)
-
- blockDigestWithSize := blockdigest.DigestWithSize{blockLocator.Digest, uint32(blockLocator.Size)}
- blockServerInfo := blockToServers[blockDigestWithSize]
- c.Assert(blockServerInfo[0].Mtime, NotNil)
- }
- }
- } else {
- c.Assert(err, ErrorMatches, testData.expectedError)
- }
-}
+++ /dev/null
-/* Datamanager-specific logging methods. */
-
-package loggerutil
-
-import (
- "git.curoverse.com/arvados.git/sdk/go/logger"
- "log"
- "os"
- "runtime"
- "time"
-)
-
-// Useful to call at the beginning of execution to log info about the
-// current run.
-func LogRunInfo(arvLogger *logger.Logger) {
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- runInfo := logger.GetOrCreateMap(p, "run_info")
- runInfo["started_at"] = now
- runInfo["args"] = os.Args
- hostname, err := os.Hostname()
- if err != nil {
- runInfo["hostname_error"] = err.Error()
- } else {
- runInfo["hostname"] = hostname
- }
- runInfo["pid"] = os.Getpid()
- })
- }
-}
-
-// A LogMutator that records the current memory usage. This is most useful as a logger write hook.
-func LogMemoryAlloc(p map[string]interface{}, e map[string]interface{}) {
- runInfo := logger.GetOrCreateMap(p, "run_info")
- var memStats runtime.MemStats
- runtime.ReadMemStats(&memStats)
- runInfo["memory_bytes_in_use"] = memStats.Alloc
- runInfo["memory_bytes_reserved"] = memStats.Sys
-}
-
-func FatalWithMessage(arvLogger *logger.Logger, message string) {
- if arvLogger != nil {
- arvLogger.FinalUpdate(func(p map[string]interface{}, e map[string]interface{}) {
- p["FATAL"] = message
- runInfo := logger.GetOrCreateMap(p, "run_info")
- runInfo["finished_at"] = time.Now()
- })
- }
-
- log.Fatalf(message)
-}
+++ /dev/null
-/* Ensures that we only have one copy of each unique string. This is
-/* not designed for concurrent access. */
-
-package summary
-
-// This code should probably be moved somewhere more universal.
-
-// CanonicalString struct
-type CanonicalString struct {
- m map[string]string
-}
-
-// Get a CanonicalString
-func (cs *CanonicalString) Get(s string) (r string) {
- if cs.m == nil {
- cs.m = make(map[string]string)
- }
- value, found := cs.m[s]
- if found {
- return value
- }
-
- // s may be a substring of a much larger string.
- // If we store s, it will prevent that larger string from getting
- // garbage collected.
- // If this is something you worry about you should change this code
- // to make an explict copy of s using a byte array.
- cs.m[s] = s
- return s
-}
+++ /dev/null
-// Handles writing data to and reading data from disk to speed up development.
-
-package summary
-
-import (
- "encoding/gob"
- "flag"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/logger"
- "git.curoverse.com/arvados.git/services/datamanager/collection"
- "git.curoverse.com/arvados.git/services/datamanager/keep"
- "log"
- "os"
-)
-
-// Used to locally cache data read from servers to reduce execution
-// time when developing. Not for use in production.
-type serializedData struct {
- ReadCollections collection.ReadCollections
- KeepServerInfo keep.ReadServers
-}
-
-var (
- WriteDataTo string
- readDataFrom string
-)
-
-// DataFetcher to fetch data from keep servers
-type DataFetcher func(arvLogger *logger.Logger,
- readCollections *collection.ReadCollections,
- keepServerInfo *keep.ReadServers) error
-
-func init() {
- flag.StringVar(&WriteDataTo,
- "write-data-to",
- "",
- "Write summary of data received to this file. Used for development only.")
- flag.StringVar(&readDataFrom,
- "read-data-from",
- "",
- "Avoid network i/o and read summary data from this file instead. Used for development only.")
-}
-
-// MaybeWriteData writes data we've read to a file.
-//
-// This is useful for development, so that we don't need to read all
-// our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func MaybeWriteData(arvLogger *logger.Logger,
- readCollections collection.ReadCollections,
- keepServerInfo keep.ReadServers) error {
- if WriteDataTo == "" {
- return nil
- }
- summaryFile, err := os.Create(WriteDataTo)
- if err != nil {
- return err
- }
- defer summaryFile.Close()
-
- enc := gob.NewEncoder(summaryFile)
- data := serializedData{
- ReadCollections: readCollections,
- KeepServerInfo: keepServerInfo}
- err = enc.Encode(data)
- if err != nil {
- return err
- }
- log.Printf("Wrote summary data to: %s", WriteDataTo)
- return nil
-}
-
-// ShouldReadData should not be used outside of development
-func ShouldReadData() bool {
- return readDataFrom != ""
-}
-
-// ReadData reads data that we've written to a file.
-//
-// This is useful for development, so that we don't need to read all
-// our data from the network every time we tweak something.
-//
-// This should not be used outside of development, since you'll be
-// working with stale data.
-func ReadData(arvLogger *logger.Logger,
- readCollections *collection.ReadCollections,
- keepServerInfo *keep.ReadServers) error {
- if readDataFrom == "" {
- return fmt.Errorf("ReadData() called with empty filename.")
- }
- summaryFile, err := os.Open(readDataFrom)
- if err != nil {
- return err
- }
- defer summaryFile.Close()
-
- dec := gob.NewDecoder(summaryFile)
- data := serializedData{}
- err = dec.Decode(&data)
- if err != nil {
- return err
- }
-
- // re-summarize data, so that we can update our summarizing
- // functions without needing to do all our network i/o
- data.ReadCollections.Summarize(arvLogger)
- data.KeepServerInfo.Summarize(arvLogger)
-
- *readCollections = data.ReadCollections
- *keepServerInfo = data.KeepServerInfo
- log.Printf("Read summary data from: %s", readDataFrom)
- return nil
-}
+++ /dev/null
-// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
-
-package summary
-
-import (
- "encoding/json"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/logger"
- "git.curoverse.com/arvados.git/services/datamanager/keep"
- "log"
- "os"
- "strings"
-)
-
-// Locator is a block digest
-type Locator blockdigest.DigestWithSize
-
-// MarshalJSON encoding
-func (l Locator) MarshalJSON() ([]byte, error) {
- return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
-}
-
-// PullRequest represents one entry in the Pull List
-type PullRequest struct {
- Locator Locator `json:"locator"`
- Servers []string `json:"servers"`
-}
-
-// PullList for a particular server
-type PullList []PullRequest
-
-// PullListByLocator implements sort.Interface for PullList based on
-// the Digest.
-type PullListByLocator PullList
-
-func (a PullListByLocator) Len() int { return len(a) }
-func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a PullListByLocator) Less(i, j int) bool {
- di, dj := a[i].Locator.Digest, a[j].Locator.Digest
- if di.H < dj.H {
- return true
- } else if di.H == dj.H {
- if di.L < dj.L {
- return true
- } else if di.L == dj.L {
- return a[i].Locator.Size < a[j].Locator.Size
- }
- }
- return false
-}
-
-// PullServers struct
-// For a given under-replicated block, this structure represents which
-// servers should pull the specified block and which servers they can
-// pull it from.
-type PullServers struct {
- To []string // Servers that should pull the specified block
- From []string // Servers that already contain the specified block
-}
-
-// ComputePullServers creates a map from block locator to PullServers
-// with one entry for each under-replicated block.
-//
-// This method ignores zero-replica blocks since there are no servers
-// to pull them from, so callers should feel free to omit them, but
-// this function will ignore them if they are provided.
-func ComputePullServers(kc *keepclient.KeepClient,
- keepServerInfo *keep.ReadServers,
- blockToDesiredReplication map[blockdigest.DigestWithSize]int,
- underReplicated BlockSet) (m map[Locator]PullServers) {
- m = map[Locator]PullServers{}
- // We use CanonicalString to avoid filling memory with duplicate
- // copies of the same string.
- var cs CanonicalString
-
- // Servers that are writeable
- writableServers := map[string]struct{}{}
- for _, url := range kc.WritableLocalRoots() {
- writableServers[cs.Get(url)] = struct{}{}
- }
-
- for block := range underReplicated {
- serversStoringBlock := keepServerInfo.BlockToServers[block]
- numCopies := len(serversStoringBlock)
- numCopiesMissing := blockToDesiredReplication[block] - numCopies
- if numCopiesMissing > 0 {
- // We expect this to always be true, since the block was listed
- // in underReplicated.
-
- if numCopies > 0 {
- // Not much we can do with blocks with no copies.
-
- // A server's host-port string appears as a key in this map
- // iff it contains the block.
- serverHasBlock := map[string]struct{}{}
- for _, info := range serversStoringBlock {
- sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
- serverHasBlock[cs.Get(sa.URL())] = struct{}{}
- }
-
- roots := keepclient.NewRootSorter(kc.LocalRoots(),
- block.String()).GetSortedRoots()
-
- l := Locator(block)
- m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
- roots, numCopiesMissing)
- }
- }
- }
- return m
-}
-
-// CreatePullServers creates a pull list in which the To and From
-// fields preserve the ordering of sorted servers and the contents
-// are all canonical strings.
-func CreatePullServers(cs CanonicalString,
- serverHasBlock map[string]struct{},
- writableServers map[string]struct{},
- sortedServers []string,
- maxToFields int) (ps PullServers) {
-
- ps = PullServers{
- To: make([]string, 0, maxToFields),
- From: make([]string, 0, len(serverHasBlock)),
- }
-
- for _, host := range sortedServers {
- // Strip the protocol portion of the url.
- // Use the canonical copy of the string to avoid memory waste.
- server := cs.Get(host)
- _, hasBlock := serverHasBlock[server]
- if hasBlock {
- // The from field should include the protocol.
- ps.From = append(ps.From, cs.Get(host))
- } else if len(ps.To) < maxToFields {
- _, writable := writableServers[host]
- if writable {
- ps.To = append(ps.To, server)
- }
- }
- }
-
- return
-}
-
-// RemoveProtocolPrefix strips the protocol prefix from a url.
-func RemoveProtocolPrefix(url string) string {
- return url[(strings.LastIndex(url, "/") + 1):]
-}
-
-// BuildPullLists produces a PullList for each keep server.
-func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
- spl = map[string]PullList{}
- // We don't worry about canonicalizing our strings here, because we
- // assume lps was created by ComputePullServers() which already
- // canonicalized the strings for us.
- for locator, pullServers := range lps {
- for _, destination := range pullServers.To {
- pullList, pullListExists := spl[destination]
- if !pullListExists {
- pullList = PullList{}
- }
- spl[destination] = append(pullList,
- PullRequest{Locator: locator, Servers: pullServers.From})
- }
- }
- return
-}
-
-// WritePullLists writes each pull list to a file.
-// The filename is based on the hostname.
-//
-// This is just a hack for prototyping, it is not expected to be used
-// in production.
-func WritePullLists(arvLogger *logger.Logger,
- pullLists map[string]PullList,
- dryRun bool) error {
- r := strings.NewReplacer(":", ".")
-
- for host, list := range pullLists {
- if arvLogger != nil {
- // We need a local variable because Update doesn't call our mutator func until later,
- // when our list variable might have been reused by the next loop iteration.
- host := host
- listLen := len(list)
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- pullListInfo := logger.GetOrCreateMap(p, "pull_list_len")
- pullListInfo[host] = listLen
- })
- }
-
- if dryRun {
- log.Print("dry run, not sending pull list to service %s with %d blocks", host, len(list))
- continue
- }
-
- filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
- pullListFile, err := os.Create(filename)
- if err != nil {
- return err
- }
- defer pullListFile.Close()
-
- enc := json.NewEncoder(pullListFile)
- err = enc.Encode(list)
- if err != nil {
- return err
- }
- log.Printf("Wrote pull list to %s.", filename)
- }
-
- return nil
-}
+++ /dev/null
-package summary
-
-import (
- "encoding/json"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- . "gopkg.in/check.v1"
- "sort"
- "testing"
-)
-
-// Gocheck boilerplate
-func TestPullLists(t *testing.T) {
- TestingT(t)
-}
-
-type PullSuite struct{}
-
-var _ = Suite(&PullSuite{})
-
-// Helper method to declare string sets more succinctly
-// Could be placed somewhere more general.
-func stringSet(slice ...string) (m map[string]struct{}) {
- m = map[string]struct{}{}
- for _, s := range slice {
- m[s] = struct{}{}
- }
- return
-}
-
-func (s *PullSuite) TestPullListPrintsJSONCorrectly(c *C) {
- pl := PullList{PullRequest{
- Locator: Locator(blockdigest.MakeTestDigestSpecifySize(0xBadBeef, 56789)),
- Servers: []string{"keep0.qr1hi.arvadosapi.com:25107",
- "keep1.qr1hi.arvadosapi.com:25108"}}}
-
- b, err := json.Marshal(pl)
- c.Assert(err, IsNil)
- expectedOutput := `[{"locator":"0000000000000000000000000badbeef+56789",` +
- `"servers":["keep0.qr1hi.arvadosapi.com:25107",` +
- `"keep1.qr1hi.arvadosapi.com:25108"]}]`
- c.Check(string(b), Equals, expectedOutput)
-}
-
-func (s *PullSuite) TestCreatePullServers(c *C) {
- var cs CanonicalString
- c.Check(
- CreatePullServers(cs,
- stringSet(),
- stringSet(),
- []string{},
- 5),
- DeepEquals,
- PullServers{To: []string{}, From: []string{}})
-
- c.Check(
- CreatePullServers(cs,
- stringSet("https://keep0:25107", "https://keep1:25108"),
- stringSet(),
- []string{},
- 5),
- DeepEquals,
- PullServers{To: []string{}, From: []string{}})
-
- c.Check(
- CreatePullServers(cs,
- stringSet("https://keep0:25107", "https://keep1:25108"),
- stringSet("https://keep0:25107"),
- []string{"https://keep0:25107"},
- 5),
- DeepEquals,
- PullServers{To: []string{}, From: []string{"https://keep0:25107"}})
-
- c.Check(
- CreatePullServers(cs,
- stringSet("https://keep0:25107", "https://keep1:25108"),
- stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
- []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
- 5),
- DeepEquals,
- PullServers{To: []string{"https://keep3:25110", "https://keep2:25109"},
- From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
- c.Check(
- CreatePullServers(cs,
- stringSet("https://keep0:25107", "https://keep1:25108"),
- stringSet("https://keep3:25110", "https://keep1:25108", "https://keep0:25107"),
- []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
- 5),
- DeepEquals,
- PullServers{To: []string{"https://keep3:25110"},
- From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
- c.Check(
- CreatePullServers(cs,
- stringSet("https://keep0:25107", "https://keep1:25108"),
- stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
- []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
- 1),
- DeepEquals,
- PullServers{To: []string{"https://keep3:25110"},
- From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
- c.Check(
- CreatePullServers(cs,
- stringSet("https://keep0:25107", "https://keep1:25108"),
- stringSet("https://keep3:25110", "https://keep2:25109",
- "https://keep1:25108", "https://keep0:25107"),
- []string{"https://keep3:25110", "https://keep2:25109",
- "https://keep1:25108", "https://keep0:25107"},
- 1),
- DeepEquals,
- PullServers{To: []string{"https://keep3:25110"},
- From: []string{"https://keep1:25108", "https://keep0:25107"}})
-
- c.Check(
- CreatePullServers(cs,
- stringSet("https://keep0:25107", "https://keep1:25108"),
- stringSet("https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"),
- []string{"https://keep3:25110", "https://keep2:25109", "https://keep1:25108", "https://keep0:25107"},
- 0),
- DeepEquals,
- PullServers{To: []string{},
- From: []string{"https://keep1:25108", "https://keep0:25107"}})
-}
-
-// Checks whether two pull list maps are equal. Since pull lists are
-// ordered arbitrarily, we need to sort them by digest before
-// comparing them for deep equality.
-type pullListMapEqualsChecker struct {
- *CheckerInfo
-}
-
-func (c *pullListMapEqualsChecker) Check(params []interface{}, names []string) (result bool, error string) {
- obtained, ok := params[0].(map[string]PullList)
- if !ok {
- return false, "First parameter is not a PullList map"
- }
- expected, ok := params[1].(map[string]PullList)
- if !ok {
- return false, "Second parameter is not a PullList map"
- }
-
- for _, v := range obtained {
- sort.Sort(PullListByLocator(v))
- }
- for _, v := range expected {
- sort.Sort(PullListByLocator(v))
- }
-
- return DeepEquals.Check(params, names)
-}
-
-var PullListMapEquals Checker = &pullListMapEqualsChecker{&CheckerInfo{
- Name: "PullListMapEquals",
- Params: []string{"obtained", "expected"},
-}}
-
-func (s *PullSuite) TestBuildPullLists(c *C) {
- c.Check(
- BuildPullLists(map[Locator]PullServers{}),
- PullListMapEquals,
- map[string]PullList{})
-
- locator1 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xBadBeef)}
- c.Check(
- BuildPullLists(map[Locator]PullServers{
- locator1: {To: []string{}, From: []string{}}}),
- PullListMapEquals,
- map[string]PullList{})
-
- c.Check(
- BuildPullLists(map[Locator]PullServers{
- locator1: {To: []string{}, From: []string{"f1", "f2"}}}),
- PullListMapEquals,
- map[string]PullList{})
-
- c.Check(
- BuildPullLists(map[Locator]PullServers{
- locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}}}),
- PullListMapEquals,
- map[string]PullList{
- "t1": {PullRequest{locator1, []string{"f1", "f2"}}}})
-
- c.Check(
- BuildPullLists(map[Locator]PullServers{
- locator1: {To: []string{"t1"}, From: []string{}}}),
- PullListMapEquals,
- map[string]PullList{"t1": {
- PullRequest{locator1, []string{}}}})
-
- c.Check(
- BuildPullLists(map[Locator]PullServers{
- locator1: {
- To: []string{"t1", "t2"},
- From: []string{"f1", "f2"},
- }}),
- PullListMapEquals,
- map[string]PullList{
- "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
- "t2": {PullRequest{locator1, []string{"f1", "f2"}}},
- })
-
- locator2 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xCabbed)}
- c.Check(
- BuildPullLists(map[Locator]PullServers{
- locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}},
- locator2: {To: []string{"t2"}, From: []string{"f3", "f4"}}}),
- PullListMapEquals,
- map[string]PullList{
- "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
- "t2": {PullRequest{locator2, []string{"f3", "f4"}}},
- })
-
- c.Check(
- BuildPullLists(map[Locator]PullServers{
- locator1: {
- To: []string{"t1"},
- From: []string{"f1", "f2"}},
- locator2: {
- To: []string{"t2", "t1"},
- From: []string{"f3", "f4"}},
- }),
- PullListMapEquals,
- map[string]PullList{
- "t1": {
- PullRequest{locator1, []string{"f1", "f2"}},
- PullRequest{locator2, []string{"f3", "f4"}},
- },
- "t2": {
- PullRequest{locator2, []string{"f3", "f4"}},
- },
- })
-
- locator3 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xDeadBeef)}
- locator4 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xFedBeef)}
- c.Check(
- BuildPullLists(map[Locator]PullServers{
- locator1: {
- To: []string{"t1"},
- From: []string{"f1", "f2"}},
- locator2: {
- To: []string{"t2", "t1"},
- From: []string{"f3", "f4"}},
- locator3: {
- To: []string{"t3", "t2", "t1"},
- From: []string{"f4", "f5"}},
- locator4: {
- To: []string{"t4", "t3", "t2", "t1"},
- From: []string{"f1", "f5"}},
- }),
- PullListMapEquals,
- map[string]PullList{
- "t1": {
- PullRequest{locator1, []string{"f1", "f2"}},
- PullRequest{locator2, []string{"f3", "f4"}},
- PullRequest{locator3, []string{"f4", "f5"}},
- PullRequest{locator4, []string{"f1", "f5"}},
- },
- "t2": {
- PullRequest{locator2, []string{"f3", "f4"}},
- PullRequest{locator3, []string{"f4", "f5"}},
- PullRequest{locator4, []string{"f1", "f5"}},
- },
- "t3": {
- PullRequest{locator3, []string{"f4", "f5"}},
- PullRequest{locator4, []string{"f1", "f5"}},
- },
- "t4": {
- PullRequest{locator4, []string{"f1", "f5"}},
- },
- })
-}
+++ /dev/null
-// Summarizes Collection Data and Keep Server Contents.
-
-package summary
-
-// TODO(misha): Check size of blocks as well as their digest.
-
-import (
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/services/datamanager/collection"
- "git.curoverse.com/arvados.git/services/datamanager/keep"
- "sort"
-)
-
-// BlockSet is a map of blocks
-type BlockSet map[blockdigest.DigestWithSize]struct{}
-
-// Insert adds a single block to the set.
-func (bs BlockSet) Insert(digest blockdigest.DigestWithSize) {
- bs[digest] = struct{}{}
-}
-
-// Union adds a set of blocks to the set.
-func (bs BlockSet) Union(obs BlockSet) {
- for k, v := range obs {
- bs[k] = v
- }
-}
-
-// CollectionIndexSet is used to save space. To convert to and from
-// the uuid, use collection.ReadCollections' fields
-// CollectionIndexToUUID and CollectionUUIDToIndex.
-type CollectionIndexSet map[int]struct{}
-
-// Insert adds a single collection to the set. The collection is specified by
-// its index.
-func (cis CollectionIndexSet) Insert(collectionIndex int) {
- cis[collectionIndex] = struct{}{}
-}
-
-// ToCollectionIndexSet gets block to collection indices
-func (bs BlockSet) ToCollectionIndexSet(
- readCollections collection.ReadCollections,
- collectionIndexSet *CollectionIndexSet) {
- for block := range bs {
- for _, collectionIndex := range readCollections.BlockToCollectionIndices[block] {
- collectionIndexSet.Insert(collectionIndex)
- }
- }
-}
-
-// ReplicationLevels struct
-// Keeps track of the requested and actual replication levels.
-// Currently this is only used for blocks but could easily be used for
-// collections as well.
-type ReplicationLevels struct {
- // The requested replication level.
- // For Blocks this is the maximum replication level among all the
- // collections this block belongs to.
- Requested int
-
- // The actual number of keep servers this is on.
- Actual int
-}
-
-// ReplicationLevelBlockSetMap maps from replication levels to their blocks.
-type ReplicationLevelBlockSetMap map[ReplicationLevels]BlockSet
-
-// ReplicationLevelBlockCount is an individual entry from ReplicationLevelBlockSetMap
-// which only reports the number of blocks, not which blocks.
-type ReplicationLevelBlockCount struct {
- Levels ReplicationLevels
- Count int
-}
-
-// ReplicationLevelBlockSetSlice is an ordered list of ReplicationLevelBlockCount useful for reporting.
-type ReplicationLevelBlockSetSlice []ReplicationLevelBlockCount
-
-// ReplicationSummary sturct
-type ReplicationSummary struct {
- CollectionBlocksNotInKeep BlockSet
- UnderReplicatedBlocks BlockSet
- OverReplicatedBlocks BlockSet
- CorrectlyReplicatedBlocks BlockSet
- KeepBlocksNotInCollections BlockSet
-
- CollectionsNotFullyInKeep CollectionIndexSet
- UnderReplicatedCollections CollectionIndexSet
- OverReplicatedCollections CollectionIndexSet
- CorrectlyReplicatedCollections CollectionIndexSet
-}
-
-// ReplicationSummaryCounts struct counts the elements in each set in ReplicationSummary.
-type ReplicationSummaryCounts struct {
- CollectionBlocksNotInKeep int
- UnderReplicatedBlocks int
- OverReplicatedBlocks int
- CorrectlyReplicatedBlocks int
- KeepBlocksNotInCollections int
- CollectionsNotFullyInKeep int
- UnderReplicatedCollections int
- OverReplicatedCollections int
- CorrectlyReplicatedCollections int
-}
-
-// GetOrCreate gets the BlockSet for a given set of ReplicationLevels,
-// creating it if it doesn't already exist.
-func (rlbs ReplicationLevelBlockSetMap) GetOrCreate(
- repLevels ReplicationLevels) (bs BlockSet) {
- bs, exists := rlbs[repLevels]
- if !exists {
- bs = make(BlockSet)
- rlbs[repLevels] = bs
- }
- return
-}
-
-// Insert adds a block to the set for a given replication level.
-func (rlbs ReplicationLevelBlockSetMap) Insert(
- repLevels ReplicationLevels,
- block blockdigest.DigestWithSize) {
- rlbs.GetOrCreate(repLevels).Insert(block)
-}
-
-// Union adds a set of blocks to the set for a given replication level.
-func (rlbs ReplicationLevelBlockSetMap) Union(
- repLevels ReplicationLevels,
- bs BlockSet) {
- rlbs.GetOrCreate(repLevels).Union(bs)
-}
-
-// Counts outputs a sorted list of ReplicationLevelBlockCounts.
-func (rlbs ReplicationLevelBlockSetMap) Counts() (
- sorted ReplicationLevelBlockSetSlice) {
- sorted = make(ReplicationLevelBlockSetSlice, len(rlbs))
- i := 0
- for levels, set := range rlbs {
- sorted[i] = ReplicationLevelBlockCount{Levels: levels, Count: len(set)}
- i++
- }
- sort.Sort(sorted)
- return
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Len() int {
- return len(rlbss)
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Less(i, j int) bool {
- return rlbss[i].Levels.Requested < rlbss[j].Levels.Requested ||
- (rlbss[i].Levels.Requested == rlbss[j].Levels.Requested &&
- rlbss[i].Levels.Actual < rlbss[j].Levels.Actual)
-}
-
-// Implemented to meet sort.Interface
-func (rlbss ReplicationLevelBlockSetSlice) Swap(i, j int) {
- rlbss[i], rlbss[j] = rlbss[j], rlbss[i]
-}
-
-// ComputeCounts returns ReplicationSummaryCounts
-func (rs ReplicationSummary) ComputeCounts() (rsc ReplicationSummaryCounts) {
- // TODO(misha): Consider rewriting this method to iterate through
- // the fields using reflection, instead of explictily listing the
- // fields as we do now.
- rsc.CollectionBlocksNotInKeep = len(rs.CollectionBlocksNotInKeep)
- rsc.UnderReplicatedBlocks = len(rs.UnderReplicatedBlocks)
- rsc.OverReplicatedBlocks = len(rs.OverReplicatedBlocks)
- rsc.CorrectlyReplicatedBlocks = len(rs.CorrectlyReplicatedBlocks)
- rsc.KeepBlocksNotInCollections = len(rs.KeepBlocksNotInCollections)
- rsc.CollectionsNotFullyInKeep = len(rs.CollectionsNotFullyInKeep)
- rsc.UnderReplicatedCollections = len(rs.UnderReplicatedCollections)
- rsc.OverReplicatedCollections = len(rs.OverReplicatedCollections)
- rsc.CorrectlyReplicatedCollections = len(rs.CorrectlyReplicatedCollections)
- return rsc
-}
-
-// PrettyPrint ReplicationSummaryCounts
-func (rsc ReplicationSummaryCounts) PrettyPrint() string {
- return fmt.Sprintf("Replication Block Counts:"+
- "\n Missing From Keep: %d, "+
- "\n Under Replicated: %d, "+
- "\n Over Replicated: %d, "+
- "\n Replicated Just Right: %d, "+
- "\n Not In Any Collection: %d. "+
- "\nReplication Collection Counts:"+
- "\n Missing From Keep: %d, "+
- "\n Under Replicated: %d, "+
- "\n Over Replicated: %d, "+
- "\n Replicated Just Right: %d.",
- rsc.CollectionBlocksNotInKeep,
- rsc.UnderReplicatedBlocks,
- rsc.OverReplicatedBlocks,
- rsc.CorrectlyReplicatedBlocks,
- rsc.KeepBlocksNotInCollections,
- rsc.CollectionsNotFullyInKeep,
- rsc.UnderReplicatedCollections,
- rsc.OverReplicatedCollections,
- rsc.CorrectlyReplicatedCollections)
-}
-
-// BucketReplication returns ReplicationLevelBlockSetMap
-func BucketReplication(readCollections collection.ReadCollections,
- keepServerInfo keep.ReadServers) (rlbs ReplicationLevelBlockSetMap) {
- rlbs = make(ReplicationLevelBlockSetMap)
-
- for block, requestedReplication := range readCollections.BlockToDesiredReplication {
- rlbs.Insert(
- ReplicationLevels{
- Requested: requestedReplication,
- Actual: len(keepServerInfo.BlockToServers[block])},
- block)
- }
-
- for block, servers := range keepServerInfo.BlockToServers {
- if 0 == readCollections.BlockToDesiredReplication[block] {
- rlbs.Insert(
- ReplicationLevels{Requested: 0, Actual: len(servers)},
- block)
- }
- }
- return
-}
-
-// SummarizeBuckets reads collections and summarizes
-func (rlbs ReplicationLevelBlockSetMap) SummarizeBuckets(
- readCollections collection.ReadCollections) (
- rs ReplicationSummary) {
- rs.CollectionBlocksNotInKeep = make(BlockSet)
- rs.UnderReplicatedBlocks = make(BlockSet)
- rs.OverReplicatedBlocks = make(BlockSet)
- rs.CorrectlyReplicatedBlocks = make(BlockSet)
- rs.KeepBlocksNotInCollections = make(BlockSet)
-
- rs.CollectionsNotFullyInKeep = make(CollectionIndexSet)
- rs.UnderReplicatedCollections = make(CollectionIndexSet)
- rs.OverReplicatedCollections = make(CollectionIndexSet)
- rs.CorrectlyReplicatedCollections = make(CollectionIndexSet)
-
- for levels, bs := range rlbs {
- if levels.Actual == 0 {
- rs.CollectionBlocksNotInKeep.Union(bs)
- } else if levels.Requested == 0 {
- rs.KeepBlocksNotInCollections.Union(bs)
- } else if levels.Actual < levels.Requested {
- rs.UnderReplicatedBlocks.Union(bs)
- } else if levels.Actual > levels.Requested {
- rs.OverReplicatedBlocks.Union(bs)
- } else {
- rs.CorrectlyReplicatedBlocks.Union(bs)
- }
- }
-
- rs.CollectionBlocksNotInKeep.ToCollectionIndexSet(readCollections,
- &rs.CollectionsNotFullyInKeep)
- // Since different collections can specify different replication
- // levels, the fact that a block is under-replicated does not imply
- // that all collections that it belongs to are under-replicated, but
- // we'll ignore that for now.
- // TODO(misha): Fix this and report the correct set of collections.
- rs.UnderReplicatedBlocks.ToCollectionIndexSet(readCollections,
- &rs.UnderReplicatedCollections)
- rs.OverReplicatedBlocks.ToCollectionIndexSet(readCollections,
- &rs.OverReplicatedCollections)
-
- for i := range readCollections.CollectionIndexToUUID {
- if _, notInKeep := rs.CollectionsNotFullyInKeep[i]; notInKeep {
- } else if _, underReplicated := rs.UnderReplicatedCollections[i]; underReplicated {
- } else if _, overReplicated := rs.OverReplicatedCollections[i]; overReplicated {
- } else {
- rs.CorrectlyReplicatedCollections.Insert(i)
- }
- }
-
- return
-}
+++ /dev/null
-package summary
-
-import (
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/services/datamanager/collection"
- "git.curoverse.com/arvados.git/services/datamanager/keep"
- "reflect"
- "sort"
- "testing"
-)
-
-func BlockSetFromSlice(digests []int) (bs BlockSet) {
- bs = make(BlockSet)
- for _, digest := range digests {
- bs.Insert(blockdigest.MakeTestDigestWithSize(digest))
- }
- return
-}
-
-func CollectionIndexSetFromSlice(indices []int) (cis CollectionIndexSet) {
- cis = make(CollectionIndexSet)
- for _, index := range indices {
- cis.Insert(index)
- }
- return
-}
-
-func (cis CollectionIndexSet) ToSlice() (ints []int) {
- ints = make([]int, len(cis))
- i := 0
- for collectionIndex := range cis {
- ints[i] = collectionIndex
- i++
- }
- sort.Ints(ints)
- return
-}
-
-// Helper method to meet interface expected by older tests.
-func SummarizeReplication(readCollections collection.ReadCollections,
- keepServerInfo keep.ReadServers) (rs ReplicationSummary) {
- return BucketReplication(readCollections, keepServerInfo).
- SummarizeBuckets(readCollections)
-}
-
-// Takes a map from block digest to replication level and represents
-// it in a keep.ReadServers structure.
-func SpecifyReplication(digestToReplication map[int]int) (rs keep.ReadServers) {
- rs.BlockToServers = make(map[blockdigest.DigestWithSize][]keep.BlockServerInfo)
- for digest, replication := range digestToReplication {
- rs.BlockToServers[blockdigest.MakeTestDigestWithSize(digest)] =
- make([]keep.BlockServerInfo, replication)
- }
- return
-}
-
-// Verifies that
-// blocks.ToCollectionIndexSet(rc.BlockToCollectionIndices) returns
-// expectedCollections.
-func VerifyToCollectionIndexSet(
- t *testing.T,
- blocks []int,
- blockToCollectionIndices map[int][]int,
- expectedCollections []int) {
-
- expected := CollectionIndexSetFromSlice(expectedCollections)
-
- rc := collection.ReadCollections{
- BlockToCollectionIndices: map[blockdigest.DigestWithSize][]int{},
- }
- for digest, indices := range blockToCollectionIndices {
- rc.BlockToCollectionIndices[blockdigest.MakeTestDigestWithSize(digest)] = indices
- }
-
- returned := make(CollectionIndexSet)
- BlockSetFromSlice(blocks).ToCollectionIndexSet(rc, &returned)
-
- if !reflect.DeepEqual(returned, expected) {
- t.Errorf("Expected %v.ToCollectionIndexSet(%v) to return \n %v \n but instead received \n %v",
- blocks,
- blockToCollectionIndices,
- expectedCollections,
- returned.ToSlice())
- }
-}
-
-func TestToCollectionIndexSet(t *testing.T) {
- VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{6: {0}}, []int{0})
- VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1}}, []int{1})
- VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1, 9}}, []int{1, 9})
- VerifyToCollectionIndexSet(t, []int{5, 6},
- map[int][]int{5: {2, 3}, 6: {3, 4}},
- []int{2, 3, 4})
- VerifyToCollectionIndexSet(t, []int{5, 6},
- map[int][]int{5: {8}, 6: {4}},
- []int{4, 8})
- VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{5: {0}}, []int{})
-}
-
-func TestSimpleSummary(t *testing.T) {
- rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
- {ReplicationLevel: 1, Blocks: []int{1, 2}},
- })
- rc.Summarize(nil)
- cIndex := rc.CollectionIndicesForTesting()
-
- keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1})
-
- expectedSummary := ReplicationSummary{
- CollectionBlocksNotInKeep: BlockSet{},
- UnderReplicatedBlocks: BlockSet{},
- OverReplicatedBlocks: BlockSet{},
- CorrectlyReplicatedBlocks: BlockSetFromSlice([]int{1, 2}),
- KeepBlocksNotInCollections: BlockSet{},
-
- CollectionsNotFullyInKeep: CollectionIndexSet{},
- UnderReplicatedCollections: CollectionIndexSet{},
- OverReplicatedCollections: CollectionIndexSet{},
- CorrectlyReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[0]}),
- }
-
- returnedSummary := SummarizeReplication(rc, keepInfo)
-
- if !reflect.DeepEqual(returnedSummary, expectedSummary) {
- t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v", expectedSummary, returnedSummary)
- }
-}
-
-func TestMissingBlock(t *testing.T) {
- rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
- {ReplicationLevel: 1, Blocks: []int{1, 2}},
- })
- rc.Summarize(nil)
- cIndex := rc.CollectionIndicesForTesting()
-
- keepInfo := SpecifyReplication(map[int]int{1: 1})
-
- expectedSummary := ReplicationSummary{
- CollectionBlocksNotInKeep: BlockSetFromSlice([]int{2}),
- UnderReplicatedBlocks: BlockSet{},
- OverReplicatedBlocks: BlockSet{},
- CorrectlyReplicatedBlocks: BlockSetFromSlice([]int{1}),
- KeepBlocksNotInCollections: BlockSet{},
-
- CollectionsNotFullyInKeep: CollectionIndexSetFromSlice([]int{cIndex[0]}),
- UnderReplicatedCollections: CollectionIndexSet{},
- OverReplicatedCollections: CollectionIndexSet{},
- CorrectlyReplicatedCollections: CollectionIndexSet{},
- }
-
- returnedSummary := SummarizeReplication(rc, keepInfo)
-
- if !reflect.DeepEqual(returnedSummary, expectedSummary) {
- t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v",
- expectedSummary,
- returnedSummary)
- }
-}
-
-func TestUnderAndOverReplicatedBlocks(t *testing.T) {
- rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
- {ReplicationLevel: 2, Blocks: []int{1, 2}},
- })
- rc.Summarize(nil)
- cIndex := rc.CollectionIndicesForTesting()
-
- keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 3})
-
- expectedSummary := ReplicationSummary{
- CollectionBlocksNotInKeep: BlockSet{},
- UnderReplicatedBlocks: BlockSetFromSlice([]int{1}),
- OverReplicatedBlocks: BlockSetFromSlice([]int{2}),
- CorrectlyReplicatedBlocks: BlockSet{},
- KeepBlocksNotInCollections: BlockSet{},
-
- CollectionsNotFullyInKeep: CollectionIndexSet{},
- UnderReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[0]}),
- OverReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[0]}),
- CorrectlyReplicatedCollections: CollectionIndexSet{},
- }
-
- returnedSummary := SummarizeReplication(rc, keepInfo)
-
- if !reflect.DeepEqual(returnedSummary, expectedSummary) {
- t.Fatalf("Expected returnedSummary to look like %+v but instead it is %+v",
- expectedSummary,
- returnedSummary)
- }
-}
-
-func TestMixedReplication(t *testing.T) {
- rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
- {ReplicationLevel: 1, Blocks: []int{1, 2}},
- {ReplicationLevel: 1, Blocks: []int{3, 4}},
- {ReplicationLevel: 2, Blocks: []int{5, 6}},
- })
- rc.Summarize(nil)
- cIndex := rc.CollectionIndicesForTesting()
-
- keepInfo := SpecifyReplication(map[int]int{1: 1, 2: 1, 3: 1, 5: 1, 6: 3, 7: 2})
-
- expectedSummary := ReplicationSummary{
- CollectionBlocksNotInKeep: BlockSetFromSlice([]int{4}),
- UnderReplicatedBlocks: BlockSetFromSlice([]int{5}),
- OverReplicatedBlocks: BlockSetFromSlice([]int{6}),
- CorrectlyReplicatedBlocks: BlockSetFromSlice([]int{1, 2, 3}),
- KeepBlocksNotInCollections: BlockSetFromSlice([]int{7}),
-
- CollectionsNotFullyInKeep: CollectionIndexSetFromSlice([]int{cIndex[1]}),
- UnderReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[2]}),
- OverReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[2]}),
- CorrectlyReplicatedCollections: CollectionIndexSetFromSlice([]int{cIndex[0]}),
- }
-
- returnedSummary := SummarizeReplication(rc, keepInfo)
-
- if !reflect.DeepEqual(returnedSummary, expectedSummary) {
- t.Fatalf("Expected returnedSummary to look like: \n%+v but instead it is: \n%+v. Index to UUID is %v. BlockToCollectionIndices is %v.", expectedSummary, returnedSummary, rc.CollectionIndexToUUID, rc.BlockToCollectionIndices)
- }
-}
+++ /dev/null
-// Code for generating trash lists
-
-package summary
-
-import (
- "errors"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/services/datamanager/keep"
- "time"
-)
-
-// BuildTrashLists builds list of blocks to be sent to trash queue
-func BuildTrashLists(kc *keepclient.KeepClient,
- keepServerInfo *keep.ReadServers,
- keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList, err error) {
-
- // Servers that are writeable
- writableServers := map[string]struct{}{}
- for _, url := range kc.WritableLocalRoots() {
- writableServers[url] = struct{}{}
- }
-
- _ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
- if err != nil {
- return nil, errors.New(fmt.Sprintf("Failed to get blobSignatureTtl, can't build trash lists: %v", err))
- }
-
- ttl := int64(_ttl.(float64))
-
- // expire unreferenced blocks more than "ttl" seconds old.
- expiry := time.Now().UTC().UnixNano() - ttl*1e9
-
- return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
-}
-
-func buildTrashListsInternal(writableServers map[string]struct{},
- keepServerInfo *keep.ReadServers,
- expiry int64,
- keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
-
- m = make(map[string]keep.TrashList)
-
- for block := range keepBlocksNotInCollections {
- for _, blockOnServer := range keepServerInfo.BlockToServers[block] {
- if blockOnServer.Mtime >= expiry {
- continue
- }
-
- // block is older than expire cutoff
- srv := keepServerInfo.KeepServerIndexToAddress[blockOnServer.ServerIndex].String()
-
- if _, writable := writableServers[srv]; !writable {
- continue
- }
-
- m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: blockOnServer.Mtime})
- }
- }
- return
-
-}
+++ /dev/null
-package summary
-
-import (
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/services/datamanager/keep"
- . "gopkg.in/check.v1"
- "testing"
-)
-
-// Gocheck boilerplate
-func TestTrash(t *testing.T) {
- TestingT(t)
-}
-
-type TrashSuite struct{}
-
-var _ = Suite(&TrashSuite{})
-
-func (s *TrashSuite) TestBuildTrashLists(c *C) {
- var sv0 = keep.ServerAddress{Host: "keep0.example.com", Port: 80}
- var sv1 = keep.ServerAddress{Host: "keep1.example.com", Port: 80}
-
- var block0 = blockdigest.MakeTestDigestWithSize(0xdeadbeef)
- var block1 = blockdigest.MakeTestDigestWithSize(0xfedbeef)
-
- var keepServerInfo = keep.ReadServers{
- KeepServerIndexToAddress: []keep.ServerAddress{sv0, sv1},
- BlockToServers: map[blockdigest.DigestWithSize][]keep.BlockServerInfo{
- block0: {
- {0, 99},
- {1, 101}},
- block1: {
- {0, 99},
- {1, 101}}}}
-
- // only block0 is in delete set
- var bs = make(BlockSet)
- bs[block0] = struct{}{}
-
- // Test trash list where only sv0 is on writable list.
- c.Check(buildTrashListsInternal(
- map[string]struct{}{
- sv0.URL(): {}},
- &keepServerInfo,
- 110,
- bs),
- DeepEquals,
- map[string]keep.TrashList{
- "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
-
- // Test trash list where both sv0 and sv1 are on writable list.
- c.Check(buildTrashListsInternal(
- map[string]struct{}{
- sv0.URL(): {},
- sv1.URL(): {}},
- &keepServerInfo,
- 110,
- bs),
- DeepEquals,
- map[string]keep.TrashList{
- "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}},
- "http://keep1.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
-
- // Test trash list where only block on sv0 is expired
- c.Check(buildTrashListsInternal(
- map[string]struct{}{
- sv0.URL(): {},
- sv1.URL(): {}},
- &keepServerInfo,
- 100,
- bs),
- DeepEquals,
- map[string]keep.TrashList{
- "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
-
-}
--- /dev/null
+import pkg_resources
+
+__version__ = pkg_resources.require('arvados_fuse')[0].version
import arvados.commands._util as arv_cmd
from arvados_fuse import crunchstat
from arvados_fuse import *
+from arvados_fuse._version import __version__
class ArgumentParser(argparse.ArgumentParser):
def __init__(self):
mountpoint before --exec, or mark the end of your --exec arguments
with "--".
""")
+ self.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
self.add_argument('mountpoint', type=str, help="""Mount point.""")
self.add_argument('--allow-other', action='store_true',
help="""Let other users read the mount""")
'arvados-python-client >= 0.1.20151118035730',
'llfuse==0.41.1',
'python-daemon',
- 'ciso8601'
+ 'ciso8601',
+ 'setuptools'
],
test_suite='tests',
tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
import arvados_fuse.command
import contextlib
import functools
+import io
import json
import llfuse
import logging
ent = ent[p]
return ent
+ @contextlib.contextmanager
+ def stderrMatches(self, stderr):
+ orig, sys.stderr = sys.stderr, stderr
+ try:
+ yield
+ finally:
+ sys.stderr = orig
+
def check_ent_type(self, cls, *path):
ent = self.lookup(self.mnt, *path)
self.assertEqual(ent.__class__, cls)
run_test_server.fixture('users')['active']['uuid'])
self.assertEqual(True, self.mnt.listen_for_events)
+    def test_version_argument(self):
+        # Swap stderr inside try/finally so it is restored even if an
+        # assertion fails; argparse's --version action raises SystemExit.
+        orig, sys.stderr = sys.stderr, io.BytesIO()
+        try:
+            with self.assertRaises(SystemExit):
+                arvados_fuse.command.ArgumentParser().parse_args(['--version'])
+            self.assertRegexpMatches(sys.stderr.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
+        finally:
+            sys.stderr = orig
+
@noexit
@mock.patch('arvados.events.subscribe')
def test_disable_event_listening(self, mock_subscribe):
}
if len(errs) > 0 {
// Some other goroutine encountered an
- // error -- any futher effort here
+ // error -- any further effort here
// will be wasted.
return
}
"encoding/json"
"fmt"
"io/ioutil"
+ "net/http"
+ "net/http/httptest"
"os"
"time"
c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
}
+type blockingHandler struct {
+ requested chan *http.Request
+ unblock chan struct{}
+}
+
+func (h *blockingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if h.requested != nil {
+ h.requested <- r
+ }
+ if h.unblock != nil {
+ <-h.unblock
+ }
+ http.Error(w, "nothing here", http.StatusNotFound)
+}
+
+func (s *StubbedS3Suite) TestGetContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := make([]byte, 3)
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
+ _, err := v.Get(ctx, loc, buf)
+ return err
+ })
+}
+
+func (s *StubbedS3Suite) TestCompareContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := []byte("bar")
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
+ return v.Compare(ctx, loc, buf)
+ })
+}
+
+func (s *StubbedS3Suite) TestPutContextCancel(c *check.C) {
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ buf := []byte("foo")
+
+ s.testContextCancel(c, func(ctx context.Context, v *TestableS3Volume) error {
+ return v.Put(ctx, loc, buf)
+ })
+}
+
+func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Context, *TestableS3Volume) error) {
+ handler := &blockingHandler{}
+ srv := httptest.NewServer(handler)
+ defer srv.Close()
+
+ v := s.newTestableVolume(c, 5*time.Minute, false, 2)
+ vol := *v.S3Volume
+ vol.Endpoint = srv.URL
+	v = &TestableS3Volume{S3Volume: &vol, c: c}
+ v.Start()
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ handler.requested = make(chan *http.Request)
+ handler.unblock = make(chan struct{})
+ defer close(handler.unblock)
+
+ doneFunc := make(chan struct{})
+ go func() {
+ err := testFunc(ctx, v)
+ c.Check(err, check.Equals, context.Canceled)
+ close(doneFunc)
+ }()
+
+ timeout := time.After(10 * time.Second)
+
+ // Wait for the stub server to receive a request, meaning
+ // Get() is waiting for an s3 operation.
+ select {
+ case <-timeout:
+ c.Fatal("timed out waiting for test func to call our handler")
+ case <-doneFunc:
+ c.Fatal("test func finished without even calling our handler!")
+ case <-handler.requested:
+ }
+
+ cancel()
+
+ select {
+ case <-timeout:
+ c.Fatal("timed out")
+ case <-doneFunc:
+ }
+}
+
func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
defer func(tl, bs arvados.Duration) {
theConfig.TrashLifetime = tl
srv, err := s3test.NewServer(&s3test.Config{Clock: clock})
c.Assert(err, check.IsNil)
- tmp, err := ioutil.TempFile("", "keepstore")
- c.Assert(err, check.IsNil)
- defer os.Remove(tmp.Name())
- _, err = tmp.Write([]byte("xxx\n"))
- c.Assert(err, check.IsNil)
- c.Assert(tmp.Close(), check.IsNil)
-
v := &TestableS3Volume{
S3Volume: &S3Volume{
Bucket: TestBucketName,
- AccessKeyFile: tmp.Name(),
- SecretKeyFile: tmp.Name(),
Endpoint: srv.URL(),
Region: "test-region-1",
LocationConstraint: true,
ReadOnly: readonly,
IndexPageSize: 1000,
},
+ c: c,
server: srv,
serverClock: clock,
}
- c.Assert(v.Start(), check.IsNil)
+ v.Start()
err = v.bucket.PutBucket(s3.ACL("private"))
c.Assert(err, check.IsNil)
return v
}
+func (v *TestableS3Volume) Start() error {
+ tmp, err := ioutil.TempFile("", "keepstore")
+ v.c.Assert(err, check.IsNil)
+ defer os.Remove(tmp.Name())
+ _, err = tmp.Write([]byte("xxx\n"))
+ v.c.Assert(err, check.IsNil)
+ v.c.Assert(tmp.Close(), check.IsNil)
+
+ v.S3Volume.AccessKeyFile = tmp.Name()
+ v.S3Volume.SecretKeyFile = tmp.Name()
+
+ v.c.Assert(v.S3Volume.Start(), check.IsNil)
+ return nil
+}
+
// PutRaw skips the ContentMD5 test
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
--- /dev/null
+import pkg_resources
+
+__version__ = pkg_resources.require('arvados-node-manager')[0].version
create_kwargs = create_kwargs.copy()
create_kwargs.setdefault('external_ip', None)
create_kwargs.setdefault('ex_metadata', {})
+ self._project = auth_kwargs.get("project")
super(ComputeNodeDriver, self).__init__(
auth_kwargs, list_kwargs, create_kwargs,
driver_class)
def _init_image(self, image_name):
return 'image', self.search_for(
- image_name, 'list_images', self._name_key)
+ image_name, 'list_images', self._name_key, ex_project=self._project)
def _init_network(self, network_name):
return 'ex_network', self.search_for(
from .jobqueue import JobQueueMonitorActor, ServerCalculator
from .nodelist import ArvadosNodeListMonitorActor, CloudNodeListMonitorActor
from .timedcallback import TimedCallBackActor
+from ._version import __version__
node_daemon = None
parser = argparse.ArgumentParser(
prog='arvados-node-manager',
description="Dynamically allocate Arvados cloud compute nodes")
+ parser.add_argument(
+ '--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
parser.add_argument(
'--foreground', action='store_true', default=False,
help="Run in the foreground. Don't daemonize.")
'arvados-python-client>=0.1.20150206225333',
'pykka',
'python-daemon',
+ 'setuptools'
],
dependency_links = [
"https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev4.zip"
--- /dev/null
+#!/usr/bin/env python
+
+import io
+import os
+import sys
+import tempfile
+import unittest
+
+import arvnodeman.launcher as nodeman
+from . import testutil
+
+class ArvNodemArgumentsTestCase(unittest.TestCase):
+ def run_nodeman(self, args):
+ return nodeman.main(args)
+
+ def test_unsupported_arg(self):
+ with self.assertRaises(SystemExit):
+ self.run_nodeman(['-x=unknown'])
+
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with testutil.redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.run_nodeman(['--version'])
+ self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")
from __future__ import absolute_import, print_function
+import contextlib
import datetime
+import mock
+import pykka
+import sys
import threading
import time
import libcloud.common.types as cloud_types
-import mock
-import pykka
from . import pykka_timeout
def ip_address_mock(last_octet):
return '10.20.30.{}'.format(last_octet)
+@contextlib.contextmanager
+def redirected_streams(stdout=None, stderr=None):
+ orig_stdout, sys.stdout = sys.stdout, stdout or sys.stdout
+ orig_stderr, sys.stderr = sys.stderr, stderr or sys.stderr
+ try:
+ yield
+ finally:
+ sys.stdout = orig_stdout
+ sys.stderr = orig_stderr
+
+
class MockShutdownTimer(object):
def _set_state(self, is_open, next_opening):
self.window_open = lambda: is_open