8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi...
author Tom Clegg <tom@curoverse.com>
Mon, 29 Feb 2016 21:00:39 +0000 (16:00 -0500)
committer Tom Clegg <tom@curoverse.com>
Mon, 29 Feb 2016 21:00:39 +0000 (16:00 -0500)
Conflicts:
sdk/cli/bin/crunch-job

115 files changed:
apps/workbench/.gitignore
apps/workbench/Gemfile.lock
apps/workbench/app/controllers/actions_controller.rb
apps/workbench/app/controllers/application_controller.rb
apps/workbench/app/models/arvados_api_client.rb
apps/workbench/app/views/application/_breadcrumbs.html.erb
apps/workbench/app/views/application/_projects_tree_menu.html.erb
apps/workbench/app/views/application/_show_star.html.erb [new file with mode: 0644]
apps/workbench/app/views/application/star.js.erb [new file with mode: 0644]
apps/workbench/app/views/layouts/body.html.erb
apps/workbench/app/views/projects/_choose.html.erb
apps/workbench/app/views/projects/show.html.erb
apps/workbench/config/routes.rb
apps/workbench/test/controllers/projects_controller_test.rb
apps/workbench/test/integration/projects_test.rb
backports/python-gflags/fpm-info.sh [new file with mode: 0644]
backports/python-llfuse/fpm-info.sh
crunch_scripts/crunchutil/subst.py
crunch_scripts/run-command
doc/_includes/_install_git.liquid
doc/_includes/_install_redhat_postgres_auth.liquid [new file with mode: 0644]
doc/_includes/_install_ruby_and_bundler.liquid
doc/_includes/_install_runit.liquid [new file with mode: 0644]
doc/install/install-api-server.html.textile.liquid
doc/install/install-arv-git-httpd.html.textile.liquid
doc/install/install-compute-node.html.textile.liquid
doc/install/install-crunch-dispatch.html.textile.liquid
doc/install/install-keep-web.html.textile.liquid
doc/install/install-keepproxy.html.textile.liquid
doc/install/install-keepstore.html.textile.liquid
doc/install/install-shell-server.html.textile.liquid
doc/install/install-sso.html.textile.liquid
doc/install/install-workbench-app.html.textile.liquid
sdk/cli/bin/crunch-job
sdk/cli/test/binstub_clean_fail/mount
sdk/cli/test/test_crunch-job.rb
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/setup.py
sdk/perl/.gitignore [new file with mode: 0644]
sdk/python/arvados/commands/run.py
sdk/python/arvados/events.py
sdk/python/setup.py
services/api/.gitignore
services/api/Gemfile
services/api/Gemfile.lock
services/api/app/controllers/arvados/v1/api_client_authorizations_controller.rb
services/api/app/controllers/arvados/v1/nodes_controller.rb
services/api/app/models/api_client_authorization.rb
services/api/app/models/arvados_model.rb
services/api/db/migrate/20160208210629_add_uuid_to_api_client_authorization.rb [new file with mode: 0644]
services/api/db/migrate/20160209155729_add_uuid_to_api_token_search_index.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/fpm-info.sh
services/api/lib/crunch_dispatch.rb
services/api/lib/load_param.rb
services/api/test/fixtures/api_client_authorizations.yml
services/api/test/fixtures/groups.yml
services/api/test/fixtures/links.yml
services/api/test/fixtures/nodes.yml
services/api/test/functional/arvados/v1/api_client_authorizations_controller_test.rb
services/api/test/functional/arvados/v1/groups_controller_test.rb
services/api/test/functional/arvados/v1/nodes_controller_test.rb
services/api/test/functional/arvados/v1/query_test.rb [new file with mode: 0644]
services/crunchstat/crunchstat.go
services/fuse/arvados_fuse/command.py
services/fuse/arvados_fuse/fusedir.py
services/fuse/fpm-info.sh [new file with mode: 0644]
services/fuse/setup.py
services/fuse/tests/integration_test.py
services/fuse/tests/mount_test_base.py
services/fuse/tests/test_command_args.py
services/fuse/tests/test_exec.py [new file with mode: 0644]
services/keepstore/azure_blob_volume.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/s3_volume.go
services/keepstore/trash_worker.go
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go
services/keepstore/volume_unix.go
services/keepstore/volume_unix_test.go
services/login-sync/bin/arvados-login-sync
services/nodemanager/arvnodeman/clientactor.py
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/computenode/driver/__init__.py
services/nodemanager/arvnodeman/computenode/driver/ec2.py
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/nodelist.py
services/nodemanager/tests/test_clientactor.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_driver_azure.py
services/nodemanager/tests/test_computenode_driver_gce.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/testutil.py
tools/crunchstat-summary/MANIFEST.in
tools/crunchstat-summary/bin/crunchstat-summary
tools/crunchstat-summary/crunchstat_summary/__init__.py
tools/crunchstat-summary/crunchstat_summary/chartjs.js [new file with mode: 0644]
tools/crunchstat-summary/crunchstat_summary/chartjs.py [new file with mode: 0644]
tools/crunchstat-summary/crunchstat_summary/command.py
tools/crunchstat-summary/crunchstat_summary/reader.py [new file with mode: 0644]
tools/crunchstat-summary/crunchstat_summary/summarizer.py
tools/crunchstat-summary/setup.py
tools/crunchstat-summary/tests/crunchstat_error_messages.txt [new file with mode: 0644]
tools/crunchstat-summary/tests/logfile_20151204190335.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063411.txt.gz.report
tools/crunchstat-summary/tests/logfile_20151210063439.txt.gz.report
tools/crunchstat-summary/tests/test_examples.py

index 9bef02bbfda670595750fd99a4461005ce5b8f12..a27ac31580a1d6b5cc81ab47e60c8deb649a2f85 100644 (file)
@@ -36,3 +36,6 @@
 # Dev/test SSL certificates
 /self-signed.key
 /self-signed.pem
+
+# Generated git-commit.version file
+/git-commit.version
index 303d5830de18774db271edb7891397000e1fddc8..b4e2400beda11a9186dd5f4c04468638f237a517 100644 (file)
@@ -294,6 +294,3 @@ DEPENDENCIES
   therubyracer
   uglifier (>= 1.0.3)
   wiselinks
-
-BUNDLED WITH
-   1.10.6
index 58b8cdc54f018e6dae20ba7b9c182bfbaef909c0..28680df33f3cf4f5902d5abdc278c305011018f2 100644 (file)
@@ -238,6 +238,35 @@ You can try recreating the collection to get a copy with full provenance data."
     end
   end
 
+  # star / unstar the current project
+  def star
+    links = Link.where(tail_uuid: current_user.uuid,
+                       head_uuid: @object.uuid,
+                       link_class: 'star')
+
+    if params['status'] == 'create'
+      # create 'star' link if one does not already exist
+      if !links.andand.any?
+        dst = Link.new(owner_uuid: current_user.uuid,
+                       tail_uuid: current_user.uuid,
+                       head_uuid: @object.uuid,
+                       link_class: 'star',
+                       name: @object.uuid)
+        dst.save!
+      end
+    else # delete any existing 'star' links
+      if links.andand.any?
+        links.each do |link|
+          link.destroy
+        end
+      end
+    end
+
+    respond_to do |format|
+      format.js
+    end
+  end
+
   protected
 
   def derive_unique_filename filename, manifest_files
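For context: starring is modeled with a generic Arvados Link of link_class 'star' pointing from the user (tail) to the project (head). A minimal sketch of the toggle semantics implemented by the action above, assuming the Link model shown there (toggle_star itself is a hypothetical helper, not part of this diff):

    def toggle_star(user, object, starred)
      links = Link.where(tail_uuid: user.uuid,
                         head_uuid: object.uuid,
                         link_class: 'star')
      if starred
        # create the 'star' link only if one does not already exist
        Link.new(owner_uuid: user.uuid, tail_uuid: user.uuid,
                 head_uuid: object.uuid, link_class: 'star',
                 name: object.uuid).save! if links.empty?
      else
        links.each(&:destroy)
      end
    end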
index db3d43040c416bef846a19d8ee0b4009a9e8f622..4c3d3f852eb2a737049f0a734e88de738a6f0b95 100644 (file)
@@ -89,13 +89,14 @@ class ApplicationController < ActionController::Base
     # exception here than in a template.)
     unless current_user.nil?
       begin
-        build_project_trees
+        my_starred_projects current_user
+        build_my_wanted_projects_tree current_user
       rescue ArvadosApiClient::ApiError
         # Fall back to the default-setting code later.
       end
     end
-    @my_project_tree ||= []
-    @shared_project_tree ||= []
+    @starred_projects ||= []
+    @my_wanted_projects_tree ||= []
     render_error(err_opts)
   end
 
@@ -444,6 +445,15 @@ class ApplicationController < ActionController::Base
     end
   end
 
+  helper_method :is_starred
+  def is_starred
+    links = Link.where(tail_uuid: current_user.uuid,
+               head_uuid: @object.uuid,
+               link_class: 'star')
+
+    return links.andand.any?
+  end
+
   protected
 
   helper_method :strip_token_from_path
@@ -833,27 +843,63 @@ class ApplicationController < ActionController::Base
     {collections: c, owners: own}
   end
 
-  helper_method :my_project_tree
-  def my_project_tree
-    build_project_trees
-    @my_project_tree
+  helper_method :my_starred_projects
+  def my_starred_projects user
+    return if @starred_projects
+    links = Link.filter([['tail_uuid', '=', user.uuid],
+                         ['link_class', '=', 'star'],
+                         ['head_uuid', 'is_a', 'arvados#group']]).select(%w(head_uuid))
+    uuids = links.collect { |x| x.head_uuid }
+    starred_projects = Group.filter([['uuid', 'in', uuids]]).order('name')
+    @starred_projects = starred_projects.results
+  end
+
+  # If there are more than 200 projects that are readable by the user,
+  # build the tree using only the top 200+ projects owned by the user,
+  # from the top three levels.
+  # That is: get toplevel projects under home, get subprojects of
+  # these projects, and so on until we hit the limit.
+  def my_wanted_projects user, page_size=100
+    return @my_wanted_projects if @my_wanted_projects
+
+    from_top = []
+    uuids = [user.uuid]
+    depth = 0
+    @too_many_projects = false
+    @reached_level_limit = false
+    while from_top.size <= page_size*2
+      current_level = Group.filter([['group_class','=','project'],
+                                    ['owner_uuid', 'in', uuids]])
+                      .order('name').limit(page_size*2)
+      break if current_level.results.size == 0
+      @too_many_projects = true if current_level.items_available > current_level.results.size
+      from_top.concat current_level.results
+      uuids = current_level.results.collect { |x| x.uuid }
+      depth += 1
+      if depth >= 3
+        @reached_level_limit = true
+        break
+      end
+    end
+    @my_wanted_projects = from_top
   end
 
-  helper_method :shared_project_tree
-  def shared_project_tree
-    build_project_trees
-    @shared_project_tree
+  helper_method :my_wanted_projects_tree
+  def my_wanted_projects_tree user, page_size=100
+    build_my_wanted_projects_tree user, page_size
+    [@my_wanted_projects_tree, @too_many_projects, @reached_level_limit]
   end
 
-  def build_project_trees
-    return if @my_project_tree and @shared_project_tree
-    parent_of = {current_user.uuid => 'me'}
-    all_projects.each do |ob|
+  def build_my_wanted_projects_tree user, page_size=100
+    return @my_wanted_projects_tree if @my_wanted_projects_tree
+
+    parent_of = {user.uuid => 'me'}
+    my_wanted_projects(user, page_size).each do |ob|
       parent_of[ob.uuid] = ob.owner_uuid
     end
-    children_of = {false => [], 'me' => [current_user]}
-    all_projects.each do |ob|
-      if ob.owner_uuid != current_user.uuid and
+    children_of = {false => [], 'me' => [user]}
+    my_wanted_projects(user, page_size).each do |ob|
+      if ob.owner_uuid != user.uuid and
           not parent_of.has_key? ob.owner_uuid
         parent_of[ob.uuid] = false
       end
@@ -877,11 +923,8 @@ class ApplicationController < ActionController::Base
       end
       paths
     end
-    @my_project_tree =
+    @my_wanted_projects_tree =
       sorted_paths.call buildtree.call(children_of, 'me')
-    @shared_project_tree =
-      sorted_paths.call({'Projects shared with me' =>
-                          buildtree.call(children_of, false)})
   end
 
   helper_method :get_object
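The comment in the hunk above describes a breadth-limited walk: start at the user's home project, fetch each successive level of subprojects, and stop after three levels or roughly 2x page_size projects, flagging whichever limit was hit. A standalone sketch of that loop, assuming the Group.filter query interface used above (memoization into instance variables omitted):

    def wanted_projects(user, page_size = 100)
      projects = []
      uuids = [user.uuid]
      too_many = level_limited = false
      depth = 0
      while projects.size <= page_size * 2
        level = Group.filter([['group_class', '=', 'project'],
                              ['owner_uuid', 'in', uuids]])
                     .order('name').limit(page_size * 2)
        break if level.results.empty?
        # items_available > results means the server truncated this level
        too_many ||= level.items_available > level.results.size
        projects.concat(level.results)
        uuids = level.results.map(&:uuid)
        if (depth += 1) >= 3
          level_limited = true
          break
        end
      end
      [projects, too_many, level_limited]
    end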
index 4d549d194728eb00a9f3a2a01fd097d84955a16e..13d4a24c69cc5f7e687c47c0e95ed715ab9f5fa2 100644 (file)
@@ -89,7 +89,10 @@ class ArvadosApiClient
           @api_client.ssl_config.verify_mode = OpenSSL::SSL::VERIFY_NONE
         else
           # Use system CA certificates
-          @api_client.ssl_config.add_trust_ca('/etc/ssl/certs')
+          ["/etc/ssl/certs/ca-certificates.crt",
+           "/etc/pki/tls/certs/ca-bundle.crt"]
+            .select { |ca_path| File.readable?(ca_path) }
+            .each { |ca_path| @api_client.ssl_config.add_trust_ca(ca_path) }
         end
         if Rails.configuration.api_response_compression
           @api_client.transparent_gzip_decompression = true
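The two paths are the standard system CA bundle locations on Debian-family and Red Hat-family distributions; only bundles that are actually present and readable get added, so the same code works on both. The probe as a standalone sketch (client stands in for the HTTPClient instance):

    CA_BUNDLES = [
      '/etc/ssl/certs/ca-certificates.crt',  # Debian/Ubuntu
      '/etc/pki/tls/certs/ca-bundle.crt',    # RHEL/CentOS/Fedora
    ]
    CA_BUNDLES.select { |path| File.readable?(path) }
              .each { |path| client.ssl_config.add_trust_ca(path) }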
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..3ef2aec17c35575013dd0be2ca51afe95953c8ba 100644 (file)
@@ -0,0 +1,69 @@
+      <nav class="navbar navbar-default breadcrumbs" role="navigation">
+        <ul class="nav navbar-nav navbar-left">
+          <li>
+            <a href="/">
+              <i class="fa fa-lg fa-fw fa-dashboard"></i>
+              Dashboard
+            </a>
+          </li>
+          <li class="dropdown">
+            <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="projects-menu">
+              Projects
+              <span class="caret"></span>
+            </a>
+            <ul class="dropdown-menu" style="min-width: 20em" role="menu">
+              <li role="menuitem">
+                  <%= link_to(
+                        url_for(
+                          action: 'choose',
+                          controller: 'search',
+                          filters: [['uuid', 'is_a', 'arvados#group']].to_json,
+                          title: 'Search',
+                          action_name: 'Show',
+                          action_href: url_for(controller: :actions, action: :show),
+                          action_method: 'get',
+                          action_data: {selection_param: 'uuid', success: 'redirect-to-created-object'}.to_json),
+                        { remote: true, method: 'get', title: "Search" }) do %>
+                    <i class="glyphicon fa-fw glyphicon-search"></i> Search all projects ...
+                  <% end %>
+               </li>
+              <% if Rails.configuration.anonymous_user_token and Rails.configuration.enable_public_projects_page %>
+                <li role="menuitem"><a href="/projects/public" role="menuitem"><i class="fa fa-fw fa-list"></i> Browse public projects </a>
+                </li>
+              <% end %>
+              <li role="menuitem">
+                <%= link_to projects_path(options: {ensure_unique_name: true}), role: 'menu-item', method: :post do %>
+                  <i class="fa fa-fw fa-plus"></i> Add a new project
+                <% end %>
+              </li>
+              <li role="presentation" class="divider"></li>
+              <%= render partial: "projects_tree_menu", locals: {
+                  :project_link_to => Proc.new do |pnode, &block|
+                    link_to(project_path(pnode[:object].uuid),
+                      data: { 'object-uuid' => pnode[:object].uuid,
+                              'name' => 'name' },
+                      &block)
+                  end,
+              } %>
+            </ul>
+          </li>
+          <% if @name_link or @object %>
+            <li class="nav-separator">
+              <i class="fa fa-lg fa-angle-double-right"></i>
+            </li>
+            <li>
+              <%= link_to project_path(current_user.uuid) do %>
+                Home
+              <% end %>
+            </li>
+            <% project_breadcrumbs.each do |p| %>
+              <li class="nav-separator">
+                <i class="fa fa-lg fa-angle-double-right"></i>
+              </li>
+              <li>
+                <%= link_to(p.name, project_path(p.uuid), data: {object_uuid: p.uuid, name: 'name'}) %>
+              </li>
+            <% end %>
+          <% end %>
+        </ul>
+      </nav>
index 77b9d45f93587e1d1102582f62a0bda24e32fb0c..a680c69ce314bb4bd02c290506f996faa464f38a 100644 (file)
@@ -1,3 +1,23 @@
+<% starred_projects = my_starred_projects current_user %>
+<% if starred_projects.andand.any? %>
+  <li role="presentation" class="dropdown-header">
+    My favorite projects
+  </li>
+  <li>
+    <%= project_link_to.call({object: current_user, depth: 0}) do %>
+      <span style="padding-left: 0">Home</span>
+    <% end %>
+  </li>
+  <% (starred_projects).each do |pnode| %>
+    <li>
+      <%= project_link_to.call({object: pnode, depth: 0}) do %>
+        <span style="padding-left: 0em"></span><%= pnode[:name] %>
+      <% end %>
+    </li>
+  <% end %>
+  <li role="presentation" class="divider"></li>
+<% end %>
+
 <li role="presentation" class="dropdown-header">
   My projects
 </li>
@@ -6,7 +26,8 @@
     <span style="padding-left: 0">Home</span>
   <% end %>
 </li>
-<% my_project_tree.each do |pnode| %>
+<% my_tree = my_wanted_projects_tree current_user %>
+<% my_tree[0].each do |pnode| %>
   <% next if pnode[:object].class != Group %>
   <li>
     <%= project_link_to.call pnode do %>
     <% end %>
   </li>
 <% end %>
+<% if my_tree[1] or my_tree[0].size > 200 %>
+<li role="presentation" class="dropdown-header">
+  Some projects have been omitted.
+</li>
+<% elsif my_tree[2] %>
+<li role="presentation" class="dropdown-header">
+  Showing top three levels of your projects.
+</li>
+<% end %>
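Note that my_wanted_projects_tree returns a three-element array, [tree, too_many_projects, reached_level_limit], which the template above indexes positionally as my_tree[0..2]. A clearer way to consume it, sketched:

    tree, too_many, level_limited = my_wanted_projects_tree(current_user)
    # tree drives the menu entries; the two flags select the footer notice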
diff --git a/apps/workbench/app/views/application/_show_star.html.erb b/apps/workbench/app/views/application/_show_star.html.erb
new file mode 100644 (file)
index 0000000..b32fd47
--- /dev/null
@@ -0,0 +1,9 @@
+<% if current_user and is_starred %>
+  <%= link_to(star_path(status: 'delete', id:@object.uuid, action_method: 'get'), style: "color:#D00", class: "btn btn-xs star-unstar", title: "Remove from list of favorites", remote: true) do  %>
+            <i class="fa fa-lg fa-star"></i>
+          <% end %>
+<% else %>
+  <%= link_to(star_path(status: 'create', id:@object.uuid, action_method: 'get'), class: "btn btn-xs star-unstar", title: "Add to list of favorites", remote: true) do %>
+            <i class="fa fa-lg fa-star-o"></i>
+          <% end %>
+<% end %>
diff --git a/apps/workbench/app/views/application/star.js.erb b/apps/workbench/app/views/application/star.js.erb
new file mode 100644 (file)
index 0000000..701c673
--- /dev/null
@@ -0,0 +1,2 @@
+$(".star-unstar").html("<%= escape_javascript(render partial: 'show_star') %>");
+$(".breadcrumbs").html("<%= escape_javascript(render partial: 'breadcrumbs') %>");
index abb79e932790502c007215b857c822ca0908d160..456f15f218cee86cc76098acb33f2fab9d78cc23 100644 (file)
     </nav>
 
     <% if current_user.andand.is_active %>
-      <nav class="navbar navbar-default breadcrumbs" role="navigation">
-        <ul class="nav navbar-nav navbar-left">
-          <li>
-            <a href="/">
-              <i class="fa fa-lg fa-fw fa-dashboard"></i>
-              Dashboard
-            </a>
-          </li>
-          <li class="dropdown">
-            <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="projects-menu">
-              Projects
-              <span class="caret"></span>
-            </a>
-            <ul class="dropdown-menu" style="min-width: 20em" role="menu">
-              <li role="menuitem">
-                  <%= link_to(
-                        url_for(
-                          action: 'choose',
-                          controller: 'search',
-                          filters: [['uuid', 'is_a', 'arvados#group']].to_json,
-                          title: 'Search',
-                          action_name: 'Show',
-                          action_href: url_for(controller: :actions, action: :show),
-                          action_method: 'get',
-                          action_data: {selection_param: 'uuid', success: 'redirect-to-created-object'}.to_json),
-                        { remote: true, method: 'get', title: "Search" }) do %>
-                    <i class="glyphicon fa-fw glyphicon-search"></i> Search all projects ...
-                  <% end %>
-               </li>
-              <% if Rails.configuration.anonymous_user_token and Rails.configuration.enable_public_projects_page %>
-                <li role="menuitem"><a href="/projects/public" role="menuitem"><i class="fa fa-fw fa-list"></i> Browse public projects </a>
-                </li>
-              <% end %>
-              <li role="menuitem">
-                <%= link_to projects_path(options: {ensure_unique_name: true}), role: 'menu-item', method: :post do %>
-                  <i class="fa fa-fw fa-plus"></i> Add a new project
-                <% end %>
-              </li>
-              <li role="presentation" class="divider"></li>
-              <%= render partial: "projects_tree_menu", locals: {
-                  :project_link_to => Proc.new do |pnode, &block|
-                    link_to(project_path(pnode[:object].uuid),
-                      data: { 'object-uuid' => pnode[:object].uuid,
-                              'name' => 'name' },
-                      &block)
-                  end,
-              } %>
-            </ul>
-          </li>
-          <% if @name_link or @object %>
-            <li class="nav-separator">
-              <i class="fa fa-lg fa-angle-double-right"></i>
-            </li>
-            <li>
-              <%= link_to project_path(current_user.uuid) do %>
-                Home
-              <% end %>
-            </li>
-            <% project_breadcrumbs.each do |p| %>
-              <li class="nav-separator">
-                <i class="fa fa-lg fa-angle-double-right"></i>
-              </li>
-              <li>
-                <%= link_to(p.name, project_path(p.uuid), data: {object_uuid: p.uuid, name: 'name'}) %>
-              </li>
-            <% end %>
-          <% end %>
-        </ul>
-      </nav>
+      <%= render partial: 'breadcrumbs' %>
     <% elsif !current_user %>   <%# anonymous %>
       <% if (@name_link or @object) and (project_breadcrumbs.any?) %>
         <nav class="navbar navbar-default breadcrumbs" role="navigation">
index c0759ed2e3ac1da813acd7afa744f62f543185a7..badaa24983f2640e60a2d2e8397d9842941f74ea 100644 (file)
 
       <div class="modal-body">
         <div class="selectable-container" style="height: 15em; overflow-y: scroll">
-          <% [my_project_tree, shared_project_tree].each do |tree| %>
-            <% tree.each do |projectnode| %>
-              <% if projectnode[:object].is_a? String %>
-                <div class="row" style="padding-left: <%= 1 + projectnode[:depth] %>em; margin-right: 0px">
-                  <i class="fa fa-fw fa-share-alt"></i>
-                  <%= projectnode[:object] %>
-                </div>
-              <% else
-                 row_selectable = !params[:editable] || projectnode[:object].editable?
-                 if projectnode[:object].uuid == current_user.uuid
-                   row_name = "Home"
-                   row_selectable = true
-                 else
-                   row_name = projectnode[:object].friendly_link_name || 'New project'
-                 end %>
-                <div class="<%= 'selectable project' if row_selectable %> row"
-                     style="padding-left: <%= 1 + projectnode[:depth] %>em; margin-right: 0px" data-object-uuid="<%= projectnode[:object].uuid %>">
-                  <i class="fa fa-fw fa-folder-o"></i> <%= row_name %>
-                </div>
-              <% end %>
+          <% starred_projects = my_starred_projects current_user %>
+          <% if starred_projects.andand.any? %>
+            <% writable_projects = starred_projects.select(&:editable?) %>
+            <% writable_projects.each do |projectnode| %>
+              <% row_name = projectnode.friendly_link_name || 'New project' %>
+              <div class="selectable project row"
+                   style="padding-left: 1em; margin-right: 0px"
+                   data-object-uuid="<%= projectnode.uuid %>">
+                <i class="fa fa-fw fa-folder-o"></i> <%= row_name %> <i class="fa fa-fw fa-star"></i>
+              </div>
             <% end %>
           <% end %>
+
+          <% my_projects = my_wanted_projects_tree(current_user) %>
+          <% my_projects[0].each do |projectnode| %>
+            <% if projectnode[:object].uuid == current_user.uuid
+                 row_name = "Home"
+               else
+                 row_name = projectnode[:object].friendly_link_name || 'New project'
+               end %>
+            <div class="selectable project row"
+                 style="padding-left: <%= 1 + projectnode[:depth] %>em; margin-right: 0px"
+                 data-object-uuid="<%= projectnode[:object].uuid %>">
+              <i class="fa fa-fw fa-folder-o"></i> <%= row_name %>
+            </div>
+          <% end %>
         </div>
+
+        <% if my_projects[1] or my_projects[2] or my_projects[0].size > 200 %>
+          <div>Some of your projects are omitted. Add projects of interest to favorites.</div>
+        <% end %>
       </div>
 
       <div class="modal-footer">
index 2a85da83214303fed625725dea1a334067691ee2..6033a3491051d657bfb470eb351f2df710edb90c 100644 (file)
@@ -3,6 +3,7 @@
     <% if @object.uuid == current_user.andand.uuid %>
       Home
     <% else %>
+      <%= render partial: "show_star" %>
       <%= render_editable_attribute @object, 'name', nil, { 'data-emptytext' => "New project" } %>
     <% end %>
   </h2>
index 10426099937e2bdda42b6f7fb0976c0ada764a51..fc72ea2222508db8b19d540b089deaf03f128df5 100644 (file)
@@ -12,6 +12,7 @@ ArvadosWorkbench::Application.routes.draw do
   get "users/setup" => 'users#setup', :as => :setup_user
   get "report_issue_popup" => 'actions#report_issue_popup', :as => :report_issue_popup
   post "report_issue" => 'actions#report_issue', :as => :report_issue
+  get "star" => 'actions#star', :as => :star
   resources :nodes
   resources :humans
   resources :traits
index 8fa9fe9a817e7f2e3b6494099b85afc53c05ac57..58914a84ac87b5b0949f07d634a826226a2b64af 100644 (file)
@@ -418,4 +418,83 @@ class ProjectsControllerTest < ActionController::TestCase
     }, session_for(:active)
     assert_select "#projects-menu + ul li.divider ~ li a[href=/projects/#{project_uuid}]"
   end
+
+  [
+    ["active", 5, ["aproject", "asubproject"], "anonymously_accessible_project"],
+    ["user1_with_load", 2, ["project_with_10_collections"], "project_with_2_pipelines_and_60_jobs"],
+    ["admin", 5, ["anonymously_accessible_project", "subproject_in_anonymous_accessible_project"], "aproject"],
+  ].each do |user, page_size, tree_segment, unexpected|
+    test "build my projects tree for #{user} user and verify #{unexpected} is omitted" do
+      use_token user
+      ctrl = ProjectsController.new
+
+      current_user = User.find(api_fixture('users')[user]['uuid'])
+
+      my_tree = ctrl.send :my_wanted_projects_tree, current_user, page_size
+
+      tree_segment_at_depth_1 = api_fixture('groups')[tree_segment[0]]
+      tree_segment_at_depth_2 = api_fixture('groups')[tree_segment[1]] if tree_segment[1]
+
+      tree_nodes = {}
+      my_tree[0].each do |x|
+        tree_nodes[x[:object]['uuid']] = x[:depth]
+      end
+
+      assert_equal(1, tree_nodes[tree_segment_at_depth_1['uuid']])
+      assert_equal(2, tree_nodes[tree_segment_at_depth_2['uuid']]) if tree_segment[1]
+
+      unexpected_project = api_fixture('groups')[unexpected]
+      assert_nil(tree_nodes[unexpected_project['uuid']])
+    end
+  end
+
+  [
+    ["active", 1],
+    ["project_viewer", 1],
+    ["admin", 0],
+  ].each do |user, size|
+    test "starred projects for #{user}" do
+      use_token user
+      ctrl = ProjectsController.new
+      current_user = User.find(api_fixture('users')[user]['uuid'])
+      my_starred_project = ctrl.send :my_starred_projects, current_user
+      assert_equal(size, my_starred_project.andand.size)
+
+      ctrl2 = ProjectsController.new
+      current_user = User.find(api_fixture('users')[user]['uuid'])
+      my_starred_project = ctrl2.send :my_starred_projects, current_user
+      assert_equal(size, my_starred_project.andand.size)
+    end
+  end
+
+  test "unshare project and verify that it is no longer included in shared user's starred projects" do
+    # remove sharing link
+    use_token :system_user
+    Link.find(api_fixture('links')['share_starred_project_with_project_viewer']['uuid']).destroy
+
+    # verify that project is no longer included in starred projects
+    use_token :project_viewer
+    current_user = User.find(api_fixture('users')['project_viewer']['uuid'])
+    ctrl = ProjectsController.new
+    my_starred_project = ctrl.send :my_starred_projects, current_user
+    assert_equal(0, my_starred_project.andand.size)
+
+    # share it again
+    @controller = LinksController.new
+    post :create, {
+      link: {
+        link_class: 'permission',
+        name: 'can_read',
+        head_uuid: api_fixture('groups')['starred_and_shared_active_user_project']['uuid'],
+        tail_uuid: api_fixture('users')['project_viewer']['uuid'],
+      },
+      format: :json
+    }, session_for(:system_user)
+
+    # verify that the project is again included in starred projects
+    use_token :project_viewer
+    ctrl = ProjectsController.new
+    my_starred_project = ctrl.send :my_starred_projects, current_user
+    assert_equal(1, my_starred_project.andand.size)
+  end
 end
index 64a547108bc3e50157a0bebed23392728c83ee45..01e84b1c0219d19551122356006f7081b0d42629 100644 (file)
@@ -39,7 +39,10 @@ class ProjectsTest < ActionDispatch::IntegrationTest
   test 'Create a project and move it into a different project' do
     visit page_with_token 'active', '/projects'
     find("#projects-menu").click
-    find(".dropdown-menu a", text: "Home").click
+    within('.dropdown-menu') do
+      first('li', text: 'Home').click
+    end
+    wait_for_ajax
     find('.btn', text: "Add a subproject").click
 
     within('h2') do
@@ -51,7 +54,10 @@ class ProjectsTest < ActionDispatch::IntegrationTest
 
     visit '/projects'
     find("#projects-menu").click
-    find(".dropdown-menu a", text: "Home").click
+    within('.dropdown-menu') do
+      first('li', text: 'Home').click
+    end
+    wait_for_ajax
     find('.btn', text: "Add a subproject").click
     within('h2') do
       find('.fa-pencil').click
@@ -709,4 +715,26 @@ class ProjectsTest < ActionDispatch::IntegrationTest
      assert page.has_text?('Unrestricted public data'), 'No text - Unrestricted public data'
      assert page.has_text?('An anonymously accessible project'), 'No text - An anonymously accessible project'
   end
+
+  test "test star and unstar project" do
+    visit page_with_token 'active', "/projects/#{api_fixture('groups')['anonymously_accessible_project']['uuid']}"
+
+    # add to favorites
+    find('.fa-star-o').click
+    wait_for_ajax
+
+    find("#projects-menu").click
+    within('.dropdown-menu') do
+      assert_selector 'li', text: 'Unrestricted public data'
+    end
+
+  # remove from favorites
+    find('.fa-star').click
+    wait_for_ajax
+
+    find("#projects-menu").click
+    within('.dropdown-menu') do
+      assert_no_selector 'li', text: 'Unrestricted public data'
+    end
+  end
 end
diff --git a/backports/python-gflags/fpm-info.sh b/backports/python-gflags/fpm-info.sh
new file mode 100644 (file)
index 0000000..67a989e
--- /dev/null
@@ -0,0 +1 @@
+fpm_args+=(-v 2.0)
index a7d9398701b7bc4c405027c26f5133fe7b23d383..327bc5e50f5793c6cb8a81a2f73117fac424e3be 100644 (file)
@@ -11,3 +11,6 @@ esac
 
 # FIXME: Remove this line after #6885 is done.
 fpm_args+=(--iteration 2)
+
+# FIXME: Remove once support for llfuse 0.42+ is in place
+fpm_args+=(-v 0.41.1)
index fad9b060ee65ea574d6c8a3a1f04e528559a6277..bd99d3c71cafc76392be14ec0b0b38973a7c11fc 100644 (file)
@@ -63,7 +63,7 @@ def sub_basename(v):
 def sub_glob(v):
     l = glob.glob(v)
     if len(l) == 0:
-        raise SubstitutionError("$(glob {}) no match fonud".format(v))
+        raise SubstitutionError("$(glob {}) no match found".format(v))
     else:
         return l[0]
 
index a6c5ef981ce35fb5e52f8bab9c6628c803e45872..74793d4fce40ae84a820a72db74b69a6c92f3ec4 100755 (executable)
@@ -397,12 +397,19 @@ try:
     active = 1
     pids = set([s.pid for s in subprocesses])
     while len(pids) > 0:
-        (pid, status) = os.wait()
-        pids.discard(pid)
-        if not taskp.get("task.ignore_rcode"):
-            rcode[pid] = (status >> 8)
+        try:
+            (pid, status) = os.wait()
+        except OSError as e:
+            if e.errno == errno.EINTR:
+                pass
+            else:
+                raise
         else:
-            rcode[pid] = 0
+            pids.discard(pid)
+            if not taskp.get("task.ignore_rcode"):
+                rcode[pid] = (status >> 8)
+            else:
+                rcode[pid] = 0
 
     if sig.sig is not None:
         logger.critical("terminating on signal %s" % sig.sig)
index 2ca1ce494434687a876befc4d73c75fc847a202e..60092c1ee8e4a1a66c0ab09552936479078c5b2b 100644 (file)
@@ -1,3 +1,9 @@
 {% include 'notebox_begin' %}
-Arvados requires git version 1.7.10 or later. If you are using an earlier version of git, please update your git version.
+The Arvados API and Git servers require Git 1.7.10 or later.  You can get this version on CentOS 6 from RepoForge.  "Install the repository":http://repoforge.org/use/, then run:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install --enablerepo=rpmforge-extras git</span>
+</code></pre>
+</notextile>
+
 {% include 'notebox_end' %}
diff --git a/doc/_includes/_install_redhat_postgres_auth.liquid b/doc/_includes/_install_redhat_postgres_auth.liquid
new file mode 100644 (file)
index 0000000..35f8b79
--- /dev/null
@@ -0,0 +1,11 @@
+{% include 'notebox_begin' %}
+
+If you are installing on CentOS 6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' authentication only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo sed -ri -e 's/^(host +all +all +(127\.0\.0\.1\/32|::1\/128) +)ident$/\1md5/' {{pg_hba_path}}</span>
+~$ <span class="userinput">sudo service {{pg_service}} restart</span>
+</code></pre>
+</notextile>
+
+{% include 'notebox_end' %}
index e79cffb7f86dd9adf5a622b231ceda25e057f4e4..cd1aeaf989caa1b67b775d75b1985cc6d2c1fcdf 100644 (file)
@@ -36,7 +36,7 @@ Install prerequisites for CentOS 6:
 <pre><code><span class="userinput">sudo yum install \
     libyaml-devel glibc-headers autoconf gcc-c++ glibc-devel \
     patch readline-devel zlib-devel libffi-devel openssl-devel \
-    automake libtool bison sqlite-devel
+    automake libtool bison sqlite-devel tar
 </span></code></pre></notextile>
 
 Install prerequisites for Ubuntu 12.04 or 14.04:
diff --git a/doc/_includes/_install_runit.liquid b/doc/_includes/_install_runit.liquid
new file mode 100644 (file)
index 0000000..8a1581c
--- /dev/null
@@ -0,0 +1,13 @@
+On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install runit</span>
+</code></pre>
+</notextile>
index fae06decb8ae14403e3afe92aa7e64057e0c2744..91e2c69892820fd2d81d7789bb85c205db4c4cb5 100644 (file)
@@ -52,11 +52,9 @@ Enter password for new role: <span class="userinput">paste-password-you-generate
 Enter it again: <span class="userinput">paste-password-again</span>
 </code></pre></notextile>
 
-{% include 'notebox_begin' %}
-
-This user setup assumes that your PostgreSQL is configured to accept password authentication.  Red Hat systems use ident-based authentication by default.  You may need to either adapt the user creation, or reconfigure PostgreSQL (in @pg_hba.conf@) to accept password authentication.
-
-{% include 'notebox_end' %}
+{% assign pg_hba_path = "/opt/rh/postgresql92/root/var/lib/pgsql/data/pg_hba.conf" %}
+{% assign pg_service = "postgresql92-postgresql" %}
+{% include 'install_redhat_postgres_auth' %}
 
 Create the database:
 
@@ -65,26 +63,13 @@ Create the database:
 </code></pre>
 </notextile>
 
-h2. Set up configuration files
-
-The API server package uses configuration files that you write to @/etc/arvados/api@ and ensures they're consistently deployed.  Create this directory and copy the example configuration files to it:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados/api</span>
-~$ <span class="userinput">sudo chmod 700 /etc/arvados/api</span>
-~$ <span class="userinput">cd /var/www/arvados-api/current</span>
-/var/www/arvados-api/current$ <span class="userinput">sudo cp config/database.yml.example /etc/arvados/api/database.yml</span>
-/var/www/arvados-api/current$ <span class="userinput">sudo cp config/application.yml.example /etc/arvados/api/application.yml</span>
-</code></pre>
-</notextile>
-
 h2. Configure the database connection
 
 Edit @/etc/arvados/api/database.yml@ and replace the @xxxxxxxx@ database password placeholders with the PostgreSQL password you generated above.
 
 h2(#configure_application). Configure the API server
 
-Edit @/etc/arvados/api/application.yml@ to configure the settings described in the following sections.  The deployment script will consistently deploy this to the API server's configuration directory.  The API server reads both @application.yml@ and its own @config/application.default.yml@ file.  The settings in @application.yml@ take precedence over the defaults that are defined in @config/application.default.yml@.  The @config/application.yml.example@ file is not read by the API server and is provided as a starting template only.
+Edit @/etc/arvados/api/application.yml@ to configure the settings described in the following sections.  The API server reads both @application.yml@ and its own @config/application.default.yml@ file.  The settings in @application.yml@ take precedence over the defaults that are defined in @config/application.default.yml@.  The @config/application.yml.example@ file is not read by the API server and is provided as a starting template only.
 
 @config/application.default.yml@ documents additional configuration settings not listed here.  You can "view the current source version":https://dev.arvados.org/projects/arvados/repository/revisions/master/entry/services/api/config/application.default.yml for reference.
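The precedence rule described above (settings in /etc/arvados/api/application.yml override config/application.default.yml) is the usual layered-configuration pattern; a minimal sketch of the merge semantics, assuming nested YAML hashes (illustrative, not the API server's actual loader):

    require 'yaml'

    defaults  = YAML.load_file('config/application.default.yml')
    overrides = YAML.load_file('/etc/arvados/api/application.yml')

    # Recursively merge, letting override values win at every level.
    deep_merge = lambda do |base, over|
      base.merge(over) do |_key, b, o|
        b.is_a?(Hash) && o.is_a?(Hash) ? deep_merge.call(b, o) : o
      end
    end
    config = deep_merge.call(defaults, overrides)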
 
@@ -205,7 +190,9 @@ For best performance, we recommend you use Nginx as your Web server front-end, w
 <ol>
 <li><a href="https://www.phusionpassenger.com/library/walkthroughs/deploy/ruby/ownserver/nginx/oss/install_passenger_main.html">Install Nginx and Phusion Passenger</a>.</li>
 
-<li><p>Puma is already included with the API server's gems.  We recommend you run it as a service under <a href="http://smarden.org/runit/">runit</a> or a similar tool.  Here's a sample runit script for that:</p>
+<li><p>Install runit to supervise the Puma daemon.  {% include 'install_runit' %}<notextile></p></li>
+
+<li><p>Install the script below as the run script for the Puma service, modifying it as directed by the comments.</p>
 
 <pre><code>#!/bin/bash
 
index 146dbe170b917ff45849ebc018324949dc789cb2..5e373c38b855bb3b2f27410d94b0a6835da07c18 100644 (file)
@@ -89,14 +89,14 @@ git@gitserver:~$ <span class="userinput">rm .ssh/authorized_keys</span>
 
 h2. Install gitolite
 
-Check "https://github.com/sitaramc/gitolite/tags":https://github.com/sitaramc/gitolite/tags for the latest stable version. This guide was tested with @v3.6.3@. _Versions below 3.0 are missing some features needed by Arvados, and should not be used._
+Check "https://github.com/sitaramc/gitolite/tags":https://github.com/sitaramc/gitolite/tags for the latest stable version. This guide was tested with @v3.6.4@. _Versions below 3.0 are missing some features needed by Arvados, and should not be used._
 
 Download and install the version you selected.
 
 <notextile>
 <pre><code>git@gitserver:~$ <span class="userinput">echo 'PATH=$HOME/bin:$PATH' &gt;.profile</span>
 git@gitserver:~$ <span class="userinput">source .profile</span>
-git@gitserver:~$ <span class="userinput">git clone --branch <b>v3.6.3</b> git://github.com/sitaramc/gitolite</span>
+git@gitserver:~$ <span class="userinput">git clone --branch <b>v3.6.4</b> https://github.com/sitaramc/gitolite</span>
 ...
 Note: checking out '5d24ae666bfd2fa9093d67c840eb8d686992083f'.
 ...
@@ -255,19 +255,13 @@ fatal: No REQUEST_METHOD from server
 
 h3. Enable arvados-git-httpd
 
-On Debian-based systems, install runit:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems, "install runit from source":http://smarden.org/runit/install.html or use an alternative daemon supervisor.
+Install runit to supervise the arvados-git-httpd daemon.  {% include 'install_runit' %}
 
 Configure runit to run arvados-git-httpd, making sure to update the API host to match your site:
 
 <notextile>
-<pre><code>~$ <span class="userinput">cd /etc/sv</span>
+<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
+~$ <span class="userinput">cd /etc/sv</span>
 /etc/sv$ <span class="userinput">sudo mkdir arvados-git-httpd; cd arvados-git-httpd</span>
 /etc/sv/arvados-git-httpd$ <span class="userinput">sudo mkdir log</span>
 /etc/sv/arvados-git-httpd$ <span class="userinput">sudo sh -c 'cat &gt;log/run' &lt;&lt;'EOF'
@@ -285,6 +279,7 @@ export PATH="$PATH:/var/lib/arvados/git/bin"
 exec chpst -u git:git arvados-git-httpd -address=:9001 -git-command=/var/lib/arvados/git/gitolite/src/gitolite-shell -repo-root=<b>/var/lib/arvados/git</b>/repositories 2&gt;&1
 EOF</span>
 /etc/sv/arvados-git-httpd$ <span class="userinput">sudo chmod +x run log/run</span>
+/etc/sv/arvados-git-httpd$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
 </code></pre>
 </notextile>
 
index 7d13f773a628c34db47fe5d9ba7f8b54e47eabfd..9a64ac76d79532d643a895a7df5d2f9971dfd2fc 100644 (file)
@@ -54,7 +54,7 @@ Install SLURM following "the same process you used to install the Crunch dispatc
 
 h2. Copy configuration files from the dispatcher (API server)
 
-The @/etc/slurm-llnl/slurm.conf@ and @/etc/munge/munge.key@ files need to be identicaly across the dispatcher and all compute nodes. Copy the files you created in the "Install the Crunch dispatcher":install-crunch-dispatch.html step to this compute node.
+The @slurm.conf@ and @/etc/munge/munge.key@ files need to be identical across the dispatcher and all compute nodes. Copy the files you created in the "Install the Crunch dispatcher":install-crunch-dispatch.html step to this compute node.
 
 h2. Configure FUSE
 
@@ -79,22 +79,16 @@ h2. Configure the Docker cleaner
 The arvados-docker-cleaner program removes least recently used docker images as needed to keep disk usage below a configured limit.
 
 {% include 'notebox_begin' %}
-This also removes all containers as soon as they exit, as if they were run with `docker run --rm`. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with `--remove-stopped-containers never`.
+This also removes all containers as soon as they exit, as if they were run with @docker run --rm@. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with @--remove-stopped-containers never@.
 {% include 'notebox_end' %}
 
-On Debian-based systems, install runit:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install runit</span>
-</code></pre>
-</notextile>
-
-On Red Hat-based systems, "install runit from source":http://smarden.org/runit/install.html or use an alternative daemon supervisor.
+Install runit to supervise the Docker cleaner daemon.  {% include 'install_runit' %}
 
 Configure runit to run the image cleaner using a suitable quota for your compute nodes and workload:
 
 <notextile>
-<pre><code>~$ <span class="userinput">cd /etc/sv</span>
+<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/sv</span>
+~$ <span class="userinput">cd /etc/sv</span>
 /etc/sv$ <span class="userinput">sudo mkdir arvados-docker-cleaner; cd arvados-docker-cleaner</span>
 /etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo mkdir log log/main</span>
 /etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo sh -c 'cat &gt;log/run' &lt;&lt;'EOF'
@@ -106,6 +100,7 @@ EOF</span>
 exec python3 -m arvados_docker.cleaner --quota <b>50G</b>
 EOF</span>
 /etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo chmod +x run log/run</span>
+/etc/sv/arvados-docker-cleaner$ <span class="userinput">sudo ln -s "$(pwd)" /etc/service/</span>
 </code></pre>
 </notextile>
 
@@ -152,8 +147,7 @@ if ! test -f /root/node.json ; then
 import arvados, json, socket
 fqdn = socket.getfqdn()
 hostname, _, domain = fqdn.partition('.')
-ip_address = socket.gethostbyname(fqdn)
-node = arvados.api('v1').nodes().create(body={'hostname': hostname, 'domain': domain, 'ip_address': ip_address}).execute()
+node = arvados.api('v1').nodes().create(body={'hostname': hostname, 'domain': domain}).execute()
 with open('/root/node.json', 'w') as node_file:
     json.dump(node, node_file, indent=2)
 EOF
@@ -183,4 +177,3 @@ And remove your token from the environment:
 </code>
 </pre>
 </notextile>
-
index 907f0fdf92b246bc7fdc691938f725376dd9db56..0e5be9411a28f435a738e694dd6c398b91ae2b1e 100644 (file)
@@ -58,9 +58,14 @@ On Debian-based systems:
 </code></pre>
 </notextile>
 
-On Red Hat-based systems, "install SLURM and munge from source following their installation guide":https://computing.llnl.gov/linux/slurm/quickstart_admin.html.
+On Red Hat-based systems:
 
-Now we need to give SLURM a configuration file in @/etc/slurm-llnl/slurm.conf@. Here's an example:
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install slurm munge slurm-munge</span>
+</code></pre>
+</notextile>
+
+Now we need to give SLURM a configuration file.  On Debian-based systems, this is installed at @/etc/slurm-llnl/slurm.conf@.  On Red Hat-based systems, this is installed at @/etc/slurm/slurm.conf@.  Here's an example @slurm.conf@:
 
 <notextile>
 <pre>
@@ -90,9 +95,8 @@ Waittime=0
 # SCHEDULING
 SchedulerType=sched/backfill
 SchedulerPort=7321
-SelectType=select/cons_res
-SelectTypeParameters=CR_CPU_Memory
-FastSchedule=1
+SelectType=select/linear
+FastSchedule=0
 #
 # LOGGING
 SlurmctldDebug=3
@@ -125,8 +129,8 @@ Whenever you change this file, you will need to update the copy _on every comput
 Each hostname in @slurm.conf@ must also resolve correctly on all SLURM worker nodes as well as the controller itself. Furthermore, the hostnames used in the configuration file must match the hostnames reported by @hostname@ or @hostname -s@ on the nodes themselves. This applies to the ControlMachine as well as the worker nodes.
 
 For example:
-* In @/etc/slurm-llnl/slurm.conf@ on control and worker nodes: @ControlMachine=uuid_prefix.your.domain@
-* In @/etc/slurm-llnl/slurm.conf@ on control and worker nodes: @NodeName=compute[0-255]@
+* In @slurm.conf@ on control and worker nodes: @ControlMachine=uuid_prefix.your.domain@
+* In @slurm.conf@ on control and worker nodes: @NodeName=compute[0-255]@
 * In @/etc/resolv.conf@ on control and worker nodes: @search uuid_prefix.your.domain@
 * On the control node: @hostname@ reports @uuid_prefix.your.domain@
 * On worker node 123: @hostname@ reports @compute123.uuid_prefix.your.domain@
@@ -163,7 +167,9 @@ To dispatch Arvados jobs:
 * @crunch-job@ needs the installation path of the Perl SDK in its @PERLLIB@.
 * @crunch-job@ needs the @ARVADOS_API_HOST@ (and, if necessary, @ARVADOS_API_HOST_INSECURE@) environment variable set.
 
-We recommend you run @crunch-dispatch.rb@ under "runit":http://smarden.org/runit/ or a similar supervisor.  Here's an example runit service script:
+Install runit to monitor the Crunch dispatch daemon.  {% include 'install_runit' %}
+
+Install the script below as the run script for the Crunch dispatch service, modifying it as directed by the comments.
 
 <notextile>
 <pre><code>#!/bin/sh
index c7a7b2007657d82b120bd191a5904bdd703d3936..16d23e6df56d2fb58cf38974a1e31dbda82d8915 100644 (file)
@@ -55,7 +55,9 @@ Usage of keep-web:
 {% assign railsout = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" %}
 If you intend to use Keep-web to serve public data to anonymous clients, configure it with an anonymous token. You can use the same one you used when you set up your Keepproxy server, or use the following command on the <strong>API server</strong> to create another. {% include 'install_rails_command' %}
 
-We recommend running Keep-web under "runit":https://packages.debian.org/search?keywords=runit or a similar supervisor. The basic command to start Keep-web is:
+Install runit to supervise the Keep-web daemon.  {% include 'install_runit' %}
+
+The basic command to start Keep-web in the service run script is:
 
 <notextile>
 <pre><code>export ARVADOS_API_HOST=<span class="userinput">uuid_prefix</span>.your.domain
index 14e5ed5741067e0550a450b9d3bf9f24cb264a83..a6bb5d4bd9aeb3a6d2b276d20e1fc1e0505bf2c9 100644 (file)
@@ -57,7 +57,9 @@ The Keepproxy server needs a token to talk to the API server.  On the <strong>AP
 
 h3. Set up the Keepproxy service
 
-We recommend you run Keepproxy under "runit":http://smarden.org/runit/ or a similar supervisor.  Make sure the launcher sets the envirnoment variables @ARVADOS_API_TOKEN@ (with the token you just generated), @ARVADOS_API_HOST@, and, if needed, @ARVADOS_API_HOST_INSECURE@.  The core keepproxy command to run is:
+Install runit to supervise the keepproxy daemon.  {% include 'install_runit' %}
+
+The run script for the keepproxy service should set the environment variables @ARVADOS_API_TOKEN@ (with the token you just generated), @ARVADOS_API_HOST@, and, if needed, @ARVADOS_API_HOST_INSECURE@.  The core keepproxy command to run is:
 
 <notextile>
 <pre><code>ARVADOS_API_TOKEN=<span class="userinput">{{railsout}}</span> ARVADOS_API_HOST=<span class="userinput">uuid_prefix.your.domain</span> exec keepproxy
@@ -77,7 +79,7 @@ upstream keepproxy {
 
 server {
   listen                <span class="userinput">[your public IP address]</span>:443 ssl;
-  server_name           keep.<span class="userinput">uuid_prefix</span>.your.domain
+  server_name           keep.<span class="userinput">uuid_prefix</span>.your.domain;
 
   proxy_connect_timeout 90s;
   proxy_read_timeout    300s;
index 2e7382bf1ebc642c34619a66ad6472fda18cc1fe..13dfaf6725d40e089759ec62058fcd63ff83c33b 100644 (file)
@@ -94,7 +94,9 @@ Equivalently:
 
 h3. Run keepstore as a supervised service
 
-We recommend running Keepstore under "runit":http://smarden.org/runit/ or something similar, using a run script like the following:
+Install runit to supervise the keepstore daemon.  {% include 'install_runit' %}
+
+Install this script as the run script for the keepstore service, modifying it as directed below.
 
 <notextile>
 <pre><code>#!/bin/sh
index ec4c9d46cc8ee7c98a097baa3ddad49f55bd760c..dd5995ffdde442c85f665cb5feae14fd9b0fe879 100644 (file)
@@ -12,7 +12,21 @@ Please follow the "API token guide":../user/reference/api-tokens.html to get API
 
 h2. Install the Ruby SDK and utilities
 
-If you're using RVM:
+First, install the curl development libraries necessary to build the Arvados Ruby SDK.  On Debian-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo apt-get install libcurl4-openssl-dev</span>
+</code></pre>
+</notextile>
+
+On Red Hat-based systems:
+
+<notextile>
+<pre><code>~$ <span class="userinput">sudo yum install libcurl-devel</span>
+</code></pre>
+</notextile>
+
+Next, install the arvados-cli Ruby gem.  If you're using RVM:
 
 <notextile>
 <pre><code>~$ <span class="userinput">sudo /usr/local/rvm/bin/rvm-exec default gem install arvados-cli</span>
index a3064038e85134f77697c8fb1c0cd43a7f370f64..aaa6211b461ca3296a6cab157c9ee49bf4488c65 100644 (file)
@@ -94,18 +94,9 @@ On a Red Hat-based system, we also need to initialize the database system:
 </code></pre>
 </notextile>
 
-{% include 'notebox_begin' %}
-
-If you are installing on CentOS6, you will need to modify PostgreSQL's configuration to allow password authentication for local users. The default configuration allows 'ident' only. The following commands will make the configuration change, and restart PostgreSQL for it to take effect.
-<br/>
-<notextile>
-<pre><code>~$ <span class="userinput">sudo sed -i -e "s/127.0.0.1\/32          ident/127.0.0.1\/32          md5/" /var/lib/pgsql/data/pg_hba.conf</span>
-~$ <span class="userinput">sudo sed -i -e "s/::1\/128               ident/::1\/128               md5/" /var/lib/pgsql/data/pg_hba.conf</span>
-~$ <span class="userinput">sudo service postgresql restart</span>
-</code></pre>
-</notextile>
-{% include 'notebox_end' %}
-
+{% assign pg_service = "postgresql" %}
+{% assign pg_hba_path = "/var/lib/pgsql/data/pg_hba.conf" %}
+{% include 'install_redhat_postgres_auth' %}
 
 Next, generate a new database password. Nobody ever needs to memorize it or type it, so make a strong one:
 
index 1fd525d404c348fda648d57898c8c4877fd0bbb1..5a60ca5484e49fe502617accf847988d9d815d81 100644 (file)
@@ -32,20 +32,9 @@ On a Red Hat-based system, install the following packages:
 </code></pre>
 </notextile>
 
-h2. Set up configuration files
-
-The Workbench server package uses configuration files that you write to @/etc/arvados/workbench@ and ensures they're consistently deployed.  Create this directory and copy the example configuration files to it:
-
-<notextile>
-<pre><code>~$ <span class="userinput">sudo mkdir -p /etc/arvados/workbench</span>
-~$ <span class="userinput">sudo chmod 700 /etc/arvados/workbench</span>
-~$ <span class="userinput">sudo cp /var/www/arvados-workbench/current/config/application.yml.example /etc/arvados/workbench/application.yml</span>
-</code></pre>
-</notextile>
-
 h2(#configure). Configure Workbench
 
-Edit @/etc/arvados/workbench/application.yml@ following the instructions below.  The deployment script will consistently deploy this to Workbench's configuration directory.  Workbench reads both @application.yml@ and its own @config/application.defaults.yml@ file.  Values in @application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@.  The @config/application.yml.example@ file is not read by Workbench and is provided for installation convenience only.
+Edit @/etc/arvados/workbench/application.yml@ following the instructions below.  Workbench reads both @application.yml@ and its own @config/application.defaults.yml@ file.  Values in @application.yml@ take precedence over the defaults that are defined in @config/application.defaults.yml@.  The @config/application.yml.example@ file is not read by Workbench and is provided for installation convenience only.
 
 Consult @config/application.default.yml@ for a full list of configuration options.  Always put your local configuration in @/etc/arvados/workbench/application.yml@&mdash;never edit @config/application.default.yml@.
 
@@ -98,7 +87,7 @@ For best performance, we recommend you use Nginx as your Web server front-end, w
<li>If you're deploying on an older Red Hat-based distribution and installed Python 2.7 from Software Collections, configure Nginx to use it:
 
 <pre><code>~$ <span class="userinput">sudo usermod --shell /bin/bash nginx</span>
-~$ <span class="userinput">sudo -u nginx sh -c 'echo "[[ -z \$PS1 && -e /opt/rh/python27/enable ]] && source /opt/rh/python27/enable" >>~/.bash_profile'</span>
+~$ <span class="userinput">sudo -u nginx sh -c 'echo "[[ -z \$PS1 ]] && source scl_source enable python27" >>~/.bash_profile'</span>
 </code></pre>
 
 </li>
index c8a1de9c653837ac4d7a059a5dce97483ccc6602..ca9db1dacdb162d73ee6eb53bb79d5ec2d550014 100755 (executable)
@@ -183,11 +183,12 @@ if (($Job || $local_job)->{docker_image_locator}) {
   $cmd = [$docker_bin, 'ps', '-q'];
 }
 Log(undef, "Sanity check is `@$cmd`");
-srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
-     $cmd,
-     {fork => 1});
-if ($? != 0) {
-  Log(undef, "Sanity check failed: ".exit_status_s($?));
+my ($exited, $stdout, $stderr) = srun_sync(
+  ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+  $cmd,
+  {label => "sanity check"});
+if ($exited != 0) {
+  Log(undef, "Sanity check failed: ".exit_status_s($exited));
   exit EX_TEMPFAIL;
 }
 Log(undef, "Sanity check OK");
@@ -386,28 +387,17 @@ my $nodelist = join(",", @node);
 my $git_tar_count = 0;
 
 if (!defined $no_clear_tmp) {
-  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
-  Log (undef, "Clean work dirs");
-
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    # Find FUSE mounts under $CRUNCH_TMP and unmount them.
-    # Then clean up work directories.
-    # TODO: When #5036 is done and widely deployed, we can limit mount's
-    # -t option to simply fuse.keep.
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
-  }
-  if ($?) {
-    Log(undef, "Clean work dirs: exit ".exit_status_s($?));
+  # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
+  # up work directories crunch_tmp/work, crunch_tmp/opt,
+  # crunch_tmp/src*.
+  #
+  # TODO: When #5036 is done and widely deployed, we can limit mount's
+  # -t option to simply fuse.keep.
+  my ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+    ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+    {label => "clean work dirs"});
+  if ($exited != 0) {
     exit(EX_RETRY_UNLOCKED);
   }
 }
@@ -415,41 +405,35 @@ if (!defined $no_clear_tmp) {
 # If this job requires a Docker image, install that.
 my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
 if ($docker_locator = $Job->{docker_image_locator}) {
+  Log (undef, "Install docker image $docker_locator");
   ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
   if (!$docker_hash)
   {
     croak("No Docker image hash found from locator $docker_locator");
   }
+  Log (undef, "docker image hash is $docker_hash");
   $docker_stream =~ s/^\.//;
   my $docker_install_script = qq{
 if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
 fi
 };
-  my $docker_pid = fork();
-  if ($docker_pid == 0)
-  {
-    srun (["srun", "--nodelist=" . join(',', @node)],
-          ["/bin/sh", "-ec", $docker_install_script]);
-    exit ($?);
-  }
-  while (1)
-  {
-    last if $docker_pid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($docker_pid);
-    select (undef, undef, undef, 0.1);
-  }
-  if ($? != 0)
+
+  my ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodelist=" . join(',', @node)],
+    ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
+    {label => "load docker image"});
+  if ($exited != 0)
   {
-    croak("Installing Docker image from $docker_locator exited "
-          .exit_status_s($?));
+    exit(EX_RETRY_UNLOCKED);
   }
 
   # Determine whether this version of Docker supports memory+swap limits.
-  srun(["srun", "--nodelist=" . $node[0]],
-       ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
-      {fork => 1});
-  $docker_limitmem = ($? == 0);
+  ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodelist=" . $node[0]],
+    [$docker_bin, 'run', '--help'],
+    {label => "check --memory-swap feature"});
+  $docker_limitmem = ($stdout =~ /--memory-swap/);
 
   # Find a non-root Docker user to use.
   # Tries the default user for the container, then 'crunch', then 'nobody',
@@ -459,20 +443,22 @@ fi
   # Docker containers.
   my @tryusers = ("", "crunch", "nobody");
   foreach my $try_user (@tryusers) {
+    my $label;
     my $try_user_arg;
     if ($try_user eq "") {
-      Log(undef, "Checking if container default user is not UID 0");
+      $label = "check whether default user is UID 0";
       $try_user_arg = "";
     } else {
-      Log(undef, "Checking if user '$try_user' is not UID 0");
+      $label = "check whether user '$try_user' is UID 0";
       $try_user_arg = "--user=$try_user";
     }
-    srun(["srun", "--nodelist=" . $node[0]],
-         ["/bin/sh", "-ec",
-          "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
-          " test \$a -ne 0"],
-         {fork => 1});
-    if ($? == 0) {
+    my ($exited, $stdout, $stderr) = srun_sync(
+      ["srun", "--nodelist=" . $node[0]],
+      ["/bin/sh", "-ec",
+       "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
+      {label => $label});
+    chomp($stdout);
+    if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
       $dockeruserarg = $try_user_arg;
       if ($try_user eq "") {
         Log(undef, "Container will run with default user");
@@ -662,11 +648,9 @@ if (!defined $git_archive) {
   }
 }
 else {
-  my $install_exited;
+  my $exited;
   my $install_script_tries_left = 3;
   for (my $attempts = 0; $attempts < 3; $attempts++) {
-    Log(undef, "Run install script on all workers");
-
     my @srunargs = ("srun",
                     "--nodelist=$nodelist",
                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
@@ -674,59 +658,21 @@ else {
                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
 
     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
-    my ($install_stderr_r, $install_stderr_w);
-    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
-    set_nonblocking($install_stderr_r);
-    my $installpid = fork();
-    if ($installpid == 0)
-    {
-      close($install_stderr_r);
-      fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
-      open(STDOUT, ">&", $install_stderr_w);
-      open(STDERR, ">&", $install_stderr_w);
-      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
-      exit (1);
-    }
-    close($install_stderr_w);
-    # Tell freeze_if_want_freeze how to kill the child, otherwise the
-    # "waitpid(installpid)" loop won't get interrupted by a freeze:
-    $proc{$installpid} = {};
-    my $stderr_buf = '';
-    # Track whether anything appears on stderr other than slurm errors
-    # ("srun: ...") and the "starting: ..." message printed by the
-    # srun subroutine itself:
+    my ($stdout, $stderr);
+    ($exited, $stdout, $stderr) = srun_sync(
+      \@srunargs, \@execargs,
+      {label => "run install script on all workers"},
+      $build_script . $git_archive);
+
     my $stderr_anything_from_script = 0;
-    my $match_our_own_errors = '^(srun: error: |starting: \[)';
-    while ($installpid != waitpid(-1, WNOHANG)) {
-      freeze_if_want_freeze ($installpid);
-      # Wait up to 0.1 seconds for something to appear on stderr, then
-      # do a non-blocking read.
-      my $bits = fhbits($install_stderr_r);
-      select ($bits, undef, $bits, 0.1);
-      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
-      {
-        while ($stderr_buf =~ /^(.*?)\n/) {
-          my $line = $1;
-          substr $stderr_buf, 0, 1+length($line), "";
-          Log(undef, "stderr $line");
-          if ($line !~ /$match_our_own_errors/) {
-            $stderr_anything_from_script = 1;
-          }
-        }
-      }
-    }
-    delete $proc{$installpid};
-    $install_exited = $?;
-    close($install_stderr_r);
-    if (length($stderr_buf) > 0) {
-      if ($stderr_buf !~ /$match_our_own_errors/) {
+    for my $line (split(/\n/, $stderr)) {
+      if ($line !~ /^(srun: error: |starting: \[)/) {
         $stderr_anything_from_script = 1;
       }
-      Log(undef, "stderr $stderr_buf")
     }
 
-    Log (undef, "Install script exited ".exit_status_s($install_exited));
-    last if $install_exited == 0 || $main::please_freeze;
+    last if $exited == 0 || $main::please_freeze;
+
     # If the install script fails but doesn't print an error message,
     # the next thing anyone is likely to do is just run it again in
     # case it was a transient problem like "slurm communication fails
@@ -742,7 +688,7 @@ else {
     unlink($tar_filename);
   }
 
-  if ($install_exited != 0) {
+  if ($exited != 0) {
     croak("Giving up");
   }
 }
@@ -1012,11 +958,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     next;
   }
   shift @freeslot;
-  $proc{$childpid} = { jobstep => $id,
-                      time => time,
-                      slot => $childslot,
-                      jobstepname => "$job_id.$id.$childpid",
-                    };
+  $proc{$childpid} = {
+    jobstepidx => $id,
+    time => time,
+    slot => $childslot,
+    jobstepname => "$job_id.$id.$childpid",
+  };
   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
   $slot[$childslot]->{pid} = $childpid;
 
@@ -1063,6 +1010,9 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       update_progress_stats();
     }
+    if (!$gotsome) {
+      select (undef, undef, undef, 0.1);
+    }
     $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
                                         $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
@@ -1182,36 +1132,36 @@ sub update_progress_stats
 sub reapchildren
 {
   my $children_reaped = 0;
-
-  while((my $pid = waitpid (-1, WNOHANG)) > 0)
+  while ((my $pid = waitpid (-1, WNOHANG)) > 0)
   {
     my $childstatus = $?;
+
     my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
                     . "."
                     . $slot[$proc{$pid}->{slot}]->{cpu});
-    my $jobstepid = $proc{$pid}->{jobstep};
+    my $jobstepidx = $proc{$pid}->{jobstepidx};
 
     if (!WIFEXITED($childstatus))
     {
       # child did not exit (may be temporarily stopped)
-      Log ($jobstepid, "child $pid did not actually exit in reapchildren, ignoring for now.");
+      Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
       next;
     }
 
     $children_reaped++;
     my $elapsed = time - $proc{$pid}->{time};
-    my $Jobstep = $jobstep[$jobstepid];
+    my $Jobstep = $jobstep[$jobstepidx];
 
     my $exitvalue = $childstatus >> 8;
     my $exitinfo = "exit ".exit_status_s($childstatus);
     $Jobstep->{'arvados_task'}->reload;
     my $task_success = $Jobstep->{'arvados_task'}->{success};
 
-    Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
+    Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
 
     if (!defined $task_success) {
       # task did not indicate one way or the other --> fail
-      Log($jobstepid, sprintf(
+      Log($jobstepidx, sprintf(
             "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
             exit_status_s($childstatus)));
       $Jobstep->{'arvados_task'}->{success} = 0;
@@ -1235,24 +1185,24 @@ sub reapchildren
         # node is already suspected faulty and srun exited quickly
         if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
             $elapsed < 5) {
-          Log ($jobstepid, "blaming failure on suspect node " .
+          Log ($jobstepidx, "blaming failure on suspect node " .
                $slot[$proc{$pid}->{slot}]->{node}->{name});
           $temporary_fail ||= 1;
         }
         ban_node_by_slot($proc{$pid}->{slot});
       }
 
-      Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
-                               ++$Jobstep->{'failures'},
-                               $temporary_fail ? 'temporary' : 'permanent',
-                               $elapsed));
+      Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
+                                ++$Jobstep->{'failures'},
+                                $temporary_fail ? 'temporary' : 'permanent',
+                                $elapsed));
 
       if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
         # Give up on this task, and the whole job
         $main::success = 0;
       }
       # Put this task back on the todo queue
-      push @jobstep_todo, $jobstepid;
+      push @jobstep_todo, $jobstepidx;
       $Job->{'tasks_summary'}->{'failed'}++;
     }
     else
@@ -1261,20 +1211,20 @@ sub reapchildren
       $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
       $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
       $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
-      push @jobstep_done, $jobstepid;
-      Log ($jobstepid, "success in $elapsed seconds");
+      push @jobstep_done, $jobstepidx;
+      Log ($jobstepidx, "success in $elapsed seconds");
     }
     $Jobstep->{exitcode} = $childstatus;
     $Jobstep->{finishtime} = time;
     $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
     $Jobstep->{'arvados_task'}->save;
-    process_stderr ($jobstepid, $task_success);
-    Log ($jobstepid, sprintf("task output (%d bytes): %s",
-                             length($Jobstep->{'arvados_task'}->{output}),
-                             $Jobstep->{'arvados_task'}->{output}));
+    process_stderr_final ($jobstepidx);
+    Log ($jobstepidx, sprintf("task output (%d bytes): %s",
+                              length($Jobstep->{'arvados_task'}->{output}),
+                              $Jobstep->{'arvados_task'}->{output}));
 
-    close $reader{$jobstepid};
-    delete $reader{$jobstepid};
+    close $reader{$jobstepidx};
+    delete $reader{$jobstepidx};
     delete $slot[$proc{$pid}->{slot}]->{pid};
     push @freeslot, $proc{$pid}->{slot};
     delete $proc{$pid};
@@ -1313,7 +1263,10 @@ sub reapchildren
 sub check_refresh_wanted
 {
   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
-  if (@stat && $stat[9] > $latest_refresh) {
+  if (@stat &&
+      $stat[9] > $latest_refresh &&
+      # ...and we have actually locked the job record...
+      $job_id eq $Job->{'uuid'}) {
     $latest_refresh = scalar time;
     my $Job2 = api_call("jobs/get", uuid => $jobspec);
     for my $attr ('cancelled_at',
@@ -1351,7 +1304,7 @@ sub check_squeue
   # squeue check interval (15s) this should make the squeue check an
   # infrequent event.
   my $silent_procs = 0;
-  for my $js (map {$jobstep[$_->{jobstep}]} values %proc)
+  for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
   {
     if (!exists($js->{stderr_at}))
     {
@@ -1367,7 +1320,7 @@ sub check_squeue
   # use killem() on procs whose killtime is reached
   while (my ($pid, $procinfo) = each %proc)
   {
-    my $js = $jobstep[$procinfo->{jobstep}];
+    my $js = $jobstep[$procinfo->{jobstepidx}];
     if (exists $procinfo->{killtime}
         && $procinfo->{killtime} <= time
         && $js->{stderr_at} < $last_squeue_check)
@@ -1376,7 +1329,7 @@ sub check_squeue
       if ($js->{stderr_at}) {
         $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
       }
-      Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+      Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
       killem ($pid);
     }
   }
@@ -1413,7 +1366,6 @@ sub check_squeue
   # Check for child procs >60s old and not mentioned by squeue.
   while (my ($pid, $procinfo) = each %proc)
   {
-    my $js = $jobstep[$procinfo->{jobstep}];
     if ($procinfo->{time} < time - 60
         && $procinfo->{jobstepname}
         && !exists $ok{$procinfo->{jobstepname}}
@@ -1427,7 +1379,7 @@ sub check_squeue
       # error/delay has caused the task to die without notifying srun,
       # and we'll kill srun ourselves.
       $procinfo->{killtime} = time + 30;
-      Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+      Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
     }
   }
 }
@@ -1448,11 +1400,16 @@ sub readfrompipes
   my $gotsome = 0;
   my %fd_job;
   my $sel = IO::Select->new();
-  foreach my $job (keys %reader)
+  foreach my $jobstepidx (keys %reader)
   {
-    my $fd = $reader{$job};
+    my $fd = $reader{$jobstepidx};
     $sel->add($fd);
-    $fd_job{$fd} = $job;
+    $fd_job{$fd} = $jobstepidx;
+
+    if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
+      $sel->add($stdout_fd);
+      $fd_job{$stdout_fd} = $jobstepidx;
+    }
   }
   # select on all reader fds with 0.1s timeout
   my @ready_fds = $sel->can_read(0.1);
@@ -1461,66 +1418,80 @@ sub readfrompipes
     my $buf;
     if (0 < sysread ($fd, $buf, 65536))
     {
+      $gotsome = 1;
       print STDERR $buf if $ENV{CRUNCH_DEBUG};
-      my $job = $fd_job{$fd};
-      $jobstep[$job]->{stderr_at} = time;
-      $jobstep[$job]->{stderr} .= $buf;
+
+      my $jobstepidx = $fd_job{$fd};
+      if ($jobstep[$jobstepidx]->{stdout_r} == $fd) {
+        $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+        next;
+      }
+
+      $jobstep[$jobstepidx]->{stderr_at} = time;
+      $jobstep[$jobstepidx]->{stderr} .= $buf;
 
       # Consume everything up to the last \n
-      preprocess_stderr ($job);
+      preprocess_stderr ($jobstepidx);
 
-      if (length ($jobstep[$job]->{stderr}) > 16384)
+      if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
       {
         # If we get a lot of stderr without a newline, chop off the
         # front to avoid letting our buffer grow indefinitely.
-        substr ($jobstep[$job]->{stderr},
-                0, length($jobstep[$job]->{stderr}) - 8192) = "";
+        substr ($jobstep[$jobstepidx]->{stderr},
+                0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
       }
-      $gotsome = 1;
     }
   }
   return $gotsome;
 }
 
 
+# Consume all full lines of stderr for a jobstep. Everything after the
+# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
+# returning.
 sub preprocess_stderr
 {
-  my $job = shift;
+  my $jobstepidx = shift;
 
-  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+  while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
     my $line = $1;
-    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
-    Log ($job, "stderr $line");
+    substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
+    Log ($jobstepidx, "stderr $line");
     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /srun: error: Node failure on/) {
-      my $job_slot_index = $jobstep[$job]->{slotindex};
+    elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
+      # Skip the following tempfail checks if this srun proc isn't
+      # attached to a particular worker slot.
+    }
+    elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
+      my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
       $slot[$job_slot_index]->{node}->{fail_count}++;
-      $jobstep[$job]->{tempfail} = 1;
+      $jobstep[$jobstepidx]->{tempfail} = 1;
       ban_node_by_slot($job_slot_index);
     }
     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
-      $jobstep[$job]->{tempfail} = 1;
-      ban_node_by_slot($jobstep[$job]->{slotindex});
+      $jobstep[$jobstepidx]->{tempfail} = 1;
+      ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
     }
     elsif ($line =~ /arvados\.errors\.Keep/) {
-      $jobstep[$job]->{tempfail} = 1;
+      $jobstep[$jobstepidx]->{tempfail} = 1;
     }
   }
 }
 
 
-sub process_stderr
+sub process_stderr_final
 {
-  my $job = shift;
-  my $task_success = shift;
-  preprocess_stderr ($job);
+  my $jobstepidx = shift;
+  preprocess_stderr ($jobstepidx);
 
   map {
-    Log ($job, "stderr $_");
-  } split ("\n", $jobstep[$job]->{stderr});
+    Log ($jobstepidx, "stderr $_");
+  } split ("\n", $jobstep[$jobstepidx]->{stderr});
+  $jobstep[$jobstepidx]->{stderr} = '';
 }
 
 sub fetch_block
@@ -1658,7 +1629,7 @@ sub killem
     }
     if (!exists $proc{$_}->{"sent_$sig"})
     {
-      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+      Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
       kill $sig, $_;
       select (undef, undef, undef, 0.1);
       if ($sig == 2)
@@ -1782,16 +1753,21 @@ sub log_writer_is_active() {
   return $log_pipe_pid;
 }
 
-sub Log                                # ($jobstep_id, $logmessage)
+sub Log                                # ($jobstepidx, $logmessage)
 {
-  if ($_[1] =~ /\n/) {
+  my ($jobstepidx, $logmessage) = @_;
+  if ($logmessage =~ /\n/) {
     for my $line (split (/\n/, $_[1])) {
-      Log ($_[0], $line);
+      Log ($jobstepidx, $line);
     }
     return;
   }
   my $fh = select STDERR; $|=1; select $fh;
-  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+  my $task_qseq = '';
+  if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
+    $task_qseq = $jobstepidx;
+  }
+  my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
   $message .= "\n";
   my $datetime;
@@ -1915,6 +1891,83 @@ sub freezeunquote
 }
 
 
+sub srun_sync
+{
+  my $srunargs = shift;
+  my $execargs = shift;
+  my $opts = shift || {};
+  my $stdin = shift;
+
+  my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
+  Log (undef, "$label: start");
+
+  my ($stderr_r, $stderr_w);
+  pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
+
+  my ($stdout_r, $stdout_w);
+  pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
+
+  my $srunpid = fork();
+  if ($srunpid == 0)
+  {
+    close($stderr_r);
+    close($stdout_r);
+    fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+    fcntl($stdout_w, F_SETFL, 0) or croak($!);
+    open(STDERR, ">&", $stderr_w);
+    open(STDOUT, ">&", $stdout_w);
+    srun ($srunargs, $execargs, $opts, $stdin);
+    exit (1);
+  }
+  close($stderr_w);
+  close($stdout_w);
+
+  set_nonblocking($stderr_r);
+  set_nonblocking($stdout_r);
+
+  # Add entries to @jobstep and %proc so check_squeue() and
+  # freeze_if_want_freeze() can treat it like a job task process.
+  push @jobstep, {
+    stderr => '',
+    stderr_at => 0,
+    stderr_captured => '',
+    stdout_r => $stdout_r,
+    stdout_captured => '',
+  };
+  my $jobstepidx = $#jobstep;
+  $proc{$srunpid} = {
+    jobstepidx => $jobstepidx,
+  };
+  $reader{$jobstepidx} = $stderr_r;
+
+  while ($srunpid != waitpid ($srunpid, WNOHANG)) {
+    my $busy = readfrompipes();
+    if (!$busy || ($latest_refresh + 2 < scalar time)) {
+      check_refresh_wanted();
+      check_squeue();
+    }
+    if (!$busy) {
+      select(undef, undef, undef, 0.1);
+    }
+    killem(keys %proc) if $main::please_freeze;
+  }
+  my $exited = $?;
+
+  1 while readfrompipes();
+  process_stderr_final ($jobstepidx);
+
+  Log (undef, "$label: exit ".exit_status_s($exited));
+
+  close($stdout_r);
+  close($stderr_r);
+  delete $proc{$srunpid};
+  delete $reader{$jobstepidx};
+
+  my $j = pop @jobstep;
+  return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+}
+
+
 sub srun
 {
   my $srunargs = shift;
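
The new srun_sync helper above replaces every hand-rolled fork/waitpid/srun sequence in this file with one code path that babysits the child (readfrompipes, check_squeue, freeze handling) and returns the exit status plus captured stdout/stderr. A minimal sketch of a call site, with placeholder node count, command, and label:

    my ($exited, $stdout, $stderr) = srun_sync(
      ["srun", "--nodes=1"],
      ["/bin/sh", "-ec", "echo hello from a worker"],
      {label => "example step"});
    if ($exited != 0) {
      Log(undef, "example step failed: ".exit_status_s($exited));
    }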
index 961ac28a9f97888b8230e6bb27e9dabef31f58b0..52a7353f67a9e5f0040b31f083a5f719164a063c 100755 (executable)
@@ -1,3 +1,3 @@
 #!/bin/sh
 echo >&2 Failing mount stub was called
-exit 1
+exit 44
index 22d756a8c81f8ae64d8602925ab25fda9bbe156b..0fbff2e6de670da112461da20107fa5eeaae1e9b 100644 (file)
@@ -91,7 +91,7 @@ class TestCrunchJob < Minitest::Test
       tryjobrecord j, binstubs: ['clean_fail']
     end
     assert_match /Failing mount stub was called/, err
-    assert_match /Clean work dirs: exit 1\n$/, err
+    assert_match /clean work dirs: exit 44\n$/, err
     assert_equal SPECIAL_EXIT[:EX_RETRY_UNLOCKED], $?.exitstatus
   end
 
index 4198c34482ccd0f6fa54daa9e5a9d1d143db2ee1..8370e3d5e75a42e68fd73ee770c281b0388dd198 100644 (file)
@@ -8,6 +8,7 @@ import arvados.commands.run
 import cwltool.draft2tool
 import cwltool.workflow
 import cwltool.main
+from cwltool.process import shortname
 import threading
 import cwltool.docker
 import fnmatch
@@ -37,6 +38,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
         args = [image_name]
         if image_tag:
             args.append(image_tag)
+        logger.info("Uploading Docker image %s", ":".join(args))
         arvados.commands.keepdocker.main(args)
 
     return dockerRequirement["dockerImageId"]
@@ -144,11 +146,17 @@ class ArvadosJob(object):
                 "script_version": "master",
                 "script_parameters": {"tasks": [script_parameters]},
                 "runtime_constraints": runtime_constraints
-            }, find_or_create=kwargs.get("enable_reuse", True)).execute()
+            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
 
             self.arvrunner.jobs[response["uuid"]] = self
 
-            logger.info("Job %s is %s", response["uuid"], response["state"])
+            self.arvrunner.pipeline["components"][self.name] = {"job": response}
+            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+                                                                                     body={
+                                                                                         "components": self.arvrunner.pipeline["components"]
+                                                                                     }).execute(num_retries=self.arvrunner.num_retries)
+
+            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
 
             if response["state"] in ("Complete", "Failed", "Cancelled"):
                 self.done(response)
@@ -156,8 +164,19 @@ class ArvadosJob(object):
             logger.error("Got error %s" % str(e))
             self.output_callback({}, "permanentFail")
 
+    def update_pipeline_component(self, record):
+        self.arvrunner.pipeline["components"][self.name] = {"job": record}
+        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
+                                                                                 body={
+                                                                                    "components": self.arvrunner.pipeline["components"]
+                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
 
     def done(self, record):
+        try:
+            self.update_pipeline_component(record)
+        except:
+            pass
+
         try:
             if record["state"] == "Complete":
                 processStatus = "success"
@@ -166,7 +185,8 @@ class ArvadosJob(object):
 
             try:
                 outputs = {}
-                outputs = self.collect_outputs("keep:" + record["output"])
+                if record["output"]:
+                    outputs = self.collect_outputs("keep:" + record["output"])
             except Exception as e:
                 logger.exception("Got exception while collecting job outputs:")
                 processStatus = "permanentFail"
@@ -188,7 +208,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                 self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
             if src not in self._pathmap:
                 ab = cwltool.pathmapper.abspath(src, basedir)
-                st = arvados.commands.run.statfile("", ab)
+                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
                 if kwargs.get("conformance_test"):
                     self._pathmap[src] = (src, ab)
                 elif isinstance(st, arvados.commands.run.UploadFile):
@@ -231,6 +251,7 @@ class ArvCwlRunner(object):
         self.cond = threading.Condition(self.lock)
         self.final_output = None
         self.uploaded = {}
+        self.num_retries = 4
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
@@ -241,22 +262,33 @@ class ArvCwlRunner(object):
     def output_callback(self, out, processStatus):
         if processStatus == "success":
             logger.info("Overall job status is %s", processStatus)
+            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
+
         else:
             logger.warn("Overall job status is %s", processStatus)
+            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
         self.final_output = out
 
+
     def on_message(self, event):
         if "object_uuid" in event:
                 if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                     if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
-                        logger.info("Job %s is Running", event["object_uuid"])
+                        uuid = event["object_uuid"]
                         with self.lock:
-                            self.jobs[event["object_uuid"]].running = True
+                            j = self.jobs[uuid]
+                            logger.info("Job %s (%s) is Running", j.name, uuid)
+                            j.running = True
+                            j.update_pipeline_component(event["properties"]["new_attributes"])
                     elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                        logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
+                        uuid = event["object_uuid"]
                         try:
                             self.cond.acquire()
-                            self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
+                            j = self.jobs[uuid]
+                            logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                            j.done(event["properties"]["new_attributes"])
                             self.cond.notify()
                         finally:
                             self.cond.release()
@@ -270,6 +302,10 @@ class ArvCwlRunner(object):
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
+        self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
+                                                                   "components": {},
+                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
index bcf6b963830aca8570545045ab112ee79aa8216d..65ae16b5158aebe388afc7f42e2e247f4e13733f 100644 (file)
@@ -30,8 +30,8 @@ setup(name='arvados-cwl-runner',
           'bin/arvados-cwl-runner'
       ],
       install_requires=[
-          'cwltool>=1.0.20151026181844',
-          'arvados-python-client>=0.1.20151023214338'
+          'cwltool>=1.0.20160129152024',
+          'arvados-python-client>=0.1.20160122132348'
       ],
       zip_safe=True,
       cmdclass={'egg_info': tagger},
diff --git a/sdk/perl/.gitignore b/sdk/perl/.gitignore
new file mode 100644 (file)
index 0000000..7c32f55
--- /dev/null
@@ -0,0 +1 @@
+install
index 5c8bced513c160dd64e2cdbf3f4433d72ce89fe6..ef39be81a4650cda86e20c6d13a7d23848398ecb 100644 (file)
@@ -81,7 +81,7 @@ def determine_project(root, current_user):
 # ArvFile() (file already exists in a collection), UploadFile() (file needs to
 # be uploaded to a collection), or simply returns prefix+fn (which yields the
 # original parameter string).
-def statfile(prefix, fn):
+def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
     absfn = os.path.abspath(fn)
     if os.path.exists(absfn):
         st = os.stat(absfn)
@@ -89,7 +89,7 @@ def statfile(prefix, fn):
             sp = os.path.split(absfn)
             (pdh, branch) = is_in_collection(sp[0], sp[1])
             if pdh:
-                return ArvFile(prefix, "$(file %s/%s)" % (pdh, branch))
+                return ArvFile(prefix, fnPattern % (pdh, branch))
             else:
                 # trim leading '/' for path prefix test later
                 return UploadFile(prefix, absfn[1:])
@@ -97,7 +97,7 @@ def statfile(prefix, fn):
             sp = os.path.split(absfn)
             (pdh, branch) = is_in_collection(sp[0], sp[1])
             if pdh:
-                return ArvFile(prefix, "$(dir %s/%s/)" % (pdh, branch))
+                return ArvFile(prefix, dirPattern % (pdh, branch))
 
     return prefix+fn
 
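The new fnPattern/dirPattern arguments let callers choose how collection hits are rendered; the CWL runner above passes a $(task.keep) pattern instead of the default $(file ...) form. A short sketch (the local path is a placeholder):

    import arvados.commands.run as run

    # Render Keep-backed files as $(task.keep) paths, as arvados_cwl does.
    st = run.statfile("", "/tmp/data/input.txt",
                      fnPattern="$(task.keep)/%s/%s")
    # st is an ArvFile if the file is already in a collection, an
    # UploadFile if it still needs uploading, or the original string.
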
index 3132da3a0a2e271350c1a5418efa577703555231..df824a331ea41a2fd702587be9c5d2828884ffb5 100644 (file)
@@ -13,6 +13,7 @@ from ws4py.client.threadedclient import WebSocketClient
 
 _logger = logging.getLogger('arvados.events')
 
+
 class EventClient(WebSocketClient):
     def __init__(self, url, filters, on_event, last_log_id):
         ssl_options = {'ca_certs': arvados.util.ca_certs_path()}
@@ -29,23 +30,33 @@ class EventClient(WebSocketClient):
         self.filters = filters
         self.on_event = on_event
         self.last_log_id = last_log_id
-        self._closed_lock = threading.RLock()
-        self._closed = False
+        self._closing_lock = threading.RLock()
+        self._closing = False
+        self._closed = threading.Event()
 
     def opened(self):
         self.subscribe(self.filters, self.last_log_id)
 
+    def closed(self, code, reason=None):
+        self._closed.set()
+
     def received_message(self, m):
-        with self._closed_lock:
-            if not self._closed:
+        with self._closing_lock:
+            if not self._closing:
                 self.on_event(json.loads(str(m)))
 
-    def close(self, code=1000, reason=''):
-        """Close event client and wait for it to finish."""
+    def close(self, code=1000, reason='', timeout=0):
+        """Close event client and optionally wait for it to finish.
+
+        :timeout: is the number of seconds to wait for ws4py to
+        indicate that the connection has closed.
+        """
         super(EventClient, self).close(code, reason)
-        with self._closed_lock:
+        with self._closing_lock:
             # make sure we don't process any more messages.
-            self._closed = True
+            self._closing = True
+        # wait for ws4py to tell us the connection is closed.
+        self._closed.wait(timeout=timeout)
 
     def subscribe(self, filters, last_log_id=None):
         m = {"method": "subscribe", "filters": filters}
@@ -56,6 +67,7 @@ class EventClient(WebSocketClient):
     def unsubscribe(self, filters):
         self.send(json.dumps({"method": "unsubscribe", "filters": filters}))
 
+
 class PollClient(threading.Thread):
     def __init__(self, api, filters, on_event, poll_time, last_log_id):
         super(PollClient, self).__init__()
@@ -67,8 +79,9 @@ class PollClient(threading.Thread):
         self.on_event = on_event
         self.poll_time = poll_time
         self.daemon = True
-        self.stop = threading.Event()
         self.last_log_id = last_log_id
+        self._closing = threading.Event()
+        self._closing_lock = threading.RLock()
 
     def run(self):
         self.id = 0
@@ -83,7 +96,7 @@ class PollClient(threading.Thread):
 
         self.on_event({'status': 200})
 
-        while not self.stop.isSet():
+        while not self._closing.is_set():
             max_id = self.id
             moreitems = False
             for f in self.filters:
@@ -91,24 +104,38 @@ class PollClient(threading.Thread):
                 for i in items["items"]:
                     if i['id'] > max_id:
                         max_id = i['id']
-                    self.on_event(i)
+                    with self._closing_lock:
+                        if self._closing.is_set():
+                            return
+                        self.on_event(i)
                 if items["items_available"] > len(items["items"]):
                     moreitems = True
             self.id = max_id
             if not moreitems:
-                self.stop.wait(self.poll_time)
+                self._closing.wait(self.poll_time)
 
     def run_forever(self):
         # Have to poll here, otherwise KeyboardInterrupt will never get processed.
-        while not self.stop.is_set():
-            self.stop.wait(1)
+        while not self._closing.is_set():
+            self._closing.wait(1)
+
+    def close(self, code=None, reason=None, timeout=0):
+        """Close poll client and optionally wait for it to finish.
 
-    def close(self):
-        """Close poll client and wait for it to finish."""
+        If an :on_event: handler is running in a different thread,
+        first wait (indefinitely) for it to return.
 
-        self.stop.set()
+        After closing, wait up to :timeout: seconds for the thread to
+        finish the poll request in progress (if any).
+
+        :code: and :reason: are ignored. They are present for
+        interface compatibility with EventClient.
+        """
+
+        with self._closing_lock:
+            self._closing.set()
         try:
-            self.join()
+            self.join(timeout=timeout)
         except RuntimeError:
             # "join() raises a RuntimeError if an attempt is made to join the
             # current thread as that would cause a deadlock. It is also an
@@ -163,7 +190,10 @@ def subscribe(api, filters, on_event, poll_fallback=15, last_log_id=None):
         return _subscribe_websocket(api, filters, on_event, last_log_id)
 
     try:
-        return _subscribe_websocket(api, filters, on_event, last_log_id)
+        if not config.flag_is_true('ARVADOS_DISABLE_WEBSOCKETS'):
+            return _subscribe_websocket(api, filters, on_event, last_log_id)
+        else:
+            _logger.info("Using polling because ARVADOS_DISABLE_WEBSOCKETS is true")
     except Exception as e:
         _logger.warn("Falling back to polling after websocket error: %s" % e)
     p = PollClient(api, filters, on_event, poll_fallback, last_log_id)
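
With these changes, callers can bound how long shutdown waits instead of blocking indefinitely. A usage sketch (the filter and handler are illustrative):

    import arvados
    import arvados.events

    def on_event(ev):
        print(ev)

    api = arvados.api('v1')
    client = arvados.events.subscribe(
        api, [['object_uuid', 'is_a', 'arvados#job']], on_event)
    # ... later: stop delivering events, then wait up to 5 seconds for
    # the websocket connection (or polling thread) to finish.
    client.close(timeout=5)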
index 98b7361e92e94a5da39fdfe446f92546020e0ab1..c547d87ab5c6def0d880ea5a685374f796ac02c5 100644 (file)
@@ -39,11 +39,12 @@ setup(name='arvados-python-client',
           ('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
       ],
       install_requires=[
+          'google-api-python-client==1.4.2',
+          'oauth2client==1.5.1',
           'ciso8601',
-          'google-api-python-client',
           'httplib2',
           'pycurl >=7.19.5.1, <7.21.5',
-          'python-gflags',
+          'python-gflags<3.0',
           'ws4py'
       ],
       test_suite='tests',
index 4ad5e10faa46b96222d4291596d2f47f686bf18c..29eb939002fa9dab98cb8feff4987d46151d0bc8 100644 (file)
@@ -28,3 +28,6 @@
 # Dev/test SSL certificates
 /self-signed.key
 /self-signed.pem
+
+# Generated git-commit.version file
+/git-commit.version
index 3b4330935c568a035178b594857f1e4c8916b37c..48998aad36d10f7630f24e574a4d278968dc7032 100644 (file)
@@ -74,7 +74,7 @@ gem 'faye-websocket'
 gem 'themes_for_rails'
 
 gem 'arvados', '>= 0.1.20150615153458'
-gem 'arvados-cli', '>= 0.1.20151023185755'
+gem 'arvados-cli', '>= 0.1.20151207150126'
 
 # pg_power lets us use partial indexes in schema.rb in Rails 3
 gem 'pg_power'
index b505b194b2ac35b220c970c9f9e6c45ebeedd9f4..ac6be5a522303fd7418ba69974165e3b2821e17f 100644 (file)
@@ -41,7 +41,7 @@ GEM
       google-api-client (~> 0.6.3, >= 0.6.3)
       json (~> 1.7, >= 1.7.7)
       jwt (>= 0.1.5, < 1.0.0)
-    arvados-cli (0.1.20151023190001)
+    arvados-cli (0.1.20151207150126)
       activesupport (~> 3.2, >= 3.2.13)
       andand (~> 1.3, >= 1.3.3)
       arvados (~> 0.1, >= 0.1.20150128223554)
@@ -228,7 +228,7 @@ DEPENDENCIES
   acts_as_api
   andand
   arvados (>= 0.1.20150615153458)
-  arvados-cli (>= 0.1.20151023185755)
+  arvados-cli (>= 0.1.20151207150126)
   coffee-rails (~> 3.2.0)
   database_cleaner
   factory_girl_rails
index f365a7fee8996e7c9ba51cdd11bc4d525c3e9193..2eb79c090dcd69a4e6e4d4157b0ea0f0d2de5afc 100644 (file)
@@ -56,7 +56,7 @@ class Arvados::V1::ApiClientAuthorizationsController < ApplicationController
         ((attr == 'scopes') and (operator == '=')) ? operand : nil
       })
       @filters.select! { |attr, operator, operand|
-        (attr == 'uuid') and (operator == '=')
+        ((attr == 'uuid') and (operator == '=')) || ((attr == 'api_token') and (operator == '='))
       }
     end
     if @where
@@ -74,14 +74,23 @@ class Arvados::V1::ApiClientAuthorizationsController < ApplicationController
   end
 
   def find_object_by_uuid
-    # Again, to make things easier for the client and our own routing,
-    # here we look for the api_token key in a "uuid" (POST) or "id"
-    # (GET) parameter.
-    @object = model_class.where('api_token=?', params[:uuid] || params[:id]).first
+    @object = model_class.where(uuid: (params[:uuid] || params[:id])).first
   end
 
   def current_api_client_is_trusted
     unless Thread.current[:api_client].andand.is_trusted
+      if params["action"] == "show"
+        if @object and @object['api_token'] == current_api_client_authorization.andand.api_token
+          return true
+        end
+      elsif params["action"] == "index" and @objects.andand.size == 1
+        filters = @filters.map{|f|f.first}.uniq
+        if ['uuid'] == filters
+          return true if @objects.first['api_token'] == current_api_client_authorization.andand.api_token
+        elsif ['api_token'] == filters
+          return true if @objects.first[:user_id] == current_user.id
+        end
+      end
       send_error('Forbidden: this API client cannot manipulate other clients\' access tokens.',
                  status: 403)
     end
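
The relaxed check above means an untrusted client can now fetch its own token record, by uuid or by api_token, while any other record still gets a 403. A sketch (token_uuid is a placeholder for the caller's own token uuid):

    # Look up the current token's own record via a uuid filter.
    items = api.api_client_authorizations().list(
        filters=[['uuid', '=', token_uuid]]).execute()['items']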
index 4ab5695a2f1c607f22bec9bd1948d575c73fd29e..5e2404e62c4db63360ddf91f5a6f1c801c763ce3 100644 (file)
@@ -23,7 +23,7 @@ class Arvados::V1::NodesController < ApplicationController
         return render_not_found
       end
       ping_data = {
-        ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
+        ip: params[:local_ipv4] || request.remote_ip,
         ec2_instance_id: params[:instance_id]
       }
       [:ping_secret, :total_cpu_cores, :total_ram_mb, :total_scratch_mb]
index b9442d64e78bf888741e9e47d532f53d73059e22..c587e5830af41549c5bd637c7ffa9472bbf51017 100644 (file)
@@ -1,4 +1,5 @@
 class ApiClientAuthorization < ArvadosModel
+  include HasUuid
   include KindAndEtag
   include CommonApiTemplate
 
@@ -36,17 +37,6 @@ class ApiClientAuthorization < ArvadosModel
     self.user_id_changed?
   end
 
-  def uuid
-    self.api_token
-  end
-  def uuid=(x) end
-  def uuid_was
-    self.api_token_was
-  end
-  def uuid_changed?
-    self.api_token_changed?
-  end
-
   def modified_by_client_uuid
     nil
   end
index 35dd1a94c9d983b343fc6394370f03ca795ca896..6cd40a44585c6805278dd9d421c8495d5d66c1c7 100644 (file)
@@ -115,6 +115,10 @@ class ArvadosModel < ActiveRecord::Base
     ["#{table_name}.modified_at desc", "#{table_name}.uuid"]
   end
 
+  def self.unique_columns
+    ["id", "uuid"]
+  end
+
   # If current user can manage the object, return an array of uuids of
   # users and groups that have permission to write the object. The
   # first two elements are always [self.owner_uuid, current user's
diff --git a/services/api/db/migrate/20160208210629_add_uuid_to_api_client_authorization.rb b/services/api/db/migrate/20160208210629_add_uuid_to_api_client_authorization.rb
new file mode 100644 (file)
index 0000000..69da34c
--- /dev/null
@@ -0,0 +1,28 @@
+require 'has_uuid'
+
+class AddUuidToApiClientAuthorization < ActiveRecord::Migration
+  extend HasUuid::ClassMethods
+
+  def up
+    add_column :api_client_authorizations, :uuid, :string
+    add_index :api_client_authorizations, :uuid, :unique => true
+
+    prefix = Server::Application.config.uuid_prefix + '-' +
+             Digest::MD5.hexdigest('ApiClientAuthorization'.to_s).to_i(16).to_s(36)[-5..-1] + '-'
+
+    update_sql <<-EOS
+update api_client_authorizations set uuid = (select concat('#{prefix}',
+array_to_string(ARRAY (SELECT substring(api_token FROM (ceil(random()*36))::int FOR 1) FROM generate_series(1, 15)), '')
+));
+EOS
+
+    change_column_null :api_client_authorizations, :uuid, false
+  end
+
+  def down
+    if column_exists?(:api_client_authorizations, :uuid)
+      remove_index :api_client_authorizations, :uuid
+      remove_column :api_client_authorizations, :uuid
+    end
+  end
+end
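
The backfill above builds each uuid from the cluster prefix, the md5-derived infix for ApiClientAuthorization ("gj3su"), and fifteen characters sampled from the row's api_token, so existing tokens end up with uuids like zzzzz-gj3su-0123456789abcde, matching the fixture uuids added below.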
diff --git a/services/api/db/migrate/20160209155729_add_uuid_to_api_token_search_index.rb b/services/api/db/migrate/20160209155729_add_uuid_to_api_token_search_index.rb
new file mode 100644 (file)
index 0000000..1bbc16a
--- /dev/null
@@ -0,0 +1,21 @@
+class AddUuidToApiTokenSearchIndex < ActiveRecord::Migration
+  def up
+    begin
+      remove_index :api_client_authorizations, :name => 'api_client_authorizations_search_index'
+    rescue
+    end
+    add_index :api_client_authorizations,
+              ["api_token", "created_by_ip_address", "last_used_by_ip_address", "default_owner_uuid", "uuid"],
+              name: "api_client_authorizations_search_index"
+  end
+
+  def down
+    begin
+      remove_index :api_client_authorizations, :name => 'api_client_authorizations_search_index'
+    rescue
+    end
+    add_index :api_client_authorizations,
+              ["api_token", "created_by_ip_address", "last_used_by_ip_address", "default_owner_uuid"],
+              name: "api_client_authorizations_search_index"
+  end
+end
index 0492c87e1a859df21063834741b7a0ff9c7701a5..e482e6e607b4141bbbc00f9b70352852be62ac90 100644 (file)
@@ -46,7 +46,8 @@ CREATE TABLE api_client_authorizations (
     default_owner_uuid character varying(255),
     scopes text DEFAULT '---
 - all
-'::text NOT NULL
+'::text NOT NULL,
+    uuid character varying(255) NOT NULL
 );
 
 
@@ -1414,7 +1415,7 @@ ALTER TABLE ONLY virtual_machines
 -- Name: api_client_authorizations_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX api_client_authorizations_search_index ON api_client_authorizations USING btree (api_token, created_by_ip_address, last_used_by_ip_address, default_owner_uuid);
+CREATE INDEX api_client_authorizations_search_index ON api_client_authorizations USING btree (api_token, created_by_ip_address, last_used_by_ip_address, default_owner_uuid, uuid);
 
 
 --
@@ -1529,6 +1530,13 @@ CREATE INDEX index_api_client_authorizations_on_expires_at ON api_client_authori
 CREATE INDEX index_api_client_authorizations_on_user_id ON api_client_authorizations USING btree (user_id);
 
 
+--
+-- Name: index_api_client_authorizations_on_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace: 
+--
+
+CREATE UNIQUE INDEX index_api_client_authorizations_on_uuid ON api_client_authorizations USING btree (uuid);
+
+
 --
 -- Name: index_api_clients_on_created_at; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
@@ -2568,4 +2576,8 @@ INSERT INTO schema_migrations (version) VALUES ('20151202151426');
 
 INSERT INTO schema_migrations (version) VALUES ('20151215134304');
 
-INSERT INTO schema_migrations (version) VALUES ('20151229214707');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20151229214707');
+
+INSERT INTO schema_migrations (version) VALUES ('20160208210629');
+
+INSERT INTO schema_migrations (version) VALUES ('20160209155729');
\ No newline at end of file
index aa3d819b56d53bb1e50ff9dfa13bb6bb9bff2fb3..38a9cde32765d7fa24b7d03bd29d579a377ded93 100644 (file)
@@ -5,6 +5,6 @@ case "$TARGET" in
         fpm_depends+=(libcurl-devel postgresql-devel)
         ;;
     debian* | ubuntu*)
-        fpm_depends+=(libcurl4-openssl-dev libpq-dev)
+        fpm_depends+=(libcurl-ssl-dev libpq-dev)
         ;;
 esac
index 05f85c7bb67f8c7863ed46fd793ad6a96fa2cf77..b59279e554643c5015fb371ec3774e34bc28f99f 100644 (file)
@@ -637,9 +637,8 @@ class CrunchDispatch
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
 
-    if exit_status == EXIT_RETRY_UNLOCKED
-      # The job failed because all of the nodes allocated to it
-      # failed.  Only this crunch-dispatch process can retry the job:
+    if exit_status == EXIT_RETRY_UNLOCKED or (exit_tempfail and @job_retry_counts.include? jobrecord.uuid)
+      # Only this crunch-dispatch process can retry the job:
       # it's already locked, and there's no way to put it back in the
       # Queued state.  Put it in our internal todo list unless the job
       # has failed this way excessively.
index 718aaeaf690be5bc5e61592849ad4009e3b69f52..d7b9bb7513899d477906738d09a5a23bc8e6095f 100644 (file)
@@ -111,7 +111,31 @@ module LoadParam
     # (e.g., [] or ['owner_uuid desc']), fall back on the default
     # orders to ensure repeating the same request (possibly with
     # different limit/offset) will return records in the same order.
-    @orders += model_class.default_orders
+    #
+    # Clean up the resulting list of orders such that no column
+    # uselessly appears twice (Postgres might not optimize this out
+    # for us) and no columns uselessly appear after a unique column
+    # (Postgres does not optimize this out for us; as of 9.2, "order
+    # by id, modified_at desc, uuid" is slow but "order by id" is
+    # fast).
+    orders_given_and_default = @orders + model_class.default_orders
+    order_cols_used = {}
+    @orders = []
+    orders_given_and_default.each do |order|
+      otablecol = order.split(' ')[0]
+
+      next if order_cols_used[otablecol]
+      order_cols_used[otablecol] = true
+
+      @orders << order
+
+      otable, ocol = otablecol.split('.')
+      if otable == table_name and model_class.unique_columns.include?(ocol)
+        # we already have a full ordering; subsequent entries would be
+        # superfluous
+        break
+      end
+    end
 
     case params[:select]
     when Array
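
Worked example of the trimming above, assuming table_name is "collections" and its unique columns include "uuid":

    # ["collections.modified_at desc",  # kept
    #  "collections.modified_at asc",   # dropped: column already used
    #  "collections.uuid",              # kept: unique column, stop here
    #  "collections.name"]              # dropped: ordering already total
    # => ["collections.modified_at desc", "collections.uuid"]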
index 7169ebdc8a3699b84df58d6f090c32f289546582..f99a9fb941f1b26f44d2d4b4035a28afd84fbc08 100644 (file)
@@ -1,24 +1,28 @@
 # Read about fixtures at http://api.rubyonrails.org/classes/ActiveRecord/Fixtures.html
 
 system_user:
+  uuid: zzzzz-gj3su-017z32aux8dg2s1
   api_client: untrusted
   user: system_user
   api_token: systemusertesttoken1234567890aoeuidhtnsqjkxbmwvzpy
   expires_at: 2038-01-01 00:00:00
 
 admin:
+  uuid: zzzzz-gj3su-027z32aux8dg2s1
   api_client: untrusted
   user: admin
   api_token: 4axaw8zxe0qm22wa6urpp5nskcne8z88cvbupv653y1njyi05h
   expires_at: 2038-01-01 00:00:00
 
 admin_trustedclient:
+  uuid: zzzzz-gj3su-037z32aux8dg2s1
   api_client: trusted_workbench
   user: admin
   api_token: 1a9ffdcga2o7cw8q12dndskomgs1ygli3ns9k2o9hgzgmktc78
   expires_at: 2038-01-01 00:00:00
 
 data_manager:
+  uuid: zzzzz-gj3su-047z32aux8dg2s1
   api_client: untrusted
   user: system_user
   api_token: 320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1
@@ -31,30 +35,35 @@ data_manager:
     - POST /arvados/v1/logs
 
 miniadmin:
+  uuid: zzzzz-gj3su-057z32aux8dg2s1
   api_client: untrusted
   user: miniadmin
   api_token: 2zb2y9pw3e70270te7oe3ewaantea3adyxjascvkz0zob7q7xb
   expires_at: 2038-01-01 00:00:00
 
 rominiadmin:
+  uuid: zzzzz-gj3su-067z32aux8dg2s1
   api_client: untrusted
   user: rominiadmin
   api_token: 5tsb2pc3zlatn1ortl98s2tqsehpby88wmmnzmpsjmzwa6payh
   expires_at: 2038-01-01 00:00:00
 
 active:
+  uuid: zzzzz-gj3su-077z32aux8dg2s1
   api_client: untrusted
   user: active
   api_token: 3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi
   expires_at: 2038-01-01 00:00:00
 
 active_trustedclient:
+  uuid: zzzzz-gj3su-087z32aux8dg2s1
   api_client: trusted_workbench
   user: active
   api_token: 27bnddk6x2nmq00a1e3gq43n9tsl5v87a3faqar2ijj8tud5en
   expires_at: 2038-01-01 00:00:00
 
 active_noscope:
+  uuid: zzzzz-gj3su-097z32aux8dg2s1
   api_client: untrusted
   user: active
   api_token: activenoscopeabcdefghijklmnopqrstuvwxyz12345678901
@@ -62,24 +71,28 @@ active_noscope:
   scopes: []
 
 project_viewer:
+  uuid: zzzzz-gj3su-107z32aux8dg2s1
   api_client: untrusted
   user: project_viewer
   api_token: projectviewertoken1234567890abcdefghijklmnopqrstuv
   expires_at: 2038-01-01 00:00:00
 
 project_viewer_trustedclient:
+  uuid: zzzzz-gj3su-117z32aux8dg2s1
   api_client: trusted_workbench
   user: project_viewer
   api_token: projectviewertrustedtoken1234567890abcdefghijklmno
   expires_at: 2038-01-01 00:00:00
 
 subproject_admin:
+  uuid: zzzzz-gj3su-127z32aux8dg2s1
   api_client: untrusted
   user: subproject_admin
   api_token: subprojectadmintoken1234567890abcdefghijklmnopqrst
   expires_at: 2038-01-01 00:00:00
 
 admin_vm:
+  uuid: zzzzz-gj3su-137z32aux8dg2s1
   api_client: untrusted
   user: admin
   api_token: adminvirtualmachineabcdefghijklmnopqrstuvwxyz12345
@@ -88,6 +101,7 @@ admin_vm:
   scopes: ["GET /arvados/v1/virtual_machines/zzzzz-2x53u-382brsig8rp3064/logins"]
 
 admin_noscope:
+  uuid: zzzzz-gj3su-147z32aux8dg2s1
   api_client: untrusted
   user: admin
   api_token: adminnoscopeabcdefghijklmnopqrstuvwxyz123456789012
@@ -95,6 +109,7 @@ admin_noscope:
   scopes: []
 
 active_all_collections:
+  uuid: zzzzz-gj3su-157z32aux8dg2s1
   api_client: untrusted
   user: active
   api_token: activecollectionsabcdefghijklmnopqrstuvwxyz1234567
@@ -102,6 +117,7 @@ active_all_collections:
   scopes: ["GET /arvados/v1/collections/", "GET /arvados/v1/keep_services/accessible"]
 
 active_userlist:
+  uuid: zzzzz-gj3su-167z32aux8dg2s1
   api_client: untrusted
   user: active
   api_token: activeuserlistabcdefghijklmnopqrstuvwxyz1234568900
@@ -109,6 +125,7 @@ active_userlist:
   scopes: ["GET /arvados/v1/users"]
 
 active_specimens:
+  uuid: zzzzz-gj3su-177z32aux8dg2s1
   api_client: untrusted
   user: active
   api_token: activespecimensabcdefghijklmnopqrstuvwxyz123456890
@@ -116,6 +133,7 @@ active_specimens:
   scopes: ["GET /arvados/v1/specimens/"]
 
 active_apitokens:
+  uuid: zzzzz-gj3su-187z32aux8dg2s1
   api_client: trusted_workbench
   user: active
   api_token: activeapitokensabcdefghijklmnopqrstuvwxyz123456789
@@ -124,6 +142,7 @@ active_apitokens:
            "POST /arvados/v1/api_client_authorizations"]
 
 active_readonly:
+  uuid: zzzzz-gj3su-197z32aux8dg2s1
   api_client: untrusted
   user: active
   api_token: activereadonlyabcdefghijklmnopqrstuvwxyz1234568790
@@ -131,12 +150,14 @@ active_readonly:
   scopes: ["GET /"]
 
 spectator:
+  uuid: zzzzz-gj3su-207z32aux8dg2s1
   api_client: untrusted
   user: spectator
   api_token: zw2f4gwx8hw8cjre7yp6v1zylhrhn3m5gvjq73rtpwhmknrybu
   expires_at: 2038-01-01 00:00:00
 
 spectator_specimens:
+  uuid: zzzzz-gj3su-217z32aux8dg2s1
   api_client: untrusted
   user: spectator
   api_token: spectatorspecimensabcdefghijklmnopqrstuvwxyz123245
@@ -145,42 +166,49 @@ spectator_specimens:
            "POST /arvados/v1/specimens"]
 
 inactive:
+  uuid: zzzzz-gj3su-227z32aux8dg2s1
   api_client: untrusted
   user: inactive
   api_token: 5s29oj2hzmcmpq80hx9cta0rl5wuf3xfd6r7disusaptz7h9m0
   expires_at: 2038-01-01 00:00:00
 
 inactive_uninvited:
+  uuid: zzzzz-gj3su-237z32aux8dg2s1
   api_client: untrusted
   user: inactive_uninvited
   api_token: 62mhllc0otp78v08e3rpa3nsmf8q8ogk47f7u5z4erp5gpj9al
   expires_at: 2038-01-01 00:00:00
 
 inactive_but_signed_user_agreement:
+  uuid: zzzzz-gj3su-247z32aux8dg2s1
   api_client: untrusted
   user: inactive_but_signed_user_agreement
   api_token: 64k3bzw37iwpdlexczj02rw3m333rrb8ydvn2qq99ohv68so5k
   expires_at: 2038-01-01 00:00:00
 
 expired:
+  uuid: zzzzz-gj3su-257z32aux8dg2s1
   api_client: untrusted
   user: active
   api_token: 2ym314ysp27sk7h943q6vtc378srb06se3pq6ghurylyf3pdmx
   expires_at: 1970-01-01 00:00:00
 
 expired_trustedclient:
+  uuid: zzzzz-gj3su-267z32aux8dg2s1
   api_client: trusted_workbench
   user: active
   api_token: 5hpni7izokzcatku2896xxwqdbt5ptomn04r6auc7fohnli82v
   expires_at: 1970-01-01 00:00:00
 
 valid_token_deleted_user:
+  uuid: zzzzz-gj3su-277z32aux8dg2s1
   api_client: trusted_workbench
   user_id: 1234567
   api_token: tewfa58099sndckyqhlgd37za6e47o6h03r9l1vpll23hudm8b
   expires_at: 2038-01-01 00:00:00
 
 anonymous:
+  uuid: zzzzz-gj3su-287z32aux8dg2s1
   api_client: untrusted
   user: anonymous
   api_token: 4kg6k6lzmp9kj4cpkcoxie964cmvjahbt4fod9zru44k4jqdmi
@@ -188,48 +216,56 @@ anonymous:
   scopes: ["GET /"]
 
 job_reader:
+  uuid: zzzzz-gj3su-297z32aux8dg2s1
   api_client: untrusted
   user: job_reader
   api_token: e99512cdc0f3415c2428b9758f33bdfb07bc3561b00e86e7e6
   expires_at: 2038-01-01 00:00:00
 
 active_no_prefs:
+  uuid: zzzzz-gj3su-307z32aux8dg2s1
   api_client: untrusted
   user: active_no_prefs
   api_token: 3kg612cdc0f3415c2428b9758f33bdfb07bc3561b00e86qdmi
   expires_at: 2038-01-01 00:00:00
 
 active_no_prefs_profile_no_getting_started_shown:
+  uuid: zzzzz-gj3su-317z32aux8dg2s1
   api_client: untrusted
   user: active_no_prefs_profile_no_getting_started_shown
   api_token: 3kg612cdc0f3415c242856758f33bdfb07bc3561b00e86qdmi
   expires_at: 2038-01-01 00:00:00
 
 active_no_prefs_profile_with_getting_started_shown:
+  uuid: zzzzz-gj3su-327z32aux8dg2s1
   api_client: untrusted
   user: active_no_prefs_profile_with_getting_started_shown
   api_token: 3kg612cdc0f3415c245786758f33bdfb07babcd1b00e86qdmi
   expires_at: 2038-01-01 00:00:00
 
 active_with_prefs_profile_no_getting_started_shown:
+  uuid: zzzzz-gj3su-337z32aux8dg2s1
   api_client: untrusted
   user: active_with_prefs_profile_no_getting_started_shown
   api_token: 3kg612cdc0f3415c245786758f33bdfb07befgh1b00e86qdmi
   expires_at: 2038-01-01 00:00:00
 
 user_foo_in_sharing_group:
+  uuid: zzzzz-gj3su-347z32aux8dg2s1
   api_client: untrusted
   user: user_foo_in_sharing_group
   api_token: 2p1pou8p4ls208mcbedeewlotghppenobcyrmyhq8pyf51xd8u
   expires_at: 2038-01-01 00:00:00
 
 user1_with_load:
+  uuid: zzzzz-gj3su-357z32aux8dg2s1
   api_client: untrusted
   user: user1_with_load
   api_token: 1234k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi
   expires_at: 2038-01-01 00:00:00
 
 fuse:
+  uuid: zzzzz-gj3su-367z32aux8dg2s1
   api_client: untrusted
   user: fuse
   api_token: 4nagbkv8eap0uok7pxm72nossq5asihls3yn5p4xmvqx5t5e7p
index f6b99a06617a860d9d4c6681b60c2861b426d4e1..4029846484d41a79acc2443246bae76a8c526fa3 100644 (file)
@@ -282,3 +282,15 @@ subproject_in_asubproject_with_same_name_as_one_in_active_user_home:
   name: Subproject to test owner uuid and name unique key violation upon removal
   description: "Removing this will result in name conflict with 'A project' in Home project and hence get renamed."
   group_class: project
+
+starred_and_shared_active_user_project:
+  uuid: zzzzz-j7d0g-starredshared01
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2014-04-21 15:37:48 -0400
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  modified_at: 2014-04-21 15:37:48 -0400
+  updated_at: 2014-04-21 15:37:48 -0400
+  name: Starred and shared active user project
+  description: Starred and shared active user project
+  group_class: project
index 925e4661248279b1543052fd8f8cc563e5efc8d8..7ed7f6bcf35636ad4ccc572b2da2e3044f6a277a 100644 (file)
@@ -920,3 +920,38 @@ empty_collection_name_in_fuse_user_home_project:
   properties: {}
   updated_at: 2014-08-06 22:11:51.242010312 Z
 
+star_project_for_active_user:
+  uuid: zzzzz-o0j2j-starredbyactive
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2014-01-24 20:42:26 -0800
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-000000000000000
+  modified_at: 2014-01-24 20:42:26 -0800
+  updated_at: 2014-01-24 20:42:26 -0800
+  tail_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  link_class: star
+  name: zzzzz-j7d0g-starredshared01
+  head_uuid: zzzzz-j7d0g-starredshared01
+  properties: {}
+
+share_starred_project_with_project_viewer:
+  uuid: zzzzz-o0j2j-sharewithviewer
+  owner_uuid: zzzzz-tpzed-000000000000000
+  tail_uuid: zzzzz-tpzed-projectviewer1a
+  link_class: permission
+  name: can_read
+  head_uuid: zzzzz-j7d0g-starredshared01
+
+star_shared_project_for_project_viewer:
+  uuid: zzzzz-o0j2j-starredbyviewer
+  owner_uuid: zzzzz-tpzed-projectviewer1a
+  created_at: 2014-01-24 20:42:26 -0800
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-000000000000000
+  modified_at: 2014-01-24 20:42:26 -0800
+  updated_at: 2014-01-24 20:42:26 -0800
+  tail_uuid: zzzzz-tpzed-projectviewer1a
+  link_class: star
+  name: zzzzz-j7d0g-starredshared01
+  head_uuid: zzzzz-j7d0g-starredshared01
+  properties: {}
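
These link fixtures model Workbench's "star" feature as ordinary link records: a star is a link with link_class "star" whose tail_uuid is the starring user and whose head_uuid (and, by convention here, name) is the starred project. A minimal sketch of creating the same star with the Arvados Python SDK follows; it assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the environment, and reuses the fixture uuids above.

    import arvados

    api = arvados.api('v1')  # reads ARVADOS_API_HOST / ARVADOS_API_TOKEN

    project_uuid = 'zzzzz-j7d0g-starredshared01'  # fixture project above
    user_uuid = 'zzzzz-tpzed-projectviewer1a'     # fixture user above

    # Star the project for the user: head is the project, tail is the
    # user, and the name repeats the project uuid, as in the fixtures.
    star = api.links().create(body={
        'link_class': 'star',
        'name': project_uuid,
        'head_uuid': project_uuid,
        'tail_uuid': user_uuid,
    }).execute()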
index 23d2e1955eeee141e263a68f7aa101549e3037b1..489bb1d6605f86d622c824260d96ef89f63bd026 100644 (file)
@@ -80,3 +80,14 @@ new_with_custom_hostname:
   job_uuid: ~
   info:
     ping_secret: "abcdyi0x4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2"
+
+node_with_no_ip_address_yet:
+  uuid: zzzzz-7ekkf-nodenoipaddryet
+  owner_uuid: zzzzz-tpzed-000000000000000
+  hostname: noipaddr
+  slot_number: ~
+  last_ping_at: ~
+  first_ping_at: ~
+  job_uuid: ~
+  info:
+    ping_secret: "abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2"
index 8877719b5bd613673581a9b118c5cf3ae9e41f9e..5da9145a81e052b3ef5a471f672c7568399e428b 100644 (file)
@@ -23,6 +23,7 @@ class Arvados::V1::ApiClientAuthorizationsControllerTest < ActionController::Tes
     authorize_with :admin_trustedclient
     post :create_system_auth, scopes: '["test"]'
     assert_response :success
+    assert_not_nil JSON.parse(@response.body)['uuid']
   end
 
   test "prohibit create system auth with token from non-trusted client" do
@@ -66,4 +67,47 @@ class Arvados::V1::ApiClientAuthorizationsControllerTest < ActionController::Tes
       assert_found_tokens(auth, {filters: [['scopes', '=', scopes]]}, *expected)
     end
   end
+
+  [
+    [:admin, :admin, 200],
+    [:admin, :active, 403],
+    [:admin, :admin_vm, 403], # belongs to the current session's user, but cannot be fetched by uuid
+    [:admin_trustedclient, :active, 200],
+  ].each do |user, token, status|
+    test "as user #{user} get #{token} token and expect #{status}" do
+      authorize_with user
+      get :show, {id: api_client_authorizations(token).uuid}
+      assert_response status
+    end
+  end
+
+  [
+    [:admin, :admin, 200],
+    [:admin, :active, 403],
+    [:admin, :admin_vm, 403], # belongs to the current session's user, but cannot be listed by uuid
+    [:admin_trustedclient, :active, 200],
+  ].each do |user, token, status|
+    test "as user #{user} list #{token} token using uuid and expect #{status}" do
+      authorize_with user
+      get :index, {
+        filters: [['uuid','=',api_client_authorizations(token).uuid]]
+      }
+      assert_response status
+    end
+  end
+
+  [
+    [:admin, :admin, 200],
+    [:admin, :active, 403],
+    [:admin, :admin_vm, 200], # belongs to the current session's user, and can be listed by api_token
+    [:admin_trustedclient, :active, 200],
+  ].each do |user, token, status|
+    test "as user #{user} list #{token} token using token and expect #{status}" do
+      authorize_with user
+      get :index, {
+        filters: [['api_token','=',api_client_authorizations(token).api_token]]
+      }
+      assert_response status
+    end
+  end
 end
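
These cases pin down how the new uuid column interacts with permissions: a token issued to an untrusted client can retrieve its own authorization by uuid, can reach the same user's other tokens only by filtering on api_token, and needs a trusted client to read other users' tokens at all. A hedged sketch of the two list queries with the Arvados Python SDK (the filter values below are placeholders, not real identifiers):

    import arvados

    api = arvados.api('v1')

    # Listing by uuid excludes other tokens, even the current user's own.
    by_uuid = api.api_client_authorizations().list(
        filters=[['uuid', '=', 'zzzzz-gj3su-xxxxxxxxxxxxxxx']]).execute()

    # Listing by api_token also matches the current session's own token.
    by_token = api.api_client_authorizations().list(
        filters=[['api_token', '=', 'examplesecrettokenvalue']]).execute()

    print(len(by_uuid['items']), len(by_token['items']))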
index 1d7bb77725222ed98c4272308a2717edaf86b6cd..6623c726df01923b7227d33f17e6f2098cab649e 100644 (file)
@@ -380,4 +380,47 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
                  'A Project (2)',
                  "new project name '#{new_project['name']}' was expected to be 'A Project (2)'")
   end
+
+  test "unsharing a project results in hiding it from previously shared user" do
+    # remove sharing link for project
+    @controller = Arvados::V1::LinksController.new
+    authorize_with :admin
+    post :destroy, id: links(:share_starred_project_with_project_viewer).uuid
+    assert_response :success
+
+    # verify that the user can no longer see the project
+    @counter = 0  # Reset executed action counter
+    @controller = Arvados::V1::GroupsController.new
+    authorize_with :project_viewer
+    get :index, filters: [['group_class', '=', 'project']], format: :json
+    assert_response :success
+    found_projects = {}
+    json_response['items'].each do |g|
+      found_projects[g['uuid']] = g
+    end
+    assert_equal false, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
+
+    # share the project
+    @counter = 0
+    @controller = Arvados::V1::LinksController.new
+    authorize_with :system_user
+    post :create, link: {
+      link_class: "permission",
+      name: "can_read",
+      head_uuid: groups(:starred_and_shared_active_user_project).uuid,
+      tail_uuid: users(:project_viewer).uuid,
+    }
+
+    # verify that project_viewer user can now see shared project again
+    @counter = 0
+    @controller = Arvados::V1::GroupsController.new
+    authorize_with :project_viewer
+    get :index, filters: [['group_class', '=', 'project']], format: :json
+    assert_response :success
+    found_projects = {}
+    json_response['items'].each do |g|
+      found_projects[g['uuid']] = g
+    end
+    assert_equal true, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
+  end
 end
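
The same round trip, sketched with the Arvados Python SDK: deleting the can_read permission link hides the project from the grantee's listings, and recreating an equivalent link restores visibility. This assumes an admin or system token in the environment and reuses the fixture uuids above.

    import arvados

    api = arvados.api('v1')

    # Unshare: delete the permission link defined in the fixtures.
    api.links().delete(uuid='zzzzz-o0j2j-sharewithviewer').execute()

    # Re-share: create an equivalent can_read permission link.
    api.links().create(body={
        'link_class': 'permission',
        'name': 'can_read',
        'head_uuid': 'zzzzz-j7d0g-starredshared01',   # the shared project
        'tail_uuid': 'zzzzz-tpzed-projectviewer1a',   # the grantee
    }).execute()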
index d2f56699ed0c0a858b3d296bd1a799dd605fa4a0..428c663a77b92ab2fc679ef20bbcf6b8fcc0fd69 100644 (file)
@@ -182,4 +182,42 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     }
     assert_response 422
   end
+
+  test "first ping should set ip addr using local_ipv4 when provided" do
+    post :ping, {
+      id: 'zzzzz-7ekkf-nodenoipaddryet',
+      instance_id: 'i-0000000',
+      local_ipv4: '172.17.2.172',
+      ping_secret: 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2'
+    }
+    assert_response :success
+    response = JSON.parse(@response.body)
+    assert_equal 'zzzzz-7ekkf-nodenoipaddryet', response['uuid']
+    assert_equal '172.17.2.172', response['ip_address']
+  end
+
+  test "first ping should set ip addr using remote_ip when local_ipv4 is not provided" do
+    post :ping, {
+      id: 'zzzzz-7ekkf-nodenoipaddryet',
+      instance_id: 'i-0000000',
+      ping_secret: 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2'
+    }
+    assert_response :success
+    response = JSON.parse(@response.body)
+    assert_equal 'zzzzz-7ekkf-nodenoipaddryet', response['uuid']
+    assert_equal request.remote_ip, response['ip_address']
+  end
+
+  test "future pings should not change previous ip address" do
+    post :ping, {
+      id: 'zzzzz-7ekkf-2z3mc76g2q73aio',
+      instance_id: 'i-0000000',
+      local_ipv4: '172.17.2.175',
+      ping_secret: '69udawxvn3zzj45hs8bumvndricrha4lcpi23pd69e44soanc0'
+    }
+    assert_response :success
+    response = JSON.parse(@response.body)
+    assert_equal 'zzzzz-7ekkf-2z3mc76g2q73aio', response['uuid']
+    assert_equal '172.17.2.174', response['ip_address']   # original ip address is not overwritten
+  end
 end
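
The ping API exercised here is plain HTTP: a node authenticates with its ping_secret rather than an API token, and the server records an IP address only on the first ping. A hedged sketch using the requests library; the hostname is a placeholder, while the uuid and ping_secret come from the node fixture above.

    import requests

    resp = requests.post(
        # placeholder API host; uuid and secret are the fixture values
        'https://api.example.com/arvados/v1/nodes/zzzzz-7ekkf-nodenoipaddryet/ping',
        data={
            'instance_id': 'i-0000000',
            # omit local_ipv4 to fall back to the request's remote IP
            'local_ipv4': '172.17.2.172',
            'ping_secret': 'abcdyefg4lb5q4gzqqtrnq30oyj08r8dtdimmanbqw49z1anz2',
        })
    resp.raise_for_status()
    print(resp.json()['ip_address'])  # 172.17.2.172 on the first ping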
diff --git a/services/api/test/functional/arvados/v1/query_test.rb b/services/api/test/functional/arvados/v1/query_test.rb
new file mode 100644 (file)
index 0000000..91fe077
--- /dev/null
@@ -0,0 +1,68 @@
+require 'test_helper'
+
+class Arvados::V1::QueryTest < ActionController::TestCase
+  test 'no fallback orders when order is unambiguous' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['id asc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal ['logs.id asc'], assigns(:objects).order_values
+  end
+
+  test 'fallback orders when order is ambiguous' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['event_type asc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal('logs.event_type asc, logs.modified_at desc, logs.uuid',
+                 assigns(:objects).order_values.join(', '))
+  end
+
+  test 'skip fallback orders already given by client' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['modified_at asc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal('logs.modified_at asc, logs.uuid',
+                 assigns(:objects).order_values.join(', '))
+  end
+
+  test 'eliminate superfluous orders' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['logs.modified_at asc',
+              'modified_at desc',
+              'event_type desc',
+              'logs.event_type asc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal('logs.modified_at asc, logs.event_type desc, logs.uuid',
+                 assigns(:objects).order_values.join(', '))
+  end
+
+  test 'eliminate orders after the first unique column' do
+    @controller = Arvados::V1::LogsController.new
+    authorize_with :active
+    get :index, {
+      order: ['event_type asc',
+              'id asc',
+              'uuid asc',
+              'modified_at desc'],
+      controller: 'logs',
+    }
+    assert_response :success
+    assert_equal('logs.event_type asc, logs.id asc',
+                 assigns(:objects).order_values.join(', '))
+  end
+end
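
Taken together, the tests specify the order-normalization rule: qualify each column with the table name, append fallback orders (modified_at desc, then uuid) so pagination is deterministic, skip any fallback the client already ordered on, and drop everything after the first unique column since later terms can never matter. A standalone Python sketch of that rule, written to reproduce the cases above (an illustration, not the server's actual implementation):

    def effective_order(client_order, table='logs',
                        fallbacks=('modified_at desc', 'uuid'),
                        unique_columns=('id', 'uuid')):
        """Normalize an order clause the way the tests above expect."""
        result, seen = [], set()
        for term in list(client_order) + list(fallbacks):
            col = term.split()[0].split('.')[-1]   # strip table prefix and direction
            if col in seen:
                continue                           # superfluous: already ordered on col
            seen.add(col)
            qualified = term if '.' in term.split()[0] else '%s.%s' % (table, term)
            result.append(qualified)
            if col in unique_columns:
                break                              # rows are now totally ordered
        return result

    assert effective_order(['id asc']) == ['logs.id asc']
    assert effective_order(['event_type asc']) == [
        'logs.event_type asc', 'logs.modified_at desc', 'logs.uuid']
    assert effective_order(['modified_at asc']) == [
        'logs.modified_at asc', 'logs.uuid']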
index e14912423db73483ef2623149e23d3ca63b3dabb..6bce3258d9857808f24e677ce5374f0f2de61a23 100644 (file)
@@ -142,7 +142,7 @@ func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
                statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
                stats, err := ioutil.ReadFile(statsFilename)
                if err != nil {
-                       statLog.Printf("read %s: %s\n", statsFilename, err)
+                       statLog.Printf("error reading %s: %s\n", statsFilename, err)
                        continue
                }
                return strings.NewReader(string(stats)), nil
@@ -409,7 +409,7 @@ func run(logger *log.Logger) error {
                        if cmd.Process != nil {
                                cmd.Process.Signal(catch)
                        }
-                       statLog.Println("caught signal:", catch)
+                       statLog.Println("notice: caught signal:", catch)
                }(sigChan)
                signal.Notify(sigChan, syscall.SIGTERM)
                signal.Notify(sigChan, syscall.SIGINT)
index 71623a5f3d07364bb4de94c42023af3b5658fc55..c4b0df3a4e51e5ba9236b1a04957d019cce5d88c 100644 (file)
@@ -82,6 +82,10 @@ class ArgumentParser(argparse.ArgumentParser):
 
         self.add_argument('--crunchstat-interval', type=float, help="Write stats to stderr every N seconds (default disabled)", default=0)
 
+        self.add_argument('--unmount-timeout',
+                          type=float, default=2.0,
+                          help="Time to wait for graceful shutdown after the --exec program exits and the filesystem is unmounted")
+
         self.add_argument('--exec', type=str, nargs=argparse.REMAINDER,
                             dest="exec_args", metavar=('command', 'args', '...', '--'),
                             help="""Mount, run a command, then unmount and exit""")
@@ -91,6 +95,7 @@ class Mount(object):
     def __init__(self, args, logger=logging.getLogger('arvados.arv-mount')):
         self.logger = logger
         self.args = args
+        self.listen_for_events = False
 
         self.args.mountpoint = os.path.realpath(self.args.mountpoint)
         if self.args.logfile:
@@ -106,15 +111,21 @@ class Mount(object):
 
     def __enter__(self):
         llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-        if self.args.mode != 'by_pdh':
+        if self.listen_for_events:
             self.operations.listen_for_events()
-        t = threading.Thread(None, lambda: llfuse.main())
-        t.start()
+        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+        self.llfuse_thread.daemon = True
+        self.llfuse_thread.start()
         self.operations.initlock.wait()
 
     def __exit__(self, exc_type, exc_value, traceback):
         subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
-        self.operations.destroy()
+        self.llfuse_thread.join(timeout=self.args.unmount_timeout)
+        if self.llfuse_thread.is_alive():
+            self.logger.warning("Mount.__exit__:"
+                                " llfuse thread still alive %fs after umount"
+                                " -- abandoning and exiting anyway",
+                                self.args.unmount_timeout)
 
     def run(self):
         if self.args.exec_args:
@@ -230,7 +241,9 @@ class Mount(object):
             mount_readme = True
 
         if dir_class is not None:
-            self.operations.inodes.add_entry(dir_class(*dir_args))
+            ent = dir_class(*dir_args)
+            self.operations.inodes.add_entry(ent)
+            self.listen_for_events = ent.want_event_subscribe()
             return
 
         e = self.operations.inodes.add_entry(Directory(
@@ -260,6 +273,7 @@ class Mount(object):
         if name in ['', '.', '..'] or '/' in name:
             sys.exit("Mount point '{}' is not supported.".format(name))
         tld._entries[name] = self.operations.inodes.add_entry(ent)
+        self.listen_for_events = (self.listen_for_events or ent.want_event_subscribe())
 
     def _readme_text(self, api_host, user_email):
         return '''
@@ -277,63 +291,57 @@ From here, the following directories are available:
 '''.format(api_host, user_email)
 
     def _run_exec(self):
-        # Initialize the fuse connection
-        llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
-
-        # Subscribe to change events from API server
-        if self.args.mode != 'by_pdh':
-            self.operations.listen_for_events()
-
-        t = threading.Thread(None, lambda: llfuse.main())
-        t.start()
-
-        # wait until the driver is finished initializing
-        self.operations.initlock.wait()
-
         rc = 255
-        try:
-            sp = subprocess.Popen(self.args.exec_args, shell=False)
-
-            # forward signals to the process.
-            signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
-            signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
-            signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
-
-            # wait for process to complete.
-            rc = sp.wait()
-
-            # restore default signal handlers.
-            signal.signal(signal.SIGINT, signal.SIG_DFL)
-            signal.signal(signal.SIGTERM, signal.SIG_DFL)
-            signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-        except Exception as e:
-            self.logger.exception(
-                'arv-mount: exception during exec %s', self.args.exec_args)
+        with self:
             try:
-                rc = e.errno
-            except AttributeError:
-                pass
-        finally:
-            subprocess.call(["fusermount", "-u", "-z", self.args.mountpoint])
-            self.operations.destroy()
+                sp = subprocess.Popen(self.args.exec_args, shell=False)
+
+                # forward signals to the process.
+                signal.signal(signal.SIGINT, lambda signum, frame: sp.send_signal(signum))
+                signal.signal(signal.SIGTERM, lambda signum, frame: sp.send_signal(signum))
+                signal.signal(signal.SIGQUIT, lambda signum, frame: sp.send_signal(signum))
+
+                # wait for process to complete.
+                rc = sp.wait()
+
+                # restore default signal handlers.
+                signal.signal(signal.SIGINT, signal.SIG_DFL)
+                signal.signal(signal.SIGTERM, signal.SIG_DFL)
+                signal.signal(signal.SIGQUIT, signal.SIG_DFL)
+            except Exception as e:
+                self.logger.exception(
+                    'arv-mount: exception during exec %s', self.args.exec_args)
+                try:
+                    rc = e.errno
+                except AttributeError:
+                    pass
         exit(rc)
 
     def _run_standalone(self):
         try:
             llfuse.init(self.operations, self.args.mountpoint, self._fuse_options())
 
-            if not (self.args.exec_args or self.args.foreground):
-                self.daemon_ctx = daemon.DaemonContext(working_directory=os.path.dirname(self.args.mountpoint),
-                                                       files_preserve=range(3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
+            if not self.args.foreground:
+                self.daemon_ctx = daemon.DaemonContext(
+                    working_directory=os.path.dirname(self.args.mountpoint),
+                    files_preserve=range(
+                        3, resource.getrlimit(resource.RLIMIT_NOFILE)[1]))
                 self.daemon_ctx.open()
 
             # Subscribe to change events from API server
-            self.operations.listen_for_events()
+            if self.listen_for_events:
+                self.operations.listen_for_events()
 
-            llfuse.main()
+            self._llfuse_main()
         except Exception as e:
             self.logger.exception('arv-mount: exception during mount: %s', e)
             exit(getattr(e, 'errno', 1))
-        finally:
-            self.operations.destroy()
         exit(0)
+
+    def _llfuse_main(self):
+        try:
+            llfuse.main()
+        except:
+            llfuse.close(unmount=False)
+            raise
+        llfuse.close()
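
The lifecycle change in miniature: llfuse.main() now runs on a daemon thread, teardown lazily unmounts with fusermount -u -z, then joins the thread for at most --unmount-timeout seconds and gives up with a warning instead of hanging forever. A hedged standalone sketch of that join-with-deadline pattern, with a sleep standing in for an llfuse thread that refuses to exit promptly:

    import logging
    import threading
    import time

    def worker():
        time.sleep(10)  # stands in for llfuse.main() not returning

    t = threading.Thread(target=worker)
    t.daemon = True     # a stuck thread must not keep the process alive
    t.start()

    unmount_timeout = 2.0
    t.join(timeout=unmount_timeout)
    if t.is_alive():
        logging.warning("llfuse thread still alive %fs after umount"
                        " -- abandoning and exiting anyway", unmount_timeout)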
index 4c4dbc8ea1c7ec1db585673015d236586868154b..196bb221e901e132d10db4f2bdbd7ed060f794e3 100644 (file)
@@ -184,6 +184,9 @@ class Directory(FreshBase):
     def flush(self):
         pass
 
+    def want_event_subscribe(self):
+        raise NotImplementedError()
+
     def create(self, name):
         raise NotImplementedError()
 
@@ -351,6 +354,9 @@ class CollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return self.collection.writable() if self.collection is not None else self._writable
 
+    def want_event_subscribe(self):
+        return (uuid_pattern.match(self.collection_locator) is not None)
+
     # Used by arv-web.py to switch the contents of the CollectionDirectory
     def change_collection(self, new_locator):
         """Switch the contents of the CollectionDirectory.
@@ -544,6 +550,9 @@ class TmpCollectionDirectory(CollectionDirectoryBase):
     def writable(self):
         return True
 
+    def want_event_subscribe(self):
+        return False
+
     def finalize(self):
         self.collection.stop_threads()
 
@@ -629,6 +638,9 @@ will appear if it exists.
     def clear(self, force=False):
         pass
 
+    def want_event_subscribe(self):
+        return not self.pdh_only
+
 
 class RecursiveInvalidateDirectory(Directory):
     def invalidate(self):
@@ -650,6 +662,9 @@ class TagsDirectory(RecursiveInvalidateDirectory):
         self._poll = True
         self._poll_time = poll_time
 
+    def want_event_subscribe(self):
+        return True
+
     @use_counter
     def update(self):
         with llfuse.lock_released:
@@ -678,6 +693,9 @@ class TagDirectory(Directory):
         self._poll = poll
         self._poll_time = poll_time
 
+    def want_event_subscribe(self):
+        return True
+
     @use_counter
     def update(self):
         with llfuse.lock_released:
@@ -709,6 +727,9 @@ class ProjectDirectory(Directory):
         self._updating_lock = threading.Lock()
         self._current_user = None
 
+    def want_event_subscribe(self):
+        return True
+
     def createDirectory(self, i):
         if collection_uuid_pattern.match(i['uuid']):
             return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
@@ -929,3 +950,6 @@ class SharedDirectory(Directory):
                        lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
         except Exception:
-            _logger.exception()
+            _logger.exception("error updating SharedDirectory")
+
+    def want_event_subscribe(self):
+        return True
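
The pattern added throughout this file is a simple capability query: each directory class reports whether its contents can change in response to API-server events, and the mount layer subscribes to the websocket feed only if at least one mounted entry says yes (a collection mounted by portable data hash is content-addressed, so it never needs events). A hedged sketch of the aggregation, with stand-in classes:

    class ByPdhCollection(object):
        def want_event_subscribe(self):
            return False   # content-addressed: contents never change

    class ProjectDir(object):
        def want_event_subscribe(self):
            return True    # project contents can change server-side

    entries = [ByPdhCollection(), ProjectDir()]
    listen_for_events = any(e.want_event_subscribe() for e in entries)
    # subscribe to the event feed only when listen_for_events is True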
diff --git a/services/fuse/fpm-info.sh b/services/fuse/fpm-info.sh
new file mode 100644 (file)
index 0000000..57671cd
--- /dev/null
@@ -0,0 +1 @@
+fpm_depends+=(fuse)
index 1cedd6665001b86a5baf4e1d5cba2c2a32e9df6f..fca1edf6bc25603beac62e6b88041cf481da43ee 100644 (file)
@@ -33,7 +33,7 @@ setup(name='arvados_fuse',
       ],
       install_requires=[
         'arvados-python-client >= 0.1.20151118035730',
-        'llfuse>=0.40',
+        'llfuse==0.41.1',
         'python-daemon',
         'ciso8601'
         ],
index faa4a55065945d907109b8994470ccccb2c5904d..5a45bfc103f34df2ae928ed5bcd305d100c408fa 100644 (file)
@@ -62,7 +62,9 @@ class IntegrationTest(unittest.TestCase):
             def wrapper(self, *args, **kwargs):
                 with arvados_fuse.command.Mount(
                         arvados_fuse.command.ArgumentParser().parse_args(
-                            argv + ['--foreground', self.mnt])):
+                            argv + ['--foreground',
+                                    '--unmount-timeout=0.1',
+                                    self.mnt])):
                     return func(self, *args, **kwargs)
             return wrapper
         return decorator
index 44ec1996d2121698b478e2ba25a6ae95d0bef4c7..c79daf80f54156b6e304839c01a66221217ae3c9 100644 (file)
@@ -37,6 +37,16 @@ class MountTestBase(unittest.TestCase):
         run_test_server.authorize_with("admin")
         self.api = api if api else arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
 
+    # This is a copy of Mount's method.  TODO: Refactor MountTestBase
+    # to use a Mount instead of copying its code.
+    def _llfuse_main(self):
+        try:
+            llfuse.main()
+        except:
+            llfuse.close(unmount=False)
+            raise
+        llfuse.close()
+
     def make_mount(self, root_class, **root_kwargs):
         self.operations = fuse.Operations(
             os.getuid(), os.getgid(),
@@ -45,7 +55,9 @@ class MountTestBase(unittest.TestCase):
         self.operations.inodes.add_entry(root_class(
             llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
         llfuse.init(self.operations, self.mounttmp, [])
-        threading.Thread(None, llfuse.main).start()
+        self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
+        self.llfuse_thread.daemon = True
+        self.llfuse_thread.start()
         # wait until the driver is finished initializing
         self.operations.initlock.wait()
         return self.operations.inodes[llfuse.ROOT_INODE]
@@ -55,17 +67,12 @@ class MountTestBase(unittest.TestCase):
         self.pool.join()
         del self.pool
 
-        # llfuse.close is buggy, so use fusermount instead.
-        #llfuse.close(unmount=True)
-
-        count = 0
-        success = 1
-        while (count < 9 and success != 0):
-          success = subprocess.call(["fusermount", "-u", self.mounttmp])
-          time.sleep(0.1)
-          count += 1
-
-        self.operations.destroy()
+        subprocess.call(["fusermount", "-u", "-z", self.mounttmp])
+        self.llfuse_thread.join(timeout=1)
+        if self.llfuse_thread.is_alive():
+            logger.warning("MountTestBase.tearDown():"
+                           " llfuse thread still alive 1s after umount"
+                           " -- abandoning and exiting anyway")
 
         os.rmdir(self.mounttmp)
         if self.keeptmp:
index bfefc674d7991dd80963a5d102a1bc757bc7bfb3..bb80d0a2fc94dc4c77c0f46f59414a8d00627235 100644 (file)
@@ -6,6 +6,7 @@ import functools
 import json
 import llfuse
 import logging
+import mock
 import os
 import run_test_server
 import sys
@@ -82,6 +83,7 @@ class MountArgsTest(unittest.TestCase):
         self.mnt = arvados_fuse.command.Mount(args)
         e = self.check_ent_type(arvados_fuse.MagicDirectory)
         self.assertEqual(e.pdh_only, False)
+        self.assertEqual(True, self.mnt.listen_for_events)
 
     @noexit
     def test_by_pdh(self):
@@ -92,6 +94,7 @@ class MountArgsTest(unittest.TestCase):
         self.mnt = arvados_fuse.command.Mount(args)
         e = self.check_ent_type(arvados_fuse.MagicDirectory)
         self.assertEqual(e.pdh_only, True)
+        self.assertEqual(False, self.mnt.listen_for_events)
 
     @noexit
     def test_by_tag(self):
@@ -101,6 +104,7 @@ class MountArgsTest(unittest.TestCase):
         self.assertEqual(args.mode, 'by_tag')
         self.mnt = arvados_fuse.command.Mount(args)
         e = self.check_ent_type(arvados_fuse.TagsDirectory)
+        self.assertEqual(True, self.mnt.listen_for_events)
 
     @noexit
     def test_collection(self, id_type='uuid'):
@@ -112,6 +116,7 @@ class MountArgsTest(unittest.TestCase):
         self.mnt = arvados_fuse.command.Mount(args)
         e = self.check_ent_type(arvados_fuse.CollectionDirectory)
         self.assertEqual(e.collection_locator, cid)
+        self.assertEqual(id_type == 'uuid', self.mnt.listen_for_events)
 
     def test_collection_pdh(self):
         self.test_collection('portable_data_hash')
@@ -126,6 +131,7 @@ class MountArgsTest(unittest.TestCase):
         e = self.check_ent_type(arvados_fuse.ProjectDirectory)
         self.assertEqual(e.project_object['uuid'],
                          run_test_server.fixture('users')['active']['uuid'])
+        self.assertEqual(True, self.mnt.listen_for_events)
 
     def test_mutually_exclusive_args(self):
         cid = run_test_server.fixture('collections')['public_text_file']['uuid']
@@ -162,9 +168,11 @@ class MountArgsTest(unittest.TestCase):
         e = self.check_ent_type(arvados_fuse.SharedDirectory)
         self.assertEqual(e.current_user['uuid'],
                          run_test_server.fixture('users')['active']['uuid'])
+        self.assertEqual(True, self.mnt.listen_for_events)
 
     @noexit
-    def test_custom(self):
+    @mock.patch('arvados.events.subscribe')
+    def test_custom(self, mock_subscribe):
         args = arvados_fuse.command.ArgumentParser().parse_args([
             '--mount-tmp', 'foo',
             '--mount-tmp', 'bar',
@@ -178,6 +186,24 @@ class MountArgsTest(unittest.TestCase):
         e = self.check_ent_type(arvados_fuse.ProjectDirectory, 'my_home')
         self.assertEqual(e.project_object['uuid'],
                          run_test_server.fixture('users')['active']['uuid'])
+        self.assertEqual(True, self.mnt.listen_for_events)
+        with self.mnt:
+            pass
+        self.assertEqual(1, mock_subscribe.call_count)
+
+    @noexit
+    @mock.patch('arvados.events.subscribe')
+    def test_custom_no_listen(self, mock_subscribe):
+        args = arvados_fuse.command.ArgumentParser().parse_args([
+            '--mount-by-pdh', 'pdh',
+            '--mount-tmp', 'foo',
+            '--mount-tmp', 'bar',
+            '--foreground', self.mntdir])
+        self.mnt = arvados_fuse.command.Mount(args)
+        self.assertEqual(False, self.mnt.listen_for_events)
+        with self.mnt:
+            pass
+        self.assertEqual(0, mock_subscribe.call_count)
 
     def test_custom_unsupported_layouts(self):
         for name in ['.', '..', '', 'foo/bar', '/foo']:
diff --git a/services/fuse/tests/test_exec.py b/services/fuse/tests/test_exec.py
new file mode 100644 (file)
index 0000000..66013a4
--- /dev/null
@@ -0,0 +1,60 @@
+import arvados_fuse.command
+import json
+import multiprocessing
+import os
+import run_test_server
+import tempfile
+import unittest
+
+try:
+    from shlex import quote
+except:
+    from pipes import quote
+
+def try_exec(mnt, cmd):
+    try:
+        arvados_fuse.command.Mount(
+            arvados_fuse.command.ArgumentParser().parse_args([
+                '--read-write',
+                '--mount-tmp=zzz',
+                '--unmount-timeout=0.1',
+                mnt,
+                '--exec'] + cmd)).run()
+    except SystemExit:
+        pass
+    else:
+        raise AssertionError('should have exited')
+
+
+class ExecMode(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        run_test_server.run()
+        run_test_server.run_keep(enforce_permissions=True, num_servers=2)
+        run_test_server.authorize_with('active')
+
+    @classmethod
+    def tearDownClass(cls):
+        run_test_server.stop_keep(num_servers=2)
+
+    def setUp(self):
+        self.mnt = tempfile.mkdtemp()
+        _, self.okfile = tempfile.mkstemp()
+        self.pool = multiprocessing.Pool(1)
+
+    def tearDown(self):
+        self.pool.terminate()
+        self.pool.join()
+        os.rmdir(self.mnt)
+        os.unlink(self.okfile)
+
+    def test_exec(self):
+        self.pool.apply(try_exec, (self.mnt, [
+            'sh', '-c',
+            'echo -n foo >{}; cp {} {}'.format(
+                quote(os.path.join(self.mnt, 'zzz', 'foo.txt')),
+                quote(os.path.join(self.mnt, 'zzz', '.arvados#collection')),
+                quote(os.path.join(self.okfile)))]))
+        self.assertRegexpMatches(
+            json.load(open(self.okfile))['manifest_text'],
+            r' 0:3:foo.txt\n')
index c0033d9186c3a6306ff4251aa3e84b37af6e5109..7dfb84d109957af05f101f01c4dbc94e074457d3 100644 (file)
@@ -43,6 +43,10 @@ type azureVolumeAdder struct {
 }
 
 func (s *azureVolumeAdder) Set(containerName string) error {
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
+
        if containerName == "" {
                return errors.New("no container name given")
        }
@@ -311,11 +315,16 @@ func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
        }
 }
 
-// Delete a Keep block.
-func (v *AzureBlobVolume) Delete(loc string) error {
+// Trash a Keep block.
+func (v *AzureBlobVolume) Trash(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
+
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
+
        // Ideally we would use If-Unmodified-Since, but that
        // particular condition seems to be ignored by Azure. Instead,
        // we get the Etag before checking Mtime, and use If-Match to
@@ -335,6 +344,12 @@ func (v *AzureBlobVolume) Delete(loc string) error {
        })
 }
 
+// Untrash moves a block from the trash back into the store.
+// Not yet implemented for Azure volumes.
+func (v *AzureBlobVolume) Untrash(loc string) error {
+       return ErrNotImplemented
+}
+
 // Status returns a VolumeStatus struct with placeholder data.
 func (v *AzureBlobVolume) Status() *VolumeStatus {
        return &VolumeStatus{
index 3817ea19002d1c18f14c2479a383fb2d1601d763..a7675fb1dcfbea5782a40ef6fc5b0d0c8bd93a8f 100644 (file)
@@ -970,3 +970,106 @@ func TestPutReplicationHeader(t *testing.T) {
                t.Errorf("Got X-Keep-Replicas-Stored: %q, expected %q", r, "1")
        }
 }
+
+func TestUntrashHandler(t *testing.T) {
+       defer teardown()
+
+       // Set up Keep volumes
+       KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
+       vols := KeepVM.AllWritable()
+       vols[0].Put(TestHash, TestBlock)
+
+       dataManagerToken = "DATA MANAGER TOKEN"
+
+       // unauthenticatedReq => UnauthorizedError
+       unauthenticatedReq := &RequestTester{
+               method: "PUT",
+               uri:    "/untrash/" + TestHash,
+       }
+       response := IssueRequest(unauthenticatedReq)
+       ExpectStatusCode(t,
+               "Unauthenticated request",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // notDataManagerReq => UnauthorizedError
+       notDataManagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: knownToken,
+       }
+
+       response = IssueRequest(notDataManagerReq)
+       ExpectStatusCode(t,
+               "Non-datamanager token",
+               UnauthorizedError.HTTPCode,
+               response)
+
+       // datamanagerWithBadHashReq => StatusBadRequest
+       datamanagerWithBadHashReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/thisisnotalocator",
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerWithBadHashReq)
+       ExpectStatusCode(t,
+               "Bad locator in untrash request",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerWrongMethodReq => StatusBadRequest
+       datamanagerWrongMethodReq := &RequestTester{
+               method:   "GET",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerWrongMethodReq)
+       ExpectStatusCode(t,
+               "Only PUT method is supported for untrash",
+               http.StatusBadRequest,
+               response)
+
+       // datamanagerReq => StatusOK
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response = IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "",
+               http.StatusOK,
+               response)
+       expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
+       if response.Body.String() != expected {
+               t.Errorf(
+                       "Untrash response mismatch: expected %s, got:\n%s",
+                       expected, response.Body.String())
+       }
+}
+
+func TestUntrashHandlerWithNoWritableVolumes(t *testing.T) {
+       defer teardown()
+
+       // Set up readonly Keep volumes
+       vols := []*MockVolume{CreateMockVolume(), CreateMockVolume()}
+       vols[0].Readonly = true
+       vols[1].Readonly = true
+       KeepVM = MakeRRVolumeManager([]Volume{vols[0], vols[1]})
+       defer KeepVM.Close()
+
+       dataManagerToken = "DATA MANAGER TOKEN"
+
+       // datamanagerReq => StatusOK
+       datamanagerReq := &RequestTester{
+               method:   "PUT",
+               uri:      "/untrash/" + TestHash,
+               apiToken: dataManagerToken,
+       }
+       response := IssueRequest(datamanagerReq)
+       ExpectStatusCode(t,
+               "No writable volumes",
+               http.StatusNotFound,
+               response)
+}
index 95af1b48707c6b189982dc18762cb517769bd117..043ab69b17c255aa463fe8259a777cec682453f5 100644 (file)
@@ -20,6 +20,7 @@ import (
        "regexp"
        "runtime"
        "strconv"
+       "strings"
        "sync"
        "time"
 )
@@ -53,6 +54,9 @@ func MakeRESTRouter() *mux.Router {
        // Replace the current trash queue.
        rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
 
+       // Untrash moves blocks from the trash back into the store
+       rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
+
        // Any request which does not match any of these routes gets
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
@@ -295,7 +299,7 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
                Failed  int `json:"copies_failed"`
        }
        for _, vol := range KeepVM.AllWritable() {
-               if err := vol.Delete(hash); err == nil {
+               if err := vol.Trash(hash); err == nil {
                        result.Deleted++
                } else if os.IsNotExist(err) {
                        continue
@@ -430,6 +434,53 @@ func TrashHandler(resp http.ResponseWriter, req *http.Request) {
        trashq.ReplaceQueue(tlist)
 }
 
+// UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
+func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
+       // Reject unauthorized requests.
+       if !IsDataManagerToken(GetApiToken(req)) {
+               http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
+               return
+       }
+
+       hash := mux.Vars(req)["hash"]
+
+       if len(KeepVM.AllWritable()) == 0 {
+               http.Error(resp, "No writable volumes", http.StatusNotFound)
+               return
+       }
+
+       var untrashedOn, failedOn []string
+       var numNotFound int
+       for _, vol := range KeepVM.AllWritable() {
+               err := vol.Untrash(hash)
+
+               if os.IsNotExist(err) {
+                       numNotFound++
+               } else if err != nil {
+                       log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
+                       failedOn = append(failedOn, vol.String())
+               } else {
+                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       untrashedOn = append(untrashedOn, vol.String())
+               }
+       }
+
+       if numNotFound == len(KeepVM.AllWritable()) {
+               http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
+               return
+       }
+
+       if len(failedOn) == len(KeepVM.AllWritable()) {
+               http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
+       } else {
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               if len(failedOn) > 0 {
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+               }
+               resp.Write([]byte(respBody))
+       }
+}
+
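
From a client's point of view, the new endpoint is a single authenticated PUT. A hedged sketch with the requests library; the host and port are placeholders for a keepstore server, and the token must be the data manager token that server was configured with.

    import hashlib
    import requests

    block_hash = hashlib.md5(b'foo').hexdigest()  # locator of a trashed block
    resp = requests.put(
        # placeholder keepstore host; 25107 is keepstore's usual port
        'http://keep0.example.com:25107/untrash/' + block_hash,
        headers={'Authorization': 'OAuth2 exampledatamanagertoken'})
    print(resp.status_code, resp.text)
    # e.g. 200 and "Successfully untrashed on: ..." on success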
 // ==============================
 // GetBlock and PutBlock implement lower-level code for handling
 // blocks by rooting through volumes connected to the local machine.
index 96a887fecb20b278a2c9a763ebfc094b71bf31ac..3850e993fc511216d4967b1c757109ced844a591 100644 (file)
@@ -55,6 +55,10 @@ var dataManagerToken string
 // actually deleting anything.
 var neverDelete = true
 
+// trashLifetime is the time duration after a block is trashed
+// during which it can be recovered using an /untrash request
+var trashLifetime time.Duration
+
 var maxBuffers = 128
 var bufs *bufferPool
 
@@ -79,6 +83,7 @@ var (
        SizeRequiredError   = &KeepError{411, "Missing Content-Length"}
        TooLongError        = &KeepError{413, "Block is too large"}
        MethodDisabledError = &KeepError{405, "Method disabled"}
+       ErrNotImplemented   = &KeepError{500, "Unsupported configuration"}
 )
 
 func (e *KeepError) Error() string {
@@ -200,6 +205,11 @@ func main() {
                "max-buffers",
                maxBuffers,
                fmt.Sprintf("Maximum RAM to use for data buffers, given in multiples of block size (%d MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.", BlockSize>>20))
+       flag.DurationVar(
+               &trashLifetime,
+               "trash-lifetime",
+               0*time.Second,
+               "Interval after a block is trashed during which it can be recovered using an /untrash request")
 
        flag.Parse()
 
index 572ee46e71419693b103801c7e01a8a139a0c69e..7d9ba8ab9ef33bf46888566c6d0c6ae333dba9ae 100644 (file)
@@ -39,6 +39,9 @@ type s3VolumeAdder struct {
 }
 
 func (s *s3VolumeAdder) Set(bucketName string) error {
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
        if bucketName == "" {
                return fmt.Errorf("no container name given")
        }
@@ -257,10 +260,13 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        return nil
 }
 
-func (v *S3Volume) Delete(loc string) error {
+func (v *S3Volume) Trash(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
        if t, err := v.Mtime(loc); err != nil {
                return err
        } else if time.Since(t) < blobSignatureTTL {
@@ -272,6 +278,11 @@ func (v *S3Volume) Delete(loc string) error {
        return v.Bucket.Del(loc)
 }
 
+// Untrash moves a block from the trash back into the store. Not yet implemented for S3 volumes.
+func (v *S3Volume) Untrash(loc string) error {
+       return ErrNotImplemented
+}
+
 func (v *S3Volume) Status() *VolumeStatus {
        return &VolumeStatus{
                DeviceNum: 1,
index 65e3fbd2849593e44be94921cb7073a5aba3adaa..62f63d57c8edb655b5078ebf637ce6d0ed0475bb 100644 (file)
@@ -47,7 +47,7 @@ func TrashItem(trashRequest TrashRequest) {
                if neverDelete {
                        err = errors.New("did not delete block because neverDelete is true")
                } else {
-                       err = volume.Delete(trashRequest.Locator)
+                       err = volume.Trash(trashRequest.Locator)
                }
 
                if err != nil {
index 7966c41b92bd89958308ec77765f0b7a5a1f0fd9..58710c04b269a57af236fbb36f5a6aaa61d9b256 100644 (file)
@@ -144,20 +144,21 @@ type Volume interface {
        // particular order.
        IndexTo(prefix string, writer io.Writer) error
 
-       // Delete deletes the block data from the underlying storage
-       // device.
+       // Trash moves the block data from the underlying storage
+       // device to the trash area. The block then stays in the trash
+       // for the -trash-lifetime interval before it is actually deleted.
        //
        // loc is as described in Get.
        //
        // If the timestamp for the given locator is newer than
-       // blobSignatureTTL, Delete must not delete the data.
+       // blobSignatureTTL, Trash must not trash the data.
        //
-       // If a Delete operation overlaps with any Touch or Put
+       // If a Trash operation overlaps with any Touch or Put
        // operations on the same locator, the implementation must
        // ensure one of the following outcomes:
        //
        //   - Touch and Put return a non-nil error, or
-       //   - Delete does not delete the block, or
+       //   - Trash does not trash the block, or
        //   - Both of the above.
        //
        // If it is possible for the storage device to be accessed by
@@ -171,9 +172,12 @@ type Volume interface {
        // reliably or fail outright.
        //
        // Corollary: A successful Touch or Put guarantees a block
-       // will not be deleted for at least blobSignatureTTL
+       // will not be trashed for at least blobSignatureTTL
        // seconds.
-       Delete(loc string) error
+       Trash(loc string) error
+
+       // Untrash moves a block from the trash back into the store
+       Untrash(loc string) error
 
        // Status returns a *VolumeStatus representing the current
        // in-use and available storage capacity and an
index 7580a202594426173ca14d54d9fda8c523171e30..5810411c89bca1d3a31b3d03f65748456fec78db 100644 (file)
@@ -76,6 +76,8 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
        testPutConcurrent(t, factory)
 
        testPutFullBlock(t, factory)
+
+       testTrashUntrash(t, factory)
 }
 
 // Put a test block, get it and verify content
@@ -420,7 +422,7 @@ func testDeleteNewBlock(t TB, factory TestableVolumeFactory) {
 
        v.Put(TestHash, TestBlock)
 
-       if err := v.Delete(TestHash); err != nil {
+       if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
        data, err := v.Get(TestHash)
@@ -449,7 +451,7 @@ func testDeleteOldBlock(t TB, factory TestableVolumeFactory) {
        v.Put(TestHash, TestBlock)
        v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
 
-       if err := v.Delete(TestHash); err != nil {
+       if err := v.Trash(TestHash); err != nil {
                t.Error(err)
        }
        if _, err := v.Get(TestHash); err == nil || !os.IsNotExist(err) {
@@ -463,7 +465,7 @@ func testDeleteNoSuchBlock(t TB, factory TestableVolumeFactory) {
        v := factory(t)
        defer v.Teardown()
 
-       if err := v.Delete(TestHash2); err == nil {
+       if err := v.Trash(TestHash2); err == nil {
                t.Errorf("Expected error when attempting to delete a non-existing block")
        }
 }
@@ -535,7 +537,7 @@ func testUpdateReadOnly(t TB, factory TestableVolumeFactory) {
        }
 
        // Delete a block from a read-only volume should result in error
-       err = v.Delete(TestHash)
+       err = v.Trash(TestHash)
        if err == nil {
                t.Errorf("Expected error when deleting block from a read-only volume")
        }
@@ -696,3 +698,63 @@ func testPutFullBlock(t TB, factory TestableVolumeFactory) {
                t.Error("rdata != wdata")
        }
 }
+
+// With trashLifetime != 0, perform:
+// Trash an old block - which either returns ErrNotImplemented or succeeds
+// Untrash - which either returns ErrNotImplemented or succeeds
+// Get - which must succeed
+func testTrashUntrash(t TB, factory TestableVolumeFactory) {
+       v := factory(t)
+       defer v.Teardown()
+       defer func() {
+               trashLifetime = 0
+       }()
+
+       trashLifetime = 3600 * time.Second
+
+       // put block and backdate it
+       v.PutRaw(TestHash, TestBlock)
+       v.TouchWithDate(TestHash, time.Now().Add(-2*blobSignatureTTL))
+
+       buf, err := v.Get(TestHash)
+       if err != nil {
+               t.Fatal(err)
+       }
+       if bytes.Compare(buf, TestBlock) != 0 {
+               t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+       }
+       bufs.Put(buf)
+
+       // Trash
+       err = v.Trash(TestHash)
+       if !v.Writable() {
+               if err != MethodDisabledError {
+                       t.Error(err)
+               }
+       } else if err != nil {
+               if err != ErrNotImplemented {
+                       t.Error(err)
+               }
+       } else {
+               _, err = v.Get(TestHash)
+               if err == nil || !os.IsNotExist(err) {
+                       t.Errorf("os.IsNotExist(%v) should have been true", err)
+               }
+
+               // Untrash
+               err = v.Untrash(TestHash)
+               if err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       // Get the block - after trash and untrash sequence
+       buf, err = v.Get(TestHash)
+       if err != nil {
+               t.Fatal(err)
+       }
+       if bytes.Compare(buf, TestBlock) != 0 {
+               t.Errorf("Got data %+q, expected %+q", buf, TestBlock)
+       }
+       bufs.Put(buf)
+}
index d6714365de5bef98ad082b93f595231993bafa48..53ffeef0bba186d7f995e6e6afb00feb194c5e7f 100644 (file)
@@ -183,7 +183,7 @@ func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
        return nil
 }
 
-func (v *MockVolume) Delete(loc string) error {
+func (v *MockVolume) Trash(loc string) error {
        v.gotCall("Delete")
        <-v.Gate
        if v.Readonly {
@@ -199,6 +199,11 @@ func (v *MockVolume) Delete(loc string) error {
        return os.ErrNotExist
 }
 
+// Untrash is a no-op that always succeeds on MockVolume.
+func (v *MockVolume) Untrash(loc string) error {
+       return nil
+}
+
 func (v *MockVolume) Status() *VolumeStatus {
        var used uint64
        for _, block := range v.Store {
index 910cc25d613cb7690f944b418aebf5c205c7aced..0dd1d82a98ca4b9f14c79d8b96e90f10faf4311f 100644 (file)
@@ -23,6 +23,9 @@ type unixVolumeAdder struct {
 }
 
 func (vs *unixVolumeAdder) Set(value string) error {
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
        if dirs := strings.Split(value, ","); len(dirs) > 1 {
                log.Print("DEPRECATED: using comma-separated volume list.")
                for _, dir := range dirs {
@@ -363,7 +366,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 }
 
-// Delete deletes the block data from the unix storage
-func (v *UnixVolume) Delete(loc string) error {
+// Trash moves the block data from the unix storage to the trash area.
+func (v *UnixVolume) Trash(loc string) error {
        // Touch() must be called before calling Write() on a block.  Touch()
        // also uses lockfile().  This avoids a race condition between Write()
        // and Delete() because either (a) the file will be deleted and Touch()
@@ -375,6 +378,9 @@ func (v *UnixVolume) Delete(loc string) error {
        if v.readonly {
                return MethodDisabledError
        }
+       if trashLifetime != 0 {
+               return ErrNotImplemented
+       }
        if v.locker != nil {
                v.locker.Lock()
                defer v.locker.Unlock()
@@ -405,6 +411,12 @@ func (v *UnixVolume) Delete(loc string) error {
        return os.Remove(p)
 }
 
+// Untrash moves a block from the trash back into the store.
+// Not yet implemented for unix volumes.
+func (v *UnixVolume) Untrash(loc string) error {
+       return ErrNotImplemented
+}
+
 // blockDir returns the fully qualified directory name for the directory
 // where loc is (or would be) stored on this volume.
 func (v *UnixVolume) blockDir(loc string) string {
index b216810f8cb0fc1008e4bb7bc99b3c306728d70a..0775e89ed275d14f7e2be510084a52e39af84472 100644 (file)
@@ -166,7 +166,7 @@ func TestUnixVolumeReadonly(t *testing.T) {
                t.Errorf("got err %v, expected MethodDisabledError", err)
        }
 
-       err = v.Delete(TestHash)
+       err = v.Trash(TestHash)
        if err != MethodDisabledError {
                t.Errorf("got err %v, expected MethodDisabledError", err)
        }
index c92fc9b00dd0cc723e4fd64bd2b001daa8bce60c..e1b8c484f0413cb8ff6bfe3ac0be77aebbcd2aa7 100755 (executable)
@@ -68,7 +68,6 @@ begin
   logins.each do |l|
     next if seen[l[:username]]
     seen[l[:username]] = true if not seen.has_key?(l[:username])
-    @homedir = "/home/#{l[:username]}"
 
     unless uids[l[:username]]
       STDERR.puts "Creating account #{l[:username]}"
@@ -85,6 +84,7 @@ begin
                          out: devnull)
     end
     # Create .ssh directory if necessary
+    @homedir = Etc.getpwnam(l[:username]).dir
     userdotssh = File.join(@homedir, ".ssh")
     Dir.mkdir(userdotssh) if !File.exists?(userdotssh)
     @key = "#######################################################################################
@@ -109,4 +109,3 @@ rescue Exception => bang
   puts bang.backtrace.join("\n")
   exit 1
 end
-
index 6319f4bbfc5cece52832842cfda05d9ae9253005..9a9ce588d382f54b9e399b5b280b8ed979557927 100644 (file)
@@ -38,10 +38,8 @@ class RemotePollLoopActor(actor_class):
         super(RemotePollLoopActor, self).__init__()
         self._client = client
         self._timer = timer_actor
-        self._logger = logging.getLogger(self.LOGGER_NAME)
         self._later = self.actor_ref.proxy()
         self._polling_started = False
-        self.log_prefix = "{} (at {})".format(self.__class__.__name__, id(self))
         self.min_poll_wait = poll_wait
         self.max_poll_wait = max_poll_wait
         self.poll_wait = self.min_poll_wait
@@ -50,6 +48,9 @@ class RemotePollLoopActor(actor_class):
         if hasattr(self, '_item_key'):
             self.subscribe_to = self._subscribe_to
 
+    def on_start(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[9:]))
+
     def _start_polling(self):
         if not self._polling_started:
             self._polling_started = True
@@ -57,22 +58,20 @@ class RemotePollLoopActor(actor_class):
 
     def subscribe(self, subscriber):
         self.all_subscribers.add(subscriber)
-        self._logger.debug("%r subscribed to all events", subscriber)
+        self._logger.debug("%s subscribed to all events", subscriber.actor_ref.actor_urn)
         self._start_polling()
 
     # __init__ exposes this method to the proxy if the subclass defines
     # _item_key.
     def _subscribe_to(self, key, subscriber):
         self.key_subscribers.setdefault(key, set()).add(subscriber)
-        self._logger.debug("%r subscribed to events for '%s'", subscriber, key)
+        self._logger.debug("%s subscribed to events for '%s'", subscriber.actor_ref.actor_urn, key)
         self._start_polling()
 
     def _send_request(self):
         raise NotImplementedError("subclasses must implement request method")
 
     def _got_response(self, response):
-        self._logger.debug("%s got response with %d items",
-                           self.log_prefix, len(response))
         self.poll_wait = self.min_poll_wait
         _notify_subscribers(response, self.all_subscribers)
         if hasattr(self, '_item_key'):
@@ -82,14 +81,14 @@ class RemotePollLoopActor(actor_class):
 
     def _got_error(self, error):
         self.poll_wait = min(self.poll_wait * 2, self.max_poll_wait)
-        return "{} got error: {} - waiting {} seconds".format(
-            self.log_prefix, error, self.poll_wait)
+        return "got error: {} - will try again in {} seconds".format(
+            error, self.poll_wait)
 
     def is_common_error(self, exception):
         return False
 
     def poll(self, scheduled_start=None):
-        self._logger.debug("%s sending poll", self.log_prefix)
+        self._logger.debug("sending request")
         start_time = time.time()
         if scheduled_start is None:
             scheduled_start = start_time
@@ -105,6 +104,9 @@ class RemotePollLoopActor(actor_class):
         else:
             self._got_response(response)
             next_poll = scheduled_start + self.poll_wait
+            self._logger.info("got response with %d items in %s seconds, next poll at %s",
+                              len(response), (time.time() - scheduled_start),
+                              time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_poll)))
         end_time = time.time()
         if next_poll < end_time:  # We've drifted too much; start fresh.
             next_poll = end_time + self.poll_wait
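A worked numeric example of the drift check above (all values hypothetical):

    poll_wait = 60
    scheduled_start, end_time = 1000.0, 1090.0  # the request took 90 seconds
    next_poll = scheduled_start + poll_wait     # 1060, already in the past
    if next_poll < end_time:                    # drifted too much; start fresh
        next_poll = end_time + poll_wait        # 1150: reschedule from now
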
index 6e46bc0f4c6283ab0d73da212529299bdec4ba10..54d6a82bcefa1cf38ad06f58cbbf89fafe55ecd1 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import absolute_import, print_function
 
 import calendar
+import functools
 import itertools
 import re
 import time
@@ -43,6 +44,69 @@ def arvados_node_missing(arvados_node, fresh_time):
     else:
         return not timestamp_fresh(arvados_timestamp(arvados_node["last_ping_at"]), fresh_time)
 
+class RetryMixin(object):
+    """Retry decorator for an method that makes remote requests.
+
+    Use this function to decorate method, and pass in a tuple of exceptions to
+    catch.  If the original method raises a known cloud driver error, or any of
+    the given exception types, this decorator will either go into a
+    sleep-and-retry loop with exponential backoff either by sleeping (if
+    self._timer is None) or by scheduling retries of the method (if self._timer
+    is a timer actor.)
+
+    """
+    def __init__(self, retry_wait, max_retry_wait,
+                 logger, cloud, timer=None):
+        self.min_retry_wait = retry_wait
+        self.max_retry_wait = max_retry_wait
+        self.retry_wait = retry_wait
+        self._logger = logger
+        self._cloud = cloud
+        self._timer = timer
+
+    @staticmethod
+    def _retry(errors=()):
+        def decorator(orig_func):
+            @functools.wraps(orig_func)
+            def retry_wrapper(self, *args, **kwargs):
+                while True:
+                    try:
+                        ret = orig_func(self, *args, **kwargs)
+                    except Exception as error:
+                        if not (isinstance(error, errors) or
+                                self._cloud.is_cloud_exception(error)):
+                            self.retry_wait = self.min_retry_wait
+                            self._logger.warning(
+                                "Re-raising unknown error (no retry): %s",
+                                error, exc_info=error)
+                            raise
+
+                        self._logger.warning(
+                            "Client error: %s - waiting %s seconds",
+                            error, self.retry_wait, exc_info=error)
+
+                        if self._timer:
+                            start_time = time.time()
+                            # reschedule to be called again
+                            self._timer.schedule(start_time + self.retry_wait,
+                                                 getattr(self._later,
+                                                         orig_func.__name__),
+                                                 *args, **kwargs)
+                        else:
+                            # sleep on it.
+                            time.sleep(self.retry_wait)
+
+                        self.retry_wait = min(self.retry_wait * 2,
+                                              self.max_retry_wait)
+                        if self._timer:
+                            # expect to be called again by timer so don't loop
+                            return
+                    else:
+                        self.retry_wait = self.min_retry_wait
+                        return ret
+            return retry_wrapper
+        return decorator
+
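A minimal usage sketch of the mixin above in its sleep-and-retry mode (no
timer actor). FakeCloud and flaky_call are hypothetical names for
illustration only; the backoff state comes from RetryMixin.__init__:

    import logging
    # assuming: from arvnodeman.computenode import RetryMixin

    class FakeCloud(object):
        # Hypothetical stand-in for a cloud driver wrapper.
        def is_cloud_exception(self, error):
            return False

    class Example(RetryMixin):
        def __init__(self):
            super(Example, self).__init__(retry_wait=1, max_retry_wait=8,
                                          logger=logging.getLogger('example'),
                                          cloud=FakeCloud(), timer=None)
            self._attempts = 0

        @RetryMixin._retry((IOError,))
        def flaky_call(self):
            self._attempts += 1
            if self._attempts < 3:
                raise IOError("transient failure")
            return "ok after %d attempts" % self._attempts

    print(Example().flaky_call())  # sleeps ~1s, then ~2s, then prints

With a timer actor instead, the same decorator schedules a retry of the
method on the actor proxy and returns rather than blocking the thread.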
 class ShutdownTimer(object):
     """Keep track of a cloud node's shutdown windows.
 
index b366e79ff834fc825697946cd9f3ae5007ad911f..2ae4fc8923612d474b833fcf9f345b255148ee3d 100644 (file)
@@ -10,66 +10,36 @@ import libcloud.common.types as cloud_types
 import pykka
 
 from .. import \
-    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, arvados_node_missing
+    arvados_node_fqdn, arvados_node_mtime, arvados_timestamp, timestamp_fresh, \
+    arvados_node_missing, RetryMixin
 from ...clientactor import _notify_subscribers
 from ... import config
 
-class ComputeNodeStateChangeBase(config.actor_class):
+class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
     """Base class for actors that change a compute node's state.
 
     This base class takes care of retrying changes and notifying
     subscribers when the change is finished.
     """
-    def __init__(self, logger_name, cloud_client, arvados_client, timer_actor,
+    def __init__(self, cloud_client, arvados_client, timer_actor,
                  retry_wait, max_retry_wait):
         super(ComputeNodeStateChangeBase, self).__init__()
+        RetryMixin.__init__(self, retry_wait, max_retry_wait,
+                            None, cloud_client, timer_actor)
         self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger(logger_name)
-        self._cloud = cloud_client
         self._arvados = arvados_client
-        self._timer = timer_actor
-        self.min_retry_wait = retry_wait
-        self.max_retry_wait = max_retry_wait
-        self.retry_wait = retry_wait
         self.subscribers = set()
 
-    @staticmethod
-    def _retry(errors=()):
-        """Retry decorator for an actor method that makes remote requests.
-
-        Use this function to decorator an actor method, and pass in a
-        tuple of exceptions to catch.  This decorator will schedule
-        retries of that method with exponential backoff if the
-        original method raises a known cloud driver error, or any of the
-        given exception types.
-        """
-        def decorator(orig_func):
-            @functools.wraps(orig_func)
-            def retry_wrapper(self, *args, **kwargs):
-                start_time = time.time()
-                try:
-                    orig_func(self, *args, **kwargs)
-                except Exception as error:
-                    if not (isinstance(error, errors) or
-                            self._cloud.is_cloud_exception(error)):
-                        raise
-                    self._logger.warning(
-                        "Client error: %s - waiting %s seconds",
-                        error, self.retry_wait)
-                    self._timer.schedule(start_time + self.retry_wait,
-                                         getattr(self._later,
-                                                 orig_func.__name__),
-                                         *args, **kwargs)
-                    self.retry_wait = min(self.retry_wait * 2,
-                                          self.max_retry_wait)
-                else:
-                    self.retry_wait = self.min_retry_wait
-            return retry_wrapper
-        return decorator
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+
+    def on_start(self):
+        self._set_logger()
 
     def _finished(self):
         _notify_subscribers(self._later, self.subscribers)
         self.subscribers = None
+        self._logger.info("finished")
 
     def subscribe(self, subscriber):
         if self.subscribers is None:
@@ -93,6 +63,17 @@ class ComputeNodeStateChangeBase(config.actor_class):
                            'last_action': explanation}},
             ).execute()
 
+    @staticmethod
+    def _finish_on_exception(orig_func):
+        @functools.wraps(orig_func)
+        def finish_wrapper(self, *args, **kwargs):
+            try:
+                return orig_func(self, *args, **kwargs)
+            except Exception as error:
+                self._logger.error("Actor error %s", error)
+                self._finished()
+        return finish_wrapper
+
 
 class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     """Actor to create and set up a cloud compute node.
@@ -107,7 +88,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
                  cloud_size, arvados_node=None,
                  retry_wait=1, max_retry_wait=180):
         super(ComputeNodeSetupActor, self).__init__(
-            'arvnodeman.nodeup', cloud_client, arvados_client, timer_actor,
+            cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self.cloud_size = cloud_size
         self.arvados_node = None
@@ -117,20 +98,23 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         else:
             self._later.prepare_arvados_node(arvados_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
         self.arvados_node = self._arvados.nodes().create(body={}).execute()
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
         self.arvados_node = self._clean_arvados_node(
             node, "Prepared by Node Manager")
         self._later.create_cloud_node()
 
-    @ComputeNodeStateChangeBase._retry()
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry()
     def create_cloud_node(self):
-        self._logger.info("Creating cloud node with size %s.",
+        self._logger.info("Sending create_node request for node size %s.",
                           self.cloud_size.name)
         self.cloud_node = self._cloud.create_node(self.cloud_size,
                                                   self.arvados_node)
@@ -139,7 +123,8 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self._logger.info("Cloud node %s created.", self.cloud_node.id)
         self._later.update_arvados_node_properties()
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
     def update_arvados_node_properties(self):
         """Tell Arvados some details about the cloud node.
 
@@ -163,7 +148,7 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
         self._logger.info("%s updated properties.", self.arvados_node['uuid'])
         self._later.post_create()
 
-    @ComputeNodeStateChangeBase._retry()
+    @RetryMixin._retry()
     def post_create(self):
         self._cloud.post_create_node(self.cloud_node)
         self._logger.info("%s post-create work done.", self.cloud_node.id)
@@ -193,7 +178,7 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         # eligible.  Normal shutdowns based on job demand should be
         # cancellable; shutdowns based on node misbehavior should not.
         super(ComputeNodeShutdownActor, self).__init__(
-            'arvnodeman.nodedown', cloud_client, arvados_client, timer_actor,
+            cloud_client, arvados_client, timer_actor,
             retry_wait, max_retry_wait)
         self._monitor = node_monitor.proxy()
         self.cloud_node = self._monitor.cloud_node.get()
@@ -201,7 +186,11 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
         self.cancel_reason = None
         self.success = None
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
     def on_start(self):
+        super(ComputeNodeShutdownActor, self).on_start()
         self._later.shutdown_node()
 
     def _arvados_node(self):
@@ -214,38 +203,40 @@ class ComputeNodeShutdownActor(ComputeNodeStateChangeBase):
 
     def cancel_shutdown(self, reason):
         self.cancel_reason = reason
-        self._logger.info("Cloud node %s shutdown cancelled: %s.",
-                          self.cloud_node.id, reason)
+        self._logger.info("Shutdown cancelled: %s.", reason)
         self._finished(success_flag=False)
 
     def _stop_if_window_closed(orig_func):
         @functools.wraps(orig_func)
         def stop_wrapper(self, *args, **kwargs):
             if (self.cancellable and
-                  (not self._monitor.shutdown_eligible().get())):
+                  (self._monitor.shutdown_eligible().get() is not True)):
                 self._later.cancel_shutdown(self.WINDOW_CLOSED)
                 return None
             else:
                 return orig_func(self, *args, **kwargs)
         return stop_wrapper
 
+    @ComputeNodeStateChangeBase._finish_on_exception
     @_stop_if_window_closed
-    @ComputeNodeStateChangeBase._retry()
+    @RetryMixin._retry()
     def shutdown_node(self):
+        self._logger.info("Starting shutdown")
         if not self._cloud.destroy_node(self.cloud_node):
             if self._cloud.broken(self.cloud_node):
                 self._later.cancel_shutdown(self.NODE_BROKEN)
             else:
                 # Force a retry.
                 raise cloud_types.LibcloudError("destroy_node failed")
-        self._logger.info("Cloud node %s shut down.", self.cloud_node.id)
+        self._logger.info("Shutdown success")
         arv_node = self._arvados_node()
         if arv_node is None:
             self._finished(success_flag=True)
         else:
             self._later.clean_arvados_node(arv_node)
 
-    @ComputeNodeStateChangeBase._retry(config.ARVADOS_ERRORS)
+    @ComputeNodeStateChangeBase._finish_on_exception
+    @RetryMixin._retry(config.ARVADOS_ERRORS)
     def clean_arvados_node(self, arvados_node):
         self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
         self._finished(success_flag=True)
@@ -314,7 +305,6 @@ class ComputeNodeMonitorActor(config.actor_class):
     ):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.proxy()
-        self._logger = logging.getLogger('arvnodeman.computenode')
         self._last_log = None
         self._shutdowns = shutdown_timer
         self._cloud_node_fqdn = cloud_fqdn_func
@@ -332,6 +322,13 @@ class ComputeNodeMonitorActor(config.actor_class):
         self.last_shutdown_opening = None
         self._later.consider_shutdown()
 
+    def _set_logger(self):
+        self._logger = logging.getLogger("%s.%s.%s" % (self.__class__.__name__, self.actor_urn[33:], self.cloud_node.name))
+
+    def on_start(self):
+        self._set_logger()
+        self._timer.schedule(self.cloud_node_start_time + self.boot_fail_after, self._later.consider_shutdown)
+
     def subscribe(self, subscriber):
         self.subscribers.add(subscriber)
 
@@ -358,37 +355,49 @@ class ComputeNodeMonitorActor(config.actor_class):
         return result
 
     def shutdown_eligible(self):
+        """Return True if eligible for shutdown, or a string explaining why the node
+        is not eligible for shutdown."""
+
         if not self._shutdowns.window_open():
-            return False
+            return "shutdown window is not open."
         if self.arvados_node is None:
             # Node is unpaired.
             # If it hasn't pinged Arvados after boot_fail seconds, shut it down
-            return not timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after)
+            if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+                return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
+            else:
+                return True
         missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
         if missing and self._cloud.broken(self.cloud_node):
             # Node is paired, but Arvados says it is missing and the cloud says the node
             # is in an error state, so shut it down.
             return True
         if missing is None and self._cloud.broken(self.cloud_node):
-            self._logger.warning(
-                "cloud reports broken node, but paired node %s never pinged "
-                "(bug?) -- skipped check for node_stale_after",
+            self._logger.info(
+                "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
+                "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
                 self.arvados_node['uuid'])
-        return self.in_state('idle')
+        if self.in_state('idle'):
+            return True
+        else:
+            return "node is not idle."
 
     def consider_shutdown(self):
-        next_opening = self._shutdowns.next_opening()
-        if self.shutdown_eligible():
-            self._debug("Node %s suggesting shutdown.", self.cloud_node.id)
-            _notify_subscribers(self._later, self.subscribers)
-        elif self._shutdowns.window_open():
-            self._debug("Node %s shutdown window open but node busy.",
-                        self.cloud_node.id)
-        elif self.last_shutdown_opening != next_opening:
-            self._debug("Node %s shutdown window closed.  Next at %s.",
-                        self.cloud_node.id, time.ctime(next_opening))
-            self._timer.schedule(next_opening, self._later.consider_shutdown)
-            self.last_shutdown_opening = next_opening
+        try:
+            next_opening = self._shutdowns.next_opening()
+            eligible = self.shutdown_eligible()
+            if eligible is True:
+                self._debug("Suggesting shutdown.")
+                _notify_subscribers(self._later, self.subscribers)
+            elif self._shutdowns.window_open():
+                self._debug("Cannot shut down because %s", eligible)
+            elif self.last_shutdown_opening != next_opening:
+                self._debug("Shutdown window closed.  Next at %s.",
+                            time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+                self._timer.schedule(next_opening, self._later.consider_shutdown)
+                self.last_shutdown_opening = next_opening
+        except Exception:
+            self._logger.exception("Unexpected exception")
 
     def offer_arvados_pair(self, arvados_node):
         first_ping_s = arvados_node.get('first_ping_at')
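Note the protocol change: shutdown_eligible() now returns either True or a
human-readable reason string, and a non-empty string is truthy, so callers
must compare against True itself (exactly the fix made in
_stop_if_window_closed above). A sketch of a conforming caller (monitor and
begin_shutdown are hypothetical names):

    eligible = monitor.shutdown_eligible().get()  # True, or a reason string
    if eligible is True:
        begin_shutdown()  # only a literal True means "go"
    else:
        # a bare "if eligible:" would be wrong: reason strings are truthy too
        logger.debug("not shutting down: %s", eligible)
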
index 919b57f42c8973bab91de742d1fee48598296f35..4d70436801564e9a35675e95c18f33fddc125806 100644 (file)
@@ -8,6 +8,7 @@ import time
 from . import \
     ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
 from . import ComputeNodeShutdownActor as ShutdownActorBase
+from .. import RetryMixin
 
 class ComputeNodeShutdownActor(ShutdownActorBase):
     SLURM_END_STATES = frozenset(['down\n', 'down*\n',
@@ -21,6 +22,7 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
             self._nodename = None
             return super(ComputeNodeShutdownActor, self).on_start()
         else:
+            self._set_logger()
             self._nodename = arv_node['hostname']
             self._logger.info("Draining SLURM node %s", self._nodename)
             self._later.issue_slurm_drain()
@@ -42,7 +44,7 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
     # of the excessive memory usage that result in the "Cannot allocate memory"
     # error are still being investigated.
 
-    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     def cancel_shutdown(self, reason):
         if self._nodename:
             if self._get_slurm_state() in self.SLURM_DRAIN_STATES:
@@ -54,14 +56,14 @@ class ComputeNodeShutdownActor(ShutdownActorBase):
                 pass
         return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
 
-    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     @ShutdownActorBase._stop_if_window_closed
     def issue_slurm_drain(self):
         self._set_node_state('DRAIN', 'Reason=Node Manager shutdown')
         self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
         self._later.await_slurm_drain()
 
-    @ShutdownActorBase._retry((subprocess.CalledProcessError, OSError))
+    @RetryMixin._retry((subprocess.CalledProcessError, OSError))
     @ShutdownActorBase._stop_if_window_closed
     def await_slurm_drain(self):
         output = self._get_slurm_state()
index 66ffb8099cb82d674381ac587d41454d6457e1f4..c98c95af66d89b257be6f7e79d4fd4371138281c 100644 (file)
@@ -2,14 +2,16 @@
 
 from __future__ import absolute_import, print_function
 
+import logging
 from operator import attrgetter
 
 import libcloud.common.types as cloud_types
 from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
 
 from ...config import NETWORK_ERRORS
+from .. import RetryMixin
 
-class BaseComputeNodeDriver(object):
+class BaseComputeNodeDriver(RetryMixin):
     """Abstract base class for compute node drivers.
 
     libcloud drivers abstract away many of the differences between
@@ -24,7 +26,16 @@ class BaseComputeNodeDriver(object):
     """
     CLOUD_ERRORS = NETWORK_ERRORS + (cloud_types.LibcloudError,)
 
-    def __init__(self, auth_kwargs, list_kwargs, create_kwargs, driver_class):
+    @RetryMixin._retry()
+    def _create_driver(self, driver_class, **auth_kwargs):
+        return driver_class(**auth_kwargs)
+
+    @RetryMixin._retry()
+    def _set_sizes(self):
+        self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+
+    def __init__(self, auth_kwargs, list_kwargs, create_kwargs,
+                 driver_class, retry_wait=1, max_retry_wait=180):
         """Base initializer for compute node drivers.
 
         Arguments:
@@ -37,7 +48,12 @@ class BaseComputeNodeDriver(object):
           libcloud driver's create_node method to create a new compute node.
         * driver_class: The class of a libcloud driver to use.
         """
-        self.real = driver_class(**auth_kwargs)
+
+        super(BaseComputeNodeDriver, self).__init__(
+            retry_wait, max_retry_wait,
+            logging.getLogger(self.__class__.__name__),
+            type(self),
+            None)
+        self.real = self._create_driver(driver_class, **auth_kwargs)
         self.list_kwargs = list_kwargs
         self.create_kwargs = create_kwargs
         # Transform entries in create_kwargs.  For each key K, if this class
@@ -53,7 +69,7 @@ class BaseComputeNodeDriver(object):
                 if new_pair is not None:
                     self.create_kwargs[new_pair[0]] = new_pair[1]
 
-        self.sizes = {sz.id: sz for sz in self.real.list_sizes()}
+        self._set_sizes()
 
     def _init_ping_host(self, ping_host):
         self.ping_host = ping_host
@@ -115,11 +131,35 @@ class BaseComputeNodeDriver(object):
             self.ping_host, arvados_node['uuid'],
             arvados_node['info']['ping_secret'])
 
+    def find_node(self, name):
+        node = [n for n in self.list_nodes() if n.name == name]
+        if node:
+            return node[0]
+        else:
+            return None
+
     def create_node(self, size, arvados_node):
-        kwargs = self.create_kwargs.copy()
-        kwargs.update(self.arvados_create_kwargs(size, arvados_node))
-        kwargs['size'] = size
-        return self.real.create_node(**kwargs)
+        try:
+            kwargs = self.create_kwargs.copy()
+            kwargs.update(self.arvados_create_kwargs(size, arvados_node))
+            kwargs['size'] = size
+            return self.real.create_node(**kwargs)
+        except self.CLOUD_ERRORS:
+            # Workaround for bug #6702: sometimes the create node request
+            # succeeds but times out and raises an exception instead of
+            # returning a result.  If this happens, we get stuck in a retry
+            # loop forever because subsequent create_node attempts will fail
+            # due to node name collision.  So check if the node we intended to
+            # create shows up in the cloud node list and return it if found.
+            try:
+                node = self.find_node(kwargs['name'])
+                if node:
+                    return node
+            except:
+                # Ignore possible exception from find_node in favor of
+                # re-raising the original create_node exception.
+                pass
+            raise
 
     def post_create_node(self, cloud_node):
         # ComputeNodeSetupActor calls this method after the cloud node is
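The fallback above works only because the node's name is determined before
create_node is called, so a retried create that collides and a lookup by
name both refer to the same intended node. The pattern, condensed (all
names hypothetical):

    def create_with_recovery(real_create, find_by_name, name, **kwargs):
        # The create call may raise even though it succeeded server-side;
        # before re-raising, check whether the intended node exists.
        try:
            return real_create(name=name, **kwargs)
        except Exception:
            try:
                node = find_by_name(name)
                if node:
                    return node
            except Exception:
                pass  # prefer re-raising the original create error
            raise
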
index 991a2983c7217f1a29368293513587d117d01d59..d89c48e270bcc119638c70fc3d5f2928fbe1f8e3 100644 (file)
@@ -75,6 +75,9 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         self.real.ex_create_tags(cloud_node,
                                  {'Name': arvados_node_fqdn(arvados_node)})
 
+    def find_node(self, name):
+        raise NotImplementedError("ec2.ComputeNodeDriver.find_node")
+
     def list_nodes(self):
         # Need to populate Node.size
         nodes = super(ComputeNodeDriver, self).list_nodes()
index be3f3f1c133531bf3aececc99b2e998ffca838e1..c5bf0b8cda42d211adcfbb61ffb3d73f460a7830 100644 (file)
@@ -101,6 +101,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
                 })
         return result
 
+
     def list_nodes(self):
         # The GCE libcloud driver only supports filtering node lists by zone.
         # Do our own filtering based on tag list.
index 243d3bfaa4cd13fbf9a540affc20ae6e562979a9..0993c479625f23a209c90412fa4426ff2c406d23 100644 (file)
@@ -121,7 +121,6 @@ class NodeManagerDaemonActor(actor_class):
         self._new_arvados = arvados_factory
         self._new_cloud = cloud_factory
         self._cloud_driver = self._new_cloud()
-        self._logger = logging.getLogger('arvnodeman.daemon')
         self._later = self.actor_ref.proxy()
         self.shutdown_windows = shutdown_windows
         self.server_calculator = server_calculator
@@ -143,14 +142,18 @@ class NodeManagerDaemonActor(actor_class):
         self.booting = {}       # Actor IDs to ComputeNodeSetupActors
         self.booted = {}        # Cloud node IDs to _ComputeNodeRecords
         self.shutdowns = {}     # Cloud node IDs to ComputeNodeShutdownActors
-        self._logger.debug("Daemon initialized")
+        self.sizes_booting_shutdown = {} # Actor IDs or Cloud node IDs to node size
+
+    def on_start(self):
+        self._logger = logging.getLogger("%s.%s" % (self.__class__.__name__, self.actor_urn[33:]))
+        self._logger.debug("Daemon started")
 
     def _update_poll_time(self, poll_key):
         self.last_polls[poll_key] = time.time()
 
     def _pair_nodes(self, node_record, arvados_node):
-        self._logger.info("Cloud node %s has associated with Arvados node %s",
-                          node_record.cloud_node.id, arvados_node['uuid'])
+        self._logger.info("Cloud node %s is now paired with Arvados node %s",
+                          node_record.cloud_node.name, arvados_node['uuid'])
         self._arvados_nodes_actor.subscribe_to(
             arvados_node['uuid'], node_record.actor.update_arvados_node)
         node_record.arvados_node = arvados_node
@@ -198,6 +201,7 @@ class NodeManagerDaemonActor(actor_class):
                 except pykka.ActorDeadError:
                     pass
                 del self.shutdowns[key]
+                del self.sizes_booting_shutdown[key]
             record.actor.stop()
             record.cloud_node = None
 
@@ -214,21 +218,33 @@ class NodeManagerDaemonActor(actor_class):
                     self._pair_nodes(cloud_rec, arv_node)
                     break
 
-    def _nodes_up(self, size):
-        up = 0
-        up += sum(1
-                  for c in self.booting.itervalues()
-                  if size is None or c.cloud_size.get().id == size.id)
-        up += sum(1
-                  for i in (self.booted, self.cloud_nodes.nodes)
-                  for c in i.itervalues()
+    def _nodes_booting(self, size):
+        s = sum(1
+                for c in self.booting.iterkeys()
+                if size is None or self.sizes_booting_shutdown[c].id == size.id)
+        s += sum(1
+                 for c in self.booted.itervalues()
+                 if size is None or c.cloud_node.size.id == size.id)
+        return s
+
+    def _nodes_unpaired(self, size):
+        return sum(1
+                   for c in self.cloud_nodes.unpaired()
+                   if size is None or c.cloud_node.size.id == size.id)
+
+    def _nodes_booted(self, size):
+        return sum(1
+                  for c in self.cloud_nodes.nodes.itervalues()
                   if size is None or c.cloud_node.size.id == size.id)
+
+    def _nodes_up(self, size):
+        up = self._nodes_booting(size) + self._nodes_booted(size)
         return up
 
     def _total_price(self):
         cost = 0
-        cost += sum(self.server_calculator.find_size(c.cloud_size.get().id).price
-                  for c in self.booting.itervalues())
+        cost += sum(self.server_calculator.find_size(self.sizes_booting_shutdown[c].id).price
+                  for c in self.booting.iterkeys())
         cost += sum(self.server_calculator.find_size(c.cloud_node.size.id).price
                     for i in (self.booted, self.cloud_nodes.nodes)
                     for c in i.itervalues())
@@ -253,9 +269,9 @@ class NodeManagerDaemonActor(actor_class):
 
     def _size_shutdowns(self, size):
         sh = 0
-        for c in self.shutdowns.itervalues():
+        for c in self.shutdowns.iterkeys():
             try:
-                if c.cloud_node.get().size.id == size.id:
+                if self.sizes_booting_shutdown[c].id == size.id:
                     sh += 1
             except pykka.ActorDeadError:
                 pass
@@ -272,11 +288,18 @@ class NodeManagerDaemonActor(actor_class):
         elif under_min > 0 and size.id == self.min_cloud_size.id:
             return under_min
 
-        up_count = self._nodes_up(size) - (self._size_shutdowns(size) +
-                                           self._nodes_busy(size) +
-                                           self._nodes_missing(size))
+        booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
+        shutdown_count = self._size_shutdowns(size)
+        busy_count = self._nodes_busy(size)
+        up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
 
-        self._logger.debug("%s: idle nodes %i, wishlist size %i", size.name, up_count, self._size_wishlist(size))
+        self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
+                          self._size_wishlist(size),
+                          up_count + busy_count,
+                          booting_count,
+                          up_count - booting_count,
+                          busy_count,
+                          shutdown_count)
 
         wanted = self._size_wishlist(size) - up_count
         if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
@@ -298,11 +321,14 @@ class NodeManagerDaemonActor(actor_class):
         self._update_poll_time('server_wishlist')
         self.last_wishlist = wishlist
         for size in reversed(self.server_calculator.cloud_sizes):
-            nodes_wanted = self._nodes_wanted(size)
-            if nodes_wanted > 0:
-                self._later.start_node(size)
-            elif (nodes_wanted < 0) and self.booting:
-                self._later.stop_booting_node(size)
+            try:
+                nodes_wanted = self._nodes_wanted(size)
+                if nodes_wanted > 0:
+                    self._later.start_node(size)
+                elif (nodes_wanted < 0) and self.booting:
+                    self._later.stop_booting_node(size)
+            except Exception as e:
+                self._logger.exception("while calculating nodes wanted for size %s", size)
 
     def _check_poll_freshness(orig_func):
         """Decorator to inhibit a method when poll information is stale.
@@ -327,7 +353,7 @@ class NodeManagerDaemonActor(actor_class):
         if nodes_wanted < 1:
             return None
         arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
-        self._logger.info("Want %s more nodes.  Booting a %s node.",
+        self._logger.info("Want %i more %s nodes.  Booting a node.",
                           nodes_wanted, cloud_size.name)
         new_setup = self._node_setup.start(
             timer_actor=self._timer,
@@ -336,6 +362,8 @@ class NodeManagerDaemonActor(actor_class):
             cloud_client=self._new_cloud(),
             cloud_size=cloud_size).proxy()
         self.booting[new_setup.actor_ref.actor_urn] = new_setup
+        self.sizes_booting_shutdown[new_setup.actor_ref.actor_urn] = cloud_size
+
         if arvados_node is not None:
             self.arvados_nodes[arvados_node['uuid']].assignment_time = (
                 time.time())
@@ -349,13 +377,16 @@ class NodeManagerDaemonActor(actor_class):
     def node_up(self, setup_proxy):
         cloud_node = setup_proxy.cloud_node.get()
         del self.booting[setup_proxy.actor_ref.actor_urn]
+        del self.sizes_booting_shutdown[setup_proxy.actor_ref.actor_urn]
+
         setup_proxy.stop()
-        record = self.cloud_nodes.get(cloud_node.id)
-        if record is None:
-            record = self._new_node(cloud_node)
-            self.booted[cloud_node.id] = record
-        self._timer.schedule(time.time() + self.boot_fail_after,
-                             self._later.shutdown_unpaired_node, cloud_node.id)
+        if cloud_node is not None:
+            record = self.cloud_nodes.get(cloud_node.id)
+            if record is None:
+                record = self._new_node(cloud_node)
+                self.booted[cloud_node.id] = record
+            self._timer.schedule(time.time() + self.boot_fail_after,
+                                 self._later.shutdown_unpaired_node, cloud_node.id)
 
     @_check_poll_freshness
     def stop_booting_node(self, size):
@@ -365,12 +396,15 @@ class NodeManagerDaemonActor(actor_class):
         for key, node in self.booting.iteritems():
             if node.cloud_size.get().id == size.id and node.stop_if_no_cloud_node().get():
                 del self.booting[key]
+                del self.sizes_booting_shutdown[key]
+
                 if nodes_excess > 1:
                     self._later.stop_booting_node(size)
                 break
 
     def _begin_node_shutdown(self, node_actor, cancellable):
-        cloud_node_id = node_actor.cloud_node.get().id
+        cloud_node_obj = node_actor.cloud_node.get()
+        cloud_node_id = cloud_node_obj.id
         if cloud_node_id in self.shutdowns:
             return None
         shutdown = self._node_shutdown.start(
@@ -378,6 +412,7 @@ class NodeManagerDaemonActor(actor_class):
             arvados_client=self._new_arvados(),
             node_monitor=node_actor.actor_ref, cancellable=cancellable).proxy()
         self.shutdowns[cloud_node_id] = shutdown
+        self.sizes_booting_shutdown[cloud_node_id] = cloud_node_obj.size
         shutdown.subscribe(self._later.node_finished_shutdown)
 
     @_check_poll_freshness
@@ -404,9 +439,11 @@ class NodeManagerDaemonActor(actor_class):
             if cancel_reason == self._node_shutdown.NODE_BROKEN:
                 self.cloud_nodes.blacklist(cloud_node_id)
             del self.shutdowns[cloud_node_id]
+            del self.sizes_booting_shutdown[cloud_node_id]
         elif cloud_node_id in self.booted:
             self.booted.pop(cloud_node_id).actor.stop()
             del self.shutdowns[cloud_node_id]
+            del self.sizes_booting_shutdown[cloud_node_id]
 
     def shutdown(self):
         self._logger.info("Shutting down after signal.")
index e0f0a5b1ec26045745fe479f76b8614eb700875d..87cf738311730feed045d23c63af6481c0731e06 100644 (file)
@@ -102,7 +102,6 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
     """
 
     CLIENT_ERRORS = ARVADOS_ERRORS
-    LOGGER_NAME = 'arvnodeman.jobqueue'
 
     def __init__(self, client, timer_actor, server_calc, *args, **kwargs):
         super(JobQueueMonitorActor, self).__init__(
@@ -114,6 +113,6 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
 
     def _got_response(self, queue):
         server_list = self._calculator.servers_for_queue(queue)
-        self._logger.debug("Sending server wishlist: %s",
+        self._logger.debug("Calculated wishlist: %s",
                            ', '.join(s.name for s in server_list) or "(empty)")
         return super(JobQueueMonitorActor, self)._got_response(server_list)
index e8c2fe661e5203469a1ee87341159aeb6cdc1aec..c8b3d19485b2c9cb8d6ee6e4353ddeb2c0b9c560 100644 (file)
@@ -103,32 +103,36 @@ def main(args=None):
     for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
         signal.signal(sigcode, shutdown_signal)
 
-    setup_logging(config.get('Logging', 'file'), **config.log_levels())
-    node_setup, node_shutdown, node_update, node_monitor = \
-        config.dispatch_classes()
-    server_calculator = build_server_calculator(config)
-    timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
-        launch_pollers(config, server_calculator)
-    cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
-    node_daemon = NodeManagerDaemonActor.start(
-        job_queue_poller, arvados_node_poller, cloud_node_poller,
-        cloud_node_updater, timer,
-        config.new_arvados_client, config.new_cloud_client,
-        config.shutdown_windows(),
-        server_calculator,
-        config.getint('Daemon', 'min_nodes'),
-        config.getint('Daemon', 'max_nodes'),
-        config.getint('Daemon', 'poll_stale_after'),
-        config.getint('Daemon', 'boot_fail_after'),
-        config.getint('Daemon', 'node_stale_after'),
-        node_setup, node_shutdown, node_monitor,
-        max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
-
-    signal.pause()
-    daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
-    while not daemon_stopped():
-        time.sleep(1)
-    pykka.ActorRegistry.stop_all()
+    try:
+        setup_logging(config.get('Logging', 'file'), **config.log_levels())
+        node_setup, node_shutdown, node_update, node_monitor = \
+            config.dispatch_classes()
+        server_calculator = build_server_calculator(config)
+        timer, cloud_node_poller, arvados_node_poller, job_queue_poller = \
+            launch_pollers(config, server_calculator)
+        cloud_node_updater = node_update.start(config.new_cloud_client).proxy()
+        node_daemon = NodeManagerDaemonActor.start(
+            job_queue_poller, arvados_node_poller, cloud_node_poller,
+            cloud_node_updater, timer,
+            config.new_arvados_client, config.new_cloud_client,
+            config.shutdown_windows(),
+            server_calculator,
+            config.getint('Daemon', 'min_nodes'),
+            config.getint('Daemon', 'max_nodes'),
+            config.getint('Daemon', 'poll_stale_after'),
+            config.getint('Daemon', 'boot_fail_after'),
+            config.getint('Daemon', 'node_stale_after'),
+            node_setup, node_shutdown, node_monitor,
+            max_total_price=config.getfloat('Daemon', 'max_total_price')).proxy()
+
+        signal.pause()
+        daemon_stopped = node_daemon.actor_ref.actor_stopped.is_set
+        while not daemon_stopped():
+            time.sleep(1)
+    except Exception:
+        logging.exception("Uncaught exception during setup")
+    finally:
+        pykka.ActorRegistry.stop_all()
 
 
 if __name__ == '__main__':
index 83dd93f077bfb504a8f41b6f0addd93c717ac7da..f1a661e559247e263a10acdd3eea1271f282bc58 100644 (file)
@@ -11,7 +11,6 @@ class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
     This actor regularly polls the list of Arvados node records, and
     sends it to subscribers.
     """
-    LOGGER_NAME = 'arvnodeman.arvados_nodes'
 
     def is_common_error(self, exception):
         return isinstance(exception, config.ARVADOS_ERRORS)
@@ -29,7 +28,6 @@ class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
     This actor regularly polls the cloud to get a list of running compute
     nodes, and sends it to subscribers.
     """
-    LOGGER_NAME = 'arvnodeman.cloud_nodes'
 
     def is_common_error(self, exception):
         return self._client.is_cloud_exception(exception)
@@ -38,4 +36,5 @@ class CloudNodeListMonitorActor(clientactor.RemotePollLoopActor):
         return node.id
 
     def _send_request(self):
-        return self._client.list_nodes()
+        n = self._client.list_nodes()
+        return n
index 57a0d32d062b87276af90c5945b0d60ceaba8e88..cee9c85a221ad3d33ac914ed02c1bf33ea00a3f1 100644 (file)
@@ -47,7 +47,8 @@ class RemotePollLoopActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     def test_late_subscribers_get_responses(self):
         self.build_monitor(['pre_late_test', 'late_test'])
-        self.monitor.subscribe(lambda response: None).get(self.TIMEOUT)
+        mock_subscriber = mock.Mock(name='mock_subscriber')
+        self.monitor.subscribe(mock_subscriber).get(self.TIMEOUT)
         self.monitor.subscribe(self.subscriber)
         self.monitor.poll().get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
@@ -146,4 +147,3 @@ class RemotePollLoopActorWithKeysTestCase(testutil.RemotePollLoopActorTestMixin,
 
 if __name__ == '__main__':
     unittest.main()
-
index ecf83c693a5a9da5121b4c26bbb7318e197298fe..9c8af19ea315df129f0c0365862cd8c5fefcab52 100644 (file)
@@ -9,6 +9,7 @@ import arvados.errors as arverror
 import httplib2
 import mock
 import pykka
+import threading
 
 import arvnodeman.computenode.dispatch as dispatch
 from . import testutil
@@ -44,8 +45,11 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
 
     def test_creation_without_arvados_node(self):
         self.make_actor()
+        finished = threading.Event()
+        self.setup_actor.subscribe(lambda _: finished.set())
         self.assertEqual(self.arvados_effect[-1],
                          self.setup_actor.arvados_node.get(self.TIMEOUT))
+        assert(finished.wait(self.TIMEOUT))
         self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
         self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
         self.assert_node_properties_updated()
@@ -55,8 +59,11 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
     def test_creation_with_arvados_node(self):
         self.make_mocks(arvados_effect=[testutil.arvados_node_mock()]*2)
         self.make_actor(testutil.arvados_node_mock())
+        finished = threading.Event()
+        self.setup_actor.subscribe(lambda _: finished.set())
         self.assertEqual(self.arvados_effect[-1],
                          self.setup_actor.arvados_node.get(self.TIMEOUT))
+        assert(finished.wait(self.TIMEOUT))
         self.assert_node_properties_updated()
         self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
         self.assertEqual(self.cloud_client.create_node(),
@@ -339,7 +346,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
     def test_no_shutdown_booting(self):
         self.make_actor()
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is still booting"))
 
     def test_shutdown_without_arvados_node(self):
         self.make_actor(start_time=0)
@@ -352,7 +359,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
                                               last_ping_at='1970-01-01T01:02:03.04050607Z')
         self.make_actor(10, arv_node)
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_no_shutdown_running_broken(self):
         arv_node = testutil.arvados_node_mock(12, job_uuid=None,
@@ -360,7 +367,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.make_actor(12, arv_node)
         self.shutdowns._set_state(True, 600)
         self.cloud_client.broken.return_value = True
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_shutdown_missing_broken(self):
         arv_node = testutil.arvados_node_mock(11, job_uuid=None,
@@ -373,23 +380,23 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
 
     def test_no_shutdown_when_window_closed(self):
         self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("shutdown window is not open."))
 
     def test_no_shutdown_when_node_running_job(self):
         self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_no_shutdown_when_node_state_unknown(self):
         self.make_actor(5, testutil.arvados_node_mock(
             5, crunch_worker_state=None))
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_no_shutdown_when_node_state_stale(self):
         self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
         self.shutdowns._set_state(True, 600)
-        self.assertFalse(self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+        self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
 
     def test_arvados_node_match(self):
         self.make_actor(2)
index 3ef152ef874bb17c7a368c68827e665851c1e2f8..5721abc5f87efeaf029c2eb476bb0fcdf6a14f2a 100644 (file)
@@ -100,3 +100,13 @@ echo compute-000000000000063-zzzzz > /var/tmp/arv-node-data/meta-data/instance-i
 echo z1.test > /var/tmp/arv-node-data/meta-data/instance-type
 """,
                          driver.arvados_create_kwargs(testutil.MockSize(1), arv_node)['ex_customdata'])
+
+    def test_create_raises_but_actually_succeeded(self):
+        arv_node = testutil.arvados_node_mock(1, hostname=None)
+        driver = self.new_driver(create_kwargs={"tag_arvados-class": "dynamic-compute"})
+        nodelist = [testutil.cloud_node_mock(1, tags={"arvados-class": "dynamic-compute"})]
+        nodelist[0].name = 'compute-000000000000001-zzzzz'
+        self.driver_mock().list_nodes.return_value = nodelist
+        self.driver_mock().create_node.side_effect = IOError
+        n = driver.create_node(testutil.MockSize(1), arv_node)
+        self.assertEqual('compute-000000000000001-zzzzz', n.name)
index 41cb1aac8623ee8617f0464623aa4ce3fd3a7d28..e8b2fa36c582876359fa6e667f80e9a7cb1f3013 100644 (file)
@@ -48,6 +48,16 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         metadata = self.driver_mock().create_node.call_args[1]['ex_metadata']
         self.assertIn('ping_secret=ssshh', metadata.get('arv-ping-url'))
 
+    def test_create_raises_but_actually_succeeded(self):
+        arv_node = testutil.arvados_node_mock(1, hostname=None)
+        driver = self.new_driver()
+        nodelist = [testutil.cloud_node_mock(1)]
+        nodelist[0].name = 'compute-000000000000001-zzzzz'
+        self.driver_mock().list_nodes.return_value = nodelist
+        self.driver_mock().create_node.side_effect = IOError
+        n = driver.create_node(testutil.MockSize(1), arv_node)
+        self.assertEqual('compute-000000000000001-zzzzz', n.name)
+
     def test_create_sets_default_hostname(self):
         driver = self.new_driver()
         driver.create_node(testutil.MockSize(1),
index 200919bfb819183dfa8b50b426c7d33d60d64db9..f41fa6cb1af57b6b1cb005a09bec95207ace0b14 100644 (file)
@@ -208,6 +208,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         mock_shutdown = self.node_shutdown.start(node_monitor=mock_node_monitor)
 
         self.daemon.shutdowns.get()[cloud_nodes[1].id] = mock_shutdown.proxy()
+        self.daemon.sizes_booting_shutdown.get()[cloud_nodes[1].id] = size
 
         self.assertEqual(2, self.alive_monitor_count())
         for mon_ref in self.monitor_list():
index e543c2891698c6f0626c3bf5f444cd1c0b49a86b..6cde766fa312f5b0e07ba53148a93844d26dbf47 100644 (file)
@@ -130,6 +130,14 @@ class DriverTestMixin(object):
     def driver_method_args(self, method_name):
         return getattr(self.driver_mock(), method_name).call_args
 
+    def test_driver_create_retry(self):
+        with mock.patch('time.sleep'):
+            driver_mock2 = mock.MagicMock(name='driver_mock2')
+            self.driver_mock.side_effect = (Exception("oops"), driver_mock2)
+            kwargs = {'user_id': 'foo'}
+            driver = self.new_driver(auth_kwargs=kwargs)
+            self.assertTrue(self.driver_mock.called)
+            self.assertIs(driver.real, driver_mock2)
 
 class RemotePollLoopActorTestMixin(ActorTestMixin):
     def build_monitor(self, *args, **kwargs):
index 4db8152d090ecd7b43f8be48fd2db363d03a2483..325ee59e854dfe36a83b406297d37047c7a90718 100644 (file)
@@ -1 +1,2 @@
 include agpl-3.0.txt
+include crunchstat_summary/chartjs.js
index 662d7835cc0a1878cb3fff7fa2ef468ccc1f8087..e16bd8e0a23776e7561787c6c4eca636ad2d5be6 100755 (executable)
@@ -4,9 +4,12 @@ from __future__ import print_function
 
 import crunchstat_summary.command
 import crunchstat_summary.summarizer
+import logging
 import sys
 
+logging.getLogger().addHandler(logging.StreamHandler())
+
 args = crunchstat_summary.command.ArgumentParser().parse_args(sys.argv[1:])
-s = crunchstat_summary.summarizer.Summarizer(args)
-s.run()
-print(s.report(), end='')
+cmd = crunchstat_summary.command.Command(args)
+cmd.run()
+print(cmd.report(), end='')
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..c10988db2c0609f836a2f8d210ac0ac7375d8fcd 100644 (file)
@@ -0,0 +1,4 @@
+import logging
+
+logger = logging.getLogger(__name__)
+logger.addHandler(logging.NullHandler())
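Attaching a NullHandler is the standard convention for library packages:
the package can log freely without triggering "No handlers could be found"
warnings, and the application decides where the output goes. A sketch of
application-side opt-in:

    import logging

    # Send the crunchstat_summary package's log records to stderr
    # and show debug output (application-side configuration).
    logging.getLogger('crunchstat_summary').addHandler(logging.StreamHandler())
    logging.getLogger('crunchstat_summary').setLevel(logging.DEBUG)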
diff --git a/tools/crunchstat-summary/crunchstat_summary/chartjs.js b/tools/crunchstat-summary/crunchstat_summary/chartjs.js
new file mode 100644 (file)
index 0000000..72ff7a4
--- /dev/null
@@ -0,0 +1,27 @@
+window.onload = function() {
+    var charts = {};
+    sections.forEach(function(section, section_idx) {
+        var h1 = document.createElement('h1');
+        h1.appendChild(document.createTextNode(section.label));
+        document.body.appendChild(h1);
+        section.charts.forEach(function(data, chart_idx) {
+            // Skip chart if every series has zero data points
+            if (0 == data.data.reduce(function(len, series) {
+                return len + series.dataPoints.length;
+            }, 0)) {
+                return;
+            }
+            var id = 'chart-'+section_idx+'-'+chart_idx;
+            var div = document.createElement('div');
+            div.setAttribute('id', id);
+            div.setAttribute('style', 'width: 100%; height: 150px');
+            document.body.appendChild(div);
+            charts[id] = new CanvasJS.Chart(id, data);
+            charts[id].render();
+        });
+    });
+
+    if (typeof window.debug === 'undefined')
+        window.debug = {};
+    window.debug.charts = charts;
+};
diff --git a/tools/crunchstat-summary/crunchstat_summary/chartjs.py b/tools/crunchstat-summary/crunchstat_summary/chartjs.py
new file mode 100644 (file)
index 0000000..03e45e6
--- /dev/null
@@ -0,0 +1,84 @@
+from __future__ import print_function
+
+import cgi
+import json
+import math
+import pkg_resources
+
+from crunchstat_summary import logger
+
+
+class ChartJS(object):
+    JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/canvasjs/1.7.0/canvasjs.min.js'
+
+    def __init__(self, label, summarizers):
+        self.label = label
+        self.summarizers = summarizers
+
+    def html(self):
+        return '''<!doctype html><html><head>
+        <title>{} stats</title>
+        <script type="text/javascript" src="{}"></script>
+        <script type="text/javascript">{}</script>
+        </head><body></body></html>
+        '''.format(cgi.escape(self.label), self.JSLIB, self.js())
+
+    def js(self):
+        return 'var sections = {};\n{}'.format(
+            json.dumps(self.sections()),
+            pkg_resources.resource_string('crunchstat_summary', 'chartjs.js'))
+
+    def sections(self):
+        return [
+            {
+                'label': s.long_label(),
+                'charts': self.charts(s.label, s.tasks),
+            }
+            for s in self.summarizers]
+
+    def _axisY(self, tasks, stat):
+        ymax = 1
+        for task in tasks.itervalues():
+            for pt in task.series[stat]:
+                ymax = max(ymax, pt[1])
+        ytick = math.exp((1+math.floor(math.log(ymax, 2)))*math.log(2))/4
+        return {
+            'gridColor': '#cccccc',
+            'gridThickness': 1,
+            'interval': ytick,
+            'minimum': 0,
+            'maximum': ymax,
+            'valueFormatString': "''",
+        }
+
+    def charts(self, label, tasks):
+        return [
+            {
+                'axisY': self._axisY(tasks=tasks, stat=stat),
+                'data': [
+                    {
+                        'type': 'line',
+                        'markerType': 'none',
+                        'dataPoints': self._datapoints(
+                            label=uuid, task=task, series=task.series[stat]),
+                    }
+                    for uuid, task in tasks.iteritems()
+                ],
+                'title': {
+                    'text': '{}: {} {}'.format(label, stat[0], stat[1]),
+                },
+                'zoomEnabled': True,
+            }
+            for stat in (('cpu', 'user+sys__rate'),
+                         ('mem', 'rss'),
+                         ('net:eth0', 'tx+rx__rate'),
+                         ('net:keep0', 'tx+rx__rate'))]
+
+    def _datapoints(self, label, task, series):
+        points = [
+            {'x': pt[0].total_seconds(), 'y': pt[1]}
+            for pt in series]
+        if len(points) > 0:
+            points[-1]['markerType'] = 'cross'
+            points[-1]['markerSize'] = 12
+        return points
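The tick computation in _axisY above is easier to read in powers of two:
exp((1 + floor(log2(ymax))) * ln 2) is just 2 ** (1 + floor(log2(ymax))),
the smallest power of two strictly greater than ymax, and the interval is a
quarter of that (so each chart gets four gridlines). An equivalent sketch,
modulo floating-point rounding:

    import math

    def axis_interval(ymax):
        # Smallest power of two strictly greater than ymax, divided by 4.
        return 2 ** (1 + math.floor(math.log(ymax, 2))) / 4.0

    # e.g. ymax=5: next power of two is 8, so the gridline interval is 2.0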
index 8186e5d7579e91c208e714abaeff3d7192c3ab16..78638c60e840b48e2117af842fc472e02d109daa 100644 (file)
@@ -1,4 +1,9 @@
 import argparse
+import gzip
+import logging
+import sys
+
+from crunchstat_summary import logger, summarizer
 
 
 class ArgumentParser(argparse.ArgumentParser):
@@ -8,7 +13,50 @@ class ArgumentParser(argparse.ArgumentParser):
         src = self.add_mutually_exclusive_group()
         src.add_argument(
             '--job', type=str, metavar='UUID',
-            help='Look up the specified job and read its log data from Keep')
+            help='Look up the specified job and read its log data from Keep'
+            ' (or from the Arvados event log, if the job is still running)')
+        src.add_argument(
+            '--pipeline-instance', type=str, metavar='UUID',
+            help='Summarize each component of the given pipeline instance')
         src.add_argument(
             '--log-file', type=str,
             help='Read log data from a regular file')
+        self.add_argument(
+            '--skip-child-jobs', action='store_true',
+            help='Do not include stats from child jobs')
+        self.add_argument(
+            '--format', type=str, choices=('html', 'text'), default='text',
+            help='Report format')
+        self.add_argument(
+            '--verbose', '-v', action='count', default=0,
+            help='Log more information (once for progress, twice for debug)')
+
+
+class Command(object):
+    def __init__(self, args):
+        self.args = args
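+        # logging.WARNING is 30; each -v subtracts 10, so -v enables
+        # INFO and -vv enables DEBUG.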
+        logger.setLevel(logging.WARNING - 10 * args.verbose)
+
+    def run(self):
+        kwargs = {
+            'skip_child_jobs': self.args.skip_child_jobs,
+        }
+        if self.args.pipeline_instance:
+            self.summer = summarizer.PipelineSummarizer(self.args.pipeline_instance, **kwargs)
+        elif self.args.job:
+            self.summer = summarizer.JobSummarizer(self.args.job, **kwargs)
+        elif self.args.log_file:
+            if self.args.log_file.endswith('.gz'):
+                fh = gzip.open(self.args.log_file)
+            else:
+                fh = open(self.args.log_file)
+            self.summer = summarizer.Summarizer(fh, **kwargs)
+        else:
+            self.summer = summarizer.Summarizer(sys.stdin, **kwargs)
+        return self.summer.run()
+
+    def report(self):
+        if self.args.format == 'html':
+            return self.summer.html_report()
+        elif self.args.format == 'text':
+            return self.summer.text_report()
diff --git a/tools/crunchstat-summary/crunchstat_summary/reader.py b/tools/crunchstat-summary/crunchstat_summary/reader.py
new file mode 100644 (file)
index 0000000..2b6ebce
--- /dev/null
@@ -0,0 +1,80 @@
+from __future__ import print_function
+
+import arvados
+import Queue
+import threading
+
+from crunchstat_summary import logger
+
+
+class CollectionReader(object):
+    def __init__(self, collection_id):
+        logger.debug('load collection %s', collection_id)
+        collection = arvados.collection.CollectionReader(collection_id)
+        filenames = [filename for filename in collection]
+        if len(filenames) != 1:
+            raise ValueError(
+                "collection {} has {} files; need exactly one".format(
+                    collection_id, len(filenames)))
+        self._reader = collection.open(filenames[0])
+        self._label = "{}/{}".format(collection_id, filenames[0])
+
+    def __str__(self):
+        return self._label
+
+    def __iter__(self):
+        return iter(self._reader)
+
+
+class LiveLogReader(object):
+    EOF = None
+
+    def __init__(self, job_uuid):
+        logger.debug('load stderr events for job %s', job_uuid)
+        self.job_uuid = job_uuid
+
+    def __str__(self):
+        return self.job_uuid
+
+    def _get_all_pages(self):
+        got = 0
+        last_id = 0
+        filters = [
+            ['object_uuid', '=', self.job_uuid],
+            ['event_type', '=', 'stderr']]
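+        # Page through matching log events in id order, using the last
+        # id seen as the cursor for the next request.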
+        try:
+            while True:
+                page = arvados.api().logs().index(
+                    limit=1000,
+                    order=['id asc'],
+                    filters=filters + [['id','>',str(last_id)]],
+                    select=['id', 'properties'],
+                ).execute(num_retries=2)
+                got += len(page['items'])
+                logger.debug(
+                    '%s: received %d of %d log events',
+                    self.job_uuid, got,
+                    got + page['items_available'] - len(page['items']))
+                for i in page['items']:
+                    for line in i['properties']['text'].split('\n'):
+                        self._queue.put(line+'\n')
+                    last_id = i['id']
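+                # Stop when a page comes back empty or covers all
+                # remaining matches.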
+                if (len(page['items']) == 0 or
+                    len(page['items']) >= page['items_available']):
+                    break
+        finally:
+            self._queue.put(self.EOF)
+
+    def __iter__(self):
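+        # Fetch pages in a daemon thread so the caller can start
+        # consuming lines immediately; _get_all_pages queues EOF when
+        # it finishes.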
+        self._queue = Queue.Queue()
+        self._thread = threading.Thread(target=self._get_all_pages)
+        self._thread.daemon = True
+        self._thread.start()
+        return self
+
+    def next(self):
+        line = self._queue.get()
+        if line is self.EOF:
+            self._thread.join()
+            raise StopIteration
+        return line
index ac0964b30e990724ecb5c7b16fb1cbd940bce25d..f422501b10ff1858f9b636621aaaba4bad662d5b 100644 (file)
@@ -2,40 +2,129 @@ from __future__ import print_function
 
 import arvados
 import collections
+import crunchstat_summary.chartjs
+import crunchstat_summary.reader
+import datetime
 import functools
-import gzip
+import itertools
+import math
 import re
 import sys
+import threading
+
+from arvados.api import OrderedJsonModel
+from crunchstat_summary import logger
+
+# Recommend memory constraints that are this multiple of an integral
+# number of GiB.  (Nodes tend to be sold as "8 GiB" but, according to
+# the kernel, actually have more like 7.5 GiB available.)
+AVAILABLE_RAM_RATIO = 0.95
+
+
+class Task(object):
+    def __init__(self):
+        self.starttime = None
+        self.series = collections.defaultdict(list)
 
 
 class Summarizer(object):
-    def __init__(self, args):
-        self.args = args
+    def __init__(self, logdata, label=None, skip_child_jobs=False):
+        self._logdata = logdata
+
+        self.label = label
+        self.starttime = None
+        self.finishtime = None
+        self._skip_child_jobs = skip_child_jobs
 
-    def run(self):
         # stats_max: {category: {stat: val}}
         self.stats_max = collections.defaultdict(
-            functools.partial(collections.defaultdict,
-                              lambda: float('-Inf')))
+            functools.partial(collections.defaultdict, lambda: 0))
         # task_stats: {task_id: {category: {stat: val}}}
         self.task_stats = collections.defaultdict(
             functools.partial(collections.defaultdict, dict))
-        for line in self._logdata():
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) success in (?P<elapsed>\d+) seconds', line)
+
+        self.seq_to_uuid = {}
+        self.tasks = collections.defaultdict(Task)
+
+        # Don't bother recommending new runtime constraints when the
+        # constraints the job ran with are known to us and already
+        # suitable.  Subclass constructors overwrite this with the
+        # job's actual constraints when they are available.
+        self.existing_constraints = {}
+
+        logger.debug("%s: logdata %s", self.label, logdata)
+
+    def run(self):
+        logger.debug("%s: parsing logdata %s", self.label, self._logdata)
+        for line in self._logdata:
+            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) job_task (?P<task_uuid>\S+)$', line)
             if m:
-                task_id = m.group('seq')
+                seq = int(m.group('seq'))
+                uuid = m.group('task_uuid')
+                self.seq_to_uuid[seq] = uuid
+                logger.debug('%s: seq %d is task %s', self.label, seq, uuid)
+                continue
+
+            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) (success in|failure \(#., permanent\) after) (?P<elapsed>\d+) seconds', line)
+            if m:
+                task_id = self.seq_to_uuid[int(m.group('seq'))]
                 elapsed = int(m.group('elapsed'))
                 self.task_stats[task_id]['time'] = {'elapsed': elapsed}
                 if elapsed > self.stats_max['time']['elapsed']:
                     self.stats_max['time']['elapsed'] = elapsed
                 continue
-            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
+
+            m = re.search(r'^\S+ \S+ \d+ (?P<seq>\d+) stderr Queued job (?P<uuid>\S+)$', line)
+            if m:
+                uuid = m.group('uuid')
+                if self._skip_child_jobs:
+                    logger.warning('%s: omitting stats from child job %s'
+                                   ' because the --skip-child-jobs flag is on',
+                                   self.label, uuid)
+                    continue
+                logger.debug('%s: follow %s', self.label, uuid)
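+                # Share our accumulator dicts with the child job's
+                # summarizer so its stats merge into this job's totals.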
+                child_summarizer = JobSummarizer(uuid)
+                child_summarizer.stats_max = self.stats_max
+                child_summarizer.task_stats = self.task_stats
+                child_summarizer.tasks = self.tasks
+                child_summarizer.starttime = self.starttime
+                child_summarizer.run()
+                logger.debug('%s: done %s', self.label, uuid)
+                continue
+
+            m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n', line)
             if not m:
                 continue
+
+            if self.label is None:
+                self.label = m.group('job_uuid')
+                logger.debug('%s: using job uuid as label', self.label)
             if m.group('category').endswith(':'):
-                # "notice:" etc.
+                # "stderr crunchstat: notice: ..."
+                continue
+            elif m.group('category') in ('error', 'caught'):
+                continue
+            elif m.group('category') == 'read':
+                # "stderr crunchstat: read /proc/1234/net/dev: ..."
+                # (newer crunchstat writes "error reading ..." instead,
+                # but old logs still use this form)
                 continue
-            task_id = m.group('seq')
+            task_id = self.seq_to_uuid[int(m.group('seq'))]
+            task = self.tasks[task_id]
+
+            # Use the first and last crunchstat timestamps as
+            # approximations of starttime and finishtime.
+            timestamp = datetime.datetime.strptime(
+                m.group('timestamp'), '%Y-%m-%d_%H:%M:%S')
+            if not task.starttime:
+                task.starttime = timestamp
+                logger.debug('%s: task %s starttime %s',
+                             self.label, task_id, timestamp)
+            task.finishtime = timestamp
+
+            if not self.starttime:
+                self.starttime = timestamp
+            self.finishtime = timestamp
+
             this_interval_s = None
             for group in ['current', 'interval']:
                 if not m.group(group):
@@ -44,10 +133,15 @@ class Summarizer(object):
                 words = m.group(group).split(' ')
                 stats = {}
                 for val, stat in zip(words[::2], words[1::2]):
-                    if '.' in val:
-                        stats[stat] = float(val)
-                    else:
-                        stats[stat] = int(val)
+                    try:
+                        if '.' in val:
+                            stats[stat] = float(val)
+                        else:
+                            stats[stat] = int(val)
+                    except ValueError as e:
+                        raise ValueError(
+                            'Error parsing {} stat in "{}": {!r}'.format(
+                                stat, line, e))
                 if 'user' in stats or 'sys' in stats:
                     stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
                 if 'tx' in stats or 'rx' in stats:
@@ -58,23 +152,26 @@ class Summarizer(object):
                             this_interval_s = val
                             continue
                         elif not (this_interval_s > 0):
-                            print("BUG? interval stat given with duration {!r}".
-                                  format(this_interval_s),
-                                  file=sys.stderr)
+                            logger.error(
+                                "BUG? interval stat given with duration {!r}".
+                                format(this_interval_s))
                             continue
                         else:
                             stat = stat + '__rate'
                             val = val / this_interval_s
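+                            # Record a time series only for the rate
+                            # stats charted in the HTML report.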
+                            if stat in ['user+sys__rate', 'tx+rx__rate']:
+                                task.series[category, stat].append(
+                                    (timestamp - self.starttime, val))
                     else:
+                        if stat in ['rss']:
+                            task.series[category, stat].append(
+                                (timestamp - self.starttime, val))
                         self.task_stats[task_id][category][stat] = val
                     if val > self.stats_max[category][stat]:
                         self.stats_max[category][stat] = val
+        logger.debug('%s: done parsing', self.label)
 
-    def report(self):
-        return "\n".join(self._report_gen()) + "\n"
-
-    def _report_gen(self):
-        job_tot = collections.defaultdict(
+        self.job_tot = collections.defaultdict(
             functools.partial(collections.defaultdict, int))
         for task_id, task_stat in self.task_stats.iteritems():
             for category, stat_last in task_stat.iteritems():
@@ -82,7 +179,34 @@ class Summarizer(object):
                     if stat in ['cpus', 'cache', 'swap', 'rss']:
                         # meaningless stats like 16 cpu cores x 5 tasks = 80
                         continue
-                    job_tot[category][stat] += val
+                    self.job_tot[category][stat] += val
+        logger.debug('%s: done totals', self.label)
+
+    def long_label(self):
+        label = self.label
+        if self.finishtime:
+            label += ' -- elapsed time '
+            s = (self.finishtime - self.starttime).total_seconds()
+            if s > 86400:
+                label += '{}d'.format(int(s/86400))
+            if s > 3600:
+                label += '{}h'.format(int(s/3600) % 24)
+            if s > 60:
+                label += '{}m'.format(int(s/60) % 60)
+            label += '{}s'.format(int(s) % 60)
+        return label
+
+    def text_report(self):
+        if not self.tasks:
+            return "(no report generated)\n"
+        return "\n".join(itertools.chain(
+            self._text_report_gen(),
+            self._recommend_gen())) + "\n"
+
+    def html_report(self):
+        return crunchstat_summary.chartjs.ChartJS(self.label, [self]).html()
+
+    def _text_report_gen(self):
         yield "\t".join(['category', 'metric', 'task_max', 'task_max_rate', 'job_total'])
         for category, stat_max in sorted(self.stats_max.iteritems()):
             for stat, val in sorted(stat_max.iteritems()):
@@ -90,9 +214,12 @@ class Summarizer(object):
                     continue
                 max_rate = self._format(stat_max.get(stat+'__rate', '-'))
                 val = self._format(val)
-                tot = self._format(job_tot[category].get(stat, '-'))
+                tot = self._format(self.job_tot[category].get(stat, '-'))
                 yield "\t".join([category, stat, str(val), max_rate, tot])
         for args in (
+                ('Number of tasks: {}',
+                 len(self.tasks),
+                 None),
                 ('Max CPU time spent by a single task: {}s',
                  self.stats_max['cpu']['user+sys'],
                  None),
@@ -100,17 +227,31 @@ class Summarizer(object):
                  self.stats_max['cpu']['user+sys__rate'],
                  lambda x: x * 100),
                 ('Overall CPU usage: {}%',
-                 job_tot['cpu']['user+sys'] / job_tot['time']['elapsed'],
+                 self.job_tot['cpu']['user+sys'] /
+                 self.job_tot['time']['elapsed']
+                 if self.job_tot['time']['elapsed'] > 0 else 0,
                  lambda x: x * 100),
                 ('Max memory used by a single task: {}GB',
                  self.stats_max['mem']['rss'],
                  lambda x: x / 1e9),
                 ('Max network traffic in a single task: {}GB',
-                 self.stats_max['net:eth0']['tx+rx'],
+                 self.stats_max['net:eth0']['tx+rx'] +
+                 self.stats_max['net:keep0']['tx+rx'],
                  lambda x: x / 1e9),
                 ('Max network speed in a single interval: {}MB/s',
-                 self.stats_max['net:eth0']['tx+rx__rate'],
-                 lambda x: x / 1e6)):
+                 self.stats_max['net:eth0']['tx+rx__rate'] +
+                 self.stats_max['net:keep0']['tx+rx__rate'],
+                 lambda x: x / 1e6),
+                ('Keep cache miss rate {}%',
+                 (float(self.job_tot['keepcache']['miss']) /
+                 float(self.job_tot['keepcalls']['get']))
+                 if self.job_tot['keepcalls']['get'] > 0 else 0,
+                 lambda x: x * 100.0),
+                ('Keep cache utilization {}%',
+                 (float(self.job_tot['blkio:0:0']['read']) /
+                 float(self.job_tot['net:keep0']['rx']))
+                 if self.job_tot['net:keep0']['rx'] > 0 else 0,
+                 lambda x: x * 100.0)):
             format_string, val, transform = args
             if val == float('-Inf'):
                 continue
@@ -118,6 +259,100 @@ class Summarizer(object):
                 val = transform(val)
             yield "# "+format_string.format(self._format(val))
 
+    def _recommend_gen(self):
+        return itertools.chain(
+            self._recommend_cpu(),
+            self._recommend_ram(),
+            self._recommend_keep_cache())
+
+    def _recommend_cpu(self):
+        """Recommend asking for 4 cores if max CPU usage was 333%"""
+
+        cpu_max_rate = self.stats_max['cpu']['user+sys__rate']
+        if cpu_max_rate == float('-Inf'):
+            logger.warning('%s: no CPU usage data', self.label)
+            return
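+        # 333% CPU usage -> ceil(3.33) = 4 cores; never recommend
+        # fewer than 1.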
+        used_cores = max(1, int(math.ceil(cpu_max_rate)))
+        asked_cores = self.existing_constraints.get('min_cores_per_node')
+        if asked_cores is None or used_cores < asked_cores:
+            yield (
+                '#!! {} max CPU usage was {}% -- '
+                'try runtime_constraints "min_cores_per_node":{}'
+            ).format(
+                self.label,
+                int(math.ceil(cpu_max_rate*100)),
+                int(used_cores))
+
+    def _recommend_ram(self):
+        """Recommend an economical RAM constraint for this job.
+
+        Nodes that are advertised as "8 gibibytes" actually have what
+        we might call "8 nearlygibs" of memory available for jobs.
+        Here, we calculate a whole number of nearlygibs that would
+        have sufficed to run the job, then recommend requesting a node
+        with that number of nearlygibs (expressed as mebibytes).
+
+        Requesting a node with "nearly 8 gibibytes" is our best hope
+        of getting a node that actually has nearly 8 gibibytes
+        available.  If the node manager is smart enough to account for
+        the discrepancy itself when choosing/creating a node, we'll
+        get an 8 GiB node with nearly 8 GiB available.  Otherwise, the
+        advertised size of the next-size-smaller node (say, 6 GiB)
+        will be too low to satisfy our request, so we will effectively
+        get rounded up to 8 GiB.
+
+        For example, if we need 7500 MiB, we can ask for 7500 MiB, and
+        we will generally get a node that is advertised as "8 GiB" and
+        has at least 7500 MiB available.  However, asking for 8192 MiB
+        would either result in an unnecessarily expensive 12 GiB node
+        (if node manager knows about the discrepancy), or an 8 GiB
+        node which has less than 8192 MiB available and is therefore
+        considered by crunch-dispatch to be too small to meet our
+        constraint.
+
+        When node manager learns how to predict the available memory
+        for each node type such that crunch-dispatch always agrees
+        that a node is big enough to run the job it was brought up
+        for, all this will be unnecessary.  We'll just ask for exactly
+        the memory we want -- even if that happens to be 8192 MiB.
+        """
+
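+        # Worked example: a max RSS of 7000 MiB gives 7000/0.95/1024 =
+        # 7.19 nearlygibs; ceil() makes that 8, so we recommend
+        # 8*0.95*1024 = 7782 MiB (i.e., an "8 GiB" node).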
+        used_bytes = self.stats_max['mem']['rss']
+        if used_bytes == float('-Inf'):
+            logger.warning('%s: no memory usage data', self.label)
+            return
+        used_mib = math.ceil(float(used_bytes) / 1048576)
+        asked_mib = self.existing_constraints.get('min_ram_mb_per_node')
+
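+        # Convert MiB to "nearlygibs": one nearlygib is
+        # AVAILABLE_RAM_RATIO of a GiB, i.e. about 973 MiB.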
+        nearlygibs = lambda mebibytes: mebibytes/AVAILABLE_RAM_RATIO/1024
+        if asked_mib is None or (
+                math.ceil(nearlygibs(used_mib)) < nearlygibs(asked_mib)):
+            yield (
+                '#!! {} max RSS was {} MiB -- '
+                'try runtime_constraints "min_ram_mb_per_node":{}'
+            ).format(
+                self.label,
+                int(used_mib),
+                int(math.ceil(nearlygibs(used_mib))*AVAILABLE_RAM_RATIO*1024))
+
+    def _recommend_keep_cache(self):
+        """Recommend increasing keep cache if utilization < 80%"""
+        if self.job_tot['net:keep0']['rx'] == 0:
+            return
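+        # Compare bytes the job read (blkio) with bytes fetched from
+        # Keep; a low ratio suggests data was downloaded more than once.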
+        utilization = (float(self.job_tot['blkio:0:0']['read']) /
+                       float(self.job_tot['net:keep0']['rx']))
+        asked_mib = self.existing_constraints.get('keep_cache_mb_per_task', 256)
+
+        if utilization < 0.8:
+            yield (
+                '#!! {} Keep cache utilization was {:.2f}% -- '
+                'try runtime_constraints "keep_cache_mb_per_task":{} (or more)'
+            ).format(
+                self.label,
+                utilization * 100.0,
+                asked_mib*2)
+
+
     def _format(self, val):
         """Return a string representation of a stat.
 
@@ -127,25 +362,76 @@ class Summarizer(object):
         else:
             return '{}'.format(val)
 
-    def _logdata(self):
-        if self.args.log_file:
-            if self.args.log_file.endswith('.gz'):
-                return gzip.open(self.args.log_file)
-            else:
-                return open(self.args.log_file)
-        elif self.args.job:
-            arv = arvados.api('v1')
-            job = arv.jobs().get(uuid=self.args.job).execute()
-            if not job['log']:
-                raise ValueError(
-                    "job {} has no log; live summary not implemented".format(
-                        self.args.job))
-            collection = arvados.collection.CollectionReader(job['log'])
-            filenames = [filename for filename in collection]
-            if len(filenames) != 1:
-                raise ValueError(
-                    "collection {} has {} files; need exactly one".format(
-                        job.log, len(filenames)))
-            return collection.open(filenames[0])
+
+class CollectionSummarizer(Summarizer):
+    def __init__(self, collection_id, **kwargs):
+        super(CollectionSummarizer, self).__init__(
+            crunchstat_summary.reader.CollectionReader(collection_id), **kwargs)
+        self.label = collection_id
+
+
+class JobSummarizer(Summarizer):
+    def __init__(self, job, **kwargs):
+        arv = arvados.api('v1')
+        if isinstance(job, basestring):
+            self.job = arv.jobs().get(uuid=job).execute()
         else:
-            return sys.stdin
+            self.job = job
+        rdr = None
+        if self.job.get('log'):
+            try:
+                rdr = crunchstat_summary.reader.CollectionReader(self.job['log'])
+            except arvados.errors.NotFoundError as e:
+                logger.warning("Trying event logs after failing to read "
+                               "log collection %s: %s", self.job['log'], e)
+            else:
+                label = self.job['uuid']
+        if rdr is None:
+            rdr = crunchstat_summary.reader.LiveLogReader(self.job['uuid'])
+            label = self.job['uuid'] + ' (partial)'
+        super(JobSummarizer, self).__init__(rdr, **kwargs)
+        self.label = label
+        self.existing_constraints = self.job.get('runtime_constraints', {})
+
+
+class PipelineSummarizer(object):
+    def __init__(self, pipeline_instance_uuid, **kwargs):
+        arv = arvados.api('v1', model=OrderedJsonModel())
+        instance = arv.pipeline_instances().get(
+            uuid=pipeline_instance_uuid).execute()
+        self.summarizers = collections.OrderedDict()
+        for cname, component in instance['components'].iteritems():
+            if 'job' not in component:
+                logger.warning(
+                    "%s: skipping component with no job assigned", cname)
+            else:
+                logger.info(
+                    "%s: job %s", cname, component['job']['uuid'])
+                summarizer = JobSummarizer(component['job'], **kwargs)
+                summarizer.label = '{} {}'.format(
+                    cname, component['job']['uuid'])
+                self.summarizers[cname] = summarizer
+        self.label = pipeline_instance_uuid
+
+    def run(self):
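+        # Run every component's summarizer in parallel and wait for
+        # all of them before returning.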
+        threads = []
+        for summarizer in self.summarizers.itervalues():
+            t = threading.Thread(target=summarizer.run)
+            t.daemon = True
+            t.start()
+            threads.append(t)
+        for t in threads:
+            t.join()
+
+    def text_report(self):
+        txt = ''
+        for cname, summarizer in self.summarizers.iteritems():
+            txt += '### Summary for {} ({})\n'.format(
+                cname, summarizer.job['uuid'])
+            txt += summarizer.text_report()
+            txt += '\n'
+        return txt
+
+    def html_report(self):
+        return crunchstat_summary.chartjs.ChartJS(
+            self.label, self.summarizers.itervalues()).html()
index aeb0fe6c3369bc30fdc125985374895461685546..f3c10bdaec0dbffd210a2e2b6a30886903be0a88 100755 (executable)
@@ -23,6 +23,7 @@ setup(name='crunchstat_summary',
       download_url="https://github.com/curoverse/arvados.git",
       license='GNU Affero General Public License, version 3.0',
       packages=['crunchstat_summary'],
+      include_package_data=True,
       scripts=[
           'bin/crunchstat-summary'
       ],
@@ -33,6 +34,7 @@ setup(name='crunchstat_summary',
           'arvados-python-client',
       ],
       test_suite='tests',
+      tests_require=['pbr<1.7.0', 'mock>=1.0'],
       zip_safe=False,
       cmdclass={'egg_info': tagger},
       )
diff --git a/tools/crunchstat-summary/tests/crunchstat_error_messages.txt b/tools/crunchstat-summary/tests/crunchstat_error_messages.txt
new file mode 100644 (file)
index 0000000..bf6dd5c
--- /dev/null
@@ -0,0 +1,9 @@
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr 
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr old error message:
+2016-01-07_00:15:33 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: read /proc/3305/net/dev: open /proc/3305/net/dev: no such file or directory
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr 
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr new error message:
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: error reading /proc/3305/net/dev: open /proc/3305/net/dev: no such file or directory
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr cancelled job:
+2016-01-07_00:15:34 tb05z-8i9sb-khsk5rmf4xjdcbl 20819 0 stderr crunchstat: caught signal: interrupt
index ef7beb11c0057009f8465aa2836349ca32235f5a..f0a60957bba706b18b6d41288f5fc46ded754b1c 100644 (file)
@@ -1,16 +1,16 @@
 category       metric  task_max        task_max_rate   job_total
-blkio:0:0      read    0       0.00    0
-blkio:0:0      write   0       0.00    0
+blkio:0:0      read    0       0       0
+blkio:0:0      write   0       0       0
 cpu    cpus    8       -       -
 cpu    sys     1.92    0.04    1.92
 cpu    user    3.83    0.09    3.83
 cpu    user+sys        5.75    0.13    5.75
-fuseops        read    0       0.00    0
-fuseops        write   0       0.00    0
-keepcache      hit     0       0.00    0
-keepcache      miss    0       0.00    0
-keepcalls      get     0       0.00    0
-keepcalls      put     0       0.00    0
+fuseops        read    0       0       0
+fuseops        write   0       0       0
+keepcache      hit     0       0       0
+keepcache      miss    0       0       0
+keepcalls      get     0       0       0
+keepcalls      put     0       0       0
 mem    cache   1678139392      -       -
 mem    pgmajfault      0       -       0
 mem    rss     349814784       -       -
@@ -18,13 +18,18 @@ mem swap    0       -       -
 net:eth0       rx      1754364530      41658344.87     1754364530
 net:eth0       tx      38837956        920817.97       38837956
 net:eth0       tx+rx   1793202486      42579162.83     1793202486
-net:keep0      rx      0       0.00    0
-net:keep0      tx      0       0.00    0
-net:keep0      tx+rx   0       0.00    0
+net:keep0      rx      0       0       0
+net:keep0      tx      0       0       0
+net:keep0      tx+rx   0       0       0
 time   elapsed 80      -       80
+# Number of tasks: 1
 # Max CPU time spent by a single task: 5.75s
 # Max CPU usage in a single interval: 13.00%
 # Overall CPU usage: 7.19%
 # Max memory used by a single task: 0.35GB
 # Max network traffic in a single task: 1.79GB
 # Max network speed in a single interval: 42.58MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max CPU usage was 13% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-jq0ekny1xou3zoh max RSS was 334 MiB -- try runtime_constraints "min_ram_mb_per_node":972
index 38af3e7e8c7752eb07532f5a2f4fd54a616eab4e..f9a34cfb98c7c6d42a633166b946b7c49db77cc1 100644 (file)
@@ -1,8 +1,8 @@
 category       metric  task_max        task_max_rate   job_total
 cpu    cpus    8       -       -
-cpu    sys     0.00    -       0.00
-cpu    user    0.00    -       0.00
-cpu    user+sys        0.00    -       0.00
+cpu    sys     0       -       0.00
+cpu    user    0       -       0.00
+cpu    user+sys        0       -       0.00
 mem    cache   12288   -       -
 mem    pgmajfault      0       -       0
 mem    rss     856064  -       -
@@ -11,7 +11,14 @@ net:eth0     rx      90      -       90
 net:eth0       tx      90      -       90
 net:eth0       tx+rx   180     -       180
 time   elapsed 2       -       4
-# Max CPU time spent by a single task: 0.00s
+# Number of tasks: 2
+# Max CPU time spent by a single task: 0s
+# Max CPU usage in a single interval: 0%
 # Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-zvb2ocfycpomrup max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
index 7e42d612b753fadb25e4bac7cee7e941e01231e3..c54102d78a25d1158ba563a22de7279b3d39c2f2 100644 (file)
@@ -1,8 +1,8 @@
 category       metric  task_max        task_max_rate   job_total
 cpu    cpus    8       -       -
-cpu    sys     0.00    -       0.00
-cpu    user    0.00    -       0.00
-cpu    user+sys        0.00    -       0.00
+cpu    sys     0       -       0.00
+cpu    user    0       -       0.00
+cpu    user+sys        0       -       0.00
 mem    cache   8192    -       -
 mem    pgmajfault      0       -       0
 mem    rss     450560  -       -
@@ -11,7 +11,14 @@ net:eth0     rx      90      -       90
 net:eth0       tx      90      -       90
 net:eth0       tx+rx   180     -       180
 time   elapsed 2       -       3
-# Max CPU time spent by a single task: 0.00s
+# Number of tasks: 2
+# Max CPU time spent by a single task: 0s
+# Max CPU usage in a single interval: 0%
 # Overall CPU usage: 0.00%
 # Max memory used by a single task: 0.00GB
 # Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max CPU usage was 0% -- try runtime_constraints "min_cores_per_node":1
+#!! 4xphq-8i9sb-v831jm2uq0g2g9x max RSS was 1 MiB -- try runtime_constraints "min_ram_mb_per_node":972
index dbc3843c698c616e9ef2f98b95b250e632b9f3c7..b1e5fed81d7b4481023de037d68656abe6fe4406 100644 (file)
+import arvados
+import collections
 import crunchstat_summary.command
 import crunchstat_summary.summarizer
 import difflib
 import glob
+import gzip
+import mock
 import os
 import unittest
 
+TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
 
-class ExampleLogsTestCase(unittest.TestCase):
+
+class ReportDiff(unittest.TestCase):
+    def diff_known_report(self, logfile, cmd):
+        expectfile = logfile+'.report'
+        expect = open(expectfile).readlines()
+        self.diff_report(cmd, expect, expectfile=expectfile)
+
+    def diff_report(self, cmd, expect, expectfile=None):
+        got = [x+"\n" for x in cmd.report().strip("\n").split("\n")]
+        self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
+            expect, got, fromfile=expectfile, tofile="(generated)")))
+
+
+class SummarizeFile(ReportDiff):
     def test_example_files(self):
-        dirname = os.path.dirname(os.path.abspath(__file__))
-        for fnm in glob.glob(os.path.join(dirname, '*.txt.gz')):
-            logfile = os.path.join(dirname, fnm)
+        for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+            logfile = os.path.join(TESTS_DIR, fnm)
             args = crunchstat_summary.command.ArgumentParser().parse_args(
                 ['--log-file', logfile])
-            summarizer = crunchstat_summary.summarizer.Summarizer(args)
-            summarizer.run()
-            got = [x+"\n" for x in summarizer.report().strip("\n").split("\n")]
-            expectfile = logfile+'.report'
-            expect = open(expectfile).readlines()
-            self.assertEqual(got, expect, "\n"+"".join(difflib.context_diff(
-                expect, got, fromfile=expectfile, tofile="(generated)")))
+            cmd = crunchstat_summary.command.Command(args)
+            cmd.run()
+            self.diff_known_report(logfile, cmd)
+
+
+class HTMLFromFile(ReportDiff):
+    def test_example_files(self):
+        # Note we don't test the output content at all yet; we're
+        # mainly just verifying the --format=html option isn't ignored
+        # and the HTML code path doesn't crash.
+        for fnm in glob.glob(os.path.join(TESTS_DIR, '*.txt.gz')):
+            logfile = os.path.join(TESTS_DIR, fnm)
+            args = crunchstat_summary.command.ArgumentParser().parse_args(
+                ['--format=html', '--log-file', logfile])
+            cmd = crunchstat_summary.command.Command(args)
+            cmd.run()
+            self.assertRegexpMatches(cmd.report(), r'(?is)<html>.*</html>\s*$')
+
+
+class SummarizeEdgeCases(unittest.TestCase):
+    def test_error_messages(self):
+        logfile = open(os.path.join(TESTS_DIR, 'crunchstat_error_messages.txt'))
+        s = crunchstat_summary.summarizer.Summarizer(logfile)
+        s.run()
+
+
+class SummarizeJob(ReportDiff):
+    fake_job_uuid = '4xphq-8i9sb-jq0ekny1xou3zoh'
+    fake_log_id = 'fake-log-collection-id'
+    fake_job = {
+        'uuid': fake_job_uuid,
+        'log': fake_log_id,
+    }
+    logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_job_report(self, mock_api, mock_cr):
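+        # With the API and collection reader mocked out, the
+        # summarizer reads the canned gzip log without contacting a
+        # real cluster.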
+        mock_api().jobs().get().execute.return_value = self.fake_job
+        mock_cr().__iter__.return_value = ['fake-logfile.txt']
+        mock_cr().open.return_value = gzip.open(self.logfile)
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--job', self.fake_job_uuid])
+        cmd = crunchstat_summary.command.Command(args)
+        cmd.run()
+        self.diff_known_report(self.logfile, cmd)
+        mock_api().jobs().get.assert_called_with(uuid=self.fake_job_uuid)
+        mock_cr.assert_called_with(self.fake_log_id)
+        mock_cr().open.assert_called_with('fake-logfile.txt')
+
+
+class SummarizePipeline(ReportDiff):
+    fake_instance = {
+        'uuid': 'zzzzz-d1hrv-i3e77t9z5y8j9cc',
+        'owner_uuid': 'zzzzz-tpzed-xurymjxw79nv3jz',
+        'components': collections.OrderedDict([
+            ['foo', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000000',
+                    'log': 'fake-log-pdh-0',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 900,
+                        'min_cores_per_node': 1,
+                    },
+                },
+            }],
+            ['bar', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000001',
+                    'log': 'fake-log-pdh-1',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 900,
+                        'min_cores_per_node': 1,
+                    },
+                },
+            }],
+            ['no-job-assigned', {}],
+            ['unfinished-job', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-xxxxxxxxxxxxxxx',
+                },
+            }],
+            ['baz', {
+                'job': {
+                    'uuid': 'zzzzz-8i9sb-000000000000002',
+                    'log': 'fake-log-pdh-2',
+                    'runtime_constraints': {
+                        'min_ram_mb_per_node': 900,
+                        'min_cores_per_node': 1,
+                    },
+                },
+            }]]),
+    }
+
+    @mock.patch('arvados.collection.CollectionReader')
+    @mock.patch('arvados.api')
+    def test_pipeline(self, mock_api, mock_cr):
+        logfile = os.path.join(TESTS_DIR, 'logfile_20151204190335.txt.gz')
+        mock_api().pipeline_instances().get().execute.return_value = \
+            self.fake_instance
+        mock_cr().__iter__.return_value = ['fake-logfile.txt']
+        mock_cr().open.side_effect = [gzip.open(logfile) for _ in range(3)]
+        args = crunchstat_summary.command.ArgumentParser().parse_args(
+            ['--pipeline-instance', self.fake_instance['uuid']])
+        cmd = crunchstat_summary.command.Command(args)
+        cmd.run()
+
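+        # The expected per-job report is the canned one minus its
+        # "#!!" lines: the fake jobs' runtime_constraints already
+        # satisfy the recommendations, so none are emitted.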
+        job_report = [
+            line for line in open(logfile+'.report').readlines()
+            if not line.startswith('#!! ')]
+        expect = (
+            ['### Summary for foo (zzzzz-8i9sb-000000000000000)\n'] +
+            job_report + ['\n'] +
+            ['### Summary for bar (zzzzz-8i9sb-000000000000001)\n'] +
+            job_report + ['\n'] +
+            ['### Summary for unfinished-job (zzzzz-8i9sb-xxxxxxxxxxxxxxx)\n',
+             '(no report generated)\n',
+             '\n'] +
+            ['### Summary for baz (zzzzz-8i9sb-000000000000002)\n'] +
+            job_report)
+        self.diff_report(cmd, expect)
+        mock_cr.assert_has_calls(
+            [
+                mock.call('fake-log-pdh-0'),
+                mock.call('fake-log-pdh-1'),
+                mock.call('fake-log-pdh-2'),
+            ], any_order=True)
+        mock_cr().open.assert_called_with('fake-logfile.txt')