5824: Merge branch 'master' into 5824-keep-web
author Tom Clegg <tom@curoverse.com>
Thu, 29 Oct 2015 20:31:37 +0000 (16:31 -0400)
committer Tom Clegg <tom@curoverse.com>
Thu, 29 Oct 2015 20:31:37 +0000 (16:31 -0400)
63 files changed:
README
apps/workbench/Gemfile.lock
apps/workbench/app/assets/javascripts/add_group.js [new file with mode: 0644]
apps/workbench/app/models/group.rb
apps/workbench/app/models/keep_disk.rb
apps/workbench/app/models/keep_service.rb
apps/workbench/app/models/link.rb
apps/workbench/app/models/node.rb
apps/workbench/app/models/user.rb
apps/workbench/app/models/virtual_machine.rb
apps/workbench/app/views/getting_started/_getting_started_popup.html.erb
apps/workbench/app/views/users/_add_group_modal.html.erb [new file with mode: 0644]
apps/workbench/app/views/users/_show_admin.html.erb
apps/workbench/test/integration/application_layout_test.rb
apps/workbench/test/integration/errors_test.rb
apps/workbench/test/integration/users_test.rb
apps/workbench/test/integration/virtual_machines_test.rb
doc/_config.yml
doc/_includes/_run_command_simple_example.liquid
doc/_includes/_ssh_addkey.liquid
doc/_includes/_tutorial_bwa_sortsam_pipeline.liquid
doc/api/schema/Job.html.textile.liquid
doc/index.html.liquid
doc/install/configure-azure-blob-storage.html.textile.liquid [new file with mode: 0644]
doc/install/install-keepstore.html.textile.liquid
doc/install/install-sso.html.textile.liquid
doc/user/getting_started/ssh-access-unix.html.textile.liquid
doc/user/getting_started/ssh-access-windows.html.textile.liquid
doc/user/getting_started/vm-login-with-webshell.html.textile.liquid
doc/user/index.html.textile.liquid
doc/user/topics/arv-docker.html.textile.liquid
doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid
sdk/cli/bin/crunch-job
sdk/go/arvadosclient/arvadosclient_test.go
sdk/go/arvadostest/run_servers.go
sdk/go/crunchrunner/crunchrunner.go [new file with mode: 0644]
sdk/go/crunchrunner/crunchrunner_test.go [new file with mode: 0644]
sdk/go/crunchrunner/upload.go [new file with mode: 0644]
sdk/go/crunchrunner/upload_test.go [new file with mode: 0644]
sdk/go/keepclient/keepclient.go
sdk/go/keepclient/keepclient_test.go
sdk/go/keepclient/support.go
sdk/python/arvados/__init__.py
sdk/python/arvados/api.py
sdk/python/arvados/commands/run.py
sdk/python/arvados/keep.py
sdk/python/tests/arvados_testutil.py
sdk/python/tests/nginx.conf
sdk/python/tests/run_test_server.py
sdk/python/tests/test_api.py
sdk/python/tests/test_keep_client.py
sdk/python/tests/test_retry_job_helpers.py [new file with mode: 0644]
services/api/Gemfile
services/api/config/application.yml.example
services/arv-git-httpd/git_handler_test.go
services/datamanager/datamanager_test.go
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepstore/pull_worker_integration_test.go
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
tools/keep-rsync/keep-rsync.go
tools/keep-rsync/keep-rsync_test.go

diff --git a/README b/README
index c7a36c355b4a2b94dfab45c9748330022a788c91..7a2c5ef018a9827db24212beb38d2579bbe1438f 100644 (file)
--- a/README
+++ b/README
@@ -4,10 +4,10 @@ The main Arvados web site is
   https://arvados.org
 
 The Arvados public wiki is located at 
-  https://arvados.org/projects/arvados/wiki
+  https://dev.arvados.org/projects/arvados/wiki
 
 The Arvados public bug tracker is located at 
-  https://arvados.org/projects/arvados/issues
+  https://dev.arvados.org/projects/arvados/issues
 
 For support see 
   http://doc.arvados.org/user/getting_started/community.html
diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock
index 20b8d6164ccca273e11756928a21c1a17851f07a..8b2118ce087d220c32edf4acd35e7a934404a946 100644 (file)
@@ -74,7 +74,7 @@ GEM
       rack (>= 1.0.0)
       rack-test (>= 0.5.4)
       xpath (~> 2.0)
-    childprocess (0.5.5)
+    childprocess (0.5.6)
       ffi (~> 1.0, >= 1.0.11)
     cliver (0.3.2)
     coffee-rails (4.1.0)
@@ -98,7 +98,7 @@ GEM
     fast_stack (0.1.0)
       rake
       rake-compiler
-    ffi (1.9.6)
+    ffi (1.9.10)
     flamegraph (0.1.0)
       fast_stack
     google-api-client (0.6.4)
@@ -139,7 +139,7 @@ GEM
       metaclass (~> 0.0.1)
     morrisjs-rails (0.5.1)
       railties (> 3.1, < 5)
-    multi_json (1.11.1)
+    multi_json (1.11.2)
     multipart-post (1.2.0)
     net-scp (1.2.1)
       net-ssh (>= 2.6.5)
@@ -192,7 +192,7 @@ GEM
     ref (1.0.5)
     ruby-debug-passenger (0.2.0)
     ruby-prof (0.15.2)
-    rubyzip (1.1.6)
+    rubyzip (1.1.7)
     rvm-capistrano (1.5.5)
       capistrano (~> 2.15.4)
     sass (3.4.9)
@@ -202,7 +202,7 @@ GEM
       sprockets (>= 2.8, < 4.0)
       sprockets-rails (>= 2.0, < 4.0)
       tilt (~> 1.1)
-    selenium-webdriver (2.44.0)
+    selenium-webdriver (2.48.1)
       childprocess (~> 0.5)
       multi_json (~> 1.0)
       rubyzip (~> 1.0)
@@ -239,7 +239,7 @@ GEM
       execjs (>= 0.3.0)
       json (>= 1.8.0)
     uuidtools (2.1.5)
-    websocket (1.2.1)
+    websocket (1.2.2)
     websocket-driver (0.5.1)
       websocket-extensions (>= 0.1.0)
     websocket-extensions (0.1.1)
@@ -294,3 +294,6 @@ DEPENDENCIES
   therubyracer
   uglifier (>= 1.0.3)
   wiselinks
+
+BUNDLED WITH
+   1.10.6
diff --git a/apps/workbench/app/assets/javascripts/add_group.js b/apps/workbench/app/assets/javascripts/add_group.js
new file mode 100644 (file)
index 0000000..16d9c2d
--- /dev/null
@@ -0,0 +1,44 @@
+$(document).on('shown.bs.modal', '#add-group-modal', function(event) {
+    // Disable the submit button on modal loading
+    $submit = $('#add-group-submit');
+    $submit.prop('disabled', true);
+
+    $('input[type=text]', event.target).val('');
+    $('#add-group-error', event.target).hide();
+}).on('input propertychange', '#group_name_input', function(event) {
+    group_name = $(event.target).val();
+    $submit = $('#add-group-submit');
+    $submit.prop('disabled', (group_name === null || group_name === ""));
+}).on('submit', '#add-group-form', function(event) {
+    var $form = $(event.target),
+    $submit = $(':submit', $form),
+    $error = $('#add-group-error', $form),
+    group_name = $('input[name="group_name_input"]', $form).val();
+
+    $submit.prop('disabled', true);
+
+    $error.hide();
+    $.ajax('/groups',
+           {method: 'POST',
+            dataType: 'json',
+            data: {group: {name: group_name, group_class: 'role'}},
+            context: $form}).
+        done(function(data, status, jqxhr) {
+            location.reload();
+        }).
+        fail(function(jqxhr, status, error) {
+            var errlist = jqxhr.responseJSON.errors;
+            var errmsg;
+            if (Array.isArray(errlist)) {
+                errmsg = errlist.join();
+            } else {
+                errmsg = ("The server returned an error when creating " +
+                          "this group (status " + jqxhr.status +
+                          ": " + errlist + ").");
+            }
+            $error.text(errmsg);
+            $error.show();
+            $submit.prop('disabled', false);
+        });
+    return false;
+});
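For reference, the modal above creates the group by POSTing to Workbench's /groups endpoint with group_class set to "role". The same object can be created against the Arvados API directly; the following is a minimal, untested Go sketch using the arvadosclient package that appears elsewhere in this commit (the group name is illustrative, and MakeArvadosClient is assumed to read ARVADOS_API_HOST and ARVADOS_API_TOKEN from the environment):

package main

import (
	"log"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)

func main() {
	// Build a client from the ARVADOS_API_HOST / ARVADOS_API_TOKEN environment.
	api, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		log.Fatal(err)
	}
	// Create a role group, mirroring the parameters posted by add_group.js.
	var group arvadosclient.Dict
	err = api.Create("groups", arvadosclient.Dict{
		"group": arvadosclient.Dict{
			"name":        "example-group", // hypothetical name
			"group_class": "role",
		},
	}, &group)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("created group %v", group["uuid"])
}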
diff --git a/apps/workbench/app/models/group.rb b/apps/workbench/app/models/group.rb
index 3f5da155c4ca9fc5ea13d0836551a92c3c11d435..0d358603a830b821a7f4c20874d01dec185b1e9c 100644 (file)
@@ -32,4 +32,8 @@ class Group < ArvadosBase
   def textile_attributes
     [ 'description' ]
   end
+
+  def self.creatable?
+    false
+  end
 end
diff --git a/apps/workbench/app/models/keep_disk.rb b/apps/workbench/app/models/keep_disk.rb
index 8ced4eb5f331a3a837ccb6a12a6d34b8374a236b..6438fc72d694713ec4656e4417d67edd625a15e4 100644 (file)
@@ -1,5 +1,5 @@
 class KeepDisk < ArvadosBase
   def self.creatable?
-    current_user and current_user.is_admin
+    false
   end
 end
diff --git a/apps/workbench/app/models/keep_service.rb b/apps/workbench/app/models/keep_service.rb
index f27e369b86cf04188d17126920b804022dd16ec5..0c998c4591fcb45eacc5e62165823ecebf8d9cc8 100644 (file)
@@ -1,5 +1,5 @@
 class KeepService < ArvadosBase
   def self.creatable?
-    current_user and current_user.is_admin
+    false
   end
 end
diff --git a/apps/workbench/app/models/link.rb b/apps/workbench/app/models/link.rb
index 271fa0f0103eac4e3197d417ffc31430285b6643..b1bbcff1265ac0b545e210ab7d6621c7dce0b7a8 100644 (file)
@@ -18,4 +18,8 @@ class Link < ArvadosBase
     result = arvados_api_client.api("permissions", "/#{uuid}")
     arvados_api_client.unpack_api_response(result)
   end
+
+  def self.creatable?
+    false
+  end
 end
diff --git a/apps/workbench/app/models/node.rb b/apps/workbench/app/models/node.rb
index e66be83078c9f5dbe2fff3be09dcba17c5fb4bf9..8bf98c6decb3865054c6339105c688f2ec89fcf4 100644 (file)
@@ -1,6 +1,6 @@
 class Node < ArvadosBase
   def self.creatable?
-    current_user and current_user.is_admin
+    false
   end
   def friendly_link_name lookup=nil
     (hostname && !hostname.empty?) ? hostname : uuid
diff --git a/apps/workbench/app/models/user.rb b/apps/workbench/app/models/user.rb
index 3b5b3083fc531b5b97c937bb326d3c32e377f202..8df16f29a44f84ed540ad1c292e9ee5e92da54e4 100644 (file)
@@ -63,4 +63,8 @@ class User < ArvadosBase
   def deletable?
     false
   end
+
+   def self.creatable?
+    current_user and current_user.is_admin
+   end
 end
diff --git a/apps/workbench/app/models/virtual_machine.rb b/apps/workbench/app/models/virtual_machine.rb
index 3b44397df5459efb7074f46bd094826192e061eb..e1a208c53d70f4d02828e6a7215b976be2fe616a 100644 (file)
@@ -1,20 +1,25 @@
 class VirtualMachine < ArvadosBase
   attr_accessor :current_user_logins
+
   def self.creatable?
-    current_user.andand.is_admin
+    false
   end
+
   def attributes_for_display
     super.append ['current_user_logins', @current_user_logins]
   end
+
   def editable_attributes
     super - %w(current_user_logins)
   end
+
   def self.attribute_info
     merger = ->(k,a,b) { a.merge(b, &merger) }
     merger [nil,
             {current_user_logins: {column_heading: "logins", type: 'array'}},
             super]
   end
+
   def friendly_link_name lookup=nil
     (hostname && !hostname.empty?) ? hostname : uuid
   end
diff --git a/apps/workbench/app/views/getting_started/_getting_started_popup.html.erb b/apps/workbench/app/views/getting_started/_getting_started_popup.html.erb
index 0db0567ec98b22495b6e063479a60ba753b5bc83..3020a1249bca287a41103b6f5924c134b2090adc 100644 (file)
@@ -154,7 +154,7 @@ div.figure p {
             </li><li>
               <strong>Use existing pipelines</strong>: Use best-practices pipelines on your own data with the click of a button.
             </li><li>
-              <strong>Open-source</strong>: Arvados is completely open-source. Check out our <a href="http://arvados.org">developer site</a>.
+              <strong>Open source</strong>: Arvados is completely open source. Check out our <a href="http://dev.arvados.org">developer site</a>.
             </li>
           </ol>
           <p style="margin-top: 1em;">
diff --git a/apps/workbench/app/views/users/_add_group_modal.html.erb b/apps/workbench/app/views/users/_add_group_modal.html.erb
new file mode 100644 (file)
index 0000000..8230e56
--- /dev/null
@@ -0,0 +1,27 @@
+<div class="modal" id="add-group-modal" tabindex="-1" role="dialog" aria-labelledby="add-group-label" aria-hidden="true">
+  <div class="modal-dialog">
+    <div class="modal-content">
+      <form id="add-group-form">
+        <div class="modal-header">
+          <button type="button" class="close" data-dismiss="modal" aria-label="Close"><span aria-hidden="true">&times;</span></button>
+          <h4 class="modal-title" id="add-group-label">Add new group</h4>
+        </div>
+        <div class="modal-body form-horizontal">
+          <div class="form-group">
+            <label for="group_name_input" class="col-sm-1 control-label">Name</label>
+            <div class="col-sm-9">
+              <div class="input-group-name">
+                <input type="text" class="form-control" id="group_name_input" name="group_name_input" placeholder="Enter group name"/>
+              </div>
+            </div>
+          </div>
+          <p id="add-group-error" class="alert alert-danger"></p>
+        </div>
+        <div class="modal-footer">
+          <button type="button" class="btn btn-default" data-dismiss="modal">Cancel</button>
+          <input type="submit" class="btn btn-primary" id="add-group-submit" name="submit" value="Create">
+        </div>
+      </form>
+    </div>
+  </div>
+</div>
diff --git a/apps/workbench/app/views/users/_show_admin.html.erb b/apps/workbench/app/views/users/_show_admin.html.erb
index 54643a1c189476ae55b99e526b26ad85cd492ae8..f83f9a05fd4b35739e4750780de5f817dace5927 100644 (file)
     <div class="panel panel-default">
       <div class="panel-heading">
         Group memberships
+
+        <div class="pull-right">
+          <%= link_to raw('<i class="fa fa-plus"></i> Add new group'), "#",
+                       {class: 'btn btn-xs btn-primary', 'data-toggle' => "modal",
+                        'data-target' => '#add-group-modal'}  %>
+        </div>
       </div>
       <div class="panel-body">
         <div class="alert alert-info">
@@ -92,7 +98,7 @@
         </form>
       </div>
       <div class="panel-footer">
-        To manage these groups (roles), use:
+        These groups (roles) can also be managed from the command line. For example:
         <ul>
           <li><code>arv group create \<br/>--group '{"group_class":"role","name":"New group"}'</code></li>
           <li><code>arv group list \<br/>--filters '[["group_class","=","role"]]' \<br/>--select '["uuid","name"]'</code></li>
 </div>
 
 <div id="user-setup-modal-window" class="modal fade" role="dialog" aria-labelledby="myModalLabel" aria-hidden="true"></div>
+<%= render partial: "add_group_modal" %>
diff --git a/apps/workbench/test/integration/application_layout_test.rb b/apps/workbench/test/integration/application_layout_test.rb
index 61ba16294f85f5d7bacaf02486b3f83a2e253fac..db072e496be7affdddec7d2f8cf2a164ed67bc5b 100644 (file)
@@ -218,14 +218,14 @@ class ApplicationLayoutTest < ActionDispatch::IntegrationTest
   end
 
    [
-    ['Repositories',nil,'s0uqq'],
-    ['Virtual machines','virtual machine','current_user_logins'],
-    ['SSH keys',nil,'public_key'],
-    ['Links','link','link_class'],
-    ['Groups','group','group_class'],
-    ['Compute nodes','node','info[ping_secret'],
-    ['Keep services','keep service','service_ssl_flag'],
-    ['Keep disks', 'keep disk','bytes_free'],
+    ['Repositories', nil, 's0uqq'],
+    ['Virtual machines', nil, 'testvm.shell'],
+    ['SSH keys', nil, 'public_key'],
+    ['Links', nil, 'link_class'],
+    ['Groups', nil, 'All users'],
+    ['Compute nodes', nil, 'ping_secret'],
+    ['Keep services', nil, 'service_ssl_flag'],
+    ['Keep disks', nil, 'bytes_free'],
   ].each do |page_name, add_button_text, look_for|
     test "test system menu #{page_name} link" do
       visit page_with_token('admin')
diff --git a/apps/workbench/test/integration/errors_test.rb b/apps/workbench/test/integration/errors_test.rb
index f2067a92bfdb27233b59fbc4ddb730156c5ce8c8..32f6e027557b838980df3769b88a8aa49f2e3a04 100644 (file)
@@ -46,18 +46,6 @@ class ErrorsTest < ActionDispatch::IntegrationTest
     page.html =~ /\b(#{matching_stamps})\+[0-9A-Fa-f]{8}\b/
   end
 
-  # We use API tokens with limited scopes as the quickest way to get the API
-  # server to return an error.  If Workbench gets smarter about coping when
-  # it has a too-limited token, these tests will need to be adjusted.
-  test "API error page includes error token" do
-    start_stamp = now_timestamp
-    visit(page_with_token("active_readonly", "/groups"))
-    click_on "Add a new group"
-    assert(page.has_text?(/fiddlesticks/i),
-           "Not on an error page after making a group out of scope")
-    assert(page_has_error_token?(start_stamp), "no error token on 404 page")
-  end
-
   test "showing a bad UUID returns 404" do
     visit(page_with_token("active", "/pipeline_templates/zzz"))
     assert(page.has_no_text?(/fiddlesticks/i),
diff --git a/apps/workbench/test/integration/users_test.rb b/apps/workbench/test/integration/users_test.rb
index 1ae302c23947c2968d194fdb006bcdaf3561be04..90a3eb2333fedeae32b80ef9146323689bf2fbca 100644 (file)
@@ -197,4 +197,26 @@ class UsersTest < ActionDispatch::IntegrationTest
     click_link 'Metadata'
     assert page.has_text? 'VirtualMachine: testvm.shell'
   end
+
+  test "test add group button" do
+    need_javascript
+
+    user_url = "/users/#{api_fixture('users')['active']['uuid']}"
+    visit page_with_token('admin_trustedclient', user_url)
+
+    # Setup user
+    click_link 'Admin'
+    assert page.has_text? 'As an admin, you can setup'
+
+    click_link 'Add new group'
+
+    within '.modal-content' do
+      fill_in "group_name_input", :with => "test-group-added-in-modal"
+      click_button "Create"
+    end
+    wait_for_ajax
+
+    # Back in the user "Admin" tab
+    assert page.has_text? 'test-group-added-in-modal'
+  end
 end
diff --git a/apps/workbench/test/integration/virtual_machines_test.rb b/apps/workbench/test/integration/virtual_machines_test.rb
index 1d398a595ce854335df8e3c1d5a434632ed89728..a22337bcb4469e96a08787338998d286ec484ca3 100644 (file)
@@ -1,18 +1,4 @@
 require 'integration_helper'
 
 class VirtualMachinesTest < ActionDispatch::IntegrationTest
-  test "make and name a new virtual machine" do
-    need_javascript
-    visit page_with_token('admin_trustedclient')
-    find('#system-menu').click
-    click_link 'Virtual machines'
-    assert page.has_text? 'testvm.shell'
-    click_on 'Add a new virtual machine'
-    find('tr', text: 'hostname').
-      find('a[data-original-title=edit]').click
-    assert page.has_text? 'Edit hostname'
-    fill_in 'editable-text', with: 'testname'
-    click_button 'editable-submit'
-    assert page.has_text? 'testname'
-  end
 end
diff --git a/doc/_config.yml b/doc/_config.yml
index 6d0bdb2f28a01e0329b4b3ed80ec9f0326e02243..2f37f5af0803c360f2c0cce2d08389d64b21249f 100644 (file)
@@ -153,6 +153,7 @@ navbar:
       - install/install-shell-server.html.textile.liquid
       - install/create-standard-objects.html.textile.liquid
       - install/install-keepstore.html.textile.liquid
+      - install/configure-azure-blob-storage.html.textile.liquid
       - install/install-keepproxy.html.textile.liquid
       #- install/install-keep-web.html.textile.liquid
       - install/install-crunch-dispatch.html.textile.liquid
diff --git a/doc/_includes/_run_command_simple_example.liquid b/doc/_includes/_run_command_simple_example.liquid
index abd00715038a55cb799e0b8e7f6e68387277349a..3f695934aecff49dc71094eb93b6dffa07dc01bb 100644 (file)
@@ -1,29 +1,36 @@
 {
     "name":"run-command example pipeline",
     "components":{
-        "bwa-mem": {
+         "bwa-mem": {
             "script": "run-command",
             "script_version": "master",
             "repository": "arvados",
             "script_parameters": {
                 "command": [
-                    "bwa",
+                    "$(dir $(bwa_collection))/bwa",
                     "mem",
                     "-t",
                     "$(node.cores)",
+                    "-R",
+                    "@RG\\\tID:group_id\\\tPL:illumina\\\tSM:sample_id",
                     "$(glob $(dir $(reference_collection))/*.fasta)",
                     "$(glob $(dir $(sample))/*_1.fastq)",
                     "$(glob $(dir $(sample))/*_2.fastq)"
                 ],
-                "task.stdout": "$(basename $(glob $(dir $(sample))/*_1.fastq)).sam",
                 "reference_collection": {
                     "required": true,
                     "dataclass": "Collection"
                 },
+                "bwa_collection": {
+                    "required": true,
+                    "dataclass": "Collection",
+                    "default": "39c6f22d40001074f4200a72559ae7eb+5745"
+                },
                 "sample": {
                     "required": true,
                     "dataclass": "Collection"
-                }
+                },
+                "task.stdout": "$(basename $(glob $(dir $(sample))/*_1.fastq)).sam"
             }
         }
     }
diff --git a/doc/_includes/_ssh_addkey.liquid b/doc/_includes/_ssh_addkey.liquid
index 098767300fede09944adce305ef017739ebf3b76..db2aa8dac154d0290ce89a725b51b5434996b52b 100644 (file)
@@ -12,6 +12,6 @@ Paste your public key into the text area labeled *Public Key*, and click on the
 
 h1(#login). Using SSH to log into an Arvados VM
 
-To see a list of virtual machines that you have access to and determine the name and login information, click on the dropdown menu icon <span class="fa fa-lg fa-user"></span> <span class="caret"></span> in the upper right corner of the top navigation menu to access the user settings menu and click on the menu item *Virtual machines* to go to the Virtual machines page. This page lists the virtual machines you can access. The *hostname* column lists the name of each available VM.  The *logins* column will have a list of comma separated values of the form @you@. In this guide the hostname will be *_shell_* and the login will be *_you_*.  Replace these with your hostname and login name as appropriate.
+To see a list of virtual machines that you have access to and determine the name and login information, click on the dropdown menu icon <span class="fa fa-lg fa-user"></span> <span class="caret"></span> in the upper right corner of the top navigation menu to access the user settings menu and click on the menu item *Virtual machines* to go to the Virtual machines page. This page lists the virtual machines you can access. The *Host name* column lists the name of each available VM.  The *Login name* column will have a list of comma separated values of the form @you@. In this guide the hostname will be *_shell_* and the login will be *_you_*.  Replace these with your hostname and login name as appropriate.
 
 
diff --git a/doc/_includes/_tutorial_bwa_sortsam_pipeline.liquid b/doc/_includes/_tutorial_bwa_sortsam_pipeline.liquid
index 413e51f9c057565647f829506506b4fd58aa7c8d..8a9e52abe7889e986dc1b9002f3bffc313219294 100644 (file)
@@ -12,7 +12,7 @@
                     "-t",
                     "$(node.cores)",
                     "-R",
-                    "@RG\\tID:group_id\\tPL:illumina\\tSM:sample_id",
+                    "@RG\\\tID:group_id\\\tPL:illumina\\\tSM:sample_id",
                     "$(glob $(dir $(reference_collection))/*.fasta)",
                     "$(glob $(dir $(sample))/*_1.fastq)",
                     "$(glob $(dir $(sample))/*_2.fastq)"
diff --git a/doc/api/schema/Job.html.textile.liquid b/doc/api/schema/Job.html.textile.liquid
index fd635034b1e8db79b97a1893de07d07f87ed95ee..dba0fe94189034c239389801497cd213b5560919 100644 (file)
@@ -57,7 +57,7 @@ h3. Runtime constraints
 table(table table-bordered table-condensed).
 |_. Key|_. Type|_. Description|_. Implemented|
 |arvados_sdk_version|string|The Git version of the SDKs to use from the Arvados git repository.  See "Specifying Git versions":#script_version for more detail about acceptable ways to specify a commit.  If you use this, you must also specify a @docker_image@ constraint (see below).  In order to install the Python SDK successfully, Crunch must be able to find and run virtualenv inside the container.|&#10003;|
-|docker_image|string|The Docker image that this Job needs to run.  If specified, Crunch will create a Docker container from this image, and run the Job's script inside that.  The Keep mount and work directories will be available as volumes inside this container.  The image must be uploaded to Arvados using @arv keep docker@.  You may specify the image in any format that Docker accepts, such as @arvados/jobs@, @debian:latest@, or the Docker image id.  Alternatively, you may specify the UUID or portable data hash of the image Collection, returned by @arv keep docker@.|&#10003;|
+|docker_image|string|The Docker image that this Job needs to run.  If specified, Crunch will create a Docker container from this image, and run the Job's script inside that.  The Keep mount and work directories will be available as volumes inside this container.  The image must be uploaded to Arvados using @arv keep docker@.  You may specify the image in any format that Docker accepts, such as @arvados/jobs@, @debian:latest@, or the Docker image id.  Alternatively, you may specify the portable data hash of the image Collection.|&#10003;|
 |min_nodes|integer||&#10003;|
 |max_nodes|integer|||
 |min_cores_per_node|integer|Require that each node assigned to this Job have the specified number of CPU cores|&#10003;|
diff --git a/doc/index.html.liquid b/doc/index.html.liquid
index 8cbf959fe638b8b90452b082a44f5164a81d4ed4..31a86c2ef44b770ebdf1ec816bf434d9dac6e6fa 100644 (file)
@@ -41,7 +41,7 @@ title: Arvados | Documentation
     <div class="col-sm-6" style="border-left: solid; border-width: 1px">
       <p><strong>Quickstart</strong> 
       <p>
-        Try any pipeline from the <a href="https://dev.arvados.org/projects/arvados/wiki/Public_Pipelines_and_Datasets">list of public pipelines</a>. For instance, the <a href="http://curover.se/pathomap">Pathomap Pipeline</a> links to these <a href="https://dev.arvados.org/projects/arvados/wiki/pathomap_tutorial/">step-by-step instructions</a> for trying Arvados out right in your browser using Curoverse's <a href="http://lp.curoverse.com/beta-signup/">public Arvados instance</a>.
+        Try any pipeline from the <a href="https://cloud.curoverse.com/projects/public">list of public pipelines</a>. For instance, the <a href="http://curover.se/pathomap">Pathomap Pipeline</a> links to these <a href="https://dev.arvados.org/projects/arvados/wiki/pathomap_tutorial/">step-by-step instructions</a> for trying Arvados out right in your browser using Curoverse's <a href="http://lp.curoverse.com/beta-signup/">public Arvados instance</a>.
       </p>
         <!--<p>-->
       <!--<ol>-->
diff --git a/doc/install/configure-azure-blob-storage.html.textile.liquid b/doc/install/configure-azure-blob-storage.html.textile.liquid
new file mode 100644 (file)
index 0000000..e365d74
--- /dev/null
@@ -0,0 +1,62 @@
+---
+layout: default
+navsection: installguide
+title: Configure Azure Blob storage
+...
+
+As an alternative to local and network-attached POSIX filesystems, Keepstore can store data in an Azure Storage container.
+
+h2. Create a container
+
+Normally, all keepstore services are configured to share a single Azure Storage container.
+
+Using the Azure web portal or command line tool, create or choose a storage account with a suitable redundancy profile and availability region. Use the storage account keys to create a new container.
+
+<notextile>
+<pre><code>~$ <span class="userinput">azure config mode arm</span>
+~$ <span class="userinput">azure login</span>
+~$ <span class="userinput">azure group create exampleGroupName eastus</span>
+~$ <span class="userinput">azure storage account create --type LRS --location eastus --resource-group exampleGroupName exampleStorageAccountName</span>
+~$ <span class="userinput">azure storage account keys list --resource-group exampleGroupName exampleStorageAccountName</span>
+info:    Executing command storage account keys list
++ Getting storage account keys
+data:    Primary: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==
+data:    Secondary: yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy==
+info:    storage account keys list command OK
+~$ <span class="userinput">AZURE_STORAGE_ACCOUNT="exampleStorageAccountName" \
+AZURE_STORAGE_ACCESS_KEY="zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==" \
+azure storage container create exampleContainerName</span>
+</code></pre>
+</notextile>
+
+h2. Configure keepstore
+
+Copy the primary storage account key to a file where it will be accessible to keepstore at startup time.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sh -c 'cat &gt;/etc/sv/keepstore/exampleStorageAccountName.key &lt;&lt;EOF'
+zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz==
+EOF</span>
+~$ <span class="userinput">sudo chmod 0400 /etc/sv/keepstore/exampleStorageAccountName.key</span>
+</code></pre>
+</notextile>
+
+In your keepstore startup script, instead of specifying a local storage using @-volume /path@ or discovering mount points automatically, use @-azure-*@ arguments to specify the storage container:
+
+<notextile>
+<pre><code>#!/bin/sh
+
+exec 2&gt;&amp;1
+exec keepstore \
+ -azure-storage-account-key-file <span class="userinput">/etc/sv/keepstore/exampleStorageAccountName.key</span> \
+ -azure-storage-account-name <span class="userinput">exampleStorageAccountName</span> \
+ -azure-storage-container-volume <span class="userinput">exampleContainerName</span>
+</code></pre>
+</notextile>
+
+Start (or restart) keepstore, and check its log file to confirm it is using the new configuration.
+
+<notextile>
+<pre><code>2015/10/26 21:06:24 Using volume azure-storage-container:"exampleContainerName" (writable=true)
+</code></pre>
+</notextile>
diff --git a/doc/install/install-keepstore.html.textile.liquid b/doc/install/install-keepstore.html.textile.liquid
index 4cb46e13801e78a95b51c26cce0f3696cf8b3f78..efeff65b83dce3c774bffec219821a57f78e6175 100644 (file)
@@ -37,6 +37,10 @@ Verify that Keepstore is functional:
 <pre><code>~$ <span class="userinput">keepstore -h</span>
 2015/05/08 13:41:16 keepstore starting, pid 2565
 Usage of ./keepstore:
+  -azure-storage-account-key-file="": File containing the account key used for subsequent --azure-storage-container-volume arguments.
+  -azure-storage-account-name="": Azure storage account name used for subsequent --azure-storage-container-volume arguments.
+  -azure-storage-container-volume=[]: Use the given container as a storage volume. Can be given multiple times.
+  -azure-storage-replication=3: Replication level to report to clients when data is stored in an Azure container.
   -blob-signature-ttl=1209600: Lifetime of blob permission signatures. See services/api/config/application.default.yml.
   -blob-signing-key-file="": File containing the secret key for generating and verifying blob permission signatures.
   -data-manager-token-file="": File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
@@ -54,23 +58,67 @@ Usage of ./keepstore:
 </code></pre>
 </notextile>
 
-If you want access control on your Keepstore server(s), you must specify the @-enforce-permissions@ flag and provide a signing key. The @-blob-signing-key-file@ argument should be a file containing a long random alphanumeric string with no internal line breaks (it is also possible to use a socket or FIFO: keepstore reads it only once, at startup). This key must be the same as the @blob_signing_key@ configured in the "API server":install-api-server.html config/application.yml file.
+h3. Prepare storage volumes
 
-The @-max-buffers@ argument can be used to restrict keepstore's memory use. By default, keepstore will allocate no more than 128 blocks (8 GiB) worth of data buffers at a time. Normally this should be set as high as possible without risking swapping.
+{% include 'notebox_begin' %}
+This section uses a local filesystem as a backing store. If you are using Azure Storage, follow the setup instructions on the "Azure Blob Storage":configure-azure-blob-storage.html page instead.
+{% include 'notebox_end' %}
 
-Prepare one or more volumes for Keepstore to use. Simply create a /keep directory on all the partitions you would like Keepstore to use, and then start Keepstore. For example, using 2 tmpfs volumes:
+There are two ways to specify a set of local directories where keepstore should store its data files.
+# Implicitly, by creating a directory called @keep@ at the top level of each filesystem you intend to use, and omitting @-volume@ arguments.
+# Explicitly, by providing a @-volume@ argument for each directory.
+
+For example, if there are filesystems mounted at @/mnt@ and @/mnt2@:
 
 <notextile>
-<pre><code>~$ <span class="userinput">keepstore -blob-signing-key-file=./blob-signing-key</span>
+<pre><code>~$ <span class="userinput">mkdir /mnt/keep /mnt2/keep</span>
+~$ <span class="userinput">keepstore</span>
 2015/05/08 13:44:26 keepstore starting, pid 2765
 2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt2/keep] (writable=true)
 2015/05/08 13:44:26 listening at :25107
 </code></pre>
 </notextile>
 
-It's recommended to run Keepstore under "runit":http://smarden.org/runit/ or something similar.
+Equivalently:
 
-Repeat this section for each Keepstore server you are setting up.
+<notextile>
+<pre><code>~$ <span class="userinput">mkdir /mnt/keep /mnt2/keep</span>
+~$ <span class="userinput">keepstore -volume=/mnt/keep -volume=/mnt2/keep</span>
+2015/05/08 13:44:26 keepstore starting, pid 2765
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt/keep] (writable=true)
+2015/05/08 13:44:26 Using volume [UnixVolume /mnt2/keep] (writable=true)
+2015/05/08 13:44:26 listening at :25107
+</code></pre>
+</notextile>
+
+h3. Run keepstore as a supervised service
+
+We recommend running Keepstore under "runit":http://smarden.org/runit/ or something similar, using a run script like the following:
+
+<notextile>
+<pre><code>#!/bin/sh
+
+exec 2>&1
+exec GOGC=10 GOMAXPROCS=<span class="userinput">4</span> keepstore \
+ -enforce-permissions=true \
+ -blob-signing-key-file=<span class="userinput">/etc/keepstore/blob-signing.key</span> \
+ -max-buffers=<span class="userinput">100</span> \
+ -serialize=true \
+ -volume=<span class="userinput">/mnt/keep</span> \
+ -volume=<span class="userinput">/mnt2/keep</span>
+</code></pre>
+</notextile>
+
+The @GOMAXPROCS@ environment variable determines the maximum number of concurrent threads, and should normally be set to the number of CPU cores present.
+
+The @-max-buffers@ argument limits keepstore's memory usage. It should be set such that @max-buffers * 64MiB + 10%@ fits comfortably in memory. For example, @-max-buffers=100@ is suitable for a host with 8 GiB RAM.
+
+If you want access control on your Keepstore server(s), you must specify the @-enforce-permissions@ flag and provide a signing key. The @-blob-signing-key-file@ argument should be a file containing a long random alphanumeric string with no internal line breaks (it is also possible to use a socket or FIFO: keepstore reads it only once, at startup). This key must be the same as the @blob_signing_key@ configured in the "API server's":install-api-server.html configuration file, @/etc/arvados/api/application.yml@.
+
+h3. Set up additional servers
+
+Repeat the above sections to prepare volumes and bring up supervised services on each Keepstore server you are setting up.
 
 h3. Tell the API server about the Keepstore servers
 
@@ -90,6 +138,3 @@ Make sure to update the @service_host@ value to match each of your Keepstore ser
 }
 EOF</span>
 </code></pre></notextile>
-
-
-
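As a quick check of the -max-buffers guidance added above: 100 buffers × 64 MiB = 6,400 MiB, or roughly 6.3 GiB; adding the suggested 10% overhead brings the total to about 6.9 GiB, which leaves headroom on a host with 8 GiB of RAM.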
diff --git a/doc/install/install-sso.html.textile.liquid b/doc/install/install-sso.html.textile.liquid
index 56c7a4b337eb5eb0c2b6c8fbd2aa47675e353b7c..ca620f478a8d215066fde5a9ffa157032174f4db 100644 (file)
@@ -164,7 +164,8 @@ Use @rails console@ to create a @Client@ record that will be used by the Arvados
 <notextile>
 <pre><code>~$ <span class="userinput">ruby -e 'puts rand(2**400).to_s(36)'</span>
 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
-~$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
+~$ <span class="userinput">cd /var/www/arvados-sso/current</span>
+/var/www/arvados-sso/current$ <span class="userinput">RAILS_ENV=production bundle exec rails console</span>
 :001 &gt; <span class="userinput">c = Client.new</span>
 :002 &gt; <span class="userinput">c.name = "joshid"</span>
 :003 &gt; <span class="userinput">c.app_id = "arvados-server"</span>
diff --git a/doc/user/getting_started/ssh-access-unix.html.textile.liquid b/doc/user/getting_started/ssh-access-unix.html.textile.liquid
index a9eb8c135943fd58a9f7b9d91627af19059386a0..a9fe1a9a11c3f182440b2eaeee2a56ba82aa6cf2 100644 (file)
@@ -4,7 +4,7 @@ navsection: userguide
 title: Accessing an Arvados VM with SSH - Unix Environments
 ...
 
-This document is for accessing an arvados VM using SSK keys in Unix environments (Linux, OS X, Cygwin). If you would like to access VM through your browser, please visit the "Accessing an Arvados VM with Webshell":vm-login-with-webshell.html page. If you are using a Windows environment, please visit the "Accessing an Arvados VM with SSH - Windows Environments":ssh-access-windows.html page.
+This document is for accessing an Arvados VM using SSH keys in Unix environments (Linux, OS X, Cygwin). If you would like to access the VM through your browser, please visit the "Accessing an Arvados VM with Webshell":vm-login-with-webshell.html page. If you are using a Windows environment, please visit the "Accessing an Arvados VM with SSH - Windows Environments":ssh-access-windows.html page.
 
 {% include 'ssh_intro' %}
 
diff --git a/doc/user/getting_started/ssh-access-windows.html.textile.liquid b/doc/user/getting_started/ssh-access-windows.html.textile.liquid
index c3a06405493d9c340b010547e88057aacb6039bf..98fadc9f204a35e142e6ca61ce47b2b30c811ca6 100644 (file)
@@ -4,7 +4,7 @@ navsection: userguide
 title: Accessing an Arvados VM with SSH - Windows Environments
 ...
 
-This document is for accessing an arvados VM using SSK keys in Windows environments. If you would like to use to access VM through your browser, please visit the "Accessing an Arvados VM with Webshell":vm-login-with-webshell.html page. If you are using a Unix environment (Linux, OS X, Cygwin), please visit the "Accessing an Arvados VM with SSH - Unix Environments":ssh-access-unix.html page.
+This document is for accessing an Arvados VM using SSH keys in Windows environments. If you would like to access the VM through your browser, please visit the "Accessing an Arvados VM with Webshell":vm-login-with-webshell.html page. If you are using a Unix environment (Linux, OS X, Cygwin), please visit the "Accessing an Arvados VM with SSH - Unix Environments":ssh-access-unix.html page.
 
 {% include 'ssh_intro' %}
 
@@ -23,7 +23,7 @@ h3. Step 1 - Adding PuTTY to the PATH
 # After downloading PuTTY and installing it, you should have a PuTTY folder in @C:\Program Files\@ or @C:\Program Files (x86)\@ (if you are using a 64 bit operating system).
 # Open the Control Panel.
 # Select _Advanced System Settings_, and choose _Environment Variables_.
-If you are using newer systems like Windows 7, you may use the following to open _Advanced System Settings_. Open Control Panel. Click on _System and Security_. Click on _System_. Click on _Advanced system settings_ and choose _Environment Variables..._
+If you are using newer systems like Windows 10, you may use the following to open _Advanced System Settings_. Open Control Panel. Click on _System and Security_. Click on _System_. Click on _Advanced system settings_ and choose _Environment Variables..._
 # Under system variables, find and edit @PATH@.
 # If you installed PuTTY in @C:\Program Files\PuTTY\@, add the following to the end of PATH:
 <code>;C:\Program Files\PuTTY</code>
@@ -55,7 +55,7 @@ h3. Initial configuration
 # Open PuTTY from the Start Menu.
 # On the Session screen set the Host Name (or IP address) to “shell”, which is the hostname listed in the _Virtual Machines_ page.
 # On the Session screen set the Port to “22”.
-# On the Connection %(rarr)&rarr;% Data screen set the Auto-login username to the username listed in the *logins* column on the Arvados Workbench _Settings %(rarr)&rarr;% Virtual machines_ page.
+# On the Connection %(rarr)&rarr;% Data screen set the Auto-login username to the username listed in the *Login name* column on the Arvados Workbench _Virtual machines_ page.
 # On the Connection %(rarr)&rarr;% Proxy screen set the Proxy Type to “Local”.
 # On the Connection %(rarr)&rarr;% Proxy screen in the “Telnet command, or local proxy command” box enter:
 <code>plink -P 2222 turnout@switchyard.{{ site.arvados_api_host }} %host</code>
diff --git a/doc/user/getting_started/vm-login-with-webshell.html.textile.liquid b/doc/user/getting_started/vm-login-with-webshell.html.textile.liquid
index 50fa4747e74541943ba68782d3cdb8c8912e47f6..58ad868e5e50083576ade0dd9854ebc309dc580f 100644 (file)
@@ -10,7 +10,7 @@ h2(#webshell). Access VM using webshell
 
 Webshell gives you access to an arvados virtual machine from your browser with no additional setup.
 
-In the Arvados Workbench, click on the dropdown menu icon <span class="fa fa-lg fa-user"></span> <span class="caret"></span> in the upper right corner of the top navigation menu to access the user settings menu, and click on the menu item *Virtual machines* to see the list of virtual machines you can access.
+In the Arvados Workbench, click on the dropdown menu icon <span class="fa fa-lg fa-user"></span> <span class="caret"></span> in the upper right corner of the top navigation menu to access the user settings menu, and click on the menu item *Virtual machines* to see the list of virtual machines you can access.  If you do not have access to any virtual machines, please click on <span class="btn btn-sm btn-primary">Send request for shell access</span> or send an email to "support@curoverse.com":mailto:support@curoverse.com.
 
 Each row in the Virtual Machines panel lists the hostname of the VM, along with a <code>Log in as *you*</code> button under the column "Web shell beta". Clicking on this button will open up a webshell terminal for you in a new browser tab and log you in.
 
diff --git a/doc/user/index.html.textile.liquid b/doc/user/index.html.textile.liquid
index 0967cbc308b3054ed7b7cd32f0903d7b1590f353..2d2120268aeecba5a532b1e4903151f714763988 100644 (file)
@@ -13,9 +13,7 @@ This guide provides a reference for using Arvados to solve big data bioinformati
 * Storing and querying metadata about genome sequence files, such as human subjects and their phenotypic traits using the "Arvados Metadata Database.":{{site.baseurl}}/user/topics/tutorial-trait-search.html
 * Accessing, organizing, and sharing data, pipelines and results using the "Arvados Workbench":{{site.baseurl}}/user/getting_started/workbench.html web application.
 
-The examples in this guide use the Arvados instance located at <a href="{{site.arvados_workbench_host}}/" target="_blank">{{site.arvados_workbench_host}}</a>.  If you are using a different Arvados instance replace @{{ site.arvados_workbench_host }}@ with your private instance in all of the examples in this guide.
-
-Curoverse maintains a public Arvados instance located at <a href="https://workbench.qr1hi.arvadosapi.com/" target="_blank">https://workbench.qr1hi.arvadosapi.com/</a>.  You must have an account in order to use this service.  If you would like to request an account, please send an email to "arvados@curoverse.com":mailto:arvados@curoverse.com.
+The examples in this guide use the public Arvados instance located at <a href="{{site.arvados_workbench_host}}/" target="_blank">{{site.arvados_workbench_host}}</a>.  If you are using a different Arvados instance replace @{{ site.arvados_workbench_host }}@ with your private instance in all of the examples in this guide.
 
 h2. Typographic conventions
 
diff --git a/doc/user/topics/arv-docker.html.textile.liquid b/doc/user/topics/arv-docker.html.textile.liquid
index e6c83affb5d9cb86fab64803027c9f8369c5e673..1a31d126da698495eca853b72e1dcca9424c94a1 100644 (file)
@@ -190,7 +190,7 @@ You are now able to specify the runtime environment for your program using the @
 {% code 'example_docker' as javascript %}
 </notextile>
 
-* The @docker_image@ field can be one of: the Docker repository name (as shown above), the Docker image hash, the Arvados collection UUID, or the Arvados collection portable data hash.
+* The @docker_image@ field can be one of: the Docker repository name (as shown above), the Docker image hash, or the Arvados collection portable data hash.
 
 h2. Share Docker images
 
diff --git a/doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid b/doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid
index f9522fbef01658f68b20493516b823c2e2d3611f..fac573aca9bdbc30a1f93b8f75e4af10e0818cfe 100644 (file)
@@ -18,13 +18,13 @@ h3. Steps
 
 # Start from the *Workbench Dashboard*.  You can access the Dashboard by clicking on *<i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard* in the upper left corner of any Workbench page.
 # Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a pipeline...</span> button.  This will open a dialog box titled *Choose a pipeline to run*.
-# Click to open the *All projects <span class="caret"></span>* menu.  Under the *Projects shared with me* header, select *<i class="fa fa-fw fa-share-alt"></i> Arvados Tutorial*.
+# In the search box, type in *Tutorial align using bwa mem*.
 # Select *<i class="fa fa-fw fa-gear"></i> Tutorial align using bwa mem* and click the <span class="btn btn-sm btn-primary" >Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span> button.  This will create a new pipeline in your *Home* project and will open it. You can now supply the inputs for the pipeline.
 # The first input parameter to the pipeline is *"reference_collection" parameter for run-command script in bwa-mem component*.  Click the <span class="btn btn-sm btn-primary">Choose</span> button beneath that header.  This will open a dialog box titled *Choose a dataset for "reference_collection" parameter for run-command script in bwa-mem component*.
-# Once again, open the *All projects <span class="caret"></span>* menu and select *<i class="fa fa-fw fa-share-alt"></i> Arvados Tutorial*.  Select *<i class="fa fa-fw fa-archive"></i> Tutorial chromosome 19 reference* and click the <span class="btn btn-sm btn-primary" >OK</span> button.
+# Open the *Home <span class="caret"></span>* menu and select *All Projects*. Search for and select *<i class="fa fa-fw fa-archive"></i> Tutorial chromosome 19 reference* and click the <span class="btn btn-sm btn-primary" >OK</span> button.
 # Repeat the previous two steps to set the *"sample" parameter for run-command script in bwa-mem component* parameter to *<i class="fa fa-fw fa-archive"></i> Tutorial sample exome*.
 # Click on the <span class="btn btn-sm btn-primary" >Run <i class="fa fa-fw fa-play"></i></span> button.  The page updates to show you that the pipeline has been submitted to run on the Arvados cluster.
-# After the pipeline starts running, you can track the progress by watching log messages from jobs.  This page refreshes automatically.  You will see a <span class="label label-success">complete</span> label under the *job* column when the pipeline completes successfully.
+# After the pipeline starts running, you can track the progress by watching log messages from jobs.  This page refreshes automatically.  You will see a <span class="label label-success">complete</span> label when the pipeline completes successfully.
 # Click on the *Output* link to see the results of the job.  This will load a new page listing the output files from this pipeline.  You'll see the output SAM file from the alignment tool under the *Files* tab.
 # Click on the <span class="btn btn-sm btn-info"><i class="fa fa-download"></i></span> download button to the right of the SAM file to download your results.
 
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 5539012c49e19e2b9afbeba26850aabbec9405d1..555c4d19a613037e36395338a9de32e15ee53c6a 100755 (executable)
@@ -411,7 +411,7 @@ if (!defined $no_clear_tmp) {
 }
 
 # If this job requires a Docker image, install that.
-my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem);
+my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
 if ($docker_locator = $Job->{docker_image_locator}) {
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
@@ -449,6 +449,42 @@ fi
       {fork => 1});
   $docker_limitmem = ($? == 0);
 
+  # Find a non-root Docker user to use.
+  # Tries the default user for the container, then 'crunch', then 'nobody',
+  # testing for whether the actual user id is non-zero.  This defends against
+  # mistakes but not malice, but we intend to harden the security in the future
+  # so we don't want anyone getting used to their jobs running as root in their
+  # Docker containers.
+  my @tryusers = ("", "crunch", "nobody");
+  foreach my $try_user (@tryusers) {
+    my $try_user_arg;
+    if ($try_user eq "") {
+      Log(undef, "Checking if container default user is not UID 0");
+      $try_user_arg = "";
+    } else {
+      Log(undef, "Checking if user '$try_user' is not UID 0");
+      $try_user_arg = "--user=$try_user";
+    }
+    srun(["srun", "--nodelist=" . $node[0]],
+         ["/bin/sh", "-ec",
+          "a=`$docker_bin run --rm $try_user_arg $docker_hash id --user` && " .
+          " test \$a -ne 0"],
+         {fork => 1});
+    if ($? == 0) {
+      $dockeruserarg = $try_user_arg;
+      if ($try_user eq "") {
+        Log(undef, "Container will run with default user");
+      } else {
+        Log(undef, "Container will run with $dockeruserarg");
+      }
+      last;
+    }
+  }
+
+  if (!defined $dockeruserarg) {
+    croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
+  }
+
   if ($Job->{arvados_sdk_version}) {
     # The job also specifies an Arvados SDK version.  Add the SDKs to the
     # tar file for the build script to install.
@@ -844,6 +880,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
       "--job-name=$job_id.$id.$$",
        );
+
+    my $stdbuf = " stdbuf --output=0 --error=0 ";
+
     my $command =
        "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
         ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
@@ -859,7 +898,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       my $cidfile = "$ENV{CRUNCH_TMP}/$Jobstep->{arvados_task}->{uuid}-$Jobstep->{failures}.cid";
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 ";
-      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy ";
+      $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i \Q$dockeruserarg\E --cidfile=$cidfile --sig-proxy ";
       # We only set memory limits if Docker lets us limit both memory and swap.
       # Memory limits alone have been supported longer, but subprocesses tend
       # to get SIGKILL if they exceed that without any swap limit set.
@@ -917,12 +956,22 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
       }
       $command .= "--env=\QHOME=$ENV{HOME}\E ";
       $command .= "\Q$docker_hash\E ";
-      $command .= "stdbuf --output=0 --error=0 ";
-      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
+
+      if ($Job->{arvados_sdk_version}) {
+        $command .= $stdbuf;
+        $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
+      } else {
+        $command .= "/bin/sh -c \'mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
+            "if which stdbuf >/dev/null ; then " .
+            "  exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+            " else " .
+            "  exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
+            " fi\'";
+      }
     } else {
       # Non-docker run
       $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
-      $command .= "stdbuf --output=0 --error=0 ";
+      $command .= $stdbuf;
       $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
     }
 
diff --git a/sdk/go/arvadosclient/arvadosclient_test.go b/sdk/go/arvadosclient/arvadosclient_test.go
index d35f6dacb72b632e18718b503d0a2f4ff55e7a17..2c508dcb4a1100cf4609fc3ff3387ac089c6ab26 100644 (file)
@@ -21,7 +21,7 @@ type ServerRequiredSuite struct{}
 
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 }
 
 func (s *ServerRequiredSuite) SetUpTest(c *C) {
diff --git a/sdk/go/arvadostest/run_servers.go b/sdk/go/arvadostest/run_servers.go
index e922104aeb75096b4a6f36b6648e74af0abe821d..27c552a4e104094ca2ed15991e310a3b7e9cd65e 100644 (file)
@@ -99,14 +99,10 @@ func StopAPI() {
        exec.Command("python", "run_test_server.py", "stop").Run()
 }
 
-// StartKeep starts 2 keep servers with enforcePermissions=false
-func StartKeep() {
-       StartKeepWithParams(2, false)
-}
-
-// StartKeepWithParams starts the given number of keep servers,
+// StartKeep starts the given number of keep servers,
 // optionally with -enforce-permissions enabled.
-func StartKeepWithParams(numKeepServers int, enforcePermissions bool) {
+// Use numKeepServers = 2 and enforcePermissions = false under all normal circumstances.
+func StartKeep(numKeepServers int, enforcePermissions bool) {
        cwd, _ := os.Getwd()
        defer os.Chdir(cwd)
        chdirToPythonTests()
@@ -128,14 +124,10 @@ func StartKeepWithParams(numKeepServers int, enforcePermissions bool) {
        }
 }
 
-func StopKeep() {
-       StopKeepWithParams(2)
-}
-
-// StopKeepServers stops keep servers that were started with
-// StartKeep. numkeepServers should be the same value that was passed
-// to StartKeep.
-func StopKeepWithParams(numKeepServers int) {
+// StopKeep stops keep servers that were started with StartKeep.
+// numKeepServers should be the same value that was passed to StartKeep,
+// which is 2 under all normal circumstances.
+func StopKeep(numKeepServers int) {
        cwd, _ := os.Getwd()
        defer os.Chdir(cwd)
        chdirToPythonTests()
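For callers migrating from StartKeepWithParams/StopKeepWithParams, a minimal gocheck suite using the renamed helpers might look like the sketch below. Only StartAPI, StopAPI, StartKeep, and StopKeep come from this change; the package name and suite wiring are illustrative.

package example_test

import (
	"testing"

	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
	. "gopkg.in/check.v1"
)

// Hook gocheck into the standard "go test" runner.
func Test(t *testing.T) { TestingT(t) }

type ServerRequiredSuite struct{}

var _ = Suite(&ServerRequiredSuite{})

func (s *ServerRequiredSuite) SetUpSuite(c *C) {
	arvadostest.StartAPI()
	// Two keep servers with -enforce-permissions off: the normal configuration.
	arvadostest.StartKeep(2, false)
}

func (s *ServerRequiredSuite) TearDownSuite(c *C) {
	// Pass the same server count that was given to StartKeep.
	arvadostest.StopKeep(2)
	arvadostest.StopAPI()
}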
diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go
new file mode 100644 (file)
index 0000000..8e24e18
--- /dev/null
@@ -0,0 +1,356 @@
+package main
+
+import (
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "log"
+       "os"
+       "os/exec"
+       "os/signal"
+       "strings"
+       "syscall"
+)
+
+type TaskDef struct {
+       Command            []string          `json:"command"`
+       Env                map[string]string `json:"task.env"`
+       Stdin              string            `json:"task.stdin"`
+       Stdout             string            `json:"task.stdout"`
+       Vwd                map[string]string `json:"task.vwd"`
+       SuccessCodes       []int             `json:"task.successCodes"`
+       PermanentFailCodes []int             `json:"task.permanentFailCodes"`
+       TemporaryFailCodes []int             `json:"task.temporaryFailCodes"`
+}
+
+type Tasks struct {
+       Tasks []TaskDef `json:"tasks"`
+}
+
+type Job struct {
+       Script_parameters Tasks `json:"script_parameters"`
+}
+
+type Task struct {
+       Job_uuid                 string  `json:"job_uuid"`
+       Created_by_job_task_uuid string  `json:"created_by_job_task_uuid"`
+       Parameters               TaskDef `json:"parameters"`
+       Sequence                 int     `json:"sequence"`
+       Output                   string  `json:"output"`
+       Success                  bool    `json:"success"`
+       Progress                 float32 `json:"progress"`
+}
+
+type IArvadosClient interface {
+       Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
+       Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
+}
+
+func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
+       tmpdir = crunchtmpdir + "/tmpdir"
+       err = os.Mkdir(tmpdir, 0700)
+       if err != nil {
+               return "", "", err
+       }
+
+       outdir = crunchtmpdir + "/outdir"
+       err = os.Mkdir(outdir, 0700)
+       if err != nil {
+               return "", "", err
+       }
+
+       return tmpdir, outdir, nil
+}
+
+func checkOutputFilename(outdir, fn string) error {
+       if strings.HasPrefix(fn, "/") || strings.HasSuffix(fn, "/") {
+               return fmt.Errorf("Path must not start or end with '/'")
+       }
+       if strings.Index(fn, "../") != -1 {
+               return fmt.Errorf("Path must not contain '../'")
+       }
+
+       sl := strings.LastIndex(fn, "/")
+       if sl != -1 {
+               os.MkdirAll(outdir+"/"+fn[0:sl], 0777)
+       }
+       return nil
+}
+
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+       if taskp.Vwd != nil {
+               for k, v := range taskp.Vwd {
+                       v = substitute(v, replacements)
+                       err = checkOutputFilename(outdir, k)
+                       if err != nil {
+                               return "", "", err
+                       }
+                       os.Symlink(v, outdir+"/"+k)
+               }
+       }
+
+       if taskp.Stdin != "" {
+               // Set up stdin redirection
+               stdin = substitute(taskp.Stdin, replacements)
+               cmd.Stdin, err = os.Open(stdin)
+               if err != nil {
+                       return "", "", err
+               }
+       }
+
+       if taskp.Stdout != "" {
+               err = checkOutputFilename(outdir, taskp.Stdout)
+               if err != nil {
+                       return "", "", err
+               }
+               // Set up stdout redirection
+               stdout = outdir + "/" + taskp.Stdout
+               cmd.Stdout, err = os.Create(stdout)
+               if err != nil {
+                       return "", "", err
+               }
+       } else {
+               cmd.Stdout = os.Stdout
+       }
+
+       if taskp.Env != nil {
+               // Set up subprocess environment
+               cmd.Env = os.Environ()
+               for k, v := range taskp.Env {
+                       v = substitute(v, replacements)
+                       cmd.Env = append(cmd.Env, k+"="+v)
+               }
+       }
+       return stdin, stdout, nil
+}
+
+// Set up signal handlers.  Go sends signal notifications to a "signal
+// channel".
+func setupSignals(cmd *exec.Cmd) chan os.Signal {
+       sigChan := make(chan os.Signal, 1)
+       signal.Notify(sigChan, syscall.SIGTERM)
+       signal.Notify(sigChan, syscall.SIGINT)
+       signal.Notify(sigChan, syscall.SIGQUIT)
+       return sigChan
+}
+
+func inCodes(code int, codes []int) bool {
+       if codes != nil {
+               for _, c := range codes {
+                       if code == c {
+                               return true
+                       }
+               }
+       }
+       return false
+}
+
+const TASK_TEMPFAIL = 111
+
+type TempFail struct{ error }
+type PermFail struct{}
+
+func (s PermFail) Error() string {
+       return "PermFail"
+}
+
+func substitute(inp string, subst map[string]string) string {
+       for k, v := range subst {
+               inp = strings.Replace(inp, k, v, -1)
+       }
+       return inp
+}
+
+func runner(api IArvadosClient,
+       kc IKeepClient,
+       jobUuid, taskUuid, crunchtmpdir, keepmount string,
+       jobStruct Job, taskStruct Task) error {
+
+       var err error
+       taskp := taskStruct.Parameters
+
+       // Task 0 is the dispatcher.  If the job defines exactly one task,
+       // run it directly; otherwise create one subtask per task definition,
+       // mark task 0 complete, and return.
+       if taskStruct.Sequence == 0 {
+               if len(jobStruct.Script_parameters.Tasks) == 1 {
+                       taskp = jobStruct.Script_parameters.Tasks[0]
+               } else {
+                       for _, task := range jobStruct.Script_parameters.Tasks {
+                               err := api.Create("job_tasks",
+                                       map[string]interface{}{
+                                               "job_task": Task{Job_uuid: jobUuid,
+                                                       Created_by_job_task_uuid: taskUuid,
+                                                       Sequence:                 1,
+                                                       Parameters:               task}},
+                                       nil)
+                               if err != nil {
+                                       return TempFail{err}
+                               }
+                       }
+                       err = api.Update("job_tasks", taskUuid,
+                               map[string]interface{}{
+                                       "job_task": Task{
+                                               Output:   "",
+                                               Success:  true,
+                                               Progress: 1.0}},
+                               nil)
+                       if err != nil {
+                               return TempFail{err}
+                       }
+                       return nil
+               }
+       }
+
+       var tmpdir, outdir string
+       tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
+       if err != nil {
+               return TempFail{err}
+       }
+
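+       // Placeholders expanded in the command line, stdin path, Vwd targets
+       // and environment values.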
+       replacements := map[string]string{
+               "$(task.tmpdir)": tmpdir,
+               "$(task.outdir)": outdir,
+               "$(task.keep)":   keepmount}
+
+       // Set up subprocess
+       for k, v := range taskp.Command {
+               taskp.Command[k] = substitute(v, replacements)
+       }
+
+       cmd := exec.Command(taskp.Command[0], taskp.Command[1:]...)
+
+       cmd.Dir = outdir
+
+       var stdin, stdout string
+       stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+       if err != nil {
+               return err
+       }
+
+       // Run subprocess and wait for it to complete
+       if stdin != "" {
+               stdin = " < " + stdin
+       }
+       if stdout != "" {
+               stdout = " > " + stdout
+       }
+       log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+
+       var caughtSignal os.Signal
+       sigChan := setupSignals(cmd)
+
+       err = cmd.Start()
+       if err != nil {
+               signal.Stop(sigChan)
+               return TempFail{err}
+       }
+
+       finishedSignalNotify := make(chan struct{})
+       go func(sig <-chan os.Signal) {
+               for sig := range sig {
+                       caughtSignal = sig
+                       cmd.Process.Signal(caughtSignal)
+               }
+               close(finishedSignalNotify)
+       }(sigChan)
+
+       err = cmd.Wait()
+       signal.Stop(sigChan)
+
+       close(sigChan)
+       <-finishedSignalNotify
+
+       if caughtSignal != nil {
+               log.Printf("Caught signal %v", caughtSignal)
+               return PermFail{}
+       }
+
+       if err != nil {
+               // Wait() returns *exec.ExitError when the process exits with a
+               // non-zero code; exit codes are handled below, so only return
+               // early for other kinds of error.
+               if _, ok := err.(*exec.ExitError); !ok {
+                       return TempFail{err}
+               }
+       }
+
+       var success bool
+
+       exitCode := cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()
+
+       log.Printf("Completed with exit code %v", exitCode)
+
+       if inCodes(exitCode, taskp.PermanentFailCodes) {
+               success = false
+       } else if inCodes(exitCode, taskp.TemporaryFailCodes) {
+               return TempFail{fmt.Errorf("Process tempfail with exit code %v", exitCode)}
+       } else if inCodes(exitCode, taskp.SuccessCodes) || cmd.ProcessState.Success() {
+               success = true
+       } else {
+               success = false
+       }
+
+       // Upload output directory
+       manifest, err := WriteTree(kc, outdir)
+       if err != nil {
+               return TempFail{err}
+       }
+
+       // Set status
+       err = api.Update("job_tasks", taskUuid,
+               map[string]interface{}{
+                       "job_task": Task{
+                               Output:   manifest,
+                               Success:  success,
+                               Progress: 1}},
+               nil)
+       if err != nil {
+               return TempFail{err}
+       }
+
+       if success {
+               return nil
+       } else {
+               return PermFail{}
+       }
+}
+
+func main() {
+       api, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               log.Fatal(err)
+       }
+
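+       // These environment variables are provided to each task by the crunch
+       // dispatcher.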
+       jobUuid := os.Getenv("JOB_UUID")
+       taskUuid := os.Getenv("TASK_UUID")
+       tmpdir := os.Getenv("TASK_WORK")
+       keepmount := os.Getenv("TASK_KEEPMOUNT")
+
+       var jobStruct Job
+       var taskStruct Task
+
+       err = api.Get("jobs", jobUuid, nil, &jobStruct)
+       if err != nil {
+               log.Fatal(err)
+       }
+       err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       var kc IKeepClient
+       kc, err = keepclient.MakeKeepClient(&api)
+       if err != nil {
+               log.Fatal(err)
+       }
+
+       syscall.Umask(0022)
+       err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+
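+       // Exit 0 on success, TASK_TEMPFAIL (111) on a temporary failure, and 1
+       // on a permanent failure or unexpected error.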
+       if err == nil {
+               os.Exit(0)
+       } else if _, ok := err.(TempFail); ok {
+               log.Print(err)
+               os.Exit(TASK_TEMPFAIL)
+       } else if _, ok := err.(PermFail); ok {
+               os.Exit(1)
+       } else {
+               log.Fatal(err)
+       }
+}
diff --git a/sdk/go/crunchrunner/crunchrunner_test.go b/sdk/go/crunchrunner/crunchrunner_test.go
new file mode 100644 (file)
index 0000000..52d5c1a
--- /dev/null
@@ -0,0 +1,445 @@
+package main
+
+import (
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       . "gopkg.in/check.v1"
+       "io"
+       "io/ioutil"
+       "log"
+       "os"
+       "syscall"
+       "testing"
+       "time"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       TestingT(t)
+}
+
+type TestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&TestSuite{})
+
+type ArvTestClient struct {
+       c        *C
+       manifest string
+       success  bool
+}
+
+func (t ArvTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+       return nil
+}
+
+func (t ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+       t.c.Check(resourceType, Equals, "job_tasks")
+       t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": Task{
+               Output:   t.manifest,
+               Success:  t.success,
+               Progress: 1}})
+       return nil
+}
+
+func (s *TestSuite) TestSimpleRun(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"echo", "foo"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+}
+
+func checkOutput(c *C, tmpdir string) {
+       file, err := os.Open(tmpdir + "/outdir/output.txt")
+       c.Assert(err, IsNil)
+
+       data := make([]byte, 100)
+       var count int
+       err = nil
+       offset := 0
+       for err == nil {
+               count, err = file.Read(data[offset:])
+               offset += count
+       }
+       c.Assert(err, Equals, io.EOF)
+       c.Check(string(data[0:offset]), Equals, "foo\n")
+}
+
+func (s *TestSuite) TestSimpleRunSubtask(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c,
+               ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{
+                       TaskDef{Command: []string{"echo", "bar"}},
+                       TaskDef{Command: []string{"echo", "foo"}}}}},
+               Task{Parameters: TaskDef{
+                       Command: []string{"echo", "foo"},
+                       Stdout:  "output.txt"},
+                       Sequence: 1})
+       c.Check(err, IsNil)
+
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestRedirect(c *C) {
+       tmpfile, _ := ioutil.TempFile("", "")
+       tmpfile.Write([]byte("foo\n"))
+       tmpfile.Close()
+       defer os.Remove(tmpfile.Name())
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c,
+               ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"cat"},
+                       Stdout:  "output.txt",
+                       Stdin:   tmpfile.Name()}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnv(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"/bin/sh", "-c", "echo $BAR"},
+                       Stdout:  "output.txt",
+                       Env:     map[string]string{"BAR": "foo"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnvSubstitute(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "foo\n",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"/bin/sh", "-c", "echo $BAR"},
+                       Stdout:  "output.txt",
+                       Env:     map[string]string{"BAR": "$(task.keep)"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestEnvReplace(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"/bin/sh", "-c", "echo $PATH"},
+                       Stdout:  "output.txt",
+                       Env:     map[string]string{"PATH": "foo"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+type SubtaskTestClient struct {
+       c     *C
+       parms []Task
+       i     int
+}
+
+func (t *SubtaskTestClient) Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error {
+       t.c.Check(resourceType, Equals, "job_tasks")
+       t.c.Check(parameters, DeepEquals, arvadosclient.Dict{"job_task": t.parms[t.i]})
+       t.i += 1
+       return nil
+}
+
+func (t SubtaskTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
+       return nil
+}
+
+func (s *TestSuite) TestScheduleSubtask(c *C) {
+
+       api := SubtaskTestClient{c, []Task{
+               Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+                       Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+                       Sequence:                 1,
+                       Parameters: TaskDef{
+                               Command: []string{"echo", "bar"}}},
+               Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+                       Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
+                       Sequence:                 1,
+                       Parameters: TaskDef{
+                               Command: []string{"echo", "foo"}}}},
+               0}
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(&api, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{
+                       TaskDef{Command: []string{"echo", "bar"}},
+                       TaskDef{Command: []string{"echo", "foo"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+
+}
+
+func (s *TestSuite) TestRunFail(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunSuccessCode(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", true}, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command:      []string{"/bin/sh", "-c", "exit 1"},
+                       SuccessCodes: []int{0, 1}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+}
+
+func (s *TestSuite) TestRunFailCode(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command:            []string{"/bin/sh", "-c", "exit 0"},
+                       PermanentFailCodes: []int{0, 1}}}}},
+               Task{Sequence: 0})
+       c.Check(err, FitsTypeOf, PermFail{})
+}
+
+func (s *TestSuite) TestRunTempFailCode(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, "", false}, KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command:            []string{"/bin/sh", "-c", "exit 1"},
+                       TemporaryFailCodes: []int{1}}}}},
+               Task{Sequence: 0})
+       c.Check(err, FitsTypeOf, TempFail{})
+}
+
+func (s *TestSuite) TestVwd(c *C) {
+       tmpfile, _ := ioutil.TempFile("", "")
+       tmpfile.Write([]byte("foo\n"))
+       tmpfile.Close()
+       defer os.Remove(tmpfile.Name())
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c, ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"ls", "output.txt"},
+                       Vwd: map[string]string{
+                               "output.txt": tmpfile.Name()}}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSubstitutionStdin(c *C) {
+       keepmount, _ := ioutil.TempDir("", "")
+       ioutil.WriteFile(keepmount+"/"+"file1.txt", []byte("foo\n"), 0600)
+       defer func() {
+               os.RemoveAll(keepmount)
+       }()
+
+       log.Print("Keepmount is ", keepmount)
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       log.Print("tmpdir is ", tmpdir)
+
+       err := runner(ArvTestClient{c,
+               ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               keepmount,
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"cat"},
+                       Stdout:  "output.txt",
+                       Stdin:   "$(task.keep)/file1.txt"}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSubstitutionCommandLine(c *C) {
+       keepmount, _ := ioutil.TempDir("", "")
+       ioutil.WriteFile(keepmount+"/"+"file1.txt", []byte("foo\n"), 0600)
+       defer func() {
+               os.RemoveAll(keepmount)
+       }()
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c,
+               ". d3b07384d113edec49eaa6238ad5ff00+4 0:4:output.txt\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               keepmount,
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"cat", "$(task.keep)/file1.txt"},
+                       Stdout:  "output.txt"}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+
+       checkOutput(c, tmpdir)
+}
+
+func (s *TestSuite) TestSignal(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       go func() {
+               time.Sleep(1 * time.Second)
+               self, _ := os.FindProcess(os.Getpid())
+               self.Signal(syscall.SIGINT)
+       }()
+
+       err := runner(ArvTestClient{c,
+               "", false},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"sleep", "4"}}}}},
+               Task{Sequence: 0})
+       c.Check(err, FitsTypeOf, PermFail{})
+
+}
+
+func (s *TestSuite) TestQuoting(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       err := runner(ArvTestClient{c,
+               "./s\\040ub:dir d3b07384d113edec49eaa6238ad5ff00+4 0:4::e\\040vil\n", true},
+               KeepTestClient{},
+               "zzzz-8i9sb-111111111111111",
+               "zzzz-ot0gb-111111111111111",
+               tmpdir,
+               "",
+               Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+                       Command: []string{"echo", "foo"},
+                       Stdout:  "s ub:dir/:e vi\nl"}}}},
+               Task{Sequence: 0})
+       c.Check(err, IsNil)
+}
diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go
new file mode 100644 (file)
index 0000000..4ced0ce
--- /dev/null
@@ -0,0 +1,217 @@
+package main
+
+import (
+       "bytes"
+       "crypto/md5"
+       "errors"
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
+       "io"
+       "log"
+       "os"
+       "path/filepath"
+       "sort"
+       "strings"
+)
+
+type Block struct {
+       data   []byte
+       offset int64
+}
+
+type ManifestStreamWriter struct {
+       *ManifestWriter
+       *manifest.ManifestStream
+       offset int64
+       *Block
+       uploader chan *Block
+       finish   chan []error
+}
+
+type IKeepClient interface {
+       PutHB(hash string, buf []byte) (string, int, error)
+}
+
+func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
+       n, err := m.ReadFrom(bytes.NewReader(p))
+       return int(n), err
+}
+
+func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
+       var total int64
+       var count int
+
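+       // Accumulate data into 64 MiB blocks; each full block is handed to the
+       // uploader goroutine and a fresh block is started.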
+       for err == nil {
+               if m.Block == nil {
+                       m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
+               }
+               count, err = r.Read(m.Block.data[m.Block.offset:])
+               total += int64(count)
+               m.Block.offset += int64(count)
+               if m.Block.offset == keepclient.BLOCKSIZE {
+                       m.uploader <- m.Block
+                       m.Block = nil
+               }
+       }
+
+       if err == io.EOF {
+               return total, nil
+       } else {
+               return total, err
+       }
+
+}
+
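+// goUpload runs in its own goroutine: it writes each queued block to Keep,
+// appends the returned locator to the stream, and reports any accumulated
+// errors on the finish channel once the uploader channel is closed.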
+func (m *ManifestStreamWriter) goUpload() {
+       var errors []error
+       uploader := m.uploader
+       finish := m.finish
+       for block := range uploader {
+               hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+               signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+               if err != nil {
+                       errors = append(errors, err)
+               } else {
+                       m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+               }
+       }
+       finish <- errors
+}
+
+type ManifestWriter struct {
+       IKeepClient
+       stripPrefix string
+       Streams     map[string]*ManifestStreamWriter
+}
+
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
+       if info.IsDir() {
+               return nil
+       }
+
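+       // Derive the manifest stream name (the file's directory relative to
+       // stripPrefix) and the file's base name from the walked path.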
+       var dir string
+       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       }
+       if dir == "" {
+               dir = "."
+       }
+
+       fn := path[(len(path) - len(info.Name())):]
+
+       if m.Streams[dir] == nil {
+               m.Streams[dir] = &ManifestStreamWriter{
+                       m,
+                       &manifest.ManifestStream{StreamName: dir},
+                       0,
+                       nil,
+                       make(chan *Block),
+                       make(chan []error)}
+               go m.Streams[dir].goUpload()
+       }
+
+       stream := m.Streams[dir]
+
+       fileStart := stream.offset
+
+       file, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+
+       log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+       var count int64
+       count, err = io.Copy(stream, file)
+       if err != nil {
+               return err
+       }
+
+       stream.offset += count
+
+       stream.ManifestStream.Files = append(stream.ManifestStream.Files,
+               fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+
+       return nil
+}
+
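+// Finish sends any partially filled blocks to the uploaders, closes the
+// upload channels, and returns a combined error if any block failed to
+// upload.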
+func (m *ManifestWriter) Finish() error {
+       var errstring string
+       for _, stream := range m.Streams {
+               if stream.uploader == nil {
+                       continue
+               }
+               if stream.Block != nil {
+                       stream.uploader <- stream.Block
+               }
+               close(stream.uploader)
+               stream.uploader = nil
+
+               errors := <-stream.finish
+               close(stream.finish)
+               stream.finish = nil
+
+               for _, r := range errors {
+                       errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+               }
+       }
+       if errstring != "" {
+               return errors.New(errstring)
+       } else {
+               return nil
+       }
+}
+
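+// ManifestText finishes all uploads and renders the collection manifest,
+// with streams sorted by name and spaces in stream and file names escaped
+// as \040.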
+func (m *ManifestWriter) ManifestText() string {
+       m.Finish()
+       var buf bytes.Buffer
+
+       dirs := make([]string, len(m.Streams))
+       i := 0
+       for k := range m.Streams {
+               dirs[i] = k
+               i++
+       }
+       sort.Strings(dirs)
+
+       for _, k := range dirs {
+               v := m.Streams[k]
+
+               if k == "." {
+                       buf.WriteString(".")
+               } else {
+                       k = strings.Replace(k, " ", "\\040", -1)
+                       k = strings.Replace(k, "\n", "", -1)
+                       buf.WriteString("./" + k)
+               }
+               for _, b := range v.Blocks {
+                       buf.WriteString(" ")
+                       buf.WriteString(b)
+               }
+               for _, f := range v.Files {
+                       buf.WriteString(" ")
+                       f = strings.Replace(f, " ", "\\040", -1)
+                       f = strings.Replace(f, "\n", "", -1)
+                       buf.WriteString(f)
+               }
+               buf.WriteString("\n")
+       }
+       return buf.String()
+}
+
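+// WriteTree uploads the directory tree under root to Keep using kc and
+// returns the manifest text for the resulting collection.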
+func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
+       mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
+       err = filepath.Walk(root, mw.WalkFunc)
+
+       if err != nil {
+               return "", err
+       }
+
+       err = mw.Finish()
+       if err != nil {
+               return "", err
+       }
+
+       return mw.ManifestText(), nil
+}
diff --git a/sdk/go/crunchrunner/upload_test.go b/sdk/go/crunchrunner/upload_test.go
new file mode 100644 (file)
index 0000000..a2bf0ac
--- /dev/null
@@ -0,0 +1,140 @@
+package main
+
+import (
+       "crypto/md5"
+       "errors"
+       "fmt"
+       . "gopkg.in/check.v1"
+       "io/ioutil"
+       "os"
+)
+
+type UploadTestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&UploadTestSuite{})
+
+type KeepTestClient struct {
+}
+
+func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+       return fmt.Sprintf("%x+%v", md5.Sum(buf), len(buf)), len(buf), nil
+}
+
+func (s *TestSuite) TestSimpleUpload(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       os.Mkdir(tmpdir+"/subdir", 0700)
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+       ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+`)
+}
+
+func (s *TestSuite) TestSimpleUploadLarge(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       file, _ := os.Create(tmpdir + "/" + "file1.txt")
+       data := make([]byte, 1024*1024-1)
+       for i := range data {
+               data[i] = byte(i % 10)
+       }
+       for i := 0; i < 65; i++ {
+               file.Write(data)
+       }
+       file.Close()
+
+       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestUploadEmptySubdir(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       os.Mkdir(tmpdir+"/subdir", 0700)
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadEmptyFile(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
+`)
+}
+
+type KeepErrorTestClient struct {
+}
+
+func (k KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+       return "", 0, errors.New("Failed!")
+}
+
+func (s *TestSuite) TestUploadError(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       str, err := WriteTree(KeepErrorTestClient{}, tmpdir)
+       c.Check(err, NotNil)
+       c.Check(str, Equals, "")
+}
diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go
index e74ae2cf08da0401bab6b808cf7333af05ef8a36..0e6fadcc3548c99d2e85c30219a74ffdce42bf72 100644 (file)
@@ -22,7 +22,33 @@ import (
 // A Keep "block" is 64MB.
 const BLOCKSIZE = 64 * 1024 * 1024
 
-var BlockNotFound = errors.New("Block not found")
+// Error is an error that also reports, via Temporary(), whether the failure
+// is transient and the request might succeed if retried.
+type Error interface {
+       error
+       Temporary() bool
+}
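+
+// A caller deciding whether to retry can check for this interface, e.g.:
+//
+//     if e, ok := err.(keepclient.Error); ok && e.Temporary() {
+//             // back off and retry the request
+//     }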
+
+// multipleResponseError implements Error for failures involving responses
+// from multiple Keep servers.
+type multipleResponseError struct {
+       error
+       isTemp bool
+}
+
+func (e *multipleResponseError) Temporary() bool {
+       return e.isTemp
+}
+
+// BlockNotFound is returned when every server tried reported 404; it is not
+// a temporary error.
+var BlockNotFound = &ErrNotFound{multipleResponseError{
+       error:  errors.New("Block not found"),
+       isTemp: false,
+}}
+
+// ErrNotFound means the block could not be retrieved; its Temporary() method
+// reports whether retrying might succeed.
+type ErrNotFound struct {
+       multipleResponseError
+}
+
 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
 var OversizeBlockError = errors.New("Exceeded maximum block size (" + strconv.Itoa(BLOCKSIZE) + ")")
 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
@@ -145,7 +171,12 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
        var errs []string
 
        tries_remaining := 1 + kc.Retries
+
        serversToTry := kc.getSortedRoots(locator)
+
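+       // Count 404 responses so that, after all attempts, "not found on every
+       // server" (BlockNotFound) can be distinguished from other failures
+       // (ErrNotFound, possibly temporary).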
+       numServers := len(serversToTry)
+       count404 := 0
+
        var retryList []string
 
        for tries_remaining > 0 {
@@ -181,6 +212,8 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
                                        // server side failure, transient
                                        // error, can try again.
                                        retryList = append(retryList, host)
+                               } else if resp.StatusCode == 404 {
+                                       count404++
                                }
                        } else {
                                // Success.
@@ -201,7 +234,16 @@ func (kc *KeepClient) getOrHead(method string, locator string) (io.ReadCloser, i
        }
        log.Printf("DEBUG: %s %s failed: %v", method, locator, errs)
 
-       return nil, 0, "", BlockNotFound
+       var err error
+       if count404 == numServers {
+               err = BlockNotFound
+       } else {
+               err = &ErrNotFound{multipleResponseError{
+                       error:  fmt.Errorf("%s %s failed: %v", method, locator, errs),
+                       isTemp: len(serversToTry) > 0,
+               }}
+       }
+       return nil, 0, "", err
 }
 
 // Get() retrieves a block, given a locator. Returns a reader, the
diff --git a/sdk/go/keepclient/keepclient_test.go b/sdk/go/keepclient/keepclient_test.go
index b5bc5ced4b23716a381f2af7d53f075a17b6771e..df4638619f488bc86dd3626cf2579a60fda62bed 100644 (file)
@@ -14,6 +14,7 @@ import (
        "net"
        "net/http"
        "os"
+       "strings"
        "testing"
 )
 
@@ -45,14 +46,14 @@ func (s *ServerRequiredSuite) SetUpSuite(c *C) {
                return
        }
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 }
 
 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
        if *no_server {
                return
        }
-       arvadostest.StopKeep()
+       arvadostest.StopKeep(2)
        arvadostest.StopAPI()
 }
 
@@ -442,6 +443,7 @@ func (s *StandaloneSuite) TestPutWithTooManyFail(c *C) {
        kc, _ := MakeKeepClient(&arv)
 
        kc.Want_replicas = 2
+       kc.Retries = 0
        arv.ApiToken = "abc123"
        localRoots := make(map[string]string)
        writableLocalRoots := make(map[string]string)
@@ -552,9 +554,13 @@ func (s *StandaloneSuite) TestGetFail(c *C) {
        kc, _ := MakeKeepClient(&arv)
        arv.ApiToken = "abc123"
        kc.SetServiceRoots(map[string]string{"x": ks.url}, nil, nil)
+       kc.Retries = 0
 
        r, n, url2, err := kc.Get(hash)
-       c.Check(err, Equals, BlockNotFound)
+       errNotFound, _ := err.(*ErrNotFound)
+       c.Check(errNotFound, NotNil)
+       c.Check(strings.Contains(errNotFound.Error(), "HTTP 500"), Equals, true)
+       c.Check(errNotFound.Temporary(), Equals, true)
        c.Check(n, Equals, int64(0))
        c.Check(url2, Equals, "")
        c.Check(r, Equals, nil)
@@ -599,7 +605,10 @@ func (s *StandaloneSuite) TestGetNetError(c *C) {
        kc.SetServiceRoots(map[string]string{"x": "http://localhost:62222"}, nil, nil)
 
        r, n, url2, err := kc.Get(hash)
-       c.Check(err, Equals, BlockNotFound)
+       errNotFound, _ := err.(*ErrNotFound)
+       c.Check(errNotFound, NotNil)
+       c.Check(strings.Contains(errNotFound.Error(), "connection refused"), Equals, true)
+       c.Check(errNotFound.Temporary(), Equals, true)
        c.Check(n, Equals, int64(0))
        c.Check(url2, Equals, "")
        c.Check(r, Equals, nil)
@@ -808,6 +817,7 @@ func (s *StandaloneSuite) TestGetWithFailures(c *C) {
        }
 
        kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+       kc.Retries = 0
 
        // This test works only if one of the failing services is
        // attempted before the succeeding service. Otherwise,
@@ -1186,3 +1196,53 @@ func (s *StandaloneSuite) TestGetIndexWithNoSuchPrefix(c *C) {
        c.Check(err2, Equals, nil)
        c.Check(content, DeepEquals, st.body[0:len(st.body)-1])
 }
+
+type FailThenSucceedPutHandler struct {
+       handled        chan string
+       count          int
+       successhandler StubPutHandler
+}
+
+func (h *FailThenSucceedPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+       if h.count == 0 {
+               resp.WriteHeader(500)
+               h.count += 1
+               h.handled <- fmt.Sprintf("http://%s", req.Host)
+       } else {
+               h.successhandler.ServeHTTP(resp, req)
+       }
+}
+
+func (s *StandaloneSuite) TestPutBRetry(c *C) {
+       st := &FailThenSucceedPutHandler{make(chan string, 1), 0,
+               StubPutHandler{
+                       c,
+                       Md5String("foo"),
+                       "abc123",
+                       "foo",
+                       make(chan string, 5)}}
+
+       arv, _ := arvadosclient.MakeArvadosClient()
+       kc, _ := MakeKeepClient(&arv)
+
+       kc.Want_replicas = 2
+       arv.ApiToken = "abc123"
+       localRoots := make(map[string]string)
+       writableLocalRoots := make(map[string]string)
+
+       ks := RunSomeFakeKeepServers(st, 2)
+
+       for i, k := range ks {
+               localRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+               writableLocalRoots[fmt.Sprintf("zzzzz-bi6l4-fakefakefake%03d", i)] = k.url
+               defer k.listener.Close()
+       }
+
+       kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
+
+       hash, replicas, err := kc.PutB([]byte("foo"))
+
+       c.Check(err, Equals, nil)
+       c.Check(hash, Equals, "")
+       c.Check(replicas, Equals, 2)
+}
diff --git a/sdk/go/keepclient/support.go b/sdk/go/keepclient/support.go
index f5554618feece00d04c34982347ffcc8e1e405aa..4a210243defcd0bd33ea130dd17c96b2c151534f 100644 (file)
@@ -235,7 +235,7 @@ func (this *KeepClient) putReplicas(
 
        // Take the hash of locator and timestamp in order to identify this
        // specific transaction in log statements.
-       requestId := fmt.Sprintf("%x", md5.Sum([]byte(locator+time.Now().String())))[0:8]
+       requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
 
        // Calculate the ordering for uploading to servers
        sv := NewRootSorter(this.WritableLocalRoots(), hash).GetSortedRoots()
@@ -269,34 +269,53 @@ func (this *KeepClient) putReplicas(
                replicasPerThread = remaining_replicas
        }
 
-       for remaining_replicas > 0 {
-               for active*replicasPerThread < remaining_replicas {
-                       // Start some upload requests
-                       if next_server < len(sv) {
-                               log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
-                               go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
-                               next_server += 1
-                               active += 1
-                       } else {
-                               if active == 0 {
-                                       return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+       retriesRemaining := 1 + this.Retries
+       var retryServers []string
+
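+       // Make up to 1+Retries passes over the server list.  Servers that
+       // returned a retryable error (connection failure, 408, 429, or a 5xx
+       // other than 503 "full") are collected in retryServers and become the
+       // candidate list for the next pass.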
+       for retriesRemaining > 0 {
+               retriesRemaining -= 1
+               next_server = 0
+               retryServers = []string{}
+               for remaining_replicas > 0 {
+                       for active*replicasPerThread < remaining_replicas {
+                               // Start some upload requests
+                               if next_server < len(sv) {
+                                       log.Printf("[%v] Begin upload %s to %s", requestId, hash, sv[next_server])
+                                       go this.uploadToKeepServer(sv[next_server], hash, tr.MakeStreamReader(), upload_status, expectedLength, requestId)
+                                       next_server += 1
+                                       active += 1
                                } else {
-                                       break
+                                       if active == 0 && retriesRemaining == 0 {
+                                               return locator, (this.Want_replicas - remaining_replicas), InsufficientReplicasError
+                                       } else {
+                                               break
+                                       }
+                               }
+                       }
+                       log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
+                               requestId, remaining_replicas, active)
+
+                       // Now wait for something to happen.
+                       if active > 0 {
+                               status := <-upload_status
+                               active -= 1
+
+                               if status.statusCode == 200 {
+                                       // good news!
+                                       remaining_replicas -= status.replicas_stored
+                                       locator = status.response
+                               } else if status.statusCode == 0 || status.statusCode == 408 || status.statusCode == 429 ||
+                                       (status.statusCode >= 500 && status.statusCode != 503) {
+                                       // Timeout, too many requests, or other server side failure
+                                       // Do not retry when status code is 503, which means the keep server is full
+                                       retryServers = append(retryServers, status.url[0:strings.LastIndex(status.url, "/")])
                                }
+                       } else {
+                               break
                        }
                }
-               log.Printf("[%v] Replicas remaining to write: %v active uploads: %v",
-                       requestId, remaining_replicas, active)
-
-               // Now wait for something to happen.
-               status := <-upload_status
-               active -= 1
 
-               if status.statusCode == 200 {
-                       // good news!
-                       remaining_replicas -= status.replicas_stored
-                       locator = status.response
-               }
+               sv = retryServers
        }
 
        return locator, this.Want_replicas, nil
diff --git a/sdk/python/arvados/__init__.py b/sdk/python/arvados/__init__.py
index 1df64703a9577d7338f60f5dd944b1c72d854064..b74f828f4bd04a2a6321aa50e5f823cb3a2496ab 100644 (file)
@@ -23,6 +23,7 @@ from collection import CollectionReader, CollectionWriter, ResumableCollectionWr
 from keep import *
 from stream import *
 from arvfile import StreamFileReader
+from retry import RetryLoop
 import errors
 import util
 
@@ -37,36 +38,60 @@ logger.addHandler(log_handler)
 logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG')
                 else logging.WARNING)
 
-def task_set_output(self,s):
-    api('v1').job_tasks().update(uuid=self['uuid'],
-                                 body={
-            'output':s,
-            'success':True,
-            'progress':1.0
-            }).execute()
+def task_set_output(self, s, num_retries=5):
+    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
+        try:
+            return api('v1').job_tasks().update(
+                uuid=self['uuid'],
+                body={
+                    'output':s,
+                    'success':True,
+                    'progress':1.0
+                }).execute()
+        except errors.ApiError as error:
+            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+                logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+            else:
+                raise
 
 _current_task = None
-def current_task():
+def current_task(num_retries=5):
     global _current_task
     if _current_task:
         return _current_task
-    t = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
-    t = UserDict.UserDict(t)
-    t.set_output = types.MethodType(task_set_output, t)
-    t.tmpdir = os.environ['TASK_WORK']
-    _current_task = t
-    return t
+
+    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
+        try:
+            task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
+            task = UserDict.UserDict(task)
+            task.set_output = types.MethodType(task_set_output, task)
+            task.tmpdir = os.environ['TASK_WORK']
+            _current_task = task
+            return task
+        except errors.ApiError as error:
+            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+                logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+            else:
+                raise
 
 _current_job = None
-def current_job():
+def current_job(num_retries=5):
     global _current_job
     if _current_job:
         return _current_job
-    t = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
-    t = UserDict.UserDict(t)
-    t.tmpdir = os.environ['JOB_WORK']
-    _current_job = t
-    return t
+
+    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
+        try:
+            job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
+            job = UserDict.UserDict(job)
+            job.tmpdir = os.environ['JOB_WORK']
+            _current_job = job
+            return job
+        except errors.ApiError as error:
+            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
+                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
+            else:
+                raise
 
 def getjobparam(*args):
     return current_job()['script_parameters'].get(*args)
diff --git a/sdk/python/arvados/api.py b/sdk/python/arvados/api.py
index 086487aa09714e2873e2b49ac7cafff222ea0d1b..5ec5ac2e8aea92fb105265053c357a3ba6275c68 100644 (file)
@@ -4,6 +4,7 @@ import json
 import logging
 import os
 import re
+import socket
 import types
 
 import apiclient
@@ -60,6 +61,16 @@ def _intercept_http_request(self, uri, **kwargs):
         # previous call did not succeed, so this is slightly
         # risky.
         return self.orig_http_request(uri, **kwargs)
+    except socket.error:
+        # This is the one case where httplib2 doesn't close the
+        # underlying connection first.  Close all open connections,
+        # expecting this object only has the one connection to the API
+        # server.  This is safe because httplib2 reopens connections when
+        # needed.
+        _logger.debug("Retrying API request after socket error", exc_info=True)
+        for conn in self.connections.itervalues():
+            conn.close()
+        return self.orig_http_request(uri, **kwargs)
 
 def _patch_http_request(http, api_token):
     http.arvados_api_token = api_token
diff --git a/sdk/python/arvados/commands/run.py b/sdk/python/arvados/commands/run.py
index be94e7304a34f3a2cdc829cd9a9f4b8339aaaf33..8df945a063f3c76999c882b8f27dc20649c07d8c 100644 (file)
@@ -52,7 +52,7 @@ def is_in_collection(root, branch):
         else:
             sp = os.path.split(root)
             return is_in_collection(sp[0], os.path.join(sp[1], branch))
-    except IOError, OSError:
+    except (IOError, OSError):
         return (None, None)
 
 # Determine the project to place the output of this command by searching upward
@@ -73,7 +73,7 @@ def determine_project(root, current_user):
         else:
             sp = os.path.split(root)
             return determine_project(sp[0], current_user)
-    except IOError, OSError:
+    except (IOError, OSError):
         return current_user
 
 # Determine if string corresponds to a file, and if that file is part of a
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 0754744241a97a0011fb71ace5792a7be0c27148..8ed86fd79e96144886744dba81786ef21ef12ae8 100644 (file)
@@ -241,19 +241,34 @@ class KeepClient(object):
         Should be used in a "with" block.
         """
         def __init__(self, todo):
+            self._started = 0
             self._todo = todo
             self._done = 0
             self._response = None
+            self._start_lock = threading.Condition()
             self._todo_lock = threading.Semaphore(todo)
             self._done_lock = threading.Lock()
+            self._local = threading.local()
 
         def __enter__(self):
+            self._start_lock.acquire()
+            if getattr(self._local, 'sequence', None) is not None:
+                # If the calling thread has used set_sequence(N), then
+                # we wait here until N other threads have started.
+                while self._started < self._local.sequence:
+                    self._start_lock.wait()
             self._todo_lock.acquire()
+            self._started += 1
+            self._start_lock.notifyAll()
+            self._start_lock.release()
             return self
 
         def __exit__(self, type, value, traceback):
             self._todo_lock.release()
 
+        def set_sequence(self, sequence):
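+            """Make this thread's __enter__ wait until `sequence` other
+            threads have entered the limiter, so writer threads start in
+            their assigned order."""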
+            self._local.sequence = sequence
+
         def shall_i_proceed(self):
             """
             Return true if the current thread should do stuff. Return
@@ -517,7 +532,11 @@ class KeepClient(object):
             return self._success
 
         def run(self):
-            with self.args['thread_limiter'] as limiter:
+            limiter = self.args['thread_limiter']
+            sequence = self.args['thread_sequence']
+            if sequence is not None:
+                limiter.set_sequence(sequence)
+            with limiter:
                 if not limiter.shall_i_proceed():
                     # My turn arrived, but the job has been done without
                     # me.
@@ -950,9 +969,10 @@ class KeepClient(object):
         thread_limiter = KeepClient.ThreadLimiter(1 if self.max_replicas_per_service is None else copies)
         loop = retry.RetryLoop(num_retries, self._check_loop_result,
                                backoff_start=2)
+        thread_sequence = 0
         for tries_left in loop:
             try:
-                local_roots = self.map_new_services(
+                sorted_roots = self.map_new_services(
                     roots_map, locator,
                     force_rebuild=(tries_left < num_retries), need_writable=True, **headers)
             except Exception as error:
@@ -960,7 +980,8 @@ class KeepClient(object):
                 continue
 
             threads = []
-            for service_root, ks in roots_map.iteritems():
+            for service_root, ks in [(root, roots_map[root])
+                                     for root in sorted_roots]:
                 if ks.finished():
                     continue
                 t = KeepClient.KeepWriterThread(
@@ -969,9 +990,11 @@ class KeepClient(object):
                     data_hash=data_hash,
                     service_root=service_root,
                     thread_limiter=thread_limiter,
-                    timeout=self.current_timeout(num_retries-tries_left))
+                    timeout=self.current_timeout(num_retries-tries_left),
+                    thread_sequence=thread_sequence)
                 t.start()
                 threads.append(t)
+                thread_sequence += 1
             for t in threads:
                 t.join()
             loop.save_result((thread_limiter.done() >= copies, len(threads)))
@@ -984,7 +1007,7 @@ class KeepClient(object):
                     data_hash, loop.last_result()))
         else:
             service_errors = ((key, roots_map[key].last_result()['error'])
-                              for key in local_roots
+                              for key in sorted_roots
                               if roots_map[key].last_result()['error'])
             raise arvados.errors.KeepWriteError(
                 "failed to write {} (wanted {} copies but wrote {})".format(
diff --git a/sdk/python/tests/arvados_testutil.py b/sdk/python/tests/arvados_testutil.py
index 6e2a07888662172fd6f26b335a0206db4ef15e98..ea318c6de05c9624d8517d15e8c4071f287a4ab9 100644 (file)
@@ -43,6 +43,10 @@ def mock_responses(body, *codes, **headers):
     return mock.patch('httplib2.Http.request', side_effect=queue_with((
         (fake_httplib2_response(code, **headers), body) for code in codes)))
 
+def mock_api_responses(api_client, body, codes, headers={}):
+    return mock.patch.object(api_client._http, 'request', side_effect=queue_with((
+        (fake_httplib2_response(code, **headers), body) for code in codes)))
+
 
 class FakeCurl:
     @classmethod
diff --git a/sdk/python/tests/nginx.conf b/sdk/python/tests/nginx.conf
index 61966054a08f1cb15c4e000ca47e302b5dd7a248..d8a207f2cf2ce506d9406a646272e92c51c201e6 100644 (file)
@@ -3,7 +3,7 @@ error_log stderr info;          # Yes, must be specified here _and_ cmdline
 events {
 }
 http {
-  access_log /dev/stderr combined;
+  access_log {{ACCESSLOG}} combined;
   upstream arv-git-http {
     server localhost:{{GITPORT}};
   }
diff --git a/sdk/python/tests/run_test_server.py b/sdk/python/tests/run_test_server.py
index d90d2ad1a7b61dd567bc6931c95a06b8d8463226..d325b4eb6ecb086d15effa34bc26db3e95c9ad15 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import print_function
 import argparse
 import atexit
+import errno
 import httplib2
 import os
 import pipes
@@ -54,9 +55,7 @@ def find_server_pid(PID_PATH, wait=10):
             with open(PID_PATH, 'r') as f:
                 server_pid = int(f.read())
             good_pid = (os.kill(server_pid, 0) is None)
-        except IOError:
-            good_pid = False
-        except OSError:
+        except EnvironmentError:
             good_pid = False
         now = time.time()
 
@@ -91,9 +90,7 @@ def kill_server_pid(pidfile, wait=10, passenger_root=False):
             os.getpgid(server_pid)
             time.sleep(0.1)
             now = time.time()
-    except IOError:
-        pass
-    except OSError:
+    except EnvironmentError:
         pass
 
 def find_available_port():
@@ -448,6 +445,7 @@ def run_nginx():
     nginxconf['GITSSLPORT'] = find_available_port()
     nginxconf['SSLCERT'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.pem')
     nginxconf['SSLKEY'] = os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'self-signed.key')
+    nginxconf['ACCESSLOG'] = os.path.join(TEST_TMPDIR, 'nginx_access_log.fifo')
 
     conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
     conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
@@ -459,12 +457,23 @@ def run_nginx():
 
     env = os.environ.copy()
     env['PATH'] = env['PATH']+':/sbin:/usr/sbin:/usr/local/sbin'
+
+    try:
+        os.remove(nginxconf['ACCESSLOG'])
+    except OSError as error:
+        if error.errno != errno.ENOENT:
+            raise
+
+    os.mkfifo(nginxconf['ACCESSLOG'], 0700)
     nginx = subprocess.Popen(
         ['nginx',
          '-g', 'error_log stderr info;',
          '-g', 'pid '+_pidfile('nginx')+';',
          '-c', conffile],
         env=env, stdin=open('/dev/null'), stdout=sys.stderr)
+    cat_access = subprocess.Popen(
+        ['cat', nginxconf['ACCESSLOG']],
+        stdout=sys.stderr)
     _setport('keepproxy-ssl', nginxconf['KEEPPROXYSSLPORT'])
     _setport('arv-git-httpd-ssl', nginxconf['GITSSLPORT'])
 
diff --git a/sdk/python/tests/test_api.py b/sdk/python/tests/test_api.py
index 9d438e2e038ecca70225009d261e5229d61a81c3..6d1e9798cfbecf72cc23e30e33bc590274bcec6c 100644 (file)
@@ -6,60 +6,32 @@ import httplib2
 import json
 import mimetypes
 import os
-import run_test_server
+import socket
 import string
 import unittest
+
+import mock
+import run_test_server
+
 from apiclient import errors as apiclient_errors
 from apiclient import http as apiclient_http
 from arvados.api import OrderedJsonModel
-
 from arvados_testutil import fake_httplib2_response
 
 if not mimetypes.inited:
     mimetypes.init()
 
-class ArvadosApiClientTest(unittest.TestCase):
+class ArvadosApiTest(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
     ERROR_HEADERS = {'Content-Type': mimetypes.types_map['.json']}
 
-    @classmethod
-    def api_error_response(cls, code, *errors):
-        return (fake_httplib2_response(code, **cls.ERROR_HEADERS),
+    def api_error_response(self, code, *errors):
+        return (fake_httplib2_response(code, **self.ERROR_HEADERS),
                 json.dumps({'errors': errors,
                             'error_token': '1234567890+12345678'}))
 
-    @classmethod
-    def setUpClass(cls):
-        # The apiclient library has support for mocking requests for
-        # testing, but it doesn't extend to the discovery document
-        # itself. For now, bring up an API server that will serve
-        # a discovery document.
-        # FIXME: Figure out a better way to stub this out.
-        run_test_server.run()
-        mock_responses = {
-            'arvados.humans.delete': (
-                fake_httplib2_response(500, **cls.ERROR_HEADERS),
-                ""),
-            'arvados.humans.get': cls.api_error_response(
-                422, "Bad UUID format", "Bad output format"),
-            'arvados.humans.list': (None, json.dumps(
-                    {'items_available': 0, 'items': []})),
-            }
-        req_builder = apiclient_http.RequestMockBuilder(mock_responses)
-        cls.api = arvados.api('v1',
-                              host=os.environ['ARVADOS_API_HOST'],
-                              token='discovery-doc-only-no-token-needed',
-                              insecure=True,
-                              requestBuilder=req_builder)
-
-    def tearDown(cls):
-        run_test_server.reset()
-
     def test_new_api_objects_with_cache(self):
-        clients = [arvados.api('v1', cache=True,
-                               host=os.environ['ARVADOS_API_HOST'],
-                               token='discovery-doc-only-no-token-needed',
-                               insecure=True)
-                   for index in [0, 1]]
+        clients = [arvados.api('v1', cache=True) for index in [0, 1]]
         self.assertIsNot(*clients)
 
     def test_empty_list(self):
@@ -92,15 +64,28 @@ class ArvadosApiClientTest(unittest.TestCase):
                     new_item['created_at']))
 
     def test_exceptions_include_errors(self):
+        mock_responses = {
+            'arvados.humans.get': self.api_error_response(
+                422, "Bad UUID format", "Bad output format"),
+            }
+        req_builder = apiclient_http.RequestMockBuilder(mock_responses)
+        api = arvados.api('v1', requestBuilder=req_builder)
         with self.assertRaises(apiclient_errors.HttpError) as err_ctx:
-            self.api.humans().get(uuid='xyz-xyz-abcdef').execute()
+            api.humans().get(uuid='xyz-xyz-abcdef').execute()
         err_s = str(err_ctx.exception)
         for msg in ["Bad UUID format", "Bad output format"]:
             self.assertIn(msg, err_s)
 
     def test_exceptions_without_errors_have_basic_info(self):
+        mock_responses = {
+            'arvados.humans.delete': (
+                fake_httplib2_response(500, **self.ERROR_HEADERS),
+                "")
+            }
+        req_builder = apiclient_http.RequestMockBuilder(mock_responses)
+        api = arvados.api('v1', requestBuilder=req_builder)
         with self.assertRaises(apiclient_errors.HttpError) as err_ctx:
-            self.api.humans().delete(uuid='xyz-xyz-abcdef').execute()
+            api.humans().delete(uuid='xyz-xyz-abcdef').execute()
         self.assertIn("500", str(err_ctx.exception))
 
     def test_request_too_large(self):
@@ -117,14 +102,25 @@ class ArvadosApiClientTest(unittest.TestCase):
             }
         req_builder = apiclient_http.RequestMockBuilder(mock_responses)
         api = arvados.api('v1',
-                          host=os.environ['ARVADOS_API_HOST'],
-                          token='discovery-doc-only-no-token-needed',
-                          insecure=True,
-                          requestBuilder=req_builder,
-                          model=OrderedJsonModel())
+                          requestBuilder=req_builder, model=OrderedJsonModel())
         result = api.humans().get(uuid='test').execute()
         self.assertEqual(string.hexdigits, ''.join(result.keys()))
 
+    def test_socket_errors_retried(self):
+        api = arvados.api('v1')
+        self.assertTrue(hasattr(api._http, 'orig_http_request'),
+                        "test doesn't know how to intercept HTTP requests")
+        api._http.orig_http_request = mock.MagicMock()
+        mock_response = {'user': 'person'}
+        api._http.orig_http_request.side_effect = [
+            socket.error("mock error"),
+            (fake_httplib2_response(200), json.dumps(mock_response))
+            ]
+        actual_response = api.users().current().execute()
+        self.assertEqual(mock_response, actual_response)
+        self.assertGreater(api._http.orig_http_request.call_count, 1,
+                           "client got the right response without retrying")
+
 
 if __name__ == '__main__':
     unittest.main()
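test_socket_errors_retried relies on mock's side_effect-as-iterable behavior: each call to the mock consumes the next item, and an item that is an exception instance (or class) is raised instead of returned. That is what lets the test fail the first request with socket.error and succeed on the retry. The idiom on its own (names and values here are illustrative):

    import socket
    import mock

    request = mock.MagicMock()
    request.side_effect = [
        socket.error("mock error"),         # first call raises this
        ('fake response', '{"ok": true}'),  # second call returns this tuple
    ]

    try:
        request('https://example/api')      # raises socket.error
    except socket.error:
        pass
    response, body = request('https://example/api')  # returns the tuple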
index c44379bac79465417e9a7d128d1aa47f13d6a6fa..90468924a668b09e9116084adf20581878a382b2 100644 (file)
@@ -332,39 +332,158 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 mock.responses[0].getopt(pycurl.TIMEOUT_MS),
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]*1000))
 
-    def test_probe_order_reference_set(self):
+    def check_no_services_error(self, verb, exc_class):
+        api_client = mock.MagicMock(name='api_client')
+        api_client.keep_services().accessible().execute.side_effect = (
+            arvados.errors.ApiError)
+        keep_client = arvados.KeepClient(api_client=api_client)
+        with self.assertRaises(exc_class) as err_check:
+            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
+        self.assertEqual(0, len(err_check.exception.request_errors()))
+
+    def test_get_error_with_no_services(self):
+        self.check_no_services_error('get', arvados.errors.KeepReadError)
+
+    def test_put_error_with_no_services(self):
+        self.check_no_services_error('put', arvados.errors.KeepWriteError)
+
+    def check_errors_from_last_retry(self, verb, exc_class):
+        api_client = self.mock_keep_services(count=2)
+        req_mock = tutil.mock_keep_responses(
+            "retry error reporting test", 500, 500, 403, 403)
+        with req_mock, tutil.skip_sleep, \
+                self.assertRaises(exc_class) as err_check:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
+                                       num_retries=3)
+        self.assertEqual([403, 403], [
+                getattr(error, 'status_code', None)
+                for error in err_check.exception.request_errors().itervalues()])
+
+    def test_get_error_reflects_last_retry(self):
+        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
+
+    def test_put_error_reflects_last_retry(self):
+        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
+
+    def test_put_error_does_not_include_successful_puts(self):
+        data = 'partial failure test'
+        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        api_client = self.mock_keep_services(count=3)
+        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
+                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client.put(data)
+        self.assertEqual(2, len(exc_check.exception.request_errors()))
+
+    def test_proxy_put_with_no_writable_services(self):
+        data = 'test with no writable services'
+        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
+        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
+        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
+                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
+            keep_client = arvados.KeepClient(api_client=api_client)
+            keep_client.put(data)
+        self.assertIn("no Keep services available", str(exc_check.exception))
+        self.assertEqual(0, len(exc_check.exception.request_errors()))
+
+
+@tutil.skip_sleep
+class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
+
+    def setUp(self):
         # expected_order[i] is the probe order for
         # hash=md5(sprintf("%064x",i)) where there are 16 services
         # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
         # the first probe for the block consisting of 64 "0"
         # characters is the service whose uuid is
         # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
-        expected_order = [
+        self.services = 16
+        self.expected_order = [
             list('3eab2d5fc9681074'),
             list('097dba52e648f1c3'),
             list('c5b4e023f8a7d691'),
             list('9d81c02e76a3bf54'),
             ]
-        hashes = [
-            hashlib.md5("{:064x}".format(x)).hexdigest()
-            for x in range(len(expected_order))]
-        api_client = self.mock_keep_services(count=16)
-        keep_client = arvados.KeepClient(api_client=api_client)
-        for i, hash in enumerate(hashes):
-            roots = keep_client.weighted_service_roots(arvados.KeepLocator(hash))
+        self.blocks = [
+            "{:064x}".format(x)
+            for x in range(len(self.expected_order))]
+        self.hashes = [
+            hashlib.md5(self.blocks[x]).hexdigest()
+            for x in range(len(self.expected_order))]
+        self.api_client = self.mock_keep_services(count=self.services)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+
+    def test_weighted_service_roots_against_reference_set(self):
+        # Confirm weighted_service_roots() returns the correct order
+        for i, hash in enumerate(self.hashes):
+            roots = self.keep_client.weighted_service_roots(arvados.KeepLocator(hash))
             got_order = [
                 re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                 for root in roots]
-            self.assertEqual(expected_order[i], got_order)
+            self.assertEqual(self.expected_order[i], got_order)
+
+    def test_get_probe_order_against_reference_set(self):
+        self._test_probe_order_against_reference_set(
+            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))
+
+    def test_put_probe_order_against_reference_set(self):
+        # copies=1 prevents the test from being sensitive to races
+        # between writer threads.
+        self._test_probe_order_against_reference_set(
+            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))
+
+    def _test_probe_order_against_reference_set(self, op):
+        for i in range(len(self.blocks)):
+            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as mock, \
+                 self.assertRaises(arvados.errors.KeepRequestError):
+                op(i)
+            got_order = [
+                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
+                for resp in mock.responses]
+            self.assertEqual(self.expected_order[i]*2, got_order)
+
+    def test_put_probe_order_multiple_copies(self):
+        for copies in range(2, 4):
+            for i in range(len(self.blocks)):
+                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as mock, \
+                     self.assertRaises(arvados.errors.KeepWriteError):
+                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
+                got_order = [
+                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
+                    for resp in mock.responses]
+                # With T threads racing to make requests, the position
+                # of a given server in the sequence of HTTP requests
+                # (got_order) cannot be more than T-1 positions
+                # earlier than that server's position in the reference
+                # probe sequence (expected_order).
+                #
+                # Loop invariant: we have accounted for +pos+ expected
+                # probes, either by seeing them in +got_order+ or by
+                # putting them in +pending+ in the hope of seeing them
+                # later. As long as +len(pending)<T+, we haven't
+                # started a request too early.
+                pending = []
+                for pos, expected in enumerate(self.expected_order[i]*3):
+                    got = got_order[pos-len(pending)]
+                    while got in pending:
+                        del pending[pending.index(got)]
+                        got = got_order[pos-len(pending)]
+                    if got != expected:
+                        pending.append(expected)
+                        self.assertLess(
+                            len(pending), copies,
+                            "pending={}, with copies={}, got {}, expected {}".format(
+                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))
 
     def test_probe_waste_adding_one_server(self):
         hashes = [
             hashlib.md5("{:064x}".format(x)).hexdigest() for x in range(100)]
         initial_services = 12
-        api_client = self.mock_keep_services(count=initial_services)
-        keep_client = arvados.KeepClient(api_client=api_client)
+        self.api_client = self.mock_keep_services(count=initial_services)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client)
         probes_before = [
-            keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
+            self.keep_client.weighted_service_roots(arvados.KeepLocator(hash)) for hash in hashes]
         for added_services in range(1, 12):
             api_client = self.mock_keep_services(count=initial_services+added_services)
             keep_client = arvados.KeepClient(api_client=api_client)
@@ -402,7 +521,7 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
             data = hashlib.md5(data).hexdigest() + '+1234'
         # Arbitrary port number:
         aport = random.randint(1024,65535)
-        api_client = self.mock_keep_services(service_port=aport, count=16)
+        api_client = self.mock_keep_services(service_port=aport, count=self.services)
         keep_client = arvados.KeepClient(api_client=api_client)
         with mock.patch('pycurl.Curl') as curl_mock, \
              self.assertRaises(exc_class) as err_check:
@@ -419,60 +538,6 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
     def test_put_error_shows_probe_order(self):
         self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
 
-    def check_no_services_error(self, verb, exc_class):
-        api_client = mock.MagicMock(name='api_client')
-        api_client.keep_services().accessible().execute.side_effect = (
-            arvados.errors.ApiError)
-        keep_client = arvados.KeepClient(api_client=api_client)
-        with self.assertRaises(exc_class) as err_check:
-            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
-        self.assertEqual(0, len(err_check.exception.request_errors()))
-
-    def test_get_error_with_no_services(self):
-        self.check_no_services_error('get', arvados.errors.KeepReadError)
-
-    def test_put_error_with_no_services(self):
-        self.check_no_services_error('put', arvados.errors.KeepWriteError)
-
-    def check_errors_from_last_retry(self, verb, exc_class):
-        api_client = self.mock_keep_services(count=2)
-        req_mock = tutil.mock_keep_responses(
-            "retry error reporting test", 500, 500, 403, 403)
-        with req_mock, tutil.skip_sleep, \
-                self.assertRaises(exc_class) as err_check:
-            keep_client = arvados.KeepClient(api_client=api_client)
-            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
-                                       num_retries=3)
-        self.assertEqual([403, 403], [
-                getattr(error, 'status_code', None)
-                for error in err_check.exception.request_errors().itervalues()])
-
-    def test_get_error_reflects_last_retry(self):
-        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)
-
-    def test_put_error_reflects_last_retry(self):
-        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)
-
-    def test_put_error_does_not_include_successful_puts(self):
-        data = 'partial failure test'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
-        api_client = self.mock_keep_services(count=3)
-        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
-                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
-            keep_client = arvados.KeepClient(api_client=api_client)
-            keep_client.put(data)
-        self.assertEqual(2, len(exc_check.exception.request_errors()))
-
-    def test_proxy_put_with_no_writable_services(self):
-        data = 'test with no writable services'
-        data_loc = '{}+{}'.format(hashlib.md5(data).hexdigest(), len(data))
-        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
-        with tutil.mock_keep_responses(data_loc, 200, 500, 500) as req_mock, \
-                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
-          keep_client = arvados.KeepClient(api_client=api_client)
-          keep_client.put(data)
-        self.assertEqual(True, ("no Keep services available" in str(exc_check.exception)))
-        self.assertEqual(0, len(exc_check.exception.request_errors()))
 
 class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
     DATA = 'x' * 2**10
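The bookkeeping in test_put_probe_order_multiple_copies is easier to see outside the test: pending holds reference-order probes that have not shown up yet, and asserting len(pending) < copies enforces that no server is contacted more than copies-1 positions ahead of its place in the reference probe sequence. A standalone sketch of the same check (the sample sequences are made up and not part of the test suite):

    def max_probes_started_early(got_order, expected_order):
        # Walk the reference order, deferring into 'pending' any probe we
        # have not observed yet; the largest 'pending' ever grows is how far
        # ahead of the reference order the racing writer threads ran.
        pending = []
        worst = 0
        for pos, expected in enumerate(expected_order):
            got = got_order[pos - len(pending)]
            while got in pending:
                pending.remove(got)
                got = got_order[pos - len(pending)]
            if got != expected:
                pending.append(expected)
                worst = max(worst, len(pending))
        return worst

    # With T writer threads (copies=T), the test requires this to stay below T.
    assert max_probes_started_early(['b', 'a', 'c'], ['a', 'b', 'c']) == 1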
diff --git a/sdk/python/tests/test_retry_job_helpers.py b/sdk/python/tests/test_retry_job_helpers.py
new file mode 100644 (file)
index 0000000..6e562a0
--- /dev/null
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+
+import mock
+import os
+import unittest
+import hashlib
+import run_test_server
+import json
+import arvados
+import arvados_testutil as tutil
+from apiclient import http as apiclient_http
+
+
+@tutil.skip_sleep
+class ApiClientRetryTestMixin(object):
+
+    TEST_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+    TEST_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
+
+    @classmethod
+    def setUpClass(cls):
+        run_test_server.run()
+
+    def setUp(self):
+        # Patch arvados.api() to return this pre-built client, so the tests
+        # can intercept its HTTP requests via mock_api_responses.
+        self.api_client = arvados.api('v1', cache=False)
+        self.api_patch = mock.patch('arvados.api', return_value=self.api_client)
+        self.api_patch.start()
+
+    def tearDown(self):
+        self.api_patch.stop()
+
+    def run_method(self):
+        raise NotImplementedError("test subclasses must define run_method")
+
+    def test_immediate_success(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [200]):
+            self.run_method()
+
+    def test_immediate_failure(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [400]), self.assertRaises(self.DEFAULT_EXCEPTION):
+            self.run_method()
+
+    def test_retry_then_success(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [500, 200]):
+            self.run_method()
+
+    def test_error_after_default_retries_exhausted(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [500, 500, 500, 500, 500, 500, 200]), self.assertRaises(self.DEFAULT_EXCEPTION):
+            self.run_method()
+
+    def test_no_retry_after_immediate_success(self):
+        with tutil.mock_api_responses(self.api_client, '{}', [200, 400]):
+            self.run_method()
+
+
+class CurrentJobTestCase(ApiClientRetryTestMixin, unittest.TestCase):
+
+    DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+    def setUp(self):
+        super(CurrentJobTestCase, self).setUp()
+        os.environ['JOB_UUID'] = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+        os.environ['JOB_WORK'] = '.'
+
+    def tearDown(self):
+        del os.environ['JOB_UUID']
+        del os.environ['JOB_WORK']
+        arvados._current_job = None
+        super(CurrentJobTestCase, self).tearDown()
+
+    def run_method(self):
+        arvados.current_job()
+
+
+class CurrentTaskTestCase(ApiClientRetryTestMixin, unittest.TestCase):
+
+    DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+    def setUp(self):
+        super(CurrentTaskTestCase, self).setUp()
+        os.environ['TASK_UUID'] = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
+        os.environ['TASK_WORK'] = '.'
+
+    def tearDown(self):
+        del os.environ['TASK_UUID']
+        del os.environ['TASK_WORK']
+        arvados._current_task = None
+        super(CurrentTaskTestCase, self).tearDown()
+
+    def run_method(self):
+        arvados.current_task()
+
+
+class TaskSetOutputTestCase(CurrentTaskTestCase, unittest.TestCase):
+
+    DEFAULT_EXCEPTION = arvados.errors.ApiError
+
+    def tearDown(self):
+        super(TaskSetOutputTestCase, self).tearDown()
+        run_test_server.reset()
+
+    def run_method(self, locator=ApiClientRetryTestMixin.TEST_LOCATOR):
+        arvados.task_set_output({'uuid': self.TEST_UUID}, s=locator)
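The mixin keeps new retry coverage cheap: a subclass only needs a DEFAULT_EXCEPTION and a run_method. A hedged sketch of what a further case might look like (arvados.some_helper is hypothetical, shown only for shape):

    class SomeHelperRetryTestCase(ApiClientRetryTestMixin, unittest.TestCase):

        DEFAULT_EXCEPTION = arvados.errors.ApiError

        def run_method(self):
            # Hypothetical helper: any arvados.* call that issues requests
            # through the patched api_client exercises the same retry paths.
            arvados.some_helper(self.TEST_UUID)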
index 1193e915363e80e7ccc70a0ce16c731dc3ebcaeb..3b4330935c568a035178b594857f1e4c8916b37c 100644 (file)
@@ -74,7 +74,7 @@ gem 'faye-websocket'
 gem 'themes_for_rails'
 
 gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>=  0.1.20150605170031'
+gem 'arvados-cli', '>= 0.1.20151023185755'
 
 # pg_power lets us use partial indexes in schema.rb in Rails 3
 gem 'pg_power'
index 149770c9ac38c9e40b311fae2b429eebd09009d1..3681952508509b07ea13168a712ecfa685165857 100644 (file)
@@ -21,7 +21,7 @@ production:
   sso_app_id: ~
   sso_provider_url: ~
   workbench_address: ~
-  websockets_address: ~
+  websocket_address: ~
   #git_repositories_dir: ~
   #git_internal_dir: ~
 
@@ -34,7 +34,7 @@ development:
   sso_app_secret: ~
   sso_provider_url: ~
   workbench_address: ~
-  websockets_address: ~
+  websocket_address: ~
   #git_repositories_dir: ~
   #git_internal_dir: ~
 
index b3e49c59a0954ee886b4969880271a7ec5c2291c..35c2f4884f4f99e2894c5125776edfb0db32895c 100644 (file)
@@ -25,7 +25,7 @@ func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
        }
        h := newGitHandler()
        h.(*gitHandler).Path = "/bin/sh"
-       h.(*gitHandler).Args = []string{"-c", "echo HTTP/1.1 200 OK; echo Content-Type: text/plain; echo; env"}
+       h.(*gitHandler).Args = []string{"-c", "printf 'Content-Type: text/plain\r\n\r\n'; env"}
        os.Setenv("GITOLITE_HTTP_HOME", "/test/ghh")
        os.Setenv("GL_BYPASS_ACCESS_CHECKS", "yesplease")
 
@@ -40,14 +40,14 @@ func (s *GitHandlerSuite) TestEnvVars(c *check.C) {
        c.Check(body, check.Matches, `(?ms).*^SERVER_ADDR=`+regexp.QuoteMeta(theConfig.Addr)+`$.*`)
 }
 
-func (s *GitHandlerSuite) TestCGIError(c *check.C) {
+func (s *GitHandlerSuite) TestCGIErrorOnSplitHostPortError(c *check.C) {
        u, err := url.Parse("git.zzzzz.arvadosapi.com/test")
        c.Check(err, check.Equals, nil)
        resp := httptest.NewRecorder()
        req := &http.Request{
                Method:     "GET",
                URL:        u,
-               RemoteAddr: "bogus",
+               RemoteAddr: "test.bad.address.missing.port",
        }
        h := newGitHandler()
        h.ServeHTTP(resp, req)
index 3d9bb3da90a2c828207318908f1ee083d0d0d5f3..c2cb762d52b625b625634f24d385ddbf9ad4e7d8 100644 (file)
@@ -31,7 +31,7 @@ func SetupDataManagerTest(t *testing.T) {
        // start api and keep servers
        arvadostest.ResetEnv()
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 
        arv = makeArvadosClient()
 
@@ -54,7 +54,7 @@ func SetupDataManagerTest(t *testing.T) {
 }
 
 func TearDownDataManagerTest(t *testing.T) {
-       arvadostest.StopKeep()
+       arvadostest.StopKeep(2)
        arvadostest.StopAPI()
 }
 
index f7677ec5fed3c28e8bab6c62fd452b0ce687ae9a..1a1189658d7ba1473aa33036cce60276932cb9b8 100644 (file)
@@ -362,7 +362,7 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                log.Println("Warning:", GetRemoteAddress(req), req.Method, proxiedURI, "Content-Length not provided")
        }
 
-       switch err {
+       switch respErr := err.(type) {
        case nil:
                status = http.StatusOK
                resp.Header().Set("Content-Length", fmt.Sprint(expectLength))
@@ -375,10 +375,16 @@ func (this GetBlockHandler) ServeHTTP(resp http.ResponseWriter, req *http.Reques
                                err = ContentLengthMismatch
                        }
                }
-       case keepclient.BlockNotFound:
-               status = http.StatusNotFound
+       case keepclient.Error:
+               if respErr == keepclient.BlockNotFound {
+                       status = http.StatusNotFound
+               } else if respErr.Temporary() {
+                       status = http.StatusBadGateway
+               } else {
+                       status = 422
+               }
        default:
-               status = http.StatusBadGateway
+               status = http.StatusInternalServerError
        }
 }
 
index 917f0124adbec7a10305e700ea1bd76ea2d39f75..2c75ec1616e3b404a9547b6e2f969ce9fc1fe9f2 100644 (file)
@@ -30,6 +30,12 @@ var _ = Suite(&ServerRequiredSuite{})
 // Tests that require the Keep server running
 type ServerRequiredSuite struct{}
 
+// Gocheck boilerplate
+var _ = Suite(&NoKeepServerSuite{})
+
+// Test with no keepserver to simulate errors
+type NoKeepServerSuite struct{}
+
 // Wait (up to 1 second) for keepproxy to listen on a port. This
 // avoids a race condition where we hit a "connection refused" error
 // because we start testing the proxy too soon.
@@ -53,7 +59,7 @@ func closeListener() {
 
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 }
 
 func (s *ServerRequiredSuite) SetUpTest(c *C) {
@@ -61,7 +67,19 @@ func (s *ServerRequiredSuite) SetUpTest(c *C) {
 }
 
 func (s *ServerRequiredSuite) TearDownSuite(c *C) {
-       arvadostest.StopKeep()
+       arvadostest.StopKeep(2)
+       arvadostest.StopAPI()
+}
+
+func (s *NoKeepServerSuite) SetUpSuite(c *C) {
+       arvadostest.StartAPI()
+}
+
+func (s *NoKeepServerSuite) SetUpTest(c *C) {
+       arvadostest.ResetEnv()
+}
+
+func (s *NoKeepServerSuite) TearDownSuite(c *C) {
        arvadostest.StopAPI()
 }
 
@@ -251,7 +269,9 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
 
        {
                _, _, err := kc.Ask(hash)
-               c.Check(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                log.Print("Ask 1")
        }
 
@@ -265,14 +285,18 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
 
        {
                blocklen, _, err := kc.Ask(hash)
-               c.Assert(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
                log.Print("Ask 2")
        }
 
        {
                _, blocklen, _, err := kc.Get(hash)
-               c.Assert(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
                log.Print("Get")
        }
@@ -291,7 +315,9 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
 
        {
                _, _, err := kc.Ask(hash)
-               c.Check(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                log.Print("Ask 1")
        }
 
@@ -305,14 +331,18 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
 
        {
                blocklen, _, err := kc.Ask(hash)
-               c.Assert(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
                log.Print("Ask 2")
        }
 
        {
                _, blocklen, _, err := kc.Get(hash)
-               c.Assert(err, Equals, keepclient.BlockNotFound)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound, NotNil)
+               c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
                log.Print("Get")
        }
@@ -473,3 +503,84 @@ func (s *ServerRequiredSuite) TestGetIndex(c *C) {
        _, err = kc.GetIndex("proxy", "xyz")
        c.Assert((err != nil), Equals, true)
 }
+
+func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
+       kc := runProxy(c, []string{"keepproxy"}, 28852, false)
+       waitForListener()
+       defer closeListener()
+
+       // Put a test block
+       hash, rep, err := kc.PutB([]byte("foo"))
+       c.Check(err, Equals, nil)
+       c.Check(rep, Equals, 2)
+
+       for _, token := range []string{
+               "nosuchtoken",
+               "2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx", // expired
+       } {
+               // Change token to given bad token
+               kc.Arvados.ApiToken = token
+
+               // Ask should result in error
+               _, _, err = kc.Ask(hash)
+               c.Check(err, NotNil)
+               errNotFound, _ := err.(keepclient.ErrNotFound)
+               c.Check(errNotFound.Temporary(), Equals, false)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
+
+               // Get should result in error
+               _, _, _, err = kc.Get(hash)
+               c.Check(err, NotNil)
+               errNotFound, _ = err.(keepclient.ErrNotFound)
+               c.Check(errNotFound.Temporary(), Equals, false)
+               c.Assert(strings.Contains(err.Error(), "HTTP 403 \"Missing or invalid Authorization header\""), Equals, true)
+       }
+}
+
+func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, Equals, nil)
+
+       // keepclient with no such keep server
+       kc := keepclient.New(&arv)
+       locals := map[string]string{
+               "proxy": "http://localhost:12345",
+       }
+       kc.SetServiceRoots(locals, nil, nil)
+
+       // Ask should result in temporary connection refused error
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       _, _, err = kc.Ask(hash)
+       c.Check(err, NotNil)
+       errNotFound, _ := err.(*keepclient.ErrNotFound)
+       c.Check(errNotFound.Temporary(), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+
+       // Get should result in temporary connection refused error
+       _, _, _, err = kc.Get(hash)
+       c.Check(err, NotNil)
+       errNotFound, _ = err.(*keepclient.ErrNotFound)
+       c.Check(errNotFound.Temporary(), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "connection refused"), Equals, true)
+}
+
+func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
+       kc := runProxy(c, []string{"keepproxy"}, 29999, false)
+       waitForListener()
+       defer closeListener()
+
+       // Ask should result in a temporary "no Keep services" error (HTTP 502)
+       hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
+       _, _, err := kc.Ask(hash)
+       c.Check(err, NotNil)
+       errNotFound, _ := err.(*keepclient.ErrNotFound)
+       c.Check(errNotFound.Temporary(), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+
+       // Get should result in the same temporary HTTP 502 error
+       _, _, _, err = kc.Get(hash)
+       c.Check(err, NotNil)
+       errNotFound, _ = err.(*keepclient.ErrNotFound)
+       c.Check(errNotFound.Temporary(), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "HTTP 502"), Equals, true)
+}
index e0bad0045af34d8067591fa723e9229be093790c..3a3069ab7745c1efc0a21fd8c706565f65c525e0 100644 (file)
@@ -27,7 +27,7 @@ func SetupPullWorkerIntegrationTest(t *testing.T, testData PullWorkIntegrationTe
 
        // start api and keep servers
        arvadostest.StartAPI()
-       arvadostest.StartKeep()
+       arvadostest.StartKeep(2, false)
 
        // make arvadosclient
        arv, err := arvadosclient.MakeArvadosClient()
index dfb26bc303d2b310f1186a1fc72baf26438608b1..ec5014e9f9cf1e8848353cf3c755e22875227850 100644 (file)
@@ -34,7 +34,15 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
     def _get_slurm_state(self):
         return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', self._nodename])
 
-    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
+    # The following methods retry on OSError.  This is intended to mitigate
+    # bug #6321, where fork() in node manager raises "OSError: [Errno 12]
+    # Cannot allocate memory", killing the shutdown actor and tending to leave
+    # node manager in a wedged state where it won't allocate new nodes or shut
+    # down gracefully.  The underlying causes of the excessive memory usage
+    # behind the "Cannot allocate memory" error are still being investigated.
+
+    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
     def cancel_shutdown(self):
         if self._nodename:
             if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
@@ -46,15 +54,15 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
                 pass
         return super(ComputeNodeShutdownActor, self).cancel_shutdown()
 
+    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
     @ShutdownActorBase._stop_if_window_closed
-    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
     def issue_slurm_drain(self):
         self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
         self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
         self._later.await_slurm_drain()
 
+    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
     @ShutdownActorBase._stop_if_window_closed
-    @ShutdownActorBase._retry((subprocess.CalledProcessError,))
     def await_slurm_drain(self):
         output = self._get_slurm_state()
         if output in self.SLURM_END_STATES:
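Two things change in this file: the retry decorator now also catches OSError (for the reason spelled out in the comment above), and @_retry moves outermost so the retry wrapping sits around the _stop_if_window_closed check instead of inside it. The real _retry helper is defined in the dispatch base class and is not shown in this diff; as a rough sketch of the general shape such a decorator takes (the inline sleep is purely for illustration and is not how node manager does it):

    import functools
    import time

    def retry_on(exceptions, tries=3, delay=1):
        # Illustrative stand-in for ShutdownActorBase._retry; names and the
        # retry policy here are assumptions, not the actual implementation.
        def decorator(orig_func):
            @functools.wraps(orig_func)
            def wrapper(*args, **kwargs):
                for attempt in range(tries):
                    try:
                        return orig_func(*args, **kwargs)
                    except exceptions:
                        if attempt == tries - 1:
                            raise
                        time.sleep(delay)
            return wrapper
        return decorator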
index 2ddf7676c8f8ddc4ca6f84630f38d35ca86abe6a..8648783bac5889f11a328af3b277bd1f21da5665 100644 (file)
@@ -73,6 +73,15 @@ class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
         self.check_success_flag(False, 2)
         self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99')
 
+    def test_cancel_shutdown_retry(self, proc_mock):
+        proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n'])
+        self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+        self.make_actor()
+        self.check_success_flag(False, 2)
+
+    def test_issue_slurm_drain_retry(self, proc_mock):
+        proc_mock.side_effect = iter([OSError, '', OSError, 'drng\n'])
+        self.check_success_after_reset(proc_mock)
 
     def test_arvados_node_cleaned_after_shutdown(self, proc_mock):
         proc_mock.return_value = 'drain\n'
index 705025b07970f720388f4139a6143234e8bd2e12..820772eb5b6a040d15683549b1e617c5d9e718e0 100644 (file)
@@ -18,77 +18,79 @@ import (
 )
 
 func main() {
-       var srcConfigFile, dstConfigFile, srcKeepServicesJSON, dstKeepServicesJSON, prefix string
-       var replications int
-       var srcBlobSigningKey string
+       err := doMain()
+       if err != nil {
+               log.Fatalf("%v", err)
+       }
+}
 
-       flag.StringVar(
-               &srcConfigFile,
+func doMain() error {
+       flags := flag.NewFlagSet("keep-rsync", flag.ExitOnError)
+
+       srcConfigFile := flags.String(
                "src",
                "",
-               "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+               "Source configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf. This file is expected to specify the values for ARVADOS_API_TOKEN, ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, and ARVADOS_BLOB_SIGNING_KEY for the source.")
 
-       flag.StringVar(
-               &dstConfigFile,
+       dstConfigFile := flags.String(
                "dst",
                "",
-               "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf")
+               "Destination configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf. This file is expected to specify the values for ARVADOS_API_TOKEN, ARVADOS_API_HOST, and ARVADOS_API_HOST_INSECURE for the destination.")
 
-       flag.StringVar(
-               &srcKeepServicesJSON,
+       srcKeepServicesJSON := flags.String(
                "src-keep-services-json",
                "",
                "An optional list of available source keepservices. "+
                        "If not provided, this list is obtained from api server configured in src-config-file.")
 
-       flag.StringVar(
-               &dstKeepServicesJSON,
+       dstKeepServicesJSON := flags.String(
                "dst-keep-services-json",
                "",
                "An optional list of available destination keepservices. "+
                        "If not provided, this list is obtained from api server configured in dst-config-file.")
 
-       flag.IntVar(
-               &replications,
+       replications := flags.Int(
                "replications",
                0,
                "Number of replications to write to the destination. If replications not specified, "+
                        "default replication level configured on destination server will be used.")
 
-       flag.StringVar(
-               &prefix,
+       prefix := flags.String(
                "prefix",
                "",
                "Index prefix")
 
-       flag.Parse()
+       // Parse args; omit the first arg which is the command name
+       flags.Parse(os.Args[1:])
 
-       srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
+       srcConfig, srcBlobSigningKey, err := loadConfig(*srcConfigFile)
        if err != nil {
-               log.Fatalf("Error loading src configuration from file: %s", err.Error())
+               return fmt.Errorf("Error loading src configuration from file: %s", err.Error())
        }
 
-       dstConfig, _, err := loadConfig(dstConfigFile)
+       dstConfig, _, err := loadConfig(*dstConfigFile)
        if err != nil {
-               log.Fatalf("Error loading dst configuration from file: %s", err.Error())
+               return fmt.Errorf("Error loading dst configuration from file: %s", err.Error())
        }
 
        // setup src and dst keepclients
-       kcSrc, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+       kcSrc, err := setupKeepClient(srcConfig, *srcKeepServicesJSON, false, 0)
        if err != nil {
-               log.Fatalf("Error configuring src keepclient: %s", err.Error())
+               return fmt.Errorf("Error configuring src keepclient: %s", err.Error())
        }
 
-       kcDst, err := setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+       kcDst, err := setupKeepClient(dstConfig, *dstKeepServicesJSON, true, *replications)
        if err != nil {
-               log.Fatalf("Error configuring dst keepclient: %s", err.Error())
+               return fmt.Errorf("Error configuring dst keepclient: %s", err.Error())
        }
 
        // Copy blocks not found in dst from src
-       err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, prefix)
+       err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, *prefix)
        if err != nil {
-               log.Fatalf("Error while syncing data: %s", err.Error())
+               return fmt.Errorf("Error while syncing data: %s", err.Error())
        }
+
+       return nil
 }
 
 type apiConfig struct {
index 299df5aeb0a6317a71677b76ac76a6ef813e9036..e72889038850631bb8205ee1602326d307d729fc 100644 (file)
@@ -4,6 +4,7 @@ import (
        "crypto/md5"
        "fmt"
        "io/ioutil"
+       "log"
        "os"
        "strings"
        "testing"
@@ -23,10 +24,12 @@ func Test(t *testing.T) {
 // Gocheck boilerplate
 var _ = Suite(&ServerRequiredSuite{})
 var _ = Suite(&ServerNotRequiredSuite{})
+var _ = Suite(&DoMainTestSuite{})
 
 // Tests that require the Keep server running
 type ServerRequiredSuite struct{}
 type ServerNotRequiredSuite struct{}
+type DoMainTestSuite struct{}
 
 func (s *ServerRequiredSuite) SetUpSuite(c *C) {
        // Start API server
@@ -38,6 +41,12 @@ func (s *ServerRequiredSuite) TearDownSuite(c *C) {
        arvadostest.ResetEnv()
 }
 
+var initialArgs []string
+
+func (s *DoMainTestSuite) SetUpSuite(c *C) {
+       initialArgs = os.Args
+}
+
 var kcSrc, kcDst *keepclient.KeepClient
 var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
 
@@ -51,7 +60,16 @@ func (s *ServerRequiredSuite) SetUpTest(c *C) {
 }
 
 func (s *ServerRequiredSuite) TearDownTest(c *C) {
-       arvadostest.StopKeepWithParams(3)
+       arvadostest.StopKeep(3)
+}
+
+func (s *DoMainTestSuite) SetUpTest(c *C) {
+       args := []string{"keep-rsync"}
+       os.Args = args
+}
+
+func (s *DoMainTestSuite) TearDownTest(c *C) {
+       os.Args = initialArgs
 }
 
 var testKeepServicesJSON = "{ \"kind\":\"arvados#keepServiceList\", \"etag\":\"\", \"self_link\":\"\", \"offset\":null, \"limit\":null, \"items\":[ { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012340\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012340\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25107, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false }, { \"href\":\"/keep_services/zzzzz-bi6l4-123456789012341\", \"kind\":\"arvados#keepService\", \"etag\":\"641234567890enhj7hzx432e5\", \"uuid\":\"zzzzz-bi6l4-123456789012341\", \"owner_uuid\":\"zzzzz-tpzed-123456789012345\", \"service_host\":\"keep0.zzzzz.arvadosapi.com\", \"service_port\":25108, \"service_ssl_flag\":false, \"service_type\":\"disk\", \"read_only\":false } ], \"items_available\":2 }"
@@ -77,8 +95,7 @@ func setupRsync(c *C, enforcePermissions bool, replications int) {
        }
 
        // Start Keep servers
-       arvadostest.StartAPI()
-       arvadostest.StartKeepWithParams(3, enforcePermissions)
+       arvadostest.StartKeep(3, enforcePermissions)
 
        // setup keepclients
        var err error
@@ -311,7 +328,8 @@ func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeSrcKeepservers(c *C) {
        setupRsync(c, false, 1)
 
        err := performKeepRsync(kcSrc, kcDst, "", "")
-       c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+       log.Printf("Err = %v", err)
+       c.Check(strings.Contains(err.Error(), "no such host"), Equals, true)
 }
 
 // Setup rsync using dstKeepServicesJSON with fake keepservers.
@@ -322,7 +340,8 @@ func (s *ServerRequiredSuite) TestErrorDuringRsync_FakeDstKeepservers(c *C) {
        setupRsync(c, false, 1)
 
        err := performKeepRsync(kcSrc, kcDst, "", "")
-       c.Check(strings.HasSuffix(err.Error(), "no such host"), Equals, true)
+       log.Printf("Err = %v", err)
+       c.Check(strings.Contains(err.Error(), "no such host"), Equals, true)
 }
 
 // Test rsync with signature error during Get from src.
@@ -336,7 +355,7 @@ func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorGettingBlockFromSrc(c *C
        blobSigningKey = "thisisfakeblobsigningkey"
 
        err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
-       c.Check(strings.HasSuffix(err.Error(), "Block not found"), Equals, true)
+       c.Check(strings.Contains(err.Error(), "HTTP 403 \"Forbidden\""), Equals, true)
 }
 
 // Test rsync with error during Put to src.
@@ -350,7 +369,7 @@ func (s *ServerRequiredSuite) TestErrorDuringRsync_ErrorPuttingBlockInDst(c *C)
        kcDst.Want_replicas = 2
 
        err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
-       c.Check(strings.HasSuffix(err.Error(), "Could not write sufficient replicas"), Equals, true)
+       c.Check(strings.Contains(err.Error(), "Could not write sufficient replicas"), Equals, true)
 }
 
 // Test loadConfig func
@@ -369,17 +388,17 @@ func (s *ServerNotRequiredSuite) TestLoadConfig(c *C) {
        srcConfig, srcBlobSigningKey, err := loadConfig(srcConfigFile)
        c.Check(err, IsNil)
 
-       c.Assert(srcConfig.APIHost, Equals, "testhost")
-       c.Assert(srcConfig.APIToken, Equals, "testtoken")
-       c.Assert(srcConfig.APIHostInsecure, Equals, true)
+       c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+       c.Assert(srcConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+       c.Assert(srcConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
        c.Assert(srcConfig.ExternalClient, Equals, false)
 
        dstConfig, _, err := loadConfig(dstConfigFile)
        c.Check(err, IsNil)
 
-       c.Assert(dstConfig.APIHost, Equals, "testhost")
-       c.Assert(dstConfig.APIToken, Equals, "testtoken")
-       c.Assert(dstConfig.APIHostInsecure, Equals, true)
+       c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+       c.Assert(dstConfig.APIToken, Equals, os.Getenv("ARVADOS_API_TOKEN"))
+       c.Assert(dstConfig.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
        c.Assert(dstConfig.ExternalClient, Equals, false)
 
        c.Assert(srcBlobSigningKey, Equals, "abcdefg")
@@ -394,7 +413,7 @@ func (s *ServerNotRequiredSuite) TestLoadConfig_MissingSrcConfig(c *C) {
 // Test loadConfig func - error reading config
 func (s *ServerNotRequiredSuite) TestLoadConfig_ErrorLoadingSrcConfig(c *C) {
        _, _, err := loadConfig("no-such-config-file")
-       c.Assert(strings.HasSuffix(err.Error(), "no such file or directory"), Equals, true)
+       c.Assert(strings.Contains(err.Error(), "no such file or directory"), Equals, true)
 }
 
 func setupConfigFile(c *C, name string) *os.File {
@@ -402,9 +421,10 @@ func setupConfigFile(c *C, name string) *os.File {
        file, err := ioutil.TempFile(os.TempDir(), name)
        c.Check(err, IsNil)
 
-       fileContent := "ARVADOS_API_HOST=testhost\n"
-       fileContent += "ARVADOS_API_TOKEN=testtoken\n"
-       fileContent += "ARVADOS_API_HOST_INSECURE=true\n"
+       fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n"
+       fileContent += "ARVADOS_API_TOKEN=" + os.Getenv("ARVADOS_API_TOKEN") + "\n"
+       fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
+       fileContent += "ARVADOS_EXTERNAL_CLIENT=false\n"
        fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
 
        _, err = file.Write([]byte(fileContent))
@@ -412,3 +432,48 @@ func setupConfigFile(c *C, name string) *os.File {
 
        return file
 }
+
+func (s *DoMainTestSuite) Test_doMain_NoSrcConfig(c *C) {
+       err := doMain()
+       c.Check(err, NotNil)
+       c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
+}
+
+func (s *DoMainTestSuite) Test_doMain_SrcButNoDstConfig(c *C) {
+       srcConfig := setupConfigFile(c, "src")
+       args := []string{"-replications", "3", "-src", srcConfig.Name()}
+       os.Args = append(os.Args, args...)
+       err := doMain()
+       c.Check(err, NotNil)
+       c.Assert(err.Error(), Equals, "Error loading dst configuration from file: config file not specified")
+}
+
+func (s *DoMainTestSuite) Test_doMain_BadSrcConfig(c *C) {
+       args := []string{"-src", "abcd"}
+       os.Args = append(os.Args, args...)
+       err := doMain()
+       c.Check(err, NotNil)
+       c.Assert(strings.HasPrefix(err.Error(), "Error loading src configuration from file: Error reading config file"), Equals, true)
+}
+
+func (s *DoMainTestSuite) Test_doMain_WithReplicationsButNoSrcConfig(c *C) {
+       args := []string{"-replications", "3"}
+       os.Args = append(os.Args, args...)
+       err := doMain()
+       c.Check(err, NotNil)
+       c.Assert(err.Error(), Equals, "Error loading src configuration from file: config file not specified")
+}
+
+func (s *DoMainTestSuite) Test_doMainWithSrcAndDstConfig(c *C) {
+       srcConfig := setupConfigFile(c, "src")
+       dstConfig := setupConfigFile(c, "dst")
+       args := []string{"-src", srcConfig.Name(), "-dst", dstConfig.Name()}
+       os.Args = append(os.Args, args...)
+
+       // Start keepservers. Since we are not doing any of the tweaking done in
+       // setupRsync, kcSrc and kcDst will be the same and no actual copying to
+       // dst will happen, but that's ok.
+       arvadostest.StartKeep(2, false)
+
+       err := doMain()
+       c.Check(err, IsNil)
+}