7709: Merge branch 'master' into 7709-api-rails4
author: Tom Clegg <tom@curoverse.com>
Tue, 25 Apr 2017 05:14:23 +0000 (01:14 -0400)
committer: Tom Clegg <tom@curoverse.com>
Tue, 25 Apr 2017 05:14:23 +0000 (01:14 -0400)
Conflicts:
services/api/db/structure.sql

94 files changed:
apps/workbench/app/assets/javascripts/edit_collection.js [new file with mode: 0644]
apps/workbench/app/assets/javascripts/selection.js.erb
apps/workbench/app/assets/stylesheets/collections.css.scss
apps/workbench/app/controllers/keep_disks_controller.rb
apps/workbench/app/controllers/users_controller.rb
apps/workbench/app/helpers/application_helper.rb
apps/workbench/app/models/arvados_base.rb
apps/workbench/app/models/arvados_resource_list.rb
apps/workbench/app/models/job.rb
apps/workbench/app/models/pipeline_instance.rb
apps/workbench/app/models/proxy_work_unit.rb
apps/workbench/app/models/workflow.rb
apps/workbench/app/views/application/_content.html.erb
apps/workbench/app/views/collections/_extra_tab_line_buttons.html.erb [new file with mode: 0644]
apps/workbench/app/views/collections/_show_files.html.erb
apps/workbench/app/views/container_requests/_extra_tab_line_buttons.html.erb
apps/workbench/app/views/workflows/_show_recent.html.erb [new file with mode: 0644]
apps/workbench/test/integration/collection_upload_test.rb
apps/workbench/test/integration/collections_test.rb
apps/workbench/test/integration/work_units_test.rb
apps/workbench/test/integration_helper.rb
apps/workbench/test/unit/arvados_resource_list_test.rb
apps/workbench/test/unit/link_test.rb
apps/workbench/test/unit/pipeline_instance_test.rb
apps/workbench/test/unit/work_unit_test.rb
build/build.list
build/run-build-packages-sso.sh
build/run-build-packages.sh
build/run-library.sh
build/run-tests.sh
doc/_includes/_navbar_top.liquid
doc/install/install-nodemanager.html.textile.liquid
doc/user/cwl/cwl-extensions.html.textile.liquid
docker/migrate-docker19/build.sh
docker/migrate-docker19/dnd.sh
docker/migrate-docker19/migrate.sh
sdk/cli/bin/crunch-job
sdk/cli/test/binstub_arv-mount/arv-mount [new file with mode: 0755]
sdk/cli/test/binstub_clean_fail/arv-mount [moved from sdk/cli/test/binstub_clean_fail/mount with 100% similarity]
sdk/cli/test/test_crunch-job.rb
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arv-cwl-schema.yml
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/fsaccess.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/test_with_arvbox.sh
sdk/cwl/tests/arvados-tests.yml
sdk/cwl/tests/listing-job.yml [new file with mode: 0644]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_fsaccess.py [new file with mode: 0644]
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_pathmapper.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/tmp1/tmp2/tmp3/.gitkeep [new file with mode: 0644]
sdk/cwl/tests/wf/listing_deep.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/listing_none.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/listing_shallow.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/scatter2.cwl
sdk/cwl/tests/wf/scatter2_subwf.cwl
sdk/go/crunchrunner/crunchrunner.go
sdk/go/crunchrunner/crunchrunner_test.go
sdk/go/dispatch/throttle_test.go
sdk/python/arvados/_ranges.py
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/get.py
sdk/python/arvados/commands/migrate19.py
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_get.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_stream.py
services/api/app/models/node.rb
services/api/db/migrate/20170419173031_add_created_by_job_task_index_to_job_tasks.rb [new file with mode: 0644]
services/api/db/migrate/20170419173712_add_object_owner_index_to_logs.rb [new file with mode: 0644]
services/api/db/migrate/20170419175801_add_requesting_container_index_to_container_requests.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/unit/node_test.rb
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/nodemanager/arvnodeman/config.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/status.py [new file with mode: 0644]
services/nodemanager/doc/azure.example.cfg
services/nodemanager/doc/ec2.example.cfg
services/nodemanager/doc/gce.example.cfg
services/nodemanager/doc/local.example.cfg
services/nodemanager/setup.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_status.py [new file with mode: 0644]

diff --git a/apps/workbench/app/assets/javascripts/edit_collection.js b/apps/workbench/app/assets/javascripts/edit_collection.js
new file mode 100644 (file)
index 0000000..5f0c3b4
--- /dev/null
@@ -0,0 +1,45 @@
+// On loading of a collection, enable the "lock" button and
+// disable all file modification controls (upload, rename, delete)
+$(document).
+    ready(function(event) {
+        $(".btn-collection-file-control").addClass("disabled");
+        $(".btn-collection-rename-file-span").attr("title", "Unlock collection to rename file");
+        $(".btn-collection-remove-file-span").attr("title", "Unlock collection to remove file");
+        $(".btn-remove-selected-files").attr("title", "Unlock collection to remove selected files");
+        $(".tab-pane-Upload").addClass("disabled");
+        $(".tab-pane-Upload").attr("title", "Unlock collection to upload files");
+        $("#Upload-tab").attr("data-toggle", "disabled");
+    }).
+    on('click', '.lock-collection-btn', function(event) {
+        classes = $(event.target).attr('class')
+
+        if (classes.indexOf("fa-lock") != -1) {
+            // About to unlock; warn and get confirmation from user
+            if (confirm("Adding, renaming, and deleting files changes the portable data hash. Are you sure you want to unlock the collection?")) {
+                $(".lock-collection-btn").removeClass("fa-lock");
+                $(".lock-collection-btn").addClass("fa-unlock");
+                $(".lock-collection-btn").attr("title", "Lock collection to prevent editing files");
+                $(".btn-collection-rename-file-span").attr("title", "");
+                $(".btn-collection-remove-file-span").attr("title", "");
+                $(".btn-collection-file-control").removeClass("disabled");
+                $(".btn-remove-selected-files").attr("title", "");
+                $(".tab-pane-Upload").removeClass("disabled");
+                $(".tab-pane-Upload").attr("data-original-title", "");
+                $("#Upload-tab").attr("data-toggle", "tab");
+            } else {
+                // User clicked "no" and so do not unlock
+            }
+        } else {
+            // Lock it back
+            $(".lock-collection-btn").removeClass("fa-unlock");
+            $(".lock-collection-btn").addClass("fa-lock");
+            $(".lock-collection-btn").attr("title", "Unlock collection to edit files");
+            $(".btn-collection-rename-file-span").attr("title", "Unlock collection to rename file");
+            $(".btn-collection-remove-file-span").attr("title", "Unlock collection to remove file");
+            $(".btn-collection-file-control").addClass("disabled");
+            $(".btn-remove-selected-files").attr("title", "Unlock collection to remove selected files");
+            $(".tab-pane-Upload").addClass("disabled");
+            $(".tab-pane-Upload").attr("data-original-title", "Unlock collection to upload files");
+            $("#Upload-tab").attr("data-toggle", "disabled");
+        }
+    });
index 5c69c50c119b5dd62c930b3b2144c109812d6871..f60bef7ddb432cda137c935f395b1a46abe997ad 100644 (file)
@@ -53,6 +53,8 @@ function dispatch_selection_action() {
 function enable_disable_selection_actions() {
     var $container = $(this);
     var $checked = $('.persistent-selection:checkbox:checked', $container);
+    var collection_lock_classes = $('.lock-collection-btn').attr('class')
+
     $('[data-selection-action]', $container).
         closest('div.btn-group-sm').
         find('ul li').
@@ -74,6 +76,11 @@ function enable_disable_selection_actions() {
         toggleClass('disabled',
                     ($checked.filter('[value*=-4zz18-]').length < 1) ||
                     ($checked.length != $checked.filter('[value*=-4zz18-]').length));
+    $('[data-selection-action=remove-selected-files]', $container).
+        closest('li').
+        toggleClass('disabled',
+                    ($checked.length < 0) ||
+                    !($checked.length > 0 && collection_lock_classes && collection_lock_classes.indexOf("fa-unlock") !=-1));
 }
 
 $(document).
index 35c2249ecf0540cea0eda2b4516fd4a58ebc38c7..2d2d0f25d12b462aeeed99f48646f53a4ee9a06d 100644 (file)
@@ -64,3 +64,9 @@ $active-bg: #39b3d7;
 .btn-group.toggle-persist .btn-info.active {
     background-color: $active-bg;
 }
+
+.lock-collection-btn {
+    display: inline-block;
+    padding: .5em 2em;
+    margin: 0 1em;
+}
index f57455b37fd6895262267bfc75e2df8e6dced594..bbf18e591bbb5f873a0c680f248f06eb85f72cdc 100644 (file)
@@ -32,6 +32,7 @@ class KeepDisksController < ApplicationController
     histogram_log = Log.
       filter([[:event_type, '=', 'block-age-free-space-histogram']]).
       order(:created_at => :desc).
+      with_count('none').
       limit(1)
     histogram_log.each do |log_entry|
       # We expect this block to only execute at most once since we
index d2fcbbb94c7385def6753fb0feee877d9ad7f13d..1a1c5f802152ffe4cdb7ff009fce0bb02c70cbc1 100644 (file)
@@ -102,6 +102,7 @@ class UsersController < ApplicationController
         filter([[:object_uuid, '=', u.uuid],
                 [:event_type, '=', 'user-storage-report']]).
         order(:created_at => :desc).
+        with_count('none').
         limit(1)
       storage_log.each do |log_entry|
         # We expect this block to only execute once since we specified limit(1)
index 056f12f6c8e70faa77f45d5f5d356439d528a1ec..41b33706d1b72d8afad059437ed5cfe1748701bf 100644 (file)
@@ -607,6 +607,7 @@ module ApplicationHelper
 
   RESOURCE_CLASS_ICONS = {
     "Collection" => "fa-archive",
+    "ContainerRequest" => "fa-gears",
     "Group" => "fa-users",
     "Human" => "fa-male",  # FIXME: Use a more inclusive icon.
     "Job" => "fa-gears",
@@ -621,6 +622,7 @@ module ApplicationHelper
     "Trait" => "fa-clipboard",
     "User" => "fa-user",
     "VirtualMachine" => "fa-terminal",
+    "Workflow" => "fa-gears",
   }
   DEFAULT_ICON_CLASS = "fa-cube"
 
index 6250daa06a3d0c65d2233f51c33588a9de3855a5..5d6a4c94b9bebb581d771ce4b0557f3ba828e119 100644 (file)
@@ -144,6 +144,10 @@ class ArvadosBase < ActiveRecord::Base
     ArvadosResourceList.new(self).select(*args)
   end
 
+  def self.with_count(*args)
+    ArvadosResourceList.new(self).with_count(*args)
+  end
+
   def self.distinct(*args)
     ArvadosResourceList.new(self).distinct(*args)
   end
index 27069706fb36e1f5f56b2853ff0b93216e9637ae..35dcde38da24cabc5290e24e0a99131f467181b4 100644 (file)
@@ -84,6 +84,13 @@ class ArvadosResourceList
     self
   end
 
+  # with_count sets the 'count' parameter to 'exact' or 'none' -- see
+  # https://doc.arvados.org/api/methods.html#index
+  def with_count(count_param='exact')
+    @count = count_param
+    self
+  end
+
   def fetch_multiple_pages(f)
     @fetch_multiple_pages = f
     self
@@ -178,6 +185,7 @@ class ArvadosResourceList
     api_params = {
       _method: 'GET'
     }
+    api_params[:count] = @count if @count
     api_params[:where] = @cond if @cond
     api_params[:eager] = '1' if @eager
     api_params[:select] = @select if @select
index 346aef35d27b835fd54a018bfd6ce05ae83de0d9..128440d52612c7b748eb91da7f14fba640045d0b 100644 (file)
@@ -43,7 +43,7 @@ class Job < ArvadosBase
   end
 
   def stderr_log_query(limit=nil)
-    query = Log.where(object_uuid: self.uuid).order("created_at DESC")
+    query = Log.where(object_uuid: self.uuid).order("created_at DESC").with_count('none')
     query = query.limit(limit) if limit
     query
   end
index b6e0ef17aef886e624f5a9398827091a9d189863..1c14efffa673ea20d94da71a71230bdcc5ffa5cb 100644 (file)
@@ -103,9 +103,10 @@ class PipelineInstance < ArvadosBase
 
   def stderr_log_query(limit=nil)
     query = Log.
-      where(event_type: "stderr",
-            object_uuid: stderr_log_object_uuids).
-      order("id DESC")
+            with_count('none').
+            where(event_type: "stderr",
+                  object_uuid: stderr_log_object_uuids).
+            order("created_at DESC")
     unless limit.nil?
       query = query.limit(limit)
     end
index 42304843d32d3c998235adb523819fb3ae080cdd..b7cc6a0f196951f19472ace07acc37d244500b94 100644 (file)
@@ -309,6 +309,7 @@ class ProxyWorkUnit < WorkUnit
     Log.where(object_uuid: log_object_uuids).
       order("created_at DESC").
       limit(limit).
+      with_count('none').
       select { |log| log.properties[:text].is_a? String }.
       reverse.
       flat_map { |log| log.properties[:text].split("\n") }
index 553f1410316ed14ec0a9d4a9c0074792231a934b..c59125fd08fa2700540b973623192950b6dbb646 100644 (file)
@@ -2,4 +2,12 @@ class Workflow < ArvadosBase
   def self.goes_in_projects?
     true
   end
+
+  def self.creatable?
+    false
+  end
+
+  def textile_attributes
+    [ 'description' ]
+  end
 end
index 9441a46c26d067f423188db099d240087f29a191..a22608d3c9e93f06e72314ab66d83f37964c8cb0 100644 (file)
@@ -29,7 +29,7 @@
         end
       %>
 
-      <li class="<%= 'active' if i==0 %> <%= link_disabled %>" data-toggle="tooltip" data-placement="top" title="<%=tab_tooltip%>">
+      <li class="<%= 'active' if i==0 %> <%= link_disabled %> tab-pane-<%=pane_name%>" data-toggle="tooltip" data-placement="top" title="<%=tab_tooltip%>">
         <a href="#<%= pane_name %>"
            id="<%= pane_name %>-tab"
            data-toggle="<%= data_toggle %>"
diff --git a/apps/workbench/app/views/collections/_extra_tab_line_buttons.html.erb b/apps/workbench/app/views/collections/_extra_tab_line_buttons.html.erb
new file mode 100644 (file)
index 0000000..fe62f6d
--- /dev/null
@@ -0,0 +1,3 @@
+<% if @object.editable? %>
+  <i class="fa fa-fw fa-lock lock-collection-btn btn btn-primary" title="Unlock collection to edit files"></i>
+<% end %>
index d39c81b2b16d2b9b8331226695f8df97f0ac0cec..7c7777d472e3cb48ff5b28a2c7d67d16e30a638d 100644 (file)
@@ -33,7 +33,8 @@
                     'data-href' => url_for(controller: 'collections', action: :remove_selected_files),
                     'data-selection-param-name' => 'selection[]',
                     'data-selection-action' => 'remove-selected-files',
-                    'data-toggle' => 'dropdown'
+                    'data-toggle' => 'dropdown',
+                    'class' => 'btn-remove-selected-files'
               %></li>
             <% end %>
           </ul>
             <% end %>
 
             <% if object.editable? %>
-                <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate', title: "Remove #{file_path}") do %>
+                <span class="btn-collection-remove-file-span">
+                <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate btn-collection-file-control', title: 'Remove this file') do %>
                   <i class="fa fa-fw fa-trash-o"></i>
                 <% end %>
+                </span>
             <% end %>
         <% if CollectionsHelper::is_image(filename) %>
             <i class="fa fa-fw fa-bar-chart-o"></i>
               <% if object.editable? %>
-                <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_path' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'} %>
+                <span class="btn-collection-rename-file-span">
+                <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_path' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit name or directory or both for this file', btnclass: 'collection-file-control'} %>
+                </span>
               <% else %>
                 <%= filename %>
               <% end %>
          </div>
         <% else %>
               <% if object.editable? %>
-                <i class="fa fa-fw fa-file"></i><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'}  %>
+                <i class="fa fa-fw fa-file"></i><span class="btn-collection-rename-file-span"><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit name or directory or both for this file', btnclass: 'collection-file-control'}  %>
+                </span>
               <% else %>
                 <i class="fa fa-fw fa-file" href="<%=object.uuid%>/<%=file_path%>" ></i> <%= filename %>
               <% end %>
index 662309ffe8e11df264b31f769b43f8612edacf70..fd7953551729d722b8cff678feb73baadc81f7d2 100644 (file)
@@ -1,6 +1,6 @@
 <% if @object.state == 'Final' %>
   <%= link_to(copy_container_request_path('id' => @object.uuid),
-      class: 'btn btn-primary',
+      class: 'btn btn-sm btn-primary',
       title: 'Re-run',
       data: {toggle: :tooltip, placement: :top}, title: 'This will make a copy and take you there. You can then make any needed changes and run it',
       method: :post,
diff --git a/apps/workbench/app/views/workflows/_show_recent.html.erb b/apps/workbench/app/views/workflows/_show_recent.html.erb
new file mode 100644 (file)
index 0000000..94d39c1
--- /dev/null
@@ -0,0 +1,65 @@
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
+
+<table class="table table-condensed arv-index">
+  <colgroup>
+    <col width="10%" />
+    <col width="10%" />
+    <col width="25%" />
+    <col width="40%" />
+    <col width="15%" />
+  </colgroup>
+
+  <thead>
+    <tr class="contain-align-left">
+      <th></th>
+      <th></th>
+      <th> name </th>
+      <th> description </th>
+      <th> owner </th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <% @objects.sort_by { |ob| ob[:created_at] }.reverse.each do |ob| %>
+      <tr>
+        <td>
+          <%= button_to(choose_projects_path(id: "run-workflow-button",
+                                             title: 'Choose project',
+                                             editable: true,
+                                             action_name: 'Choose',
+                                             action_href: work_units_path,
+                                             action_method: 'post',
+                                             action_data: {'selection_param' => 'work_unit[owner_uuid]',
+                                                           'work_unit[template_uuid]' => ob.uuid,
+                                                           'success' => 'redirect-to-created-object'
+                                                          }.to_json),
+                  { class: "btn btn-default btn-xs", title: "Run #{ob.name}", remote: true, method: :get }
+              ) do %>
+                 <i class="fa fa-fw fa-play"></i> Run
+          <% end %>
+        </td>
+
+        <td>
+          <%= render :partial => "show_object_button", :locals => {object: ob, size: 'xs'} %>
+        </td>
+
+        <td>
+          <%= render_editable_attribute ob, 'name' %>
+        </td>
+
+        <td>
+          <% if ob.description %>
+            <%= render_attribute_as_textile(ob, "description", ob.description, false) %>
+            <br />
+          <% end %>
+        </td>
+
+        <td>
+          <%= link_to_if_arvados_object ob.owner_uuid, friendly_name: true %>
+        </td>
+      </tr>
+    <% end %>
+  </tbody>
+</table>
+
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
index 903df90fb419c7ba231ae3c42d679abc85af41ed..552a9cd5e8fc53af4a484fbbd9b84798fc86f59a 100644 (file)
@@ -41,6 +41,9 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
   test "Upload two empty files with the same name" do
     need_selenium "to make file uploads work"
     visit page_with_token 'active', sandbox_path
+
+    unlock_collection
+
     find('.nav-tabs a', text: 'Upload').click
     attach_file 'file_selector', testfile_path('empty.txt')
     assert_selector 'div', text: 'empty.txt'
@@ -55,6 +58,9 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
   test "Upload non-empty files" do
     need_selenium "to make file uploads work"
     visit page_with_token 'active', sandbox_path
+
+    unlock_collection
+
     find('.nav-tabs a', text: 'Upload').click
     attach_file 'file_selector', testfile_path('a')
     attach_file 'file_selector', testfile_path('foo.txt')
@@ -93,6 +99,9 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
                           service_port: 99999)
     end
     visit page_with_token 'active', sandbox_path
