Server-side components of Arvados contained in the apps/ and services/
directories, including the API Server, Workbench, and Crunch, are licensed
-under the GNU Affero General Public License version 3 (see agpl-3.0.txt)
+under the GNU Affero General Public License version 3 (see agpl-3.0.txt).
+
+The files and directories under the build/, lib/ and tools/ directories are
+licensed under the GNU Affero General Public License version 3 (see
+agpl-3.0.txt).
The Arvados client Software Development Kits contained in the sdk/ directory,
-example scripts in the crunch_scripts/ directory, and code samples in the
-Aravados documentation are licensed under the Apache License, Version 2.0 (see
-LICENSE-2.0.txt)
+example scripts in the crunch_scripts/ directory, the files and directories
+under backports/ and docker/, and code samples in the Arvados documentation
+are licensed under the Apache License, Version 2.0 (see LICENSE-2.0.txt).
The Arvados Documentation located in the doc/ directory is licensed under the
-Creative Commons Attribution-Share Alike 3.0 United States (see by-sa-3.0.txt)
\ No newline at end of file
+Creative Commons Attribution-Share Alike 3.0 United States (see by-sa-3.0.txt).
# Methods that don't require login should
# skip_around_filter :require_thread_api_token
around_filter :require_thread_api_token, except: ERROR_ACTIONS
+ before_filter :ensure_arvados_api_exists, only: [:index, :show]
before_filter :set_cache_buster
before_filter :accept_uuid_as_id_param, except: ERROR_ACTIONS
before_filter :check_user_agreements, except: ERROR_ACTIONS
end
end
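+  # Before filter: render a 404 when the API server's discovery document does
+  # not list the requested action for this resource.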
+ def ensure_arvados_api_exists
+ if model_class.is_a?(Class) && model_class < ArvadosBase && !model_class.api_exists?(params['action'].to_sym)
+ @errors = ["#{params['action']} method is not supported for #{params['controller']}"]
+ return render_error(status: 404)
+ end
+ end
+
def index
find_objects_for_index if !@objects
render_index
}
@@notification_tests.push lambda { |controller, current_user|
- PipelineInstance.limit(1).where(created_by: current_user.uuid).each do
+ if PipelineInstance.api_exists?(:index)
+ PipelineInstance.limit(1).where(created_by: current_user.uuid).each do
+ return nil
+ end
+ else
return nil
end
return lambda { |view|
def recent_processes lim
lim = 12 if lim.nil?
- cols = %w(uuid owner_uuid created_at modified_at pipeline_template_uuid name state started_at finished_at)
- pipelines = PipelineInstance.select(cols).limit(lim).order(["created_at desc"])
+ procs = {}
+ if PipelineInstance.api_exists?(:index)
+ cols = %w(uuid owner_uuid created_at modified_at pipeline_template_uuid name state started_at finished_at)
+ pipelines = PipelineInstance.select(cols).limit(lim).order(["created_at desc"])
+ pipelines.results.each { |pi| procs[pi] = pi.created_at }
+ end
crs = ContainerRequest.limit(lim).order(["created_at desc"]).filter([["requesting_container_uuid", "=", nil]])
- procs = {}
- pipelines.results.each { |pi| procs[pi] = pi.created_at }
crs.results.each { |c| procs[c] = c.created_at }
Hash[procs.sort_by {|key, value| value}].keys.reverse.first(lim)
render 'hash_matches'
return
else
- jobs_with = lambda do |conds|
- Job.limit(RELATION_LIMIT).where(conds)
- .results.sort_by { |j| j.finished_at || j.created_at }
+ if Job.api_exists?(:index)
+ jobs_with = lambda do |conds|
+ Job.limit(RELATION_LIMIT).where(conds)
+ .results.sort_by { |j| j.finished_at || j.created_at }
+ end
+ @output_of = jobs_with.call(output: @object.portable_data_hash)
+ @log_of = jobs_with.call(log: @object.portable_data_hash)
end
- @output_of = jobs_with.call(output: @object.portable_data_hash)
- @log_of = jobs_with.call(log: @object.portable_data_hash)
+
@project_links = Link.limit(RELATION_LIMIT).order("modified_at DESC")
.where(head_uuid: @object.uuid, link_class: 'name').results
project_hash = Group.where(uuid: @project_links.map(&:tail_uuid)).to_hash
# It also seems to me that something like these could be used to configure the contents of the panes.
def show_pane_list
pane_list = []
+
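+    # Decide which object types each pane lists: include pipeline instances and
+    # pipeline templates only when their index APIs are enabled on this cluster.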
+ procs = ["arvados#containerRequest"]
+ if PipelineInstance.api_exists?(:index)
+ procs << "arvados#pipelineInstance"
+ end
+
+ workflows = ["arvados#workflow"]
+ workflows_pane_name = 'Workflows'
+ if PipelineTemplate.api_exists?(:index)
+ workflows << "arvados#pipelineTemplate"
+ workflows_pane_name = 'Pipeline_templates'
+ end
+
if @object.uuid != current_user.andand.uuid
pane_list << 'Description'
end
pane_list <<
{
:name => 'Pipelines_and_processes',
- :filters => [%w(uuid is_a) + [%w(arvados#containerRequest arvados#pipelineInstance)]]
+ :filters => [%w(uuid is_a) + [procs]]
}
pane_list <<
{
- :name => 'Pipeline_templates',
- :filters => [%w(uuid is_a) + [%w(arvados#pipelineTemplate arvados#workflow)]]
+ :name => workflows_pane_name,
+ :filters => [%w(uuid is_a) + [workflows]]
}
pane_list <<
{
@name_link_for = {}
kind_filters.each do |attr,op,val|
(val.is_a?(Array) ? val : [val]).each do |type|
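+        # Derive the model class from the "arvados#<type>" string and skip any
+        # type whose index API is disabled on this cluster.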
+ klass = type.split('#')[-1]
+ klass[0] = klass[0].capitalize
+ next if(!Object.const_get(klass).api_exists?(:index))
+
filters = @filters - kind_filters + [['uuid', 'is_a', type]]
if type == 'arvados#containerRequest'
filters = filters + [['container_requests.requesting_container_uuid', '=', nil]]
@filters = @filters || []
# get next page of pipeline_templates
- filters = @filters + [["uuid", "is_a", ["arvados#pipelineTemplate"]]]
- pipelines = PipelineTemplate.limit(@limit).order(["created_at desc"]).filter(filters)
+ if PipelineTemplate.api_exists?(:index)
+ filters = @filters + [["uuid", "is_a", ["arvados#pipelineTemplate"]]]
+ pipelines = PipelineTemplate.limit(@limit).order(["created_at desc"]).filter(filters)
+ end
# get next page of workflows
filters = @filters + [["uuid", "is_a", ["arvados#workflow"]]]
@filters = @filters || []
# get next page of pipeline_instances
- filters = @filters + [["uuid", "is_a", ["arvados#pipelineInstance"]]]
- pipelines = PipelineInstance.limit(@limit).order(["created_at desc"]).filter(filters)
+ if PipelineInstance.api_exists?(:index)
+ filters = @filters + [["uuid", "is_a", ["arvados#pipelineInstance"]]]
+ pipelines = PipelineInstance.limit(@limit).order(["created_at desc"]).filter(filters)
+ end
# get next page of jobs
- filters = @filters + [["uuid", "is_a", ["arvados#job"]]]
- jobs = Job.limit(@limit).order(["created_at desc"]).filter(filters)
+ if Job.api_exists?(:index)
+ filters = @filters + [["uuid", "is_a", ["arvados#job"]]]
+ jobs = Job.limit(@limit).order(["created_at desc"]).filter(filters)
+ end
# get next page of container_requests
filters = @filters + [["uuid", "is_a", ["arvados#containerRequest"]]]
end
def self.creatable?
- current_user.andand.is_active
+ current_user.andand.is_active && api_exists?(:create)
end
def self.goes_in_projects?
editable?
end
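+  # True if the API server's discovery document lists the given method
+  # (e.g. :index, :create) for this resource type.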
+ def self.api_exists?(method)
+ arvados_api_client.discovery[:resources][self.to_s.underscore.pluralize.to_sym].andand[:methods].andand[method]
+ end
+
# Array of strings that are the names of attributes that can be edited
# with X-Editable.
def editable_attributes
--- /dev/null
+<%= render_pane 'tab_contents', to_string: true, locals: {
+ limit: 50,
+ filters: [['uuid', 'is_a', ["arvados#workflow"]]],
+ sortable_columns: { 'name' => 'workflows.name', 'description' => 'workflows.description' }
+ }.merge(local_assigns) %>
--- /dev/null
+require 'test_helper'
+require 'helpers/share_object_helper'
+
+class DisabledApiTest < ActionController::TestCase
+ test "dashboard recent processes when pipeline_instance index API is disabled" do
+ @controller = ProjectsController.new
+
+ dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+ dd[:resources][:pipeline_instances][:methods].delete(:index)
+ ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+ get :index, {}, session_for(:active)
+ assert_includes @response.body, "zzzzz-xvhdp-cr4runningcntnr" # expect crs
+ assert_not_includes @response.body, "zzzzz-d1hrv-" # expect no pipelines
+ end
+
+ [
+ [:jobs, JobsController.new],
+ [:job_tasks, JobTasksController.new],
+ [:pipeline_instances, PipelineInstancesController.new],
+ [:pipeline_templates, PipelineTemplatesController.new],
+ ].each do |ctrl_name, ctrl|
+ test "#{ctrl_name} index page when API is disabled" do
+ @controller = ctrl
+
+ dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+ dd[:resources][ctrl_name][:methods].delete(:index)
+ ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+ get :index, {}, session_for(:active)
+ assert_response 404
+ end
+ end
+
+ [
+ :active,
+ nil,
+ ].each do |user|
+ test "project tabs as user #{user} when pipeline related index APIs are disabled" do
+ @controller = ProjectsController.new
+
+ Rails.configuration.anonymous_user_token = api_fixture('api_client_authorizations')['anonymous']['api_token']
+
+ dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+ dd[:resources][:pipeline_templates][:methods].delete(:index)
+ ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+
+ proj_uuid = api_fixture('groups')['anonymously_accessible_project']['uuid']
+
+ if user
+ get(:show, {id: proj_uuid}, session_for(user))
+ else
+ get(:show, {id: proj_uuid})
+ end
+
+ resp = @response.body
+ assert_includes resp, "href=\"#Data_collections\""
+ assert_includes resp, "href=\"#Pipelines_and_processes\""
+ assert_includes resp, "href=\"#Workflows\""
+ assert_not_includes resp, "href=\"#Pipeline_templates\""
+ end
+ end
+end
# Note: Even with all this help, phantomjs seem to behave badly
# when parsing timestamps on the other side of a DST transition.
# See skipped tests below.
+
+ # In some locales (e.g., en_CA.UTF-8) Firefox can't parse what its
+ # own toLocaleString() puts out.
+ t.sub!(/(\d\d\d\d)-(\d\d)-(\d\d)/, '\2/\3/\1')
+
if /(\d+:\d+ [AP]M) (\d+\/\d+\/\d+)/ =~ t
# Currently dates.js renders timestamps as
# '{t.toLocaleTimeString()} {t.toLocaleDateString()}' which even
- # browsers can't make sense of. First we need to flip it around
- # so it looks like what toLocaleString() would have made.
+ # en_US browsers can't make sense of. First we need to flip it
+ # around so it looks like what toLocaleString() would have made.
t = $~[2] + ', ' + $~[1]
end
- DateTime.parse(page.evaluate_script "new Date('#{t}').toUTCString()").to_time
+
+ utc = page.evaluate_script("new Date('#{t}').toUTCString()")
+ DateTime.parse(utc).to_time
end
if false
--- /dev/null
+require 'test_helper'
+
+class DisabledApiTest < ActiveSupport::TestCase
+ test 'Job.creatable? reflects whether jobs.create API is enabled' do
+ use_token(:active) do
+ assert(Job.creatable?)
+ end
+ dd = ArvadosApiClient.new_or_current.discovery.deep_dup
+ dd[:resources][:jobs][:methods].delete(:create)
+ ArvadosApiClient.any_instance.stubs(:discovery).returns(dd)
+ use_token(:active) do
+ refute(Job.creatable?)
+ end
+ end
+end
set -e
-# NOTE: This package name detection will only work on Debian.
-# If this postinst script ever starts doing work on Red Hat,
-# we'll need to adapt this code accordingly.
-script="$(basename "${0}")"
-pkg="${script%.postinst}"
-systemd_unit="${pkg}.service"
+if [ "%{name}" != "%\{name\}" ]; then
+ # Red Hat ("%{...}" is interpolated at package build time)
+ pkg="%{name}"
+ pkgtype=rpm
+ prefix="${RPM_INSTALL_PREFIX}"
+else
+ # Debian
+ script="$(basename "${0}")"
+ pkg="${script%.postinst}"
+ pkgtype=deb
+ prefix=/usr
+fi
-case "${1}" in
- configure)
- if [ -d /lib/systemd/system ]
- then
- # Python packages put all data files in /usr, so we copy
- # them to /lib at install time.
- py_unit="/usr/share/doc/${pkg}/${pkg}.service"
- if [ -e "${py_unit}" ]
- then
- cp "${py_unit}" /lib/systemd/system/
+case "${pkgtype}-${1}" in
+ deb-configure | rpm-1)
+ dest_dir="/lib/systemd/system"
+ if ! [ -d "${dest_dir}" ]; then
+ exit 0
+ fi
+
+ # Find the unit file we need to install.
+ unit_file="${pkg}.service"
+ for dir in \
+ "${prefix}/share/doc/${pkg}" \
+ "${dest_dir}"; do
+ if [ -e "${dir}/${unit_file}" ]; then
+ src_dir="${dir}"
+ break
fi
+ done
+ if [ -z "${src_dir}" ]; then
+ echo >&2 "WARNING: postinst script did not find ${unit_file} anywhere."
+ exit 0
+ fi
+
+ # Install/update the unit file if necessary.
+ if [ "${src_dir}" != "${dest_dir}" ]; then
+ cp "${src_dir}/${unit_file}" "${dest_dir}/" || exit 0
fi
+ # Enable service, and make sure systemd re-reads the unit
+ # file, in case we changed it.
if [ -e /run/systemd/system ]; then
- eval "$(systemctl -p UnitFileState show "${systemd_unit}")"
+ systemctl daemon-reload || true
+ eval "$(systemctl -p UnitFileState show "${pkg}")"
case "${UnitFileState}" in
disabled)
# Failing to enable or start the service is not a
# package error, so don't let errors here
# propagate up.
- systemctl enable "${systemd_unit}" || true
- systemctl start "${systemd_unit}" || true
+ systemctl enable "${pkg}" || true
+ systemctl start "${pkg}" || true
;;
enabled)
- systemctl daemon-reload || true
- systemctl reload-or-try-restart "${systemd_unit}" || true
+ systemctl reload-or-try-restart "${pkg}" || true
;;
esac
fi
set -e
-# NOTE: This package name detection will only work on Debian.
-# If this prerm script ever starts doing work on Red Hat,
-# we'll need to adapt this code accordingly.
-script="$(basename "${0}")"
-pkg="${script%.prerm}"
-systemd_unit="${pkg}.service"
+if [ "%{name}" != "%\{name\}" ]; then
+ # Red Hat ("%{...}" is interpolated at package build time)
+ pkg="%{name}"
+ pkgtype=rpm
+ prefix="${RPM_INSTALL_PREFIX}"
+else
+ # Debian
+ script="$(basename "${0}")"
+ pkg="${script%.prerm}"
+ pkgtype=deb
+ prefix=/usr
+fi
-case "${1}" in
- remove)
+case "${pkgtype}-${1}" in
+ deb-remove | rpm-0)
if [ -e /run/systemd/system ]; then
- systemctl stop "${systemd_unit}" || true
- systemctl disable "${systemd_unit}" || true
+ systemctl stop "${pkg}" || true
+ systemctl disable "${pkg}" || true
fi
-
- # Unit files from Python packages get installed by postinst so
- # we have to remove them explicitly here.
- py_unit="/usr/share/doc/${pkg}/${pkg}.service"
- if [ -e "${py_unit}" ]
- then
- rm "/lib/systemd/system/${pkg}.service" || true
+ if [ -e "${prefix}/share/doc/${pkg}/${pkg}.service" ]; then
+ # Unit files from Python packages get installed by
+ # postinst so we have to remove them explicitly here.
+        rm "/lib/systemd/system/${pkg}.service" || true
fi
;;
esac
cwl_runner_version=$(cd sdk/cwl && nohash_version_from_git 1.0)-3
if [[ $python_sdk_ts -gt $cwl_runner_ts ]]; then
- cwl_runner_version=$python_sdk_version
- gittag=$(cd sdk/python && git log --first-parent --max-count=1 --format=format:%H)
+ cwl_runner_version=$(cd sdk/python && nohash_version_from_git 1.0)-3
+ gittag=$(git log --first-parent --max-count=1 --format=format:%H sdk/python)
else
- gittag=$(cd sdk/cwl && git log --first-parent --max-count=1 --format=format:%H)
+ gittag=$(git log --first-parent --max-count=1 --format=format:%H sdk/cwl)
fi
echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
"Verify that all data from one set of Keep servers to another was copied"
package_go_binary tools/keep-rsync keep-rsync \
"Copy all data from one set of Keep servers to another"
+package_go_binary tools/keep-exercise keep-exercise \
+ "Performance testing tool for Arvados Keep"
# The Python SDK
# Please resist the temptation to add --no-python-fix-name to the fpm call here
fpm_build cwltest "" "" python 1.0.20160907111242
# And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20161007181528
+fpm_build cwltool "" "" python 1.0.20161107145355
# FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
fpm_build rdflib-jsonld "" "" python 0.3.0
sdk/go/crunchrunner
sdk/cwl
tools/crunchstat-summary
+tools/keep-exercise
tools/keep-rsync
tools/keep-block-check
echo -n 'go: '
go version \
|| fatal "No go binary. See http://golang.org/doc/install"
- [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 6 ]] \
- || fatal "Go >= 1.6 required. See http://golang.org/doc/install"
+ [[ $(go version) =~ go1.([0-9]+) ]] && [[ ${BASH_REMATCH[1]} -ge 7 ]] \
+ || fatal "Go >= 1.7 required. See http://golang.org/doc/install"
echo -n 'gcc: '
gcc --version | egrep ^gcc \
|| fatal "No gcc. Try: apt-get install build-essential"
services/crunch-dispatch-local
services/crunch-dispatch-slurm
services/crunch-run
- tools/keep-rsync
tools/keep-block-check
+ tools/keep-exercise
+ tools/keep-rsync
)
for g in "${gostuff[@]}"
do
|_. Key|_. Type|_. Description|_. Notes|
|ram|integer|Number of ram bytes to be used to run this process.|Optional. However, a ContainerRequest that is in "Committed" state must provide this.|
|vcpus|integer|Number of cores to be used to run this process.|Optional. However, a ContainerRequest that is in "Committed" state must provide this.|
+|keep_cache_ram|integer|Number of Keep cache bytes to be used to run this process.|Optional.|
|API|boolean|When set, ARVADOS_API_HOST and ARVADOS_API_TOKEN will be set, and container will have networking enabled to access the Arvados API server.|Optional.|
|partition|array of strings|Specify the names of one or more compute partitions that may run this container. If not provided, the system chooses where to run the container.|Optional.|
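+
+For example, here is a minimal sketch (the values are illustrative only, and it assumes @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ are set in the environment) of creating a Committed container request carrying these runtime constraints with the Python SDK:
+
+<notextile>
+<pre><code>import arvados
+
+api = arvados.api('v1')
+api.container_requests().create(body={
+    "name": "example",
+    "state": "Committed",
+    "container_image": "arvados/jobs",  # any image that is available in Keep
+    "command": ["sleep", "60"],
+    "cwd": "/var/spool/cwl",
+    "output_path": "/var/spool/cwl",
+    "mounts": {"/var/spool/cwl": {"kind": "tmp"}},
+    "priority": 1,
+    "runtime_constraints": {
+        "vcpus": 1,
+        "ram": 1073741824,            # 1 GiB, in bytes
+        "keep_cache_ram": 268435456,  # 256 MiB, in bytes
+        "API": True,
+    },
+}).execute()
+</code></pre>
+</notextile>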
The arvados-docker-cleaner program removes least recently used Docker images as needed to keep disk usage below a configured limit.
{% include 'notebox_begin' %}
-This also removes all containers as soon as they exit, as if they were run with @docker run --rm@. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or run it with @--remove-stopped-containers never@.
+This also removes all containers as soon as they exit, as if they were run with @docker run --rm@. If you need to debug or inspect containers after they stop, temporarily stop arvados-docker-cleaner or configure it with @"RemoveStoppedContainers":"never"@.
{% include 'notebox_end' %}
-Create a file @/etc/systemd/system/arvados-docker-cleaner.service@ in an editor. Include the text below as its contents. Make sure to edit the @ExecStart@ line appropriately for your compute node.
+Create a file @/etc/arvados/docker-cleaner/docker-cleaner.json@ in an editor, with the following contents.
<notextile>
-<pre><code>[Service]
-# Most deployments will want a quota that's at least 10G. From there,
-# a larger quota can help reduce compute overhead by preventing reloading
-# the same Docker image repeatedly, but will leave less space for other
-# files on the same storage (usually Docker volumes). Make sure the quota
-# is less than the total space available for Docker images.
-# If your deployment uses a Python 3 Software Collection, uncomment the
-# ExecStart line below, and delete the following one:
-# ExecStart=scl enable python33 "python3 -m arvados_docker.cleaner --quota <span class="userinput">20G</span>"
-ExecStart=python3 -m arvados_docker.cleaner --quota <span class="userinput">20G</span>
-Restart=always
-RestartPreventExitStatus=2
-
-[Install]
-WantedBy=default.target
-
-[Unit]
-After=docker.service
+<pre><code>{
+ "Quota": "<span class="userinput">10G</span>",
+ "RemoveStoppedContainers": "always"
+}
</code></pre>
</notextile>
-Then enable and start the service:
+*Choosing a quota:* Most deployments will want a quota that's at least 10G. From there, a larger quota can help reduce compute overhead by preventing reloading the same Docker image repeatedly, but will leave less space for other files on the same storage (usually Docker volumes). Make sure the quota is less than the total space available for Docker images.
+
+Restart the service after updating the configuration file.
<notextile>
-<pre><code>~$ <span class="userinput">sudo systemctl enable arvados-docker-cleaner.service</span>
-~$ <span class="userinput">sudo systemctl start arvados-docker-cleaner.service</span>
+<pre><code>~$ <span class="userinput">sudo systemctl restart arvados-docker-cleaner</span>
</code></pre>
</notextile>
-If you are using a different daemon supervisor, or if you want to test the daemon in a terminal window, use the command on the @ExecStart@ line above.
+*If you are using a different daemon supervisor,* or if you want to test the daemon in a terminal window, run @arvados-docker-cleaner@. Run @arvados-docker-cleaner --help@ for more configuration options.
value = params[parametername.to_s]
elsif parameter.has_key?(:default)
value = parameter[:default]
+ elsif [false, 'false', 0, '0'].index(parameter[:required])
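+      # Parameter is explicitly marked not required and has no value or default:
+      # pass nil instead of reporting a missing-parameter error.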
+ value = nil
else
errors << [componentname, parametername, "required parameter is missing"]
next
srccollections = {}
for k,v in generatemapper.items():
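+            # Keys starting with "_:" are literal outputs with no Keep source:
+            # directories need no data copied, and CreateFile literals are
+            # written straight into the output collection.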
+ if k.startswith("_:"):
+ if v.type == "Directory":
+ continue
+ if v.type == "CreateFile":
+ with final.open(v.target, "wb") as f:
+ f.write(v.resolved.encode("utf-8"))
+ continue
+
+ if not k.startswith("keep:"):
+ raise Exception("Output source is not in keep or a literal")
sp = k.split("/")
srccollection = sp[0][5:]
if srccollection not in srccollections:
- srccollections[srccollection] = arvados.collection.CollectionReader(
- srccollection,
- api_client=self.api,
- keep_client=self.keep_client,
- num_retries=self.num_retries)
+ try:
+ srccollections[srccollection] = arvados.collection.CollectionReader(
+ srccollection,
+ api_client=self.api,
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
+ except arvados.errors.ArgumentError as e:
+ logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+ raise
reader = srccollections[srccollection]
try:
srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("basename", "size", "listing"):
+ for k in ("basename", "listing", "contents"):
if k in fileobj:
del fileobj[k]
final.api_response()["name"],
final.manifest_locator())
- self.final_output_collection = final
+ def finalcollection(fileobj):
+ fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+ adjustDirObjs(outputObj, finalcollection)
+ adjustFileObjs(outputObj, finalcollection)
+
+ return (outputObj, final)
def set_crunch_output(self):
if self.work_api == "containers":
else:
if self.output_name is None:
self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
- self.make_output_collection(self.output_name, self.final_output)
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output)
self.set_crunch_output()
if self.final_status != "success":
"kind": "tmp"
}
}
+ scheduling_parameters = {}
dirs = set()
for f in self.pathmapper.files():
runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
- logger.warn("RuntimeConstraints not yet supported by container API")
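+        # Pass the CWL keep_cache hint through as the container's
+        # keep_cache_ram runtime constraint.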
+ if "keep_cache" in runtime_req:
+ runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"]
partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
if partition_req:
- runtime_constraints["partition"] = aslist(partition_req["partition"])
+ scheduling_parameters["partitions"] = aslist(partition_req["partition"])
container_request["mounts"] = mounts
container_request["runtime_constraints"] = runtime_constraints
+ container_request["use_existing"] = kwargs.get("enable_reuse", True)
+ container_request["scheduling_parameters"] = scheduling_parameters
try:
response = self.arvrunner.api.container_requests().create(
command = ["arvados-cwl-runner", "--local", "--api=containers"]
if self.output_name:
command.append("--output-name=" + self.output_name)
+
+ if self.enable_reuse:
+ command.append("--enable-reuse")
+ else:
+ command.append("--disable-reuse")
+
command.extend([workflowpath, jobpath])
return {
import logging
import sys
+import threading
import cwltool.docker
from cwltool.errors import WorkflowException
import arvados.commands.keepdocker
-
logger = logging.getLogger('arvados.cwl-runner')
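+# Cache of resolved Docker image lookups (dockerImageId -> Keep portable data hash),
+# shared across threads so each image is looked up or uploaded at most once per run.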
+cached_lookups = {}
+cached_lookups_lock = threading.Lock()
+
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
+ global cached_lookups
+ global cached_lookups_lock
+ with cached_lookups_lock:
+ if dockerRequirement["dockerImageId"] in cached_lookups:
+ return cached_lookups[dockerRequirement["dockerImageId"]]
+
sp = dockerRequirement["dockerImageId"].split(":")
image_name = sp[0]
image_tag = sp[1] if len(sp) > 1 else None
logger.info("Uploading Docker image %s", ":".join(args[1:]))
try:
arvados.commands.keepdocker.main(args, stdout=sys.stderr)
- except SystemExit:
- raise WorkflowException()
+ except SystemExit as e:
+ if e.code:
+ raise WorkflowException("keepdocker exited with code %s" % e.code)
images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
pdh = api_client.collections().get(uuid=images[0][0]).execute()["portable_data_hash"]
+
+ with cached_lookups_lock:
+ cached_lookups[dockerRequirement["dockerImageId"]] = pdh
+
return pdh
+
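+# Reset the image lookup cache (the unit tests call this between cases).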
+def arv_docker_clear_cache():
+ global cached_lookups
+ global cached_lookups_lock
+ with cached_lookups_lock:
+ cached_lookups = {}
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
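+# Single pattern matching the crunchrunner log lines that report $(task.tmpdir),
+# $(task.outdir) or $(task.keep), capturing the directory name and its value.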
+crunchrunner_re = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.(tmpdir|outdir|keep)\)=(.*)")
class ArvadosJob(object):
"""Submit and manage a Crunch job for executing a CWL CommandLineTool."""
with Perf(metrics, "arv_docker_get_image %s" % self.name):
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
+ if docker_req.get("dockerOutputDirectory"):
+ raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
else:
runtime_constraints["docker_image"] = arvados_jobs_image(self.arvrunner)
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
log = logc.open(logc.keys()[0])
+ dirs = {}
tmpdir = None
outdir = None
keepdir = None
# the job restarts on a different node these values
# will differ between runs, and we need to know about the
# final run that actually produced output.
-
- g = tmpdirre.match(l)
- if g:
- tmpdir = g.group(1)
- g = outdirre.match(l)
+ g = crunchrunner_re.match(l)
if g:
- outdir = g.group(1)
- g = keepre.match(l)
- if g:
- keepdir = g.group(1)
+ dirs[g.group(1)] = g.group(2)
with Perf(metrics, "output collection %s" % self.name):
- outputs = done.done(self, record, tmpdir, outdir, keepdir)
+ outputs = done.done(self, record, dirs["tmpdir"],
+ dirs["outdir"], dirs["keep"])
except WorkflowException as e:
logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
del self.job_order["job_order"]
self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"]).target[5:]
+
if self.output_name:
self.job_order["arv:output_name"] = self.output_name
+
+ self.job_order["arv:enable_reuse"] = self.enable_reuse
+
return {
"script": "cwl-runner",
"script_version": __version__,
Specifically, translate CWL input specs to Arvados pipeline
format, like {"dataclass":"File","value":"xyz"}.
"""
+
spec = self.job.arvados_job_spec()
# Most of the component spec is exactly the same as the job
joborder_keepmount = copy.deepcopy(joborder)
def keepmount(obj):
+ if "location" not in obj:
+ raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
if obj["location"].startswith("keep:"):
obj["location"] = "/keep/" + obj["location"][5:]
if "listing" in obj:
adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))
output_name = None
+ enable_reuse = True
if "arv:output_name" in job_order_object:
output_name = job_order_object["arv:output_name"]
del job_order_object["arv:output_name"]
+ if "arv:enable_reuse" in job_order_object:
+ enable_reuse = job_order_object["arv:enable_reuse"]
+ del job_order_object["arv:enable_reuse"]
+
runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
output_name=output_name)
args = argparse.Namespace()
args.project_uuid = arvados.current_job()["owner_uuid"]
- args.enable_reuse = True
+ args.enable_reuse = enable_reuse
args.submit = False
args.debug = True
args.quiet = False
else:
return super(ArvPathMapper, self).reversemap(target)
-class InitialWorkDirPathMapper(PathMapper):
+class StagingPathMapper(PathMapper):
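+    # Subclasses set _follow_dirs to control whether directory listings are staged recursively.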
+ _follow_dirs = True
def visit(self, obj, stagedir, basedir, copy=False):
# type: (Dict[unicode, Any], unicode, unicode, bool) -> None
loc = obj["location"]
+ tgt = os.path.join(stagedir, obj["basename"])
if obj["class"] == "Directory":
- self._pathmap[loc] = MapperEnt(obj["location"], stagedir, "Directory")
- self.visitlisting(obj.get("listing", []), stagedir, basedir)
+ self._pathmap[loc] = MapperEnt(loc, tgt, "Directory")
+ if loc.startswith("_:") or self._follow_dirs:
+ self.visitlisting(obj.get("listing", []), tgt, basedir)
elif obj["class"] == "File":
if loc in self._pathmap:
return
- tgt = os.path.join(stagedir, obj["basename"])
- if "contents" in obj and obj["location"].startswith("_:"):
+ if "contents" in obj and loc.startswith("_:"):
self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
else:
if copy:
- self._pathmap[loc] = MapperEnt(obj["path"], tgt, "WritableFile")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile")
else:
- self._pathmap[loc] = MapperEnt(obj["path"], tgt, "File")
+ self._pathmap[loc] = MapperEnt(loc, tgt, "File")
self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
+
+class InitialWorkDirPathMapper(StagingPathMapper):
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
-class FinalOutputPathMapper(PathMapper):
- def visit(self, obj, stagedir, basedir, copy=False):
- # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
- loc = obj["location"]
- if obj["class"] == "Directory":
- self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory")
- elif obj["class"] == "File":
- if loc in self._pathmap:
- return
- tgt = os.path.join(stagedir, obj["basename"])
- self._pathmap[loc] = MapperEnt(loc, tgt, "File")
- self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
-
+class FinalOutputPathMapper(StagingPathMapper):
+ _follow_dirs = False
def setup(self, referenced_files, basedir):
# type: (List[Any], unicode) -> None
self.visitlisting(referenced_files, self.stagedir, basedir)
import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.utils import aslist
+from cwltool.builder import substitute
import arvados.collection
import ruamel.yaml as yaml
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
+ if docker_req.get("dockerOutputDirectory"):
+ # TODO: can be supported by containers API, but not jobs API.
+ raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
def upload_instance(arvrunner, name, tool, job_order):
upload_docker(arvrunner, tool)
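+    # Expand each input's declared secondaryFiles patterns into concrete File entries
+    # on the job order, so the secondary files get uploaded along with their primaries.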
+ for t in tool.tool["inputs"]:
+ def setSecondary(fileobj):
+ if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+ if "secondaryFiles" not in fileobj:
+ fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+
+ if isinstance(fileobj, list):
+ for e in fileobj:
+ setSecondary(e)
+
+ if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+ setSecondary(job_order[shortname(t["id"])])
+
workflowmapper = upload_dependencies(arvrunner,
name,
tool.doc_loader,
tool.tool,
tool.tool["id"],
True)
-
jobmapper = upload_dependencies(arvrunner,
os.path.basename(job_order.get("id", "#")),
tool.doc_loader,
# Make sure to update arvados/build/run-build-packages.sh as well
# when updating the cwltool version pin.
install_requires=[
- 'cwltool==1.0.20161007181528',
+ 'cwltool==1.0.20161107145355',
'arvados-python-client>=0.1.20160826210445'
],
data_files=[
reset_container=1
leave_running=0
config=dev
-docker_pull=1
tag=""
while test -n "$1" ; do
config=$2
shift ; shift
;;
- --no-docker-pull)
- docker_pull=0
- shift
- ;;
--tag)
tag=$2
shift ; shift
;;
-h|--help)
- echo "$0 [--no-reset-container] [--leave-running] [--no-docker-pull] [--config dev|localdemo] [--tag docker_tag]"
+ echo "$0 [--no-reset-container] [--leave-running] [--config dev|localdemo] [--tag docker_tag]"
exit
;;
*)
. /usr/local/lib/arvbox/common.sh
-cd /usr/src/arvados/sdk/cwl
-python setup.py sdist
-pip_install \$(ls -r dist/arvados-cwl-runner-*.tar.gz | head -n1)
+if test $config = dev ; then
+ cd /usr/src/arvados/sdk/cwl
+ python setup.py sdist
+ pip_install \$(ls -r dist/arvados-cwl-runner-*.tar.gz | head -n1)
+fi
mkdir -p /tmp/cwltest
cd /tmp/cwltest
export ARVADOS_API_HOST_INSECURE=1
export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
-if test $docker_pull = 1 ; then
- arv-keepdocker --pull arvados/jobs $tag
-fi
-
cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
#!/bin/sh
exec arvados-cwl-runner --api=jobs --compute-checksum \\\$@
--- /dev/null
+import json
+import arvados
+
+_rootDesc = None
+
+def get_rootDesc():
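+    """Fetch the live API discovery document once and cache it for reuse across tests."""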
+ global _rootDesc
+ if not _rootDesc:
+ try:
+ _rootDesc = arvados.api('v1')._rootDesc
+ except ValueError:
+            raise Exception("Test requires a running API server to fetch discovery document")
+ return _rootDesc
import arvados_cwl
+from arvados_cwl.arvdocker import arv_docker_clear_cache
import logging
import mock
import unittest
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_run(self, keepdocker):
- runner = mock.MagicMock()
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
-
- keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
- runner.api.collections().get().execute.return_value = {
- "portable_data_hash": "99999999999999999999999999999993+99"}
-
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
- tool = {
- "inputs": [],
- "outputs": [],
- "baseCommand": "ls",
- "arguments": [{"valueFrom": "$(runtime.outdir)"}]
- }
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
- arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run()
- runner.api.container_requests().create.assert_called_with(
- body={
- 'environment': {
- 'HOME': '/var/spool/cwl',
- 'TMPDIR': '/tmp'
- },
- 'name': 'test_run',
- 'runtime_constraints': {
- 'vcpus': 1,
- 'ram': 1073741824
- }, 'priority': 1,
- 'mounts': {
- '/var/spool/cwl': {'kind': 'tmp'}
- },
- 'state': 'Committed',
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'output_path': '/var/spool/cwl',
- 'container_image': '99999999999999999999999999999993+99',
- 'command': ['ls', '/var/spool/cwl'],
- 'cwd': '/var/spool/cwl'
- })
+ for enable_reuse in (True, False):
+ arv_docker_clear_cache()
+
+ runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ tool = {
+ "inputs": [],
+ "outputs": [],
+ "baseCommand": "ls",
+ "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+ }
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ arvtool.formatgraph = None
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
+ make_fs_access=make_fs_access, tmpdir="/tmp"):
+ j.run(enable_reuse=enable_reuse)
+ runner.api.container_requests().create.assert_called_with(
+ body={
+ 'environment': {
+ 'HOME': '/var/spool/cwl',
+ 'TMPDIR': '/tmp'
+ },
+ 'name': 'test_run_'+str(enable_reuse),
+ 'runtime_constraints': {
+ 'vcpus': 1,
+ 'ram': 1073741824
+ },
+ 'use_existing': enable_reuse,
+ 'priority': 1,
+ 'mounts': {
+ '/var/spool/cwl': {'kind': 'tmp'}
+ },
+ 'state': 'Committed',
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'command': ['ls', '/var/spool/cwl'],
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {}
+ })
# The test passes some fields in builder.resources
# For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
def test_resource_requirements(self, keepdocker):
+ arv_docker_clear_cache()
runner = mock.MagicMock()
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
make_fs_access=make_fs_access, tmpdir="/tmp"):
j.run()
- runner.api.container_requests().create.assert_called_with(
- body={
+ call_args, call_kwargs = runner.api.container_requests().create.call_args
+
+ call_body_expected = {
'environment': {
'HOME': '/var/spool/cwl',
'TMPDIR': '/tmp'
'runtime_constraints': {
'vcpus': 3,
'ram': 3145728000,
- 'API': True,
- 'partition': ['blurb']
- }, 'priority': 1,
+ 'keep_cache_ram': 512,
+ 'API': True
+ },
+ 'use_existing': True,
+ 'priority': 1,
'mounts': {
'/var/spool/cwl': {'kind': 'tmp'}
},
'output_path': '/var/spool/cwl',
'container_image': '99999999999999999999999999999993+99',
'command': ['ls'],
- 'cwd': '/var/spool/cwl'
- })
+ 'cwd': '/var/spool/cwl',
+ 'scheduling_parameters': {
+ 'partitions': ['blurb']
+ }
+ }
+
+ call_body = call_kwargs.get('body', None)
+ self.assertNotEqual(None, call_body)
+ for key in call_body:
+ self.assertEqual(call_body_expected.get(key), call_body.get(key))
@mock.patch("arvados.collection.Collection")
def test_done(self, col):
import mock
import os
import unittest
+import copy
+import StringIO
import arvados
import arvados_cwl
import cwltool.process
from schema_salad.ref_resolver import Loader
+from .mock_discovery import get_rootDesc
if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
class TestJob(unittest.TestCase):
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
def test_run(self, list_images_in_arv):
- runner = mock.MagicMock()
- runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- runner.ignore_docker_for_reuse = False
- runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
- list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
-
- tool = {
- "inputs": [],
- "outputs": [],
- "baseCommand": "ls",
- "arguments": [{"valueFrom": "$(runtime.outdir)"}]
- }
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
- arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
- j.run()
- runner.api.jobs().create.assert_called_with(
- body={
- 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
- 'runtime_constraints': {},
- 'script_parameters': {
- 'tasks': [{
- 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
- 'command': ['ls', '$(task.outdir)']
- }],
+ for enable_reuse in (True, False):
+ runner = mock.MagicMock()
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ runner.num_retries = 0
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+            runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+
+ tool = {
+ "inputs": [],
+ "outputs": [],
+ "baseCommand": "ls",
+ "arguments": [{"valueFrom": "$(runtime.outdir)"}]
+ }
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ arvtool.formatgraph = None
+ for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
+ j.run(enable_reuse=enable_reuse)
+ runner.api.jobs().create.assert_called_with(
+ body={
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
+ 'runtime_constraints': {},
+ 'script_parameters': {
+ 'tasks': [{
+ 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
+ 'command': ['ls', '$(task.outdir)']
+ }],
+ },
+ 'script_version': 'master',
+ 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
+ 'repository': 'arvados',
+ 'script': 'crunchrunner',
+ 'runtime_constraints': {
+ 'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
+ 'min_cores_per_node': 1,
+ 'min_ram_mb_per_node': 1024,
+ 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
+ }
},
- 'script_version': 'master',
- 'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
- 'repository': 'arvados',
- 'script': 'crunchrunner',
- 'runtime_constraints': {
- 'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
- 'min_cores_per_node': 1,
- 'min_ram_mb_per_node': 1024,
- 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
- }
- },
- find_or_create=True,
- filters=[['repository', '=', 'arvados'],
- ['script', '=', 'crunchrunner'],
- ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
- ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]]
- )
+ find_or_create=enable_reuse,
+ filters=[['repository', '=', 'arvados'],
+ ['script', '=', 'crunchrunner'],
+ ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
+ ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]]
+ )
# The test passes some fields in builder.resources
# For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
runner.num_retries = 0
runner.ignore_docker_for_reuse = False
- reader().open.return_value = []
+ reader().open.return_value = StringIO.StringIO(
+ """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
+2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
+2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
+ """)
api.collections().list().execute.side_effect = ({"items": []},
{"items": [{"manifest_text": "XYZ"}]})
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.num_retries = 0
- reader().open.return_value = []
+ reader().open.return_value = StringIO.StringIO(
+ """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
+2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
+2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
+ """)
+
api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)
arvjob = arvados_cwl.ArvadosJob(runner)
arvados_cwl.add_arv_hints()
api = mock.MagicMock()
- api._rootDesc = arvados.api('v1')._rootDesc
+ api._rootDesc = get_rootDesc()
+
runner = arvados_cwl.ArvCwlRunner(api)
self.assertEqual(runner.work_api, 'jobs')
arvados_cwl.add_arv_hints()
api = mock.MagicMock()
- api._rootDesc = arvados.api('v1')._rootDesc
+ api._rootDesc = copy.deepcopy(get_rootDesc())
del api._rootDesc.get('resources')['jobs']['methods']['create']
runner = arvados_cwl.ArvCwlRunner(api)
self.assertEqual(runner.work_api, 'containers')
import arvados
import arvados_cwl
+from .mock_discovery import get_rootDesc
class TestMakeOutput(unittest.TestCase):
def setUp(self):
self.api = mock.MagicMock()
- self.api._rootDesc = arvados.api('v1')._rootDesc
+ self.api._rootDesc = get_rootDesc()
@mock.patch("arvados.collection.Collection")
@mock.patch("arvados.collection.CollectionReader")
final.open.return_value = openmock
openmock.__enter__.return_value = cwlout
- runner.make_output_collection("Test output", {
+ _, runner.final_output_collection = runner.make_output_collection("Test output", {
"foo": {
"class": "File",
"location": "keep:99999999999999999999999999999991+99/foo.txt",
"bar": {
"class": "File",
"location": "keep:99999999999999999999999999999992+99/bar.txt",
- "basename": "baz.txt"
+ "basename": "baz.txt",
+ "size": 4
}
})
self.assertEqual("""{
"bar": {
"class": "File",
- "location": "baz.txt"
+ "location": "baz.txt",
+ "size": 4
},
"foo": {
"class": "File",
- "location": "foo.txt"
+ "location": "foo.txt",
+ "size": 3
}
}""", cwlout.getvalue())
import arvados_cwl
from cwltool.pathmapper import MapperEnt
+from .mock_discovery import get_rootDesc
from arvados_cwl.pathmapper import ArvPathMapper
class TestPathmap(unittest.TestCase):
def setUp(self):
self.api = mock.MagicMock()
- self.api._rootDesc = arvados.api('v1')._rootDesc
+ self.api._rootDesc = get_rootDesc()
def test_keepref(self):
"""Test direct keep references."""
import arvados
import arvados.collection
import arvados_cwl
+import arvados_cwl.runner
import arvados.keep
from .matcher import JsonDiffMatcher
+from .mock_discovery import get_rootDesc
+_rootDesc = None
def stubs(func):
@functools.wraps(func)
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
@mock.patch("arvados.collection.KeepClient")
+ @mock.patch("arvados.keep.KeepClient")
@mock.patch("arvados.events.subscribe")
- def wrapped(self, events, keep_client, keepdocker, *args, **kwargs):
+ def wrapped(self, events, keep_client1, keep_client2, keepdocker, *args, **kwargs):
class Stubs:
pass
stubs = Stubs()
stubs.events = events
stubs.keepdocker = keepdocker
- stubs.keep_client = keep_client
+
def putstub(p, **kwargs):
return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
- stubs.keep_client().put.side_effect = putstub
- stubs.keep_client.put.side_effect = putstub
+ keep_client1().put.side_effect = putstub
+ keep_client1.put.side_effect = putstub
+ keep_client2().put.side_effect = putstub
+ keep_client2.put.side_effect = putstub
+ stubs.keep_client = keep_client2
stubs.keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
-
stubs.api = mock.MagicMock()
- stubs.api._rootDesc = arvados.api('v1')._rootDesc
+ stubs.api._rootDesc = get_rootDesc()
+
stubs.api.users().current().execute.return_value = {
"uuid": stubs.fake_user_uuid,
}
'listing': [
{'basename': 'renamed.txt', 'class': 'File', 'location': 'keep:99999999999999999999999999999998+99/file1.txt'}
]}},
- 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl'
+ 'cwl:tool': '99999999999999999999999999999991+99/wf/submit_wf.cwl',
+ 'arv:enable_reuse': True
},
'repository': 'arvados',
'script_version': arvados_cwl.__version__,
},
'state': 'Committed',
'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
- 'command': ['arvados-cwl-runner', '--local', '--api=containers', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'output_path': '/var/spool/cwl',
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
+
+ @mock.patch("time.sleep")
+ @stubs
+ def test_submit_no_reuse(self, stubs, tm):
+ capture_stdout = cStringIO.StringIO()
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--debug", "--disable-reuse",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
+
+ expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
+ expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
+ stubs.api.pipeline_instances().create.assert_called_with(
+ body=expect_pipeline)
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_pipeline_uuid + '\n')
+
@mock.patch("time.sleep")
@stubs
def test_submit_with_project_uuid(self, stubs, tm):
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_container_no_reuse(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--disable-reuse",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--disable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["owner_uuid"] = stubs.fake_user_uuid
+ stubs.api.container_requests().create.assert_called_with(
+ body=expect_container)
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+ @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
+ @mock.patch("cwltool.docker.get_image")
+ @mock.patch("arvados.api")
+ def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
+ arvrunner = mock.MagicMock()
+ arvrunner.project_uuid = ""
+ api.return_value = mock.MagicMock()
+ arvrunner.api = api.return_value
+ arvrunner.api.links().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
+ {"items": [], "items_available": 0, "offset": 0},
+ {"items": [], "items_available": 0, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "",
+ "link_class": "docker_image_hash",
+ "name": "123456",
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [], "items_available": 0, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "",
+ "link_class": "docker_image_repo+tag",
+ "name": "arvados/jobs:"+arvados_cwl.__version__,
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "",
+ "link_class": "docker_image_hash",
+ "name": "123456",
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0} ,
+ )
+ find_one_image_hash.return_value = "123456"
+
+ arvrunner.api.collections().list().execute.side_effect = ({"items": [], "items_available": 0, "offset": 0},
+ {"items": [{"uuid": "",
+ "owner_uuid": "",
+ "manifest_text": "",
+ "properties": ""
+ }], "items_available": 1, "offset": 0},
+ {"items": [{"uuid": ""}], "items_available": 1, "offset": 0})
+ arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
+ self.assertEqual("arvados/jobs:"+arvados_cwl.__version__, arvados_cwl.runner.arvados_jobs_image(arvrunner))
class TestCreateTemplate(unittest.TestCase):
@stubs
// Container is an arvados#container resource.
type Container struct {
- UUID string `json:"uuid"`
- Command []string `json:"command"`
- ContainerImage string `json:"container_image"`
- Cwd string `json:"cwd"`
- Environment map[string]string `json:"environment"`
- LockedByUUID string `json:"locked_by_uuid"`
- Mounts map[string]Mount `json:"mounts"`
- Output string `json:"output"`
- OutputPath string `json:"output_path"`
- Priority int `json:"priority"`
- RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
- State ContainerState `json:"state"`
+ UUID string `json:"uuid"`
+ Command []string `json:"command"`
+ ContainerImage string `json:"container_image"`
+ Cwd string `json:"cwd"`
+ Environment map[string]string `json:"environment"`
+ LockedByUUID string `json:"locked_by_uuid"`
+ Mounts map[string]Mount `json:"mounts"`
+ Output string `json:"output"`
+ OutputPath string `json:"output_path"`
+ Priority int `json:"priority"`
+ RuntimeConstraints RuntimeConstraints `json:"runtime_constraints"`
+ State ContainerState `json:"state"`
+ SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
}
// Mount is special behavior to attach to a filesystem path or device.
// RuntimeConstraints specify a container's compute resources (RAM,
// CPU) and network connectivity.
type RuntimeConstraints struct {
- API *bool
- RAM int `json:"ram"`
- VCPUs int `json:"vcpus"`
- Partition []string `json:"partition"`
+ API *bool
+ RAM int `json:"ram"`
+ VCPUs int `json:"vcpus"`
+ KeepCacheRAM int `json:"keep_cache_ram"`
+}
+
+// SchedulingParameters specify a container's scheduling parameters
+// such as Partitions
+type SchedulingParameters struct {
+ Partitions []string `json:"partitions"`
}
// ContainerList is an arvados#containerList resource.
self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
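+        # Bytes in closed small files that have not yet been packed into a full block;
+        # threads_lock serializes uploader-thread startup (see start_put_threads).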
+ self._pending_write_size = 0
+ self.threads_lock = threading.Lock()
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
if self._put_queue is not None:
self._put_queue.task_done()
- @synchronized
def start_put_threads(self):
- if self._put_threads is None:
- # Start uploader threads.
-
- # If we don't limit the Queue size, the upload queue can quickly
- # grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be send to the Keep
- # servers.
- #
- # With two upload threads and a queue size of 2, this means up to 4
- # blocks pending. If they are full 64 MiB blocks, that means up to
- # 256 MiB of internal buffering, which is the same size as the
- # default download block cache in KeepClient.
- self._put_queue = Queue.Queue(maxsize=2)
-
- self._put_threads = []
- for i in xrange(0, self.num_put_threads):
- thread = threading.Thread(target=self._commit_bufferblock_worker)
- self._put_threads.append(thread)
- thread.daemon = True
- thread.start()
+ with self.threads_lock:
+ if self._put_threads is None:
+ # Start uploader threads.
+
+ # If we don't limit the Queue size, the upload queue can quickly
+ # grow to take up gigabytes of RAM if the writing process is
+ # generating data more quickly than it can be sent to the Keep
+ # servers.
+ #
+ # With two upload threads and a queue size of 2, this means up to 4
+ # blocks pending. If they are full 64 MiB blocks, that means up to
+ # 256 MiB of internal buffering, which is the same size as the
+ # default download block cache in KeepClient.
+ self._put_queue = Queue.Queue(maxsize=2)
+
+ self._put_threads = []
+ for i in xrange(0, self.num_put_threads):
+ thread = threading.Thread(target=self._commit_bufferblock_worker)
+ self._put_threads.append(thread)
+ thread.daemon = True
+ thread.start()
def _block_prefetch_worker(self):
"""The background downloader thread."""
self.stop_threads()
@synchronized
- def repack_small_blocks(self, force=False, sync=False):
+ def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
"""Packs small blocks together before uploading"""
- # Search blocks ready for getting packed together before being committed to Keep.
- # A WRITABLE block always has an owner.
- # A WRITABLE block with its owner.closed() implies that it's
- # size is <= KEEP_BLOCK_SIZE/2.
- small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
- if len(small_blocks) <= 1:
- # Not enough small blocks for repacking
- return
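+ # closed_file_size is the size of a file that has just been closed;
+ # accumulate it so we only scan for repackable blocks once enough
+ # small-file data is pending.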
+ self._pending_write_size += closed_file_size
# Check if there are enough small blocks for filling up one in full
- pending_write_size = sum([b.size() for b in small_blocks])
- if force or (pending_write_size >= config.KEEP_BLOCK_SIZE):
+ if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
+
+ # Search blocks ready for getting packed together before being committed to Keep.
+ # A WRITABLE block always has an owner.
+ # A WRITABLE block with its owner.closed() implies that its
+ # size is <= KEEP_BLOCK_SIZE/2.
+ small_blocks = [b for b in self._bufferblocks.values() if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
+
+ if len(small_blocks) <= 1:
+ # Not enough small blocks for repacking
+ return
+
+ # Update the pending write size count with its true value, just in case
+ # some small file was opened, written and closed several times.
+ self._pending_write_size = sum([b.size() for b in small_blocks])
+ if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
+ return
+
new_bb = self._alloc_bufferblock()
while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
bb = small_blocks.pop(0)
arvfile = bb.owner
+ self._pending_write_size -= bb.size()
new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
arvfile.set_segments([Range(new_bb.blockid,
0,
self.flush()
elif self.closed():
# All writers closed and size is adequate for repacking
- self.parent._my_block_manager().repack_small_blocks()
+ self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
def closed(self):
"""
import json
import os
import pwd
+import time
import signal
import socket
import sys
import tempfile
+import threading
+import copy
+import logging
from apiclient import errors as apiclient_errors
import arvados.commands._util as arv_cmd
self.__init__(self.filename)
-class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
- STATE_PROPS = (arvados.ResumableCollectionWriter.STATE_PROPS +
- ['bytes_written', '_seen_inputs'])
-
- def __init__(self, cache=None, reporter=None, bytes_expected=None, **kwargs):
- self.bytes_written = 0
- self._seen_inputs = []
- self.cache = cache
+class ArvPutUploadJob(object):
+ CACHE_DIR = '.cache/arvados/arv-put'
+ EMPTY_STATE = {
+ 'manifest' : None, # Last saved manifest checkpoint
+ 'files' : {} # Previous run file list: {path : {size, mtime}}
+ }
+
+ def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
+ name=None, owner_uuid=None, ensure_unique_name=False,
+ num_retries=None, replication_desired=None,
+ filename=None, update_time=1.0):
+ self.paths = paths
+ self.resume = resume
self.reporter = reporter
self.bytes_expected = bytes_expected
- super(ArvPutCollectionWriter, self).__init__(**kwargs)
-
- @classmethod
- def from_cache(cls, cache, reporter=None, bytes_expected=None,
- num_retries=0, replication=0):
+ self.bytes_written = 0
+ self.bytes_skipped = 0
+ self.name = name
+ self.owner_uuid = owner_uuid
+ self.ensure_unique_name = ensure_unique_name
+ self.num_retries = num_retries
+ self.replication_desired = replication_desired
+ self.filename = filename
+ self._state_lock = threading.Lock()
+ self._state = None # Previous run state (file list & manifest)
+ self._current_files = [] # Current run file list
+ self._cache_file = None
+ self._collection = None
+ self._collection_lock = threading.Lock()
+ self._stop_checkpointer = threading.Event()
+ self._checkpointer = threading.Thread(target=self._update_task)
+ self._update_task_time = update_time # How many seconds to wait between update runs
+ self.logger = logging.getLogger('arvados.arv_put')
+ # Load cached data if any and if needed
+ self._setup_state()
+
+ def start(self):
+ """
+ Start the checkpointer thread and upload the given paths
+ """
+ self._checkpointer.daemon = True
+ self._checkpointer.start()
try:
- state = cache.load()
- state['_data_buffer'] = [base64.decodestring(state['_data_buffer'])]
- writer = cls.from_state(state, cache, reporter, bytes_expected,
- num_retries=num_retries,
- replication=replication)
- except (TypeError, ValueError,
- arvados.errors.StaleWriterStateError) as error:
- return cls(cache, reporter, bytes_expected,
- num_retries=num_retries,
- replication=replication)
- else:
- return writer
-
- def cache_state(self):
- if self.cache is None:
- return
- state = self.dump_state()
- # Transform attributes for serialization.
- for attr, value in state.items():
- if attr == '_data_buffer':
- state[attr] = base64.encodestring(''.join(value))
- elif hasattr(value, 'popleft'):
- state[attr] = list(value)
- self.cache.save(state)
+ for path in self.paths:
+ # Test for stdin first, in case a file named '-' exists
+ if path == '-':
+ self._write_stdin(self.filename or 'stdin')
+ elif os.path.isdir(path):
+ self._write_directory_tree(path)
+ else:
+ self._write_file(path, self.filename or os.path.basename(path))
+ finally:
+ # Stop the thread before doing anything else
+ self._stop_checkpointer.set()
+ self._checkpointer.join()
+ # Commit all pending blocks & do one last _update()
+ self.manifest_text()
+ self._update()
+ if self.resume:
+ self._cache_file.close()
+ # Correct the final written bytes count
+ self.bytes_written -= self.bytes_skipped
+
+ def save_collection(self):
+ with self._collection_lock:
+ self._my_collection().save_new(
+ name=self.name, owner_uuid=self.owner_uuid,
+ ensure_unique_name=self.ensure_unique_name,
+ num_retries=self.num_retries)
+
+ def destroy_cache(self):
+ if self.resume:
+ try:
+ os.unlink(self._cache_filename)
+ except OSError as error:
+ # That's what we wanted anyway.
+ if error.errno != errno.ENOENT:
+ raise
+ self._cache_file.close()
+
+ def _collection_size(self, collection):
+ """
+ Recursively get the total size of the collection
+ """
+ size = 0
+ for item in collection.values():
+ if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
+ size += self._collection_size(item)
+ else:
+ size += item.size()
+ return size
+
+ def _update_task(self):
+ """
+ Periodically called support task. File uploading is
+ asynchronous so we poll status from the collection.
+ """
+ while not self._stop_checkpointer.wait(self._update_task_time):
+ self._update()
+
+ def _update(self):
+ """
+ Update cached manifest text and report progress.
+ """
+ with self._collection_lock:
+ self.bytes_written = self._collection_size(self._my_collection())
+ # Update cache, if resume enabled
+ if self.resume:
+ with self._state_lock:
+ # Get the manifest text without committing pending blocks
+ self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+ self._save_state()
+ # Call the reporter, if any
+ self.report_progress()
def report_progress(self):
if self.reporter is not None:
self.reporter(self.bytes_written, self.bytes_expected)
- def flush_data(self):
- start_buffer_len = self._data_buffer_len
- start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
- super(ArvPutCollectionWriter, self).flush_data()
- if self._data_buffer_len < start_buffer_len: # We actually PUT data.
- self.bytes_written += (start_buffer_len - self._data_buffer_len)
- self.report_progress()
- if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
- self.cache_state()
-
- def _record_new_input(self, input_type, source_name, dest_name):
- # The key needs to be a list because that's what we'll get back
- # from JSON deserialization.
- key = [input_type, source_name, dest_name]
- if key in self._seen_inputs:
- return False
- self._seen_inputs.append(key)
- return True
-
- def write_file(self, source, filename=None):
- if self._record_new_input('file', source, filename):
- super(ArvPutCollectionWriter, self).write_file(source, filename)
-
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
- if self._record_new_input('directory', path, stream_name):
- super(ArvPutCollectionWriter, self).write_directory_tree(
- path, stream_name, max_manifest_depth)
+ def _write_directory_tree(self, path, stream_name="."):
+ # TODO: Check what happens when multiple directories are passed as
+ # arguments.
+ # If the code below is uncommented, integration test
+ # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
+ # fails, presumably because the manifest_uuid changes when the
+ # directory is added to stream_name.
+
+ # if stream_name == '.':
+ # stream_name = os.path.join('.', os.path.basename(path))
+ for item in os.listdir(path):
+ if os.path.isdir(os.path.join(path, item)):
+ self._write_directory_tree(os.path.join(path, item),
+ os.path.join(stream_name, item))
+ else:
+ self._write_file(os.path.join(path, item),
+ os.path.join(stream_name, item))
+
+ def _write_stdin(self, filename):
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'w')
+ self._write(sys.stdin, output)
+ output.close()
+
+ def _write_file(self, source, filename):
+ resume_offset = 0
+ if self.resume:
+ # Check if file was already uploaded (at least partially)
+ with self._collection_lock:
+ try:
+ file_in_collection = self._my_collection().find(filename)
+ except IOError:
+ # Not found
+ file_in_collection = None
+ # If there's no cached data for this file yet, record its size and
+ # mtime for a possible later resumed run.
+ if source not in self._state['files']:
+ with self._state_lock:
+ self._state['files'][source] = {
+ 'mtime': os.path.getmtime(source),
+ 'size' : os.path.getsize(source)
+ }
+ with self._state_lock:
+ cached_file_data = self._state['files'][source]
+ # See if this file was already uploaded at least partially
+ if file_in_collection:
+ if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+ if cached_file_data['size'] == file_in_collection.size():
+ # File already there, skip it.
+ self.bytes_skipped += cached_file_data['size']
+ return
+ elif cached_file_data['size'] > file_in_collection.size():
+ # File partially uploaded, resume!
+ resume_offset = file_in_collection.size()
+ else:
+ # Inconsistent cache, re-upload the file
+ self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ else:
+ # Local file differs from cached data, re-upload it
+ pass
+ with open(source, 'r') as source_fd:
+ if resume_offset > 0:
+ # Start upload where we left off
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'a')
+ source_fd.seek(resume_offset)
+ self.bytes_skipped += resume_offset
+ else:
+ # Start from scratch
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'w')
+ self._write(source_fd, output)
+ output.close(flush=False)
+
+ def _write(self, source_fd, output):
+ first_read = True
+ while True:
+ data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+ # Allow an empty file to be written
+ if not data and not first_read:
+ break
+ if first_read:
+ first_read = False
+ output.write(data)
+
+ def _my_collection(self):
+ """
+ Create a new collection if none cached. Load it from cache otherwise.
+ """
+ if self._collection is None:
+ with self._state_lock:
+ manifest = self._state['manifest']
+ if self.resume and manifest is not None:
+ # Create collection from saved state
+ self._collection = arvados.collection.Collection(
+ manifest,
+ replication_desired=self.replication_desired)
+ else:
+ # Create new collection
+ self._collection = arvados.collection.Collection(
+ replication_desired=self.replication_desired)
+ return self._collection
+
+ def _setup_state(self):
+ """
+ Create a new cache file or load a previously existing one.
+ """
+ if self.resume:
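+ # Name the cache file after an md5 of the API host, the real input
+ # paths and the optional filename, so each distinct invocation gets
+ # its own resume cache.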
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ realpaths = sorted(os.path.realpath(path) for path in self.paths)
+ md5.update('\0'.join(realpaths))
+ if self.filename:
+ md5.update(self.filename)
+ cache_filename = md5.hexdigest()
+ self._cache_file = open(os.path.join(
+ arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+ cache_filename), 'a+')
+ self._cache_filename = self._cache_file.name
+ self._lock_file(self._cache_file)
+ self._cache_file.seek(0)
+ with self._state_lock:
+ try:
+ self._state = json.load(self._cache_file)
+ if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+ # Cached state is at least partially incomplete; set up a new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+ except ValueError:
+ # Cache file empty, set up new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+ # Load how many bytes were uploaded on previous run
+ with self._collection_lock:
+ self.bytes_written = self._collection_size(self._my_collection())
+ # No resume required
+ else:
+ with self._state_lock:
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+
+ def _lock_file(self, fileobj):
+ try:
+ fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
+ def _save_state(self):
+ """
+ Atomically save current state into cache.
+ """
+ try:
+ with self._state_lock:
+ state = self._state
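+ # Write to a temp file in the same directory and rename it over the
+ # old cache, so a crash never leaves a half-written cache file behind.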
+ new_cache_fd, new_cache_name = tempfile.mkstemp(
+ dir=os.path.dirname(self._cache_filename))
+ self._lock_file(new_cache_fd)
+ new_cache = os.fdopen(new_cache_fd, 'r+')
+ json.dump(state, new_cache)
+ new_cache.flush()
+ os.fsync(new_cache)
+ os.rename(new_cache_name, self._cache_filename)
+ except (IOError, OSError, ResumeCacheConflict) as error:
+ self.logger.error("There was a problem while saving the cache file: {}".format(error))
+ try:
+ os.unlink(new_cache_name)
+ except NameError: # mkstemp failed.
+ pass
+ else:
+ self._cache_file.close()
+ self._cache_file = new_cache
+
+ def collection_name(self):
+ with self._collection_lock:
+ name = self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
+ return name
+
+ def manifest_locator(self):
+ with self._collection_lock:
+ locator = self._my_collection().manifest_locator()
+ return locator
+
+ def portable_data_hash(self):
+ with self._collection_lock:
+ datahash = self._my_collection().portable_data_hash()
+ return datahash
+
+ def manifest_text(self, stream_name=".", strip=False, normalize=False):
+ with self._collection_lock:
+ manifest = self._my_collection().manifest_text(stream_name, strip, normalize)
+ return manifest
+
+ def _datablocks_on_item(self, item):
+ """
+ Return a list of datablock locators, recursively navigating
+ through subcollections
+ """
+ if isinstance(item, arvados.arvfile.ArvadosFile):
+ if item.size() == 0:
+ # Empty file locator
+ return ["d41d8cd98f00b204e9800998ecf8427e+0"]
+ else:
+ locators = []
+ for segment in item.segments():
+ loc = segment.locator
+ locators.append(loc)
+ return locators
+ elif isinstance(item, arvados.collection.Collection):
+ l = [self._datablocks_on_item(x) for x in item.values()]
+ # Fast list flattener method taken from:
+ # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
+ return [loc for sublist in l for loc in sublist]
+ else:
+ return None
+
+ def data_locators(self):
+ with self._collection_lock:
+ # Make sure all datablocks are flushed before getting the locators
+ self._my_collection().manifest_text()
+ datablocks = self._datablocks_on_item(self._my_collection())
+ return datablocks
def expected_bytes_for(pathlist):
print >>stderr, error
sys.exit(1)
- # write_copies diverges from args.replication here.
- # args.replication is how many copies we will instruct Arvados to
- # maintain (by passing it in collections().create()) after all
- # data is written -- and if None was given, we'll use None there.
- # Meanwhile, write_copies is how many copies of each data block we
- # write to Keep, which has to be a number.
- #
- # If we simply changed args.replication from None to a default
- # here, we'd end up erroneously passing the default replication
- # level (instead of None) to collections().create().
- write_copies = (args.replication or
- api_client._rootDesc.get('defaultCollectionReplication', 2))
-
if args.progress:
reporter = progress_writer(human_progress)
elif args.batch_progress:
reporter = progress_writer(machine_progress)
else:
reporter = None
- bytes_expected = expected_bytes_for(args.paths)
-
- resume_cache = None
- if args.resume:
- try:
- resume_cache = ResumeCache(ResumeCache.make_path(args))
- resume_cache.check_cache(api_client=api_client, num_retries=args.retries)
- except (IOError, OSError, ValueError):
- pass # Couldn't open cache directory/file. Continue without it.
- except ResumeCacheConflict:
- print >>stderr, "\n".join([
- "arv-put: Another process is already uploading this data.",
- " Use --no-resume if this is really what you want."])
- sys.exit(1)
- if resume_cache is None:
- writer = ArvPutCollectionWriter(
- resume_cache, reporter, bytes_expected,
- num_retries=args.retries,
- replication=write_copies)
- else:
- writer = ArvPutCollectionWriter.from_cache(
- resume_cache, reporter, bytes_expected,
- num_retries=args.retries,
- replication=write_copies)
+ bytes_expected = expected_bytes_for(args.paths)
+ try:
+ writer = ArvPutUploadJob(paths = args.paths,
+ resume = args.resume,
+ filename = args.filename,
+ reporter = reporter,
+ bytes_expected = bytes_expected,
+ num_retries = args.retries,
+ replication_desired = args.replication,
+ name = collection_name,
+ owner_uuid = project_uuid,
+ ensure_unique_name = True)
+ except ResumeCacheConflict:
+ print >>stderr, "\n".join([
+ "arv-put: Another process is already uploading this data.",
+ " Use --no-resume if this is really what you want."])
+ sys.exit(1)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
for sigcode in CAUGHT_SIGNALS}
- if writer.bytes_written > 0: # We're resuming a previous upload.
+ if args.resume and writer.bytes_written > 0:
print >>stderr, "\n".join([
"arv-put: Resuming previous upload from last checkpoint.",
" Use the --no-resume option to start over."])
writer.report_progress()
- writer.do_queued_work() # Do work resumed from cache.
- for path in args.paths: # Copy file data to Keep.
- if path == '-':
- writer.start_new_stream()
- writer.start_new_file(args.filename)
- r = sys.stdin.read(64*1024)
- while r:
- # Need to bypass _queued_file check in ResumableCollectionWriter.write() to get
- # CollectionWriter.write().
- super(arvados.collection.ResumableCollectionWriter, writer).write(r)
- r = sys.stdin.read(64*1024)
- elif os.path.isdir(path):
- writer.write_directory_tree(
- path, max_manifest_depth=args.max_manifest_depth)
- else:
- writer.start_new_stream()
- writer.write_file(path, args.filename or os.path.basename(path))
- writer.finish_current_stream()
-
+ output = None
+ writer.start()
if args.progress: # Print newline to split stderr from stdout for humans.
print >>stderr
- output = None
if args.stream:
- output = writer.manifest_text()
if args.normalize:
- output = arvados.collection.CollectionReader(output).manifest_text(normalize=True)
+ output = writer.manifest_text(normalize=True)
+ else:
+ output = writer.manifest_text()
elif args.raw:
output = ','.join(writer.data_locators())
else:
try:
- manifest_text = writer.manifest_text()
- if args.normalize:
- manifest_text = arvados.collection.CollectionReader(manifest_text).manifest_text(normalize=True)
- replication_attr = 'replication_desired'
- if api_client._schema.schemas['Collection']['properties'].get(replication_attr, None) is None:
- # API called it 'redundancy' before #3410.
- replication_attr = 'redundancy'
- # Register the resulting collection in Arvados.
- collection = api_client.collections().create(
- body={
- 'owner_uuid': project_uuid,
- 'name': collection_name,
- 'manifest_text': manifest_text,
- replication_attr: args.replication,
- },
- ensure_unique_name=True
- ).execute(num_retries=args.retries)
-
- print >>stderr, "Collection saved as '%s'" % collection['name']
-
- if args.portable_data_hash and 'portable_data_hash' in collection and collection['portable_data_hash']:
- output = collection['portable_data_hash']
+ writer.save_collection()
+ print >>stderr, "Collection saved as '%s'" % writer.collection_name()
+ if args.portable_data_hash:
+ output = writer.portable_data_hash()
else:
- output = collection['uuid']
-
+ output = writer.manifest_locator()
except apiclient_errors.Error as error:
print >>stderr, (
"arv-put: Error creating Collection on project: {}.".format(
if status != 0:
sys.exit(status)
- if resume_cache is not None:
- resume_cache.destroy()
-
+ # Success!
+ writer.destroy_cache()
return output
+
if __name__ == '__main__':
main()
self._closing_lock = threading.RLock()
def run(self):
- self.id = 0
if self.last_log_id != None:
- self.id = self.last_log_id
+ # Caller supplied the last-seen event ID from a previous
+ # connection
+ skip_old_events = [["id", ">", str(self.last_log_id)]]
else:
- for f in self.filters:
- for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
- try:
- items = self.api.logs().list(limit=1, order="id desc", filters=f).execute()['items']
- break
- except errors.ApiError as error:
- pass
- else:
- tries_left = 0
- break
- if tries_left == 0:
- _logger.exception("PollClient thread could not contact API server.")
- with self._closing_lock:
- self._closing.set()
- thread.interrupt_main()
- return
- if items:
- if items[0]['id'] > self.id:
- self.id = items[0]['id']
+ # We need to do a reverse-order query to find the most
+ # recent event ID (see "if not skip_old_events" below).
+ skip_old_events = False
self.on_event({'status': 200})
while not self._closing.is_set():
- max_id = self.id
moreitems = False
for f in self.filters:
for tries_left in RetryLoop(num_retries=25, backoff_start=.1, max_wait=self.poll_time):
try:
- items = self.api.logs().list(order="id asc", filters=f+[["id", ">", str(self.id)]]).execute()
+ if not skip_old_events:
+ # If the caller didn't provide a known
+ # recent ID, our first request will ask
+ # for the single most recent event from
+ # the last 2 hours (the time restriction
+ # avoids doing an expensive database
+ # query, and leaves a big enough margin to
+ # account for clock skew). If we do find a
+ # recent event, we remember its ID but
+ # then discard it (we are supposed to be
+ # returning new/current events, not old
+ # ones).
+ #
+ # Subsequent requests will get multiple
+ # events in chronological order, and
+ # filter on that same cutoff time, or
+ # (once we see our first matching event)
+ # the ID of the last-seen event.
+ skip_old_events = [[
+ "created_at", ">=",
+ time.strftime(
+ "%Y-%m-%dT%H:%M:%SZ",
+ time.gmtime(time.time()-7200))]]
+ items = self.api.logs().list(
+ order="id desc",
+ limit=1,
+ filters=f+skip_old_events).execute()
+ if items["items"]:
+ skip_old_events = [
+ ["id", ">", str(items["items"][0]["id"])]]
+ items = {
+ "items": [],
+ "items_available": 0,
+ }
+ else:
+ # In this case, either we know the most
+ # recent matching ID, or we know there
+ # were no matching events in the 2-hour
+ # window before subscribing. Either way we
+ # can safely ask for events in ascending
+ # order.
+ items = self.api.logs().list(
+ order="id asc",
+ filters=f+skip_old_events).execute()
break
except errors.ApiError as error:
pass
thread.interrupt_main()
return
for i in items["items"]:
- if i['id'] > max_id:
- max_id = i['id']
+ skip_old_events = [["id", ">", str(i["id"])]]
with self._closing_lock:
if self._closing.is_set():
return
thread.interrupt_main()
if items["items_available"] > len(items["items"]):
moreitems = True
- self.id = max_id
if not moreitems:
self._closing.wait(self.poll_time)
import time
import unittest
import yaml
+import threading
+import hashlib
+import random
from cStringIO import StringIO
import arvados
import arvados.commands.put as arv_put
+import arvados_testutil as tutil
from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server
arv_put.ResumeCache, path)
-class ArvadosPutCollectionWriterTest(run_test_server.TestCaseWithServers,
- ArvadosBaseTestCase):
+class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
+ ArvadosBaseTestCase):
def setUp(self):
- super(ArvadosPutCollectionWriterTest, self).setUp()
+ super(ArvPutUploadJobTest, self).setUp()
run_test_server.authorize_with('active')
- with tempfile.NamedTemporaryFile(delete=False) as cachefile:
- self.cache = arv_put.ResumeCache(cachefile.name)
- self.cache_filename = cachefile.name
+ # Temp files creation
+ self.tempdir = tempfile.mkdtemp()
+ subdir = os.path.join(self.tempdir, 'subdir')
+ os.mkdir(subdir)
+ data = "x" * 1024 # 1 KB
+ for i in range(1, 5):
+ with open(os.path.join(self.tempdir, str(i)), 'w') as f:
+ f.write(data * i)
+ with open(os.path.join(subdir, 'otherfile'), 'w') as f:
+ f.write(data * 5)
+ # Large temp file for resume test
+ _, self.large_file_name = tempfile.mkstemp()
+ fileobj = open(self.large_file_name, 'w')
+ # Make sure to write just a little more than one block
+ for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
+ data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
+ fileobj.write(data)
+ fileobj.close()
+ self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
def tearDown(self):
- super(ArvadosPutCollectionWriterTest, self).tearDown()
- if os.path.exists(self.cache_filename):
- self.cache.destroy()
- self.cache.close()
-
- def test_writer_caches(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- cwriter.write_file('/dev/null')
- cwriter.cache_state()
- self.assertTrue(self.cache.load())
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
+ super(ArvPutUploadJobTest, self).tearDown()
+ shutil.rmtree(self.tempdir)
+ os.unlink(self.large_file_name)
def test_writer_works_without_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter()
- cwriter.write_file('/dev/null')
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
-
- def test_writer_resumes_from_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(
- self.cache)
- self.assertEqual(
- ". 098f6bcd4621d373cade4e832627b4f6+4 0:4:test\n",
- new_writer.manifest_text())
-
- def test_new_writer_from_stale_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- new_writer.write_file('/dev/null')
- self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", new_writer.manifest_text())
-
- def test_new_writer_from_empty_cache(self):
- cwriter = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- cwriter.write_file('/dev/null')
+ cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
+ cwriter.start()
self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
- def test_writer_resumable_after_arbitrary_bytes(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache)
- # These bytes are intentionally not valid UTF-8.
- with self.make_test_file('\x00\x07\xe2') as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(
- self.cache)
- self.assertEqual(cwriter.manifest_text(), new_writer.manifest_text())
+ def test_writer_works_with_cache(self):
+ with tempfile.NamedTemporaryFile() as f:
+ f.write('foo')
+ f.flush()
+ cwriter = arv_put.ArvPutUploadJob([f.name])
+ cwriter.start()
+ self.assertEqual(3, cwriter.bytes_written)
+ # Don't destroy the cache, and start another upload
+ cwriter_new = arv_put.ArvPutUploadJob([f.name])
+ cwriter_new.start()
+ cwriter_new.destroy_cache()
+ self.assertEqual(0, cwriter_new.bytes_written)
def make_progress_tester(self):
progression = []
return progression, record_func
def test_progress_reporting(self):
- for expect_count in (None, 8):
- progression, reporter = self.make_progress_tester()
- cwriter = arv_put.ArvPutCollectionWriter(
- reporter=reporter, bytes_expected=expect_count)
- with self.make_test_file() as testfile:
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- self.assertIn((4, expect_count), progression)
-
- def test_resume_progress(self):
- cwriter = arv_put.ArvPutCollectionWriter(self.cache, bytes_expected=4)
- with self.make_test_file() as testfile:
- # Set up a writer with some flushed bytes.
- cwriter.write_file(testfile.name, 'test')
- cwriter.finish_current_stream()
- cwriter.cache_state()
- new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
- self.assertEqual(new_writer.bytes_written, 4)
+ with tempfile.NamedTemporaryFile() as f:
+ f.write('foo')
+ f.flush()
+ for expect_count in (None, 8):
+ progression, reporter = self.make_progress_tester()
+ cwriter = arv_put.ArvPutUploadJob([f.name],
+ reporter=reporter, bytes_expected=expect_count)
+ cwriter.start()
+ cwriter.destroy_cache()
+ self.assertIn((3, expect_count), progression)
+
+ def test_writer_upload_directory(self):
+ cwriter = arv_put.ArvPutUploadJob([self.tempdir])
+ cwriter.start()
+ cwriter.destroy_cache()
+ self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
+
+ def test_resume_large_file_upload(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ with self.assertRaises(SystemExit):
+ writer.start()
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer2.start()
+ self.assertEqual(writer.bytes_written + writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
os.chmod(cachedir, 0o700)
def test_put_block_replication(self):
- with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock, \
- mock.patch('arvados.commands.put.ResumeCache.load') as cache_mock:
- cache_mock.side_effect = ValueError
+ self.call_main_on_test_file()
+ with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
self.call_main_on_test_file(['--replication', '1'])
self.call_main_on_test_file(['--replication', '4'])
['--project-uuid', self.Z_UUID, '--stream'])
def test_api_error_handling(self):
- collections_mock = mock.Mock(name='arv.collections()')
- coll_create_mock = collections_mock().create().execute
- coll_create_mock.side_effect = arvados.errors.ApiError(
+ coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
+ coll_save_mock.side_effect = arvados.errors.ApiError(
fake_httplib2_response(403), '{}')
- arv_put.api_client = arvados.api('v1')
- arv_put.api_client.collections = collections_mock
- with self.assertRaises(SystemExit) as exc_test:
- self.call_main_with_args(['/dev/null'])
- self.assertLess(0, exc_test.exception.args[0])
- self.assertLess(0, coll_create_mock.call_count)
- self.assertEqual("", self.main_stdout.getvalue())
+ with mock.patch('arvados.collection.Collection.save_new',
+ new=coll_save_mock):
+ with self.assertRaises(SystemExit) as exc_test:
+ self.call_main_with_args(['/dev/null'])
+ self.assertLess(0, exc_test.exception.args[0])
+ self.assertLess(0, coll_save_mock.call_count)
+ self.assertEqual("", self.main_stdout.getvalue())
class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
protected
def load_limit_offset_order_params *args
+ super
if action_name == 'index'
# Omit manifest_text from index results unless expressly selected.
@select ||= model_class.selectable_attributes - ["manifest_text"]
end
- super
end
end
end
end
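+ # Collect the class names referenced by any "is_a" filters so the
+ # search below can skip object types the caller did not ask for.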
+ wanted_klasses = []
+ request_filters.each do |col,op,val|
+ if op == 'is_a'
+ (val.is_a?(Array) ? val : [val]).each do |type|
+ type = type.split('#')[-1]
+ type[0] = type[0].capitalize
+ wanted_klasses << type
+ end
+ end
+ end
+
klasses.each do |klass|
+ next if wanted_klasses.any? and !wanted_klasses.include?(klass.to_s)
+
# If the currently requested orders specifically match the
# table_name for the current klass, apply that order.
# Otherwise, order by recency.
end
def log_destroy
- log_change('destroy') do |log|
+ log_change('delete') do |log|
log.fill_properties('old', etag(@old_attributes), @old_logged_attributes)
log.update_to nil
end
serialize :mounts, Hash
serialize :runtime_constraints, Hash
serialize :command, Array
+ serialize :scheduling_parameters, Hash
before_validation :fill_field_defaults, :if => :new_record?
before_validation :set_timestamps
t.add :started_at
t.add :state
t.add :auth_uuid
+ t.add :scheduling_parameters
end
# Supported states for a container
self.mounts ||= {}
self.cwd ||= "."
self.priority ||= 1
+ self.scheduling_parameters ||= {}
end
def permission_to_create
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
:environment, :mounts, :output_path, :priority,
- :runtime_constraints)
+ :runtime_constraints, :scheduling_parameters)
end
case self.state
if self.runtime_constraints_changed?
self.runtime_constraints = self.class.deep_sort_hash(self.runtime_constraints)
end
+ if self.scheduling_parameters_changed?
+ self.scheduling_parameters = self.class.deep_sort_hash(self.scheduling_parameters)
+ end
end
def handle_completed
output_path: self.output_path,
container_image: self.container_image,
mounts: self.mounts,
- runtime_constraints: self.runtime_constraints
+ runtime_constraints: self.runtime_constraints,
+ scheduling_parameters: self.scheduling_parameters
}
c = Container.create! c_attrs
retryable_requests.each do |cr|
serialize :mounts, Hash
serialize :runtime_constraints, Hash
serialize :command, Array
+ serialize :scheduling_parameters, Hash
before_validation :fill_field_defaults, :if => :new_record?
+ before_validation :validate_runtime_constraints
+ before_validation :validate_scheduling_parameters
before_validation :set_container
validates :command, :container_image, :output_path, :cwd, :presence => true
validate :validate_state_change
validate :validate_change
- validate :validate_runtime_constraints
after_save :update_priority
after_save :finalize_if_needed
before_create :set_requesting_container_uuid
t.add :runtime_constraints
t.add :state
t.add :use_existing
+ t.add :scheduling_parameters
end
# Supported states for a container request
self.mounts ||= {}
self.cwd ||= "."
self.container_count_max ||= Rails.configuration.container_count_max
+ self.scheduling_parameters ||= {}
end
# Create a new container (or find an existing one) to satisfy this
if not reusable.nil?
reusable
else
+ c_attrs[:scheduling_parameters] = self.scheduling_parameters
Container.create!(c_attrs)
end
end
errors.add :runtime_constraints, "#{k} must be a positive integer"
end
end
+
+ if runtime_constraints.include? 'keep_cache_ram' and
+ (!runtime_constraints['keep_cache_ram'].is_a?(Integer) or
+ runtime_constraints['keep_cache_ram'] <= 0)
+ errors.add :runtime_constraints, "keep_cache_ram must be a positive integer"
+ elsif !runtime_constraints.include? 'keep_cache_ram'
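+ # No keep_cache_ram given: fall back to the site-wide default.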
+ runtime_constraints['keep_cache_ram'] = Rails.configuration.container_default_keep_cache_ram
+ end
+ end
+ end
+
+ def validate_scheduling_parameters
+ if self.state == Committed
+ if scheduling_parameters.include? 'partitions' and
+ (!scheduling_parameters['partitions'].is_a?(Array) ||
+ scheduling_parameters['partitions'].reject{|x| !x.is_a?(String)}.size !=
+ scheduling_parameters['partitions'].size)
+ errors.add :scheduling_parameters, "partitions must be an array of strings"
+ end
end
end
:container_image, :cwd, :description, :environment,
:filters, :mounts, :name, :output_path, :priority,
:properties, :requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid, :use_existing
+ :state, :container_uuid, :use_existing, :scheduling_parameters
when Committed
if container_uuid.nil?
permitted.push :command, :container_image, :cwd, :description, :environment,
:filters, :mounts, :name, :output_path, :properties,
:requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid
+ :state, :container_uuid, :scheduling_parameters
end
when Final
self.event_at = thing.created_at
when "update"
self.event_at = thing.modified_at
- when "destroy"
+ when "delete"
self.event_at = db_current_time
end
self
belongs_to(:job, foreign_key: :job_uuid, primary_key: :uuid)
attr_accessor :job_readable
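+ # Placeholder address written to the DNS configuration for hostnames
+ # that currently have no real IP address assigned.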
+ UNUSED_NODE_IP = '127.40.4.0'
+
api_accessible :user, :extend => :common do |t|
t.add :hostname
t.add :domain
end
def dns_server_update
- if self.hostname_changed? or self.ip_address_changed?
- if not self.ip_address.nil?
- stale_conflicting_nodes = Node.where('id != ? and ip_address = ? and last_ping_at < ?',self.id,self.ip_address,10.minutes.ago)
- if not stale_conflicting_nodes.empty?
- # One or more stale compute node records have the same IP address as the new node.
- # Clear the ip_address field on the stale nodes.
- stale_conflicting_nodes.each do |stale_node|
- stale_node.ip_address = nil
- stale_node.save!
- end
+ if hostname_changed? && hostname_was
+ self.class.dns_server_update(hostname_was, UNUSED_NODE_IP)
+ end
+ if hostname_changed? or ip_address_changed?
+ if ip_address
+ Node.where('id != ? and ip_address = ? and last_ping_at < ?',
+ id, ip_address, 10.minutes.ago).each do |stale_node|
+ # One or more stale compute node records have the same IP
+ # address as the new node. Clear the ip_address field on
+ # the stale nodes.
+ stale_node.ip_address = nil
+ stale_node.save!
end
end
- if self.hostname and self.ip_address
- self.class.dns_server_update(self.hostname, self.ip_address)
+ if hostname
+ self.class.dns_server_update(hostname, ip_address || UNUSED_NODE_IP)
end
end
end
if !File.exists? hostfile
n = Node.where(:slot_number => slot_number).first
if n.nil? or n.ip_address.nil?
- dns_server_update(hostname, '127.40.4.0')
+ dns_server_update(hostname, UNUSED_NODE_IP)
else
dns_server_update(hostname, n.ip_address)
end
# with the cancelled container.
container_count_max: 3
+ # Default value for keep_cache_ram in a container's runtime_constraints (256 MiB).
+ container_default_keep_cache_ram: 268435456
+
development:
force_ssl: false
cache_classes: false
--- /dev/null
+class AddSchedulingParametersToContainer < ActiveRecord::Migration
+ def change
+ add_column :containers, :scheduling_parameters, :text
+ add_column :container_requests, :scheduling_parameters, :text
+ end
+end
filters text,
updated_at timestamp without time zone NOT NULL,
container_count integer DEFAULT 0,
- use_existing boolean DEFAULT true
+ use_existing boolean DEFAULT true,
+ scheduling_parameters text
);
updated_at timestamp without time zone NOT NULL,
exit_code integer,
auth_uuid character varying(255),
- locked_by_uuid character varying(255)
+ locked_by_uuid character varying(255),
+ scheduling_parameters text
);
INSERT INTO schema_migrations (version) VALUES ('20160926194129');
-INSERT INTO schema_migrations (version) VALUES ('20161019171346');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20161019171346');
+
+INSERT INTO schema_migrations (version) VALUES ('20161111143147');
\ No newline at end of file
hostname: compute3
slot_number: ~
domain: ""
- ip_address: 172.17.2.173
+ ip_address: 172.17.2.174
last_ping_at: <%= 1.hour.ago.to_s(:db) %>
first_ping_at: <%= 23.hour.ago.to_s(:db) %>
job_uuid: ~
owner_uuid: zzzzz-tpzed-000000000000000
hostname: ~
slot_number: ~
- ip_address: 172.17.2.173
+ ip_address: 172.17.2.175
last_ping_at: ~
first_ping_at: ~
job_uuid: ~
owner_uuid: zzzzz-tpzed-000000000000000
hostname: custom1
slot_number: 23
- ip_address: 172.17.2.173
+ ip_address: 172.17.2.176
last_ping_at: ~
first_ping_at: ~
job_uuid: ~
end
end
+ test 'index without select returns everything except manifest' do
+ authorize_with :active
+ get :index
+ assert_response :success
+ assert json_response['items'].any?
+ json_response['items'].each do |coll|
+ assert_includes(coll.keys, 'uuid')
+ assert_includes(coll.keys, 'name')
+ assert_includes(coll.keys, 'created_at')
+ refute_includes(coll.keys, 'manifest_text')
+ end
+ end
+
+ ['', nil, false, 'null'].each do |select|
+ test "index with select=#{select.inspect} returns everything except manifest" do
+ authorize_with :active
+ get :index, select: select
+ assert_response :success
+ assert json_response['items'].any?
+ json_response['items'].each do |coll|
+ assert_includes(coll.keys, 'uuid')
+ assert_includes(coll.keys, 'name')
+ assert_includes(coll.keys, 'created_at')
+ refute_includes(coll.keys, 'manifest_text')
+ end
+ end
+ end
+
+ [["uuid"],
+ ["uuid", "manifest_text"],
+ '["uuid"]',
+ '["uuid", "manifest_text"]'].each do |select|
+ test "index with select=#{select.inspect} returns no name" do
+ authorize_with :active
+ get :index, select: select
+ assert_response :success
+ assert json_response['items'].any?
+ json_response['items'].each do |coll|
+ refute_includes(coll.keys, 'name')
+ end
+ end
+ end
+
[0,1,2].each do |limit|
test "get index with limit=#{limit}" do
authorize_with :active
assert_equal({}, c.environment)
assert_equal({"/out" => {"kind"=>"tmp", "capacity"=>1000000}}, c.mounts)
assert_equal "/out", c.output_path
- assert_equal({"vcpus" => 2, "ram" => 30}, c.runtime_constraints)
+ assert_equal({"keep_cache_ram"=>268435456, "vcpus" => 2, "ram" => 30}, c.runtime_constraints)
assert_equal 1, c.priority
assert_raises(ActiveRecord::RecordInvalid) do
command: ["echo", "hello"],
output_path: "test",
runtime_constraints: {"vcpus" => 4,
- "ram" => 12000000000},
+ "ram" => 12000000000,
+ "keep_cache_ram" => 268435456},
mounts: {"test" => {"kind" => "json"}}}
set_user_from_auth :active
cr1 = create_minimal_req!(common_attrs.merge({state: ContainerRequest::Committed,
assert_equal cr.container_uuid, cr3.container_uuid
assert_equal ContainerRequest::Final, cr3.state
end
+
+ [
+ [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => 100}, ContainerRequest::Committed, 100],
+ [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Uncommitted],
+ [{"vcpus" => 1, "ram" => 123}, ContainerRequest::Committed],
+ [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => -1}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"vcpus" => 1, "ram" => 123, "keep_cache_ram" => '123'}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ ].each do |rc, state, expected|
+ test "create container request with #{rc} in state #{state} and verify keep_cache_ram #{expected}" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ runtime_constraints: rc,
+ mounts: {"test" => {"kind" => "json"}}}
+ set_user_from_auth :active
+
+ if expected == ActiveRecord::RecordInvalid
+ assert_raises(ActiveRecord::RecordInvalid) do
+ create_minimal_req!(common_attrs.merge({state: state}))
+ end
+ else
+ cr = create_minimal_req!(common_attrs.merge({state: state}))
+ expected = Rails.configuration.container_default_keep_cache_ram if state == ContainerRequest::Committed and expected.nil?
+ assert_equal expected, cr.runtime_constraints['keep_cache_ram']
+ end
+ end
+ end
+
+ [
+ [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
+ [{"partitions" => "fastcpu"}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
+ [{"partitions" => "fastcpu"}, ContainerRequest::Uncommitted],
+ [{"partitions" => ["fastcpu","vfastcpu"]}, ContainerRequest::Committed],
+ ].each do |sp, state, expected|
+ test "create container request with scheduling_parameters #{sp} in state #{state} and verify #{expected}" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ scheduling_parameters: sp,
+ mounts: {"test" => {"kind" => "json"}}}
+ set_user_from_auth :active
+
+ if expected == ActiveRecord::RecordInvalid
+ assert_raises(ActiveRecord::RecordInvalid) do
+ create_minimal_req!(common_attrs.merge({state: state}))
+ end
+ else
+ cr = create_minimal_req!(common_attrs.merge({state: state}))
+ assert_equal sp, cr.scheduling_parameters
+
+ if state == ContainerRequest::Committed
+ c = Container.find_by_uuid(cr.container_uuid)
+ assert_equal sp, c.scheduling_parameters
+ end
+ end
+ end
+ end
end
EVENT_TEST_METHODS = {
:create => [:created_at, :assert_nil, :assert_not_nil],
:update => [:modified_at, :assert_not_nil, :assert_not_nil],
- :destroy => [nil, :assert_not_nil, :assert_nil],
+ :delete => [nil, :assert_not_nil, :assert_nil],
}
setup do
orig_attrs = auth.attributes
orig_attrs.delete 'api_token'
auth.destroy
- assert_logged(auth, :destroy) do |props|
+ assert_logged(auth, :delete) do |props|
assert_equal(orig_etag, props['old_etag'], "destroyed auth etag mismatch")
assert_equal(orig_attrs, props['old_attributes'],
"destroyed auth attributes mismatch")
auth.save!
assert_logged_with_clean_properties(auth, :update, 'api_token')
auth.destroy
- assert_logged_with_clean_properties(auth, :destroy, 'api_token')
+ assert_logged_with_clean_properties(auth, :delete, 'api_token')
end
test "use ownership and permission links to determine which logs a user can see" do
coll.save!
assert_logged_with_clean_properties(coll, :update, 'manifest_text')
coll.destroy
- assert_logged_with_clean_properties(coll, :destroy, 'manifest_text')
+ assert_logged_with_clean_properties(coll, :delete, 'manifest_text')
end
end
assert_equal(txt, props['new_attributes']['manifest_text'])
end
coll.destroy
- assert_logged(coll, :destroy) do |props|
+ assert_logged(coll, :delete) do |props|
assert_equal(txt, props['old_attributes']['manifest_text'])
end
end
refute_nil node2.slot_number
assert_equal "custom1", node2.hostname
end
+
+ test "update dns when nodemanager clears hostname and ip_address" do
+ act_as_system_user do
+ node = ping_node(:new_with_custom_hostname, {})
+ Node.expects(:dns_server_update).with(node.hostname, Node::UNUSED_NODE_IP)
+ node.update_attributes(hostname: nil, ip_address: nil)
+ end
+ end
+
+ test "update dns when hostname changes" do
+ act_as_system_user do
+ node = ping_node(:new_with_custom_hostname, {})
+
+ Node.expects(:dns_server_update).with(node.hostname, Node::UNUSED_NODE_IP)
+ Node.expects(:dns_server_update).with('foo0', node.ip_address)
+ node.update_attributes!(hostname: 'foo0')
+
+ Node.expects(:dns_server_update).with('foo0', Node::UNUSED_NODE_IP)
+ node.update_attributes!(hostname: nil, ip_address: nil)
+
+ Node.expects(:dns_server_update).with('foo0', '10.11.12.13')
+ node.update_attributes!(hostname: 'foo0', ip_address: '10.11.12.13')
+
+ Node.expects(:dns_server_update).with('foo0', '10.11.12.14')
+ node.update_attributes!(hostname: 'foo0', ip_address: '10.11.12.14')
+ end
+ end
end
if err := srv.Start(); err != nil {
log.Fatal(err)
}
- if _, err := daemon.SdNotify("READY=1"); err != nil {
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
log.Println("Listening at", srv.Addr)
PollInterval: time.Duration(theConfig.PollPeriod),
DoneProcessing: make(chan struct{})}
- if _, err := daemon.SdNotify("READY=1"); err != nil {
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
- if container.RuntimeConstraints.Partition != nil {
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.RuntimeConstraints.Partition, ",")))
+ if container.SchedulingParameters.Partitions != nil {
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
return exec.Command("sbatch", sbatchArgs...)
func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
theConfig.SbatchArguments = nil
- container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1, Partition: []string{"blurb", "b2"}}}
+ container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
sbatchCmd := sbatchFunc(container)
var expected []string
pdhOnly := true
tmpcount := 0
arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+
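+ // Pass the container's keep_cache_ram constraint to arv-mount as its
+ // --file-cache size (in bytes).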
+ if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+ arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+ }
+
collectionPaths := []string{}
runner.Binds = nil
checkEmpty()
}
+ {
+ i = 0
+ cr.Container.RuntimeConstraints.KeepCacheRAM = 512
+ cr.Container.Mounts = map[string]arvados.Mount{
+ "/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
+ "/keepout": {Kind: "collection", Writable: true},
+ }
+ cr.OutputPath = "/keepout"
+
+ os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+ os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
+
+ err := cr.SetupMounts()
+ c.Check(err, IsNil)
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+ sort.StringSlice(cr.Binds).Sort()
+ c.Check(cr.Binds, DeepEquals, []string{realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
+ realTemp + "/keep1/tmp0:/keepout"})
+ cr.CleanupDirs()
+ checkEmpty()
+ }
+
for _, test := range []struct {
in interface{}
out string
[Service]
Type=simple
-ExecStart=/usr/bin/env arvados-docker-cleaner
Restart=always
RestartSec=10s
+RestartPreventExitStatus=2
+#
+# This unwieldy ExecStart command detects at runtime whether
+# arvados-docker-cleaner is installed with the Python 3.3 Software
+# Collection, and if so, invokes it with the "scl" wrapper.
+ExecStart=/bin/sh -c 'if [ -e /opt/rh/python33/root/bin/arvados-docker-cleaner ]; then exec scl enable python33 arvados-docker-cleaner; else exec arvados-docker-cleaner; fi'
[Install]
WantedBy=multi-user.target
import docker
import json
+DEFAULT_CONFIG_FILE = '/etc/arvados/docker-cleaner/docker-cleaner.json'
+
SUFFIX_SIZES = {suffix: 1024 ** exp for exp, suffix in enumerate('kmgt', 1)}
logger = logging.getLogger('arvados_docker.cleaner')
c = json.load(f)
config.update(c)
except (FileNotFoundError, IOError, ValueError) as error:
- sys.exit('error reading config file {}: {}'.format(args.config, error))
+ if (isinstance(error, FileNotFoundError) and
+ args.config == DEFAULT_CONFIG_FILE):
+ logger.warning("DEPRECATED: default config file %s not found; "
+ "relying on command line configuration",
+ repr(DEFAULT_CONFIG_FILE))
+ else:
+ sys.exit('error reading config file {}: {}'.format(
+ args.config, error))
configargs = vars(args).copy()
configargs.pop('config')
formatter_class=Formatter,
)
parser.add_argument(
- '--config', action='store', type=str, default='/etc/arvados/docker-cleaner/docker-cleaner.json',
+ '--config', action='store', type=str, default=DEFAULT_CONFIG_FILE,
help="configuration file")
deprecated = " (DEPRECATED -- use config file instead)"
return parser.parse_args(arguments)
-def setup_logging(config):
+def setup_logging():
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter(
'%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
'%Y-%m-%d %H:%M:%S'))
logger.addHandler(log_handler)
+
+
+def configure_logging(config):
logger.setLevel(logging.ERROR - (10 * config['Verbose']))
def main(arguments=sys.argv[1:]):
+ setup_logging()
config = load_config(arguments)
- setup_logging(config)
+ configure_logging(config)
try:
run(config, docker.Client(version='1.14'))
except KeyboardInterrupt:
],
install_requires=[
'docker-py==1.7.2',
+ 'setuptools',
],
tests_require=[
'pbr<1.7.0',
import collections
import itertools
import json
+import os
import random
import tempfile
import time
self.assertEqual('never', config['RemoveStoppedContainers'])
self.assertEqual(1, config['Verbose'])
+ def test_args_no_config(self):
+ self.assertEqual(False, os.path.exists(cleaner.DEFAULT_CONFIG_FILE))
+ config = cleaner.load_config(['--quota', '1G'])
+ self.assertEqual(1 << 30, config['Quota'])
+
class ContainerRemovalTestCase(unittest.TestCase):
LIFECYCLE = ['create', 'attach', 'start', 'resize', 'die', 'destroy']
return True
def listen_for_events(self):
- self.events = arvados.events.subscribe(self._api_client,
- [["event_type", "in", ["create", "update", "delete"]]],
- self.on_event)
+ self.events = arvados.events.subscribe(
+ self._api_client,
+ [["event_type", "in", ["create", "update", "delete"]]],
+ self.on_event)
@catch_exceptions
def on_event(self, ev):
if err := srv.Start(); err != nil {
log.Fatal(err)
}
- if _, err := daemon.SdNotify("READY=1"); err != nil {
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
log.Println("Listening at", srv.Addr)
if err != nil {
log.Fatalf("listen(%s): %s", cfg.Listen, err)
}
- if _, err := daemon.SdNotify("READY=1"); err != nil {
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
log.Println("Listening at", listener.Addr())
import (
"bytes"
+ "context"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
+ "net/http"
"os"
"regexp"
"strconv"
"sync"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/curoverse/azure-sdk-for-go/storage"
)
+const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
+
var (
azureMaxGetBytes int
azureStorageAccountName string
ContainerName string
AzureReplication int
ReadOnly bool
+ RequestTimeout arvados.Duration
azClient storage.Client
bsClient storage.BlobStorageClient
StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
ContainerName: "example-container-name",
AzureReplication: 3,
+ RequestTimeout: azureDefaultRequestTimeout,
},
}
}
if err != nil {
return fmt.Errorf("creating Azure storage client: %s", err)
}
+
+ if v.RequestTimeout == 0 {
+ v.RequestTimeout = azureDefaultRequestTimeout
+ }
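+ // Apply the configured request timeout to every Azure storage API
+ // call via the HTTP client, so a stalled request cannot hang forever.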
+ v.azClient.HTTPClient = &http.Client{
+ Timeout: time.Duration(v.RequestTimeout),
+ }
v.bsClient = v.azClient.GetBlobService()
ok, err := v.bsClient.ContainerExists(v.ContainerName)
// If the block is younger than azureWriteRaceInterval and is
// unexpectedly empty, assume a PutBlob operation is in progress, and
// wait for it to finish writing.
-func (v *AzureBlobVolume) Get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return 0, err
}
// Compare the given data with existing stored data.
-func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
+func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte) error {
trashed, _, err := v.checkTrashed(loc)
if err != nil {
return err
return v.translateError(err)
}
defer rdr.Close()
- return compareReaderWithBuf(rdr, expect, loc[:32])
+ return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
}
// Put stores a Keep block as a block blob in the container.
-func (v *AzureBlobVolume) Put(loc string, block []byte) error {
+func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
import (
"bytes"
+ "context"
"crypto/md5"
"encoding/base64"
"encoding/xml"
data[i] = byte((i + 7) & 0xff)
}
hash := fmt.Sprintf("%x", md5.Sum(data))
- err := v.Put(hash, data)
+ err := v.Put(context.Background(), hash, data)
if err != nil {
t.Error(err)
}
gotData := make([]byte, len(data))
- gotLen, err := v.Get(hash, gotData)
+ gotLen, err := v.Get(context.Background(), hash, gotData)
if err != nil {
t.Error(err)
}
allDone := make(chan struct{})
v.azHandler.race = make(chan chan struct{})
go func() {
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Error(err)
}
v.azHandler.race <- continuePut
go func() {
buf := make([]byte, len(TestBlock))
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
}
go func() {
defer close(allDone)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Error(err)
return
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io"
return <-outcome
}
-func compareReaderWithBuf(rdr io.Reader, expect []byte, hash string) error {
+func compareReaderWithBuf(ctx context.Context, rdr io.Reader, expect []byte, hash string) error {
bufLen := 1 << 20
if bufLen > len(expect) && len(expect) > 0 {
// No need for bufLen to be longer than
// expected to equal the next N bytes read from
// rdr.
for {
- n, err := rdr.Read(buf)
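+		// Do the read in a goroutine so we can stop waiting as soon as
+		// ctx is cancelled, even if the read itself stalls.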
+ ready := make(chan bool)
+ var n int
+ var err error
+ go func() {
+ n, err = rdr.Read(buf)
+ close(ready)
+ }()
+ select {
+ case <-ready:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
return collisionOrCorrupt(hash, expect[:len(expect)-len(cmp)], buf[:n], rdr)
}
)
type Config struct {
+ Debug bool
Listen string
PIDFile string
blobSigningKey []byte
systemAuthToken string
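+	// debugLogf is a no-op unless debug logging is enabled (see Start).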
+ debugLogf func(string, ...interface{})
}
var theConfig = DefaultConfig()
// Start should be called exactly once: after setting all public
// fields, and before using the config.
func (cfg *Config) Start() error {
+ if cfg.Debug {
+ cfg.debugLogf = log.Printf
+ cfg.debugLogf("debugging enabled")
+ } else {
+ cfg.debugLogf = func(string, ...interface{}) {}
+ }
+
if cfg.MaxBuffers < 0 {
return fmt.Errorf("MaxBuffers must be greater than zero")
}
--- /dev/null
+package main
+
+import (
+ "log"
+)
+
+func init() {
+ theConfig.debugLogf = log.Printf
+}
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"net/http"
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- if err := vols[0].Put(TestHash, TestBlock); err != nil {
+ if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
- vols[1].Put(TestHash2, TestBlock2)
- vols[0].Put(TestHash+".meta", []byte("metadata"))
- vols[1].Put(TestHash2+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), TestHash, TestBlock)
+ vols[1].Put(context.Background(), TestHash2, TestBlock2)
+ vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
+ vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
theConfig.systemAuthToken = "DATA MANAGER TOKEN"
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.Background(), TestHash, TestBlock)
// Explicitly set the BlobSignatureTTL to 0 for these
// tests, to ensure the MockVolume deletes the blocks
}
// Confirm the block has been deleted
buf := make([]byte, BlockSize)
- _, err := vols[0].Get(TestHash, buf)
+ _, err := vols[0].Get(context.Background(), TestHash, buf)
var blockDeleted = os.IsNotExist(err)
if !blockDeleted {
t.Error("superuserExistingBlockReq: block not deleted")
// A DELETE request on a block newer than BlobSignatureTTL
// should return success but leave the block on the volume.
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.Background(), TestHash, TestBlock)
theConfig.BlobSignatureTTL = arvados.Duration(time.Hour)
response = IssueRequest(superuserExistingBlockReq)
expectedDc, responseDc)
}
// Confirm the block has NOT been deleted.
- _, err = vols[0].Get(TestHash, buf)
+ _, err = vols[0].Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("testing delete on new block: %s\n", err)
}
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
- if err := KeepVM.AllWritable()[0].Put(TestHash, TestBlock); err != nil {
+ if err := KeepVM.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- if err := vols[0].Put(TestHash, TestBlock); err != nil {
+ if err := vols[0].Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, TestBlock)
+ vols[0].Put(context.Background(), TestHash, TestBlock)
theConfig.systemAuthToken = "DATA MANAGER TOKEN"
import (
"container/list"
+ "context"
"crypto/md5"
"encoding/json"
"fmt"
// GetBlockHandler is a HandleFunc to address Get block requests.
func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
+ ctx, cancel := contextForResponse(context.TODO(), resp)
+ defer cancel()
+
if theConfig.RequireSignatures {
locator := req.URL.Path[1:] // strip leading slash
if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
// isn't here, we can return 404 now instead of waiting for a
// buffer.
- buf, err := getBufferForResponseWriter(resp, bufs, BlockSize)
+ buf, err := getBufferWithContext(ctx, bufs, BlockSize)
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
defer bufs.Put(buf)
- size, err := GetBlock(mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
resp.Write(buf[:size])
}
+// Return a new context that gets cancelled by resp's CloseNotifier.
+func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
+ ctx, cancel := context.WithCancel(parent)
+ if cn, ok := resp.(http.CloseNotifier); ok {
+ go func(c <-chan bool) {
+ select {
+ case <-c:
+ theConfig.debugLogf("cancel context")
+ cancel()
+ case <-ctx.Done():
+ }
+ }(cn.CloseNotify())
+ }
+ return ctx, cancel
+}
+
// Get a buffer from the pool -- but give up and return a non-nil
-// error if resp implements http.CloseNotifier and tells us that the
-// client has disconnected before we get a buffer.
-func getBufferForResponseWriter(resp http.ResponseWriter, bufs *bufferPool, bufSize int) ([]byte, error) {
- var closeNotifier <-chan bool
- if resp, ok := resp.(http.CloseNotifier); ok {
- closeNotifier = resp.CloseNotify()
- }
- var buf []byte
+// error if ctx ends before we get a buffer.
+func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
bufReady := make(chan []byte)
go func() {
bufReady <- bufs.Get(bufSize)
- close(bufReady)
}()
select {
- case buf = <-bufReady:
+ case buf := <-bufReady:
return buf, nil
- case <-closeNotifier:
+ case <-ctx.Done():
go func() {
			// Even if the context was cancelled first, we
// need to keep waiting for our buf so we can
// PutBlockHandler is a HandleFunc to address Put block requests.
func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
+ ctx, cancel := contextForResponse(context.TODO(), resp)
+ defer cancel()
+
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
return
}
- buf, err := getBufferForResponseWriter(resp, bufs, int(req.ContentLength))
+ buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
return
}
- replication, err := PutBlock(buf, hash)
+ replication, err := PutBlock(ctx, buf, hash)
bufs.Put(buf)
if err != nil {
- ke := err.(*KeepError)
- http.Error(resp, ke.Error(), ke.HTTPCode)
+ code := http.StatusInternalServerError
+ if err, ok := err.(*KeepError); ok {
+ code = err.HTTPCode
+ }
+ http.Error(resp, err.Error(), code)
return
}
// If the block found does not have the correct MD5 hash, returns
// DiskHashError.
//
-func GetBlock(hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
for _, vol := range KeepVM.AllReadable() {
- size, err := vol.Get(hash, buf)
+ size, err := vol.Get(ctx, hash, buf)
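+		// If the client disconnected mid-read, report that instead of
+		// whatever Get returned.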
+ select {
+ case <-ctx.Done():
+ return 0, ErrClientDisconnect
+ default:
+ }
if err != nil {
// IsNotExist is an expected error and may be
// ignored. All other errors are logged. In
// PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
//
-// PutBlock(block, hash)
+// PutBlock(ctx, block, hash)
// Stores the BLOCK (identified by the content id HASH) in Keep.
//
// The MD5 checksum of the block must be identical to the content id HASH.
// all writes failed). The text of the error message should
// provide as much detail as possible.
//
-func PutBlock(block []byte, hash string) (int, error) {
+func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
// If we already have this data, it's intact on disk, and we
// can update its timestamp, return success. If we have
// different data with the same hash, return failure.
- if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
+ if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
return n, err
+ } else if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if vol := KeepVM.NextWritable(); vol != nil {
- if err := vol.Put(hash, block); err == nil {
+ if err := vol.Put(ctx, hash, block); err == nil {
return vol.Replication(), nil // success!
}
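+		// Don't bother trying the remaining volumes if the client has
+		// already disconnected.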
+ if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
+ }
}
writables := KeepVM.AllWritable()
allFull := true
for _, vol := range writables {
- err := vol.Put(hash, block)
+ err := vol.Put(ctx, hash, block)
+ if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
+ }
if err == nil {
return vol.Replication(), nil // success!
}
// the relevant block's modification time in order to protect it from
// premature garbage collection. Otherwise, it returns a non-nil
// error.
-func CompareAndTouch(hash string, buf []byte) (int, error) {
+func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
var bestErr error = NotFoundError
for _, vol := range KeepVM.AllWritable() {
- if err := vol.Compare(hash, buf); err == CollisionError {
+ err := vol.Compare(ctx, hash, buf)
+ if ctx.Err() != nil {
+ return 0, ctx.Err()
+ } else if err == CollisionError {
// Stop if we have a block with same hash but
// different content. (It will be impossible
// to tell which one is wanted if we have
import (
"bytes"
+ "context"
)
// A TestableVolumeManagerFactory creates a volume manager with at least two TestableVolume instances.
// Get should pass
buf := make([]byte, len(testBlock))
- n, err := GetBlock(testHash, buf, nil)
+ n, err := GetBlock(context.Background(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error while getting block %s", err)
}
// Get should fail
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.Background(), testHash, buf, nil)
if err == nil {
t.Fatalf("Got %+q, expected error while getting corrupt block %v", buf[:size], testHash)
}
setupHandlersWithGenericVolumeTest(t, factory)
// PutBlock
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
// Check that PutBlock succeeds again even after CompareAndTouch
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock: %s", err)
}
// Check that PutBlock stored the data as expected
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.Background(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
} else if bytes.Compare(buf[:size], testBlock) != 0 {
testableVolumes[1].PutRaw(testHash, badData)
// Check that PutBlock with good data succeeds
- if _, err := PutBlock(testBlock, testHash); err != nil {
+ if _, err := PutBlock(context.Background(), testBlock, testHash); err != nil {
t.Fatalf("Error during PutBlock for %q: %s", testHash, err)
}
// Put succeeded and overwrote the badData in one volume,
// and Get should return the testBlock now, ignoring the bad data.
buf := make([]byte, BlockSize)
- size, err := GetBlock(testHash, buf, nil)
+ size, err := GetBlock(context.Background(), testHash, buf, nil)
if err != nil {
t.Fatalf("Error during GetBlock for %q: %s", testHash, err)
} else if bytes.Compare(buf[:size], testBlock) != 0 {
signal.Notify(term, syscall.SIGTERM)
signal.Notify(term, syscall.SIGINT)
- if _, err := daemon.SdNotify("READY=1"); err != nil {
+ if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
log.Println("listening at", listener.Addr())
import (
"bytes"
+ "context"
"fmt"
"io/ioutil"
"os"
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- if err := vols[1].Put(TestHash, TestBlock); err != nil {
+ if err := vols[1].Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
// Check that GetBlock returns success.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.Background(), TestHash, buf, nil)
if err != nil {
t.Errorf("GetBlock error: %s", err)
}
// Check that GetBlock returns failure.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.Background(), TestHash, buf, nil)
if err != NotFoundError {
t.Errorf("Expected NotFoundError, got %v, err %v", buf[:size], err)
}
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- vols[0].Put(TestHash, BadBlock)
+ vols[0].Put(context.Background(), TestHash, BadBlock)
// Check that GetBlock returns failure.
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.Background(), TestHash, buf, nil)
if err != DiskHashError {
t.Errorf("Expected DiskHashError, got %v (buf: %v)", err, buf[:size])
}
defer KeepVM.Close()
// Check that PutBlock stores the data as expected.
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols := KeepVM.AllReadable()
buf := make([]byte, BlockSize)
- n, err := vols[1].Get(TestHash, buf)
+ n, err := vols[1].Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatalf("Volume #0 Get returned error: %v", err)
}
vols[0].(*MockVolume).Bad = true
// Check that PutBlock stores the data as expected.
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
buf := make([]byte, BlockSize)
- size, err := GetBlock(TestHash, buf, nil)
+ size, err := GetBlock(context.Background(), TestHash, buf, nil)
if err != nil {
t.Fatalf("GetBlock: %v", err)
}
// Check that PutBlock returns the expected error when the hash does
// not match the block.
- if _, err := PutBlock(BadBlock, TestHash); err != RequestHashError {
+ if _, err := PutBlock(context.Background(), BadBlock, TestHash); err != RequestHashError {
t.Errorf("Expected RequestHashError, got %v", err)
}
// Confirm that GetBlock fails to return anything.
- if result, err := GetBlock(TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
+ if result, err := GetBlock(context.Background(), TestHash, make([]byte, BlockSize), nil); err != NotFoundError {
t.Errorf("GetBlock succeeded after a corrupt block store (result = %s, err = %v)",
string(result), err)
}
// Store a corrupted block under TestHash.
vols := KeepVM.AllWritable()
- vols[0].Put(TestHash, BadBlock)
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ vols[0].Put(context.Background(), TestHash, BadBlock)
+ if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
t.Errorf("PutBlock: n %d err %v", n, err)
}
// The block on disk should now match TestBlock.
buf := make([]byte, BlockSize)
- if size, err := GetBlock(TestHash, buf, nil); err != nil {
+ if size, err := GetBlock(context.Background(), TestHash, buf, nil); err != nil {
t.Errorf("GetBlock: %v", err)
} else if bytes.Compare(buf[:size], TestBlock) != 0 {
t.Errorf("Got %+q, expected %+q", buf[:size], TestBlock)
// Store one block, then attempt to store the other. Confirm that
// PutBlock reported a CollisionError.
- if _, err := PutBlock(b1, locator); err != nil {
+ if _, err := PutBlock(context.Background(), b1, locator); err != nil {
t.Error(err)
}
- if _, err := PutBlock(b2, locator); err == nil {
+ if _, err := PutBlock(context.Background(), b2, locator); err == nil {
t.Error("PutBlock did not report a collision")
} else if err != CollisionError {
t.Errorf("PutBlock returned %v", err)
// Store a block and then make the underlying volume bad,
// so a subsequent attempt to update the file timestamp
// will fail.
- vols[0].Put(TestHash, BadBlock)
+ vols[0].Put(context.Background(), TestHash, BadBlock)
oldMtime, err := vols[0].Mtime(TestHash)
if err != nil {
t.Fatalf("vols[0].Mtime(%s): %s\n", TestHash, err)
// vols[0].Touch will fail on the next call, so the volume
// manager will store a copy on vols[1] instead.
vols[0].(*MockVolume).Touchable = false
- if n, err := PutBlock(TestBlock, TestHash); err != nil || n < 1 {
+ if n, err := PutBlock(context.Background(), TestBlock, TestHash); err != nil || n < 1 {
t.Fatalf("PutBlock: n %d err %v", n, err)
}
vols[0].(*MockVolume).Touchable = true
oldMtime, newMtime)
}
buf := make([]byte, BlockSize)
- n, err := vols[1].Get(TestHash, buf)
+ n, err := vols[1].Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatalf("vols[1]: %v", err)
}
defer KeepVM.Close()
vols := KeepVM.AllReadable()
- vols[0].Put(TestHash, TestBlock)
- vols[1].Put(TestHash2, TestBlock2)
- vols[0].Put(TestHash3, TestBlock3)
- vols[0].Put(TestHash+".meta", []byte("metadata"))
- vols[1].Put(TestHash2+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), TestHash, TestBlock)
+ vols[1].Put(context.Background(), TestHash2, TestBlock2)
+ vols[0].Put(context.Background(), TestHash3, TestBlock3)
+ vols[0].Put(context.Background(), TestHash+".meta", []byte("metadata"))
+ vols[1].Put(context.Background(), TestHash2+".meta", []byte("metadata"))
buf := new(bytes.Buffer)
vols[0].IndexTo("", buf)
package main
import (
+ "context"
"crypto/rand"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
// Put block
var PutContent = func(content []byte, locator string) (err error) {
- _, err = PutBlock(content, locator)
+ _, err = PutBlock(context.Background(), content, locator)
return
}
package main
import (
+ "bytes"
+ "context"
"encoding/base64"
"encoding/hex"
"flag"
"fmt"
"io"
+ "io/ioutil"
"log"
"net/http"
"os"
"github.com/AdRoll/goamz/s3"
)
+const (
+ s3DefaultReadTimeout = arvados.Duration(10 * time.Minute)
+ s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+)
+
var (
// ErrS3TrashDisabled is returned by Trash if that operation
// is impossible with the current config.
LocationConstraint bool
IndexPageSize int
S3Replication int
+ ConnectTimeout arvados.Duration
+ ReadTimeout arvados.Duration
RaceWindow arvados.Duration
ReadOnly bool
UnsafeDelete bool
func (*S3Volume) Examples() []Volume {
return []Volume{
&S3Volume{
- AccessKeyFile: "/etc/aws_s3_access_key.txt",
- SecretKeyFile: "/etc/aws_s3_secret_key.txt",
- Endpoint: "",
- Region: "us-east-1",
- Bucket: "example-bucket-name",
- IndexPageSize: 1000,
- S3Replication: 2,
- RaceWindow: arvados.Duration(24 * time.Hour),
+ AccessKeyFile: "/etc/aws_s3_access_key.txt",
+ SecretKeyFile: "/etc/aws_s3_secret_key.txt",
+ Endpoint: "",
+ Region: "us-east-1",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
+ ConnectTimeout: arvados.Duration(time.Minute),
+ ReadTimeout: arvados.Duration(5 * time.Minute),
},
&S3Volume{
- AccessKeyFile: "/etc/gce_s3_access_key.txt",
- SecretKeyFile: "/etc/gce_s3_secret_key.txt",
- Endpoint: "https://storage.googleapis.com",
- Region: "",
- Bucket: "example-bucket-name",
- IndexPageSize: 1000,
- S3Replication: 2,
- RaceWindow: arvados.Duration(24 * time.Hour),
+ AccessKeyFile: "/etc/gce_s3_access_key.txt",
+ SecretKeyFile: "/etc/gce_s3_secret_key.txt",
+ Endpoint: "https://storage.googleapis.com",
+ Region: "",
+ Bucket: "example-bucket-name",
+ IndexPageSize: 1000,
+ S3Replication: 2,
+ RaceWindow: arvados.Duration(24 * time.Hour),
+ ConnectTimeout: arvados.Duration(time.Minute),
+ ReadTimeout: arvados.Duration(5 * time.Minute),
},
}
}
if err != nil {
return err
}
+
+ // Zero timeouts mean "wait forever", which is a bad
+ // default. Default to long timeouts instead.
+ if v.ConnectTimeout == 0 {
+ v.ConnectTimeout = s3DefaultConnectTimeout
+ }
+ if v.ReadTimeout == 0 {
+ v.ReadTimeout = s3DefaultReadTimeout
+ }
+
+ client := s3.New(auth, region)
+ client.ConnectTimeout = time.Duration(v.ConnectTimeout)
+ client.ReadTimeout = time.Duration(v.ReadTimeout)
v.bucket = &s3.Bucket{
- S3: s3.New(auth, region),
+ S3: client,
Name: v.Bucket,
}
return nil
}
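+// getReaderWithContext wraps getReader: if ctx is done first, it
+// returns ctx.Err() right away and closes the abandoned reader
+// whenever it eventually arrives.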
+func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+ ready := make(chan bool)
+ go func() {
+ rdr, err = v.getReader(loc)
+ close(ready)
+ }()
+ select {
+ case <-ready:
+ return
+ case <-ctx.Done():
+ theConfig.debugLogf("s3: abandoning getReader(): %s", ctx.Err())
+ go func() {
+ <-ready
+ if err == nil {
+ rdr.Close()
+ }
+ }()
+ return nil, ctx.Err()
+ }
+}
+
// getReader wraps (Bucket)GetReader.
//
// In situations where (Bucket)GetReader would fail because the block
// Get a block: copy the block data into buf, and return the number of
// bytes copied.
-func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
- rdr, err := v.getReader(loc)
+func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
+ rdr, err := v.getReaderWithContext(ctx, loc)
if err != nil {
return 0, err
}
- defer rdr.Close()
- n, err := io.ReadFull(rdr, buf)
- switch err {
- case nil, io.EOF, io.ErrUnexpectedEOF:
- return n, nil
- default:
- return 0, v.translateError(err)
+
+ var n int
+ ready := make(chan bool)
+ go func() {
+ defer close(ready)
+
+ defer rdr.Close()
+ n, err = io.ReadFull(rdr, buf)
+
+ switch err {
+ case nil, io.EOF, io.ErrUnexpectedEOF:
+ err = nil
+ default:
+ err = v.translateError(err)
+ }
+ }()
+ select {
+ case <-ctx.Done():
+ theConfig.debugLogf("s3: interrupting ReadFull() with Close() because %s", ctx.Err())
+ rdr.Close()
+ // Must wait for ReadFull to return, to ensure it
+ // doesn't write to buf after we return.
+ theConfig.debugLogf("s3: waiting for ReadFull() to fail")
+ <-ready
+ return 0, ctx.Err()
+ case <-ready:
+ return n, err
}
}
// Compare the given data with the stored data.
-func (v *S3Volume) Compare(loc string, expect []byte) error {
- rdr, err := v.getReader(loc)
+func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+ rdr, err := v.getReaderWithContext(ctx, loc)
if err != nil {
return err
}
defer rdr.Close()
- return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
+ return v.translateError(compareReaderWithBuf(ctx, rdr, expect, loc[:32]))
}
// Put writes a block.
-func (v *S3Volume) Put(loc string, block []byte) error {
+func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
var opts s3.Options
- if len(block) > 0 {
+ size := len(block)
+ if size > 0 {
md5, err := hex.DecodeString(loc)
if err != nil {
return err
}
opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
}
- err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
- if err != nil {
+
+ // Send the block data through a pipe, so that (if we need to)
+ // we can close the pipe early and abandon our PutReader()
+ // goroutine, without worrying about PutReader() accessing our
+ // block buffer after we release it.
+ bufr, bufw := io.Pipe()
+ go func() {
+ io.Copy(bufw, bytes.NewReader(block))
+ bufw.Close()
+ }()
+
+ var err error
+ ready := make(chan bool)
+ go func() {
+ defer func() {
+ if ctx.Err() != nil {
+ theConfig.debugLogf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+ }
+ }()
+ defer close(ready)
+ err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+ if err != nil {
+ return
+ }
+ err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ }()
+ select {
+ case <-ctx.Done():
+ theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
+ // Our pipe might be stuck in Write(), waiting for
+ // io.Copy() to read. If so, un-stick it. This means
+ // PutReader will get corrupt data, but that's OK: the
+ // size and MD5 won't match, so the write will fail.
+ go io.Copy(ioutil.Discard, bufr)
+ // CloseWithError() will return once pending I/O is done.
+ bufw.CloseWithError(ctx.Err())
+ theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
+ return ctx.Err()
+ case <-ready:
return v.translateError(err)
}
- err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
- return v.translateError(err)
}
// Touch sets the timestamp for the given locator to the current time.
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"io/ioutil"
// Check canGet
loc, blk := setupScenario()
buf := make([]byte, len(blk))
- _, err := v.Get(loc, buf)
+ _, err := v.Get(context.Background(), loc, buf)
c.Check(err == nil, check.Equals, scenario.canGet)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
loc, blk = setupScenario()
err = v.Trash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
- _, err = v.Get(loc, buf)
+ _, err = v.Get(context.Background(), loc, buf)
c.Check(err == nil, check.Equals, scenario.canGetAfterTrash)
if err != nil {
c.Check(os.IsNotExist(err), check.Equals, true)
// should be able to Get after Untrash --
// regardless of timestamps, errors, race
// conditions, etc.
- _, err = v.Get(loc, buf)
+ _, err = v.Get(context.Background(), loc, buf)
c.Check(err, check.IsNil)
}
// Check for current Mtime after Put (applies to all
// scenarios)
loc, blk = setupScenario()
- err = v.Put(loc, blk)
+ err = v.Put(context.Background(), loc, blk)
c.Check(err, check.IsNil)
t, err := v.Mtime(loc)
c.Check(err, check.IsNil)
import (
"container/list"
+ "context"
"testing"
"time"
)
// Put test content
vols := KeepVM.AllWritable()
if testData.CreateData {
- vols[0].Put(testData.Locator1, testData.Block1)
- vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
+ vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
if testData.CreateInVolume1 {
- vols[0].Put(testData.Locator2, testData.Block2)
- vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
+ vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
} else {
- vols[1].Put(testData.Locator2, testData.Block2)
- vols[1].Put(testData.Locator2+".meta", []byte("metadata"))
+ vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
+ vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
}
}
// Verify Locator1 to be un/deleted as expected
buf := make([]byte, BlockSize)
- size, err := GetBlock(testData.Locator1, buf, nil)
+ size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
if testData.ExpectLocator1 {
if size == 0 || err != nil {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- size, err = GetBlock(testData.Locator2, buf, nil)
+ size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
if testData.ExpectLocator2 {
if size == 0 || err != nil {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
locatorFoundIn := 0
for _, volume := range KeepVM.AllReadable() {
buf := make([]byte, BlockSize)
- if _, err := volume.Get(testData.Locator1, buf); err == nil {
+ if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
}
package main
import (
+ "context"
"io"
"sync/atomic"
"time"
// any of the data.
//
// len(buf) will not exceed BlockSize.
- Get(loc string, buf []byte) (int, error)
+ Get(ctx context.Context, loc string, buf []byte) (int, error)
// Compare the given data with the stored data (i.e., what Get
// would return). If equal, return nil. If not, return
// CollisionError or DiskHashError (depending on whether the
// data on disk matches the expected hash), or whatever error
// was encountered opening/reading the stored data.
- Compare(loc string, data []byte) error
+ Compare(ctx context.Context, loc string, data []byte) error
// Put writes a block to an underlying storage device.
//
//
// Put should not verify that loc==hash(block): this is the
// caller's responsibility.
- Put(loc string, block []byte) error
+ Put(ctx context.Context, loc string, block []byte) error
// Touch sets the timestamp for the given locator to the
// current time.
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"os"
v.PutRaw(TestHash, TestBlock)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
defer v.Teardown()
buf := make([]byte, BlockSize)
- if _, err := v.Get(TestHash2, buf); err == nil {
+ if _, err := v.Get(context.Background(), TestHash2, buf); err == nil {
t.Errorf("Expected error while getting non-existing block %v", TestHash2)
}
}
v := factory(t)
defer v.Teardown()
- err := v.Compare(TestHash, TestBlock)
+ err := v.Compare(context.Background(), TestHash, TestBlock)
if err != os.ErrNotExist {
t.Errorf("Got err %T %q, expected os.ErrNotExist", err, err)
}
v.PutRaw(testHash, testData)
// Compare the block locator with same content
- err := v.Compare(testHash, testData)
+ err := v.Compare(context.Background(), testHash, testData)
if err != nil {
t.Errorf("Got err %q, expected nil", err)
}
v.PutRaw(testHash, testDataA)
// Compare the block locator with different content; collision
- err := v.Compare(TestHash, testDataB)
+ err := v.Compare(context.Background(), TestHash, testDataB)
if err == nil {
t.Errorf("Got err nil, expected error due to collision")
}
v.PutRaw(TestHash, testDataB)
- err := v.Compare(testHash, testDataA)
+ err := v.Compare(context.Background(), testHash, testDataA)
if err == nil || err == CollisionError {
t.Errorf("Got err %+v, expected non-collision error", err)
}
return
}
- err := v.Put(testHash, testData)
+ err := v.Put(context.Background(), testHash, testData)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
}
- err = v.Put(testHash, testData)
+ err = v.Put(context.Background(), testHash, testData)
if err != nil {
t.Errorf("Got err putting block second time %q: %q, expected nil", TestBlock, err)
}
v.PutRaw(testHash, testDataA)
- putErr := v.Put(testHash, testDataB)
+ putErr := v.Put(context.Background(), testHash, testDataB)
buf := make([]byte, BlockSize)
- n, getErr := v.Get(testHash, buf)
+ n, getErr := v.Get(context.Background(), testHash, buf)
if putErr == nil {
// Put must not return a nil error unless it has
// overwritten the existing data.
return
}
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
}
- err = v.Put(TestHash2, TestBlock2)
+ err = v.Put(context.Background(), TestHash2, TestBlock2)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock2, err)
}
- err = v.Put(TestHash3, TestBlock3)
+ err = v.Put(context.Background(), TestHash3, TestBlock3)
if err != nil {
t.Errorf("Got err putting block %q: %q, expected nil", TestBlock3, err)
}
data := make([]byte, BlockSize)
- n, err := v.Get(TestHash, data)
+ n, err := v.Get(context.Background(), TestHash, data)
if err != nil {
t.Error(err)
} else {
}
}
- n, err = v.Get(TestHash2, data)
+ n, err = v.Get(context.Background(), TestHash2, data)
if err != nil {
t.Error(err)
} else {
}
}
- n, err = v.Get(TestHash3, data)
+ n, err = v.Get(context.Background(), TestHash3, data)
if err != nil {
t.Error(err)
} else {
return
}
- if err := v.Put(TestHash, TestBlock); err != nil {
+ if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
}
// Write the same block again.
- if err := v.Put(TestHash, TestBlock); err != nil {
+ if err := v.Put(context.Background(), TestHash, TestBlock); err != nil {
t.Error(err)
}
return
}
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data := make([]byte, BlockSize)
- n, err := v.Get(TestHash, data)
+ n, err := v.Get(context.Background(), TestHash, data)
if err != nil {
t.Error(err)
} else if bytes.Compare(data[:n], TestBlock) != 0 {
return
}
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
if err := v.Trash(TestHash); err != nil {
t.Error(err)
}
data := make([]byte, BlockSize)
- if _, err := v.Get(TestHash, data); err == nil || !os.IsNotExist(err) {
+ if _, err := v.Get(context.Background(), TestHash, data); err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
t.Fatalf("os.IsNotExist(%v) should have been true", err)
}
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.Background(), TestHash, TestBlock)
if err == nil || !os.IsNotExist(err) {
t.Fatalf("os.IsNotExist(%v) should have been true", err)
}
buf := make([]byte, BlockSize)
// Get from read-only volume should succeed
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
// Put a new block to read-only volume should result in error
- err = v.Put(TestHash2, TestBlock2)
+ err = v.Put(context.Background(), TestHash2, TestBlock2)
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
- _, err = v.Get(TestHash2, buf)
+ _, err = v.Get(context.Background(), TestHash2, buf)
if err == nil {
t.Errorf("Expected error when getting block whose put in read-only volume failed")
}
}
// Overwriting an existing block in read-only volume should result in error
- err = v.Put(TestHash, TestBlock)
+ err = v.Put(context.Background(), TestHash, TestBlock)
if err == nil {
t.Errorf("Expected error when putting block in a read-only volume")
}
sem := make(chan int)
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("err1: %v", err)
}
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash2, buf)
+ n, err := v.Get(context.Background(), TestHash2, buf)
if err != nil {
t.Errorf("err2: %v", err)
}
go func() {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash3, buf)
+ n, err := v.Get(context.Background(), TestHash3, buf)
if err != nil {
t.Errorf("err3: %v", err)
}
sem := make(chan int)
go func(sem chan int) {
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Errorf("err1: %v", err)
}
}(sem)
go func(sem chan int) {
- err := v.Put(TestHash2, TestBlock2)
+ err := v.Put(context.Background(), TestHash2, TestBlock2)
if err != nil {
t.Errorf("err2: %v", err)
}
}(sem)
go func(sem chan int) {
- err := v.Put(TestHash3, TestBlock3)
+ err := v.Put(context.Background(), TestHash3, TestBlock3)
if err != nil {
t.Errorf("err3: %v", err)
}
// Double check that we actually wrote the blocks we expected to write.
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("Get #1: %v", err)
}
t.Errorf("Get #1: expected %s, got %s", string(TestBlock), string(buf[:n]))
}
- n, err = v.Get(TestHash2, buf)
+ n, err = v.Get(context.Background(), TestHash2, buf)
if err != nil {
t.Errorf("Get #2: %v", err)
}
t.Errorf("Get #2: expected %s, got %s", string(TestBlock2), string(buf[:n]))
}
- n, err = v.Get(TestHash3, buf)
+ n, err = v.Get(context.Background(), TestHash3, buf)
if err != nil {
t.Errorf("Get #3: %v", err)
}
wdata[0] = 'a'
wdata[BlockSize-1] = 'z'
hash := fmt.Sprintf("%x", md5.Sum(wdata))
- err := v.Put(hash, wdata)
+ err := v.Put(context.Background(), hash, wdata)
if err != nil {
t.Fatal(err)
}
buf := make([]byte, BlockSize)
- n, err := v.Get(hash, buf)
+ n, err := v.Get(context.Background(), hash, buf)
if err != nil {
t.Error(err)
}
v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
t.Fatal(err)
}
} else {
- _, err = v.Get(TestHash, buf)
+ _, err = v.Get(context.Background(), TestHash, buf)
if err == nil || !os.IsNotExist(err) {
t.Errorf("os.IsNotExist(%v) should have been true", err)
}
}
// Get the block - after trash and untrash sequence
- n, err = v.Get(TestHash, buf)
+ n, err = v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Fatal(err)
}
checkGet := func() error {
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash, buf)
+ n, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
return err
}
return err
}
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.Background(), TestHash, TestBlock)
if err != nil {
return err
}
import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
}
}
-func (v *MockVolume) Compare(loc string, buf []byte) error {
+func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
v.gotCall("Compare")
<-v.Gate
if v.Bad {
}
}
-func (v *MockVolume) Get(loc string, buf []byte) (int, error) {
+func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
v.gotCall("Get")
<-v.Gate
if v.Bad {
return 0, os.ErrNotExist
}
-func (v *MockVolume) Put(loc string, block []byte) error {
+func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
v.gotCall("Put")
<-v.Gate
if v.Bad {
import (
"bufio"
+ "context"
"flag"
"fmt"
"io"
// Lock the locker (if one is in use), open the file for reading, and
// call the given function if and when the file is ready to read.
-func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
+func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
if v.locker != nil {
v.locker.Lock()
defer v.locker.Unlock()
}
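+	// Skip opening the file if the request was cancelled while we
+	// were waiting for the lock.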
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
f, err := os.Open(path)
if err != nil {
return err
// Get retrieves a block, copies it to the given slice, and returns
// the number of bytes copied.
-func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
+func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
path := v.blockPath(loc)
stat, err := v.stat(path)
if err != nil {
}
var read int
size := int(stat.Size())
- err = v.getFunc(path, func(rdr io.Reader) error {
+ err = v.getFunc(ctx, path, func(rdr io.Reader) error {
read, err = io.ReadFull(rdr, buf[:size])
return err
})
// Compare returns nil if Get(loc) would return the same content as
// expect. It is functionally equivalent to Get() followed by
// bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(loc string, expect []byte) error {
+func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
path := v.blockPath(loc)
if _, err := v.stat(path); err != nil {
return v.translateError(err)
}
- return v.getFunc(path, func(rdr io.Reader) error {
- return compareReaderWithBuf(rdr, expect, loc[:32])
+ return v.getFunc(ctx, path, func(rdr io.Reader) error {
+ return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
})
}
// "loc". It returns nil on success. If the volume is full, it
// returns a FullError. If the write fails due to some other error,
// that error is returned.
-func (v *UnixVolume) Put(loc string, block []byte) error {
+func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
if v.ReadOnly {
return MethodDisabledError
}
v.locker.Lock()
defer v.locker.Unlock()
}
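+	// Give up before writing if the request was cancelled while we
+	// were waiting for the lock.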
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
if _, err := tmpfile.Write(block); err != nil {
log.Printf("%s: writing to %s: %s\n", v, bpath, err)
tmpfile.Close()
import (
"bytes"
+ "context"
"errors"
"fmt"
"io"
v.ReadOnly = orig
}(v.ReadOnly)
v.ReadOnly = false
- err := v.Put(locator, data)
+ err := v.Put(context.Background(), locator, data)
if err != nil {
v.t.Fatal(err)
}
func TestGetNotFound(t *testing.T) {
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
buf := make([]byte, BlockSize)
- n, err := v.Get(TestHash2, buf)
+ n, err := v.Get(context.Background(), TestHash2, buf)
switch {
case os.IsNotExist(err):
break
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err != nil {
t.Error(err)
}
defer v.Teardown()
os.Chmod(v.Root, 000)
- err := v.Put(TestHash, TestBlock)
+ err := v.Put(context.Background(), TestHash, TestBlock)
if err == nil {
t.Error("Write should have failed")
}
v.PutRaw(TestHash, TestBlock)
buf := make([]byte, BlockSize)
- _, err := v.Get(TestHash, buf)
+ _, err := v.Get(context.Background(), TestHash, buf)
if err != nil {
t.Errorf("got err %v, expected nil", err)
}
- err = v.Put(TestHash, TestBlock)
+ err = v.Put(context.Background(), TestHash, TestBlock)
if err != MethodDisabledError {
t.Errorf("got err %v, expected MethodDisabledError", err)
}
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
mockErr := errors.New("Mock error")
- err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
return mockErr
})
if err != mockErr {
defer v.Teardown()
funcCalled := false
- err := v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ err := v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
funcCalled = true
return nil
})
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
mtx := NewMockMutex()
v.locker = mtx
funcCalled := make(chan struct{})
- go v.getFunc(v.blockPath(TestHash), func(rdr io.Reader) error {
+ go v.getFunc(context.Background(), v.blockPath(TestHash), func(rdr io.Reader) error {
funcCalled <- struct{}{}
return nil
})
v := NewTestableUnixVolume(t, false, false)
defer v.Teardown()
- v.Put(TestHash, TestBlock)
- err := v.Compare(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, TestBlock)
+ err := v.Compare(context.Background(), TestHash, TestBlock)
if err != nil {
t.Errorf("Got err %q, expected nil", err)
}
- err = v.Compare(TestHash, []byte("baddata"))
+ err = v.Compare(context.Background(), TestHash, []byte("baddata"))
if err != CollisionError {
t.Errorf("Got err %q, expected %q", err, CollisionError)
}
- v.Put(TestHash, []byte("baddata"))
- err = v.Compare(TestHash, TestBlock)
+ v.Put(context.Background(), TestHash, []byte("baddata"))
+ err = v.Compare(context.Background(), TestHash, TestBlock)
if err != DiskHashError {
t.Errorf("Got err %q, expected %q", err, DiskHashError)
}
p := fmt.Sprintf("%s/%s/%s", v.Root, TestHash[:3], TestHash)
os.Chmod(p, 000)
- err = v.Compare(TestHash, TestBlock)
+ err = v.Compare(context.Background(), TestHash, TestBlock)
if err == nil || strings.Index(err.Error(), "permission denied") < 0 {
t.Errorf("Got err %q, expected %q", err, "permission denied")
}
echo "Could not find Dockerfile (expected it at $ARVBOX_DOCKER/Dockerfile.base)"
exit 1
fi
- GITHEAD=$(cd $ARVBOX_DOCKER && git log --format=%H -n1 HEAD)
- docker build --build-arg=arvados_version=$GITHEAD $NO_CACHE -t arvados/arvbox-base:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
if docker --version |grep " 1\.[0-9]\." ; then
        # Docker versions prior to 1.10 require the -f flag;
        # the -f flag was removed in Docker 1.12.
FORCE=-f
fi
+ GITHEAD=$(cd $ARVBOX_DOCKER && git log --format=%H -n1 HEAD)
+ docker build --build-arg=arvados_version=$GITHEAD $NO_CACHE -t arvados/arvbox-base:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
+ docker tag $FORCE arvados/arvbox-base:$GITHEAD arvados/arvbox-base:latest
if test "$1" = localdemo -o "$1" = publicdemo ; then
docker build $NO_CACHE -t arvados/arvbox-demo:$GITHEAD -f "$ARVBOX_DOCKER/Dockerfile.demo" "$ARVBOX_DOCKER"
docker tag $FORCE arvados/arvbox-demo:$GITHEAD arvados/arvbox-demo:latest
export CRUNCH_DISPATCH_LOCKFILE=/var/lock/$1-dispatch
export CRUNCH_JOB_DOCKER_BIN=docker
export HOME=/tmp/$1
+export CRUNCH_JOB_DOCKER_RUN_ARGS=--net=host
cd /usr/src/arvados/services/api
if test "$1" = "crunch0" ; then
if err != nil {
log.Fatal(err)
}
- kc, err := keepclient.MakeKeepClient(&arv)
+ kc, err := keepclient.MakeKeepClient(arv)
if err != nil {
log.Fatal(err)
}
overrideServices(kc)
- nextBuf := make(chan []byte, *WriteThreads)
nextLocator := make(chan string, *ReadThreads+*WriteThreads)
go countBeans(nextLocator)
for i := 0; i < *WriteThreads; i++ {
+ nextBuf := make(chan []byte, 1)
go makeBufs(nextBuf, i)
go doWrites(kc, nextBuf, nextLocator)
}
}
}
-func makeBufs(nextBuf chan []byte, threadID int) {
+func makeBufs(nextBuf chan<- []byte, threadID int) {
buf := make([]byte, *BlockSize)
if *VaryThread {
binary.PutVarint(buf, int64(threadID))
}
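+	// Re-randomize only the first 512 KiB (or the whole buffer, if
+	// smaller) on each iteration; this varies the data without
+	// regenerating the entire buffer.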
+ randSize := 524288
+ if randSize > *BlockSize {
+ randSize = *BlockSize
+ }
for {
if *VaryRequest {
- buf = make([]byte, *BlockSize)
- if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+ rnd := make([]byte, randSize)
+ if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
log.Fatal(err)
}
+ buf = append(rnd, buf[randSize:]...)
}
nextBuf <- buf
}
}
-func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string) {
for buf := range nextBuf {
locator, _, err := kc.PutB(buf)
if err != nil {
}
}
-func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) {
for locator := range nextLocator {
rdr, size, url, err := kc.Get(locator)
if err != nil {