https://arvados.org
The Arvados public wiki is located at
- https://arvados.org/projects/arvados/wiki
+ https://dev.arvados.org/projects/arvados/wiki
The Arvados public bug tracker is located at
- https://arvados.org/projects/arvados/issues
+ https://dev.arvados.org/projects/arvados/issues
For support see
http://doc.arvados.org/user/getting_started/community.html
rack (>= 1.0.0)
rack-test (>= 0.5.4)
xpath (~> 2.0)
- childprocess (0.5.5)
+ childprocess (0.5.6)
ffi (~> 1.0, >= 1.0.11)
cliver (0.3.2)
coffee-rails (4.1.0)
fast_stack (0.1.0)
rake
rake-compiler
- ffi (1.9.6)
+ ffi (1.9.10)
flamegraph (0.1.0)
fast_stack
google-api-client (0.6.4)
metaclass (~> 0.0.1)
morrisjs-rails (0.5.1)
railties (> 3.1, < 5)
- multi_json (1.11.1)
+ multi_json (1.11.2)
multipart-post (1.2.0)
net-scp (1.2.1)
net-ssh (>= 2.6.5)
ref (1.0.5)
ruby-debug-passenger (0.2.0)
ruby-prof (0.15.2)
- rubyzip (1.1.6)
+ rubyzip (1.1.7)
rvm-capistrano (1.5.5)
capistrano (~> 2.15.4)
sass (3.4.9)
sprockets (>= 2.8, < 4.0)
sprockets-rails (>= 2.0, < 4.0)
tilt (~> 1.1)
- selenium-webdriver (2.44.0)
+ selenium-webdriver (2.48.1)
childprocess (~> 0.5)
multi_json (~> 1.0)
rubyzip (~> 1.0)
execjs (>= 0.3.0)
json (>= 1.8.0)
uuidtools (2.1.5)
- websocket (1.2.1)
+ websocket (1.2.2)
websocket-driver (0.5.1)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.1)
therubyracer
uglifier (>= 1.0.3)
wiselinks
+
+BUNDLED WITH
+ 1.10.6
--- /dev/null
+$(document).on('shown.bs.modal', '#add-group-modal', function(event) {
+ // Disable the submit button on modal loading
+ $submit = $('#add-group-submit');
+ $submit.prop('disabled', true);
+
+ $('input[type=text]', event.target).val('');
+ $('#add-group-error', event.target).hide();
+}).on('input propertychange', '#group_name_input', function(event) {
+ group_name = $(event.target).val();
+ $submit = $('#add-group-submit');
+ $submit.prop('disabled', (group_name === null || group_name === ""));
+}).on('submit', '#add-group-form', function(event) {
+ var $form = $(event.target),
+ $submit = $(':submit', $form),
+ $error = $('#add-group-error', $form),
+ group_name = $('input[name="group_name_input"]', $form).val();
+
+ $submit.prop('disabled', true);
+
+ $error.hide();
+ $.ajax('/groups',
+ {method: 'POST',
+ dataType: 'json',
+ data: {group: {name: group_name, group_class: 'role'}},
+ context: $form}).
+ done(function(data, status, jqxhr) {
+ location.reload();
+ }).
+ fail(function(jqxhr, status, error) {
+ var errlist = jqxhr.responseJSON.errors;
+ var errmsg;
+ if (Array.isArray(errlist)) {
+ errmsg = errlist.join();
+ } else {
+ errmsg = ("The server returned an error when creating " +
+ "this group (status " + jqxhr.status +
+ ": " + errlist + ").");
+ }
+ $error.text(errmsg);
+ $error.show();
+ $submit.prop('disabled', false);
+ });
+ return false;
+});
def textile_attributes
[ 'description' ]
end
+
+ def self.creatable?
+ false
+ end
end
class KeepDisk < ArvadosBase
def self.creatable?
- current_user and current_user.is_admin
+ false
end
end
class KeepService < ArvadosBase
def self.creatable?
- current_user and current_user.is_admin
+ false
end
end
result = arvados_api_client.api("permissions", "/#{uuid}")
arvados_api_client.unpack_api_response(result)
end
+
+ def self.creatable?
+ false
+ end
end
class Node < ArvadosBase
def self.creatable?
- current_user and current_user.is_admin
+ false
end
def friendly_link_name lookup=nil
(hostname && !hostname.empty?) ? hostname : uuid
def deletable?
false
end
+
+ def self.creatable?
+ current_user and current_user.is_admin
+ end
end
class VirtualMachine < ArvadosBase
attr_accessor :current_user_logins
+
def self.creatable?
- current_user.andand.is_admin
+ false
end
+
def attributes_for_display
super.append ['current_user_logins', @current_user_logins]
end
+
def editable_attributes
super - %w(current_user_logins)
end
+
def self.attribute_info
merger = ->(k,a,b) { a.merge(b, &merger) }
merger [nil,
{current_user_logins: {column_heading: "logins", type: 'array'}},
super]
end
+
def friendly_link_name lookup=nil
(hostname && !hostname.empty?) ? hostname : uuid
end
</li><li>
<strong>Use existing pipelines</strong>: Use best-practices pipelines on your own data with the click of a button.
</li><li>
- <strong>Open-source</strong>: Arvados is completely open-source. Check out our <a href="http://arvados.org">developer site</a>.
+ <strong>Open source</strong>: Arvados is completely open source. Check out our <a href="http://dev.arvados.org">developer site</a>.
</li>
</ol>
<p style="margin-top: 1em;">
--- /dev/null
+<div class="modal" id="add-group-modal" tabindex="-1" role="dialog" aria-labelledby="add-group-label" aria-hidden="true">
+ <div class="modal-dialog">
+ <div class="modal-content">
+ <form id="add-group-form">
+ <div class="modal-header">
+        <button type="button" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">&times;</span></button>
+ <h4 class="modal-title" id="add-group-label">Add new group</h4>
+ </div>
+ <div class="modal-body form-horizontal">
+ <div class="form-group">
+ <label for="group_name_input" class="col-sm-1 control-label">Name</label>
+ <div class="col-sm-9">
+ <div class="input-group-name">
+ <input type="text" class="form-control" id="group_name_input" name="group_name_input" placeholder="Enter group name"/>
+ </div>
+ </div>
+ </div>
+ <p id="add-group-error" class="alert alert-danger"></p>
+ </div>
+ <div class="modal-footer">
+ <button type="button" class="btn btn-default" data-dismiss="modal">Cancel</button>
+ <input type="submit" class="btn btn-primary" id="add-group-submit" name="submit" value="Create">
+ </div>
+ </form>
+ </div>
+ </div>
+</div>
<div class="panel panel-default">
<div class="panel-heading">
Group memberships
+
+ <div class="pull-right">
+ <%= link_to raw('<i class="fa fa-plus"></i> Add new group'), "#",
+ {class: 'btn btn-xs btn-primary', 'data-toggle' => "modal",
+ 'data-target' => '#add-group-modal'} %>
+ </div>
</div>
<div class="panel-body">
<div class="alert alert-info">
</form>
</div>
<div class="panel-footer">
- To manage these groups (roles), use:
+ These groups (roles) can also be managed from the command line. For example:
<ul>
<li><code>arv group create \<br/>--group '{"group_class":"role","name":"New group"}'</code></li>
<li><code>arv group list \<br/>--filters '[["group_class","=","role"]]' \<br/>--select '["uuid","name"]'</code></li>
</div>
<div id="user-setup-modal-window" class="modal fade" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true"></div>
+<%= render partial: "add_group_modal" %>
end
[
- ['Repositories',nil,'s0uqq'],
- ['Virtual machines','virtual machine','current_user_logins'],
- ['SSH keys',nil,'public_key'],
- ['Links','link','link_class'],
- ['Groups','group','group_class'],
- ['Compute nodes','node','info[ping_secret'],
- ['Keep services','keep service','service_ssl_flag'],
- ['Keep disks', 'keep disk','bytes_free'],
+ ['Repositories', nil, 's0uqq'],
+ ['Virtual machines', nil, 'testvm.shell'],
+ ['SSH keys', nil, 'public_key'],
+ ['Links', nil, 'link_class'],
+ ['Groups', nil, 'All users'],
+ ['Compute nodes', nil, 'ping_secret'],
+ ['Keep services', nil, 'service_ssl_flag'],
+ ['Keep disks', nil, 'bytes_free'],
].each do |page_name, add_button_text, look_for|
test "test system menu #{page_name} link" do
visit page_with_token('admin')
page.html =~ /\b(#{matching_stamps})\+[0-9A-Fa-f]{8}\b/
end
- # We use API tokens with limited scopes as the quickest way to get the API
- # server to return an error. If Workbench gets smarter about coping when
- # it has a too-limited token, these tests will need to be adjusted.
- test "API error page includes error token" do
- start_stamp = now_timestamp
- visit(page_with_token("active_readonly", "/groups"))
- click_on "Add a new group"
- assert(page.has_text?(/fiddlesticks/i),
- "Not on an error page after making a group out of scope")
- assert(page_has_error_token?(start_stamp), "no error token on 404 page")
- end
-
test "showing a bad UUID returns 404" do
visit(page_with_token("active", "/pipeline_templates/zzz"))
assert(page.has_no_text?(/fiddlesticks/i),
click_link 'Metadata'
assert page.has_text? 'VirtualMachine: testvm.shell'
end
+
+ test "test add group button" do
+ need_javascript
+
+ user_url = "/users/#{api_fixture('users')['active']['uuid']}"
+ visit page_with_token('admin_trustedclient', user_url)
+
+ # Setup user
+ click_link 'Admin'
+ assert page.has_text? 'As an admin, you can setup'
+
+ click_link 'Add new group'
+
+ within '.modal-content' do
+ fill_in "group_name_input", :with => "test-group-added-in-modal"
+ click_button "Create"
+ end
+ wait_for_ajax
+
+ # Back in the user "Admin" tab
+ assert page.has_text? 'test-group-added-in-modal'
+ end
end
require 'integration_helper'
class VirtualMachinesTest < ActionDispatch::IntegrationTest
- test "make and name a new virtual machine" do
- need_javascript
- visit page_with_token('admin_trustedclient')
- find('#system-menu').click
- click_link 'Virtual machines'
- assert page.has_text? 'testvm.shell'
- click_on 'Add a new virtual machine'
- find('tr', text: 'hostname').
- find('a[data-original-title=edit]').click
- assert page.has_text? 'Edit hostname'
- fill_in 'editable-text', with: 'testname'
- click_button 'editable-submit'
- assert page.has_text? 'testname'
- end
end
- install/install-shell-server.html.textile.liquid
- install/create-standard-objects.html.textile.liquid
- install/install-keepstore.html.textile.liquid
+ - install/configure-azure-blob-storage.html.textile.liquid
- install/install-keepproxy.html.textile.liquid
#- install/install-keep-web.html.textile.liquid
- install/install-crunch-dispatch.html.textile.liquid
{
"name":"run-command example pipeline",
"components":{
- "bwa-mem": {
+ "bwa-mem": {
"script": "run-command",
"script_version": "master",
"repository": "arvados",
"script_parameters": {
"command": [
- "bwa",
+ "$(dir $(bwa_collection))/bwa",
"mem",
"-t",
"$(node.cores)",
+ "-R",
+ "@RG\\\tID:group_id\\\tPL:illumina\\\tSM:sample_id",
"$(glob $(dir $(reference_collection))/*.fasta)",
"$(glob $(dir $(sample))/*_1.fastq)",
"$(glob $(dir $(sample))/*_2.fastq)"
],
- "task.stdout": "$(basename $(glob $(dir $(sample))/*_1.fastq)).sam",
"reference_collection": {
"required": true,
"dataclass": "Collection"
},
+ "bwa_collection": {
+ "required": true,
+ "dataclass": "Collection",
+ "default": "39c6f22d40001074f4200a72559ae7eb+5745"
+ },
"sample": {
"required": true,
"dataclass": "Collection"
- }
+ },
+ "task.stdout": "$(basename $(glob $(dir $(sample))/*_1.fastq)).sam"
}
}
}
h1(#login). Using SSH to log into an Arvados VM
-To see a list of virtual machines that you have access to and determine the name and login information, click on the dropdown menu icon <span class="fa fa-lg fa-user"></span> <span class="caret"></span> in the upper right corner of the top navigation menu to access the user settings menu and click on the menu item *Virtual machines* to go to the Virtual machines page. This page lists the virtual machines you can access. The *hostname* column lists the name of each available VM. The *logins* column will have a list of comma separated values of the form @you@. In this guide the hostname will be *_shell_* and the login will be *_you_*. Replace these with your hostname and login name as appropriate.
+To see a list of virtual machines that you have access to and determine the name and login information, click on the dropdown menu icon <span class="fa fa-lg fa-user"></span> <span class="caret"></span> in the upper right corner of the top navigation menu to access the user settings menu and click on the menu item *Virtual machines* to go to the Virtual machines page. This page lists the virtual machines you can access. The *Host name* column lists the name of each available VM. The *Login name* column will have a list of comma separated values of the form @you@. In this guide the hostname will be *_shell_* and the login will be *_you_*. Replace these with your hostname and login name as appropriate.
"-t",
"$(node.cores)",
"-R",
- "@RG\\tID:group_id\\tPL:illumina\\tSM:sample_id",
+ "@RG\\\tID:group_id\\\tPL:illumina\\\tSM:sample_id",
"$(glob $(dir $(reference_collection))/*.fasta)",
"$(glob $(dir $(sample))/*_1.fastq)",
"$(glob $(dir $(sample))/*_2.fastq)"
table(table table-bordered table-condensed).
|_. Key|_. Type|_. Description|_. Implemented|
|arvados_sdk_version|string|The Git version of the SDKs to use from the Arvados git repository. See "Specifying Git versions":#script_version for more detail about acceptable ways to specify a commit. If you use this, you must also specify a @docker_image@ constraint (see below). In order to install the Python SDK successfully, Crunch must be able to find and run virtualenv inside the container.|✓|
-|docker_image|string|The Docker image that this Job needs to run. If specified, Crunch will create a Docker container from this image, and run the Job's script inside that. The Keep mount and work directories will be available as volumes inside this container. The image must be uploaded to Arvados using @arv keep docker@. You may specify the image in any format that Docker accepts, such as @arvados/jobs@, @debian:latest@, or the Docker image id. Alternatively, you may specify the UUID or portable data hash of the image Collection, returned by @arv keep docker@.|✓|
+|docker_image|string|The Docker image that this Job needs to run. If specified, Crunch will create a Docker container from this image, and run the Job's script inside that. The Keep mount and work directories will be available as volumes inside this container. The image must be uploaded to Arvados using @arv keep docker@. You may specify the image in any format that Docker accepts, such as @arvados/jobs@, @debian:latest@, or the Docker image id. Alternatively, you may specify the portable data hash of the image Collection.|✓|
|min_nodes|integer||✓|
|max_nodes|integer|||
|min_cores_per_node|integer|Require that each node assigned to this Job have the specified number of CPU cores|✓|
<div class="col-sm-6" style="border-left: solid; border-width: 1px">
<p><strong>Quickstart</strong>
<p>
- Try any pipeline from the <a href="https://dev.arvados.org/projects/arvados/wiki/Public_Pipelines_and_Datasets">list of public pipelines</a>. For instance, the <a href="http://curover.se/pathomap">Pathomap Pipeline</a> links to these <a href="https://dev.arvados.org/projects/arvados/wiki/pathomap_tutorial/">step-by-step instructions</a> for trying Arvados out right in your browser using Curoverse's <a href="http://lp.curoverse.com/beta-signup/">public Arvados instance</a>.
+ Try any pipeline from the <a href="https://cloud.curoverse.com/projects/public">list of public pipelines</a>. For instance, the <a href="http://curover.se/pathomap">Pathomap Pipeline</a> links to these <a href="https://dev.arvados.org/projects/arvados/wiki/pathomap_tutorial/">step-by-step instructions</a> for trying Arvados out right in your browser using Curoverse's <a href="http://lp.curoverse.com/beta-signup/">public Arvados instance</a>.
</p>
<!--<p>-->
<!--<ol>-->
--- /dev/null
+---
+layout: default
+navsection: installguide
+title: Configure Azure Blob storage
+...
+
+As an alternative to local and network-attached POSIX filesystems, Keepstore can store data in an Azure Storage container.
+
+h2. Create a container
+
+Normally, all keepstore services are configured to share a single Azure Storage container.
+
+Using the Azure web portal or command line tool, create or choose a storage account with a suitable redundancy profile and availability region. Use the storage account keys to create a new container.
+
+<notextile>
+<pre><code>~$ <span class="userinput">azure config mode arm</span>
+~$ <span class="userinput">azure login</span>
+~$ <span class="userinput">azure group create exampleGroupName eastus</span>
+~$ <span class="userinput">azure storage account create --type LRS --location eastus --resource-group exampleGroupName exampleStorageAccountName</span>
+~$ <span class="userinput">azure storage account keys list --resource-group exampleGroupName exampleStorageAccountName</span>
+info: Executing command storage account keys list
++ Getting storage account keys
+data: Primary: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==
+data: Secondary: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy==
+info: storage account keys list command OK
+~$ <span class="userinput">AZURE_STORAGE_ACCOUNT="exampleStorageAccountName" \
+AZURE_STORAGE_ACCESS_KEY="zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==" \
+azure storage container create exampleContainerName</span>
+</code></pre>
+</notextile>
+
+h2. Configure keepstore
+
+Copy the primary storage account key to a file where it will be accessible to keepstore at startup time.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sh -c 'cat >/etc/sv/keepstore/exampleStorageAccountName.key <<EOF'
+zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==
+EOF</span>
+~$ <span class="userinput">sudo chmod 0400 /etc/sv/keepstore/exampleStorageAccountName.key</span>
+</code></pre>
+</notextile>
+
+In your keepstore startup script, instead of specifying a local storage using @-volume /path@ or discovering mount points automatically, use @-azure-*@ arguments to specify the storage container:
+
+<notextile>
+<pre><code>#!/bin/sh
+
+exec 2>&1
+exec keepstore \
+ -azure-storage-account-key-file <span class="userinput">/etc/sv/keepstore/exampleStorageAccountName.key</span> \
+ -azure-storage-account-name <span class="userinput">exampleStorageAccountName</span> \
+ -azure-storage-container-volume <span class="userinput">exampleContainerName</span>
+</code></pre>
+</notextile>
+
+Start (or restart) keepstore, and check its log file to confirm it is using the new configuration.
+
+<notextile>
+<pre><code>2015/10/26 21:06:24 Using volume azure-storage-container:"exampleContainerName" (writable=true)
+</code></pre>
+</notextile>
<pre><code>~$ <span class="userinput">keepstore -h</span>
2015/05/08 13:41:16 keepstore starting, pid 2565
Usage of ./keepstore:
+ -azure-storage-account-key-file="": File containing the account key used for subsequent --azure-storage-container-volume arguments.
+ -azure-storage-account-name="": Azure storage account name used for subsequent --azure-storage-container-volume arguments.
+ -azure-storage-container-volume=[]: Use the given container as a storage volume. Can be given multiple times.
+ -azure-storage-replication=3: Replication level to report to clients when data is stored in an Azure container.
-blob-signature-ttl=1209600: Lifetime of blob permission signatures. See services/api/config/application.default.yml.
-blob-signing-key-file="": File containing the secret key for generating and verifying blob permission signatures.
-data-manager-token-file="": File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
</code></pre>
</notextile>
-If you want access control on your Keepstore server(s), you must specify the @-enforce-permissions@ flag and provide a signing key. The @-blob-signing-key-file@ argument should be a file containing a long random alphanumeric string with no internal line breaks (it is also possible to use a socket or FIFO: keepstore reads it only once, at startup). This key must be the same as the @blob_signing_key@ configured in the "API server":install-api-server.html config/application.yml file.
+h3. Prepare storage volumes
-The @-max-buffers@ argument can be used to restrict keepstore's memory use. By default, keepstore will allocate no more than 128 blocks (8 GiB) worth of data buffers at a time. Normally this should be set as high as possible without risking swapping.
+{% include 'notebox_begin' %}
+This section uses a local filesystem as a backing store. If you are using Azure Storage, follow the setup instructions on the "Azure Blob Storage":configure-azure-blob-storage.html page instead.
+{% include 'notebox_end' %}
-Prepare one or more volumes for Keepstore to use. Simply create a /keep directory on all the partitions you would like Keepstore to use, and then start Keepstore. For example, using 2 tmpfs volumes:
+There are two ways to specify a set of local directories where keepstore should store its data files.
+# Implicitly, by creating a directory called @keep@ at the top level of each filesystem you intend to use, and omitting @-volume@ arguments.
+# Explicitly, by providing a @-volume@ argument for each directory.
+
+For example, if there are filesystems mounted at @/mnt@ and @/mnt2@:
<notextile>
-<pre><code>~$ <span class="userinput">keepstore -blob-signing-key-file=./blob-signing-key</span>
+<pre><code>~$ <span class="userinput">mkdir /mnt/keep /mnt2/keep</span>
+~$ <span class="userinput">keepstore</span>
2015/05/08 13:44:26 keepstore starting, pid 2765
2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt2/keep] (writable=true)
2015/05/08 13:44:26 listening at :25107
</code></pre>
</notextile>
-It's recommended to run Keepstore under "runit":http://smarden.org/runit/ or something similar.
+Equivalently:
-Repeat this section for each Keepstore server you are setting up.
+<notextile>
+<pre><code>~$ <span class="userinput">mkdir /mnt/keep /mnt2/keep</span>
+~$ <span class="userinput">keepstore -volume=/mnt/keep -volume=/mnt2/keep</span>
+2015/05/08 13:44:26 keepstore starting, pid 2765
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt2/keep] (writable=true)
+2015/05/08 13:44:26 listening at :25107
+</code></pre>
+</notextile>
+
+h3. Run keepstore as a supervised service
+
+We recommend running Keepstore under "runit":http://smarden.org/runit/ or something similar, using a run script like the following:
+
+<notextile>
+<pre><code>#!/bin/sh
+
+exec 2>&1
+exec GOGC=10 GOMAXPROCS=<span class="userinput">4</span> keepstore \
+ -enforce-permissions=true \
+ -blob-signing-key-file=<span class="userinput">/etc/keepstore/blob-signing.key</span> \
+ -max-buffers=<span class="userinput">100</span> \
+ -serialize=true \
+ -volume=<span class="userinput">/mnt/keep</span> \
+ -volume=<span class="userinput">/mnt2/keep</span>
+</code></pre>
+</notextile>
+
+The @GOMAXPROCS@ environment variable determines the maximum number of concurrent threads, and should normally be set to the number of CPU cores present.
+
+The @-max-buffers@ argument limits keepstore's memory usage. It should be set such that @max-buffers * 64MiB + 10%@ fits comfortably in memory. For example, @-max-buffers=100@ is suitable for a host with 8 GiB RAM.
+
+If you want access control on your Keepstore server(s), you must specify the @-enforce-permissions@ flag and provide a signing key. The @-blob-signing-key-file@ argument should be a file containing a long random alphanumeric string with no internal line breaks (it is also possible to use a socket or FIFO: keepstore reads it only once, at startup). This key must be the same as the @blob_signing_key@ configured in the "API server's":install-api-server.html configuration file, @/etc/arvados/api/application.yml@.
+
+h3. Set up additional servers
+
+Repeat the above sections to prepare volumes and bring up supervised services on each Keepstore server you are setting up.
h3. Tell the API server about the Keepstore servers
}
EOF</span>
</code></pre></notextile>
-
-
-
<notextile>
<pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-~$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
+~$ <span class="userinput">cd /var/www/arvados-sso/current</span>
+/var/www/arvados-sso/current$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
:001 > <span class="userinput">c = Client.new</span>
:002 > <span class="userinput">c.name = "joshid"</span>
:003 > <span class="userinput">c.app_id = "arvados-server"</span>
title: Accessing an Arvados VM with SSH - Unix Environments
...
-This document is for accessing an arvados VM using SSK keys in Unix environments (Linux, OS X, Cygwin). If you would like to access VM through your browser, please visit the "Accessing an Arvados VM with Webshell":vm-login-with-webshell.html page. If you are using a Windows environment, please visit the "Accessing an Arvados VM with SSH - Windows Environments":ssh-access-windows.html page.
+This document is for accessing an Arvados VM using SSH keys in Unix environments (Linux, OS X, Cygwin). If you would like to access your VM through your browser, please visit the "Accessing an Arvados VM with Webshell":vm-login-with-webshell.html page. If you are using a Windows environment, please visit the "Accessing an Arvados VM with SSH - Windows Environments":ssh-access-windows.html page.
{% include 'ssh_intro' %}
title: Accessing an Arvados VM with SSH - Windows Environments
...
-This document is for accessing an arvados VM using SSK keys in Windows environments. If you would like to use to access VM through your browser, please visit the "Accessing an Arvados VM with Webshell":vm-login-with-webshell.html page. If you are using a Unix environment (Linux, OS X, Cygwin), please visit the "Accessing an Arvados VM with SSH - Unix Environments":ssh-access-unix.html page.
+This document is for accessing an Arvados VM using SSH keys in Windows environments. If you would like to access your VM through your browser, please visit the "Accessing an Arvados VM with Webshell":vm-login-with-webshell.html page. If you are using a Unix environment (Linux, OS X, Cygwin), please visit the "Accessing an Arvados VM with SSH - Unix Environments":ssh-access-unix.html page.
{% include 'ssh_intro' %}
# After downloading PuTTY and installing it, you should have a PuTTY folder in @C:\Program Files\@ or @C:\Program Files (x86)\@ (if you are using a 64 bit operating system).
# Open the Control Panel.
# Select _Advanced System Settings_, and choose _Environment Variables_.
-If you are using newer systems like Windows 7, you may use the following to open _Advanced System Settings_. Open Control Panel. Click on _System and Security_. Click on _System_. Click on _Advanced system settings_ and choose _Environment Variables..._
+If you are using newer systems like Windows 10, you may use the following to open _Advanced System Settings_. Open Control Panel. Click on _System and Security_. Click on _System_. Click on _Advanced system settings_ and choose _Environment Variables..._
# Under system variables, find and edit @PATH@.
# If you installed PuTTY in @C:\Program Files\PuTTY\@, add the following to the end of PATH:
<code>;C:\Program Files\PuTTY</code>
# Open PuTTY from the Start Menu.
# On the Session screen set the Host Name (or IP address) to “shell”, which is the hostname listed in the _Virtual Machines_ page.
# On the Session screen set the Port to “22”.
-# On the Connection %(rarr)→% Data screen set the Auto-login username to the username listed in the *logins* column on the Arvados Workbench _Settings %(rarr)→% Virtual machines_ page.
+# On the Connection %(rarr)→% Data screen set the Auto-login username to the username listed in the *Login name* column on the Arvados Workbench _Virtual machines_ page.
# On the Connection %(rarr)→% Proxy screen set the Proxy Type to “Local”.
# On the Connection %(rarr)→% Proxy screen in the “Telnet command, or local proxy command” box enter:
<code>plink -P 2222 turnout@switchyard.{{ site.arvados_api_host }} %host</code>
Webshell gives you access to an Arvados virtual machine from your browser with no additional setup.
-In the Arvados Workbench, click on the dropdown menu icon <span class="fa fa-lg fa-user"></span> <span class="caret"></span> in the upper right corner of the top navigation menu to access the user settings menu, and click on the menu item *Virtual machines* to see the list of virtual machines you can access.
+In the Arvados Workbench, click on the dropdown menu icon <span class="fa fa-lg fa-user"></span> <span class="caret"></span> in the upper right corner of the top navigation menu to access the user settings menu, and click on the menu item *Virtual machines* to see the list of virtual machines you can access. If you do not have access to any virtual machines, please click on <span class="btn btn-sm btn-primary">Send request for shell access</span> or send an email to "support@curoverse.com":mailto:support@curoverse.com.
Each row in the Virtual Machines panel lists the hostname of the VM, along with a <code>Log in as *you*</code> button under the column "Web shell beta". Clicking on this button will open up a webshell terminal for you in a new browser tab and log you in.
* Storing and querying metadata about genome sequence files, such as human subjects and their phenotypic traits using the "Arvados Metadata Database.":{{site.baseurl}}/user/topics/tutorial-trait-search.html
* Accessing, organizing, and sharing data, pipelines and results using the "Arvados Workbench":{{site.baseurl}}/user/getting_started/workbench.html web application.
-The examples in this guide use the Arvados instance located at <a href="{{site.arvados_workbench_host}}/" target="_blank">{{site.arvados_workbench_host}}</a>. If you are using a different Arvados instance replace @{{ site.arvados_workbench_host }}@ with your private instance in all of the examples in this guide.
-
-Curoverse maintains a public Arvados instance located at <a href="https://workbench.qr1hi.arvadosapi.com/" target="_blank">https://workbench.qr1hi.arvadosapi.com/</a>. You must have an account in order to use this service. If you would like to request an account, please send an email to "arvados@curoverse.com":mailto:arvados@curoverse.com.
+The examples in this guide use the public Arvados instance located at <a href="{{site.arvados_workbench_host}}/" target="_blank">{{site.arvados_workbench_host}}</a>. If you are using a different Arvados instance replace @{{ site.arvados_workbench_host }}@ with your private instance in all of the examples in this guide.
h2. Typographic conventions
{% code 'example_docker' as javascript %}
</notextile>
-* The @docker_image@ field can be one of: the Docker repository name (as shown above), the Docker image hash, the Arvados collection UUID, or the Arvados collection portable data hash.
+* The @docker_image@ field can be one of: the Docker repository name (as shown above), the Docker image hash, or the Arvados collection portable data hash.
h2. Share Docker images
# Start from the *Workbench Dashboard*. You can access the Dashboard by clicking on *<i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard* in the upper left corner of any Workbench page.
# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a pipeline...</span> button. This will open a dialog box titled *Choose a pipeline to run*.
-# Click to open the *All projects <span class="caret"></span>* menu. Under the *Projects shared with me* header, select *<i class="fa fa-fw fa-share-alt"></i> Arvados Tutorial*.
+# In the search box, type in *Tutorial align using bwa mem*.
# Select *<i class="fa fa-fw fa-gear"></i> Tutorial align using bwa mem* and click the <span class="btn btn-sm btn-primary" >Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span> button. This will create a new pipeline in your *Home* project and will open it. You can now supply the inputs for the pipeline.
# The first input parameter to the pipeline is *"reference_collection" parameter for run-command script in bwa-mem component*. Click the <span class="btn btn-sm btn-primary">Choose</span> button beneath that header. This will open a dialog box titled *Choose a dataset for "reference_collection" parameter for run-command script in bwa-mem component*.
-# Once again, open the *All projects <span class="caret"></span>* menu and select *<i class="fa fa-fw fa-share-alt"></i> Arvados Tutorial*. Select *<i class="fa fa-fw fa-archive"></i> Tutorial chromosome 19 reference* and click the <span class="btn btn-sm btn-primary" >OK</span> button.
+# Open the *Home <span class="caret"></span>* menu and select *All Projects*. Search for and select *<i class="fa fa-fw fa-archive"></i> Tutorial chromosome 19 reference* and click the <span class="btn btn-sm btn-primary" >OK</span> button.
# Repeat the previous two steps to set the *"sample" parameter for run-command script in bwa-mem component* parameter to *<i class="fa fa-fw fa-archive"></i> Tutorial sample exome*.
# Click on the <span class="btn btn-sm btn-primary" >Run <i class="fa fa-fw fa-play"></i></span> button. The page updates to show you that the pipeline has been submitted to run on the Arvados cluster.
-# After the pipeline starts running, you can track the progress by watching log messages from jobs. This page refreshes automatically. You will see a <span class="label label-success">complete</span> label under the *job* column when the pipeline completes successfully.
+# After the pipeline starts running, you can track the progress by watching log messages from jobs. This page refreshes automatically. You will see a <span class="label label-success">complete</span> label when the pipeline completes successfully.
# Click on the *Output* link to see the results of the job. This will load a new page listing the output files from this pipeline. You'll see the output SAM file from the alignment tool under the *Files* tab.
# Click on the <span class="btn btn-sm btn-info"><i class="fa fa-download"></i></span> download button to the right of the SAM file to download your results.
}
# If this job requires a Docker image, install that.
-my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
if ($docker_locator = $Job->{docker_image_locator}) {
($docker_stream, $docker_hash) = find_docker_image($docker_locator);
if (!$docker_hash)
{fork => 1});
$docker_limitmem = ($? == 0);
+ # Find a non-root Docker user to use.
+ # Tries the default user for the container, then 'crunch', then 'nobody',
+ # testing for whether the actual user id is non-zero. This defends against
+  # mistakes, not malice; still, we intend to harden the security in the
+  # future, so we don't want anyone getting used to their jobs running as
+  # root in their Docker containers.
+ my @tryusers = ("", "crunch", "nobody");
+ foreach my $try_user (@tryusers) {
+ my $try_user_arg;
+ if ($try_user eq "") {
+ Log(undef, "Checking if container default user is not UID 0");
+ $try_user_arg = "";
+ } else {
+ Log(undef, "Checking if user '$try_user' is not UID 0");
+ $try_user_arg = "--user=$try_user";
+ }
+ srun(["srun", "--nodelist=" . $node[0]],
+ ["/bin/sh", "-ec",
+ "a=`$docker_bin run --rm $try_user_arg $docker_hash id --user` && " .
+ " test \$a -ne 0"],
+ {fork => 1});
+ if ($? == 0) {
+ $dockeruserarg = $try_user_arg;
+ if ($try_user eq "") {
+ Log(undef, "Container will run with default user");
+ } else {
+ Log(undef, "Container will run with $dockeruserarg");
+ }
+ last;
+ }
+ }
+
+ if (!defined $dockeruserarg) {
+ croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
+ }
+
if ($Job->{arvados_sdk_version}) {
# The job also specifies an Arvados SDK version. Add the SDKs to the
# tar file for the build script to install.
qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
"--job-name=$job_id.$id.$$",
);
+
+ my $stdbuf = " stdbuf --output=0 --error=0 ";
+
my $command =
"if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
{
my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
$command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
- $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+ $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
# We only set memory limits if Docker lets us limit both memory and swap.
# Memory limits alone have been supported longer, but subprocesses tend
# to get SIGKILL if they exceed that without any swap limit set.
}
$command .= "--env=\QHOME=$ENV{HOME}\E ";
$command .= "\Q$docker_hash\E ";
- $command .= "stdbuf --output=0 --error=0 ";
- $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+
+ if ($Job->{arvados_sdk_version}) {
+ $command .= $stdbuf;
+ $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
+ } else {
+ $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
+ "if which stdbuf >/dev/null ; then " .
+ " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+ " else " .
+ " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+ " fi\'";
+ }
} else {
# Non-docker run
$command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
- $command .= "stdbuf --output=0 --error=0 ";
+ $command .= $stdbuf;
$command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
}
func (s *ServerRequiredSuite) SetUpTest(c *C) {
exec.Command("python", "run_test_server.py", "stop").Run()
}
-// StartKeep starts 2 keep servers with enforcePermissions=false
-func StartKeep() {
- StartKeepWithParams(2, false)
-}
-
-// StartKeepWithParams starts the given number of keep servers,
+// StartKeep starts the given number of keep servers,
// optionally with -enforce-permissions enabled.
-func StartKeepWithParams(numKeepServers int, enforcePermissions bool) {
+// Use numKeepServers = 2 and enforcePermissions = false under all normal circumstances.
+func StartKeep(numKeepServers int, enforcePermissions bool) {
cwd, _ := os.Getwd()
defer os.Chdir(cwd)
chdirToPythonTests()
}
}
-func StopKeep() {
- StopKeepWithParams(2)
-}
-
-// StopKeepServers stops keep servers that were started with
-// StartKeep. numkeepServers should be the same value that was passed
-// to StartKeep.
-func StopKeepWithParams(numKeepServers int) {
+// StopKeep stops keep servers that were started with StartKeep.
+// numKeepServers should be the same value that was passed to StartKeep,
+// which is 2 under all normal circumstances.
+func StopKeep(numKeepServers int) {
cwd, _ := os.Getwd()
defer os.Chdir(cwd)
chdirToPythonTests()
--- /dev/null
+package main
+
+import (
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "log"
+ "os"
+ "os/exec"
+ "os/signal"
+ "strings"
+ "syscall"
+)
+
+// TaskDef is one task's definition as it appears in a job's
+// script_parameters: the command to run, environment overrides, stdin/stdout
+// redirection, "virtual working directory" symlinks, and the exit codes that
+// count as success / permanent failure / temporary failure.
+type TaskDef struct {
+	Command []string `json:"command"`
+	Env map[string]string `json:"task.env"`
+	Stdin string `json:"task.stdin"`
+	Stdout string `json:"task.stdout"`
+	Vwd map[string]string `json:"task.vwd"`
+	SuccessCodes []int `json:"task.successCodes"`
+	PermanentFailCodes []int `json:"task.permanentFailCodes"`
+	TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
+}
+
+// Tasks is the script_parameters payload: the list of task definitions.
+type Tasks struct {
+	Tasks []TaskDef `json:"tasks"`
+}
+
+// Job carries just the script_parameters of an Arvados job record.
+type Job struct {
+	Script_parameters Tasks `json:"script_parameters"`
+}
+
+// Task mirrors the fields of an Arvados job_task record used here.
+type Task struct {
+	Job_uuid string `json:"job_uuid"`
+	Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
+	Parameters TaskDef `json:"parameters"`
+	Sequence int `json:"sequence"`
+	Output string `json:"output"`
+	Success bool `json:"success"`
+	// Fix: tag was `json:"sequence"`, colliding with Sequence's tag.
+	// encoding/json drops both fields on such a conflict, so neither
+	// sequence nor progress would ever be serialized.
+	Progress float32 `json:"progress"`
+}
+
+// IArvadosClient is the subset of the Arvados API client used by runner,
+// abstracted as an interface so tests can substitute a fake.
+type IArvadosClient interface {
+	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
+}
+
+// setupDirectories creates the per-task "tmpdir" and "outdir" working
+// directories under crunchtmpdir (mode 0700) and returns their paths.
+// (taskUuid is currently unused here.)
+func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
+	tmpdir = crunchtmpdir + "/tmpdir"
+	err = os.Mkdir(tmpdir, 0700)
+	if err != nil {
+		return "", "", err
+	}
+
+	outdir = crunchtmpdir + "/outdir"
+	err = os.Mkdir(outdir, 0700)
+	if err != nil {
+		return "", "", err
+	}
+
+	return tmpdir, outdir, nil
+}
+
+// checkOutputFilename validates that fn is a safe relative output path and
+// pre-creates any parent directories it needs under outdir.  Rejects paths
+// that start or end with '/' or contain a '../' traversal component.
+func checkOutputFilename(outdir, fn string) error {
+	if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
+		return fmt.Errorf("Path must not start or end with '/'")
+	}
+	// Fix: arguments were reversed (strings.Index("../", fn)), which only
+	// matched when fn was a substring of "../" — the traversal check never
+	// fired for real paths.
+	if strings.Contains(fn, "../") {
+		return fmt.Errorf("Path must not contain '../'")
+	}
+
+	sl := strings.LastIndex(fn, "/")
+	if sl != -1 {
+		// Fix: MkdirAll's error was silently discarded.
+		if err := os.MkdirAll(outdir+"/"+fn[0:sl], 0777); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// setupCommand configures cmd's stdin/stdout redirection, environment, and
+// "virtual working directory" symlinks according to taskp, applying
+// $(task.*) placeholder substitution to the relevant values.  Returns the
+// resolved stdin/stdout paths ("" when not redirected) for logging.
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+	if taskp.Vwd != nil {
+		for k, v := range taskp.Vwd {
+			v = substitute(v, replacements)
+			err = checkOutputFilename(outdir, k)
+			if err != nil {
+				return "", "", err
+			}
+			// NOTE(review): os.Symlink's error is ignored here; a failed
+			// link only surfaces later when the command runs.
+			os.Symlink(v, outdir+"/"+k)
+		}
+	}
+
+	if taskp.Stdin != "" {
+		// Set up stdin redirection
+		stdin = substitute(taskp.Stdin, replacements)
+		cmd.Stdin, err = os.Open(stdin)
+		if err != nil {
+			return "", "", err
+		}
+	}
+
+	if taskp.Stdout != "" {
+		err = checkOutputFilename(outdir, taskp.Stdout)
+		if err != nil {
+			return "", "", err
+		}
+		// Set up stdout redirection
+		stdout = outdir + "/" + taskp.Stdout
+		cmd.Stdout, err = os.Create(stdout)
+		if err != nil {
+			return "", "", err
+		}
+	} else {
+		cmd.Stdout = os.Stdout
+	}
+
+	if taskp.Env != nil {
+		// Set up subprocess environment, inheriting ours plus overrides.
+		cmd.Env = os.Environ()
+		for k, v := range taskp.Env {
+			v = substitute(v, replacements)
+			cmd.Env = append(cmd.Env, k+"="+v)
+		}
+	}
+	return stdin, stdout, nil
+}
+
+// setupSignals registers a channel to receive SIGTERM, SIGINT and SIGQUIT so
+// the caller can forward them to the subprocess instead of dying itself.
+// Go delivers signal notifications on a "signal channel"; cmd is not used
+// here — forwarding happens in the caller's goroutine.
+func setupSignals(cmd *exec.Cmd) chan os.Signal {
+	sigChan := make(chan os.Signal, 1)
+	signal.Notify(sigChan, syscall.SIGTERM)
+	signal.Notify(sigChan, syscall.SIGINT)
+	signal.Notify(sigChan, syscall.SIGQUIT)
+	return sigChan
+}
+
+// inCodes reports whether exit code `code` appears in `codes` (nil-safe).
+func inCodes(code int, codes []int) bool {
+	if codes != nil {
+		for _, c := range codes {
+			if code == c {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// TASK_TEMPFAIL is this process's exit code for a temporary (retryable)
+// task failure.
+const TASK_TEMPFAIL = 111
+
+// TempFail wraps an error that should be treated as retryable.
+type TempFail struct{ error }
+// PermFail marks a permanent, non-retryable task failure.
+type PermFail struct{}
+
+func (s PermFail) Error() string {
+	return "PermFail"
+}
+
+// substitute replaces every occurrence of each key of subst in inp with the
+// key's value.  Map iteration order is undefined, but the keys used here
+// ($(task.tmpdir) etc.) don't overlap, so the result is deterministic.
+func substitute(inp string, subst map[string]string) string {
+	for k, v := range subst {
+		inp = strings.Replace(inp, k, v, -1)
+	}
+	return inp
+}
+
+// runner executes one job task.  If this is task 0 of a multi-task job it
+// creates the subtasks and marks task 0 done; otherwise it sets up the work
+// directories, runs the task's command with the requested redirections and
+// environment, uploads the output directory to Keep, and records the
+// outcome via the API.  Returns nil on success, TempFail for retryable
+// errors, PermFail otherwise.
+func runner(api IArvadosClient,
+	kc IKeepClient,
+	jobUuid, taskUuid, crunchtmpdir, keepmount string,
+	jobStruct Job, taskStruct Task) error {
+
+	var err error
+	taskp := taskStruct.Parameters
+
+	// If this is task 0 and there are multiple tasks, dispatch subtasks
+	// and exit.
+	if taskStruct.Sequence == 0 {
+		if len(jobStruct.Script_parameters.Tasks) == 1 {
+			taskp = jobStruct.Script_parameters.Tasks[0]
+		} else {
+			for _, task := range jobStruct.Script_parameters.Tasks {
+				err := api.Create("job_tasks",
+					map[string]interface{}{
+						"job_task": Task{Job_uuid: jobUuid,
+							Created_by_job_task_uuid: taskUuid,
+							Sequence: 1,
+							Parameters: task}},
+					nil)
+				if err != nil {
+					return TempFail{err}
+				}
+			}
+			// Record task 0 itself as successfully completed.
+			err = api.Update("job_tasks", taskUuid,
+				map[string]interface{}{
+					"job_task": Task{
+						Output: "",
+						Success: true,
+						Progress: 1.0}},
+				nil)
+			// NOTE(review): err from the Update above is discarded; a
+			// failed update goes unnoticed.  Consider returning
+			// TempFail{err} when err != nil.
+			return nil
+		}
+	}
+
+	var tmpdir, outdir string
+	tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
+	if err != nil {
+		return TempFail{err}
+	}
+
+	// Placeholders usable in command args, env values, stdin and vwd paths.
+	replacements := map[string]string{
+		"$(task.tmpdir)": tmpdir,
+		"$(task.outdir)": outdir,
+		"$(task.keep)": keepmount}
+
+	// Set up subprocess
+	for k, v := range taskp.Command {
+		taskp.Command[k] = substitute(v, replacements)
+	}
+
+	cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
+
+	cmd.Dir = outdir
+
+	var stdin, stdout string
+	stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+	if err != nil {
+		return err
+	}
+
+	// Run subprocess and wait for it to complete
+	if stdin != "" {
+		stdin = " < " + stdin
+	}
+	if stdout != "" {
+		stdout = " > " + stdout
+	}
+	log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+
+	var caughtSignal os.Signal
+	sigChan := setupSignals(cmd)
+
+	err = cmd.Start()
+	if err != nil {
+		signal.Stop(sigChan)
+		return TempFail{err}
+	}
+
+	// Forward any caught signal to the subprocess, remembering the last one
+	// so it can be reported after Wait() returns.
+	finishedSignalNotify := make(chan struct{})
+	go func(sig <-chan os.Signal) {
+		for sig := range sig {
+			caughtSignal = sig
+			cmd.Process.Signal(caughtSignal)
+		}
+		close(finishedSignalNotify)
+	}(sigChan)
+
+	err = cmd.Wait()
+	signal.Stop(sigChan)
+
+	// Closing sigChan ends the forwarding goroutine's range loop; the
+	// signal.Stop above means no further sends should arrive on it.
+	close(sigChan)
+	<-finishedSignalNotify
+
+	if caughtSignal != nil {
+		log.Printf("Caught signal %v", caughtSignal)
+		return PermFail{}
+	}
+
+	if err != nil {
+		// Wait() returns ExitError on non-zero exit code, but we handle
+		// that down below.  So only return if it's not ExitError.
+		if _, ok := err.(*exec.ExitError); !ok {
+			return TempFail{err}
+		}
+	}
+
+	var success bool
+
+	exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
+
+	log.Printf("Completed with exit code %v", exitCode)
+
+	// Classify the exit code: explicit permanent/temporary/success lists
+	// take precedence over the process's own success status.
+	if inCodes(exitCode, taskp.PermanentFailCodes) {
+		success = false
+	} else if inCodes(exitCode, taskp.TemporaryFailCodes) {
+		return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
+	} else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
+		success = true
+	} else {
+		success = false
+	}
+
+	// Upload output directory
+	manifest, err := WriteTree(kc, outdir)
+	if err != nil {
+		return TempFail{err}
+	}
+
+	// Set status
+	err = api.Update("job_tasks", taskUuid,
+		map[string]interface{}{
+			"job_task": Task{
+				Output: manifest,
+				Success: success,
+				Progress: 1}},
+		nil)
+	if err != nil {
+		return TempFail{err}
+	}
+
+	if success {
+		return nil
+	} else {
+		return PermFail{}
+	}
+}
+
+// main reads its assignment from the environment (JOB_UUID, TASK_UUID,
+// TASK_WORK, TASK_KEEPMOUNT), fetches the job and task records, runs the
+// task, and exits 0 on success, TASK_TEMPFAIL (111) on temporary failure,
+// or 1 on permanent failure.
+func main() {
+	api, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	jobUuid := os.Getenv("JOB_UUID")
+	taskUuid := os.Getenv("TASK_UUID")
+	tmpdir := os.Getenv("TASK_WORK")
+	keepmount := os.Getenv("TASK_KEEPMOUNT")
+
+	var jobStruct Job
+	var taskStruct Task
+
+	// Fetch the job and task records this process is responsible for.
+	err = api.Get("jobs", jobUuid, nil, &jobStruct)
+	if err != nil {
+		log.Fatal(err)
+	}
+	err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	var kc IKeepClient
+	kc, err = keepclient.MakeKeepClient(&api)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Ensure files created by the task are world-readable by default.
+	syscall.Umask(0022)
+	err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+
+	// Map runner's error type to the process exit code crunch-job expects.
+	if err == nil {
+		os.Exit(0)
+	} else if _, ok := err.(TempFail); ok {
+		log.Print(err)
+		os.Exit(TASK_TEMPFAIL)
+	} else if _, ok := err.(PermFail); ok {
+		os.Exit(1)
+	} else {
+		log.Fatal(err)
+	}
+}
--- /dev/null
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ . "gopkg.in/check.v1"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "syscall"
+ "testing"
+ "time"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+type TestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&TestSuite{})
+
+type ArvTestClient struct {
+ c *C
+ manifest string
+ success bool
+}
+
+func (t ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+ return nil
+}
+
+func (t ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ t.c.Check(resourceType, Equals, "job_tasks")
+ t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": Task{
+ Output: t.manifest,
+ Success: t.success,
+ Progress: 1}})
+ return nil
+}
+
+func (s *TestSuite) TestSimpleRun(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"echo", "foo"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+}
+
+// checkOutput asserts that <tmpdir>/outdir/output.txt exists and contains
+// exactly "foo\n".
+func checkOutput(c *C, tmpdir string) {
+	file, err := os.Open(tmpdir + "/outdir/output.txt")
+	c.Assert(err, IsNil)
+
+	data := make([]byte, 100)
+	var count int
+	err = nil
+	offset := 0
+	// Read until EOF; the expected output is well under 100 bytes.
+	for err == nil {
+		count, err = file.Read(data[offset:])
+		offset += count
+	}
+	c.Assert(err, Equals, io.EOF)
+	c.Check(string(data[0:offset]), Equals, "foo\n")
+}
+
+func (s *TestSuite) TestSimpleRunSubtask(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{
+ TaskDef{Command: []string{"echo", "bar"}},
+ TaskDef{Command: []string{"echo", "foo"}}}}},
+ Task{Parameters: TaskDef{
+ Command: []string{"echo", "foo"},
+ Stdout: "output.txt"},
+ Sequence: 1})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestRedirect(c *C) {
+ tmpfile, _ := ioutil.TempFile("", "")
+ tmpfile.Write([]byte("foo\n"))
+ tmpfile.Close()
+ defer os.Remove(tmpfile.Name())
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"cat"},
+ Stdout: "output.txt",
+ Stdin: tmpfile.Name()}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnv(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "echo $BAR"},
+ Stdout: "output.txt",
+ Env: map[string]string{"BAR": "foo"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnvSubstitute(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "foo\n",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "echo $BAR"},
+ Stdout: "output.txt",
+ Env: map[string]string{"BAR": "$(task.keep)"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnvReplace(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "echo $PATH"},
+ Stdout: "output.txt",
+ Env: map[string]string{"PATH": "foo"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+type SubtaskTestClient struct {
+ c *C
+ parms []Task
+ i int
+}
+
+func (t *SubtaskTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+ t.c.Check(resourceType, Equals, "job_tasks")
+ t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": t.parms[t.i]})
+ t.i += 1
+ return nil
+}
+
+func (t SubtaskTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+ return nil
+}
+
+func (s *TestSuite) TestScheduleSubtask(c *C) {
+
+ api := SubtaskTestClient{c, []Task{
+ Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+ Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+ Sequence: 1,
+ Parameters: TaskDef{
+ Command: []string{"echo", "bar"}}},
+ Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+ Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+ Sequence: 1,
+ Parameters: TaskDef{
+ Command: []string{"echo", "foo"}}}},
+ 0}
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(&api, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{
+ TaskDef{Command: []string{"echo", "bar"}},
+ TaskDef{Command: []string{"echo", "foo"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+
+}
+
+func (s *TestSuite) TestRunFail(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunSuccessCode(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", true}, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "exit 1"},
+ SuccessCodes: []int{0, 1}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+}
+
+func (s *TestSuite) TestRunFailCode(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "exit 0"},
+ PermanentFailCodes: []int{0, 1}}}}},
+ Task{Sequence: 0})
+ c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunTempFailCode(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"/bin/sh", "-c", "exit 1"},
+ TemporaryFailCodes: []int{1}}}}},
+ Task{Sequence: 0})
+ c.Check(err, FitsTypeOf, TempFail{})
+}
+
+func (s *TestSuite) TestVwd(c *C) {
+ tmpfile, _ := ioutil.TempFile("", "")
+ tmpfile.Write([]byte("foo\n"))
+ tmpfile.Close()
+ defer os.Remove(tmpfile.Name())
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"ls", "output.txt"},
+ Vwd: map[string]string{
+ "output.txt": tmpfile.Name()}}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSubstitutionStdin(c *C) {
+ keepmount, _ := ioutil.TempDir("", "")
+ ioutil.WriteFile(keepmount+"/"+"file1.txt", []byte("foo\n"), 0600)
+ defer func() {
+ os.RemoveAll(keepmount)
+ }()
+
+ log.Print("Keepmount is ", keepmount)
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ log.Print("tmpdir is ", tmpdir)
+
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ keepmount,
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"cat"},
+ Stdout: "output.txt",
+ Stdin: "$(task.keep)/file1.txt"}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSubstitutionCommandLine(c *C) {
+ keepmount, _ := ioutil.TempDir("", "")
+ ioutil.WriteFile(keepmount+"/"+"file1.txt", []byte("foo\n"), 0600)
+ defer func() {
+ os.RemoveAll(keepmount)
+ }()
+
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c,
+ ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ keepmount,
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"cat", "$(task.keep)/file1.txt"},
+ Stdout: "output.txt"}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+
+ checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSignal(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ go func() {
+ time.Sleep(1 * time.Second)
+ self, _ := os.FindProcess(os.Getpid())
+ self.Signal(syscall.SIGINT)
+ }()
+
+ err := runner(ArvTestClient{c,
+ "", false},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"sleep", "4"}}}}},
+ Task{Sequence: 0})
+ c.Check(err, FitsTypeOf, PermFail{})
+
+}
+
+func (s *TestSuite) TestQuoting(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ err := runner(ArvTestClient{c,
+ "./s\\040ub:dir d3b07384d113edec49eaa6238ad5ff00+4 0:4::e\\040vil\n", true},
+ KeepTestClient{},
+ "zzzz-8i9sb-111111111111111",
+ "zzzz-ot0gb-111111111111111",
+ tmpdir,
+ "",
+ Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Command: []string{"echo", "foo"},
+ Stdout: "s ub:dir/:e vi\nl"}}}},
+ Task{Sequence: 0})
+ c.Check(err, IsNil)
+}
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "crypto/md5"
+ "errors"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+ "io"
+ "log"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+)
+
+// Block is a buffer of up to keepclient.BLOCKSIZE bytes of file data being
+// accumulated toward one Keep block; offset is how much of data is filled.
+type Block struct {
+	data []byte
+	offset int64
+}
+
+// ManifestStreamWriter accumulates one manifest stream (one directory): it
+// buffers written bytes into Blocks, hands full blocks to an uploader
+// goroutine via the uploader channel, and receives that goroutine's
+// accumulated errors on finish.  offset is the running byte count used to
+// compute per-file segment positions.
+type ManifestStreamWriter struct {
+	*ManifestWriter
+	*manifest.ManifestStream
+	offset int64
+	*Block
+	uploader chan *Block
+	finish chan []error
+}
+
+// IKeepClient is the subset of the Keep client used here, abstracted so
+// tests can substitute a fake.
+type IKeepClient interface {
+	PutHB(hash string, buf []byte) (string, int, error)
+}
+
+// Write implements io.Writer by delegating to ReadFrom.
+func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
+	n, err := m.ReadFrom(bytes.NewReader(p))
+	return int(n), err
+}
+
+// ReadFrom implements io.ReaderFrom: it copies r into the current Block,
+// sending each block to the uploader goroutine as soon as it reaches
+// BLOCKSIZE.  A trailing partial block stays in m.Block for Finish to flush.
+func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
+	var total int64
+	var count int
+
+	for err == nil {
+		if m.Block == nil {
+			m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+		}
+		count, err = r.Read(m.Block.data[m.Block.offset:])
+		total += int64(count)
+		m.Block.offset += int64(count)
+		if m.Block.offset == keepclient.BLOCKSIZE {
+			m.uploader <- m.Block
+			m.Block = nil
+		}
+	}
+
+	// io.EOF just means r is exhausted; it is not an error to the caller.
+	if err == io.EOF {
+		return total, nil
+	} else {
+		return total, err
+	}
+
+}
+
+// goUpload runs in its own goroutine: for each Block received on the
+// uploader channel it computes the MD5 hash, uploads the data to Keep, and
+// appends the signed locator to the stream's block list.  After the channel
+// is closed it sends any accumulated errors on the finish channel.
+func (m *ManifestStreamWriter) goUpload() {
+	var errors []error
+	uploader := m.uploader
+	finish := m.finish
+	for block := range uploader {
+		hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+		signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+		if err != nil {
+			errors = append(errors, err)
+		} else {
+			m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+		}
+	}
+	finish <- errors
+}
+
+// ManifestWriter builds a Keep manifest from a directory tree: one
+// ManifestStreamWriter per subdirectory, keyed by stream name.  stripPrefix
+// is the tree root removed from walked paths to form stream names.
+type ManifestWriter struct {
+	IKeepClient
+	stripPrefix string
+	Streams map[string]*ManifestStreamWriter
+}
+
+// WalkFunc is a filepath.WalkFunc that streams each regular file into the
+// ManifestStreamWriter for its directory, starting one upload goroutine per
+// stream on first use, and records the file's segment in the stream.
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
+	if info.IsDir() {
+		return nil
+	}
+
+	// Stream name: path relative to stripPrefix minus the file name;
+	// "." for files directly under the root.
+	var dir string
+	if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+		dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+	}
+	if dir == "" {
+		dir = "."
+	}
+
+	fn := path[(len(path) - len(info.Name())):]
+
+	if m.Streams[dir] == nil {
+		m.Streams[dir] = &ManifestStreamWriter{
+			m,
+			&manifest.ManifestStream{StreamName: dir},
+			0,
+			nil,
+			make(chan *Block),
+			make(chan []error)}
+		go m.Streams[dir].goUpload()
+	}
+
+	stream := m.Streams[dir]
+
+	fileStart := stream.offset
+
+	file, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	// Fix: the file was never closed, leaking one file descriptor per
+	// uploaded file (fatal on large trees once the fd limit is reached).
+	defer file.Close()
+
+	log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+	var count int64
+	count, err = io.Copy(stream, file)
+	if err != nil {
+		return err
+	}
+
+	stream.offset += count
+
+	stream.ManifestStream.Files = append(stream.ManifestStream.Files,
+		fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+
+	return nil
+}
+
+// Finish flushes any partial Block on every stream, closes the upload
+// channels, waits for the upload goroutines, and aggregates their errors
+// into a single error (nil if none).  Setting uploader/finish to nil makes
+// repeated calls a no-op, so Finish is safe to call more than once.
+func (m *ManifestWriter) Finish() error {
+	var errstring string
+	for _, stream := range m.Streams {
+		if stream.uploader == nil {
+			continue
+		}
+		if stream.Block != nil {
+			stream.uploader <- stream.Block
+		}
+		close(stream.uploader)
+		stream.uploader = nil
+
+		errors := <-stream.finish
+		close(stream.finish)
+		stream.finish = nil
+
+		for _, r := range errors {
+			errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+		}
+	}
+	if errstring != "" {
+		return errors.New(errstring)
+	} else {
+		return nil
+	}
+}
+
+// ManifestText finishes all uploads and renders the manifest: streams in
+// sorted name order, spaces escaped as \040, newlines stripped from stream
+// and file names.  (Finish's error, if any, is ignored here; callers that
+// care should call Finish themselves first, as WriteTree does.)
+func (m *ManifestWriter) ManifestText() string {
+	m.Finish()
+	var buf bytes.Buffer
+
+	dirs := make([]string, len(m.Streams))
+	i := 0
+	for k := range m.Streams {
+		dirs[i] = k
+		i++
+	}
+	sort.Strings(dirs)
+
+	for _, k := range dirs {
+		v := m.Streams[k]
+
+		if k == "." {
+			buf.WriteString(".")
+		} else {
+			k = strings.Replace(k, " ", "\\040", -1)
+			k = strings.Replace(k, "\n", "", -1)
+			buf.WriteString("./" + k)
+		}
+		for _, b := range v.Blocks {
+			buf.WriteString(" ")
+			buf.WriteString(b)
+		}
+		for _, f := range v.Files {
+			buf.WriteString(" ")
+			f = strings.Replace(f, " ", "\\040", -1)
+			f = strings.Replace(f, "\n", "", -1)
+			buf.WriteString(f)
+		}
+		buf.WriteString("\n")
+	}
+	return buf.String()
+}
+
+// WriteTree uploads the directory tree rooted at root to Keep via kc and
+// returns its manifest text.  Finish is called explicitly to surface upload
+// errors; the second Finish inside ManifestText is then a no-op.
+func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
+	mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
+	err = filepath.Walk(root, mw.WalkFunc)
+
+	if err != nil {
+		return "", err
+	}
+
+	err = mw.Finish()
+	if err != nil {
+		return "", err
+	}
+
+	return mw.ManifestText(), nil
+}
--- /dev/null
+package main
+
+import (
+ "crypto/md5"
+ "errors"
+ "fmt"
+ . "gopkg.in/check.v1"
+ "io/ioutil"
+ "os"
+)
+
+type UploadTestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&UploadTestSuite{})
+
+type KeepTestClient struct {
+}
+
+func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ return fmt.Sprintf("%x+%v", md5.Sum(buf), len(buf)), len(buf), nil
+}
+
+func (s *TestSuite) TestSimpleUpload(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ os.Mkdir(tmpdir+"/subdir", 0700)
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+`)
+}
+
+func (s *TestSuite) TestSimpleUploadLarge(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ file, _ := os.Create(tmpdir + "/" + "file1.txt")
+ data := make([]byte, 1024*1024-1)
+ for i := range data {
+ data[i] = byte(i % 10)
+ }
+ for i := 0; i < 65; i++ {
+ file.Write(data)
+ }
+ file.Close()
+
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestUploadEmptySubdir(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ os.Mkdir(tmpdir+"/subdir", 0700)
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadEmptyFile(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+ str, err := WriteTree(KeepTestClient{}, tmpdir)
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
+`)
+}
+
+type KeepErrorTestClient struct {
+}
+
+func (k KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+ return "", 0, errors.New("Failed!")
+}
+
+func (s *TestSuite) TestUploadError(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ str, err := WriteTree(KeepErrorTestClient{}, tmpdir)
+ c.Check(err, NotNil)
+ c.Check(str, Equals, "")
+}
// A Keep "block" is 64MB.
const BLOCKSIZE = 64 * 1024 * 1024
-var BlockNotFound = errors.New("Block not found")
+// Error is an error that additionally reports, via Temporary(), whether the failure is transient (i.e., whether retrying the request might succeed).
+type Error interface {
+ error
+ Temporary() bool
+}
+
+// multipleResponseError summarizes the outcome of requests to multiple servers, and implements the Error interface.
+type multipleResponseError struct {
+ error
+ isTemp bool
+}
+
+func (e *multipleResponseError) Temporary() bool {
+ return e.isTemp
+}
+
+// BlockNotFound is a permanent (isTemp == false) ErrNotFound, returned when every queried server reports the block as missing.
+var BlockNotFound = &ErrNotFound{multipleResponseError{
+ error: errors.New("Block not found"),
+ isTemp: false,
+}}
+
+// ErrNotFound wraps a multipleResponseError for blocks that could not be retrieved; its isTemp flag reports whether the failure may be transient and worth retrying.
+type ErrNotFound struct {
+ multipleResponseError
+}
+
var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
var errs []string
tries_remaining := 1 + kc.Retries
+
serversToTry := kc.getSortedRoots(locator)
+
+ numServers := len(serversToTry)
+ count404 := 0
+
var retryList []string
for tries_remaining > 0 {
// server side failure, transient
// error, can try again.
retryList = append(retryList, host)
+ } else if resp.StatusCode == 404 {
+ count404++
}
} else {
// Success.
}
log.Printf("DEBUG: %s %s failed: %v", method, locator, errs)
- return nil, 0, "", BlockNotFound
+ var err error
+ if count404 == numServers {
+ err = BlockNotFound
+ } else {
+ err = &ErrNotFound{multipleResponseError{
+ error: fmt.Errorf("%s %s failed: %v", method, locator, errs),
+ isTemp: len(serversToTry) > 0,
+ }}
+ }
+ return nil, 0, "", err
}
// Get() retrieves a block, given a locator. Returns a reader, the
"net"
"net/http"
"os"
+ "strings"
"testing"
)
return
}
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
if *no_server {
return
}
- arvadostest.StopKeep()
+ arvadostest.StopKeep(2)
arvadostest.StopAPI()
}
kc, _ := MakeKeepClient(&arv)
kc.Want_replicas = 2
+ kc.Retries = 0
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
writableLocalRoots := make(map[string]string)
kc, _ := MakeKeepClient(&arv)
arv.ApiToken = "abc123"
kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+ kc.Retries = 0
r, n, url2, err := kc.Get(hash)
- c.Check(err, Equals, BlockNotFound)
+ errNotFound, _ := err.(*ErrNotFound)
+ c.Check(errNotFound, NotNil)
+ c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
+ c.Check(errNotFound.Temporary(), Equals, true)
c.Check(n, Equals, int64(0))
c.Check(url2, Equals, "")
c.Check(r, Equals, nil)
kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
r, n, url2, err := kc.Get(hash)
- c.Check(err, Equals, BlockNotFound)
+ errNotFound, _ := err.(*ErrNotFound)
+ c.Check(errNotFound, NotNil)
+ c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
+ c.Check(errNotFound.Temporary(), Equals, true)
c.Check(n, Equals, int64(0))
c.Check(url2, Equals, "")
c.Check(r, Equals, nil)
}
kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+ kc.Retries = 0
// This test works only if one of the failing services is
// attempted before the succeeding service. Otherwise,
c.Check(err2, Equals, nil)
c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
}
+
+type FailThenSucceedPutHandler struct {
+ handled chan string
+ count int
+ successhandler StubPutHandler
+}
+
+func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if h.count == 0 {
+ resp.WriteHeader(500)
+ h.count += 1
+ h.handled <- fmt.Sprintf("http://%s", req.Host)
+ } else {
+ h.successhandler.ServeHTTP(resp, req)
+ }
+}
+
+func (s *StandaloneSuite) TestPutBRetry(c *C) {
+ st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
+ StubPutHandler{
+ c,
+ Md5String("foo"),
+ "abc123",
+ "foo",
+ make(chan string, 5)}}
+
+ arv, _ := arvadosclient.MakeArvadosClient()
+ kc, _ := MakeKeepClient(&arv)
+
+ kc.Want_replicas = 2
+ arv.ApiToken = "abc123"
+ localRoots := make(map[string]string)
+ writableLocalRoots := make(map[string]string)
+
+ ks := RunSomeFakeKeepServers(st, 2)
+
+ for i, k := range ks {
+ localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+ defer k.listener.Close()
+ }
+
+ kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+
+ hash, replicas, err := kc.PutB([]byte("foo"))
+
+ c.Check(err, Equals, nil)
+ c.Check(hash, Equals, "")
+ c.Check(replicas, Equals, 2)
+}
// Take the hash of locator and timestamp in order to identify this
// specific transaction in log statements.
- requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+ requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
// Calculate the ordering for uploading to servers
sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots()
replicasPerThread = remaining_replicas
}
- for remaining_replicas > 0 {
- for active*replicasPerThread < remaining_replicas {
- // Start some upload requests
- if next_server < len(sv) {
- log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
- go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
- next_server += 1
- active += 1
- } else {
- if active == 0 {
- return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ retriesRemaining := 1 + this.Retries
+ var retryServers []string
+
+ for retriesRemaining > 0 {
+ retriesRemaining -= 1
+ next_server = 0
+ retryServers = []string{}
+ for remaining_replicas > 0 {
+ for active*replicasPerThread < remaining_replicas {
+ // Start some upload requests
+ if next_server < len(sv) {
+ log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+ go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+ next_server += 1
+ active += 1
} else {
- break
+ if active == 0 && retriesRemaining == 0 {
+ return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+ } else {
+ break
+ }
+ }
+ }
+ log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+ requestId, remaining_replicas, active)
+
+ // Now wait for something to happen.
+ if active > 0 {
+ status := <-upload_status
+ active -= 1
+
+ if status.statusCode == 200 {
+ // good news!
+ remaining_replicas -= status.replicas_stored
+ locator = status.response
+ } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+ (status.statusCode >= 500 && status.statusCode != 503) {
+ // Timeout, too many requests, or other server side failure
+ // Do not retry when status code is 503, which means the keep server is full
+ retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
}
+ } else {
+ break
}
}
- log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
- requestId, remaining_replicas, active)
-
- // Now wait for something to happen.
- status := <-upload_status
- active -= 1
- if status.statusCode == 200 {
- // good news!
- remaining_replicas -= status.replicas_stored
- locator = status.response
- }
+ sv = retryServers
}
return locator, this.Want_replicas, nil
from keep import *
from stream import *
from arvfile import StreamFileReader
+from retry import RetryLoop
import errors
import util
logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG')
else logging.WARNING)
-def task_set_output(self,s):
- api('v1').job_tasks().update(uuid=self['uuid'],
- body={
- 'output':s,
- 'success':True,
- 'progress':1.0
- }).execute()
+def task_set_output(self, s, num_retries=5):
+ for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
+ try:
+ return api('v1').job_tasks().update(
+ uuid=self['uuid'],
+ body={
+ 'output':s,
+ 'success':True,
+ 'progress':1.0
+ }).execute()
+ except errors.ApiError as error:
+ if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+ logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+ else:
+ raise
_current_task = None
-def current_task():
+def current_task(num_retries=5):
global _current_task
if _current_task:
return _current_task
- t = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
- t = UserDict.UserDict(t)
- t.set_output = types.MethodType(task_set_output, t)
- t.tmpdir = os.environ['TASK_WORK']
- _current_task = t
- return t
+
+ for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
+ try:
+ task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
+ task = UserDict.UserDict(task)
+ task.set_output = types.MethodType(task_set_output, task)
+ task.tmpdir = os.environ['TASK_WORK']
+ _current_task = task
+ return task
+ except errors.ApiError as error:
+ if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+ logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+ else:
+ raise
_current_job = None
-def current_job():
+def current_job(num_retries=5):
global _current_job
if _current_job:
return _current_job
- t = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
- t = UserDict.UserDict(t)
- t.tmpdir = os.environ['JOB_WORK']
- _current_job = t
- return t
+
+ for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
+ try:
+ job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
+ job = UserDict.UserDict(job)
+ job.tmpdir = os.environ['JOB_WORK']
+ _current_job = job
+ return job
+ except errors.ApiError as error:
+ if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+ logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+ else:
+ raise
def getjobparam(*args):
return current_job()['script_parameters'].get(*args)
import logging
import os
import re
+import socket
import types
import apiclient
# previous call did not succeed, so this is slightly
# risky.
return self.orig_http_request(uri, **kwargs)
+ except socket.error:
+ # This is the one case where httplib2 doesn't close the
+ # underlying connection first. Close all open connections,
+ # expecting this object only has the one connection to the API
+ # server. This is safe because httplib2 reopens connections when
+ # needed.
+ _logger.debug("Retrying API request after socket error", exc_info=True)
+ for conn in self.connections.itervalues():
+ conn.close()
+ return self.orig_http_request(uri, **kwargs)
def _patch_http_request(http, api_token):
http.arvados_api_token = api_token
else:
sp = os.path.split(root)
return is_in_collection(sp[0], os.path.join(sp[1], branch))
- except IOError, OSError:
+ except (IOError, OSError):
return (None, None)
# Determine the project to place the output of this command by searching upward
else:
sp = os.path.split(root)
return determine_project(sp[0], current_user)
- except IOError, OSError:
+ except (IOError, OSError):
return current_user
# Determine if string corresponds to a file, and if that file is part of a
Should be used in a "with" block.
"""
def __init__(self, todo):
+ self._started = 0
self._todo = todo
self._done = 0
self._response = None
+ self._start_lock = threading.Condition()
self._todo_lock = threading.Semaphore(todo)
self._done_lock = threading.Lock()
+ self._local = threading.local()
def __enter__(self):
+ self._start_lock.acquire()
+ if getattr(self._local, 'sequence', None) is not None:
+ # If the calling thread has used set_sequence(N), then
+ # we wait here until N other threads have started.
+ while self._started < self._local.sequence:
+ self._start_lock.wait()
self._todo_lock.acquire()
+ self._started += 1
+ self._start_lock.notifyAll()
+ self._start_lock.release()
return self
def __exit__(self, type, value, traceback):
self._todo_lock.release()
+ def set_sequence(self, sequence):
+ self._local.sequence = sequence
+
def shall_i_proceed(self):
"""
Return true if the current thread should do stuff. Return
return self._success
def run(self):
- with self.args['thread_limiter'] as limiter:
+ limiter = self.args['thread_limiter']
+ sequence = self.args['thread_sequence']
+ if sequence is not None:
+ limiter.set_sequence(sequence)
+ with limiter:
if not limiter.shall_i_proceed():
# My turn arrived, but the job has been done without
# me.
thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
loop = retry.RetryLoop(num_retries, self._check_loop_result,
backoff_start=2)
+ thread_sequence = 0
for tries_left in loop:
try:
- local_roots = self.map_new_services(
+ sorted_roots = self.map_new_services(
roots_map, locator,
force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
except Exception as error:
continue
threads = []
- for service_root, ks in roots_map.iteritems():
+ for service_root, ks in [(root, roots_map[root])
+ for root in sorted_roots]:
if ks.finished():
continue
t = KeepClient.KeepWriterThread(
data_hash=data_hash,
service_root=service_root,
thread_limiter=thread_limiter,
- timeout=self.current_timeout(num_retries-tries_left))
+ timeout=self.current_timeout(num_retries-tries_left),
+ thread_sequence=thread_sequence)
t.start()
threads.append(t)
+ thread_sequence += 1
for t in threads:
t.join()
loop.save_result((thread_limiter.done() >= copies, len(threads)))
data_hash, loop.last_result()))
else:
service_errors = ((key, roots_map[key].last_result()['error'])
- for key in local_roots
+ for key in sorted_roots
if roots_map[key].last_result()['error'])
raise arvados.errors.KeepWriteError(
"failed to write {} (wanted {} copies but wrote {})".format(
return mock.patch('httplib2.Http.request', side_effect=queue_with((
(fake_httplib2_response(code, **headers), body) for code in codes)))
+def mock_api_responses(api_client, body, codes, headers={}):
+ return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
+ (fake_httplib2_response(code, **headers), body) for code in codes)))
+
class FakeCurl:
@classmethod
events {
}
http {
- access_log /dev/stderr combined;
+ access_log {{ACCESSLOG}} combined;
upstream arv-git-http {
server localhost:{{GITPORT}};
}
from __future__ import print_function
import argparse
import atexit
+import errno
import httplib2
import os
import pipes
with open(PID_PATH, 'r') as f:
server_pid = int(f.read())
good_pid = (os.kill(server_pid, 0) is None)
- except IOError:
- good_pid = False
- except OSError:
+ except EnvironmentError:
good_pid = False
now = time.time()
os.getpgid(server_pid)
time.sleep(0.1)
now = time.time()
- except IOError:
- pass
- except OSError:
+ except EnvironmentError:
pass
def find_available_port():
nginxconf['GITSSLPORT'] = find_available_port()
nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
+ nginxconf['ACCESSLOG'] = os.path.join(TEST_TMPDIR, 'nginx_access_log.fifo')
conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
env = os.environ.copy()
env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin'
+
+ try:
+ os.remove(nginxconf['ACCESSLOG'])
+ except OSError as error:
+ if error.errno != errno.ENOENT:
+ raise
+
+ os.mkfifo(nginxconf['ACCESSLOG'], 0700)
nginx = subprocess.Popen(
['nginx',
'-g', 'error_log stderr info;',
'-g', 'pid '+_pidfile('nginx')+';',
'-c', conffile],
env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+ cat_access = subprocess.Popen(
+ ['cat', nginxconf['ACCESSLOG']],
+ stdout=sys.stderr)
_setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
_setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT'])
import json
import mimetypes
import os
-import run_test_server
+import socket
import string
import unittest
+
+import mock
+import run_test_server
+
from apiclient import errors as apiclient_errors
from apiclient import http as apiclient_http
from arvados.api import OrderedJsonModel
-
from arvados_testutil import fake_httplib2_response
if not mimetypes.inited:
mimetypes.init()
-class ArvadosApiClientTest(unittest.TestCase):
+class ArvadosApiTest(run_test_server.TestCaseWithServers):
+ MAIN_SERVER = {}
ERROR_HEADERS = {'Content-Type': mimetypes.types_map['.json']}
- @classmethod
- def api_error_response(cls, code, *errors):
- return (fake_httplib2_response(code, **cls.ERROR_HEADERS),
+ def api_error_response(self, code, *errors):
+ return (fake_httplib2_response(code, **self.ERROR_HEADERS),
json.dumps({'errors': errors,
'error_token': '1234567890+12345678'}))
- @classmethod
- def setUpClass(cls):
- # The apiclient library has support for mocking requests for
- # testing, but it doesn't extend to the discovery document
- # itself. For now, bring up an API server that will serve
- # a discovery document.
- # FIXME: Figure out a better way to stub this out.
- run_test_server.run()
- mock_responses = {
- 'arvados.humans.delete': (
- fake_httplib2_response(500, **cls.ERROR_HEADERS),
- ""),
- 'arvados.humans.get': cls.api_error_response(
- 422, "Bad UUID format", "Bad output format"),
- 'arvados.humans.list': (None, json.dumps(
- {'items_available': 0, 'items': []})),
- }
- req_builder = apiclient_http.RequestMockBuilder(mock_responses)
- cls.api = arvados.api('v1',
- host=os.environ['ARVADOS_API_HOST'],
- token='discovery-doc-only-no-token-needed',
- insecure=True,
- requestBuilder=req_builder)
-
- def tearDown(cls):
- run_test_server.reset()
-
def test_new_api_objects_with_cache(self):
- clients = [arvados.api('v1', cache=True,
- host=os.environ['ARVADOS_API_HOST'],
- token='discovery-doc-only-no-token-needed',
- insecure=True)
- for index in [0, 1]]
+ clients = [arvados.api('v1', cache=True) for index in [0, 1]]
self.assertIsNot(*clients)
def test_empty_list(self):
new_item['created_at']))
def test_exceptions_include_errors(self):
+ mock_responses = {
+ 'arvados.humans.get': self.api_error_response(
+ 422, "Bad UUID format", "Bad output format"),
+ }
+ req_builder = apiclient_http.RequestMockBuilder(mock_responses)
+ api = arvados.api('v1', requestBuilder=req_builder)
with self.assertRaises(apiclient_errors.HttpError) as err_ctx:
- self.api.humans().get(uuid='xyz-xyz-abcdef').execute()
+ api.humans().get(uuid='xyz-xyz-abcdef').execute()
err_s = str(err_ctx.exception)
for msg in ["Bad UUID format", "Bad output format"]:
self.assertIn(msg, err_s)
def test_exceptions_without_errors_have_basic_info(self):
+ mock_responses = {
+ 'arvados.humans.delete': (
+ fake_httplib2_response(500, **self.ERROR_HEADERS),
+ "")
+ }
+ req_builder = apiclient_http.RequestMockBuilder(mock_responses)
+ api = arvados.api('v1', requestBuilder=req_builder)
with self.assertRaises(apiclient_errors.HttpError) as err_ctx:
- self.api.humans().delete(uuid='xyz-xyz-abcdef').execute()
+ api.humans().delete(uuid='xyz-xyz-abcdef').execute()
self.assertIn("500", str(err_ctx.exception))
def test_request_too_large(self):
}
req_builder = apiclient_http.RequestMockBuilder(mock_responses)
api = arvados.api('v1',
- host=os.environ['ARVADOS_API_HOST'],
- token='discovery-doc-only-no-token-needed',
- insecure=True,
- requestBuilder=req_builder,
- model=OrderedJsonModel())
+ requestBuilder=req_builder, model=OrderedJsonModel())
result = api.humans().get(uuid='test').execute()
self.assertEqual(string.hexdigits, ''.join(result.keys()))
+ def test_socket_errors_retried(self):
+ api = arvados.api('v1')
+ self.assertTrue(hasattr(api._http, 'orig_http_request'),
+ "test doesn't know how to intercept HTTP requests")
+ api._http.orig_http_request = mock.MagicMock()
+ mock_response = {'user': 'person'}
+ api._http.orig_http_request.side_effect = [
+ socket.error("mock error"),
+ (fake_httplib2_response(200), json.dumps(mock_response))
+ ]
+ actual_response = api.users().current().execute()
+ self.assertEqual(mock_response, actual_response)
+ self.assertGreater(api._http.orig_http_request.call_count, 1,
+ "client got the right response without retrying")
+
if __name__ == '__main__':
unittest.main()
mock.responses[0].getopt(pycurl.TIMEOUT_MS),
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
- def test_probe_order_reference_set(self):
+ def check_no_services_error(self, verb, exc_class):
+ api_client = mock.MagicMock(name='api_client')
+ api_client.keep_services().accessible().execute.side_effect = (
+ arvados.errors.ApiError)
+ keep_client = arvados.KeepClient(api_client=api_client)
+ with self.assertRaises(exc_class) as err_check:
+ getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
+ self.assertEqual(0, len(err_check.exception.request_errors()))
+
+ def test_get_error_with_no_services(self):
+ self.check_no_services_error('get', arvados.errors.KeepReadError)
+
+ def test_put_error_with_no_services(self):
+ self.check_no_services_error('put', arvados.errors.KeepWriteError)
+
+ def check_errors_from_last_retry(self, verb, exc_class):
+ api_client = self.mock_keep_services(count=2)
+ req_mock = tutil.mock_keep_responses(
+ "retry error reporting test", 500, 500, 403, 403)
+ with req_mock, tutil.skip_sleep, \
+ self.assertRaises(exc_class) as err_check:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
+ num_retries=3)
+ self.assertEqual([403, 403], [
+ getattr(error, 'status_code', None)
+ for error in err_check.exception.request_errors().itervalues()])
+
+ def test_get_error_reflects_last_retry(self):
+ self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
+
+ def test_put_error_reflects_last_retry(self):
+ self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
+
+ def test_put_error_does_not_include_successful_puts(self):
+ data = 'partial failure test'
+ data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ api_client = self.mock_keep_services(count=3)
+ with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
+ self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client.put(data)
+ self.assertEqual(2, len(exc_check.exception.request_errors()))
+
+ def test_proxy_put_with_no_writable_services(self):
+ data = 'test with no writable services'
+ data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+ api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
+ with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
+ self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
+ keep_client = arvados.KeepClient(api_client=api_client)
+ keep_client.put(data)
+ self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
+ self.assertEqual(0, len(exc_check.exception.request_errors()))
+
+
+@tutil.skip_sleep
+class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+
+ def setUp(self):
# expected_order[i] is the probe order for
# hash=md5(sprintf("%064x",i)) where there are 16 services
# with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
# the first probe for the block consisting of 64 "0"
# characters is the service whose uuid is
# "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
- expected_order = [
+ self.services = 16
+ self.expected_order = [
list('3eab2d5fc9681074'),
list('097dba52e648f1c3'),
list('c5b4e023f8a7d691'),
list('9d81c02e76a3bf54'),
]
- hashes = [
- hashlib.md5("{:064x}".format(x)).hexdigest()
- for x in range(len(expected_order))]
- api_client = self.mock_keep_services(count=16)
- keep_client = arvados.KeepClient(api_client=api_client)
- for i, hash in enumerate(hashes):
- roots = keep_client.weighted_service_roots(arvados.KeepLocator(hash))
+ self.blocks = [
+ "{:064x}".format(x)
+ for x in range(len(self.expected_order))]
+ self.hashes = [
+ hashlib.md5(self.blocks[x]).hexdigest()
+ for x in range(len(self.expected_order))]
+ self.api_client = self.mock_keep_services(count=self.services)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
+
+ def test_weighted_service_roots_against_reference_set(self):
+ # Confirm weighted_service_roots() returns the correct order
+ for i, hash in enumerate(self.hashes):
+ roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
got_order = [
re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
for root in roots]
- self.assertEqual(expected_order[i], got_order)
+ self.assertEqual(self.expected_order[i], got_order)
+
+ def test_get_probe_order_against_reference_set(self):
+ self._test_probe_order_against_reference_set(
+ lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
+
+ def test_put_probe_order_against_reference_set(self):
+ # copies=1 prevents the test from being sensitive to races
+ # between writer threads.
+ self._test_probe_order_against_reference_set(
+ lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
+
+ def _test_probe_order_against_reference_set(self, op):
+ for i in range(len(self.blocks)):
+ with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
+ self.assertRaises(arvados.errors.KeepRequestError):
+ op(i)
+ got_order = [
+ re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
+ for resp in mock.responses]
+ self.assertEqual(self.expected_order[i]*2, got_order)
+
+ def test_put_probe_order_multiple_copies(self):
+ for copies in range(2, 4):
+ for i in range(len(self.blocks)):
+ with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
+ self.assertRaises(arvados.errors.KeepWriteError):
+ self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
+ got_order = [
+ re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
+ for resp in mock.responses]
+ # With T threads racing to make requests, the position
+ # of a given server in the sequence of HTTP requests
+ # (got_order) cannot be more than T-1 positions
+ # earlier than that server's position in the reference
+ # probe sequence (expected_order).
+ #
+ # Loop invariant: we have accounted for +pos+ expected
+ # probes, either by seeing them in +got_order+ or by
+ # putting them in +pending+ in the hope of seeing them
+ # later. As long as +len(pending)<T+, we haven't
+ # started a request too early.
+ pending = []
+ for pos, expected in enumerate(self.expected_order[i]*3):
+ got = got_order[pos-len(pending)]
+ while got in pending:
+ del pending[pending.index(got)]
+ got = got_order[pos-len(pending)]
+ if got != expected:
+ pending.append(expected)
+ self.assertLess(
+ len(pending), copies,
+ "pending={}, with copies={}, got {}, expected {}".format(
+ pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
def test_probe_waste_adding_one_server(self):
hashes = [
hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
initial_services = 12
- api_client = self.mock_keep_services(count=initial_services)
- keep_client = arvados.KeepClient(api_client=api_client)
+ self.api_client = self.mock_keep_services(count=initial_services)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
probes_before = [
- keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
+ self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
for added_services in range(1, 12):
api_client = self.mock_keep_services(count=initial_services+added_services)
keep_client = arvados.KeepClient(api_client=api_client)
data = hashlib.md5(data).hexdigest() + '+1234'
# Arbitrary port number:
aport = random.randint(1024,65535)
- api_client = self.mock_keep_services(service_port=aport, count=16)
+ api_client = self.mock_keep_services(service_port=aport, count=self.services)
keep_client = arvados.KeepClient(api_client=api_client)
with mock.patch('pycurl.Curl') as curl_mock, \
self.assertRaises(exc_class) as err_check:
def test_put_error_shows_probe_order(self):
self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
- def check_no_services_error(self, verb, exc_class):
- api_client = mock.MagicMock(name='api_client')
- api_client.keep_services().accessible().execute.side_effect = (
- arvados.errors.ApiError)
- keep_client = arvados.KeepClient(api_client=api_client)
- with self.assertRaises(exc_class) as err_check:
- getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
- self.assertEqual(0, len(err_check.exception.request_errors()))
-
- def test_get_error_with_no_services(self):
- self.check_no_services_error('get', arvados.errors.KeepReadError)
-
- def test_put_error_with_no_services(self):
- self.check_no_services_error('put', arvados.errors.KeepWriteError)
-
- def check_errors_from_last_retry(self, verb, exc_class):
- api_client = self.mock_keep_services(count=2)
- req_mock = tutil.mock_keep_responses(
- "retry error reporting test", 500, 500, 403, 403)
- with req_mock, tutil.skip_sleep, \
- self.assertRaises(exc_class) as err_check:
- keep_client = arvados.KeepClient(api_client=api_client)
- getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
- num_retries=3)
- self.assertEqual([403, 403], [
- getattr(error, 'status_code', None)
- for error in err_check.exception.request_errors().itervalues()])
-
- def test_get_error_reflects_last_retry(self):
- self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
-
- def test_put_error_reflects_last_retry(self):
- self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
-
- def test_put_error_does_not_include_successful_puts(self):
- data = 'partial failure test'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
- api_client = self.mock_keep_services(count=3)
- with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
- self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
- keep_client.put(data)
- self.assertEqual(2, len(exc_check.exception.request_errors()))
-
- def test_proxy_put_with_no_writable_services(self):
- data = 'test with no writable services'
- data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
- api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
- with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
- self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
- keep_client = arvados.KeepClient(api_client=api_client)
- keep_client.put(data)
- self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
- self.assertEqual(0, len(exc_check.exception.request_errors()))
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
DATA = 'x' * 2**10
--- /dev/null
+#!/usr/bin/env python
+
+import mock
+import os
+import unittest
+import hashlib
+import run_test_server
+import json
+import arvados
+import arvados_testutil as tutil
+from apiclient import http as apiclient_http
+
+
+@tutil.skip_sleep
+class ApiClientRetryTestMixin(object):
+
+ TEST_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+ TEST_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
+
+ @classmethod
+ def setUpClass(cls):
+ run_test_server.run()
+
+ def setUp(self):
+ # Patch arvados.api() to return our mock API, so we can mock
+ # its http requests.
+ self.api_client = arvados.api('v1', cache=False)
+ self.api_patch = mock.patch('arvados.api', return_value=self.api_client)
+ self.api_patch.start()
+
+ def tearDown(self):
+ self.api_patch.stop()
+
+ def run_method(self):
+ raise NotImplementedError("test subclasses must define run_method")
+
+ def test_immediate_success(self):
+ with tutil.mock_api_responses(self.api_client, '{}', [200]):
+ self.run_method()
+
+ def test_immediate_failure(self):
+ with tutil.mock_api_responses(self.api_client, '{}', [400]), self.assertRaises(self.DEFAULT_EXCEPTION):
+ self.run_method()
+
+ def test_retry_then_success(self):
+ with tutil.mock_api_responses(self.api_client, '{}', [500, 200]):
+ self.run_method()
+
+ def test_error_after_default_retries_exhausted(self):
+ with tutil.mock_api_responses(self.api_client, '{}', [500, 500, 500, 500, 500, 500, 200]), self.assertRaises(self.DEFAULT_EXCEPTION):
+ self.run_method()
+
+ def test_no_retry_after_immediate_success(self):
+ with tutil.mock_api_responses(self.api_client, '{}', [200, 400]):
+ self.run_method()
+
+
+class CurrentJobTestCase(ApiClientRetryTestMixin, unittest.TestCase):
+
+ DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+ def setUp(self):
+ super(CurrentJobTestCase, self).setUp()
+ os.environ['JOB_UUID'] = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+ os.environ['JOB_WORK'] = '.'
+
+ def tearDown(self):
+ del os.environ['JOB_UUID']
+ del os.environ['JOB_WORK']
+ arvados._current_job = None
+ super(CurrentJobTestCase, self).tearDown()
+
+ def run_method(self):
+ arvados.current_job()
+
+
+class CurrentTaskTestCase(ApiClientRetryTestMixin, unittest.TestCase):
+
+ DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+ def setUp(self):
+ super(CurrentTaskTestCase, self).setUp()
+ os.environ['TASK_UUID'] = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+ os.environ['TASK_WORK'] = '.'
+
+ def tearDown(self):
+ del os.environ['TASK_UUID']
+ del os.environ['TASK_WORK']
+ arvados._current_task = None
+ super(CurrentTaskTestCase, self).tearDown()
+
+ def run_method(self):
+ arvados.current_task()
+
+
+class TaskSetOutputTestCase(CurrentTaskTestCase, unittest.TestCase):
+
+ DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+ def tearDown(self):
+ super(TaskSetOutputTestCase, self).tearDown()
+ run_test_server.reset()
+
+ def run_method(self, locator=ApiClientRetryTestMixin.TEST_LOCATOR):
+ arvados.task_set_output({'uuid':self.TEST_UUID},s=locator)
gem 'themes_for_rails'
gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20150605170031'
+gem 'arvados-cli', '>= 0.1.20151023185755'
# pg_power lets us use partial indexes in schema.rb in Rails 3
gem 'pg_power'
sso_app_id: ~
sso_provider_url: ~
workbench_address: ~
- websockets_address: ~
+ websocket_address: ~
#git_repositories_dir: ~
#git_internal_dir: ~
sso_app_secret: ~
sso_provider_url: ~
workbench_address: ~
- websockets_address: ~
+ websocket_address: ~
#git_repositories_dir: ~
#git_internal_dir: ~
}
h := newGitHandler()
h.(*gitHandler).Path = "/bin/sh"
- h.(*gitHandler).Args = []string{"-c", "echo HTTP/1.1 200 OK; echo Content-Type: text/plain; echo; env"}
+ h.(*gitHandler).Args = []string{"-c", "printf 'Content-Type: text/plain\r\n\r\n'; env"}
os.Setenv("GITOLITE_HTTP_HOME", "/test/ghh")
os.Setenv("GL_BYPASS_ACCESS_CHECKS", "yesplease")
c.Check(body, check.Matches, `(?ms).*^SERVER_ADDR=`+regexp.QuoteMeta(theConfig.Addr)+`$.*`)
}
-func (s *GitHandlerSuite) TestCGIError(c *check.C) {
+func (s *GitHandlerSuite) TestCGIErrorOnSplitHostPortError(c *check.C) {
u, err := url.Parse("git.zzzzz.arvadosapi.com/test")
c.Check(err, check.Equals, nil)
resp := httptest.NewRecorder()
req := &http.Request{
Method: "GET",
URL: u,
- RemoteAddr: "bogus",
+ RemoteAddr: "test.bad.address.missing.port",
}
h := newGitHandler()
h.ServeHTTP(resp, req)
// start api and keep servers
arvadostest.ResetEnv()
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
arv = makeArvadosClient()
}
func TearDownDataManagerTest(t *testing.T) {
- arvadostest.StopKeep()
+ arvadostest.StopKeep(2)
arvadostest.StopAPI()
}
log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
}
- switch err {
+ switch respErr := err.(type) {
case nil:
status = http.StatusOK
resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
err = ContentLengthMismatch
}
}
- case keepclient.BlockNotFound:
- status = http.StatusNotFound
+ case keepclient.Error:
+ if respErr == keepclient.BlockNotFound {
+ status = http.StatusNotFound
+ } else if respErr.Temporary() {
+ status = http.StatusBadGateway
+ } else {
+ status = 422
+ }
default:
- status = http.StatusBadGateway
+ status = http.StatusInternalServerError
}
}
// Tests that require the Keep server running
type ServerRequiredSuite struct{}
+// Gocheck boilerplate
+var _ = Suite(&NoKeepServerSuite{})
+
+// Test with no keepserver to simulate errors
+type NoKeepServerSuite struct{}
+
// Wait (up to 1 second) for keepproxy to listen on a port. This
// avoids a race condition where we hit a "connection refused" error
// because we start testing the proxy too soon.
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
}
func (s *ServerRequiredSuite) SetUpTest(c *C) {
}
func (s *ServerRequiredSuite) TearDownSuite(c *C) {
- arvadostest.StopKeep()
+ arvadostest.StopKeep(2)
+ arvadostest.StopAPI()
+}
+
+func (s *NoKeepServerSuite) SetUpSuite(c *C) {
+ arvadostest.StartAPI()
+}
+
+func (s *NoKeepServerSuite) SetUpTest(c *C) {
+ arvadostest.ResetEnv()
+}
+
+func (s *NoKeepServerSuite) TearDownSuite(c *C) {
arvadostest.StopAPI()
}
{
_, _, err := kc.Ask(hash)
- c.Check(err, Equals, keepclient.BlockNotFound)
+ errNotFound, _ := err.(keepclient.ErrNotFound)
+ c.Check(errNotFound, NotNil)
+ c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
log.Print("Ask 1")
}
{
blocklen, _, err := kc.Ask(hash)
- c.Assert(err, Equals, keepclient.BlockNotFound)
+ errNotFound, _ := err.(keepclient.ErrNotFound)
+ c.Check(errNotFound, NotNil)
+ c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
log.Print("Ask 2")
}
{
_, blocklen, _, err := kc.Get(hash)
- c.Assert(err, Equals, keepclient.BlockNotFound)
+ errNotFound, _ := err.(keepclient.ErrNotFound)
+ c.Check(errNotFound, NotNil)
+ c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
log.Print("Get")
}
{
_, _, err := kc.Ask(hash)
- c.Check(err, Equals, keepclient.BlockNotFound)
+ errNotFound, _ := err.(keepclient.ErrNotFound)
+ c.Check(errNotFound, NotNil)
+ c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
log.Print("Ask 1")
}
{
blocklen, _, err := kc.Ask(hash)
- c.Assert(err, Equals, keepclient.BlockNotFound)
+ errNotFound, _ := err.(keepclient.ErrNotFound)
+ c.Check(errNotFound, NotNil)
+ c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
log.Print("Ask 2")
}
{
_, blocklen, _, err := kc.Get(hash)
- c.Assert(err, Equals, keepclient.BlockNotFound)
+ errNotFound, _ := err.(keepclient.ErrNotFound)
+ c.Check(errNotFound, NotNil)
+ c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
log.Print("Get")
}
_, err = kc.GetIndex("proxy", "xyz")
c.Assert((err != nil), Equals, true)
}
+
+func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
+ kc := runProxy(c, []string{"keepproxy"}, 28852, false)
+ waitForListener()
+ defer closeListener()
+
+ // Put a test block
+ hash, rep, err := kc.PutB([]byte("foo"))
+ c.Check(err, Equals, nil)
+ c.Check(rep, Equals, 2)
+
+ for _, token := range []string{
+ "nosuchtoken",
+ "2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx", // expired
+ } {
+ // Change token to given bad token
+ kc.Arvados.ApiToken = token
+
+ // Ask should result in error
+ _, _, err = kc.Ask(hash)
+ c.Check(err, NotNil)
+ errNotFound, _ := err.(keepclient.ErrNotFound)
+ c.Check(errNotFound.Temporary(), Equals, false)
+ c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
+
+ // Get should result in error
+ _, _, _, err = kc.Get(hash)
+ c.Check(err, NotNil)
+ errNotFound, _ = err.(keepclient.ErrNotFound)
+ c.Check(errNotFound.Temporary(), Equals, false)
+ c.Assert(strings.Contains(err.Error(), "HTTP 403 \"Missing or invalid Authorization header\""), Equals, true)
+ }
+}
+
+func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, Equals, nil)
+
+ // keepclient with no such keep server
+ kc := keepclient.New(&arv)
+ locals := map[string]string{
+ "proxy": "http://localhost:12345",
+ }
+ kc.SetServiceRoots(locals, nil, nil)
+
+ // Ask should result in temporary connection refused error
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ _, _, err = kc.Ask(hash)
+ c.Check(err, NotNil)
+ errNotFound, _ := err.(*keepclient.ErrNotFound)
+ c.Check(errNotFound.Temporary(), Equals, true)
+ c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+
+ // Get should result in temporary connection refused error
+ _, _, _, err = kc.Get(hash)
+ c.Check(err, NotNil)
+ errNotFound, _ = err.(*keepclient.ErrNotFound)
+ c.Check(errNotFound.Temporary(), Equals, true)
+ c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+}
+
+func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
+ kc := runProxy(c, []string{"keepproxy"}, 29999, false)
+ waitForListener()
+ defer closeListener()
+
+ // Ask should result in temporary connection refused error
+ hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+ _, _, err := kc.Ask(hash)
+ c.Check(err, NotNil)
+ errNotFound, _ := err.(*keepclient.ErrNotFound)
+ c.Check(errNotFound.Temporary(), Equals, true)
+ c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+
+ // Get should result in temporary connection refused error
+ _, _, _, err = kc.Get(hash)
+ c.Check(err, NotNil)
+ errNotFound, _ = err.(*keepclient.ErrNotFound)
+ c.Check(errNotFound.Temporary(), Equals, true)
+ c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+}
// start api and keep servers
arvadostest.StartAPI()
- arvadostest.StartKeep()
+ arvadostest.StartKeep(2, false)
// make arvadosclient
arv, err := arvadosclient.MakeArvadosClient()
def _get_slurm_state(self):
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+ # The following methods retry on OSError. This is intended to mitigate bug
+ # #6321 where fork() of node manager raises "OSError: [Errno 12] Cannot
+ # allocate memory" resulting in the untimely death of the shutdown actor
+ # and tends to result in node manager getting into a wedged state where it
+ # won't allocate new nodes or shut down gracefully. The underlying causes
+ # of the excessive memory usage that result in the "Cannot allocate memory"
+ # error are still being investigated.
+
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
def cancel_shutdown(self):
if self._nodename:
if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
pass
return super(ComputeNodeShutdownActor, self).cancel_shutdown()
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
def issue_slurm_drain(self):
self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
+ @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
@ShutdownActorBase._stop_if_window_closed
- @ShutdownActorBase._retry((subprocess.CalledProcessError,))
def await_slurm_drain(self):
output = self._get_slurm_state()
if output in self.SLURM_END_STATES:
self.check_success_flag(False, 2)
self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99')
+ def test_cancel_shutdown_retry(self, proc_mock):
+ proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n'])
+ self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+ self.make_actor()
+ self.check_success_flag(False, 2)
+
+ def test_issue_slurm_drain_retry(self, proc_mock):
+ proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+ self.check_success_after_reset(proc_mock)
def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
proc_mock.return_value = 'drain\n'
)
func main() {
- var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
- var replications int
- var srcBlobSigningKey string
+ err := doMain()
+ if err != nil {
+ log.Fatalf("%v", err)
+ }
+}
- flag.StringVar(
- &srcConfigFile,
+func doMain() error {
+ flags := flag.NewFlagSet("keep-rsync", flag.ExitOnError)
+
+ srcConfigFile := flags.String(
"src",
"",
- "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+ "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf file. This file is expected to specify the values for ARVADOS_API_TOKEN, ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, and ARVADOS_BLOB_SIGNING_KEY for the source.")
- flag.StringVar(
- &dstConfigFile,
+ dstConfigFile := flags.String(
"dst",
"",
- "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+ "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf file. This file is expected to specify the values for ARVADOS_API_TOKEN, ARVADOS_API_HOST, and ARVADOS_API_HOST_INSECURE for the destination.")
- flag.StringVar(
- &srcKeepServicesJSON,
+ srcKeepServicesJSON := flags.String(
"src-keep-services-json",
"",
"An optional list of available source keepservices. "+
"If not provided, this list is obtained from api server configured in src-config-file.")
- flag.StringVar(
- &dstKeepServicesJSON,
+ dstKeepServicesJSON := flags.String(
"dst-keep-services-json",
"",
"An optional list of available destination keepservices. "+
"If not provided, this list is obtained from api server configured in dst-config-file.")
- flag.IntVar(
- &replications,
+ replications := flags.Int(
"replications",
0,
"Number of replications to write to the destination. If replications not specified, "+
"default replication level configured on destination server will be used.")
- flag.StringVar(
- &prefix,
+ prefix := flags.String(
"prefix",
"",
"Index prefix")
- flag.Parse()
+ // Parse args; omit the first arg which is the command name
+ flags.Parse(os.Args[1:])
- srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+ srcConfig, srcBlobSigningKey, err := loadConfig(*srcConfigFile)
if err != nil {
- log.Fatalf("Error loading src configuration from file: %s", err.Error())
+ return fmt.Errorf("Error loading src configuration from file: %s", err.Error())
}
- dstConfig, _, err := loadConfig(dstConfigFile)
+ dstConfig, _, err := loadConfig(*dstConfigFile)
if err != nil {
- log.Fatalf("Error loading dst configuration from file: %s", err.Error())
+ return fmt.Errorf("Error loading dst configuration from file: %s", err.Error())
}
// setup src and dst keepclients
- kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+ kcSrc, err := setupKeepClient(srcConfig, *srcKeepServicesJSON, false, 0)
if err != nil {
- log.Fatalf("Error configuring src keepclient: %s", err.Error())
+ return fmt.Errorf("Error configuring src keepclient: %s", err.Error())
}
- kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+ kcDst, err := setupKeepClient(dstConfig, *dstKeepServicesJSON, true, *replications)
if err != nil {
- log.Fatalf("Error configuring dst keepclient: %s", err.Error())
+ return fmt.Errorf("Error configuring dst keepclient: %s", err.Error())
}
// Copy blocks not found in dst from src
- err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
+ err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, *prefix)
if err != nil {
- log.Fatalf("Error while syncing data: %s", err.Error())
+ return fmt.Errorf("Error while syncing data: %s", err.Error())
}
+
+ return nil
}
type apiConfig struct {
"crypto/md5"
"fmt"
"io/ioutil"
+ "log"
"os"
"strings"
"testing"
// Gocheck boilerplate
var _ = Suite(&ServerRequiredSuite{})
var _ = Suite(&ServerNotRequiredSuite{})
+var _ = Suite(&DoMainTestSuite{})
// Tests that require the Keep server running
type ServerRequiredSuite struct{}
type ServerNotRequiredSuite struct{}
+type DoMainTestSuite struct{}
func (s *ServerRequiredSuite) SetUpSuite(c *C) {
// Start API server
arvadostest.ResetEnv()
}
+var initialArgs []string
+
+func (s *DoMainTestSuite) SetUpSuite(c *C) {
+ initialArgs = os.Args
+}
+
var kcSrc, kcDst *keepclient.KeepClient
var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
}
func (s *ServerRequiredSuite) TearDownTest(c *C) {
- arvadostest.StopKeepWithParams(3)
+ arvadostest.StopKeep(3)
+}
+
+func (s *DoMainTestSuite) SetUpTest(c *C) {
+ args := []string{"keep-rsync"}
+ os.Args = args
+}
+
+func (s *DoMainTestSuite) TearDownTest(c *C) {
+ os.Args = initialArgs
}
var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
}
// Start Keep servers
- arvadostest.StartAPI()
- arvadostest.StartKeepWithParams(3, enforcePermissions)
+ arvadostest.StartKeep(3, enforcePermissions)
// setup keepclients
var err error
setupRsync(c, false, 1)
err := performKeepRsync(kcSrc, kcDst, "", "")
- c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+ log.Printf("Err = %v", err)
+ c.Check(strings.Contains(err.Error(), "no such host"), Equals, true)
}
// Setup rsync using dstKeepServicesJSON with fake keepservers.
setupRsync(c, false, 1)
err := performKeepRsync(kcSrc, kcDst, "", "")
- c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+ log.Printf("Err = %v", err)
+ c.Check(strings.Contains(err.Error(), "no such host"), Equals, true)
}
// Test rsync with signature error during Get from src.
blobSigningKey = "thisisfakeblobsigningkey"
err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
- c.Check(strings.HasSuffix(err.Error(), "Block not found"), Equals, true)
+ c.Check(strings.Contains(err.Error(), "HTTP 403 \"Forbidden\""), Equals, true)
}
// Test rsync with error during Put to src.
kcDst.Want_replicas = 2
err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
- c.Check(strings.HasSuffix(err.Error(), "Could not write sufficient replicas"), Equals, true)
+ c.Check(strings.Contains(err.Error(), "Could not write sufficient replicas"), Equals, true)
}
// Test loadConfig func
srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
c.Check(err, IsNil)
- c.Assert(srcConfig.APIHost, Equals, "testhost")
- c.Assert(srcConfig.APIToken, Equals, "testtoken")
- c.Assert(srcConfig.APIHostInsecure, Equals, true)
+ c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+ c.Assert(srcConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+ c.Assert(srcConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
c.Assert(srcConfig.ExternalClient, Equals, false)
dstConfig, _, err := loadConfig(dstConfigFile)
c.Check(err, IsNil)
- c.Assert(dstConfig.APIHost, Equals, "testhost")
- c.Assert(dstConfig.APIToken, Equals, "testtoken")
- c.Assert(dstConfig.APIHostInsecure, Equals, true)
+ c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+ c.Assert(dstConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+ c.Assert(dstConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
c.Assert(dstConfig.ExternalClient, Equals, false)
c.Assert(srcBlobSigningKey, Equals, "abcdefg")
// Test loadConfig func - error reading config
func (s *ServerNotRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) {
_, _, err := loadConfig("no-such-config-file")
- c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true)
+ c.Assert(strings.Contains(err.Error(), "no such file or directory"), Equals, true)
}
func setupConfigFile(c *C, name string) *os.File {
file, err := ioutil.TempFile(os.TempDir(), name)
c.Check(err, IsNil)
- fileContent := "ARVADOS_API_HOST=testhost\n"
- fileContent += "ARVADOS_API_TOKEN=testtoken\n"
- fileContent += "ARVADOS_API_HOST_INSECURE=true\n"
+ fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n"
+ fileContent += "ARVADOS_API_TOKEN=" + os.Getenv("ARVADOS_API_TOKEN") + "\n"
+ fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
+ fileContent += "ARVADOS_EXTERNAL_CLIENT=false\n"
fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
_, err = file.Write([]byte(fileContent))
return file
}
+
+func (s *DoMainTestSuite) Test_doMain_NoSrcConfig(c *C) {
+ err := doMain()
+ c.Check(err, NotNil)
+ c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
+}
+
+func (s *DoMainTestSuite) Test_doMain_SrcButNoDstConfig(c *C) {
+ srcConfig := setupConfigFile(c, "src")
+ args := []string{"-replications", "3", "-src", srcConfig.Name()}
+ os.Args = append(os.Args, args...)
+ err := doMain()
+ c.Check(err, NotNil)
+ c.Assert(err.Error(), Equals, "Error loading dst configuration from file: config file not specified")
+}
+
+func (s *DoMainTestSuite) Test_doMain_BadSrcConfig(c *C) {
+ args := []string{"-src", "abcd"}
+ os.Args = append(os.Args, args...)
+ err := doMain()
+ c.Check(err, NotNil)
+ c.Assert(strings.HasPrefix(err.Error(), "Error loading src configuration from file: Error reading config file"), Equals, true)
+}
+
+func (s *DoMainTestSuite) Test_doMain_WithReplicationsButNoSrcConfig(c *C) {
+ args := []string{"-replications", "3"}
+ os.Args = append(os.Args, args...)
+ err := doMain()
+ c.Check(err, NotNil)
+ c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
+}
+
+func (s *DoMainTestSuite) Test_doMainWithSrcAndDstConfig(c *C) {
+ srcConfig := setupConfigFile(c, "src")
+ dstConfig := setupConfigFile(c, "dst")
+ args := []string{"-src", srcConfig.Name(), "-dst", dstConfig.Name()}
+ os.Args = append(os.Args, args...)
+
+ // Start keepservers. Since we are not doing any tweaking as in setupRsync func,
+ // kcSrc and kcDst will be the same and no actual copying to dst will happen, but that's ok.
+ arvadostest.StartKeep(2, false)
+
+ err := doMain()
+ c.Check(err, IsNil)
+}