+
+    unlock_collection
+
     find('.nav-tabs a', text: 'Upload').click
     attach_file 'file_selector', testfile_path('foo.txt')
     assert_selector 'button:not([disabled])', text: 'Start'
@@ -128,4 +137,9 @@ class CollectionUploadTest < ActionDispatch::IntegrationTest
     # Must be an absolute path. https://github.com/jnicklas/capybara/issues/621
     File.join Dir.getwd, 'tmp', filename
   end
+
+  def unlock_collection
+    first('.lock-collection-btn').click
+    accept_alert
+  end
 end
index eb9c2e831a8866d7a25f3f89c9c1eb04ef00a003..8b43e5dbe32ba9d9c58d31a5ba2f2e24b0729fde 100644 (file)
@@ -300,9 +300,13 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   end
 
   test "remove a file from collection using checkbox and dropdown option" do
+    need_selenium 'to confirm unlock'
+
     visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
     assert(page.has_text?('file1'), 'file not found - file1')
 
+    unlock_collection
+
     # remove first file
     input_files = page.all('input[type=checkbox]')
     input_files[0].click
@@ -317,21 +321,27 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   end
 
   test "remove a file in collection using trash icon" do
-    need_selenium 'to confirm remove'
+    need_selenium 'to confirm unlock'
 
     visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
     assert(page.has_text?('file1'), 'file not found - file1')
 
+    unlock_collection
+
     first('.fa-trash-o').click
-    page.driver.browser.switch_to.alert.accept
+    accept_alert
 
     assert(page.has_no_text?('file1'), 'file found - file')
     assert(page.has_text?('file2'), 'file not found - file2')
   end
 
   test "rename a file in collection" do
+    need_selenium 'to confirm unlock'
+
     visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
 
+    unlock_collection
+
     within('.collection_files') do
       first('.fa-pencil').click
       find('.editable-input input').set('file1renamed')
@@ -357,4 +367,57 @@ class CollectionsTest < ActionDispatch::IntegrationTest
       assert_nil first('.fa-trash-o')
     end
   end
+
+  test "unlock collection to modify files" do
+    need_selenium 'to confirm remove'
+
+    collection = api_fixture('collections')['collection_owned_by_active']
+
+    # On load, collection is locked, and upload tab, rename and remove options are disabled
+    visit page_with_token('active', "/collections/#{collection['uuid']}")
+
+    assert_selector 'a[data-toggle="disabled"]', text: 'Upload'
+
+    within('.collection_files') do
+      file_ctrls = page.all('.btn-collection-file-control')
+      assert_equal 2, file_ctrls.size
+      assert_equal true, file_ctrls[0]['class'].include?('disabled')
+      assert_equal true, file_ctrls[1]['class'].include?('disabled')
+      find('input[type=checkbox]').click
+    end
+
+    click_button 'Selection'
+    within('.selection-action-container') do
+      assert_selector 'li.disabled', text: 'Remove selected files'
+      assert_selector 'li', text: 'Create new collection with selected files'
+    end
+
+    unlock_collection
+
+    assert_no_selector 'a[data-toggle="disabled"]', text: 'Upload'
+    assert_selector 'a', text: 'Upload'
+
+    within('.collection_files') do
+      file_ctrls = page.all('.btn-collection-file-control')
+      assert_equal 2, file_ctrls.size
+      assert_equal false, file_ctrls[0]['class'].include?('disabled')
+      assert_equal false, file_ctrls[1]['class'].include?('disabled')
+
+      # previous checkbox selection won't result in firing a new event;
+      # undo and redo checkbox to fire the selection event again
+      find('input[type=checkbox]').click
+      find('input[type=checkbox]').click
+    end
+
+    click_button 'Selection'
+    within('.selection-action-container') do
+      assert_no_selector 'li.disabled', text: 'Remove selected files'
+      assert_selector 'li', text: 'Remove selected files'
+    end
+  end
+
+  def unlock_collection
+    first('.lock-collection-btn').click
+    accept_alert
+  end
 end
index 91b382d1bd0402e25488ec80e08e9c61ea1c745d..f9f5addb1561849b34f0f80ae659417468dc797e 100644 (file)
@@ -254,4 +254,28 @@ class WorkUnitsTest < ActionDispatch::IntegrationTest
       end
     end
   end
+
+  test 'Run from workflows index page' do
+    visit page_with_token('active', '/workflows')
+
+    wf_count = page.all('a[data-original-title="show workflow"]').count
+    assert_equal true, wf_count>0
+
+    # Run one of the workflows
+    wf_name = 'Workflow with input specifications'
+    within('tr', text: wf_name) do
+      find('a,button', text: 'Run').click
+    end
+
+    # Choose project for the container_request being created
+    within('.modal-dialog') do
+      find('.selectable', text: 'A Project').click
+      find('button', text: 'Choose').click
+    end
+
+    # In newly created container_request page now
+    assert_text 'A Project' # CR created in "A Project"
+    assert_text "This container request was created from the workflow #{wf_name}"
+    assert_match /Provide a value for .* then click the \"Run\" button to start the workflow/, page.text
+  end
 end
index 3d92585135ff3db934c78cc6a092bf74a8b39054..067a1bdae845f98114df7a1c9cbfacc81966db45 100644 (file)
@@ -221,4 +221,19 @@ class ActionDispatch::IntegrationTest
     end
     Capybara.reset_sessions!
   end
+
+  def accept_alert
+    if Capybara.current_driver == :selenium
+      (0..9).each do
+        begin
+          page.driver.browser.switch_to.alert.accept
+          break
+        rescue Selenium::WebDriver::Error::NoSuchAlertError
+         sleep 0.1
+        end
+      end
+    else
+      # poltergeist returns true for confirm, so no need to accept
+    end
+  end
 end
index a3bfbc19f4105918f453f4baf1ab0d0ffda13c5c..cfdf8f92445ec1b415dce7c2e86ff6a70f646834 100644 (file)
@@ -2,6 +2,8 @@ require 'test_helper'
 
 class ResourceListTest < ActiveSupport::TestCase
 
+  reset_api_fixtures :after_each_test, false
+
   test 'links_for on a resource list that does not return links' do
     use_token :active
     results = Specimen.all
@@ -91,4 +93,10 @@ class ResourceListTest < ActiveSupport::TestCase
     assert_empty c.results
   end
 
+  test 'count=none' do
+    use_token :active
+    c = Collection.with_count('none')
+    assert_nil c.items_available
+    refute_empty c.results
+  end
 end
index 763633586ad7dcad6f0df4af598eb9231522454b..9687292741dbecf4940c73bee174c0e34aed47bd 100644 (file)
@@ -1,6 +1,9 @@
 require 'test_helper'
 
 class LinkTest < ActiveSupport::TestCase
+
+  reset_api_fixtures :after_each_test, false
+
   def uuid_for(fixture_name, object_name)
     api_fixture(fixture_name)[object_name]["uuid"]
   end
index 747cfc12e5f6b4b70d4e586cbb7bd17c824dd070..926bb686acb2ee56a5fbaf2cf372c3e22747e3ed 100644 (file)
@@ -1,6 +1,9 @@
 require 'test_helper'
 
 class PipelineInstanceTest < ActiveSupport::TestCase
+
+  reset_api_fixtures :after_each_test, false
+
   def find_pi_with(token_name, pi_name)
     use_token token_name
     find_fixture(PipelineInstance, pi_name)
index 564a5d3b5b4068025fb90297637ce0dd87ee4232..8bbbb5cf26a993b3e408f923f0899097b971d8eb 100644 (file)
@@ -1,6 +1,9 @@
 require 'test_helper'
 
 class WorkUnitTest < ActiveSupport::TestCase
+
+  reset_api_fixtures :after_each_test, false
+
   setup do
     Rails.configuration.anonymous_user_token = api_fixture('api_client_authorizations')['anonymous']['api_token']
   end
index e163167a3a086fcdee675640bc2f45dca6c21501..bb662dbbd52ad97aecf092af0aa8cbf973c69859 100644 (file)
@@ -40,3 +40,5 @@ all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-versi
 all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
 all|rdflib-jsonld|0.4.0|2|python|all
 all|futures|3.0.5|2|python|all
+all|future|0.16.0|2|python|all
+all|future|0.16.0|2|python3|all
index 264f27d12b0202a9267a548a73e684460f8f5aa3..053a6dfb30de1782b81ec0a545fcd1844c1b4673 100755 (executable)
@@ -77,6 +77,9 @@ case "$TARGET" in
     ubuntu1404)
         FORMAT=deb
         ;;
+    ubuntu1604)
+        FORMAT=deb
+        ;;
     centos7)
         FORMAT=rpm
         ;;
index 37e963b6474eea96e939015a86e09aaf03a2d13b..777cd3c844536fc43bc90e70a611c3e57ccd8feb 100755 (executable)
@@ -32,8 +32,6 @@ DEBUG=${ARVADOS_DEBUG:-0}
 TARGET=debian8
 COMMAND=
 
-RAILS_PACKAGE_ITERATION=7
-
 PARSEDOPTS=$(getopt --name "$0" --longoptions \
     help,build-bundle-packages,debug,target:,only-build: \
     -- "" "$@")
index a13470bc434a7f52ba82cda6b3af5a85be107b0d..3d619622e4c0ac681a76375bc90dc3299d87ae2c 100755 (executable)
@@ -7,6 +7,8 @@
 # older packages.
 LICENSE_PACKAGE_TS=20151208015500
 
+RAILS_PACKAGE_ITERATION=7
+
 debug_echo () {
     echo "$@" >"$STDOUT_IF_DEBUG"
 }
index b0897224dc27c1f22e3c6a7352285ac009a9e454..afaa834d3b2f676b4c50a320748d151cf9b20790 100755 (executable)
@@ -441,7 +441,7 @@ export PERLLIB="$PERLINSTALLBASE/lib/perl5:${PERLLIB:+$PERLLIB}"
 
 export GOPATH
 mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \
+ln -sfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \
     || fatal "symlink failed"
 
 setup_virtualenv "$VENVDIR" --python python2.7
index 6caf36a18882115027c288717a74146b8281dd49..4fd1edefe455155fcc11b2b094c7b7a6a4fcb3bd 100644 (file)
@@ -20,7 +20,7 @@
       </ul>
 
       <div class="pull-right" style="padding-top: 6px">
-        <form method="get" action="http://www.google.com/search">
+        <form method="get" action="https://www.google.com/search">
           <div class="input-group" style="width: 220px">
             <input type="text" class="form-control" name="q" placeholder="search">
             <div class="input-group-addon">
