Merge branch 'master' into 10231-keep-cache-runtime-constraints
author radhika <radhika@curoverse.com>
Mon, 31 Oct 2016 16:04:51 +0000 (12:04 -0400)
committer radhika <radhika@curoverse.com>
Mon, 31 Oct 2016 16:04:51 +0000 (12:04 -0400)
49 files changed:
COPYING
apps/workbench/app/assets/javascripts/work_unit_component.js [new file with mode: 0644]
apps/workbench/app/controllers/work_units_controller.rb
apps/workbench/app/models/container.rb
apps/workbench/app/models/container_request.rb
apps/workbench/app/models/container_work_unit.rb
apps/workbench/app/models/job.rb
apps/workbench/app/models/job_task.rb
apps/workbench/app/models/pipeline_instance.rb
apps/workbench/app/models/pipeline_instance_work_unit.rb
apps/workbench/app/models/proxy_work_unit.rb
apps/workbench/app/models/work_unit.rb
apps/workbench/app/views/pipeline_instances/_show_components.html.erb
apps/workbench/app/views/work_units/_component_detail.html.erb
apps/workbench/app/views/work_units/_show_child.html.erb
apps/workbench/app/views/work_units/_show_component.html.erb
apps/workbench/app/views/work_units/_show_status.html.erb
apps/workbench/config/routes.rb
apps/workbench/test/controllers/work_units_controller_test.rb
apps/workbench/test/integration/jobs_test.rb
apps/workbench/test/integration/work_units_test.rb
build/run-build-docker-jobs-image.sh
build/run-build-packages.sh
build/run-library.sh
docker/jobs/Dockerfile
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/gittaggers.py [changed from symlink to file mode: 0644]
sdk/cwl/setup.py
sdk/python/arvados/arvfile.py
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py
services/api/app/controllers/arvados/v1/containers_controller.rb
services/api/app/models/container.rb
services/api/config/routes.rb
services/api/test/fixtures/api_client_authorizations.yml
services/api/test/fixtures/collections.yml
services/api/test/fixtures/containers.yml
services/api/test/functional/arvados/v1/containers_controller_test.rb
services/api/test/unit/container_request_test.rb
services/api/test/unit/container_test.rb
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/fuse/arvados_fuse/__init__.py
services/fuse/tests/integration_test.py
services/fuse/tests/test_mount.py
services/fuse/tests/test_token_expiry.py [new file with mode: 0644]
tools/arvbox/bin/arvbox

diff --git a/COPYING b/COPYING
index acbd7523ed49f01217874965aa3180cccec89d61..2cba2ad20a1fbf4aeae5e00ad55f64ed0eeb5b47 100644 (file)
--- a/COPYING
+++ b/COPYING
@@ -1,11 +1,15 @@
 Server-side components of Arvados contained in the apps/ and services/
 directories, including the API Server, Workbench, and Crunch, are licensed
-under the GNU Affero General Public License version 3 (see agpl-3.0.txt)
+under the GNU Affero General Public License version 3 (see agpl-3.0.txt).
+
+The files and directories under the build/, lib/ and tools/ directories are
+licensed under the GNU Affero General Public License version 3 (see
+agpl-3.0.txt).
 
 The Arvados client Software Development Kits contained in the sdk/ directory,
-example scripts in the crunch_scripts/ directory, and code samples in the
-Aravados documentation are licensed under the Apache License, Version 2.0 (see
-LICENSE-2.0.txt)
+example scripts in the crunch_scripts/ directory, the files and directories
+under backports/ and docker/, and code samples in the Arvados documentation
+are licensed under the Apache License, Version 2.0 (see LICENSE-2.0.txt).
 
 The Arvados Documentation located in the doc/ directory is licensed under the
-Creative Commons Attribution-Share Alike 3.0 United States (see by-sa-3.0.txt)
\ No newline at end of file
+Creative Commons Attribution-Share Alike 3.0 United States (see by-sa-3.0.txt).
diff --git a/apps/workbench/app/assets/javascripts/work_unit_component.js b/apps/workbench/app/assets/javascripts/work_unit_component.js
new file mode 100644 (file)
index 0000000..baff0e8
--- /dev/null
@@ -0,0 +1,18 @@
+$(document).
+    on('click', '.component-detail-panel', function(event) {
+      var href = $($(event.target).attr('href'));
+      if ($(href).attr("class").split(' ').indexOf("in") == -1) {
+        return;   // collapsed; nothing more to do
+      }
+
+      var content_div = href.find('.work-unit-component-detail-body');
+      content_div.html('<div class="spinner spinner-32px col-sm-1"></div>');
+      var content_url = href.attr('content-url');
+      var action_data = href.attr('action-data');
+      $.ajax(content_url, {dataType: 'html', type: 'POST', data: {action_data: action_data}}).
+          done(function(data, status, jqxhr) {
+              content_div.html(data);
+          }).fail(function(jqxhr, status, error) {
+              content_div.html(error);
+          });
+      });
diff --git a/apps/workbench/app/controllers/work_units_controller.rb b/apps/workbench/app/controllers/work_units_controller.rb
index 6ed25dd334a77b083197c6ad8e39815aa8ea3eba..fe6bff1cee4dfd7fa42a8b487376713808cbbd29 100644 (file)
@@ -1,4 +1,9 @@
 class WorkUnitsController < ApplicationController
+  skip_around_filter :require_thread_api_token, if: proc { |ctrl|
+    Rails.configuration.anonymous_user_token and
+    'show_child_component' == ctrl.action_name
+  }
+
   def find_objects_for_index
     # If it's not the index rows partial display, just return
     # The /index request will again be invoked to display the
@@ -111,4 +116,53 @@ class WorkUnitsController < ApplicationController
       render_error status: 422
     end
   end
+
+  def find_object_by_uuid
+    if params['object_type']
+      @object = params['object_type'].constantize.find(params['uuid'])
+    else
+      super
+    end
+  end
+
+  def show_child_component
+    data = JSON.load(params[:action_data])
+
+    current_obj = {}
+    current_obj_uuid = data['current_obj_uuid']
+    current_obj_name = data['current_obj_name']
+    current_obj_type = data['current_obj_type']
+    current_obj_parent = data['current_obj_parent']
+    if current_obj_uuid
+      resource_class = resource_class_for_uuid current_obj_uuid
+      obj = object_for_dataclass(resource_class, current_obj_uuid)
+      current_obj = obj if obj
+    end
+
+    if current_obj.is_a?(Hash) and !current_obj.any?
+      if current_obj_parent
+        resource_class = resource_class_for_uuid current_obj_parent
+        parent = object_for_dataclass(resource_class, current_obj_parent)
+        parent_wu = parent.work_unit
+        children = parent_wu.children
+        if current_obj_uuid
+          wu = children.select {|c| c.uuid == current_obj_uuid}.first
+        elsif current_obj_name
+          wu = children.select {|c| c.label.to_s == current_obj_name}.first
+        end
+      end
+    else
+      if current_obj_type == JobWorkUnit.to_s
+        wu = JobWorkUnit.new(current_obj, current_obj_name, current_obj_parent)
+      elsif current_obj_type == PipelineInstanceWorkUnit.to_s
+        wu = PipelineInstanceWorkUnit.new(current_obj, current_obj_name, current_obj_parent)
+      elsif current_obj_type == ContainerWorkUnit.to_s
+        wu = ContainerWorkUnit.new(current_obj, current_obj_name, current_obj_parent)
+      end
+    end
+
+    respond_to do |f|
+      f.html { render(partial: "show_component", locals: {wu: wu}) }
+    end
+  end
 end
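
For reference, show_child_component above reads its parameters from a JSON-encoded action_data form field. A minimal sketch of that payload, with placeholder UUIDs (the real values come from the action-data attribute rendered by _show_child.html.erb, further down in this commit):

import json

# Hypothetical action_data payload for show_child_component; the four keys
# mirror what the controller reads above. The UUIDs are placeholders.
action_data = json.dumps({
    "current_obj_type": "ContainerWorkUnit",
    "current_obj_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
    "current_obj_name": "component1",
    "current_obj_parent": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
})
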
diff --git a/apps/workbench/app/models/container.rb b/apps/workbench/app/models/container.rb
index 0a7c288718f581f89235cb9a9fed9fdcd76d1d8e..e683a6e4f2b28e501ce7adb3574c12db796b73b4 100644 (file)
@@ -4,6 +4,6 @@ class Container < ArvadosBase
   end
 
   def work_unit(label=nil)
-    ContainerWorkUnit.new(self, label)
+    ContainerWorkUnit.new(self, label, self.uuid)
   end
 end
diff --git a/apps/workbench/app/models/container_request.rb b/apps/workbench/app/models/container_request.rb
index 0148de51f7459a678d49547fe4f24a10e6bc27e9..aae712b343e1468386ee77ba257d6eee04848063 100644 (file)
@@ -12,6 +12,6 @@ class ContainerRequest < ArvadosBase
   end
 
   def work_unit(label=nil)
-    ContainerWorkUnit.new(self, label)
+    ContainerWorkUnit.new(self, label, self.uuid)
   end
 end
diff --git a/apps/workbench/app/models/container_work_unit.rb b/apps/workbench/app/models/container_work_unit.rb
index b6e72dc526538d022166d52ff0a342373bb585cc..88aab306cedc8b9ea5a8a94a27cc394f2022780b 100644 (file)
@@ -1,7 +1,7 @@
 class ContainerWorkUnit < ProxyWorkUnit
   attr_accessor :container
 
-  def initialize proxied, label
+  def initialize proxied, label, parent
     super
     if @proxied.is_a?(ContainerRequest)
       container_uuid = get(:container_uuid)
@@ -12,7 +12,7 @@ class ContainerWorkUnit < ProxyWorkUnit
   end
 
   def children
-    return self.my_children if self.my_children
+    return @my_children if @my_children
 
     container_uuid = nil
     container_uuid = if @proxied.is_a?(Container) then uuid else get(:container_uuid) end
@@ -25,7 +25,7 @@ class ContainerWorkUnit < ProxyWorkUnit
       end
     end
 
-    self.my_children = items
+    @my_children = items
   end
 
   def title
diff --git a/apps/workbench/app/models/job.rb b/apps/workbench/app/models/job.rb
index bf202c4eaaadffbd92f1a44d1160c3bd8c51572e..7bfed6d44b5c089fb9ecc518362e026e787762f4 100644 (file)
@@ -54,6 +54,6 @@ class Job < ArvadosBase
   end
 
   def work_unit(label=nil)
-    JobWorkUnit.new(self, label)
+    JobWorkUnit.new(self, label, self.uuid)
   end
 end
diff --git a/apps/workbench/app/models/job_task.rb b/apps/workbench/app/models/job_task.rb
index 9fb04737badb1114547c0a6a581f71e9b2bbd459..654e0a37e0897c6b85d4bd60a46ad0f76c521e0d 100644 (file)
@@ -1,5 +1,5 @@
 class JobTask < ArvadosBase
   def work_unit(label=nil)
-    JobTaskWorkUnit.new(self, label)
+    JobTaskWorkUnit.new(self, label, self.uuid)
   end
 end
diff --git a/apps/workbench/app/models/pipeline_instance.rb b/apps/workbench/app/models/pipeline_instance.rb
index 62bbc5431937e6a4b89826a3f03e2cda5c37ff27..e9fa04ab6d09419f7875354a01c26b33c8c0b679 100644 (file)
@@ -133,7 +133,7 @@ class PipelineInstance < ArvadosBase
   end
 
   def work_unit(label=nil)
-    PipelineInstanceWorkUnit.new(self, label || self.name)
+    PipelineInstanceWorkUnit.new(self, label || self.name, self.uuid)
   end
 
   private
diff --git a/apps/workbench/app/models/pipeline_instance_work_unit.rb b/apps/workbench/app/models/pipeline_instance_work_unit.rb
index dd5685ac3d8082d5a5836896afa310b416e728f2..293a77c099d829ae58e58d07784932ba1e9c5fdf 100644 (file)
@@ -18,10 +18,10 @@ class PipelineInstanceWorkUnit < ProxyWorkUnit
           if job[:uuid] and jobs[job[:uuid]]
             items << jobs[job[:uuid]].work_unit(name)
           else
-            items << JobWorkUnit.new(job, name)
+            items << JobWorkUnit.new(job, name, uuid)
           end
         else
-          items << JobWorkUnit.new(c, name)
+          items << JobWorkUnit.new(c, name, uuid)
         end
       else
         @unreadable_children = true
diff --git a/apps/workbench/app/models/proxy_work_unit.rb b/apps/workbench/app/models/proxy_work_unit.rb
index 44905be061d9808d58b34950d9fa0d36edac1bb2..48bc3a04bc95dd41915e317449e7287ed4e42bce 100644 (file)
@@ -6,9 +6,10 @@ class ProxyWorkUnit < WorkUnit
   attr_accessor :my_children
   attr_accessor :unreadable_children
 
-  def initialize proxied, label
+  def initialize proxied, label, parent
     @lbl = label
     @proxied = proxied
+    @parent = parent
   end
 
   def label
@@ -19,6 +20,10 @@ class ProxyWorkUnit < WorkUnit
     get(:uuid)
   end
 
+  def parent
+    @parent
+  end
+
   def modified_by_user_uuid
     get(:modified_by_user_uuid)
   end
@@ -322,7 +327,7 @@ class ProxyWorkUnit < WorkUnit
     if obj.respond_to? key
       obj.send(key)
     elsif obj.is_a?(Hash)
-      obj[key]
+      obj[key] || obj[key.to_s]
     end
   end
 end
diff --git a/apps/workbench/app/models/work_unit.rb b/apps/workbench/app/models/work_unit.rb
index 0c384bb209d905f0e8d3efa164c79383b0088059..dd4a706f9d6edfdbaf60b7607e62d3f40a672fe2 100644 (file)
@@ -9,6 +9,10 @@ class WorkUnit
     # returns the arvados UUID of the underlying object
   end
 
+  def parent
+    # returns the parent uuid of this work unit
+  end
+
   def children
     # returns an array of child work units
   end
diff --git a/apps/workbench/app/views/pipeline_instances/_show_components.html.erb b/apps/workbench/app/views/pipeline_instances/_show_components.html.erb
index b79759f989181564dc380e4dce55bdd5c27f292d..560b3a512b3d327b50c564ff55b947fcae854c75 100644 (file)
@@ -2,7 +2,7 @@
 
   <%
      job_uuids = @object.components.map { |k,j| j.is_a? Hash and j[:job].andand[:uuid] }.compact
-     throttle = @object.state.start_with?('Running') ? 5000 : 15000
+     throttle = 86486400000 # 1001 nights
      %>
   <div class="arv-log-refresh-control"
        data-load-throttle="<%= throttle %>"
diff --git a/apps/workbench/app/views/work_units/_component_detail.html.erb b/apps/workbench/app/views/work_units/_component_detail.html.erb
index 7d588bace69d05651ca675a4069fa4ecafcd27be..bb5b913dfe4b85bdb28429b422fb611dd1b59a4f 100644 (file)
@@ -1,3 +1,13 @@
+<%
+  collections = [current_obj.outputs, current_obj.docker_image].flatten.compact.uniq
+  collections_pdhs = collections.select {|x| !CollectionsHelper.match(x).nil?}.uniq.compact
+  collections_uuids = collections - collections_pdhs
+  preload_collections_for_objects collections_uuids if collections_uuids.any?
+  preload_for_pdhs collections_pdhs if collections_pdhs.any?
+
+  preload_objects_for_dataclass(Repository, [current_obj.repository], :name) if current_obj.repository
+%>
+
       <div class="container">
         <div class="row">
           <div class="col-md-5">
             <% else %>
             <table>
               <% keys = [:uuid, :modified_by_user_uuid, :created_at, :started_at, :finished_at, :container_uuid, :priority] %>
-              <% keys << :outputs if @object.uuid == current_obj.uuid %>
+              <% keys << :log_collection if @object.uuid != current_obj.uuid %>
+              <% keys << :outputs %>
               <% keys.each do |k| %>
-                <% val = current_obj.send(k) if current_obj.respond_to?(k) %>
-                <% has_val = val %>
-                <% has_val = val.andand.any? if k == :outputs %>
+                <%
+                  val = current_obj.send(k) if current_obj.respond_to?(k)
+                  if k == :outputs
+                    has_val = val.andand.any?
+                  elsif k == :log_collection and current_obj.state_label == "Running"
+                    has_val = true
+                  else
+                    has_val = val
+                  end
+                %>
                 <% if has_val %>
                 <tr>
                   <td style="padding-right: 1em">
@@ -29,6 +47,8 @@
                       <% else %>
                         <%= render partial: 'work_units/show_outputs', locals: {id: current_obj.uuid, outputs: val, align:""} %>
                       <% end %>
+                    <% elsif k == :log_collection %>
+                      <%= render partial: 'work_units/show_log_link', locals: {wu: current_obj} %>
                     <% else %>
                       <%= val %>
                     <% end %>
diff --git a/apps/workbench/app/views/work_units/_show_child.html.erb b/apps/workbench/app/views/work_units/_show_child.html.erb
index 2693334bc21404b0631e313d08a73311cd369f91..8bb33b54cb7523ac3ecf5516a55e95358eb3e816 100644 (file)
@@ -1,9 +1,9 @@
 <div class="panel panel-default">
   <div class="panel-heading">
       <div class="row">
-        <div class="col-md-2" style="word-break:break-all;">
+        <div class="col-md-3" style="word-break:break-all;">
           <h4 class="panel-title">
-            <a data-toggle="collapse" href="#collapse<%= i %>">
+            <a class="component-detail-panel" data-toggle="collapse" href="#collapse<%= i %>">
               <%= current_obj.label %> <span class="caret"></span>
             </a>
           </h4>
         </div>
 
         <% if not current_obj %>
-          <div class="col-md-8"></div>
+          <div class="col-md-7"></div>
         <% else %>
-          <div class="col-md-1">
-            <%= render partial: 'work_units/show_log_link', locals: {wu: current_obj} %>
-          </div>
-
           <% walltime = current_obj.walltime %>
           <% cputime = current_obj.cputime %>
           <div class="col-md-3">
                 <%= current_obj.child_summary_str %>
               </span>
             </div>
-          <% elsif current_obj.is_finished? %>
-            <div class="col-md-3 text-overflow-ellipsis">
-              <% outputs = current_obj.outputs %>
-              <% if outputs.any? %>
-                <% if outputs.size == 1 %>
-                  <%= link_to_arvados_object_if_readable(outputs[0], 'Output data not available', link_text: "Output of #{current_obj.label}") %>
-                <% else %>
-                  <%= render partial: 'work_units/show_outputs', locals: {id: current_obj.uuid, outputs: outputs, align:"pull-right"} %>
-                <% end %>
-              <% else %>
-                No output.
-              <% end %>
-            </div>
           <% end %>
 
           <div class="col-md-1 pipeline-instance-spacing">
@@ -67,9 +50,9 @@
       </div>
   </div>
 
-  <div id="collapse<%= i %>" class="panel-collapse collapse <%= if expanded then 'in' end %>">
-    <div class="panel-body">
-      <%= render partial: 'work_units/show_component', locals: {wu: current_obj} %>
+  <% content_url = url_for(controller: :work_units, action: :show_child_component, id: @object.uuid, object_type: @object.class.to_s) %>
+  <div id="collapse<%=i%>" class="work-unit-component-detail panel-collapse collapse <%= if expanded then 'in' end %>" content-url="<%=content_url%>" action-data="<%={current_obj_type: current_obj.class.to_s, current_obj_uuid: current_obj.uuid, current_obj_name: current_obj.label, current_obj_parent: current_obj.parent}.to_json%>">
+    <div class="panel-body work-unit-component-detail-body">
     </div>
   </div>
 </div>
diff --git a/apps/workbench/app/views/work_units/_show_component.html.erb b/apps/workbench/app/views/work_units/_show_component.html.erb
index 4feb292209689f069de1e4c6911257730b5f31c9..8f0c2d71eafa351082c5f7756162b928bef496ba 100644 (file)
 </p>
 
 <%# Work unit children %>
-
-<%
-  uuids = wu.children.collect {|c| c.uuid}.compact
-  if uuids.any?
-    resource_class = resource_class_for_uuid(uuids.first, friendly_name: true)
-
-    start = 0; inc = 200
-    while start < uuids.length
-      preload_objects_for_dataclass resource_class, uuids[start, inc]
-      start += inc
-    end
-  end
-
-  collections = wu.outputs.flatten.uniq
-  collections << wu.log_collection if wu.log_collection
-  collections << wu.docker_image if wu.docker_image
-  collections = wu.children.collect {|j| j.outputs}.compact
-  collections = collections.flatten.uniq
-  collections.concat wu.children.collect {|j| j.docker_image}.uniq.compact
-  collections.concat wu.children.collect {|j| j.log_collection}.uniq.compact
-  collections_pdhs = collections.select {|x| !(m = CollectionsHelper.match(x)).nil?}.uniq.compact
-  collections_uuids = collections - collections_pdhs
-
-  if collections_uuids.any?
-    start = 0; inc = 200
-    while start < collections_uuids.length
-      preload_collections_for_objects collections_uuids[start, inc]
-      start += inc
-    end
-  end
-
-  if collections_pdhs.any?
-    start = 0; inc = 200
-    while start < collections_pdhs.length
-      preload_for_pdhs collections_pdhs[start, inc]
-      start += inc
-    end
-  end
-
-  repos = wu.children.collect {|c| c.repository}.uniq.compact
-  preload_objects_for_dataclass(Repository, repos, :name) if repos.any?
-%>
-
 <% if wu.has_unreadable_children %>
   <%= render(partial: "pipeline_instances/show_components_json",
              locals: {error_name: "Unreadable components", backtrace: nil, wu: wu}) %>
 <% else %>
-  <% @descendent_count = 0 if !@descendent_count %>
   <% wu.children.each do |c| %>
-    <% @descendent_count += 1 %>
-    <%= render(partial: 'work_units/show_child', locals: {current_obj: c, i: @descendent_count, expanded: false}) %>
+    <%= render(partial: 'work_units/show_child', locals: {current_obj: c, i: (c.uuid || rand(2**128).to_s(36)), expanded: false}) %>
   <% end %>
 <% end %>
diff --git a/apps/workbench/app/views/work_units/_show_status.html.erb b/apps/workbench/app/views/work_units/_show_status.html.erb
index 4b629c8783414ff59f3f64563b851fbbcc604e47..f2052ef0edead69473db5cfeee18518898089eff 100644 (file)
@@ -1,5 +1,5 @@
 <div class="arv-log-refresh-control"
-     data-load-throttle="15000"
+     data-load-throttle="86486400000" <%# 1001 nights %>
      ></div>
 <%=
    render(partial: 'work_units/show_component', locals: {wu: current_obj.work_unit(name)})
diff --git a/apps/workbench/config/routes.rb b/apps/workbench/config/routes.rb
index 7f7854864190318171750cb498a0dc7d941156c9..7c2312c1cee57c7ffef86ad300539d8c575db5bf 100644 (file)
@@ -15,7 +15,9 @@ ArvadosWorkbench::Application.routes.draw do
   get "star" => 'actions#star', :as => :star
   get "all_processes" => 'work_units#index', :as => :all_processes
   get "choose_work_unit_templates" => 'work_unit_templates#choose', :as => :choose_work_unit_templates
-  resources :work_units
+  resources :work_units do
+    post 'show_child_component', :on => :member
+  end
   resources :nodes
   resources :humans
   resources :traits
diff --git a/apps/workbench/test/controllers/work_units_controller_test.rb b/apps/workbench/test/controllers/work_units_controller_test.rb
index ee18861c92099ea91ec62bf551ff7ad17317e6f0..12e0271260edf3f815c6be70b0e995438d32761e 100644 (file)
@@ -65,27 +65,4 @@ class WorkUnitsControllerTest < ActionController::TestCase
                           }]
     get :index, encoded_params, session_for(:active)
   end
-
-  [
-    [Job, 'active', 'running_job_with_components', '/jobs/zzzzz-8i9sb-jyq01m7in1jlofj#Log'],
-    [PipelineInstance, 'active', 'pipeline_in_running_state', '/jobs/zzzzz-8i9sb-pshmckwoma9plh7#Log'],
-    [PipelineInstance, nil, 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', 'Log unavailable'],
-  ].each do |type, token, fixture, log_link|
-    test "link_to_log for #{fixture} for #{token}" do
-      use_token 'admin'
-      obj = find_fixture(type, fixture)
-
-      @controller = if type == Job then JobsController.new else PipelineInstancesController.new end
-
-      if token
-        get :show, {id: obj['uuid']}, session_for(token)
-      else
-        Rails.configuration.anonymous_user_token =
-          api_fixture("api_client_authorizations", "anonymous", "api_token")
-        get :show, {id: obj['uuid']}
-      end
-
-      assert_includes @response.body, log_link
-    end
-  end
 end
diff --git a/apps/workbench/test/integration/jobs_test.rb b/apps/workbench/test/integration/jobs_test.rb
index e39d6f4dbf66576df928df93b7cf43030240d3f2..7708ffdc0a42e11e63e7a5f8145e7c2396ef4742 100644 (file)
@@ -154,7 +154,7 @@ class JobsTest < ActionDispatch::IntegrationTest
 
       if readable
         click_link('component1')
-        within('#collapse1') do
+        within('.panel-collapse') do
           assert(has_text? component1['uuid'])
           assert(has_text? component1['script_version'])
           assert(has_text? 'script_parameters')
diff --git a/apps/workbench/test/integration/work_units_test.rb b/apps/workbench/test/integration/work_units_test.rb
index b1d5a2158909d68cae12ab58512168df8167fbaa..f04616dd383ac49f927a544fb2e7d372c30b8acb 100644 (file)
@@ -207,4 +207,29 @@ class WorkUnitsTest < ActionDispatch::IntegrationTest
       assert_text(expect_log_text)
     end
   end
+
+  [
+    ['jobs', 'active', 'running_job_with_components', 'component1', '/jobs/zzzzz-8i9sb-jyq01m7in1jlofj#Log'],
+    ['pipeline_instances', 'active', 'pipeline_in_running_state', 'foo', '/jobs/zzzzz-8i9sb-pshmckwoma9plh7#Log'],
+    ['pipeline_instances', nil, 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', 'foo', 'Log unavailable'],
+  ].each do |type, token, fixture, child, log_link|
+    test "link_to_log for #{fixture} for #{token}" do
+      obj = api_fixture(type)[fixture]
+      if token
+        visit page_with_token token, "/#{type}/#{obj['uuid']}"
+      else
+        Rails.configuration.anonymous_user_token =
+          api_fixture("api_client_authorizations", "anonymous", "api_token")
+        visit "/#{type}/#{obj['uuid']}"
+      end
+
+      click_link(child)
+
+      if token
+        assert_selector "a[href=\"#{log_link}\"]"
+      else
+        assert_text log_link
+      end
+    end
+  end
 end
diff --git a/build/run-build-docker-jobs-image.sh b/build/run-build-docker-jobs-image.sh
index 22f6f54288741d2ed4723e9260234b469fb24591..7b5ea4ecec6bcda873ed8a98ea09d862e8dd86f4 100755 (executable)
@@ -5,8 +5,8 @@ function usage {
     echo >&2 "usage: $0 [options]"
     echo >&2
     echo >&2 "$0 options:"
-    echo >&2 "  -t, --tags [csv_tags]         comma separated tags"
     echo >&2 "  -u, --upload                  Upload the images (docker push)"
+    echo >&2 "  --no-cache                    Don't use build cache"
     echo >&2 "  -h, --help                    Display this help and exit"
     echo >&2
     echo >&2 "  If no options are given, just builds the images."
@@ -16,7 +16,7 @@ upload=false
 
 # NOTE: This requires GNU getopt (part of the util-linux package on Debian-based distros).
 TEMP=`getopt -o hut: \
-    --long help,upload,tags: \
+    --long help,upload,no-cache,tags: \
     -n "$0" -- "$@"`
 
 if [ $? != 0 ] ; then echo "Use -h for help"; exit 1 ; fi
@@ -30,6 +30,10 @@ do
             upload=true
             shift
             ;;
+        --no-cache)
+            NOCACHE=--no-cache
+            shift
+            ;;
         -t | --tags)
             case "$2" in
                 "")
@@ -38,7 +42,7 @@ do
                   exit 1
                   ;;
                 *)
-                  tags=$2;
+                  echo "WARNING: --tags is deprecated and doesn't do anything";
                   shift 2
                   ;;
             esac
@@ -66,14 +70,6 @@ COLUMNS=80
 . $WORKSPACE/build/run-library.sh
 
 docker_push () {
-    if [[ ! -z "$tags" ]]
-    then
-        for tag in $( echo $tags|tr "," " " )
-        do
-             $DOCKER tag $1 $1:$tag
-        done
-    fi
-
     # Sometimes docker push fails; retry it a few times if necessary.
     for i in `seq 1 5`; do
         $DOCKER push $*
@@ -118,13 +114,28 @@ timer_reset
 
 # clean up the docker build environment
 cd "$WORKSPACE"
-cd docker/jobs
-if [[ ! -z "$tags" ]]; then
-    docker build --build-arg COMMIT=${tags/,*/} -t arvados/jobs .
+
+python_sdk_ts=$(cd sdk/python && timestamp_from_git)
+cwl_runner_ts=$(cd sdk/cwl && timestamp_from_git)
+
+python_sdk_version=$(cd sdk/python && nohash_version_from_git 0.1)-2
+cwl_runner_version=$(cd sdk/cwl && nohash_version_from_git 1.0)-3
+
+if [[ $python_sdk_ts -gt $cwl_runner_ts ]]; then
+    cwl_runner_version=$python_sdk_version
+    gittag=$(cd sdk/python && git log --first-parent --max-count=1 --format=format:%H)
 else
-    docker build -t arvados/jobs .
+    gittag=$(cd sdk/cwl && git log --first-parent --max-count=1 --format=format:%H)
 fi
 
+echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
+
+cd docker/jobs
+docker build $NOCACHE \
+       --build-arg python_sdk_version=$python_sdk_version \
+       --build-arg cwl_runner_version=$cwl_runner_version \
+       -t arvados/jobs:$gittag .
+
 ECODE=$?
 
 if [[ "$ECODE" != "0" ]]; then
@@ -134,19 +145,43 @@ fi
 checkexit $ECODE "docker build"
 title "docker build complete (`timer`)"
 
+if [[ "$ECODE" != "0" ]]; then
+  exit_cleanly
+fi
+
+timer_reset
+
+if docker --version |grep " 1\.[0-9]\." ; then
+    # Docker versions prior to 1.10 require the -f flag.
+    # The -f flag was removed in Docker 1.12.
+    FORCE=-f
+fi
+
+docker tag $FORCE arvados/jobs:$gittag arvados/jobs:latest
+
+ECODE=$?
+
+if [[ "$ECODE" != "0" ]]; then
+    EXITCODE=$(($EXITCODE + $ECODE))
+fi
+
+checkexit $ECODE "docker tag"
+title "docker tag complete (`timer`)"
+
 title "uploading images"
 
 timer_reset
 
 if [[ "$ECODE" != "0" ]]; then
-    title "upload arvados images SKIPPED because build failed"
+    title "upload arvados images SKIPPED because build or tag failed"
 else
     if [[ $upload == true ]]; then
         ## 20150526 nico -- *sometimes* dockerhub needs re-login
         ## even though credentials are already in .dockercfg
         docker login -u arvados
 
-        docker_push arvados/jobs
+        docker_push arvados/jobs:$gittag
+        docker_push arvados/jobs:latest
         title "upload arvados images finished (`timer`)"
     else
         title "upload arvados images SKIPPED because no --upload option set (`timer`)"
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index c84e2ccea14303b5a82d31c285b3111eb567fe87..12c92607de51e3fd4aa7b6ec129e438b9427b1ec 100755 (executable)
@@ -172,7 +172,7 @@ case "$TARGET" in
             oauth2client==1.5.2 pyasn1==0.1.7 pyasn1-modules==0.0.5 \
             rsa uritemplate httplib2 ws4py pykka  \
             ciso8601 pycrypto 'pycurl<7.21.5' \
-            python-daemon llfuse==0.41.1 'pbr<1.0' pyyaml \
+            python-daemon==2.1.1 llfuse==0.41.1 'pbr<1.0' pyyaml \
             'rdflib>=4.2.0' shellescape mistune typing avro \
             isodate pyparsing sparqlwrapper html5lib==0.9999999 keepalive \
             ruamel.ordereddict cachecontrol)
diff --git a/build/run-library.sh b/build/run-library.sh
index 73a99dabd7b3626582a418040e1ce6713a68ada9..f0b120f6bf1e4e011a69f9f811ee67ad55624938 100755 (executable)
@@ -35,13 +35,19 @@ format_last_commit_here() {
 version_from_git() {
   # Generates a version number from the git log for the current working
   # directory, and writes it to stdout.
-  local git_ts git_hash
+  local git_ts git_hash prefix
+  if [[ -n "$1" ]] ; then
+      prefix="$1"
+  else
+      prefix="0.1"
+  fi
+
   declare $(format_last_commit_here "git_ts=%ct git_hash=%h")
-  echo "0.1.$(date -ud "@$git_ts" +%Y%m%d%H%M%S).$git_hash"
+  echo "${prefix}.$(date -ud "@$git_ts" +%Y%m%d%H%M%S).$git_hash"
 }
 
 nohash_version_from_git() {
-    version_from_git | cut -d. -f1-3
+    version_from_git $1 | cut -d. -f1-3
 }
 
 timestamp_from_git() {
diff --git a/docker/jobs/Dockerfile b/docker/jobs/Dockerfile
index e1e7e87c5e53d0c297ec6d2e3ad0870890f402ff..9b1be1e74df9e36501ed0bd8def2d70cd7e1bfea 100644 (file)
@@ -8,10 +8,16 @@ ADD apt.arvados.org.list /etc/apt/sources.list.d/
 RUN apt-key adv --keyserver pool.sks-keyservers.net --recv 1078ECD7
 RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3
 
-ARG COMMIT=latest
-RUN echo $COMMIT && apt-get update -q
+ARG python_sdk_version
+ARG cwl_runner_version
+RUN echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
 
-RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libgnutls28-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
+RUN apt-get update -q
+RUN apt-get install -yq --no-install-recommends \
+    git python-pip python-virtualenv \
+    python-dev libgnutls28-dev libcurl4-gnutls-dev nodejs \
+    python-arvados-python-client=$python_sdk_version \
+    python-arvados-cwl-runner=$cwl_runner_version
 
 # Install dependencies and set up system.
 RUN /usr/sbin/adduser --disabled-password \
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index e11f6a12a679f0d5cf2b6bf6e6b1726d512637a8..3144592fc98f47083581e06fabcf900517d7ab01 100644 (file)
@@ -124,7 +124,8 @@ class ArvCwlRunner(object):
                     try:
                         self.cond.acquire()
                         j = self.processes[uuid]
-                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        txt = self.work_api[0].upper() + self.work_api[1:-1]
+                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                         with Perf(metrics, "done %s" % j.name):
                             j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
@@ -235,6 +236,24 @@ class ArvCwlRunner(object):
 
         self.final_output_collection = final
 
+    def set_crunch_output(self):
+        if self.work_api == "containers":
+            try:
+                current = self.api.containers().current().execute(num_retries=self.num_retries)
+                self.api.containers().update(uuid=current['uuid'],
+                                             body={
+                                                 'output': self.final_output_collection.portable_data_hash(),
+                                             }).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.info("Setting container output: %s", e)
+        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+                                   body={
+                                       'output': self.final_output_collection.portable_data_hash(),
+                                       'success': self.final_status == "success",
+                                       'progress':1.0
+                                   }).execute(num_retries=self.num_retries)
+
     def arv_executor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
@@ -363,9 +382,6 @@ class ArvCwlRunner(object):
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
 
-        if self.final_status != "success":
-            raise WorkflowException("Workflow failed.")
-
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
@@ -375,6 +391,10 @@ class ArvCwlRunner(object):
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
             self.make_output_collection(self.output_name, self.final_output)
+            self.set_crunch_output()
+
+        if self.final_status != "success":
+            raise WorkflowException("Workflow failed.")
 
         if kwargs.get("compute_checksum"):
             adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
@@ -449,7 +469,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     parser.add_argument("--api", type=str,
                         default=None, dest="work_api",
-                        help="Select work submission API, one of 'jobs' or 'containers'.")
+                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
 
     parser.add_argument("--compute-checksum", action="store_true", default=False,
                         help="Compute checksum of contents while collecting outputs",
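
The reordering above (moving the "Workflow failed" raise until after make_output_collection and set_crunch_output) means a failed run still gets its partial output recorded. A condensed sketch of that ordering, with illustrative names rather than the SDK's:

def finish_run(final_status, make_output_collection, set_crunch_output):
    # Record whatever outputs exist first, so a failed workflow still
    # leaves its (possibly partial) output attached to the run record.
    make_output_collection()
    set_crunch_output()
    if final_status != "success":
        raise RuntimeError("Workflow failed.")
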
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index ed843477449554612597a1b6d1d6e6d0da4523fb..c2029b965b4f3b5173bcd981a67ff8bf3c809d3d 100644 (file)
@@ -113,10 +113,14 @@ class ArvadosContainer(object):
 
             self.arvrunner.processes[response["container_uuid"]] = self
 
-            logger.info("Container %s (%s) request state is %s", self.name, response["uuid"], response["state"])
+            container = self.arvrunner.api.containers().get(
+                uuid=response["container_uuid"]
+            ).execute(num_retries=self.arvrunner.num_retries)
+
+            logger.info("Container request %s (%s) state is %s with container %s %s", self.name, response["uuid"], response["state"], container["uuid"], container["state"])
 
-            if response["state"] == "Final":
-                self.done(response)
+            if container["state"] in ("Complete", "Cancelled"):
+                self.done(container)
         except Exception as e:
             logger.error("Got error %s" % str(e))
             self.output_callback({}, "permanentFail")
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index e5c6d67ac9c3d167206f25535149a88d62c02fe8..9b0680bc83783e94054f68d1a6376d4d1f5883ae 100644 (file)
@@ -21,6 +21,7 @@ import functools
 from arvados.api import OrderedJsonModel
 from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
 from cwltool.load_tool import load_tool
+from cwltool.errors import WorkflowException
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -32,6 +33,7 @@ def run():
 
     arvados_cwl.add_arv_hints()
 
+    runner = None
     try:
         job_order_object = arvados.current_job()['script_parameters']
 
@@ -80,23 +82,18 @@ def run():
         args.basedir = os.getcwd()
         args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
         outputObj = runner.arv_executor(t, job_order_object, **vars(args))
-
-        if runner.final_output_collection:
+    except Exception as e:
+        if isinstance(e, WorkflowException):
+            logging.info("Workflow error %s", e)
+        else:
+            logging.exception("Unhandled exception")
+        if runner and runner.final_output_collection:
             outputCollection = runner.final_output_collection.portable_data_hash()
         else:
             outputCollection = None
-
         api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                              body={
                                                  'output': outputCollection,
-                                                 'success': True,
-                                                 'progress':1.0
-                                             }).execute()
-    except Exception as e:
-        logging.exception("Unhandled exception")
-        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
-                                             body={
-                                                 'output': None,
                                                  'success': False,
                                                  'progress':1.0
                                              }).execute()
diff --git a/sdk/cwl/gittaggers.py b/sdk/cwl/gittaggers.py
deleted file mode 120000 (symlink)
index d59c02ca1758d4b5e484cb6e3280ba0ae2b7c5ea..0000000000000000000000000000000000000000
+++ /dev/null
@@ -1 +0,0 @@
-../python/gittaggers.py
\ No newline at end of file
diff --git a/sdk/cwl/gittaggers.py b/sdk/cwl/gittaggers.py
new file mode 100644 (file)
index 0000000000000000000000000000000000000000..2a292ee061b412d85216c68b81e2d0acd9557765
--- /dev/null
@@ -0,0 +1,37 @@
+from setuptools.command.egg_info import egg_info
+import subprocess
+import time
+import os
+
+SETUP_DIR = os.path.dirname(__file__) or '.'
+
+def choose_version_from():
+    sdk_ts = subprocess.check_output(
+        ['git', 'log', '--first-parent', '--max-count=1',
+         '--format=format:%ct', os.path.join(SETUP_DIR, "../python")]).strip()
+    cwl_ts = subprocess.check_output(
+        ['git', 'log', '--first-parent', '--max-count=1',
+         '--format=format:%ct', SETUP_DIR]).strip()
+    if int(sdk_ts) > int(cwl_ts):
+        getver = os.path.join(SETUP_DIR, "../python")
+    else:
+        getver = SETUP_DIR
+    return getver
+
+class EggInfoFromGit(egg_info):
+    """Tag the build with git commit timestamp.
+
+    If a build tag has already been set (e.g., "egg_info -b", building
+    from source package), leave it alone.
+    """
+
+    def git_timestamp_tag(self):
+        gitinfo = subprocess.check_output(
+            ['git', 'log', '--first-parent', '--max-count=1',
+             '--format=format:%ct', choose_version_from()]).strip()
+        return time.strftime('.%Y%m%d%H%M%S', time.gmtime(int(gitinfo)))
+
+    def tags(self):
+        if self.tag_build is None:
+            self.tag_build = self.git_timestamp_tag()
+        return egg_info.tags(self)
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 4adcac4daf87b4f4899fb883bb6fe499e4ee53d0..d1c8f9b567839bb6aaf1e78db2d6855b9a6038c2 100644 (file)
@@ -20,7 +20,7 @@ versionfile = os.path.join(SETUP_DIR, "arvados_cwl/_version.py")
 try:
     gitinfo = subprocess.check_output(
         ['git', 'log', '--first-parent', '--max-count=1',
-         '--format=format:%H', SETUP_DIR]).strip()
+         '--format=format:%H', gittaggers.choose_version_from()]).strip()
     with open(versionfile, "w") as f:
         f.write("__version__ = '%s'\n" % gitinfo)
 except Exception as e:
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index c394dab810715c2659b6f72f8f5f1e173d711ead..610fd7dc1317b6f0a6af7672d148766dbf9ce961 100644 (file)
@@ -417,6 +417,8 @@ class _BlockManager(object):
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
         self.copies = copies
+        self._pending_write_size = 0
+        self.threads_lock = threading.Lock()
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -482,28 +484,28 @@ class _BlockManager(object):
                 if self._put_queue is not None:
                     self._put_queue.task_done()
 
-    @synchronized
     def start_put_threads(self):
-        if self._put_threads is None:
-            # Start uploader threads.
-
-            # If we don't limit the Queue size, the upload queue can quickly
-            # grow to take up gigabytes of RAM if the writing process is
-            # generating data more quickly than it can be send to the Keep
-            # servers.
-            #
-            # With two upload threads and a queue size of 2, this means up to 4
-            # blocks pending.  If they are full 64 MiB blocks, that means up to
-            # 256 MiB of internal buffering, which is the same size as the
-            # default download block cache in KeepClient.
-            self._put_queue = Queue.Queue(maxsize=2)
-
-            self._put_threads = []
-            for i in xrange(0, self.num_put_threads):
-                thread = threading.Thread(target=self._commit_bufferblock_worker)
-                self._put_threads.append(thread)
-                thread.daemon = True
-                thread.start()
+        with self.threads_lock:
+            if self._put_threads is None:
+                # Start uploader threads.
+
+                # If we don't limit the Queue size, the upload queue can quickly
+                # grow to take up gigabytes of RAM if the writing process is
+                # generating data more quickly than it can be send to the Keep
+                # servers.
+                #
+                # With two upload threads and a queue size of 2, this means up to 4
+                # blocks pending.  If they are full 64 MiB blocks, that means up to
+                # 256 MiB of internal buffering, which is the same size as the
+                # default download block cache in KeepClient.
+                self._put_queue = Queue.Queue(maxsize=2)
+
+                self._put_threads = []
+                for i in xrange(0, self.num_put_threads):
+                    thread = threading.Thread(target=self._commit_bufferblock_worker)
+                    self._put_threads.append(thread)
+                    thread.daemon = True
+                    thread.start()
 
     def _block_prefetch_worker(self):
         """The background downloader thread."""
@@ -555,24 +557,34 @@ class _BlockManager(object):
         self.stop_threads()
 
     @synchronized
-    def repack_small_blocks(self, force=False, sync=False):
+    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
         """Packs small blocks together before uploading"""
-        # Search blocks ready for getting packed together before being committed to Keep.
-        # A WRITABLE block always has an owner.
-        # A WRITABLE block with its owner.closed() implies that it's
-        # size is <= KEEP_BLOCK_SIZE/2.
-        small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
-        if len(small_blocks) <= 1:
-            # Not enough small blocks for repacking
-            return
+        self._pending_write_size += closed_file_size
 
         # Check if there are enough small blocks for filling up one in full
-        pending_write_size = sum([b.size() for b in small_blocks])
-        if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+            # Search blocks ready for getting packed together before being committed to Keep.
+            # A WRITABLE block always has an owner.
+            # A WRITABLE block with its owner.closed() implies that its
+            # size is <= KEEP_BLOCK_SIZE/2.
+            small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+            if len(small_blocks) <= 1:
+                # Not enough small blocks for repacking
+                return
+
+            # Update the pending write size count with its true value, just in case
+            # some small file was opened, written and closed several times.
+            self._pending_write_size = sum([b.size() for b in small_blocks])
+            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+                return
+
             new_bb = self._alloc_bufferblock()
             while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
                 bb = small_blocks.pop(0)
                 arvfile = bb.owner
+                self._pending_write_size -= bb.size()
                 new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
                 arvfile.set_segments([Range(new_bb.blockid,
                                             0,
@@ -846,7 +858,7 @@ class ArvadosFile(object):
             self.flush()
         elif self.closed():
             # All writers closed and size is adequate for repacking
-            self.parent._my_block_manager().repack_small_blocks()
+            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
 
     def closed(self):
         """
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 5cb699f49f7a21eb8d7789a52bd0aea7bdd056f0..34cef6725500e6cb5cd12ec317aa1be8eeb4bd80 100644 (file)
@@ -14,10 +14,14 @@ import hashlib
 import json
 import os
 import pwd
+import time
 import signal
 import socket
 import sys
 import tempfile
+import threading
+import copy
+import logging
 from apiclient import errors as apiclient_errors
 
 import arvados.commands._util as arv_cmd
@@ -276,79 +280,343 @@ class ResumeCache(object):
         self.__init__(self.filename)
 
 
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
-    STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
-                   ['bytes_written', '_seen_inputs'])
-
-    def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
-        self.bytes_written = 0
-        self._seen_inputs = []
-        self.cache = cache
+class ArvPutUploadJob(object):
+    CACHE_DIR = '.cache/arvados/arv-put'
+    EMPTY_STATE = {
+        'manifest' : None, # Last saved manifest checkpoint
+        'files' : {} # Previous run file list: {path : {size, mtime}}
+    }
+
+    def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
+                 name=None, owner_uuid=None, ensure_unique_name=False,
+                 num_retries=None, replication_desired=None,
+                 filename=None, update_time=1.0):
+        self.paths = paths
+        self.resume = resume
         self.reporter = reporter
         self.bytes_expected = bytes_expected
-        super(ArvPutCollectionWriter, self).__init__(**kwargs)
-
-    @classmethod
-    def from_cache(cls, cache, reporter=None, bytes_expected=None,
-                   num_retries=0, replication=0):
+        self.bytes_written = 0
+        self.bytes_skipped = 0
+        self.name = name
+        self.owner_uuid = owner_uuid
+        self.ensure_unique_name = ensure_unique_name
+        self.num_retries = num_retries
+        self.replication_desired = replication_desired
+        self.filename = filename
+        self._state_lock = threading.Lock()
+        self._state = None # Previous run state (file list & manifest)
+        self._current_files = [] # Current run file list
+        self._cache_file = None
+        self._collection = None
+        self._collection_lock = threading.Lock()
+        self._stop_checkpointer = threading.Event()
+        self._checkpointer = threading.Thread(target=self._update_task)
+        self._update_task_time = update_time  # How many seconds to wait between update runs
+        self.logger = logging.getLogger('arvados.arv_put')
+        # Load cached data if any and if needed
+        self._setup_state()
+
+    def start(self):
+        """
+        Start supporting thread & file uploading
+        """
+        self._checkpointer.daemon = True
+        self._checkpointer.start()
         try:
-            state = cache.load()
-            state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
-            writer = cls.from_state(state, cache, reporter, bytes_expected,
-                                    num_retries=num_retries,
-                                    replication=replication)
-        except (TypeError, ValueError,
-                arvados.errors.StaleWriterStateError) as error:
-            return cls(cache, reporter, bytes_expected,
-                       num_retries=num_retries,
-                       replication=replication)
-        else:
-            return writer
-
-    def cache_state(self):
-        if self.cache is None:
-            return
-        state = self.dump_state()
-        # Transform attributes for serialization.
-        for attr, value in state.items():
-            if attr == '_data_buffer':
-                state[attr] = base64.encodestring(''.join(value))
-            elif hasattr(value, 'popleft'):
-                state[attr] = list(value)
-        self.cache.save(state)
+            for path in self.paths:
+                # Test for stdin first, in case some file named '-' exist
+                if path == '-':
+                    self._write_stdin(self.filename or 'stdin')
+                elif os.path.isdir(path):
+                    self._write_directory_tree(path)
+                else:
+                    self._write_file(path, self.filename or os.path.basename(path))
+        finally:
+            # Stop the thread before doing anything else
+            self._stop_checkpointer.set()
+            self._checkpointer.join()
+            # Commit all & one last _update()
+            self.manifest_text()
+            self._update()
+            if self.resume:
+                self._cache_file.close()
+                # Correct the final written bytes count
+                self.bytes_written -= self.bytes_skipped
+
+    def save_collection(self):
+        with self._collection_lock:
+            self._my_collection().save_new(
+                name=self.name, owner_uuid=self.owner_uuid,
+                ensure_unique_name=self.ensure_unique_name,
+                num_retries=self.num_retries)
+
+    def destroy_cache(self):
+        if self.resume:
+            try:
+                os.unlink(self._cache_filename)
+            except OSError as error:
+                # That's what we wanted anyway.
+                if error.errno != errno.ENOENT:
+                    raise
+            self._cache_file.close()
+
+    def _collection_size(self, collection):
+        """
+        Recursively get the total size of the collection
+        """
+        size = 0
+        for item in collection.values():
+            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
+                size += self._collection_size(item)
+            else:
+                size += item.size()
+        return size
+
+    def _update_task(self):
+        """
+        Periodically called support task. File uploading is
+        asynchronous so we poll status from the collection.
+        """
+        while not self._stop_checkpointer.wait(self._update_task_time):
+            self._update()
+
+    def _update(self):
+        """
+        Update cached manifest text and report progress.
+        """
+        with self._collection_lock:
+            self.bytes_written = self._collection_size(self._my_collection())
+            # Update cache, if resume enabled
+            if self.resume:
+                with self._state_lock:
+                    # Get the manifest text without committing pending blocks
+                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+                self._save_state()
+        # Call the reporter, if any
+        self.report_progress()
 
     def report_progress(self):
         if self.reporter is not None:
             self.reporter(self.bytes_written, self.bytes_expected)
 
-    def flush_data(self):
-        start_buffer_len = self._data_buffer_len
-        start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
-        super(ArvPutCollectionWriter, self).flush_data()
-        if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
-            self.bytes_written += (start_buffer_len - self._data_buffer_len)
-            self.report_progress()
-            if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
-                self.cache_state()
-
-    def _record_new_input(self, input_type, source_name, dest_name):
-        # The key needs to be a list because that's what we'll get back
-        # from JSON deserialization.
-        key = [input_type, source_name, dest_name]
-        if key in self._seen_inputs:
-            return False
-        self._seen_inputs.append(key)
-        return True
-
-    def write_file(self, source, filename=None):
-        if self._record_new_input('file', source, filename):
-            super(ArvPutCollectionWriter, self).write_file(source, filename)
-
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        if self._record_new_input('directory', path, stream_name):
-            super(ArvPutCollectionWriter, self).write_directory_tree(
-                path, stream_name, max_manifest_depth)
+    def _write_directory_tree(self, path, stream_name="."):
+        # TODO: Check what happens when multiple directories are passed as
+        # arguments.
+        # If the code below is uncommented, integration test
+        # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
+        # fails, presumably because the manifest_uuid changes when the
+        # dir is added to stream_name.
+
+        # if stream_name == '.':
+        #     stream_name = os.path.join('.', os.path.basename(path))
+        for item in os.listdir(path):
+            if os.path.isdir(os.path.join(path, item)):
+                self._write_directory_tree(os.path.join(path, item),
+                                os.path.join(stream_name, item))
+            else:
+                self._write_file(os.path.join(path, item),
+                                os.path.join(stream_name, item))
+
+    def _write_stdin(self, filename):
+        with self._collection_lock:
+            output = self._my_collection().open(filename, 'w')
+        self._write(sys.stdin, output)
+        output.close()
+
+    def _write_file(self, source, filename):
+        resume_offset = 0
+        if self.resume:
+            # Check if file was already uploaded (at least partially)
+            with self._collection_lock:
+                try:
+                    file_in_collection = self._my_collection().find(filename)
+                except IOError:
+                    # Not found
+                    file_in_collection = None
+            # If no previous cached data on this file, store it for an eventual
+            # repeated run.
+            if source not in self._state['files']:
+                with self._state_lock:
+                    self._state['files'][source] = {
+                        'mtime': os.path.getmtime(source),
+                        'size' : os.path.getsize(source)
+                    }
+            with self._state_lock:
+                cached_file_data = self._state['files'][source]
+            # See if this file was already uploaded at least partially
+            if file_in_collection:
+                if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+                    if cached_file_data['size'] == file_in_collection.size():
+                        # File already there, skip it.
+                        self.bytes_skipped += cached_file_data['size']
+                        return
+                    elif cached_file_data['size'] > file_in_collection.size():
+                        # File partially uploaded, resume!
+                        resume_offset = file_in_collection.size()
+                    else:
+                        # Inconsistent cache, re-upload the file
+                        self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+                else:
+                    # Local file differs from cached data, re-upload it
+                    pass
+        with open(source, 'r') as source_fd:
+            if resume_offset > 0:
+                # Start upload where we left off
+                with self._collection_lock:
+                    output = self._my_collection().open(filename, 'a')
+                source_fd.seek(resume_offset)
+                self.bytes_skipped += resume_offset
+            else:
+                # Start from scratch
+                with self._collection_lock:
+                    output = self._my_collection().open(filename, 'w')
+            self._write(source_fd, output)
+            output.close(flush=False)
+
+    def _write(self, source_fd, output):
+        first_read = True
+        while True:
+            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+            # Allow an empty file to be written
+            if not data and not first_read:
+                break
+            if first_read:
+                first_read = False
+            output.write(data)
+
+    def _my_collection(self):
+        """
+        Create a new collection if none cached. Load it from cache otherwise.
+        """
+        if self._collection is None:
+            with self._state_lock:
+                manifest = self._state['manifest']
+            if self.resume and manifest is not None:
+                # Create collection from saved state
+                self._collection = arvados.collection.Collection(
+                    manifest,
+                    replication_desired=self.replication_desired)
+            else:
+                # Create new collection
+                self._collection = arvados.collection.Collection(
+                    replication_desired=self.replication_desired)
+        return self._collection
+
+    def _setup_state(self):
+        """
+        Create a new cache file or load a previously existing one.
+        """
+        if self.resume:
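+            # The cache file name is an MD5 digest of the API host, the
+            # resolved input paths and any --filename override, so each
+            # distinct upload gets its own resume cache.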
+            md5 = hashlib.md5()
+            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+            realpaths = sorted(os.path.realpath(path) for path in self.paths)
+            md5.update('\0'.join(realpaths))
+            if self.filename:
+                md5.update(self.filename)
+            cache_filename = md5.hexdigest()
+            self._cache_file = open(os.path.join(
+                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+                cache_filename), 'a+')
+            self._cache_filename = self._cache_file.name
+            self._lock_file(self._cache_file)
+            self._cache_file.seek(0)
+            with self._state_lock:
+                try:
+                    self._state = json.load(self._cache_file)
+                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+                        # Cache is missing required keys; start fresh
+                        self._state = copy.deepcopy(self.EMPTY_STATE)
+                except ValueError:
+                    # Cache file empty or malformed; start fresh
+                    self._state = copy.deepcopy(self.EMPTY_STATE)
+            # Load how many bytes were uploaded on previous run
+            with self._collection_lock:
+                self.bytes_written = self._collection_size(self._my_collection())
+        # No resume required
+        else:
+            with self._state_lock:
+                self._state = copy.deepcopy(self.EMPTY_STATE)
+
+    def _lock_file(self, fileobj):
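+        # Take a non-blocking exclusive lock so a concurrent arv-put run
+        # using the same cache file fails fast instead of blocking.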
+        try:
+            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        except IOError:
+            raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+    def _save_state(self):
+        """
+        Atomically save current state into cache.
+        """
+        try:
+            with self._state_lock:
+                state = self._state
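+            # Dump the state to a temp file in the same directory, then
+            # rename it over the old cache so the file is replaced
+            # atomically and never read half-written.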
+            new_cache_fd, new_cache_name = tempfile.mkstemp(
+                dir=os.path.dirname(self._cache_filename))
+            new_cache = os.fdopen(new_cache_fd, 'r+')
+            self._lock_file(new_cache)
+            json.dump(state, new_cache)
+            new_cache.flush()
+            os.fsync(new_cache)
+            os.rename(new_cache_name, self._cache_filename)
+        except (IOError, OSError, ResumeCacheConflict) as error:
+            self.logger.error("There was a problem while saving the cache file: {}".format(error))
+            try:
+                os.unlink(new_cache_name)
+            except NameError:  # mkstemp failed.
+                pass
+        else:
+            self._cache_file.close()
+            self._cache_file = new_cache
+
+    def collection_name(self):
+        with self._collection_lock:
+            response = self._my_collection().api_response()
+            name = response['name'] if response else None
+        return name
+
+    def manifest_locator(self):
+        with self._collection_lock:
+            locator = self._my_collection().manifest_locator()
+        return locator
+
+    def portable_data_hash(self):
+        with self._collection_lock:
+            datahash = self._my_collection().portable_data_hash()
+        return datahash
+
+    def manifest_text(self, stream_name=".", strip=False, normalize=False):
+        with self._collection_lock:
+            manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
+        return manifest
+
+    def _datablocks_on_item(self, item):
+        """
+        Return a list of datablock locators, recursively navigating
+        through subcollections
+        """
+        if isinstance(item, arvados.arvfile.ArvadosFile):
+            if item.size() == 0:
+                # Empty file locator
+                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+            else:
+                return [segment.locator for segment in item.segments()]
+        elif isinstance(item, arvados.collection.Collection):
+            sublists = [self._datablocks_on_item(x) for x in item.values()]
+            # Fast list flattener method taken from:
+            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
+            return [loc for sublist in sublists for loc in sublist]
+        else:
+            return None
+
+    def data_locators(self):
+        with self._collection_lock:
+            # Make sure all datablocks are flushed before getting the locators
+            self._my_collection().manifest_text()
+            datablocks = self._datablocks_on_item(self._my_collection())
+        return datablocks
 
 
 def expected_bytes_for(pathlist):
@@ -430,118 +698,62 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         print >>stderr, error
         sys.exit(1)
 
-    # write_copies diverges from args.replication here.
-    # args.replication is how many copies we will instruct Arvados to
-    # maintain (by passing it in collections().create()) after all
-    # data is written -- and if None was given, we'll use None there.
-    # Meanwhile, write_copies is how many copies of each data block we
-    # write to Keep, which has to be a number.
-    #
-    # If we simply changed args.replication from None to a default
-    # here, we'd end up erroneously passing the default replication
-    # level (instead of None) to collections().create().
-    write_copies = (args.replication or
-                    api_client._rootDesc.get('defaultCollectionReplication', 2))
-
     if args.progress:
         reporter = progress_writer(human_progress)
     elif args.batch_progress:
         reporter = progress_writer(machine_progress)
     else:
         reporter = None
-    bytes_expected = expected_bytes_for(args.paths)
-
-    resume_cache = None
-    if args.resume:
-        try:
-            resume_cache = ResumeCache(ResumeCache.make_path(args))
-            resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
-        except (IOError, OSError, ValueError):
-            pass  # Couldn't open cache directory/file.  Continue without it.
-        except ResumeCacheConflict:
-            print >>stderr, "\n".join([
-                "arv-put: Another process is already uploading this data.",
-                "         Use --no-resume if this is really what you want."])
-            sys.exit(1)
 
-    if resume_cache is None:
-        writer = ArvPutCollectionWriter(
-            resume_cache, reporter, bytes_expected,
-            num_retries=args.retries,
-            replication=write_copies)
-    else:
-        writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, bytes_expected,
-            num_retries=args.retries,
-            replication=write_copies)
+    bytes_expected = expected_bytes_for(args.paths)
+    try:
+        writer = ArvPutUploadJob(paths=args.paths,
+                                 resume=args.resume,
+                                 filename=args.filename,
+                                 reporter=reporter,
+                                 bytes_expected=bytes_expected,
+                                 num_retries=args.retries,
+                                 replication_desired=args.replication,
+                                 name=collection_name,
+                                 owner_uuid=project_uuid,
+                                 ensure_unique_name=True)
+    except ResumeCacheConflict:
+        print >>stderr, "\n".join([
+            "arv-put: Another process is already uploading this data.",
+            "         Use --no-resume if this is really what you want."])
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                             for sigcode in CAUGHT_SIGNALS}
 
-    if writer.bytes_written > 0:  # We're resuming a previous upload.
+    if args.resume and writer.bytes_written > 0:
         print >>stderr, "\n".join([
                 "arv-put: Resuming previous upload from last checkpoint.",
                 "         Use the --no-resume option to start over."])
 
     writer.report_progress()
-    writer.do_queued_work()  # Do work resumed from cache.
-    for path in args.paths:  # Copy file data to Keep.
-        if path == '-':
-            writer.start_new_stream()
-            writer.start_new_file(args.filename)
-            r = sys.stdin.read(64*1024)
-            while r:
-                # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
-                # CollectionWriter.write().
-                super(arvados.collection.ResumableCollectionWriter, writer).write(r)
-                r = sys.stdin.read(64*1024)
-        elif os.path.isdir(path):
-            writer.write_directory_tree(
-                path, max_manifest_depth=args.max_manifest_depth)
-        else:
-            writer.start_new_stream()
-            writer.write_file(path, args.filename or os.path.basename(path))
-    writer.finish_current_stream()
-
+    output = None
+    writer.start()
     if args.progress:  # Print newline to split stderr from stdout for humans.
         print >>stderr
 
-    output = None
     if args.stream:
-        output = writer.manifest_text()
         if args.normalize:
-            output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
+            output = writer.manifest_text(normalize=True)
+        else:
+            output = writer.manifest_text()
     elif args.raw:
         output = ','.join(writer.data_locators())
     else:
         try:
-            manifest_text = writer.manifest_text()
-            if args.normalize:
-                manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
-            replication_attr = 'replication_desired'
-            if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
-                # API called it 'redundancy' before #3410.
-                replication_attr = 'redundancy'
-            # Register the resulting collection in Arvados.
-            collection = api_client.collections().create(
-                body={
-                    'owner_uuid': project_uuid,
-                    'name': collection_name,
-                    'manifest_text': manifest_text,
-                    replication_attr: args.replication,
-                    },
-                ensure_unique_name=True
-                ).execute(num_retries=args.retries)
-
-            print >>stderr, "Collection saved as '%s'" % collection['name']
-
-            if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
-                output = collection['portable_data_hash']
+            writer.save_collection()
+            print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+            if args.portable_data_hash:
+                output = writer.portable_data_hash()
             else:
-                output = collection['uuid']
-
+                output = writer.manifest_locator()
         except apiclient_errors.Error as error:
             print >>stderr, (
                 "arv-put: Error creating Collection on project: {}.".format(
@@ -562,10 +774,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     if status != 0:
         sys.exit(status)
 
-    if resume_cache is not None:
-        resume_cache.destroy()
-
+    # Success!
+    writer.destroy_cache()
     return output
 
+
 if __name__ == '__main__':
     main()
index e64d91474170ce688780c3ab94ea3ae6bb69bbfb..7a0120c02814d00b27e81dd41fbb50e51ef2855c 100644 (file)
@@ -13,11 +13,15 @@ import tempfile
 import time
 import unittest
 import yaml
+import threading
+import hashlib
+import random
 
 from cStringIO import StringIO
 
 import arvados
 import arvados.commands.put as arv_put
+import arvados_testutil as tutil
 
 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 import run_test_server
@@ -234,66 +238,53 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
                           arv_put.ResumeCache, path)
 
 
-class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
-                                     ArvadosBaseTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+                          ArvadosBaseTestCase):
     def setUp(self):
-        super(ArvadosPutCollectionWriterTest, self).setUp()
+        super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
-        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
-            self.cache = arv_put.ResumeCache(cachefile.name)
-            self.cache_filename = cachefile.name
+        # Temp files creation
+        self.tempdir = tempfile.mkdtemp()
+        subdir = os.path.join(self.tempdir, 'subdir')
+        os.mkdir(subdir)
+        data = "x" * 1024 # 1 KB
+        for i in range(1, 5):
+            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+                f.write(data * i)
+        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+            f.write(data * 5)
+        # Large temp file for resume test
+        _, self.large_file_name = tempfile.mkstemp()
+        fileobj = open(self.large_file_name, 'w')
+        # Make sure to write just a little more than one block
+        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+            fileobj.write(data)
+        fileobj.close()
+        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
 
     def tearDown(self):
-        super(ArvadosPutCollectionWriterTest, self).tearDown()
-        if os.path.exists(self.cache_filename):
-            self.cache.destroy()
-        self.cache.close()
-
-    def test_writer_caches(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        cwriter.write_file('/dev/null')
-        cwriter.cache_state()
-        self.assertTrue(self.cache.load())
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+        super(ArvPutUploadJobTest, self).tearDown()
+        shutil.rmtree(self.tempdir)
+        os.unlink(self.large_file_name)
 
     def test_writer_works_without_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter()
-        cwriter.write_file('/dev/null')
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
-    def test_writer_resumes_from_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-            self.assertEqual(
-                ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
-                new_writer.manifest_text())
-
-    def test_new_writer_from_stale_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        with self.make_test_file() as testfile:
-            cwriter.write_file(testfile.name, 'test')
-        new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        new_writer.write_file('/dev/null')
-        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
-    def test_new_writer_from_empty_cache(self):
-        cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-        cwriter.write_file('/dev/null')
+        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+        cwriter.start()
         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
 
-    def test_writer_resumable_after_arbitrary_bytes(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache)
-        # These bytes are intentionally not valid UTF-8.
-        with self.make_test_file('\x00\x07\xe2') as testfile:
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache)
-        self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
+    def test_writer_works_with_cache(self):
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            cwriter = arv_put.ArvPutUploadJob([f.name])
+            cwriter.start()
+            self.assertEqual(3, cwriter.bytes_written)
+            # Don't destroy the cache, and start another upload
+            cwriter_new = arv_put.ArvPutUploadJob([f.name])
+            cwriter_new.start()
+            cwriter_new.destroy_cache()
+            self.assertEqual(0, cwriter_new.bytes_written)
 
     def make_progress_tester(self):
         progression = []
@@ -302,24 +293,47 @@ class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
         return progression, record_func
 
     def test_progress_reporting(self):
-        for expect_count in (None, 8):
-            progression, reporter = self.make_progress_tester()
-            cwriter = arv_put.ArvPutCollectionWriter(
-                reporter=reporter, bytes_expected=expect_count)
-            with self.make_test_file() as testfile:
-                cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            self.assertIn((4, expect_count), progression)
-
-    def test_resume_progress(self):
-        cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
-        with self.make_test_file() as testfile:
-            # Set up a writer with some flushed bytes.
-            cwriter.write_file(testfile.name, 'test')
-            cwriter.finish_current_stream()
-            cwriter.cache_state()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
-            self.assertEqual(new_writer.bytes_written, 4)
+        with tempfile.NamedTemporaryFile() as f:
+            f.write('foo')
+            f.flush()
+            for expect_count in (None, 8):
+                progression, reporter = self.make_progress_tester()
+                cwriter = arv_put.ArvPutUploadJob([f.name],
+                    reporter=reporter, bytes_expected=expect_count)
+                cwriter.start()
+                cwriter.destroy_cache()
+                self.assertIn((3, expect_count), progression)
+
+    def test_writer_upload_directory(self):
+        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
+        cwriter.start()
+        cwriter.destroy_cache()
+        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+    def test_resume_large_file_upload(self):
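+        # Simulate an interrupted upload: the mocked write raises on the
+        # final (short) block, then a second ArvPutUploadJob resumes from
+        # the cache and uploads the rest of the file.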
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
+            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start()
+            # The interrupted run must have written less than the whole
+            # file.
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
+        # Retry the upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer2.start()
+        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
@@ -420,9 +434,8 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
             os.chmod(cachedir, 0o700)
 
     def test_put_block_replication(self):
-        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
-             mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
-            cache_mock.side_effect = ValueError
+        self.call_main_on_test_file()
+        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
             self.call_main_on_test_file(['--replication', '1'])
             self.call_main_on_test_file(['--replication', '4'])
@@ -461,17 +474,16 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
                           ['--project-uuid', self.Z_UUID, '--stream'])
 
     def test_api_error_handling(self):
-        collections_mock = mock.Mock(name='arv.collections()')
-        coll_create_mock = collections_mock().create().execute
-        coll_create_mock.side_effect = arvados.errors.ApiError(
+        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+        coll_save_mock.side_effect = arvados.errors.ApiError(
             fake_httplib2_response(403), '{}')
-        arv_put.api_client = arvados.api('v1')
-        arv_put.api_client.collections = collections_mock
-        with self.assertRaises(SystemExit) as exc_test:
-            self.call_main_with_args(['/dev/null'])
-        self.assertLess(0, exc_test.exception.args[0])
-        self.assertLess(0, coll_create_mock.call_count)
-        self.assertEqual("", self.main_stdout.getvalue())
+        with mock.patch('arvados.collection.Collection.save_new',
+                        new=coll_save_mock):
+            with self.assertRaises(SystemExit) as exc_test:
+                self.call_main_with_args(['/dev/null'])
+            self.assertLess(0, exc_test.exception.args[0])
+            self.assertLess(0, coll_save_mock.call_count)
+            self.assertEqual("", self.main_stdout.getvalue())
 
 
 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
index fb748e9350d01273fbd09f6709a733815b745d78..7728ce6d536e90080be8a92d2b51753827e5342e 100644 (file)
@@ -4,6 +4,9 @@ class Arvados::V1::ContainersController < ApplicationController
   accept_attribute_as_json :runtime_constraints, Hash
   accept_attribute_as_json :command, Array
 
+  skip_before_filter :find_object_by_uuid, only: [:current]
+  skip_before_filter :render_404_if_no_object, only: [:current]
+
   def auth
     if @object.locked_by_uuid != Thread.current[:api_client_authorization].uuid
       raise ArvadosModel::PermissionDeniedError.new("Not locked by your token")
@@ -29,4 +32,18 @@ class Arvados::V1::ContainersController < ApplicationController
     @object.unlock
     show
   end
+
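+  # GET /arvados/v1/containers/current: return the container associated
+  # with the API token used for this request (the container's auth_uuid),
+  # rather than looking one up by uuid.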
+  def current
+    if Thread.current[:api_client_authorization].nil?
+      send_error("Not logged in", status: 401)
+    else
+      c = Container.where(auth_uuid: Thread.current[:api_client_authorization].uuid).first
+      if c.nil?
+        send_error("Token is not associated with a container.", status: 404)
+      else
+        @object = c
+        show
+      end
+    end
+  end
 end
index 3a16e30e9ec545840b3be592138a0b2aacf34694..b1ea9bd230a47e2382dbb12ac0c0d6bee6929588 100644 (file)
@@ -18,6 +18,7 @@ class Container < ArvadosModel
   validate :validate_state_change
   validate :validate_change
   validate :validate_lock
+  validate :validate_output
   after_validation :assign_auth
   before_save :sort_serialized_attrs
   after_save :handle_completed
@@ -186,7 +187,23 @@ class Container < ArvadosModel
   end
 
   def permission_to_update
-    current_user.andand.is_admin
+    # Override base permission check to allow auth_uuid to set progress and
+    # output (only).  Whether it is legal to set progress and output in the current
+    # state has already been checked in validate_change.
+    current_user.andand.is_admin ||
+      (!current_api_client_authorization.nil? and
+       [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
+  end
+
+  def ensure_owner_uuid_is_permitted
+    # Override base permission check to allow auth_uuid to set progress and
+    # output (only).  Whether it is legal to set progress and output in the current
+    # state has already been checked in validate_change.
+    if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
+      check_update_whitelist [:progress, :output]
+    else
+      super
+    end
   end
 
   def set_timestamps
@@ -213,7 +230,7 @@ class Container < ArvadosModel
       permitted.push :priority
 
     when Running
-      permitted.push :priority, :progress
+      permitted.push :priority, :progress, :output
       if self.state_changed?
         permitted.push :started_at
       end
@@ -240,20 +257,10 @@ class Container < ArvadosModel
   end
 
   def validate_lock
-    # If the Container is already locked by someone other than the
-    # current api_client_auth, disallow all changes -- except
-    # priority, which needs to change to reflect max(priority) of
-    # relevant ContainerRequests.
-    if locked_by_uuid_was
-      if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
-        check_update_whitelist [:priority]
-      end
-    end
-
     if [Locked, Running].include? self.state
       # If the Container was already locked, locked_by_uuid must not
      # change. Otherwise, the current auth gets the lock.
-      need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
+      need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid
     else
       need_lock = nil
     end
@@ -269,6 +276,21 @@ class Container < ArvadosModel
     self.locked_by_uuid = need_lock
   end
 
+  def validate_output
+    # Output must exist and be readable by the current user.  This is so
+    # that a container cannot "claim" a collection that it doesn't otherwise
+    # have access to just by setting the output field to the collection PDH.
+    if output_changed?
+      c = Collection.
+          readable_by(current_user).
+          where(portable_data_hash: self.output).
+          first
+      if !c
+        errors.add :output, "collection must exist and be readable by current user."
+      end
+    end
+  end
+
   def assign_auth
     if self.auth_uuid_changed?
       return errors.add :auth_uuid, 'is readonly'
index 3638c726e9bf1118540243476169db913dbc7e58..f28390489dca3f42e14b7274407881009cc80b2f 100644 (file)
@@ -33,6 +33,7 @@ Server::Application.routes.draw do
         get 'auth', on: :member
         post 'lock', on: :member
         post 'unlock', on: :member
+        get 'current', on: :collection
       end
       resources :container_requests
       resources :jobs do
index de14838186b00e1a3aebde728387882ef96f7c2e..0b5baf3b9c7e9cf4ff89d2e784d679baee0a5ec7 100644 (file)
@@ -284,3 +284,17 @@ dispatch1:
   user: system_user
   api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
   expires_at: 2038-01-01 00:00:00
+
+running_container_auth:
+  uuid: zzzzz-gj3su-077z32aux8dg2s2
+  api_client: untrusted
+  user: active
+  api_token: 3kg6k6lzmp9kj6bpkcoxie963cmvjahbt2fod9zru30k1jqdmi
+  expires_at: 2038-01-01 00:00:00
+
+not_running_container_auth:
+  uuid: zzzzz-gj3su-077z32aux8dg2s3
+  api_client: untrusted
+  user: active
+  api_token: 4kg6k6lzmp9kj6bpkcoxie963cmvjahbt2fod9zru30k1jqdmj
+  expires_at: 2038-01-01 00:00:00
index 9f2f41030028f87308798321e2f111b9cd69193f..2272b0f4a041094455c6a06a979a0ed4947f531a 100644 (file)
@@ -566,6 +566,19 @@ collection_with_several_unsupported_file_types:
   manifest_text: ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file 0:0:file.bam\n"
   name: collection_with_several_unsupported_file_types
 
+collection_not_readable_by_active:
+  uuid: zzzzz-4zz18-cd42uwvy3neko21
+  portable_data_hash: bb89eb5140e2848d39b416daeef4ffc5+45
+  owner_uuid: zzzzz-tpzed-000000000000000
+  created_at: 2014-02-03T17:22:54Z
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  modified_at: 2014-02-03T17:22:54Z
+  updated_at: 2014-02-03T17:22:54Z
+  manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+  name: collection_not_readable_by_active
+
 # Test Helper trims the rest of the file
 
 # Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
index 29266d3ab8f50f87586086c8702c676c2d7a7cb1..d1f4c7bdc8fac1248aa80d3b0a88d674900cc8e8 100644 (file)
@@ -28,7 +28,7 @@ running:
   runtime_constraints:
     ram: 12000000000
     vcpus: 4
-  auth_uuid: zzzzz-gj3su-077z32aux8dg2s1
+  auth_uuid: zzzzz-gj3su-077z32aux8dg2s2
 
 running_older:
   uuid: zzzzz-dz642-runningcontain2
@@ -133,7 +133,7 @@ requester_container:
   runtime_constraints:
     ram: 12000000000
     vcpus: 4
-  auth_uuid: zzzzz-gj3su-077z32aux8dg2s1
+  auth_uuid: zzzzz-gj3su-077z32aux8dg2s3
 
 failed_container:
   uuid: zzzzz-dz642-failedcontainr1
index cf1f5765b4d2460ae854356088d15bc7c8061add..65a1a915da26398e89f41203472304d97ae0082b 100644 (file)
@@ -87,4 +87,23 @@ class Arvados::V1::ContainersControllerTest < ActionController::TestCase
       assert_equal state, Container.where(uuid: uuid).first.state
     end
   end
+
+  test 'get current container for token' do
+    authorize_with :running_container_auth
+    get :current
+    assert_response :success
+    assert_equal containers(:running).uuid, json_response['uuid']
+  end
+
+  test 'no container associated with token' do
+    authorize_with :dispatch1
+    get :current
+    assert_response 404
+  end
+
+  test 'try get current container, no token' do
+    get :current
+    assert_response 401
+  end
+
 end
index 172ba4986660916d2a29ba1193e9a53d27b6496c..b2b0d57df56f9fec8a899e4c2eb6edb53027cc24 100644 (file)
@@ -276,7 +276,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
   end
 
   [
-    ['active', 'zzzzz-dz642-runningcontainr'],
+    ['running_container_auth', 'zzzzz-dz642-runningcontainr'],
     ['active_no_prefs', nil],
   ].each do |token, expected|
     test "create as #{token} and expect requesting_container_uuid to be #{expected}" do
index 8894ed9d4c0e16dc32a8b5bbc95d0a1371d3f41f..4fd9f8e75931eff3d3f66e8fcddd4e65253cec7f 100644 (file)
@@ -5,13 +5,13 @@ class ContainerTest < ActiveSupport::TestCase
 
   DEFAULT_ATTRS = {
     command: ['echo', 'foo'],
-    container_image: 'img',
+    container_image: 'fa3c1a9cb6783f85f2ecda037e07b8c3+167',
     output_path: '/tmp',
     priority: 1,
     runtime_constraints: {"vcpus" => 1, "ram" => 1},
   }
 
-  REUSABLE_COMMON_ATTRS = {container_image: "test",
+  REUSABLE_COMMON_ATTRS = {container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
                            cwd: "test",
                            command: ["echo", "hello"],
                            output_path: "test",
@@ -22,16 +22,12 @@ class ContainerTest < ActiveSupport::TestCase
 
   def minimal_new attrs={}
     cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
+    cr.state = ContainerRequest::Committed
     act_as_user users(:active) do
       cr.save!
     end
-    c = Container.new DEFAULT_ATTRS.merge(attrs)
-    act_as_system_user do
-      c.save!
-      assert cr.update_attributes(container_uuid: c.uuid,
-                                  state: ContainerRequest::Committed,
-                                  ), show_errors(cr)
-    end
+    c = Container.find_by_uuid cr.container_uuid
+    assert_not_nil c
     return c, cr
   end
 
@@ -45,7 +41,7 @@ class ContainerTest < ActiveSupport::TestCase
 
   def check_illegal_modify c
     check_illegal_updates c, [{command: ["echo", "bar"]},
-                              {container_image: "img2"},
+                              {container_image: "arvados/apitestfixture:june10"},
                               {cwd: "/tmp2"},
                               {environment: {"FOO" => "BAR"}},
                               {mounts: {"FOO" => "BAR"}},
@@ -89,7 +85,7 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "Container serialized hash attributes sorted before save" do
     env = {"C" => 3, "B" => 2, "A" => 1}
-    m = {"F" => 3, "E" => 2, "D" => 1}
+    m = {"F" => {"kind" => 3}, "E" => {"kind" => 2}, "D" => {"kind" => 1}}
     rc = {"vcpus" => 1, "ram" => 1}
     c, _ = minimal_new(environment: env, mounts: m, runtime_constraints: rc)
     assert_equal c.environment.to_json, Container.deep_sort_hash(env).to_json
@@ -144,17 +140,28 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "find_reusable method should not select completed container when inconsistent outputs exist" do
     set_user_from_auth :active
-    common_attrs = REUSABLE_COMMON_ATTRS.merge({environment: {"var" => "complete"}})
+    common_attrs = REUSABLE_COMMON_ATTRS.merge({environment: {"var" => "complete"}, priority: 1})
     completed_attrs = {
       state: Container::Complete,
       exit_code: 0,
       log: 'ea10d51bcf88862dbcc36eb292017dfd+45',
     }
 
-    c_output1, _ = minimal_new(common_attrs)
-    c_output2, _ = minimal_new(common_attrs)
-
     set_user_from_auth :dispatch1
+
+    c_output1 = Container.create common_attrs
+    c_output2 = Container.create common_attrs
+
+    cr = ContainerRequest.new common_attrs
+    cr.state = ContainerRequest::Committed
+    cr.container_uuid = c_output1.uuid
+    cr.save!
+
+    cr = ContainerRequest.new common_attrs
+    cr.state = ContainerRequest::Committed
+    cr.container_uuid = c_output2.uuid
+    cr.save!
+
     c_output1.update_attributes!({state: Container::Locked})
     c_output1.update_attributes!({state: Container::Running})
     c_output1.update_attributes!(completed_attrs.merge({output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'}))
@@ -427,4 +434,59 @@ class ContainerTest < ActiveSupport::TestCase
 
     assert c.update_attributes(exit_code: 1, state: Container::Complete)
   end
+
+  test "locked_by_uuid can set output on running container" do
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    c.lock
+    c.update_attributes! state: Container::Running
+
+    assert_equal c.locked_by_uuid, Thread.current[:api_client_authorization].uuid
+
+    assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+    assert c.update_attributes! state: Container::Complete
+  end
+
+  test "auth_uuid can set output on running container, but not change container state" do
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    c.lock
+    c.update_attributes! state: Container::Running
+
+    Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+    Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
+    assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+
+    assert_raises ArvadosModel::PermissionDeniedError do
+      # auth_uuid cannot set container state
+      c.update_attributes state: Container::Complete
+    end
+  end
+
+  test "not allowed to set output that is not readable by current user" do
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    c.lock
+    c.update_attributes! state: Container::Running
+
+    Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+    Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
+
+    assert_raises ActiveRecord::RecordInvalid do
+      c.update_attributes! output: collections(:collection_not_readable_by_active).portable_data_hash
+    end
+  end
+
+  test "other token cannot set output on running container" do
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    c.lock
+    c.update_attributes! state: Container::Running
+
+    set_user_from_auth :not_running_container_auth
+    assert_raises ArvadosModel::PermissionDeniedError do
+      c.update_attributes! output: collections(:foo_file).portable_data_hash
+    end
+  end
+
 end
index ab3a9f524e68d137bf2ba371e2f8ba6a34e0ae21..8e5cdb1f3b20ef1fbd9ef6114689148d12f0e781 100644 (file)
@@ -567,6 +567,21 @@ func (runner *ContainerRunner) CaptureOutput() error {
                return nil
        }
 
+       if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+               // Output may have been set directly by the container, so
+               // refresh the container record to check.
+               err := runner.ArvClient.Get("containers", runner.Container.UUID,
+                       nil, &runner.Container)
+               if err != nil {
+                       return err
+               }
+               if runner.Container.Output != "" {
+                       // Container output is already set.
+                       runner.OutputPDH = &runner.Container.Output
+                       return nil
+               }
+       }
+
        if runner.HostOutputDir == "" {
                return nil
        }
index 7ac71cc486a92baf91464764bba69bd3a38da674..2fbbb4db97da2be13eeb5f4f37bc31ed2fb61c09 100644 (file)
@@ -69,6 +69,7 @@ type TestDockerClient struct {
        stop        chan bool
        cwd         string
        env         []string
+       api         *ArvTestClient
 }
 
 func NewTestDockerClient() *TestDockerClient {
@@ -527,6 +528,7 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
        docker.RemoveImage(hwImageId, true)
 
        api = &ArvTestClient{Container: rec}
+       docker.api = api
        cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.statInterval = 100 * time.Millisecond
        am := &ArvMountCmdLine{}
@@ -957,3 +959,50 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
        c.Check(err, NotNil)
        c.Check(strings.Contains(err.Error(), "Unsupported mount kind 'collection' for stdout"), Equals, true)
 }
+
+func (s *TestSuite) TestFullRunWithAPI(c *C) {
+       os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
+       defer os.Unsetenv("ARVADOS_API_HOST")
+       api, _ := FullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": "/bin",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {"API": true}
+}`, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
+               t.logWriter.Close()
+               t.finish <- dockerclient.WaitResult{ExitCode: 0}
+       })
+
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "test.arvados.org\n"), Equals, true)
+       c.Check(api.CalledWith("container.output", "d41d8cd98f00b204e9800998ecf8427e+0"), NotNil)
+}
+
+func (s *TestSuite) TestFullRunSetOutput(c *C) {
+       os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
+       defer os.Unsetenv("ARVADOS_API_HOST")
+       api, _ := FullRunHelper(c, `{
+    "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": "/bin",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {"API": true}
+}`, func(t *TestDockerClient) {
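+		// Simulate a container that sets its own output field via the
+		// API instead of writing files to the output directory.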
+               t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
+               t.logWriter.Close()
+               t.finish <- dockerclient.WaitResult{ExitCode: 0}
+       })
+
+       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+       c.Check(api.CalledWith("container.output", "d4ab34d3d4f8a72f5c4973051ae69fab+122"), NotNil)
+}
index 527e02728bdd711c2deb6ef19573d223c9101e0a..f32a5db3cc29e63d7b556adfa19d011f89ccd713 100644 (file)
@@ -511,6 +511,20 @@ class Operations(llfuse.Operations):
         self._filehandles[fh] = FileHandle(fh, p)
         self.inodes.touch(p)
 
+        # Normally, we will have received an "update" event if the
+        # parent collection is stale here. However, even if the parent
+        # collection hasn't changed, the manifest might have been
+        # fetched so long ago that the signatures on the data block
+        # locators have expired. Calling checkupdate() on all
+        # ancestors ensures the signatures will be refreshed if
+        # necessary.
+        while p.parent_inode in self.inodes:
+            if p == self.inodes[p.parent_inode]:
+                break
+            p = self.inodes[p.parent_inode]
+            self.inodes.touch(p)
+            p.checkupdate()
+
         _logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
 
         return fh
index b485037c8cdfe5b897d2de64f42aeaba9ba3ea20..c79b1314cd906bf0067b0b3fac3f23b4aa0b0809 100644 (file)
@@ -61,7 +61,6 @@ class IntegrationTest(unittest.TestCase):
     def setUp(self):
         self.mnt = tempfile.mkdtemp()
         run_test_server.authorize_with('active')
-        self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
     def tearDown(self):
         os.rmdir(self.mnt)
index 8e4510355d80d996a0f02a730945df20e56611e6..4c70ce98d3d36bb2d28f77b3ad30a5bbfda42238 100644 (file)
@@ -1151,37 +1151,3 @@ class FuseMagicTestPDHOnly(MountTestBase):
 
     def test_with_default_by_id(self):
         self.verify_pdh_only(skip_pdh_only=True)
-
-def _test_refresh_old_manifest(zzz):
-    fnm = 'zzzzz-8i9sb-0vsrcqi7whchuil.log.txt'
-    os.listdir(os.path.join(zzz))
-    time.sleep(3)
-    with open(os.path.join(zzz, fnm)) as f:
-        f.read()
-
-class TokenExpiryTest(MountTestBase):
-    def setUp(self):
-        super(TokenExpiryTest, self).setUp(local_store=False)
-
-    @unittest.skip("bug #10008")
-    @mock.patch('arvados.keep.KeepClient.get')
-    def runTest(self, mocked_get):
-        self.api._rootDesc = {"blobSignatureTtl": 2}
-        mnt = self.make_mount(fuse.CollectionDirectory, collection_record='zzzzz-4zz18-op4e2lbej01tcvu')
-        mocked_get.return_value = 'fake data'
-
-        old_exp = int(time.time()) + 86400*14
-        self.pool.apply(_test_refresh_old_manifest, (self.mounttmp,))
-        want_exp = int(time.time()) + 86400*14
-
-        got_loc = mocked_get.call_args[0][0]
-        got_exp = int(
-            re.search(r'\+A[0-9a-f]+@([0-9a-f]+)', got_loc).group(1),
-            16)
-        self.assertGreaterEqual(
-            got_exp, want_exp-2,
-            msg='now+2w = {:x}, but fuse fetched locator {} (old_exp {:x})'.format(
-                want_exp, got_loc, old_exp))
-        self.assertLessEqual(
-            got_exp, want_exp,
-            msg='server is not using the expected 2w TTL; test is ineffective')
diff --git a/services/fuse/tests/test_token_expiry.py b/services/fuse/tests/test_token_expiry.py
new file mode 100644 (file)
index 0000000..e082e52
--- /dev/null
@@ -0,0 +1,68 @@
+import apiclient
+import arvados
+import arvados_fuse
+import logging
+import mock
+import multiprocessing
+import os
+import re
+import sys
+import time
+import unittest
+
+from .integration_test import IntegrationTest
+
+logger = logging.getLogger('arvados.arv-mount')
+
+class TokenExpiryTest(IntegrationTest):
+    def setUp(self):
+        super(TokenExpiryTest, self).setUp()
+        self.test_start_time = time.time()
+        self.time_now = int(time.time())+1
+
+    def fake_time(self):
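+        # Advance the simulated clock one second per time.time() call.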
+        self.time_now += 1
+        return self.time_now
+
+    orig_open = arvados_fuse.Operations.open
+    def fake_open(self, operations, *args, **kwargs):
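+        # Jump the fake clock ahead 13 days on each open() so the cached
+        # manifest's block signatures look nearly expired and have to be
+        # refreshed.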
+        self.time_now += 86400*13
+        logger.debug('opening file at time=%f', self.time_now)
+        return self.orig_open(operations, *args, **kwargs)
+
+    @mock.patch.object(arvados_fuse.Operations, 'open', autospec=True)
+    @mock.patch('time.time')
+    @mock.patch('arvados.keep.KeepClient.get')
+    @IntegrationTest.mount(argv=['--mount-by-id', 'zzz'])
+    def test_refresh_old_manifest(self, mocked_get, mocked_time, mocked_open):
+        # This test (and associated behavior) is still not strong
+        # enough. We should ensure old tokens are never used even if
+        # blobSignatureTtl seconds elapse between open() and
+        # read(). See https://dev.arvados.org/issues/10008
+
+        mocked_get.return_value = 'fake data'
+        mocked_time.side_effect = self.fake_time
+        mocked_open.side_effect = self.fake_open
+
+        with mock.patch.object(self.mount.api, 'collections', wraps=self.mount.api.collections) as mocked_collections:
+            mocked_collections.return_value = mocked_collections()
+            with mock.patch.object(self.mount.api.collections(), 'get', wraps=self.mount.api.collections().get) as mocked_get:
+                self.pool_test(os.path.join(self.mnt, 'zzz'))
+
+        # open() several times here to make sure we don't reach our
+        # quota of mocked_get.call_count dishonestly (e.g., the first
+        # open causes 5 mocked_get, and the rest cause none).
+        self.assertEqual(8, mocked_open.call_count)
+        self.assertGreaterEqual(
+            mocked_get.call_count, 8,
+            'Not enough calls to collections().get(): expected 8, got {!r}'.format(
+                mocked_get.mock_calls))
+
+    @staticmethod
+    def _test_refresh_old_manifest(self, zzz):
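+        # Invoked via pool_test() in a separate process; 'self' is passed
+        # in explicitly rather than bound, hence the staticmethod.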
+        uuid = 'zzzzz-4zz18-op4e2lbej01tcvu'
+        fnm = 'zzzzz-8i9sb-0vsrcqi7whchuil.log.txt'
+        os.listdir(os.path.join(zzz, uuid))
+        for _ in range(8):
+            with open(os.path.join(zzz, uuid, fnm)) as f:
+                f.read()
index d74b6d4c079b42f97b2242966b00aa0c086c31cf..474913170a624a618ab82bad57dd74c4ca790cf3 100755 (executable)
@@ -267,13 +267,14 @@ build() {
         echo "Could not find Dockerfile (expected it at $ARVBOX_DOCKER/Dockerfile.base)"
         exit 1
     fi
-    GITHEAD=$(cd $ARVBOX_DOCKER && git log --format=%H -n1 HEAD)
-    docker build --build-arg=arvados_version=$GITHEAD $NO_CACHE -t arvados/arvbox-base:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
     if docker --version |grep " 1\.[0-9]\." ; then
         # Docker version prior 1.10 require -f flag
         # -f flag removed in Docker 1.12
         FORCE=-f
     fi
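+    # Build the base image, then point the "latest" tag at it (passing -f
+    # where this Docker version requires it to overwrite an existing tag).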
+    GITHEAD=$(cd $ARVBOX_DOCKER && git log --format=%H -n1 HEAD)
+    docker build --build-arg=arvados_version=$GITHEAD $NO_CACHE -t arvados/arvbox-base:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
+    docker tag $FORCE arvados/arvbox-base:$GITHEAD arvados/arvbox-base:latest
     if test "$1" = localdemo -o "$1" = publicdemo ; then
         docker build $NO_CACHE -t arvados/arvbox-demo:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.demo" "$ARVBOX_DOCKER"
         docker tag $FORCE arvados/arvbox-demo:$GITHEAD arvados/arvbox-demo:latest