Merge branch '8567-moar-docker' refs #8567
authorPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 12 Apr 2017 20:10:13 +0000 (16:10 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 12 Apr 2017 20:10:13 +0000 (16:10 -0400)
98 files changed:
apps/workbench/app/assets/javascripts/edit_collection.js [new file with mode: 0644]
apps/workbench/app/assets/javascripts/selection.js.erb
apps/workbench/app/assets/stylesheets/collections.css.scss
apps/workbench/app/controllers/collections_controller.rb
apps/workbench/app/views/application/_content.html.erb
apps/workbench/app/views/collections/_extra_tab_line_buttons.html.erb [new file with mode: 0644]
apps/workbench/app/views/collections/_show_files.html.erb
apps/workbench/app/views/container_requests/_extra_tab_line_buttons.html.erb
apps/workbench/test/controllers/collections_controller_test.rb
apps/workbench/test/integration/collection_upload_test.rb
apps/workbench/test/integration/collections_test.rb
apps/workbench/test/integration_helper.rb
build/build.list
build/package-test-dockerfiles/ubuntu1204/Dockerfile
build/rails-package-scripts/postinst.sh
build/run-build-packages-sso.sh
build/run-tests.sh
doc/_includes/_navbar_top.liquid
doc/api/methods/container_requests.html.textile.liquid
doc/install/install-nodemanager.html.textile.liquid
doc/user/cwl/cwl-extensions.html.textile.liquid
sdk/cli/arvados-cli.gemspec
sdk/cli/test/test_arv-keep-get.rb
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arv-cwl-schema.yml
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/setup.py
sdk/cwl/test_with_arvbox.sh
sdk/cwl/tests/arvados-tests.sh
sdk/cwl/tests/arvados-tests.yml
sdk/cwl/tests/cat.cwl [new file with mode: 0644]
sdk/cwl/tests/dir-job2.yml [new file with mode: 0644]
sdk/cwl/tests/keep-dir-test-input2.cwl [new file with mode: 0644]
sdk/cwl/tests/keep-dir-test-input3.cwl [new file with mode: 0644]
sdk/cwl/tests/octo.yml [new file with mode: 0644]
sdk/cwl/tests/octothorpe/item #1.txt [new file with mode: 0644]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_pathmapper.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/tmp1/tmp2/tmp3/.gitkeep [new file with mode: 0644]
sdk/cwl/tests/wf/listing_deep.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/listing_none.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/listing_shallow.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/scatter2.cwl
sdk/cwl/tests/wf/scatter2_subwf.cwl
sdk/go/keepclient/support.go
sdk/python/arvados/__init__.py
sdk/python/arvados/api.py
sdk/python/arvados/commands/get.py [new file with mode: 0755]
sdk/python/arvados/commands/ls.py
sdk/python/arvados/commands/put.py
sdk/python/arvados/keep.py
sdk/python/bin/arv-get
sdk/python/setup.py
sdk/python/tests/test_arv_get.py [new file with mode: 0644]
sdk/python/tests/test_arv_ls.py
sdk/python/tests/test_cache.py
sdk/ruby/arvados.gemspec
services/api/app/controllers/application_controller.rb
services/api/app/controllers/arvados/v1/nodes_controller.rb
services/api/app/models/arvados_model.rb
services/api/app/models/collection.rb
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/app/models/node.rb
services/api/db/migrate/20170330012505_add_output_ttl_to_container_requests.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/functional/arvados/v1/nodes_controller_test.rb
services/api/test/functional/arvados/v1/users_controller_test.rb
services/api/test/unit/collection_test.rb
services/api/test/unit/container_request_test.rb
services/api/test/unit/container_test.rb
services/api/test/unit/node_test.rb
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/fuse/arvados_fuse/command.py
services/fuse/arvados_fuse/unmount.py
services/fuse/tests/test_crunchstat.py [new file with mode: 0644]
services/fuse/tests/test_unmount.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/status.py [new file with mode: 0644]
services/nodemanager/doc/azure.example.cfg
services/nodemanager/doc/ec2.example.cfg
services/nodemanager/doc/gce.example.cfg
services/nodemanager/doc/local.example.cfg
services/nodemanager/setup.py
services/nodemanager/tests/test_computenode_driver_gce.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py
services/nodemanager/tests/test_status.py [new file with mode: 0644]

diff --git a/apps/workbench/app/assets/javascripts/edit_collection.js b/apps/workbench/app/assets/javascripts/edit_collection.js
new file mode 100644 (file)
index 0000000..7da3e95
--- /dev/null
@@ -0,0 +1,33 @@
+// On loading of a collection, enable the "lock" button and
+// disable all file modification controls (upload, rename, delete)
+$(document).
+    ready(function(event) {
+        $(".btn-collection-file-control").addClass("disabled");
+        $(".tab-pane-Upload").addClass("disabled");
+        $("#Upload-tab").attr("data-toggle", "disabled");
+    }).
+    on('click', '.lock-collection-btn', function(event) {
+        classes = $(event.target).attr('class')
+
+        if (classes.indexOf("fa-lock") != -1) {
+            // About to unlock; warn and get confirmation from user
+            if (confirm("Adding, renaming, and deleting files changes the portable data hash. Are you sure you want to unlock the collection?")) {
+                $(".lock-collection-btn").removeClass("fa-lock");
+                $(".lock-collection-btn").addClass("fa-unlock");
+                $(".lock-collection-btn").attr("title", "Lock collection to prevent editing files");
+                $(".btn-collection-file-control").removeClass("disabled");
+                $(".tab-pane-Upload").removeClass("disabled");
+                $("#Upload-tab").attr("data-toggle", "tab");
+            } else {
+                // User clicked "no" and so do not unlock
+            }
+        } else {
+            // Lock it back
+            $(".lock-collection-btn").removeClass("fa-unlock");
+            $(".lock-collection-btn").addClass("fa-lock");
+            $(".lock-collection-btn").attr("title", "Unlock collection to edit files");
+            $(".btn-collection-file-control").addClass("disabled");
+            $(".tab-pane-Upload").addClass("disabled");
+            $("#Upload-tab").attr("data-toggle", "disabled");
+        }
+    });
index 5c69c50c119b5dd62c930b3b2144c109812d6871..f60bef7ddb432cda137c935f395b1a46abe997ad 100644 (file)
@@ -53,6 +53,8 @@ function dispatch_selection_action() {
 function enable_disable_selection_actions() {
     var $container = $(this);
     var $checked = $('.persistent-selection:checkbox:checked', $container);
+    var collection_lock_classes = $('.lock-collection-btn').attr('class')
+
     $('[data-selection-action]', $container).
         closest('div.btn-group-sm').
         find('ul li').
@@ -74,6 +76,11 @@ function enable_disable_selection_actions() {
         toggleClass('disabled',
                     ($checked.filter('[value*=-4zz18-]').length < 1) ||
                     ($checked.length != $checked.filter('[value*=-4zz18-]').length));
+    $('[data-selection-action=remove-selected-files]', $container).
+        closest('li').
+        toggleClass('disabled',
+                    ($checked.length < 0) ||
+                    !($checked.length > 0 && collection_lock_classes && collection_lock_classes.indexOf("fa-unlock") !=-1));
 }
 
 $(document).
index 35c2249ecf0540cea0eda2b4516fd4a58ebc38c7..2d2d0f25d12b462aeeed99f48646f53a4ee9a06d 100644 (file)
@@ -64,3 +64,9 @@ $active-bg: #39b3d7;
 .btn-group.toggle-persist .btn-info.active {
     background-color: $active-bg;
 }
+
+.lock-collection-btn {
+    display: inline-block;
+    padding: .5em 2em;
+    margin: 0 1em;
+}
index 02b204f18cbec2dc0d2e0a03bfd17c6ec042c8b6..dc9ed43c409b64a2838ad74d3e165609c49e1e62 100644 (file)
@@ -326,12 +326,18 @@ class CollectionsController < ApplicationController
       end
 
       arv_coll = Arv::Collection.new(@object.manifest_text)
-      arv_coll.rename "./"+file_path, new_file_path
 
-      if @object.update_attributes manifest_text: arv_coll.manifest_text
-        show
-      else
+      if arv_coll.exist?(new_file_path)
+        @errors = 'Duplicate file path. Please use a different name.'
         self.render_error status: 422
+      else
+        arv_coll.rename "./"+file_path, new_file_path
+
+        if @object.update_attributes manifest_text: arv_coll.manifest_text
+          show
+        else
+          self.render_error status: 422
+        end
       end
     else
       # Not a file rename; use default
index 9441a46c26d067f423188db099d240087f29a191..a22608d3c9e93f06e72314ab66d83f37964c8cb0 100644 (file)
@@ -29,7 +29,7 @@
         end
       %>
 
-      <li class="<%= 'active' if i==0 %> <%= link_disabled %>" data-toggle="tooltip" data-placement="top" title="<%=tab_tooltip%>">
+      <li class="<%= 'active' if i==0 %> <%= link_disabled %> tab-pane-<%=pane_name%>" data-toggle="tooltip" data-placement="top" title="<%=tab_tooltip%>">
         <a href="#<%= pane_name %>"
            id="<%= pane_name %>-tab"
            data-toggle="<%= data_toggle %>"
diff --git a/apps/workbench/app/views/collections/_extra_tab_line_buttons.html.erb b/apps/workbench/app/views/collections/_extra_tab_line_buttons.html.erb
new file mode 100644 (file)
index 0000000..fe62f6d
--- /dev/null
@@ -0,0 +1,3 @@
+<% if @object.editable? %>
+  <i class="fa fa-fw fa-lock lock-collection-btn btn btn-primary" title="Unlock collection to edit files"></i>
+<% end %>
index d39c81b2b16d2b9b8331226695f8df97f0ac0cec..58695ecf373ff0213dd9ab52cb6c522d6d3bc4e6 100644 (file)
             <% end %>
 
             <% if object.editable? %>
-                <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate', title: "Remove #{file_path}") do %>
+                <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate btn-collection-file-control', title: "Remove #{file_path}") do %>
                   <i class="fa fa-fw fa-trash-o"></i>
                 <% end %>
             <% end %>
         <% if CollectionsHelper::is_image(filename) %>
             <i class="fa fa-fw fa-bar-chart-o"></i>
               <% if object.editable? %>
-                <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_path' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'} %>
+                <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_path' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit name or directory or both for this file'} %>
               <% else %>
                 <%= filename %>
               <% end %>
          </div>
         <% else %>
               <% if object.editable? %>
-                <i class="fa fa-fw fa-file"></i><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'}  %>
+                <i class="fa fa-fw fa-file"></i><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit name or directory or both for this file', btnclass: 'collection-file-control'}  %>
               <% else %>
                 <i class="fa fa-fw fa-file" href="<%=object.uuid%>/<%=file_path%>" ></i> <%= filename %>
               <% end %>
index 662309ffe8e11df264b31f769b43f8612edacf70..fd7953551729d722b8cff678feb73baadc81f7d2 100644 (file)
@@ -1,6 +1,6 @@
 <% if @object.state == 'Final' %>
   <%= link_to(copy_container_request_path('id' => @object.uuid),
-      class: 'btn btn-primary',
+      class: 'btn btn-sm btn-primary',
       title: 'Re-run',
       data: {toggle: :tooltip, placement: :top}, title: 'This will make a copy and take you there. You can then make any needed changes and run it',
       method: :post,
index b4d0f4ae09bec870985f5895a8522faf35766c0e..1632dd0653b11d82d9a97e245bc9dd55db5ccc1d 100644 (file)
@@ -732,4 +732,34 @@ class CollectionsControllerTest < ActionController::TestCase
     collection = Collection.select([:uuid, :manifest_text]).where(uuid: collection['uuid']).first
     assert_match /. d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:file1renamed\n.\/dir1 d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:dir1file2 0:0:file2\n.\/dir2\/dir3 d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:dir1file1moved\n$/, collection['manifest_text']
   end
+
+  test "renaming file with a duplicate name in same stream not allowed" do
+    use_token :active
+
+    # rename 'file2' as 'file1' and expect error
+    post :update, {
+      id: 'zzzzz-4zz18-pyw8yp9g3pr7irn',
+      collection: {
+        'rename-file-path:file2' => 'file1'
+      },
+      format: :json
+    }, session_for(:active)
+    assert_response 422
+    assert_includes json_response['errors'], 'Duplicate file path'
+  end
+
+  test "renaming file with a duplicate name as another stream not allowed" do
+    use_token :active
+
+    # rename 'file1' as 'dir1/file1' and expect error
+    post :update, {
+      id: 'zzzzz-4zz18-pyw8yp9g3pr7irn',
+      collection: {
+        'rename-file-path:file1' => 'dir1/file1'
+      },
+      format: :json
+    }, session_for(:active)
+    assert_response 422
+    assert_includes json_response['errors'], 'Duplicate file path'
+  end
 end
index 903df90fb419c7ba231ae3c42d679abc85af41ed..552a9cd5e8fc53af4a484fbbd9b84798fc86f59a 100644 (file)
@@ -41,6 +41,9 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
   test "Upload two empty files with the same name" do
     need_selenium "to make file uploads work"
     visit page_with_token 'active', sandbox_path
+
+    unlock_collection
+
     find('.nav-tabs a', text: 'Upload').click
     attach_file 'file_selector', testfile_path('empty.txt')
     assert_selector 'div', text: 'empty.txt'
@@ -55,6 +58,9 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
   test "Upload non-empty files" do
     need_selenium "to make file uploads work"
     visit page_with_token 'active', sandbox_path
+
+    unlock_collection
+
     find('.nav-tabs a', text: 'Upload').click
     attach_file 'file_selector', testfile_path('a')
     attach_file 'file_selector', testfile_path('foo.txt')
@@ -93,6 +99,9 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
                           service_port: 99999)
     end
     visit page_with_token 'active', sandbox_path
+
+    unlock_collection
+
     find('.nav-tabs a', text: 'Upload').click
     attach_file 'file_selector', testfile_path('foo.txt')
     assert_selector 'button:not([disabled])', text: 'Start'
@@ -128,4 +137,9 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
     # Must be an absolute path. https://github.com/jnicklas/capybara/issues/621
     File.join Dir.getwd, 'tmp', filename
   end
+
+  def unlock_collection
+    first('.lock-collection-btn').click
+    accept_alert
+  end
 end
index eb9c2e831a8866d7a25f3f89c9c1eb04ef00a003..20a403d4b221cf50ecbe52a5e19d7014a06d939c 100644 (file)
@@ -300,9 +300,13 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   end
 
   test "remove a file from collection using checkbox and dropdown option" do
+    need_selenium 'to confirm unlock'
+
     visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
     assert(page.has_text?('file1'), 'file not found - file1')
 
+    unlock_collection
+
     # remove first file
     input_files = page.all('input[type=checkbox]')
     input_files[0].click
@@ -317,21 +321,27 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   end
 
   test "remove a file in collection using trash icon" do
-    need_selenium 'to confirm remove'
+    need_selenium 'to confirm unlock'
 
     visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
     assert(page.has_text?('file1'), 'file not found - file1')
 
+    unlock_collection
+
     first('.fa-trash-o').click
-    page.driver.browser.switch_to.alert.accept
+    accept_alert
 
     assert(page.has_no_text?('file1'), 'file found - file')
     assert(page.has_text?('file2'), 'file not found - file2')
   end
 
   test "rename a file in collection" do
+    need_selenium 'to confirm unlock'
+
     visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
 
+    unlock_collection
+
     within('.collection_files') do
       first('.fa-pencil').click
       find('.editable-input input').set('file1renamed')
@@ -357,4 +367,56 @@ class CollectionsTest < ActionDispatch::IntegrationTest
       assert_nil first('.fa-trash-o')
     end
   end
+
+  test "unlock collection to modify files" do
+    need_selenium 'to confirm remove'
+
+    collection = api_fixture('collections')['collection_owned_by_active']
+
+    # On load, collection is locked, and upload tab, rename and remove options are disabled
+    visit page_with_token('active', "/collections/#{collection['uuid']}")
+
+    assert_selector 'a[data-toggle="disabled"]', text: 'Upload'
+
+    within('.collection_files') do
+      file_ctrls = page.all('.btn-collection-file-control')
+      assert_equal 2, file_ctrls.size
+      assert_equal true, file_ctrls[0]['class'].include?('disabled')
+      assert_equal true, file_ctrls[1]['class'].include?('disabled')
+      find('input[type=checkbox]').click
+    end
+
+    click_button 'Selection'
+    within('.selection-action-container') do
+      assert_selector 'li.disabled', text: 'Remove selected files'
+      assert_selector 'li', text: 'Create new collection with selected files'
+    end
+
+    unlock_collection
+
+    assert_no_selector 'a[data-toggle="disabled"]', text: 'Upload'
+    assert_selector 'a', text: 'Upload'
+
+    within('.collection_files') do
+      file_ctrls = page.all('.btn-collection-file-control')
+      assert_equal 2, file_ctrls.size
+      assert_equal false, file_ctrls[0]['class'].include?('disabled')
+      assert_equal false, file_ctrls[1]['class'].include?('disabled')
+      # previous checkbox selection won't result in firing a new event;
+      # undo and redo checkbox to fire the selection event again
+      find('input[type=checkbox]').click
+      find('input[type=checkbox]').click
+    end
+
+    click_button 'Selection'
+    within('.selection-action-container') do
+      assert_no_selector 'li.disabled', text: 'Remove selected files'
+      assert_selector 'li', text: 'Remove selected files'
+    end
+  end
+
+  def unlock_collection
+    first('.lock-collection-btn').click
+    accept_alert
+  end
 end
index 3d92585135ff3db934c78cc6a092bf74a8b39054..067a1bdae845f98114df7a1c9cbfacc81966db45 100644 (file)
@@ -221,4 +221,19 @@ class ActionDispatch::IntegrationTest
     end
     Capybara.reset_sessions!
   end
+
+  def accept_alert
+    if Capybara.current_driver == :selenium
+      (0..9).each do
+        begin
+          page.driver.browser.switch_to.alert.accept
+          break
+        rescue Selenium::WebDriver::Error::NoSuchAlertError
+         sleep 0.1
+        end
+      end
+    else
+      # poltergeist returns true for confirm, so no need to accept
+    end
+  end
 end
index 98f6a3b773852061659a58c255907bf7cd413c20..bb662dbbd52ad97aecf092af0aa8cbf973c69859 100644 (file)
@@ -1,7 +1,7 @@
 #distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
 debian8,ubuntu1204,centos7|python-gflags|2.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.4.2|2|python|all
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|oauth2client|1.5.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.6.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,centos7|oauth2client|1.5.2|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|pyasn1|0.1.7|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|pyasn1-modules|0.0.5|2|python|all
 debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rsa|3.4.2|2|python|all
@@ -14,7 +14,7 @@ debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|ciso8601|1.0.3|3|python|amd64
 debian8,ubuntu1204,centos7|pycrypto|2.6.1|3|python|amd64
 debian8,ubuntu1204,ubuntu1404,ubuntu1604|backports.ssl_match_hostname|3.5.0.1|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|llfuse|0.41.1|3|python|amd64
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pycurl|7.19.5.3|3|python|amd64
+debian8,ubuntu1204,ubuntu1404,centos7|pycurl|7.19.5.3|3|python|amd64
 debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pyyaml|3.12|2|python|amd64
 debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rdflib|4.2.2|2|python|all
 debian8,ubuntu1204,ubuntu1404,centos7|shellescape|3.4.1|2|python|all
@@ -40,3 +40,5 @@ all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-versi
 all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
 all|rdflib-jsonld|0.4.0|2|python|all
 all|futures|3.0.5|2|python|all
+all|future|0.16.0|2|python|all
+all|future|0.16.0|2|python3|all
index f6223715e1e43793fd71906856fc02aa1f661533..68a4a16fcb6e7550661e22155ffb14f1dbd65910 100644 (file)
@@ -3,7 +3,7 @@ MAINTAINER Peter Amstutz <peter.amstutz@curoverse.com>
 
 # Install RVM
 RUN apt-get update && \
-    DEBIAN_FRONTEND=noninteractive apt-get -y install --no-install-recommends curl ca-certificates && \
+    DEBIAN_FRONTEND=noninteractive apt-get -y install --no-install-recommends curl ca-certificates g++ && \
     gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
     curl -L https://get.rvm.io | bash -s stable && \
     /usr/local/rvm/bin/rvm install 2.3 && \
index e019170d71d24a6323e3b4eaee856c30cebb5768..915958ce97b547d74832e59a9ef6d154e8551ab0 100644 (file)
@@ -210,14 +210,14 @@ configure_version() {
   chown "$WWW_OWNER:" $RELEASE_PATH/config/environment.rb
   chown "$WWW_OWNER:" $RELEASE_PATH/config.ru
   chown "$WWW_OWNER:" $RELEASE_PATH/Gemfile.lock
-  chown -R "$WWW_OWNER:" $RELEASE_PATH/tmp
+  chown -R "$WWW_OWNER:" $RELEASE_PATH/tmp || true
   chown -R "$WWW_OWNER:" $SHARED_PATH/log
   case "$RAILSPKG_DATABASE_LOAD_TASK" in
       db:schema:load) chown "$WWW_OWNER:" $RELEASE_PATH/db/schema.rb ;;
       db:structure:load) chown "$WWW_OWNER:" $RELEASE_PATH/db/structure.sql ;;
   esac
   chmod 644 $SHARED_PATH/log/*
-  chmod -R 2775 $RELEASE_PATH/tmp
+  chmod -R 2775 $RELEASE_PATH/tmp || true
   echo "... done."
 
   if [ -n "$RAILSPKG_DATABASE_LOAD_TASK" ]; then
index 264f27d12b0202a9267a548a73e684460f8f5aa3..053a6dfb30de1782b81ec0a545fcd1844c1b4673 100755 (executable)
@@ -77,6 +77,9 @@ case "$TARGET" in
     ubuntu1404)
         FORMAT=deb
         ;;
+    ubuntu1604)
+        FORMAT=deb
+        ;;
     centos7)
         FORMAT=rpm
         ;;
index 44f9a303e73f02a0854eb5892648741506da5bfe..b0897224dc27c1f22e3c6a7352285ac009a9e454 100755 (executable)
@@ -178,6 +178,9 @@ sanity_checks() {
     echo -n 'fuse.h: '
     find /usr/include -wholename '*fuse/fuse.h' \
         || fatal "No fuse/fuse.h. Try: apt-get install libfuse-dev"
+    echo -n 'gnutls.h: '
+    find /usr/include -wholename '*gnutls/gnutls.h' \
+        || fatal "No gnutls/gnutls.h. Try: apt-get install libgnutls28-dev"
     echo -n 'pyconfig.h: '
     find /usr/include -name pyconfig.h | egrep --max-count=1 . \
         || fatal "No pyconfig.h. Try: apt-get install python-dev"
index 6caf36a18882115027c288717a74146b8281dd49..4fd1edefe455155fcc11b2b094c7b7a6a4fcb3bd 100644 (file)
@@ -20,7 +20,7 @@
       </ul>
 
       <div class="pull-right" style="padding-top: 6px">
-        <form method="get" action="http://www.google.com/search">
+        <form method="get" action="https://www.google.com/search">
           <div class="input-group" style="width: 220px">
             <input type="text" class="form-control" name="q" placeholder="search">
             <div class="input-group-addon">
index 3809b2f1eb65c254ae400b7d661ac0869ce2d833..75bf3d1ccca0c6845afc9f2d81deac5736710a0d 100644 (file)
@@ -43,6 +43,8 @@ table(table table-bordered table-condensed).
 |cwd|string|Initial working directory, given as an absolute path (in the container) or a path relative to the WORKDIR given in the image's Dockerfile.|Required.|
 |command|array of strings|Command to execute in the container.|Required. e.g., @["echo","hello"]@|
 |output_path|string|Path to a directory or file inside the container that should be preserved as container's output when it finishes. This path must be, or be inside, one of the mount targets. For best performance, point output_path to a writable collection mount. Also, see "Pre-populate output using Mount points":#pre-populate-output for details regarding optional output pre-population using mount points.|Required.|
+|output_name|string|Desired name for the output collection. If null, a name will be assigned automatically.||
+|output_ttl|integer|Desired lifetime for the output collection, in seconds. If zero, the output collection will not be deleted automatically.||
 |priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to preview the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
 |expires_at|datetime|After this time, priority is considered to be zero.|Not yet implemented.|
 |use_existing|boolean|If possible, use an existing (non-failed) container to satisfy the request instead of creating a new one.|Default is true|
index baf7c2fc7c8e0c1b408d85e43cd36d3102147ac6..0cad10c5a92229a02a7263e0ae8e36fdcac7adca 100644 (file)
@@ -48,6 +48,16 @@ h3(#aws). Amazon Web Services
 # EC2 configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
@@ -213,6 +223,16 @@ h3(#gcp). Google Cloud Platform
 # Google Compute Engine configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # Node Manager will ensure that there are at least this many nodes running at
 # all times.  If node manager needs to start new idle nodes for the purpose of
@@ -380,6 +400,16 @@ h3(#azure). Microsoft Azure
 # Azure configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
index 8a7e64baa84b86bda85f0dbc5595a8e6d2dda3b9..8aef8ac0c4a18efef44f06f5c4dbb5fe40a98710 100644 (file)
@@ -11,6 +11,7 @@ To use Arvados CWL extensions, add the following @$namespaces@ section at the to
 <pre>
 $namespaces:
   arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
 </pre>
 
 Arvados extensions must go into the @hints@ section, for example:
@@ -24,6 +25,8 @@ hints:
   arv:PartitionRequirement:
     partition: dev_partition
   arv:APIRequirement: {}
+  cwltool:LoadListingRequirement:
+    loadListing: shallow_listing
 </pre>
 
 h2. arv:RunInSingleContainer
@@ -54,3 +57,17 @@ table(table table-bordered table-condensed).
 h2. arv:APIRequirement
 
 Indicates that process wants to access to the Arvados API.  Will be granted limited network access and have @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ set in the environment.
+
+h2. cwltool:LoadListingRequirement
+
+In CWL v1.0 documents, the default behavior for Directory objects is to recursively expand the @listing@ for access by parameter references an expressions.  For directory trees containing many files, this can be expensive in both time and memory usage.  Use @cwltool:LoadListingRequirement@ to change the behavior for expansion of directory listings in the workflow runner.
+
+table(table table-bordered table-condensed).
+|_. Field |_. Type |_. Description |
+|loadListing|string|One of @no_listing@, @shallow_listing@, or @deep_listing@|
+
+*no_listing*: Do not expand directory listing at all.  The @listing@ field on the Directory object will be undefined.
+
+*shallow_listing*: Only expand the first level of directory listing.  The @listing@ field on the toplevel Directory object will contain the directory contents, however @listing@ will not be defined on subdirectories.
+
+*deep_listing*: Recursively expand all levels of directory listing.  The @listing@ field will be provided on the toplevel object and all subdirectories.
index 651ebf20b0e96f13b4b32bb1b55855f0ac1076a9..06b5559102be9e02a69ddf8d859f0670b342d54b 100644 (file)
@@ -29,7 +29,7 @@ Gem::Specification.new do |s|
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
   s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
   s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
-  s.add_runtime_dependency 'json', '~> 1.7', '>= 1.7.7'
+  s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
   s.add_runtime_dependency 'trollop', '~> 2.0'
   s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
   s.add_runtime_dependency 'oj', '~> 2.0', '>= 2.0.3'
index 04f454369cc6541be477fe3f585c637f45c7aee9..b1f6bdf857a0fcda9b4f44d4db4778863093067d 100644 (file)
@@ -140,7 +140,7 @@ class TestArvKeepGet < Minitest::Test
       assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
     end
     assert_equal '', out
-    assert_match /Error:/, err
+    assert_match /ERROR:/, err
   end
 
   def test_nonexistent_manifest
@@ -148,7 +148,7 @@ class TestArvKeepGet < Minitest::Test
       assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
     end
     assert_equal '', out
-    assert_match /Error:/, err
+    assert_match /ERROR:/, err
   end
 
   def test_manifest_root_to_dir
index 3b14701a9b9480cf6119e6d0e81e478e6da81214..7c24310bdb7dcd79369c4a548495b936fe280957 100644 (file)
@@ -38,8 +38,8 @@ from .pathmapper import NoFollowPathMapper
 from ._version import __version__
 
 from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, getListing
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
@@ -203,12 +203,6 @@ class ArvCwlRunner(object):
         if isinstance(obj, dict):
             if obj.get("writable"):
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
-            if obj.get("class") == "CommandLineTool":
-                if self.work_api == "containers":
-                    if obj.get("stdin"):
-                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
-                    if obj.get("stderr"):
-                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
                     # TODO: can be supported by containers API, but not jobs API.
@@ -521,7 +515,7 @@ class ArvCwlRunner(object):
             self.set_crunch_output()
 
         if kwargs.get("compute_checksum"):
-            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
+            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return (self.final_output, self.final_status)
@@ -629,17 +623,19 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     return parser
 
 def add_arv_hints():
-    cache = {}
     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
-    cache["http://arvados.org/cwl"] = res.read()
+    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
-    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
-    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
-    for n in extnames.names:
-        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
-            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
-        document_loader.idx["http://arvados.org/cwl#"+n] = {}
+    cwltool.process.supportedProcessRequirements.extend([
+        "http://arvados.org/cwl#RunInSingleContainer",
+        "http://arvados.org/cwl#OutputDirType",
+        "http://arvados.org/cwl#RuntimeConstraints",
+        "http://arvados.org/cwl#PartitionRequirement",
+        "http://arvados.org/cwl#APIRequirement",
+        "http://commonwl.org/cwltool#LoadListingRequirement"
+    ])
+
 
 def main(args, stdout, stderr, api_client=None, keep_client=None):
     parser = arg_parser()
@@ -721,4 +717,5 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                                                          keep_client=keep_client,
                                                          num_retries=runner.num_retries),
                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
-                             logger_handler=arvados.log_handler)
+                             logger_handler=arvados.log_handler,
+                             custom_schema_callback=add_arv_hints)
index 3a6eb4769779ab0ea2b88aa8c96ed776194863b7..4e48216f32aefb962df7c93ea69769df9c9762a7 100644 (file)
@@ -1,7 +1,32 @@
 $base: "http://arvados.org/cwl#"
+$namespaces:
+  cwl: "https://w3id.org/cwl/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
 $graph:
+- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
+
+- name: cwltool:LoadListingRequirement
+  type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
+  fields:
+    class:
+      type: string
+      doc: "Always 'LoadListingRequirement'"
+      jsonldPredicate:
+        "_id": "@type"
+        "_type": "@vocab"
+    loadListing:
+      type:
+        - "null"
+        - type: enum
+          name: LoadListingEnum
+          symbols: [no_listing, shallow_listing, deep_listing]
+
 - name: RunInSingleContainer
   type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
   doc: |
     Indicates that a subworkflow should run in a single container
     and not be scheduled as separate steps.
@@ -38,6 +63,8 @@ $graph:
 
 - name: RuntimeConstraints
   type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
   doc: |
     Set Arvados-specific runtime hints.
   fields:
@@ -62,6 +89,8 @@ $graph:
 
 - name: PartitionRequirement
   type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
   doc: |
     Select preferred compute partitions on which to run jobs.
   fields:
@@ -72,6 +101,8 @@ $graph:
 
 - name: APIRequirement
   type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
   doc: |
     Indicates that process wants to access to the Arvados API.  Will be granted
     limited network access and have ARVADOS_API_HOST and ARVADOS_API_TOKEN set
index 3090936121daf32d9ffa61744d1f5b932b73590a..9e76cf711ecaaf81c6e5e42131ea3d717da00c01 100644 (file)
@@ -1,6 +1,7 @@
 import logging
 import json
 import os
+import urllib
 
 import ruamel.yaml as yaml
 
@@ -53,7 +54,7 @@ class ArvadosContainer(object):
 
         dirs = set()
         for f in self.pathmapper.files():
-            pdh, p, tp = self.pathmapper.mapper(f)
+            pdh, p, tp, stg = self.pathmapper.mapper(f)
             if tp == "Directory" and '/' not in pdh:
                 mounts[p] = {
                     "kind": "collection",
@@ -62,7 +63,7 @@ class ArvadosContainer(object):
                 dirs.add(pdh)
 
         for f in self.pathmapper.files():
-            res, p, tp = self.pathmapper.mapper(f)
+            res, p, tp, stg = self.pathmapper.mapper(f)
             if res.startswith("keep:"):
                 res = res[5:]
             elif res.startswith("/keep/"):
@@ -77,7 +78,7 @@ class ArvadosContainer(object):
                     "portable_data_hash": pdh
                 }
                 if len(sp) == 2:
-                    mounts[p]["path"] = sp[1]
+                    mounts[p]["path"] = urllib.unquote(sp[1])
 
         with Perf(metrics, "generatefiles %s" % self.name):
             if self.generatefiles["listing"]:
@@ -114,10 +115,14 @@ class ArvadosContainer(object):
             container_request["environment"].update(self.environment)
 
         if self.stdin:
-            raise UnsupportedRequirement("Stdin redirection currently not suppported")
+            sp = self.stdin[6:].split("/", 1)
+            mounts["stdin"] = {"kind": "collection",
+                                "portable_data_hash": sp[0],
+                                "path": sp[1]}
 
         if self.stderr:
-            raise UnsupportedRequirement("Stderr redirection currently not suppported")
+            mounts["stderr"] = {"kind": "file",
+                                "path": "%s/%s" % (self.outdir, self.stderr)}
 
         if self.stdout:
             mounts["stdout"] = {"kind": "file",
index f33619391d89ef2583d406a6ed25a22cb87f5159..8073620be15f7e653bd382c21aad325f791aac2c 100644 (file)
@@ -19,7 +19,7 @@ import re
 import functools
 
 from arvados.api import OrderedJsonModel
-from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
+from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, normalizeFilesDirs
 from cwltool.load_tool import load_tool
 from cwltool.errors import WorkflowException
 
@@ -64,7 +64,6 @@ def run():
         adjustFileObjs(job_order_object, keeppathObj)
         adjustDirObjs(job_order_object, keeppathObj)
         normalizeFilesDirs(job_order_object)
-        adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
 
         output_name = None
         output_tags = None
index 70aa69f669be6d6e8c0a7210ea3da0808af3e6bc..3a3d16073833a6367876b5833b54cbb8f35584e7 100644 (file)
@@ -80,7 +80,10 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
     def exists(self, fn):
         collection, rest = self.get_collection(fn)
         if collection:
-            return collection.exists(rest)
+            if rest:
+                return collection.exists(rest)
+            else:
+                return True
         else:
             return super(CollectionFsAccess, self).exists(fn)
 
index 1f6aa577c180c65a9f8a31b0155c863f17e59b67..cddb4088b7bfcbbdb211d6785ba650e8ea36901e 100644 (file)
@@ -2,6 +2,7 @@ import re
 import logging
 import uuid
 import os
+import urllib
 
 import arvados.commands.run
 import arvados.collection
@@ -17,7 +18,7 @@ class ArvPathMapper(PathMapper):
     """Convert container-local paths to and from Keep collection ids."""
 
     pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
-    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.+)?$')
+    pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
 
     def __init__(self, arvrunner, referenced_files, input_basedir,
                  collection_pattern, file_pattern, name=None, **kwargs):
@@ -30,36 +31,39 @@ class ArvPathMapper(PathMapper):
 
     def visit(self, srcobj, uploadfiles):
         src = srcobj["location"]
-        if srcobj["class"] == "File":
-            if "#" in src:
-                src = src[:src.index("#")]
-            if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
-                self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
-            if src not in self._pathmap:
+        if "#" in src:
+            src = src[:src.index("#")]
+
+        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
+            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
+
+        if src not in self._pathmap:
+            if src.startswith("file:"):
                 # Local FS ref, may need to be uploaded or may be on keep
                 # mount.
                 ab = abspath(src, self.input_basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s")
+                st = arvados.commands.run.statfile("", ab,
+                                                   fnPattern="keep:%s/%s",
+                                                   dirPattern="keep:%s/%s")
                 with SourceLine(srcobj, "location", WorkflowException):
                     if isinstance(st, arvados.commands.run.UploadFile):
                         uploadfiles.add((src, ab, st))
                     elif isinstance(st, arvados.commands.run.ArvFile):
-                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
-                    elif src.startswith("_:"):
-                        if "contents" in srcobj:
-                            pass
-                        else:
-                            raise WorkflowException("File literal '%s' is missing contents" % src)
-                    elif src.startswith("arvwf:"):
-                        self._pathmap[src] = MapperEnt(src, src, "File")
+                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                     else:
                         raise WorkflowException("Input file path '%s' is invalid" % st)
-            if "secondaryFiles" in srcobj:
-                for l in srcobj["secondaryFiles"]:
-                    self.visit(l, uploadfiles)
-        elif srcobj["class"] == "Directory":
-            if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
-                self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "Directory")
+            elif src.startswith("_:"):
+                if srcobj["class"] == "File" and "contents" not in srcobj:
+                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
+                if srcobj["class"] == "Directory" and "listing" not in srcobj:
+                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+            else:
+                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
+
+        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
+            for l in srcobj.get("secondaryFiles", []):
+                self.visit(l, uploadfiles)
+        with SourceLine(srcobj, "listing", WorkflowException):
             for l in srcobj.get("listing", []):
                 self.visit(l, uploadfiles)
 
@@ -72,7 +76,7 @@ class ArvPathMapper(PathMapper):
             for l in obj.get("secondaryFiles", []):
                 self.addentry(l, c, path, subdirs)
         elif obj["class"] == "Directory":
-            for l in obj["listing"]:
+            for l in obj.get("listing", []):
                 self.addentry(l, c, path + "/" + obj["basename"], subdirs)
             subdirs.append((obj["location"], path + "/" + obj["basename"]))
         elif obj["location"].startswith("_:") and "contents" in obj:
@@ -90,7 +94,7 @@ class ArvPathMapper(PathMapper):
             loc = k["location"]
             if loc in already_uploaded:
                 v = already_uploaded[loc]
-                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), "File", True)
 
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
@@ -105,17 +109,18 @@ class ArvPathMapper(PathMapper):
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
+            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+                                           "Directory" if os.path.isdir(ab) else "File", True)
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         for srcobj in referenced_files:
+            subdirs = []
             if srcobj["class"] == "Directory":
                 if srcobj["location"] not in self._pathmap:
                     c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
-                    subdirs = []
-                    for l in srcobj["listing"]:
+                    for l in srcobj.get("listing", []):
                         self.addentry(l, c, ".", subdirs)
 
                     check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
@@ -123,17 +128,13 @@ class ArvPathMapper(PathMapper):
                         c.save_new(owner_uuid=self.arvrunner.project_uuid)
 
                     ab = self.collection_pattern % c.portable_data_hash()
-                    self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "Directory")
-                    for loc, sub in subdirs:
-                        ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
-                        self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+                    self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
             elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
 
                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries                                                  )
-                subdirs = []
                 self.addentry(srcobj, c, ".", subdirs)
 
                 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
@@ -141,13 +142,18 @@ class ArvPathMapper(PathMapper):
                     c.save_new(owner_uuid=self.arvrunner.project_uuid)
 
                 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
-                self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "File")
+                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
+                                                              ab, "File", True)
                 if srcobj.get("secondaryFiles"):
                     ab = self.collection_pattern % c.portable_data_hash()
-                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt(ab, ab, "Directory")
+                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
+
+            if subdirs:
                 for loc, sub in subdirs:
+                    # subdirs will all start with "./", strip it off
                     ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
-                    self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
+                                                   ab, "Directory", True)
 
         self.keepdir = None
 
@@ -162,24 +168,24 @@ class ArvPathMapper(PathMapper):
 class StagingPathMapper(PathMapper):
     _follow_dirs = True
 
-    def visit(self, obj, stagedir, basedir, copy=False):
+    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
         loc = obj["location"]
         tgt = os.path.join(stagedir, obj["basename"])
         if obj["class"] == "Directory":
-            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory")
+            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
             if loc.startswith("_:") or self._follow_dirs:
                 self.visitlisting(obj.get("listing", []), tgt, basedir)
         elif obj["class"] == "File":
             if loc in self._pathmap:
                 return
             if "contents" in obj and loc.startswith("_:"):
-                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
+                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
             else:
                 if copy:
-                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile")
+                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                 else:
-                    self._pathmap[loc] = MapperEnt(loc, tgt, "File")
+                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
 
 
@@ -191,9 +197,9 @@ class VwdPathMapper(StagingPathMapper):
         # with any secondary files.
         self.visitlisting(referenced_files, self.stagedir, basedir)
 
-        for path, (ab, tgt, type) in self._pathmap.items():
+        for path, (ab, tgt, type, staged) in self._pathmap.items():
             if type in ("File", "Directory") and ab.startswith("keep:"):
-                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
+                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
 
 
 class NoFollowPathMapper(StagingPathMapper):
index b8ed68296511fe5fe2ab7f0f4ee5bce33400bc63..4f12d8ff1ec95f87e015203d7fd7eeac54887636 100644 (file)
@@ -48,11 +48,11 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20170224141733',
-          'schema-salad==2.2.20170222151604',
+          'cwltool==1.0.20170329142446',
+          'schema-salad==2.5.20170328195758',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
-          'arvados-python-client>=0.1.20170112173420',
+          'arvados-python-client>=0.1.20170327195441',
           'setuptools'
       ],
       data_files=[
index b5a3e9930de668f4a7d64e6d40fa364dc5e75fa3..8d076093abfe6b6fcbb52ccea421763d76aa38b6 100755 (executable)
@@ -74,6 +74,8 @@ export ARVADOS_API_HOST=localhost:8000
 export ARVADOS_API_HOST_INSECURE=1
 export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
 
+arv-keepdocker --pull arvados/jobs latest
+
 cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
 #!/bin/sh
 exec arvados-cwl-runner --api=jobs --compute-checksum \\\$@
index 86467040a5c7440ae6aed42b506fbae935ab45eb..2c03812ed3e38e339f1323abd25e2bc4ec61c8dc 100755 (executable)
@@ -2,4 +2,4 @@
 if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
     arv-put --portable-data-hash testdir
 fi
-exec cwltest --test arvados-tests.yml --tool $PWD/runner.sh
+exec cwltest --test arvados-tests.yml --tool $PWD/runner.sh $@
index 1187962a8eb5935d93a47308a8326a8de658eb3a..87528b2ae595df94a73b24ff9b3e931336edf067 100644 (file)
@@ -8,3 +8,77 @@
     }
   tool: keep-dir-test-input.cwl
   doc: Test directory in keep
+
+- job: dir-job2.yml
+  output:
+    "outlist": {
+        "size": 20,
+        "location": "output.txt",
+        "class": "File",
+        "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+    }
+  tool: keep-dir-test-input.cwl
+  doc: Test directory in keep
+
+- job: null
+  output:
+    "outlist": {
+        "size": 20,
+        "location": "output.txt",
+        "class": "File",
+        "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+    }
+  tool: keep-dir-test-input2.cwl
+  doc: Test default directory in keep
+
+- job: null
+  output:
+    "outlist": {
+        "size": 20,
+        "location": "output.txt",
+        "class": "File",
+        "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+    }
+  tool: keep-dir-test-input3.cwl
+  doc: Test default directory in keep
+
+- job: octo.yml
+  output: {}
+  tool: cat.cwl
+  doc: Test hashes in filenames
+
+- job: listing-job.yml
+  output: {
+    "out": {
+        "class": "File",
+        "location": "output.txt",
+        "size": 5,
+        "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+    }
+  }
+  tool: wf/listing_shallow.cwl
+  doc: test shallow directory listing
+
+- job: listing-job.yml
+  output: {
+    "out": {
+        "class": "File",
+        "location": "output.txt",
+        "size": 5,
+        "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+    }
+  }
+  tool: wf/listing_none.cwl
+  doc: test no directory listing
+
+- job: listing-job.yml
+  output: {
+    "out": {
+        "class": "File",
+        "location": "output.txt",
+        "size": 5,
+        "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+    }
+  }
+  tool: wf/listing_deep.cwl
+  doc: test deep directory listing
diff --git a/sdk/cwl/tests/cat.cwl b/sdk/cwl/tests/cat.cwl
new file mode 100644 (file)
index 0000000..93af517
--- /dev/null
@@ -0,0 +1,8 @@
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+  - id: inp
+    type: File
+    inputBinding: {}
+outputs: []
+baseCommand: cat
diff --git a/sdk/cwl/tests/dir-job2.yml b/sdk/cwl/tests/dir-job2.yml
new file mode 100644 (file)
index 0000000..5c654c9
--- /dev/null
@@ -0,0 +1,3 @@
+indir:
+  class: Directory
+  location: keep:d7514270f356df848477718d58308cc4+94/
diff --git a/sdk/cwl/tests/keep-dir-test-input2.cwl b/sdk/cwl/tests/keep-dir-test-input2.cwl
new file mode 100644 (file)
index 0000000..7a355ab
--- /dev/null
@@ -0,0 +1,24 @@
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+  - class: ShellCommandRequirement
+inputs:
+  indir:
+    type: Directory
+    inputBinding:
+      prefix: cd
+      position: -1
+    default:
+      class: Directory
+      location: keep:d7514270f356df848477718d58308cc4+94
+outputs:
+  outlist:
+    type: File
+    outputBinding:
+      glob: output.txt
+arguments: [
+  {shellQuote: false, valueFrom: "&&"},
+  "find", ".",
+  {shellQuote: false, valueFrom: "|"},
+  "sort"]
+stdout: output.txt
\ No newline at end of file
diff --git a/sdk/cwl/tests/keep-dir-test-input3.cwl b/sdk/cwl/tests/keep-dir-test-input3.cwl
new file mode 100644 (file)
index 0000000..f7321c8
--- /dev/null
@@ -0,0 +1,24 @@
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+  - class: ShellCommandRequirement
+inputs:
+  indir:
+    type: Directory
+    inputBinding:
+      prefix: cd
+      position: -1
+    default:
+      class: Directory
+      location: keep:d7514270f356df848477718d58308cc4+94/
+outputs:
+  outlist:
+    type: File
+    outputBinding:
+      glob: output.txt
+arguments: [
+  {shellQuote: false, valueFrom: "&&"},
+  "find", ".",
+  {shellQuote: false, valueFrom: "|"},
+  "sort"]
+stdout: output.txt
\ No newline at end of file
diff --git a/sdk/cwl/tests/octo.yml b/sdk/cwl/tests/octo.yml
new file mode 100644 (file)
index 0000000..f6530df
--- /dev/null
@@ -0,0 +1,3 @@
+inp:
+  class: File
+  location: "octothorpe/item %231.txt"
\ No newline at end of file
diff --git a/sdk/cwl/tests/octothorpe/item #1.txt b/sdk/cwl/tests/octothorpe/item #1.txt
new file mode 100644 (file)
index 0000000..e69de29
index ad69371605a9ffaec4bfd1abb99296cfe565f489..33305d927bd0e7a188731aeca3fc031aa37407bf 100644 (file)
@@ -275,6 +275,77 @@ class TestContainer(unittest.TestCase):
         for key in call_body:
             self.assertEqual(call_body_expected.get(key), call_body.get(key))
 
+
+    # Test redirecting stdin/stdout/stderr
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    def test_redirects(self, keepdocker):
+        arv_docker_clear_cache()
+
+        runner = mock.MagicMock()
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.ignore_docker_for_reuse = False
+
+        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        runner.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
+
+        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+        tool = cmap({
+            "inputs": [],
+            "outputs": [],
+            "baseCommand": "ls",
+            "stdout": "stdout.txt",
+            "stderr": "stderr.txt",
+            "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
+            "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+        })
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
+                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+        arvtool.formatgraph = None
+        for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
+                             make_fs_access=make_fs_access, tmpdir="/tmp"):
+            j.run()
+            runner.api.container_requests().create.assert_called_with(
+                body=JsonDiffMatcher({
+                    'environment': {
+                        'HOME': '/var/spool/cwl',
+                        'TMPDIR': '/tmp'
+                    },
+                    'name': 'test_run_redirect',
+                    'runtime_constraints': {
+                        'vcpus': 1,
+                        'ram': 1073741824
+                    },
+                    'use_existing': True,
+                    'priority': 1,
+                    'mounts': {
+                        '/var/spool/cwl': {'kind': 'tmp'},
+                        "stderr": {
+                            "kind": "file",
+                            "path": "/var/spool/cwl/stderr.txt"
+                        },
+                        "stdin": {
+                            "kind": "collection",
+                            "path": "file.txt",
+                            "portable_data_hash": "99999999999999999999999999999996+99"
+                        },
+                        "stdout": {
+                            "kind": "file",
+                            "path": "/var/spool/cwl/stdout.txt"
+                        },
+                    },
+                    'state': 'Committed',
+                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                    'output_path': '/var/spool/cwl',
+                    'container_image': 'arvados/jobs',
+                    'command': ['ls', '/var/spool/cwl'],
+                    'cwd': '/var/spool/cwl',
+                    'scheduling_parameters': {},
+                    'properties': {},
+                }))
+
     @mock.patch("arvados.collection.Collection")
     def test_done(self, col):
         api = mock.MagicMock()
index 3b6af04b293e8f48e320fa6508716d3c6d27faf6..b39a9842845f7af01c64d62c713819ecaf1aca1c 100644 (file)
@@ -19,6 +19,7 @@ from arvados_cwl.pathmapper import ArvPathMapper
 def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
     pdh = "99999999999999999999999999999991+99"
     for c in files:
+        c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
         c.fn = fnPattern % (pdh, os.path.basename(c.fn))
 
 class TestPathmap(unittest.TestCase):
@@ -36,23 +37,29 @@ class TestPathmap(unittest.TestCase):
             "location": "keep:99999999999999999999999999999991+99/hw.py"
         }], "", "/test/%s", "/test/%s/%s")
 
-        self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+        self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
 
     @mock.patch("arvados.commands.run.uploadfiles")
-    def test_upload(self, upl):
+    @mock.patch("arvados.commands.run.statfile")
+    def test_upload(self, statfile, upl):
         """Test pathmapper uploading files."""
 
         arvrunner = arvados_cwl.ArvCwlRunner(self.api)
 
+        def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
+            st = arvados.commands.run.UploadFile("", "tests/hw.py")
+            return st
+
         upl.side_effect = upload_mock
+        statfile.side_effect = statfile_mock
 
         p = ArvPathMapper(arvrunner, [{
             "class": "File",
-            "location": "tests/hw.py"
+            "location": "file:tests/hw.py"
         }], "", "/test/%s", "/test/%s/%s")
 
-        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+        self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
 
     @mock.patch("arvados.commands.run.uploadfiles")
@@ -60,16 +67,16 @@ class TestPathmap(unittest.TestCase):
         """Test pathmapper handling previously uploaded files."""
 
         arvrunner = arvados_cwl.ArvCwlRunner(self.api)
-        arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
+        arvrunner.add_uploaded('file:tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File', staged=True))
 
         upl.side_effect = upload_mock
 
         p = ArvPathMapper(arvrunner, [{
             "class": "File",
-            "location": "tests/hw.py"
+            "location": "file:tests/hw.py"
         }], "", "/test/%s", "/test/%s/%s")
 
-        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+        self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
 
     @mock.patch("arvados.commands.run.uploadfiles")
@@ -89,8 +96,8 @@ class TestPathmap(unittest.TestCase):
 
         p = ArvPathMapper(arvrunner, [{
             "class": "File",
-            "location": "tests/hw.py"
+            "location": "file:tests/hw.py"
         }], "", "/test/%s", "/test/%s/%s")
 
-        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+        self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
index 9982f6ffd5955109904ed2fc726c0d0b35d495ec..85c49c913427b26d9207926ed407065125bfe9c0 100644 (file)
@@ -822,18 +822,8 @@ class TestSubmit(unittest.TestCase):
         arvrunner.project_uuid = ""
         api.return_value = mock.MagicMock()
         arvrunner.api = api.return_value
-        arvrunner.api.links().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
-                                                            {"items": [], "items_available": 0, "offset": 0},
-                                                            {"items": [], "items_available": 0, "offset": 0},
-                                                            {"items": [{"created_at": "",
-                                                                        "head_uuid": "",
-                                                                        "link_class": "docker_image_hash",
-                                                                        "name": "123456",
-                                                                        "owner_uuid": "",
-                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
-                                                            {"items": [], "items_available": 0, "offset": 0},
-                                                            {"items": [{"created_at": "",
-                                                                        "head_uuid": "",
+        arvrunner.api.links().list().execute.side_effect = ({"items": [{"created_at": "",
+                                                                        "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
                                                                         "link_class": "docker_image_repo+tag",
                                                                         "name": "arvados/jobs:"+arvados_cwl.__version__,
                                                                         "owner_uuid": "",
@@ -843,19 +833,18 @@ class TestSubmit(unittest.TestCase):
                                                                         "link_class": "docker_image_hash",
                                                                         "name": "123456",
                                                                         "owner_uuid": "",
-                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}                                                            ,
+                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}
         )
         find_one_image_hash.return_value = "123456"
 
-        arvrunner.api.collections().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
-                                                                  {"items": [{"uuid": "",
+        arvrunner.api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
                                                                               "owner_uuid": "",
                                                                               "manifest_text": "",
                                                                               "properties": ""
-                                                                          }], "items_available": 1, "offset": 0},
-                                                                  {"items": [{"uuid": ""}], "items_available": 1, "offset": 0})
+                                                                          }], "items_available": 1, "offset": 0},)
         arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
-        self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+        self.assertEqual("arvados/jobs:"+arvados_cwl.__version__,
+                         arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
 
 class TestCreateTemplate(unittest.TestCase):
     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
diff --git a/sdk/cwl/tests/tmp1/tmp2/tmp3/.gitkeep b/sdk/cwl/tests/tmp1/tmp2/tmp3/.gitkeep
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/wf/listing_deep.cwl b/sdk/cwl/tests/wf/listing_deep.cwl
new file mode 100644 (file)
index 0000000..79ab368
--- /dev/null
@@ -0,0 +1,15 @@
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: "http://commonwl.org/cwltool#"
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: deep_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing[0].class === 'Directory' && inputs.d.listing[0].listing[0].class === 'Directory') {return 'true';} else {return 'false';}}"]
diff --git a/sdk/cwl/tests/wf/listing_none.cwl b/sdk/cwl/tests/wf/listing_none.cwl
new file mode 100644 (file)
index 0000000..584c8f5
--- /dev/null
@@ -0,0 +1,15 @@
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: http://commonwl.org/cwltool#
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: no_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing === undefined) {return 'true';} else {return 'false';}}"]
\ No newline at end of file
diff --git a/sdk/cwl/tests/wf/listing_shallow.cwl b/sdk/cwl/tests/wf/listing_shallow.cwl
new file mode 100644 (file)
index 0000000..b54f348
--- /dev/null
@@ -0,0 +1,15 @@
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: http://commonwl.org/cwltool#
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: shallow_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing[0].class === 'Directory' && inputs.d.listing[0].listing === undefined) {return 'true';} else {return 'false';}}"]
index f73ec2b13d3c4919dd99755bae65bbda8ff560a3..83b3537d528ea55446f26e7862ece767333fd8db 100644 (file)
@@ -44,6 +44,7 @@ steps:
           out: [out]
           run:
             class: CommandLineTool
+            id: subtool
             inputs:
               sleeptime:
                 type: int
index df4d992c359a42eeb619826aa14ce4b87aeab19d..dd4c7054a94c5c9dc73785eafbbc36e2bdff55b5 100644 (file)
           "run": {
             "baseCommand": "sleep",
             "class": "CommandLineTool",
+            "id": "#main/sleep1/subtool",
             "inputs": [
               {
-                "id": "#main/sleep1/sleeptime",
+                "id": "#main/sleep1/subtool/sleeptime",
                 "inputBinding": {
                   "position": 1
                 },
@@ -61,7 +62,7 @@
             ],
             "outputs": [
               {
-                "id": "#main/sleep1/out",
+                "id": "#main/sleep1/subtool/out",
                 "outputBinding": {
                   "outputEval": "out"
                 },
index 22447794f1c7f93c4eca6fcdec83d254cbc1740b..9adbb4878f40541eb13c0feed39bf22241f4c3f5 100644 (file)
@@ -7,9 +7,12 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/streamer"
        "io"
        "io/ioutil"
+       "log"
        "math/rand"
        "net"
        "net/http"
+       "os"
+       "regexp"
        "strings"
        "time"
 )
@@ -19,6 +22,13 @@ import (
 // log.Printf to DebugPrintf.
 var DebugPrintf = func(string, ...interface{}) {}
 
+func init() {
+       var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+       if matchTrue.MatchString(os.Getenv("ARVADOS_DEBUG")) {
+               DebugPrintf = log.Printf
+       }
+}
+
 type keepService struct {
        Uuid     string `json:"uuid"`
        Hostname string `json:"service_host"`
index b74f828f4bd04a2a6321aa50e5f823cb3a2496ab..b96a4c8bd30882bbcec27d453fe9732f24dd6d1c 100644 (file)
@@ -1,4 +1,3 @@
-import gflags
 import httplib
 import httplib2
 import logging
index d1263e24f27b2e89b3ef386d3d35a28bb9341811..543725b516beada820f9b3e001d1267024436b02 100644 (file)
@@ -44,7 +44,7 @@ class OrderedJsonModel(apiclient.model.JsonModel):
         return body
 
 
-def _intercept_http_request(self, uri, **kwargs):
+def _intercept_http_request(self, uri, method="GET", **kwargs):
     if (self.max_request_size and
         kwargs.get('body') and
         self.max_request_size < len(kwargs['body'])):
@@ -58,7 +58,7 @@ def _intercept_http_request(self, uri, **kwargs):
 
     kwargs['headers']['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
 
-    retryable = kwargs.get('method', 'GET') in [
+    retryable = method in [
         'DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT']
     retry_count = self._retry_count if retryable else 0
 
@@ -75,7 +75,7 @@ def _intercept_http_request(self, uri, **kwargs):
     for _ in range(retry_count):
         self._last_request_time = time.time()
         try:
-            return self.orig_http_request(uri, **kwargs)
+            return self.orig_http_request(uri, method, **kwargs)
         except httplib.HTTPException:
             _logger.debug("Retrying API request in %d s after HTTP error",
                           delay, exc_info=True)
@@ -93,7 +93,7 @@ def _intercept_http_request(self, uri, **kwargs):
         delay = delay * self._retry_delay_backoff
 
     self._last_request_time = time.time()
-    return self.orig_http_request(uri, **kwargs)
+    return self.orig_http_request(uri, method, **kwargs)
 
 def _patch_http_request(http, api_token):
     http.arvados_api_token = api_token
@@ -136,7 +136,7 @@ def http_cache(data_type):
     try:
         util.mkdir_dash_p(path)
     except OSError:
-        path = None
+        return None
     return cache.SafeHTTPCache(path, max_age=60*60*24*2)
 
 def api(version=None, cache=True, host=None, token=None, insecure=False, **kwargs):
diff --git a/sdk/python/arvados/commands/get.py b/sdk/python/arvados/commands/get.py
new file mode 100755 (executable)
index 0000000..f39e092
--- /dev/null
@@ -0,0 +1,276 @@
+#!/usr/bin/env python
+
+import argparse
+import hashlib
+import os
+import re
+import string
+import sys
+import logging
+
+import arvados
+import arvados.commands._util as arv_cmd
+
+from arvados._version import __version__
+
+api_client = None
+logger = logging.getLogger('arvados.arv-get')
+
+parser = argparse.ArgumentParser(
+    description='Copy data from Keep to a local file or pipe.',
+    parents=[arv_cmd.retry_opt])
+parser.add_argument('--version', action='version',
+                    version="%s %s" % (sys.argv[0], __version__),
+                    help='Print version and exit.')
+parser.add_argument('locator', type=str,
+                    help="""
+Collection locator, optionally with a file path or prefix.
+""")
+parser.add_argument('destination', type=str, nargs='?', default='-',
+                    help="""
+Local file or directory where the data is to be written. Default: stdout.
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('--progress', action='store_true',
+                   help="""
+Display human-readable progress on stderr (bytes and, if possible,
+percentage of total data size). This is the default behavior when it
+is not expected to interfere with the output: specifically, stderr is
+a tty _and_ either stdout is not a tty, or output is being written to
+named files rather than stdout.
+""")
+group.add_argument('--no-progress', action='store_true',
+                   help="""
+Do not display human-readable progress on stderr.
+""")
+group.add_argument('--batch-progress', action='store_true',
+                   help="""
+Display machine-readable progress on stderr (bytes and, if known,
+total data size).
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('--hash',
+                    help="""
+Display the hash of each file as it is read from Keep, using the given
+hash algorithm. Supported algorithms include md5, sha1, sha224,
+sha256, sha384, and sha512.
+""")
+group.add_argument('--md5sum', action='store_const',
+                    dest='hash', const='md5',
+                    help="""
+Display the MD5 hash of each file as it is read from Keep.
+""")
+parser.add_argument('-n', action='store_true',
+                    help="""
+Do not write any data -- just read from Keep, and report md5sums if
+requested.
+""")
+parser.add_argument('-r', action='store_true',
+                    help="""
+Retrieve all files in the specified collection/prefix. This is the
+default behavior if the "locator" argument ends with a forward slash.
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('-f', action='store_true',
+                   help="""
+Overwrite existing files while writing. The default behavior is to
+refuse to write *anything* if any of the output files already
+exist. As a special case, -f is not needed to write to stdout.
+""")
+group.add_argument('--skip-existing', action='store_true',
+                   help="""
+Skip files that already exist. The default behavior is to refuse to
+write *anything* if any files exist that would have to be
+overwritten. This option causes even devices, sockets, and fifos to be
+skipped.
+""")
+
+def parse_arguments(arguments, stdout, stderr):
+    args = parser.parse_args(arguments)
+
+    if args.locator[-1] == os.sep:
+        args.r = True
+    if (args.r and
+        not args.n and
+        not (args.destination and
+             os.path.isdir(args.destination))):
+        parser.error('Destination is not a directory.')
+    if not args.r and (os.path.isdir(args.destination) or
+                       args.destination[-1] == os.path.sep):
+        args.destination = os.path.join(args.destination,
+                                        os.path.basename(args.locator))
+        logger.debug("Appended source file name to destination directory: %s",
+                     args.destination)
+
+    if args.destination == '/dev/stdout':
+        args.destination = "-"
+
+    if args.destination == '-':
+        # Normally you have to use -f to write to a file (or device) that
+        # already exists, but "-" and "/dev/stdout" are common enough to
+        # merit a special exception.
+        args.f = True
+    else:
+        args.destination = args.destination.rstrip(os.sep)
+
+    # Turn on --progress by default if stderr is a tty and output is
+    # either going to a named file, or going (via stdout) to something
+    # that isn't a tty.
+    if (not (args.batch_progress or args.no_progress)
+        and stderr.isatty()
+        and (args.destination != '-'
+             or not stdout.isatty())):
+        args.progress = True
+    return args
+
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+    global api_client
+    
+    args = parse_arguments(arguments, stdout, stderr)
+    if api_client is None:
+        api_client = arvados.api('v1')
+
+    r = re.search(r'^(.*?)(/.*)?$', args.locator)
+    collection = r.group(1)
+    get_prefix = r.group(2)
+    if args.r and not get_prefix:
+        get_prefix = os.sep
+    try:
+        reader = arvados.CollectionReader(collection, num_retries=args.retries)
+    except Exception as error:
+        logger.error("failed to read collection: {}".format(error))
+        return 1
+
+    # User asked to download the collection's manifest
+    if not get_prefix:
+        if not args.n:
+            open_flags = os.O_CREAT | os.O_WRONLY
+            if not args.f:
+                open_flags |= os.O_EXCL
+            try:
+                if args.destination == "-":
+                    stdout.write(reader.manifest_text())
+                else:
+                    out_fd = os.open(args.destination, open_flags)
+                    with os.fdopen(out_fd, 'wb') as out_file:
+                        out_file.write(reader.manifest_text())
+            except (IOError, OSError) as error:
+                logger.error("can't write to '{}': {}".format(args.destination, error))
+                return 1
+            except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
+                logger.error("failed to download '{}': {}".format(collection, error))
+                return 1
+        return 0
+
+    # Scan the collection. Make an array of (stream, file, local
+    # destination filename) tuples, and add up total size to extract.
+    todo = []
+    todo_bytes = 0
+    try:
+        if get_prefix == os.sep:
+            item = reader
+        else:
+            item = reader.find('.' + get_prefix)
+
+        if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
+            # If the user asked for a file and we got a subcollection, error out.
+            if get_prefix[-1] != os.sep:
+                logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
+                return 1
+            # If the user asked stdout as a destination, error out.
+            elif args.destination == '-':
+                logger.error("cannot use 'stdout' as destination when downloading multiple files.")
+                return 1
+            # User asked for a subcollection, and that's what was found. Add up total size
+            # to download.
+            for s, f in files_in_collection(item):
+                dest_path = os.path.join(
+                    args.destination,
+                    os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
+                if (not (args.n or args.f or args.skip_existing) and
+                    os.path.exists(dest_path)):
+                    logger.error('Local file %s already exists.' % (dest_path,))
+                    return 1
+                todo += [(s, f, dest_path)]
+                todo_bytes += f.size()
+        elif isinstance(item, arvados.arvfile.ArvadosFile):
+            todo += [(item.parent, item, args.destination)]
+            todo_bytes += item.size()
+        else:
+            logger.error("'{}' not found.".format('.' + get_prefix))
+            return 1
+    except (IOError, arvados.errors.NotFoundError) as e:
+        logger.error(e)
+        return 1
+
+    out_bytes = 0
+    for s, f, outfilename in todo:
+        outfile = None
+        digestor = None
+        if not args.n:
+            if outfilename == "-":
+                outfile = stdout
+            else:
+                if args.skip_existing and os.path.exists(outfilename):
+                    logger.debug('Local file %s exists. Skipping.', outfilename)
+                    continue
+                elif not args.f and (os.path.isfile(outfilename) or
+                                   os.path.isdir(outfilename)):
+                    # Good thing we looked again: apparently this file wasn't
+                    # here yet when we checked earlier.
+                    logger.error('Local file %s already exists.' % (outfilename,))
+                    return 1
+                if args.r:
+                    arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
+                try:
+                    outfile = open(outfilename, 'wb')
+                except Exception as error:
+                    logger.error('Open(%s) failed: %s' % (outfilename, error))
+                    return 1
+        if args.hash:
+            digestor = hashlib.new(args.hash)
+        try:
+            with s.open(f.name, 'r') as file_reader:
+                for data in file_reader.readall():
+                    if outfile:
+                        outfile.write(data)
+                    if digestor:
+                        digestor.update(data)
+                    out_bytes += len(data)
+                    if args.progress:
+                        stderr.write('\r%d MiB / %d MiB %.1f%%' %
+                                     (out_bytes >> 20,
+                                      todo_bytes >> 20,
+                                      (100
+                                       if todo_bytes==0
+                                       else 100.0*out_bytes/todo_bytes)))
+                    elif args.batch_progress:
+                        stderr.write('%s %d read %d total\n' %
+                                     (sys.argv[0], os.getpid(),
+                                      out_bytes, todo_bytes))
+            if digestor:
+                stderr.write("%s  %s/%s\n"
+                             % (digestor.hexdigest(), s.stream_name(), f.name))
+        except KeyboardInterrupt:
+            if outfile and (outfile.fileno() > 2) and not outfile.closed:
+                os.unlink(outfile.name)
+            break
+        finally:
+            if outfile != None and outfile != stdout:
+                outfile.close()
+
+    if args.progress:
+        stderr.write('\n')
+    return 0
+
+def files_in_collection(c):
+    # Sort first by file type, then alphabetically by file path.
+    for i in sorted(c.keys(),
+                    key=lambda k: (
+                        isinstance(c[k], arvados.collection.Subcollection),
+                        k.upper())):
+        if isinstance(c[i], arvados.arvfile.ArvadosFile):
+            yield (c, c[i])
+        elif isinstance(c[i], arvados.collection.Subcollection):
+            for s, f in files_in_collection(c[i]):
+                yield (s, f)
index a2f2e542754f7e2e44edbd5673cf36d2c5d130af..c6ca0855f70098359f7648694fd807150497cb3e 100755 (executable)
@@ -3,6 +3,9 @@
 from __future__ import print_function
 
 import argparse
+import collections
+import logging
+import re
 import sys
 
 import arvados
@@ -10,13 +13,15 @@ import arvados.commands._util as arv_cmd
 
 from arvados._version import __version__
 
+FileInfo = collections.namedtuple('FileInfo', ['stream_name', 'name', 'size'])
+
 def parse_args(args):
     parser = argparse.ArgumentParser(
         description='List contents of a manifest',
         parents=[arv_cmd.retry_opt])
 
     parser.add_argument('locator', type=str,
-                        help="""Collection UUID or locator""")
+                        help="""Collection UUID or locator, optionally with a subdir path.""")
     parser.add_argument('-s', action='store_true',
                         help="""List file sizes, in KiB.""")
     parser.add_argument('--version', action='version',
@@ -26,25 +31,43 @@ def parse_args(args):
     return parser.parse_args(args)
 
 def size_formatter(coll_file):
-    return "{:>10}".format((coll_file.size() + 1023) / 1024)
+    return "{:>10}".format((coll_file.size + 1023) / 1024)
 
 def name_formatter(coll_file):
-    return "{}/{}".format(coll_file.stream_name(), coll_file.name)
+    return "{}/{}".format(coll_file.stream_name, coll_file.name)
 
-def main(args, stdout, stderr, api_client=None):
+def main(args, stdout, stderr, api_client=None, logger=None):
     args = parse_args(args)
 
     if api_client is None:
         api_client = arvados.api('v1')
 
+    if logger is None:
+        logger = logging.getLogger('arvados.arv-ls')
+
     try:
-        cr = arvados.CollectionReader(args.locator, api_client=api_client,
+        r = re.search(r'^(.*?)(/.*)?$', args.locator)
+        collection = r.group(1)
+        get_prefix = r.group(2)
+
+        cr = arvados.CollectionReader(collection, api_client=api_client,
                                       num_retries=args.retries)
-        cr.normalize()
-    except (arvados.errors.ArgumentError,
+        if get_prefix:
+            if get_prefix[-1] == '/':
+                get_prefix = get_prefix[:-1]
+            stream_name = '.' + get_prefix
+            reader = cr.find(stream_name)
+            if not (isinstance(reader, arvados.CollectionReader) or
+                    isinstance(reader, arvados.collection.Subcollection)):
+                logger.error("'{}' is not a subdirectory".format(get_prefix))
+                return 1
+        else:
+            stream_name = '.'
+            reader = cr
+    except (arvados.errors.ApiError,
+            arvados.errors.ArgumentError,
             arvados.errors.NotFoundError) as error:
-        print("arv-ls: error fetching collection: {}".format(error),
-              file=stderr)
+        logger.error("error fetching collection: {}".format(error))
         return 1
 
     formatters = []
@@ -52,7 +75,21 @@ def main(args, stdout, stderr, api_client=None):
         formatters.append(size_formatter)
     formatters.append(name_formatter)
 
-    for f in cr.all_files():
+    for f in files_in_collection(reader, stream_name):
         print(*(info_func(f) for info_func in formatters), file=stdout)
 
     return 0
+
+def files_in_collection(c, stream_name='.'):
+    # Sort first by file type, then alphabetically by file path.
+    for i in sorted(c.keys(),
+                    key=lambda k: (
+                        isinstance(c[k], arvados.collection.Subcollection),
+                        k.upper())):
+        if isinstance(c[i], arvados.arvfile.ArvadosFile):
+            yield FileInfo(stream_name=stream_name,
+                           name=i,
+                           size=c[i].size())
+        elif isinstance(c[i], arvados.collection.Subcollection):
+            for f in files_in_collection(c[i], "{}/{}".format(stream_name, i)):
+                yield f
index 32d5fef6a8588e1f2785517435949082dd5b3534..42510754aba4724bb7b91aa061125fce52a1c1a0 100644 (file)
@@ -543,7 +543,10 @@ class ArvPutUploadJob(object):
                     with self._state_lock:
                         self._state['manifest'] = manifest
             if self.use_cache:
-                self._save_state()
+                try:
+                    self._save_state()
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
@@ -673,10 +676,12 @@ class ArvPutUploadJob(object):
             cache_filepath = os.path.join(
                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                 cache_filename)
-            if self.resume:
+            if self.resume and os.path.exists(cache_filepath):
+                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
             else:
                 # --no-resume means start with a empty cache file.
+                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
@@ -693,6 +698,7 @@ class ArvPutUploadJob(object):
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
             else:
+                self.logger.info("No cache usage requested for this run.")
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
@@ -719,20 +725,19 @@ class ArvPutUploadJob(object):
         """
         Atomically save current state into cache.
         """
+        with self._state_lock:
+            # We're not using copy.deepcopy() here because it's a lot slower
+            # than json.dumps(), and we're already needing JSON format to be
+            # saved on disk.
+            state = json.dumps(self._state)
         try:
-            with self._state_lock:
-                # We're not using copy.deepcopy() here because it's a lot slower
-                # than json.dumps(), and we're already needing JSON format to be
-                # saved on disk.
-                state = json.dumps(self._state)
-            new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_filename))
-            self._lock_file(new_cache_fd)
-            new_cache = os.fdopen(new_cache_fd, 'r+')
+            new_cache = tempfile.NamedTemporaryFile(
+                dir=os.path.dirname(self._cache_filename), delete=False)
+            self._lock_file(new_cache)
             new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_filename)
+            os.rename(new_cache.name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
index 38f332b38e2d51aae9b6a3fa5007a59aad6b006f..5b4770c4d0dca8824c268448296d0658c8ba04d8 100644 (file)
@@ -1,4 +1,5 @@
 import cStringIO
+import collections
 import datetime
 import hashlib
 import logging
@@ -307,10 +308,22 @@ class KeepClient(object):
             except:
                 ua.close()
 
-        @staticmethod
-        def _socket_open(family, socktype, protocol, address=None):
+        def _socket_open(self, *args, **kwargs):
+            if len(args) + len(kwargs) == 2:
+                return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+            else:
+                return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+        def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+            return self._socket_open_pycurl_7_21_5(
+                purpose=None,
+                address=collections.namedtuple(
+                    'Address', ['family', 'socktype', 'protocol', 'addr'],
+                )(family, socktype, protocol, address))
+
+        def _socket_open_pycurl_7_21_5(self, purpose, address):
             """Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
-            s = socket.socket(family, socktype, protocol)
+            s = socket.socket(address.family, address.socktype, address.protocol)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
             # Will throw invalid protocol error on mac. This test prevents that.
             if hasattr(socket, 'TCP_KEEPIDLE'):
index f91b3977090da7c6f8b30844635174d122e67ba2..1c2e552490dcd49ba3fe1d9b893b20329f585fac 100755 (executable)
@@ -1,238 +1,7 @@
 #!/usr/bin/env python
 
-import argparse
-import hashlib
-import os
-import re
-import string
 import sys
-import logging
 
-import arvados
-import arvados.commands._util as arv_cmd
+from arvados.commands.get import main
 
-from arvados._version import __version__
-
-logger = logging.getLogger('arvados.arv-get')
-
-def abort(msg, code=1):
-    print >>sys.stderr, "arv-get:", msg
-    exit(code)
-
-parser = argparse.ArgumentParser(
-    description='Copy data from Keep to a local file or pipe.',
-    parents=[arv_cmd.retry_opt])
-parser.add_argument('--version', action='version',
-                    version="%s %s" % (sys.argv[0], __version__),
-                    help='Print version and exit.')
-parser.add_argument('locator', type=str,
-                    help="""
-Collection locator, optionally with a file path or prefix.
-""")
-parser.add_argument('destination', type=str, nargs='?', default='-',
-                    help="""
-Local file or directory where the data is to be written. Default: stdout.
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('--progress', action='store_true',
-                   help="""
-Display human-readable progress on stderr (bytes and, if possible,
-percentage of total data size). This is the default behavior when it
-is not expected to interfere with the output: specifically, stderr is
-a tty _and_ either stdout is not a tty, or output is being written to
-named files rather than stdout.
-""")
-group.add_argument('--no-progress', action='store_true',
-                   help="""
-Do not display human-readable progress on stderr.
-""")
-group.add_argument('--batch-progress', action='store_true',
-                   help="""
-Display machine-readable progress on stderr (bytes and, if known,
-total data size).
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('--hash',
-                    help="""
-Display the hash of each file as it is read from Keep, using the given
-hash algorithm. Supported algorithms include md5, sha1, sha224,
-sha256, sha384, and sha512.
-""")
-group.add_argument('--md5sum', action='store_const',
-                    dest='hash', const='md5',
-                    help="""
-Display the MD5 hash of each file as it is read from Keep.
-""")
-parser.add_argument('-n', action='store_true',
-                    help="""
-Do not write any data -- just read from Keep, and report md5sums if
-requested.
-""")
-parser.add_argument('-r', action='store_true',
-                    help="""
-Retrieve all files in the specified collection/prefix. This is the
-default behavior if the "locator" argument ends with a forward slash.
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('-f', action='store_true',
-                   help="""
-Overwrite existing files while writing. The default behavior is to
-refuse to write *anything* if any of the output files already
-exist. As a special case, -f is not needed to write to stdout.
-""")
-group.add_argument('--skip-existing', action='store_true',
-                   help="""
-Skip files that already exist. The default behavior is to refuse to
-write *anything* if any files exist that would have to be
-overwritten. This option causes even devices, sockets, and fifos to be
-skipped.
-""")
-
-args = parser.parse_args()
-
-if args.locator[-1] == os.sep:
-    args.r = True
-if (args.r and
-    not args.n and
-    not (args.destination and
-         os.path.isdir(args.destination))):
-    parser.error('Destination is not a directory.')
-if not args.r and (os.path.isdir(args.destination) or
-                   args.destination[-1] == os.path.sep):
-    args.destination = os.path.join(args.destination,
-                                    os.path.basename(args.locator))
-    logger.debug("Appended source file name to destination directory: %s",
-                 args.destination)
-
-if args.destination == '/dev/stdout':
-    args.destination = "-"
-
-if args.destination == '-':
-    # Normally you have to use -f to write to a file (or device) that
-    # already exists, but "-" and "/dev/stdout" are common enough to
-    # merit a special exception.
-    args.f = True
-else:
-    args.destination = args.destination.rstrip(os.sep)
-
-# Turn on --progress by default if stderr is a tty and output is
-# either going to a named file, or going (via stdout) to something
-# that isn't a tty.
-if (not (args.batch_progress or args.no_progress)
-    and sys.stderr.isatty()
-    and (args.destination != '-'
-         or not sys.stdout.isatty())):
-    args.progress = True
-
-
-r = re.search(r'^(.*?)(/.*)?$', args.locator)
-collection = r.group(1)
-get_prefix = r.group(2)
-if args.r and not get_prefix:
-    get_prefix = os.sep
-api_client = arvados.api('v1')
-reader = arvados.CollectionReader(collection, num_retries=args.retries)
-
-if not get_prefix:
-    if not args.n:
-        open_flags = os.O_CREAT | os.O_WRONLY
-        if not args.f:
-            open_flags |= os.O_EXCL
-        try:
-            if args.destination == "-":
-                sys.stdout.write(reader.manifest_text())
-            else:
-                out_fd = os.open(args.destination, open_flags)
-                with os.fdopen(out_fd, 'wb') as out_file:
-                    out_file.write(reader.manifest_text())
-        except (IOError, OSError) as error:
-            abort("can't write to '{}': {}".format(args.destination, error))
-        except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
-            abort("failed to download '{}': {}".format(collection, error))
-    sys.exit(0)
-
-reader.normalize()
-
-# Scan the collection. Make an array of (stream, file, local
-# destination filename) tuples, and add up total size to extract.
-todo = []
-todo_bytes = 0
-try:
-    for s in reader.all_streams():
-        for f in s.all_files():
-            if get_prefix and get_prefix[-1] == os.sep:
-                if 0 != string.find(os.path.join(s.name(), f.name()),
-                                    '.' + get_prefix):
-                    continue
-                if args.destination == "-":
-                    dest_path = "-"
-                else:
-                    dest_path = os.path.join(
-                        args.destination,
-                        os.path.join(s.name(), f.name())[len(get_prefix)+1:])
-                    if (not (args.n or args.f or args.skip_existing) and
-                        os.path.exists(dest_path)):
-                        abort('Local file %s already exists.' % (dest_path,))
-            else:
-                if os.path.join(s.name(), f.name()) != '.' + get_prefix:
-                    continue
-                dest_path = args.destination
-            todo += [(s, f, dest_path)]
-            todo_bytes += f.size()
-except arvados.errors.NotFoundError as e:
-    abort(e)
-
-# Read data, and (if not -n) write to local file(s) or pipe.
-
-out_bytes = 0
-for s,f,outfilename in todo:
-    outfile = None
-    digestor = None
-    if not args.n:
-        if outfilename == "-":
-            outfile = sys.stdout
-        else:
-            if args.skip_existing and os.path.exists(outfilename):
-                logger.debug('Local file %s exists. Skipping.', outfilename)
-                continue
-            elif not args.f and (os.path.isfile(outfilename) or
-                               os.path.isdir(outfilename)):
-                # Good thing we looked again: apparently this file wasn't
-                # here yet when we checked earlier.
-                abort('Local file %s already exists.' % (outfilename,))
-            if args.r:
-                arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
-            try:
-                outfile = open(outfilename, 'wb')
-            except Exception as error:
-                abort('Open(%s) failed: %s' % (outfilename, error))
-    if args.hash:
-        digestor = hashlib.new(args.hash)
-    try:
-        for data in f.readall():
-            if outfile:
-                outfile.write(data)
-            if digestor:
-                digestor.update(data)
-            out_bytes += len(data)
-            if args.progress:
-                sys.stderr.write('\r%d MiB / %d MiB %.1f%%' %
-                                 (out_bytes >> 20,
-                                  todo_bytes >> 20,
-                                  (100
-                                   if todo_bytes==0
-                                   else 100.0*out_bytes/todo_bytes)))
-            elif args.batch_progress:
-                sys.stderr.write('%s %d read %d total\n' %
-                                 (sys.argv[0], os.getpid(),
-                                  out_bytes, todo_bytes))
-        if digestor:
-            sys.stderr.write("%s  %s/%s\n"
-                             % (digestor.hexdigest(), s.name(), f.name()))
-    except KeyboardInterrupt:
-        if outfile and (outfile.fileno() > 2) and not outfile.closed:
-            os.unlink(outfile.name)
-        break
-
-if args.progress:
-    sys.stderr.write('\n')
+sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))
index ed380e9a00831dd65f5a62ef0d12f1382f0972f3..5387b0232ab47ae74f5e9c125915b796ecfe1d96 100644 (file)
@@ -45,15 +45,13 @@ setup(name='arvados-python-client',
           ('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
       ],
       install_requires=[
-          'google-api-python-client==1.4.2',
-          'oauth2client >=1.4.6, <2',
+          'google-api-python-client==1.6.2, <1.7',
           'ciso8601',
-          'httplib2',
-          'pycurl >=7.19.5.1, <7.21.5',
-          'python-gflags<3.0',
+          'httplib2 >= 0.9.2',
+          'pycurl >=7.19.5.1',
           'setuptools',
           'ws4py<0.4',
-          'ruamel.yaml==0.13.7'
+          'ruamel.yaml>=0.13.7'
       ],
       test_suite='tests',
       tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
diff --git a/sdk/python/tests/test_arv_get.py b/sdk/python/tests/test_arv_get.py
new file mode 100644 (file)
index 0000000..907c671
--- /dev/null
@@ -0,0 +1,108 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import shutil
+import tempfile
+
+import arvados
+import arvados.collection as collection
+import arvados.commands.get as arv_get
+import run_test_server
+
+from arvados_testutil import redirected_streams
+
+class ArvadosGetTestCase(run_test_server.TestCaseWithServers):
+    MAIN_SERVER = {}
+    KEEP_SERVER = {}
+
+    def setUp(self):
+        super(ArvadosGetTestCase, self).setUp()
+        self.tempdir = tempfile.mkdtemp()
+        self.col_loc, self.col_pdh, self.col_manifest = self.write_test_collection()
+
+    def tearDown(self):
+        super(ArvadosGetTestCase, self).tearDown()
+        shutil.rmtree(self.tempdir)
+
+    def write_test_collection(self,
+                              contents = {
+                                  'foo.txt' : 'foo',
+                                  'bar.txt' : 'bar',
+                                  'subdir/baz.txt' : 'baz',
+                              }):
+        c = collection.Collection()
+        for path, data in contents.items():
+            with c.open(path, 'w') as f:
+                f.write(data)
+        c.save_new()
+        return (c.manifest_locator(), c.portable_data_hash(), c.manifest_text())
+    
+    def run_get(self, args):
+        self.stdout = io.BytesIO()
+        self.stderr = io.BytesIO()
+        return arv_get.main(args, self.stdout, self.stderr)
+
+    def test_version_argument(self):
+        err = io.BytesIO()
+        out = io.BytesIO()
+        with redirected_streams(stdout=out, stderr=err):
+            with self.assertRaises(SystemExit):
+                self.run_get(['--version'])
+        self.assertEqual(out.getvalue(), '')
+        self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
+    def test_get_single_file(self):
+        # Get the file using the collection's locator
+        r = self.run_get(["{}/subdir/baz.txt".format(self.col_loc), '-'])
+        self.assertEqual(0, r)
+        self.assertEqual('baz', self.stdout.getvalue())
+        # Then, try by PDH
+        r = self.run_get(["{}/subdir/baz.txt".format(self.col_pdh), '-'])
+        self.assertEqual(0, r)
+        self.assertEqual('baz', self.stdout.getvalue())        
+
+    def test_get_multiple_files(self):
+        # Download the entire collection to the temp directory
+        r = self.run_get(["{}/".format(self.col_loc), self.tempdir])
+        self.assertEqual(0, r)
+        with open("{}/foo.txt".format(self.tempdir), "r") as f:
+            self.assertEqual("foo", f.read())
+        with open("{}/bar.txt".format(self.tempdir), "r") as f:
+            self.assertEqual("bar", f.read())
+        with open("{}/subdir/baz.txt".format(self.tempdir), "r") as f:
+            self.assertEqual("baz", f.read())
+
+    def test_get_collection_manifest(self):
+        # Get the collection manifest
+        r = self.run_get([self.col_loc, self.tempdir])
+        self.assertEqual(0, r)
+        with open("{}/{}".format(self.tempdir, self.col_loc), "r") as f:
+            self.assertEqual(self.col_manifest, f.read())
+
+    def test_invalid_collection(self):
+        # Asking for an invalid collection should generate an error.
+        r = self.run_get(['this-uuid-seems-to-be-fake', self.tempdir])
+        self.assertNotEqual(0, r)
+
+    def test_invalid_file_request(self):
+        # Asking for an inexistant file within a collection should generate an error.
+        r = self.run_get(["{}/im-not-here.txt".format(self.col_loc), self.tempdir])
+        self.assertNotEqual(0, r)
+
+    def test_invalid_destination(self):
+        # Asking to place the collection's files on a non existant directory
+        # should generate an error.
+        r = self.run_get([self.col_loc, "/fake/subdir/"])
+        self.assertNotEqual(0, r)
+
+    def test_preexistent_destination(self):
+        # Asking to place a file with the same path as a local one should
+        # generate an error and avoid overwrites.
+        with open("{}/foo.txt".format(self.tempdir), "w") as f:
+            f.write("another foo")
+        r = self.run_get(["{}/foo.txt".format(self.col_loc), self.tempdir])
+        self.assertNotEqual(0, r)
+        with open("{}/foo.txt".format(self.tempdir), "r") as f:
+            self.assertEqual("another foo", f.read())
+
index 5064f07d722ee77efc0c8a4f733eaf86d02b8b39..99b551082f8c7399500e9b71f3338050f33fea02 100644 (file)
@@ -35,10 +35,10 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers):
         api_client.collections().get().execute.return_value = coll_info
         return coll_info, api_client
 
-    def run_ls(self, args, api_client):
+    def run_ls(self, args, api_client, logger=None):
         self.stdout = io.BytesIO()
         self.stderr = io.BytesIO()
-        return arv_ls.main(args, self.stdout, self.stderr, api_client)
+        return arv_ls.main(args, self.stdout, self.stderr, api_client, logger)
 
     def test_plain_listing(self):
         collection, api_client = self.mock_api_for_manifest(
@@ -76,10 +76,13 @@ class ArvLsTestCase(run_test_server.TestCaseWithServers):
 
     def test_locator_failure(self):
         api_client = mock.MagicMock(name='mock_api_client')
+        error_mock = mock.MagicMock()
+        logger = mock.MagicMock()
+        logger.error = error_mock
         api_client.collections().get().execute.side_effect = (
             arv_error.NotFoundError)
-        self.assertNotEqual(0, self.run_ls([self.FAKE_UUID], api_client))
-        self.assertNotEqual('', self.stderr.getvalue())
+        self.assertNotEqual(0, self.run_ls([self.FAKE_UUID], api_client, logger))
+        self.assertEqual(1, error_mock.call_count)
 
     def test_version_argument(self):
         err = io.BytesIO()
index baa60bfa17b6f4a7ec2b180bc635dc3ef6da95f2..4d68b401ee2c219cce872ce75d6ec81a6a678108 100644 (file)
@@ -2,14 +2,16 @@ from __future__ import print_function
 
 import md5
 import mock
-import shutil
+import os
 import random
+import shutil
 import sys
 import tempfile
 import threading
 import unittest
 
 import arvados.cache
+import arvados
 import run_test_server
 
 
@@ -47,6 +49,17 @@ class CacheTest(unittest.TestCase):
     def tearDown(self):
         shutil.rmtree(self._dir)
 
+    def test_cache_create_error(self):
+        _, filename = tempfile.mkstemp()
+        home_was = os.environ['HOME']
+        os.environ['HOME'] = filename
+        try:
+            c = arvados.http_cache('test')
+            self.assertEqual(None, c)
+        finally:
+            os.environ['HOME'] = home_was
+            os.unlink(filename)
+
     def test_cache_crud(self):
         c = arvados.cache.SafeHTTPCache(self._dir, max_age=0)
         url = 'https://example.com/foo?bar=baz'
index 84e24f4d3ba13acf5728b0c8c0cdfae1634105e7..264af9fb363b8a0f73672e5ff145250daf7e0527 100644 (file)
@@ -27,7 +27,7 @@ Gem::Specification.new do |s|
   s.add_dependency('google-api-client', '>= 0.7', '< 0.8.9')
   # work around undeclared dependency on i18n in some activesupport 3.x.x:
   s.add_dependency('i18n', '~> 0')
-  s.add_dependency('json', '~> 1.7', '>= 1.7.7')
+  s.add_dependency('json', '>= 1.7.7', '<3')
   s.add_runtime_dependency('jwt', '<2', '>= 0.1.5')
   s.homepage    =
     'https://arvados.org'
index 2072520bb389449180eed9f350eff7f5183e76a2..71fb365fc686f7feaa519c9c9ce467ec426b1d29 100644 (file)
@@ -51,8 +51,6 @@ class ApplicationController < ActionController::Base
 
   attr_writer :resource_attrs
 
-  MAX_UNIQUE_NAME_ATTEMPTS = 10
-
   begin
     rescue_from(Exception,
                 ArvadosModel::PermissionDeniedError,
@@ -99,50 +97,12 @@ class ApplicationController < ActionController::Base
   def create
     @object = model_class.new resource_attrs
 
-    if @object.respond_to? :name and params[:ensure_unique_name]
-      # Record the original name.  See below.
-      name_stem = @object.name
-      retries = MAX_UNIQUE_NAME_ATTEMPTS
+    if @object.respond_to?(:name) && params[:ensure_unique_name]
+      @object.save_with_unique_name!
     else
-      retries = 0
-    end
-
-    begin
       @object.save!
-    rescue ActiveRecord::RecordNotUnique => rn
-      raise unless retries > 0
-      retries -= 1
-
-      # Dig into the error to determine if it is specifically calling out a
-      # (owner_uuid, name) uniqueness violation.  In this specific case, and
-      # the client requested a unique name with ensure_unique_name==true,
-      # update the name field and try to save again.  Loop as necessary to
-      # discover a unique name.  It is necessary to handle name choosing at
-      # this level (as opposed to the client) to ensure that record creation
-      # never fails due to a race condition.
-      raise unless rn.original_exception.is_a? PG::UniqueViolation
-
-      # Unfortunately ActiveRecord doesn't abstract out any of the
-      # necessary information to figure out if this the error is actually
-      # the specific case where we want to apply the ensure_unique_name
-      # behavior, so the following code is specialized to Postgres.
-      err = rn.original_exception
-      detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
-      raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
-
-      @object.uuid = nil
-
-      new_name = "#{name_stem} (#{db_current_time.utc.iso8601(3)})"
-      if new_name == @object.name
-        # If the database is fast enough to do two attempts in the
-        # same millisecond, we need to wait to ensure we try a
-        # different timestamp on each attempt.
-        sleep 0.002
-        new_name = "#{name_stem} (#{db_current_time.utc.iso8601(3)})"
-      end
-      @object.name = new_name
-      retry
     end
+
     show
   end
 
index 5e2404e62c4db63360ddf91f5a6f1c801c763ce3..b308c183291968d9ed2aa609216dc574d0b92efc 100644 (file)
@@ -46,10 +46,12 @@ class Arvados::V1::NodesController < ApplicationController
       @objects = model_class.where('last_ping_at >= ?', db_current_time - 1.hours)
     end
     super
-    job_uuids = @objects.map { |n| n[:job_uuid] }.compact
-    assoc_jobs = readable_job_uuids(job_uuids)
-    @objects.each do |node|
-      node.job_readable = assoc_jobs.include?(node[:job_uuid])
+    if @select.nil? or @select.include? 'job_uuid'
+      job_uuids = @objects.map { |n| n[:job_uuid] }.compact
+      assoc_jobs = readable_job_uuids(job_uuids)
+      @objects.each do |node|
+        node.job_readable = assoc_jobs.include?(node[:job_uuid])
+      end
     end
   end
 
index 0419dadafba891995156b7f5c286f1955a9fbb8e..96dc85e1e44138485899942a084e8095d3eb9182 100644 (file)
@@ -243,6 +243,57 @@ class ArvadosModel < ActiveRecord::Base
           permission_link_classes: ['permission', 'resources'])
   end
 
+  def save_with_unique_name!
+    uuid_was = uuid
+    name_was = name
+    max_retries = 2
+    transaction do
+      conn = ActiveRecord::Base.connection
+      conn.exec_query 'SAVEPOINT save_with_unique_name'
+      begin
+        save!
+      rescue ActiveRecord::RecordNotUnique => rn
+        raise if max_retries == 0
+        max_retries -= 1
+
+        conn.exec_query 'ROLLBACK TO SAVEPOINT save_with_unique_name'
+
+        # Dig into the error to determine if it is specifically calling out a
+        # (owner_uuid, name) uniqueness violation.  In this specific case, and
+        # the client requested a unique name with ensure_unique_name==true,
+        # update the name field and try to save again.  Loop as necessary to
+        # discover a unique name.  It is necessary to handle name choosing at
+        # this level (as opposed to the client) to ensure that record creation
+        # never fails due to a race condition.
+        err = rn.original_exception
+        raise unless err.is_a?(PG::UniqueViolation)
+
+        # Unfortunately ActiveRecord doesn't abstract out any of the
+        # necessary information to figure out if this the error is actually
+        # the specific case where we want to apply the ensure_unique_name
+        # behavior, so the following code is specialized to Postgres.
+        detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
+        raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
+
+        new_name = "#{name_was} (#{db_current_time.utc.iso8601(3)})"
+        if new_name == name
+          # If the database is fast enough to do two attempts in the
+          # same millisecond, we need to wait to ensure we try a
+          # different timestamp on each attempt.
+          sleep 0.002
+          new_name = "#{name_was} (#{db_current_time.utc.iso8601(3)})"
+        end
+
+        self[:name] = new_name
+        self[:uuid] = nil if uuid_was.nil? && !uuid.nil?
+        conn.exec_query 'SAVEPOINT save_with_unique_name'
+        retry
+      ensure
+        conn.exec_query 'RELEASE SAVEPOINT save_with_unique_name'
+      end
+    end
+  end
+
   def logged_attributes
     attributes.except(*Rails.configuration.unlogged_attributes)
   end
@@ -335,36 +386,31 @@ class ArvadosModel < ActiveRecord::Base
       raise PermissionDeniedError
     end
 
-    # Verify "write" permission on old owner
-    # default fail unless one of:
-    # owner_uuid did not change
-    # previous owner_uuid is nil
-    # current user is the old owner
-    # current user is this object
-    # current user can_write old owner
-    unless !owner_uuid_changed? or
-        owner_uuid_was.nil? or
-        current_user.uuid == self.owner_uuid_was or
-        current_user.uuid == self.uuid or
-        current_user.can? write: self.owner_uuid_was
-      logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{uuid} but does not have permission to write old owner_uuid #{owner_uuid_was}"
-      errors.add :owner_uuid, "cannot be changed without write permission on old owner"
-      raise PermissionDeniedError
-    end
-
-    # Verify "write" permission on new owner
-    # default fail unless one of:
-    # current_user is this object
-    # current user can_write new owner, or this object if owner unchanged
-    if new_record? or owner_uuid_changed? or is_a?(ApiClientAuthorization)
-      write_target = owner_uuid
+    if new_record? || owner_uuid_changed?
+      # Permission on owner_uuid_was is needed to move an existing
+      # object away from its previous owner (which implies permission
+      # to modify this object itself, so we don't need to check that
+      # separately). Permission on the new owner_uuid is also needed.
+      [['old', owner_uuid_was],
+       ['new', owner_uuid]
+      ].each do |which, check_uuid|
+        if check_uuid.nil?
+          # old_owner_uuid is nil? New record, no need to check.
+        elsif !current_user.can?(write: check_uuid)
+          logger.warn "User #{current_user.uuid} tried to set ownership of #{self.class.to_s} #{self.uuid} but does not have permission to write #{which} owner_uuid #{check_uuid}"
+          errors.add :owner_uuid, "cannot be set or changed without write permission on #{which} owner"
+          raise PermissionDeniedError
+        end
+      end
     else
-      write_target = uuid
-    end
-    unless current_user == self or current_user.can? write: write_target
-      logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{uuid} but does not have permission to write new owner_uuid #{owner_uuid}"
-      errors.add :owner_uuid, "cannot be changed without write permission on new owner"
-      raise PermissionDeniedError
+      # If the object already existed and we're not changing
+      # owner_uuid, we only need write permission on the object
+      # itself.
+      if !current_user.can?(write: self.uuid)
+        logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{self.uuid} without write permission"
+        errors.add :uuid, "is not writable"
+        raise PermissionDeniedError
+      end
     end
 
     true
index b45c178fb498af1eb588c55982ad38f2bfdeb70e..4a1e610303189a28ae3c029836f9ece92e50c4e0 100644 (file)
@@ -510,33 +510,49 @@ class Collection < ArvadosModel
     true
   end
 
-  # If trash_at is updated without touching delete_at, automatically
-  # update delete_at to a sensible value.
   def default_trash_interval
     if trash_at_changed? && !delete_at_changed?
+      # If trash_at is updated without touching delete_at,
+      # automatically update delete_at to a sensible value.
       if trash_at.nil?
         self.delete_at = nil
       else
         self.delete_at = trash_at + Rails.configuration.default_trash_lifetime.seconds
       end
+    elsif !trash_at || !delete_at || trash_at > delete_at
+      # Not trash, or bogus arguments? Just validate in
+      # validate_trash_and_delete_timing.
+    elsif delete_at_changed? && delete_at >= trash_at
+      # Fix delete_at if needed, so it's not earlier than the expiry
+      # time on any permission tokens that might have been given out.
+
+      # In any case there are no signatures expiring after now+TTL.
+      # Also, if the existing trash_at time has already passed, we
+      # know we haven't given out any signatures since then.
+      earliest_delete = [
+        @validation_timestamp,
+        trash_at_was,
+      ].compact.min + Rails.configuration.blob_signature_ttl.seconds
+
+      # The previous value of delete_at is also an upper bound on the
+      # longest-lived permission token. For example, if TTL=14,
+      # trash_at_was=now-7, delete_at_was=now+7, then it is safe to
+      # set trash_at=now+6, delete_at=now+8.
+      earliest_delete = [earliest_delete, delete_at_was].compact.min
+
+      # If delete_at is too soon, use the earliest possible time.
+      if delete_at < earliest_delete
+        self.delete_at = earliest_delete
+      end
     end
   end
 
   def validate_trash_and_delete_timing
     if trash_at.nil? != delete_at.nil?
       errors.add :delete_at, "must be set if trash_at is set, and must be nil otherwise"
-    end
-
-    earliest_delete = ([@validation_timestamp, trash_at_was].compact.min +
-                       Rails.configuration.blob_signature_ttl.seconds)
-    if delete_at && delete_at < earliest_delete
-      errors.add :delete_at, "#{delete_at} is too soon: earliest allowed is #{earliest_delete}"
-    end
-
-    if delete_at && delete_at < trash_at
+    elsif delete_at && delete_at < trash_at
       errors.add :delete_at, "must not be earlier than trash_at"
     end
-
     true
   end
 end
index a3cc9c107191a2e1b8dbeb87e9624648a17f980e..9420ef3cb88be716b2313fbf2f89657f9e28c8c4 100644 (file)
@@ -82,15 +82,102 @@ class Container < ArvadosModel
     end
   end
 
+  # Create a new container (or find an existing one) to satisfy the
+  # given container request.
+  def self.resolve(req)
+    c_attrs = {
+      command: req.command,
+      cwd: req.cwd,
+      environment: req.environment,
+      output_path: req.output_path,
+      container_image: resolve_container_image(req.container_image),
+      mounts: resolve_mounts(req.mounts),
+      runtime_constraints: resolve_runtime_constraints(req.runtime_constraints),
+      scheduling_parameters: req.scheduling_parameters,
+    }
+    act_as_system_user do
+      if req.use_existing && (reusable = find_reusable(c_attrs))
+        reusable
+      else
+        Container.create!(c_attrs)
+      end
+    end
+  end
+
+  # Return a runtime_constraints hash that complies with requested but
+  # is suitable for saving in a container record, i.e., has specific
+  # values instead of ranges.
+  #
+  # Doing this as a step separate from other resolutions, like "git
+  # revision range to commit hash", makes sense only when there is no
+  # opportunity to reuse an existing container (e.g., container reuse
+  # is not implemented yet, or we have already found that no existing
+  # containers are suitable).
+  def self.resolve_runtime_constraints(runtime_constraints)
+    rc = {}
+    defaults = {
+      'keep_cache_ram' =>
+      Rails.configuration.container_default_keep_cache_ram,
+    }
+    defaults.merge(runtime_constraints).each do |k, v|
+      if v.is_a? Array
+        rc[k] = v[0]
+      else
+        rc[k] = v
+      end
+    end
+    rc
+  end
+
+  # Return a mounts hash suitable for a Container, i.e., with every
+  # readonly collection UUID resolved to a PDH.
+  def self.resolve_mounts(mounts)
+    c_mounts = {}
+    mounts.each do |k, mount|
+      mount = mount.dup
+      c_mounts[k] = mount
+      if mount['kind'] != 'collection'
+        next
+      end
+      if (uuid = mount.delete 'uuid')
+        c = Collection.
+          readable_by(current_user).
+          where(uuid: uuid).
+          select(:portable_data_hash).
+          first
+        if !c
+          raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
+        end
+        if mount['portable_data_hash'].nil?
+          # PDH not supplied by client
+          mount['portable_data_hash'] = c.portable_data_hash
+        elsif mount['portable_data_hash'] != c.portable_data_hash
+          # UUID and PDH supplied by client, but they don't agree
+          raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
+        end
+      end
+    end
+    return c_mounts
+  end
+
+  # Return a container_image PDH suitable for a Container.
+  def self.resolve_container_image(container_image)
+    coll = Collection.for_latest_docker_image(container_image)
+    if !coll
+      raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
+    end
+    coll.portable_data_hash
+  end
+
   def self.find_reusable(attrs)
     candidates = Container.
       where_serialized(:command, attrs[:command]).
       where('cwd = ?', attrs[:cwd]).
       where_serialized(:environment, attrs[:environment]).
       where('output_path = ?', attrs[:output_path]).
-      where('container_image = ?', attrs[:container_image]).
-      where_serialized(:mounts, attrs[:mounts]).
-      where_serialized(:runtime_constraints, attrs[:runtime_constraints])
+      where('container_image = ?', resolve_container_image(attrs[:container_image])).
+      where_serialized(:mounts, resolve_mounts(attrs[:mounts])).
+      where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]))
 
     # Check for Completed candidates whose output and log are both readable.
     select_readable_pdh = Collection.
index 87c3ebed30184f822839b35443e1d7808e62fa10..628ef886be99f06cdad61e95de4719ef0201b1ee 100644 (file)
@@ -18,8 +18,9 @@ class ContainerRequest < ArvadosModel
   before_validation :validate_scheduling_parameters
   before_validation :set_container
   validates :command, :container_image, :output_path, :cwd, :presence => true
+  validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
   validate :validate_state_change
-  validate :validate_change
+  validate :check_update_whitelist
   after_save :update_priority
   after_save :finalize_if_needed
   before_create :set_requesting_container_uuid
@@ -41,6 +42,7 @@ class ContainerRequest < ArvadosModel
     t.add :output_name
     t.add :output_path
     t.add :output_uuid
+    t.add :output_ttl
     t.add :priority
     t.add :properties
     t.add :requesting_container_uuid
@@ -64,6 +66,13 @@ class ContainerRequest < ArvadosModel
     Committed => [Final]
   }
 
+  AttrsPermittedAlways = [:owner_uuid, :state, :name, :description]
+  AttrsPermittedBeforeCommit = [:command, :container_count_max,
+  :container_image, :cwd, :environment, :filters, :mounts,
+  :output_path, :priority, :properties, :requesting_container_uuid,
+  :runtime_constraints, :state, :container_uuid, :use_existing,
+  :scheduling_parameters, :output_name, :output_ttl]
+
   def state_transitions
     State_transitions
   end
@@ -91,41 +100,31 @@ class ContainerRequest < ArvadosModel
     ['output', 'log'].each do |out_type|
       pdh = c.send(out_type)
       next if pdh.nil?
-      if self.output_name and out_type == 'output'
-        coll_name = self.output_name
-      else
-        coll_name = "Container #{out_type} for request #{uuid}"
+      coll_name = "Container #{out_type} for request #{uuid}"
+      trash_at = nil
+      if out_type == 'output'
+        if self.output_name
+          coll_name = self.output_name
+        end
+        if self.output_ttl > 0
+          trash_at = db_current_time + self.output_ttl
+        end
       end
       manifest = Collection.unscoped do
         Collection.where(portable_data_hash: pdh).first.manifest_text
       end
-      begin
-        coll = Collection.create!(owner_uuid: owner_uuid,
-                                  manifest_text: manifest,
-                                  portable_data_hash: pdh,
-                                  name: coll_name,
-                                  properties: {
-                                    'type' => out_type,
-                                    'container_request' => uuid,
-                                  })
-      rescue ActiveRecord::RecordNotUnique => rn
-        # In case this is executed as part of a transaction: When a Postgres exception happens,
-        # the following statements on the same transaction become invalid, so a rollback is
-        # needed. One example are Unit Tests, every test is enclosed inside a transaction so
-        # that the database can be reverted before every new test starts.
-        # See: http://api.rubyonrails.org/classes/ActiveRecord/Transactions/ClassMethods.html#module-ActiveRecord::Transactions::ClassMethods-label-Exception+handling+and+rolling+back
-        ActiveRecord::Base.connection.execute 'ROLLBACK'
-        raise unless out_type == 'output' and self.output_name
-        # Postgres specific unique name check. See ApplicationController#create for
-        # a detailed explanation.
-        raise unless rn.original_exception.is_a? PG::UniqueViolation
-        err = rn.original_exception
-        detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
-        raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
-        # Output collection name collision detected: append a timestamp.
-        coll_name = "#{self.output_name} #{Time.now.getgm.strftime('%FT%TZ')}"
-        retry
-      end
+
+      coll = Collection.new(owner_uuid: owner_uuid,
+                            manifest_text: manifest,
+                            portable_data_hash: pdh,
+                            name: coll_name,
+                            trash_at: trash_at,
+                            delete_at: trash_at,
+                            properties: {
+                              'type' => out_type,
+                              'container_request' => uuid,
+                            })
+      coll.save_with_unique_name!
       if out_type == 'output'
         out_coll = coll.uuid
       else
@@ -149,93 +148,7 @@ class ContainerRequest < ArvadosModel
     self.cwd ||= "."
     self.container_count_max ||= Rails.configuration.container_count_max
     self.scheduling_parameters ||= {}
-  end
-
-  # Create a new container (or find an existing one) to satisfy this
-  # request.
-  def resolve
-    c_mounts = mounts_for_container
-    c_runtime_constraints = runtime_constraints_for_container
-    c_container_image = container_image_for_container
-    c = act_as_system_user do
-      c_attrs = {command: self.command,
-                 cwd: self.cwd,
-                 environment: self.environment,
-                 output_path: self.output_path,
-                 container_image: c_container_image,
-                 mounts: c_mounts,
-                 runtime_constraints: c_runtime_constraints}
-
-      reusable = self.use_existing ? Container.find_reusable(c_attrs) : nil
-      if not reusable.nil?
-        reusable
-      else
-        c_attrs[:scheduling_parameters] = self.scheduling_parameters
-        Container.create!(c_attrs)
-      end
-    end
-    self.container_uuid = c.uuid
-  end
-
-  # Return a runtime_constraints hash that complies with
-  # self.runtime_constraints but is suitable for saving in a container
-  # record, i.e., has specific values instead of ranges.
-  #
-  # Doing this as a step separate from other resolutions, like "git
-  # revision range to commit hash", makes sense only when there is no
-  # opportunity to reuse an existing container (e.g., container reuse
-  # is not implemented yet, or we have already found that no existing
-  # containers are suitable).
-  def runtime_constraints_for_container
-    rc = {}
-    runtime_constraints.each do |k, v|
-      if v.is_a? Array
-        rc[k] = v[0]
-      else
-        rc[k] = v
-      end
-    end
-    rc
-  end
-
-  # Return a mounts hash suitable for a Container, i.e., with every
-  # readonly collection UUID resolved to a PDH.
-  def mounts_for_container
-    c_mounts = {}
-    mounts.each do |k, mount|
-      mount = mount.dup
-      c_mounts[k] = mount
-      if mount['kind'] != 'collection'
-        next
-      end
-      if (uuid = mount.delete 'uuid')
-        c = Collection.
-          readable_by(current_user).
-          where(uuid: uuid).
-          select(:portable_data_hash).
-          first
-        if !c
-          raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
-        end
-        if mount['portable_data_hash'].nil?
-          # PDH not supplied by client
-          mount['portable_data_hash'] = c.portable_data_hash
-        elsif mount['portable_data_hash'] != c.portable_data_hash
-          # UUID and PDH supplied by client, but they don't agree
-          raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
-        end
-      end
-    end
-    return c_mounts
-  end
-
-  # Return a container_image PDH suitable for a Container.
-  def container_image_for_container
-    coll = Collection.for_latest_docker_image(container_image)
-    if !coll
-      raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
-    end
-    coll.portable_data_hash
+    self.output_ttl ||= 0
   end
 
   def set_container
@@ -246,7 +159,7 @@ class ContainerRequest < ArvadosModel
       return false
     end
     if state_changed? and state == Committed and container_uuid.nil?
-      resolve
+      self.container_uuid = Container.resolve(self).uuid
     end
     if self.container_uuid != self.container_uuid_was
       if self.container_count_changed?
@@ -261,20 +174,17 @@ class ContainerRequest < ArvadosModel
   def validate_runtime_constraints
     case self.state
     when Committed
-      ['vcpus', 'ram'].each do |k|
-        if not (runtime_constraints.include? k and
-                runtime_constraints[k].is_a? Integer and
-                runtime_constraints[k] > 0)
-          errors.add :runtime_constraints, "#{k} must be a positive integer"
+      [['vcpus', true],
+       ['ram', true],
+       ['keep_cache_ram', false]].each do |k, required|
+        if !required && !runtime_constraints.include?(k)
+          next
+        end
+        v = runtime_constraints[k]
+        unless (v.is_a?(Integer) && v > 0)
+          errors.add(:runtime_constraints,
+                     "[#{k}]=#{v.inspect} must be a positive integer")
         end
-      end
-
-      if runtime_constraints.include? 'keep_cache_ram' and
-         (!runtime_constraints['keep_cache_ram'].is_a?(Integer) or
-          runtime_constraints['keep_cache_ram'] <= 0)
-            errors.add :runtime_constraints, "keep_cache_ram must be a positive integer"
-      elsif !runtime_constraints.include? 'keep_cache_ram'
-        runtime_constraints['keep_cache_ram'] = Rails.configuration.container_default_keep_cache_ram
       end
     end
   end
@@ -290,57 +200,45 @@ class ContainerRequest < ArvadosModel
     end
   end
 
-  def validate_change
-    permitted = [:owner_uuid]
+  def check_update_whitelist
+    permitted = AttrsPermittedAlways.dup
 
-    case self.state
-    when Uncommitted
-      # Permit updating most fields
-      permitted.push :command, :container_count_max,
-                     :container_image, :cwd, :description, :environment,
-                     :filters, :mounts, :name, :output_path, :priority,
-                     :properties, :requesting_container_uuid, :runtime_constraints,
-                     :state, :container_uuid, :use_existing, :scheduling_parameters,
-                     :output_name
+    if self.new_record? || self.state_was == Uncommitted
+      # Allow create-and-commit in a single operation.
+      permitted.push *AttrsPermittedBeforeCommit
+    end
 
+    case self.state
     when Committed
-      if container_uuid.nil?
-        errors.add :container_uuid, "has not been resolved to a container."
-      end
+      permitted.push :priority, :container_count_max, :container_uuid
 
-      if priority.nil?
-        errors.add :priority, "cannot be nil"
+      if self.container_uuid.nil?
+        self.errors.add :container_uuid, "has not been resolved to a container."
       end
 
-      # Can update priority, container count, name and description
-      permitted.push :priority, :container_count, :container_count_max, :container_uuid,
-                     :name, :description
+      if self.priority.nil?
+        self.errors.add :priority, "cannot be nil"
+      end
 
-      if self.state_changed?
-        # Allow create-and-commit in a single operation.
-        permitted.push :command, :container_image, :cwd, :description, :environment,
-                       :filters, :mounts, :name, :output_path, :properties,
-                       :requesting_container_uuid, :runtime_constraints,
-                       :state, :container_uuid, :use_existing, :scheduling_parameters,
-                       :output_name
+      # Allow container count to increment by 1
+      if (self.container_uuid &&
+          self.container_uuid != self.container_uuid_was &&
+          self.container_count == 1 + (self.container_count_was || 0))
+        permitted.push :container_count
       end
 
     when Final
-      if not current_user.andand.is_admin and not (self.name_changed? || self.description_changed?)
-        errors.add :state, "of container request can only be set to Final by system."
+      if self.state_changed? and not current_user.andand.is_admin
+        self.errors.add :state, "of container request can only be set to Final by system."
       end
 
-      if self.state_changed? || self.name_changed? || self.description_changed? || self.output_uuid_changed? || self.log_uuid_changed?
-          permitted.push :state, :name, :description, :output_uuid, :log_uuid
-      else
-        errors.add :state, "does not allow updates"
+      if self.state_was == Committed
+        permitted.push :output_uuid, :log_uuid
       end
 
-    else
-      errors.add :state, "invalid value"
     end
 
-    check_update_whitelist permitted
+    super(permitted)
   end
 
   def update_priority
index c7ac090adc3b9d7bc6b265707ac0c3b4973521e7..82ea0acbd63d9b12e2b31cd4d35fec783b869b59 100644 (file)
@@ -1,3 +1,5 @@
+require 'tempfile'
+
 class Node < ArvadosModel
   include HasUuid
   include KindAndEtag
@@ -171,22 +173,30 @@ class Node < ArvadosModel
     }
 
     if Rails.configuration.dns_server_conf_dir and Rails.configuration.dns_server_conf_template
+      tmpfile = nil
       begin
         begin
           template = IO.read(Rails.configuration.dns_server_conf_template)
-        rescue => e
+        rescue IOError, SystemCallError => e
           logger.error "Reading #{Rails.configuration.dns_server_conf_template}: #{e.message}"
           raise
         end
 
         hostfile = File.join Rails.configuration.dns_server_conf_dir, "#{hostname}.conf"
-        File.open hostfile+'.tmp', 'w' do |f|
+        Tempfile.open(["#{hostname}-", ".conf.tmp"],
+                                 Rails.configuration.dns_server_conf_dir) do |f|
+          tmpfile = f.path
           f.puts template % template_vars
         end
-        File.rename hostfile+'.tmp', hostfile
-      rescue => e
+        File.rename tmpfile, hostfile
+      rescue IOError, SystemCallError => e
         logger.error "Writing #{hostfile}: #{e.message}"
         ok = false
+      ensure
+        if tmpfile and File.file? tmpfile
+          # Cleanup remaining temporary file.
+          File.unlink tmpfile
+        end
       end
     end
 
@@ -205,7 +215,7 @@ class Node < ArvadosModel
           # Typically, this is used to trigger a dns server restart
           f.puts Rails.configuration.dns_server_reload_command
         end
-      rescue => e
+      rescue IOError, SystemCallError => e
         logger.error "Unable to write #{restartfile}: #{e.message}"
         ok = false
       end
diff --git a/services/api/db/migrate/20170330012505_add_output_ttl_to_container_requests.rb b/services/api/db/migrate/20170330012505_add_output_ttl_to_container_requests.rb
new file mode 100644 (file)
index 0000000..ee6fa37
--- /dev/null
@@ -0,0 +1,5 @@
+class AddOutputTtlToContainerRequests < ActiveRecord::Migration
+  def change
+    add_column :container_requests, :output_ttl, :integer, default: 0, null: false
+  end
+end
index d877452f200178c4df67610ce57a7cfc9dd0f236..e25a2a960571f21b823f14d4d45e619fa2c0ae9a 100644 (file)
@@ -297,7 +297,8 @@ CREATE TABLE container_requests (
     scheduling_parameters text,
     output_uuid character varying(255),
     log_uuid character varying(255),
-    output_name character varying(255) DEFAULT NULL::character varying
+    output_name character varying(255) DEFAULT NULL::character varying,
+    output_ttl integer DEFAULT 0 NOT NULL
 );
 
 
@@ -2753,4 +2754,6 @@ INSERT INTO schema_migrations (version) VALUES ('20170216170823');
 
 INSERT INTO schema_migrations (version) VALUES ('20170301225558');
 
-INSERT INTO schema_migrations (version) VALUES ('20170328215436');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20170328215436');
+
+INSERT INTO schema_migrations (version) VALUES ('20170330012505');
\ No newline at end of file
index 428c663a77b92ab2fc679ef20bbcf6b8fcc0fd69..b4ddd78554c8e7c85f4ad6ee0a3514de048f074d 100644 (file)
@@ -17,8 +17,8 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     authorize_with :inactive
     get :index
     assert_response :success
-    node_items = JSON.parse(@response.body)['items']
-    assert_equal 0, node_items.size
+    assert_equal 0, json_response['items'].size
+    assert_equal 0, json_response['items_available']
   end
 
   # active user sees non-secret attributes of up and recently-up nodes
@@ -26,8 +26,9 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     authorize_with :active
     get :index
     assert_response :success
-    node_items = JSON.parse(@response.body)['items']
-    assert_not_equal 0, node_items.size
+    assert_operator 0, :<, json_response['items_available']
+    node_items = json_response['items']
+    assert_operator 0, :<, node_items.size
     found_busy_node = false
     node_items.each do |node|
       assert_nil node['info'].andand['ping_secret']
@@ -113,6 +114,7 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
       authorize_with user
       get :index, {select: ['domain']}
       assert_response :success
+      assert_operator 0, :<, json_response['items_available']
     end
   end
 
index 579b8cc6d05256a88086cd0a50592e3d8a1afaa7..98643a9e74b99b7342b36f34e1b803949e773cbf 100644 (file)
@@ -6,7 +6,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
   include UsersTestHelper
 
   setup do
-    @all_links_at_start = Link.all
+    @initial_link_count = Link.count
     @vm_uuid = virtual_machines(:testvm).uuid
   end
 
@@ -107,7 +107,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
     assert_nil created['identity_url'], 'expected no identity_url'
 
     # arvados#user, repo link and link add user to 'All users' group
-    verify_num_links @all_links_at_start, 4
+    verify_links_added 4
 
     verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
         created['uuid'], created['email'], 'arvados#user', false, 'User'
@@ -269,7 +269,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
     assert_equal response_object['email'], 'foo@example.com', 'expected given email'
 
     # four extra links; system_group, login, group and repo perms
-    verify_num_links @all_links_at_start, 4
+    verify_links_added 4
   end
 
   test "setup user with fake vm and expect error" do
@@ -306,7 +306,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
     assert_equal response_object['email'], 'foo@example.com', 'expected given email'
 
     # five extra links; system_group, login, group, vm, repo
-    verify_num_links @all_links_at_start, 5
+    verify_links_added 5
   end
 
   test "setup user with valid email, no vm and no repo as input" do
@@ -324,7 +324,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
     assert_equal response_object['email'], 'foo@example.com', 'expected given email'
 
     # three extra links; system_group, login, and group
-    verify_num_links @all_links_at_start, 3
+    verify_links_added 3
 
     verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
         response_object['uuid'], response_object['email'], 'arvados#user', false, 'User'
@@ -361,7 +361,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
         'expecting first name'
 
     # five extra links; system_group, login, group, repo and vm
-    verify_num_links @all_links_at_start, 5
+    verify_links_added 5
   end
 
   test "setup user with an existing user email and check different object is created" do
@@ -384,7 +384,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
         'expected different uuid after create operation'
     assert_equal inactive_user['email'], response_object['email'], 'expected given email'
     # system_group, openid, group, and repo. No vm link.
-    verify_num_links @all_links_at_start, 4
+    verify_links_added 4
   end
 
   test "setup user with openid prefix" do
@@ -412,7 +412,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
 
     # verify links
     # four new links: system_group, arvados#user, repo, and 'All users' group.
-    verify_num_links @all_links_at_start, 4
+    verify_links_added 4
 
     verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
         created['uuid'], created['email'], 'arvados#user', false, 'User'
@@ -472,7 +472,7 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
 
     # five new links: system_group, arvados#user, repo, vm and 'All
     # users' group link
-    verify_num_links @all_links_at_start, 5
+    verify_links_added 5
 
     verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
         created['uuid'], created['email'], 'arvados#user', false, 'User'
@@ -841,9 +841,9 @@ class Arvados::V1::UsersControllerTest < ActionController::TestCase
                  "admin's filtered index did not return inactive user")
   end
 
-  def verify_num_links (original_links, expected_additional_links)
-    assert_equal expected_additional_links, Link.all.size-original_links.size,
-        "Expected #{expected_additional_links.inspect} more links"
+  def verify_links_added more
+    assert_equal @initial_link_count+more, Link.count,
+        "Started with #{@initial_link_count} links, expected #{more} more"
   end
 
   def find_obj_in_resp (response_items, object_type, head_kind=nil)
index 87bec21520a05b27c76e807555ba84b45739500d..882e26059ced5a30df507143e0216e6695bc1e08 100644 (file)
@@ -370,21 +370,26 @@ class CollectionTest < ActiveSupport::TestCase
     end
   end
 
+  now = Time.now
   [['trash-to-delete interval negative',
     :collection_owned_by_active,
-    {trash_at: Time.now+2.weeks, delete_at: Time.now},
+    {trash_at: now+2.weeks, delete_at: now},
     {state: :invalid}],
-   ['trash-to-delete interval too short',
+   ['now-to-delete interval short',
     :collection_owned_by_active,
-    {trash_at: Time.now+3.days, delete_at: Time.now+7.days},
-    {state: :invalid}],
+    {trash_at: now+3.days, delete_at: now+7.days},
+    {state: :trash_future}],
+   ['now-to-delete interval short, trash=delete',
+    :collection_owned_by_active,
+    {trash_at: now+3.days, delete_at: now+3.days},
+    {state: :trash_future}],
    ['trash-to-delete interval ok',
     :collection_owned_by_active,
-    {trash_at: Time.now, delete_at: Time.now+15.days},
+    {trash_at: now, delete_at: now+15.days},
     {state: :trash_now}],
    ['trash-to-delete interval short, but far enough in future',
     :collection_owned_by_active,
-    {trash_at: Time.now+13.days, delete_at: Time.now+15.days},
+    {trash_at: now+13.days, delete_at: now+15.days},
     {state: :trash_future}],
    ['trash by setting is_trashed bool',
     :collection_owned_by_active,
@@ -392,11 +397,11 @@ class CollectionTest < ActiveSupport::TestCase
     {state: :trash_now}],
    ['trash in future by setting just trash_at',
     :collection_owned_by_active,
-    {trash_at: Time.now+1.week},
+    {trash_at: now+1.week},
     {state: :trash_future}],
    ['trash in future by setting trash_at and delete_at',
     :collection_owned_by_active,
-    {trash_at: Time.now+1.week, delete_at: Time.now+4.weeks},
+    {trash_at: now+1.week, delete_at: now+4.weeks},
     {state: :trash_future}],
    ['untrash by clearing is_trashed bool',
     :expired_collection,
@@ -416,7 +421,7 @@ class CollectionTest < ActiveSupport::TestCase
         end
         updates_ok = c.update_attributes(updates)
         expect_valid = expect[:state] != :invalid
-        assert_equal updates_ok, expect_valid, c.errors.full_messages.to_s
+        assert_equal expect_valid, updates_ok, c.errors.full_messages.to_s
         case expect[:state]
         when :invalid
           refute c.valid?
index af1d4b25fdeed5b26d55b17affeb3c552ab01646..a3dd1f9835e14ee40c475b80d41d0ec0521378aa 100644 (file)
@@ -3,6 +3,7 @@ require 'helpers/docker_migration_helper'
 
 class ContainerRequestTest < ActiveSupport::TestCase
   include DockerMigrationHelper
+  include DbCurrentTime
 
   def create_minimal_req! attrs={}
     defaults = {
@@ -123,6 +124,8 @@ class ContainerRequestTest < ActiveSupport::TestCase
 
     cr.reload
 
+    assert_equal({"vcpus" => 2, "ram" => 30}, cr.runtime_constraints)
+
     assert_not_nil cr.container_uuid
     c = Container.find_by_uuid cr.container_uuid
     assert_not_nil c
@@ -310,8 +313,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
     lambda { |resolved| resolved["ram"] == 1234234234 }],
   ].each do |rc, okfunc|
     test "resolve runtime constraint range #{rc} to values" do
-      cr = ContainerRequest.new(runtime_constraints: rc)
-      resolved = cr.send :runtime_constraints_for_container
+      resolved = Container.resolve_runtime_constraints(rc)
       assert(okfunc.call(resolved),
              "container runtime_constraints was #{resolved.inspect}")
     end
@@ -343,10 +345,9 @@ class ContainerRequestTest < ActiveSupport::TestCase
   ].each do |mounts, okfunc|
     test "resolve mounts #{mounts.inspect} to values" do
       set_user_from_auth :active
-      cr = ContainerRequest.new(mounts: mounts)
-      resolved = cr.send :mounts_for_container
+      resolved = Container.resolve_mounts(mounts)
       assert(okfunc.call(resolved),
-             "mounts_for_container returned #{resolved.inspect}")
+             "Container.resolve_mounts returned #{resolved.inspect}")
     end
   end
 
@@ -359,9 +360,8 @@ class ContainerRequestTest < ActiveSupport::TestCase
         "path" => "/foo",
       },
     }
-    cr = ContainerRequest.new(mounts: m)
     assert_raises(ArvadosModel::UnresolvableContainerError) do
-      cr.send :mounts_for_container
+      Container.resolve_mounts(m)
     end
   end
 
@@ -375,9 +375,8 @@ class ContainerRequestTest < ActiveSupport::TestCase
         "path" => "/foo",
       },
     }
-    cr = ContainerRequest.new(mounts: m)
     assert_raises(ArgumentError) do
-      cr.send :mounts_for_container
+      Container.resolve_mounts(m)
     end
   end
 
@@ -385,21 +384,19 @@ class ContainerRequestTest < ActiveSupport::TestCase
    'arvados/apitestfixture',
    'd8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678',
   ].each do |tag|
-    test "container_image_for_container(#{tag.inspect})" do
+    test "Container.resolve_container_image(#{tag.inspect})" do
       set_user_from_auth :active
-      cr = ContainerRequest.new(container_image: tag)
-      resolved = cr.send :container_image_for_container
+      resolved = Container.resolve_container_image(tag)
       assert_equal resolved, collections(:docker_image).portable_data_hash
     end
   end
 
-  test "container_image_for_container(pdh)" do
+  test "Container.resolve_container_image(pdh)" do
     set_user_from_auth :active
     [[:docker_image, 'v1'], [:docker_image_1_12, 'v2']].each do |coll, ver|
       Rails.configuration.docker_image_formats = [ver]
       pdh = collections(coll).portable_data_hash
-      cr = ContainerRequest.new(container_image: pdh)
-      resolved = cr.send :container_image_for_container
+      resolved = Container.resolve_container_image(pdh)
       assert_equal resolved, pdh
     end
   end
@@ -410,9 +407,8 @@ class ContainerRequestTest < ActiveSupport::TestCase
   ].each do |img|
     test "container_image_for_container(#{img.inspect}) => 422" do
       set_user_from_auth :active
-      cr = ContainerRequest.new(container_image: img)
       assert_raises(ArvadosModel::UnresolvableContainerError) do
-        cr.send :container_image_for_container
+        Container.resolve_container_image(img)
       end
     end
   end
@@ -426,12 +422,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
     set_user_from_auth :active
     cr = create_minimal_req!(command: ["true", "1"],
                              container_image: collections(:docker_image).portable_data_hash)
-    assert_equal(cr.send(:container_image_for_container),
+    assert_equal(Container.resolve_container_image(cr.container_image),
                  collections(:docker_image_1_12).portable_data_hash)
 
     cr = create_minimal_req!(command: ["true", "2"],
                              container_image: links(:docker_image_collection_tag).name)
-    assert_equal(cr.send(:container_image_for_container),
+    assert_equal(Container.resolve_container_image(cr.container_image),
                  collections(:docker_image_1_12).portable_data_hash)
   end
 
@@ -445,12 +441,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
     set_user_from_auth :active
     cr = create_minimal_req!(command: ["true", "1"],
                              container_image: collections(:docker_image).portable_data_hash)
-    assert_equal(cr.send(:container_image_for_container),
+    assert_equal(Container.resolve_container_image(cr.container_image),
                  collections(:docker_image).portable_data_hash)
 
     cr = create_minimal_req!(command: ["true", "2"],
                              container_image: links(:docker_image_collection_tag).name)
-    assert_equal(cr.send(:container_image_for_container),
+    assert_equal(Container.resolve_container_image(cr.container_image),
                  collections(:docker_image).portable_data_hash)
   end
 
@@ -463,7 +459,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
     cr = create_minimal_req!(command: ["true", "1"],
                              container_image: collections(:docker_image_1_12).portable_data_hash)
     assert_raises(ArvadosModel::UnresolvableContainerError) do
-      cr.send(:container_image_for_container)
+      Container.resolve_container_image(cr.container_image)
     end
   end
 
@@ -475,12 +471,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
     cr = create_minimal_req!(command: ["true", "1"],
                              container_image: collections(:docker_image).portable_data_hash)
     assert_raises(ArvadosModel::UnresolvableContainerError) do
-      cr.send(:container_image_for_container)
+      Container.resolve_container_image(cr.container_image)
     end
     cr = create_minimal_req!(command: ["true", "2"],
                              container_image: links(:docker_image_collection_tag).name)
     assert_raises(ArvadosModel::UnresolvableContainerError) do
-      cr.send(:container_image_for_container)
+      Container.resolve_container_image(cr.container_image)
     end
   end
 
@@ -502,8 +498,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
                       command: ["echo", "hello"],
                       output_path: "test",
                       runtime_constraints: {"vcpus" => 4,
-                                            "ram" => 12000000000,
-                                            "keep_cache_ram" => 268435456},
+                                            "ram" => 12000000000},
                       mounts: {"test" => {"kind" => "json"}}}
       set_user_from_auth :active
       cr1 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Committed,
@@ -585,38 +580,65 @@ class ContainerRequestTest < ActiveSupport::TestCase
 
   test "Output collection name setting using output_name with name collision resolution" do
     set_user_from_auth :active
-    output_name = collections(:foo_file).name
+    output_name = 'unimaginative name'
+    Collection.create!(name: output_name)
 
     cr = create_minimal_req!(priority: 1,
                              state: ContainerRequest::Committed,
                              output_name: output_name)
-    act_as_system_user do
-      c = Container.find_by_uuid(cr.container_uuid)
-      c.update_attributes!(state: Container::Locked)
-      c.update_attributes!(state: Container::Running)
-      c.update_attributes!(state: Container::Complete,
-                           exit_code: 0,
-                           output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
-                           log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
-    end
-    cr.save
+    run_container(cr)
+    cr.reload
     assert_equal ContainerRequest::Final, cr.state
     output_coll = Collection.find_by_uuid(cr.output_uuid)
     # Make sure the resulting output collection name include the original name
     # plus the date
     assert_not_equal output_name, output_coll.name,
-                     "It shouldn't exist more than one collection with the same owner and name '${output_name}'"
+                     "more than one collection with the same owner and name"
     assert output_coll.name.include?(output_name),
            "New name should include original name"
-    assert_match /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z/, output_coll.name,
+    assert_match /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/, output_coll.name,
                  "New name should include ISO8601 date"
   end
 
-  test "Finalize committed request when reusing a finished container" do
-    set_user_from_auth :active
-    cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
-    cr.reload
-    assert_equal ContainerRequest::Committed, cr.state
+  [[0, :check_output_ttl_0],
+   [1, :check_output_ttl_1s],
+   [365*86400, :check_output_ttl_1y],
+  ].each do |ttl, checker|
+    test "output_ttl=#{ttl}" do
+      act_as_user users(:active) do
+        cr = create_minimal_req!(priority: 1,
+                                 state: ContainerRequest::Committed,
+                                 output_name: 'foo',
+                                 output_ttl: ttl)
+        run_container(cr)
+        cr.reload
+        output = Collection.find_by_uuid(cr.output_uuid)
+        send(checker, db_current_time, output.trash_at, output.delete_at)
+      end
+    end
+  end
+
+  def check_output_ttl_0(now, trash, delete)
+    assert_nil(trash)
+    assert_nil(delete)
+  end
+
+  def check_output_ttl_1s(now, trash, delete)
+    assert_not_nil(trash)
+    assert_not_nil(delete)
+    assert_in_delta(trash, now + 1.second, 10)
+    assert_in_delta(delete, now + Rails.configuration.blob_signature_ttl.second, 10)
+  end
+
+  def check_output_ttl_1y(now, trash, delete)
+    year = (86400*365).second
+    assert_not_nil(trash)
+    assert_not_nil(delete)
+    assert_in_delta(trash, now + year, 10)
+    assert_in_delta(delete, now + year, 10)
+  end
+
+  def run_container(cr)
     act_as_system_user do
       c = Container.find_by_uuid(cr.container_uuid)
       c.update_attributes!(state: Container::Locked)
@@ -625,7 +647,16 @@ class ContainerRequestTest < ActiveSupport::TestCase
                            exit_code: 0,
                            output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
                            log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+      c
     end
+  end
+
+  test "Finalize committed request when reusing a finished container" do
+    set_user_from_auth :active
+    cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+    cr.reload
+    assert_equal ContainerRequest::Committed, cr.state
+    run_container(cr)
     cr.reload
     assert_equal ContainerRequest::Final, cr.state
 
@@ -640,34 +671,6 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal ContainerRequest::Final, cr3.state
   end
 
-  [
-    [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => 100}, ContainerRequest::Committed, 100],
-    [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Uncommitted],
-    [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Committed],
-    [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => -1}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
-    [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => '123'}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
-  ].each do |rc, state, expected|
-    test "create container request with #{rc} in state #{state} and verify keep_cache_ram #{expected}" do
-      common_attrs = {cwd: "test",
-                      priority: 1,
-                      command: ["echo", "hello"],
-                      output_path: "test",
-                      runtime_constraints: rc,
-                      mounts: {"test" => {"kind" => "json"}}}
-      set_user_from_auth :active
-
-      if expected == ActiveRecord::RecordInvalid
-        assert_raises(ActiveRecord::RecordInvalid) do
-          create_minimal_req!(common_attrs.merge({state: state}))
-        end
-      else
-        cr = create_minimal_req!(common_attrs.merge({state: state}))
-        expected = Rails.configuration.container_default_keep_cache_ram if state == ContainerRequest::Committed and expected.nil?
-        assert_equal expected, cr.runtime_constraints['keep_cache_ram']
-      end
-    end
-  end
-
   [
     [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
     [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
@@ -699,4 +702,48 @@ class ContainerRequestTest < ActiveSupport::TestCase
       end
     end
   end
+
+  [['Committed', true, {name: "foobar", priority: 123}],
+   ['Committed', false, {container_count: 2}],
+   ['Committed', false, {container_count: 0}],
+   ['Committed', false, {container_count: nil}],
+   ['Final', false, {state: ContainerRequest::Committed, name: "foobar"}],
+   ['Final', false, {name: "foobar", priority: 123}],
+   ['Final', false, {name: "foobar", output_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+   ['Final', false, {name: "foobar", log_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+   ['Final', false, {log_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+   ['Final', false, {priority: 123}],
+   ['Final', false, {mounts: {}}],
+   ['Final', false, {container_count: 2}],
+   ['Final', true, {name: "foobar"}],
+   ['Final', true, {name: "foobar", description: "baz"}],
+  ].each do |state, permitted, updates|
+    test "state=#{state} can#{'not' if !permitted} update #{updates.inspect}" do
+      act_as_user users(:active) do
+        cr = create_minimal_req!(priority: 1,
+                                 state: "Committed",
+                                 container_count_max: 1)
+        case state
+        when 'Committed'
+          # already done
+        when 'Final'
+          act_as_system_user do
+            Container.find_by_uuid(cr.container_uuid).
+              update_attributes!(state: Container::Cancelled)
+          end
+          cr.reload
+        else
+          raise 'broken test case'
+        end
+        assert_equal state, cr.state
+        if permitted
+          assert cr.update_attributes!(updates)
+        else
+          assert_raises(ActiveRecord::RecordInvalid) do
+            cr.update_attributes!(updates)
+          end
+        end
+      end
+    end
+  end
 end
index 5a19f05ee4bc0dcefb1572030ee002eff43f65d5..52d2aa6741d4e8a537fc515477aeaf104c46c4cc 100644 (file)
@@ -11,14 +11,22 @@ class ContainerTest < ActiveSupport::TestCase
     runtime_constraints: {"vcpus" => 1, "ram" => 1},
   }
 
-  REUSABLE_COMMON_ATTRS = {container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
-                           cwd: "test",
-                           command: ["echo", "hello"],
-                           output_path: "test",
-                           runtime_constraints: {"vcpus" => 4,
-                                                 "ram" => 12000000000},
-                           mounts: {"test" => {"kind" => "json"}},
-                           environment: {"var" => 'val'}}
+  REUSABLE_COMMON_ATTRS = {
+    container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
+    cwd: "test",
+    command: ["echo", "hello"],
+    output_path: "test",
+    runtime_constraints: {
+      "ram" => 12000000000,
+      "vcpus" => 4,
+    },
+    mounts: {
+      "test" => {"kind" => "json"},
+    },
+    environment: {
+      "var" => "val",
+    },
+  }
 
   def minimal_new attrs={}
     cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
@@ -86,7 +94,7 @@ class ContainerTest < ActiveSupport::TestCase
   test "Container serialized hash attributes sorted before save" do
     env = {"C" => 3, "B" => 2, "A" => 1}
     m = {"F" => {"kind" => 3}, "E" => {"kind" => 2}, "D" => {"kind" => 1}}
-    rc = {"vcpus" => 1, "ram" => 1}
+    rc = {"vcpus" => 1, "ram" => 1, "keep_cache_ram" => 1}
     c, _ = minimal_new(environment: env, mounts: m, runtime_constraints: rc)
     assert_equal c.environment.to_json, Container.deep_sort_hash(env).to_json
     assert_equal c.mounts.to_json, Container.deep_sort_hash(m).to_json
@@ -149,21 +157,21 @@ class ContainerTest < ActiveSupport::TestCase
       log: 'ea10d51bcf88862dbcc36eb292017dfd+45',
     }
 
-    set_user_from_auth :dispatch1
-
-    c_output1 = Container.create common_attrs
-    c_output2 = Container.create common_attrs
-    assert_not_equal c_output1.uuid, c_output2.uuid
-
     cr = ContainerRequest.new common_attrs
+    cr.use_existing = false
     cr.state = ContainerRequest::Committed
-    cr.container_uuid = c_output1.uuid
     cr.save!
+    c_output1 = Container.where(uuid: cr.container_uuid).first
 
     cr = ContainerRequest.new common_attrs
+    cr.use_existing = false
     cr.state = ContainerRequest::Committed
-    cr.container_uuid = c_output2.uuid
     cr.save!
+    c_output2 = Container.where(uuid: cr.container_uuid).first
+
+    assert_not_equal c_output1.uuid, c_output2.uuid
+
+    set_user_from_auth :dispatch1
 
     out1 = '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'
     log1 = collections(:real_log_collection).portable_data_hash
@@ -176,9 +184,8 @@ class ContainerTest < ActiveSupport::TestCase
     c_output2.update_attributes!({state: Container::Running})
     c_output2.update_attributes!(completed_attrs.merge({log: log1, output: out2}))
 
-    reused = Container.find_reusable(common_attrs)
-    assert_not_nil reused
-    assert_equal reused.uuid, c_output1.uuid
+    reused = Container.resolve(ContainerRequest.new(common_attrs))
+    assert_equal c_output1.uuid, reused.uuid
   end
 
   test "find_reusable method should select running container by start date" do
index c1e77f6a4d4cf78e6e3ac7ce77b4ffe5a2fd4c9e..2330e7c528f6a304b05cf318fcc2482e91e62b8d 100644 (file)
@@ -1,4 +1,6 @@
 require 'test_helper'
+require 'tmpdir'
+require 'tempfile'
 
 class NodeTest < ActiveSupport::TestCase
   def ping_node(node_name, ping_data)
@@ -76,6 +78,16 @@ class NodeTest < ActiveSupport::TestCase
     assert Node.dns_server_update 'compute65535', '127.0.0.127'
   end
 
+  test "don't leave temp files behind if there's an error writing them" do
+    Rails.configuration.dns_server_conf_template = Rails.root.join 'config', 'unbound.template'
+    Tempfile.any_instance.stubs(:puts).raises(IOError)
+    Dir.mktmpdir do |tmpdir|
+      Rails.configuration.dns_server_conf_dir = tmpdir
+      refute Node.dns_server_update 'compute65535', '127.0.0.127'
+      assert_empty Dir.entries(tmpdir).select{|f| File.file? f}
+    end
+  end
+
   test "ping new node with no hostname and default config" do
     node = ping_node(:new_with_no_hostname, {})
     slot_number = node.slot_number
index 062126d6ff42e710523c5779ac2441c109f77f65..fd2ce3f659f188dc71a6b8fa72c22fc149faa499 100644 (file)
@@ -1,6 +1,8 @@
 package main
 
 import (
+       "bytes"
+       "context"
        "encoding/json"
        "errors"
        "flag"
@@ -24,7 +26,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/curoverse/dockerclient"
+
+       dockertypes "github.com/docker/docker/api/types"
+       dockercontainer "github.com/docker/docker/api/types/container"
+       dockernetwork "github.com/docker/docker/api/types/network"
+       dockerclient "github.com/docker/docker/client"
 )
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -55,14 +61,62 @@ type MkTempDir func(string, string) (string, error)
 
 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
 type ThinDockerClient interface {
-       StopContainer(id string, timeout int) error
-       InspectImage(id string) (*dockerclient.ImageInfo, error)
-       LoadImage(reader io.Reader) error
-       CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
-       StartContainer(id string, config *dockerclient.HostConfig) error
-       AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
-       Wait(id string) <-chan dockerclient.WaitResult
-       RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
+       ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
+       ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+               networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
+       ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
+       ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+       ContainerWait(ctx context.Context, container string) (int64, error)
+       ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
+       ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
+       ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+}
+
+// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
+// that executes the docker requests on dockerclient.Client
+type ThinDockerClientProxy struct {
+       Docker *dockerclient.Client
+}
+
+// ContainerAttach invokes dockerclient.Client.ContainerAttach
+func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+       return proxy.Docker.ContainerAttach(ctx, container, options)
+}
+
+// ContainerCreate invokes dockerclient.Client.ContainerCreate
+func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+       networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
+       return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
+}
+
+// ContainerStart invokes dockerclient.Client.ContainerStart
+func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+       return proxy.Docker.ContainerStart(ctx, container, options)
+}
+
+// ContainerStop invokes dockerclient.Client.ContainerStop
+func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+       return proxy.Docker.ContainerStop(ctx, container, timeout)
+}
+
+// ContainerWait invokes dockerclient.Client.ContainerWait
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
+       return proxy.Docker.ContainerWait(ctx, container)
+}
+
+// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
+func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+       return proxy.Docker.ImageInspectWithRaw(ctx, image)
+}
+
+// ImageLoad invokes dockerclient.Client.ImageLoad
+func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+       return proxy.Docker.ImageLoad(ctx, input, quiet)
+}
+
+// ImageRemove invokes dockerclient.Client.ImageRemove
+func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
+       return proxy.Docker.ImageRemove(ctx, image, options)
 }
 
 // ContainerRunner is the main stateful struct used for a single execution of a
@@ -72,8 +126,8 @@ type ContainerRunner struct {
        ArvClient IArvadosClient
        Kc        IKeepClient
        arvados.Container
-       dockerclient.ContainerConfig
-       dockerclient.HostConfig
+       ContainerConfig dockercontainer.Config
+       dockercontainer.HostConfig
        token       string
        ContainerID string
        ExitCode    *int
@@ -81,7 +135,7 @@ type ContainerRunner struct {
        loggingDone   chan bool
        CrunchLog     *ThrottledLogger
        Stdout        io.WriteCloser
-       Stderr        *ThrottledLogger
+       Stderr        io.WriteCloser
        LogCollection *CollectionWriter
        LogsPDH       *string
        RunArvMount
@@ -146,7 +200,8 @@ func (runner *ContainerRunner) stop() {
        }
        runner.cCancelled = true
        if runner.cStarted {
-               err := runner.Docker.StopContainer(runner.ContainerID, 10)
+               timeout := time.Duration(10)
+               err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
                if err != nil {
                        log.Printf("StopContainer failed: %s", err)
                }
@@ -177,7 +232,7 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
 
-       _, err = runner.Docker.InspectImage(imageID)
+       _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
        if err != nil {
                runner.CrunchLog.Print("Loading Docker image from keep")
 
@@ -187,10 +242,11 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                        return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
                }
 
-               err = runner.Docker.LoadImage(readCloser)
+               response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
                if err != nil {
                        return fmt.Errorf("While loading container image into Docker: %v", err)
                }
+               response.Body.Close()
        } else {
                runner.CrunchLog.Print("Docker image is available")
        }
@@ -290,10 +346,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if bind == "stdout" {
+               if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+                               return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -302,7 +358,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+                       }
+               }
+
+               if bind == "stdin" {
+                       // Is it a "collection" mount kind?
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
                        }
                }
 
@@ -317,7 +380,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                switch {
-               case mnt.Kind == "collection":
+               case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -602,15 +665,44 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) {
        return nil
 }
 
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
 
-       var containerReader io.Reader
-       containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
-               &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
+       // If stdin mount is provided, attach it to the docker container
+       var stdinRdr keepclient.Reader
+       var stdinJson []byte
+       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stding collection: %v", err)
+                       }
+
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
+               }
+       }
+
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+       response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
+               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -618,37 +710,76 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.loggingDone = make(chan bool)
 
        if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
-               index := strings.LastIndex(stdoutPath, "/")
-               if index > 0 {
-                       subdirs := stdoutPath[:index]
-                       if subdirs != "" {
-                               st, err := os.Stat(runner.HostOutputDir)
-                               if err != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", err)
-                               }
-                               stdoutPath := path.Join(runner.HostOutputDir, subdirs)
-                               err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
-                               if err != nil {
-                                       return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
-                               }
-                       }
-               }
-               stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
                if err != nil {
-                       return fmt.Errorf("While creating stdout file: %v", err)
+                       return err
                }
                runner.Stdout = stdoutFile
        } else {
                runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
        }
-       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
 
-       go runner.ProcessDockerAttach(containerReader)
+       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+               if err != nil {
+                       return err
+               }
+               runner.Stderr = stderrFile
+       } else {
+               runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+       }
+
+       if stdinRdr != nil {
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin collection to docker container %q", err)
+                               runner.stop()
+                       }
+                       stdinRdr.Close()
+                       response.CloseWrite()
+               }()
+       } else if len(stdinJson) != 0 {
+               go func() {
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Print("While writing stdin json to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.CloseWrite()
+               }()
+       }
+
+       go runner.ProcessDockerAttach(response.Reader)
 
        return nil
 }
 
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+       stdoutPath := mntPath[len(runner.Container.OutputPath):]
+       index := strings.LastIndex(stdoutPath, "/")
+       if index > 0 {
+               subdirs := stdoutPath[:index]
+               if subdirs != "" {
+                       st, err := os.Stat(runner.HostOutputDir)
+                       if err != nil {
+                               return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+                       }
+                       stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+                       err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+                       if err != nil {
+                               return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+                       }
+               }
+       }
+       stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+       if err != nil {
+               return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+       }
+
+       return stdoutFile, nil
+}
+
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
@@ -662,10 +793,10 @@ func (runner *ContainerRunner) CreateContainer() error {
                runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
        }
 
-       runner.HostConfig = dockerclient.HostConfig{
-               Binds:        runner.Binds,
-               CgroupParent: runner.setCgroupParent,
-               LogConfig: dockerclient.LogConfig{
+       runner.HostConfig = dockercontainer.HostConfig{
+               Binds:  runner.Binds,
+               Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+               LogConfig: dockercontainer.LogConfig{
                        Type: "none",
                },
        }
@@ -680,21 +811,29 @@ func (runner *ContainerRunner) CreateContainer() error {
                        "ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
                        "ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
                )
-               runner.HostConfig.NetworkMode = runner.networkMode
+               runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
        } else {
                if runner.enableNetwork == "always" {
-                       runner.HostConfig.NetworkMode = runner.networkMode
+                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
                } else {
-                       runner.HostConfig.NetworkMode = "none"
+                       runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
                }
        }
 
-       var err error
-       runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
+       _, stdinUsed := runner.Container.Mounts["stdin"]
+       runner.ContainerConfig.OpenStdin = stdinUsed
+       runner.ContainerConfig.StdinOnce = stdinUsed
+       runner.ContainerConfig.AttachStdin = stdinUsed
+       runner.ContainerConfig.AttachStdout = true
+       runner.ContainerConfig.AttachStderr = true
+
+       createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
        }
 
+       runner.ContainerID = createdBody.ID
+
        return runner.AttachStreams()
 }
 
@@ -706,7 +845,8 @@ func (runner *ContainerRunner) StartContainer() error {
        if runner.cCancelled {
                return ErrCancelled
        }
-       err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
+       err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
+               dockertypes.ContainerStartOptions{})
        if err != nil {
                return fmt.Errorf("could not start container: %v", err)
        }
@@ -719,21 +859,22 @@ func (runner *ContainerRunner) StartContainer() error {
 func (runner *ContainerRunner) WaitFinish() error {
        runner.CrunchLog.Print("Waiting for container to finish")
 
-       waitDocker := runner.Docker.Wait(runner.ContainerID)
+       waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+       if err != nil {
+               return fmt.Errorf("container wait: %v", err)
+       }
+
+       runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
+       code := int(waitDocker)
+       runner.ExitCode = &code
+
        waitMount := runner.ArvMountExit
-       for waitDocker != nil {
-               select {
-               case err := <-waitMount:
-                       runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
-                       waitMount = nil
-                       runner.stop()
-               case wr := <-waitDocker:
-                       if wr.Error != nil {
-                               return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
-                       }
-                       runner.ExitCode = &wr.ExitCode
-                       waitDocker = nil
-               }
+       select {
+       case err := <-waitMount:
+               runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+               waitMount = nil
+               runner.stop()
+       default:
        }
 
        // wait for stdout/stderr to complete
@@ -1186,13 +1327,17 @@ func main() {
        }
        kc.Retries = 4
 
-       var docker *dockerclient.DockerClient
-       docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+       var docker *dockerclient.Client
+       // API version 1.21 corresponds to Docker 1.9, which is currently the
+       // minimum version we want to support.
+       docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)
        }
 
-       cr := NewContainerRunner(api, kc, docker, containerId)
+       dockerClientProxy := ThinDockerClientProxy{Docker: docker}
+
+       cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
index 7224c4f1b3d622051e1506da7bc1116f443ac003..43c55b67c1c08c07f69fe9913e3681e3835026a6 100644 (file)
@@ -1,13 +1,16 @@
 package main
 
 import (
+       "bufio"
        "bytes"
+       "context"
        "crypto/md5"
        "encoding/json"
        "errors"
        "fmt"
        "io"
        "io/ioutil"
+       "net"
        "os"
        "os/exec"
        "path/filepath"
@@ -23,7 +26,10 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
-       "github.com/curoverse/dockerclient"
+
+       dockertypes "github.com/docker/docker/api/types"
+       dockercontainer "github.com/docker/docker/api/types/container"
+       dockernetwork "github.com/docker/docker/api/types/network"
        . "gopkg.in/check.v1"
 )
 
@@ -73,55 +79,49 @@ type TestDockerClient struct {
        logReader   io.ReadCloser
        logWriter   io.WriteCloser
        fn          func(t *TestDockerClient)
-       finish      chan dockerclient.WaitResult
+       finish      int
        stop        chan bool
        cwd         string
        env         []string
        api         *ArvTestClient
 }
 
-func NewTestDockerClient() *TestDockerClient {
+func NewTestDockerClient(exitCode int) *TestDockerClient {
        t := &TestDockerClient{}
        t.logReader, t.logWriter = io.Pipe()
-       t.finish = make(chan dockerclient.WaitResult)
+       t.finish = exitCode
        t.stop = make(chan bool)
        t.cwd = "/"
        return t
 }
 
-func (t *TestDockerClient) StopContainer(id string, timeout int) error {
-       t.stop <- true
-       return nil
+type MockConn struct {
+       net.Conn
 }
 
-func (t *TestDockerClient) InspectImage(id string) (*dockerclient.ImageInfo, error) {
-       if t.imageLoaded == id {
-               return &dockerclient.ImageInfo{}, nil
-       } else {
-               return nil, errors.New("")
-       }
+func (m *MockConn) Write(b []byte) (int, error) {
+       return len(b), nil
 }
 
-func (t *TestDockerClient) LoadImage(reader io.Reader) error {
-       _, err := io.Copy(ioutil.Discard, reader)
-       if err != nil {
-               return err
-       } else {
-               t.imageLoaded = hwImageId
-               return nil
-       }
+func NewMockConn() *MockConn {
+       c := &MockConn{}
+       return c
+}
+
+func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+       return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil
 }
 
-func (t *TestDockerClient) CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error) {
+func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
        if config.WorkingDir != "" {
                t.cwd = config.WorkingDir
        }
        t.env = config.Env
-       return "abcde", nil
+       return dockercontainer.ContainerCreateCreatedBody{ID: "abcde"}, nil
 }
 
-func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostConfig) error {
-       if id == "abcde" {
+func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+       if container == "abcde" {
                go t.fn(t)
                return nil
        } else {
@@ -129,15 +129,34 @@ func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostCo
        }
 }
 
-func (t *TestDockerClient) AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error) {
-       return t.logReader, nil
+func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+       t.stop <- true
+       return nil
 }
 
-func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
-       return t.finish
+func (t *TestDockerClient) ContainerWait(ctx context.Context, container string) (int64, error) {
+       return int64(t.finish), nil
 }
 
-func (*TestDockerClient) RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error) {
+func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+       if t.imageLoaded == image {
+               return dockertypes.ImageInspect{}, nil, nil
+       } else {
+               return dockertypes.ImageInspect{}, nil, errors.New("")
+       }
+}
+
+func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+       _, err := io.Copy(ioutil.Discard, input)
+       if err != nil {
+               return dockertypes.ImageLoadResponse{}, err
+       } else {
+               t.imageLoaded = hwImageId
+               return dockertypes.ImageLoadResponse{Body: ioutil.NopCloser(input)}, nil
+       }
+}
+
+func (*TestDockerClient) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
        return nil, nil
 }
 
@@ -281,18 +300,22 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
                rdr := ioutil.NopCloser(&bytes.Buffer{})
                client.Called = true
                return FileWrapper{rdr, 1321984}, nil
+       } else if filename == "/file1_in_main.txt" {
+               rdr := ioutil.NopCloser(strings.NewReader("foo"))
+               client.Called = true
+               return FileWrapper{rdr, 3}, nil
        }
        return nil, nil
 }
 
 func (s *TestSuite) TestLoadImage(c *C) {
        kc := &KeepTestClient{}
-       docker := NewTestDockerClient()
+       docker := NewTestDockerClient(0)
        cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
-       _, err := cr.Docker.RemoveImage(hwImageId, true)
+       _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
-       _, err = cr.Docker.InspectImage(hwImageId)
+       _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
        c.Check(err, NotNil)
 
        cr.Container.ContainerImage = hwPDH
@@ -305,13 +328,13 @@ func (s *TestSuite) TestLoadImage(c *C) {
 
        c.Check(err, IsNil)
        defer func() {
-               cr.Docker.RemoveImage(hwImageId, true)
+               cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
        }()
 
        c.Check(kc.Called, Equals, true)
        c.Check(cr.ContainerConfig.Image, Equals, hwImageId)
 
-       _, err = cr.Docker.InspectImage(hwImageId)
+       _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
        c.Check(err, IsNil)
 
        // (2) Test using image that's already loaded
@@ -403,7 +426,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
        // (2) Keep error
-       docker := NewTestDockerClient()
+       docker := NewTestDockerClient(0)
        cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Container.ContainerImage = hwPDH
 
@@ -422,7 +445,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) {
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
        // (4) Collection doesn't contain image
-       docker := NewTestDockerClient()
+       docker := NewTestDockerClient(0)
        cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Container.ContainerImage = hwPDH
 
@@ -463,11 +486,10 @@ func dockerLog(fd byte, msg string) []byte {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-       docker := NewTestDockerClient()
+       docker := NewTestDockerClient(0)
        docker.fn = func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, "Hello world\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{}
        }
        cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
@@ -559,14 +581,14 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
 
 // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
 // dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, extraMounts []string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
+func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
        rec := arvados.Container{}
        err := json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient()
+       docker := NewTestDockerClient(exitCode)
        docker.fn = fn
-       docker.RemoveImage(hwImageId, true)
+       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api = &ArvTestClient{Container: rec}
        docker.api = api
@@ -626,10 +648,9 @@ func (s *TestSuite) TestFullRunHello(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, "hello world\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -648,10 +669,9 @@ func (s *TestSuite) TestCrunchstat(c *C) {
                "output_path": "/tmp",
                "priority": 1,
                "runtime_constraints": {}
-       }`, nil, func(t *TestDockerClient) {
+       }`, nil, 0, func(t *TestDockerClient) {
                time.Sleep(time.Second)
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -681,11 +701,11 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
                "output_path": "/tmp",
                "priority": 1,
                "runtime_constraints": {}
-       }`, nil, func(t *TestDockerClient) {
-               time.Sleep(time.Second)
-               t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{}
-       })
+       }`, nil, 0,
+               func(t *TestDockerClient) {
+                       time.Sleep(time.Second)
+                       t.logWriter.Close()
+               })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(api.CalledWith("container.state", "Complete"), NotNil)
@@ -708,11 +728,11 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
                "output_path": "/tmp",
                "priority": 1,
                "runtime_constraints": {}
-       }`, nil, func(t *TestDockerClient) {
-               time.Sleep(time.Second)
-               t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{}
-       })
+       }`, nil, 0,
+               func(t *TestDockerClient) {
+                       time.Sleep(time.Second)
+                       t.logWriter.Close()
+               })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(api.CalledWith("container.state", "Complete"), NotNil)
@@ -731,11 +751,10 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 1, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, "hello\n"))
                t.logWriter.Write(dockerLog(2, "world\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 1}
        })
 
        final := api.CalledWith("container.state", "Complete")
@@ -757,10 +776,9 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -779,10 +797,9 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -827,14 +844,13 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
        err := json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient()
+       docker := NewTestDockerClient(0)
        docker.fn = func(t *TestDockerClient) {
                <-t.stop
                t.logWriter.Write(dockerLog(1, "foo\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        }
-       docker.RemoveImage(hwImageId, true)
+       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api := &ArvTestClient{Container: rec}
        cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
@@ -872,10 +888,9 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1116,6 +1131,22 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                cr.CleanupDirs()
                checkEmpty()
        }
+
+       // Only mount point of kind 'collection' is allowed for stdin
+       {
+               i = 0
+               cr.ArvMountPoint = ""
+               cr.Container.Mounts = make(map[string]arvados.Mount)
+               cr.Container.Mounts = map[string]arvados.Mount{
+                       "stdin": {Kind: "tmp"},
+               }
+
+               err := cr.SetupMounts()
+               c.Check(err, NotNil)
+               c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
+               cr.CleanupDirs()
+               checkEmpty()
+       }
 }
 
 func (s *TestSuite) TestStdout(c *C) {
@@ -1130,10 +1161,9 @@ func (s *TestSuite) TestStdout(c *C) {
                "runtime_constraints": {}
        }`
 
-       api, _, _ := FullRunHelper(c, helperRecord, nil, func(t *TestDockerClient) {
+       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1147,9 +1177,9 @@ func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (ap
        err = json.Unmarshal([]byte(record), &rec)
        c.Check(err, IsNil)
 
-       docker := NewTestDockerClient()
+       docker := NewTestDockerClient(0)
        docker.fn = fn
-       docker.RemoveImage(hwImageId, true)
+       docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
 
        api = &ArvTestClient{Container: rec}
        cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
@@ -1202,10 +1232,9 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {"API": true}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1226,10 +1255,9 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {"API": true}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
                t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1258,10 +1286,9 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C
 
        extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1294,10 +1321,9 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
        }
 
-       api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+       api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(runner.Binds, DeepEquals, []string{realtemp + "/2:/tmp",
@@ -1347,10 +1373,9 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
                t.logWriter.Close()
-               t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1368,3 +1393,103 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                }
        }
 }
+
+func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
+       helperRecord := `{
+               "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "cwd": "/bin",
+               "environment": {"FROBIZ": "bilbo"},
+               "mounts": {
+        "/tmp": {"kind": "tmp"},
+        "stdin": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/file1_in_main.txt"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {}
+       }`
+
+       extraMounts := []string{
+               "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
+       }
+
+       api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       for _, v := range api.Content {
+               if v["collection"] != nil {
+                       collection := v["collection"].(arvadosclient.Dict)
+                       if strings.Index(collection["name"].(string), "output") == 0 {
+                               manifest := collection["manifest_text"].(string)
+                               c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+                       }
+               }
+       }
+}
+
+func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
+       helperRecord := `{
+               "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+               "cwd": "/bin",
+               "environment": {"FROBIZ": "bilbo"},
+               "mounts": {
+        "/tmp": {"kind": "tmp"},
+        "stdin": {"kind": "json", "content": "foo"},
+        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+    },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {}
+       }`
+
+       api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       for _, v := range api.Content {
+               if v["collection"] != nil {
+                       collection := v["collection"].(arvadosclient.Dict)
+                       if strings.Index(collection["name"].(string), "output") == 0 {
+                               manifest := collection["manifest_text"].(string)
+                               c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+                       }
+               }
+       }
+}
+
+func (s *TestSuite) TestStderrMount(c *C) {
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "echo hello;exit 1"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"},
+               "stdout": {"kind": "file", "path": "/tmp/a/out.txt"},
+               "stderr": {"kind": "file", "path": "/tmp/b/err.txt"}},
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 1, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello\n"))
+               t.logWriter.Write(dockerLog(2, "oops\n"))
+               t.logWriter.Close()
+       })
+
+       final := api.CalledWith("container.state", "Complete")
+       c.Assert(final, NotNil)
+       c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+       c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
+
+       c.Check(api.CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
+}
index 45e902f87c28cdeb9c7f52767bde22db4f573309..ef3e78e8709c57c03b2fc63cd05259cdc3dc91a7 100644 (file)
@@ -11,6 +11,7 @@ import sys
 import time
 
 import arvados.commands._util as arv_cmd
+from arvados_fuse import crunchstat
 from arvados_fuse import *
 from arvados_fuse.unmount import unmount
 from arvados_fuse._version import __version__
@@ -92,9 +93,9 @@ class ArgumentParser(argparse.ArgumentParser):
 
         unmount = self.add_mutually_exclusive_group()
         unmount.add_argument('--unmount', action='store_true', default=False,
-                             help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit.")
+                             help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
         unmount.add_argument('--unmount-all', action='store_true', default=False,
-                             help="Forcefully unmount every fuse mount at or below the specified mountpoint and exit.")
+                             help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
         unmount.add_argument('--replace', action='store_true', default=False,
                              help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
         self.add_argument('--unmount-timeout',
@@ -158,6 +159,7 @@ class Mount(object):
     def run(self):
         if self.args.unmount or self.args.unmount_all:
             unmount(path=self.args.mountpoint,
+                    subtype=self.args.subtype,
                     timeout=self.args.unmount_timeout,
                     recursive=self.args.unmount_all)
         elif self.args.exec_args:
index db78ddc738311b40914087c7b07fd0e27d6700f1..e213c733d1768c6c7cb37346d3840b0d73bdcb54 100644 (file)
@@ -26,7 +26,7 @@ def mountinfo():
     return mi
 
 
-def unmount(path, timeout=10, recursive=False):
+def unmount(path, subtype=None, timeout=10, recursive=False):
     """Unmount the fuse mount at path.
 
     Unmounting is done by writing 1 to the "abort" control file in
@@ -43,15 +43,23 @@ def unmount(path, timeout=10, recursive=False):
 
     path = os.path.realpath(path)
 
+    if subtype is None:
+        mnttype = None
+    elif subtype == '':
+        mnttype = 'fuse'
+    else:
+        mnttype = 'fuse.' + subtype
+
     if recursive:
         paths = []
         for m in mountinfo():
             if m.path == path or m.path.startswith(path+"/"):
                 paths.append(m.path)
-                if not m.is_fuse:
+                if not (m.is_fuse and (mnttype is None or
+                                       mnttype == m.mnttype)):
                     raise Exception(
-                        "cannot unmount {}: non-fuse mountpoint {}".format(
-                            path, m))
+                        "cannot unmount {}: mount type is {}".format(
+                            path, m.mnttype))
         for path in sorted(paths, key=len, reverse=True):
             unmount(path, timeout=timeout, recursive=False)
         return len(paths) > 0
@@ -66,7 +74,7 @@ def unmount(path, timeout=10, recursive=False):
     while True:
         mounted = False
         for m in mountinfo():
-            if m.is_fuse:
+            if m.is_fuse and (mnttype is None or mnttype == m.mnttype):
                 try:
                     if os.path.realpath(m.path) == path:
                         was_mounted = True
diff --git a/services/fuse/tests/test_crunchstat.py b/services/fuse/tests/test_crunchstat.py
new file mode 100644 (file)
index 0000000..1fa28fb
--- /dev/null
@@ -0,0 +1,13 @@
+import subprocess
+
+from integration_test import IntegrationTest
+
+
+class CrunchstatTest(IntegrationTest):
+    def test_crunchstat(self):
+        output = subprocess.check_output(
+            ['./bin/arv-mount',
+             '--crunchstat-interval', '1',
+             self.mnt,
+             '--exec', 'echo', 'ok'])
+        self.assertEqual("ok\n", output)
index 972edaadc8baaf2bdd7cc5c2312473bc6adf81fd..716a0e00d704d16b5edcb8436b3ca68de157024b 100644 (file)
@@ -32,10 +32,41 @@ class UnmountTest(IntegrationTest):
             self.assertNotIn(' '+self.mnt+' ', m)
 
     def _mounted(self, mounts):
-        all_mounts = subprocess.check_output(['mount', '-t', 'fuse.test'])
+        all_mounts = subprocess.check_output(['mount'])
         return [m for m in mounts
                 if ' '+m+' ' in all_mounts]
 
+    def _wait_for_mounts(self, mounts):
+        deadline = time.time() + 10
+        while self._mounted(mounts) != mounts:
+            time.sleep(0.1)
+            self.assertLess(time.time(), deadline)
+
+    def test_unmount_subtype(self):
+        mounts = []
+        for d in ['foo', 'bar']:
+            mnt = self.tmp+'/'+d
+            os.mkdir(mnt)
+            self.to_delete.insert(0, mnt)
+            mounts.append(mnt)
+            subprocess.check_call(
+                ['./bin/arv-mount', '--subtype', d, mnt])
+
+        self._wait_for_mounts(mounts)
+        self.assertEqual(mounts, self._mounted(mounts))
+        subprocess.call(['./bin/arv-mount', '--subtype', 'baz', '--unmount-all', self.tmp])
+        self.assertEqual(mounts, self._mounted(mounts))
+        subprocess.call(['./bin/arv-mount', '--subtype', 'bar', '--unmount', mounts[0]])
+        self.assertEqual(mounts, self._mounted(mounts))
+        subprocess.call(['./bin/arv-mount', '--subtype', '', '--unmount', self.tmp])
+        self.assertEqual(mounts, self._mounted(mounts))
+        subprocess.check_call(['./bin/arv-mount', '--subtype', 'foo', '--unmount', mounts[0]])
+        self.assertEqual(mounts[1:], self._mounted(mounts))
+        subprocess.check_call(['./bin/arv-mount', '--subtype', '', '--unmount-all', mounts[0]])
+        self.assertEqual(mounts[1:], self._mounted(mounts))
+        subprocess.check_call(['./bin/arv-mount', '--subtype', 'bar', '--unmount-all', self.tmp])
+        self.assertEqual([], self._mounted(mounts))
+
     def test_unmount_children(self):
         for d in ['foo', 'foo/bar', 'bar']:
             mnt = self.tmp+'/'+d
@@ -48,12 +79,7 @@ class UnmountTest(IntegrationTest):
             subprocess.check_call(
                 ['./bin/arv-mount', '--subtype', 'test', mnt])
 
-        # Wait for mounts to attach
-        deadline = time.time() + 10
-        while self._mounted(mounts) != mounts:
-            time.sleep(0.1)
-            self.assertLess(time.time(), deadline)
-
+        self._wait_for_mounts(mounts)
         self.assertEqual(mounts, self._mounted(mounts))
         subprocess.check_call(['./bin/arv-mount', '--unmount', self.tmp])
         self.assertEqual(mounts, self._mounted(mounts))
index 71f9083c01a3e99b35020c69cf2dee301f7e848e..9ee26e336d2849b163d02ca77fdac39012afd7c1 100644 (file)
@@ -121,8 +121,14 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
-        if not self.cloud_node.size:
-             self.cloud_node.size = self.cloud_size
+
+        # The information included in the node size object we get from libcloud
+        # is inconsistent between cloud providers.  Replace libcloud NodeSize
+        # object with compatible CloudSizeWrapper object which merges the size
+        # info reported from the cloud with size information from the
+        # configuration file.
+        self.cloud_node.size = self.cloud_size
+
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
         self._later.update_arvados_node_properties()
 
index c78f1c6b8d63160c40e7e57d9b29f57d07e59dcf..29b04845b653190f88c306094486ac030add3af7 100644 (file)
@@ -219,6 +219,23 @@ class BaseComputeNodeDriver(RetryMixin):
         return (isinstance(exception, cls.CLOUD_ERRORS) or
                 type(exception) is Exception)
 
+    def destroy_node(self, cloud_node):
+        try:
+            return self.real.destroy_node(cloud_node)
+        except self.CLOUD_ERRORS as destroy_error:
+            # Sometimes the destroy node request succeeds but times out and
+            # raises an exception instead of returning success.  If this
+            # happens, we get a noisy stack trace.  Check if the node is still
+            # on the node list.  If it is gone, we can declare victory.
+            try:
+                self.search_for_now(cloud_node.id, 'list_nodes')
+            except ValueError:
+                # If we catch ValueError, that means search_for_now didn't find
+                # it, which means destroy_node actually succeeded.
+                return True
+            # The node is still on the list.  Re-raise.
+            raise
+
     # Now that we've defined all our own methods, delegate generic, public
     # attributes of libcloud drivers that we haven't defined ourselves.
     def _delegate_to_real(attr_name):
index 1c6d214fe8818e9dd49e94b413daa6609096a4c8..79e43cb52a881f37f0e91819d25977828e057941 100644 (file)
@@ -136,6 +136,10 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
             raise
 
     def sync_node(self, cloud_node, arvados_node):
+        # Update the cloud node record to ensure we have the correct metadata
+        # fingerprint.
+        cloud_node = self.real.ex_get_node(cloud_node.name, cloud_node.extra['zone'])
+
         # We can't store the FQDN on the name attribute or anything like it,
         # because (a) names are static throughout the node's life (so FQDN
         # isn't available because we don't know it at node creation time) and
@@ -147,12 +151,8 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
             self._find_metadata(metadata_items, 'hostname')['value'] = hostname
         except KeyError:
             metadata_items.append({'key': 'hostname', 'value': hostname})
-        response = self.real.connection.async_request(
-            '/zones/{}/instances/{}/setMetadata'.format(
-                cloud_node.extra['zone'].name, cloud_node.name),
-            method='POST', data=metadata_req)
-        if not response.success():
-            raise Exception("setMetadata error: {}".format(response.error))
+
+        self.real.ex_set_node_metadata(cloud_node, metadata_items)
 
     @classmethod
     def node_fqdn(cls, node):
index aa9b3e290fe42123ca19209de45781a58f2eda94..30e8995baa26bdcc22154570dfa099aa5658d341 100644 (file)
@@ -47,6 +47,8 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'node_stale_after': str(60 * 60 * 2),
                        'watchdog': '600',
                        'node_mem_scaling': '0.95'},
+            'Manage': {'address': '127.0.0.1',
+                       'port': '-1'},
             'Logging': {'file': '/dev/stderr',
                         'level': 'WARNING'},
         }.iteritems():
index f23b2615e29876aeb5689c79959fe605d09d7e11..9bfee79b59bae21968064b995e5cd87df7d7c7b9 100644 (file)
@@ -9,6 +9,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from . import status
 from .computenode import dispatch
 from .config import actor_class
 
@@ -253,6 +254,18 @@ class NodeManagerDaemonActor(actor_class):
                     states.append("shutdown")
         return states + pykka.get_all(proxy_states)
 
+    def _update_tracker(self):
+        updates = {
+            k: 0
+            for k in status.tracker.keys()
+            if k.startswith('nodes_')
+        }
+        for s in self._node_states(size=None):
+            updates.setdefault('nodes_'+s, 0)
+            updates['nodes_'+s] += 1
+        updates['nodes_wish'] = len(self.last_wishlist)
+        status.tracker.update(updates)
+
     def _state_counts(self, size):
         states = self._node_states(size)
         counts = {
@@ -336,7 +349,11 @@ class NodeManagerDaemonActor(actor_class):
                 elif (nodes_wanted < 0) and self.booting:
                     self._later.stop_booting_node(size)
             except Exception as e:
-                self._logger.exception("while calculating nodes wanted for size %s", size)
+                self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+        try:
+            self._update_tracker()
+        except:
+            self._logger.exception("while updating tracker")
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
index 93f6cbdbe3c60c4736614594fdd2e77666276274..11d38ecb76d22105b289d17cec5081aaa5bf3952 100644 (file)
@@ -13,6 +13,7 @@ import pykka
 import libcloud
 
 from . import config as nmconfig
+from . import status
 from .baseactor import WatchdogActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
@@ -112,6 +113,8 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
+    status.Server(config).start()
+
     try:
         root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
         root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
diff --git a/services/nodemanager/arvnodeman/status.py b/services/nodemanager/arvnodeman/status.py
new file mode 100644 (file)
index 0000000..d21899c
--- /dev/null
@@ -0,0 +1,65 @@
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import http.server
+import json
+import logging
+import socketserver
+import threading
+
+_logger = logging.getLogger('status.Handler')
+
+
+class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
+    def __init__(self, config):
+        port = config.getint('Manage', 'port')
+        self.enabled = port >= 0
+        if not self.enabled:
+            _logger.warning("Management server disabled. "+
+                            "Use [Manage] config section to enable.")
+            return
+        self._config = config
+        self._tracker = tracker
+        super(Server, self).__init__(
+            (config.get('Manage', 'address'), port), Handler)
+        self._thread = threading.Thread(target=self.serve_forever)
+        self._thread.daemon = True
+
+    def start(self):
+        if self.enabled:
+            self._thread.start()
+
+
+class Handler(http.server.BaseHTTPRequestHandler, object):
+    def do_GET(self):
+        if self.path == '/status.json':
+            self.send_response(200)
+            self.send_header('Content-type', 'application/json')
+            self.end_headers()
+            self.wfile.write(tracker.get_json())
+        else:
+            self.send_response(404)
+
+    def log_message(self, fmt, *args, **kwargs):
+        _logger.info(fmt, *args, **kwargs)
+
+
+class Tracker(object):
+    def __init__(self):
+        self._mtx = threading.Lock()
+        self._latest = {}
+
+    def get_json(self):
+        with self._mtx:
+            return json.dumps(self._latest)
+
+    def keys(self):
+        with self._mtx:
+            return self._latest.keys()
+
+    def update(self, updates):
+        with self._mtx:
+            self._latest.update(updates)
+
+
+tracker = Tracker()
index f253621ced4ecccd578eb0f810d6c8bbcc2d7dee..8d5b855cdb4370927991e0aadc4f077f920d9eaa 100644 (file)
@@ -1,6 +1,16 @@
 # Azure configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
index b25bf940cf4564a9da43b254c08812b74bf8f528..d5bed57b95811795ee83529935edf897f2339b18 100644 (file)
@@ -1,6 +1,16 @@
 # EC2 configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
index ed7bdc3b3d7d9b423b217c3057f83b0ef86823bc..043bb9567d04909ec848f027365e464a859b2a6b 100644 (file)
@@ -1,6 +1,16 @@
 # Google Compute Engine configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # Node Manager will ensure that there are at least this many nodes running at
 # all times.  If node manager needs to start new idle nodes for the purpose of
index 314750e6cd3ade7c6bf6abdd25e4803b6a7acb2c..85b4c7d4a7c3f545e3cdff1d23e8cdef9aaf5d48 100644 (file)
@@ -5,6 +5,10 @@
 # is through the API server Rails console: load the Node object, set its
 # IP address to 10.10.0.N (where N is the cloud node's ID), and save.
 
+[Manage]
+address = 0.0.0.0
+port = 8989
+
 [Daemon]
 min_nodes = 0
 max_nodes = 8
index c30108f44bb65a487945e665e7f2afae91528c00..5eb923eb93079cc28bc0d1836c9f4b6dea6635a2 100644 (file)
@@ -29,17 +29,23 @@ setup(name='arvados-node-manager',
           ('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
       ],
       install_requires=[
-        'apache-libcloud>=0.16',
-        'arvados-python-client>=0.1.20150206225333',
-        'pykka',
-        'python-daemon',
-        'setuptools'
-        ],
-      dependency_links = [
+          'apache-libcloud>=0.16',
+          'arvados-python-client>=0.1.20150206225333',
+          'future',
+          'pykka',
+          'python-daemon',
+          'setuptools'
+      ],
+      dependency_links=[
           "https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev4.zip"
       ],
       test_suite='tests',
-      tests_require=['pbr<1.7.0', 'mock>=1.0', "apache-libcloud==0.18.1.dev4"],
+      tests_require=[
+          'requests',
+          'pbr<1.7.0',
+          'mock>=1.0',
+          'apache-libcloud==0.18.1.dev4',
+      ],
       zip_safe=False,
       cmdclass={'egg_info': tagger},
       )
index 84e061d867ff42033fd526e92440695702a3dd8c..d47dbdfa0306d82a27232f1099f8902085513b75 100644 (file)
@@ -123,16 +123,15 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         cloud_node = testutil.cloud_node_mock(
             2, metadata=start_metadata.copy(),
             zone=testutil.cloud_object_mock('testzone'))
+        self.driver_mock().ex_get_node.return_value = cloud_node
         driver = self.new_driver()
         driver.sync_node(cloud_node, arv_node)
-        args, kwargs = self.driver_mock().connection.async_request.call_args
-        self.assertEqual('/zones/testzone/instances/2/setMetadata', args[0])
-        for key in ['kind', 'fingerprint']:
-            self.assertEqual(start_metadata[key], kwargs['data'][key])
+        args, kwargs = self.driver_mock().ex_set_node_metadata.call_args
+        self.assertEqual(cloud_node, args[0])
         plain_metadata['hostname'] = 'compute1.zzzzz.arvadosapi.com'
         self.assertEqual(
             plain_metadata,
-            {item['key']: item['value'] for item in kwargs['data']['items']})
+            {item['key']: item['value'] for item in args[1]})
 
     def test_sync_node_updates_hostname_tag(self):
         self.check_sync_node_updates_hostname_tag(
@@ -145,9 +144,7 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         arv_node = testutil.arvados_node_mock(8)
         cloud_node = testutil.cloud_node_mock(
             9, metadata={}, zone=testutil.cloud_object_mock('failzone'))
-        mock_response = self.driver_mock().connection.async_request()
-        mock_response.success.return_value = False
-        mock_response.error = 'sync error test'
+        mock_response = self.driver_mock().ex_set_node_metadata.side_effect = (Exception('sync error test'),)
         driver = self.new_driver()
         with self.assertRaises(Exception) as err_check:
             driver.sync_node(cloud_node, arv_node)
index e49fc39eed3dad01be48004047341ec650a81a5e..04ff9b6d79962922ea8a3327edc726db528b524e 100644 (file)
@@ -9,9 +9,11 @@ import mock
 import pykka
 
 import arvnodeman.daemon as nmdaemon
+import arvnodeman.status as status
 from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
+from . import test_status
 import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
@@ -355,10 +357,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
         self.assertTrue(self.node_shutdown.start.called,
                         "daemon did not shut down booted node on offer")
 
+        with test_status.TestServer() as srv:
+            self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
+            self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
+            self.assertEqual(0, srv.get_status().get('nodes_wish', None))
+
     def test_booted_node_lifecycle(self):
         cloud_node = testutil.cloud_node_mock(6)
         setup = self.start_node_boot(cloud_node, id_num=6)
index 285aa03c7deaa84a07ab6407151f77f5f2b08a31..b947c955723ab53dcfa36e9694c16c4973560b2b 100644 (file)
@@ -26,7 +26,7 @@ class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
     def ping(self):
         # Called by WatchdogActorTest, this delay is longer than the test timeout
         # of 1 second, which should cause the watchdog ping to fail.
-        time.sleep(2)
+        time.sleep(4)
         return True
 
 class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
diff --git a/services/nodemanager/tests/test_status.py b/services/nodemanager/tests/test_status.py
new file mode 100644 (file)
index 0000000..c11fe40
--- /dev/null
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import requests
+import unittest
+
+import arvnodeman.status as status
+import arvnodeman.config as config
+
+
+class TestServer(object):
+    def __enter__(self):
+        cfg = config.NodeManagerConfig()
+        cfg.set('Manage', 'port', '0')
+        cfg.set('Manage', 'address', '127.0.0.1')
+        self.srv = status.Server(cfg)
+        self.srv.start()
+        addr, port = self.srv.server_address
+        self.srv_base = 'http://127.0.0.1:'+str(port)
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.srv.shutdown()
+
+    def get_status_response(self):
+        return requests.get(self.srv_base+'/status.json')
+
+    def get_status(self):
+        return self.get_status_response().json()
+
+
+class StatusServerUpdates(unittest.TestCase):
+    def test_updates(self):
+        with TestServer() as srv:
+            for n in [1, 2, 3]:
+                status.tracker.update({'nodes_'+str(n): n})
+                r = srv.get_status_response()
+                self.assertEqual(200, r.status_code)
+                self.assertEqual('application/json', r.headers['content-type'])
+                resp = r.json()
+                self.assertEqual(n, resp['nodes_'+str(n)])
+            self.assertEqual(1, resp['nodes_1'])
+
+
+class StatusServerDisabled(unittest.TestCase):
+    def test_config_disabled(self):
+        cfg = config.NodeManagerConfig()
+        cfg.set('Manage', 'port', '-1')
+        cfg.set('Manage', 'address', '127.0.0.1')
+        self.srv = status.Server(cfg)
+        self.srv.start()
+        self.assertFalse(self.srv.enabled)
+        self.assertFalse(getattr(self.srv, '_thread', False))