index baf7c2fc7c8e0c1b408d85e43cd36d3102147ac6..0cad10c5a92229a02a7263e0ae8e36fdcac7adca 100644 (file)
@@ -48,6 +48,16 @@ h3(#aws). Amazon Web Services
 # EC2 configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
@@ -213,6 +223,16 @@ h3(#gcp). Google Cloud Platform
 # Google Compute Engine configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # Node Manager will ensure that there are at least this many nodes running at
 # all times.  If node manager needs to start new idle nodes for the purpose of
@@ -380,6 +400,16 @@ h3(#azure). Microsoft Azure
 # Azure configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
index 8a7e64baa84b86bda85f0dbc5595a8e6d2dda3b9..8aef8ac0c4a18efef44f06f5c4dbb5fe40a98710 100644 (file)
@@ -11,6 +11,7 @@ To use Arvados CWL extensions, add the following @$namespaces@ section at the to
 <pre>
 $namespaces:
   arv: "http://arvados.org/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
 </pre>
 
 Arvados extensions must go into the @hints@ section, for example:
@@ -24,6 +25,8 @@ hints:
   arv:PartitionRequirement:
     partition: dev_partition
   arv:APIRequirement: {}
+  cwltool:LoadListingRequirement:
+    loadListing: shallow_listing
 </pre>
 
 h2. arv:RunInSingleContainer
@@ -54,3 +57,17 @@ table(table table-bordered table-condensed).
 h2. arv:APIRequirement
 
 Indicates that process wants to access to the Arvados API.  Will be granted limited network access and have @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ set in the environment.
+
+h2. cwltool:LoadListingRequirement
+
+In CWL v1.0 documents, the default behavior for Directory objects is to recursively expand the @listing@ for access by parameter references an expressions.  For directory trees containing many files, this can be expensive in both time and memory usage.  Use @cwltool:LoadListingRequirement@ to change the behavior for expansion of directory listings in the workflow runner.
+
+table(table table-bordered table-condensed).
+|_. Field |_. Type |_. Description |
+|loadListing|string|One of @no_listing@, @shallow_listing@, or @deep_listing@|
+
+*no_listing*: Do not expand directory listing at all.  The @listing@ field on the Directory object will be undefined.
+
+*shallow_listing*: Only expand the first level of directory listing.  The @listing@ field on the toplevel Directory object will contain the directory contents, however @listing@ will not be defined on subdirectories.
+
+*deep_listing*: Recursively expand all levels of directory listing.  The @listing@ field will be provided on the toplevel object and all subdirectories.
index 3a36cd495ab870b66a0c8aa78fd44fdbffac5bc9..e563bfc7e018458c6c57f87a870866cd8ab7b37f 100755 (executable)
@@ -1,2 +1,2 @@
 #!/bin/sh
-exec docker build -t arvados/migrate-docker19 .
+exec docker build -t arvados/migrate-docker19:1.0 .
index ec6f1e3e12bbde76ecb1e2c5d3406c03d47ab8b2..f253f0be660e565880d085933395ad7197cf618a 100755 (executable)
@@ -96,4 +96,4 @@ rm -rf /var/run/docker.pid
 
 read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
 
-exec docker daemon --storage-driver=vfs $DOCKER_DAEMON_ARGS
+exec docker daemon --storage-driver=$1 $DOCKER_DAEMON_ARGS
index 8e39be45678aa357632dd7e4dd0bb147bcdeebc3..857678bde86b856c2f1ce48e8716e94bcc56d0cc 100755 (executable)
@@ -1,9 +1,46 @@
 #!/bin/bash
 
-set -e
+# This script is called by arv-migrate-docker19 to perform the actual migration
+# of a single image.  This works by running Docker-in-Docker (dnd.sh) to
+# download the image using Docker 1.9 and then upgrading to Docker 1.13 and
+# uploading the converted image.
+
+# When using bash in pid 1 and using "trap on EXIT"
+# it will sometimes go into an 100% CPU infinite loop.
+#
+# Using workaround from here:
+#
+# https://github.com/docker/docker/issues/4854
+if [ "$$" = 1 ]; then
+  $0 "$@"
+  exit $?
+fi
+
+# -x           show script
+# -e           exit on error
+# -o pipefail  use exit code from 1st failure in pipeline, not last
+set -x -e -o pipefail
 
+image_tar_keepref=$1
+image_id=$2
+image_repo=$3
+image_tag=$4
+project_uuid=$5
+graph_driver=$6
+
+if [[ "$image_repo" = "<none>" ]] ; then
+  image_repo=none
+  image_tag=latest
+fi
+
+# Print free space in /var/lib/docker
+function freespace() {
+    df -B1 /var/lib/docker | tail -n1 | sed 's/  */ /g' | cut -d' ' -f4
+}
+
+# Run docker-in-docker script and then wait for it to come up
 function start_docker {
-    /root/dnd.sh &
+    /root/dnd.sh $graph_driver &
     for i in $(seq 1 10) ; do
         if docker version >/dev/null 2>/dev/null ; then
             return
@@ -13,6 +50,7 @@ function start_docker {
     false
 }
 
+# Kill docker from pid then wait for it to be down
 function kill_docker {
     if test -f /var/run/docker.pid ; then
         kill $(cat /var/run/docker.pid)
@@ -26,37 +64,47 @@ function kill_docker {
     false
 }
 
+# Ensure that we clean up docker graph and/or lingering cache files on exit
 function cleanup {
     kill_docker
     rm -rf /var/lib/docker/*
     rm -rf /root/.cache/arvados/docker/*
+    echo "Available space after cleanup is $(freespace)"
 }
 
 trap cleanup EXIT
 
 start_docker
 
-image_tar_keepref=$1
-image_id=$2
-image_repo=$3
-image_tag=$4
-project_uuid=$5
+echo "Initial available space is $(freespace)"
 
 arv-get $image_tar_keepref | docker load
 
+
 docker tag $image_id $image_repo:$image_tag
 
 docker images -a
 
 kill_docker
 
+echo "Available space after image load is $(freespace)"
+
 cd /root/pkgs
 dpkg -i libltdl7_2.4.2-1.11+b1_amd64.deb docker-engine_1.13.1-0~debian-jessie_amd64.deb
 
+echo "Available space after image upgrade is $(freespace)"
+
 start_docker
 
 docker images -a
 
+if [[ "$image_repo" = "none" ]] ; then
+  image_repo=$(docker images -a --no-trunc | sed 's/  */ /g' | grep ^none | cut -d' ' -f3)
+  image_tag=""
+fi
+
 UUID=$(arv-keepdocker --force-image-format --project-uuid=$project_uuid $image_repo $image_tag)
 
+echo "Available space after arv-keepdocker is $(freespace)"
+
 echo "Migrated uuid is $UUID"
index e0b27107986a36abd1bd99759d758162c006a042..55e35b04b9cf933f702df4e109eded633230a15a 100755 (executable)
@@ -393,12 +393,12 @@ if (!defined $no_clear_tmp) {
   # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
   # up work directories crunch_tmp/work, crunch_tmp/opt,
   # crunch_tmp/src*.
-  #
-  # TODO: When #5036 is done and widely deployed, we can limit mount's
-  # -t option to simply fuse.keep.
   my ($exited, $stdout, $stderr) = srun_sync(
     ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-    ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+    ['bash', '-ec', q{
+arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
+rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
+    }],
     {label => "clean work dirs"});
   if ($exited != 0) {
     exit(EX_RETRY_UNLOCKED);
diff --git a/sdk/cli/test/binstub_arv-mount/arv-mount b/sdk/cli/test/binstub_arv-mount/arv-mount
new file mode 100755 (executable)
index 0000000..a9bf588
--- /dev/null
@@ -0,0 +1 @@
+#!/bin/bash
index 847f994f486842b69ad2436ea767649cfb1a0c64..a0fed6ee32b90022b20d3666e2272155ad40db01 100644 (file)
@@ -99,7 +99,7 @@ class TestCrunchJob < Minitest::Test
   def test_output_collection_owner_uuid
     j = jobspec :grep_local
     out, err = capture_subprocess_io do
-      tryjobrecord j, binstubs: ['output_coll_owner']
+      tryjobrecord j, binstubs: ['arv-mount', 'output_coll_owner']
     end
     assert_match /owner_uuid: #{j['owner_uuid']}/, err
   end
@@ -120,7 +120,7 @@ class TestCrunchJob < Minitest::Test
     out, err = capture_subprocess_io do
       j = jobspec :grep_local
       j[:script_version] = bogus_version
-      tryjobrecord j
+      tryjobrecord j, binstubs: ['arv-mount']
     end
     assert_match /'#{bogus_version}' not found, giving up/, err
     assert_jobfail $?
index 1e109920fabaae4fa208615cbbaabd8f9161c3a1..3c7de77ebf7825a8ec3334f8b756460c3d0f73c9 100644 (file)
@@ -32,14 +32,14 @@ from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
 from .perf import Perf
 from .pathmapper import NoFollowPathMapper
 from ._version import __version__
 
 from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, getListing
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
 from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
@@ -80,6 +80,8 @@ class ArvCwlRunner(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
+        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
         self.work_api = None
         expected_api = ["jobs", "containers"]
         for api in expected_api:
@@ -102,7 +104,8 @@ class ArvCwlRunner(object):
         kwargs["work_api"] = self.work_api
         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                 api_client=self.api,
-                                                keep_client=self.keep_client)
+                                                fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+                                                num_retries=self.num_retries)
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
             return ArvadosCommandTool(self, toolpath_object, **kwargs)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
@@ -203,12 +206,6 @@ class ArvCwlRunner(object):
         if isinstance(obj, dict):
             if obj.get("writable"):
                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
-            if obj.get("class") == "CommandLineTool":
-                if self.work_api == "containers":
-                    if obj.get("stdin"):
-                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
-                    if obj.get("stderr"):
-                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
                     # TODO: can be supported by containers API, but not jobs API.
@@ -237,7 +234,6 @@ class ArvCwlRunner(object):
                                               keep_client=self.keep_client,
                                               num_retries=self.num_retries)
 
-        srccollections = {}
         for k,v in generatemapper.items():
             if k.startswith("_:"):
                 if v.type == "Directory":
@@ -251,20 +247,13 @@ class ArvCwlRunner(object):
                 raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
-            if srccollection not in srccollections:
-                try:
-                    srccollections[srccollection] = arvados.collection.CollectionReader(
-                        srccollection,
-                        api_client=self.api,
-                        keep_client=self.keep_client,
-                        num_retries=self.num_retries)
-                except arvados.errors.ArgumentError as e:
-                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
-                    raise
-            reader = srccollections[srccollection]
             try:
+                reader = self.collection_cache.get(srccollection)
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+            except arvados.errors.ArgumentError as e:
+                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                raise
             except IOError as e:
                 logger.warn("While preparing output collection: %s", e)
 
@@ -337,8 +326,7 @@ class ArvCwlRunner(object):
         self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
-                                                                 api_client=self.api,
-                                                                 keep_client=self.keep_client)
+                                                                 collection_cache=self.collection_cache)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
         if not kwargs.get("name"):
@@ -521,7 +509,7 @@ class ArvCwlRunner(object):
             self.set_crunch_output()
 
         if kwargs.get("compute_checksum"):
-            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
+            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return (self.final_output, self.final_status)
@@ -629,18 +617,19 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     return parser
 
 def add_arv_hints():
-    cache = {}
-    cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
-    cache["http://arvados.org/cwl"] = res.read()
+    use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
-    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
-    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
-    for n in extnames.names:
-        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
-            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
-        document_loader.idx["http://arvados.org/cwl#"+n] = {}
+    cwltool.process.supportedProcessRequirements.extend([
+        "http://arvados.org/cwl#RunInSingleContainer",
+        "http://arvados.org/cwl#OutputDirType",
+        "http://arvados.org/cwl#RuntimeConstraints",
+        "http://arvados.org/cwl#PartitionRequirement",
+        "http://arvados.org/cwl#APIRequirement",
+        "http://commonwl.org/cwltool#LoadListingRequirement"
+    ])
 
 def main(args, stdout, stderr, api_client=None, keep_client=None):
     parser = arg_parser()
@@ -707,6 +696,9 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     arvargs.relax_path_checks = True
     arvargs.validate = None
 
+    make_fs_access = partial(CollectionFsAccess,
+                           collection_cache=runner.collection_cache)
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
@@ -714,12 +706,11 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
                              makeTool=runner.arv_make_tool,
                              versionfunc=versionstring,
                              job_order_object=job_order_object,
-                             make_fs_access=partial(CollectionFsAccess,
-                                                    api_client=api_client,
-                                                    keep_client=keep_client),
+                             make_fs_access=make_fs_access,
                              fetcher_constructor=partial(CollectionFetcher,
                                                          api_client=api_client,
-                                                         keep_client=keep_client,
+                                                         fs_access=make_fs_access(""),
                                                          num_retries=runner.num_retries),
                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
-                             logger_handler=arvados.log_handler)
+                             logger_handler=arvados.log_handler,
+                             custom_schema_callback=add_arv_hints)
index 3a6eb4769779ab0ea2b88aa8c96ed776194863b7..4e48216f32aefb962df7c93ea69769df9c9762a7 100644 (file)
@@ -1,7 +1,32 @@
 $base: "http://arvados.org/cwl#"
+$namespaces:
+  cwl: "https://w3id.org/cwl/cwl#"
+  cwltool: "http://commonwl.org/cwltool#"
 $graph:
+- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
+
+- name: cwltool:LoadListingRequirement
+  type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
+  fields:
+    class:
+      type: string
+      doc: "Always 'LoadListingRequirement'"
+      jsonldPredicate:
+        "_id": "@type"
+        "_type": "@vocab"
+    loadListing:
+      type:
+        - "null"
+        - type: enum
+          name: LoadListingEnum
+          symbols: [no_listing, shallow_listing, deep_listing]
+
 - name: RunInSingleContainer
   type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
   doc: |
     Indicates that a subworkflow should run in a single container
     and not be scheduled as separate steps.
@@ -38,6 +63,8 @@ $graph:
 
 - name: RuntimeConstraints
   type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
   doc: |
     Set Arvados-specific runtime hints.
   fields:
@@ -62,6 +89,8 @@ $graph:
 
 - name: PartitionRequirement
   type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
   doc: |
     Select preferred compute partitions on which to run jobs.
   fields:
@@ -72,6 +101,8 @@ $graph:
 
 - name: APIRequirement
   type: record
+  extends: cwl:ProcessRequirement
+  inVocab: false
   doc: |
     Indicates that process wants to access to the Arvados API.  Will be granted
     limited network access and have ARVADOS_API_HOST and ARVADOS_API_TOKEN set
index 28107b491ca45f760813c03cefa63bb9c693b331..9e76cf711ecaaf81c6e5e42131ea3d717da00c01 100644 (file)
@@ -54,7 +54,7 @@ class ArvadosContainer(object):
 
         dirs = set()
         for f in self.pathmapper.files():
-            pdh, p, tp = self.pathmapper.mapper(f)
+            pdh, p, tp, stg = self.pathmapper.mapper(f)
             if tp == "Directory" and '/' not in pdh:
                 mounts[p] = {
                     "kind": "collection",
@@ -63,7 +63,7 @@ class ArvadosContainer(object):
                 dirs.add(pdh)
 
         for f in self.pathmapper.files():
-            res, p, tp = self.pathmapper.mapper(f)
+            res, p, tp, stg = self.pathmapper.mapper(f)
             if res.startswith("keep:"):
                 res = res[5:]
             elif res.startswith("/keep/"):
@@ -115,10 +115,14 @@ class ArvadosContainer(object):
             container_request["environment"].update(self.environment)
 
         if self.stdin:
-            raise UnsupportedRequirement("Stdin redirection currently not suppported")
+            sp = self.stdin[6:].split("/", 1)
+            mounts["stdin"] = {"kind": "collection",
+                                "portable_data_hash": sp[0],
+                                "path": sp[1]}
 
         if self.stderr:
-            raise UnsupportedRequirement("Stderr redirection currently not suppported")
+            mounts["stderr"] = {"kind": "file",
+                                "path": "%s/%s" % (self.outdir, self.stderr)}
 
         if self.stdout:
             mounts["stdout"] = {"kind": "file",
index f33619391d89ef2583d406a6ed25a22cb87f5159..9ea4824557654c365bd5612f4293ce14e360179c 100644 (file)
@@ -19,11 +19,11 @@ import re
 import functools
 
 from arvados.api import OrderedJsonModel
-from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
+from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, normalizeFilesDirs
 from cwltool.load_tool import load_tool
 from cwltool.errors import WorkflowException
 
-from .fsaccess import CollectionFetcher
+from .fsaccess import CollectionFetcher, CollectionFsAccess
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -64,7 +64,6 @@ def run():
         adjustFileObjs(job_order_object, keeppathObj)
         adjustDirObjs(job_order_object, keeppathObj)
         normalizeFilesDirs(job_order_object)
-        adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
 
         output_name = None
         output_tags = None
@@ -89,10 +88,14 @@ def run():
         runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                           output_name=output_name, output_tags=output_tags)
 
+        make_fs_access = functools.partial(CollectionFsAccess,
+                                 collection_cache=runner.collection_cache)
+
         t = load_tool(toolpath, runner.arv_make_tool,
                       fetcher_constructor=functools.partial(CollectionFetcher,
-                                                            api_client=api,
-                                                            keep_client=arvados.keep.KeepClient(api_client=api, num_retries=4)))
+                                                  api_client=runner.api,
+                                                  fs_access=make_fs_access(""),
+                                                  num_retries=runner.num_retries))
 
         args = argparse.Namespace()
         args.project_uuid = arvados.current_job()["owner_uuid"]
@@ -105,6 +108,8 @@ def run():
         args.basedir = os.getcwd()
         args.name = None
         args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
+        args.make_fs_access = make_fs_access
+
         runner.arv_executor(t, job_order_object, **vars(args))
     except Exception as e:
         if isinstance(e, WorkflowException):
index 3a3d16073833a6367876b5833b54cbb8f35584e7..534a675525ae60d90c08559e337dddc5b7d78934 100644 (file)
@@ -4,6 +4,7 @@ import errno
 import urlparse
 import re
 import logging
+import threading
 
 import ruamel.yaml as yaml
 
@@ -20,24 +21,35 @@ from schema_salad.ref_resolver import DefaultFetcher
 
 logger = logging.getLogger('arvados.cwl-runner')
 
+class CollectionCache(object):
+    def __init__(self, api_client, keep_client, num_retries):
+        self.api_client = api_client
+        self.keep_client = keep_client
+        self.collections = {}
+        self.lock = threading.Lock()
+
+    def get(self, pdh):
+        with self.lock:
+            if pdh not in self.collections:
+                logger.debug("Creating collection reader for %s", pdh)
+                self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
+                                                                            keep_client=self.keep_client)
+            return self.collections[pdh]
+
+
 class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
     """Implement the cwltool FsAccess interface for Arvados Collections."""
 
-    def __init__(self, basedir, api_client=None, keep_client=None):
+    def __init__(self, basedir, collection_cache=None):
         super(CollectionFsAccess, self).__init__(basedir)
-        self.api_client = api_client
-        self.keep_client = keep_client
-        self.collections = {}
+        self.collection_cache = collection_cache
 
     def get_collection(self, path):
         sp = path.split("/", 1)
         p = sp[0]
         if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
             pdh = p[5:]
-            if pdh not in self.collections:
-                self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
-                                                                            keep_client=self.keep_client)
-            return (self.collections[pdh], sp[1] if len(sp) == 2 else None)
+            return (self.collection_cache.get(pdh), sp[1] if len(sp) == 2 else None)
         else:
             return (None, path)
 
@@ -137,10 +149,10 @@ class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
             return os.path.realpath(path)
 
 class CollectionFetcher(DefaultFetcher):
-    def __init__(self, cache, session, api_client=None, keep_client=None, num_retries=4):
+    def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
         super(CollectionFetcher, self).__init__(cache, session)
         self.api_client = api_client
-        self.fsaccess = CollectionFsAccess("", api_client=api_client, keep_client=keep_client)
+        self.fsaccess = fs_access
         self.num_retries = num_retries
 
     def fetch_text(self, url):
index a8619a8598a538d5ba7353390bc63e316a76a648..cddb4088b7bfcbbdb211d6785ba650e8ea36901e 100644 (file)
@@ -31,36 +31,39 @@ class ArvPathMapper(PathMapper):
 
     def visit(self, srcobj, uploadfiles):
         src = srcobj["location"]
-        if srcobj["class"] == "File":
-            if "#" in src:
-                src = src[:src.index("#")]
-            if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
-                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), "File")
-            if src not in self._pathmap:
+        if "#" in src:
+            src = src[:src.index("#")]
+
+        if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
+            self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
+
+        if src not in self._pathmap:
+            if src.startswith("file:"):
                 # Local FS ref, may need to be uploaded or may be on keep
                 # mount.
                 ab = abspath(src, self.input_basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s")
+                st = arvados.commands.run.statfile("", ab,
+                                                   fnPattern="keep:%s/%s",
+                                                   dirPattern="keep:%s/%s")
                 with SourceLine(srcobj, "location", WorkflowException):
                     if isinstance(st, arvados.commands.run.UploadFile):
                         uploadfiles.add((src, ab, st))
                     elif isinstance(st, arvados.commands.run.ArvFile):
-                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File")
-                    elif src.startswith("_:"):
-                        if "contents" in srcobj:
-                            pass
-                        else:
-                            raise WorkflowException("File literal '%s' is missing contents" % src)
-                    elif src.startswith("arvwf:"):
-                        self._pathmap[src] = MapperEnt(src, src, "File")
+                        self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
                     else:
                         raise WorkflowException("Input file path '%s' is invalid" % st)
-            if "secondaryFiles" in srcobj:
-                for l in srcobj["secondaryFiles"]:
-                    self.visit(l, uploadfiles)
-        elif srcobj["class"] == "Directory":
-            if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
-                self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), "Directory")
+            elif src.startswith("_:"):
+                if srcobj["class"] == "File" and "contents" not in srcobj:
+                    raise WorkflowException("File literal '%s' is missing `contents`" % src)
+                if srcobj["class"] == "Directory" and "listing" not in srcobj:
+                    raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+            else:
+                self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
+
+        with SourceLine(srcobj, "secondaryFiles", WorkflowException):
+            for l in srcobj.get("secondaryFiles", []):
+                self.visit(l, uploadfiles)
+        with SourceLine(srcobj, "listing", WorkflowException):
             for l in srcobj.get("listing", []):
                 self.visit(l, uploadfiles)
 
@@ -73,7 +76,7 @@ class ArvPathMapper(PathMapper):
             for l in obj.get("secondaryFiles", []):
                 self.addentry(l, c, path, subdirs)
         elif obj["class"] == "Directory":
-            for l in obj["listing"]:
+            for l in obj.get("listing", []):
                 self.addentry(l, c, path + "/" + obj["basename"], subdirs)
             subdirs.append((obj["location"], path + "/" + obj["basename"]))
         elif obj["location"].startswith("_:") and "contents" in obj:
@@ -91,7 +94,7 @@ class ArvPathMapper(PathMapper):
             loc = k["location"]
             if loc in already_uploaded:
                 v = already_uploaded[loc]
-                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), "File")
+                self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), "File", True)
 
         for srcobj in referenced_files:
             self.visit(srcobj, uploadfiles)
@@ -106,17 +109,18 @@ class ArvPathMapper(PathMapper):
                                              project=self.arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
-            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:], "File")
+            self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+                                           "Directory" if os.path.isdir(ab) else "File", True)
             self.arvrunner.add_uploaded(src, self._pathmap[src])
 
         for srcobj in referenced_files:
+            subdirs = []
             if srcobj["class"] == "Directory":
                 if srcobj["location"] not in self._pathmap:
                     c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
-                    subdirs = []
-                    for l in srcobj["listing"]:
+                    for l in srcobj.get("listing", []):
                         self.addentry(l, c, ".", subdirs)
 
                     check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
@@ -124,17 +128,13 @@ class ArvPathMapper(PathMapper):
                         c.save_new(owner_uuid=self.arvrunner.project_uuid)
 
                     ab = self.collection_pattern % c.portable_data_hash()
-                    self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "Directory")
-                    for loc, sub in subdirs:
-                        ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
-                        self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+                    self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
             elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
                 (srcobj["location"].startswith("_:") and "contents" in srcobj)):
 
                 c = arvados.collection.Collection(api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries                                                  )
-                subdirs = []
                 self.addentry(srcobj, c, ".", subdirs)
 
                 check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
@@ -142,13 +142,18 @@ class ArvPathMapper(PathMapper):
                     c.save_new(owner_uuid=self.arvrunner.project_uuid)
 
                 ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
