--- /dev/null
+// On loading of a collection, enable the "lock" button and
+// disable all file modification controls (upload, rename, delete)
+$(document).
+ ready(function(event) {
+ $(".btn-collection-file-control").addClass("disabled");
+ $(".btn-collection-rename-file-span").attr("title", "Unlock collection to rename file");
+ $(".btn-collection-remove-file-span").attr("title", "Unlock collection to remove file");
+ $(".btn-remove-selected-files").attr("title", "Unlock collection to remove selected files");
+ $(".tab-pane-Upload").addClass("disabled");
+ $(".tab-pane-Upload").attr("title", "Unlock collection to upload files");
+ $("#Upload-tab").attr("data-toggle", "disabled");
+ }).
+ on('click', '.lock-collection-btn', function(event) {
+ classes = $(event.target).attr('class')
+
+ if (classes.indexOf("fa-lock") != -1) {
+ // About to unlock; warn and get confirmation from user
+ if (confirm("Adding, renaming, and deleting files changes the portable data hash. Are you sure you want to unlock the collection?")) {
+ $(".lock-collection-btn").removeClass("fa-lock");
+ $(".lock-collection-btn").addClass("fa-unlock");
+ $(".lock-collection-btn").attr("title", "Lock collection to prevent editing files");
+ $(".btn-collection-rename-file-span").attr("title", "");
+ $(".btn-collection-remove-file-span").attr("title", "");
+ $(".btn-collection-file-control").removeClass("disabled");
+ $(".btn-remove-selected-files").attr("title", "");
+ $(".tab-pane-Upload").removeClass("disabled");
+ $(".tab-pane-Upload").attr("data-original-title", "");
+ $("#Upload-tab").attr("data-toggle", "tab");
+ } else {
+ // User clicked "no" and so do not unlock
+ }
+ } else {
+ // Lock it back
+ $(".lock-collection-btn").removeClass("fa-unlock");
+ $(".lock-collection-btn").addClass("fa-lock");
+ $(".lock-collection-btn").attr("title", "Unlock collection to edit files");
+ $(".btn-collection-rename-file-span").attr("title", "Unlock collection to rename file");
+ $(".btn-collection-remove-file-span").attr("title", "Unlock collection to remove file");
+ $(".btn-collection-file-control").addClass("disabled");
+ $(".btn-remove-selected-files").attr("title", "Unlock collection to remove selected files");
+ $(".tab-pane-Upload").addClass("disabled");
+ $(".tab-pane-Upload").attr("data-original-title", "Unlock collection to upload files");
+ $("#Upload-tab").attr("data-toggle", "disabled");
+ }
+ });
function enable_disable_selection_actions() {
var $container = $(this);
var $checked = $('.persistent-selection:checkbox:checked', $container);
+ var collection_lock_classes = $('.lock-collection-btn').attr('class')
+
$('[data-selection-action]', $container).
closest('div.btn-group-sm').
find('ul li').
toggleClass('disabled',
($checked.filter('[value*=-4zz18-]').length < 1) ||
($checked.length != $checked.filter('[value*=-4zz18-]').length));
+ $('[data-selection-action=remove-selected-files]', $container).
+ closest('li').
+ toggleClass('disabled',
+ ($checked.length < 0) ||
+ !($checked.length > 0 && collection_lock_classes && collection_lock_classes.indexOf("fa-unlock") !=-1));
}
$(document).
.btn-group.toggle-persist .btn-info.active {
background-color: $active-bg;
}
+
+.lock-collection-btn {
+ display: inline-block;
+ padding: .5em 2em;
+ margin: 0 1em;
+}
histogram_log = Log.
filter([[:event_type, '=', 'block-age-free-space-histogram']]).
order(:created_at => :desc).
+ with_count('none').
limit(1)
histogram_log.each do |log_entry|
# We expect this block to only execute at most once since we
filter([[:object_uuid, '=', u.uuid],
[:event_type, '=', 'user-storage-report']]).
order(:created_at => :desc).
+ with_count('none').
limit(1)
storage_log.each do |log_entry|
# We expect this block to only execute once since we specified limit(1)
RESOURCE_CLASS_ICONS = {
"Collection" => "fa-archive",
+ "ContainerRequest" => "fa-gears",
"Group" => "fa-users",
"Human" => "fa-male", # FIXME: Use a more inclusive icon.
"Job" => "fa-gears",
"Trait" => "fa-clipboard",
"User" => "fa-user",
"VirtualMachine" => "fa-terminal",
+ "Workflow" => "fa-gears",
}
DEFAULT_ICON_CLASS = "fa-cube"
ArvadosResourceList.new(self).select(*args)
end
+  # Class-level convenience wrapper: returns an ArvadosResourceList for this
+  # model with the API 'count' parameter set (see ArvadosResourceList#with_count).
+  def self.with_count(*args)
+    ArvadosResourceList.new(self).with_count(*args)
+  end
+
def self.distinct(*args)
ArvadosResourceList.new(self).distinct(*args)
end
self
end
+ # with_count sets the 'count' parameter to 'exact' or 'none' -- see
+ # https://doc.arvados.org/api/methods.html#index
+ def with_count(count_param='exact')
+ @count = count_param
+ self
+ end
+
def fetch_multiple_pages(f)
@fetch_multiple_pages = f
self
api_params = {
_method: 'GET'
}
+ api_params[:count] = @count if @count
api_params[:where] = @cond if @cond
api_params[:eager] = '1' if @eager
api_params[:select] = @select if @select
end
def stderr_log_query(limit=nil)
- query = Log.where(object_uuid: self.uuid).order("created_at DESC")
+ query = Log.where(object_uuid: self.uuid).order("created_at DESC").with_count('none')
query = query.limit(limit) if limit
query
end
def stderr_log_query(limit=nil)
query = Log.
- where(event_type: "stderr",
- object_uuid: stderr_log_object_uuids).
- order("id DESC")
+ with_count('none').
+ where(event_type: "stderr",
+ object_uuid: stderr_log_object_uuids).
+ order("created_at DESC")
unless limit.nil?
query = query.limit(limit)
end
Log.where(object_uuid: log_object_uuids).
order("created_at DESC").
limit(limit).
+ with_count('none').
select { |log| log.properties[:text].is_a? String }.
reverse.
flat_map { |log| log.properties[:text].split("\n") }
def self.goes_in_projects?
true
end
+
+ def self.creatable?
+ false
+ end
+
+ def textile_attributes
+ [ 'description' ]
+ end
end
end
%>
- <li class="<%= 'active' if i==0 %> <%= link_disabled %>" data-toggle="tooltip" data-placement="top" title="<%=tab_tooltip%>">
+ <li class="<%= 'active' if i==0 %> <%= link_disabled %> tab-pane-<%=pane_name%>" data-toggle="tooltip" data-placement="top" title="<%=tab_tooltip%>">
<a href="#<%= pane_name %>"
id="<%= pane_name %>-tab"
data-toggle="<%= data_toggle %>"
--- /dev/null
+<% if @object.editable? %>
+ <i class="fa fa-fw fa-lock lock-collection-btn btn btn-primary" title="Unlock collection to edit files"></i>
+<% end %>
'data-href' => url_for(controller: 'collections', action: :remove_selected_files),
'data-selection-param-name' => 'selection[]',
'data-selection-action' => 'remove-selected-files',
- 'data-toggle' => 'dropdown'
+ 'data-toggle' => 'dropdown',
+ 'class' => 'btn-remove-selected-files'
%></li>
<% end %>
</ul>
<% end %>
<% if object.editable? %>
- <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate', title: "Remove #{file_path}") do %>
+ <span class="btn-collection-remove-file-span">
+ <%= link_to({controller: 'collections', action: 'remove_selected_files', id: object.uuid, selection: [object.portable_data_hash+'/'+file_path]}, method: :post, remote: true, data: {confirm: "Remove #{file_path}?", toggle: 'tooltip', placement: 'top'}, class: 'btn btn-sm btn-default btn-nodecorate btn-collection-file-control', title: 'Remove this file') do %>
<i class="fa fa-fw fa-trash-o"></i>
<% end %>
+ </span>
<% end %>
<% if CollectionsHelper::is_image(filename) %>
<i class="fa fa-fw fa-bar-chart-o"></i>
<% if object.editable? %>
- <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_path' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'} %>
+ <span class="btn-collection-rename-file-span">
+ <%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_path' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit name or directory or both for this file', btnclass: 'collection-file-control'} %>
+ </span>
<% else %>
<%= filename %>
<% end %>
</div>
<% else %>
<% if object.editable? %>
- <i class="fa fa-fw fa-file"></i><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit path of this file (name or directory or both). If you use the same path as another file, it may be removed.'} %>
+ <i class="fa fa-fw fa-file"></i><span class="btn-collection-rename-file-span"><%= render_editable_attribute object, 'filename', filename, {'data-value' => file_path, 'data-toggle' => 'manual', 'selection_name' => 'rename-file-path:'+file_path}, {tiptitle: 'Edit name or directory or both for this file', btnclass: 'collection-file-control'} %>
+ </span>
<% else %>
<i class="fa fa-fw fa-file" href="<%=object.uuid%>/<%=file_path%>" ></i> <%= filename %>
<% end %>
<% if @object.state == 'Final' %>
<%= link_to(copy_container_request_path('id' => @object.uuid),
- class: 'btn btn-primary',
+ class: 'btn btn-sm btn-primary',
title: 'Re-run',
data: {toggle: :tooltip, placement: :top}, title: 'This will make a copy and take you there. You can then make any needed changes and run it',
method: :post,
--- /dev/null
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
+
+<table class="table table-condensed arv-index">
+ <colgroup>
+ <col width="10%" />
+ <col width="10%" />
+ <col width="25%" />
+ <col width="40%" />
+ <col width="15%" />
+ </colgroup>
+
+ <thead>
+ <tr class="contain-align-left">
+ <th></th>
+ <th></th>
+ <th> name </th>
+ <th> description </th>
+ <th> owner </th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <% @objects.sort_by { |ob| ob[:created_at] }.reverse.each do |ob| %>
+ <tr>
+ <td>
+ <%= button_to(choose_projects_path(id: "run-workflow-button",
+ title: 'Choose project',
+ editable: true,
+ action_name: 'Choose',
+ action_href: work_units_path,
+ action_method: 'post',
+ action_data: {'selection_param' => 'work_unit[owner_uuid]',
+ 'work_unit[template_uuid]' => ob.uuid,
+ 'success' => 'redirect-to-created-object'
+ }.to_json),
+ { class: "btn btn-default btn-xs", title: "Run #{ob.name}", remote: true, method: :get }
+ ) do %>
+ <i class="fa fa-fw fa-play"></i> Run
+ <% end %>
+ </td>
+
+ <td>
+ <%= render :partial => "show_object_button", :locals => {object: ob, size: 'xs'} %>
+ </td>
+
+ <td>
+ <%= render_editable_attribute ob, 'name' %>
+ </td>
+
+ <td>
+ <% if ob.description %>
+ <%= render_attribute_as_textile(ob, "description", ob.description, false) %>
+ <br />
+ <% end %>
+ </td>
+
+ <td>
+ <%= link_to_if_arvados_object ob.owner_uuid, friendly_name: true %>
+ </td>
+ </tr>
+ <% end %>
+ </tbody>
+</table>
+
+<%= render partial: "paging", locals: {results: @objects, object: @object} %>
test "Upload two empty files with the same name" do
need_selenium "to make file uploads work"
visit page_with_token 'active', sandbox_path
+
+ unlock_collection
+
find('.nav-tabs a', text: 'Upload').click
attach_file 'file_selector', testfile_path('empty.txt')
assert_selector 'div', text: 'empty.txt'
test "Upload non-empty files" do
need_selenium "to make file uploads work"
visit page_with_token 'active', sandbox_path
+
+ unlock_collection
+
find('.nav-tabs a', text: 'Upload').click
attach_file 'file_selector', testfile_path('a')
attach_file 'file_selector', testfile_path('foo.txt')
service_port: 99999)
end
visit page_with_token 'active', sandbox_path
+
+ unlock_collection
+
find('.nav-tabs a', text: 'Upload').click
attach_file 'file_selector', testfile_path('foo.txt')
assert_selector 'button:not([disabled])', text: 'Start'
# Must be an absolute path. https://github.com/jnicklas/capybara/issues/621
File.join Dir.getwd, 'tmp', filename
end
+
+  # Click the collection's lock icon and accept the confirmation dialog so
+  # that the file-modification controls (upload, rename, delete) are enabled.
+  def unlock_collection
+    first('.lock-collection-btn').click
+    accept_alert
+  end
end
end
test "remove a file from collection using checkbox and dropdown option" do
+ need_selenium 'to confirm unlock'
+
visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
assert(page.has_text?('file1'), 'file not found - file1')
+ unlock_collection
+
# remove first file
input_files = page.all('input[type=checkbox]')
input_files[0].click
end
test "remove a file in collection using trash icon" do
- need_selenium 'to confirm remove'
+ need_selenium 'to confirm unlock'
visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
assert(page.has_text?('file1'), 'file not found - file1')
+ unlock_collection
+
first('.fa-trash-o').click
- page.driver.browser.switch_to.alert.accept
+ accept_alert
assert(page.has_no_text?('file1'), 'file found - file')
assert(page.has_text?('file2'), 'file not found - file2')
end
test "rename a file in collection" do
+ need_selenium 'to confirm unlock'
+
visit page_with_token('active', '/collections/zzzzz-4zz18-a21ux3541sxa8sf')
+ unlock_collection
+
within('.collection_files') do
first('.fa-pencil').click
find('.editable-input input').set('file1renamed')
assert_nil first('.fa-trash-o')
end
end
+
+ test "unlock collection to modify files" do
+ need_selenium 'to confirm remove'
+
+ collection = api_fixture('collections')['collection_owned_by_active']
+
+ # On load, collection is locked, and upload tab, rename and remove options are disabled
+ visit page_with_token('active', "/collections/#{collection['uuid']}")
+
+ assert_selector 'a[data-toggle="disabled"]', text: 'Upload'
+
+ within('.collection_files') do
+ file_ctrls = page.all('.btn-collection-file-control')
+ assert_equal 2, file_ctrls.size
+ assert_equal true, file_ctrls[0]['class'].include?('disabled')
+ assert_equal true, file_ctrls[1]['class'].include?('disabled')
+ find('input[type=checkbox]').click
+ end
+
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_selector 'li.disabled', text: 'Remove selected files'
+ assert_selector 'li', text: 'Create new collection with selected files'
+ end
+
+ unlock_collection
+
+ assert_no_selector 'a[data-toggle="disabled"]', text: 'Upload'
+ assert_selector 'a', text: 'Upload'
+
+ within('.collection_files') do
+ file_ctrls = page.all('.btn-collection-file-control')
+ assert_equal 2, file_ctrls.size
+ assert_equal false, file_ctrls[0]['class'].include?('disabled')
+ assert_equal false, file_ctrls[1]['class'].include?('disabled')
+
+ # previous checkbox selection won't result in firing a new event;
+ # undo and redo checkbox to fire the selection event again
+ find('input[type=checkbox]').click
+ find('input[type=checkbox]').click
+ end
+
+ click_button 'Selection'
+ within('.selection-action-container') do
+ assert_no_selector 'li.disabled', text: 'Remove selected files'
+ assert_selector 'li', text: 'Remove selected files'
+ end
+ end
+
+  # Click the collection's lock icon and accept the confirmation dialog so
+  # that the file-modification controls (upload, rename, delete) are enabled.
+  def unlock_collection
+    first('.lock-collection-btn').click
+    accept_alert
+  end
end
end
end
end
+
+ test 'Run from workflows index page' do
+ visit page_with_token('active', '/workflows')
+
+ wf_count = page.all('a[data-original-title="show workflow"]').count
+ assert_equal true, wf_count>0
+
+ # Run one of the workflows
+ wf_name = 'Workflow with input specifications'
+ within('tr', text: wf_name) do
+ find('a,button', text: 'Run').click
+ end
+
+ # Choose project for the container_request being created
+ within('.modal-dialog') do
+ find('.selectable', text: 'A Project').click
+ find('button', text: 'Choose').click
+ end
+
+ # In newly created container_request page now
+ assert_text 'A Project' # CR created in "A Project"
+ assert_text "This container request was created from the workflow #{wf_name}"
+ assert_match /Provide a value for .* then click the \"Run\" button to start the workflow/, page.text
+ end
end
end
Capybara.reset_sessions!
end
+
+  # Accept a browser confirm/alert dialog. Under selenium the alert may not
+  # exist yet when we look for it, so poll up to 10 times, sleeping 0.1s
+  # after each NoSuchAlertError, and stop as soon as one is accepted.
+  # Other drivers fall through to the else branch and do nothing.
+  def accept_alert
+    if Capybara.current_driver == :selenium
+      (0..9).each do
+        begin
+          page.driver.browser.switch_to.alert.accept
+          break
+        rescue Selenium::WebDriver::Error::NoSuchAlertError
+          sleep 0.1
+        end
+      end
+    else
+      # poltergeist returns true for confirm, so no need to accept
+    end
+  end
end
class ResourceListTest < ActiveSupport::TestCase
+ reset_api_fixtures :after_each_test, false
+
test 'links_for on a resource list that does not return links' do
use_token :active
results = Specimen.all
assert_empty c.results
end
+ test 'count=none' do
+ use_token :active
+ c = Collection.with_count('none')
+ assert_nil c.items_available
+ refute_empty c.results
+ end
end
require 'test_helper'
class LinkTest < ActiveSupport::TestCase
+
+ reset_api_fixtures :after_each_test, false
+
def uuid_for(fixture_name, object_name)
api_fixture(fixture_name)[object_name]["uuid"]
end
require 'test_helper'
class PipelineInstanceTest < ActiveSupport::TestCase
+
+ reset_api_fixtures :after_each_test, false
+
def find_pi_with(token_name, pi_name)
use_token token_name
find_fixture(PipelineInstance, pi_name)
require 'test_helper'
class WorkUnitTest < ActiveSupport::TestCase
+
+ reset_api_fixtures :after_each_test, false
+
setup do
Rails.configuration.anonymous_user_token = api_fixture('api_client_authorizations')['anonymous']['api_token']
end
all|cwltest|1.0.20160907111242|3|python|all|--depends 'python-futures >= 3.0.5'
all|rdflib-jsonld|0.4.0|2|python|all
all|futures|3.0.5|2|python|all
+all|future|0.16.0|2|python|all
+all|future|0.16.0|2|python3|all
ubuntu1404)
FORMAT=deb
;;
+ ubuntu1604)
+ FORMAT=deb
+ ;;
centos7)
FORMAT=rpm
;;
TARGET=debian8
COMMAND=
-RAILS_PACKAGE_ITERATION=7
-
PARSEDOPTS=$(getopt --name "$0" --longoptions \
help,build-bundle-packages,debug,target:,only-build: \
-- "" "$@")
# older packages.
LICENSE_PACKAGE_TS=20151208015500
+RAILS_PACKAGE_ITERATION=7
+
debug_echo () {
echo "$@" >"$STDOUT_IF_DEBUG"
}
export GOPATH
mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \
+ln -sfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \
|| fatal "symlink failed"
setup_virtualenv "$VENVDIR" --python python2.7
</ul>
<div class="pull-right" style="padding-top: 6px">
- <form method="get" action="http://www.google.com/search">
+ <form method="get" action="https://www.google.com/search">
<div class="input-group" style="width: 220px">
<input type="text" class="form-control" name="q" placeholder="search">
<div class="input-group-addon">
# EC2 configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
# Google Compute Engine configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# Node Manager will ensure that there are at least this many nodes running at
# all times. If node manager needs to start new idle nodes for the purpose of
# Azure configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
<pre>
$namespaces:
arv: "http://arvados.org/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
</pre>
Arvados extensions must go into the @hints@ section, for example:
arv:PartitionRequirement:
partition: dev_partition
arv:APIRequirement: {}
+ cwltool:LoadListingRequirement:
+ loadListing: shallow_listing
</pre>
h2. arv:RunInSingleContainer
h2. arv:APIRequirement
Indicates that process wants to access to the Arvados API. Will be granted limited network access and have @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ set in the environment.
+
+h2. cwltool:LoadListingRequirement
+
+In CWL v1.0 documents, the default behavior for Directory objects is to recursively expand the @listing@ for access by parameter references and expressions. For directory trees containing many files, this can be expensive in both time and memory usage. Use @cwltool:LoadListingRequirement@ to change the behavior for expansion of directory listings in the workflow runner.
+
+table(table table-bordered table-condensed).
+|_. Field |_. Type |_. Description |
+|loadListing|string|One of @no_listing@, @shallow_listing@, or @deep_listing@|
+
+*no_listing*: Do not expand directory listing at all. The @listing@ field on the Directory object will be undefined.
+
+*shallow_listing*: Only expand the first level of directory listing. The @listing@ field on the toplevel Directory object will contain the directory contents, however @listing@ will not be defined on subdirectories.
+
+*deep_listing*: Recursively expand all levels of directory listing. The @listing@ field will be provided on the toplevel object and all subdirectories.
#!/bin/sh
-exec docker build -t arvados/migrate-docker19 .
+exec docker build -t arvados/migrate-docker19:1.0 .
read pid cmd state ppid pgrp session tty_nr tpgid rest < /proc/self/stat
-exec docker daemon --storage-driver=vfs $DOCKER_DAEMON_ARGS
+exec docker daemon --storage-driver=$1 $DOCKER_DAEMON_ARGS
#!/bin/bash
-set -e
+# This script is called by arv-migrate-docker19 to perform the actual migration
+# of a single image. This works by running Docker-in-Docker (dnd.sh) to
+# download the image using Docker 1.9 and then upgrading to Docker 1.13 and
+# uploading the converted image.
+
+# When using bash in pid 1 and using "trap on EXIT"
+# it will sometimes go into an 100% CPU infinite loop.
+#
+# Using workaround from here:
+#
+# https://github.com/docker/docker/issues/4854
+if [ "$$" = 1 ]; then
+ $0 "$@"
+ exit $?
+fi
+
+# -x show script
+# -e exit on error
+# -o pipefail use exit code from 1st failure in pipeline, not last
+set -x -e -o pipefail
+image_tar_keepref=$1
+image_id=$2
+image_repo=$3
+image_tag=$4
+project_uuid=$5
+graph_driver=$6
+
+if [[ "$image_repo" = "<none>" ]] ; then
+ image_repo=none
+ image_tag=latest
+fi
+
+# Print free space in /var/lib/docker
+function freespace() {
+ df -B1 /var/lib/docker | tail -n1 | sed 's/ */ /g' | cut -d' ' -f4
+}
+
+# Run docker-in-docker script and then wait for it to come up
function start_docker {
- /root/dnd.sh &
+ /root/dnd.sh $graph_driver &
for i in $(seq 1 10) ; do
if docker version >/dev/null 2>/dev/null ; then
return
false
}
+# Kill docker from pid then wait for it to be down
function kill_docker {
if test -f /var/run/docker.pid ; then
kill $(cat /var/run/docker.pid)
false
}
+# Ensure that we clean up docker graph and/or lingering cache files on exit
function cleanup {
kill_docker
rm -rf /var/lib/docker/*
rm -rf /root/.cache/arvados/docker/*
+ echo "Available space after cleanup is $(freespace)"
}
trap cleanup EXIT
start_docker
-image_tar_keepref=$1
-image_id=$2
-image_repo=$3
-image_tag=$4
-project_uuid=$5
+echo "Initial available space is $(freespace)"
arv-get $image_tar_keepref | docker load
+
docker tag $image_id $image_repo:$image_tag
docker images -a
kill_docker
+echo "Available space after image load is $(freespace)"
+
cd /root/pkgs
dpkg -i libltdl7_2.4.2-1.11+b1_amd64.deb docker-engine_1.13.1-0~debian-jessie_amd64.deb
+echo "Available space after image upgrade is $(freespace)"
+
start_docker
docker images -a
+if [[ "$image_repo" = "none" ]] ; then
+ image_repo=$(docker images -a --no-trunc | sed 's/ */ /g' | grep ^none | cut -d' ' -f3)
+ image_tag=""
+fi
+
UUID=$(arv-keepdocker --force-image-format --project-uuid=$project_uuid $image_repo $image_tag)
+echo "Available space after arv-keepdocker is $(freespace)"
+
echo "Migrated uuid is $UUID"
# Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
# up work directories crunch_tmp/work, crunch_tmp/opt,
# crunch_tmp/src*.
- #
- # TODO: When #5036 is done and widely deployed, we can limit mount's
- # -t option to simply fuse.keep.
my ($exited, $stdout, $stderr) = srun_sync(
["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+ ['bash', '-ec', q{
+arv-mount --unmount-timeout 10 --unmount-all ${CRUNCH_TMP}
+rm -rf ${JOB_WORK} ${CRUNCH_INSTALL} ${CRUNCH_TMP}/task ${CRUNCH_TMP}/src* ${CRUNCH_TMP}/*.cid
+ }],
{label => "clean work dirs"});
if ($exited != 0) {
exit(EX_RETRY_UNLOCKED);
--- /dev/null
+#!/bin/bash
def test_output_collection_owner_uuid
j = jobspec :grep_local
out, err = capture_subprocess_io do
- tryjobrecord j, binstubs: ['output_coll_owner']
+ tryjobrecord j, binstubs: ['arv-mount', 'output_coll_owner']
end
assert_match /owner_uuid: #{j['owner_uuid']}/, err
end
out, err = capture_subprocess_io do
j = jobspec :grep_local
j[:script_version] = bogus_version
- tryjobrecord j
+ tryjobrecord j, binstubs: ['arv-mount']
end
assert_match /'#{bogus_version}' not found, giving up/, err
assert_jobfail $?
from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
-from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from ._version import __version__
from cwltool.pack import pack
-from cwltool.process import shortname, UnsupportedRequirement, getListing
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
else:
self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
+ self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
+
self.work_api = None
expected_api = ["jobs", "containers"]
for api in expected_api:
kwargs["work_api"] = self.work_api
kwargs["fetcher_constructor"] = partial(CollectionFetcher,
api_client=self.api,
- keep_client=self.keep_client)
+ fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
+ num_retries=self.num_retries)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
return ArvadosCommandTool(self, toolpath_object, **kwargs)
elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
if isinstance(obj, dict):
if obj.get("writable"):
raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
- if obj.get("class") == "CommandLineTool":
- if self.work_api == "containers":
- if obj.get("stdin"):
- raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
- if obj.get("stderr"):
- raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
if obj.get("class") == "DockerRequirement":
if obj.get("dockerOutputDirectory"):
# TODO: can be supported by containers API, but not jobs API.
keep_client=self.keep_client,
num_retries=self.num_retries)
- srccollections = {}
for k,v in generatemapper.items():
if k.startswith("_:"):
if v.type == "Directory":
raise Exception("Output source is not in keep or a literal")
sp = k.split("/")
srccollection = sp[0][5:]
- if srccollection not in srccollections:
- try:
- srccollections[srccollection] = arvados.collection.CollectionReader(
- srccollection,
- api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
- except arvados.errors.ArgumentError as e:
- logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
- raise
- reader = srccollections[srccollection]
try:
+ reader = self.collection_cache.get(srccollection)
srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+ except arvados.errors.ArgumentError as e:
+ logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+ raise
except IOError as e:
logger.warn("While preparing output collection: %s", e)
self.project_uuid = kwargs.get("project_uuid")
self.pipeline = None
make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
- api_client=self.api,
- keep_client=self.keep_client)
+ collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
if not kwargs.get("name"):
self.set_crunch_output()
if kwargs.get("compute_checksum"):
- adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
+ adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
return (self.final_output, self.final_status)
return parser
def add_arv_hints():
- cache = {}
- cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+ cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
- cache["http://arvados.org/cwl"] = res.read()
+ use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
res.close()
- document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
- _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
- for n in extnames.names:
- if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
- cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
- document_loader.idx["http://arvados.org/cwl#"+n] = {}
+ cwltool.process.supportedProcessRequirements.extend([
+ "http://arvados.org/cwl#RunInSingleContainer",
+ "http://arvados.org/cwl#OutputDirType",
+ "http://arvados.org/cwl#RuntimeConstraints",
+ "http://arvados.org/cwl#PartitionRequirement",
+ "http://arvados.org/cwl#APIRequirement",
+ "http://commonwl.org/cwltool#LoadListingRequirement"
+ ])
def main(args, stdout, stderr, api_client=None, keep_client=None):
parser = arg_parser()
arvargs.relax_path_checks = True
arvargs.validate = None
+ make_fs_access = partial(CollectionFsAccess,
+ collection_cache=runner.collection_cache)
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
makeTool=runner.arv_make_tool,
versionfunc=versionstring,
job_order_object=job_order_object,
- make_fs_access=partial(CollectionFsAccess,
- api_client=api_client,
- keep_client=keep_client),
+ make_fs_access=make_fs_access,
fetcher_constructor=partial(CollectionFetcher,
api_client=api_client,
- keep_client=keep_client,
+ fs_access=make_fs_access(""),
num_retries=runner.num_retries),
resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
- logger_handler=arvados.log_handler)
+ logger_handler=arvados.log_handler,
+ custom_schema_callback=add_arv_hints)
$base: "http://arvados.org/cwl#"
+$namespaces:
+ cwl: "https://w3id.org/cwl/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
$graph:
+- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
+
+- name: cwltool:LoadListingRequirement
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ fields:
+ class:
+ type: string
+ doc: "Always 'LoadListingRequirement'"
+ jsonldPredicate:
+ "_id": "@type"
+ "_type": "@vocab"
+ loadListing:
+ type:
+ - "null"
+ - type: enum
+ name: LoadListingEnum
+ symbols: [no_listing, shallow_listing, deep_listing]
+
- name: RunInSingleContainer
type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
doc: |
Indicates that a subworkflow should run in a single container
and not be scheduled as separate steps.
- name: RuntimeConstraints
type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
doc: |
Set Arvados-specific runtime hints.
fields:
- name: PartitionRequirement
type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
doc: |
Select preferred compute partitions on which to run jobs.
fields:
- name: APIRequirement
type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
doc: |
Indicates that process wants to access to the Arvados API. Will be granted
limited network access and have ARVADOS_API_HOST and ARVADOS_API_TOKEN set
dirs = set()
for f in self.pathmapper.files():
- pdh, p, tp = self.pathmapper.mapper(f)
+ pdh, p, tp, stg = self.pathmapper.mapper(f)
if tp == "Directory" and '/' not in pdh:
mounts[p] = {
"kind": "collection",
dirs.add(pdh)
for f in self.pathmapper.files():
- res, p, tp = self.pathmapper.mapper(f)
+ res, p, tp, stg = self.pathmapper.mapper(f)
if res.startswith("keep:"):
res = res[5:]
elif res.startswith("/keep/"):
container_request["environment"].update(self.environment)
if self.stdin:
- raise UnsupportedRequirement("Stdin redirection currently not suppported")
+ sp = self.stdin[6:].split("/", 1)
+ mounts["stdin"] = {"kind": "collection",
+ "portable_data_hash": sp[0],
+ "path": sp[1]}
if self.stderr:
- raise UnsupportedRequirement("Stderr redirection currently not suppported")
+ mounts["stderr"] = {"kind": "file",
+ "path": "%s/%s" % (self.outdir, self.stderr)}
if self.stdout:
mounts["stdout"] = {"kind": "file",
import functools
from arvados.api import OrderedJsonModel
-from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
+from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, normalizeFilesDirs
from cwltool.load_tool import load_tool
from cwltool.errors import WorkflowException
-from .fsaccess import CollectionFetcher
+from .fsaccess import CollectionFetcher, CollectionFsAccess
logger = logging.getLogger('arvados.cwl-runner')
adjustFileObjs(job_order_object, keeppathObj)
adjustDirObjs(job_order_object, keeppathObj)
normalizeFilesDirs(job_order_object)
- adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
output_name = None
output_tags = None
runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
output_name=output_name, output_tags=output_tags)
+ make_fs_access = functools.partial(CollectionFsAccess,
+ collection_cache=runner.collection_cache)
+
t = load_tool(toolpath, runner.arv_make_tool,
fetcher_constructor=functools.partial(CollectionFetcher,
- api_client=api,
- keep_client=arvados.keep.KeepClient(api_client=api, num_retries=4)))
+ api_client=runner.api,
+ fs_access=make_fs_access(""),
+ num_retries=runner.num_retries))
args = argparse.Namespace()
args.project_uuid = arvados.current_job()["owner_uuid"]
args.basedir = os.getcwd()
args.name = None
args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
+ args.make_fs_access = make_fs_access
+
runner.arv_executor(t, job_order_object, **vars(args))
except Exception as e:
if isinstance(e, WorkflowException):
import urlparse
import re
import logging
+import threading
import ruamel.yaml as yaml
logger = logging.getLogger('arvados.cwl-runner')
+class CollectionCache(object):
+    """Process-wide cache of CollectionReader objects, keyed by portable
+    data hash (PDH), so that every CollectionFsAccess instance sharing this
+    cache reuses one reader per collection instead of re-fetching it.
+    """
+    def __init__(self, api_client, keep_client, num_retries):
+        # NOTE(review): num_retries is accepted but not stored or forwarded
+        # to CollectionReader here -- confirm whether it should be.
+        self.api_client = api_client
+        self.keep_client = keep_client
+        self.collections = {}  # PDH -> CollectionReader
+        self.lock = threading.Lock()  # guards self.collections
+
+    def get(self, pdh):
+        # Return the cached reader for this PDH, creating it on first use.
+        # The lock ensures concurrent first-time lookups build only one reader.
+        with self.lock:
+            if pdh not in self.collections:
+                logger.debug("Creating collection reader for %s", pdh)
+                self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
+                                                                            keep_client=self.keep_client)
+        return self.collections[pdh]
+
+
class CollectionFsAccess(cwltool.stdfsaccess.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
- def __init__(self, basedir, api_client=None, keep_client=None):
+ def __init__(self, basedir, collection_cache=None):
super(CollectionFsAccess, self).__init__(basedir)
- self.api_client = api_client
- self.keep_client = keep_client
- self.collections = {}
+ self.collection_cache = collection_cache
def get_collection(self, path):
sp = path.split("/", 1)
p = sp[0]
if p.startswith("keep:") and arvados.util.keep_locator_pattern.match(p[5:]):
pdh = p[5:]
- if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client,
- keep_client=self.keep_client)
- return (self.collections[pdh], sp[1] if len(sp) == 2 else None)
+ return (self.collection_cache.get(pdh), sp[1] if len(sp) == 2 else None)
else:
return (None, path)
return os.path.realpath(path)
class CollectionFetcher(DefaultFetcher):
- def __init__(self, cache, session, api_client=None, keep_client=None, num_retries=4):
+ def __init__(self, cache, session, api_client=None, fs_access=None, num_retries=4):
super(CollectionFetcher, self).__init__(cache, session)
self.api_client = api_client
- self.fsaccess = CollectionFsAccess("", api_client=api_client, keep_client=keep_client)
+ self.fsaccess = fs_access
self.num_retries = num_retries
def fetch_text(self, url):
def visit(self, srcobj, uploadfiles):
src = srcobj["location"]
- if srcobj["class"] == "File":
- if "#" in src:
- src = src[:src.index("#")]
- if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
- self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), "File")
- if src not in self._pathmap:
+ if "#" in src:
+ src = src[:src.index("#")]
+
+ if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
+ self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), srcobj["class"], True)
+
+ if src not in self._pathmap:
+ if src.startswith("file:"):
# Local FS ref, may need to be uploaded or may be on keep
# mount.
ab = abspath(src, self.input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern="keep:%s/%s")
+ st = arvados.commands.run.statfile("", ab,
+ fnPattern="keep:%s/%s",
+ dirPattern="keep:%s/%s")
with SourceLine(srcobj, "location", WorkflowException):
if isinstance(st, arvados.commands.run.UploadFile):
uploadfiles.add((src, ab, st))
elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File")
- elif src.startswith("_:"):
- if "contents" in srcobj:
- pass
- else:
- raise WorkflowException("File literal '%s' is missing contents" % src)
- elif src.startswith("arvwf:"):
- self._pathmap[src] = MapperEnt(src, src, "File")
+ self._pathmap[src] = MapperEnt(st.fn, self.collection_pattern % urllib.unquote(st.fn[5:]), "File", True)
else:
raise WorkflowException("Input file path '%s' is invalid" % st)
- if "secondaryFiles" in srcobj:
- for l in srcobj["secondaryFiles"]:
- self.visit(l, uploadfiles)
- elif srcobj["class"] == "Directory":
- if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
- self._pathmap[src] = MapperEnt(src, self.collection_pattern % urllib.unquote(src[5:]), "Directory")
+ elif src.startswith("_:"):
+ if srcobj["class"] == "File" and "contents" not in srcobj:
+ raise WorkflowException("File literal '%s' is missing `contents`" % src)
+ if srcobj["class"] == "Directory" and "listing" not in srcobj:
+ raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+ else:
+ self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
+
+ with SourceLine(srcobj, "secondaryFiles", WorkflowException):
+ for l in srcobj.get("secondaryFiles", []):
+ self.visit(l, uploadfiles)
+ with SourceLine(srcobj, "listing", WorkflowException):
for l in srcobj.get("listing", []):
self.visit(l, uploadfiles)
for l in obj.get("secondaryFiles", []):
self.addentry(l, c, path, subdirs)
elif obj["class"] == "Directory":
- for l in obj["listing"]:
+ for l in obj.get("listing", []):
self.addentry(l, c, path + "/" + obj["basename"], subdirs)
subdirs.append((obj["location"], path + "/" + obj["basename"]))
elif obj["location"].startswith("_:") and "contents" in obj:
loc = k["location"]
if loc in already_uploaded:
v = already_uploaded[loc]
- self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), "File")
+ self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), "File", True)
for srcobj in referenced_files:
self.visit(srcobj, uploadfiles)
project=self.arvrunner.project_uuid)
for src, ab, st in uploadfiles:
- self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:], "File")
+ self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
+ "Directory" if os.path.isdir(ab) else "File", True)
self.arvrunner.add_uploaded(src, self._pathmap[src])
for srcobj in referenced_files:
+ subdirs = []
if srcobj["class"] == "Directory":
if srcobj["location"] not in self._pathmap:
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- subdirs = []
- for l in srcobj["listing"]:
+ for l in srcobj.get("listing", []):
self.addentry(l, c, ".", subdirs)
check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
c.save_new(owner_uuid=self.arvrunner.project_uuid)
ab = self.collection_pattern % c.portable_data_hash()
- self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "Directory")
- for loc, sub in subdirs:
- ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
- self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+ self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
(srcobj["location"].startswith("_:") and "contents" in srcobj)):
c = arvados.collection.Collection(api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries )
- subdirs = []
self.addentry(srcobj, c, ".", subdirs)
check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
c.save_new(owner_uuid=self.arvrunner.project_uuid)
ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
- self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "File")
+ self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
+ ab, "File", True)
if srcobj.get("secondaryFiles"):
ab = self.collection_pattern % c.portable_data_hash()
- self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt(ab, ab, "Directory")
+ self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
+
+ if subdirs:
for loc, sub in subdirs:
+ # subdirs will all start with "./", strip it off
ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
- self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+ self._pathmap[loc] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), sub[2:]),
+ ab, "Directory", True)
self.keepdir = None
class StagingPathMapper(PathMapper):
_follow_dirs = True
- def visit(self, obj, stagedir, basedir, copy=False):
+ def visit(self, obj, stagedir, basedir, copy=False, staged=False):
# type: (Dict[unicode, Any], unicode, unicode, bool) -> None
loc = obj["location"]
tgt = os.path.join(stagedir, obj["basename"])
if obj["class"] == "Directory":
- self._pathmap[loc] = MapperEnt(loc, tgt, "Directory")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
if loc.startswith("_:") or self._follow_dirs:
self.visitlisting(obj.get("listing", []), tgt, basedir)
elif obj["class"] == "File":
if loc in self._pathmap:
return
if "contents" in obj and loc.startswith("_:"):
- self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
+ self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
else:
if copy:
- self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
else:
- self._pathmap[loc] = MapperEnt(loc, tgt, "File")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
# with any secondary files.
self.visitlisting(referenced_files, self.stagedir, basedir)
- for path, (ab, tgt, type) in self._pathmap.items():
+ for path, (ab, tgt, type, staged) in self._pathmap.items():
if type in ("File", "Directory") and ab.startswith("keep:"):
- self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
+ self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type, staged)
class NoFollowPathMapper(StagingPathMapper):
if submit_runner_ram:
self.submit_runner_ram = submit_runner_ram
else:
- self.submit_runner_ram = 1024
+ self.submit_runner_ram = 3000
if self.submit_runner_ram <= 0:
raise Exception("Value of --submit-runner-ram must be greater than zero")
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20170224141733',
- 'schema-salad==2.2.20170222151604',
+ 'cwltool==1.0.20170413194156',
+ 'schema-salad==2.5.20170328195758',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
- 'arvados-python-client>=0.1.20170112173420',
+ 'arvados-python-client>=0.1.20170327195441',
'setuptools'
],
data_files=[
export ARVADOS_API_HOST_INSECURE=1
export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
+arv-keepdocker --pull arvados/jobs latest
+
cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
#!/bin/sh
exec arvados-cwl-runner --api=jobs --compute-checksum \\\$@
output: {}
tool: cat.cwl
doc: Test hashes in filenames
+
+- job: listing-job.yml
+ output: {
+ "out": {
+ "class": "File",
+ "location": "output.txt",
+ "size": 5,
+ "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+ }
+ }
+ tool: wf/listing_shallow.cwl
+ doc: test shallow directory listing
+
+- job: listing-job.yml
+ output: {
+ "out": {
+ "class": "File",
+ "location": "output.txt",
+ "size": 5,
+ "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+ }
+ }
+ tool: wf/listing_none.cwl
+ doc: test no directory listing
+
+- job: listing-job.yml
+ output: {
+ "out": {
+ "class": "File",
+ "location": "output.txt",
+ "size": 5,
+ "checksum": "sha1$724ba28f4a9a1b472057ff99511ed393a45552e1"
+ }
+ }
+ tool: wf/listing_deep.cwl
+ doc: test deep directory listing
--- /dev/null
+d:
+ class: Directory
+ location: tmp1
\ No newline at end of file
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
}],
"baseCommand": "ls"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
loader=Loader({}))
}],
"baseCommand": "ls"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
loader=Loader({}))
for key in call_body:
self.assertEqual(call_body_expected.get(key), call_body.get(key))
+
+ # Test redirecting stdin/stdout/stderr
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_redirects(self, keepdocker):
+ arv_docker_clear_cache()
+
+ runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ tool = cmap({
+ "inputs": [],
+ "outputs": [],
+ "baseCommand": "ls",
+ "stdout": "stdout.txt",
+ "stderr": "stderr.txt",
+ "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
+ "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+ })
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ arvtool.formatgraph = None
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
+ make_fs_access=make_fs_access, tmpdir="/tmp"):
+ j.run()
+ runner.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher({
+ 'environment': {
+ 'HOME': '/var/spool/cwl',
+ 'TMPDIR': '/tmp'
+ },
+ 'name': 'test_run_redirect',
+ 'runtime_constraints': {
+ 'vcpus': 1,
+ 'ram': 1073741824
+ },
+ 'use_existing': True,
+ 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {'kind': 'tmp'},
+ "stderr": {
+ "kind": "file",
+ "path": "/var/spool/cwl/stderr.txt"
+ },
+ "stdin": {
+ "kind": "collection",
+ "path": "file.txt",
+ "portable_data_hash": "99999999999999999999999999999996+99"
+ },
+ "stdout": {
+ "kind": "file",
+ "path": "/var/spool/cwl/stdout.txt"
+ },
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': 'arvados/jobs',
+ 'command': ['ls', '/var/spool/cwl'],
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {},
+ 'properties': {},
+ }))
+
@mock.patch("arvados.collection.Collection")
def test_done(self, col):
api = mock.MagicMock()
--- /dev/null
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+
+import arvados
+import arvados.keep
+import arvados.collection
+import arvados_cwl
+
+from cwltool.pathmapper import MapperEnt
+from .mock_discovery import get_rootDesc
+
+from arvados_cwl.fsaccess import CollectionCache
+
+class TestFsAccess(unittest.TestCase):
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_collection_cache(self, cr):
+        # Two gets of the same PDH must return the identical cached object
+        # and construct CollectionReader exactly once.
+        cache = CollectionCache(mock.MagicMock(), mock.MagicMock(), 4)
+        c1 = cache.get("99999999999999999999999999999991+99")
+        c2 = cache.get("99999999999999999999999999999991+99")
+        self.assertIs(c1, c2)
+        self.assertEqual(1, cr.call_count)
+        # A different PDH triggers a second construction; the returned value
+        # is unused -- only the constructor call count is asserted.
+        c3 = cache.get("99999999999999999999999999999992+99")
+        self.assertEqual(2, cr.call_count)
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}]
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
}],
"baseCommand": "ls"
}
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
make_fs_access=make_fs_access, loader=Loader({}))
arvtool.formatgraph = None
mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
basedir="", make_fs_access=make_fs_access, loader=document_loader,
makeTool=runner.arv_make_tool, metadata=metadata)
def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None):
pdh = "99999999999999999999999999999991+99"
for c in files:
+ c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
c.fn = fnPattern % (pdh, os.path.basename(c.fn))
class TestPathmap(unittest.TestCase):
"location": "keep:99999999999999999999999999999991+99/hw.py"
}], "", "/test/%s", "/test/%s/%s")
- self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ self.assertEqual({'keep:99999999999999999999999999999991+99/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
@mock.patch("arvados.commands.run.uploadfiles")
- def test_upload(self, upl):
+ @mock.patch("arvados.commands.run.statfile")
+ def test_upload(self, statfile, upl):
"""Test pathmapper uploading files."""
arvrunner = arvados_cwl.ArvCwlRunner(self.api)
+ def statfile_mock(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)"):
+ st = arvados.commands.run.UploadFile("", "tests/hw.py")
+ return st
+
upl.side_effect = upload_mock
+ statfile.side_effect = statfile_mock
p = ArvPathMapper(arvrunner, [{
"class": "File",
- "location": "tests/hw.py"
+ "location": "file:tests/hw.py"
}], "", "/test/%s", "/test/%s/%s")
- self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
@mock.patch("arvados.commands.run.uploadfiles")
"""Test pathmapper handling previously uploaded files."""
arvrunner = arvados_cwl.ArvCwlRunner(self.api)
- arvrunner.add_uploaded('tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File'))
+ arvrunner.add_uploaded('file:tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='', type='File', staged=True))
upl.side_effect = upload_mock
p = ArvPathMapper(arvrunner, [{
"class": "File",
- "location": "tests/hw.py"
+ "location": "file:tests/hw.py"
}], "", "/test/%s", "/test/%s/%s")
- self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
@mock.patch("arvados.commands.run.uploadfiles")
p = ArvPathMapper(arvrunner, [{
"class": "File",
- "location": "tests/hw.py"
+ "location": "file:tests/hw.py"
}], "", "/test/%s", "/test/%s/%s")
- self.assertEqual({'tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File')},
+ self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
arvrunner.project_uuid = ""
api.return_value = mock.MagicMock()
arvrunner.api = api.return_value
- arvrunner.api.links().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
- {"items": [], "items_available": 0, "offset": 0},
- {"items": [], "items_available": 0, "offset": 0},
- {"items": [{"created_at": "",
- "head_uuid": "",
- "link_class": "docker_image_hash",
- "name": "123456",
- "owner_uuid": "",
- "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
- {"items": [], "items_available": 0, "offset": 0},
- {"items": [{"created_at": "",
- "head_uuid": "",
+ arvrunner.api.links().list().execute.side_effect = ({"items": [{"created_at": "",
+ "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"link_class": "docker_image_repo+tag",
"name": "arvados/jobs:"+arvados_cwl.__version__,
"owner_uuid": "",
"link_class": "docker_image_hash",
"name": "123456",
"owner_uuid": "",
- "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0} ,
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}
)
find_one_image_hash.return_value = "123456"
- arvrunner.api.collections().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
- {"items": [{"uuid": "",
+ arvrunner.api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"owner_uuid": "",
"manifest_text": "",
"properties": ""
- }], "items_available": 1, "offset": 0},
- {"items": [{"uuid": ""}], "items_available": 1, "offset": 0})
+ }], "items_available": 1, "offset": 0},)
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
- self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+ self.assertEqual("arvados/jobs:"+arvados_cwl.__version__,
+ arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
class TestCreateTemplate(unittest.TestCase):
existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
--- /dev/null
+# Test fixture for cwltool:LoadListingRequirement with deep_listing:
+# nested directory listings are loaded recursively, so the expression
+# below can see a Directory inside a Directory and echoes "true".
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: "http://commonwl.org/cwltool#"
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: deep_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing[0].class === 'Directory' && inputs.d.listing[0].listing[0].class === 'Directory') {return 'true';} else {return 'false';}}"]
--- /dev/null
+# Test fixture for cwltool:LoadListingRequirement with no_listing:
+# the input Directory's listing is not populated at all, so the
+# expression below sees `listing === undefined` and echoes "true".
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: http://commonwl.org/cwltool#
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: no_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing === undefined) {return 'true';} else {return 'false';}}"]
\ No newline at end of file
--- /dev/null
+# Test fixture for cwltool:LoadListingRequirement with shallow_listing:
+# only the first level of the input Directory is listed, so a nested
+# Directory is visible but its own listing stays undefined.
+class: CommandLineTool
+cwlVersion: v1.0
+$namespaces:
+  cwltool: http://commonwl.org/cwltool#
+requirements:
+  cwltool:LoadListingRequirement:
+    loadListing: shallow_listing
+  InlineJavascriptRequirement: {}
+inputs:
+  d: Directory
+outputs:
+  out: stdout
+stdout: output.txt
+arguments:
+  [echo, "${if(inputs.d.listing[0].class === 'Directory' && inputs.d.listing[0].listing === undefined) {return 'true';} else {return 'false';}}"]
out: [out]
run:
class: CommandLineTool
+ id: subtool
inputs:
sleeptime:
type: int
"run": {
"baseCommand": "sleep",
"class": "CommandLineTool",
+ "id": "#main/sleep1/subtool",
"inputs": [
{
- "id": "#main/sleep1/sleeptime",
+ "id": "#main/sleep1/subtool/sleeptime",
"inputBinding": {
"position": 1
},
],
"outputs": [
{
- "id": "#main/sleep1/out",
+ "id": "#main/sleep1/subtool/out",
"outputBinding": {
"outputEval": "out"
},
import (
"encoding/json"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
"io"
"io/ioutil"
"log"
"os/signal"
"strings"
"syscall"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
type TaskDef struct {
}
type Job struct {
- Script_parameters Tasks `json:"script_parameters"`
+ ScriptParameters Tasks `json:"script_parameters"`
}
type Task struct {
- Job_uuid string `json:"job_uuid"`
- Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
- Parameters TaskDef `json:"parameters"`
- Sequence int `json:"sequence"`
- Output string `json:"output"`
- Success bool `json:"success"`
- Progress float32 `json:"sequence"`
+ JobUUID string `json:"job_uuid"`
+ CreatedByJobTaskUUID string `json:"created_by_job_task_uuid"`
+ Parameters TaskDef `json:"parameters"`
+ Sequence int `json:"sequence"`
+ Output string `json:"output"`
+ Success bool `json:"success"`
+ Progress float32 `json:"sequence"`
}
type IArvadosClient interface {
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
-func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
+func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
tmpdir = crunchtmpdir + "/tmpdir"
err = os.Mkdir(tmpdir, 0700)
if err != nil {
func runner(api IArvadosClient,
kc IKeepClient,
- jobUuid, taskUuid, crunchtmpdir, keepmount string,
+ jobUUID, taskUUID, crunchtmpdir, keepmount string,
jobStruct Job, taskStruct Task) error {
var err error
// If this is task 0 and there are multiple tasks, dispatch subtasks
// and exit.
if taskStruct.Sequence == 0 {
- if len(jobStruct.Script_parameters.Tasks) == 1 {
- taskp = jobStruct.Script_parameters.Tasks[0]
+ if len(jobStruct.ScriptParameters.Tasks) == 1 {
+ taskp = jobStruct.ScriptParameters.Tasks[0]
} else {
- for _, task := range jobStruct.Script_parameters.Tasks {
+ for _, task := range jobStruct.ScriptParameters.Tasks {
err := api.Create("job_tasks",
map[string]interface{}{
- "job_task": Task{Job_uuid: jobUuid,
- Created_by_job_task_uuid: taskUuid,
- Sequence: 1,
- Parameters: task}},
+ "job_task": Task{
+ JobUUID: jobUUID,
+ CreatedByJobTaskUUID: taskUUID,
+ Sequence: 1,
+ Parameters: task}},
nil)
if err != nil {
return TempFail{err}
}
}
- err = api.Update("job_tasks", taskUuid,
+ err = api.Update("job_tasks", taskUUID,
map[string]interface{}{
- "job_task": Task{
- Output: "",
- Success: true,
- Progress: 1.0}},
+ "job_task": map[string]interface{}{
+ "output": "",
+ "success": true,
+ "progress": 1.0}},
nil)
return nil
}
}
var tmpdir, outdir string
- tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput)
+ tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
if err != nil {
return TempFail{err}
}
}
// Set status
- err = api.Update("job_tasks", taskUuid,
+ err = api.Update("job_tasks", taskUUID,
map[string]interface{}{
"job_task": Task{
Output: manifest,
log.Fatal(err)
}
- jobUuid := os.Getenv("JOB_UUID")
- taskUuid := os.Getenv("TASK_UUID")
+ jobUUID := os.Getenv("JOB_UUID")
+ taskUUID := os.Getenv("TASK_UUID")
tmpdir := os.Getenv("TASK_WORK")
keepmount := os.Getenv("TASK_KEEPMOUNT")
var jobStruct Job
var taskStruct Task
- err = api.Get("jobs", jobUuid, nil, &jobStruct)
+ err = api.Get("jobs", jobUUID, nil, &jobStruct)
if err != nil {
log.Fatal(err)
}
- err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+ err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
if err != nil {
log.Fatal(err)
}
}
syscall.Umask(0022)
- err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+ err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
if err == nil {
os.Exit(0)
package main
import (
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- . "gopkg.in/check.v1"
"io"
"io/ioutil"
"log"
"syscall"
"testing"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ . "gopkg.in/check.v1"
)
// Gocheck boilerplate
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"echo", "foo"}}}}},
Task{Sequence: 0})
c.Check(err, IsNil)
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{
+ Job{ScriptParameters: Tasks{[]TaskDef{
{Command: []string{"echo", "bar"}},
{Command: []string{"echo", "foo"}}}}},
Task{Parameters: TaskDef{
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"cat"},
Stdout: "output.txt",
Stdin: tmpfile.Name()}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $BAR"},
Stdout: "output.txt",
Env: map[string]string{"BAR": "foo"}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"foo\n",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $BAR"},
Stdout: "output.txt",
Env: map[string]string{"BAR": "$(task.keep)"}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $PATH"},
Stdout: "output.txt",
Env: map[string]string{"PATH": "foo"}}}}},
func (s *TestSuite) TestScheduleSubtask(c *C) {
api := SubtaskTestClient{c, []Task{
- {Job_uuid: "zzzz-8i9sb-111111111111111",
- Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
- Sequence: 1,
+ {JobUUID: "zzzz-8i9sb-111111111111111",
+ CreatedByJobTaskUUID: "zzzz-ot0gb-111111111111111",
+ Sequence: 1,
Parameters: TaskDef{
Command: []string{"echo", "bar"}}},
- {Job_uuid: "zzzz-8i9sb-111111111111111",
- Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
- Sequence: 1,
+ {JobUUID: "zzzz-8i9sb-111111111111111",
+ CreatedByJobTaskUUID: "zzzz-ot0gb-111111111111111",
+ Sequence: 1,
Parameters: TaskDef{
Command: []string{"echo", "foo"}}}},
0}
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{
+ Job{ScriptParameters: Tasks{[]TaskDef{
{Command: []string{"echo", "bar"}},
{Command: []string{"echo", "foo"}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
Task{Sequence: 0})
c.Check(err, FitsTypeOf, PermFail{})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"},
SuccessCodes: []int{0, 1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 0"},
PermanentFailCodes: []int{0, 1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"},
TemporaryFailCodes: []int{1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"ls", "output.txt"},
Vwd: map[string]string{
"output.txt": tmpfile.Name()}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
keepmount,
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"cat"},
Stdout: "output.txt",
Stdin: "$(task.keep)/file1.txt"}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
keepmount,
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"cat", "$(task.keep)/file1.txt"},
Stdout: "output.txt"}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"sleep", "4"}}}}},
Task{Sequence: 0})
c.Check(err, FitsTypeOf, PermFail{})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"echo", "foo"},
Stdout: "s ub:dir/:e vi\nl"}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{{
+ Job{ScriptParameters: Tasks{[]TaskDef{{
Command: []string{"echo", "foo"},
KeepTmpOutput: true}}}},
Task{Sequence: 0})
func (*ThrottleTestSuite) TestThrottle(c *check.C) {
uuid := "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ t0 := throttle{}
+ c.Check(t0.Check(uuid), check.Equals, true)
+ c.Check(t0.Check(uuid), check.Equals, true)
- t := throttle{}
- c.Check(t.Check(uuid), check.Equals, true)
- c.Check(t.Check(uuid), check.Equals, true)
-
- t = throttle{hold: time.Nanosecond}
- c.Check(t.Check(uuid), check.Equals, true)
+ tNs := throttle{hold: time.Nanosecond}
+ c.Check(tNs.Check(uuid), check.Equals, true)
time.Sleep(time.Microsecond)
- c.Check(t.Check(uuid), check.Equals, true)
-
- t = throttle{hold: time.Minute}
- c.Check(t.Check(uuid), check.Equals, true)
- c.Check(t.Check(uuid), check.Equals, false)
- c.Check(t.Check(uuid), check.Equals, false)
- t.seen[uuid].last = time.Now().Add(-time.Hour)
- c.Check(t.Check(uuid), check.Equals, true)
- c.Check(t.Check(uuid), check.Equals, false)
+ c.Check(tNs.Check(uuid), check.Equals, true)
+
+ tMin := throttle{hold: time.Minute}
+ c.Check(tMin.Check(uuid), check.Equals, true)
+ c.Check(tMin.Check(uuid), check.Equals, false)
+ c.Check(tMin.Check(uuid), check.Equals, false)
+ tMin.seen[uuid].last = time.Now().Add(-time.Hour)
+ c.Check(tMin.Check(uuid), check.Equals, true)
+ c.Check(tMin.Check(uuid), check.Equals, false)
}
block_size = dl.range_size
block_end = block_start + block_size
_logger.log(RANGES_SPAM,
- "%s range_start %s block_start %s range_end %s block_end %s",
+ "L&R %s range_start %s block_start %s range_end %s block_end %s",
dl.locator, range_start, block_start, range_end, block_end)
if range_end <= block_start:
# range ends before this block starts, so don't look at any more locators
last = data_locators[-1]
if (last.range_start+last.range_size) == new_range_start:
- if last.locator == new_locator:
+ if last.locator == new_locator and (last.segment_offset+last.range_size) == new_segment_offset:
# extend last segment
last.range_size += new_range_size
else:
old_segment_start = dl.range_start
old_segment_end = old_segment_start + dl.range_size
_logger.log(RANGES_SPAM,
- "%s range_start %s segment_start %s range_end %s segment_end %s",
+ "RR %s range_start %s segment_start %s range_end %s segment_end %s",
dl, new_range_start, old_segment_start, new_range_end,
old_segment_end)
if new_range_end <= old_segment_start:
pos += self._filepos
elif whence == os.SEEK_END:
pos += self.size()
- self._filepos = min(max(pos, 0L), self.size())
+ if pos < 0L:
+ raise IOError(errno.EINVAL, "Tried to seek to negative file offset.")
+ self._filepos = pos
+ return self._filepos
def tell(self):
return self._filepos
+ def readable(self):
+ return True
+
+ def writable(self):
+ return False
+
+ def seekable(self):
+ return True
+
@_FileLikeObjectBase._before_close
@retry_method
def readall(self, size=2**20, num_retries=None):
return ''.join(data).splitlines(True)
def size(self):
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
def read(self, size, num_retries=None):
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
def readfrom(self, start, size, num_retries=None):
- raise NotImplementedError()
+ raise IOError(errno.ENOSYS, "Not implemented")
class StreamFileReader(ArvadosFileReaderBase):
self.copies = copies
self._pending_write_size = 0
self.threads_lock = threading.Lock()
+ self.padding_block = None
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
def get_bufferblock(self, locator):
return self._bufferblocks.get(locator)
+ @synchronized
+ def get_padding_block(self):
+ """Get a bufferblock 64 MB in size consisting of all zeros, used as padding
+ when using truncate() to extend the size of a file.
+
+ For reference (and possible future optimization), the md5sum of the
+ padding block is: 7f614da9329cd3aebf59b91aadc30bf0+67108864
+
+ """
+
+ if self.padding_block is None:
+ self.padding_block = self._alloc_bufferblock(starting_capacity=config.KEEP_BLOCK_SIZE)
+ self.padding_block.write_pointer = config.KEEP_BLOCK_SIZE
+ self.commit_bufferblock(self.padding_block, False)
+ return self.padding_block
+
@synchronized
def delete_bufferblock(self, locator):
self._delete_bufferblock(locator)
@must_be_writable
@synchronized
def truncate(self, size):
- """Shrink the size of the file.
+ """Shrink or expand the size of the file.
If `size` is less than the size of the file, the file contents after
`size` will be discarded. If `size` is greater than the current size
- of the file, an IOError will be raised.
+ of the file, it will be filled with zero bytes.
"""
if size < self.size():
self._segments = new_segs
self.set_committed(False)
elif size > self.size():
- raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
+ padding = self.parent._my_block_manager().get_padding_block()
+ diff = size - self.size()
+ while diff > config.KEEP_BLOCK_SIZE:
+ self._segments.append(Range(padding.blockid, self.size(), config.KEEP_BLOCK_SIZE, 0))
+ diff -= config.KEEP_BLOCK_SIZE
+ if diff > 0:
+ self._segments.append(Range(padding.blockid, self.size(), diff, 0))
+ self.set_committed(False)
+ else:
+ # size == self.size()
+ pass
def readfrom(self, offset, size, num_retries, exact=False):
"""Read up to `size` bytes from the file starting at `offset`.
return ''.join(data)
def _repack_writes(self, num_retries):
- """Test if the buffer block has more data than actual segments.
+ """Optimize buffer block by repacking segments in file sequence.
- This happens when a buffered write over-writes a file range written in
- a previous buffered write. Re-pack the buffer block for efficiency
- and to avoid leaking information.
+ When the client makes random writes, they appear in the buffer block in
+ the sequence they were written rather than the sequence they appear in
+ the file. This makes for inefficient, fragmented manifests. Attempt
+ to optimize by repacking writes in file sequence.
"""
segs = self._segments
- # Sum up the segments to get the total bytes of the file referencing
- # into the buffer block.
+ # Collect the segments that reference the buffer block.
bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
+
+ # Collect total data referenced by segments (could be smaller than
+ # bufferblock size if a portion of the file was written and
+ # then overwritten).
write_total = sum([s.range_size for s in bufferblock_segs])
- if write_total < self._current_bblock.size():
- # There is more data in the buffer block than is actually accounted for by segments, so
- # re-pack into a new buffer by copying over to a new buffer block.
+ if write_total < self._current_bblock.size() or len(bufferblock_segs) > 1:
+ # If there's more than one segment referencing this block, it is
+ # due to out-of-order writes and will produce a fragmented
+ # manifest, so try to optimize by re-packing into a new buffer.
contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
for t in bufferblock_segs:
return
if offset > self.size():
- raise ArgumentError("Offset is past the end of the file")
+ self.truncate(offset)
if len(data) > config.KEEP_BLOCK_SIZE:
# Chunk it up into smaller writes
self.mode = mode
self.arvadosfile.add_writer(self)
+ def writable(self):
+ return True
+
@_FileLikeObjectBase._before_close
@retry_method
def write(self, data, num_retries=None):
if size is None:
size = self._filepos
self.arvadosfile.truncate(size)
- if self._filepos > self.size():
- self._filepos = self.size()
@_FileLikeObjectBase._before_close
def flush(self):
import arvados
import arvados.commands._util as arv_cmd
+import arvados.util as util
from arvados._version import __version__
overwritten. This option causes even devices, sockets, and fifos to be
skipped.
""")
+group.add_argument('--strip-manifest', action='store_true', default=False,
+ help="""
+When getting a collection manifest, strip its access tokens before writing
+it.
+""")
def parse_arguments(arguments, stdout, stderr):
args = parser.parse_args(arguments)
api_client = arvados.api('v1')
r = re.search(r'^(.*?)(/.*)?$', args.locator)
- collection = r.group(1)
+ col_loc = r.group(1)
get_prefix = r.group(2)
if args.r and not get_prefix:
get_prefix = os.sep
try:
- reader = arvados.CollectionReader(collection, num_retries=args.retries)
+ reader = arvados.CollectionReader(col_loc, num_retries=args.retries)
except Exception as error:
logger.error("failed to read collection: {}".format(error))
return 1
open_flags |= os.O_EXCL
try:
if args.destination == "-":
- stdout.write(reader.manifest_text())
+ stdout.write(reader.manifest_text(strip=args.strip_manifest))
else:
out_fd = os.open(args.destination, open_flags)
with os.fdopen(out_fd, 'wb') as out_file:
- out_file.write(reader.manifest_text())
+ out_file.write(reader.manifest_text(strip=args.strip_manifest))
except (IOError, OSError) as error:
logger.error("can't write to '{}': {}".format(args.destination, error))
return 1
except (arvados.errors.ApiError, arvados.errors.KeepReadError) as error:
- logger.error("failed to download '{}': {}".format(collection, error))
+ logger.error("failed to download '{}': {}".format(col_loc, error))
return 1
return 0
migrate19_parser.add_argument(
'--version', action='version', version="%s %s" % (sys.argv[0], __version__),
help='Print version and exit.')
+ migrate19_parser.add_argument(
+ '--verbose', action="store_true", help="Print stdout/stderr even on success")
+ migrate19_parser.add_argument(
+ '--force', action="store_true", help="Try to migrate even if there isn't enough space")
+
+ migrate19_parser.add_argument(
+ '--storage-driver', type=str, default="overlay",
+ help="Docker storage driver, e.g. aufs, overlay, vfs")
exgroup = migrate19_parser.add_mutually_exclusive_group()
exgroup.add_argument(
if args.tempdir:
tempfile.tempdir = args.tempdir
+ if args.verbose:
+ logger.setLevel(logging.DEBUG)
+
only_migrate = None
if args.infile:
only_migrate = set()
uuid_to_collection = {i["uuid"]: i for i in items}
need_migrate = {}
+ totalbytes = 0
biggest = 0
+ biggest_pdh = None
for img in old_images:
i = uuid_to_collection[img["collection"]]
pdh = i["portable_data_hash"]
- if pdh not in already_migrated and (only_migrate is None or pdh in only_migrate):
+ if pdh not in already_migrated and pdh not in need_migrate and (only_migrate is None or pdh in only_migrate):
need_migrate[pdh] = img
with CollectionReader(i["manifest_text"]) as c:
if c.values()[0].size() > biggest:
biggest = c.values()[0].size()
+ biggest_pdh = pdh
+ totalbytes += c.values()[0].size()
+
+
+ if args.storage_driver == "vfs":
+ will_need = (biggest*20)
+ else:
+ will_need = (biggest*2.5)
if args.print_unmigrated:
only_migrate = set()
for pdh in need_migrate:
- print pdh
+ print(pdh)
return
logger.info("Already migrated %i images", len(already_migrated))
logger.info("Need to migrate %i images", len(need_migrate))
logger.info("Using tempdir %s", tempfile.gettempdir())
- logger.info("Biggest image is about %i MiB, tempdir needs at least %i MiB free", biggest/(2**20), (biggest*2)/(2**20))
+ logger.info("Biggest image %s is about %i MiB", biggest_pdh, biggest/(2**20))
+ logger.info("Total data to migrate about %i MiB", totalbytes/(2**20))
+
+ df_out = subprocess.check_output(["df", "-B1", tempfile.gettempdir()])
+ ln = df_out.splitlines()[1]
+ filesystem, blocks, used, available, use_pct, mounted = re.match(r"^([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+) *([^ ]+)", ln).groups(1)
+ if int(available) <= will_need:
+ logger.warn("Temp filesystem mounted at %s does not have enough space for biggest image (has %i MiB, needs %i MiB)", mounted, int(available)/(2**20), will_need/(2**20))
+ if not args.force:
+ exit(1)
+ else:
+ logger.warn("--force provided, will migrate anyway")
if args.dry_run:
return
dockercache = tempfile.mkdtemp()
try:
with tempfile.NamedTemporaryFile() as envfile:
- envfile.write("ARVADOS_API_HOST=%s\n" % (os.environ["ARVADOS_API_HOST"]))
- envfile.write("ARVADOS_API_TOKEN=%s\n" % (os.environ["ARVADOS_API_TOKEN"]))
- if "ARVADOS_API_HOST_INSECURE" in os.environ:
- envfile.write("ARVADOS_API_HOST_INSECURE=%s\n" % (os.environ["ARVADOS_API_HOST_INSECURE"]))
+ envfile.write("ARVADOS_API_HOST=%s\n" % (arvados.config.get("ARVADOS_API_HOST")))
+ envfile.write("ARVADOS_API_TOKEN=%s\n" % (arvados.config.get("ARVADOS_API_TOKEN")))
+ if arvados.config.get("ARVADOS_API_HOST_INSECURE"):
+ envfile.write("ARVADOS_API_HOST_INSECURE=%s\n" % (arvados.config.get("ARVADOS_API_HOST_INSECURE")))
envfile.flush()
dockercmd = ["docker", "run",
"--env-file", envfile.name,
"--volume", "%s:/var/lib/docker" % varlibdocker,
"--volume", "%s:/root/.cache/arvados/docker" % dockercache,
- "arvados/migrate-docker19",
+ "arvados/migrate-docker19:1.0",
"/root/migrate.sh",
"%s/%s" % (old_image["collection"], tarfile),
tarfile[0:40],
old_image["repo"],
old_image["tag"],
- uuid_to_collection[old_image["collection"]]["owner_uuid"]]
+ uuid_to_collection[old_image["collection"]]["owner_uuid"],
+ args.storage_driver]
proc = subprocess.Popen(dockercmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = proc.communicate()
+ initial_space = re.search(r"Initial available space is (\d+)", out)
+ imgload_space = re.search(r"Available space after image load is (\d+)", out)
+ imgupgrade_space = re.search(r"Available space after image upgrade is (\d+)", out)
+ keepdocker_space = re.search(r"Available space after arv-keepdocker is (\d+)", out)
+ cleanup_space = re.search(r"Available space after cleanup is (\d+)", out)
+
+ if initial_space:
+ isp = int(initial_space.group(1))
+ logger.info("Available space initially: %i MiB", (isp)/(2**20))
+ if imgload_space:
+ sp = int(imgload_space.group(1))
+ logger.debug("Used after load: %i MiB", (isp-sp)/(2**20))
+ if imgupgrade_space:
+ sp = int(imgupgrade_space.group(1))
+ logger.debug("Used after upgrade: %i MiB", (isp-sp)/(2**20))
+ if keepdocker_space:
+ sp = int(keepdocker_space.group(1))
+ logger.info("Used after upload: %i MiB", (isp-sp)/(2**20))
+
+ if cleanup_space:
+ sp = int(cleanup_space.group(1))
+ logger.debug("Available after cleanup: %i MiB", (sp)/(2**20))
+
if proc.returncode != 0:
logger.error("Failed with return code %i", proc.returncode)
logger.error("--- Stdout ---\n%s", out)
logger.error("--- Stderr ---\n%s", err)
raise MigrationFailed()
+ if args.verbose:
+ logger.info("--- Stdout ---\n%s", out)
+ logger.info("--- Stderr ---\n%s", err)
+
migrated = re.search(r"Migrated uuid is ([a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15})", out)
if migrated:
newcol = CollectionReader(migrated.group(1))
with self._state_lock:
self._state['manifest'] = manifest
if self.use_cache:
- self._save_state()
+ try:
+ self._save_state()
+ except Exception as e:
+ self.logger.error("Unexpected error trying to save cache file: {}".format(e))
else:
self.bytes_written = self.bytes_skipped
# Call the reporter, if any
cache_filepath = os.path.join(
arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
cache_filename)
- if self.resume:
+ if self.resume and os.path.exists(cache_filepath):
+ self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'a+')
else:
# --no-resume means start with a empty cache file.
+ self.logger.info("Creating new cache file at {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'w+')
self._cache_filename = self._cache_file.name
self._lock_file(self._cache_file)
# Cache file empty, set up new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
else:
+ self.logger.info("No cache usage requested for this run.")
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
"""
Atomically save current state into cache.
"""
+ with self._state_lock:
+ # We're not using copy.deepcopy() here because it's a lot slower
+ # than json.dumps(), and we're already needing JSON format to be
+ # saved on disk.
+ state = json.dumps(self._state)
try:
- with self._state_lock:
- # We're not using copy.deepcopy() here because it's a lot slower
- # than json.dumps(), and we're already needing JSON format to be
- # saved on disk.
- state = json.dumps(self._state)
- new_cache_fd, new_cache_name = tempfile.mkstemp(
- dir=os.path.dirname(self._cache_filename))
- self._lock_file(new_cache_fd)
- new_cache = os.fdopen(new_cache_fd, 'r+')
+ new_cache = tempfile.NamedTemporaryFile(
+ dir=os.path.dirname(self._cache_filename), delete=False)
+ self._lock_file(new_cache)
new_cache.write(state)
new_cache.flush()
os.fsync(new_cache)
- os.rename(new_cache_name, self._cache_filename)
+ os.rename(new_cache.name, self._cache_filename)
except (IOError, OSError, ResumeCacheConflict) as error:
self.logger.error("There was a problem while saving the cache file: {}".format(error))
try:
# -*- coding: utf-8 -*-
import io
+import os
+import re
import shutil
import tempfile
shutil.rmtree(self.tempdir)
def write_test_collection(self,
+ strip_manifest=False,
contents = {
'foo.txt' : 'foo',
'bar.txt' : 'bar',
with c.open(path, 'w') as f:
f.write(data)
c.save_new()
- return (c.manifest_locator(), c.portable_data_hash(), c.manifest_text())
+ return (c.manifest_locator(),
+ c.portable_data_hash(),
+ c.manifest_text(strip=strip_manifest))
def run_get(self, args):
self.stdout = io.BytesIO()
# Download the entire collection to the temp directory
r = self.run_get(["{}/".format(self.col_loc), self.tempdir])
self.assertEqual(0, r)
- with open("{}/foo.txt".format(self.tempdir), "r") as f:
+ with open(os.path.join(self.tempdir, "foo.txt"), "r") as f:
self.assertEqual("foo", f.read())
- with open("{}/bar.txt".format(self.tempdir), "r") as f:
+ with open(os.path.join(self.tempdir, "bar.txt"), "r") as f:
self.assertEqual("bar", f.read())
- with open("{}/subdir/baz.txt".format(self.tempdir), "r") as f:
+ with open(os.path.join(self.tempdir, "subdir", "baz.txt"), "r") as f:
self.assertEqual("baz", f.read())
- def test_get_collection_manifest(self):
- # Get the collection manifest
+ def test_get_collection_unstripped_manifest(self):
+ dummy_token = "+Axxxxxxx"
+ # Get the collection manifest by UUID
r = self.run_get([self.col_loc, self.tempdir])
self.assertEqual(0, r)
- with open("{}/{}".format(self.tempdir, self.col_loc), "r") as f:
- self.assertEqual(self.col_manifest, f.read())
+ m_from_collection = re.sub(r"\+A[0-9a-f@]+", dummy_token, self.col_manifest)
+ with open(os.path.join(self.tempdir, self.col_loc), "r") as f:
+ # Replace manifest tokens before comparison to avoid races
+ m_from_file = re.sub(r"\+A[0-9a-f@]+", dummy_token, f.read())
+ self.assertEqual(m_from_collection, m_from_file)
+ # Get the collection manifest by PDH
+ r = self.run_get([self.col_pdh, self.tempdir])
+ self.assertEqual(0, r)
+ with open(os.path.join(self.tempdir, self.col_pdh), "r") as f:
+ # Replace manifest tokens before comparison to avoid races
+ m_from_file = re.sub(r"\+A[0-9a-f@]+", dummy_token, f.read())
+ self.assertEqual(m_from_collection, m_from_file)
+
+ def test_get_collection_stripped_manifest(self):
+ col_loc, col_pdh, col_manifest = self.write_test_collection(strip_manifest=True)
+ # Get the collection manifest by UUID
+ r = self.run_get(['--strip-manifest', col_loc, self.tempdir])
+ self.assertEqual(0, r)
+ with open(os.path.join(self.tempdir, col_loc), "r") as f:
+ self.assertEqual(col_manifest, f.read())
+ # Get the collection manifest by PDH
+ r = self.run_get(['--strip-manifest', col_pdh, self.tempdir])
+ self.assertEqual(0, r)
+ with open(os.path.join(self.tempdir, col_pdh), "r") as f:
+ self.assertEqual(col_manifest, f.read())
def test_invalid_collection(self):
# Asking for an invalid collection should generate an error.
def test_preexistent_destination(self):
# Asking to place a file with the same path as a local one should
# generate an error and avoid overwrites.
- with open("{}/foo.txt".format(self.tempdir), "w") as f:
+ with open(os.path.join(self.tempdir, "foo.txt"), "w") as f:
f.write("another foo")
r = self.run_get(["{}/foo.txt".format(self.col_loc), self.tempdir])
self.assertNotEqual(0, r)
- with open("{}/foo.txt".format(self.tempdir), "r") as f:
+ with open(os.path.join(self.tempdir, "foo.txt"), "r") as f:
self.assertEqual("another foo", f.read())
self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
self.assertFalse(c.modified())
+
+ def test_truncate2(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
+ api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate2",
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 7f614da9329cd3aebf59b91aadc30bf0+67108864 0:12:count.txt\n",
+ "replication_desired":None},
+ {"uuid":"zzzzz-4zz18-mockcollection0",
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 7f614da9329cd3aebf59b91aadc30bf0+67108864 0:12:count.txt\n",
+ "portable_data_hash":"272da898abdf86ddc71994835e3155f8+95"})
+ with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
+ api_client=api, keep_client=keep) as c:
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 10)
+ self.assertEqual("0123456789", writer.read(12))
+
+ # extend file size
+ writer.truncate(12)
+
+ self.assertEqual(writer.size(), 12)
+ writer.seek(0, os.SEEK_SET)
+ self.assertEqual(b"0123456789\x00\x00", writer.read(12))
+
+ self.assertIsNone(c.manifest_locator())
+ self.assertTrue(c.modified())
+ c.save_new("test_truncate2")
+ self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
+ self.assertFalse(c.modified())
+
+ def test_truncate3(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789",
+ "a925576942e94b2ef57a066101b48876+10": "abcdefghij"})
+ api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate",
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n",
+ "replication_desired":None},
+ {"uuid":"zzzzz-4zz18-mockcollection0",
+ "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n",
+ "portable_data_hash":"7fcd0eaac3aad4c31a6a0e756475da92+52"})
+ with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 a925576942e94b2ef57a066101b48876+10 0:20:count.txt\n',
+ api_client=api, keep_client=keep) as c:
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 20)
+ self.assertEqual("0123456789ab", writer.read(12))
+ self.assertEqual(12, writer.tell())
+
+ writer.truncate(8)
+
+ # Make sure reading off the end doesn't break
+ self.assertEqual(12, writer.tell())
+ self.assertEqual("", writer.read(12))
+
+ self.assertEqual(writer.size(), 8)
+ self.assertEqual(2, writer.seek(-10, os.SEEK_CUR))
+ self.assertEqual("234567", writer.read(12))
+
+ self.assertIsNone(c.manifest_locator())
+ self.assertTrue(c.modified())
+ c.save_new("test_truncate")
+ self.assertEqual("zzzzz-4zz18-mockcollection0", c.manifest_locator())
+ self.assertFalse(c.modified())
+
+
+
def test_write_to_end(self):
keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_append",
writer = c.open("count.txt", "r+")
self.assertEqual(writer.size(), 10)
- writer.seek(5, os.SEEK_SET)
+ self.assertEqual(5, writer.seek(5, os.SEEK_SET))
self.assertEqual("56789", writer.read(8))
writer.seek(10, os.SEEK_SET)
self.assertEqual(c.manifest_text(), ". 781e5e245d69b566979b86e28d23f2c7+10 48dd23ea1645fd47d789804d71b5bb8e+67108864 77c57dc6ac5a10bb2205caaa73187994+32891126 0:100000000:count.txt\n")
+ def test_sparse_write(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ api = ArvadosFileWriterTestCase.MockApi({}, {})
+ with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
+ api_client=api, keep_client=keep) as c:
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 0)
+
+ text = "0123456789"
+ writer.seek(2)
+ writer.write(text)
+ self.assertEqual(writer.size(), 12)
+ writer.seek(0, os.SEEK_SET)
+ self.assertEqual(writer.read(), b"\x00\x00"+text)
+
+ self.assertEqual(c.manifest_text(), ". 7f614da9329cd3aebf59b91aadc30bf0+67108864 781e5e245d69b566979b86e28d23f2c7+10 0:2:count.txt 67108864:10:count.txt\n")
+
+
+ def test_sparse_write2(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ api = ArvadosFileWriterTestCase.MockApi({}, {})
+ with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
+ api_client=api, keep_client=keep) as c:
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 0)
+
+ text = "0123456789"
+ writer.seek((arvados.config.KEEP_BLOCK_SIZE*2) + 2)
+ writer.write(text)
+ self.assertEqual(writer.size(), (arvados.config.KEEP_BLOCK_SIZE*2) + 12)
+ writer.seek(0, os.SEEK_SET)
+
+ self.assertEqual(c.manifest_text(), ". 7f614da9329cd3aebf59b91aadc30bf0+67108864 781e5e245d69b566979b86e28d23f2c7+10 0:67108864:count.txt 0:67108864:count.txt 0:2:count.txt 67108864:10:count.txt\n")
+
+
+ def test_sparse_write3(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ api = ArvadosFileWriterTestCase.MockApi({}, {})
+ for r in [[0, 1, 2, 3, 4], [4, 3, 2, 1, 0], [3, 2, 0, 4, 1]]:
+ with Collection() as c:
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 0)
+
+ for i in r:
+ w = ("%s" % i) * 10
+ writer.seek(i*10)
+ writer.write(w)
+ writer.seek(0)
+ self.assertEqual(writer.read(), "00000000001111111111222222222233333333334444444444")
+
+ def test_sparse_write4(self):
+ keep = ArvadosFileWriterTestCase.MockKeep({})
+ api = ArvadosFileWriterTestCase.MockApi({}, {})
+ for r in [[0, 1, 2, 4], [4, 2, 1, 0], [2, 0, 4, 1]]:
+ with Collection() as c:
+ writer = c.open("count.txt", "r+")
+ self.assertEqual(writer.size(), 0)
+
+ for i in r:
+ w = ("%s" % i) * 10
+ writer.seek(i*10)
+ writer.write(w)
+ writer.seek(0)
+ self.assertEqual(writer.read(), b"000000000011111111112222222222\x00\x00\x00\x00\x00\x00\x00\x00\x00\x004444444444")
+
+
def test_rewrite_on_empty_file(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
def test_write_large_rewrite(self):
keep = ArvadosFileWriterTestCase.MockKeep({})
api = ArvadosFileWriterTestCase.MockApi({"name":"test_write_large",
- "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n",
+ "manifest_text": ". 3dc0d4bc21f48060bedcb2c91af4f906+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 0:3:count.txt 32892006:67107997:count.txt 3:32892000:count.txt\n",
"replication_desired":None},
{"uuid":"zzzzz-4zz18-mockcollection0",
- "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n",
+ "manifest_text": ". 3dc0d4bc21f48060bedcb2c91af4f906+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 0:3:count.txt 32892006:67107997:count.txt 3:32892000:count.txt\n",
"portable_data_hash":"217665c6b713e1b78dfba7ebd42344db+156"})
with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
api_client=api, keep_client=keep) as c:
def test_seek_min_zero(self):
sfile = self.make_count_reader()
- sfile.seek(-2, os.SEEK_SET)
+ self.assertEqual(0, sfile.tell())
+ with self.assertRaises(IOError):
+ sfile.seek(-2, os.SEEK_SET)
self.assertEqual(0, sfile.tell())
def test_seek_max_size(self):
sfile = self.make_count_reader()
sfile.seek(2, os.SEEK_END)
- self.assertEqual(9, sfile.tell())
+ # POSIX permits seeking past end of file.
+ self.assertEqual(11, sfile.tell())
def test_size(self):
self.assertEqual(9, self.make_count_reader().size())
+require 'tempfile'
+
class Node < ArvadosModel
include HasUuid
include KindAndEtag
}
if Rails.configuration.dns_server_conf_dir and Rails.configuration.dns_server_conf_template
+ tmpfile = nil
begin
begin
template = IO.read(Rails.configuration.dns_server_conf_template)
- rescue => e
+ rescue IOError, SystemCallError => e
logger.error "Reading #{Rails.configuration.dns_server_conf_template}: #{e.message}"
raise
end
hostfile = File.join Rails.configuration.dns_server_conf_dir, "#{hostname}.conf"
- File.open hostfile+'.tmp', 'w' do |f|
+ Tempfile.open(["#{hostname}-", ".conf.tmp"],
+ Rails.configuration.dns_server_conf_dir) do |f|
+ tmpfile = f.path
f.puts template % template_vars
end
- File.rename hostfile+'.tmp', hostfile
- rescue => e
+ File.rename tmpfile, hostfile
+ rescue IOError, SystemCallError => e
logger.error "Writing #{hostfile}: #{e.message}"
ok = false
+ ensure
+ if tmpfile and File.file? tmpfile
+ # Cleanup remaining temporary file.
+ File.unlink tmpfile
+ end
end
end
# Typically, this is used to trigger a dns server restart
f.puts Rails.configuration.dns_server_reload_command
end
- rescue => e
+ rescue IOError, SystemCallError => e
logger.error "Unable to write #{restartfile}: #{e.message}"
ok = false
end
--- /dev/null
+class AddCreatedByJobTaskIndexToJobTasks < ActiveRecord::Migration
+ def change
+ add_index :job_tasks, :created_by_job_task_uuid
+ end
+end
--- /dev/null
+class AddObjectOwnerIndexToLogs < ActiveRecord::Migration
+ def change
+ add_index :logs, :object_owner_uuid
+ end
+end
--- /dev/null
+class AddRequestingContainerIndexToContainerRequests < ActiveRecord::Migration
+ def change
+ add_index :container_requests, :requesting_container_uuid
+ end
+end
CREATE INDEX index_container_requests_on_owner_uuid ON container_requests USING btree (owner_uuid);
+--
+-- Name: index_container_requests_on_requesting_container_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace:
+--
+
+CREATE INDEX index_container_requests_on_requesting_container_uuid ON container_requests USING btree (requesting_container_uuid);
+
+
--
-- Name: index_container_requests_on_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
CREATE INDEX index_job_tasks_on_created_at ON job_tasks USING btree (created_at);
+--
+-- Name: index_job_tasks_on_created_by_job_task_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace:
+--
+
+CREATE INDEX index_job_tasks_on_created_by_job_task_uuid ON job_tasks USING btree (created_by_job_task_uuid);
+
+
--
-- Name: index_job_tasks_on_job_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
CREATE INDEX index_logs_on_modified_at ON logs USING btree (modified_at);
+--
+-- Name: index_logs_on_object_owner_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace:
+--
+
+CREATE INDEX index_logs_on_object_owner_uuid ON logs USING btree (object_owner_uuid);
+
+
--
-- Name: index_logs_on_object_uuid; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
INSERT INTO schema_migrations (version) VALUES ('20170328215436');
INSERT INTO schema_migrations (version) VALUES ('20170330012505');
+
+INSERT INTO schema_migrations (version) VALUES ('20170419173031');
+
+INSERT INTO schema_migrations (version) VALUES ('20170419173712');
+
+INSERT INTO schema_migrations (version) VALUES ('20170419175801');
+
require 'test_helper'
+require 'tmpdir'
+require 'tempfile'
class NodeTest < ActiveSupport::TestCase
def ping_node(node_name, ping_data)
assert Node.dns_server_update 'compute65535', '127.0.0.127'
end
+ test "don't leave temp files behind if there's an error writing them" do
+ Rails.configuration.dns_server_conf_template = Rails.root.join 'config', 'unbound.template'
+ Tempfile.any_instance.stubs(:puts).raises(IOError)
+ Dir.mktmpdir do |tmpdir|
+ Rails.configuration.dns_server_conf_dir = tmpdir
+ refute Node.dns_server_update 'compute65535', '127.0.0.127'
+ assert_empty Dir.entries(tmpdir).select{|f| File.file? File.join(tmpdir, f)}
+ end
+ end
+
test "ping new node with no hostname and default config" do
node = ping_node(:new_with_no_hostname, {})
slot_number = node.slot_number
package main
import (
+ "bytes"
"context"
"encoding/json"
"errors"
loggingDone chan bool
CrunchLog *ThrottledLogger
Stdout io.WriteCloser
- Stderr *ThrottledLogger
+ Stderr io.WriteCloser
LogCollection *CollectionWriter
LogsPDH *string
RunArvMount
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
- if bind == "stdout" {
+ if bind == "stdout" || bind == "stderr" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
- return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind)
+ return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
}
// Does path start with OutputPath?
prefix += "/"
}
if !strings.HasPrefix(mnt.Path, prefix) {
- return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
+ return fmt.Errorf("%s path does not start with OutputPath: %s, %s", strings.Title(bind), mnt.Path, prefix)
+ }
+ }
+
+ if bind == "stdin" {
+ // Is it a "collection" mount kind?
+ if mnt.Kind != "collection" && mnt.Kind != "json" {
+ return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
}
}
}
switch {
- case mnt.Kind == "collection":
+ case mnt.Kind == "collection" && bind != "stdin":
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
return nil
}
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
+// AttachStreams connects the docker container stdin, stdout and stderr logs
+// to the Arvados logger which logs to Keep and the API server logs table.
func (runner *ContainerRunner) AttachStreams() (err error) {
runner.CrunchLog.Print("Attaching container streams")
+ // If stdin mount is provided, attach it to the docker container
+ var stdinRdr keepclient.Reader
+ var stdinJson []byte
+ if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
+ if stdinMnt.Kind == "collection" {
+ var stdinColl arvados.Collection
+ collId := stdinMnt.UUID
+ if collId == "" {
+ collId = stdinMnt.PortableDataHash
+ }
+ err = runner.ArvClient.Get("collections", collId, nil, &stdinColl)
+ if err != nil {
+ return fmt.Errorf("While getting stdin collection: %v", err)
+ }
+
+ stdinRdr, err = runner.Kc.ManifestFileReader(manifest.Manifest{Text: stdinColl.ManifestText}, stdinMnt.Path)
+ if os.IsNotExist(err) {
+ return fmt.Errorf("stdin collection path not found: %v", stdinMnt.Path)
+ } else if err != nil {
+ return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
+ }
+ } else if stdinMnt.Kind == "json" {
+ stdinJson, err = json.Marshal(stdinMnt.Content)
+ if err != nil {
+ return fmt.Errorf("While encoding stdin json data: %v", err)
+ }
+ }
+ }
+
+ stdinUsed := stdinRdr != nil || len(stdinJson) != 0
response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
- dockertypes.ContainerAttachOptions{Stream: true, Stdout: true, Stderr: true})
+ dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
 return fmt.Errorf("While attaching container streams: %v", err)
}
runner.loggingDone = make(chan bool)
if stdoutMnt, ok := runner.Container.Mounts["stdout"]; ok {
- stdoutPath := stdoutMnt.Path[len(runner.Container.OutputPath):]
- index := strings.LastIndex(stdoutPath, "/")
- if index > 0 {
- subdirs := stdoutPath[:index]
- if subdirs != "" {
- st, err := os.Stat(runner.HostOutputDir)
- if err != nil {
- return fmt.Errorf("While Stat on temp dir: %v", err)
- }
- stdoutPath := path.Join(runner.HostOutputDir, subdirs)
- err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
- if err != nil {
- return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
- }
- }
- }
- stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ stdoutFile, err := runner.getStdoutFile(stdoutMnt.Path)
if err != nil {
- return fmt.Errorf("While creating stdout file: %v", err)
+ return err
}
runner.Stdout = stdoutFile
} else {
runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
}
- runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+ if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
+ stderrFile, err := runner.getStdoutFile(stderrMnt.Path)
+ if err != nil {
+ return err
+ }
+ runner.Stderr = stderrFile
+ } else {
+ runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ }
+
+ if stdinRdr != nil {
+ go func() {
+ _, err := io.Copy(response.Conn, stdinRdr)
+ if err != nil {
+ runner.CrunchLog.Printf("While writing stdin collection to docker container %q", err)
+ runner.stop()
+ }
+ stdinRdr.Close()
+ response.CloseWrite()
+ }()
+ } else if len(stdinJson) != 0 {
+ go func() {
+ _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+ if err != nil {
+ runner.CrunchLog.Printf("While writing stdin json to docker container %q", err)
+ runner.stop()
+ }
+ response.CloseWrite()
+ }()
+ }
go runner.ProcessDockerAttach(response.Reader)
return nil
}
+func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
+ stdoutPath := mntPath[len(runner.Container.OutputPath):]
+ index := strings.LastIndex(stdoutPath, "/")
+ if index > 0 {
+ subdirs := stdoutPath[:index]
+ if subdirs != "" {
+ st, err := os.Stat(runner.HostOutputDir)
+ if err != nil {
+ return nil, fmt.Errorf("While Stat on temp dir: %v", err)
+ }
+ stdoutPath := path.Join(runner.HostOutputDir, subdirs)
+ err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777)
+ if err != nil {
+ return nil, fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err)
+ }
+ }
+ }
+ stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath))
+ if err != nil {
+ return nil, fmt.Errorf("While creating file %q: %v", stdoutPath, err)
+ }
+
+ return stdoutFile, nil
+}
+
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer() error {
runner.CrunchLog.Print("Creating Docker container")
}
}
+ _, stdinUsed := runner.Container.Mounts["stdin"]
+ runner.ContainerConfig.OpenStdin = stdinUsed
+ runner.ContainerConfig.StdinOnce = stdinUsed
+ runner.ContainerConfig.AttachStdin = stdinUsed
+ runner.ContainerConfig.AttachStdout = true
+ runner.ContainerConfig.AttachStderr = true
+
createdBody, err := runner.Docker.ContainerCreate(context.TODO(), &runner.ContainerConfig, &runner.HostConfig, nil, runner.Container.UUID)
if err != nil {
return fmt.Errorf("While creating container: %v", err)
"fmt"
"io"
"io/ioutil"
+ "net"
"os"
"os/exec"
"path/filepath"
return t
}
+type MockConn struct {
+ net.Conn
+}
+
+func (m *MockConn) Write(b []byte) (int, error) {
+ return len(b), nil
+}
+
+func NewMockConn() *MockConn {
+ c := &MockConn{}
+ return c
+}
+
func (t *TestDockerClient) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
- return dockertypes.HijackedResponse{Reader: bufio.NewReader(t.logReader)}, nil
+ return dockertypes.HijackedResponse{Conn: NewMockConn(), Reader: bufio.NewReader(t.logReader)}, nil
}
func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
rdr := ioutil.NopCloser(&bytes.Buffer{})
client.Called = true
return FileWrapper{rdr, 1321984}, nil
+ } else if filename == "/file1_in_main.txt" {
+ rdr := ioutil.NopCloser(strings.NewReader("foo"))
+ client.Called = true
+ return FileWrapper{rdr, 3}, nil
}
return nil, nil
}
cr.CleanupDirs()
checkEmpty()
}
+
+ // Only mount points of kind 'collection' or 'json' are allowed for stdin
+ {
+ i = 0
+ cr.ArvMountPoint = ""
+ cr.Container.Mounts = make(map[string]arvados.Mount)
+ cr.Container.Mounts = map[string]arvados.Mount{
+ "stdin": {Kind: "tmp"},
+ }
+
+ err := cr.SetupMounts()
+ c.Check(err, NotNil)
+ c.Check(err, ErrorMatches, `Unsupported mount kind 'tmp' for stdin.*`)
+ cr.CleanupDirs()
+ checkEmpty()
+ }
}
func (s *TestSuite) TestStdout(c *C) {
}
}
}
+
+func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
+ helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "stdin": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/file1_in_main.txt"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+ extraMounts := []string{
+ "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
+ }
+
+ api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ for _, v := range api.Content {
+ if v["collection"] != nil {
+ collection := v["collection"].(arvadosclient.Dict)
+ if strings.Index(collection["name"].(string), "output") == 0 {
+ manifest := collection["manifest_text"].(string)
+ c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+ }
+ }
+ }
+}
+
+func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
+ helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {
+ "/tmp": {"kind": "tmp"},
+ "stdin": {"kind": "json", "content": "foo"},
+ "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
+
+ api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ for _, v := range api.Content {
+ if v["collection"] != nil {
+ collection := v["collection"].(arvadosclient.Dict)
+ if strings.Index(collection["name"].(string), "output") == 0 {
+ manifest := collection["manifest_text"].(string)
+ c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+`)
+ }
+ }
+ }
+}
+
+func (s *TestSuite) TestStderrMount(c *C) {
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo hello;exit 1"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"},
+ "stdout": {"kind": "file", "path": "/tmp/a/out.txt"},
+ "stderr": {"kind": "file", "path": "/tmp/b/err.txt"}},
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 1, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello\n"))
+ t.logWriter.Write(dockerLog(2, "oops\n"))
+ t.logWriter.Close()
+ })
+
+ final := api.CalledWith("container.state", "Complete")
+ c.Assert(final, NotNil)
+ c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+ c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
+
+ c.Check(api.CalledWith("collection.manifest_text", "./a b1946ac92492d2347c6235b4d2611184+6 0:6:out.txt\n./b 38af5c54926b620264ab1501150cf189+5 0:5:err.txt\n"), NotNil)
+}
httpserver.Log(remoteAddr, statusCode, statusText, w.WroteBodyBytes(), r.Method, r.Host, r.URL.Path, r.URL.RawQuery)
}()
+ if r.Method == "OPTIONS" {
+ method := r.Header.Get("Access-Control-Request-Method")
+ if method != "GET" && method != "POST" {
+ statusCode = http.StatusMethodNotAllowed
+ return
+ }
+ w.Header().Set("Access-Control-Allow-Headers", "Range")
+ w.Header().Set("Access-Control-Allow-Methods", "GET, POST")
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ w.Header().Set("Access-Control-Max-Age", "86400")
+ statusCode = http.StatusOK
+ return
+ }
+
if r.Method != "GET" && r.Method != "POST" {
statusCode, statusText = http.StatusMethodNotAllowed, r.Method
return
type UnitSuite struct{}
+func (s *UnitSuite) TestCORSPreflight(c *check.C) {
+ h := handler{Config: &Config{}}
+ u, _ := url.Parse("http://keep-web.example/c=" + arvadostest.FooCollection + "/foo")
+ req := &http.Request{
+ Method: "OPTIONS",
+ Host: u.Host,
+ URL: u,
+ RequestURI: u.RequestURI(),
+ Header: http.Header{
+ "Origin": {"https://workbench.example"},
+ "Access-Control-Request-Method": {"POST"},
+ },
+ }
+
+ // Check preflight for an allowed request
+ resp := httptest.NewRecorder()
+ h.ServeHTTP(resp, req)
+ c.Check(resp.Code, check.Equals, http.StatusOK)
+ c.Check(resp.Body.String(), check.Equals, "")
+ c.Check(resp.Header().Get("Access-Control-Allow-Origin"), check.Equals, "*")
+ c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "GET, POST")
+ c.Check(resp.Header().Get("Access-Control-Allow-Headers"), check.Equals, "Range")
+
+ // Check preflight for a disallowed request
+ resp = httptest.NewRecorder()
+ req.Header.Set("Access-Control-Request-Method", "DELETE")
+ h.ServeHTTP(resp, req)
+ c.Check(resp.Body.String(), check.Equals, "")
+ c.Check(resp.Code, check.Equals, http.StatusMethodNotAllowed)
+}
+
func mustParseURL(s string) *url.URL {
r, err := url.Parse(s)
if err != nil {
'node_stale_after': str(60 * 60 * 2),
'watchdog': '600',
'node_mem_scaling': '0.95'},
+ 'Manage': {'address': '127.0.0.1',
+ 'port': '-1'},
'Logging': {'file': '/dev/stderr',
'level': 'WARNING'},
}.iteritems():
import pykka
from . import computenode as cnode
+from . import status
from .computenode import dispatch
from .config import actor_class
states.append("shutdown")
return states + pykka.get_all(proxy_states)
+ def _update_tracker(self):
+ updates = {
+ k: 0
+ for k in status.tracker.keys()
+ if k.startswith('nodes_')
+ }
+ for s in self._node_states(size=None):
+ updates.setdefault('nodes_'+s, 0)
+ updates['nodes_'+s] += 1
+ updates['nodes_wish'] = len(self.last_wishlist)
+ status.tracker.update(updates)
+
def _state_counts(self, size):
states = self._node_states(size)
counts = {
self._later.stop_booting_node(size)
except Exception as e:
self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
+ try:
+ self._update_tracker()
+ except:
+ self._logger.exception("while updating tracker")
def _check_poll_freshness(orig_func):
"""Decorator to inhibit a method when poll information is stale.
import libcloud
from . import config as nmconfig
+from . import status
from .baseactor import WatchdogActor
from .daemon import NodeManagerDaemonActor
from .jobqueue import JobQueueMonitorActor, ServerCalculator
for sigcode in [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]:
signal.signal(sigcode, shutdown_signal)
+ status.Server(config).start()
+
try:
root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
--- /dev/null
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import http.server
+import json
+import logging
+import socketserver
+import threading
+
+_logger = logging.getLogger('status.Handler')
+
+
+class Server(socketserver.ThreadingMixIn, http.server.HTTPServer, object):
+ def __init__(self, config):
+ port = config.getint('Manage', 'port')
+ self.enabled = port >= 0
+ if not self.enabled:
+ _logger.warning("Management server disabled. "+
+ "Use [Manage] config section to enable.")
+ return
+ self._config = config
+ self._tracker = tracker
+ super(Server, self).__init__(
+ (config.get('Manage', 'address'), port), Handler)
+ self._thread = threading.Thread(target=self.serve_forever)
+ self._thread.daemon = True
+
+ def start(self):
+ if self.enabled:
+ self._thread.start()
+
+
+class Handler(http.server.BaseHTTPRequestHandler, object):
+ def do_GET(self):
+ if self.path == '/status.json':
+ self.send_response(200)
+ self.send_header('Content-type', 'application/json')
+ self.end_headers()
+ self.wfile.write(tracker.get_json())
+ else:
+ self.send_response(404)
+
+ def log_message(self, fmt, *args, **kwargs):
+ _logger.info(fmt, *args, **kwargs)
+
+
+class Tracker(object):
+ def __init__(self):
+ self._mtx = threading.Lock()
+ self._latest = {}
+
+ def get_json(self):
+ with self._mtx:
+ return json.dumps(self._latest)
+
+ def keys(self):
+ with self._mtx:
+ return self._latest.keys()
+
+ def update(self, updates):
+ with self._mtx:
+ self._latest.update(updates)
+
+
+tracker = Tracker()
# Azure configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
# EC2 configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# The dispatcher can customize the start and stop procedure for
# cloud nodes. For example, the SLURM dispatcher drains nodes
# Google Compute Engine configuration for Arvados Node Manager.
# All times are in seconds unless specified otherwise.
+[Manage]
+# The management server responds to http://addr:port/status.json with
+# a snapshot of internal state.
+
+# Management server listening address (default 127.0.0.1)
+#address = 0.0.0.0
+
+# Management server port number (default -1, server is disabled)
+#port = 8989
+
[Daemon]
# Node Manager will ensure that there are at least this many nodes running at
# all times. If node manager needs to start new idle nodes for the purpose of
# is through the API server Rails console: load the Node object, set its
# IP address to 10.10.0.N (where N is the cloud node's ID), and save.
+[Manage]
+address = 0.0.0.0
+port = 8989
+
[Daemon]
min_nodes = 0
max_nodes = 8
('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
],
install_requires=[
- 'apache-libcloud>=0.16',
- 'arvados-python-client>=0.1.20150206225333',
- 'pykka',
- 'python-daemon',
- 'setuptools'
- ],
- dependency_links = [
+ 'apache-libcloud>=0.16',
+ 'arvados-python-client>=0.1.20150206225333',
+ 'future',
+ 'pykka',
+ 'python-daemon',
+ 'setuptools'
+ ],
+ dependency_links=[
"https://github.com/curoverse/libcloud/archive/apache-libcloud-0.18.1.dev4.zip"
],
test_suite='tests',
- tests_require=['pbr<1.7.0', 'mock>=1.0', "apache-libcloud==0.18.1.dev4"],
+ tests_require=[
+ 'requests',
+ 'pbr<1.7.0',
+ 'mock>=1.0',
+ 'apache-libcloud==0.18.1.dev4',
+ ],
zip_safe=False,
cmdclass={'egg_info': tagger},
)
import pykka
import arvnodeman.daemon as nmdaemon
+import arvnodeman.status as status
from arvnodeman.jobqueue import ServerCalculator
from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
+from . import test_status
import logging
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
monitor = self.monitor_list()[0].proxy()
self.daemon.update_server_wishlist([])
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
+ self.daemon.update_server_wishlist([]).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
self.assertTrue(self.node_shutdown.start.called,
"daemon did not shut down booted node on offer")
+ with test_status.TestServer() as srv:
+ self.assertEqual(0, srv.get_status().get('nodes_unpaired', None))
+ self.assertEqual(1, srv.get_status().get('nodes_shutdown', None))
+ self.assertEqual(0, srv.get_status().get('nodes_wish', None))
+
def test_booted_node_lifecycle(self):
cloud_node = testutil.cloud_node_mock(6)
setup = self.start_node_boot(cloud_node, id_num=6)
--- /dev/null
+#!/usr/bin/env python
+
+from __future__ import absolute_import, print_function
+from future import standard_library
+
+import requests
+import unittest
+
+import arvnodeman.status as status
+import arvnodeman.config as config
+
+
+class TestServer(object):
+ def __enter__(self):
+ cfg = config.NodeManagerConfig()
+ cfg.set('Manage', 'port', '0')
+ cfg.set('Manage', 'address', '127.0.0.1')
+ self.srv = status.Server(cfg)
+ self.srv.start()
+ addr, port = self.srv.server_address
+ self.srv_base = 'http://127.0.0.1:'+str(port)
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.srv.shutdown()
+
+ def get_status_response(self):
+ return requests.get(self.srv_base+'/status.json')
+
+ def get_status(self):
+ return self.get_status_response().json()
+
+
+class StatusServerUpdates(unittest.TestCase):
+ def test_updates(self):
+ with TestServer() as srv:
+ for n in [1, 2, 3]:
+ status.tracker.update({'nodes_'+str(n): n})
+ r = srv.get_status_response()
+ self.assertEqual(200, r.status_code)
+ self.assertEqual('application/json', r.headers['content-type'])
+ resp = r.json()
+ self.assertEqual(n, resp['nodes_'+str(n)])
+ self.assertEqual(1, resp['nodes_1'])
+
+
+class StatusServerDisabled(unittest.TestCase):
+ def test_config_disabled(self):
+ cfg = config.NodeManagerConfig()
+ cfg.set('Manage', 'port', '-1')
+ cfg.set('Manage', 'address', '127.0.0.1')
+ self.srv = status.Server(cfg)
+ self.srv.start()
+ self.assertFalse(self.srv.enabled)
+ self.assertFalse(getattr(self.srv, '_thread', False))