end
end
+ def textile_attributes
+ [ 'description' ]
+ end
+
end
end
def attribute_editable? attr, *args
- false
+ if attr.to_sym == :description
+ super && attr.to_sym == :description
+ else
+ false
+ end
end
def self.creatable?
def cancel
arvados_api_client.api "jobs/#{self.uuid}/", "cancel", {}
end
+
+ def textile_attributes
+ [ 'description' ]
+ end
end
end
def attribute_editable? attr, *args
- super && (attr.to_sym == :name ||
+ super && (attr.to_sym == :name || attr.to_sym == :description ||
(attr.to_sym == :components and
(self.state == 'New' || self.state == 'Ready')))
end
"\"#{input_name.to_s}\" parameter for #{component[:script]} script in #{component_name} component"
end
end
+
+ def textile_attributes
+ [ 'description' ]
+ end
end
def self.creatable?
false
end
+
+ def textile_attributes
+ [ 'description' ]
+ end
end
<%= message %><br />
<% end %>
<% else %>
- <% if obj.attribute_editable?(attr) and (!defined?(editable) || editable) %>
+ <% if attr and obj.attribute_editable?(attr) and (!defined?(editable) || editable) %>
<% if resource_class_for_uuid(attrvalue, {referring_object: obj, referring_attr: attr}) %>
<%= link_to_if_arvados_object attrvalue, {referring_attr: attr, referring_object: obj, with_class_name: true, friendly_name: true} %>
<br>
--- /dev/null
+<% if @object.respond_to? :name %>
+ <h2>
+ <%= render_editable_attribute @object, 'name', nil, { 'data-emptytext' => "New #{controller.model_class.to_s.underscore.gsub("_"," ")}" } %>
+ </h2>
+<% end %>
+
+<% if @object.respond_to? :description %>
+ <div class="arv-description-as-subtitle">
+ <%= render_editable_attribute @object, 'description', nil, { 'data-emptytext' => "(No description provided)", 'data-toggle' => 'manual' } %>
+ </div>
+<% end %>
+
<% end %>
<% content_for :content_top do %>
+ <% if !['Group','User', 'Collection'].include? @object.class.to_s # projects and collections handle it themselves %>
+ <%= render partial: 'name_and_description' %>
+ <% end %>
<% if @object.respond_to? :properties and !@object.properties.nil? %>
<% if @object.properties[:page_content] %>
</h3>
</div>
<div class="panel-body">
+ <div class="arv-description-as-subtitle">
+ <%= render_editable_attribute @object, 'description', nil, { 'data-emptytext' => "(No description provided)", 'data-toggle' => 'manual' } %>
+ </div>
<img src="/favicon.ico" class="pull-right" alt="" style="opacity: 0.3"/>
<% if defined? @same_pdh %>
<p>Found in collections:<p>
<% template = PipelineTemplate.find?(@object.pipeline_template_uuid) %>
<%= content_for :content_top do %>
- <h2>
- <%= render_editable_attribute @object, 'name', nil %>
- </h2>
- <% if template %>
- <blockquote><span class="deemphasize">From template:</span><br />
- <%= link_to_if_arvados_object template, friendly_name: true %><br />
- <%= template.description %>
- </blockquote>
- <% end %>
+ <div class="row">
+ <div class="col-sm-6">
+ <%= render partial: 'name_and_description' %>
+ </div>
+ <% if template %>
+ <div class="alert alert-info col-sm-6">
+ This pipeline was created from the template <%= link_to_if_arvados_object template, friendly_name: true %><br />
+ <% if template.modified_at && (template.modified_at > @object.created_at) %>
+ Note: This template has been modified since this instance was created.
+ <% end %>
+ </div>
+ <% end %>
+ </div>
<% end %>
<% content_for :tab_line_buttons do %>
action_method: 'post',
action_data: {selection_param: 'pipeline_instance[owner_uuid]',
'pipeline_instance[pipeline_template_uuid]' => ob.uuid,
+ 'pipeline_instance[description]' => "Created at #{Time.now.localtime}" + (ob.name.andand.size.andand>0 ? " using the pipeline template *#{ob.name}*" : ""),
'success' => 'redirect-to-created-object'
}.to_json),
{ class: "btn btn-default btn-xs", title: "Run #{ob.name}", remote: true, method: 'get' }
<%= render_editable_attribute ob, 'name' %>
</td><td>
<% if ob.respond_to?(:description) and ob.description %>
- <%= ob.description %>
+ <%= render_attribute_as_textile(ob, "description", ob.description, false) %>
<br />
<% end %>
<% ob.components.collect { |k,v| k.to_s }.each do |k| %>
action_method: 'post',
action_data: {selection_param: 'pipeline_instance[owner_uuid]',
'pipeline_instance[pipeline_template_uuid]' => @object.uuid,
+ 'pipeline_instance[description]' => "Created at #{Time.now.localtime}" + (@object.name.andand.size.andand>0 ? " using the pipeline template *#{@object.name}*" : ""),
'success' => 'redirect-to-created-object'
}.to_json),
{ class: "btn btn-primary btn-sm", remote: true, method: 'get' }
<% if @object.uuid != current_user.uuid # Not the "Home" project %>
<% content_for :content_top do %>
-
-<h2>
- <%= render_editable_attribute @object, 'name', nil, { 'data-emptytext' => "New project" } %>
-</h2>
-
-<div class="arv-description-as-subtitle">
- <%= render_editable_attribute @object, 'description', nil, { 'data-emptytext' => "(No description provided)", 'data-toggle' => 'manual' } %>
-</div>
-
+ <%= render partial: 'name_and_description' %>
<% end %>
<% end %>
# Do not use this file for site configuration. Create application.yml
# instead (see application.yml.example).
+# Below is a sample setting for diagnostics testing.
+# Configure workbench URL as "arvados_workbench_url"
+# Configure test user tokens as "user_tokens".
+# At this time the tests need an "active" user token.
+# Also, configure the pipelines to be executed as "pipelines_to_test".
+# For each of the pipelines identified by the name of your choice
+# ("pipeline_1" and "pipeline_2" in this sample), provide the following:
+# template_uuid: is the uuid of the template to be executed
+# input_paths: an array of inputs for the pipeline. Use either a collection's "uuid"
+# or a file's "uuid/file_name" path in this array. If the pipeline does not require
+# any inputs, this can be omitted.
+# max_wait_seconds: max time in seconds to wait for the pipeline run to complete.
+# Default value of 30 seconds is used when this value is not provided.
+diagnostics:
+ arvados_workbench_url: https://localhost:3000
+ user_tokens:
+ active: eu33jurqntstmwo05h1jr3eblmi961e802703y6657s8zb14r
+ pipelines_to_test:
+ pipeline_1:
+ template_uuid: zzzzz-p5p6p-rxj8d71854j9idn
+ input_paths: [zzzzz-4zz18-nz98douzhaa3jh2]
+ max_wait_seconds: 10
+ pipeline_2:
+ template_uuid: zzzzz-p5p6p-1xbobfobk94ppbv
+ input_paths: [zzzzz-4zz18-nz98douzhaa3jh2, zzzzz-4zz18-gpw9o5wpcti3nib]
+
development:
cache_classes: false
eager_load: true
database: db/production.sqlite3
pool: 5
timeout: 5000
+
+# Note: The "diagnostics" database configuration is not actually used.
+diagnostics:
+ adapter: sqlite3
+ database: db/diagnostics.sqlite3
+ pool: 5
+ timeout: 5000
--- /dev/null
+require 'diagnostics_test_helper'
+require 'selenium-webdriver'
+require 'headless'
+
+class PipelineTest < DiagnosticsTest
+ pipelines_to_test = Rails.configuration.pipelines_to_test.andand.keys
+
+ setup do
+ headless = Headless.new
+ headless.start
+ Capybara.current_driver = :selenium
+ end
+
+ pipelines_to_test.andand.each do |pipeline_to_test|
+ test "visit home page for user #{pipeline_to_test}" do
+ visit_page_with_token 'active'
+ pipeline_config = Rails.configuration.pipelines_to_test[pipeline_to_test]
+
+ # Search for tutorial template
+ within('.navbar-fixed-top') do
+ page.find_field('search').set pipeline_config['template_uuid']
+ page.find('.glyphicon-search').click
+ end
+
+ # Run the pipeline
+ find('a,button', text: 'Run').click
+
+ # Choose project
+ within('.modal-dialog') do
+ find('.selectable', text: 'Home').click
+ find('button', text: 'Choose').click
+ end
+
+ page.assert_selector('a.disabled,button.disabled', text: 'Run') if pipeline_config['input_paths'].any?
+
+ # Choose input for the pipeline
+ pipeline_config['input_paths'].each do |look_for|
+ select_input look_for
+ end
+ wait_for_ajax
+
+ # All needed input are filled in. Run this pipeline now
+ find('a,button', text: 'Run').click
+
+ # Pipeline is running. We have a "Stop" button instead now.
+ page.assert_selector 'a,button', text: 'Stop'
+
+ # Wait for pipeline run to complete
+ wait_until_page_has 'Complete', pipeline_config['max_wait_seconds']
+ end
+ end
+
+ def select_input look_for
+ inputs_needed = page.all('.btn', text: 'Choose')
+ return if (!inputs_needed || !inputs_needed.any?)
+
+ look_for_uuid = nil
+ look_for_file = nil
+ if look_for.andand.index('/').andand.>0
+ partitions = look_for.partition('/')
+ look_for_uuid = partitions[0]
+ look_for_file = partitions[2]
+ else
+ look_for_uuid = look_for
+ look_for_file = nil
+ end
+
+ inputs_needed[0].click
+
+ within('.modal-dialog') do
+ if look_for_uuid
+ fill_in('Search', with: look_for_uuid, exact: true)
+ wait_for_ajax
+ end
+
+ page.all('.selectable').first.click
+ wait_for_ajax
+ # ajax reload is wiping out input selection after search results; so, select again.
+ page.all('.selectable').first.click
+ wait_for_ajax
+
+ if look_for_file
+ wait_for_ajax
+ within('.collection_files_name', text: look_for_file) do
+ find('.fa-file').click
+ end
+ end
+
+ find('button', text: 'OK').click
+ wait_for_ajax
+ end
+ end
+end
--- /dev/null
+require 'integration_helper'
+require 'yaml'
+
+# Diagnostics tests are executed when "RAILS_ENV=diagnostics" is used.
+# When "RAILS_ENV=test" is used, tests in the "diagnostics" directory
+# will not be executed.
+
+class DiagnosticsTest < ActionDispatch::IntegrationTest
+
+ # Prepends workbench URL to the path provided and visits that page
+ # Expects path parameters such as "/collections/<uuid>"
+ def visit_page_with_token token_name, path='/'
+ workbench_url = Rails.configuration.arvados_workbench_url
+ if workbench_url.end_with? '/'
+ workbench_url = workbench_url[0, workbench_url.size-1]
+ end
+ tokens = Rails.configuration.user_tokens
+ visit page_with_token(tokens[token_name], (workbench_url + path))
+ end
+
+ # Looks for the text_to_look_for for up to the max_time provided
+ def wait_until_page_has text_to_look_for, max_time=30
+ max_time = 30 if (!max_time || (max_time.to_s != max_time.to_i.to_s))
+ Timeout.timeout(max_time) do
+ loop until page.has_text?(text_to_look_for)
+ end
+ end
+
+end
end
test "combine selected collections into new collection" do
+ headless = Headless.new
+ headless.start
+ Capybara.current_driver = :selenium
+
foo_collection = api_fixture('collections')['foo_file']
bar_collection = api_fixture('collections')['bar_file']
assert(page.has_text?('foo'), "Collection page did not include foo file")
assert(page.has_no_text?(bar_collection['name']), "Collection page did not include foo file")
assert(page.has_text?('bar'), "Collection page did not include bar file")
+
+ headless.stop
end
test "combine selected collection files into new collection" do
+ headless = Headless.new
+ headless.start
+ Capybara.current_driver = :selenium
+
foo_collection = api_fixture('collections')['foo_file']
visit page_with_token('active', "/collections")
assert(page.has_text?('Copy to project'), "Copy to project text not found in new collection page")
assert(page.has_no_text?(foo_collection['name']), "Collection page did not include foo file")
assert(page.has_text?('foo'), "Collection page did not include foo file")
+
+ headless.stop
end
end
--- /dev/null
+require 'integration_helper'
+
+class JobsTest < ActionDispatch::IntegrationTest
+ test "add job description" do
+ Capybara.current_driver = Capybara.javascript_driver
+ visit page_with_token("active", "/jobs")
+
+ # go to job running the script "doesnotexist"
+ within first('tr', text: 'doesnotexist') do
+ find("a").click
+ end
+
+ # edit job description
+ within('.arv-description-as-subtitle') do
+ find('.fa-pencil').click
+ find('.editable-input textarea').set('*Textile description for job* - "Go to dashboard":/')
+ find('.editable-submit').click
+ end
+ wait_for_ajax
+
+ # Verify edited description
+ assert page.has_no_text? '*Textile description for job*'
+ assert page.has_text? 'Textile description for job'
+ assert page.has_link? 'Go to dashboard'
+ click_link 'Go to dashboard'
+ assert page.has_text? 'My projects'
+ assert page.has_text? 'Projects shared with me'
+ end
+end
assert page.has_text? 'script_version'
end
+ test 'pipeline description' do
+ visit page_with_token('active_trustedclient')
+
+ visit '/pipeline_instances'
+ assert page.has_text? 'pipeline_with_job'
+
+ find('a', text: 'pipeline_with_job').click
+
+ within('.arv-description-as-subtitle') do
+ find('.fa-pencil').click
+ find('.editable-input textarea').set('*Textile description for pipeline instance*')
+ find('.editable-submit').click
+ end
+ wait_for_ajax
+
+ # verify description
+ assert page.has_no_text? '*Textile description for pipeline instance*'
+ assert page.has_text? 'Textile description for pipeline instance'
+ end
+
test "JSON popup available for strange components" do
uuid = api_fixture("pipeline_instances")["components_is_jobspec"]["uuid"]
visit page_with_token("active", "/pipeline_instances/#{uuid}")
find(".selectable", text: proj_name).click
click_on "Choose"
end
- assert(has_text?("From template"), "did not land on pipeline instance page")
+ assert(has_text?("This pipeline was created from the template"), "did not land on pipeline instance page")
first("a.btn,button", text: "Choose").click
within(".modal-body") do
if (proj_name != PROJECT_WITH_SEARCH_COLLECTION)
assert(page.has_text?("script_parameters"),
"components JSON not found")
end
+
+ test "pipeline template description" do
+ Capybara.current_driver = Capybara.javascript_driver
+ visit page_with_token("active", "/pipeline_templates")
+
+ # go to Two Part pipeline template
+ within first('tr', text: 'Two Part Pipeline Template') do
+ find(".fa-gears").click
+ end
+
+ # edit template description
+ within('.arv-description-as-subtitle') do
+ find('.fa-pencil').click
+ find('.editable-input textarea').set('*Textile description for pipeline template* - "Go to dashboard":/')
+ find('.editable-submit').click
+ end
+ wait_for_ajax
+
+  # Verify edited description
+ assert page.has_no_text? '*Textile description for pipeline template*'
+ assert page.has_text? 'Textile description for pipeline template'
+ assert page.has_link? 'Go to dashboard'
+ click_link 'Go to dashboard'
+ assert page.has_text? 'My projects'
+ assert page.has_text? 'Projects shared with me'
+
+ # again visit recent templates page and verify edited description
+ visit page_with_token("active", "/pipeline_templates")
+ assert page.has_no_text? '*Textile description for pipeline template*'
+ assert page.has_text? 'Textile description for pipeline template'
+ end
end
-ENV["RAILS_ENV"] = "test"
+ENV["RAILS_ENV"] = "test" if (ENV["RAILS_ENV"] != "diagnostics")
+
unless ENV["NO_COVERAGE_TEST"]
begin
require 'simplecov'
end
end
-ApiServerForTests.run
+if ENV["RAILS_ENV"].eql? 'test'
+ ApiServerForTests.run
+end
--- /dev/null
+package main
+
+
+// *******************
+// Import the modules.
+//
+// Our examples don't use keepclient, but they do use fmt and log to
+// display output.
+
+import (
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "log"
+)
+
+func main() {
+
+
+ // ********************************
+ // Set up an API client user agent.
+ //
+
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ log.Fatalf("Error setting up arvados client %s", err.Error())
+ }
+
+
+ // *****************************************
+ // Print the full name of the current user.
+ //
+
+ type user struct {
+ Uuid string `json:"uuid"`
+ FullName string `json:"full_name"`
+ }
+
+ var u user
+ err = arv.Call("GET", "users", "", "current", nil, &u)
+
+ if err != nil {
+ log.Fatalf("error querying current user", err.Error())
+ }
+
+ log.Printf("Logged in as %s (uuid %s)", u.FullName, u.Uuid)
+
+
+ // ********************************************************
+ // Print all fields from the first five collections returned.
+ //
+// Note that some fields are not returned by default and have to be
+ // requested. See below for an example.
+
+ var results map[string]interface{}
+
+ params := arvadosclient.Dict{"limit": 5}
+
+ err = arv.List("collections", params, &results)
+ if err != nil {
+ log.Fatalf("error querying collections", err.Error())
+ }
+
+ printArvadosResults(results)
+
+
+ // *********************************************************
+ // Print some fields from the first two collections returned.
+ //
+// We also print manifest_text, which has to be explicitly requested.
+ //
+
+ collection_fields_wanted := []string{"manifest_text", "owner_uuid", "uuid"}
+ params = arvadosclient.Dict{"limit": 2, "select": collection_fields_wanted}
+
+ err = arv.List("collections", params, &results)
+ if err != nil {
+ log.Fatalf("error querying collections", err.Error())
+ }
+
+ printArvadosResults(results)
+}
+
+
+// A helper function which will print out a result map returned by
+// arvadosclient.
+func printArvadosResults(results map[string]interface{}) {
+ for key, value := range results {
+ // "items", if it exists, holds a map.
+ // So we print it prettily below.
+ if key != "items" {
+ fmt.Println(key, ":", value)
+ }
+ }
+
+ if value, ok := results["items"]; ok {
+ items := value.([]interface{})
+ for index, item := range items {
+ fmt.Println("=========== ", index, " ===========")
+ item_map := item.(map[string]interface{})
+ if len(item_map) == 0 {
+ fmt.Println("item", index, ": empty map")
+ } else {
+ for k, v := range item_map {
+ fmt.Println(index, k, ":", v)
+ }
+ }
+ }
+ }
+}
--- /dev/null
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+)
You don't need to install anything. Just import the client like this. The go tools will fetch the relevant code and dependencies for you.
-<notextile>
-<pre><code class="userinput">import (
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
-)
-</code></pre>
-</notextile>
+<notextile>{% code 'example_sdk_go_imports' as go %}</notextile>
-h3. Examples
+If you need pre-release client code, you can use the latest version from the repo by following "these instructions.":https://arvados.org/projects/arvados/wiki/Go#Using-Go-with-Arvados
-Import the module. (We import the log module here too, so we can use it in the subsequent examples.)
+h3. Example
-<notextile>
-<pre><code class="userinput">import (
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "log"
-)
-</code></pre>
-</notextile>
+You can save this source as a .go file and run it:
-Set up an API client user agent:
+<notextile>{% code 'example_sdk_go' as go %}</notextile>
-<notextile>
-<pre><code class="userinput"> arv, err := arvadosclient.MakeArvadosClient()
- if err != nil {
- log.Fatalf("Error setting up arvados client %s", err.Error())
- }
-</code></pre>
-</notextile>
-
-Get the User object for the current user:
-
-<notextile>
-<pre><code class="userinput"> type user struct {
- Uuid string `json:"uuid"`
- FullName int `json:"full_name"`
- }
-
- var u user
- err := arv.Call("GET", "users", "", "current", nil, &u)
-
- if err != nil {
- return err
- }
-
- log.Printf("Logged in as %s (uuid %s)", user.Uuid, user.FullName)
-</code></pre>
-</notextile>
-
-A few more usage examples can be found in the services/keepproxy and sdk/go/keepclient directories in the arvados source tree.
+A few more usage examples can be found in the "services/keepproxy":https://arvados.org/projects/arvados/repository/revisions/master/show/services/keepproxy and "sdk/go/keepclient":https://arvados.org/projects/arvados/repository/revisions/master/show/sdk/go/keepclient directories in the arvados source tree.
USER root
-RUN apt-get install -y -q openjdk-7-jre-headless && \
+RUN apt-get update && \
+ apt-get install -y -q openjdk-7-jre-headless && \
cd /tmp && \
curl --location http://downloads.sourceforge.net/project/bio-bwa/bwa-0.7.9a.tar.bz2 -o bwa-0.7.9a.tar.bz2 && \
tar xjf bwa-0.7.9a.tar.bz2 && \
(find . -executable -type f -print0 | xargs -0 -I {} mv {} /usr/local/bin) && \
rm -r /tmp/samtools-0.1.19*
-USER crunch
\ No newline at end of file
+USER crunch
# [--status-json path] Print JSON status report to a file or
# fifo. Default: /dev/null
#
+# [--description] Description for the pipeline instance.
+#
# == Parameters
#
# [param_name=param_value]
"Synonym for --run-jobs-here.",
:short => :none,
:type => :boolean)
+ opt(:description,
+ "Description for the pipeline instance.",
+ :short => :none,
+ :type => :string)
stop_on [:'--']
end
$options = Trollop::with_standard_exception_handling p do
end
end
else
+ description = $options[:description]
+ description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
@instance = PipelineInstance.
create(components: @components,
properties: {
}
},
pipeline_template_uuid: @template[:uuid],
+ description: description,
state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
end
self
class CollectionReader(object):
- def __init__(self, manifest_locator_or_text, api_client=None):
+ def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
self._api_client = api_client
- self._keep_client = None
+ self._keep_client = keep_client
if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
# now regenerate the manifest text based on the normalized stream
#print "normalizing", self._manifest_text
- self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+ self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
#print "result", self._manifest_text
self._populate()
resp = []
for s in self._streams:
- resp.append(StreamReader(s))
+ resp.append(StreamReader(s, keep=self._keep_client))
return resp
def all_files(self):
def manifest_text(self, strip=False):
self._populate()
if strip:
- m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+ m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
return m
else:
return self._manifest_text
def put(data, **kwargs):
return Keep.global_client_object().put(data, **kwargs)
+class KeepBlockCache(object):
+ # Default RAM cache is 256MiB
+ def __init__(self, cache_max=(256 * 1024 * 1024)):
+ self.cache_max = cache_max
+ self._cache = []
+ self._cache_lock = threading.Lock()
+
+ class CacheSlot(object):
+ def __init__(self, locator):
+ self.locator = locator
+ self.ready = threading.Event()
+ self.content = None
+
+ def get(self):
+ self.ready.wait()
+ return self.content
+
+ def set(self, value):
+ self.content = value
+ self.ready.set()
+
+ def size(self):
+ if self.content == None:
+ return 0
+ else:
+ return len(self.content)
+
+ def cap_cache(self):
+ '''Cap the cache size to self.cache_max'''
+ self._cache_lock.acquire()
+ try:
+ # Select all slots except those where ready.is_set() and content is
+ # None (that means there was an error reading the block).
+ self._cache = [c for c in self._cache if not (c.ready.is_set() and c.content is None)]
+ sm = sum([slot.size() for slot in self._cache])
+ while len(self._cache) > 0 and sm > self.cache_max:
+ for i in xrange(len(self._cache)-1, -1, -1):
+ if self._cache[i].ready.is_set():
+ del self._cache[i]
+ break
+ sm = sum([slot.size() for slot in self._cache])
+ finally:
+ self._cache_lock.release()
+
+ def reserve_cache(self, locator):
+ '''Reserve a cache slot for the specified locator,
+ or return the existing slot.'''
+ self._cache_lock.acquire()
+ try:
+ # Test if the locator is already in the cache
+ for i in xrange(0, len(self._cache)):
+ if self._cache[i].locator == locator:
+ n = self._cache[i]
+ if i != 0:
+ # move it to the front
+ del self._cache[i]
+ self._cache.insert(0, n)
+ return n, False
+
+ # Add a new cache slot for the locator
+ n = KeepBlockCache.CacheSlot(locator)
+ self._cache.insert(0, n)
+ return n, True
+ finally:
+ self._cache_lock.release()
class KeepClient(object):
class ThreadLimiter(object):
def __init__(self, api_client=None, proxy=None, timeout=300,
- api_token=None, local_store=None):
+ api_token=None, local_store=None, block_cache=None):
"""Initialize a new KeepClient.
Arguments:
if local_store is None:
local_store = os.environ.get('KEEP_LOCAL_STORE')
+ self.block_cache = block_cache if block_cache else KeepBlockCache()
+
if local_store:
self.local_store = local_store
self.get = self.local_store_get
self.put = self.local_store_put
else:
self.timeout = timeout
- self.cache_max = 256 * 1024 * 1024 # Cache is 256MiB
- self._cache = []
- self._cache_lock = threading.Lock()
+
if proxy:
if not proxy.endswith('/'):
proxy += '/'
_logger.debug(str(pseq))
return pseq
- class CacheSlot(object):
- def __init__(self, locator):
- self.locator = locator
- self.ready = threading.Event()
- self.content = None
-
- def get(self):
- self.ready.wait()
- return self.content
-
- def set(self, value):
- self.content = value
- self.ready.set()
-
- def size(self):
- if self.content == None:
- return 0
- else:
- return len(self.content)
-
- def cap_cache(self):
- '''Cap the cache size to self.cache_max'''
- self._cache_lock.acquire()
- try:
- self._cache = filter(lambda c: not (c.ready.is_set() and c.content == None), self._cache)
- sm = sum([slot.size() for slot in self._cache])
- while sm > self.cache_max:
- del self._cache[-1]
- sm = sum([slot.size() for a in self._cache])
- finally:
- self._cache_lock.release()
-
- def reserve_cache(self, locator):
- '''Reserve a cache slot for the specified locator,
- or return the existing slot.'''
- self._cache_lock.acquire()
- try:
- # Test if the locator is already in the cache
- for i in xrange(0, len(self._cache)):
- if self._cache[i].locator == locator:
- n = self._cache[i]
- if i != 0:
- # move it to the front
- del self._cache[i]
- self._cache.insert(0, n)
- return n, False
-
- # Add a new cache slot for the locator
- n = KeepClient.CacheSlot(locator)
- self._cache.insert(0, n)
- return n, True
- finally:
- self._cache_lock.release()
def map_new_services(self, roots_map, md5_s, force_rebuild, **headers):
# roots_map is a dictionary, mapping Keep service root strings
locator = KeepLocator(loc_s)
expect_hash = locator.md5sum
- slot, first = self.reserve_cache(expect_hash)
+ slot, first = self.block_cache.reserve_cache(expect_hash)
if not first:
v = slot.get()
return v
# Always cache the result, then return it if we succeeded.
slot.set(blob)
- self.cap_cache()
+ self.block_cache.cap_cache()
if loop.success():
return blob
gem 'themes_for_rails'
-gem 'arvados', '>= 0.1.20140910123800'
-gem 'arvados-cli', '>= 0.1.20140905165259'
+gem 'arvados', '>= 0.1.20140912152020'
+gem 'arvados-cli', '>= 0.1.20140912152020'
# pg_power lets us use partial indexes in schema.rb in Rails 3
gem 'pg_power'
addressable (2.3.6)
andand (1.3.3)
arel (3.0.3)
- arvados (0.1.20140910123800)
+ arvados (0.1.20140912152020)
activesupport (>= 3.2.13)
andand
google-api-client (~> 0.6.3)
json (>= 1.7.7)
jwt (>= 0.1.5, < 1.0.0)
- arvados-cli (0.1.20140905165259)
+ arvados-cli (0.1.20140912152020)
activesupport (~> 3.2, >= 3.2.13)
andand (~> 1.3, >= 1.3.3)
arvados (~> 0.1.0)
DEPENDENCIES
acts_as_api
andand
- arvados (>= 0.1.20140910123800)
- arvados-cli (>= 0.1.20140905165259)
+ arvados (>= 0.1.20140912152020)
+ arvados-cli (>= 0.1.20140912152020)
coffee-rails (~> 3.2.0)
database_cleaner
factory_girl_rails
rootUrl: root_url,
servicePath: "arvados/v1/",
batchPath: "batch",
+ defaultTrashLifetime: Rails.application.config.default_trash_lifetime,
parameters: {
alt: {
type: "string",
before_validation :set_portable_data_hash
validate :ensure_hash_matches_manifest_text
+ # Query only undeleted collections by default.
+ default_scope where("expires_at IS NULL or expires_at > CURRENT_TIMESTAMP")
+
api_accessible :user, extend: :common do |t|
t.add :name
t.add :description
t.add :repository
t.add :supplied_script_version
t.add :docker_image_locator
+ t.add :description
end
def assert_finished
t.add :slot_number
t.add :status
t.add :crunch_worker_state
+ t.add :info
end
api_accessible :superuser, :extend => :user do |t|
t.add :first_ping_at
- t.add :info
t.add lambda { |x| @@nameservers }, :as => :nameservers
end
def info
- @info ||= Hash.new
- super
+ if current_user.andand.current_user.is_admin
+ super
+ else
+ super.select { |k| not k.to_s.include? "secret" }
+ end
end
def domain
t.add :properties
t.add :state
t.add :components_summary
+ t.add :description
t.add :started_at
t.add :finished_at
end
# source_version
source_version: "<%= `git log -n 1 --format=%h` %>"
local_modified: false
+
+ # Default lifetime for ephemeral collections: 2 weeks.
+ default_trash_lifetime: 1209600
--- /dev/null
+class AddDescriptionToPipelineInstancesAndJobs < ActiveRecord::Migration
+ def up
+ add_column :pipeline_instances, :description, :text, null: true
+ add_column :jobs, :description, :text, null: true
+ end
+
+ def down
+ remove_column :jobs, :description
+ remove_column :pipeline_instances, :description
+ end
+end
repository character varying(255),
supplied_script_version character varying(255),
docker_image_locator character varying(255),
- priority integer DEFAULT 0 NOT NULL
+ priority integer DEFAULT 0 NOT NULL,
+ description text
);
properties text,
state character varying(255),
components_summary text,
- description text,
started_at timestamp without time zone,
- finished_at timestamp without time zone
+ finished_at timestamp without time zone,
+ description text
);
INSERT INTO schema_migrations (version) VALUES ('20140811184643');
-INSERT INTO schema_migrations (version) VALUES ('20140815171049');
-
INSERT INTO schema_migrations (version) VALUES ('20140817035914');
INSERT INTO schema_migrations (version) VALUES ('20140818125735');
INSERT INTO schema_migrations (version) VALUES ('20140828141043');
-INSERT INTO schema_migrations (version) VALUES ('20140909183946');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20140909183946');
+
+INSERT INTO schema_migrations (version) VALUES ('20140911221252');
\ No newline at end of file
end
end
end
+ # default_trash_lifetime cannot be less than 24 hours
+ if Rails.configuration.default_trash_lifetime < 86400 then
+ raise "default_trash_lifetime is %d, must be at least 86400" % Rails.configuration.default_trash_lifetime
+ end
end
end
$stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
$stderr.puts line
pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
- j[:stderr_buf_to_flush] << pub_msg
+ if not j[:log_truncated]
+ j[:stderr_buf_to_flush] << pub_msg
+ end
end
- if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
- (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
- write_log j
+ if not j[:log_truncated]
+ if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
+ (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
+ write_log j
+ end
end
end
end
# send message to log table. we want these records to be transient
def write_log running_job
+ return if running_job[:log_truncated]
+ return if running_job[:stderr_buf_to_flush] == ''
begin
- if (running_job && running_job[:stderr_buf_to_flush] != '')
- # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
- # or crunch_limit_log_events_per_job.
- if (too_many_bytes_logged_for_job(running_job))
- return if running_job[:log_truncated]
- running_job[:log_truncated] = true
- running_job[:stderr_buf_to_flush] =
- "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
- elsif (too_many_events_logged_for_job(running_job))
- return if running_job[:log_truncated]
- running_job[:log_truncated] = true
- running_job[:stderr_buf_to_flush] =
- "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
- end
- log = Log.new(object_uuid: running_job[:job].uuid,
- event_type: 'stderr',
- owner_uuid: running_job[:job].owner_uuid,
- properties: {"text" => running_job[:stderr_buf_to_flush]})
- log.save!
- running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
- running_job[:events_logged] += 1
- running_job[:stderr_buf_to_flush] = ''
- running_job[:stderr_flushed_at] = Time.now.to_i
+ # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
+ # or crunch_limit_log_events_per_job.
+ if (too_many_bytes_logged_for_job(running_job))
+ running_job[:log_truncated] = true
+ running_job[:stderr_buf_to_flush] =
+ "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
+ elsif (too_many_events_logged_for_job(running_job))
+ running_job[:log_truncated] = true
+ running_job[:stderr_buf_to_flush] =
+ "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
end
+ log = Log.new(object_uuid: running_job[:job].uuid,
+ event_type: 'stderr',
+ owner_uuid: running_job[:job].owner_uuid,
+ properties: {"text" => running_job[:stderr_buf_to_flush]})
+ log.save!
+ running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
+ running_job[:events_logged] += 1
rescue
- running_job[:stderr_buf] = "Failed to write logs \n"
- running_job[:stderr_buf_to_flush] = ''
- running_job[:stderr_flushed_at] = Time.now.to_i
+ running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
end
+ running_job[:stderr_buf_to_flush] = ''
+ running_job[:stderr_flushed_at] = Time.now.to_i
end
end
updated_at: 2014-02-03T17:22:54Z
manifest_text: ". 73feffa4b7f6bb68e44cf984c85f6e88+3 0:3:baz\n"
name: collection_to_move_around
+
+expired_collection:
+ uuid: zzzzz-4zz18-mto52zx1s7sn3ih
+ portable_data_hash: 0b21a217243bfce5617fb9224b95bcb9+49
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-02-03T17:22:54Z
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ modified_at: 2014-02-03T17:22:54Z
+ updated_at: 2014-02-03T17:22:54Z
+ expires_at: 2001-01-01T00:00:00Z
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:expired\n"
+ name: expired_collection
+
+collection_expires_in_future:
+ uuid: zzzzz-4zz18-padkqo7yb8d9i3j
+ portable_data_hash: 0b21a217243bfce5617fb9224b95bcb9+49
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ created_at: 2014-02-03T17:22:54Z
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ modified_at: 2014-02-03T17:22:54Z
+ updated_at: 2014-02-03T17:22:54Z
+ expires_at: 2038-01-01T00:00:00Z
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:expired\n"
+ name: collection_expires_in_future
"Collection should not exist in database after failed create"
end
+ test 'List expired collection returns empty list' do
+ authorize_with :active
+ get :index, {
+ where: {name: 'expired_collection'},
+ }
+ assert_response :success
+ found = assigns(:objects)
+ assert_equal 0, found.count
+ end
+
+ test 'Show expired collection returns 404' do
+ authorize_with :active
+ get :show, {
+ id: 'zzzzz-4zz18-mto52zx1s7sn3ih',
+ }
+ assert_response 404
+ end
+
+ test 'Update expired collection returns 404' do
+ authorize_with :active
+ post :update, {
+ id: 'zzzzz-4zz18-mto52zx1s7sn3ih',
+ collection: {
+ name: "still expired"
+ }
+ }
+ assert_response 404
+ end
+
+ test 'List collection with future expiration time succeeds' do
+ authorize_with :active
+ get :index, {
+ where: {name: 'collection_expires_in_future'},
+ }
+ found = assigns(:objects)
+ assert_equal 1, found.count
+ end
+
+
+ test 'Show collection with future expiration time succeeds' do
+ authorize_with :active
+ get :show, {
+ id: 'zzzzz-4zz18-padkqo7yb8d9i3j',
+ }
+ assert_response :success
+ end
+
+ test 'Update collection with future expiration time succeeds' do
+ authorize_with :active
+ post :update, {
+ id: 'zzzzz-4zz18-padkqo7yb8d9i3j',
+ collection: {
+ name: "still not expired"
+ }
+ }
+ assert_response :success
+ end
end
end
test "ping adds node stats to info" do
+ authorize_with :admin
node = nodes(:idle)
post :ping, {
id: node.uuid,
"discovery document was generated >#{MAX_SCHEMA_AGE}s ago")
end
+ test "discovery document has defaultTrashLifetime" do
+ get :index
+ assert_response :success
+ discovery_doc = JSON.parse(@response.body)
+ assert_includes discovery_doc, 'defaultTrashLifetime'
+ assert_equal discovery_doc['defaultTrashLifetime'], Rails.application.config.default_trash_lifetime
+ end
end
self.token = config.get('ARVADOS_API_TOKEN')
self.insecure = config.flag_is_true('ARVADOS_API_HOST_INSECURE')
self.local = threading.local()
+ self.block_cache = arvados.KeepBlockCache()
def localapi(self):
if 'api' not in self.local.__dict__:
self.local.api = arvados.api('v1', False, self.host, self.token, self.insecure)
return self.local.api
+ def localkeep(self):
+ if 'keep' not in self.local.__dict__:
+ self.local.keep = arvados.KeepClient(api_client=self.localapi(), block_cache=self.block_cache)
+ return self.local.keep
+
def collections(self):
return self.localapi().collections()
self.collection_object_file.update(self.collection_object)
self.clear()
- collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api)
+ collection = arvados.CollectionReader(self.collection_object["manifest_text"], self.api, self.api.localkeep())
for s in collection.all_streams():
cwd = self
for part in s.name().split('/'):
else:
_logger.error("arv-mount %s: error", self.collection_locator)
_logger.exception(detail)
+ except arvados.errors.ArgumentError as detail:
+ _logger.warning("arv-mount %s: error %s", self.collection_locator, detail)
+ if self.collection_object is not None and "manifest_text" in self.collection_object:
+ _logger.warning("arv-mount manifest_text is: %s", self.collection_object["manifest_text"])
except Exception as detail:
_logger.error("arv-mount %s: error", self.collection_locator)
if self.collection_object is not None and "manifest_text" in self.collection_object:
try:
with llfuse.lock_released:
return handle.entry.readfrom(off, size)
- except:
+ except arvados.errors.NotFoundError as e:
+ _logger.warning("Block not found: " + str(e))
+ raise llfuse.FUSEError(errno.EIO)
+ except Exception as e:
+ _logger.exception(e)
raise llfuse.FUSEError(errno.EIO)
def release(self, fh):
"Empty collection.link",
"Pipeline Template with Input Parameter with Search.pipelineTemplate",
"Pipeline Template with Jobspec Components.pipelineTemplate",
+ "collection_expires_in_future",
"pipeline_with_job.pipelineInstance"
], d2)
--- /dev/null
+package main
+
+/* A BlockWorkList is an asynchronous thread-safe queue manager. It
+ provides a channel from which items can be read off the queue, and
+ permits replacing the contents of the queue at any time.
+
+ The overall work flow for a BlockWorkList is as follows:
+
+ 1. A BlockWorkList is created with NewBlockWorkList(). This
+ function instantiates a new BlockWorkList and starts a manager
+ goroutine. The manager listens on an input channel
+ (manager.newlist) and an output channel (manager.NextItem).
+
+ 2. The manager first waits for a new list of requests on the
+ newlist channel. When another goroutine calls
+ manager.ReplaceList(lst), it sends lst over the newlist
+ channel to the manager. The manager goroutine now has
+ ownership of the list.
+
+ 3. Once the manager has this initial list, it listens on both the
+ input and output channels for one of the following to happen:
+
+ a. A worker attempts to read an item from the NextItem
+ channel. The manager sends the next item from the list
+ over this channel to the worker, and loops.
+
+ b. New data is sent to the manager on the newlist channel.
+ This happens when another goroutine calls
+ manager.ReplaceList() with a new list. The manager
+ discards the current list, replaces it with the new one,
+ and begins looping again.
+
+ c. The input channel is closed. The manager closes its
+ output channel (signalling any workers to quit) and
+ terminates.
+
+ Tasks currently handled by BlockWorkList:
+ * the pull list
+ * the trash list
+
+ Example usage:
+
+ // Any kind of user-defined type can be used with the
+ // BlockWorkList.
+ type FrobRequest struct {
+ frob string
+ }
+
+ // Make a work list.
+ froblist := NewBlockWorkList()
+
+ // Start a concurrent worker to read items from the NextItem
+ // channel until it is closed, deleting each one.
+ go func(list BlockWorkList) {
+ for i := range list.NextItem {
+ req := i.(FrobRequest)
+ frob.Run(req)
+ }
+ }(froblist)
+
+ // Set up a HTTP handler for PUT /frob
+ router.HandleFunc(`/frob`,
+ func(w http.ResponseWriter, req *http.Request) {
+ // Parse the request body into a list.List
+ // of FrobRequests, and give this list to the
+ // frob manager.
+ newfrobs := parseBody(req.Body)
+ froblist.ReplaceList(newfrobs)
+ }).Methods("PUT")
+
+ Methods available on a BlockWorkList:
+
+ ReplaceList(list)
+ Replaces the current item list with a new one. The list
+ manager discards any unprocessed items on the existing
+ list and replaces it with the new one. If the worker is
+ processing a list item when ReplaceList is called, it
+ finishes processing before receiving items from the new
+ list.
+ Close()
+ Shuts down the manager goroutine. When Close is called,
+ the manager closes the NextItem channel.
+*/
+
+import "container/list"
+
+type BlockWorkList struct {
+ newlist chan *list.List
+ NextItem chan interface{}
+}
+
+// NewBlockWorkList returns a new worklist, and launches a listener
+// goroutine that waits for work and farms it out to workers.
+//
+func NewBlockWorkList() *BlockWorkList {
+ b := BlockWorkList{
+ newlist: make(chan *list.List),
+ NextItem: make(chan interface{}),
+ }
+ go b.listen()
+ return &b
+}
+
+// ReplaceList sends a new list of work items to the manager goroutine.
+// The manager will discard any outstanding work list and begin
+// working on the new list.
+//
+func (b *BlockWorkList) ReplaceList(list *list.List) {
+ b.newlist <- list
+}
+
+// Close shuts down the manager and terminates the goroutine, which
+// completes delivery of any item in progress and abandons any pending
+// items.
+//
+func (b *BlockWorkList) Close() {
+ close(b.newlist)
+}
+
+// listen is run in a goroutine. It reads new work lists from its
+// input queue until the queue is closed.
+// listen takes ownership of the list that is passed to it.
+//
+// Note that the routine does not ever need to access the list
+// itself once the current_item has been initialized, so we do
+// not bother to keep a pointer to the list. Because it is a
+// doubly linked list, holding on to the current item will keep
+// it from garbage collection.
+//
+func (b *BlockWorkList) listen() {
+ var current_item *list.Element
+
+ // When we're done, close the output channel to shut down any
+ // workers.
+ defer close(b.NextItem)
+
+ for {
+ // If the current list is empty, wait for a new list before
+ // even checking if workers are ready.
+ if current_item == nil {
+ if p, ok := <-b.newlist; ok {
+ current_item = p.Front()
+ } else {
+ // The channel was closed; shut down.
+ return
+ }
+ }
+ select {
+ case p, ok := <-b.newlist:
+ if ok {
+ current_item = p.Front()
+ } else {
+ // The input channel is closed; time to shut down
+ return
+ }
+ case b.NextItem <- current_item.Value:
+ current_item = current_item.Next()
+ }
+ }
+}
--- /dev/null
+package main
+
+import (
+ "container/list"
+ "testing"
+)
+
+func makeTestWorkList(ary []int) *list.List {
+ l := list.New()
+ for _, n := range ary {
+ l.PushBack(n)
+ }
+ return l
+}
+
+func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
+ select {
+ case item := <-c:
+ t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
+ default:
+ // no-op
+ }
+}
+
+func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
+ if item, ok := <-c; !ok {
+ t.Fatal("expected data on a closed channel")
+ } else if item == nil {
+ t.Fatal("expected data on an empty channel")
+ }
+}
+
+func expectChannelClosed(t *testing.T, c <-chan interface{}) {
+ received, ok := <-c
+ if ok {
+ t.Fatalf("Expected channel to be closed, but received %v instead", received)
+ }
+}
+
+func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
+ for i := range expected {
+ actual, ok := <-c
+ t.Logf("received %v", actual)
+ if !ok {
+ t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
+ } else if actual.(int) != expected[i] {
+ t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
+ }
+ }
+}
+
+// Create a BlockWorkList, generate a list for it, and instantiate a worker.
+func TestBlockWorkListReadWrite(t *testing.T) {
+ var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+
+ b := NewBlockWorkList()
+ b.ReplaceList(makeTestWorkList(input))
+
+ expectFromChannel(t, b.NextItem, input)
+ expectChannelEmpty(t, b.NextItem)
+ b.Close()
+}
+
+// Start a worker before the list has any input.
+func TestBlockWorkListEarlyRead(t *testing.T) {
+ var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+
+ b := NewBlockWorkList()
+
+ // First, demonstrate that nothing is available on the NextItem
+ // channel.
+ expectChannelEmpty(t, b.NextItem)
+
+ // Start a reader in a goroutine. The reader will block until the
+ // block work list has been initialized.
+ //
+ done := make(chan int)
+ go func() {
+ expectFromChannel(t, b.NextItem, input)
+ b.Close()
+ done <- 1
+ }()
+
+ // Feed the blocklist a new worklist, and wait for the worker to
+ // finish.
+ b.ReplaceList(makeTestWorkList(input))
+ <-done
+
+ expectChannelClosed(t, b.NextItem)
+}
+
+// Show that a reader may block when the manager's list is exhausted,
+// and that the reader resumes automatically when new data is
+// available.
+func TestBlockWorkListReaderBlocks(t *testing.T) {
+ var (
+ inputBeforeBlock = []int{1, 2, 3, 4, 5}
+ inputAfterBlock = []int{6, 7, 8, 9, 10}
+ )
+
+ b := NewBlockWorkList()
+ sendmore := make(chan int)
+ done := make(chan int)
+ go func() {
+ expectFromChannel(t, b.NextItem, inputBeforeBlock)
+
+ // Confirm that the channel is empty, so a subsequent read
+ // on it will block.
+ expectChannelEmpty(t, b.NextItem)
+
+ // Signal that we're ready for more input.
+ sendmore <- 1
+ expectFromChannel(t, b.NextItem, inputAfterBlock)
+ b.Close()
+ done <- 1
+ }()
+
+ // Write a slice of the first five elements and wait for the
+ // reader to signal that it's ready for us to send more input.
+ b.ReplaceList(makeTestWorkList(inputBeforeBlock))
+ <-sendmore
+
+ b.ReplaceList(makeTestWorkList(inputAfterBlock))
+
+ // Wait for the reader to complete.
+ <-done
+}
+
+// Replace one active work list with another.
+func TestBlockWorkListReplaceList(t *testing.T) {
+ var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+ var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+
+ b := NewBlockWorkList()
+ b.ReplaceList(makeTestWorkList(firstInput))
+
+ // Read just the first five elements from the work list.
+ // Confirm that the channel is not empty.
+ expectFromChannel(t, b.NextItem, firstInput[0:5])
+ expectChannelNotEmpty(t, b.NextItem)
+
+ // Replace the work list and read five more elements.
+ // The old list should have been discarded and all new
+ // elements come from the new list.
+ b.ReplaceList(makeTestWorkList(replaceInput))
+ expectFromChannel(t, b.NextItem, replaceInput[0:5])
+
+ b.Close()
+}
// The Keep pull manager should have received one good list with 3
// requests on it.
- var saved_pull_list = pullmgr.GetList()
- if len(saved_pull_list) != 3 {
- t.Errorf(
- "saved_pull_list: expected 3 elements, got %d\nsaved_pull_list = %v",
- len(saved_pull_list), saved_pull_list)
+ var output_list = make([]PullRequest, 3)
+ for i := 0; i < 3; i++ {
+ item := <-pullmgr.NextItem
+ if pr, ok := item.(PullRequest); ok {
+ output_list[i] = pr
+ } else {
+ t.Errorf("item %v could not be parsed as a PullRequest", item)
+ }
}
}
import (
"bufio"
"bytes"
+ "container/list"
"crypto/md5"
"encoding/json"
"fmt"
- "git.curoverse.com/arvados.git/services/keepstore/pull_list"
"github.com/gorilla/mux"
"io"
"log"
If the JSON unmarshalling fails, return 400 Bad Request.
*/
+type PullRequest struct {
+ Locator string `json:"locator"`
+ Servers []string `json:"servers"`
+}
+
func PullHandler(resp http.ResponseWriter, req *http.Request) {
// Reject unauthorized requests.
api_token := GetApiToken(req)
}
// Parse the request body.
- var plist []pull_list.PullRequest
+ var pr []PullRequest
r := json.NewDecoder(req.Body)
- if err := r.Decode(&plist); err != nil {
+ if err := r.Decode(&pr); err != nil {
http.Error(resp, BadRequestError.Error(), BadRequestError.HTTPCode)
log.Printf("%s %s: %s\n", req.Method, req.URL, err.Error())
return
// We have a properly formatted pull list sent from the data
// manager. Report success and send the list to the pull list
// manager for further handling.
- log.Printf("%s %s: received %v\n", req.Method, req.URL, plist)
+ log.Printf("%s %s: received %v\n", req.Method, req.URL, pr)
resp.WriteHeader(http.StatusOK)
resp.Write([]byte(
- fmt.Sprintf("Received %d pull requests\n", len(plist))))
+ fmt.Sprintf("Received %d pull requests\n", len(pr))))
+
+ plist := list.New()
+ for _, p := range pr {
+ plist.PushBack(p)
+ }
if pullmgr == nil {
- pullmgr = pull_list.NewManager()
+ pullmgr = NewBlockWorkList()
}
- pullmgr.SetList(plist)
+ pullmgr.ReplaceList(plist)
}
// ==============================
"bytes"
"flag"
"fmt"
- "git.curoverse.com/arvados.git/services/keepstore/pull_list"
"io/ioutil"
"log"
"net"
// keepstore servers in order to increase data replication) with
// atomic update methods that are safe to use from multiple
// goroutines.
-var pullmgr *pull_list.Manager
+var pullmgr *BlockWorkList
// TODO(twp): continue moving as much code as possible out of main
// so it can be effectively tested. Esp. handling and postprocessing
+++ /dev/null
-package pull_list
-
-/* The pull_list package manages a list of pull requests sent
- by Data Manager.
-
- The interface is:
-
- pull_list.NewManager() creates and returns a pull_list.Manager. A
- listener runs in a goroutine, waiting for new requests on its input
- channels.
-
- pull_list.SetList() assigns a new pull list to the manager. Any
- existing list is discarded.
-
- pull_list.GetList() reports the manager's current pull list.
-
- pull_list.Close() shuts down the pull list manager.
-*/
-
-type PullRequest struct {
- Locator string
- Servers []string
-}
-
-type Manager struct {
- setlist chan []PullRequest // input channel for setting new lists
- getlist chan []PullRequest // output channel for getting existing list
-}
-
-// NewManager returns a new Manager object. It launches a goroutine that
-// waits for pull requests.
-//
-func NewManager() *Manager {
- r := Manager{
- make(chan []PullRequest),
- make(chan []PullRequest),
- }
- go r.listen()
- return &r
-}
-
-// SetList sends a new list of pull requests to the manager goroutine.
-// The manager will discard any outstanding pull list and begin
-// working on the new list.
-//
-func (r *Manager) SetList(pr []PullRequest) {
- r.setlist <- pr
-}
-
-// GetList reports the contents of the current pull list.
-func (r *Manager) GetList() []PullRequest {
- return <-r.getlist
-}
-
-// Close shuts down the manager and terminates the goroutine, which
-// completes any pull request in progress and abandons any pending
-// requests.
-//
-func (r *Manager) Close() {
- close(r.setlist)
-}
-
-// listen is run in a goroutine. It reads new pull lists from its
-// input queue until the queue is closed.
-func (r *Manager) listen() {
- var current []PullRequest
- for {
- select {
- case p, ok := <-r.setlist:
- if ok {
- current = p
- } else {
- // The input channel is closed; time to shut down
- close(r.getlist)
- return
- }
- case r.getlist <- current:
- // no-op
- }
- }
-}