-                self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "File")
+                self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
+                                                              ab, "File", True)
                 if srcobj.get("secondaryFiles"):
                     ab = self.collection_pattern % c.portable_data_hash()
-                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt(ab, ab, "Directory")
+                    self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
+
+            if subdirs:
                 for loc, sub in subdirs:
+                    # subdirs will all start with "./", strip it off
                     ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
-                    self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+                    self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
+                                                   ab, "Directory", True)
 
         self.keepdir = None
 
@@ -163,24 +168,24 @@ class ArvPathMapper(PathMapper):
 class StagingPathMapper(PathMapper):
     _follow_dirs = True
 
-    def visit(self, obj, stagedir, basedir, copy=False):
+    def visit(self, obj, stagedir, basedir, copy=False, staged=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
         loc = obj["location"]
         tgt = os.path.join(stagedir, obj["basename"])
         if obj["class"] == "Directory":
-            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory")
+            self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
             if loc.startswith("_:") or self._follow_dirs:
                 self.visitlisting(obj.get("listing", []), tgt, basedir)
         elif obj["class"] == "File":
             if loc in self._pathmap:
                 return
             if "contents" in obj and loc.startswith("_:"):
-                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
+                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
             else:
                 if copy:
-                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile")
+                    self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
                 else:
-                    self._pathmap[loc] = MapperEnt(loc, tgt, "File")
+                    self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
                 self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
 
 
@@ -192,9 +197,9 @@ class VwdPathMapper(StagingPathMapper):
         # with any secondary files.
         self.visitlisting(referenced_files, self.stagedir, basedir)
 
-        for path, (ab, tgt, type) in self._pathmap.items():
+        for path, (ab, tgt, type, staged) in self._pathmap.items():
             if type in ("File", "Directory") and ab.startswith("keep:"):
-                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
+                self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
 
 
 class NoFollowPathMapper(StagingPathMapper):
index 1c10fe8196aba940495934169d559bb756cfda31..57a672389c740e00b33a8e47d8f3eb419fed33f8 100644 (file)
@@ -265,7 +265,7 @@ class Runner(object):
         if submit_runner_ram:
             self.submit_runner_ram = submit_runner_ram
         else:
-            self.submit_runner_ram = 1024
+            self.submit_runner_ram = 3000
 
         if self.submit_runner_ram <= 0:
             raise Exception("Value of --submit-runner-ram must be greater than zero")
index b8ed68296511fe5fe2ab7f0f4ee5bce33400bc63..2b1167148aa8902723590abd522aed67f25e4d86 100644 (file)
@@ -48,11 +48,11 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20170224141733',
-          'schema-salad==2.2.20170222151604',
+          'cwltool==1.0.20170413194156',
+          'schema-salad==2.5.20170328195758',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
-          'arvados-python-client>=0.1.20170112173420',
+          'arvados-python-client>=0.1.20170327195441',
           'setuptools'
       ],
       data_files=[
index b5a3e9930de668f4a7d64e6d40fa364dc5e75fa3..8d076093abfe6b6fcbb52ccea421763d76aa38b6 100755 (executable)
@@ -74,6 +74,8 @@ export ARVADOS_API_HOST=localhost:8000
 export ARVADOS_API_HOST_INSECURE=1
 export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
 
+arv-keepdocker --pull arvados/jobs latest
+
 cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
 #!/bin/sh
 exec arvados-cwl-runner --api=jobs --compute-checksum \\\$@
index 1beadb9de4e4e4a3897f34f57f704caff75a30ac..87528b2ae595df94a73b24ff9b3e931336edf067 100644 (file)
   output: {}
   tool: cat.cwl
   doc: Test hashes in filenames
+
+- job: listing-job.yml
+  output: {
+    "out": {
+        "class": "File",
+        "location": "output.txt",
+        "size": 5,
+        "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+    }
+  }
+  tool: wf/listing_shallow.cwl
+  doc: test shallow directory listing
+
+- job: listing-job.yml
+  output: {
+    "out": {
+        "class": "File",
+        "location": "output.txt",
+        "size": 5,
+        "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+    }
+  }
+  tool: wf/listing_none.cwl
+  doc: test no directory listing
+
+- job: listing-job.yml
+  output: {
+    "out": {
+        "class": "File",
+        "location": "output.txt",
+        "size": 5,
+        "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+    }
+  }
+  tool: wf/listing_deep.cwl
+  doc: test deep directory listing
diff --git a/sdk/cwl/tests/listing-job.yml b/sdk/cwl/tests/listing-job.yml
new file mode 100644 (file)
index 0000000..b885342
--- /dev/null
@@ -0,0 +1,3 @@
+d:
+  class: Directory
+  location: tmp1
\ No newline at end of file
index ad69371605a9ffaec4bfd1abb99296cfe565f489..08b50e0d342ca67371ead9d6434e49570d55673f 100644 (file)
@@ -41,7 +41,8 @@ class TestContainer(unittest.TestCase):
                 "baseCommand": "ls",
                 "arguments": [{"valueFrom": "$(runtime.outdir)"}]
             })
-            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
                                                      basedir="", make_fs_access=make_fs_access, loader=Loader({}))
             arvtool.formatgraph = None
@@ -107,7 +108,8 @@ class TestContainer(unittest.TestCase):
             }],
             "baseCommand": "ls"
         })
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
                                                  loader=Loader({}))
@@ -208,7 +210,8 @@ class TestContainer(unittest.TestCase):
             }],
             "baseCommand": "ls"
         })
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
                                                  loader=Loader({}))
@@ -275,6 +278,78 @@ class TestContainer(unittest.TestCase):
         for key in call_body:
             self.assertEqual(call_body_expected.get(key), call_body.get(key))
 
+
+    # Test redirecting stdin/stdout/stderr
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    def test_redirects(self, keepdocker):
+        arv_docker_clear_cache()
+
+        runner = mock.MagicMock()
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.ignore_docker_for_reuse = False
+
+        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        runner.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
+
+        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+        tool = cmap({
+            "inputs": [],
+            "outputs": [],
+            "baseCommand": "ls",
+            "stdout": "stdout.txt",
+            "stderr": "stderr.txt",
+            "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
+            "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+        })
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
+                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+        arvtool.formatgraph = None
+        for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
+                             make_fs_access=make_fs_access, tmpdir="/tmp"):
+            j.run()
+            runner.api.container_requests().create.assert_called_with(
+                body=JsonDiffMatcher({
+                    'environment': {
+                        'HOME': '/var/spool/cwl',
+                        'TMPDIR': '/tmp'
+                    },
+                    'name': 'test_run_redirect',
+                    'runtime_constraints': {
+                        'vcpus': 1,
+                        'ram': 1073741824
+                    },
+                    'use_existing': True,
+                    'priority': 1,
+                    'mounts': {
+                        '/var/spool/cwl': {'kind': 'tmp'},
+                        "stderr": {
+                            "kind": "file",
+                            "path": "/var/spool/cwl/stderr.txt"
+                        },
+                        "stdin": {
+                            "kind": "collection",
+                            "path": "file.txt",
+                            "portable_data_hash": "99999999999999999999999999999996+99"
+                        },
+                        "stdout": {
+                            "kind": "file",
+                            "path": "/var/spool/cwl/stdout.txt"
+                        },
+                    },
+                    'state': 'Committed',
+                    'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+                    'output_path': '/var/spool/cwl',
+                    'container_image': 'arvados/jobs',
+                    'command': ['ls', '/var/spool/cwl'],
+                    'cwd': '/var/spool/cwl',
+                    'scheduling_parameters': {},
+                    'properties': {},
+                }))
+
     @mock.patch("arvados.collection.Collection")
     def test_done(self, col):
         api = mock.MagicMock()
diff --git a/sdk/cwl/tests/test_fsaccess.py b/sdk/cwl/tests/test_fsaccess.py
new file mode 100644 (file)
index 0000000..4395661
--- /dev/null
@@ -0,0 +1,28 @@
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+
+import arvados
+import arvados.keep
+import arvados.collection
+import arvados_cwl
+
+from cwltool.pathmapper import MapperEnt
+from .mock_discovery import get_rootDesc
+
+from arvados_cwl.fsaccess import CollectionCache
+
+class TestFsAccess(unittest.TestCase):
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_collection_cache(self, cr):
+        cache = CollectionCache(mock.MagicMock(), mock.MagicMock(), 4)
+        c1 = cache.get("99999999999999999999999999999991+99")
+        c2 = cache.get("99999999999999999999999999999991+99")
+        self.assertIs(c1, c2)
+        self.assertEqual(1, cr.call_count)
+        c3 = cache.get("99999999999999999999999999999992+99")
+        self.assertEqual(2, cr.call_count)
index 076514b1e96f48b6ae887bf8b1e6f2c0893123bc..3061e2fb7c1d1515e09fa7a13f9c364d046cfb05 100644 (file)
@@ -41,7 +41,8 @@ class TestJob(unittest.TestCase):
                 "baseCommand": "ls",
                 "arguments": [{"valueFrom": "$(runtime.outdir)"}]
             })
-            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                                      basedir="", make_fs_access=make_fs_access, loader=Loader({}))
             arvtool.formatgraph = None
@@ -108,7 +109,8 @@ class TestJob(unittest.TestCase):
             }],
             "baseCommand": "ls"
         }
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                                  make_fs_access=make_fs_access, loader=Loader({}))
         arvtool.formatgraph = None
@@ -264,7 +266,8 @@ class TestWorkflow(unittest.TestCase):
 
         mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
 
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                               basedir="", make_fs_access=make_fs_access, loader=document_loader,
                                               makeTool=runner.arv_make_tool, metadata=metadata)
index 3b6af04b293e8f48e320fa6508716d3c6d27faf6..b39a9842845f7af01c64d62c713819ecaf1aca1c 100644 (file)
@@ -19,6 +19,7 @@ from arvados_cwl.pathmapper import ArvPathMapper
 def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
     pdh = "99999999999999999999999999999991+99"
     for c in files:
+        c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
         c.fn = fnPattern % (pdh, os.path.basename(c.fn))
 
 class TestPathmap(unittest.TestCase):
@@ -36,23 +37,29 @@ class TestPathmap(unittest.TestCase):
             "location": "keep:99999999999999999999999999999991+99/hw.py"
         }], "", "/test/%s", "/test/%s/%s")
 
-        self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+        self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
 
     @mock.patch("arvados.commands.run.uploadfiles")
-    def test_upload(self, upl):
+    @mock.patch("arvados.commands.run.statfile")
+    def test_upload(self, statfile, upl):
         """Test pathmapper uploading files."""
 
         arvrunner = arvados_cwl.ArvCwlRunner(self.api)
 
+        def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
+            st = arvados.commands.run.UploadFile("", "tests/hw.py")
+            return st
+
         upl.side_effect = upload_mock
+        statfile.side_effect = statfile_mock
 
         p = ArvPathMapper(arvrunner, [{
             "class": "File",
-            "location": "tests/hw.py"
+            "location": "file:tests/hw.py"
         }], "", "/test/%s", "/test/%s/%s")
 
-        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+        self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
 
     @mock.patch("arvados.commands.run.uploadfiles")
@@ -60,16 +67,16 @@ class TestPathmap(unittest.TestCase):
         """Test pathmapper handling previously uploaded files."""
 
         arvrunner = arvados_cwl.ArvCwlRunner(self.api)
-        arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
+        arvrunner.add_uploaded('file:tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File', staged=True))
 
         upl.side_effect = upload_mock
 
         p = ArvPathMapper(arvrunner, [{
             "class": "File",
-            "location": "tests/hw.py"
+            "location": "file:tests/hw.py"
         }], "", "/test/%s", "/test/%s/%s")
 
-        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+        self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
 
     @mock.patch("arvados.commands.run.uploadfiles")
@@ -89,8 +96,8 @@ class TestPathmap(unittest.TestCase):
 
         p = ArvPathMapper(arvrunner, [{
             "class": "File",
-            "location": "tests/hw.py"
+            "location": "file:tests/hw.py"
         }], "", "/test/%s", "/test/%s/%s")
 
-        self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+        self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
                          p._pathmap)
index 9982f6ffd5955109904ed2fc726c0d0b35d495ec..85c49c913427b26d9207926ed407065125bfe9c0 100644 (file)
@@ -822,18 +822,8 @@ class TestSubmit(unittest.TestCase):
         arvrunner.project_uuid = ""
         api.return_value = mock.MagicMock()
         arvrunner.api = api.return_value
-        arvrunner.api.links().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
-                                                            {"items": [], "items_available": 0, "offset": 0},
-                                                            {"items": [], "items_available": 0, "offset": 0},
-                                                            {"items": [{"created_at": "",
-                                                                        "head_uuid": "",
-                                                                        "link_class": "docker_image_hash",
-                                                                        "name": "123456",
-                                                                        "owner_uuid": "",
-                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
-                                                            {"items": [], "items_available": 0, "offset": 0},
-                                                            {"items": [{"created_at": "",
-                                                                        "head_uuid": "",
+        arvrunner.api.links().list().execute.side_effect = ({"items": [{"created_at": "",
+                                                                        "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
                                                                         "link_class": "docker_image_repo+tag",
                                                                         "name": "arvados/jobs:"+arvados_cwl.__version__,
                                                                         "owner_uuid": "",
@@ -843,19 +833,18 @@ class TestSubmit(unittest.TestCase):
                                                                         "link_class": "docker_image_hash",
                                                                         "name": "123456",
                                                                         "owner_uuid": "",
-                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}                                                            ,
+                                                                        "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}
         )
         find_one_image_hash.return_value = "123456"
 
-        arvrunner.api.collections().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
-                                                                  {"items": [{"uuid": "",
+        arvrunner.api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
                                                                               "owner_uuid": "",
                                                                               "manifest_text": "",
                                                                               "properties": ""
-                                                                          }], "items_available": 1, "offset": 0},
-                                                                  {"items": [{"uuid": ""}], "items_available": 1, "offset": 0})
+                                                                          }], "items_available": 1, "offset": 0},)
         arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
-        self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+        self.assertEqual("arvados/jobs:"+arvados_cwl.__version__,
+                         arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
 
 class TestCreateTemplate(unittest.TestCase):
     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
diff --git a/sdk/cwl/tests/tmp1/tmp2/tmp3/.gitkeep b/sdk/cwl/tests/tmp1/tmp2/tmp3/.gitkeep
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/sdk/cwl/tests/wf/listing_deep.cwl b/sdk/cwl/tests/wf/listing_deep.cwl
new file mode 100644 (file)
index 0000000..79ab368
--- /dev/null
@@ -0,0 +1,15 @@
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: "http://commonwl.org/cwltool#"
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: deep_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing[0].class === 'Directory' && inputs.d.listing[0].listing[0].class === 'Directory') {return 'true';} else {return 'false';}}"]
diff --git a/sdk/cwl/tests/wf/listing_none.cwl b/sdk/cwl/tests/wf/listing_none.cwl
new file mode 100644 (file)
index 0000000..584c8f5
--- /dev/null
@@ -0,0 +1,15 @@
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: http://commonwl.org/cwltool#
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: no_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing === undefined) {return 'true';} else {return 'false';}}"]
\ No newline at end of file
diff --git a/sdk/cwl/tests/wf/listing_shallow.cwl b/sdk/cwl/tests/wf/listing_shallow.cwl
new file mode 100644 (file)
index 0000000..b54f348
--- /dev/null
@@ -0,0 +1,15 @@
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: http://commonwl.org/cwltool#
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: shallow_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing[0].class === 'Directory' && inputs.d.listing[0].listing === undefined) {return 'true';} else {return 'false';}}"]
index f73ec2b13d3c4919dd99755bae65bbda8ff560a3..83b3537d528ea55446f26e7862ece767333fd8db 100644 (file)
@@ -44,6 +44,7 @@ steps:
           out: [out]
           run:
             class: CommandLineTool
+            id: subtool
             inputs:
               sleeptime:
                 type: int
