--- /dev/null
+// On loading of a collection, enable the "lock" button and
+// disable all file modification controls (upload, rename, delete)
+$(document).
+ ready(function(event) {
+ $(".btn-collection-file-control").addClass("disabled");
+ $(".tab-pane-Upload").addClass("disabled");
+ $("#Upload-tab").attr("data-toggle", "disabled");
+ }).
+ on('click', '.lock-collection-btn', function(event) {
+ classes = $(event.target).attr('class')
+
+ if (classes.indexOf("fa-lock") != -1) {
+ // About to unlock; warn and get confirmation from user
+ if (confirm("Adding, renaming, and deleting files changes the portable data hash. Are you sure you want to unlock the collection?")) {
+ $(".lock-collection-btn").removeClass("fa-lock");
+ $(".lock-collection-btn").addClass("fa-unlock");
+ $(".lock-collection-btn").attr("title", "Lock collection to prevent editing files");
+ $(".btn-collection-file-control").removeClass("disabled");
+ $(".tab-pane-Upload").removeClass("disabled");
+ $("#Upload-tab").attr("data-toggle", "tab");
+ } else {
+ // User clicked "no" and so do not unlock
+ }
+ } else {
+ // Lock it back
+ $(".lock-collection-btn").removeClass("fa-unlock");
+ $(".lock-collection-btn").addClass("fa-lock");
+ $(".lock-collection-btn").attr("title", "Unlock collection to edit files");
+ $(".btn-collection-file-control").addClass("disabled");
+ $(".tab-pane-Upload").addClass("disabled");
+ $("#Upload-tab").attr("data-toggle", "disabled");
+ }
+ });
function enable_disable_selection_actions() {
var $container = $(this);
var $checked = $('.persistent-selection:checkbox:checked', $container);
+ var collection_lock_classes = $('.lock-collection-btn').attr('class')
+
$('[data-selection-action]', $container).
closest('div.btn-group-sm').
find('ul li').
toggleClass('disabled',
($checked.filter('[value*=-4zz18-]').length < 1) ||
($checked.length != $checked.filter('[value*=-4zz18-]').length));
+ $('[data-selection-action=remove-selected-files]', $container).
+ closest('li').
+ toggleClass('disabled',
+ ($checked.length < 0) ||
+ !($checked.length > 0 && collection_lock_classes && collection_lock_classes.indexOf("fa-unlock") !=-1));
}
$(document).
.btn-group.toggle-persist .btn-info.active {
background-color: $active-bg;
}
+
+.lock-collection-btn {
+ display: inline-block;
+ padding: .5em 2em;
+ margin: 0 1em;
+}
end
arv_coll = Arv::Collection.new(@object.manifest_text)
- arv_coll.rename "./"+file_path, new_file_path
- if @object.update_attributes manifest_text: arv_coll.manifest_text
- show
- else
+ if arv_coll.exist?(new_file_path)
+ @errors = 'Duplicate file path. Please use a different name.'
self.render_error status: 422
+ else
+ arv_coll.rename "./"+file_path, new_file_path
+
+ if @object.update_attributes manifest_text: arv_coll.manifest_text
+ show
+ else
+ self.render_error status: 422
+ end
end
else
# Not a file rename; use default
end
%>
- <li class="<%= 'active' if i==0 %> <%= link_disabled %>" data-toggle="tooltip" data-placement="top" title="<%=tab_tooltip%>">
+ <li class="<%= 'active' if i==0 %> <%= link_disabled %> tab-pane-<%=pane_name%>" data-toggle="tooltip" data-placement="top" title="<%=tab_tooltip%>">
<a href="#<%= pane_name %>"
id="<%= pane_name %>-tab"
data-toggle="<%= data_toggle %>"
--- /dev/null
+<% if @object.editable? %>
+ <i class="fa fa-fw fa-lock lock-collection-btn btn btn-primary" title="Unlock collection to edit files"></i>
+<% end %>
<% end %>
<% if object.editable? %>
- <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate', title: "Remove #{file_path}") do %>
+ <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate btn-collection-file-control', title: "Remove #{file_path}") do %>
<i class="fa fa-fw fa-trash-o"></i>
<% end %>
<% end %>
<% if CollectionsHelper::is_image(filename) %>
<i class="fa fa-fw fa-bar-chart-o"></i>
<% if object.editable? %>
- <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_path' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'} %>
+ <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_path' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit name or directory or both for this file'} %>
<% else %>
<%= filename %>
<% end %>
</div>
<% else %>
<% if object.editable? %>
- <i class="fa fa-fw fa-file"></i><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'} %>
+ <i class="fa fa-fw fa-file"></i><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit name or directory or both for this file', btnclass: 'collection-file-control'} %>
<% else %>
<i class="fa fa-fw fa-file" href="<%=object.uuid%>/<%=file_path%>" ></i> <%= filename %>
<% end %>
<% if @object.state == 'Final' %>
<%= link_to(copy_container_request_path('id' => @object.uuid),
- class: 'btn btn-primary',
+ class: 'btn btn-sm btn-primary',
title: 'Re-run',
data: {toggle: :tooltip, placement: :top}, title: 'This will make a copy and take you there. You can then make any needed changes and run it',
method: :post,
collection = Collection.select([:uuid, :manifest_text]).where(uuid: collection['uuid']).first
assert_match /. d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:file1renamed\n.\/dir1 d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:dir1file2 0:0:file2\n.\/dir2\/dir3 d41d8cd98f00b204e9800998ecf8427e\+0\+A(.*) 0:0:dir1file1moved\n$/, collection['manifest_text']
end
+
+ test "renaming file with a duplicate name in same stream not allowed" do
+ use_token :active
+
+ # rename 'file2' as 'file1' and expect error
+ post :update, {
+ id: 'zzzzz-4zz18-pyw8yp9g3pr7irn',
+ collection: {
+ 'rename-file-path:file2' => 'file1'
+ },
+ format: :json
+ }, session_for(:active)
+ assert_response 422
+ assert_includes json_response['errors'], 'Duplicate file path'
+ end
+
+ test "renaming file with a duplicate name as another stream not allowed" do
+ use_token :active
+
+ # rename 'file1' as 'dir1/file1' and expect error
+ post :update, {
+ id: 'zzzzz-4zz18-pyw8yp9g3pr7irn',
+ collection: {
+ 'rename-file-path:file1' => 'dir1/file1'
+ },
+ format: :json
+ }, session_for(:active)
+ assert_response 422
+ assert_includes json_response['errors'], 'Duplicate file path'
+ end
end
test "Upload two empty files with the same name" do
need_selenium "to make file uploads work"
visit page_with_token 'active', sandbox_path
+
+ unlock_collection
+
find('.nav-tabs a', text: 'Upload').click
attach_file 'file_selector', testfile_path('empty.txt')
assert_selector 'div', text: 'empty.txt'
test "Upload non-empty files" do
need_selenium "to make file uploads work"
visit page_with_token 'active', sandbox_path
+
+ unlock_collection
+
find('.nav-tabs a', text: 'Upload').click
attach_file 'file_selector', testfile_path('a')
attach_file 'file_selector', testfile_path('foo.txt')
service_port: 99999)
end
visit page_with_token 'active', sandbox_path
+
+ unlock_collection
+
find('.nav-tabs a', text: 'Upload').click
attach_file 'file_selector', testfile_path('foo.txt')
assert_selector 'button:not([disabled])', text: 'Start'
# Must be an absolute path. https://github.com/jnicklas/capybara/issues/621
File.join Dir.getwd, 'tmp', filename
end
+
+ def unlock_collection
+ first('.lock-collection-btn').click
+ accept_alert
+ end
end
end
test "remove a file from collection using checkbox and dropdown option" do
+ need_selenium 'to confirm unlock'
+
visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
assert(page.has_text?('file1'), 'file not found - file1')
+ unlock_collection
+
# remove first file
input_files = page.all('input[type=checkbox]')
input_files[0].click
end
test "remove a file in collection using trash icon" do
- need_selenium 'to confirm remove'
+ need_selenium 'to confirm unlock'
visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
assert(page.has_text?('file1'), 'file not found - file1')
+ unlock_collection
+
first('.fa-trash-o').click
- page.driver.browser.switch_to.alert.accept
+ accept_alert
assert(page.has_no_text?('file1'), 'file found - file')
assert(page.has_text?('file2'), 'file not found - file2')
end
test "rename a file in collection" do
+ need_selenium 'to confirm unlock'
+
visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
+ unlock_collection
+
within('.collection_files') do
first('.fa-pencil').click
find('.editable-input input').set('file1renamed')
assert_nil first('.fa-trash-o')
end
end
+
+ test "unlock collection to modify files" do
+ need_selenium 'to confirm remove'
+
+ collection = api_fixture('collections')['collection_owned_by_active']
+
+ # On load, collection is locked, and upload tab, rename and remove options are disabled
+ visit page_with_token('active', "/collections/#{collection['uuid']}")
+
+ assert_selector 'a[data-toggle="disabled"]', text: 'Upload'
+
+ within('.collection_files') do
+ file_ctrls = page.all('.btn-collection-file-control')
+ assert_equal 2, file_ctrls.size
+ assert_equal true, file_ctrls[0]['class'].include?('disabled')
+ assert_equal true, file_ctrls[1]['class'].include?('disabled')
+ find('input[type=checkbox]').click
+ end
+
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li.disabled', text: 'Remove selected files'
+ assert_selector 'li', text: 'Create new collection with selected files'
+ end
+
+ unlock_collection
+
+ assert_no_selector 'a[data-toggle="disabled"]', text: 'Upload'
+ assert_selector 'a', text: 'Upload'
+
+ within('.collection_files') do
+ file_ctrls = page.all('.btn-collection-file-control')
+ assert_equal 2, file_ctrls.size
+ assert_equal false, file_ctrls[0]['class'].include?('disabled')
+ assert_equal false, file_ctrls[1]['class'].include?('disabled')
+ # previous checkbox selection won't result in firing a new event;
+ # undo and redo checkbox to fire the selection event again
+ find('input[type=checkbox]').click
+ find('input[type=checkbox]').click
+ end
+
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_no_selector 'li.disabled', text: 'Remove selected files'
+ assert_selector 'li', text: 'Remove selected files'
+ end
+ end
+
+ def unlock_collection
+ first('.lock-collection-btn').click
+ accept_alert
+ end
end
end
Capybara.reset_sessions!
end
+
+ def accept_alert
+ if Capybara.current_driver == :selenium
+ (0..9).each do
+ begin
+ page.driver.browser.switch_to.alert.accept
+ break
+ rescue Selenium::WebDriver::Error::NoSuchAlertError
+ sleep 0.1
+ end
+ end
+ else
+ # poltergeist returns true for confirm, so no need to accept
+ end
+ end
end
#distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
debian8,ubuntu1204,centos7|python-gflags|2.0|2|python|all
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.4.2|2|python|all
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|oauth2client|1.5.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.6.2|2|python|all
+debian8,ubuntu1204,ubuntu1404,centos7|oauth2client|1.5.2|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|pyasn1|0.1.7|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|pyasn1-modules|0.0.5|2|python|all
debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rsa|3.4.2|2|python|all
debian8,ubuntu1204,centos7|pycrypto|2.6.1|3|python|amd64
debian8,ubuntu1204,ubuntu1404,ubuntu1604|backports.ssl_match_hostname|3.5.0.1|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|llfuse|0.41.1|3|python|amd64
-debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pycurl|7.19.5.3|3|python|amd64
+debian8,ubuntu1204,ubuntu1404,centos7|pycurl|7.19.5.3|3|python|amd64
debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|pyyaml|3.12|2|python|amd64
debian8,ubuntu1204,ubuntu1404,ubuntu1604,centos7|rdflib|4.2.2|2|python|all
debian8,ubuntu1204,ubuntu1404,centos7|shellescape|3.4.1|2|python|all
all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
all|rdflib-jsonld|0.4.0|2|python|all
all|futures|3.0.5|2|python|all
+all|future|0.16.0|2|python|all
+all|future|0.16.0|2|python3|all
# Install RVM
RUN apt-get update && \
- DEBIAN_FRONTEND=noninteractive apt-get -y install --no-install-recommends curl ca-certificates && \
+ DEBIAN_FRONTEND=noninteractive apt-get -y install --no-install-recommends curl ca-certificates g++ && \
gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
curl -L https://get.rvm.io | bash -s stable && \
/usr/local/rvm/bin/rvm install 2.3 && \
chown "$WWW_OWNER:" $RELEASE_PATH/config/environment.rb
chown "$WWW_OWNER:" $RELEASE_PATH/config.ru
chown "$WWW_OWNER:" $RELEASE_PATH/Gemfile.lock
- chown -R "$WWW_OWNER:" $RELEASE_PATH/tmp
+ chown -R "$WWW_OWNER:" $RELEASE_PATH/tmp || true
chown -R "$WWW_OWNER:" $SHARED_PATH/log
case "$RAILSPKG_DATABASE_LOAD_TASK" in
db:schema:load) chown "$WWW_OWNER:" $RELEASE_PATH/db/schema.rb ;;
db:structure:load) chown "$WWW_OWNER:" $RELEASE_PATH/db/structure.sql ;;
esac
chmod 644 $SHARED_PATH/log/*
- chmod -R 2775 $RELEASE_PATH/tmp
+ chmod -R 2775 $RELEASE_PATH/tmp || true
echo "... done."
if [ -n "$RAILSPKG_DATABASE_LOAD_TASK" ]; then
ubuntu1404)
FORMAT=deb
;;
+ ubuntu1604)
+ FORMAT=deb
+ ;;
centos7)
FORMAT=rpm
;;
echo -n 'fuse.h: '
find /usr/include -wholename '*fuse/fuse.h' \
|| fatal "No fuse/fuse.h. Try: apt-get install libfuse-dev"
+ echo -n 'gnutls.h: '
+ find /usr/include -wholename '*gnutls/gnutls.h' \
+ || fatal "No gnutls/gnutls.h. Try: apt-get install libgnutls28-dev"
echo -n 'pyconfig.h: '
find /usr/include -name pyconfig.h | egrep --max-count=1 . \
|| fatal "No pyconfig.h. Try: apt-get install python-dev"
</ul>
<div class="pull-right" style="padding-top: 6px">
- <form method="get" action="http://www.google.com/search">
+ <form method="get" action="https://www.google.com/search">
<div class="input-group" style="width: 220px">
<input type="text" class="form-control" name="q" placeholder="search">
<div class="input-group-addon">
|cwd|string|Initial working directory, given as an absolute path (in the container) or a path relative to the WORKDIR given in the image's Dockerfile.|Required.|
|command|array of strings|Command to execute in the container.|Required. e.g., @["echo","hello"]@|
|output_path|string|Path to a directory or file inside the container that should be preserved as container's output when it finishes. This path must be, or be inside, one of the mount targets. For best performance, point output_path to a writable collection mount. Also, see "Pre-populate output using Mount points":#pre-populate-output for details regarding optional output pre-population using mount points.|Required.|
+|output_name|string|Desired name for the output collection. If null, a name will be assigned automatically.||
+|output_ttl|integer|Desired lifetime for the output collection, in seconds. If zero, the output collection will not be deleted automatically.||
|priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to preview the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
|expires_at|datetime|After this time, priority is considered to be zero.|Not yet implemented.|
|use_existing|boolean|If possible, use an existing (non-failed) container to satisfy the request instead of creating a new one.|Default is true|
# EC2 configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
# Google Compute Engine configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# Node Manager will ensure that there are at least this many nodes running at
# all times. If node manager needs to start new idle nodes for the purpose of
# Azure configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
<pre>
$namespaces:
arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
</pre>
Arvados extensions must go into the @hints@ section, for example:
arv:PartitionRequirement:
partition: dev_partition
arv:APIRequirement: {}
+ cwltool:LoadListingRequirement:
+ loadListing: shallow_listing
</pre>
h2. arv:RunInSingleContainer
h2. arv:APIRequirement
Indicates that process wants to access to the Arvados API. Will be granted limited network access and have @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ set in the environment.
+
+h2. cwltool:LoadListingRequirement
+
+In CWL v1.0 documents, the default behavior for Directory objects is to recursively expand the @listing@ for access by parameter references and expressions. For directory trees containing many files, this can be expensive in both time and memory usage. Use @cwltool:LoadListingRequirement@ to change the behavior for expansion of directory listings in the workflow runner.
+
+table(table table-bordered table-condensed).
+|_. Field |_. Type |_. Description |
+|loadListing|string|One of @no_listing@, @shallow_listing@, or @deep_listing@|
+
+*no_listing*: Do not expand directory listing at all. The @listing@ field on the Directory object will be undefined.
+
+*shallow_listing*: Only expand the first level of directory listing. The @listing@ field on the toplevel Directory object will contain the directory contents, however @listing@ will not be defined on subdirectories.
+
+*deep_listing*: Recursively expand all levels of directory listing. The @listing@ field will be provided on the toplevel object and all subdirectories.
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
s.add_runtime_dependency 'google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
s.add_runtime_dependency 'activesupport', '>= 3.2.13', '< 5'
- s.add_runtime_dependency 'json', '~> 1.7', '>= 1.7.7'
+ s.add_runtime_dependency 'json', '>= 1.7.7', '<3'
s.add_runtime_dependency 'trollop', '~> 2.0'
s.add_runtime_dependency 'andand', '~> 1.3', '>= 1.3.3'
s.add_runtime_dependency 'oj', '~> 2.0', '>= 2.0.3'
assert_arv_get false, 'e796ab2294f3e48ec709ffa8d6daf58c'
end
assert_equal '', out
- assert_match /Error:/, err
+ assert_match /ERROR:/, err
end
def test_nonexistent_manifest
assert_arv_get false, 'acbd18db4cc2f85cedef654fccc4a4d8/', 'tmp/'
end
assert_equal '', out
- assert_match /Error:/, err
+ assert_match /ERROR:/, err
end
def test_manifest_root_to_dir
from ._version import __version__
from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, getListing
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
if isinstance(obj, dict):
if obj.get("writable"):
raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
- if obj.get("class") == "CommandLineTool":
- if self.work_api == "containers":
- if obj.get("stdin"):
- raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
- if obj.get("stderr"):
- raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
if obj.get("class") == "DockerRequirement":
if obj.get("dockerOutputDirectory"):
# TODO: can be supported by containers API, but not jobs API.
self.set_crunch_output()
if kwargs.get("compute_checksum"):
- adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
+ adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
return (self.final_output, self.final_status)
return parser
def add_arv_hints():
- cache = {}
cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
- cache["http://arvados.org/cwl"] = res.read()
+ use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
res.close()
- document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
- _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
- for n in extnames.names:
- if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
- cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
- document_loader.idx["http://arvados.org/cwl#"+n] = {}
+ cwltool.process.supportedProcessRequirements.extend([
+ "http://arvados.org/cwl#RunInSingleContainer",
+ "http://arvados.org/cwl#OutputDirType",
+ "http://arvados.org/cwl#RuntimeConstraints",
+ "http://arvados.org/cwl#PartitionRequirement",
+ "http://arvados.org/cwl#APIRequirement",
+ "http://commonwl.org/cwltool#LoadListingRequirement"
+ ])
+
def main(args, stdout, stderr, api_client=None, keep_client=None):
parser = arg_parser()
keep_client=keep_client,
num_retries=runner.num_retries),
resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
- logger_handler=arvados.log_handler)
+ logger_handler=arvados.log_handler,
+ custom_schema_callback=add_arv_hints)
$base: "http://arvados.org/cwl#"
+$namespaces:
+ cwl: "https://w3id.org/cwl/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
$graph:
+- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
+
+- name: cwltool:LoadListingRequirement
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ fields:
+ class:
+ type: string
+ doc: "Always 'LoadListingRequirement'"
+ jsonldPredicate:
+ "_id": "@type"
+ "_type": "@vocab"
+ loadListing:
+ type:
+ - "null"
+ - type: enum
+ name: LoadListingEnum
+ symbols: [no_listing, shallow_listing, deep_listing]
+
- name: RunInSingleContainer
type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
doc: |
Indicates that a subworkflow should run in a single container
and not be scheduled as separate steps.
- name: RuntimeConstraints
type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
doc: |
Set Arvados-specific runtime hints.
fields:
- name: PartitionRequirement
type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
doc: |
Select preferred compute partitions on which to run jobs.
fields:
- name: APIRequirement
type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
doc: |
Indicates that process wants to access to the Arvados API. Will be granted
limited network access and have ARVADOS_API_HOST and ARVADOS_API_TOKEN set
import logging
import json
import os
+import urllib
import ruamel.yaml as yaml
dirs = set()
for f in self.pathmapper.files():
- pdh, p, tp = self.pathmapper.mapper(f)
+ pdh, p, tp, stg = self.pathmapper.mapper(f)
if tp == "Directory" and '/' not in pdh:
mounts[p] = {
"kind": "collection",
dirs.add(pdh)
for f in self.pathmapper.files():
- res, p, tp = self.pathmapper.mapper(f)
+ res, p, tp, stg = self.pathmapper.mapper(f)
if res.startswith("keep:"):
res = res[5:]
elif res.startswith("/keep/"):
"portable_data_hash": pdh
}
if len(sp) == 2:
- mounts[p]["path"] = sp[1]
+ mounts[p]["path"] = urllib.unquote(sp[1])
with Perf(metrics, "generatefiles %s" % self.name):
if self.generatefiles["listing"]:
container_request["environment"].update(self.environment)
if self.stdin:
- raise UnsupportedRequirement("Stdin redirection currently not suppported")
+ sp = self.stdin[6:].split("/", 1)
+ mounts["stdin"] = {"kind": "collection",
+ "portable_data_hash": sp[0],
+ "path": sp[1]}
if self.stderr:
- raise UnsupportedRequirement("Stderr redirection currently not suppported")
+ mounts["stderr"] = {"kind": "file",
+ "path": "%s/%s" % (self.outdir, self.stderr)}
if self.stdout:
mounts["stdout"] = {"kind": "file",
import functools
from arvados.api import OrderedJsonModel
-from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
+from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, normalizeFilesDirs
from cwltool.load_tool import load_tool
from cwltool.errors import WorkflowException
adjustFileObjs(job_order_object, keeppathObj)
adjustDirObjs(job_order_object, keeppathObj)
normalizeFilesDirs(job_order_object)
- adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
output_name = None
output_tags = None
def exists(self, fn):
collection, rest = self.get_collection(fn)
if collection:
- return collection.exists(rest)
+ if rest:
+ return collection.exists(rest)
+ else:
+ return True
else:
return super(CollectionFsAccess, self).exists(fn)
import logging
import uuid
import os
+import urllib
import arvados.commands.run
import arvados.collection
"""Convert container-local paths to and from Keep collection ids."""
pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
- pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.+)?$')
+ pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.*)?$')
def __init__(self, arvrunner, referenced_files, input_basedir,
collection_pattern, file_pattern, name=None, **kwargs):
def visit(self, srcobj, uploadfiles):
src = srcobj["location"]
- if srcobj["class"] == "File":
- if "#" in src:
- src = src[:src.index("#")]
- if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
- self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
- if src not in self._pathmap:
+ if "#" in src:
+ src = src[:src.index("#")]
+
+ if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
+ self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
+
+ if src not in self._pathmap:
+ if src.startswith("file:"):
# Local FS ref, may need to be uploaded or may be on keep
# mount.
ab = abspath(src, self.input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s")
+ st = arvados.commands.run.statfile("", ab,
+ fnPattern="keep:%s/%s",
+ dirPattern="keep:%s/%s")
with SourceLine(srcobj, "location", WorkflowException):
if isinstance(st, arvados.commands.run.UploadFile):
uploadfiles.add((src, ab, st))
elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
- elif src.startswith("_:"):
- if "contents" in srcobj:
- pass
- else:
- raise WorkflowException("File literal '%s' is missing contents" % src)
- elif src.startswith("arvwf:"):
- self._pathmap[src] = MapperEnt(src, src, "File")
+ self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
else:
raise WorkflowException("Input file path '%s' is invalid" % st)
- if "secondaryFiles" in srcobj:
- for l in srcobj["secondaryFiles"]:
- self.visit(l, uploadfiles)
- elif srcobj["class"] == "Directory":
- if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
- self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "Directory")
+ elif src.startswith("_:"):
+ if srcobj["class"] == "File" and "contents" not in srcobj:
+ raise WorkflowException("File literal '%s' is missing `contents`" % src)
+ if srcobj["class"] == "Directory" and "listing" not in srcobj:
+ raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+ else:
+ self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
+
+ with SourceLine(srcobj, "secondaryFiles", WorkflowException):
+ for l in srcobj.get("secondaryFiles", []):
+ self.visit(l, uploadfiles)
+ with SourceLine(srcobj, "listing", WorkflowException):
for l in srcobj.get("listing", []):
self.visit(l, uploadfiles)
for l in obj.get("secondaryFiles", []):
self.addentry(l, c, path, subdirs)
elif obj["class"] == "Directory":
- for l in obj["listing"]:
+ for l in obj.get("listing", []):
self.addentry(l, c, path + "/" + obj["basename"], subdirs)
subdirs.append((obj["location"], path + "/" + obj["basename"]))
elif obj["location"].startswith("_:") and "contents" in obj:
loc = k["location"]
if loc in already_uploaded:
v = already_uploaded[loc]
- self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % v.resolved[5:], "File")
+ self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), "File", True)
for srcobj in referenced_files:
self.visit(srcobj, uploadfiles)
project=self.arvrunner.project_uuid)
for src, ab, st in uploadfiles:
- self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % st.fn[5:], "File")
+ self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+ "Directory" if os.path.isdir(ab) else "File", True)
self.arvrunner.add_uploaded(src, self._pathmap[src])
for srcobj in referenced_files:
+ subdirs = []
if srcobj["class"] == "Directory":
if srcobj["location"] not in self._pathmap:
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- subdirs = []
- for l in srcobj["listing"]:
+ for l in srcobj.get("listing", []):
self.addentry(l, c, ".", subdirs)
check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
c.save_new(owner_uuid=self.arvrunner.project_uuid)
ab = self.collection_pattern % c.portable_data_hash()
- self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "Directory")
- for loc, sub in subdirs:
- ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
- self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+ self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
(srcobj["location"].startswith("_:") and "contents" in srcobj)):
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries )
- subdirs = []
self.addentry(srcobj, c, ".", subdirs)
check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
c.save_new(owner_uuid=self.arvrunner.project_uuid)
ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
- self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "File")
+ self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
+ ab, "File", True)
if srcobj.get("secondaryFiles"):
ab = self.collection_pattern % c.portable_data_hash()
- self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt(ab, ab, "Directory")
+ self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
+
+ if subdirs:
for loc, sub in subdirs:
+ # subdirs will all start with "./", strip it off
ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
- self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+ self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
+ ab, "Directory", True)
self.keepdir = None
class StagingPathMapper(PathMapper):
_follow_dirs = True
- def visit(self, obj, stagedir, basedir, copy=False):
+ def visit(self, obj, stagedir, basedir, copy=False, staged=False):
# type: (Dict[unicode, Any], unicode, unicode, bool) -> None
loc = obj["location"]
tgt = os.path.join(stagedir, obj["basename"])
if obj["class"] == "Directory":
- self._pathmap[loc] = MapperEnt(loc, tgt, "Directory")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
if loc.startswith("_:") or self._follow_dirs:
self.visitlisting(obj.get("listing", []), tgt, basedir)
elif obj["class"] == "File":
if loc in self._pathmap:
return
if "contents" in obj and loc.startswith("_:"):
- self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
+ self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
else:
if copy:
- self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
else:
- self._pathmap[loc] = MapperEnt(loc, tgt, "File")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
# with any secondary files.
self.visitlisting(referenced_files, self.stagedir, basedir)
- for path, (ab, tgt, type) in self._pathmap.items():
+ for path, (ab, tgt, type, staged) in self._pathmap.items():
if type in ("File", "Directory") and ab.startswith("keep:"):
- self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
+ self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
class NoFollowPathMapper(StagingPathMapper):
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170224141733',
- 'schema-salad==2.2.20170222151604',
+ 'cwltool==1.0.20170329142446',
+ 'schema-salad==2.5.20170328195758',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
- 'arvados-python-client>=0.1.20170112173420',
+ 'arvados-python-client>=0.1.20170327195441',
'setuptools'
],
data_files=[
export ARVADOS_API_HOST_INSECURE=1
export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
+arv-keepdocker --pull arvados/jobs latest
+
cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
#!/bin/sh
exec arvados-cwl-runner --api=jobs --compute-checksum \\\$@
if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
arv-put --portable-data-hash testdir
fi
-exec cwltest --test arvados-tests.yml --tool $PWD/runner.sh
+exec cwltest --test arvados-tests.yml --tool $PWD/runner.sh $@
}
tool: keep-dir-test-input.cwl
doc: Test directory in keep
+
+- job: dir-job2.yml
+ output:
+ "outlist": {
+ "size": 20,
+ "location": "output.txt",
+ "class": "File",
+ "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+ }
+ tool: keep-dir-test-input.cwl
+ doc: Test directory in keep
+
+- job: null
+ output:
+ "outlist": {
+ "size": 20,
+ "location": "output.txt",
+ "class": "File",
+ "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+ }
+ tool: keep-dir-test-input2.cwl
+ doc: Test default directory in keep
+
+- job: null
+ output:
+ "outlist": {
+ "size": 20,
+ "location": "output.txt",
+ "class": "File",
+ "checksum": "sha1$13cda8661796ae241da3a18668fb552161a72592"
+ }
+ tool: keep-dir-test-input3.cwl
+ doc: Test default directory in keep
+
+- job: octo.yml
+ output: {}
+ tool: cat.cwl
+ doc: Test hashes in filenames
+
+- job: listing-job.yml
+ output: {
+ "out": {
+ "class": "File",
+ "location": "output.txt",
+ "size": 5,
+ "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+ }
+ }
+ tool: wf/listing_shallow.cwl
+ doc: test shallow directory listing
+
+- job: listing-job.yml
+ output: {
+ "out": {
+ "class": "File",
+ "location": "output.txt",
+ "size": 5,
+ "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+ }
+ }
+ tool: wf/listing_none.cwl
+ doc: test no directory listing
+
+- job: listing-job.yml
+ output: {
+ "out": {
+ "class": "File",
+ "location": "output.txt",
+ "size": 5,
+ "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+ }
+ }
+ tool: wf/listing_deep.cwl
+ doc: test deep directory listing
--- /dev/null
+cwlVersion: v1.0
+class: CommandLineTool
+inputs:
+ - id: inp
+ type: File
+ inputBinding: {}
+outputs: []
+baseCommand: cat
--- /dev/null
+indir:
+ class: Directory
+ location: keep:d7514270f356df848477718d58308cc4+94/
--- /dev/null
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+ - class: ShellCommandRequirement
+inputs:
+ indir:
+ type: Directory
+ inputBinding:
+ prefix: cd
+ position: -1
+ default:
+ class: Directory
+ location: keep:d7514270f356df848477718d58308cc4+94
+outputs:
+ outlist:
+ type: File
+ outputBinding:
+ glob: output.txt
+arguments: [
+ {shellQuote: false, valueFrom: "&&"},
+ "find", ".",
+ {shellQuote: false, valueFrom: "|"},
+ "sort"]
+stdout: output.txt
\ No newline at end of file
--- /dev/null
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+ - class: ShellCommandRequirement
+inputs:
+ indir:
+ type: Directory
+ inputBinding:
+ prefix: cd
+ position: -1
+ default:
+ class: Directory
+ location: keep:d7514270f356df848477718d58308cc4+94/
+outputs:
+ outlist:
+ type: File
+ outputBinding:
+ glob: output.txt
+arguments: [
+ {shellQuote: false, valueFrom: "&&"},
+ "find", ".",
+ {shellQuote: false, valueFrom: "|"},
+ "sort"]
+stdout: output.txt
\ No newline at end of file
--- /dev/null
+inp:
+ class: File
+ location: "octothorpe/item %231.txt"
\ No newline at end of file
for key in call_body:
self.assertEqual(call_body_expected.get(key), call_body.get(key))
+
+ # Test redirecting stdin/stdout/stderr
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_redirects(self, keepdocker):
+ arv_docker_clear_cache()
+
+ runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ tool = cmap({
+ "inputs": [],
+ "outputs": [],
+ "baseCommand": "ls",
+ "stdout": "stdout.txt",
+ "stderr": "stderr.txt",
+ "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
+ "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+ })
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ arvtool.formatgraph = None
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
+ make_fs_access=make_fs_access, tmpdir="/tmp"):
+ j.run()
+ runner.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher({
+ 'environment': {
+ 'HOME': '/var/spool/cwl',
+ 'TMPDIR': '/tmp'
+ },
+ 'name': 'test_run_redirect',
+ 'runtime_constraints': {
+ 'vcpus': 1,
+ 'ram': 1073741824
+ },
+ 'use_existing': True,
+ 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {'kind': 'tmp'},
+ "stderr": {
+ "kind": "file",
+ "path": "/var/spool/cwl/stderr.txt"
+ },
+ "stdin": {
+ "kind": "collection",
+ "path": "file.txt",
+ "portable_data_hash": "99999999999999999999999999999996+99"
+ },
+ "stdout": {
+ "kind": "file",
+ "path": "/var/spool/cwl/stdout.txt"
+ },
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': 'arvados/jobs',
+ 'command': ['ls', '/var/spool/cwl'],
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {},
+ 'properties': {},
+ }))
+
@mock.patch("arvados.collection.Collection")
def test_done(self, col):
api = mock.MagicMock()
def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
pdh = "99999999999999999999999999999991+99"
for c in files:
+ c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
c.fn = fnPattern % (pdh, os.path.basename(c.fn))
class TestPathmap(unittest.TestCase):
"location": "keep:99999999999999999999999999999991+99/hw.py"
}], "", "/test/%s", "/test/%s/%s")
- self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
@mock.patch("arvados.commands.run.uploadfiles")
- def test_upload(self, upl):
+ @mock.patch("arvados.commands.run.statfile")
+ def test_upload(self, statfile, upl):
"""Test pathmapper uploading files."""
arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+ def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
+ st = arvados.commands.run.UploadFile("", "tests/hw.py")
+ return st
+
upl.side_effect = upload_mock
+ statfile.side_effect = statfile_mock
p = ArvPathMapper(arvrunner, [{
"class": "File",
- "location": "tests/hw.py"
+ "location": "file:tests/hw.py"
}], "", "/test/%s", "/test/%s/%s")
- self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
@mock.patch("arvados.commands.run.uploadfiles")
"""Test pathmapper handling previously uploaded files."""
arvrunner = arvados_cwl.ArvCwlRunner(self.api)
- arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
+ arvrunner.add_uploaded('file:tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File', staged=True))
upl.side_effect = upload_mock
p = ArvPathMapper(arvrunner, [{
"class": "File",
- "location": "tests/hw.py"
+ "location": "file:tests/hw.py"
}], "", "/test/%s", "/test/%s/%s")
- self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
@mock.patch("arvados.commands.run.uploadfiles")
p = ArvPathMapper(arvrunner, [{
"class": "File",
- "location": "tests/hw.py"
+ "location": "file:tests/hw.py"
}], "", "/test/%s", "/test/%s/%s")
- self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
arvrunner.project_uuid = ""
api.return_value = mock.MagicMock()
arvrunner.api = api.return_value
- arvrunner.api.links().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
- {"items": [], "items_available": 0, "offset": 0},
- {"items": [], "items_available": 0, "offset": 0},
- {"items": [{"created_at": "",
- "head_uuid": "",
- "link_class": "docker_image_hash",
- "name": "123456",
- "owner_uuid": "",
- "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
- {"items": [], "items_available": 0, "offset": 0},
- {"items": [{"created_at": "",
- "head_uuid": "",
+ arvrunner.api.links().list().execute.side_effect = ({"items": [{"created_at": "",
+ "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"link_class": "docker_image_repo+tag",
"name": "arvados/jobs:"+arvados_cwl.__version__,
"owner_uuid": "",
"link_class": "docker_image_hash",
"name": "123456",
"owner_uuid": "",
- "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0} ,
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}
)
find_one_image_hash.return_value = "123456"
- arvrunner.api.collections().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
- {"items": [{"uuid": "",
+ arvrunner.api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"owner_uuid": "",
"manifest_text": "",
"properties": ""
- }], "items_available": 1, "offset": 0},
- {"items": [{"uuid": ""}], "items_available": 1, "offset": 0})
+ }], "items_available": 1, "offset": 0},)
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
- self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+ self.assertEqual("arvados/jobs:"+arvados_cwl.__version__,
+ arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
class TestCreateTemplate(unittest.TestCase):
existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
--- /dev/null
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+ cwltool: "http://commonwl.org/cwltool#"
+requirements:
+ cwltool:LoadListingRequirement:
+ loadListing: deep_listing
+ InlineJavascriptRequirement: {}
+inputs:
+ d: Directory
+outputs:
+ out: stdout
+stdout: output.txt
+arguments:
+ [echo, "${if(inputs.d.listing[0].class === 'Directory' && inputs.d.listing[0].listing[0].class === 'Directory') {return 'true';} else {return 'false';}}"]
--- /dev/null
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+ cwltool: http://commonwl.org/cwltool#
+requirements:
+ cwltool:LoadListingRequirement:
+ loadListing: no_listing
+ InlineJavascriptRequirement: {}
+inputs:
+ d: Directory
+outputs:
+ out: stdout
+stdout: output.txt
+arguments:
+ [echo, "${if(inputs.d.listing === undefined) {return 'true';} else {return 'false';}}"]
\ No newline at end of file
--- /dev/null
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+ cwltool: http://commonwl.org/cwltool#
+requirements:
+ cwltool:LoadListingRequirement:
+ loadListing: shallow_listing
+ InlineJavascriptRequirement: {}
+inputs:
+ d: Directory
+outputs:
+ out: stdout
+stdout: output.txt
+arguments:
+ [echo, "${if(inputs.d.listing[0].class === 'Directory' && inputs.d.listing[0].listing === undefined) {return 'true';} else {return 'false';}}"]
out: [out]
run:
class: CommandLineTool
+ id: subtool
inputs:
sleeptime:
type: int
"run": {
"baseCommand": "sleep",
"class": "CommandLineTool",
+ "id": "#main/sleep1/subtool",
"inputs": [
{
- "id": "#main/sleep1/sleeptime",
+ "id": "#main/sleep1/subtool/sleeptime",
"inputBinding": {
"position": 1
},
],
"outputs": [
{
- "id": "#main/sleep1/out",
+ "id": "#main/sleep1/subtool/out",
"outputBinding": {
"outputEval": "out"
},
"git.curoverse.com/arvados.git/sdk/go/streamer"
"io"
"io/ioutil"
+ "log"
"math/rand"
"net"
"net/http"
+ "os"
+ "regexp"
"strings"
"time"
)
// log.Printf to DebugPrintf.
var DebugPrintf = func(string, ...interface{}) {}
+// init enables DebugPrintf (routing it to log.Printf) when the
+// ARVADOS_DEBUG environment variable is set to "1", "yes", or "true"
+// (case-insensitive). Otherwise DebugPrintf stays a no-op.
+func init() {
+ var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+ if matchTrue.MatchString(os.Getenv("ARVADOS_DEBUG")) {
+ DebugPrintf = log.Printf
+ }
+}
+
type keepService struct {
Uuid string `json:"uuid"`
Hostname string `json:"service_host"`
-import gflags
import httplib
import httplib2
import logging
return body
-def _intercept_http_request(self, uri, **kwargs):
+def _intercept_http_request(self, uri, method="GET", **kwargs):
if (self.max_request_size and
kwargs.get('body') and
self.max_request_size < len(kwargs['body'])):
kwargs['headers']['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
- retryable = kwargs.get('method', 'GET') in [
+ retryable = method in [
'DELETE', 'GET', 'HEAD', 'OPTIONS', 'PUT']
retry_count = self._retry_count if retryable else 0
for _ in range(retry_count):
self._last_request_time = time.time()
try:
- return self.orig_http_request(uri, **kwargs)
+ return self.orig_http_request(uri, method, **kwargs)
except httplib.HTTPException:
_logger.debug("Retrying API request in %d s after HTTP error",
delay, exc_info=True)
delay = delay * self._retry_delay_backoff
self._last_request_time = time.time()
- return self.orig_http_request(uri, **kwargs)
+ return self.orig_http_request(uri, method, **kwargs)
def _patch_http_request(http, api_token):
http.arvados_api_token = api_token
try:
util.mkdir_dash_p(path)
except OSError:
- path = None
+ return None
return cache.SafeHTTPCache(path, max_age=60*60*24*2)
def api(version=None, cache=True, host=None, token=None, insecure=False, **kwargs):
--- /dev/null
+#!/usr/bin/env python
+
+import argparse
+import hashlib
+import os
+import re
+import string
+import sys
+import logging
+
+import arvados
+import arvados.commands._util as arv_cmd
+
+from arvados._version import __version__
+
+api_client = None
+logger = logging.getLogger('arvados.arv-get')
+
+parser = argparse.ArgumentParser(
+ description='Copy data from Keep to a local file or pipe.',
+ parents=[arv_cmd.retry_opt])
+parser.add_argument('--version', action='version',
+ version="%s %s" % (sys.argv[0], __version__),
+ help='Print version and exit.')
+parser.add_argument('locator', type=str,
+ help="""
+Collection locator, optionally with a file path or prefix.
+""")
+parser.add_argument('destination', type=str, nargs='?', default='-',
+ help="""
+Local file or directory where the data is to be written. Default: stdout.
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('--progress', action='store_true',
+ help="""
+Display human-readable progress on stderr (bytes and, if possible,
+percentage of total data size). This is the default behavior when it
+is not expected to interfere with the output: specifically, stderr is
+a tty _and_ either stdout is not a tty, or output is being written to
+named files rather than stdout.
+""")
+group.add_argument('--no-progress', action='store_true',
+ help="""
+Do not display human-readable progress on stderr.
+""")
+group.add_argument('--batch-progress', action='store_true',
+ help="""
+Display machine-readable progress on stderr (bytes and, if known,
+total data size).
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('--hash',
+ help="""
+Display the hash of each file as it is read from Keep, using the given
+hash algorithm. Supported algorithms include md5, sha1, sha224,
+sha256, sha384, and sha512.
+""")
+group.add_argument('--md5sum', action='store_const',
+ dest='hash', const='md5',
+ help="""
+Display the MD5 hash of each file as it is read from Keep.
+""")
+parser.add_argument('-n', action='store_true',
+ help="""
+Do not write any data -- just read from Keep, and report md5sums if
+requested.
+""")
+parser.add_argument('-r', action='store_true',
+ help="""
+Retrieve all files in the specified collection/prefix. This is the
+default behavior if the "locator" argument ends with a forward slash.
+""")
+group = parser.add_mutually_exclusive_group()
+group.add_argument('-f', action='store_true',
+ help="""
+Overwrite existing files while writing. The default behavior is to
+refuse to write *anything* if any of the output files already
+exist. As a special case, -f is not needed to write to stdout.
+""")
+group.add_argument('--skip-existing', action='store_true',
+ help="""
+Skip files that already exist. The default behavior is to refuse to
+write *anything* if any files exist that would have to be
+overwritten. This option causes even devices, sockets, and fifos to be
+skipped.
+""")
+
+def parse_arguments(arguments, stdout, stderr):
+ """Parse and validate arv-get command-line arguments.
+
+ arguments: argv-style list of strings (None lets argparse read sys.argv).
+ stdout, stderr: output streams; isatty() on them decides whether
+ --progress should default to on.
+ Returns the argparse Namespace. Calls parser.error() (which exits)
+ when -r is requested but the destination is not a directory.
+ """
+ args = parser.parse_args(arguments)
+
+ # A trailing path separator on the locator implies recursive retrieval.
+ if args.locator[-1] == os.sep:
+ args.r = True
+ if (args.r and
+ not args.n and
+ not (args.destination and
+ os.path.isdir(args.destination))):
+ parser.error('Destination is not a directory.')
+ # Single-file download into an existing directory: append the source
+ # file's basename to build the full destination path.
+ if not args.r and (os.path.isdir(args.destination) or
+ args.destination[-1] == os.path.sep):
+ args.destination = os.path.join(args.destination,
+ os.path.basename(args.locator))
+ logger.debug("Appended source file name to destination directory: %s",
+ args.destination)
+
+ if args.destination == '/dev/stdout':
+ args.destination = "-"
+
+ if args.destination == '-':
+ # Normally you have to use -f to write to a file (or device) that
+ # already exists, but "-" and "/dev/stdout" are common enough to
+ # merit a special exception.
+ args.f = True
+ else:
+ args.destination = args.destination.rstrip(os.sep)
+
+ # Turn on --progress by default if stderr is a tty and output is
+ # either going to a named file, or going (via stdout) to something
+ # that isn't a tty.
+ if (not (args.batch_progress or args.no_progress)
+ and stderr.isatty()
+ and (args.destination != '-'
+ or not stdout.isatty())):
+ args.progress = True
+ return args
+
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+ """Entry point for arv-get: copy Keep data to a local file or pipe.
+
+ arguments: argv-style list (None makes argparse read sys.argv).
+ stdout, stderr: streams used for data output and progress reporting.
+ Returns 0 on success, 1 on any error (errors are reported via logger).
+ """
+ global api_client
+
+ args = parse_arguments(arguments, stdout, stderr)
+ if api_client is None:
+ api_client = arvados.api('v1')
+
+ # Split "collection-locator[/path]" into the collection identifier and
+ # an optional file/prefix path inside it.
+ r = re.search(r'^(.*?)(/.*)?$', args.locator)
+ collection = r.group(1)
+ get_prefix = r.group(2)
+ if args.r and not get_prefix:
+ get_prefix = os.sep
+ try:
+ reader = arvados.CollectionReader(collection, num_retries=args.retries)
+ except Exception as error:
+ logger.error("failed to read collection: {}".format(error))
+ return 1
+
+ # User asked to download the collection's manifest
+ if not get_prefix:
+ if not args.n:
+ open_flags = os.O_CREAT | os.O_WRONLY
+ if not args.f:
+ # Without -f, refuse to overwrite an existing file.
+ open_flags |= os.O_EXCL
+ try:
+ if args.destination == "-":
+ stdout.write(reader.manifest_text())
+ else:
+ out_fd = os.open(args.destination, open_flags)
+ with os.fdopen(out_fd, 'wb') as out_file:
+ out_file.write(reader.manifest_text())
+ except (IOError, OSError) as error:
+ logger.error("can't write to '{}': {}".format(args.destination, error))
+ return 1
+ except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
+ logger.error("failed to download '{}': {}".format(collection, error))
+ return 1
+ return 0
+
+ # Scan the collection. Make an array of (stream, file, local
+ # destination filename) tuples, and add up total size to extract.
+ todo = []
+ todo_bytes = 0
+ try:
+ if get_prefix == os.sep:
+ item = reader
+ else:
+ item = reader.find('.' + get_prefix)
+
+ if isinstance(item, arvados.collection.Subcollection) or isinstance(item, arvados.collection.CollectionReader):
+ # If the user asked for a file and we got a subcollection, error out.
+ if get_prefix[-1] != os.sep:
+ logger.error("requested file '{}' is in fact a subcollection. Append a trailing '/' to download it.".format('.' + get_prefix))
+ return 1
+ # If the user asked stdout as a destination, error out.
+ elif args.destination == '-':
+ logger.error("cannot use 'stdout' as destination when downloading multiple files.")
+ return 1
+ # User asked for a subcollection, and that's what was found. Add up total size
+ # to download.
+ for s, f in files_in_collection(item):
+ dest_path = os.path.join(
+ args.destination,
+ os.path.join(s.stream_name(), f.name)[len(get_prefix)+1:])
+ if (not (args.n or args.f or args.skip_existing) and
+ os.path.exists(dest_path)):
+ logger.error('Local file %s already exists.' % (dest_path,))
+ return 1
+ todo += [(s, f, dest_path)]
+ todo_bytes += f.size()
+ elif isinstance(item, arvados.arvfile.ArvadosFile):
+ todo += [(item.parent, item, args.destination)]
+ todo_bytes += item.size()
+ else:
+ logger.error("'{}' not found.".format('.' + get_prefix))
+ return 1
+ except (IOError, arvados.errors.NotFoundError) as e:
+ logger.error(e)
+ return 1
+
+ # Download each file in `todo`, writing data and/or feeding the hash
+ # digestor, with optional progress reporting on stderr.
+ out_bytes = 0
+ for s, f, outfilename in todo:
+ outfile = None
+ digestor = None
+ if not args.n:
+ if outfilename == "-":
+ outfile = stdout
+ else:
+ if args.skip_existing and os.path.exists(outfilename):
+ logger.debug('Local file %s exists. Skipping.', outfilename)
+ continue
+ elif not args.f and (os.path.isfile(outfilename) or
+ os.path.isdir(outfilename)):
+ # Good thing we looked again: apparently this file wasn't
+ # here yet when we checked earlier.
+ logger.error('Local file %s already exists.' % (outfilename,))
+ return 1
+ if args.r:
+ arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
+ try:
+ outfile = open(outfilename, 'wb')
+ except Exception as error:
+ logger.error('Open(%s) failed: %s' % (outfilename, error))
+ return 1
+ if args.hash:
+ digestor = hashlib.new(args.hash)
+ try:
+ with s.open(f.name, 'r') as file_reader:
+ for data in file_reader.readall():
+ if outfile:
+ outfile.write(data)
+ if digestor:
+ digestor.update(data)
+ out_bytes += len(data)
+ if args.progress:
+ stderr.write('\r%d MiB / %d MiB %.1f%%' %
+ (out_bytes >> 20,
+ todo_bytes >> 20,
+ (100
+ if todo_bytes==0
+ else 100.0*out_bytes/todo_bytes)))
+ elif args.batch_progress:
+ stderr.write('%s %d read %d total\n' %
+ (sys.argv[0], os.getpid(),
+ out_bytes, todo_bytes))
+ if digestor:
+ stderr.write("%s %s/%s\n"
+ % (digestor.hexdigest(), s.stream_name(), f.name))
+ except KeyboardInterrupt:
+ # Remove the partially-written file (never stdin/stdout/stderr).
+ if outfile and (outfile.fileno() > 2) and not outfile.closed:
+ os.unlink(outfile.name)
+ break
+ finally:
+ if outfile != None and outfile != stdout:
+ outfile.close()
+
+ if args.progress:
+ stderr.write('\n')
+ return 0
+
+def files_in_collection(c):
+ """Yield (parent, file) pairs for every file under collection c.
+
+ Recurses depth-first into subcollections. Within each level, plain
+ files are yielded before subcollections, each group sorted
+ case-insensitively by name.
+ """
+ # Sort first by file type, then alphabetically by file path.
+ for i in sorted(c.keys(),
+ key=lambda k: (
+ isinstance(c[k], arvados.collection.Subcollection),
+ k.upper())):
+ if isinstance(c[i], arvados.arvfile.ArvadosFile):
+ yield (c, c[i])
+ elif isinstance(c[i], arvados.collection.Subcollection):
+ for s, f in files_in_collection(c[i]):
+ yield (s, f)
from __future__ import print_function
import argparse
+import collections
+import logging
+import re
import sys
import arvados
from arvados._version import __version__
+FileInfo = collections.namedtuple('FileInfo', ['stream_name', 'name', 'size'])
+
def parse_args(args):
parser = argparse.ArgumentParser(
description='List contents of a manifest',
parents=[arv_cmd.retry_opt])
parser.add_argument('locator', type=str,
- help="""Collection UUID or locator""")
+ help="""Collection UUID or locator, optionally with a subdir path.""")
parser.add_argument('-s', action='store_true',
help="""List file sizes, in KiB.""")
parser.add_argument('--version', action='version',
return parser.parse_args(args)
def size_formatter(coll_file):
+ # coll_file is now a FileInfo namedtuple, so `size` is a plain
+ # attribute, not a method. Size in KiB, right-aligned in 10 columns.
- return "{:>10}".format((coll_file.size() + 1023) / 1024)
+ return "{:>10}".format((coll_file.size + 1023) / 1024)
def name_formatter(coll_file):
+ # stream_name is likewise an attribute now; prints "stream/name".
- return "{}/{}".format(coll_file.stream_name(), coll_file.name)
+ return "{}/{}".format(coll_file.stream_name, coll_file.name)
-def main(args, stdout, stderr, api_client=None):
+def main(args, stdout, stderr, api_client=None, logger=None):
+ """Entry point for arv-ls: list a collection's files.
+
+ args: argv-style list of argument strings.
+ api_client, logger: injectable for tests; defaulted when None.
+ Returns 0 on success, 1 on error.
+ """
args = parse_args(args)
if api_client is None:
api_client = arvados.api('v1')
+ if logger is None:
+ logger = logging.getLogger('arvados.arv-ls')
+
try:
+ # Split "collection-locator[/subdir]" into the collection identifier
+ # and an optional subdirectory path.
- cr = arvados.CollectionReader(args.locator, api_client=api_client,
+ r = re.search(r'^(.*?)(/.*)?$', args.locator)
+ collection = r.group(1)
+ get_prefix = r.group(2)
+
+ cr = arvados.CollectionReader(collection, api_client=api_client,
num_retries=args.retries)
- cr.normalize()
- except (arvados.errors.ArgumentError,
+ if get_prefix:
+ # Drop any trailing slash, then resolve the subdirectory; only
+ # a (sub)collection can be listed, not a single file.
+ if get_prefix[-1] == '/':
+ get_prefix = get_prefix[:-1]
+ stream_name = '.' + get_prefix
+ reader = cr.find(stream_name)
+ if not (isinstance(reader, arvados.CollectionReader) or
+ isinstance(reader, arvados.collection.Subcollection)):
+ logger.error("'{}' is not a subdirectory".format(get_prefix))
+ return 1
+ else:
+ stream_name = '.'
+ reader = cr
+ except (arvados.errors.ApiError,
+ arvados.errors.ArgumentError,
arvados.errors.NotFoundError) as error:
- print("arv-ls: error fetching collection: {}".format(error),
- file=stderr)
+ logger.error("error fetching collection: {}".format(error))
return 1
formatters = []
formatters.append(size_formatter)
formatters.append(name_formatter)
- for f in cr.all_files():
+ for f in files_in_collection(reader, stream_name):
print(*(info_func(f) for info_func in formatters), file=stdout)
return 0
+
+def files_in_collection(c, stream_name='.'):
+ """Yield a FileInfo for every file under collection c.
+
+ stream_name accumulates the directory path as the generator recurses
+ into subcollections. Files are yielded before subcollections at each
+ level, each group sorted case-insensitively by name.
+ """
+ # Sort first by file type, then alphabetically by file path.
+ for i in sorted(c.keys(),
+ key=lambda k: (
+ isinstance(c[k], arvados.collection.Subcollection),
+ k.upper())):
+ if isinstance(c[i], arvados.arvfile.ArvadosFile):
+ yield FileInfo(stream_name=stream_name,
+ name=i,
+ size=c[i].size())
+ elif isinstance(c[i], arvados.collection.Subcollection):
+ for f in files_in_collection(c[i], "{}/{}".format(stream_name, i)):
+ yield f
with self._state_lock:
self._state['manifest'] = manifest
if self.use_cache:
- self._save_state()
+ try:
+ self._save_state()
+ except Exception as e:
+ self.logger.error("Unexpected error trying to save cache file: {}".format(e))
else:
self.bytes_written = self.bytes_skipped
# Call the reporter, if any
cache_filepath = os.path.join(
arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
cache_filename)
- if self.resume:
+ if self.resume and os.path.exists(cache_filepath):
+ self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'a+')
else:
# --no-resume means start with a empty cache file.
+ self.logger.info("Creating new cache file at {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'w+')
self._cache_filename = self._cache_file.name
self._lock_file(self._cache_file)
# Cache file empty, set up new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
else:
+ self.logger.info("No cache usage requested for this run.")
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
"""
Atomically save current state into cache.
"""
+ with self._state_lock:
+ # We're not using copy.deepcopy() here because it's a lot slower
+ # than json.dumps(), and we're already needing JSON format to be
+ # saved on disk.
+ state = json.dumps(self._state)
try:
- with self._state_lock:
- # We're not using copy.deepcopy() here because it's a lot slower
- # than json.dumps(), and we're already needing JSON format to be
- # saved on disk.
- state = json.dumps(self._state)
- new_cache_fd, new_cache_name = tempfile.mkstemp(
- dir=os.path.dirname(self._cache_filename))
- self._lock_file(new_cache_fd)
- new_cache = os.fdopen(new_cache_fd, 'r+')
+ new_cache = tempfile.NamedTemporaryFile(
+ dir=os.path.dirname(self._cache_filename), delete=False)
+ self._lock_file(new_cache)
new_cache.write(state)
new_cache.flush()
os.fsync(new_cache)
- os.rename(new_cache_name, self._cache_filename)
+ os.rename(new_cache.name, self._cache_filename)
except (IOError, OSError, ResumeCacheConflict) as error:
self.logger.error("There was a problem while saving the cache file: {}".format(error))
try:
import cStringIO
+import collections
import datetime
import hashlib
import logging
except:
ua.close()
- @staticmethod
- def _socket_open(family, socktype, protocol, address=None):
+ def _socket_open(self, *args, **kwargs):
+ # pycurl's open-socket callback signature changed between versions:
+ # newer pycurl passes (purpose, address); older pycurl passes
+ # (family, socktype, protocol[, address]). Dispatch on arity.
+ if len(args) + len(kwargs) == 2:
+ return self._socket_open_pycurl_7_21_5(*args, **kwargs)
+ else:
+ return self._socket_open_pycurl_7_19_3(*args, **kwargs)
+
+ def _socket_open_pycurl_7_19_3(self, family, socktype, protocol, address=None):
+ # Adapt the old-style positional arguments into the new-style
+ # address namedtuple and delegate to the 7.21.5 handler.
+ return self._socket_open_pycurl_7_21_5(
+ purpose=None,
+ address=collections.namedtuple(
+ 'Address', ['family', 'socktype', 'protocol', 'addr'],
+ )(family, socktype, protocol, address))
+
+
+ def _socket_open_pycurl_7_21_5(self, purpose, address):
"""Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
- s = socket.socket(family, socktype, protocol)
+ s = socket.socket(address.family, address.socktype, address.protocol)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Will throw invalid protocol error on mac. This test prevents that.
if hasattr(socket, 'TCP_KEEPIDLE'):
#!/usr/bin/env python
-import argparse
-import hashlib
-import os
-import re
-import string
import sys
-import logging
-import arvados
-import arvados.commands._util as arv_cmd
+from arvados.commands.get import main
-from arvados._version import __version__
-
-logger = logging.getLogger('arvados.arv-get')
-
-def abort(msg, code=1):
- print >>sys.stderr, "arv-get:", msg
- exit(code)
-
-parser = argparse.ArgumentParser(
- description='Copy data from Keep to a local file or pipe.',
- parents=[arv_cmd.retry_opt])
-parser.add_argument('--version', action='version',
- version="%s %s" % (sys.argv[0], __version__),
- help='Print version and exit.')
-parser.add_argument('locator', type=str,
- help="""
-Collection locator, optionally with a file path or prefix.
-""")
-parser.add_argument('destination', type=str, nargs='?', default='-',
- help="""
-Local file or directory where the data is to be written. Default: stdout.
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('--progress', action='store_true',
- help="""
-Display human-readable progress on stderr (bytes and, if possible,
-percentage of total data size). This is the default behavior when it
-is not expected to interfere with the output: specifically, stderr is
-a tty _and_ either stdout is not a tty, or output is being written to
-named files rather than stdout.
-""")
-group.add_argument('--no-progress', action='store_true',
- help="""
-Do not display human-readable progress on stderr.
-""")
-group.add_argument('--batch-progress', action='store_true',
- help="""
-Display machine-readable progress on stderr (bytes and, if known,
-total data size).
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('--hash',
- help="""
-Display the hash of each file as it is read from Keep, using the given
-hash algorithm. Supported algorithms include md5, sha1, sha224,
-sha256, sha384, and sha512.
-""")
-group.add_argument('--md5sum', action='store_const',
- dest='hash', const='md5',
- help="""
-Display the MD5 hash of each file as it is read from Keep.
-""")
-parser.add_argument('-n', action='store_true',
- help="""
-Do not write any data -- just read from Keep, and report md5sums if
-requested.
-""")
-parser.add_argument('-r', action='store_true',
- help="""
-Retrieve all files in the specified collection/prefix. This is the
-default behavior if the "locator" argument ends with a forward slash.
-""")
-group = parser.add_mutually_exclusive_group()
-group.add_argument('-f', action='store_true',
- help="""
-Overwrite existing files while writing. The default behavior is to
-refuse to write *anything* if any of the output files already
-exist. As a special case, -f is not needed to write to stdout.
-""")
-group.add_argument('--skip-existing', action='store_true',
- help="""
-Skip files that already exist. The default behavior is to refuse to
-write *anything* if any files exist that would have to be
-overwritten. This option causes even devices, sockets, and fifos to be
-skipped.
-""")
-
-args = parser.parse_args()
-
-if args.locator[-1] == os.sep:
- args.r = True
-if (args.r and
- not args.n and
- not (args.destination and
- os.path.isdir(args.destination))):
- parser.error('Destination is not a directory.')
-if not args.r and (os.path.isdir(args.destination) or
- args.destination[-1] == os.path.sep):
- args.destination = os.path.join(args.destination,
- os.path.basename(args.locator))
- logger.debug("Appended source file name to destination directory: %s",
- args.destination)
-
-if args.destination == '/dev/stdout':
- args.destination = "-"
-
-if args.destination == '-':
- # Normally you have to use -f to write to a file (or device) that
- # already exists, but "-" and "/dev/stdout" are common enough to
- # merit a special exception.
- args.f = True
-else:
- args.destination = args.destination.rstrip(os.sep)
-
-# Turn on --progress by default if stderr is a tty and output is
-# either going to a named file, or going (via stdout) to something
-# that isn't a tty.
-if (not (args.batch_progress or args.no_progress)
- and sys.stderr.isatty()
- and (args.destination != '-'
- or not sys.stdout.isatty())):
- args.progress = True
-
-
-r = re.search(r'^(.*?)(/.*)?$', args.locator)
-collection = r.group(1)
-get_prefix = r.group(2)
-if args.r and not get_prefix:
- get_prefix = os.sep
-api_client = arvados.api('v1')
-reader = arvados.CollectionReader(collection, num_retries=args.retries)
-
-if not get_prefix:
- if not args.n:
- open_flags = os.O_CREAT | os.O_WRONLY
- if not args.f:
- open_flags |= os.O_EXCL
- try:
- if args.destination == "-":
- sys.stdout.write(reader.manifest_text())
- else:
- out_fd = os.open(args.destination, open_flags)
- with os.fdopen(out_fd, 'wb') as out_file:
- out_file.write(reader.manifest_text())
- except (IOError, OSError) as error:
- abort("can't write to '{}': {}".format(args.destination, error))
- except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
- abort("failed to download '{}': {}".format(collection, error))
- sys.exit(0)
-
-reader.normalize()
-
-# Scan the collection. Make an array of (stream, file, local
-# destination filename) tuples, and add up total size to extract.
-todo = []
-todo_bytes = 0
-try:
- for s in reader.all_streams():
- for f in s.all_files():
- if get_prefix and get_prefix[-1] == os.sep:
- if 0 != string.find(os.path.join(s.name(), f.name()),
- '.' + get_prefix):
- continue
- if args.destination == "-":
- dest_path = "-"
- else:
- dest_path = os.path.join(
- args.destination,
- os.path.join(s.name(), f.name())[len(get_prefix)+1:])
- if (not (args.n or args.f or args.skip_existing) and
- os.path.exists(dest_path)):
- abort('Local file %s already exists.' % (dest_path,))
- else:
- if os.path.join(s.name(), f.name()) != '.' + get_prefix:
- continue
- dest_path = args.destination
- todo += [(s, f, dest_path)]
- todo_bytes += f.size()
-except arvados.errors.NotFoundError as e:
- abort(e)
-
-# Read data, and (if not -n) write to local file(s) or pipe.
-
-out_bytes = 0
-for s,f,outfilename in todo:
- outfile = None
- digestor = None
- if not args.n:
- if outfilename == "-":
- outfile = sys.stdout
- else:
- if args.skip_existing and os.path.exists(outfilename):
- logger.debug('Local file %s exists. Skipping.', outfilename)
- continue
- elif not args.f and (os.path.isfile(outfilename) or
- os.path.isdir(outfilename)):
- # Good thing we looked again: apparently this file wasn't
- # here yet when we checked earlier.
- abort('Local file %s already exists.' % (outfilename,))
- if args.r:
- arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
- try:
- outfile = open(outfilename, 'wb')
- except Exception as error:
- abort('Open(%s) failed: %s' % (outfilename, error))
- if args.hash:
- digestor = hashlib.new(args.hash)
- try:
- for data in f.readall():
- if outfile:
- outfile.write(data)
- if digestor:
- digestor.update(data)
- out_bytes += len(data)
- if args.progress:
- sys.stderr.write('\r%d MiB / %d MiB %.1f%%' %
- (out_bytes >> 20,
- todo_bytes >> 20,
- (100
- if todo_bytes==0
- else 100.0*out_bytes/todo_bytes)))
- elif args.batch_progress:
- sys.stderr.write('%s %d read %d total\n' %
- (sys.argv[0], os.getpid(),
- out_bytes, todo_bytes))
- if digestor:
- sys.stderr.write("%s %s/%s\n"
- % (digestor.hexdigest(), s.name(), f.name()))
- except KeyboardInterrupt:
- if outfile and (outfile.fileno() > 2) and not outfile.closed:
- os.unlink(outfile.name)
- break
-
-if args.progress:
- sys.stderr.write('\n')
+sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))
('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
],
install_requires=[
- 'google-api-python-client==1.4.2',
- 'oauth2client >=1.4.6, <2',
+ 'google-api-python-client==1.6.2, <1.7',
'ciso8601',
- 'httplib2',
- 'pycurl >=7.19.5.1, <7.21.5',
- 'python-gflags<3.0',
+ 'httplib2 >= 0.9.2',
+ 'pycurl >=7.19.5.1',
'setuptools',
'ws4py<0.4',
- 'ruamel.yaml==0.13.7'
+ 'ruamel.yaml>=0.13.7'
],
test_suite='tests',
tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import io
+import shutil
+import tempfile
+
+import arvados
+import arvados.collection as collection
+import arvados.commands.get as arv_get
+import run_test_server
+
+from arvados_testutil import redirected_streams
+
+class ArvadosGetTestCase(run_test_server.TestCaseWithServers):
+ MAIN_SERVER = {}
+ KEEP_SERVER = {}
+
+ def setUp(self):
+ super(ArvadosGetTestCase, self).setUp()
+ self.tempdir = tempfile.mkdtemp()
+ self.col_loc, self.col_pdh, self.col_manifest = self.write_test_collection()
+
+ def tearDown(self):
+ super(ArvadosGetTestCase, self).tearDown()
+ shutil.rmtree(self.tempdir)
+
+ def write_test_collection(self,
+ contents = {
+ 'foo.txt' : 'foo',
+ 'bar.txt' : 'bar',
+ 'subdir/baz.txt' : 'baz',
+ }):
+ c = collection.Collection()
+ for path, data in contents.items():
+ with c.open(path, 'w') as f:
+ f.write(data)
+ c.save_new()
+ return (c.manifest_locator(), c.portable_data_hash(), c.manifest_text())
+
+ def run_get(self, args):
+ self.stdout = io.BytesIO()
+ self.stderr = io.BytesIO()
+ return arv_get.main(args, self.stdout, self.stderr)
+
+ def test_version_argument(self):
+ err = io.BytesIO()
+ out = io.BytesIO()
+ with redirected_streams(stdout=out, stderr=err):
+ with self.assertRaises(SystemExit):
+ self.run_get(['--version'])
+ self.assertEqual(out.getvalue(), '')
+ self.assertRegexpMatches(err.getvalue(), "[0-9]+\.[0-9]+\.[0-9]+")
+
+ def test_get_single_file(self):
+ # Get the file using the collection's locator
+ r = self.run_get(["{}/subdir/baz.txt".format(self.col_loc), '-'])
+ self.assertEqual(0, r)
+ self.assertEqual('baz', self.stdout.getvalue())
+ # Then, try by PDH
+ r = self.run_get(["{}/subdir/baz.txt".format(self.col_pdh), '-'])
+ self.assertEqual(0, r)
+ self.assertEqual('baz', self.stdout.getvalue())
+
+ def test_get_multiple_files(self):
+ # Download the entire collection to the temp directory
+ r = self.run_get(["{}/".format(self.col_loc), self.tempdir])
+ self.assertEqual(0, r)
+ with open("{}/foo.txt".format(self.tempdir), "r") as f:
+ self.assertEqual("foo", f.read())
+ with open("{}/bar.txt".format(self.tempdir), "r") as f:
+ self.assertEqual("bar", f.read())
+ with open("{}/subdir/baz.txt".format(self.tempdir), "r") as f:
+ self.assertEqual("baz", f.read())
+
+ def test_get_collection_manifest(self):
+ # Get the collection manifest
+ r = self.run_get([self.col_loc, self.tempdir])
+ self.assertEqual(0, r)
+ with open("{}/{}".format(self.tempdir, self.col_loc), "r") as f:
+ self.assertEqual(self.col_manifest, f.read())
+
+ def test_invalid_collection(self):
+ # Asking for an invalid collection should generate an error.
+ r = self.run_get(['this-uuid-seems-to-be-fake', self.tempdir])
+ self.assertNotEqual(0, r)
+
+ def test_invalid_file_request(self):
+ # Asking for a nonexistent file within a collection should generate an error.
+ r = self.run_get(["{}/im-not-here.txt".format(self.col_loc), self.tempdir])
+ self.assertNotEqual(0, r)
+
+ def test_invalid_destination(self):
+ # Asking to place the collection's files on a nonexistent directory
+ # should generate an error.
+ r = self.run_get([self.col_loc, "/fake/subdir/"])
+ self.assertNotEqual(0, r)
+
+ def test_preexistent_destination(self):
+ # Asking to place a file with the same path as a local one should
+ # generate an error and avoid overwrites.
+ with open("{}/foo.txt".format(self.tempdir), "w") as f:
+ f.write("another foo")
+ r = self.run_get(["{}/foo.txt".format(self.col_loc), self.tempdir])
+ self.assertNotEqual(0, r)
+ with open("{}/foo.txt".format(self.tempdir), "r") as f:
+ self.assertEqual("another foo", f.read())
+
api_client.collections().get().execute.return_value = coll_info
return coll_info, api_client
- def run_ls(self, args, api_client):
+ def run_ls(self, args, api_client, logger=None):
self.stdout = io.BytesIO()
self.stderr = io.BytesIO()
- return arv_ls.main(args, self.stdout, self.stderr, api_client)
+ return arv_ls.main(args, self.stdout, self.stderr, api_client, logger)
def test_plain_listing(self):
collection, api_client = self.mock_api_for_manifest(
def test_locator_failure(self):
api_client = mock.MagicMock(name='mock_api_client')
+ error_mock = mock.MagicMock()
+ logger = mock.MagicMock()
+ logger.error = error_mock
api_client.collections().get().execute.side_effect = (
arv_error.NotFoundError)
- self.assertNotEqual(0, self.run_ls([self.FAKE_UUID], api_client))
- self.assertNotEqual('', self.stderr.getvalue())
+ self.assertNotEqual(0, self.run_ls([self.FAKE_UUID], api_client, logger))
+ self.assertEqual(1, error_mock.call_count)
def test_version_argument(self):
err = io.BytesIO()
import md5
import mock
-import shutil
+import os
import random
+import shutil
import sys
import tempfile
import threading
import unittest
import arvados.cache
+import arvados
import run_test_server
def tearDown(self):
shutil.rmtree(self._dir)
+ def test_cache_create_error(self):
+ _, filename = tempfile.mkstemp()
+ home_was = os.environ['HOME']
+ os.environ['HOME'] = filename
+ try:
+ c = arvados.http_cache('test')
+ self.assertEqual(None, c)
+ finally:
+ os.environ['HOME'] = home_was
+ os.unlink(filename)
+
def test_cache_crud(self):
c = arvados.cache.SafeHTTPCache(self._dir, max_age=0)
url = 'https://example.com/foo?bar=baz'
s.add_dependency('google-api-client', '>= 0.7', '< 0.8.9')
# work around undeclared dependency on i18n in some activesupport 3.x.x:
s.add_dependency('i18n', '~> 0')
- s.add_dependency('json', '~> 1.7', '>= 1.7.7')
+ s.add_dependency('json', '>= 1.7.7', '<3')
s.add_runtime_dependency('jwt', '<2', '>= 0.1.5')
s.homepage =
'https://arvados.org'
attr_writer :resource_attrs
- MAX_UNIQUE_NAME_ATTEMPTS = 10
-
begin
rescue_from(Exception,
ArvadosModel::PermissionDeniedError,
def create
@object = model_class.new resource_attrs
- if @object.respond_to? :name and params[:ensure_unique_name]
- # Record the original name. See below.
- name_stem = @object.name
- retries = MAX_UNIQUE_NAME_ATTEMPTS
+ if @object.respond_to?(:name) && params[:ensure_unique_name]
+ @object.save_with_unique_name!
else
- retries = 0
- end
-
- begin
@object.save!
- rescue ActiveRecord::RecordNotUnique => rn
- raise unless retries > 0
- retries -= 1
-
- # Dig into the error to determine if it is specifically calling out a
- # (owner_uuid, name) uniqueness violation. In this specific case, and
- # the client requested a unique name with ensure_unique_name==true,
- # update the name field and try to save again. Loop as necessary to
- # discover a unique name. It is necessary to handle name choosing at
- # this level (as opposed to the client) to ensure that record creation
- # never fails due to a race condition.
- raise unless rn.original_exception.is_a? PG::UniqueViolation
-
- # Unfortunately ActiveRecord doesn't abstract out any of the
- # necessary information to figure out if this the error is actually
- # the specific case where we want to apply the ensure_unique_name
- # behavior, so the following code is specialized to Postgres.
- err = rn.original_exception
- detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
- raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
-
- @object.uuid = nil
-
- new_name = "#{name_stem} (#{db_current_time.utc.iso8601(3)})"
- if new_name == @object.name
- # If the database is fast enough to do two attempts in the
- # same millisecond, we need to wait to ensure we try a
- # different timestamp on each attempt.
- sleep 0.002
- new_name = "#{name_stem} (#{db_current_time.utc.iso8601(3)})"
- end
- @object.name = new_name
- retry
end
+
show
end
@objects = model_class.where('last_ping_at >= ?', db_current_time - 1.hours)
end
super
- job_uuids = @objects.map { |n| n[:job_uuid] }.compact
- assoc_jobs = readable_job_uuids(job_uuids)
- @objects.each do |node|
- node.job_readable = assoc_jobs.include?(node[:job_uuid])
+ if @select.nil? or @select.include? 'job_uuid'
+ job_uuids = @objects.map { |n| n[:job_uuid] }.compact
+ assoc_jobs = readable_job_uuids(job_uuids)
+ @objects.each do |node|
+ node.job_readable = assoc_jobs.include?(node[:job_uuid])
+ end
end
end
permission_link_classes: ['permission', 'resources'])
end
+ def save_with_unique_name!
+ uuid_was = uuid
+ name_was = name
+ max_retries = 2
+ transaction do
+ conn = ActiveRecord::Base.connection
+ conn.exec_query 'SAVEPOINT save_with_unique_name'
+ begin
+ save!
+ rescue ActiveRecord::RecordNotUnique => rn
+ raise if max_retries == 0
+ max_retries -= 1
+
+ conn.exec_query 'ROLLBACK TO SAVEPOINT save_with_unique_name'
+
+ # Dig into the error to determine if it is specifically calling out a
+ # (owner_uuid, name) uniqueness violation. In this specific case, and
+ # the client requested a unique name with ensure_unique_name==true,
+ # update the name field and try to save again. Loop as necessary to
+ # discover a unique name. It is necessary to handle name choosing at
+ # this level (as opposed to the client) to ensure that record creation
+ # never fails due to a race condition.
+ err = rn.original_exception
+ raise unless err.is_a?(PG::UniqueViolation)
+
+ # Unfortunately ActiveRecord doesn't abstract out any of the
+ # necessary information to figure out if this error is actually
+ # the specific case where we want to apply the ensure_unique_name
+ # behavior, so the following code is specialized to Postgres.
+ detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
+ raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
+
+ new_name = "#{name_was} (#{db_current_time.utc.iso8601(3)})"
+ if new_name == name
+ # If the database is fast enough to do two attempts in the
+ # same millisecond, we need to wait to ensure we try a
+ # different timestamp on each attempt.
+ sleep 0.002
+ new_name = "#{name_was} (#{db_current_time.utc.iso8601(3)})"
+ end
+
+ self[:name] = new_name
+ self[:uuid] = nil if uuid_was.nil? && !uuid.nil?
+ conn.exec_query 'SAVEPOINT save_with_unique_name'
+ retry
+ ensure
+ conn.exec_query 'RELEASE SAVEPOINT save_with_unique_name'
+ end
+ end
+ end
+
def logged_attributes
attributes.except(*Rails.configuration.unlogged_attributes)
end
raise PermissionDeniedError
end
- # Verify "write" permission on old owner
- # default fail unless one of:
- # owner_uuid did not change
- # previous owner_uuid is nil
- # current user is the old owner
- # current user is this object
- # current user can_write old owner
- unless !owner_uuid_changed? or
- owner_uuid_was.nil? or
- current_user.uuid == self.owner_uuid_was or
- current_user.uuid == self.uuid or
- current_user.can? write: self.owner_uuid_was
- logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{uuid} but does not have permission to write old owner_uuid #{owner_uuid_was}"
- errors.add :owner_uuid, "cannot be changed without write permission on old owner"
- raise PermissionDeniedError
- end
-
- # Verify "write" permission on new owner
- # default fail unless one of:
- # current_user is this object
- # current user can_write new owner, or this object if owner unchanged
- if new_record? or owner_uuid_changed? or is_a?(ApiClientAuthorization)
- write_target = owner_uuid
+ if new_record? || owner_uuid_changed?
+ # Permission on owner_uuid_was is needed to move an existing
+ # object away from its previous owner (which implies permission
+ # to modify this object itself, so we don't need to check that
+ # separately). Permission on the new owner_uuid is also needed.
+ [['old', owner_uuid_was],
+ ['new', owner_uuid]
+ ].each do |which, check_uuid|
+ if check_uuid.nil?
+ # old_owner_uuid is nil? New record, no need to check.
+ elsif !current_user.can?(write: check_uuid)
+ logger.warn "User #{current_user.uuid} tried to set ownership of #{self.class.to_s} #{self.uuid} but does not have permission to write #{which} owner_uuid #{check_uuid}"
+ errors.add :owner_uuid, "cannot be set or changed without write permission on #{which} owner"
+ raise PermissionDeniedError
+ end
+ end
else
- write_target = uuid
- end
- unless current_user == self or current_user.can? write: write_target
- logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{uuid} but does not have permission to write new owner_uuid #{owner_uuid}"
- errors.add :owner_uuid, "cannot be changed without write permission on new owner"
- raise PermissionDeniedError
+ # If the object already existed and we're not changing
+ # owner_uuid, we only need write permission on the object
+ # itself.
+ if !current_user.can?(write: self.uuid)
+ logger.warn "User #{current_user.uuid} tried to modify #{self.class.to_s} #{self.uuid} without write permission"
+ errors.add :uuid, "is not writable"
+ raise PermissionDeniedError
+ end
end
true
true
end
- # If trash_at is updated without touching delete_at, automatically
- # update delete_at to a sensible value.
def default_trash_interval
if trash_at_changed? && !delete_at_changed?
+ # If trash_at is updated without touching delete_at,
+ # automatically update delete_at to a sensible value.
if trash_at.nil?
self.delete_at = nil
else
self.delete_at = trash_at + Rails.configuration.default_trash_lifetime.seconds
end
+ elsif !trash_at || !delete_at || trash_at > delete_at
+ # Not trash, or bogus arguments? Just validate in
+ # validate_trash_and_delete_timing.
+ elsif delete_at_changed? && delete_at >= trash_at
+ # Fix delete_at if needed, so it's not earlier than the expiry
+ # time on any permission tokens that might have been given out.
+
+ # In any case there are no signatures expiring after now+TTL.
+ # Also, if the existing trash_at time has already passed, we
+ # know we haven't given out any signatures since then.
+ earliest_delete = [
+ @validation_timestamp,
+ trash_at_was,
+ ].compact.min + Rails.configuration.blob_signature_ttl.seconds
+
+ # The previous value of delete_at is also an upper bound on the
+ # longest-lived permission token. For example, if TTL=14,
+ # trash_at_was=now-7, delete_at_was=now+7, then it is safe to
+ # set trash_at=now+6, delete_at=now+8.
+ earliest_delete = [earliest_delete, delete_at_was].compact.min
+
+ # If delete_at is too soon, use the earliest possible time.
+ if delete_at < earliest_delete
+ self.delete_at = earliest_delete
+ end
end
end
def validate_trash_and_delete_timing
if trash_at.nil? != delete_at.nil?
errors.add :delete_at, "must be set if trash_at is set, and must be nil otherwise"
- end
-
- earliest_delete = ([@validation_timestamp, trash_at_was].compact.min +
- Rails.configuration.blob_signature_ttl.seconds)
- if delete_at && delete_at < earliest_delete
- errors.add :delete_at, "#{delete_at} is too soon: earliest allowed is #{earliest_delete}"
- end
-
- if delete_at && delete_at < trash_at
+ elsif delete_at && delete_at < trash_at
errors.add :delete_at, "must not be earlier than trash_at"
end
-
true
end
end
end
end
+ # Create a new container (or find an existing one) to satisfy the
+ # given container request.
+ def self.resolve(req)
+ c_attrs = {
+ command: req.command,
+ cwd: req.cwd,
+ environment: req.environment,
+ output_path: req.output_path,
+ container_image: resolve_container_image(req.container_image),
+ mounts: resolve_mounts(req.mounts),
+ runtime_constraints: resolve_runtime_constraints(req.runtime_constraints),
+ scheduling_parameters: req.scheduling_parameters,
+ }
+ act_as_system_user do
+ if req.use_existing && (reusable = find_reusable(c_attrs))
+ reusable
+ else
+ Container.create!(c_attrs)
+ end
+ end
+ end
+
+ # Return a runtime_constraints hash that complies with requested but
+ # is suitable for saving in a container record, i.e., has specific
+ # values instead of ranges.
+ #
+ # Doing this as a step separate from other resolutions, like "git
+ # revision range to commit hash", makes sense only when there is no
+ # opportunity to reuse an existing container (e.g., container reuse
+ # is not implemented yet, or we have already found that no existing
+ # containers are suitable).
+ def self.resolve_runtime_constraints(runtime_constraints)
+ rc = {}
+ defaults = {
+ 'keep_cache_ram' =>
+ Rails.configuration.container_default_keep_cache_ram,
+ }
+ defaults.merge(runtime_constraints).each do |k, v|
+ if v.is_a? Array
+ rc[k] = v[0]
+ else
+ rc[k] = v
+ end
+ end
+ rc
+ end
+
+ # Return a mounts hash suitable for a Container, i.e., with every
+ # readonly collection UUID resolved to a PDH.
+ def self.resolve_mounts(mounts)
+ c_mounts = {}
+ mounts.each do |k, mount|
+ mount = mount.dup
+ c_mounts[k] = mount
+ if mount['kind'] != 'collection'
+ next
+ end
+ if (uuid = mount.delete 'uuid')
+ c = Collection.
+ readable_by(current_user).
+ where(uuid: uuid).
+ select(:portable_data_hash).
+ first
+ if !c
+ raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
+ end
+ if mount['portable_data_hash'].nil?
+ # PDH not supplied by client
+ mount['portable_data_hash'] = c.portable_data_hash
+ elsif mount['portable_data_hash'] != c.portable_data_hash
+ # UUID and PDH supplied by client, but they don't agree
+ raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
+ end
+ end
+ end
+ return c_mounts
+ end
+
+ # Return a container_image PDH suitable for a Container.
+ def self.resolve_container_image(container_image)
+ coll = Collection.for_latest_docker_image(container_image)
+ if !coll
+ raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
+ end
+ coll.portable_data_hash
+ end
+
def self.find_reusable(attrs)
candidates = Container.
where_serialized(:command, attrs[:command]).
where('cwd = ?', attrs[:cwd]).
where_serialized(:environment, attrs[:environment]).
where('output_path = ?', attrs[:output_path]).
- where('container_image = ?', attrs[:container_image]).
- where_serialized(:mounts, attrs[:mounts]).
- where_serialized(:runtime_constraints, attrs[:runtime_constraints])
+ where('container_image = ?', resolve_container_image(attrs[:container_image])).
+ where_serialized(:mounts, resolve_mounts(attrs[:mounts])).
+ where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]))
# Check for Completed candidates whose output and log are both readable.
select_readable_pdh = Collection.
before_validation :validate_scheduling_parameters
before_validation :set_container
validates :command, :container_image, :output_path, :cwd, :presence => true
+ validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
validate :validate_state_change
- validate :validate_change
+ validate :check_update_whitelist
after_save :update_priority
after_save :finalize_if_needed
before_create :set_requesting_container_uuid
t.add :output_name
t.add :output_path
t.add :output_uuid
+ t.add :output_ttl
t.add :priority
t.add :properties
t.add :requesting_container_uuid
Committed => [Final]
}
+ AttrsPermittedAlways = [:owner_uuid, :state, :name, :description]
+ AttrsPermittedBeforeCommit = [:command, :container_count_max,
+ :container_image, :cwd, :environment, :filters, :mounts,
+ :output_path, :priority, :properties, :requesting_container_uuid,
+ :runtime_constraints, :state, :container_uuid, :use_existing,
+ :scheduling_parameters, :output_name, :output_ttl]
+
def state_transitions
State_transitions
end
['output', 'log'].each do |out_type|
pdh = c.send(out_type)
next if pdh.nil?
- if self.output_name and out_type == 'output'
- coll_name = self.output_name
- else
- coll_name = "Container #{out_type} for request #{uuid}"
+ coll_name = "Container #{out_type} for request #{uuid}"
+ trash_at = nil
+ if out_type == 'output'
+ if self.output_name
+ coll_name = self.output_name
+ end
+ if self.output_ttl > 0
+ trash_at = db_current_time + self.output_ttl
+ end
end
manifest = Collection.unscoped do
Collection.where(portable_data_hash: pdh).first.manifest_text
end
- begin
- coll = Collection.create!(owner_uuid: owner_uuid,
- manifest_text: manifest,
- portable_data_hash: pdh,
- name: coll_name,
- properties: {
- 'type' => out_type,
- 'container_request' => uuid,
- })
- rescue ActiveRecord::RecordNotUnique => rn
- # In case this is executed as part of a transaction: When a Postgres exception happens,
- # the following statements on the same transaction become invalid, so a rollback is
- # needed. One example are Unit Tests, every test is enclosed inside a transaction so
- # that the database can be reverted before every new test starts.
- # See: http://api.rubyonrails.org/classes/ActiveRecord/Transactions/ClassMethods.html#module-ActiveRecord::Transactions::ClassMethods-label-Exception+handling+and+rolling+back
- ActiveRecord::Base.connection.execute 'ROLLBACK'
- raise unless out_type == 'output' and self.output_name
- # Postgres specific unique name check. See ApplicationController#create for
- # a detailed explanation.
- raise unless rn.original_exception.is_a? PG::UniqueViolation
- err = rn.original_exception
- detail = err.result.error_field(PG::Result::PG_DIAG_MESSAGE_DETAIL)
- raise unless /^Key \(owner_uuid, name\)=\([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}, .*?\) already exists\./.match detail
- # Output collection name collision detected: append a timestamp.
- coll_name = "#{self.output_name} #{Time.now.getgm.strftime('%FT%TZ')}"
- retry
- end
+
+ coll = Collection.new(owner_uuid: owner_uuid,
+ manifest_text: manifest,
+ portable_data_hash: pdh,
+ name: coll_name,
+ trash_at: trash_at,
+ delete_at: trash_at,
+ properties: {
+ 'type' => out_type,
+ 'container_request' => uuid,
+ })
+ coll.save_with_unique_name!
if out_type == 'output'
out_coll = coll.uuid
else
self.cwd ||= "."
self.container_count_max ||= Rails.configuration.container_count_max
self.scheduling_parameters ||= {}
- end
-
- # Create a new container (or find an existing one) to satisfy this
- # request.
- def resolve
- c_mounts = mounts_for_container
- c_runtime_constraints = runtime_constraints_for_container
- c_container_image = container_image_for_container
- c = act_as_system_user do
- c_attrs = {command: self.command,
- cwd: self.cwd,
- environment: self.environment,
- output_path: self.output_path,
- container_image: c_container_image,
- mounts: c_mounts,
- runtime_constraints: c_runtime_constraints}
-
- reusable = self.use_existing ? Container.find_reusable(c_attrs) : nil
- if not reusable.nil?
- reusable
- else
- c_attrs[:scheduling_parameters] = self.scheduling_parameters
- Container.create!(c_attrs)
- end
- end
- self.container_uuid = c.uuid
- end
-
- # Return a runtime_constraints hash that complies with
- # self.runtime_constraints but is suitable for saving in a container
- # record, i.e., has specific values instead of ranges.
- #
- # Doing this as a step separate from other resolutions, like "git
- # revision range to commit hash", makes sense only when there is no
- # opportunity to reuse an existing container (e.g., container reuse
- # is not implemented yet, or we have already found that no existing
- # containers are suitable).
- def runtime_constraints_for_container
- rc = {}
- runtime_constraints.each do |k, v|
- if v.is_a? Array
- rc[k] = v[0]
- else
- rc[k] = v
- end
- end
- rc
- end
-
- # Return a mounts hash suitable for a Container, i.e., with every
- # readonly collection UUID resolved to a PDH.
- def mounts_for_container
- c_mounts = {}
- mounts.each do |k, mount|
- mount = mount.dup
- c_mounts[k] = mount
- if mount['kind'] != 'collection'
- next
- end
- if (uuid = mount.delete 'uuid')
- c = Collection.
- readable_by(current_user).
- where(uuid: uuid).
- select(:portable_data_hash).
- first
- if !c
- raise ArvadosModel::UnresolvableContainerError.new "cannot mount collection #{uuid.inspect}: not found"
- end
- if mount['portable_data_hash'].nil?
- # PDH not supplied by client
- mount['portable_data_hash'] = c.portable_data_hash
- elsif mount['portable_data_hash'] != c.portable_data_hash
- # UUID and PDH supplied by client, but they don't agree
- raise ArgumentError.new "cannot mount collection #{uuid.inspect}: current portable_data_hash #{c.portable_data_hash.inspect} does not match #{c['portable_data_hash'].inspect} in request"
- end
- end
- end
- return c_mounts
- end
-
- # Return a container_image PDH suitable for a Container.
- def container_image_for_container
- coll = Collection.for_latest_docker_image(container_image)
- if !coll
- raise ArvadosModel::UnresolvableContainerError.new "docker image #{container_image.inspect} not found"
- end
- coll.portable_data_hash
+ self.output_ttl ||= 0
end
def set_container
return false
end
if state_changed? and state == Committed and container_uuid.nil?
- resolve
+ self.container_uuid = Container.resolve(self).uuid
end
if self.container_uuid != self.container_uuid_was
if self.container_count_changed?
def validate_runtime_constraints
case self.state
when Committed
- ['vcpus', 'ram'].each do |k|
- if not (runtime_constraints.include? k and
- runtime_constraints[k].is_a? Integer and
- runtime_constraints[k] > 0)
- errors.add :runtime_constraints, "#{k} must be a positive integer"
+ [['vcpus', true],
+ ['ram', true],
+ ['keep_cache_ram', false]].each do |k, required|
+ if !required && !runtime_constraints.include?(k)
+ next
+ end
+ v = runtime_constraints[k]
+ unless (v.is_a?(Integer) && v > 0)
+ errors.add(:runtime_constraints,
+ "[#{k}]=#{v.inspect} must be a positive integer")
end
- end
-
- if runtime_constraints.include? 'keep_cache_ram' and
- (!runtime_constraints['keep_cache_ram'].is_a?(Integer) or
- runtime_constraints['keep_cache_ram'] <= 0)
- errors.add :runtime_constraints, "keep_cache_ram must be a positive integer"
- elsif !runtime_constraints.include? 'keep_cache_ram'
- runtime_constraints['keep_cache_ram'] = Rails.configuration.container_default_keep_cache_ram
end
end
end
end
end
- def validate_change
- permitted = [:owner_uuid]
+ def check_update_whitelist
+ permitted = AttrsPermittedAlways.dup
- case self.state
- when Uncommitted
- # Permit updating most fields
- permitted.push :command, :container_count_max,
- :container_image, :cwd, :description, :environment,
- :filters, :mounts, :name, :output_path, :priority,
- :properties, :requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid, :use_existing, :scheduling_parameters,
- :output_name
+ if self.new_record? || self.state_was == Uncommitted
+ # Allow create-and-commit in a single operation.
+ permitted.push *AttrsPermittedBeforeCommit
+ end
+ case self.state
when Committed
- if container_uuid.nil?
- errors.add :container_uuid, "has not been resolved to a container."
- end
+ permitted.push :priority, :container_count_max, :container_uuid
- if priority.nil?
- errors.add :priority, "cannot be nil"
+ if self.container_uuid.nil?
+ self.errors.add :container_uuid, "has not been resolved to a container."
end
- # Can update priority, container count, name and description
- permitted.push :priority, :container_count, :container_count_max, :container_uuid,
- :name, :description
+ if self.priority.nil?
+ self.errors.add :priority, "cannot be nil"
+ end
- if self.state_changed?
- # Allow create-and-commit in a single operation.
- permitted.push :command, :container_image, :cwd, :description, :environment,
- :filters, :mounts, :name, :output_path, :properties,
- :requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid, :use_existing, :scheduling_parameters,
- :output_name
+ # Allow container count to increment by 1
+ if (self.container_uuid &&
+ self.container_uuid != self.container_uuid_was &&
+ self.container_count == 1 + (self.container_count_was || 0))
+ permitted.push :container_count
end
when Final
- if not current_user.andand.is_admin and not (self.name_changed? || self.description_changed?)
- errors.add :state, "of container request can only be set to Final by system."
+ if self.state_changed? and not current_user.andand.is_admin
+ self.errors.add :state, "of container request can only be set to Final by system."
end
- if self.state_changed? || self.name_changed? || self.description_changed? || self.output_uuid_changed? || self.log_uuid_changed?
- permitted.push :state, :name, :description, :output_uuid, :log_uuid
- else
- errors.add :state, "does not allow updates"
+ if self.state_was == Committed
+ permitted.push :output_uuid, :log_uuid
end
- else
- errors.add :state, "invalid value"
end
- check_update_whitelist permitted
+ super(permitted)
end
def update_priority
+require 'tempfile'
+
class Node < ArvadosModel
include HasUuid
include KindAndEtag
}
if Rails.configuration.dns_server_conf_dir and Rails.configuration.dns_server_conf_template
+ tmpfile = nil
begin
begin
template = IO.read(Rails.configuration.dns_server_conf_template)
- rescue => e
+ rescue IOError, SystemCallError => e
logger.error "Reading #{Rails.configuration.dns_server_conf_template}: #{e.message}"
raise
end
hostfile = File.join Rails.configuration.dns_server_conf_dir, "#{hostname}.conf"
- File.open hostfile+'.tmp', 'w' do |f|
+ Tempfile.open(["#{hostname}-", ".conf.tmp"],
+ Rails.configuration.dns_server_conf_dir) do |f|
+ tmpfile = f.path
f.puts template % template_vars
end
- File.rename hostfile+'.tmp', hostfile
- rescue => e
+ File.rename tmpfile, hostfile
+ rescue IOError, SystemCallError => e
logger.error "Writing #{hostfile}: #{e.message}"
ok = false
+ ensure
+ if tmpfile and File.file? tmpfile
+        # Clean up the remaining temporary file.
+ File.unlink tmpfile
+ end
end
end
# Typically, this is used to trigger a dns server restart
f.puts Rails.configuration.dns_server_reload_command
end
- rescue => e
+ rescue IOError, SystemCallError => e
logger.error "Unable to write #{restartfile}: #{e.message}"
ok = false
end
--- /dev/null
+class AddOutputTtlToContainerRequests < ActiveRecord::Migration
+ def change
+ add_column :container_requests, :output_ttl, :integer, default: 0, null: false
+ end
+end
scheduling_parameters text,
output_uuid character varying(255),
log_uuid character varying(255),
- output_name character varying(255) DEFAULT NULL::character varying
+ output_name character varying(255) DEFAULT NULL::character varying,
+ output_ttl integer DEFAULT 0 NOT NULL
);
INSERT INTO schema_migrations (version) VALUES ('20170301225558');
-INSERT INTO schema_migrations (version) VALUES ('20170328215436');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20170328215436');
+
+INSERT INTO schema_migrations (version) VALUES ('20170330012505');
\ No newline at end of file
authorize_with :inactive
get :index
assert_response :success
- node_items = JSON.parse(@response.body)['items']
- assert_equal 0, node_items.size
+ assert_equal 0, json_response['items'].size
+ assert_equal 0, json_response['items_available']
end
# active user sees non-secret attributes of up and recently-up nodes
authorize_with :active
get :index
assert_response :success
- node_items = JSON.parse(@response.body)['items']
- assert_not_equal 0, node_items.size
+ assert_operator 0, :<, json_response['items_available']
+ node_items = json_response['items']
+ assert_operator 0, :<, node_items.size
found_busy_node = false
node_items.each do |node|
assert_nil node['info'].andand['ping_secret']
authorize_with user
get :index, {select: ['domain']}
assert_response :success
+ assert_operator 0, :<, json_response['items_available']
end
end
include UsersTestHelper
setup do
- @all_links_at_start = Link.all
+ @initial_link_count = Link.count
@vm_uuid = virtual_machines(:testvm).uuid
end
assert_nil created['identity_url'], 'expected no identity_url'
# arvados#user, repo link and link add user to 'All users' group
- verify_num_links @all_links_at_start, 4
+ verify_links_added 4
verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
created['uuid'], created['email'], 'arvados#user', false, 'User'
assert_equal response_object['email'], 'foo@example.com', 'expected given email'
# four extra links; system_group, login, group and repo perms
- verify_num_links @all_links_at_start, 4
+ verify_links_added 4
end
test "setup user with fake vm and expect error" do
assert_equal response_object['email'], 'foo@example.com', 'expected given email'
# five extra links; system_group, login, group, vm, repo
- verify_num_links @all_links_at_start, 5
+ verify_links_added 5
end
test "setup user with valid email, no vm and no repo as input" do
assert_equal response_object['email'], 'foo@example.com', 'expected given email'
# three extra links; system_group, login, and group
- verify_num_links @all_links_at_start, 3
+ verify_links_added 3
verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
response_object['uuid'], response_object['email'], 'arvados#user', false, 'User'
'expecting first name'
# five extra links; system_group, login, group, repo and vm
- verify_num_links @all_links_at_start, 5
+ verify_links_added 5
end
test "setup user with an existing user email and check different object is created" do
'expected different uuid after create operation'
assert_equal inactive_user['email'], response_object['email'], 'expected given email'
# system_group, openid, group, and repo. No vm link.
- verify_num_links @all_links_at_start, 4
+ verify_links_added 4
end
test "setup user with openid prefix" do
# verify links
# four new links: system_group, arvados#user, repo, and 'All users' group.
- verify_num_links @all_links_at_start, 4
+ verify_links_added 4
verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
created['uuid'], created['email'], 'arvados#user', false, 'User'
# five new links: system_group, arvados#user, repo, vm and 'All
# users' group link
- verify_num_links @all_links_at_start, 5
+ verify_links_added 5
verify_link response_items, 'arvados#user', true, 'permission', 'can_login',
created['uuid'], created['email'], 'arvados#user', false, 'User'
"admin's filtered index did not return inactive user")
end
- def verify_num_links (original_links, expected_additional_links)
- assert_equal expected_additional_links, Link.all.size-original_links.size,
- "Expected #{expected_additional_links.inspect} more links"
+ def verify_links_added more
+ assert_equal @initial_link_count+more, Link.count,
+ "Started with #{@initial_link_count} links, expected #{more} more"
end
def find_obj_in_resp (response_items, object_type, head_kind=nil)
end
end
+ now = Time.now
[['trash-to-delete interval negative',
:collection_owned_by_active,
- {trash_at: Time.now+2.weeks, delete_at: Time.now},
+ {trash_at: now+2.weeks, delete_at: now},
{state: :invalid}],
- ['trash-to-delete interval too short',
+ ['now-to-delete interval short',
:collection_owned_by_active,
- {trash_at: Time.now+3.days, delete_at: Time.now+7.days},
- {state: :invalid}],
+ {trash_at: now+3.days, delete_at: now+7.days},
+ {state: :trash_future}],
+ ['now-to-delete interval short, trash=delete',
+ :collection_owned_by_active,
+ {trash_at: now+3.days, delete_at: now+3.days},
+ {state: :trash_future}],
['trash-to-delete interval ok',
:collection_owned_by_active,
- {trash_at: Time.now, delete_at: Time.now+15.days},
+ {trash_at: now, delete_at: now+15.days},
{state: :trash_now}],
['trash-to-delete interval short, but far enough in future',
:collection_owned_by_active,
- {trash_at: Time.now+13.days, delete_at: Time.now+15.days},
+ {trash_at: now+13.days, delete_at: now+15.days},
{state: :trash_future}],
['trash by setting is_trashed bool',
:collection_owned_by_active,
{state: :trash_now}],
['trash in future by setting just trash_at',
:collection_owned_by_active,
- {trash_at: Time.now+1.week},
+ {trash_at: now+1.week},
{state: :trash_future}],
['trash in future by setting trash_at and delete_at',
:collection_owned_by_active,
- {trash_at: Time.now+1.week, delete_at: Time.now+4.weeks},
+ {trash_at: now+1.week, delete_at: now+4.weeks},
{state: :trash_future}],
['untrash by clearing is_trashed bool',
:expired_collection,
end
updates_ok = c.update_attributes(updates)
expect_valid = expect[:state] != :invalid
- assert_equal updates_ok, expect_valid, c.errors.full_messages.to_s
+ assert_equal expect_valid, updates_ok, c.errors.full_messages.to_s
case expect[:state]
when :invalid
refute c.valid?
class ContainerRequestTest < ActiveSupport::TestCase
include DockerMigrationHelper
+ include DbCurrentTime
def create_minimal_req! attrs={}
defaults = {
cr.reload
+ assert_equal({"vcpus" => 2, "ram" => 30}, cr.runtime_constraints)
+
assert_not_nil cr.container_uuid
c = Container.find_by_uuid cr.container_uuid
assert_not_nil c
lambda { |resolved| resolved["ram"] == 1234234234 }],
].each do |rc, okfunc|
test "resolve runtime constraint range #{rc} to values" do
- cr = ContainerRequest.new(runtime_constraints: rc)
- resolved = cr.send :runtime_constraints_for_container
+ resolved = Container.resolve_runtime_constraints(rc)
assert(okfunc.call(resolved),
"container runtime_constraints was #{resolved.inspect}")
end
].each do |mounts, okfunc|
test "resolve mounts #{mounts.inspect} to values" do
set_user_from_auth :active
- cr = ContainerRequest.new(mounts: mounts)
- resolved = cr.send :mounts_for_container
+ resolved = Container.resolve_mounts(mounts)
assert(okfunc.call(resolved),
- "mounts_for_container returned #{resolved.inspect}")
+ "Container.resolve_mounts returned #{resolved.inspect}")
end
end
"path" => "/foo",
},
}
- cr = ContainerRequest.new(mounts: m)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send :mounts_for_container
+ Container.resolve_mounts(m)
end
end
"path" => "/foo",
},
}
- cr = ContainerRequest.new(mounts: m)
assert_raises(ArgumentError) do
- cr.send :mounts_for_container
+ Container.resolve_mounts(m)
end
end
'arvados/apitestfixture',
'd8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678',
].each do |tag|
- test "container_image_for_container(#{tag.inspect})" do
+ test "Container.resolve_container_image(#{tag.inspect})" do
set_user_from_auth :active
- cr = ContainerRequest.new(container_image: tag)
- resolved = cr.send :container_image_for_container
+ resolved = Container.resolve_container_image(tag)
assert_equal resolved, collections(:docker_image).portable_data_hash
end
end
- test "container_image_for_container(pdh)" do
+ test "Container.resolve_container_image(pdh)" do
set_user_from_auth :active
[[:docker_image, 'v1'], [:docker_image_1_12, 'v2']].each do |coll, ver|
Rails.configuration.docker_image_formats = [ver]
pdh = collections(coll).portable_data_hash
- cr = ContainerRequest.new(container_image: pdh)
- resolved = cr.send :container_image_for_container
+ resolved = Container.resolve_container_image(pdh)
assert_equal resolved, pdh
end
end
].each do |img|
test "container_image_for_container(#{img.inspect}) => 422" do
set_user_from_auth :active
- cr = ContainerRequest.new(container_image: img)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send :container_image_for_container
+ Container.resolve_container_image(img)
end
end
end
set_user_from_auth :active
cr = create_minimal_req!(command: ["true", "1"],
container_image: collections(:docker_image).portable_data_hash)
- assert_equal(cr.send(:container_image_for_container),
+ assert_equal(Container.resolve_container_image(cr.container_image),
collections(:docker_image_1_12).portable_data_hash)
cr = create_minimal_req!(command: ["true", "2"],
container_image: links(:docker_image_collection_tag).name)
- assert_equal(cr.send(:container_image_for_container),
+ assert_equal(Container.resolve_container_image(cr.container_image),
collections(:docker_image_1_12).portable_data_hash)
end
set_user_from_auth :active
cr = create_minimal_req!(command: ["true", "1"],
container_image: collections(:docker_image).portable_data_hash)
- assert_equal(cr.send(:container_image_for_container),
+ assert_equal(Container.resolve_container_image(cr.container_image),
collections(:docker_image).portable_data_hash)
cr = create_minimal_req!(command: ["true", "2"],
container_image: links(:docker_image_collection_tag).name)
- assert_equal(cr.send(:container_image_for_container),
+ assert_equal(Container.resolve_container_image(cr.container_image),
collections(:docker_image).portable_data_hash)
end
cr = create_minimal_req!(command: ["true", "1"],
container_image: collections(:docker_image_1_12).portable_data_hash)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send(:container_image_for_container)
+ Container.resolve_container_image(cr.container_image)
end
end
cr = create_minimal_req!(command: ["true", "1"],
container_image: collections(:docker_image).portable_data_hash)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send(:container_image_for_container)
+ Container.resolve_container_image(cr.container_image)
end
cr = create_minimal_req!(command: ["true", "2"],
container_image: links(:docker_image_collection_tag).name)
assert_raises(ArvadosModel::UnresolvableContainerError) do
- cr.send(:container_image_for_container)
+ Container.resolve_container_image(cr.container_image)
end
end
command: ["echo", "hello"],
output_path: "test",
runtime_constraints: {"vcpus" => 4,
- "ram" => 12000000000,
- "keep_cache_ram" => 268435456},
+ "ram" => 12000000000},
mounts: {"test" => {"kind" => "json"}}}
set_user_from_auth :active
cr1 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Committed,
test "Output collection name setting using output_name with name collision resolution" do
set_user_from_auth :active
- output_name = collections(:foo_file).name
+ output_name = 'unimaginative name'
+ Collection.create!(name: output_name)
cr = create_minimal_req!(priority: 1,
state: ContainerRequest::Committed,
output_name: output_name)
- act_as_system_user do
- c = Container.find_by_uuid(cr.container_uuid)
- c.update_attributes!(state: Container::Locked)
- c.update_attributes!(state: Container::Running)
- c.update_attributes!(state: Container::Complete,
- exit_code: 0,
- output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
- log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
- end
- cr.save
+ run_container(cr)
+ cr.reload
assert_equal ContainerRequest::Final, cr.state
output_coll = Collection.find_by_uuid(cr.output_uuid)
# Make sure the resulting output collection name include the original name
# plus the date
assert_not_equal output_name, output_coll.name,
- "It shouldn't exist more than one collection with the same owner and name '${output_name}'"
+ "more than one collection with the same owner and name"
assert output_coll.name.include?(output_name),
"New name should include original name"
- assert_match /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z/, output_coll.name,
+ assert_match /\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/, output_coll.name,
"New name should include ISO8601 date"
end
- test "Finalize committed request when reusing a finished container" do
- set_user_from_auth :active
- cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
- cr.reload
- assert_equal ContainerRequest::Committed, cr.state
+ [[0, :check_output_ttl_0],
+ [1, :check_output_ttl_1s],
+ [365*86400, :check_output_ttl_1y],
+ ].each do |ttl, checker|
+ test "output_ttl=#{ttl}" do
+ act_as_user users(:active) do
+ cr = create_minimal_req!(priority: 1,
+ state: ContainerRequest::Committed,
+ output_name: 'foo',
+ output_ttl: ttl)
+ run_container(cr)
+ cr.reload
+ output = Collection.find_by_uuid(cr.output_uuid)
+ send(checker, db_current_time, output.trash_at, output.delete_at)
+ end
+ end
+ end
+
+ def check_output_ttl_0(now, trash, delete)
+ assert_nil(trash)
+ assert_nil(delete)
+ end
+
+ def check_output_ttl_1s(now, trash, delete)
+ assert_not_nil(trash)
+ assert_not_nil(delete)
+ assert_in_delta(trash, now + 1.second, 10)
+ assert_in_delta(delete, now + Rails.configuration.blob_signature_ttl.second, 10)
+ end
+
+ def check_output_ttl_1y(now, trash, delete)
+ year = (86400*365).second
+ assert_not_nil(trash)
+ assert_not_nil(delete)
+ assert_in_delta(trash, now + year, 10)
+ assert_in_delta(delete, now + year, 10)
+ end
+
+ def run_container(cr)
act_as_system_user do
c = Container.find_by_uuid(cr.container_uuid)
c.update_attributes!(state: Container::Locked)
exit_code: 0,
output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+ c
end
+ end
+
+ test "Finalize committed request when reusing a finished container" do
+ set_user_from_auth :active
+ cr = create_minimal_req!(priority: 1, state: ContainerRequest::Committed)
+ cr.reload
+ assert_equal ContainerRequest::Committed, cr.state
+ run_container(cr)
cr.reload
assert_equal ContainerRequest::Final, cr.state
assert_equal ContainerRequest::Final, cr3.state
end
- [
- [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => 100}, ContainerRequest::Committed, 100],
- [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Uncommitted],
- [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Committed],
- [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => -1}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
- [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => '123'}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
- ].each do |rc, state, expected|
- test "create container request with #{rc} in state #{state} and verify keep_cache_ram #{expected}" do
- common_attrs = {cwd: "test",
- priority: 1,
- command: ["echo", "hello"],
- output_path: "test",
- runtime_constraints: rc,
- mounts: {"test" => {"kind" => "json"}}}
- set_user_from_auth :active
-
- if expected == ActiveRecord::RecordInvalid
- assert_raises(ActiveRecord::RecordInvalid) do
- create_minimal_req!(common_attrs.merge({state: state}))
- end
- else
- cr = create_minimal_req!(common_attrs.merge({state: state}))
- expected = Rails.configuration.container_default_keep_cache_ram if state == ContainerRequest::Committed and expected.nil?
- assert_equal expected, cr.runtime_constraints['keep_cache_ram']
- end
- end
- end
-
[
[{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
[{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
end
end
end
+
+ [['Committed', true, {name: "foobar", priority: 123}],
+ ['Committed', false, {container_count: 2}],
+ ['Committed', false, {container_count: 0}],
+ ['Committed', false, {container_count: nil}],
+ ['Final', false, {state: ContainerRequest::Committed, name: "foobar"}],
+ ['Final', false, {name: "foobar", priority: 123}],
+ ['Final', false, {name: "foobar", output_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+ ['Final', false, {name: "foobar", log_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+ ['Final', false, {log_uuid: "zzzzz-4zz18-znfnqtbbv4spc3w"}],
+ ['Final', false, {priority: 123}],
+ ['Final', false, {mounts: {}}],
+ ['Final', false, {container_count: 2}],
+ ['Final', true, {name: "foobar"}],
+ ['Final', true, {name: "foobar", description: "baz"}],
+ ].each do |state, permitted, updates|
+ test "state=#{state} can#{'not' if !permitted} update #{updates.inspect}" do
+ act_as_user users(:active) do
+ cr = create_minimal_req!(priority: 1,
+ state: "Committed",
+ container_count_max: 1)
+ case state
+ when 'Committed'
+ # already done
+ when 'Final'
+ act_as_system_user do
+ Container.find_by_uuid(cr.container_uuid).
+ update_attributes!(state: Container::Cancelled)
+ end
+ cr.reload
+ else
+ raise 'broken test case'
+ end
+ assert_equal state, cr.state
+ if permitted
+ assert cr.update_attributes!(updates)
+ else
+ assert_raises(ActiveRecord::RecordInvalid) do
+ cr.update_attributes!(updates)
+ end
+ end
+ end
+ end
+ end
end
runtime_constraints: {"vcpus" => 1, "ram" => 1},
}
- REUSABLE_COMMON_ATTRS = {container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
- cwd: "test",
- command: ["echo", "hello"],
- output_path: "test",
- runtime_constraints: {"vcpus" => 4,
- "ram" => 12000000000},
- mounts: {"test" => {"kind" => "json"}},
- environment: {"var" => 'val'}}
+ REUSABLE_COMMON_ATTRS = {
+ container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
+ cwd: "test",
+ command: ["echo", "hello"],
+ output_path: "test",
+ runtime_constraints: {
+ "ram" => 12000000000,
+ "vcpus" => 4,
+ },
+ mounts: {
+ "test" => {"kind" => "json"},
+ },
+ environment: {
+ "var" => "val",
+ },
+ }
def minimal_new attrs={}
cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
test "Container serialized hash attributes sorted before save" do
env = {"C" => 3, "B" => 2, "A" => 1}
m = {"F" => {"kind" => 3}, "E" => {"kind" => 2}, "D" => {"kind" => 1}}
- rc = {"vcpus" => 1, "ram" => 1}
+ rc = {"vcpus" => 1, "ram" => 1, "keep_cache_ram" => 1}
c, _ = minimal_new(environment: env, mounts: m, runtime_constraints: rc)
assert_equal c.environment.to_json, Container.deep_sort_hash(env).to_json
assert_equal c.mounts.to_json, Container.deep_sort_hash(m).to_json
log: 'ea10d51bcf88862dbcc36eb292017dfd+45',
}
- set_user_from_auth :dispatch1
-
- c_output1 = Container.create common_attrs
- c_output2 = Container.create common_attrs
- assert_not_equal c_output1.uuid, c_output2.uuid
-
cr = ContainerRequest.new common_attrs
+ cr.use_existing = false
cr.state = ContainerRequest::Committed
- cr.container_uuid = c_output1.uuid
cr.save!
+ c_output1 = Container.where(uuid: cr.container_uuid).first
cr = ContainerRequest.new common_attrs
+ cr.use_existing = false
cr.state = ContainerRequest::Committed
- cr.container_uuid = c_output2.uuid
cr.save!
+ c_output2 = Container.where(uuid: cr.container_uuid).first
+
+ assert_not_equal c_output1.uuid, c_output2.uuid
+
+ set_user_from_auth :dispatch1
out1 = '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'
log1 = collections(:real_log_collection).portable_data_hash
c_output2.update_attributes!({state: Container::Running})
c_output2.update_attributes!(completed_attrs.merge({log: log1, output: out2}))
- reused = Container.find_reusable(common_attrs)
- assert_not_nil reused
- assert_equal reused.uuid, c_output1.uuid
+ reused = Container.resolve(ContainerRequest.new(common_attrs))
+ assert_equal c_output1.uuid, reused.uuid
end
test "find_reusable method should select running container by start date" do
require 'test_helper'
+require 'tmpdir'
+require 'tempfile'
class NodeTest < ActiveSupport::TestCase
def ping_node(node_name, ping_data)
assert Node.dns_server_update 'compute65535', '127.0.0.127'
end
+ test "don't leave temp files behind if there's an error writing them" do
+ Rails.configuration.dns_server_conf_template = Rails.root.join 'config', 'unbound.template'
+ Tempfile.any_instance.stubs(:puts).raises(IOError)
+ Dir.mktmpdir do |tmpdir|
+ Rails.configuration.dns_server_conf_dir = tmpdir
+ refute Node.dns_server_update 'compute65535', '127.0.0.127'
+ assert_empty Dir.entries(tmpdir).select{|f| File.file? f}
+ end
+ end
+
test "ping new node with no hostname and default config" do
node = ping_node(:new_with_no_hostname, {})
slot_number = node.slot_number
package main
import (
+ "bytes"
+ "context"
"encoding/json"
"errors"
"flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
- "github.com/curoverse/dockerclient"
+
+ dockertypes "github.com/docker/docker/api/types"
+ dockercontainer "github.com/docker/docker/api/types/container"
+ dockernetwork "github.com/docker/docker/api/types/network"
+ dockerclient "github.com/docker/docker/client"
)
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
type ThinDockerClient interface {
- StopContainer(id string, timeout int) error
- InspectImage(id string) (*dockerclient.ImageInfo, error)
- LoadImage(reader io.Reader) error
- CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
- StartContainer(id string, config *dockerclient.HostConfig) error
- AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
- Wait(id string) <-chan dockerclient.WaitResult
- RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
+ ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error)
+ ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+ networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
+ ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
+ ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+ ContainerWait(ctx context.Context, container string) (int64, error)
+ ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
+ ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
+ ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
+}
+
+// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
+// that executes the docker requests on dockerclient.Client
+type ThinDockerClientProxy struct {
+ Docker *dockerclient.Client
+}
+
+// ContainerAttach invokes dockerclient.Client.ContainerAttach
+func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+ return proxy.Docker.ContainerAttach(ctx, container, options)
+}
+
+// ContainerCreate invokes dockerclient.Client.ContainerCreate
+func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
+ networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
+ return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
+}
+
+// ContainerStart invokes dockerclient.Client.ContainerStart
+func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+ return proxy.Docker.ContainerStart(ctx, container, options)
+}
+
+// ContainerStop invokes dockerclient.Client.ContainerStop
+func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+ return proxy.Docker.ContainerStop(ctx, container, timeout)
+}
+
+// ContainerWait invokes dockerclient.Client.ContainerWait
+func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string) (int64, error) {
+ return proxy.Docker.ContainerWait(ctx, container)
+}
+
+// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
+func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ return proxy.Docker.ImageInspectWithRaw(ctx, image)
+}
+
+// ImageLoad invokes dockerclient.Client.ImageLoad
+func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ return proxy.Docker.ImageLoad(ctx, input, quiet)
+}
+
+// ImageRemove invokes dockerclient.Client.ImageRemove
+func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
+ return proxy.Docker.ImageRemove(ctx, image, options)
}
// ContainerRunner is the main stateful struct used for a single execution of a
ArvClient IArvadosClient
Kc IKeepClient
arvados.Container
- dockerclient.ContainerConfig
- dockerclient.HostConfig
+ ContainerConfig dockercontainer.Config
+ dockercontainer.HostConfig
token string
ContainerID string
ExitCode *int
loggingDone chan bool
CrunchLog *ThrottledLogger
Stdout io.WriteCloser
- Stderr *ThrottledLogger
+ Stderr io.WriteCloser
LogCollection *CollectionWriter
LogsPDH *string
RunArvMount
}
runner.cCancelled = true
if runner.cStarted {
- err := runner.Docker.StopContainer(runner.ContainerID, 10)
+ timeout := time.Duration(10)
+ err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
if err != nil {
log.Printf("StopContainer failed: %s", err)
}
runner.CrunchLog.Printf("Using Docker image id '%s'", imageID)
- _, err = runner.Docker.InspectImage(imageID)
+ _, _, err = runner.Docker.ImageInspectWithRaw(context.TODO(), imageID)
if err != nil {
runner.CrunchLog.Print("Loading Docker image from keep")
return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
- err = runner.Docker.LoadImage(readCloser)
+ response, err := runner.Docker.ImageLoad(context.TODO(), readCloser, false)
if err != nil {
return fmt.Errorf("While loading container image into Docker: %v", err)
}
+ response.Body.Close()
} else {
runner.CrunchLog.Print("Docker image is available")
}
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
- if bind == "stdout" {
+ if bind == "stdout" || bind == "stderr" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
- return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+ return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
}
// Does path start with OutputPath?
prefix += "/"
}
if !strings.HasPrefix(mnt.Path, prefix) {
- return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+ return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+ }
+ }
+
+ if bind == "stdin" {
+ // Is it a "collection" mount kind?
+ if mnt.Kind != "collection" && mnt.Kind != "json" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
}
}
}
switch {
- case mnt.Kind == "collection":
+ case mnt.Kind == "collection" && bind != "stdin":
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
return nil
}
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
func (runner *ContainerRunner) AttachStreams() (err error) {
runner.CrunchLog.Print("Attaching container streams")
+ // If stdin mount is provided, attach it to the docker container
+ var stdinRdr keepclient.Reader
+ var stdinJson []byte
+ if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+ if stdinMnt.Kind == "collection" {
+ var stdinColl arvados.Collection
+ collId := stdinMnt.UUID
+ if collId == "" {
+ collId = stdinMnt.PortableDataHash
+ }
+ err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+ if err != nil {
+ return fmt.Errorf("While getting stdin collection: %v", err)
+ }
+
+ stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+ if os.IsNotExist(err) {
+ return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+ } else if err != nil {
+ return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+ }
+ } else if stdinMnt.Kind == "json" {
+ stdinJson, err = json.Marshal(stdinMnt.Content)
+ if err != nil {
+ return fmt.Errorf("While encoding stdin json data: %v", err)
+ }
+ }
+ }
+
+ stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+ response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
+ dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
return fmt.Errorf("While attaching container stdout/stderr streams: %v", err)
}
runner.loggingDone = make(chan bool)
if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
- stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
- index := strings.LastIndex(stdoutPath, "/")
- if index > 0 {
- subdirs := stdoutPath[:index]
- if subdirs != "" {
- st, err := os.Stat(runner.HostOutputDir)
- if err != nil {
- return fmt.Errorf("While Stat on temp dir: %v", err)
- }
- stdoutPath := path.Join(runner.HostOutputDir, subdirs)
- err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
- if err != nil {
- return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
- }
- }
- }
- stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
if err != nil {
- return fmt.Errorf("While creating stdout file: %v", err)
+ return err
}
runner.Stdout = stdoutFile
} else {
runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
}
+ if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+ stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+ if err != nil {
+ return err
+ }
+ runner.Stderr = stderrFile
+ } else {
+ runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ }
+
+ // Copy the stdin source (collection file or inline json) into the
+ // container's attached stdin, then half-close the write side so the
+ // containerized process sees EOF.
+ if stdinRdr != nil {
+ go func() {
+ _, err := io.Copy(response.Conn, stdinRdr)
+ if err != nil {
+ runner.CrunchLog.Printf("While writing stdin collection to docker container %q", err)
+ runner.stop()
+ }
+ stdinRdr.Close()
+ response.CloseWrite()
+ }()
+ } else if len(stdinJson) != 0 {
+ go func() {
+ _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+ if err != nil {
+ runner.CrunchLog.Printf("While writing stdin json to docker container %q", err)
+ runner.stop()
+ }
+ response.CloseWrite()
+ }()
+ }
+
+ go runner.ProcessDockerAttach(response.Reader)
return nil
}
+// getStdoutFile opens (creating parent directories as needed) the host-side
+// file that will back a "stdout"/"stderr" file mount. mntPath must begin
+// with the container OutputPath; the remainder is resolved under
+// HostOutputDir. Subdirectories copy the temp dir's mode plus setgid.
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+ stdoutPath := mntPath[len(runner.Container.OutputPath):]
+ index := strings.LastIndex(stdoutPath, "/")
+ if index > 0 {
+ subdirs := stdoutPath[:index]
+ if subdirs != "" {
+ st, err := os.Stat(runner.HostOutputDir)
+ if err != nil {
+ return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+ }
+ // NB: this inner stdoutPath deliberately shadows the outer one;
+ // it names the directory to create, not the output file.
+ stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+ if err != nil {
+ return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+ }
+ }
+ }
+ stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ if err != nil {
+ return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+ }
+
+ return stdoutFile, nil
+}
+
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
- runner.HostConfig = dockerclient.HostConfig{
- Binds: runner.Binds,
- CgroupParent: runner.setCgroupParent,
- LogConfig: dockerclient.LogConfig{
+ runner.HostConfig = dockercontainer.HostConfig{
+ Binds: runner.Binds,
+ Cgroup: dockercontainer.CgroupSpec(runner.setCgroupParent),
+ LogConfig: dockercontainer.LogConfig{
Type: "none",
},
}
"ARVADOS_API_HOST="+os.Getenv("ARVADOS_API_HOST"),
"ARVADOS_API_HOST_INSECURE="+os.Getenv("ARVADOS_API_HOST_INSECURE"),
)
- runner.HostConfig.NetworkMode = runner.networkMode
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
} else {
if runner.enableNetwork == "always" {
- runner.HostConfig.NetworkMode = runner.networkMode
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode(runner.networkMode)
} else {
- runner.HostConfig.NetworkMode = "none"
+ runner.HostConfig.NetworkMode = dockercontainer.NetworkMode("none")
}
}
- var err error
- runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
+ _, stdinUsed := runner.Container.Mounts["stdin"]
+ runner.ContainerConfig.OpenStdin = stdinUsed
+ runner.ContainerConfig.StdinOnce = stdinUsed
+ runner.ContainerConfig.AttachStdin = stdinUsed
+ runner.ContainerConfig.AttachStdout = true
+ runner.ContainerConfig.AttachStderr = true
+
+ createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
}
+ runner.ContainerID = createdBody.ID
+
return runner.AttachStreams()
}
if runner.cCancelled {
return ErrCancelled
}
- err := runner.Docker.StartContainer(runner.ContainerID, &runner.HostConfig)
+ err := runner.Docker.ContainerStart(context.TODO(), runner.ContainerID,
+ dockertypes.ContainerStartOptions{})
if err != nil {
return fmt.Errorf("could not start container: %v", err)
}
func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Print("Waiting for container to finish")
- waitDocker := runner.Docker.Wait(runner.ContainerID)
+ // ContainerWait blocks until the container exits (or the call fails).
+ waitDocker, err := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID)
+ if err != nil {
+ return fmt.Errorf("container wait: %v", err)
+ }
+
+ runner.CrunchLog.Printf("Container exited with code: %v", waitDocker)
+ code := int(waitDocker)
+ runner.ExitCode = &code
+
waitMount := runner.ArvMountExit
- for waitDocker != nil {
- select {
- case err := <-waitMount:
- runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
- waitMount = nil
- runner.stop()
- case wr := <-waitDocker:
- if wr.Error != nil {
- return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
- }
- runner.ExitCode = &wr.ExitCode
- waitDocker = nil
- }
+ // NOTE(review): the default case makes this select non-blocking, so an
+ // arv-mount failure is only noticed here if arv-mount has ALREADY exited
+ // by the time ContainerWait returns. The old loop monitored both channels
+ // concurrently while the container ran — confirm this narrowing is
+ // intended. Also, `waitMount = nil` is now a dead assignment (the loop
+ // that re-read it is gone).
+ select {
+ case err := <-waitMount:
+ runner.CrunchLog.Printf("arv-mount exited before container finished: %v", err)
+ waitMount = nil
+ runner.stop()
+ default:
}
// wait for stdout/stderr to complete
}
kc.Retries = 4
- var docker *dockerclient.DockerClient
- docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
+ var docker *dockerclient.Client
+ // API version 1.21 corresponds to Docker 1.9, which is currently the
+ // minimum version we want to support.
+ docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
if err != nil {
log.Fatalf("%s: %v", containerId, err)
}
- cr := NewContainerRunner(api, kc, docker, containerId)
+ dockerClientProxy := ThinDockerClientProxy{Docker: docker}
+
+ cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
package main
import (
+ "bufio"
"bytes"
+ "context"
"crypto/md5"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
+ "net"
"os"
"os/exec"
"path/filepath"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
- "github.com/curoverse/dockerclient"
+
+ dockertypes "github.com/docker/docker/api/types"
+ dockercontainer "github.com/docker/docker/api/types/container"
+ dockernetwork "github.com/docker/docker/api/types/network"
. "gopkg.in/check.v1"
)
logReader io.ReadCloser
logWriter io.WriteCloser
fn func(t *TestDockerClient)
- finish chan dockerclient.WaitResult
+ finish int
stop chan bool
cwd string
env []string
api *ArvTestClient
}
-func NewTestDockerClient() *TestDockerClient {
+// NewTestDockerClient returns a stub docker client whose ContainerWait will
+// report exitCode as the container's exit status.
+func NewTestDockerClient(exitCode int) *TestDockerClient {
t := &TestDockerClient{}
t.logReader, t.logWriter = io.Pipe()
- t.finish = make(chan dockerclient.WaitResult)
+ t.finish = exitCode
t.stop = make(chan bool)
t.cwd = "/"
return t
}
-func (t *TestDockerClient) StopContainer(id string, timeout int) error {
- t.stop <- true
- return nil
+// MockConn is a minimal net.Conn stand-in for the write half of the
+// hijacked attach stream; only Write is implemented.
+type MockConn struct {
+ net.Conn
}
-func (t *TestDockerClient) InspectImage(id string) (*dockerclient.ImageInfo, error) {
- if t.imageLoaded == id {
- return &dockerclient.ImageInfo{}, nil
- } else {
- return nil, errors.New("")
- }
+// Write discards b and reports it as fully written.
+func (m *MockConn) Write(b []byte) (int, error) {
+ return len(b), nil
}
-func (t *TestDockerClient) LoadImage(reader io.Reader) error {
- _, err := io.Copy(ioutil.Discard, reader)
- if err != nil {
- return err
- } else {
- t.imageLoaded = hwImageId
- return nil
- }
+func NewMockConn() *MockConn {
+ c := &MockConn{}
+ return c
+}
+
+// ContainerAttach returns a hijacked response whose read side is the test
+// client's log pipe and whose write side is a discarding MockConn.
+func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
+ return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil
}
-func (t *TestDockerClient) CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error) {
+// ContainerCreate records the requested working dir and env and returns the
+// fixed container ID "abcde".
+func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
if config.WorkingDir != "" {
t.cwd = config.WorkingDir
}
t.env = config.Env
- return "abcde", nil
+ return dockercontainer.ContainerCreateCreatedBody{ID: "abcde"}, nil
}
-func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostConfig) error {
- if id == "abcde" {
+// ContainerStart launches the test's callback for the expected container ID.
+func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+ if container == "abcde" {
go t.fn(t)
return nil
} else {
}
}
-func (t *TestDockerClient) AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error) {
- return t.logReader, nil
+// ContainerStop signals a waiting test callback via t.stop; note this send
+// blocks until the callback receives it.
+func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+ t.stop <- true
+ return nil
}
-func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
- return t.finish
+// ContainerWait returns the preconfigured exit code immediately.
+func (t *TestDockerClient) ContainerWait(ctx context.Context, container string) (int64, error) {
+ return int64(t.finish), nil
}
-func (*TestDockerClient) RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error) {
+// ImageInspectWithRaw succeeds only for the image previously recorded by
+// ImageLoad; any other image yields an (empty-message) error.
+func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ if t.imageLoaded == image {
+ return dockertypes.ImageInspect{}, nil, nil
+ } else {
+ return dockertypes.ImageInspect{}, nil, errors.New("")
+ }
+}
+
+// ImageLoad consumes the input stream and marks hwImageId as loaded.
+func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ _, err := io.Copy(ioutil.Discard, input)
+ if err != nil {
+ return dockertypes.ImageLoadResponse{}, err
+ } else {
+ t.imageLoaded = hwImageId
+ return dockertypes.ImageLoadResponse{Body: ioutil.NopCloser(input)}, nil
+ }
+}
+
+// ImageRemove is a no-op in the test client.
+func (*TestDockerClient) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
return nil, nil
}
rdr := ioutil.NopCloser(&bytes.Buffer{})
client.Called = true
return FileWrapper{rdr, 1321984}, nil
+ } else if filename == "/file1_in_main.txt" {
+ rdr := ioutil.NopCloser(strings.NewReader("foo"))
+ client.Called = true
+ return FileWrapper{rdr, 3}, nil
}
return nil, nil
}
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- _, err := cr.Docker.RemoveImage(hwImageId, true)
+ _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
- _, err = cr.Docker.InspectImage(hwImageId)
+ _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
c.Check(err, NotNil)
cr.Container.ContainerImage = hwPDH
c.Check(err, IsNil)
defer func() {
- cr.Docker.RemoveImage(hwImageId, true)
+ cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
}()
c.Check(kc.Called, Equals, true)
c.Check(cr.ContainerConfig.Image, Equals, hwImageId)
- _, err = cr.Docker.InspectImage(hwImageId)
+ _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
c.Check(err, IsNil)
// (2) Test using image that's already loaded
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Container.ContainerImage = hwPDH
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Container.ContainerImage = hwPDH
}
func (s *TestSuite) TestRunContainer(c *C) {
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
docker.fn = func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "Hello world\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
}
cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
// Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
// dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, extraMounts []string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
+func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(exitCode)
docker.fn = fn
- docker.RemoveImage(hwImageId, true)
+ docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api = &ArvTestClient{Container: rec}
docker.api = api
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "hello world\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
- }`, nil, func(t *TestDockerClient) {
+ }`, nil, 0, func(t *TestDockerClient) {
time.Sleep(time.Second)
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
- }`, nil, func(t *TestDockerClient) {
- time.Sleep(time.Second)
- t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
- })
+ }`, nil, 0,
+ func(t *TestDockerClient) {
+ time.Sleep(time.Second)
+ t.logWriter.Close()
+ })
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
- }`, nil, func(t *TestDockerClient) {
- time.Sleep(time.Second)
- t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{}
- })
+ }`, nil, 0,
+ func(t *TestDockerClient) {
+ time.Sleep(time.Second)
+ t.logWriter.Close()
+ })
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 1, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "hello\n"))
t.logWriter.Write(dockerLog(2, "world\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 1}
})
final := api.CalledWith("container.state", "Complete")
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
docker.fn = func(t *TestDockerClient) {
<-t.stop
t.logWriter.Write(dockerLog(1, "foo\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
}
- docker.RemoveImage(hwImageId, true)
+ docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api := &ArvTestClient{Container: rec}
cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
cr.CleanupDirs()
checkEmpty()
}
+
+ // Only mount point of kind 'collection' is allowed for stdin
+ {
+ i = 0
+ cr.ArvMountPoint = ""
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts = map[string]arvados.Mount{
+ "stdin": {Kind: "tmp"},
+ }
+
+ err := cr.SetupMounts()
+ c.Check(err, NotNil)
+ c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
+ cr.CleanupDirs()
+ checkEmpty()
+ }
}
func (s *TestSuite) TestStdout(c *C) {
"runtime_constraints": {}
}`
- api, _, _ := FullRunHelper(c, helperRecord, nil, func(t *TestDockerClient) {
+ api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
err = json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient()
+ docker := NewTestDockerClient(0)
docker.fn = fn
- docker.RemoveImage(hwImageId, true)
+ docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api = &ArvTestClient{Container: rec}
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {"API": true}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {"API": true}
-}`, nil, func(t *TestDockerClient) {
+}`, nil, 0, func(t *TestDockerClient) {
t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+ api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
"a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
}
- api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+ api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(runner.Binds, DeepEquals, []string{realtemp + "/2:/tmp",
"b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, func(t *TestDockerClient) {
+ api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
- t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
}
}
}
+
+// TestStdinCollectionMountPoint runs a container whose stdin is mounted from
+// a file in a Keep collection, and verifies the run completes with exit code
+// 0 and produces the expected output manifest.
+func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
+ helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "stdin": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/file1_in_main.txt"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+ extraMounts := []string{
+ "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
+ }
+
+ api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ // Locate the output collection among the recorded API calls and check
+ // its manifest.
+ for _, v := range api.Content {
+ if v["collection"] != nil {
+ collection := v["collection"].(arvadosclient.Dict)
+ if strings.Index(collection["name"].(string), "output") == 0 {
+ manifest := collection["manifest_text"].(string)
+ c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+ }
+ }
+ }
+}
+
+// TestStdinJsonMountPoint runs a container whose stdin is an inline JSON
+// mount, and verifies the run completes and writes the expected manifest.
+func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
+ helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "stdin": {"kind": "json", "content": "foo"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+ api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ // Locate the output collection among the recorded API calls and check
+ // its manifest.
+ for _, v := range api.Content {
+ if v["collection"] != nil {
+ collection := v["collection"].(arvadosclient.Dict)
+ if strings.Index(collection["name"].(string), "output") == 0 {
+ manifest := collection["manifest_text"].(string)
+ c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+ }
+ }
+ }
+}
+
+// TestStderrMount runs a failing container with both stdout and stderr
+// redirected to file mounts, and verifies both files land in the output
+// collection even though the exit code is nonzero.
+func (s *TestSuite) TestStderrMount(c *C) {
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo hello;exit 1"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"},
+ "stdout": {"kind": "file", "path": "/tmp/a/out.txt"},
+ "stderr": {"kind": "file", "path": "/tmp/b/err.txt"}},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 1, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello\n"))
+ t.logWriter.Write(dockerLog(2, "oops\n"))
+ t.logWriter.Close()
+ })
+
+ final := api.CalledWith("container.state", "Complete")
+ c.Assert(final, NotNil)
+ c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+ c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
+
+ c.Check(api.CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
+}
import time
import arvados.commands._util as arv_cmd
+from arvados_fuse import crunchstat
from arvados_fuse import *
from arvados_fuse.unmount import unmount
from arvados_fuse._version import __version__
unmount = self.add_mutually_exclusive_group()
unmount.add_argument('--unmount', action='store_true', default=False,
- help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit.")
+ help="Forcefully unmount the specified mountpoint (if it's a fuse mount) and exit. If --subtype is given, unmount only if the mount has the specified subtype. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
unmount.add_argument('--unmount-all', action='store_true', default=False,
- help="Forcefully unmount every fuse mount at or below the specified mountpoint and exit.")
+ help="Forcefully unmount every fuse mount at or below the specified path and exit. If --subtype is given, unmount only mounts that have the specified subtype. Exit non-zero if any other types of mounts are found at or below the given path. WARNING: This command can affect any kind of fuse mount, not just arv-mount.")
unmount.add_argument('--replace', action='store_true', default=False,
help="If a fuse mount is already present at mountpoint, forcefully unmount it before mounting")
self.add_argument('--unmount-timeout',
def run(self):
if self.args.unmount or self.args.unmount_all:
unmount(path=self.args.mountpoint,
+ subtype=self.args.subtype,
timeout=self.args.unmount_timeout,
recursive=self.args.unmount_all)
elif self.args.exec_args:
return mi
-def unmount(path, timeout=10, recursive=False):
+def unmount(path, subtype=None, timeout=10, recursive=False):
"""Unmount the fuse mount at path.
Unmounting is done by writing 1 to the "abort" control file in
path = os.path.realpath(path)
+ if subtype is None:
+ mnttype = None
+ elif subtype == '':
+ mnttype = 'fuse'
+ else:
+ mnttype = 'fuse.' + subtype
+
if recursive:
paths = []
for m in mountinfo():
if m.path == path or m.path.startswith(path+"/"):
paths.append(m.path)
- if not m.is_fuse:
+ if not (m.is_fuse and (mnttype is None or
+ mnttype == m.mnttype)):
raise Exception(
- "cannot unmount {}: non-fuse mountpoint {}".format(
- path, m))
+ "cannot unmount {}: mount type is {}".format(
+ path, m.mnttype))
for path in sorted(paths, key=len, reverse=True):
unmount(path, timeout=timeout, recursive=False)
return len(paths) > 0
while True:
mounted = False
for m in mountinfo():
- if m.is_fuse:
+ if m.is_fuse and (mnttype is None or mnttype == m.mnttype):
try:
if os.path.realpath(m.path) == path:
was_mounted = True
--- /dev/null
+import subprocess
+
+from integration_test import IntegrationTest
+
+
+class CrunchstatTest(IntegrationTest):
+ def test_crunchstat(self):
+ # Mount with crunchstat reporting enabled and run a trivial command
+ # through --exec; the mount must come up and the command's stdout
+ # must pass through unchanged.
+ # NOTE(review): subprocess.check_output returns bytes on Python 3;
+ # comparing against the str "ok\n" assumes Python 2 — confirm the
+ # target interpreter.
+ output = subprocess.check_output(
+ ['./bin/arv-mount',
+ '--crunchstat-interval', '1',
+ self.mnt,
+ '--exec', 'echo', 'ok'])
+ self.assertEqual("ok\n", output)
self.assertNotIn(' '+self.mnt+' ', m)
def _mounted(self, mounts):
- all_mounts = subprocess.check_output(['mount', '-t', 'fuse.test'])
+ all_mounts = subprocess.check_output(['mount'])
return [m for m in mounts
if ' '+m+' ' in all_mounts]
+ def _wait_for_mounts(self, mounts):
+ # Poll until every path in `mounts` appears in the system mount
+ # table, failing the test after a 10 second deadline.
+ deadline = time.time() + 10
+ while self._mounted(mounts) != mounts:
+ time.sleep(0.1)
+ self.assertLess(time.time(), deadline)
+
+ def test_unmount_subtype(self):
+ # Create two mounts with distinct fuse subtypes ("foo", "bar"), then
+ # verify --unmount/--unmount-all only detach mounts whose subtype
+ # matches: wrong subtypes are no-ops, '' matches plain "fuse" only.
+ mounts = []
+ for d in ['foo', 'bar']:
+ mnt = self.tmp+'/'+d
+ os.mkdir(mnt)
+ self.to_delete.insert(0, mnt)
+ mounts.append(mnt)
+ subprocess.check_call(
+ ['./bin/arv-mount', '--subtype', d, mnt])
+
+ self._wait_for_mounts(mounts)
+ self.assertEqual(mounts, self._mounted(mounts))
+ # Non-matching subtypes must leave both mounts in place.
+ subprocess.call(['./bin/arv-mount', '--subtype', 'baz', '--unmount-all', self.tmp])
+ self.assertEqual(mounts, self._mounted(mounts))
+ subprocess.call(['./bin/arv-mount', '--subtype', 'bar', '--unmount', mounts[0]])
+ self.assertEqual(mounts, self._mounted(mounts))
+ subprocess.call(['./bin/arv-mount', '--subtype', '', '--unmount', self.tmp])
+ self.assertEqual(mounts, self._mounted(mounts))
+ # Matching subtypes detach exactly the matching mount(s).
+ subprocess.check_call(['./bin/arv-mount', '--subtype', 'foo', '--unmount', mounts[0]])
+ self.assertEqual(mounts[1:], self._mounted(mounts))
+ subprocess.check_call(['./bin/arv-mount', '--subtype', '', '--unmount-all', mounts[0]])
+ self.assertEqual(mounts[1:], self._mounted(mounts))
+ subprocess.check_call(['./bin/arv-mount', '--subtype', 'bar', '--unmount-all', self.tmp])
+ self.assertEqual([], self._mounted(mounts))
+
+
def test_unmount_children(self):
for d in ['foo', 'foo/bar', 'bar']:
mnt = self.tmp+'/'+d
subprocess.check_call(
['./bin/arv-mount', '--subtype', 'test', mnt])
- # Wait for mounts to attach
- deadline = time.time() + 10
- while self._mounted(mounts) != mounts:
- time.sleep(0.1)
- self.assertLess(time.time(), deadline)
-
+ self._wait_for_mounts(mounts)
self.assertEqual(mounts, self._mounted(mounts))
subprocess.check_call(['./bin/arv-mount', '--unmount', self.tmp])
self.assertEqual(mounts, self._mounted(mounts))
self.cloud_size.name)
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
- if not self.cloud_node.size:
- self.cloud_node.size = self.cloud_size
+
+ # The information included in the node size object we get from libcloud
+ # is inconsistent between cloud providers. Replace libcloud NodeSize
+ # object with compatible CloudSizeWrapper object which merges the size
+ # info reported from the cloud with size information from the
+ # configuration file.
+ self.cloud_node.size = self.cloud_size
+
self._logger.info("Cloud node %s created.", self.cloud_node.id)
self._later.update_arvados_node_properties()
return (isinstance(exception, cls.CLOUD_ERRORS) or
type(exception) is Exception)
+ def destroy_node(self, cloud_node):
+ # Destroy cloud_node via the wrapped libcloud driver, treating a
+ # "node already gone" error outcome as success.
+ try:
+ return self.real.destroy_node(cloud_node)
+ except self.CLOUD_ERRORS as destroy_error:
+ # Sometimes the destroy node request succeeds but times out and
+ # raises an exception instead of returning success. If this
+ # happens, we get a noisy stack trace. Check if the node is still
+ # on the node list. If it is gone, we can declare victory.
+ try:
+ self.search_for_now(cloud_node.id, 'list_nodes')
+ except ValueError:
+ # If we catch ValueError, that means search_for_now didn't find
+ # it, which means destroy_node actually succeeded.
+ return True
+ # The node is still on the list. Re-raise.
+ raise
+
+
# Now that we've defined all our own methods, delegate generic, public
# attributes of libcloud drivers that we haven't defined ourselves.
def _delegate_to_real(attr_name):
raise
def sync_node(self, cloud_node, arvados_node):
+ # Update the cloud node record to ensure we have the correct metadata
+ # fingerprint.
+ cloud_node = self.real.ex_get_node(cloud_node.name, cloud_node.extra['zone'])
+
# We can't store the FQDN on the name attribute or anything like it,
# because (a) names are static throughout the node's life (so FQDN
# isn't available because we don't know it at node creation time) and
self._find_metadata(metadata_items, 'hostname')['value'] = hostname
except KeyError:
metadata_items.append({'key': 'hostname', 'value': hostname})
- response = self.real.connection.async_request(
- '/zones/{}/instances/{}/setMetadata'.format(
- cloud_node.extra['zone'].name, cloud_node.name),
- method='POST', data=metadata_req)
- if not response.success():
- raise Exception("setMetadata error: {}".format(response.error))
+
+ self.real.ex_set_node_metadata(cloud_node, metadata_items)
@classmethod
def node_fqdn(cls, node):
'node_stale_after': str(60 * 60 * 2),
'watchdog': '600',
'node_mem_scaling': '0.95'},
+ 'Manage': {'address': '127.0.0.1',
+ 'port': '-1'},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'},
}.iteritems():
import pykka
from . import computenode as cnode
+from . import status
from .computenode import dispatch
from .config import actor_class
states.append("shutdown")
return states + pykka.get_all(proxy_states)
+ def _update_tracker(self):
+ # Publish per-state node counts to the shared status tracker.
+ # First zero every previously-reported 'nodes_*' key so that a
+ # state with no current nodes doesn't keep a stale nonzero count.
+ updates = {
+ k: 0
+ for k in status.tracker.keys()
+ if k.startswith('nodes_')
+ }
+ for s in self._node_states(size=None):
+ updates.setdefault('nodes_'+s, 0)
+ updates['nodes_'+s] += 1
+ updates['nodes_wish'] = len(self.last_wishlist)
+ status.tracker.update(updates)
+
+
def _state_counts(self, size):
states = self._node_states(size)
counts = {
elif (nodes_wanted < 0) and self.booting:
self._later.stop_booting_node(size)
except Exception as e:
- self._logger.exception("while calculating nodes wanted for size %s", size)
+ self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+ try:
+ self._update_tracker()
+ except:
+ self._logger.exception("while updating tracker")
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
import libcloud
from . import config as nmconfig
+from . import status
from .baseactor import WatchdogActor
from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
signal.signal(sigcode, shutdown_signal)
+ status.Server(config).start()
+
try:
root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
--- /dev/null
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import http.server
+import json
+import logging
+import socketserver
+import threading
+
+_logger = logging.getLogger('status.Handler')
+
+
+class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
+ # Threaded HTTP server exposing node manager state at /status.json.
+ # Never binds a socket unless the [Manage] port config is >= 0.
+ def __init__(self, config):
+ port = config.getint('Manage', 'port')
+ self.enabled = port >= 0
+ if not self.enabled:
+ _logger.warning("Management server disabled. "+
+ "Use [Manage] config section to enable.")
+ return
+ self._config = config
+ self._tracker = tracker
+ super(Server, self).__init__(
+ (config.get('Manage', 'address'), port), Handler)
+ # Daemon thread: the management server must not keep the process
+ # alive on shutdown.
+ self._thread = threading.Thread(target=self.serve_forever)
+ self._thread.daemon = True
+
+ def start(self):
+ # No-op when disabled (the listener thread was never created).
+ if self.enabled:
+ self._thread.start()
+
+
+class Handler(http.server.BaseHTTPRequestHandler, object):
+ def do_GET(self):
+ if self.path == '/status.json':
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ self.wfile.write(tracker.get_json())
+ else:
+ self.send_response(404)
+ # end_headers() terminates the header block; without it the
+ # 404 response is malformed and clients may hang.
+ self.end_headers()
+
+ def log_message(self, fmt, *args, **kwargs):
+ # Route per-request logging through our logger instead of stderr.
+ _logger.info(fmt, *args, **kwargs)
+
+
+class Tracker(object):
+ # Lock-protected key/value snapshot of node manager status counters.
+ def __init__(self):
+ self._mtx = threading.Lock()
+ self._latest = {}
+
+ def get_json(self):
+ with self._mtx:
+ return json.dumps(self._latest)
+
+ def keys(self):
+ with self._mtx:
+ return self._latest.keys()
+
+ def update(self, updates):
+ with self._mtx:
+ self._latest.update(updates)
+
+
+# Module-level singleton; Server.__init__ and Handler.do_GET read it.
+tracker = Tracker()
# Azure configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
# EC2 configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
# Google Compute Engine configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# Node Manager will ensure that there are at least this many nodes running at
# all times. If node manager needs to start new idle nodes for the purpose of
# is through the API server Rails console: load the Node object, set its
# IP address to 10.10.0.N (where N is the cloud node's ID), and save.
+[Manage]
+address = 0.0.0.0
+port = 8989
+
[Daemon]
min_nodes = 0
max_nodes = 8
('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
],
install_requires=[
- 'apache-libcloud>=0.16',
- 'arvados-python-client>=0.1.20150206225333',
- 'pykka',
- 'python-daemon',
- 'setuptools'
- ],
- dependency_links = [
+ 'apache-libcloud>=0.16',
+ 'arvados-python-client>=0.1.20150206225333',
+ 'future',
+ 'pykka',
+ 'python-daemon',
+ 'setuptools'
+ ],
+ dependency_links=[
"https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev4.zip"
],
test_suite='tests',
- tests_require=['pbr<1.7.0', 'mock>=1.0', "apache-libcloud==0.18.1.dev4"],
+ tests_require=[
+ 'requests',
+ 'pbr<1.7.0',
+ 'mock>=1.0',
+ 'apache-libcloud==0.18.1.dev4',
+ ],
zip_safe=False,
cmdclass={'egg_info': tagger},
)
cloud_node = testutil.cloud_node_mock(
2, metadata=start_metadata.copy(),
zone=testutil.cloud_object_mock('testzone'))
+ self.driver_mock().ex_get_node.return_value = cloud_node
driver = self.new_driver()
driver.sync_node(cloud_node, arv_node)
- args, kwargs = self.driver_mock().connection.async_request.call_args
- self.assertEqual('/zones/testzone/instances/2/setMetadata', args[0])
- for key in ['kind', 'fingerprint']:
- self.assertEqual(start_metadata[key], kwargs['data'][key])
+ args, kwargs = self.driver_mock().ex_set_node_metadata.call_args
+ self.assertEqual(cloud_node, args[0])
plain_metadata['hostname'] = 'compute1.zzzzz.arvadosapi.com'
self.assertEqual(
plain_metadata,
- {item['key']: item['value'] for item in kwargs['data']['items']})
+ {item['key']: item['value'] for item in args[1]})
def test_sync_node_updates_hostname_tag(self):
self.check_sync_node_updates_hostname_tag(
arv_node = testutil.arvados_node_mock(8)
cloud_node = testutil.cloud_node_mock(
9, metadata={}, zone=testutil.cloud_object_mock('failzone'))
- mock_response = self.driver_mock().connection.async_request()
- mock_response.success.return_value = False
- mock_response.error = 'sync error test'
+ mock_response = self.driver_mock().ex_set_node_metadata.side_effect = (Exception('sync error test'),)
driver = self.new_driver()
with self.assertRaises(Exception) as err_check:
driver.sync_node(cloud_node, arv_node)
import pykka
import arvnodeman.daemon as nmdaemon
+import arvnodeman.status as status
from arvnodeman.jobqueue import ServerCalculator
from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
+from . import test_status
import logging
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
monitor = self.monitor_list()[0].proxy()
self.daemon.update_server_wishlist([])
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
self.assertTrue(self.node_shutdown.start.called,
"daemon did not shut down booted node on offer")
+ with test_status.TestServer() as srv:
+ self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
+ self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
+ self.assertEqual(0, srv.get_status().get('nodes_wish', None))
+
def test_booted_node_lifecycle(self):
cloud_node = testutil.cloud_node_mock(6)
setup = self.start_node_boot(cloud_node, id_num=6)
def ping(self):
# Called by WatchdogActorTest, this delay is longer than the test timeout
# of 1 second, which should cause the watchdog ping to fail.
- time.sleep(2)
+ time.sleep(4)
return True
class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
--- /dev/null
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import requests
+import unittest
+
+import arvnodeman.status as status
+import arvnodeman.config as config
+
+
+class TestServer(object):
+ # Context manager that runs a status.Server on an ephemeral port
+ # (port 0) and exposes helpers to fetch /status.json from it.
+ def __enter__(self):
+ cfg = config.NodeManagerConfig()
+ cfg.set('Manage', 'port', '0')
+ cfg.set('Manage', 'address', '127.0.0.1')
+ self.srv = status.Server(cfg)
+ self.srv.start()
+ addr, port = self.srv.server_address
+ self.srv_base = 'http://127.0.0.1:'+str(port)
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.srv.shutdown()
+
+ def get_status_response(self):
+ # Raw requests.Response for /status.json.
+ return requests.get(self.srv_base+'/status.json')
+
+ def get_status(self):
+ # Decoded JSON body of /status.json.
+ return self.get_status_response().json()
+
+
+class StatusServerUpdates(unittest.TestCase):
+ def test_updates(self):
+ # Each tracker.update() must be visible in the next GET, and
+ # earlier keys must persist across later updates.
+ with TestServer() as srv:
+ for n in [1, 2, 3]:
+ status.tracker.update({'nodes_'+str(n): n})
+ r = srv.get_status_response()
+ self.assertEqual(200, r.status_code)
+ self.assertEqual('application/json', r.headers['content-type'])
+ resp = r.json()
+ self.assertEqual(n, resp['nodes_'+str(n)])
+ self.assertEqual(1, resp['nodes_1'])
+
+
+class StatusServerDisabled(unittest.TestCase):
+ def test_config_disabled(self):
+ cfg = config.NodeManagerConfig()
+ cfg.set('Manage', 'port', '-1')
+ cfg.set('Manage', 'address', '127.0.0.1')
+ self.srv = status.Server(cfg)
+ self.srv.start()
+ self.assertFalse(self.srv.enabled)
+ self.assertFalse(getattr(self.srv, '_thread', False))