3550: Merge branch 'master' into 3550-local-pipeline
author Tom Clegg <tom@curoverse.com>
Wed, 17 Sep 2014 22:00:25 +0000 (18:00 -0400)
committer Tom Clegg <tom@curoverse.com>
Wed, 17 Sep 2014 22:00:25 +0000 (18:00 -0400)
Conflicts:
sdk/cli/bin/arv-run-pipeline-instance

52 files changed:
apps/workbench/app/models/collection.rb
apps/workbench/app/models/job.rb
apps/workbench/app/models/pipeline_instance.rb
apps/workbench/app/models/pipeline_template.rb
apps/workbench/app/views/application/_arvados_attr_value.html.erb
apps/workbench/app/views/application/_name_and_description.html.erb [new file with mode: 0644]
apps/workbench/app/views/application/show.html.erb
apps/workbench/app/views/collections/show.html.erb
apps/workbench/app/views/pipeline_instances/show.html.erb
apps/workbench/app/views/pipeline_templates/_show_recent.html.erb
apps/workbench/app/views/pipeline_templates/show.html.erb
apps/workbench/app/views/projects/show.html.erb
apps/workbench/config/application.default.yml
apps/workbench/config/database.yml
apps/workbench/test/diagnostics/pipeline_test.rb [new file with mode: 0644]
apps/workbench/test/diagnostics_test_helper.rb [new file with mode: 0644]
apps/workbench/test/integration/collections_test.rb
apps/workbench/test/integration/jobs_test.rb [new file with mode: 0644]
apps/workbench/test/integration/pipeline_instances_test.rb
apps/workbench/test/integration/pipeline_templates_test.rb
apps/workbench/test/test_helper.rb
doc/_includes/_example_sdk_go.liquid [new file with mode: 0644]
doc/_includes/_example_sdk_go_imports.liquid [new file with mode: 0644]
doc/sdk/go/index.html.textile.liquid
docker/java-bwa-samtools/Dockerfile
sdk/cli/bin/arv-run-pipeline-instance
sdk/python/arvados/collection.py
sdk/python/arvados/keep.py
services/api/Gemfile
services/api/Gemfile.lock
services/api/app/controllers/arvados/v1/schema_controller.rb
services/api/app/models/collection.rb
services/api/app/models/job.rb
services/api/app/models/node.rb
services/api/app/models/pipeline_instance.rb
services/api/config/application.default.yml
services/api/db/migrate/20140911221252_add_description_to_pipeline_instances_and_jobs.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/lib/tasks/config_check.rake
services/api/script/crunch-dispatch.rb
services/api/test/fixtures/collections.yml
services/api/test/functional/arvados/v1/collections_controller_test.rb
services/api/test/functional/arvados/v1/nodes_controller_test.rb
services/api/test/functional/arvados/v1/schema_controller_test.rb
services/fuse/arvados_fuse/__init__.py
services/fuse/tests/test_mount.py
services/keepstore/block_work_list.go [new file with mode: 0644]
services/keepstore/block_work_list_test.go [new file with mode: 0644]
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/pull_list/pull_list.go [deleted file]

index edd87add19ada373ef4892fc532b2ffe8884a8de..40fd930e38a99ae453ff11e01defcedec35af5fb 100644 (file)
@@ -110,4 +110,8 @@ class Collection < ArvadosBase
     end
   end
 
+  def textile_attributes
+    [ 'description' ]
+  end
+
 end
index aac6168d22aecac8d37d9afcaee56db844cecdcd..9e9c0967871ba4e6408a79c1855b87460f156694 100644 (file)
@@ -8,7 +8,11 @@ class Job < ArvadosBase
   end
 
   def attribute_editable? attr, *args
-    false
+    if attr.to_sym == :description
+      super
+    else
+      false
+    end
   end
 
   def self.creatable?
@@ -33,4 +37,8 @@ class Job < ArvadosBase
   def cancel
     arvados_api_client.api "jobs/#{self.uuid}/", "cancel", {}
   end
+
+  def textile_attributes
+    [ 'description' ]
+  end
 end
index e33484642b5e666b9887fc60f35ea24d6aaa9a64..89acbb0dbb9438bbbb80ba2bc97cc1ce93eac9fa 100644 (file)
@@ -44,7 +44,7 @@ class PipelineInstance < ArvadosBase
   end
 
   def attribute_editable? attr, *args
-    super && (attr.to_sym == :name ||
+    super && (attr.to_sym == :name || attr.to_sym == :description ||
               (attr.to_sym == :components and
                (self.state == 'New' || self.state == 'Ready')))
   end
@@ -67,4 +67,8 @@ class PipelineInstance < ArvadosBase
       "\"#{input_name.to_s}\" parameter for #{component[:script]} script in #{component_name} component"
     end
   end
+
+  def textile_attributes
+    [ 'description' ]
+  end
 end
index e1af2cb052676525ddba49d3f30afc07ff2aff10..6e7977539702abce395491cf718c5ea2ed82eafb 100644 (file)
@@ -6,4 +6,8 @@ class PipelineTemplate < ArvadosBase
   def self.creatable?
     false
   end
+
+  def textile_attributes
+    [ 'description' ]
+  end
 end
index 52c46fbe2f9e0821fe8bbba8621814b6a4c05ce5..3df892fd5a6b24126d584f112da28284f30c1e60 100644 (file)
@@ -3,7 +3,7 @@
     <%= message %><br />
   <% end %>
 <% else %>
-      <% if obj.attribute_editable?(attr) and (!defined?(editable) || editable) %>
+      <% if attr and obj.attribute_editable?(attr) and (!defined?(editable) || editable) %>
         <% if resource_class_for_uuid(attrvalue, {referring_object: obj, referring_attr: attr}) %>
           <%= link_to_if_arvados_object attrvalue, {referring_attr: attr, referring_object: obj, with_class_name: true, friendly_name: true} %>
           <br>
diff --git a/apps/workbench/app/views/application/_name_and_description.html.erb b/apps/workbench/app/views/application/_name_and_description.html.erb
new file mode 100644 (file)
index 0000000..0144a4d
--- /dev/null
@@ -0,0 +1,12 @@
+<% if @object.respond_to? :name %>
+  <h2>
+    <%= render_editable_attribute @object, 'name', nil, { 'data-emptytext' => "New #{controller.model_class.to_s.underscore.gsub("_"," ")}" } %>
+  </h2>
+<% end %>
+
+<% if @object.respond_to? :description %>
+  <div class="arv-description-as-subtitle">
+    <%= render_editable_attribute @object, 'description', nil, { 'data-emptytext' => "(No description provided)", 'data-toggle' => 'manual' } %>
+  </div>
+<% end %>
+
index 490e7e4e12cd8fcc982cf2f60f50549d3fa4b8eb..0697fefec596d55aa79e40cc419013c513456efb 100644 (file)
@@ -5,6 +5,9 @@
 <% end %>
 
 <% content_for :content_top do %>
+  <% if !['Group','User', 'Collection'].include? @object.class.to_s # groups (projects), users, and collections handle it themselves %>
+    <%= render partial: 'name_and_description' %>
+  <% end %>
 
 <% if @object.respond_to? :properties and !@object.properties.nil? %>
   <% if @object.properties[:page_content] %>
index 94b099a25c8c7256a3a63430080a0778d7f9d7c9..a80cf3985d92731b561b81dec01e62acc91d0323 100644 (file)
@@ -33,6 +33,9 @@
        </h3>
       </div>
       <div class="panel-body">
+        <div class="arv-description-as-subtitle">
+          <%= render_editable_attribute @object, 'description', nil, { 'data-emptytext' => "(No description provided)", 'data-toggle' => 'manual' } %>
+        </div>
         <img src="/favicon.ico" class="pull-right" alt="" style="opacity: 0.3"/>
         <% if defined? @same_pdh %>
           <p>Found in collections:<p>
index b30db3b12eb02bc74a3c926434afe72fd90c68dd..905470ce3c088d8653757a3793095a39f68d88c6 100644 (file)
@@ -1,14 +1,18 @@
 <% template = PipelineTemplate.find?(@object.pipeline_template_uuid) %>
 <%= content_for :content_top do %>
-  <h2>
-    <%= render_editable_attribute @object, 'name', nil %>
-  </h2>
-  <% if template %>
-  <blockquote><span class="deemphasize">From template:</span><br />
-    <%= link_to_if_arvados_object template, friendly_name: true %><br />
-    <%= template.description %>
-  </blockquote>
-  <% end %>
+  <div class="row">
+    <div class="col-sm-6">
+      <%= render partial: 'name_and_description' %>
+    </div>
+    <% if template %>
+      <div class="alert alert-info col-sm-6">
+        This pipeline was created from the template <%= link_to_if_arvados_object template, friendly_name: true %><br />
+        <% if template.modified_at && (template.modified_at > @object.created_at) %>
+        Note: This template has been modified since this instance was created.
+        <% end %>
+      </div>
+    <% end %>
+  </div>
 <% end %>
 
 <% content_for :tab_line_buttons do %>
index 9a02e0c337ac1eb1ea554a802535d7e14aa45f66..252b93ad45664ac041c7016ab6b82c09bcbf4ba2 100644 (file)
@@ -35,6 +35,7 @@
                                      action_method: 'post',
                                      action_data: {selection_param: 'pipeline_instance[owner_uuid]',
                                                    'pipeline_instance[pipeline_template_uuid]' => ob.uuid,
+                                                   'pipeline_instance[description]' => "Created at #{Time.now.localtime}" + (ob.name.andand.size.andand>0 ? " using the pipeline template *#{ob.name}*" : ""),
                                                    'success' => 'redirect-to-created-object'
                                                   }.to_json),
                 { class: "btn btn-default btn-xs", title: "Run #{ob.name}", remote: true, method: 'get' }
@@ -48,7 +49,7 @@
         <%= render_editable_attribute ob, 'name' %>
       </td><td>
         <% if ob.respond_to?(:description) and ob.description %>
-          <%= ob.description %>
+          <%= render_attribute_as_textile(ob, "description", ob.description, false) %>
           <br />
         <% end %>
         <% ob.components.collect { |k,v| k.to_s }.each do |k| %>
index 725d63db731c0feae254534157b235dd8e2798cf..723bce63da92db752c831f2f36ea22647482d9b1 100644 (file)
@@ -7,6 +7,7 @@
                                      action_method: 'post',
                                      action_data: {selection_param: 'pipeline_instance[owner_uuid]',
                                                    'pipeline_instance[pipeline_template_uuid]' => @object.uuid,
+                                                   'pipeline_instance[description]' => "Created at #{Time.now.localtime}" + (@object.name.andand.size.andand>0 ? " using the pipeline template *#{@object.name}*" : ""),
                                                    'success' => 'redirect-to-created-object'
                                                   }.to_json),
                 { class: "btn btn-primary btn-sm", remote: true, method: 'get' }
index 2551760ff2e5ad15dee9457d526bffee3579cdea..a0ae2d3036ccd69a5ee14a4ee86e55f8179504b6 100644 (file)
@@ -1,14 +1,6 @@
 <% if @object.uuid != current_user.uuid # Not the "Home" project %>
 <% content_for :content_top do %>
-
-<h2>
-  <%= render_editable_attribute @object, 'name', nil, { 'data-emptytext' => "New project" } %>
-</h2>
-
-<div class="arv-description-as-subtitle">
-  <%= render_editable_attribute @object, 'description', nil, { 'data-emptytext' => "(No description provided)", 'data-toggle' => 'manual' } %>
-</div>
-
+  <%= render partial: 'name_and_description' %>
 <% end %>
 <% end %>
 
index 893b28c1bf0abfe41381db1ab4e7aeff4717db85..e43d94b225d8903f0a7f0d46e22dd098d0efd072 100644 (file)
@@ -1,6 +1,32 @@
 # Do not use this file for site configuration. Create application.yml
 # instead (see application.yml.example).
 
+# Below is a sample configuration for diagnostics testing.
+# Configure the workbench URL as "arvados_workbench_url".
+# Configure test user tokens as "user_tokens".
+#   At this time the tests need an "active" user token.
+# Also, configure the pipelines to be executed as "pipelines_to_test".
+# For each pipeline, identified by a name of your choice
+#     ("pipeline_1" and "pipeline_2" in this sample), provide the following:
+#   template_uuid: the uuid of the template to be executed
+#   input_paths: an array of inputs for the pipeline. Use either a collection's "uuid"
+#     or a file's "uuid/file_name" path in this array. If the pipeline does not require
+#     any inputs, this can be omitted.
+#   max_wait_seconds: max time in seconds to wait for the pipeline run to complete.
+#     A default of 30 seconds is used when this value is not provided.
+diagnostics:
+  arvados_workbench_url: https://localhost:3000
+  user_tokens:
+    active: eu33jurqntstmwo05h1jr3eblmi961e802703y6657s8zb14r
+  pipelines_to_test:
+    pipeline_1:
+      template_uuid: zzzzz-p5p6p-rxj8d71854j9idn
+      input_paths: [zzzzz-4zz18-nz98douzhaa3jh2]
+      max_wait_seconds: 10
+    pipeline_2:
+      template_uuid: zzzzz-p5p6p-1xbobfobk94ppbv
+      input_paths: [zzzzz-4zz18-nz98douzhaa3jh2, zzzzz-4zz18-gpw9o5wpcti3nib]
+
 development:
   cache_classes: false
   eager_load: true
index 51a4dd459dc12673ad440debb28f499c89550022..dd7669cb281555304f789cc7971281ba1b93d9cc 100644 (file)
@@ -23,3 +23,10 @@ production:
   database: db/production.sqlite3
   pool: 5
   timeout: 5000
+
+# Note: The "diagnostics" database configuration is not actually used.
+diagnostics:
+  adapter: sqlite3
+  database: db/diagnostics.sqlite3
+  pool: 5
+  timeout: 5000
diff --git a/apps/workbench/test/diagnostics/pipeline_test.rb b/apps/workbench/test/diagnostics/pipeline_test.rb
new file mode 100644 (file)
index 0000000..a52d1a6
--- /dev/null
@@ -0,0 +1,93 @@
+require 'diagnostics_test_helper'
+require 'selenium-webdriver'
+require 'headless'
+
+class PipelineTest < DiagnosticsTest
+  pipelines_to_test = Rails.configuration.pipelines_to_test.andand.keys
+
+  setup do
+    headless = Headless.new
+    headless.start
+    Capybara.current_driver = :selenium
+  end
+
+  pipelines_to_test.andand.each do |pipeline_to_test|
+    test "visit home page for user #{pipeline_to_test}" do
+      visit_page_with_token 'active'
+      pipeline_config = Rails.configuration.pipelines_to_test[pipeline_to_test]
+
+      # Search for tutorial template
+      within('.navbar-fixed-top') do
+        page.find_field('search').set pipeline_config['template_uuid']
+        page.find('.glyphicon-search').click
+      end
+
+      # Run the pipeline
+      find('a,button', text: 'Run').click
+
+      # Choose project
+      within('.modal-dialog') do
+        find('.selectable', text: 'Home').click
+        find('button', text: 'Choose').click
+      end
+
+      page.assert_selector('a.disabled,button.disabled', text: 'Run') if pipeline_config['input_paths'].any?
+
+      # Choose input for the pipeline
+      pipeline_config['input_paths'].each do |look_for|
+        select_input look_for
+      end
+      wait_for_ajax
+
+      # All needed inputs are filled in. Run this pipeline now.
+      find('a,button', text: 'Run').click
+
+      # The pipeline is running; a "Stop" button now replaces "Run".
+      page.assert_selector 'a,button', text: 'Stop'
+
+      # Wait for pipeline run to complete
+      wait_until_page_has 'Complete', pipeline_config['max_wait_seconds']
+    end
+  end
+
+  def select_input look_for
+    inputs_needed = page.all('.btn', text: 'Choose')
+    return if (!inputs_needed || !inputs_needed.any?)
+
+    look_for_uuid = nil
+    look_for_file = nil
+    if look_for.andand.index('/').andand.>0
+      partitions = look_for.partition('/')
+      look_for_uuid = partitions[0]
+      look_for_file = partitions[2]
+    else
+      look_for_uuid = look_for
+      look_for_file = nil
+    end
+
+    inputs_needed[0].click
+
+    within('.modal-dialog') do
+      if look_for_uuid
+        fill_in('Search', with: look_for_uuid, exact: true)
+        wait_for_ajax
+      end
+             
+      page.all('.selectable').first.click
+      wait_for_ajax
+      # The ajax reload after search results wipes out the input selection, so select again.
+      page.all('.selectable').first.click
+      wait_for_ajax
+
+      if look_for_file
+        wait_for_ajax
+        within('.collection_files_name', text: look_for_file) do
+          find('.fa-file').click
+        end
+      end
+      
+      find('button', text: 'OK').click
+      wait_for_ajax
+    end
+  end
+end
diff --git a/apps/workbench/test/diagnostics_test_helper.rb b/apps/workbench/test/diagnostics_test_helper.rb
new file mode 100644 (file)
index 0000000..01d351a
--- /dev/null
@@ -0,0 +1,29 @@
+require 'integration_helper'
+require 'yaml'
+
+# Diagnostics tests are executed when "RAILS_ENV=diagnostics" is used.
+# When "RAILS_ENV=test" is used, tests in the "diagnostics" directory
+# will not be executed.
+
+class DiagnosticsTest < ActionDispatch::IntegrationTest
+
+  # Prepends the configured workbench URL to the given path and visits
+  # that page. Expects a path such as "/collections/<uuid>".
+  def visit_page_with_token token_name, path='/'
+    workbench_url = Rails.configuration.arvados_workbench_url
+    if workbench_url.end_with? '/'
+      workbench_url = workbench_url[0, workbench_url.size-1]
+    end
+    tokens = Rails.configuration.user_tokens
+    visit page_with_token(tokens[token_name], (workbench_url + path))
+  end
+
+  # Waits up to max_time seconds for text_to_look_for to appear on the page.
+  def wait_until_page_has text_to_look_for, max_time=30
+    max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s))
+    Timeout.timeout(max_time) do
+      loop until page.has_text?(text_to_look_for)
+    end
+  end
+
+end
index 17a59f81851bcac50584991babca8311597fe58b..6eed8d3b7855b7e188f8cba2c03db92a36fe0e6a 100644 (file)
@@ -52,6 +52,10 @@ class CollectionsTest < ActionDispatch::IntegrationTest
   end
 
   test "combine selected collections into new collection" do
+    headless = Headless.new
+    headless.start
+    Capybara.current_driver = :selenium
+
     foo_collection = api_fixture('collections')['foo_file']
     bar_collection = api_fixture('collections')['bar_file']
 
@@ -79,9 +83,15 @@ class CollectionsTest < ActionDispatch::IntegrationTest
     assert(page.has_text?('foo'), "Collection page did not include foo file")
     assert(page.has_no_text?(bar_collection['name']), "Collection page did not include foo file")
     assert(page.has_text?('bar'), "Collection page did not include bar file")
+
+    headless.stop
   end
 
   test "combine selected collection files into new collection" do
+    headless = Headless.new
+    headless.start
+    Capybara.current_driver = :selenium
+
     foo_collection = api_fixture('collections')['foo_file']
 
     visit page_with_token('active', "/collections")
@@ -103,5 +113,7 @@ class CollectionsTest < ActionDispatch::IntegrationTest
     assert(page.has_text?('Copy to project'), "Copy to project text not found in new collection page")
     assert(page.has_no_text?(foo_collection['name']), "Collection page did not include foo file")
     assert(page.has_text?('foo'), "Collection page did not include foo file")
+
+    headless.stop
   end
 end
diff --git a/apps/workbench/test/integration/jobs_test.rb b/apps/workbench/test/integration/jobs_test.rb
new file mode 100644 (file)
index 0000000..b5fc56a
--- /dev/null
@@ -0,0 +1,29 @@
+require 'integration_helper'
+
+class JobsTest < ActionDispatch::IntegrationTest
+  test "add job description" do
+    Capybara.current_driver = Capybara.javascript_driver
+    visit page_with_token("active", "/jobs")
+
+    # go to job running the script "doesnotexist"
+    within first('tr', text: 'doesnotexist') do
+      find("a").click
+    end
+
+    # edit job description
+    within('.arv-description-as-subtitle') do
+      find('.fa-pencil').click
+      find('.editable-input textarea').set('*Textile description for job* - "Go to dashboard":/')
+      find('.editable-submit').click
+    end
+    wait_for_ajax
+
+    # Verify edited description
+    assert page.has_no_text? '*Textile description for job*'
+    assert page.has_text? 'Textile description for job'
+    assert page.has_link? 'Go to dashboard'
+    click_link 'Go to dashboard'
+    assert page.has_text? 'My projects'
+    assert page.has_text? 'Projects shared with me'
+  end
+end
index e019813a163bfb0001d03985714dfa323298360d..b996068b32b84726a2b920bbfe082ed0d460cc20 100644 (file)
@@ -158,6 +158,26 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
     assert page.has_text? 'script_version'
   end
 
+  test 'pipeline description' do
+    visit page_with_token('active_trustedclient')
+
+    visit '/pipeline_instances'
+    assert page.has_text? 'pipeline_with_job'
+
+    find('a', text: 'pipeline_with_job').click
+
+    within('.arv-description-as-subtitle') do
+      find('.fa-pencil').click
+      find('.editable-input textarea').set('*Textile description for pipeline instance*')
+      find('.editable-submit').click
+    end
+    wait_for_ajax
+
+    # verify description
+    assert page.has_no_text? '*Textile description for pipeline instance*'
+    assert page.has_text? 'Textile description for pipeline instance'
+  end
+
   test "JSON popup available for strange components" do
     uuid = api_fixture("pipeline_instances")["components_is_jobspec"]["uuid"]
     visit page_with_token("active", "/pipeline_instances/#{uuid}")
@@ -179,7 +199,7 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
       find(".selectable", text: proj_name).click
       click_on "Choose"
     end
-    assert(has_text?("From template"), "did not land on pipeline instance page")
+    assert(has_text?("This pipeline was created from the template"), "did not land on pipeline instance page")
     first("a.btn,button", text: "Choose").click
     within(".modal-body") do
       if (proj_name != PROJECT_WITH_SEARCH_COLLECTION)
index d131986d25843613a62ff10faca1fb51c11338f9..56d6f4a009ada155f3cf6eeac3f9e2fcaa33be15 100644 (file)
@@ -12,4 +12,35 @@ class PipelineTemplatesTest < ActionDispatch::IntegrationTest
     assert(page.has_text?("script_parameters"),
            "components JSON not found")
   end
+
+  test "pipeline template description" do
+    Capybara.current_driver = Capybara.javascript_driver
+    visit page_with_token("active", "/pipeline_templates")
+
+    # go to Two Part pipeline template
+    within first('tr', text: 'Two Part Pipeline Template') do
+      find(".fa-gears").click
+    end
+
+    # edit template description
+    within('.arv-description-as-subtitle') do
+      find('.fa-pencil').click
+      find('.editable-input textarea').set('*Textile description for pipeline template* - "Go to dashboard":/')
+      find('.editable-submit').click
+    end
+    wait_for_ajax
+
+    # Verify edited description
+    assert page.has_no_text? '*Textile description for pipeline template*'
+    assert page.has_text? 'Textile description for pipeline template'
+    assert page.has_link? 'Go to dashboard'
+    click_link 'Go to dashboard'
+    assert page.has_text? 'My projects'
+    assert page.has_text? 'Projects shared with me'
+
+    # again visit recent templates page and verify edited description
+    visit page_with_token("active", "/pipeline_templates")
+    assert page.has_no_text? '*Textile description for pipeline template*'
+    assert page.has_text? 'Textile description for pipeline template'
+  end
 end
index 260bff043c8cfc10cfc08f3d272835c0be214d51..f7cf87f718b8b67474a7fd6a7fbc223fef97452d 100644 (file)
@@ -1,4 +1,5 @@
-ENV["RAILS_ENV"] = "test"
+ENV["RAILS_ENV"] = "test" if (ENV["RAILS_ENV"] != "diagnostics")
+
 unless ENV["NO_COVERAGE_TEST"]
   begin
     require 'simplecov'
@@ -151,4 +152,6 @@ class ApiServerForTests
   end
 end
 
-ApiServerForTests.run
+if ENV["RAILS_ENV"].eql? 'test'
+  ApiServerForTests.run
+end
diff --git a/doc/_includes/_example_sdk_go.liquid b/doc/_includes/_example_sdk_go.liquid
new file mode 100644 (file)
index 0000000..08124e6
--- /dev/null
@@ -0,0 +1,109 @@
+package main
+
+
+// *******************
+// Import the modules.
+//
+// Our examples don't use keepclient, but they do use fmt and log to
+// display output.
+
+import (
+       "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "log"
+)
+
+func main() {
+
+
+       // ********************************
+       // Set up an API client user agent.
+       //
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               log.Fatalf("Error setting up arvados client %s", err.Error())
+       }
+
+
+       // *****************************************
+       // Print the full name of the current user.
+       //
+
+       type user struct {
+               Uuid     string `json:"uuid"`
+               FullName string `json:"full_name"`
+       }
+
+       var u user
+       err = arv.Call("GET", "users", "", "current", nil, &u)
+
+       if err != nil {
+               log.Fatalf("error querying current user", err.Error())
+       }
+
+       log.Printf("Logged in as %s (uuid %s)", u.FullName, u.Uuid)
+
+
+       // ********************************************************
+       // Print all fields from the first five collections returned.
+       //
+       // Note that some fields are not returned by default and have to be
+       // requested. See below for an example.
+
+       var results map[string]interface{}
+
+       params := arvadosclient.Dict{"limit": 5}
+
+       err = arv.List("collections", params, &results)
+       if err != nil {
+               log.Fatalf("error querying collections", err.Error())
+       }
+
+       printArvadosResults(results)
+
+
+       // *********************************************************
+       // Print some fields from the first two collections returned.
+       //
+       // We also print manifest_text, which has to be explicitly requested.
+       //
+
+       collection_fields_wanted := []string{"manifest_text", "owner_uuid", "uuid"}
+       params = arvadosclient.Dict{"limit": 2, "select": collection_fields_wanted}
+
+       err = arv.List("collections", params, &results)
+       if err != nil {
+               log.Fatalf("error querying collections", err.Error())
+       }
+
+       printArvadosResults(results)
+}
+
+
+// A helper method which will print out a result map returned by
+// arvadosclient.
+func printArvadosResults(results map[string]interface{}) {
+       for key, value := range results {
+               // "items", if it exists, holds a map.
+               // So we print it prettily below.
+               if key != "items" {
+                       fmt.Println(key, ":", value)
+               }
+       }
+
+       if value, ok := results["items"]; ok {
+               items := value.([]interface{})
+               for index, item := range items {
+                       fmt.Println("===========  ", index, "  ===========")
+                       item_map := item.(map[string]interface{})
+                       if len(item_map) == 0 {
+                               fmt.Println("item", index, ": empty map")
+                       } else {
+                               for k, v := range item_map {
+                                       fmt.Println(index, k, ":", v)
+                               }
+                       }
+               }
+       }
+}
diff --git a/doc/_includes/_example_sdk_go_imports.liquid b/doc/_includes/_example_sdk_go_imports.liquid
new file mode 100644 (file)
index 0000000..fe2cfca
--- /dev/null
@@ -0,0 +1,4 @@
+import (
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
index 0e26369b388825e57f77076ef2efd060c5a29e39..58446a9edaf9ed05b5ac01deeb692affed4d5590 100644 (file)
@@ -12,54 +12,14 @@ h3. Installation
 
 You don't need to install anything. Just import the client like this. The go tools will fetch the relevant code and dependencies for you.
 
-<notextile>
-<pre><code class="userinput">import (
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-)
-</code></pre>
-</notextile>
+<notextile>{% code 'example_sdk_go_imports' as go %}</notextile>
 
-h3. Examples
+If you need pre-release client code, you can use the latest version from the repo by following "these instructions.":https://arvados.org/projects/arvados/wiki/Go#Using-Go-with-Arvados
 
-Import the module. (We import the log module here too, so we can use it in the subsequent examples.)
+h3. Example
 
-<notextile>
-<pre><code class="userinput">import (
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-       "log"
-)
-</code></pre>
-</notextile>
+You can save this source as a .go file and run it:
 
-Set up an API client user agent:
+<notextile>{% code 'example_sdk_go' as go %}</notextile>
 
-<notextile>
-<pre><code class="userinput">  arv, err := arvadosclient.MakeArvadosClient()
-       if err != nil {
-               log.Fatalf("Error setting up arvados client %s", err.Error())
-       }
-</code></pre>
-</notextile>
-
-Get the User object for the current user:
-
-<notextile>
-<pre><code class="userinput">  type user struct {
-               Uuid         string `json:"uuid"`
-               FullName     int    `json:"full_name"`
-       }
-
-       var u user
-       err := arv.Call("GET", "users", "", "current", nil, &u)
-
-       if err != nil {
-               return err
-       }
-
-       log.Printf("Logged in as %s (uuid %s)", user.Uuid, user.FullName)
-</code></pre>
-</notextile>
-
-A few more usage examples can be found in the services/keepproxy and sdk/go/keepclient directories in the arvados source tree.
+A few more usage examples can be found in the "services/keepproxy":https://arvados.org/projects/arvados/repository/revisions/master/show/services/keepproxy and "sdk/go/keepclient":https://arvados.org/projects/arvados/repository/revisions/master/show/sdk/go/keepclient directories in the arvados source tree.
index e10f94f6c16c872bae74c5201b8e01f6b6b62c8e..c12bf06dfb3a7e71799490e323533f48be46cf2e 100644 (file)
@@ -3,7 +3,8 @@ MAINTAINER Peter Amstutz <peter.amstutz@curoverse.com>
 
 USER root
 
-RUN apt-get install -y -q openjdk-7-jre-headless && \
+RUN apt-get update && \
+    apt-get install -y -q openjdk-7-jre-headless && \
     cd /tmp && \
     curl --location http://downloads.sourceforge.net/project/bio-bwa/bwa-0.7.9a.tar.bz2 -o bwa-0.7.9a.tar.bz2 && \
     tar xjf bwa-0.7.9a.tar.bz2 && \
@@ -19,4 +20,4 @@ RUN apt-get install -y -q openjdk-7-jre-headless && \
     (find . -executable -type f -print0 | xargs -0 -I {} mv {} /usr/local/bin) && \
     rm -r /tmp/samtools-0.1.19*
 
-USER crunch
\ No newline at end of file
+USER crunch
index dee57b059ebdd231d77775025644d59940fa611c..472c20bd73a283feb3149c44b6b3f534d3ed50bb 100755 (executable)
@@ -42,6 +42,8 @@
 # [--status-json path] Print JSON status report to a file or
 #                      fifo. Default: /dev/null
 #
+# [--description] Description for the pipeline instance.
+#
 # == Parameters
 #
 # [param_name=param_value]
@@ -166,6 +168,10 @@ p = Trollop::Parser.new do
       "Synonym for --run-jobs-here.",
       :short => :none,
       :type => :boolean)
+  opt(:description,
+      "Description for the pipeline instance.",
+      :short => :none,
+      :type => :string)
   stop_on [:'--']
 end
 $options = Trollop::with_standard_exception_handling p do
@@ -468,6 +474,8 @@ class WhRunPipelineInstance
         end
       end
     else
+      description = $options[:description]
+      description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
       @instance = PipelineInstance.
         create(components: @components,
                properties: {
@@ -476,6 +484,7 @@ class WhRunPipelineInstance
                  }
                },
                pipeline_template_uuid: @template[:uuid],
+               description: description,
                state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
     end
     self
index f5c4066a4d7573cf35553df79adbc5ec35220469..496136ebe3c92116cc5ddf0340267b33969b5df0 100644 (file)
@@ -92,9 +92,9 @@ def normalize(collection):
 
 
 class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text, api_client=None):
+    def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
         self._api_client = api_client
-        self._keep_client = None
+        self._keep_client = keep_client
         if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
@@ -153,7 +153,7 @@ class CollectionReader(object):
         # now regenerate the manifest text based on the normalized stream
 
         #print "normalizing", self._manifest_text
-        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+        self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
         #print "result", self._manifest_text
 
 
@@ -161,7 +161,7 @@ class CollectionReader(object):
         self._populate()
         resp = []
         for s in self._streams:
-            resp.append(StreamReader(s))
+            resp.append(StreamReader(s, keep=self._keep_client))
         return resp
 
     def all_files(self):
@@ -172,7 +172,7 @@ class CollectionReader(object):
     def manifest_text(self, strip=False):
         self._populate()
         if strip:
-            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+            m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
             return m
         else:
             return self._manifest_text
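The new keep_client parameter lets a caller supply its own Keep client, which CollectionReader then passes to every StreamReader it creates. A minimal usage sketch (Python SDK; the portable data hash below is the one from the API test fixtures in this commit — substitute any valid locator):

    import arvados

    # Reuse one KeepClient so every stream regenerated from this manifest
    # reads blocks through the same client and cache.
    keep = arvados.KeepClient()
    reader = arvados.CollectionReader('0b21a217243bfce5617fb9224b95bcb9+49',
                                      keep_client=keep)
    print reader.manifest_text()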
index f0a872417a141ef0d291d9871e7fdb98e2784773..22bf327e79dfd1942e5f452c1de944707818d0cd 100644 (file)
@@ -143,6 +143,71 @@ class Keep(object):
     def put(data, **kwargs):
         return Keep.global_client_object().put(data, **kwargs)
 
+class KeepBlockCache(object):
+    # Default RAM cache is 256MiB
+    def __init__(self, cache_max=(256 * 1024 * 1024)):
+        self.cache_max = cache_max
+        self._cache = []
+        self._cache_lock = threading.Lock()
+
+    class CacheSlot(object):
+        def __init__(self, locator):
+            self.locator = locator
+            self.ready = threading.Event()
+            self.content = None
+
+        def get(self):
+            self.ready.wait()
+            return self.content
+
+        def set(self, value):
+            self.content = value
+            self.ready.set()
+
+        def size(self):
+            if self.content == None:
+                return 0
+            else:
+                return len(self.content)
+
+    def cap_cache(self):
+        '''Cap the cache size to self.cache_max'''
+        self._cache_lock.acquire()
+        try:
+            # Select all slots except those where ready.is_set() and content is
+            # None (that means there was an error reading the block).
+            self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+            sm = sum([slot.size() for slot in self._cache])
+            while len(self._cache) > 0 and sm > self.cache_max:
+                for i in xrange(len(self._cache)-1, -1, -1):
+                    if self._cache[i].ready.is_set():
+                        del self._cache[i]
+                        break
+                sm = sum([slot.size() for slot in self._cache])
+        finally:
+            self._cache_lock.release()
+
+    def reserve_cache(self, locator):
+        '''Reserve a cache slot for the specified locator,
+        or return the existing slot.'''
+        self._cache_lock.acquire()
+        try:
+            # Test if the locator is already in the cache
+            for i in xrange(0, len(self._cache)):
+                if self._cache[i].locator == locator:
+                    n = self._cache[i]
+                    if i != 0:
+                        # move it to the front
+                        del self._cache[i]
+                        self._cache.insert(0, n)
+                    return n, False
+
+            # Add a new cache slot for the locator
+            n = KeepBlockCache.CacheSlot(locator)
+            self._cache.insert(0, n)
+            return n, True
+        finally:
+            self._cache_lock.release()
 
 class KeepClient(object):
     class ThreadLimiter(object):
@@ -326,7 +391,7 @@ class KeepClient(object):
 
 
     def __init__(self, api_client=None, proxy=None, timeout=300,
-                 api_token=None, local_store=None):
+                 api_token=None, local_store=None, block_cache=None):
         """Initialize a new KeepClient.
 
         Arguments:
@@ -362,15 +427,15 @@ class KeepClient(object):
         if local_store is None:
             local_store = os.environ.get('KEEP_LOCAL_STORE')
 
+        self.block_cache = block_cache if block_cache else KeepBlockCache()
+
         if local_store:
             self.local_store = local_store
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
             self.timeout = timeout
-            self.cache_max = 256 * 1024 * 1024  # Cache is 256MiB
-            self._cache = []
-            self._cache_lock = threading.Lock()
+
             if proxy:
                 if not proxy.endswith('/'):
                     proxy += '/'
@@ -461,59 +526,6 @@ class KeepClient(object):
         _logger.debug(str(pseq))
         return pseq
 
-    class CacheSlot(object):
-        def __init__(self, locator):
-            self.locator = locator
-            self.ready = threading.Event()
-            self.content = None
-
-        def get(self):
-            self.ready.wait()
-            return self.content
-
-        def set(self, value):
-            self.content = value
-            self.ready.set()
-
-        def size(self):
-            if self.content == None:
-                return 0
-            else:
-                return len(self.content)
-
-    def cap_cache(self):
-        '''Cap the cache size to self.cache_max'''
-        self._cache_lock.acquire()
-        try:
-            self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
-            sm = sum([slot.size() for slot in self._cache])
-            while sm > self.cache_max:
-                del self._cache[-1]
-                sm = sum([slot.size() for a in self._cache])
-        finally:
-            self._cache_lock.release()
-
-    def reserve_cache(self, locator):
-        '''Reserve a cache slot for the specified locator,
-        or return the existing slot.'''
-        self._cache_lock.acquire()
-        try:
-            # Test if the locator is already in the cache
-            for i in xrange(0, len(self._cache)):
-                if self._cache[i].locator == locator:
-                    n = self._cache[i]
-                    if i != 0:
-                        # move it to the front
-                        del self._cache[i]
-                        self._cache.insert(0, n)
-                    return n, False
-
-            # Add a new cache slot for the locator
-            n = KeepClient.CacheSlot(locator)
-            self._cache.insert(0, n)
-            return n, True
-        finally:
-            self._cache_lock.release()
 
     def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
         # roots_map is a dictionary, mapping Keep service root strings
@@ -569,7 +581,7 @@ class KeepClient(object):
         locator = KeepLocator(loc_s)
         expect_hash = locator.md5sum
 
-        slot, first = self.reserve_cache(expect_hash)
+        slot, first = self.block_cache.reserve_cache(expect_hash)
         if not first:
             v = slot.get()
             return v
@@ -609,7 +621,7 @@ class KeepClient(object):
 
         # Always cache the result, then return it if we succeeded.
         slot.set(blob)
-        self.cap_cache()
+        self.block_cache.cap_cache()
         if loop.success():
             return blob
 
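With the cache factored out of KeepClient into the standalone KeepBlockCache, one pool of blocks can back several clients: reserve_cache() gives the first caller a slot to fill, and later callers for the same locator block in slot.get() until the content arrives, whichever client started the fetch. A minimal sharing sketch, assuming the classes are exported as arvados.KeepBlockCache and arvados.KeepClient (as the FUSE changes later in this commit do):

    import arvados

    # One cache, sized down to 128 MiB, shared by two clients. A block
    # fetched through either client is served from RAM for the other,
    # and cap_cache() evictions apply to the shared pool.
    cache = arvados.KeepBlockCache(cache_max=128 * 1024 * 1024)
    client_a = arvados.KeepClient(block_cache=cache)
    client_b = arvados.KeepClient(block_cache=cache)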
index 0192887b148d8b0d7dd686118a9a5592f73cd963..3488d59bc4a63167aa09a7719dc7b242ce573d03 100644 (file)
@@ -71,8 +71,8 @@ gem 'database_cleaner'
 
 gem 'themes_for_rails'
 
-gem 'arvados', '>= 0.1.20140910123800'
-gem 'arvados-cli', '>= 0.1.20140905165259'
+gem 'arvados', '>= 0.1.20140912152020'
+gem 'arvados-cli', '>= 0.1.20140912152020'
 
 # pg_power lets us use partial indexes in schema.rb in Rails 3
 gem 'pg_power'
index 25bbf11f6759eaabb85f84dd59955423cf10f4ce..69517c1ec61d210722c3fb44111f5dba5ac2aa75 100644 (file)
@@ -35,13 +35,13 @@ GEM
     addressable (2.3.6)
     andand (1.3.3)
     arel (3.0.3)
-    arvados (0.1.20140910123800)
+    arvados (0.1.20140912152020)
       activesupport (>= 3.2.13)
       andand
       google-api-client (~> 0.6.3)
       json (>= 1.7.7)
       jwt (>= 0.1.5, < 1.0.0)
-    arvados-cli (0.1.20140905165259)
+    arvados-cli (0.1.20140912152020)
       activesupport (~> 3.2, >= 3.2.13)
       andand (~> 1.3, >= 1.3.3)
       arvados (~> 0.1.0)
@@ -223,8 +223,8 @@ PLATFORMS
 DEPENDENCIES
   acts_as_api
   andand
-  arvados (>= 0.1.20140910123800)
-  arvados-cli (>= 0.1.20140905165259)
+  arvados (>= 0.1.20140912152020)
+  arvados-cli (>= 0.1.20140912152020)
   coffee-rails (~> 3.2.0)
   database_cleaner
   factory_girl_rails
index f856dd64062bdbc5b120d2a2e8e7fbf39f4f142b..c5b2bcf2f2d4a97f5bac9b18e6f0456707e59210 100644 (file)
@@ -26,6 +26,7 @@ class Arvados::V1::SchemaController < ApplicationController
         rootUrl: root_url,
         servicePath: "arvados/v1/",
         batchPath: "batch",
+        defaultTrashLifetime: Rails.application.config.default_trash_lifetime,
         parameters: {
           alt: {
             type: "string",
index 84101ffdd44424363ed91bfb4d42bf97a1c09bd6..accd2cc62c7bc049518481efdcbf49db592f325a 100644 (file)
@@ -10,6 +10,9 @@ class Collection < ArvadosModel
   before_validation :set_portable_data_hash
   validate :ensure_hash_matches_manifest_text
 
+  # Query only undeleted collections by default.
+  default_scope where("expires_at IS NULL or expires_at > CURRENT_TIMESTAMP")
+
   api_accessible :user, extend: :common do |t|
     t.add :name
     t.add :description
index 75de61cb3a9c3733d49c45ab9c83ea5b47836913..32f28e3582d509930a4bdf3339659035f6d9b5d5 100644 (file)
@@ -40,6 +40,7 @@ class Job < ArvadosModel
     t.add :repository
     t.add :supplied_script_version
     t.add :docker_image_locator
+    t.add :description
   end
 
   def assert_finished
index 71d4dea2c0cc815c7b29c30c8d0d7dac40c31cf1..960142e977238c0b804ef0ad5b30284ec0049151 100644 (file)
@@ -20,16 +20,19 @@ class Node < ArvadosModel
     t.add :slot_number
     t.add :status
     t.add :crunch_worker_state
+    t.add :info
   end
   api_accessible :superuser, :extend => :user do |t|
     t.add :first_ping_at
-    t.add :info
     t.add lambda { |x| @@nameservers }, :as => :nameservers
   end
 
   def info
-    @info ||= Hash.new
-    super
+    if current_user.andand.is_admin
+      super
+    else
+      super.select { |k| not k.to_s.include? "secret" }
+    end
   end
 
   def domain
index 752391862a3d2e25c9cfe26a85913496bd5145a6..28345d51f507e8eebe9f9baf9a7640f4460a1041 100644 (file)
@@ -22,6 +22,7 @@ class PipelineInstance < ArvadosModel
     t.add :properties
     t.add :state
     t.add :components_summary
+    t.add :description
     t.add :started_at
     t.add :finished_at
   end
index 2cf2ce2217f3cc5abfea561579aa315ddc848e02..78d1d513c419dfe5e6e35d345b079ee41a7792b2 100644 (file)
@@ -196,3 +196,6 @@ common:
   # source_version
   source_version: "<%= `git log -n 1 --format=%h` %>"
   local_modified: false
+
+  # Default lifetime for ephemeral collections: 2 weeks.
+  default_trash_lifetime: 1209600
diff --git a/services/api/db/migrate/20140911221252_add_description_to_pipeline_instances_and_jobs.rb b/services/api/db/migrate/20140911221252_add_description_to_pipeline_instances_and_jobs.rb
new file mode 100644 (file)
index 0000000..53d3a13
--- /dev/null
@@ -0,0 +1,11 @@
+class AddDescriptionToPipelineInstancesAndJobs < ActiveRecord::Migration
+  def up
+    add_column :pipeline_instances, :description, :text, null: true
+    add_column :jobs, :description, :text, null: true
+  end
+
+  def down
+    remove_column :jobs, :description
+    remove_column :pipeline_instances, :description
+  end
+end
index 2168739ddf76b5f4acdbd84828c98d53b9c4383f..bd69102992a44e30d904283c0195baaf39228289 100644 (file)
@@ -430,7 +430,8 @@ CREATE TABLE jobs (
     repository character varying(255),
     supplied_script_version character varying(255),
     docker_image_locator character varying(255),
-    priority integer DEFAULT 0 NOT NULL
+    priority integer DEFAULT 0 NOT NULL,
+    description text
 );
 
 
@@ -680,9 +681,9 @@ CREATE TABLE pipeline_instances (
     properties text,
     state character varying(255),
     components_summary text,
-    description text,
     started_at timestamp without time zone,
-    finished_at timestamp without time zone
+    finished_at timestamp without time zone,
+    description text
 );
 
 
@@ -2007,8 +2008,6 @@ INSERT INTO schema_migrations (version) VALUES ('20140714184006');
 
 INSERT INTO schema_migrations (version) VALUES ('20140811184643');
 
-INSERT INTO schema_migrations (version) VALUES ('20140815171049');
-
 INSERT INTO schema_migrations (version) VALUES ('20140817035914');
 
 INSERT INTO schema_migrations (version) VALUES ('20140818125735');
@@ -2017,4 +2016,6 @@ INSERT INTO schema_migrations (version) VALUES ('20140826180337');
 
 INSERT INTO schema_migrations (version) VALUES ('20140828141043');
 
-INSERT INTO schema_migrations (version) VALUES ('20140909183946');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20140909183946');
+
+INSERT INTO schema_migrations (version) VALUES ('20140911221252');
\ No newline at end of file
index ec1ae7bdc478a08cc172eca7c30ffd7eecda1cba..1b386556e62ca1363e0c31bad1645236687602b0 100644 (file)
@@ -15,5 +15,9 @@ namespace :config do
         end
       end
     end
+    # default_trash_lifetime cannot be less than 24 hours
+    if Rails.configuration.default_trash_lifetime < 86400 then
+      raise "default_trash_lifetime is %d, must be at least 86400" % Rails.configuration.default_trash_lifetime
+    end
   end
 end
index bb7ce7e12dee7efe70ceeff06d506c1c18b23392..d316d92d605ce7c94e5c0eb8c0697d3f0f9c1ab2 100755 (executable)
@@ -342,12 +342,16 @@ class Dispatcher
             $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
             $stderr.puts line
             pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
-            j[:stderr_buf_to_flush] << pub_msg
+            if not j[:log_truncated]
+              j[:stderr_buf_to_flush] << pub_msg
+            end
           end
 
-          if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
-              (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
-            write_log j
+          if not j[:log_truncated]
+            if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+                (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
+              write_log j
+            end
           end
         end
       end
@@ -512,36 +516,32 @@ class Dispatcher
 
   # send message to log table. we want these records to be transient
   def write_log running_job
+    return if running_job[:log_truncated]
+    return if running_job[:stderr_buf_to_flush] == ''
     begin
-      if (running_job && running_job[:stderr_buf_to_flush] != '')
-        # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
-        # or crunch_limit_log_events_per_job.
-        if (too_many_bytes_logged_for_job(running_job))
-          return if running_job[:log_truncated]
-          running_job[:log_truncated] = true
-          running_job[:stderr_buf_to_flush] =
-              "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
-        elsif (too_many_events_logged_for_job(running_job))
-          return if running_job[:log_truncated]
-          running_job[:log_truncated] = true
-          running_job[:stderr_buf_to_flush] =
-              "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
-        end
-        log = Log.new(object_uuid: running_job[:job].uuid,
-                      event_type: 'stderr',
-                      owner_uuid: running_job[:job].owner_uuid,
-                      properties: {"text" => running_job[:stderr_buf_to_flush]})
-        log.save!
-        running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
-        running_job[:events_logged] += 1
-        running_job[:stderr_buf_to_flush] = ''
-        running_job[:stderr_flushed_at] = Time.now.to_i
+      # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
+      # or crunch_limit_log_events_per_job.
+      if (too_many_bytes_logged_for_job(running_job))
+        running_job[:log_truncated] = true
+        running_job[:stderr_buf_to_flush] =
+          "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
+      elsif (too_many_events_logged_for_job(running_job))
+        running_job[:log_truncated] = true
+        running_job[:stderr_buf_to_flush] =
+          "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
       end
+      log = Log.new(object_uuid: running_job[:job].uuid,
+                    event_type: 'stderr',
+                    owner_uuid: running_job[:job].owner_uuid,
+                    properties: {"text" => running_job[:stderr_buf_to_flush]})
+      log.save!
+      running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
+      running_job[:events_logged] += 1
     rescue
-      running_job[:stderr_buf] = "Failed to write logs \n"
-      running_job[:stderr_buf_to_flush] = ''
-      running_job[:stderr_flushed_at] = Time.now.to_i
+      running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
     end
+    running_job[:stderr_buf_to_flush] = ''
+    running_job[:stderr_flushed_at] = Time.now.to_i
   end
 
 end
index acc6cc02b4ef1c20b10d3778117aacf00e1f1d30..18531174e0937991f17abbbc8bc43fe98dc90480 100644 (file)
@@ -173,3 +173,29 @@ collection_to_move_around_in_aproject:
   updated_at: 2014-02-03T17:22:54Z
   manifest_text: ". 73feffa4b7f6bb68e44cf984c85f6e88+3 0:3:baz\n"
   name: collection_to_move_around
+
+expired_collection:
+  uuid: zzzzz-4zz18-mto52zx1s7sn3ih
+  portable_data_hash: 0b21a217243bfce5617fb9224b95bcb9+49
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2014-02-03T17:22:54Z
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  modified_at: 2014-02-03T17:22:54Z
+  updated_at: 2014-02-03T17:22:54Z
+  expires_at: 2001-01-01T00:00:00Z
+  manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:expired\n"
+  name: expired_collection
+
+collection_expires_in_future:
+  uuid: zzzzz-4zz18-padkqo7yb8d9i3j
+  portable_data_hash: 0b21a217243bfce5617fb9224b95bcb9+49
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  created_at: 2014-02-03T17:22:54Z
+  modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+  modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  modified_at: 2014-02-03T17:22:54Z
+  updated_at: 2014-02-03T17:22:54Z
+  expires_at: 2038-01-01T00:00:00Z
+  manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:expired\n"
+  name: collection_expires_in_future
index 8e3e631ac36a6a2f87301f5af22ed6fb843edb20..f2c15c656b3f2570adf211cd6fdb01222cf3bfb7 100644 (file)
@@ -559,4 +559,61 @@ EOS
     "Collection should not exist in database after failed create"
   end
 
+  test 'List expired collection returns empty list' do
+    authorize_with :active
+    get :index, {
+      where: {name: 'expired_collection'},
+    }
+    assert_response :success
+    found = assigns(:objects)
+    assert_equal 0, found.count
+  end
+
+  test 'Show expired collection returns 404' do
+    authorize_with :active
+    get :show, {
+      id: 'zzzzz-4zz18-mto52zx1s7sn3ih',
+    }
+    assert_response 404
+  end
+
+  test 'Update expired collection returns 404' do
+    authorize_with :active
+    post :update, {
+      id: 'zzzzz-4zz18-mto52zx1s7sn3ih',
+      collection: {
+        name: "still expired"
+      }
+    }
+    assert_response 404
+  end
+
+  test 'List collection with future expiration time succeeds' do
+    authorize_with :active
+    get :index, {
+      where: {name: 'collection_expires_in_future'},
+    }
+    found = assigns(:objects)
+    assert_equal 1, found.count
+  end
+
+
+  test 'Show collection with future expiration time succeeds' do
+    authorize_with :active
+    get :show, {
+      id: 'zzzzz-4zz18-padkqo7yb8d9i3j',
+    }
+    assert_response :success
+  end
+
+  test 'Update collection with future expiration time succeeds' do
+    authorize_with :active
+    post :update, {
+      id: 'zzzzz-4zz18-padkqo7yb8d9i3j',
+      collection: {
+        name: "still not expired"
+      }
+    }
+    assert_response :success
+  end
 end
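From a client's point of view, the new default scope simply makes expired collections invisible. A sketch with the Python SDK against a development server loaded with the fixtures above (assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set):

    import arvados

    api = arvados.api('v1')

    # The expired fixture is filtered out by the default scope...
    expired = api.collections().list(
        filters=[['name', '=', 'expired_collection']]).execute()
    print len(expired['items'])    # 0

    # ...while a collection whose expires_at is in the future is visible.
    future = api.collections().list(
        filters=[['name', '=', 'collection_expires_in_future']]).execute()
    print len(future['items'])     # 1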
index 06695aa6a762a9871deef9f820dec14c33eefedb..28a183026758e029631e685c2970118bcd445fd7 100644 (file)
@@ -76,6 +76,7 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
   end
 
   test "ping adds node stats to info" do
+    authorize_with :admin
     node = nodes(:idle)
     post :ping, {
       id: node.uuid,
index 816834b2071471611d4add36fee89124a8743ca8..520e36ec373e36a2865a4db4bd45da3f241b66fb 100644 (file)
@@ -13,4 +13,11 @@ class Arvados::V1::SchemaControllerTest < ActionController::TestCase
                  "discovery document was generated >#{MAX_SCHEMA_AGE}s ago")
   end
 
+  test "discovery document has defaultTrashLifetime" do
+    get :index
+    assert_response :success
+    discovery_doc = JSON.parse(@response.body)
+    assert_includes discovery_doc, 'defaultTrashLifetime'
+    assert_equal discovery_doc['defaultTrashLifetime'], Rails.application.config.default_trash_lifetime
+  end
 end
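Since defaultTrashLifetime is published in the discovery document, any client can read it without new API support. A hedged sketch (the discovery path is the standard Arvados endpoint; taking the host from the environment and using plain HTTPS are assumptions about your deployment):

    import json
    import os
    import urllib2

    host = os.environ['ARVADOS_API_HOST']
    url = 'https://%s/discovery/v1/apis/arvados/v1/rest' % host
    doc = json.load(urllib2.urlopen(url))
    print doc['defaultTrashLifetime']    # 1209600 (two weeks) by default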
index 4d2dfee546c1a73ceddffe97639e8c07452f62d2..f49b94777b76884ace7275a11367c83856c2b481 100644 (file)
@@ -34,12 +34,18 @@ class SafeApi(object):
         self.token = config.get('ARVADOS_API_TOKEN')
         self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
         self.local = threading.local()
+        self.block_cache = arvados.KeepBlockCache()
 
     def localapi(self):
         if 'api' not in self.local.__dict__:
             self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
         return self.local.api
 
+    def localkeep(self):
+        if 'keep' not in self.local.__dict__:
+            self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
+        return self.local.keep
+
     def collections(self):
         return self.localapi().collections()
 
@@ -307,7 +313,7 @@ class CollectionDirectory(Directory):
             self.collection_object_file.update(self.collection_object)
 
         self.clear()
-        collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
+        collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
         for s in collection.all_streams():
             cwd = self
             for part in s.name().split('/'):
@@ -341,6 +347,10 @@ class CollectionDirectory(Directory):
             else:
                 _logger.error("arv-mount %s: error", self.collection_locator)
                 _logger.exception(detail)
+        except arvados.errors.ArgumentError as detail:
+            _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
+            if self.collection_object is not None and "manifest_text" in self.collection_object:
+                _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
         except Exception as detail:
             _logger.error("arv-mount %s: error", self.collection_locator)
             if self.collection_object is not None and "manifest_text" in self.collection_object:
@@ -796,7 +806,11 @@ class Operations(llfuse.Operations):
         try:
             with llfuse.lock_released:
                 return handle.entry.readfrom(off, size)
-        except:
+        except arvados.errors.NotFoundError as e:
+            _logger.warning("Block not found: " + str(e))
+            raise llfuse.FUSEError(errno.EIO)
+        except Exception as e:
+            _logger.exception(e)
             raise llfuse.FUSEError(errno.EIO)
 
     def release(self, fh):
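SafeApi above ties the two new hooks together: one process-wide KeepBlockCache plus a per-thread KeepClient held in threading.local(). The same pattern condensed into a standalone sketch (the helper name is illustrative, not part of the SDK):

    import threading

    import arvados

    _block_cache = arvados.KeepBlockCache()
    _local = threading.local()

    def thread_keep_client():
        # Each worker thread gets its own client object, but all of them
        # read and populate the single shared block cache.
        if 'keep' not in _local.__dict__:
            _local.keep = arvados.KeepClient(block_cache=_block_cache)
        return _local.keep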
index 55f565bf7ae5be414318f190f79a5ac066b109ba..9627777a527aa87a2040eec56a9f7027086aaa94 100644 (file)
@@ -261,6 +261,7 @@ class FuseSharedTest(MountTestBase):
                           "Empty collection.link",
                           "Pipeline Template with Input Parameter with Search.pipelineTemplate",
                           "Pipeline Template with Jobspec Components.pipelineTemplate",
+                          "collection_expires_in_future",
                           "pipeline_with_job.pipelineInstance"
                       ], d2)
 
diff --git a/services/keepstore/block_work_list.go b/services/keepstore/block_work_list.go
new file mode 100644 (file)
index 0000000..4c5aeb1
--- /dev/null
@@ -0,0 +1,160 @@
+package main
+
+/* A BlockWorkList is an asynchronous thread-safe queue manager.  It
+   provides a channel from which items can be read off the queue, and
+   permits replacing the contents of the queue at any time.
+
+   The overall work flow for a BlockWorkList is as follows:
+
+     1. A BlockWorkList is created with NewBlockWorkList().  This
+        function instantiates a new BlockWorkList and starts a manager
+        goroutine.  The manager listens on an input channel
+        (manager.newlist) and an output channel (manager.NextItem).
+
+     2. The manager first waits for a new list of requests on the
+        newlist channel.  When another goroutine calls
+        manager.ReplaceList(lst), it sends lst over the newlist
+        channel to the manager.  The manager goroutine now has
+        ownership of the list.
+
+     3. Once the manager has this initial list, it listens on both the
+        input and output channels for one of the following to happen:
+
+          a. A worker attempts to read an item from the NextItem
+             channel.  The manager sends the next item from the list
+             over this channel to the worker, and loops.
+
+          b. New data is sent to the manager on the newlist channel.
+             This happens when another goroutine calls
+             manager.ReplaceList() with a new list.  The manager
+             discards the current list, replaces it with the new one,
+             and begins looping again.
+
+          c. The input channel is closed.  The manager closes its
+             output channel (signalling any workers to quit) and
+             terminates.
+
+   Tasks currently handled by BlockWorkList:
+     * the pull list
+     * the trash list
+
+   Example usage:
+
+		// Any kind of user-defined type can be used with the
+		// BlockWorkList.
+		type FrobRequest struct {
+			frob string
+		}
+
+               // Make a work list.
+               froblist := NewBlockWorkList()
+
+               // Start a concurrent worker to read items from the NextItem
+               // channel until it is closed, deleting each one.
+		go func(list *BlockWorkList) {
+                       for i := range list.NextItem {
+                               req := i.(FrobRequest)
+                               frob.Run(req)
+                       }
+               }(froblist)
+
+               // Set up a HTTP handler for PUT /frob
+               router.HandleFunc(`/frob`,
+                       func(w http.ResponseWriter, req *http.Request) {
+                               // Parse the request body into a list.List
+                               // of FrobRequests, and give this list to the
+                               // frob manager.
+                               newfrobs := parseBody(req.Body)
+                               froblist.ReplaceList(newfrobs)
+                       }).Methods("PUT")
+
+   Methods available on a BlockWorkList:
+
+               ReplaceList(list)
+                       Replaces the current item list with a new one.  The list
+			manager discards any unprocessed items on the existing
+			list and replaces it with the new one. If the worker is
+			processing a list item when ReplaceList is called, it
+			finishes processing before receiving items from the new
+			list.
+               Close()
+                       Shuts down the manager goroutine. When Close is called,
+                       the manager closes the NextItem channel.
+*/
+
+import "container/list"
+
+type BlockWorkList struct {
+       newlist  chan *list.List
+       NextItem chan interface{}
+}
+
+// NewBlockWorkList returns a new worklist, and launches a listener
+// goroutine that waits for work and farms it out to workers.
+//
+func NewBlockWorkList() *BlockWorkList {
+       b := BlockWorkList{
+               newlist:  make(chan *list.List),
+               NextItem: make(chan interface{}),
+       }
+       go b.listen()
+       return &b
+}
+
+// ReplaceList sends a new list of work items to the manager goroutine.
+// The manager will discard any outstanding list and begin
+// handing out items from the new one.
+//
+func (b *BlockWorkList) ReplaceList(list *list.List) {
+       b.newlist <- list
+}
+
+// Close shuts down the manager and terminates its goroutine, which
+// closes the NextItem channel and abandons any items remaining
+// on the current list.
+//
+func (b *BlockWorkList) Close() {
+       close(b.newlist)
+}
+
+// listen is run in a goroutine. It reads new work lists from its
+// input channel until the channel is closed.
+// listen takes ownership of each list that is passed to it.
+//
+// Note that the routine never needs to access the list itself
+// once current_item has been initialized, so we do not bother
+// to keep a pointer to the list. Because the list is doubly
+// linked, holding a reference to the current element keeps the
+// rest of the list from being garbage collected.
+//
+func (b *BlockWorkList) listen() {
+       var current_item *list.Element
+
+       // When we're done, close the output channel to shut down any
+       // workers.
+       defer close(b.NextItem)
+
+       for {
+               // If the current list is empty, wait for a new list before
+               // even checking if workers are ready.
+               if current_item == nil {
+                       if p, ok := <-b.newlist; ok {
+                               current_item = p.Front()
+                       } else {
+                               // The channel was closed; shut down.
+                               return
+                       }
+               }
+               select {
+               case p, ok := <-b.newlist:
+                       if ok {
+                               current_item = p.Front()
+                       } else {
+                               // The input channel is closed; time to shut down
+                               return
+                       }
+               case b.NextItem <- current_item.Value:
+                       current_item = current_item.Next()
+               }
+       }
+}
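As a quick orientation to the API this file introduces, here is a minimal sketch (illustrative only, not part of the commit) of the replace-then-drain semantics. It uses only the exported names defined above (NewBlockWorkList, ReplaceList, NextItem, Close); the demo function name is made up, and the sketch assumes it is compiled into the same package as block_work_list.go.

    import (
    	"container/list"
    	"fmt"
    )

    func demoBlockWorkList() {
    	b := NewBlockWorkList()

    	// Hand the manager an initial list of three items.
    	first := list.New()
    	for _, n := range []int{1, 2, 3} {
    		first.PushBack(n)
    	}
    	b.ReplaceList(first)

    	// Consume one item...
    	fmt.Println(<-b.NextItem) // prints 1

    	// ...then replace the list. The manager silently discards
    	// the two unread items from the first list.
    	second := list.New()
    	second.PushBack(10)
    	b.ReplaceList(second)
    	fmt.Println(<-b.NextItem) // prints 10

    	// Close shuts down the manager, which closes NextItem.
    	b.Close()
    	if _, ok := <-b.NextItem; !ok {
    		fmt.Println("NextItem closed")
    	}
    }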
diff --git a/services/keepstore/block_work_list_test.go b/services/keepstore/block_work_list_test.go
new file mode 100644 (file)
index 0000000..c3df400
--- /dev/null
@@ -0,0 +1,149 @@
+package main
+
+import (
+       "container/list"
+       "testing"
+)
+
+func makeTestWorkList(ary []int) *list.List {
+       l := list.New()
+       for _, n := range ary {
+               l.PushBack(n)
+       }
+       return l
+}
+
+func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
+       select {
+       case item := <-c:
+               t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
+       default:
+               // no-op
+       }
+}
+
+func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
+       if item, ok := <-c; !ok {
+               t.Fatal("expected data on a closed channel")
+       } else if item == nil {
+               t.Fatal("expected data on an empty channel")
+       }
+}
+
+func expectChannelClosed(t *testing.T, c <-chan interface{}) {
+       received, ok := <-c
+       if ok {
+               t.Fatalf("Expected channel to be closed, but received %v instead", received)
+       }
+}
+
+func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
+       for i := range expected {
+               actual, ok := <-c
+               t.Logf("received %v", actual)
+               if !ok {
+                       t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
+               } else if actual.(int) != expected[i] {
+                       t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
+               }
+       }
+}
+
+// Create a BlockWorkList, generate a list for it, and instantiate a worker.
+func TestBlockWorkListReadWrite(t *testing.T) {
+       var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+
+       b := NewBlockWorkList()
+       b.ReplaceList(makeTestWorkList(input))
+
+       expectFromChannel(t, b.NextItem, input)
+       expectChannelEmpty(t, b.NextItem)
+       b.Close()
+}
+
+// Start a worker before the list has any input.
+func TestBlockWorkListEarlyRead(t *testing.T) {
+       var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+
+       b := NewBlockWorkList()
+
+       // First, demonstrate that nothing is available on the NextItem
+       // channel.
+       expectChannelEmpty(t, b.NextItem)
+
+       // Start a reader in a goroutine. The reader will block until the
+       // block work list has been initialized.
+       //
+       done := make(chan int)
+       go func() {
+               expectFromChannel(t, b.NextItem, input)
+               b.Close()
+               done <- 1
+       }()
+
+	// Feed the block work list a new list of items, and wait for
+	// the worker to finish.
+       b.ReplaceList(makeTestWorkList(input))
+       <-done
+
+       expectChannelClosed(t, b.NextItem)
+}
+
+// Show that a reader may block when the manager's list is exhausted,
+// and that the reader resumes automatically when new data is
+// available.
+func TestBlockWorkListReaderBlocks(t *testing.T) {
+       var (
+               inputBeforeBlock = []int{1, 2, 3, 4, 5}
+               inputAfterBlock  = []int{6, 7, 8, 9, 10}
+       )
+
+       b := NewBlockWorkList()
+       sendmore := make(chan int)
+       done := make(chan int)
+       go func() {
+               expectFromChannel(t, b.NextItem, inputBeforeBlock)
+
+               // Confirm that the channel is empty, so a subsequent read
+               // on it will block.
+               expectChannelEmpty(t, b.NextItem)
+
+               // Signal that we're ready for more input.
+               sendmore <- 1
+               expectFromChannel(t, b.NextItem, inputAfterBlock)
+               b.Close()
+               done <- 1
+       }()
+
+       // Write a slice of the first five elements and wait for the
+       // reader to signal that it's ready for us to send more input.
+       b.ReplaceList(makeTestWorkList(inputBeforeBlock))
+       <-sendmore
+
+       b.ReplaceList(makeTestWorkList(inputAfterBlock))
+
+       // Wait for the reader to complete.
+       <-done
+}
+
+// Replace one active work list with another.
+func TestBlockWorkListReplaceList(t *testing.T) {
+       var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+       var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+
+       b := NewBlockWorkList()
+       b.ReplaceList(makeTestWorkList(firstInput))
+
+       // Read just the first five elements from the work list.
+       // Confirm that the channel is not empty.
+       expectFromChannel(t, b.NextItem, firstInput[0:5])
+       expectChannelNotEmpty(t, b.NextItem)
+
+       // Replace the work list and read five more elements.
+	// The old list should have been discarded, and all
+	// subsequent elements should come from the new list.
+       b.ReplaceList(makeTestWorkList(replaceInput))
+       expectFromChannel(t, b.NextItem, replaceInput[0:5])
+
+       b.Close()
+}
index 34e711f19758cf0cd15760341516f63dd790113a..deb1c3dd110d146d50efa1e7bd5fc3e8711c5281 100644 (file)
@@ -663,11 +663,14 @@ func TestPullHandler(t *testing.T) {
 
        // The Keep pull manager should have received one good list with 3
        // requests on it.
-       var saved_pull_list = pullmgr.GetList()
-       if len(saved_pull_list) != 3 {
-               t.Errorf(
-                       "saved_pull_list: expected 3 elements, got %d\nsaved_pull_list = %v",
-                       len(saved_pull_list), saved_pull_list)
+       var output_list = make([]PullRequest, 3)
+       for i := 0; i < 3; i++ {
+               item := <-pullmgr.NextItem
+               if pr, ok := item.(PullRequest); ok {
+                       output_list[i] = pr
+               } else {
+                       t.Errorf("item %v could not be parsed as a PullRequest", item)
+               }
        }
 }
 
index 84fa6a6a88cf5a10b16a2280d6948dfda972e2f5..809520769754506f8c3bae8f7b49db6df3bf4bb0 100644 (file)
@@ -10,10 +10,10 @@ package main
 import (
        "bufio"
        "bytes"
+       "container/list"
        "crypto/md5"
        "encoding/json"
        "fmt"
-       "git.curoverse.com/arvados.git/services/keepstore/pull_list"
        "github.com/gorilla/mux"
        "io"
        "log"
@@ -436,6 +436,11 @@ func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
    If the JSON unmarshalling fails, return 400 Bad Request.
 */
 
+type PullRequest struct {
+       Locator string   `json:"locator"`
+       Servers []string `json:"servers"`
+}
+
 func PullHandler(resp http.ResponseWriter, req *http.Request) {
        // Reject unauthorized requests.
        api_token := GetApiToken(req)
@@ -446,9 +451,9 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
        }
 
        // Parse the request body.
-       var plist []pull_list.PullRequest
+       var pr []PullRequest
        r := json.NewDecoder(req.Body)
-       if err := r.Decode(&plist); err != nil {
+       if err := r.Decode(&pr); err != nil {
                http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
                log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
                return
@@ -457,15 +462,20 @@ func PullHandler(resp http.ResponseWriter, req *http.Request) {
        // We have a properly formatted pull list sent from the data
        // manager.  Report success and send the list to the pull list
        // manager for further handling.
-       log.Printf("%s %s: received %v\n", req.Method, req.URL, plist)
+       log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
        resp.WriteHeader(http.StatusOK)
        resp.Write([]byte(
-               fmt.Sprintf("Received %d pull requests\n", len(plist))))
+               fmt.Sprintf("Received %d pull requests\n", len(pr))))
+
+       plist := list.New()
+       for _, p := range pr {
+               plist.PushBack(p)
+       }
 
        if pullmgr == nil {
-               pullmgr = pull_list.NewManager()
+               pullmgr = NewBlockWorkList()
        }
-       pullmgr.SetList(plist)
+       pullmgr.ReplaceList(plist)
 }
 
 // ==============================
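The pull list body that PullHandler accepts is a JSON array of objects matching the PullRequest struct tags above. Here is a minimal decoding sketch, assuming it is compiled into the same package as handlers.go; the demo function name is made up, and the locator and server addresses are illustrative values only.

    import (
    	"encoding/json"
    	"fmt"
    	"strings"
    )

    func demoDecodePullList() {
    	// Hypothetical pull list body, in the format the data
    	// manager would PUT to /pull.
    	body := `[
    	  {"locator": "e4d909c290d0fb1ca068ffaddf22cbd0+4",
    	   "servers": ["http://keep0.example.org:25107",
    	               "http://keep1.example.org:25107"]}
    	]`

    	var pr []PullRequest
    	if err := json.NewDecoder(strings.NewReader(body)).Decode(&pr); err != nil {
    		// PullHandler answers 400 Bad Request in this case.
    		fmt.Println("bad pull list:", err)
    		return
    	}
    	fmt.Printf("received %d pull requests; first locator %s\n",
    		len(pr), pr[0].Locator)
    }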
index e5bd3bffb5450236981f3d2c336840b014383c1f..06054f5205cf7d38b01b179ee5365230120ccc89 100644 (file)
@@ -4,7 +4,6 @@ import (
        "bytes"
        "flag"
        "fmt"
-       "git.curoverse.com/arvados.git/services/keepstore/pull_list"
        "io/ioutil"
        "log"
        "net"
@@ -97,7 +96,7 @@ var KeepVM VolumeManager
 // keepstore servers in order to increase data replication) with
 // atomic update methods that are safe to use from multiple
 // goroutines.
-var pullmgr *pull_list.Manager
+var pullmgr *BlockWorkList
 
 // TODO(twp): continue moving as much code as possible out of main
 // so it can be effectively tested. Esp. handling and postprocessing
diff --git a/services/keepstore/pull_list/pull_list.go b/services/keepstore/pull_list/pull_list.go
deleted file mode 100644 (file)
index 71d7821..0000000
+++ /dev/null
@@ -1,81 +0,0 @@
-package pull_list
-
-/* The pull_list package manages a list of pull requests sent
-   by Data Manager.
-
-   The interface is:
-
-   pull_list.NewManager() creates and returns a pull_list.Manager. A
-   listener runs in a goroutine, waiting for new requests on its input
-   channels.
-
-   pull_list.SetList() assigns a new pull list to the manager. Any
-   existing list is discarded.
-
-   pull_list.GetList() reports the manager's current pull list.
-
-   pull_list.Close() shuts down the pull list manager.
-*/
-
-type PullRequest struct {
-       Locator string
-       Servers []string
-}
-
-type Manager struct {
-       setlist chan []PullRequest // input channel for setting new lists
-       getlist chan []PullRequest // output channel for getting existing list
-}
-
-// NewManager returns a new Manager object.  It launches a goroutine that
-// waits for pull requests.
-//
-func NewManager() *Manager {
-       r := Manager{
-               make(chan []PullRequest),
-               make(chan []PullRequest),
-       }
-       go r.listen()
-       return &r
-}
-
-// SetList sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
-//
-func (r *Manager) SetList(pr []PullRequest) {
-       r.setlist <- pr
-}
-
-// GetList reports the contents of the current pull list.
-func (r *Manager) GetList() []PullRequest {
-       return <-r.getlist
-}
-
-// Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
-//
-func (r *Manager) Close() {
-       close(r.setlist)
-}
-
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-func (r *Manager) listen() {
-       var current []PullRequest
-       for {
-               select {
-               case p, ok := <-r.setlist:
-                       if ok {
-                               current = p
-                       } else {
-                               // The input channel is closed; time to shut down
-                               close(r.getlist)
-                               return
-                       }
-               case r.getlist <- current:
-                       // no-op
-               }
-       }
-}