index df4d992c359a42eeb619826aa14ce4b87aeab19d..dd4c7054a94c5c9dc73785eafbbc36e2bdff55b5 100644 (file)
           "run": {
             "baseCommand": "sleep",
             "class": "CommandLineTool",
+            "id": "#main/sleep1/subtool",
             "inputs": [
               {
-                "id": "#main/sleep1/sleeptime",
+                "id": "#main/sleep1/subtool/sleeptime",
                 "inputBinding": {
                   "position": 1
                 },
@@ -61,7 +62,7 @@
             ],
             "outputs": [
               {
-                "id": "#main/sleep1/out",
+                "id": "#main/sleep1/subtool/out",
                 "outputBinding": {
                   "outputEval": "out"
                 },
index 5d7e10be4beb34fef1892b2d2d7c150fd9906176..999a19371dba49925215ab94ed0f6eef3466c4f3 100644 (file)
@@ -3,9 +3,6 @@ package main
 import (
        "encoding/json"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io"
        "io/ioutil"
        "log"
@@ -14,6 +11,10 @@ import (
        "os/signal"
        "strings"
        "syscall"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
 type TaskDef struct {
@@ -34,17 +35,17 @@ type Tasks struct {
 }
 
 type Job struct {
-       Script_parameters Tasks `json:"script_parameters"`
+       ScriptParameters Tasks `json:"script_parameters"`
 }
 
 type Task struct {
-       Job_uuid                 string  `json:"job_uuid"`
-       Created_by_job_task_uuid string  `json:"created_by_job_task_uuid"`
-       Parameters               TaskDef `json:"parameters"`
-       Sequence                 int     `json:"sequence"`
-       Output                   string  `json:"output"`
-       Success                  bool    `json:"success"`
-       Progress                 float32 `json:"sequence"`
+       JobUUID              string  `json:"job_uuid"`
+       CreatedByJobTaskUUID string  `json:"created_by_job_task_uuid"`
+       Parameters           TaskDef `json:"parameters"`
+       Sequence             int     `json:"sequence"`
+       Output               string  `json:"output"`
+       Success              bool    `json:"success"`
+       Progress             float32 `json:"sequence"`
 }
 
 type IArvadosClient interface {
@@ -52,7 +53,7 @@ type IArvadosClient interface {
        Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
 }
 
-func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
+func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
        tmpdir = crunchtmpdir + "/tmpdir"
        err = os.Mkdir(tmpdir, 0700)
        if err != nil {
@@ -228,7 +229,7 @@ func getKeepTmp(outdir string) (manifest string, err error) {
 
 func runner(api IArvadosClient,
        kc IKeepClient,
-       jobUuid, taskUuid, crunchtmpdir, keepmount string,
+       jobUUID, taskUUID, crunchtmpdir, keepmount string,
        jobStruct Job, taskStruct Task) error {
 
        var err error
@@ -237,34 +238,35 @@ func runner(api IArvadosClient,
        // If this is task 0 and there are multiple tasks, dispatch subtasks
        // and exit.
        if taskStruct.Sequence == 0 {
-               if len(jobStruct.Script_parameters.Tasks) == 1 {
-                       taskp = jobStruct.Script_parameters.Tasks[0]
+               if len(jobStruct.ScriptParameters.Tasks) == 1 {
+                       taskp = jobStruct.ScriptParameters.Tasks[0]
                } else {
-                       for _, task := range jobStruct.Script_parameters.Tasks {
+                       for _, task := range jobStruct.ScriptParameters.Tasks {
                                err := api.Create("job_tasks",
                                        map[string]interface{}{
-                                               "job_task": Task{Job_uuid: jobUuid,
-                                                       Created_by_job_task_uuid: taskUuid,
-                                                       Sequence:                 1,
-                                                       Parameters:               task}},
+                                               "job_task": Task{
+                                                       JobUUID:              jobUUID,
+                                                       CreatedByJobTaskUUID: taskUUID,
+                                                       Sequence:             1,
+                                                       Parameters:           task}},
                                        nil)
                                if err != nil {
                                        return TempFail{err}
                                }
                        }
-                       err = api.Update("job_tasks", taskUuid,
+                       err = api.Update("job_tasks", taskUUID,
                                map[string]interface{}{
-                                       "job_task": Task{
-                                               Output:   "",
-                                               Success:  true,
-                                               Progress: 1.0}},
+                                       "job_task": map[string]interface{}{
+                                               "output":   "",
+                                               "success":  true,
+                                               "progress": 1.0}},
                                nil)
                        return nil
                }
        }
 
        var tmpdir, outdir string
-       tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput)
+       tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
        if err != nil {
                return TempFail{err}
        }
@@ -370,7 +372,7 @@ func runner(api IArvadosClient,
        }
 
        // Set status
-       err = api.Update("job_tasks", taskUuid,
+       err = api.Update("job_tasks", taskUUID,
                map[string]interface{}{
                        "job_task": Task{
                                Output:   manifest,
@@ -394,19 +396,19 @@ func main() {
                log.Fatal(err)
        }
 
-       jobUuid := os.Getenv("JOB_UUID")
-       taskUuid := os.Getenv("TASK_UUID")
+       jobUUID := os.Getenv("JOB_UUID")
+       taskUUID := os.Getenv("TASK_UUID")
        tmpdir := os.Getenv("TASK_WORK")
        keepmount := os.Getenv("TASK_KEEPMOUNT")
 
        var jobStruct Job
        var taskStruct Task
 
-       err = api.Get("jobs", jobUuid, nil, &jobStruct)
+       err = api.Get("jobs", jobUUID, nil, &jobStruct)
        if err != nil {
                log.Fatal(err)
        }
-       err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+       err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
        if err != nil {
                log.Fatal(err)
        }
@@ -418,7 +420,7 @@ func main() {
        }
 
        syscall.Umask(0022)
-       err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+       err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
 
        if err == nil {
                os.Exit(0)
index f4c8193696104be204b525b20a8a57ef01e13082..fcc77090ae5492401f47f9fbc7cd9e609e4c3e1a 100644 (file)
@@ -1,8 +1,6 @@
 package main
 
 import (
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       . "gopkg.in/check.v1"
        "io"
        "io/ioutil"
        "log"
@@ -10,6 +8,9 @@ import (
        "syscall"
        "testing"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       . "gopkg.in/check.v1"
 )
 
 // Gocheck boilerplate
@@ -53,7 +54,7 @@ func (s *TestSuite) TestSimpleRun(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"echo", "foo"}}}}},
                Task{Sequence: 0})
        c.Check(err, IsNil)
@@ -88,7 +89,7 @@ func (s *TestSuite) TestSimpleRunSubtask(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{
+               Job{ScriptParameters: Tasks{[]TaskDef{
                        {Command: []string{"echo", "bar"}},
                        {Command: []string{"echo", "foo"}}}}},
                Task{Parameters: TaskDef{
@@ -118,7 +119,7 @@ func (s *TestSuite) TestRedirect(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"cat"},
                        Stdout:  "output.txt",
                        Stdin:   tmpfile.Name()}}}},
@@ -140,7 +141,7 @@ func (s *TestSuite) TestEnv(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"/bin/sh", "-c", "echo $BAR"},
                        Stdout:  "output.txt",
                        Env:     map[string]string{"BAR": "foo"}}}}},
@@ -161,7 +162,7 @@ func (s *TestSuite) TestEnvSubstitute(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "foo\n",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"/bin/sh", "-c", "echo $BAR"},
                        Stdout:  "output.txt",
                        Env:     map[string]string{"BAR": "$(task.keep)"}}}}},
@@ -182,7 +183,7 @@ func (s *TestSuite) TestEnvReplace(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"/bin/sh", "-c", "echo $PATH"},
                        Stdout:  "output.txt",
                        Env:     map[string]string{"PATH": "foo"}}}}},
@@ -211,14 +212,14 @@ func (t SubtaskTestClient) Update(resourceType string, uuid string, parameters a
 func (s *TestSuite) TestScheduleSubtask(c *C) {
 
        api := SubtaskTestClient{c, []Task{
-               {Job_uuid: "zzzz-8i9sb-111111111111111",
-                       Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
-                       Sequence:                 1,
+               {JobUUID: "zzzz-8i9sb-111111111111111",
+                       CreatedByJobTaskUUID: "zzzz-ot0gb-111111111111111",
+                       Sequence:             1,
                        Parameters: TaskDef{
                                Command: []string{"echo", "bar"}}},
-               {Job_uuid: "zzzz-8i9sb-111111111111111",
-                       Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
-                       Sequence:                 1,
+               {JobUUID: "zzzz-8i9sb-111111111111111",
+                       CreatedByJobTaskUUID: "zzzz-ot0gb-111111111111111",
+                       Sequence:             1,
                        Parameters: TaskDef{
                                Command: []string{"echo", "foo"}}}},
                0}
@@ -233,7 +234,7 @@ func (s *TestSuite) TestScheduleSubtask(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{
+               Job{ScriptParameters: Tasks{[]TaskDef{
                        {Command: []string{"echo", "bar"}},
                        {Command: []string{"echo", "foo"}}}}},
                Task{Sequence: 0})
@@ -252,7 +253,7 @@ func (s *TestSuite) TestRunFail(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
                Task{Sequence: 0})
        c.Check(err, FitsTypeOf, PermFail{})
@@ -269,7 +270,7 @@ func (s *TestSuite) TestRunSuccessCode(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command:      []string{"/bin/sh", "-c", "exit 1"},
                        SuccessCodes: []int{0, 1}}}}},
                Task{Sequence: 0})
@@ -287,7 +288,7 @@ func (s *TestSuite) TestRunFailCode(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command:            []string{"/bin/sh", "-c", "exit 0"},
                        PermanentFailCodes: []int{0, 1}}}}},
                Task{Sequence: 0})
@@ -305,7 +306,7 @@ func (s *TestSuite) TestRunTempFailCode(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command:            []string{"/bin/sh", "-c", "exit 1"},
                        TemporaryFailCodes: []int{1}}}}},
                Task{Sequence: 0})
@@ -329,7 +330,7 @@ func (s *TestSuite) TestVwd(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"ls", "output.txt"},
                        Vwd: map[string]string{
                                "output.txt": tmpfile.Name()}}}}},
@@ -361,7 +362,7 @@ func (s *TestSuite) TestSubstitutionStdin(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                keepmount,
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"cat"},
                        Stdout:  "output.txt",
                        Stdin:   "$(task.keep)/file1.txt"}}}},
@@ -389,7 +390,7 @@ func (s *TestSuite) TestSubstitutionCommandLine(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                keepmount,
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"cat", "$(task.keep)/file1.txt"},
                        Stdout:  "output.txt"}}}},
                Task{Sequence: 0})
@@ -417,7 +418,7 @@ func (s *TestSuite) TestSignal(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"sleep", "4"}}}}},
                Task{Sequence: 0})
        c.Check(err, FitsTypeOf, PermFail{})
@@ -437,7 +438,7 @@ func (s *TestSuite) TestQuoting(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command: []string{"echo", "foo"},
                        Stdout:  "s ub:dir/:e vi\nl"}}}},
                Task{Sequence: 0})
