Server-side components of Arvados contained in the apps/ and services/
directories, including the API Server, Workbench, and Crunch, are licensed
-under the GNU Affero General Public License version 3 (see agpl-3.0.txt)
+under the GNU Affero General Public License version 3 (see agpl-3.0.txt).
+
+The files and directories under the build/, lib/ and tools/ directories are
+licensed under the GNU Affero General Public License version 3 (see
+agpl-3.0.txt).
The Arvados client Software Development Kits contained in the sdk/ directory,
-example scripts in the crunch_scripts/ directory, and code samples in the
-Aravados documentation are licensed under the Apache License, Version 2.0 (see
-LICENSE-2.0.txt)
+example scripts in the crunch_scripts/ directory, the files and directories
+under backports/ and docker/, and code samples in the Arvados documentation
+are licensed under the Apache License, Version 2.0 (see LICENSE-2.0.txt).
The Arvados Documentation located in the doc/ directory is licensed under the
-Creative Commons Attribution-Share Alike 3.0 United States (see by-sa-3.0.txt)
\ No newline at end of file
+Creative Commons Attribution-Share Alike 3.0 United States (see by-sa-3.0.txt).
--- /dev/null
+$(document).
+ on('click', '.component-detail-panel', function(event) {
+ var href = $($(event.target).attr('href'));
+ if ($(href).attr("class").split(' ').indexOf("in") == -1) {
+ return; // collapsed; nothing more to do
+ }
+
+ var content_div = href.find('.work-unit-component-detail-body');
+ content_div.html('<div class="spinner spinner-32px col-sm-1"></div>');
+ var content_url = href.attr('content-url');
+ var action_data = href.attr('action-data');
+ $.ajax(content_url, {dataType: 'html', type: 'POST', data: {action_data: action_data}}).
+ done(function(data, status, jqxhr) {
+ content_div.html(data);
+ }).fail(function(jqxhr, status, error) {
+ content_div.html(error);
+ });
+ });
class WorkUnitsController < ApplicationController
+ skip_around_filter :require_thread_api_token, if: proc { |ctrl|
+ Rails.configuration.anonymous_user_token and
+ 'show_child_component' == ctrl.action_name
+ }
+
def find_objects_for_index
# If it's not the index rows partial display, just return
# The /index request will again be invoked to display the
render_error status: 422
end
end
+
+ def find_object_by_uuid
+ if params['object_type']
+ @object = params['object_type'].constantize.find(params['uuid'])
+ else
+ super
+ end
+ end
+
+ def show_child_component
+ data = JSON.load(params[:action_data])
+
+ current_obj = {}
+ current_obj_uuid = data['current_obj_uuid']
+ current_obj_name = data['current_obj_name']
+ current_obj_type = data['current_obj_type']
+ current_obj_parent = data['current_obj_parent']
+ if current_obj_uuid
+ resource_class = resource_class_for_uuid current_obj_uuid
+ obj = object_for_dataclass(resource_class, current_obj_uuid)
+ current_obj = obj if obj
+ end
+
+ if current_obj.is_a?(Hash) and !current_obj.any?
+ if current_obj_parent
+ resource_class = resource_class_for_uuid current_obj_parent
+ parent = object_for_dataclass(resource_class, current_obj_parent)
+ parent_wu = parent.work_unit
+ children = parent_wu.children
+ if current_obj_uuid
+ wu = children.select {|c| c.uuid == current_obj_uuid}.first
+ else current_obj_name
+ wu = children.select {|c| c.label.to_s == current_obj_name}.first
+ end
+ end
+ else
+ if current_obj_type == JobWorkUnit.to_s
+ wu = JobWorkUnit.new(current_obj, current_obj_name, current_obj_parent)
+ elsif current_obj_type == PipelineInstanceWorkUnit.to_s
+ wu = PipelineInstanceWorkUnit.new(current_obj, current_obj_name, current_obj_parent)
+ elsif current_obj_type == ContainerWorkUnit.to_s
+ wu = ContainerWorkUnit.new(current_obj, current_obj_name, current_obj_parent)
+ end
+ end
+
+ respond_to do |f|
+ f.html { render(partial: "show_component", locals: {wu: wu}) }
+ end
+ end
end
end
def work_unit(label=nil)
- ContainerWorkUnit.new(self, label)
+ ContainerWorkUnit.new(self, label, self.uuid)
end
end
end
def work_unit(label=nil)
- ContainerWorkUnit.new(self, label)
+ ContainerWorkUnit.new(self, label, self.uuid)
end
end
class ContainerWorkUnit < ProxyWorkUnit
attr_accessor :container
- def initialize proxied, label
+ def initialize proxied, label, parent
super
if @proxied.is_a?(ContainerRequest)
container_uuid = get(:container_uuid)
end
def children
- return self.my_children if self.my_children
+ return @my_children if @my_children
container_uuid = nil
container_uuid = if @proxied.is_a?(Container) then uuid else get(:container_uuid) end
end
end
- self.my_children = items
+ @my_children = items
end
def title
end
def work_unit(label=nil)
- JobWorkUnit.new(self, label)
+ JobWorkUnit.new(self, label, self.uuid)
end
end
class JobTask < ArvadosBase
def work_unit(label=nil)
- JobTaskWorkUnit.new(self, label)
+ JobTaskWorkUnit.new(self, label, self.uuid)
end
end
end
def work_unit(label=nil)
- PipelineInstanceWorkUnit.new(self, label || self.name)
+ PipelineInstanceWorkUnit.new(self, label || self.name, self.uuid)
end
private
if job[:uuid] and jobs[job[:uuid]]
items << jobs[job[:uuid]].work_unit(name)
else
- items << JobWorkUnit.new(job, name)
+ items << JobWorkUnit.new(job, name, uuid)
end
else
- items << JobWorkUnit.new(c, name)
+ items << JobWorkUnit.new(c, name, uuid)
end
else
@unreadable_children = true
attr_accessor :my_children
attr_accessor :unreadable_children
- def initialize proxied, label
+ def initialize proxied, label, parent
@lbl = label
@proxied = proxied
+ @parent = parent
end
def label
get(:uuid)
end
+ def parent
+ @parent
+ end
+
def modified_by_user_uuid
get(:modified_by_user_uuid)
end
if obj.respond_to? key
obj.send(key)
elsif obj.is_a?(Hash)
- obj[key]
+ obj[key] || obj[key.to_s]
end
end
end
# returns the arvados UUID of the underlying object
end
+ def parent
+ # returns the parent uuid of this work unit
+ end
+
def children
# returns an array of child work units
end
<%
job_uuids = @object.components.map { |k,j| j.is_a? Hash and j[:job].andand[:uuid] }.compact
- throttle = @object.state.start_with?('Running') ? 5000 : 15000
+ throttle = 86486400000 # 1001 nights
%>
<div class="arv-log-refresh-control"
data-load-throttle="<%= throttle %>"
+<%
+ collections = [current_obj.outputs, current_obj.docker_image].flatten.compact.uniq
+ collections_pdhs = collections.select {|x| !CollectionsHelper.match(x).nil?}.uniq.compact
+ collections_uuids = collections - collections_pdhs
+ preload_collections_for_objects collections_uuids if collections_uuids.any?
+ preload_for_pdhs collections_pdhs if collections_pdhs.any?
+
+ preload_objects_for_dataclass(Repository, [current_obj.repository], :name) if current_obj.repository
+%>
+
<div class="container">
<div class="row">
<div class="col-md-5">
<% else %>
<table>
<% keys = [:uuid, :modified_by_user_uuid, :created_at, :started_at, :finished_at, :container_uuid, :priority] %>
- <% keys << :outputs if @object.uuid == current_obj.uuid %>
+ <% keys << :log_collection if @object.uuid != current_obj.uuid %>
+ <% keys << :outputs %>
<% keys.each do |k| %>
- <% val = current_obj.send(k) if current_obj.respond_to?(k) %>
- <% has_val = val %>
- <% has_val = val.andand.any? if k == :outputs %>
+ <%
+ val = current_obj.send(k) if current_obj.respond_to?(k)
+ if k == :outputs
+ has_val = val.andand.any?
+ elsif k == :log_collection and current_obj.state_label == "Running"
+ has_val = true
+ else
+ has_val = val
+ end
+ %>
<% if has_val %>
<tr>
<td style="padding-right: 1em">
<% else %>
<%= render partial: 'work_units/show_outputs', locals: {id: current_obj.uuid, outputs: val, align:""} %>
<% end %>
+ <% elsif k == :log_collection %>
+ <%= render partial: 'work_units/show_log_link', locals: {wu: current_obj} %>
<% else %>
<%= val %>
<% end %>
<div class="panel panel-default">
<div class="panel-heading">
<div class="row">
- <div class="col-md-2" style="word-break:break-all;">
+ <div class="col-md-3" style="word-break:break-all;">
<h4 class="panel-title">
- <a data-toggle="collapse" href="#collapse<%= i %>">
+ <a class="component-detail-panel" data-toggle="collapse" href="#collapse<%= i %>">
<%= current_obj.label %> <span class="caret"></span>
</a>
</h4>
</div>
<% if not current_obj %>
- <div class="col-md-8"></div>
+ <div class="col-md-7"></div>
<% else %>
- <div class="col-md-1">
- <%= render partial: 'work_units/show_log_link', locals: {wu: current_obj} %>
- </div>
-
<% walltime = current_obj.walltime %>
<% cputime = current_obj.cputime %>
<div class="col-md-3">
<%= current_obj.child_summary_str %>
</span>
</div>
- <% elsif current_obj.is_finished? %>
- <div class="col-md-3 text-overflow-ellipsis">
- <% outputs = current_obj.outputs %>
- <% if outputs.any? %>
- <% if outputs.size == 1 %>
- <%= link_to_arvados_object_if_readable(outputs[0], 'Output data not available', link_text: "Output of #{current_obj.label}") %>
- <% else %>
- <%= render partial: 'work_units/show_outputs', locals: {id: current_obj.uuid, outputs: outputs, align:"pull-right"} %>
- <% end %>
- <% else %>
- No output.
- <% end %>
- </div>
<% end %>
<div class="col-md-1 pipeline-instance-spacing">
</div>
</div>
- <div id="collapse<%= i %>" class="panel-collapse collapse <%= if expanded then 'in' end %>">
- <div class="panel-body">
- <%= render partial: 'work_units/show_component', locals: {wu: current_obj} %>
+ <% content_url = url_for(controller: :work_units, action: :show_child_component, id: @object.uuid, object_type: @object.class.to_s) %>
+ <div id="collapse<%=i%>" class="work-unit-component-detail panel-collapse collapse <%= if expanded then 'in' end %>" content-url="<%=content_url%>" action-data="<%={current_obj_type: current_obj.class.to_s, current_obj_uuid: current_obj.uuid, current_obj_name: current_obj.label, current_obj_parent: current_obj.parent}.to_json%>">
+ <div class="panel-body work-unit-component-detail-body">
</div>
</div>
</div>
</p>
<%# Work unit children %>
-
-<%
- uuids = wu.children.collect {|c| c.uuid}.compact
- if uuids.any?
- resource_class = resource_class_for_uuid(uuids.first, friendly_name: true)
-
- start = 0; inc = 200
- while start < uuids.length
- preload_objects_for_dataclass resource_class, uuids[start, inc]
- start += inc
- end
- end
-
- collections = wu.outputs.flatten.uniq
- collections << wu.log_collection if wu.log_collection
- collections << wu.docker_image if wu.docker_image
- collections = wu.children.collect {|j| j.outputs}.compact
- collections = collections.flatten.uniq
- collections.concat wu.children.collect {|j| j.docker_image}.uniq.compact
- collections.concat wu.children.collect {|j| j.log_collection}.uniq.compact
- collections_pdhs = collections.select {|x| !(m = CollectionsHelper.match(x)).nil?}.uniq.compact
- collections_uuids = collections - collections_pdhs
-
- if collections_uuids.any?
- start = 0; inc = 200
- while start < collections_uuids.length
- preload_collections_for_objects collections_uuids[start, inc]
- start += inc
- end
- end
-
- if collections_pdhs.any?
- start = 0; inc = 200
- while start < collections_pdhs.length
- preload_for_pdhs collections_pdhs[start, inc]
- start += inc
- end
- end
-
- repos = wu.children.collect {|c| c.repository}.uniq.compact
- preload_objects_for_dataclass(Repository, repos, :name) if repos.any?
-%>
-
<% if wu.has_unreadable_children %>
<%= render(partial: "pipeline_instances/show_components_json",
locals: {error_name: "Unreadable components", backtrace: nil, wu: wu}) %>
<% else %>
- <% @descendent_count = 0 if !@descendent_count %>
<% wu.children.each do |c| %>
- <% @descendent_count += 1 %>
- <%= render(partial: 'work_units/show_child', locals: {current_obj: c, i: @descendent_count, expanded: false}) %>
+ <%= render(partial: 'work_units/show_child', locals: {current_obj: c, i: (c.uuid || rand(2**128).to_s(36)), expanded: false}) %>
<% end %>
<% end %>
<div class="arv-log-refresh-control"
- data-load-throttle="15000"
+ data-load-throttle="86486400000" <%# 1001 nights %>
></div>
<%=
render(partial: 'work_units/show_component', locals: {wu: current_obj.work_unit(name)})
get "star" => 'actions#star', :as => :star
get "all_processes" => 'work_units#index', :as => :all_processes
get "choose_work_unit_templates" => 'work_unit_templates#choose', :as => :choose_work_unit_templates
- resources :work_units
+ resources :work_units do
+ post 'show_child_component', :on => :member
+ end
resources :nodes
resources :humans
resources :traits
}]
get :index, encoded_params, session_for(:active)
end
-
- [
- [Job, 'active', 'running_job_with_components', '/jobs/zzzzz-8i9sb-jyq01m7in1jlofj#Log'],
- [PipelineInstance, 'active', 'pipeline_in_running_state', '/jobs/zzzzz-8i9sb-pshmckwoma9plh7#Log'],
- [PipelineInstance, nil, 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', 'Log unavailable'],
- ].each do |type, token, fixture, log_link|
- test "link_to_log for #{fixture} for #{token}" do
- use_token 'admin'
- obj = find_fixture(type, fixture)
-
- @controller = if type == Job then JobsController.new else PipelineInstancesController.new end
-
- if token
- get :show, {id: obj['uuid']}, session_for(token)
- else
- Rails.configuration.anonymous_user_token =
- api_fixture("api_client_authorizations", "anonymous", "api_token")
- get :show, {id: obj['uuid']}
- end
-
- assert_includes @response.body, log_link
- end
- end
end
if readable
click_link('component1')
- within('#collapse1') do
+ within('.panel-collapse') do
assert(has_text? component1['uuid'])
assert(has_text? component1['script_version'])
assert(has_text? 'script_parameters')
assert_text(expect_log_text)
end
end
+
+ # Each case: [URL path prefix / fixture set, user token (nil = anonymous),
+ # fixture name, label of the child component link to click, expected log
+ # link href (or text when anonymous)].
+ [
+ ['jobs', 'active', 'running_job_with_components', 'component1', '/jobs/zzzzz-8i9sb-jyq01m7in1jlofj#Log'],
+ ['pipeline_instances', 'active', 'pipeline_in_running_state', 'foo', '/jobs/zzzzz-8i9sb-pshmckwoma9plh7#Log'],
+ ['pipeline_instances', nil, 'pipeline_in_publicly_accessible_project_but_other_objects_elsewhere', 'foo', 'Log unavailable'],
+ ].each do |type, token, fixture, child, log_link|
+ test "link_to_log for #{fixture} for #{token}" do
+ obj = api_fixture(type)[fixture]
+ if token
+ visit page_with_token token, "/#{type}/#{obj['uuid']}"
+ else
+ # Anonymous case: configure the anonymous token, then visit without
+ # a user token.
+ # NOTE(review): this assigns a global config value that this block
+ # does not restore — confirm it is reset between tests.
+ Rails.configuration.anonymous_user_token =
+ api_fixture("api_client_authorizations", "anonymous", "api_token")
+ visit "/#{type}/#{obj['uuid']}"
+ end
+
+ # Expand the child component's panel before looking for the log link.
+ click_link(child)
+
+ # With a token, expect an <a> whose href is log_link; otherwise expect
+ # log_link to appear as page text.
+ if token
+ assert_selector "a[href=\"#{log_link}\"]"
+ else
+ assert_text log_link
+ end
+ end
+ end
end
echo >&2 "usage: $0 [options]"
echo >&2
echo >&2 "$0 options:"
- echo >&2 " -t, --tags [csv_tags] comma separated tags"
echo >&2 " -u, --upload Upload the images (docker push)"
+ echo >&2 " --no-cache Don't use build cache"
echo >&2 " -h, --help Display this help and exit"
echo >&2
echo >&2 " If no options are given, just builds the images."
# NOTE: This requires GNU getopt (part of the util-linux package on Debian-based distros).
TEMP=`getopt -o hut: \
- --long help,upload,tags: \
+ --long help,upload,no-cache,tags: \
-n "$0" -- "$@"`
if [ $? != 0 ] ; then echo "Use -h for help"; exit 1 ; fi
upload=true
shift
;;
+ --no-cache)
+ NOCACHE=--no-cache
+ shift
+ ;;
-t | --tags)
case "$2" in
"")
exit 1
;;
*)
- tags=$2;
+ echo "WARNING: --tags is deprecated and doesn't do anything";
shift 2
;;
esac
. $WORKSPACE/build/run-library.sh
docker_push () {
- if [[ ! -z "$tags" ]]
- then
- for tag in $( echo $tags|tr "," " " )
- do
- $DOCKER tag $1 $1:$tag
- done
- fi
-
# Sometimes docker push fails; retry it a few times if necessary.
for i in `seq 1 5`; do
$DOCKER push $*
# clean up the docker build environment
cd "$WORKSPACE"
-cd docker/jobs
-if [[ ! -z "$tags" ]]; then
- docker build --build-arg COMMIT=${tags/,*/} -t arvados/jobs .
+
+python_sdk_ts=$(cd sdk/python && timestamp_from_git)
+cwl_runner_ts=$(cd sdk/cwl && timestamp_from_git)
+
+python_sdk_version=$(cd sdk/python && nohash_version_from_git 0.1)-2
+cwl_runner_version=$(cd sdk/cwl && nohash_version_from_git 1.0)-3
+
+if [[ $python_sdk_ts -gt $cwl_runner_ts ]]; then
+ cwl_runner_version=$python_sdk_version
+ gittag=$(cd sdk/python && git log --first-parent --max-count=1 --format=format:%H)
else
- docker build -t arvados/jobs .
+ gittag=$(cd sdk/cwl && git log --first-parent --max-count=1 --format=format:%H)
fi
+echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
+
+cd docker/jobs
+docker build $NOCACHE \
+ --build-arg python_sdk_version=$python_sdk_version \
+ --build-arg cwl_runner_version=$cwl_runner_version \
+ -t arvados/jobs:$gittag .
+
ECODE=$?
if [[ "$ECODE" != "0" ]]; then
checkexit $ECODE "docker build"
title "docker build complete (`timer`)"
+if [[ "$ECODE" != "0" ]]; then
+ exit_cleanly
+fi
+
+timer_reset
+
+if docker --version |grep " 1\.[0-9]\." ; then
+ # Docker version prior 1.10 require -f flag
+ # -f flag removed in Docker 1.12
+ FORCE=-f
+fi
+
+docker tag $FORCE arvados/jobs:$gittag arvados/jobs:latest
+
+ECODE=$?
+
+if [[ "$ECODE" != "0" ]]; then
+ EXITCODE=$(($EXITCODE + $ECODE))
+fi
+
+checkexit $ECODE "docker tag"
+title "docker tag complete (`timer`)"
+
title "uploading images"
timer_reset
if [[ "$ECODE" != "0" ]]; then
- title "upload arvados images SKIPPED because build failed"
+ title "upload arvados images SKIPPED because build or tag failed"
else
if [[ $upload == true ]]; then
## 20150526 nico -- *sometimes* dockerhub needs re-login
## even though credentials are already in .dockercfg
docker login -u arvados
- docker_push arvados/jobs
+ docker_push arvados/jobs:$gittag
+ docker_push arvados/jobs:latest
title "upload arvados images finished (`timer`)"
else
title "upload arvados images SKIPPED because no --upload option set (`timer`)"
oauth2client==1.5.2 pyasn1==0.1.7 pyasn1-modules==0.0.5 \
rsa uritemplate httplib2 ws4py pykka \
ciso8601 pycrypto 'pycurl<7.21.5' \
- python-daemon llfuse==0.41.1 'pbr<1.0' pyyaml \
+ python-daemon==2.1.1 llfuse==0.41.1 'pbr<1.0' pyyaml \
'rdflib>=4.2.0' shellescape mistune typing avro \
isodate pyparsing sparqlwrapper html5lib==0.9999999 keepalive \
ruamel.ordereddict cachecontrol)
version_from_git() {
# Generates a version number from the git log for the current working
# directory, and writes it to stdout.
- local git_ts git_hash
+ local git_ts git_hash prefix
+ if [[ -n "$1" ]] ; then
+ prefix="$1"
+ else
+ prefix="0.1"
+ fi
+
declare $(format_last_commit_here "git_ts=%ct git_hash=%h")
- echo "0.1.$(date -ud "@$git_ts" +%Y%m%d%H%M%S).$git_hash"
+ echo "${prefix}.$(date -ud "@$git_ts" +%Y%m%d%H%M%S).$git_hash"
}
nohash_version_from_git() {
- version_from_git | cut -d. -f1-3
+ version_from_git $1 | cut -d. -f1-3
}
timestamp_from_git() {
RUN apt-key adv --keyserver pool.sks-keyservers.net --recv 1078ECD7
RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3
-ARG COMMIT=latest
-RUN echo $COMMIT && apt-get update -q
+ARG python_sdk_version
+ARG cwl_runner_version
+RUN echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
-RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libgnutls28-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
+RUN apt-get update -q
+RUN apt-get install -yq --no-install-recommends \
+ git python-pip python-virtualenv \
+ python-dev libgnutls28-dev libcurl4-gnutls-dev nodejs \
+ python-arvados-python-client=$python_sdk_version \
+ python-arvados-cwl-runner=$cwl_runner_version
# Install dependencies and set up system.
RUN /usr/sbin/adduser --disabled-password \
try:
self.cond.acquire()
j = self.processes[uuid]
- logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+ txt = self.work_api[0].upper() + self.work_api[1:-1]
+ logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
with Perf(metrics, "done %s" % j.name):
j.done(event["properties"]["new_attributes"])
self.cond.notify()
self.final_output_collection = final
+ def set_crunch_output(self):
+ """Record the final output collection on the Crunch object running this workflow.
+
+ Writes self.final_output_collection's portable data hash as the output
+ of the current container (work_api "containers"), or as the output of
+ the job task named by the TASK_UUID environment variable (work_api
+ "jobs", only when TASK_UUID is set). In the jobs case it also sets
+ 'success' from self.final_status and 'progress' to 1.0.
+ """
+ if self.work_api == "containers":
+ try:
+ current = self.api.containers().current().execute(num_retries=self.num_retries)
+ self.api.containers().update(uuid=current['uuid'],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ }).execute(num_retries=self.num_retries)
+ except Exception as e:
+ # Best effort: any failure here (presumably including the case where
+ # this process is not running inside a container — confirm) is
+ # only logged at info level.
+ logger.info("Setting container output: %s", e)
+ elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+ self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ 'success': self.final_status == "success",
+ 'progress':1.0
+ }).execute(num_retries=self.num_retries)
+
def arv_executor(self, tool, job_order, **kwargs):
self.debug = kwargs.get("debug")
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
- if self.final_status != "success":
- raise WorkflowException("Workflow failed.")
-
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
self.make_output_collection(self.output_name, self.final_output)
+ self.set_crunch_output()
+
+ if self.final_status != "success":
+ raise WorkflowException("Workflow failed.")
if kwargs.get("compute_checksum"):
adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
parser.add_argument("--api", type=str,
default=None, dest="work_api",
- help="Select work submission API, one of 'jobs' or 'containers'.")
+ help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
parser.add_argument("--compute-checksum", action="store_true", default=False,
help="Compute checksum of contents while collecting outputs",
self.arvrunner.processes[response["container_uuid"]] = self
- logger.info("Container %s (%s) request state is %s", self.name, response["uuid"], response["state"])
+ container = self.arvrunner.api.containers().get(
+ uuid=response["container_uuid"]
+ ).execute(num_retries=self.arvrunner.num_retries)
+
+ logger.info("Container request %s (%s) state is %s with container %s %s", self.name, response["uuid"], response["state"], container["uuid"], container["state"])
- if response["state"] == "Final":
- self.done(response)
+ if container["state"] in ("Complete", "Cancelled"):
+ self.done(container)
except Exception as e:
logger.error("Got error %s" % str(e))
self.output_callback({}, "permanentFail")
from arvados.api import OrderedJsonModel
from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
from cwltool.load_tool import load_tool
+from cwltool.errors import WorkflowException
logger = logging.getLogger('arvados.cwl-runner')
arvados_cwl.add_arv_hints()
+ runner = None
try:
job_order_object = arvados.current_job()['script_parameters']
args.basedir = os.getcwd()
args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
outputObj = runner.arv_executor(t, job_order_object, **vars(args))
-
- if runner.final_output_collection:
+ except Exception as e:
+ if isinstance(e, WorkflowException):
+ logging.info("Workflow error %s", e)
+ else:
+ logging.exception("Unhandled exception")
+ if runner and runner.final_output_collection:
outputCollection = runner.final_output_collection.portable_data_hash()
else:
outputCollection = None
-
api.job_tasks().update(uuid=arvados.current_task()['uuid'],
body={
'output': outputCollection,
- 'success': True,
- 'progress':1.0
- }).execute()
- except Exception as e:
- logging.exception("Unhandled exception")
- api.job_tasks().update(uuid=arvados.current_task()['uuid'],
- body={
- 'output': None,
'success': False,
'progress':1.0
}).execute()
+++ /dev/null
-../python/gittaggers.py
\ No newline at end of file
--- /dev/null
+from setuptools.command.egg_info import egg_info
+import subprocess
+import time
+import os
+
+SETUP_DIR = os.path.dirname(__file__) or '.'
+
+def choose_version_from():
+ """Return the directory whose latest commit should determine the version.
+
+ Compares the committer timestamps (%ct) of the most recent first-parent
+ commit touching the Python SDK (SETUP_DIR/../python) and touching
+ SETUP_DIR. Returns the SDK path if its commit is strictly newer, else
+ SETUP_DIR (ties go to SETUP_DIR).
+
+ Raises subprocess.CalledProcessError if a git command fails.
+ """
+ sdk_ts = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', os.path.join(SETUP_DIR, "../python")]).strip()
+ cwl_ts = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', SETUP_DIR]).strip()
+ if int(sdk_ts) > int(cwl_ts):
+ getver = os.path.join(SETUP_DIR, "../python")
+ else:
+ getver = SETUP_DIR
+ return getver
+
+class EggInfoFromGit(egg_info):
+ """Tag the build with git commit timestamp.
+
+ If a build tag has already been set (e.g., "egg_info -b", building
+ from source package), leave it alone.
+ """
+
+ def git_timestamp_tag(self):
+ # Build tag of the form '.YYYYmmddHHMMSS' (UTC) from the newest
+ # first-parent commit in the directory picked by choose_version_from().
+ gitinfo = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', choose_version_from()]).strip()
+ return time.strftime('.%Y%m%d%H%M%S', time.gmtime(int(gitinfo)))
+
+ def tags(self):
+ # Only compute the git-derived tag when no build tag was set explicitly.
+ if self.tag_build is None:
+ self.tag_build = self.git_timestamp_tag()
+ return egg_info.tags(self)
try:
gitinfo = subprocess.check_output(
['git', 'log', '--first-parent', '--max-count=1',
- '--format=format:%H', SETUP_DIR]).strip()
+ '--format=format:%H', gittaggers.choose_version_from()]).strip()
with open(versionfile, "w") as f:
f.write("__version__ = '%s'\n" % gitinfo)
except Exception as e:
self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
+ self._pending_write_size = 0
+ self.threads_lock = threading.Lock()
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if self._put_queue is not None:
self._put_queue.task_done()
- @synchronized
def start_put_threads(self):
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=self._commit_bufferblock_worker)
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
+ with self.threads_lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be send to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=self._commit_bufferblock_worker)
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
def _block_prefetch_worker(self):
"""The background downloader thread."""
self.stop_threads()
@synchronized
- def repack_small_blocks(self, force=False, sync=False):
+ def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
"""Packs small blocks together before uploading"""
- # Search blocks ready for getting packed together before being committed to Keep.
- # A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
- # size is <= KEEP_BLOCK_SIZE/2.
- small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
- if len(small_blocks) <= 1:
- # Not enough small blocks for repacking
- return
+ self._pending_write_size += closed_file_size
# Check if there are enough small blocks for filling up one in full
- pending_write_size = sum([b.size() for b in small_blocks])
- if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+ if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+ # Search blocks ready for getting packed together before being committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that it's
+ # size is <= KEEP_BLOCK_SIZE/2.
+ small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
+
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ self._pending_write_size = sum([b.size() for b in small_blocks])
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
+
new_bb = self._alloc_bufferblock()
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
arvfile = bb.owner
+ self._pending_write_size -= bb.size()
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
arvfile.set_segments([Range(new_bb.blockid,
0,
self.flush()
elif self.closed():
# All writers closed and size is adequate for repacking
- self.parent._my_block_manager().repack_small_blocks()
+ self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
def closed(self):
"""
import json
import os
import pwd
+import time
import signal
import socket
import sys
import tempfile
+import threading
+import copy
+import logging
from apiclient import errors as apiclient_errors
import arvados.commands._util as arv_cmd
self.__init__(self.filename)
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
- STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
- ['bytes_written', '_seen_inputs'])
-
- def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
- self.bytes_written = 0
- self._seen_inputs = []
- self.cache = cache
+class ArvPutUploadJob(object):
+ CACHE_DIR = '.cache/arvados/arv-put'
+ EMPTY_STATE = {
+ 'manifest' : None, # Last saved manifest checkpoint
+ 'files' : {} # Previous run file list: {path : {size, mtime}}
+ }
+
+ def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
+ name=None, owner_uuid=None, ensure_unique_name=False,
+ num_retries=None, replication_desired=None,
+ filename=None, update_time=1.0):
+ self.paths = paths
+ self.resume = resume
self.reporter = reporter
self.bytes_expected = bytes_expected
- super(ArvPutCollectionWriter, self).__init__(**kwargs)
-
- @classmethod
- def from_cache(cls, cache, reporter=None, bytes_expected=None,
- num_retries=0, replication=0):
+ self.bytes_written = 0
+ self.bytes_skipped = 0
+ self.name = name
+ self.owner_uuid = owner_uuid
+ self.ensure_unique_name = ensure_unique_name
+ self.num_retries = num_retries
+ self.replication_desired = replication_desired
+ self.filename = filename
+ self._state_lock = threading.Lock()
+ self._state = None # Previous run state (file list & manifest)
+ self._current_files = [] # Current run file list
+ self._cache_file = None
+ self._collection = None
+ self._collection_lock = threading.Lock()
+ self._stop_checkpointer = threading.Event()
+ self._checkpointer = threading.Thread(target=self._update_task)
+ self._update_task_time = update_time # How many seconds wait between update runs
+ self.logger = logging.getLogger('arvados.arv_put')
+ # Load cached data if any and if needed
+ self._setup_state()
+
+ def start(self):
+ """
+ Start supporting thread & file uploading
+ """
+ self._checkpointer.daemon = True
+ self._checkpointer.start()
try:
- state = cache.load()
- state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
- writer = cls.from_state(state, cache, reporter, bytes_expected,
- num_retries=num_retries,
- replication=replication)
- except (TypeError, ValueError,
- arvados.errors.StaleWriterStateError) as error:
- return cls(cache, reporter, bytes_expected,
- num_retries=num_retries,
- replication=replication)
- else:
- return writer
-
- def cache_state(self):
- if self.cache is None:
- return
- state = self.dump_state()
- # Transform attributes for serialization.
- for attr, value in state.items():
- if attr == '_data_buffer':
- state[attr] = base64.encodestring(''.join(value))
- elif hasattr(value, 'popleft'):
- state[attr] = list(value)
- self.cache.save(state)
+ for path in self.paths:
+ # Test for stdin first, in case some file named '-' exist
+ if path == '-':
+ self._write_stdin(self.filename or 'stdin')
+ elif os.path.isdir(path):
+ self._write_directory_tree(path)
+ else:
+ self._write_file(path, self.filename or os.path.basename(path))
+ finally:
+ # Stop the thread before doing anything else
+ self._stop_checkpointer.set()
+ self._checkpointer.join()
+ # Commit all & one last _update()
+ self.manifest_text()
+ self._update()
+ if self.resume:
+ self._cache_file.close()
+ # Correct the final written bytes count
+ self.bytes_written -= self.bytes_skipped
+
+ def save_collection(self):
+ with self._collection_lock:
+ self._my_collection().save_new(
+ name=self.name, owner_uuid=self.owner_uuid,
+ ensure_unique_name=self.ensure_unique_name,
+ num_retries=self.num_retries)
+
+ def destroy_cache(self):
+ if self.resume:
+ try:
+ os.unlink(self._cache_filename)
+ except OSError as error:
+ # That's what we wanted anyway.
+ if error.errno != errno.ENOENT:
+ raise
+ self._cache_file.close()
+
+ def _collection_size(self, collection):
+ """
+ Recursively get the total size of the collection
+ """
+ size = 0
+ for item in collection.values():
+ if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
+ size += self._collection_size(item)
+ else:
+ size += item.size()
+ return size
+
+ def _update_task(self):
+ """
+ Periodically called support task. File uploading is
+ asynchronous so we poll status from the collection.
+ """
+ while not self._stop_checkpointer.wait(self._update_task_time):
+ self._update()
+
+ def _update(self):
+ """
+ Update cached manifest text and report progress.
+ """
+ with self._collection_lock:
+ self.bytes_written = self._collection_size(self._my_collection())
+ # Update cache, if resume enabled
+ if self.resume:
+ with self._state_lock:
+ # Get the manifest text without comitting pending blocks
+ self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+ self._save_state()
+ # Call the reporter, if any
+ self.report_progress()
def report_progress(self):
if self.reporter is not None:
self.reporter(self.bytes_written, self.bytes_expected)
- def flush_data(self):
- start_buffer_len = self._data_buffer_len
- start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
- super(ArvPutCollectionWriter, self).flush_data()
- if self._data_buffer_len < start_buffer_len: # We actually PUT data.
- self.bytes_written += (start_buffer_len - self._data_buffer_len)
- self.report_progress()
- if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
- self.cache_state()
-
- def _record_new_input(self, input_type, source_name, dest_name):
- # The key needs to be a list because that's what we'll get back
- # from JSON deserialization.
- key = [input_type, source_name, dest_name]
- if key in self._seen_inputs:
- return False
- self._seen_inputs.append(key)
- return True
-
- def write_file(self, source, filename=None):
- if self._record_new_input('file', source, filename):
- super(ArvPutCollectionWriter, self).write_file(source, filename)
-
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
- if self._record_new_input('directory', path, stream_name):
- super(ArvPutCollectionWriter, self).write_directory_tree(
- path, stream_name, max_manifest_depth)
+ def _write_directory_tree(self, path, stream_name="."):
+ # TODO: Check what happens when multiple directories are passed as
+ # arguments.
+ # If the code below is uncommented, integration test
+ # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
+ # fails, I suppose it is because the manifest_uuid changes because
+ # of the dir addition to stream_name.
+
+ # if stream_name == '.':
+ # stream_name = os.path.join('.', os.path.basename(path))
+ for item in os.listdir(path):
+ if os.path.isdir(os.path.join(path, item)):
+ self._write_directory_tree(os.path.join(path, item),
+ os.path.join(stream_name, item))
+ else:
+ self._write_file(os.path.join(path, item),
+ os.path.join(stream_name, item))
+
+ def _write_stdin(self, filename):
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'w')
+ self._write(sys.stdin, output)
+ output.close()
+
+ def _write_file(self, source, filename):
+ resume_offset = 0
+ if self.resume:
+ # Check if file was already uploaded (at least partially)
+ with self._collection_lock:
+ try:
+ file_in_collection = self._my_collection().find(filename)
+ except IOError:
+ # Not found
+ file_in_collection = None
+ # If no previous cached data on this file, store it for an eventual
+ # repeated run.
+ if source not in self._state['files']:
+ with self._state_lock:
+ self._state['files'][source] = {
+ 'mtime': os.path.getmtime(source),
+ 'size' : os.path.getsize(source)
+ }
+ with self._state_lock:
+ cached_file_data = self._state['files'][source]
+ # See if this file was already uploaded at least partially
+ if file_in_collection:
+ if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+ if cached_file_data['size'] == file_in_collection.size():
+ # File already there, skip it.
+ self.bytes_skipped += cached_file_data['size']
+ return
+ elif cached_file_data['size'] > file_in_collection.size():
+ # File partially uploaded, resume!
+ resume_offset = file_in_collection.size()
+ else:
+ # Inconsistent cache, re-upload the file
+ self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ else:
+ # Local file differs from cached data, re-upload it
+ pass
+ with open(source, 'r') as source_fd:
+ if resume_offset > 0:
+ # Start upload where we left off
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'a')
+ source_fd.seek(resume_offset)
+ self.bytes_skipped += resume_offset
+ else:
+ # Start from scratch
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'w')
+ self._write(source_fd, output)
+ output.close(flush=False)
+
+ def _write(self, source_fd, output):
+ first_read = True
+ while True:
+ data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+ # Allow an empty file to be written
+ if not data and not first_read:
+ break
+ if first_read:
+ first_read = False
+ output.write(data)
+
+ def _my_collection(self):
+ """
+ Create a new collection if none cached. Load it from cache otherwise.
+ """
+ if self._collection is None:
+ with self._state_lock:
+ manifest = self._state['manifest']
+ if self.resume and manifest is not None:
+ # Create collection from saved state
+ self._collection = arvados.collection.Collection(
+ manifest,
+ replication_desired=self.replication_desired)
+ else:
+ # Create new collection
+ self._collection = arvados.collection.Collection(
+ replication_desired=self.replication_desired)
+ return self._collection
+
+ def _setup_state(self):
+ """
+ Create a new cache file or load a previously existing one.
+ """
+ if self.resume:
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ realpaths = sorted(os.path.realpath(path) for path in self.paths)
+ md5.update('\0'.join(realpaths))
+ if self.filename:
+ md5.update(self.filename)
+ cache_filename = md5.hexdigest()
+ self._cache_file = open(os.path.join(
+ arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+ cache_filename), 'a+')
+ self._cache_filename = self._cache_file.name
+ self._lock_file(self._cache_file)
+ self._cache_file.seek(0)
+ with self._state_lock:
+ try:
+ self._state = json.load(self._cache_file)
+ if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+ # Cache at least partially incomplete, set up new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+ except ValueError:
+ # Cache file empty, set up new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+ # Load how many bytes were uploaded on previous run
+ with self._collection_lock:
+ self.bytes_written = self._collection_size(self._my_collection())
+ # No resume required
+ else:
+ with self._state_lock:
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+
+ def _lock_file(self, fileobj):
+ try:
+ fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+ def _save_state(self):
+ """
+ Atomically save current state into cache.
+ """
+ try:
+ with self._state_lock:
+ state = self._state
+ new_cache_fd, new_cache_name = tempfile.mkstemp(
+ dir=os.path.dirname(self._cache_filename))
+ self._lock_file(new_cache_fd)
+ new_cache = os.fdopen(new_cache_fd, 'r+')
+ json.dump(state, new_cache)
+ new_cache.flush()
+ os.fsync(new_cache)
+ os.rename(new_cache_name, self._cache_filename)
+ except (IOError, OSError, ResumeCacheConflict) as error:
+ self.logger.error("There was a problem while saving the cache file: {}".format(error))
+ try:
+ os.unlink(new_cache_name)
+ except NameError: # mkstemp failed.
+ pass
+ else:
+ self._cache_file.close()
+ self._cache_file = new_cache
+
+ def collection_name(self):
+ with self._collection_lock:
+ name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
+ return name
+
+ def manifest_locator(self):
+ with self._collection_lock:
+ locator = self._my_collection().manifest_locator()
+ return locator
+
+ def portable_data_hash(self):
+ with self._collection_lock:
+ datahash = self._my_collection().portable_data_hash()
+ return datahash
+
+ def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ with self._collection_lock:
+ manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
+ return manifest
+
+ def _datablocks_on_item(self, item):
+ """
+ Return a list of datablock locators, recursively navigating
+ through subcollections
+ """
+ if isinstance(item, arvados.arvfile.ArvadosFile):
+ if item.size() == 0:
+ # Empty file locator
+ return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+ else:
+ locators = []
+ for segment in item.segments():
+ loc = segment.locator
+ locators.append(loc)
+ return locators
+ elif isinstance(item, arvados.collection.Collection):
+ l = [self._datablocks_on_item(x) for x in item.values()]
+ # Fast list flattener method taken from:
+ # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
+ return [loc for sublist in l for loc in sublist]
+ else:
+ return None
+
+ def data_locators(self):
+ with self._collection_lock:
+ # Make sure all datablocks are flushed before getting the locators
+ self._my_collection().manifest_text()
+ datablocks = self._datablocks_on_item(self._my_collection())
+ return datablocks
def expected_bytes_for(pathlist):
print >>stderr, error
sys.exit(1)
- # write_copies diverges from args.replication here.
- # args.replication is how many copies we will instruct Arvados to
- # maintain (by passing it in collections().create()) after all
- # data is written -- and if None was given, we'll use None there.
- # Meanwhile, write_copies is how many copies of each data block we
- # write to Keep, which has to be a number.
- #
- # If we simply changed args.replication from None to a default
- # here, we'd end up erroneously passing the default replication
- # level (instead of None) to collections().create().
- write_copies = (args.replication or
- api_client._rootDesc.get('defaultCollectionReplication', 2))
-
if args.progress:
reporter = progress_writer(human_progress)
elif args.batch_progress:
reporter = progress_writer(machine_progress)
else:
reporter = None
- bytes_expected = expected_bytes_for(args.paths)
-
- resume_cache = None
- if args.resume:
- try:
- resume_cache = ResumeCache(ResumeCache.make_path(args))
- resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
- except (IOError, OSError, ValueError):
- pass # Couldn't open cache directory/file. Continue without it.
- except ResumeCacheConflict:
- print >>stderr, "\n".join([
- "arv-put: Another process is already uploading this data.",
- " Use --no-resume if this is really what you want."])
- sys.exit(1)
- if resume_cache is None:
- writer = ArvPutCollectionWriter(
- resume_cache, reporter, bytes_expected,
- num_retries=args.retries,
- replication=write_copies)
- else:
- writer = ArvPutCollectionWriter.from_cache(
- resume_cache, reporter, bytes_expected,
- num_retries=args.retries,
- replication=write_copies)
+ bytes_expected = expected_bytes_for(args.paths)
+ try:
+ writer = ArvPutUploadJob(paths = args.paths,
+ resume = args.resume,
+ filename = args.filename,
+ reporter = reporter,
+ bytes_expected = bytes_expected,
+ num_retries = args.retries,
+ replication_desired = args.replication,
+ name = collection_name,
+ owner_uuid = project_uuid,
+ ensure_unique_name = True)
+ except ResumeCacheConflict:
+ print >>stderr, "\n".join([
+ "arv-put: Another process is already uploading this data.",
+ " Use --no-resume if this is really what you want."])
+ sys.exit(1)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
for sigcode in CAUGHT_SIGNALS}
- if writer.bytes_written > 0: # We're resuming a previous upload.
+ if args.resume and writer.bytes_written > 0:
print >>stderr, "\n".join([
"arv-put: Resuming previous upload from last checkpoint.",
" Use the --no-resume option to start over."])
writer.report_progress()
- writer.do_queued_work() # Do work resumed from cache.
- for path in args.paths: # Copy file data to Keep.
- if path == '-':
- writer.start_new_stream()
- writer.start_new_file(args.filename)
- r = sys.stdin.read(64*1024)
- while r:
- # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
- # CollectionWriter.write().
- super(arvados.collection.ResumableCollectionWriter, writer).write(r)
- r = sys.stdin.read(64*1024)
- elif os.path.isdir(path):
- writer.write_directory_tree(
- path, max_manifest_depth=args.max_manifest_depth)
- else:
- writer.start_new_stream()
- writer.write_file(path, args.filename or os.path.basename(path))
- writer.finish_current_stream()
-
+ output = None
+ writer.start()
if args.progress: # Print newline to split stderr from stdout for humans.
print >>stderr
- output = None
if args.stream:
- output = writer.manifest_text()
if args.normalize:
- output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
+ output = writer.manifest_text(normalize=True)
+ else:
+ output = writer.manifest_text()
elif args.raw:
output = ','.join(writer.data_locators())
else:
try:
- manifest_text = writer.manifest_text()
- if args.normalize:
- manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
- replication_attr = 'replication_desired'
- if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
- # API called it 'redundancy' before #3410.
- replication_attr = 'redundancy'
- # Register the resulting collection in Arvados.
- collection = api_client.collections().create(
- body={
- 'owner_uuid': project_uuid,
- 'name': collection_name,
- 'manifest_text': manifest_text,
- replication_attr: args.replication,
- },
- ensure_unique_name=True
- ).execute(num_retries=args.retries)
-
- print >>stderr, "Collection saved as '%s'" % collection['name']
-
- if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
- output = collection['portable_data_hash']
+ writer.save_collection()
+ print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+ if args.portable_data_hash:
+ output = writer.portable_data_hash()
else:
- output = collection['uuid']
-
+ output = writer.manifest_locator()
except apiclient_errors.Error as error:
print >>stderr, (
"arv-put: Error creating Collection on project: {}.".format(
if status != 0:
sys.exit(status)
- if resume_cache is not None:
- resume_cache.destroy()
-
+ # Success!
+ writer.destroy_cache()
return output
+
if __name__ == '__main__':
main()
import time
import unittest
import yaml
+import threading
+import hashlib
+import random
from cStringIO import StringIO
import arvados
import arvados.commands.put as arv_put
+import arvados_testutil as tutil
from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server
arv_put.ResumeCache, path)
-class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
- ArvadosBaseTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+ ArvadosBaseTestCase):
def setUp(self):
- super(ArvadosPutCollectionWriterTest, self).setUp()
+ super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
- with tempfile.NamedTemporaryFile(delete=False) as cachefile:
- self.cache = arv_put.ResumeCache(cachefile.name)
- self.cache_filename = cachefile.name
+ # Temp files creation
+ self.tempdir = tempfile.mkdtemp()
+ subdir = os.path.join(self.tempdir, 'subdir')
+ os.mkdir(subdir)
+ data = "x" * 1024 # 1 KB
+ for i in range(1, 5):
+ with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+ f.write(data * i)
+ with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+ f.write(data * 5)
+ # Large temp file for resume test
+ _, self.large_file_name = tempfile.mkstemp()
+ fileobj = open(self.large_file_name, 'w')
+ # Make sure to write just a little more than one block
+ for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+ data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ fileobj.write(data)
+ fileobj.close()
+ self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
def tearDown(self):
- super(ArvadosPutCollectionWriterTest, self).tearDown()
- if os.path.exists(self.cache_filename):
- self.cache.destroy()
- self.cache.close()
-
- def test_writer_caches(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- cwriter.write_file('/dev/null')
- cwriter.cache_state()
- self.assertTrue(self.cache.load())
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+ super(ArvPutUploadJobTest, self).tearDown()
+ shutil.rmtree(self.tempdir)
+ os.unlink(self.large_file_name)
def test_writer_works_without_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter()
- cwriter.write_file('/dev/null')
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
- def test_writer_resumes_from_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(
- self.cache)
- self.assertEqual(
- ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
- new_writer.manifest_text())
-
- def test_new_writer_from_stale_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- new_writer.write_file('/dev/null')
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
- def test_new_writer_from_empty_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- cwriter.write_file('/dev/null')
+ cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+ cwriter.start()
self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
- def test_writer_resumable_after_arbitrary_bytes(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- # These bytes are intentionally not valid UTF-8.
- with self.make_test_file('\x00\x07\xe2') as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(
- self.cache)
- self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
+ def test_writer_works_with_cache(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write('foo')
+ f.flush()
+ cwriter = arv_put.ArvPutUploadJob([f.name])
+ cwriter.start()
+ self.assertEqual(3, cwriter.bytes_written)
+ # Don't destroy the cache, and start another upload
+ cwriter_new = arv_put.ArvPutUploadJob([f.name])
+ cwriter_new.start()
+ cwriter_new.destroy_cache()
+ self.assertEqual(0, cwriter_new.bytes_written)
def make_progress_tester(self):
progression = []
return progression, record_func
def test_progress_reporting(self):
- for expect_count in (None, 8):
- progression, reporter = self.make_progress_tester()
- cwriter = arv_put.ArvPutCollectionWriter(
- reporter=reporter, bytes_expected=expect_count)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- self.assertIn((4, expect_count), progression)
-
- def test_resume_progress(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
- with self.make_test_file() as testfile:
- # Set up a writer with some flushed bytes.
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- self.assertEqual(new_writer.bytes_written, 4)
+ with tempfile.NamedTemporaryFile() as f:
+ f.write('foo')
+ f.flush()
+ for expect_count in (None, 8):
+ progression, reporter = self.make_progress_tester()
+ cwriter = arv_put.ArvPutUploadJob([f.name],
+ reporter=reporter, bytes_expected=expect_count)
+ cwriter.start()
+ cwriter.destroy_cache()
+ self.assertIn((3, expect_count), progression)
+
+ def test_writer_upload_directory(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir])
+ cwriter.start()
+ cwriter.destroy_cache()
+ self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+ def test_resume_large_file_upload(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start()
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer2.start()
+ self.assertEqual(writer.bytes_written + writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
os.chmod(cachedir, 0o700)
def test_put_block_replication(self):
- with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
- mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
- cache_mock.side_effect = ValueError
+ self.call_main_on_test_file()
+ with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
self.call_main_on_test_file(['--replication', '1'])
self.call_main_on_test_file(['--replication', '4'])
['--project-uuid', self.Z_UUID, '--stream'])
def test_api_error_handling(self):
- collections_mock = mock.Mock(name='arv.collections()')
- coll_create_mock = collections_mock().create().execute
- coll_create_mock.side_effect = arvados.errors.ApiError(
+ coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+ coll_save_mock.side_effect = arvados.errors.ApiError(
fake_httplib2_response(403), '{}')
- arv_put.api_client = arvados.api('v1')
- arv_put.api_client.collections = collections_mock
- with self.assertRaises(SystemExit) as exc_test:
- self.call_main_with_args(['/dev/null'])
- self.assertLess(0, exc_test.exception.args[0])
- self.assertLess(0, coll_create_mock.call_count)
- self.assertEqual("", self.main_stdout.getvalue())
+ with mock.patch('arvados.collection.Collection.save_new',
+ new=coll_save_mock):
+ with self.assertRaises(SystemExit) as exc_test:
+ self.call_main_with_args(['/dev/null'])
+ self.assertLess(0, exc_test.exception.args[0])
+ self.assertLess(0, coll_save_mock.call_count)
+ self.assertEqual("", self.main_stdout.getvalue())
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
accept_attribute_as_json :runtime_constraints, Hash
accept_attribute_as_json :command, Array
+ skip_before_filter :find_object_by_uuid, only: [:current]
+ skip_before_filter :render_404_if_no_object, only: [:current]
+
def auth
if @object.locked_by_uuid != Thread.current[:api_client_authorization].uuid
raise ArvadosModel::PermissionDeniedError.new("Not locked by your token")
@object.unlock
show
end
+
+ def current
+ if Thread.current[:api_client_authorization].nil?
+ send_error("Not logged in", status: 401)
+ else
+ c = Container.where(auth_uuid: Thread.current[:api_client_authorization].uuid).first
+ if c.nil?
+ send_error("Token is not associated with a container.", status: 404)
+ else
+ @object = c
+ show
+ end
+ end
+ end
end
validate :validate_state_change
validate :validate_change
validate :validate_lock
+ validate :validate_output
after_validation :assign_auth
before_save :sort_serialized_attrs
after_save :handle_completed
end
def permission_to_update
- current_user.andand.is_admin
+ # Override base permission check to allow auth_uuid to set progress and
+ # output (only). Whether it is legal to set progress and output in the current
+ # state has already been checked in validate_change.
+ current_user.andand.is_admin ||
+ (!current_api_client_authorization.nil? and
+ [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
+ end
+
+ def ensure_owner_uuid_is_permitted
+ # Override base permission check to allow auth_uuid to set progress and
+ # output (only). Whether it is legal to set progress and output in the current
+ # state has already been checked in validate_change.
+ if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
+ check_update_whitelist [:progress, :output]
+ else
+ super
+ end
end
def set_timestamps
permitted.push :priority
when Running
- permitted.push :priority, :progress
+ permitted.push :priority, :progress, :output
if self.state_changed?
permitted.push :started_at
end
end
def validate_lock
- # If the Container is already locked by someone other than the
- # current api_client_auth, disallow all changes -- except
- # priority, which needs to change to reflect max(priority) of
- # relevant ContainerRequests.
- if locked_by_uuid_was
- if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
- check_update_whitelist [:priority]
- end
- end
-
if [Locked, Running].include? self.state
# If the Container was already locked, locked_by_uuid must not
# changes. Otherwise, the current auth gets the lock.
- need_lock = locked_by_uuid_was || Thread.current[:api_client_authorization].uuid
+ need_lock = locked_by_uuid_was || current_api_client_authorization.andand.uuid
else
need_lock = nil
end
self.locked_by_uuid = need_lock
end
+ def validate_output
+ # Output must exist and be readable by the current user. This is so
+ # that a container cannot "claim" a collection that it doesn't otherwise
+ # have access to just by setting the output field to the collection PDH.
+ if output_changed?
+ c = Collection.
+ readable_by(current_user).
+ where(portable_data_hash: self.output).
+ first
+ if !c
+ errors.add :output, "collection must exist and be readable by current user."
+ end
+ end
+ end
+
def assign_auth
if self.auth_uuid_changed?
return errors.add :auth_uuid, 'is readonly'
get 'auth', on: :member
post 'lock', on: :member
post 'unlock', on: :member
+ get 'current', on: :collection
end
resources :container_requests
resources :jobs do
user: system_user
api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
expires_at: 2038-01-01 00:00:00
+
+running_container_auth:
+ uuid: zzzzz-gj3su-077z32aux8dg2s2
+ api_client: untrusted
+ user: active
+ api_token: 3kg6k6lzmp9kj6bpkcoxie963cmvjahbt2fod9zru30k1jqdmi
+ expires_at: 2038-01-01 00:00:00
+
+not_running_container_auth:
+ uuid: zzzzz-gj3su-077z32aux8dg2s3
+ api_client: untrusted
+ user: active
+ api_token: 4kg6k6lzmp9kj6bpkcoxie963cmvjahbt2fod9zru30k1jqdmj
+ expires_at: 2038-01-01 00:00:00
manifest_text: ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:file 0:0:file.bam\n"
name: collection_with_several_unsupported_file_types
+collection_not_readable_by_active:
+ uuid: zzzzz-4zz18-cd42uwvy3neko21
+ portable_data_hash: bb89eb5140e2848d39b416daeef4ffc5+45
+ owner_uuid: zzzzz-tpzed-000000000000000
+ created_at: 2014-02-03T17:22:54Z
+ modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
+ modified_by_user_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ modified_at: 2014-02-03T17:22:54Z
+ updated_at: 2014-02-03T17:22:54Z
+ manifest_text: ". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"
+ name: collection_not_readable_by_active
+
+
# Test Helper trims the rest of the file
# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
runtime_constraints:
ram: 12000000000
vcpus: 4
- auth_uuid: zzzzz-gj3su-077z32aux8dg2s1
+ auth_uuid: zzzzz-gj3su-077z32aux8dg2s2
running_older:
uuid: zzzzz-dz642-runningcontain2
runtime_constraints:
ram: 12000000000
vcpus: 4
- auth_uuid: zzzzz-gj3su-077z32aux8dg2s1
+ auth_uuid: zzzzz-gj3su-077z32aux8dg2s3
failed_container:
uuid: zzzzz-dz642-failedcontainr1
assert_equal state, Container.where(uuid: uuid).first.state
end
end
+
+ test 'get current container for token' do
+ authorize_with :running_container_auth
+ get :current
+ assert_response :success
+ assert_equal containers(:running).uuid, json_response['uuid']
+ end
+
+ test 'no container associated with token' do
+ authorize_with :dispatch1
+ get :current
+ assert_response 404
+ end
+
+ test 'try get current container, no token' do
+ get :current
+ assert_response 401
+ end
+
end
end
[
- ['active', 'zzzzz-dz642-runningcontainr'],
+ ['running_container_auth', 'zzzzz-dz642-runningcontainr'],
['active_no_prefs', nil],
].each do |token, expected|
test "create as #{token} and expect requesting_container_uuid to be #{expected}" do
DEFAULT_ATTRS = {
command: ['echo', 'foo'],
- container_image: 'img',
+ container_image: 'fa3c1a9cb6783f85f2ecda037e07b8c3+167',
output_path: '/tmp',
priority: 1,
runtime_constraints: {"vcpus" => 1, "ram" => 1},
}
- REUSABLE_COMMON_ATTRS = {container_image: "test",
+ REUSABLE_COMMON_ATTRS = {container_image: "9ae44d5792468c58bcf85ce7353c7027+124",
cwd: "test",
command: ["echo", "hello"],
output_path: "test",
def minimal_new attrs={}
cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
+ cr.state = ContainerRequest::Committed
act_as_user users(:active) do
cr.save!
end
- c = Container.new DEFAULT_ATTRS.merge(attrs)
- act_as_system_user do
- c.save!
- assert cr.update_attributes(container_uuid: c.uuid,
- state: ContainerRequest::Committed,
- ), show_errors(cr)
- end
+ c = Container.find_by_uuid cr.container_uuid
+ assert_not_nil c
return c, cr
end
def check_illegal_modify c
check_illegal_updates c, [{command: ["echo", "bar"]},
- {container_image: "img2"},
+ {container_image: "arvados/apitestfixture:june10"},
{cwd: "/tmp2"},
{environment: {"FOO" => "BAR"}},
{mounts: {"FOO" => "BAR"}},
test "Container serialized hash attributes sorted before save" do
env = {"C" => 3, "B" => 2, "A" => 1}
- m = {"F" => 3, "E" => 2, "D" => 1}
+ m = {"F" => {"kind" => 3}, "E" => {"kind" => 2}, "D" => {"kind" => 1}}
rc = {"vcpus" => 1, "ram" => 1}
c, _ = minimal_new(environment: env, mounts: m, runtime_constraints: rc)
assert_equal c.environment.to_json, Container.deep_sort_hash(env).to_json
test "find_reusable method should not select completed container when inconsistent outputs exist" do
set_user_from_auth :active
- common_attrs = REUSABLE_COMMON_ATTRS.merge({environment: {"var" => "complete"}})
+ common_attrs = REUSABLE_COMMON_ATTRS.merge({environment: {"var" => "complete"}, priority: 1})
completed_attrs = {
state: Container::Complete,
exit_code: 0,
log: 'ea10d51bcf88862dbcc36eb292017dfd+45',
}
- c_output1, _ = minimal_new(common_attrs)
- c_output2, _ = minimal_new(common_attrs)
-
set_user_from_auth :dispatch1
+
+ c_output1 = Container.create common_attrs
+ c_output2 = Container.create common_attrs
+
+ cr = ContainerRequest.new common_attrs
+ cr.state = ContainerRequest::Committed
+ cr.container_uuid = c_output1.uuid
+ cr.save!
+
+ cr = ContainerRequest.new common_attrs
+ cr.state = ContainerRequest::Committed
+ cr.container_uuid = c_output2.uuid
+ cr.save!
+
c_output1.update_attributes!({state: Container::Locked})
c_output1.update_attributes!({state: Container::Running})
c_output1.update_attributes!(completed_attrs.merge({output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'}))
assert c.update_attributes(exit_code: 1, state: Container::Complete)
end
+
+ test "locked_by_uuid can set output on running container" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ assert_equal c.locked_by_uuid, Thread.current[:api_client_authorization].uuid
+
+ assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+ assert c.update_attributes! state: Container::Complete
+ end
+
+ test "auth_uuid can set output on running container, but not change container state" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+ Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
+ assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+
+ assert_raises ArvadosModel::PermissionDeniedError do
+ # auth_uuid cannot set container state
+ c.update_attributes state: Container::Complete
+ end
+ end
+
+ test "not allowed to set output that is not readable by current user" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+ Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
+
+ assert_raises ActiveRecord::RecordInvalid do
+ c.update_attributes! output: collections(:collection_not_readable_by_active).portable_data_hash
+ end
+ end
+
+ test "other token cannot set output on running container" do
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.lock
+ c.update_attributes! state: Container::Running
+
+ set_user_from_auth :not_running_container_auth
+ assert_raises ArvadosModel::PermissionDeniedError do
+ c.update_attributes! output: collections(:foo_file).portable_data_hash
+ end
+ end
+
end
return nil
}
+ if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+ // Output may have been set directly by the container, so
+ // refresh the container record to check.
+ err := runner.ArvClient.Get("containers", runner.Container.UUID,
+ nil, &runner.Container)
+ if err != nil {
+ return err
+ }
+ if runner.Container.Output != "" {
+ // Container output is already set.
+ runner.OutputPDH = &runner.Container.Output
+ return nil
+ }
+ }
+
if runner.HostOutputDir == "" {
return nil
}
stop chan bool
cwd string
env []string
+ api *ArvTestClient
}
func NewTestDockerClient() *TestDockerClient {
docker.RemoveImage(hwImageId, true)
api = &ArvTestClient{Container: rec}
+ docker.api = api
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.statInterval = 100 * time.Millisecond
am := &ArvMountCmdLine{}
c.Check(err, NotNil)
c.Check(strings.Contains(err.Error(), "Unsupported mount kind 'collection' for stdout"), Equals, true)
}
+
+func (s *TestSuite) TestFullRunWithAPI(c *C) {
+ os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
+ defer os.Unsetenv("ARVADOS_API_HOST")
+ api, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"API": true}
+}`, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, t.env[1][17:]+"\n"))
+ t.logWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 0}
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "test.arvados.org\n"), Equals, true)
+ c.Check(api.CalledWith("container.output", "d41d8cd98f00b204e9800998ecf8427e+0"), NotNil)
+}
+
+func (s *TestSuite) TestFullRunSetOutput(c *C) {
+ os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
+ defer os.Unsetenv("ARVADOS_API_HOST")
+ api, _ := FullRunHelper(c, `{
+ "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"API": true}
+}`, func(t *TestDockerClient) {
+ t.api.Container.Output = "d4ab34d3d4f8a72f5c4973051ae69fab+122"
+ t.logWriter.Close()
+ t.finish <- dockerclient.WaitResult{ExitCode: 0}
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(api.CalledWith("container.output", "d4ab34d3d4f8a72f5c4973051ae69fab+122"), NotNil)
+}
self._filehandles[fh] = FileHandle(fh, p)
self.inodes.touch(p)
+ # Normally, we will have received an "update" event if the
+ # parent collection is stale here. However, even if the parent
+ # collection hasn't changed, the manifest might have been
+ # fetched so long ago that the signatures on the data block
+ # locators have expired. Calling checkupdate() on all
+ # ancestors ensures the signatures will be refreshed if
+ # necessary.
+ while p.parent_inode in self.inodes:
+ if p == self.inodes[p.parent_inode]:
+ break
+ p = self.inodes[p.parent_inode]
+ self.inodes.touch(p)
+ p.checkupdate()
+
_logger.debug("arv-mount open inode %i flags %x fh %i", inode, flags, fh)
return fh
def setUp(self):
self.mnt = tempfile.mkdtemp()
run_test_server.authorize_with('active')
- self.api = arvados.safeapi.ThreadSafeApiCache(arvados.config.settings())
def tearDown(self):
os.rmdir(self.mnt)
def test_with_default_by_id(self):
self.verify_pdh_only(skip_pdh_only=True)
-
-def _test_refresh_old_manifest(zzz):
- fnm = 'zzzzz-8i9sb-0vsrcqi7whchuil.log.txt'
- os.listdir(os.path.join(zzz))
- time.sleep(3)
- with open(os.path.join(zzz, fnm)) as f:
- f.read()
-
-class TokenExpiryTest(MountTestBase):
- def setUp(self):
- super(TokenExpiryTest, self).setUp(local_store=False)
-
- @unittest.skip("bug #10008")
- @mock.patch('arvados.keep.KeepClient.get')
- def runTest(self, mocked_get):
- self.api._rootDesc = {"blobSignatureTtl": 2}
- mnt = self.make_mount(fuse.CollectionDirectory, collection_record='zzzzz-4zz18-op4e2lbej01tcvu')
- mocked_get.return_value = 'fake data'
-
- old_exp = int(time.time()) + 86400*14
- self.pool.apply(_test_refresh_old_manifest, (self.mounttmp,))
- want_exp = int(time.time()) + 86400*14
-
- got_loc = mocked_get.call_args[0][0]
- got_exp = int(
- re.search(r'\+A[0-9a-f]+@([0-9a-f]+)', got_loc).group(1),
- 16)
- self.assertGreaterEqual(
- got_exp, want_exp-2,
- msg='now+2w = {:x}, but fuse fetched locator {} (old_exp {:x})'.format(
- want_exp, got_loc, old_exp))
- self.assertLessEqual(
- got_exp, want_exp,
- msg='server is not using the expected 2w TTL; test is ineffective')
--- /dev/null
+import apiclient
+import arvados
+import arvados_fuse
+import logging
+import mock
+import multiprocessing
+import os
+import re
+import sys
+import time
+import unittest
+
+from .integration_test import IntegrationTest
+
+logger = logging.getLogger('arvados.arv-mount')
+
+class TokenExpiryTest(IntegrationTest):
+ def setUp(self):
+ super(TokenExpiryTest, self).setUp()
+ self.test_start_time = time.time()
+ self.time_now = int(time.time())+1
+
+ def fake_time(self):
+ self.time_now += 1
+ return self.time_now
+
+ orig_open = arvados_fuse.Operations.open
+ def fake_open(self, operations, *args, **kwargs):
+ self.time_now += 86400*13
+ logger.debug('opening file at time=%f', self.time_now)
+ return self.orig_open(operations, *args, **kwargs)
+
+ @mock.patch.object(arvados_fuse.Operations, 'open', autospec=True)
+ @mock.patch('time.time')
+ @mock.patch('arvados.keep.KeepClient.get')
+ @IntegrationTest.mount(argv=['--mount-by-id', 'zzz'])
+ def test_refresh_old_manifest(self, mocked_get, mocked_time, mocked_open):
+ # This test (and associated behavior) is still not strong
+ # enough. We should ensure old tokens are never used even if
+ # blobSignatureTtl seconds elapse between open() and
+ # read(). See https://dev.arvados.org/issues/10008
+
+ mocked_get.return_value = 'fake data'
+ mocked_time.side_effect = self.fake_time
+ mocked_open.side_effect = self.fake_open
+
+ with mock.patch.object(self.mount.api, 'collections', wraps=self.mount.api.collections) as mocked_collections:
+ mocked_collections.return_value = mocked_collections()
+ with mock.patch.object(self.mount.api.collections(), 'get', wraps=self.mount.api.collections().get) as mocked_get:
+ self.pool_test(os.path.join(self.mnt, 'zzz'))
+
+ # open() several times here to make sure we don't reach our
+ # quota of mocked_get.call_count dishonestly (e.g., the first
+ # open causes 5 mocked_get, and the rest cause none).
+ self.assertEqual(8, mocked_open.call_count)
+ self.assertGreaterEqual(
+ mocked_get.call_count, 8,
+ 'Not enough calls to collections().get(): expected 8, got {!r}'.format(
+ mocked_get.mock_calls))
+
+ @staticmethod
+ def _test_refresh_old_manifest(self, zzz):
+ uuid = 'zzzzz-4zz18-op4e2lbej01tcvu'
+ fnm = 'zzzzz-8i9sb-0vsrcqi7whchuil.log.txt'
+ os.listdir(os.path.join(zzz, uuid))
+ for _ in range(8):
+ with open(os.path.join(zzz, uuid, fnm)) as f:
+ f.read()
echo "Could not find Dockerfile (expected it at $ARVBOX_DOCKER/Dockerfile.base)"
exit 1
fi
- GITHEAD=$(cd $ARVBOX_DOCKER && git log --format=%H -n1 HEAD)
- docker build --build-arg=arvados_version=$GITHEAD $NO_CACHE -t arvados/arvbox-base:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
if docker --version |grep " 1\.[0-9]\." ; then
# Docker version prior 1.10 require -f flag
# -f flag removed in Docker 1.12
FORCE=-f
fi
+ GITHEAD=$(cd $ARVBOX_DOCKER && git log --format=%H -n1 HEAD)
+ docker build --build-arg=arvados_version=$GITHEAD $NO_CACHE -t arvados/arvbox-base:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
+ docker tag $FORCE arvados/arvbox-base:$GITHEAD arvados/arvbox-base:latest
if test "$1" = localdemo -o "$1" = publicdemo ; then
docker build $NO_CACHE -t arvados/arvbox-demo:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.demo" "$ARVBOX_DOCKER"
docker tag $FORCE arvados/arvbox-demo:$GITHEAD arvados/arvbox-demo:latest