@@ -464,7 +465,7 @@ func (s *TestSuite) TestKeepTmp(c *C) {
                "zzzz-ot0gb-111111111111111",
                tmpdir,
                "",
-               Job{Script_parameters: Tasks{[]TaskDef{{
+               Job{ScriptParameters: Tasks{[]TaskDef{{
                        Command:       []string{"echo", "foo"},
                        KeepTmpOutput: true}}}},
                Task{Sequence: 0})
index d91513ed4013b7fe7acd9bbe04eec332136836d5..a6286d9924a3e2240b16bc8a88f6ef79da34e9d6 100644 (file)
@@ -18,21 +18,20 @@ type ThrottleTestSuite struct{}
 
 func (*ThrottleTestSuite) TestThrottle(c *check.C) {
        uuid := "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+       t0 := throttle{}
+       c.Check(t0.Check(uuid), check.Equals, true)
+       c.Check(t0.Check(uuid), check.Equals, true)
 
-       t := throttle{}
-       c.Check(t.Check(uuid), check.Equals, true)
-       c.Check(t.Check(uuid), check.Equals, true)
-
-       t = throttle{hold: time.Nanosecond}
-       c.Check(t.Check(uuid), check.Equals, true)
+       tNs := throttle{hold: time.Nanosecond}
+       c.Check(tNs.Check(uuid), check.Equals, true)
        time.Sleep(time.Microsecond)
-       c.Check(t.Check(uuid), check.Equals, true)
-
-       t = throttle{hold: time.Minute}
-       c.Check(t.Check(uuid), check.Equals, true)
-       c.Check(t.Check(uuid), check.Equals, false)
-       c.Check(t.Check(uuid), check.Equals, false)
-       t.seen[uuid].last = time.Now().Add(-time.Hour)
-       c.Check(t.Check(uuid), check.Equals, true)
-       c.Check(t.Check(uuid), check.Equals, false)
+       c.Check(tNs.Check(uuid), check.Equals, true)
+
+       tMin := throttle{hold: time.Minute}
+       c.Check(tMin.Check(uuid), check.Equals, true)
+       c.Check(tMin.Check(uuid), check.Equals, false)
+       c.Check(tMin.Check(uuid), check.Equals, false)
+       tMin.seen[uuid].last = time.Now().Add(-time.Hour)
+       c.Check(tMin.Check(uuid), check.Equals, true)
+       c.Check(tMin.Check(uuid), check.Equals, false)
 }
index 83874164eebd8bd88b03e6558f14f62c0b991033..5532ea011ee8c5244e15add14425ad526941fc90 100644 (file)
@@ -109,7 +109,7 @@ def locators_and_ranges(data_locators, range_start, range_size, limit=None):
         block_size = dl.range_size
         block_end = block_start + block_size
         _logger.log(RANGES_SPAM,
-            "%s range_start %s block_start %s range_end %s block_end %s",
+            "L&R %s range_start %s block_start %s range_end %s block_end %s",
             dl.locator, range_start, block_start, range_end, block_end)
         if range_end <= block_start:
             # range ends before this block starts, so don't look at any more locators
@@ -165,7 +165,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
 
     last = data_locators[-1]
     if (last.range_start+last.range_size) == new_range_start:
-        if last.locator == new_locator:
+        if last.locator == new_locator and (last.segment_offset+last.range_size) == new_segment_offset:
             # extend last segment
             last.range_size += new_range_size
         else:
@@ -183,7 +183,7 @@ def replace_range(data_locators, new_range_start, new_range_size, new_locator, n
         old_segment_start = dl.range_start
         old_segment_end = old_segment_start + dl.range_size
         _logger.log(RANGES_SPAM,
-            "%s range_start %s segment_start %s range_end %s segment_end %s",
+            "RR %s range_start %s segment_start %s range_end %s segment_end %s",
             dl, new_range_start, old_segment_start, new_range_end,
             old_segment_end)
         if new_range_end <= old_segment_start:
index 9db19b05f6bc356c2b673d4983551b2dceef7122..aad3ce12a5bfd7e3f30e749832c582bbb5a27a80 100644 (file)
@@ -96,11 +96,23 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
             pos += self._filepos
         elif whence == os.SEEK_END:
             pos += self.size()
-        self._filepos = min(max(pos, 0L), self.size())
+        if pos < 0L:
+            raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
+        self._filepos = pos
+        return self._filepos
 
     def tell(self):
         return self._filepos
 
+    def readable(self):
+        return True
+
+    def writable(self):
+        return False
+
+    def seekable(self):
+        return True
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def readall(self, size=2**20, num_retries=None):
@@ -172,13 +184,13 @@ class ArvadosFileReaderBase(_FileLikeObjectBase):
         return ''.join(data).splitlines(True)
 
     def size(self):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
     def read(self, size, num_retries=None):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
     def readfrom(self, start, size, num_retries=None):
-        raise NotImplementedError()
+        raise IOError(errno.ENOSYS, "Not implemented")
 
 
 class StreamFileReader(ArvadosFileReaderBase):
@@ -428,6 +440,7 @@ class _BlockManager(object):
         self.copies = copies
         self._pending_write_size = 0
         self.threads_lock = threading.Lock()
+        self.padding_block = None
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -653,6 +666,22 @@ class _BlockManager(object):
     def get_bufferblock(self, locator):
         return self._bufferblocks.get(locator)
 
+    @synchronized
+    def get_padding_block(self):
+        """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
+        when using truncate() to extend the size of a file.
+
+        For reference (and possible future optimization), the md5sum of the
+        padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
+
+        """
+
+        if self.padding_block is None:
+            self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
+            self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
+            self.commit_bufferblock(self.padding_block, False)
+        return self.padding_block
+
     @synchronized
     def delete_bufferblock(self, locator):
         self._delete_bufferblock(locator)
@@ -900,11 +929,11 @@ class ArvadosFile(object):
     @must_be_writable
     @synchronized
     def truncate(self, size):
-        """Shrink the size of the file.
+        """Shrink or expand the size of the file.
 
         If `size` is less than the size of the file, the file contents after
         `size` will be discarded.  If `size` is greater than the current size
-        of the file, an IOError will be raised.
+        of the file, it will be filled with zero bytes.
 
         """
         if size < self.size():
@@ -925,7 +954,17 @@ class ArvadosFile(object):
             self._segments = new_segs
             self.set_committed(False)
         elif size > self.size():
-            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
+            padding = self.parent._my_block_manager().get_padding_block()
+            diff = size - self.size()
+            while diff > config.KEEP_BLOCK_SIZE:
+                self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
+                diff -= config.KEEP_BLOCK_SIZE
+            if diff > 0:
+                self._segments.append(Range(padding.blockid, self.size(), diff, 0))
+            self.set_committed(False)
+        else:
+            # size == self.size()
+            pass
 
     def readfrom(self, offset, size, num_retries, exact=False):
         """Read up to `size` bytes from the file starting at `offset`.
@@ -961,23 +1000,28 @@ class ArvadosFile(object):
         return ''.join(data)
 
     def _repack_writes(self, num_retries):
-        """Test if the buffer block has more data than actual segments.
+        """Optimize buffer block by repacking segments in file sequence.
 
-        This happens when a buffered write over-writes a file range written in
-        a previous buffered write.  Re-pack the buffer block for efficiency
-        and to avoid leaking information.
+        When the client makes random writes, they appear in the buffer block in
+        the sequence they were written rather than the sequence they appear in
+        the file.  This makes for inefficient, fragmented manifests.  Attempt
+        to optimize by repacking writes in file sequence.
 
         """
         segs = self._segments
 
-        # Sum up the segments to get the total bytes of the file referencing
-        # into the buffer block.
+        # Collect the segments that reference the buffer block.
         bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
+
+        # Collect total data referenced by segments (could be smaller than
+        # bufferblock size if a portion of the file was written and
+        # then overwritten).
         write_total = sum([s.range_size for s in bufferblock_segs])
 
-        if write_total < self._current_bblock.size():
-            # There is more data in the buffer block than is actually accounted for by segments, so
-            # re-pack into a new buffer by copying over to a new buffer block.
+        if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
+            # If there's more than one segment referencing this block, it is
+            # due to out-of-order writes and will produce a fragmented
+            # manifest, so try to optimize by re-packing into a new buffer.
             contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
             new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
             for t in bufferblock_segs:
@@ -999,7 +1043,7 @@ class ArvadosFile(object):
             return
 
         if offset > self.size():
-            raise ArgumentError("Offset is past the end of the file")
+            self.truncate(offset)
 
         if len(data) > config.KEEP_BLOCK_SIZE:
             # Chunk it up into smaller writes
@@ -1183,6 +1227,9 @@ class ArvadosFileWriter(ArvadosFileReader):
         self.mode = mode
         self.arvadosfile.add_writer(self)
 
+    def writable(self):
+        return True
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def write(self, data, num_retries=None):
@@ -1204,8 +1251,6 @@ class ArvadosFileWriter(ArvadosFileReader):
         if size is None:
             size = self._filepos
         self.arvadosfile.truncate(size)
-        if self._filepos > self.size():
-            self._filepos = self.size()
 
     @_FileLikeObjectBase._before_close
     def flush(self):
index f39e092135aa6234c6318326dd8db654aa84aa04..3bf929584ea4e22968b0e29a07044c7e1e95a744 100755 (executable)
@@ -10,6 +10,7 @@ import logging
 
 import arvados
 import arvados.commands._util as arv_cmd
+import arvados.util as util
 
 from arvados._version import __version__
 
@@ -84,6 +85,11 @@ write *anything* if any files exist that would have to be
 overwritten. This option causes even devices, sockets, and fifos to be
 skipped.
 """)
+group.add_argument('--strip-manifest', action='store_true', default=False,
+                   help="""
+When getting a collection manifest, strip its access tokens before writing
+it.
+""")
 
 def parse_arguments(arguments, stdout, stderr):
     args = parser.parse_args(arguments)
@@ -131,12 +137,12 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         api_client = arvados.api('v1')
 
     r = re.search(r'^(.*?)(/.*)?$', args.locator)
-    collection = r.group(1)
+    col_loc = r.group(1)
     get_prefix = r.group(2)
     if args.r and not get_prefix:
         get_prefix = os.sep
     try:
-        reader = arvados.CollectionReader(collection, num_retries=args.retries)
+        reader = arvados.CollectionReader(col_loc, num_retries=args.retries)
     except Exception as error:
         logger.error("failed to read collection: {}".format(error))
         return 1
@@ -149,16 +155,16 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
                 open_flags |= os.O_EXCL
             try:
                 if args.destination == "-":
-                    stdout.write(reader.manifest_text())
+                    stdout.write(reader.manifest_text(strip=args.strip_manifest))
                 else:
                     out_fd = os.open(args.destination, open_flags)
                     with os.fdopen(out_fd, 'wb') as out_file:
-                        out_file.write(reader.manifest_text())
+                        out_file.write(reader.manifest_text(strip=args.strip_manifest))
             except (IOError, OSError) as error:
                 logger.error("can't write to '{}': {}".format(args.destination, error))
                 return 1
             except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
-                logger.error("failed to download '{}': {}".format(collection, error))
+                logger.error("failed to download '{}': {}".format(col_loc, error))
                 return 1
         return 0
 
index 4be99961ba0febcfcf36b254f582945427b133e8..9ff0b780b7dd6c5b2ec2cf6e290d70faeb74c17b 100644 (file)
@@ -56,6 +56,14 @@ def main(arguments=None):
     migrate19_parser.add_argument(
         '--version', action='version', version="%s %s" % (sys.argv[0], __version__),
         help='Print version and exit.')
+    migrate19_parser.add_argument(
+        '--verbose', action="store_true", help="Print stdout/stderr even on success")
+    migrate19_parser.add_argument(
+        '--force', action="store_true", help="Try to migrate even if there isn't enough space")
+
+    migrate19_parser.add_argument(
+        '--storage-driver', type=str, default="overlay",
+        help="Docker storage driver, e.g. aufs, overlay, vfs")
 
     exgroup = migrate19_parser.add_mutually_exclusive_group()
     exgroup.add_argument(
@@ -74,6 +82,9 @@ def main(arguments=None):
     if args.tempdir:
         tempfile.tempdir = args.tempdir
 
+    if args.verbose:
+        logger.setLevel(logging.DEBUG)
+
     only_migrate = None
     if args.infile:
         only_migrate = set()
@@ -114,26 +125,47 @@ def main(arguments=None):
     uuid_to_collection = {i["uuid"]: i for i in items}
 
     need_migrate = {}
+    totalbytes = 0
     biggest = 0
+    biggest_pdh = None
     for img in old_images:
         i = uuid_to_collection[img["collection"]]
         pdh = i["portable_data_hash"]
-        if pdh not in already_migrated and (only_migrate is None or pdh in only_migrate):
+        if pdh not in already_migrated and pdh not in need_migrate and (only_migrate is None or pdh in only_migrate):
             need_migrate[pdh] = img
             with CollectionReader(i["manifest_text"]) as c:
                 if c.values()[0].size() > biggest:
                     biggest = c.values()[0].size()
+                    biggest_pdh = pdh
+                totalbytes += c.values()[0].size()
+
+
+    if args.storage_driver == "vfs":
+        will_need = (biggest*20)
+    else:
+        will_need = (biggest*2.5)
 
     if args.print_unmigrated:
         only_migrate = set()
         for pdh in need_migrate:
-            print pdh
+            print(pdh)
         return
 
     logger.info("Already migrated %i images", len(already_migrated))
     logger.info("Need to migrate %i images", len(need_migrate))
     logger.info("Using tempdir %s", tempfile.gettempdir())
-    logger.info("Biggest image is about %i MiB, tempdir needs at least %i MiB free", biggest/(2**20), (biggest*2)/(2**20))
+    logger.info("Biggest image %s is about %i MiB", biggest_pdh, biggest/(2**20))
+    logger.info("Total data to migrate about %i MiB", totalbytes/(2**20))
+
+    df_out = subprocess.check_output(["df", "-B1", tempfile.gettempdir()])
+    ln = df_out.splitlines()[1]
+    filesystem, blocks, used, available, use_pct, mounted = re.match(r"^([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+)", ln).groups(1)
+    if int(available) <= will_need:
+        logger.warn("Temp filesystem mounted at %s does not have enough space for biggest image (has %i MiB, needs %i MiB)", mounted, int(available)/(2**20), will_need/(2**20))
+        if not args.force:
+            exit(1)
+        else:
+            logger.warn("--force provided, will migrate anyway")
 
     if args.dry_run:
         return
@@ -157,10 +189,10 @@ def main(arguments=None):
         dockercache = tempfile.mkdtemp()
         try:
             with tempfile.NamedTemporaryFile() as envfile:
-                envfile.write("ARVADOS_API_HOST=%s\n" % (os.environ["ARVADOS_API_HOST"]))
-                envfile.write("ARVADOS_API_TOKEN=%s\n" % (os.environ["ARVADOS_API_TOKEN"]))
-                if "ARVADOS_API_HOST_INSECURE" in os.environ:
-                    envfile.write("ARVADOS_API_HOST_INSECURE=%s\n" % (os.environ["ARVADOS_API_HOST_INSECURE"]))
+                envfile.write("ARVADOS_API_HOST=%s\n" % (arvados.config.get("ARVADOS_API_HOST")))
+                envfile.write("ARVADOS_API_TOKEN=%s\n" % (arvados.config.get("ARVADOS_API_TOKEN")))
+                if arvados.config.get("ARVADOS_API_HOST_INSECURE"):
+                    envfile.write("ARVADOS_API_HOST_INSECURE=%s\n" % (arvados.config.get("ARVADOS_API_HOST_INSECURE")))
                 envfile.flush()
 
                 dockercmd = ["docker", "run",
@@ -169,23 +201,51 @@ def main(arguments=None):
                              "--env-file", envfile.name,
                              "--volume", "%s:/var/lib/docker" % varlibdocker,
                              "--volume", "%s:/root/.cache/arvados/docker" % dockercache,
-                             "arvados/migrate-docker19",
+                             "arvados/migrate-docker19:1.0",
                              "/root/migrate.sh",
                              "%s/%s" % (old_image["collection"], tarfile),
                              tarfile[0:40],
                              old_image["repo"],
                              old_image["tag"],
-                             uuid_to_collection[old_image["collection"]]["owner_uuid"]]
+                             uuid_to_collection[old_image["collection"]]["owner_uuid"],
+                             args.storage_driver]
 
                 proc = subprocess.Popen(dockercmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                 out, err = proc.communicate()
 
+                initial_space = re.search(r"Initial available space is (\d+)", out)
+                imgload_space = re.search(r"Available space after image load is (\d+)", out)
+                imgupgrade_space = re.search(r"Available space after image upgrade is (\d+)", out)
+                keepdocker_space = re.search(r"Available space after arv-keepdocker is (\d+)", out)
+                cleanup_space = re.search(r"Available space after cleanup is (\d+)", out)
+
+                if initial_space:
+                    isp = int(initial_space.group(1))
+                    logger.info("Available space initially: %i MiB", (isp)/(2**20))
+                    if imgload_space:
+                        sp = int(imgload_space.group(1))
+                        logger.debug("Used after load: %i MiB", (isp-sp)/(2**20))
+                    if imgupgrade_space:
+                        sp = int(imgupgrade_space.group(1))
+                        logger.debug("Used after upgrade: %i MiB", (isp-sp)/(2**20))
+                    if keepdocker_space:
+                        sp = int(keepdocker_space.group(1))
+                        logger.info("Used after upload: %i MiB", (isp-sp)/(2**20))
+
+                if cleanup_space:
+                    sp = int(cleanup_space.group(1))
+                    logger.debug("Available after cleanup: %i MiB", (sp)/(2**20))
+
                 if proc.returncode != 0:
                     logger.error("Failed with return code %i", proc.returncode)
                     logger.error("--- Stdout ---\n%s", out)
                     logger.error("--- Stderr ---\n%s", err)
                     raise MigrationFailed()
 
+                if args.verbose:
+                    logger.info("--- Stdout ---\n%s", out)
+                    logger.info("--- Stderr ---\n%s", err)
+
             migrated = re.search(r"Migrated uuid is ([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15})", out)
             if migrated:
                 newcol = CollectionReader(migrated.group(1))
index 32d5fef6a8588e1f2785517435949082dd5b3534..42510754aba4724bb7b91aa061125fce52a1c1a0 100644 (file)
@@ -543,7 +543,10 @@ class ArvPutUploadJob(object):
                     with self._state_lock:
                         self._state['manifest'] = manifest
             if self.use_cache:
-                self._save_state()
+                try:
+                    self._save_state()
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
@@ -673,10 +676,12 @@ class ArvPutUploadJob(object):
             cache_filepath = os.path.join(
                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                 cache_filename)
-            if self.resume:
+            if self.resume and os.path.exists(cache_filepath):
+                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
             else:
                 # --no-resume means start with a empty cache file.
+                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'w+')
             self._cache_filename = self._cache_file.name
             self._lock_file(self._cache_file)
@@ -693,6 +698,7 @@ class ArvPutUploadJob(object):
                     # Cache file empty, set up new cache
                     self._state = copy.deepcopy(self.EMPTY_STATE)
             else:
+                self.logger.info("No cache usage requested for this run.")
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
@@ -719,20 +725,19 @@ class ArvPutUploadJob(object):
         """
         Atomically save current state into cache.
         """
+        with self._state_lock:
+            # We're not using copy.deepcopy() here because it's a lot slower
+            # than json.dumps(), and we're already needing JSON format to be
+            # saved on disk.
+            state = json.dumps(self._state)
         try:
-            with self._state_lock:
-                # We're not using copy.deepcopy() here because it's a lot slower
-                # than json.dumps(), and we're already needing JSON format to be
-                # saved on disk.
-                state = json.dumps(self._state)
-            new_cache_fd, new_cache_name = tempfile.mkstemp(
-                dir=os.path.dirname(self._cache_filename))
-            self._lock_file(new_cache_fd)
-            new_cache = os.fdopen(new_cache_fd, 'r+')
+            new_cache = tempfile.NamedTemporaryFile(
+                dir=os.path.dirname(self._cache_filename), delete=False)
+            self._lock_file(new_cache)
             new_cache.write(state)
             new_cache.flush()
             os.fsync(new_cache)
-            os.rename(new_cache_name, self._cache_filename)
+            os.rename(new_cache.name, self._cache_filename)
         except (IOError, OSError, ResumeCacheConflict) as error:
             self.logger.error("There was a problem while saving the cache file: {}".format(error))
             try:
index 907c671822b16ed0929f5976c0a0c0ad5f9b6eac..b7f5e2b9c6d7923f39d485cbd96f9f1c6228ac82 100644 (file)
@@ -2,6 +2,8 @@
 # -*- coding: utf-8 -*-
 
 import io
+import os
+import re
 import shutil
 import tempfile
 
@@ -26,6 +28,7 @@ class ArvadosGetTestCase(run_test_server.TestCaseWithServers):
         shutil.rmtree(self.tempdir)
 
     def write_test_collection(self,
+                              strip_manifest=False,
                               contents = {
                                   'foo.txt' : 'foo',
                                   'bar.txt' : 'bar',
@@ -36,7 +39,9 @@ class ArvadosGetTestCase(run_test_server.TestCaseWithServers):
             with c.open(path, 'w') as f:
                 f.write(data)
         c.save_new()
-        return (c.manifest_locator(), c.portable_data_hash(), c.manifest_text())
+        return (c.manifest_locator(),
+                c.portable_data_hash(),
+                c.manifest_text(strip=strip_manifest))
     
     def run_get(self, args):
         self.stdout = io.BytesIO()
@@ -66,19 +71,43 @@ class ArvadosGetTestCase(run_test_server.TestCaseWithServers):
         # Download the entire collection to the temp directory
         r = self.run_get(["{}/".format(self.col_loc), self.tempdir])
         self.assertEqual(0, r)
-        with open("{}/foo.txt".format(self.tempdir), "r") as f:
+        with open(os.path.join(self.tempdir, "foo.txt"), "r") as f:
             self.assertEqual("foo", f.read())
-        with open("{}/bar.txt".format(self.tempdir), "r") as f:
+        with open(os.path.join(self.tempdir, "bar.txt"), "r") as f:
             self.assertEqual("bar", f.read())
-        with open("{}/subdir/baz.txt".format(self.tempdir), "r") as f:
+        with open(os.path.join(self.tempdir, "subdir", "baz.txt"), "r") as f:
             self.assertEqual("baz", f.read())
 
-    def test_get_collection_manifest(self):
-        # Get the collection manifest
+    def test_get_collection_unstripped_manifest(self):
+        dummy_token = "+Axxxxxxx"
+        # Get the collection manifest by UUID
         r = self.run_get([self.col_loc, self.tempdir])
         self.assertEqual(0, r)
-        with open("{}/{}".format(self.tempdir, self.col_loc), "r") as f:
-            self.assertEqual(self.col_manifest, f.read())
+        m_from_collection = re.sub(r"\+A[0-9a-f@]+", dummy_token, self.col_manifest)
+        with open(os.path.join(self.tempdir, self.col_loc), "r") as f:
+            # Replace manifest tokens before comparison to avoid races
+            m_from_file = re.sub(r"\+A[0-9a-f@]+", dummy_token, f.read())
+            self.assertEqual(m_from_collection, m_from_file)
+        # Get the collection manifest by PDH
+        r = self.run_get([self.col_pdh, self.tempdir])
+        self.assertEqual(0, r)
+        with open(os.path.join(self.tempdir, self.col_pdh), "r") as f:
+            # Replace manifest tokens before comparison to avoid races
+            m_from_file = re.sub(r"\+A[0-9a-f@]+", dummy_token, f.read())
+            self.assertEqual(m_from_collection, m_from_file)
+
+    def test_get_collection_stripped_manifest(self):
+        col_loc, col_pdh, col_manifest = self.write_test_collection(strip_manifest=True)
+        # Get the collection manifest by UUID
+        r = self.run_get(['--strip-manifest', col_loc, self.tempdir])
+        self.assertEqual(0, r)
+        with open(os.path.join(self.tempdir, col_loc), "r") as f:
+            self.assertEqual(col_manifest, f.read())
+        # Get the collection manifest by PDH
+        r = self.run_get(['--strip-manifest', col_pdh, self.tempdir])
+        self.assertEqual(0, r)
+        with open(os.path.join(self.tempdir, col_pdh), "r") as f:
+            self.assertEqual(col_manifest, f.read())
 
     def test_invalid_collection(self):
         # Asking for an invalid collection should generate an error.
@@ -99,10 +128,10 @@ class ArvadosGetTestCase(run_test_server.TestCaseWithServers):
     def test_preexistent_destination(self):
         # Asking to place a file with the same path as a local one should
         # generate an error and avoid overwrites.
-        with open("{}/foo.txt".format(self.tempdir), "w") as f:
+        with open(os.path.join(self.tempdir, "foo.txt"), "w") as f:
             f.write("another foo")
         r = self.run_get(["{}/foo.txt".format(self.col_loc), self.tempdir])
         self.assertNotEqual(0, r)
-        with open("{}/foo.txt".format(self.tempdir), "r") as f:
+        with open(os.path.join(self.tempdir, "foo.txt"), "r") as f:
             self.assertEqual("another foo", f.read())
 
index 1b66935237c20bcedb75ab1d6ae51fd17bf38dad..20b258437ded77bbe01b7ff86cb17a3125183a91 100644 (file)
@@ -91,6 +91,68 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
             self.assertFalse(c.modified())
 
+
+    def test_truncate2(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
+        api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate2",
+                                                 "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 7f614da9329cd3aebf59b91aadc30bf0+67108864 0:12:count.txt\n",
+                                                 "replication_desired":None},
+                                                {"uuid":"zzzzz-4zz18-mockcollection0",
+                                                 "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 7f614da9329cd3aebf59b91aadc30bf0+67108864 0:12:count.txt\n",
+                                                 "portable_data_hash":"272da898abdf86ddc71994835e3155f8+95"})
+        with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+                             api_client=api, keep_client=keep) as c:
+            writer = c.open("count.txt", "r+")
+            self.assertEqual(writer.size(), 10)
+            self.assertEqual("0123456789", writer.read(12))
+
+            # extend file size
+            writer.truncate(12)
+
+            self.assertEqual(writer.size(), 12)
+            writer.seek(0, os.SEEK_SET)
+            self.assertEqual(b"0123456789\x00\x00", writer.read(12))
+
+            self.assertIsNone(c.manifest_locator())
+            self.assertTrue(c.modified())
+            c.save_new("test_truncate2")
+            self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
+            self.assertFalse(c.modified())
+
+    def test_truncate3(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789",
+                                                   "a925576942e94b2ef57a066101b48876+10": "abcdefghij"})
+        api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate",
+                                                 "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n",
+                                                 "replication_desired":None},
+                                                {"uuid":"zzzzz-4zz18-mockcollection0",
+                                                 "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n",
+                                                 "portable_data_hash":"7fcd0eaac3aad4c31a6a0e756475da92+52"})
+        with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n',
+                             api_client=api, keep_client=keep) as c:
+            writer = c.open("count.txt", "r+")
+            self.assertEqual(writer.size(), 20)
+            self.assertEqual("0123456789ab", writer.read(12))
+            self.assertEqual(12, writer.tell())
+
+            writer.truncate(8)
+
+            # Make sure reading off the end doesn't break
+            self.assertEqual(12, writer.tell())
+            self.assertEqual("", writer.read(12))
+
+            self.assertEqual(writer.size(), 8)
+            self.assertEqual(2, writer.seek(-10, os.SEEK_CUR))
+            self.assertEqual("234567", writer.read(12))
+
+            self.assertIsNone(c.manifest_locator())
+            self.assertTrue(c.modified())
+            c.save_new("test_truncate")
+            self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
+            self.assertFalse(c.modified())
+
+
+
     def test_write_to_end(self):
         keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_append",
@@ -104,7 +166,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
             writer = c.open("count.txt", "r+")
             self.assertEqual(writer.size(), 10)
 
-            writer.seek(5, os.SEEK_SET)
+            self.assertEqual(5, writer.seek(5, os.SEEK_SET))
             self.assertEqual("56789", writer.read(8))
 
             writer.seek(10, os.SEEK_SET)
@@ -268,6 +330,72 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
 
             self.assertEqual(c.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 48dd23ea1645fd47d789804d71b5bb8e+67108864 77c57dc6ac5a10bb2205caaa73187994+32891126 0:100000000:count.txt\n")
 
+    def test_sparse_write(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({})
+        api = ArvadosFileWriterTestCase.MockApi({}, {})
+        with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
+                             api_client=api, keep_client=keep) as c:
+            writer = c.open("count.txt", "r+")
+            self.assertEqual(writer.size(), 0)
+
+            text = "0123456789"
+            writer.seek(2)
+            writer.write(text)
+            self.assertEqual(writer.size(), 12)
+            writer.seek(0, os.SEEK_SET)
+            self.assertEqual(writer.read(), b"\x00\x00"+text)
+
+            self.assertEqual(c.manifest_text(), ". 7f614da9329cd3aebf59b91aadc30bf0+67108864 781e5e245d69b566979b86e28d23f2c7+10 0:2:count.txt 67108864:10:count.txt\n")
+
+
+    def test_sparse_write2(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({})
+        api = ArvadosFileWriterTestCase.MockApi({}, {})
+        with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
+                             api_client=api, keep_client=keep) as c:
+            writer = c.open("count.txt", "r+")
+            self.assertEqual(writer.size(), 0)
+
+            text = "0123456789"
+            writer.seek((arvados.config.KEEP_BLOCK_SIZE*2) + 2)
+            writer.write(text)
+            self.assertEqual(writer.size(), (arvados.config.KEEP_BLOCK_SIZE*2) + 12)
+            writer.seek(0, os.SEEK_SET)
+
+            self.assertEqual(c.manifest_text(), ". 7f614da9329cd3aebf59b91aadc30bf0+67108864 781e5e245d69b566979b86e28d23f2c7+10 0:67108864:count.txt 0:67108864:count.txt 0:2:count.txt 67108864:10:count.txt\n")
+
+
+    def test_sparse_write3(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({})
+        api = ArvadosFileWriterTestCase.MockApi({}, {})
+        for r in [[0, 1, 2, 3, 4], [4, 3, 2, 1, 0], [3, 2, 0, 4, 1]]:
+            with Collection() as c:
+                writer = c.open("count.txt", "r+")
+                self.assertEqual(writer.size(), 0)
+
+                for i in r:
+                    w = ("%s" % i) * 10
+                    writer.seek(i*10)
+                    writer.write(w)
+                writer.seek(0)
+                self.assertEqual(writer.read(), "00000000001111111111222222222233333333334444444444")
+
+    def test_sparse_write4(self):
+        keep = ArvadosFileWriterTestCase.MockKeep({})
+        api = ArvadosFileWriterTestCase.MockApi({}, {})
+        for r in [[0, 1, 2, 4], [4, 2, 1, 0], [2, 0, 4, 1]]:
+            with Collection() as c:
+                writer = c.open("count.txt", "r+")
+                self.assertEqual(writer.size(), 0)
+
+                for i in r:
+                    w = ("%s" % i) * 10
+                    writer.seek(i*10)
+                    writer.write(w)
+                writer.seek(0)
+                self.assertEqual(writer.read(), b"000000000011111111112222222222\x00\x00\x00\x00\x00\x00\x00\x00\x00\x004444444444")
+
+
     def test_rewrite_on_empty_file(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
         with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
@@ -325,10 +453,10 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_write_large_rewrite(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_write_large",
-                                                 "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n",
+                                                 "manifest_text": ". 3dc0d4bc21f48060bedcb2c91af4f906+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 0:3:count.txt 32892006:67107997:count.txt 3:32892000:count.txt\n",
                                                  "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
-                                                 "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n",
+                                                 "manifest_text": ". 3dc0d4bc21f48060bedcb2c91af4f906+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 0:3:count.txt 32892006:67107997:count.txt 3:32892000:count.txt\n",
                                                  "portable_data_hash":"217665c6b713e1b78dfba7ebd42344db+156"})
         with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
                              api_client=api, keep_client=keep) as c:
index 624f1b8ca4391678215539f70c2a28b00fd37388..082af1bb7caa67e89b68df8f0e1e994e0ccfa0fe 100644 (file)
@@ -70,13 +70,16 @@ class StreamFileReaderTestCase(unittest.TestCase):
 
     def test_seek_min_zero(self):
         sfile = self.make_count_reader()
-        sfile.seek(-2, os.SEEK_SET)
+        self.assertEqual(0, sfile.tell())
+        with self.assertRaises(IOError):
+            sfile.seek(-2, os.SEEK_SET)
         self.assertEqual(0, sfile.tell())
 
     def test_seek_max_size(self):
         sfile = self.make_count_reader()
         sfile.seek(2, os.SEEK_END)
-        self.assertEqual(9, sfile.tell())
+        # POSIX permits seeking past end of file.
+        self.assertEqual(11, sfile.tell())
 
     def test_size(self):
         self.assertEqual(9, self.make_count_reader().size())
index c7ac090adc3b9d7bc6b265707ac0c3b4973521e7..82ea0acbd63d9b12e2b31cd4d35fec783b869b59 100644 (file)
@@ -1,3 +1,5 @@
+require 'tempfile'
+
 class Node < ArvadosModel
   include HasUuid
   include KindAndEtag
@@ -171,22 +173,30 @@ class Node < ArvadosModel
     }
 
     if Rails.configuration.dns_server_conf_dir and Rails.configuration.dns_server_conf_template
+      tmpfile = nil
       begin
         begin
           template = IO.read(Rails.configuration.dns_server_conf_template)
-        rescue => e
+        rescue IOError, SystemCallError => e
           logger.error "Reading #{Rails.configuration.dns_server_conf_template}: #{e.message}"
           raise
         end
 
         hostfile = File.join Rails.configuration.dns_server_conf_dir, "#{hostname}.conf"
-        File.open hostfile+'.tmp', 'w' do |f|
+        Tempfile.open(["#{hostname}-", ".conf.tmp"],
+                                 Rails.configuration.dns_server_conf_dir) do |f|
+          tmpfile = f.path
           f.puts template % template_vars
         end
-        File.rename hostfile+'.tmp', hostfile
-      rescue => e
+        File.rename tmpfile, hostfile
+      rescue IOError, SystemCallError => e
         logger.error "Writing #{hostfile}: #{e.message}"
         ok = false
+      ensure
+        if tmpfile and File.file? tmpfile
+          # Cleanup remaining temporary file.
+          File.unlink tmpfile
+        end
       end
     end
 
@@ -205,7 +215,7 @@ class Node < ArvadosModel
           # Typically, this is used to trigger a dns server restart
           f.puts Rails.configuration.dns_server_reload_command
         end
-      rescue => e
+      rescue IOError, SystemCallError => e
         logger.error "Unable to write #{restartfile}: #{e.message}"
         ok = false
       end
diff --git a/services/api/db/migrate/20170419173031_add_created_by_job_task_index_to_job_tasks.rb b/services/api/db/migrate/20170419173031_add_created_by_job_task_index_to_job_tasks.rb
new file mode 100644 (file)
index 0000000..225af45
--- /dev/null
@@ -0,0 +1,5 @@
+class AddCreatedByJobTaskIndexToJobTasks < ActiveRecord::Migration
+  def change
+    add_index :job_tasks, :created_by_job_task_uuid
+  end
+end
diff --git a/services/api/db/migrate/20170419173712_add_object_owner_index_to_logs.rb b/services/api/db/migrate/20170419173712_add_object_owner_index_to_logs.rb
new file mode 100644 (file)
index 0000000..8da78fe
--- /dev/null
@@ -0,0 +1,5 @@
+class AddObjectOwnerIndexToLogs < ActiveRecord::Migration
+  def change
+    add_index :logs, :object_owner_uuid
+  end
+end
diff --git a/services/api/db/migrate/20170419175801_add_requesting_container_index_to_container_requests.rb b/services/api/db/migrate/20170419175801_add_requesting_container_index_to_container_requests.rb
new file mode 100644 (file)
index 0000000..edfddff
--- /dev/null
@@ -0,0 +1,5 @@
+class AddRequestingContainerIndexToContainerRequests < ActiveRecord::Migration
+  def change
+    add_index :container_requests, :requesting_container_uuid
+  end
+end
index 5c592a264b21a4dd136f9d78ba53fa24878c052b..3e1fa3fae4be514e5d47761ca616ef072142fae3 100644 (file)
@@ -1728,6 +1728,13 @@ CREATE UNIQUE INDEX index_commits_on_repository_name_and_sha1 ON commits USING b
 CREATE INDEX index_container_requests_on_owner_uuid ON container_requests USING btree (owner_uuid);
 
 
+--
+-- Name: index_container_requests_on_requesting_container_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace: 
+--
+
+CREATE INDEX index_container_requests_on_requesting_container_uuid ON container_requests USING btree (requesting_container_uuid);
+
+
 --
 -- Name: index_container_requests_on_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
@@ -1805,6 +1812,13 @@ CREATE UNIQUE INDEX index_humans_on_uuid ON humans USING btree (uuid);
 CREATE INDEX index_job_tasks_on_created_at ON job_tasks USING btree (created_at);
 
 
+--
+-- Name: index_job_tasks_on_created_by_job_task_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace: 
+--
+
+CREATE INDEX index_job_tasks_on_created_by_job_task_uuid ON job_tasks USING btree (created_by_job_task_uuid);
+
+
 --
 -- Name: index_job_tasks_on_job_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
@@ -2036,6 +2050,13 @@ CREATE INDEX index_logs_on_event_type ON logs USING btree (event_type);
 CREATE INDEX index_logs_on_modified_at ON logs USING btree (modified_at);
 
 
+--
+-- Name: index_logs_on_object_owner_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace: 
+--
+
+CREATE INDEX index_logs_on_object_owner_uuid ON logs USING btree (object_owner_uuid);
+
+
 --
 -- Name: index_logs_on_object_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
@@ -2757,3 +2778,10 @@ INSERT INTO schema_migrations (version) VALUES ('20170319063406');
 INSERT INTO schema_migrations (version) VALUES ('20170328215436');
 
 INSERT INTO schema_migrations (version) VALUES ('20170330012505');
+
+INSERT INTO schema_migrations (version) VALUES ('20170419173031');
+
+INSERT INTO schema_migrations (version) VALUES ('20170419173712');
+
+INSERT INTO schema_migrations (version) VALUES ('20170419175801');
+
index c1e77f6a4d4cf78e6e3ac7ce77b4ffe5a2fd4c9e..2330e7c528f6a304b05cf318fcc2482e91e62b8d 100644 (file)
@@ -1,4 +1,6 @@
 require 'test_helper'
+require 'tmpdir'
+require 'tempfile'
 
 class NodeTest < ActiveSupport::TestCase
   def ping_node(node_name, ping_data)
@@ -76,6 +78,16 @@ class NodeTest < ActiveSupport::TestCase
     assert Node.dns_server_update 'compute65535', '127.0.0.127'
   end
 
+  test "don't leave temp files behind if there's an error writing them" do
+    Rails.configuration.dns_server_conf_template = Rails.root.join 'config', 'unbound.template'
+    Tempfile.any_instance.stubs(:puts).raises(IOError)
+    Dir.mktmpdir do |tmpdir|
+      Rails.configuration.dns_server_conf_dir = tmpdir
+      refute Node.dns_server_update 'compute65535', '127.0.0.127'
+      assert_empty Dir.entries(tmpdir).select{|f| File.file? f}
+    end
+  end
+
   test "ping new node with no hostname and default config" do
     node = ping_node(:new_with_no_hostname, {})
     slot_number = node.slot_number
index a05f61a858c04321f41ff7c8a3faf97a87618431..fd2ce3f659f188dc71a6b8fa72c22fc149faa499 100644 (file)
@@ -1,6 +1,7 @@
 package main
 
 import (
+       "bytes"
        "context"
        "encoding/json"
        "errors"
@@ -134,7 +135,7 @@ type ContainerRunner struct {
        loggingDone   chan bool
        CrunchLog     *ThrottledLogger
        Stdout        io.WriteCloser
-       Stderr        *ThrottledLogger
+       Stderr        io.WriteCloser
        LogCollection *CollectionWriter
        LogsPDH       *string
        RunArvMount
@@ -345,10 +346,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        for _, bind := range binds {
                mnt := runner.Container.Mounts[bind]
-               if bind == "stdout" {
+               if bind == "stdout" || bind == "stderr" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
-                               return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+                               return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
                        }
 
                        // Does path start with OutputPath?
@@ -357,7 +358,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                prefix += "/"
                        }
                        if !strings.HasPrefix(mnt.Path, prefix) {
-                               return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+                               return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+                       }
+               }
+
+               if bind == "stdin" {
+                       // Is it a "collection" mount kind?
+                       if mnt.Kind != "collection" && mnt.Kind != "json" {
+                               return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
                        }
                }
 
@@ -372,7 +380,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                }
 
                switch {
-               case mnt.Kind == "collection":
+               case mnt.Kind == "collection" && bind != "stdin":
                        var src string
                        if mnt.UUID != "" && mnt.PortableDataHash != "" {
                                return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
@@ -657,14 +665,44 @@ func (runner *ContainerRunner) LogContainerRecord() (err error) {
        return nil
 }
 
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
 func (runner *ContainerRunner) AttachStreams() (err error) {
 
        runner.CrunchLog.Print("Attaching container streams")
 
+       // If stdin mount is provided, attach it to the docker container
+       var stdinRdr keepclient.Reader
+       var stdinJson []byte
+       if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+               if stdinMnt.Kind == "collection" {
+                       var stdinColl arvados.Collection
+                       collId := stdinMnt.UUID
+                       if collId == "" {
+                               collId = stdinMnt.PortableDataHash
+                       }
+                       err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+                       if err != nil {
+                               return fmt.Errorf("While getting stding collection: %v", err)
+                       }
+
+                       stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+                       if os.IsNotExist(err) {
+                               return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+                       } else if err != nil {
+                               return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+                       }
+               } else if stdinMnt.Kind == "json" {
+                       stdinJson, err = json.Marshal(stdinMnt.Content)
+                       if err != nil {
+                               return fmt.Errorf("While encoding stdin json data: %v", err)
+                       }
+               }
+       }
+
+       stdinUsed := stdinRdr != nil || len(stdinJson) != 0
        response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
-               dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+               dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
        if err != nil {
                return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
        }
@@ -672,37 +710,76 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
        runner.loggingDone = make(chan bool)
 
        if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
-               stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
-               index := strings.LastIndex(stdoutPath, "/")
-               if index > 0 {
-                       subdirs := stdoutPath[:index]
-                       if subdirs != "" {
-                               st, err := os.Stat(runner.HostOutputDir)
-                               if err != nil {
-                                       return fmt.Errorf("While Stat on temp dir: %v", err)
-                               }
-                               stdoutPath := path.Join(runner.HostOutputDir, subdirs)
-                               err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
-                               if err != nil {
-                                       return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
-                               }
-                       }
-               }
-               stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+               stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
                if err != nil {
-                       return fmt.Errorf("While creating stdout file: %v", err)
+                       return err
                }
                runner.Stdout = stdoutFile
        } else {
                runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
        }
-       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+       if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+               stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+               if err != nil {
+                       return err
+               }
+               runner.Stderr = stderrFile
+       } else {
+               runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+       }
+
+       if stdinRdr != nil {
+               go func() {
+                       _, err := io.Copy(response.Conn, stdinRdr)
+                       if err != nil {
+                               runner.CrunchLog.Printf("While writing stdin collection to docker container %q", err)
+                               runner.stop()
+                       }
+                       stdinRdr.Close()
+                       response.CloseWrite()
+               }()
+       } else if len(stdinJson) != 0 {
+               go func() {
+                       _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+                       if err != nil {
+                               runner.CrunchLog.Printf("While writing stdin json to docker container %q", err)
+                               runner.stop()
+                       }
+                       response.CloseWrite()
+               }()
+       }
 
        go runner.ProcessDockerAttach(response.Reader)
 
        return nil
 }
 
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+       stdoutPath := mntPath[len(runner.Container.OutputPath):]
+       index := strings.LastIndex(stdoutPath, "/")
+       if index > 0 {
+               subdirs := stdoutPath[:index]
+               if subdirs != "" {
+                       st, err := os.Stat(runner.HostOutputDir)
+                       if err != nil {
+                               return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+                       }
+                       stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+                       err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+                       if err != nil {
+                               return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+                       }
+               }
+       }
+       stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+       if err != nil {
+               return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+       }
+
+       return stdoutFile, nil
+}
+
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer() error {
        runner.CrunchLog.Print("Creating Docker container")
@@ -743,6 +820,13 @@ func (runner *ContainerRunner) CreateContainer() error {
                }
        }
 
+       _, stdinUsed := runner.Container.Mounts["stdin"]
+       runner.ContainerConfig.OpenStdin = stdinUsed
+       runner.ContainerConfig.StdinOnce = stdinUsed
+       runner.ContainerConfig.AttachStdin = stdinUsed
+       runner.ContainerConfig.AttachStdout = true
+       runner.ContainerConfig.AttachStderr = true
+
        createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
        if err != nil {
                return fmt.Errorf("While creating container: %v", err)
index 98462f8fdcfda5a84fb93b5f56a22d536665a0f4..43c55b67c1c08c07f69fe9913e3681e3835026a6 100644 (file)
@@ -10,6 +10,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "net"
        "os"
        "os/exec"
        "path/filepath"
@@ -94,8 +95,21 @@ func NewTestDockerClient(exitCode int) *TestDockerClient {
        return t
 }
 
// MockConn is a stub net.Conn used as the hijacked Docker attach
// connection in tests; its Write discards all input. The embedded
// net.Conn is left nil, so any method other than Write would panic.
type MockConn struct {
	net.Conn
}

// Write pretends to write b successfully without storing anything.
func (m *MockConn) Write(b []byte) (int, error) {
	return len(b), nil
}

// NewMockConn returns a new MockConn with no underlying connection.
func NewMockConn() *MockConn {
	c := &MockConn{}
	return c
}
+
// ContainerAttach satisfies the Docker client interface for tests: it
// returns a hijacked response whose Conn is a MockConn (so stdin
// writes are discarded) and whose Reader streams the test's log data.
func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
	return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil
}
 
 func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
@@ -286,6 +300,10 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
                rdr := ioutil.NopCloser(&bytes.Buffer{})
                client.Called = true
                return FileWrapper{rdr, 1321984}, nil
+       } else if filename == "/file1_in_main.txt" {
+               rdr := ioutil.NopCloser(strings.NewReader("foo"))
+               client.Called = true
+               return FileWrapper{rdr, 3}, nil
        }
        return nil, nil
 }
@@ -1113,6 +1131,22 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                cr.CleanupDirs()
                checkEmpty()
        }
+
+       // Only mount point of kind 'collection' is allowed for stdin
+       {
+               i = 0
+               cr.ArvMountPoint = ""
+               cr.Container.Mounts = make(map[string]arvados.Mount)
+               cr.Container.Mounts = map[string]arvados.Mount{
+                       "stdin": {Kind: "tmp"},
+               }
+
+               err := cr.SetupMounts()
+               c.Check(err, NotNil)
+               c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
+               cr.CleanupDirs()
+               checkEmpty()
+       }
 }
 
 func (s *TestSuite) TestStdout(c *C) {
@@ -1359,3 +1393,103 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                }
        }
 }
+
// TestStdinCollectionMountPoint runs a full container with a "stdin"
// mount of kind "collection" and checks that the run completes (exit
// code 0, state Complete) and the stdout file mount still produces the
// expected output manifest.
func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
	helperRecord := `{
		"command": ["/bin/sh", "-c", "echo $FROBIZ"],
		"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
		"cwd": "/bin",
		"environment": {"FROBIZ": "bilbo"},
		"mounts": {
        "/tmp": {"kind": "tmp"},
        "stdin": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/file1_in_main.txt"},
        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
    },
		"output_path": "/tmp",
		"priority": 1,
		"runtime_constraints": {}
	}`

	// Make the stdin collection file available to the mock keep client.
	extraMounts := []string{
		"b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
	}

	api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
		t.logWriter.Close()
	})

	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
	// The saved output collection's manifest should list the stdout file.
	for _, v := range api.Content {
		if v["collection"] != nil {
			collection := v["collection"].(arvadosclient.Dict)
			if strings.Index(collection["name"].(string), "output") == 0 {
				manifest := collection["manifest_text"].(string)
				c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
`)
			}
		}
	}
}
+
// TestStdinJsonMountPoint runs a full container with a "stdin" mount
// of kind "json" and checks that the run completes (exit code 0, state
// Complete) and the stdout file mount produces the expected manifest.
func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
	helperRecord := `{
		"command": ["/bin/sh", "-c", "echo $FROBIZ"],
		"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
		"cwd": "/bin",
		"environment": {"FROBIZ": "bilbo"},
		"mounts": {
        "/tmp": {"kind": "tmp"},
        "stdin": {"kind": "json", "content": "foo"},
        "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
    },
		"output_path": "/tmp",
		"priority": 1,
		"runtime_constraints": {}
	}`

	api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
		t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
		t.logWriter.Close()
	})

	c.Check(api.CalledWith("container.exit_code", 0), NotNil)
	c.Check(api.CalledWith("container.state", "Complete"), NotNil)
	// The saved output collection's manifest should list the stdout file.
	for _, v := range api.Content {
		if v["collection"] != nil {
			collection := v["collection"].(arvadosclient.Dict)
			if strings.Index(collection["name"].(string), "output") == 0 {
				manifest := collection["manifest_text"].(string)
				c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
`)
			}
		}
	}
}
+
// TestStderrMount runs a container with both "stdout" and "stderr"
// file mounts and a nonzero exit code, and checks that both streams
// are captured as files in the output collection manifest.
func (s *TestSuite) TestStderrMount(c *C) {
	api, _, _ := FullRunHelper(c, `{
    "command": ["/bin/sh", "-c", "echo hello;exit 1"],
    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
    "cwd": ".",
    "environment": {},
    "mounts": {"/tmp": {"kind": "tmp"},
               "stdout": {"kind": "file", "path": "/tmp/a/out.txt"},
               "stderr": {"kind": "file", "path": "/tmp/b/err.txt"}},
    "output_path": "/tmp",
    "priority": 1,
    "runtime_constraints": {}
}`, nil, 1, func(t *TestDockerClient) {
		t.logWriter.Write(dockerLog(1, "hello\n"))
		t.logWriter.Write(dockerLog(2, "oops\n"))
		t.logWriter.Close()
	})

	// Container fails (exit 1) but the record still completes with a log.
	final := api.CalledWith("container.state", "Complete")
	c.Assert(final, NotNil)
	c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
	c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)

	// Both stdout ("hello") and stderr ("oops") land in the manifest.
	c.Check(api.CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
}
index db7517adc68ee39ec2300e7763caad7cd5ab07c3..a79973b975486b92e4b2d2349b7613df29ff4943 100644 (file)
@@ -94,6 +94,20 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                httpserver.Log(remoteAddr, statusCode, statusText, w.WroteBodyBytes(), r.Method, r.Host, r.URL.Path, r.URL.RawQuery)
        }()
 
+       if r.Method == "OPTIONS" {
+               method := r.Header.Get("Access-Control-Request-Method")
+               if method != "GET" && method != "POST" {
+                       statusCode = http.StatusMethodNotAllowed
+                       return
+               }
+               w.Header().Set("Access-Control-Allow-Headers", "Range")
+               w.Header().Set("Access-Control-Allow-Methods", "GET, POST")
+               w.Header().Set("Access-Control-Allow-Origin", "*")
+               w.Header().Set("Access-Control-Max-Age", "86400")
+               statusCode = http.StatusOK
+               return
+       }
+
        if r.Method != "GET" && r.Method != "POST" {
                statusCode, statusText = http.StatusMethodNotAllowed, r.Method
                return
index 0c960b8c0e323e860b4f79ec032d879de9a92944..86e1409391ddfc2deedde7c590ff3a09ce3d3aa1 100644 (file)
@@ -18,6 +18,37 @@ var _ = check.Suite(&UnitSuite{})
 
 type UnitSuite struct{}
 
// TestCORSPreflight checks the handler's OPTIONS preflight handling:
// an allowed method (POST) yields 200 with an empty body and
// permissive CORS headers; a disallowed method (DELETE) yields 405.
func (s *UnitSuite) TestCORSPreflight(c *check.C) {
	h := handler{Config: &Config{}}
	u, _ := url.Parse("http://keep-web.example/c=" + arvadostest.FooCollection + "/foo")
	req := &http.Request{
		Method:     "OPTIONS",
		Host:       u.Host,
		URL:        u,
		RequestURI: u.RequestURI(),
		Header: http.Header{
			"Origin":                        {"https://workbench.example"},
			"Access-Control-Request-Method": {"POST"},
		},
	}

	// Check preflight for an allowed request
	resp := httptest.NewRecorder()
	h.ServeHTTP(resp, req)
	c.Check(resp.Code, check.Equals, http.StatusOK)
	c.Check(resp.Body.String(), check.Equals, "")
	c.Check(resp.Header().Get("Access-Control-Allow-Origin"), check.Equals, "*")
	c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "GET, POST")
	c.Check(resp.Header().Get("Access-Control-Allow-Headers"), check.Equals, "Range")

	// Check preflight for a disallowed request
	resp = httptest.NewRecorder()
	req.Header.Set("Access-Control-Request-Method", "DELETE")
	h.ServeHTTP(resp, req)
	c.Check(resp.Body.String(), check.Equals, "")
	c.Check(resp.Code, check.Equals, http.StatusMethodNotAllowed)
}
+
 func mustParseURL(s string) *url.URL {
        r, err := url.Parse(s)
        if err != nil {
index aa9b3e290fe42123ca19209de45781a58f2eda94..30e8995baa26bdcc22154570dfa099aa5658d341 100644 (file)
@@ -47,6 +47,8 @@ class NodeManagerConfig(ConfigParser.SafeConfigParser):
                        'node_stale_after': str(60 * 60 * 2),
                        'watchdog': '600',
                        'node_mem_scaling': '0.95'},
+            'Manage': {'address': '127.0.0.1',
+                       'port': '-1'},
             'Logging': {'file': '/dev/stderr',
                         'level': 'WARNING'},
         }.iteritems():
index 94415465a21459a802190366da27c9f7abf7e71f..9bfee79b59bae21968064b995e5cd87df7d7c7b9 100644 (file)
@@ -9,6 +9,7 @@ import time
 import pykka
 
 from . import computenode as cnode
+from . import status
 from .computenode import dispatch
 from .config import actor_class
 
@@ -253,6 +254,18 @@ class NodeManagerDaemonActor(actor_class):
                     states.append("shutdown")
         return states + pykka.get_all(proxy_states)
 
+    def _update_tracker(self):
+        updates = {
+            k: 0
+            for k in status.tracker.keys()
+            if k.startswith('nodes_')
+        }
+        for s in self._node_states(size=None):
+            updates.setdefault('nodes_'+s, 0)
+            updates['nodes_'+s] += 1
+        updates['nodes_wish'] = len(self.last_wishlist)
+        status.tracker.update(updates)
+
     def _state_counts(self, size):
         states = self._node_states(size)
         counts = {
@@ -337,6 +350,10 @@ class NodeManagerDaemonActor(actor_class):
                     self._later.stop_booting_node(size)
             except Exception as e:
                 self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+        try:
+            self._update_tracker()
+        except:
+            self._logger.exception("while updating tracker")
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
index 93f6cbdbe3c60c4736614594fdd2e77666276274..11d38ecb76d22105b289d17cec5081aaa5bf3952 100644 (file)
@@ -13,6 +13,7 @@ import pykka
 import libcloud
 
 from . import config as nmconfig
+from . import status
 from .baseactor import WatchdogActor
 from .daemon import NodeManagerDaemonActor
 from .jobqueue import JobQueueMonitorActor, ServerCalculator
@@ -112,6 +113,8 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
+    status.Server(config).start()
+
     try:
         root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
         root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
diff --git a/services/nodemanager/arvnodeman/status.py b/services/nodemanager/arvnodeman/status.py
new file mode 100644 (file)
index 0000000..d21899c
--- /dev/null
@@ -0,0 +1,65 @@
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import http.server
+import json
+import logging
+import socketserver
+import threading
+
+_logger = logging.getLogger('status.Handler')
+
+
class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
    """Background HTTP server exposing node manager status.

    Binds to the address/port from the ``[Manage]`` config section and
    serves requests via ``Handler`` in a daemon thread.  A port < 0
    (the shipped default) disables the server entirely.

    NOTE(review): this module imports ``http.server``/``socketserver``
    after ``from future import standard_library`` without calling
    ``standard_library.install_aliases()`` — confirm these imports work
    on Python 2.
    """

    def __init__(self, config):
        port = config.getint('Manage', 'port')
        self.enabled = port >= 0
        if not self.enabled:
            _logger.warning("Management server disabled. "
                            "Use [Manage] config section to enable.")
            return
        # Fix: drop the dead stores self._config / self._tracker — they
        # were assigned here but never read anywhere in this module.
        super(Server, self).__init__(
            (config.get('Manage', 'address'), port), Handler)
        # Serve in a daemon thread so the server never blocks shutdown.
        self._thread = threading.Thread(target=self.serve_forever)
        self._thread.daemon = True

    def start(self):
        """Start serving in the background; no-op when disabled."""
        if self.enabled:
            self._thread.start()
+
+
class Handler(http.server.BaseHTTPRequestHandler, object):
    """Serves the shared tracker state as JSON at /status.json."""

    def do_GET(self):
        if self.path == '/status.json':
            self.send_response(200)
            self.send_header('Content-type', 'application/json')
            self.end_headers()
            # Encode to bytes: tracker.get_json() returns text, and
            # wfile expects bytes under Python 3's http.server.
            self.wfile.write(tracker.get_json().encode('utf-8'))
        else:
            self.send_response(404)
            # Bug fix: previously the 404 response never terminated its
            # header block, leaving the client with a malformed reply.
            self.end_headers()

    def log_message(self, fmt, *args, **kwargs):
        # Route BaseHTTPRequestHandler's stderr logging to our logger.
        _logger.info(fmt, *args, **kwargs)
+
+
class Tracker(object):
    """Thread-safe store for the latest status snapshot.

    All access to the underlying dict is serialized by a lock so the
    daemon actor and HTTP handler threads can share one instance.
    """

    def __init__(self):
        self._mtx = threading.Lock()
        self._latest = {}

    def get_json(self):
        """Return the current snapshot serialized as a JSON string."""
        with self._mtx:
            return json.dumps(self._latest)

    def keys(self):
        """Return a snapshot list of current keys.

        Fix: return list(...) rather than the live dict view, which on
        Python 3 would escape the lock and could change (or raise
        "dictionary changed size during iteration") while the caller
        iterates it.
        """
        with self._mtx:
            return list(self._latest.keys())

    def update(self, updates):
        """Merge *updates* into the snapshot; existing keys persist."""
        with self._mtx:
            self._latest.update(updates)


# Module-level singleton shared by the daemon and the status server.
tracker = Tracker()
index f253621ced4ecccd578eb0f810d6c8bbcc2d7dee..8d5b855cdb4370927991e0aadc4f077f920d9eaa 100644 (file)
@@ -1,6 +1,16 @@
 # Azure configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
index b25bf940cf4564a9da43b254c08812b74bf8f528..d5bed57b95811795ee83529935edf897f2339b18 100644 (file)
@@ -1,6 +1,16 @@
 # EC2 configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # The dispatcher can customize the start and stop procedure for
 # cloud nodes.  For example, the SLURM dispatcher drains nodes
index ed7bdc3b3d7d9b423b217c3057f83b0ef86823bc..043bb9567d04909ec848f027365e464a859b2a6b 100644 (file)
@@ -1,6 +1,16 @@
 # Google Compute Engine configuration for Arvados Node Manager.
 # All times are in seconds unless specified otherwise.
 
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
 [Daemon]
 # Node Manager will ensure that there are at least this many nodes running at
 # all times.  If node manager needs to start new idle nodes for the purpose of
index 314750e6cd3ade7c6bf6abdd25e4803b6a7acb2c..85b4c7d4a7c3f545e3cdff1d23e8cdef9aaf5d48 100644 (file)
@@ -5,6 +5,10 @@
 # is through the API server Rails console: load the Node object, set its
 # IP address to 10.10.0.N (where N is the cloud node's ID), and save.
 
+[Manage]
+address = 0.0.0.0
+port = 8989
+
 [Daemon]
 min_nodes = 0
 max_nodes = 8
index c30108f44bb65a487945e665e7f2afae91528c00..5eb923eb93079cc28bc0d1836c9f4b6dea6635a2 100644 (file)
@@ -29,17 +29,23 @@ setup(name='arvados-node-manager',
           ('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
       ],
       install_requires=[
-        'apache-libcloud>=0.16',
-        'arvados-python-client>=0.1.20150206225333',
-        'pykka',
-        'python-daemon',
-        'setuptools'
-        ],
-      dependency_links = [
+          'apache-libcloud>=0.16',
+          'arvados-python-client>=0.1.20150206225333',
+          'future',
+          'pykka',
+          'python-daemon',
+          'setuptools'
+      ],
+      dependency_links=[
           "https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev4.zip"
       ],
       test_suite='tests',
-      tests_require=['pbr<1.7.0', 'mock>=1.0', "apache-libcloud==0.18.1.dev4"],
+      tests_require=[
+          'requests',
+          'pbr<1.7.0',
+          'mock>=1.0',
+          'apache-libcloud==0.18.1.dev4',
+      ],
       zip_safe=False,
       cmdclass={'egg_info': tagger},
       )
index e49fc39eed3dad01be48004047341ec650a81a5e..04ff9b6d79962922ea8a3327edc726db528b524e 100644 (file)
@@ -9,9 +9,11 @@ import mock
 import pykka
 
 import arvnodeman.daemon as nmdaemon
+import arvnodeman.status as status
 from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
+from . import test_status
 import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
@@ -355,10 +357,16 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         monitor = self.monitor_list()[0].proxy()
         self.daemon.update_server_wishlist([])
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+        self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
         self.assertTrue(self.node_shutdown.start.called,
                         "daemon did not shut down booted node on offer")
 
+        with test_status.TestServer() as srv:
+            self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
+            self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
+            self.assertEqual(0, srv.get_status().get('nodes_wish', None))
+
     def test_booted_node_lifecycle(self):
         cloud_node = testutil.cloud_node_mock(6)
         setup = self.start_node_boot(cloud_node, id_num=6)
diff --git a/services/nodemanager/tests/test_status.py b/services/nodemanager/tests/test_status.py
new file mode 100644 (file)
index 0000000..c11fe40
--- /dev/null
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import requests
+import unittest
+
+import arvnodeman.status as status
+import arvnodeman.config as config
+
+
class TestServer(object):
    """Context manager running a status.Server on an ephemeral port."""

    def __enter__(self):
        cfg = config.NodeManagerConfig()
        cfg.set('Manage', 'port', '0')          # 0 = pick a free port
        cfg.set('Manage', 'address', '127.0.0.1')
        self.srv = status.Server(cfg)
        self.srv.start()
        _, port = self.srv.server_address
        self.srv_base = 'http://127.0.0.1:' + str(port)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.srv.shutdown()

    def get_status_response(self):
        """GET /status.json and return the raw requests response."""
        return requests.get(self.srv_base + '/status.json')

    def get_status(self):
        """Return the decoded JSON status payload."""
        return self.get_status_response().json()
+
+
class StatusServerUpdates(unittest.TestCase):
    def test_updates(self):
        # Each tracker.update should be visible in the next GET, and
        # keys from earlier updates must be retained.
        with TestServer() as srv:
            for n in (1, 2, 3):
                status.tracker.update({'nodes_%d' % n: n})
                r = srv.get_status_response()
                self.assertEqual(200, r.status_code)
                self.assertEqual('application/json', r.headers['content-type'])
                resp = r.json()
                self.assertEqual(n, resp['nodes_%d' % n])
            self.assertEqual(1, resp['nodes_1'])
+
+
class StatusServerDisabled(unittest.TestCase):
    def test_config_disabled(self):
        # A negative port disables the server: it reports not-enabled
        # and never creates its background thread.
        cfg = config.NodeManagerConfig()
        cfg.set('Manage', 'port', '-1')
        cfg.set('Manage', 'address', '127.0.0.1')
        self.srv = status.Server(cfg)
        self.srv.start()
        self.assertFalse(self.srv.enabled)
        self.assertFalse(getattr(self.srv, '_thread', False))