pane_list = []
procs = ["arvados#containerRequest"]
+ procs_pane_name = 'Processes'
if PipelineInstance.api_exists?(:index)
procs << "arvados#pipelineInstance"
+ procs_pane_name = 'Pipelines_and_processes'
end
workflows = ["arvados#workflow"]
}
pane_list <<
{
- :name => 'Pipelines_and_processes',
+ :name => procs_pane_name,
:filters => [%w(uuid is_a) + [procs]]
}
pane_list <<
preload_links_for_objects(collection_pdhs + collection_uuids)
%>
+<%
+ if !PipelineInstance.api_exists?(:index)
+ recent_procs_title = 'Recent processes'
+ run_proc_title = 'Choose a workflow to run:'
+ else
+ recent_procs_title = 'Recent pipelines and processes'
+ run_proc_title = 'Choose a pipeline or workflow to run:'
+ end
+%>
+
<div class="row">
<div class="col-md-6">
<div class="panel panel-default" style="min-height: 10.5em">
<div class="panel-heading">
- <span class="panel-title">Recent pipelines and processes</span>
+ <span class="panel-title"><%=recent_procs_title%></span>
<% if current_user.andand.is_active %>
<span class="pull-right recent-processes-actions">
<span>
<%= link_to(
choose_work_unit_templates_path(
- title: 'Choose a pipeline or workflow to run:',
+ title: run_proc_title,
action_name: 'Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i>',
action_href: work_units_path,
action_method: 'post',
action_data: {'selection_param' => 'work_unit[template_uuid]', 'work_unit[owner_uuid]' => current_user.uuid, 'success' => 'redirect-to-created-object'}.to_json),
{ class: "btn btn-primary btn-xs", remote: true }) do %>
- <i class="fa fa-fw fa-gear"></i> Run a pipeline...
+ <i class="fa fa-fw fa-gear"></i> Run a process...
<% end %>
</span>
<span>
--- /dev/null
+<%= render_pane 'tab_contents', to_string: true, locals: {
+ limit: 50,
+ filters: [['uuid', 'is_a', ["arvados#containerRequest"]]],
+ sortable_columns: { 'name' => 'container_requests.name', 'description' => 'container_requests.description' }
+ }.merge(local_assigns) %>
</h2>
<% end %>
+<%
+ if !PipelineInstance.api_exists?(:index)
+ run_proc_title = 'Choose a workflow to run:'
+ run_proc_hover = 'Run a workflow in this project'
+ else
+ run_proc_title = 'Choose a pipeline or workflow to run:'
+ run_proc_hover = 'Run a pipeline or workflow in this project'
+ end
+%>
+
<% content_for :tab_line_buttons do %>
<% if @object.editable? %>
<div class="btn-group btn-group-sm">
</div>
<%= link_to(
choose_work_unit_templates_path(
- title: 'Choose a pipeline or workflow to run:',
+ title: run_proc_title,
action_name: 'Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i>',
action_href: work_units_path,
action_method: 'post',
action_data: {'selection_param' => 'work_unit[template_uuid]', 'work_unit[owner_uuid]' => @object.uuid, 'success' => 'redirect-to-created-object'}.to_json),
- { class: "btn btn-primary btn-sm", remote: true, title: "Run a pipeline or workflow in this project" }) do %>
- <i class="fa fa-fw fa-gear"></i> Run a pipeline...
+ { class: "btn btn-primary btn-sm", remote: true, title: run_proc_hover }) do %>
+ <i class="fa fa-fw fa-gear"></i> Run a process...
<% end %>
<%= link_to projects_path({'project[owner_uuid]' => @object.uuid, 'options' => {'ensure_unique_name' => true}}), method: :post, title: "Add a subproject to this project", class: 'btn btn-sm btn-primary' do %>
<i class="fa fa-fw fa-plus"></i>
get :index, {}, session_for(:active)
assert_includes @response.body, "zzzzz-xvhdp-cr4runningcntnr" # expect crs
assert_not_includes @response.body, "zzzzz-d1hrv-" # expect no pipelines
+ assert_includes @response.body, "Run a process"
end
[
end
[
+ :admin,
:active,
nil,
].each do |user|
assert_includes resp, "href=\"#Pipelines_and_processes\""
assert_includes resp, "href=\"#Workflows\""
assert_not_includes resp, "href=\"#Pipeline_templates\""
+ assert_includes @response.body, "Run a process" if user == :admin
end
end
end
assert_text 'Recent pipelines and processes' # seeing dashboard now
within('.recent-processes-actions') do
- assert page.has_link?('Run a pipeline')
+ assert page.has_link?('Run a process')
assert page.has_link?('All processes')
end
collection = api_fixture('collections', collection_fixture)
# create a pipeline instance
- find('.btn', text: 'Run a pipeline').click
+ find('.btn', text: 'Run a process').click
within('.modal-dialog') do
find('.selectable', text: template_name).click
find('.btn', text: 'Next: choose inputs').click
project = api_fixture('groups')['aproject']
visit page_with_token 'active', '/projects/' + project['uuid']
- find('.btn', text: 'Run a pipeline').click
+ find('.btn', text: 'Run a process').click
# in the chooser, verify preview and click Next button
within('.modal-dialog') do
within('.recent-processes-actions') do
assert page.has_link?('All processes')
- find('a', text: 'Run a pipeline').click
+ find('a', text: 'Run a process').click
end
# in the chooser, verify preview and click Next button
test "home page" do
visit_page_with_token
assert_text 'Dashboard'
- assert_selector 'a', text: 'Run a pipeline'
+ assert_selector 'a', text: 'Run a process'
end
test "search for hash" do
Run package install test script "test-packages-$target.sh"
--debug
Output debug information (default: false)
---only-test
+--only-build <package>
+ Build only a specific package
+--only-test <package>
Test only a specific package
WORKSPACE=path Path to the Arvados source tree to build packages from
fi
PARSEDOPTS=$(getopt --name "$0" --longoptions \
- help,debug,test-packages,target:,command:,only-test: \
+ help,debug,test-packages,target:,command:,only-test:,only-build: \
-- "" "$@")
if [ $? -ne 0 ]; then
exit 1
TARGET="$2"; shift
;;
--only-test)
+ test_packages=1
packages="$2"; shift
;;
+ --only-build)
+ ONLY_BUILD="$2"; shift
+ ;;
--debug)
DEBUG=" --debug"
;;
if docker run --rm \
"${docker_volume_args[@]}" \
--env ARVADOS_DEBUG=1 \
+ --env "ONLY_BUILD=$ONLY_BUILD" \
"$IMAGE" $COMMAND
then
echo
Build api server and workbench packages with vendor/bundle included
--debug
Output debug information (default: false)
---target
+--target <target>
Distribution to build packages for (default: debian7)
+--only-build <package>
+ Build only a specific package (or $ONLY_BUILD from environment)
--command
Build command to execute (defaults to the run command defined in the
Docker image)
COMMAND=
PARSEDOPTS=$(getopt --name "$0" --longoptions \
- help,build-bundle-packages,debug,target: \
+ help,build-bundle-packages,debug,target:,only-build: \
-- "" "$@")
if [ $? -ne 0 ]; then
exit 1
--target)
TARGET="$2"; shift
;;
+ --only-build)
+ ONLY_BUILD="$2"; shift
+ ;;
--debug)
DEBUG=1
;;
# Perl packages
debug_echo -e "\nPerl packages\n"
+if [[ -z "$ONLY_BUILD" ]] || [[ "libarvados-perl" = "$ONLY_BUILD" ]] ; then
cd "$WORKSPACE/sdk/perl"
if [[ -e Makefile ]]; then
"Curoverse, Inc." dir "$(version_from_git)" install/man/=/usr/share/man \
"$WORKSPACE/LICENSE-2.0.txt=/usr/share/doc/libarvados-perl/LICENSE-2.0.txt" && \
mv --no-clobber libarvados-perl*.$FORMAT "$WORKSPACE/packages/$TARGET/"
+fi
# Ruby gems
debug_echo -e "\nRuby gems\n"
# So we build this thing separately.
#
# Ward, 2016-03-17
-fpm_build schema_salad "" "" python 1.18.20161005190847 --depends "${PYTHON2_PKG_PREFIX}-lockfile >= 1:0.12.2-2"
+fpm_build schema_salad "" "" python 1.20.20161122192122 --depends "${PYTHON2_PKG_PREFIX}-lockfile >= 1:0.12.2-2"
# And schema_salad now depends on ruamel-yaml, which apparently has a braindead setup.py that requires special arguments to build (otherwise, it aborts with 'error: you have to install with "pip install ."'). Sigh.
# Ward, 2016-05-26
fpm_build cwltest "" "" python 1.0.20160907111242
# And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20161107145355
+fpm_build cwltool "" "" python 1.0.20161128202906
# FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
fpm_build rdflib-jsonld "" "" python 0.3.0
for deppkg in "${PYTHON_BACKPORTS[@]}"; do
outname=$(echo "$deppkg" | sed -e 's/^python-//' -e 's/[<=>].*//' -e 's/_/-/g' -e "s/^/${PYTHON2_PKG_PREFIX}-/")
+
+ if [[ -n "$ONLY_BUILD" ]] && [[ "$outname" != "$ONLY_BUILD" ]] ; then
+ continue
+ fi
+
case "$deppkg" in
httplib2|google-api-python-client)
# Work around 0640 permissions on some package files.
--license="GNU Affero General Public License, version 3.0"
# Build the workbench server package
+if [[ -z "$ONLY_BUILD" ]] || [[ "arvados-workbench" = "$ONLY_BUILD" ]] ; then
(
set -e
cd "$WORKSPACE/apps/workbench"
# Remove generated configuration files so they don't go in the package.
rm config/application.yml config/environments/production.rb
)
+fi
if [[ "$?" != "0" ]]; then
echo "ERROR: Asset precompilation failed"
local gem_version="$(nohash_version_from_git)"
local gem_src_dir="$(pwd)"
+ if [[ -n "$ONLY_BUILD" ]] && [[ "$gem_name" != "$ONLY_BUILD" ]] ; then
+ return 0
+ fi
+
if ! [[ -e "${gem_name}-${gem_version}.gem" ]]; then
find -maxdepth 1 -name "${gem_name}-*.gem" -delete
local description="$1"; shift
local license_file="${1:-agpl-3.0.txt}"; shift
+ if [[ -n "$ONLY_BUILD" ]] && [[ "$prog" != "$ONLY_BUILD" ]] ; then
+ return 0
+ fi
+
debug_echo "package_go_binary $src_path as $prog"
local basename="${src_path##*/}"
handle_rails_package() {
local pkgname="$1"; shift
+
+ if [[ -n "$ONLY_BUILD" ]] && [[ "$pkgname" != "$ONLY_BUILD" ]] ; then
+ return 0
+ fi
+
local srcdir="$1"; shift
local license_path="$1"; shift
local scripts_dir="$(mktemp --tmpdir -d "$pkgname-XXXXXXXX.scripts")" && \
VERSION=$1
shift
+ if [[ -n "$ONLY_BUILD" ]] && [[ "$PACKAGE_NAME" != "$ONLY_BUILD" ]] && [[ "$PACKAGE" != "$ONLY_BUILD" ]] ; then
+ return 0
+ fi
+
local default_iteration_value="$(default_iteration "$PACKAGE" "$VERSION")"
case "$PACKAGE_TYPE" in
|vcpus|integer|Number of cores to be used to run this process.|Optional. However, a ContainerRequest that is in "Committed" state must provide this.|
|keep_cache_ram|integer|Number of keep cache bytes to be used to run this process.|Optional.|
|API|boolean|When set, ARVADOS_API_HOST and ARVADOS_API_TOKEN will be set, and container will have networking enabled to access the Arvados API server.|Optional.|
-|partition|array of strings|Specify the names of one or more compute partitions that may run this container. If not provided, the system chooses where to run the container.|Optional.|
--- /dev/null
+Scheduling parameters
+
+Parameters to be passed to the container scheduler (e.g., SLURM) when running a container.
+
+table(table table-bordered table-condensed).
+|_. Key|_. Type|_. Description|_. Notes|
+|partitions|array of strings|The names of one or more compute partitions that may run this container. If not provided, the system will choose where to run the container.|Optional.|
--- /dev/null
+{% include 'notebox_begin_warning' %}
+This section assumes the legacy Jobs API is available. Some newer installations have already disabled the Jobs API in favor of the Containers API.
--- /dev/null
+{% include 'notebox_end' %}
--- /dev/null
+<div class="alert alert-block alert-warning">
+ <h4>Note:</h4>
-{% include 'notebox_begin' %}
+{% include 'notebox_begin_warning' %}
Arvados pipeline templates are deprecated. The recommended way to develop new workflows for Arvados is using the "Common Workflow Language":{{site.baseurl}}/user/cwl/cwl-runner.html.
{% include 'notebox_end' %}
"vcpus":2,
"API":true
}</code></pre>See "Runtime constraints":#runtime_constraints for more details.|
+|scheduling_parameters|hash|Parameters to be passed to the container scheduler when running this container.|e.g.,<pre><code>{
+"partitions":["fastcpu","vfastcpu"]
+}</code></pre>See "Scheduling parameters":#scheduling_parameters for more details.|
|container_image|string|Portable data hash of a collection containing the docker image to run the container.|Required.|
|environment|hash|Environment variables and values that should be set in the container environment (@docker run --env@). This augments and (when conflicts exist) overrides environment variables given in the image's Dockerfile.||
|cwd|string|Initial working directory, given as an absolute path (in the container) or a path relative to the WORKDIR given in the image's Dockerfile.|Required.|
|command|array of strings|Command to execute in the container.|Required. e.g., @["echo","hello"]@|
|output_path|string|Path to a directory or file inside the container that should be preserved as container's output when it finishes. This path must be, or be inside, one of the mount targets. For best performance, point output_path to a writable collection mount.|Required.|
-|priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to prevew the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
+|priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to preview the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
|expires_at|datetime|After this time, priority is considered to be zero.|Not yet implemented.|
|use_existing|boolean|If possible, use an existing (non-failed) container to satisfy the request instead of creating a new one.|Default is true|
|filters|string|Additional constraints for satisfying the container_request, given in the same form as the filters parameter accepted by the container_requests.list API.||
h2(#runtime_constraints). {% include 'container_runtime_constraints' %}
+h2(#scheduling_parameters). {% include 'container_scheduling_parameters' %}
+
h2(#container_reuse). Container reuse
When a container request is "Committed", the system will try to find and reuse any preexisting Container with the same exact command, cwd, environment, output_path, container_image, mounts, and runtime_constraints as this container request. The serialized fields environment, mounts and runtime_constraints are sorted to facilitate comparison.
"vcpus":2,
"API":true
}</code></pre>See "Runtime constraints":#runtime_constraints for more details.|
+|scheduling_parameters|hash|Parameters to be passed to the container scheduler when running this container.|e.g.,<pre><code>{
+"partitions":["fastcpu","vfastcpu"]
+}</code></pre>See "Scheduling parameters":#scheduling_parameters for more details.|
|output|string|Portable data hash of the output collection.|Null if the container is not yet finished.|
|container_image|string|Portable data hash of a collection containing the docker image used to run the container.||
|progress|number|A number between 0.0 and 1.0 describing the fraction of work done.||
h2(#runtime_constraints). {% include 'container_runtime_constraints' %}
+h2(#scheduling_parameters). {% include 'container_scheduling_parameters' %}
+
h2. Methods
See "Common resource methods":{{site.baseurl}}/api/methods.html for more information about @create@, @delete@, @get@, @list@, and @update@.
</code></pre>
</notextile>
-{% assign railscmd = "bundle exec ./script/get_anonymous_user_token.rb" %}
+{% assign railscmd = "bundle exec ./script/get_anonymous_user_token.rb --get" %}
{% assign railsout = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" %}
If you intend to use Keep-web to serve public data to anonymous clients, configure it with an anonymous token. You can use the same one you used when you set up your Keepproxy server, or use the following command on the <strong>API server</strong> to create another. {% include 'install_rails_command' %}
h3. Create an API token for the Keepproxy server
-{% assign railscmd = "bundle exec ./script/get_anonymous_user_token.rb" %}
+{% assign railscmd = "bundle exec ./script/get_anonymous_user_token.rb --get" %}
{% assign railsout = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" %}
The Keepproxy server needs a token to talk to the API server. On the <strong>API server</strong>, use the following command to create the token. {% include 'install_rails_command' %}
If you reference a local file which is not in @arv-mount@, then @arvados-cwl-runner@ will upload the file to Keep and use the Keep URI reference from the upload.
-h2. Registering a workflow with Workbench
+h2. Registering a workflow to use in Workbench
-Use @--create-template@ to register a CWL workflow with Arvados Workbench. This enables you to run workflows by clicking on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a pipeline...</span> on the Workbench Dashboard.
+Use @--create-workflow@ to register a CWL workflow with Arvados. This enables you to share workflows with other Arvados users, and run them by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard.
<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-template bwa-mem.cwl</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to qr1hi-4zz18-7e0hedrmkuyoei3
You can provide a partial input file to set default values for the workflow input parameters:
<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-template bwa-mem.cwl bwa-mem-template.yml</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to qr1hi-4zz18-0f91qkovk4ml18o
Once your account is active, logging in to the Workbench will present you with the Dashboard. This gives a summary of your projects and recent activity in the Arvados instance. "You are now ready to run your first pipeline.":{{ site.baseurl }}/user/tutorials/tutorial-pipeline-workbench.html
-!{{ site.baseurl }}/images/workbench-dashboard.png!
+!{display: block;margin-left: 25px;margin-right: auto;border:1px solid lightgray;}{{ site.baseurl }}/images/workbench-dashboard.png!
title: "Using arv-run"
...
+{% include 'crunch1only_begin' %}
+On those sites, the features described here are not yet implemented.
+{% include 'crunch1only_end' %}
+
The @arv-run@ command enables you to create Arvados pipelines at the command line that fan out to multiple concurrent tasks across Arvados compute nodes.
{% include 'tutorial_expectations' %}
title: "Running an Arvados pipeline"
...
+{% include 'crunch1only_begin' %}
+If the Jobs API is not available, use the "Common Workflow Language":{{site.baseurl}}/user/cwl/cwl-runner.html instead.
+{% include 'crunch1only_end' %}
+
This tutorial demonstrates how to use the command line to run the same pipeline as described in "running a pipeline using Workbench.":{{site.baseurl}}/user/tutorials/tutorial-pipeline-workbench.html
{% include 'tutorial_expectations' %}
title: "Running a pipeline using Workbench"
...
+{% include 'crunch1only_begin' %}
+On those sites, the details will be slightly different and the example pipeline might not be available.
+{% include 'crunch1only_end' %}
+
A "pipeline" (sometimes called a "workflow" in other systems) is a sequence of steps that apply various programs or tools to transform input data to output data. Pipelines are the principal means of performing computation with Arvados. This tutorial demonstrates how to run a single-stage pipeline to take a small data set of paired-end reads from a sample "exome":https://en.wikipedia.org/wiki/Exome in "FASTQ":https://en.wikipedia.org/wiki/FASTQ_format format and align them to "Chromosome 19":https://en.wikipedia.org/wiki/Chromosome_19_%28human%29 using the "bwa mem":http://bio-bwa.sourceforge.net/ tool, producing a "Sequence Alignment/Map (SAM)":https://samtools.github.io/ file. This tutorial will introduce the following Arvados features:
<div>
h3. Steps
# Start from the *Workbench Dashboard*. You can access the Dashboard by clicking on *<i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard* in the upper left corner of any Workbench page.
-# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a pipeline...</span> button. This will open a dialog box titled *Choose a pipeline to run*.
+# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button. This will open a dialog box titled *Choose a pipeline or workflow to run*.
# In the search box, type in *Tutorial align using bwa mem*.
# Select *<i class="fa fa-fw fa-gear"></i> Tutorial align using bwa mem* and click the <span class="btn btn-sm btn-primary" >Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span> button. This will create a new pipeline in your *Home* project and will open it. You can now supply the inputs for the pipeline.
# The first input parameter to the pipeline is *"reference_collection" parameter for run-command script in bwa-mem component*. Click the <span class="btn btn-sm btn-primary">Choose</span> button beneath that header. This will open a dialog box titled *Choose a dataset for "reference_collection" parameter for run-command script in bwa-mem component*.
my $line = $1;
substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
Log ($jobstepidx, "stderr $line");
- if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
+ if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
# If the allocation is revoked, we can't possibly continue, so mark all
# nodes as failed. This will cause the overall exit code to be
# EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
$st->{node}->{fail_count}++;
}
}
- elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
+ elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
$jobstep[$jobstepidx]->{tempfail} = 1;
if (defined($job_slot_index)) {
$slot[$job_slot_index]->{node}->{fail_count}++;
ban_node_by_slot($job_slot_index);
}
}
- elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
+ elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
$jobstep[$jobstepidx]->{tempfail} = 1;
ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
}
Runs in a separate thread.
"""
- while True:
- self.stop_polling.wait(15)
- if self.stop_polling.is_set():
- break
- with self.lock:
- keys = self.processes.keys()
- if not keys:
- continue
+ try:
+ while True:
+ self.stop_polling.wait(15)
+ if self.stop_polling.is_set():
+ break
+ with self.lock:
+ keys = self.processes.keys()
+ if not keys:
+ continue
- if self.work_api == "containers":
- table = self.poll_api.containers()
- elif self.work_api == "jobs":
- table = self.poll_api.jobs()
+ if self.work_api == "containers":
+ table = self.poll_api.containers()
+ elif self.work_api == "jobs":
+ table = self.poll_api.jobs()
- try:
- proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
- except Exception as e:
- logger.warn("Error checking states on API server: %s", e)
- continue
-
- for p in proc_states["items"]:
- self.on_message({
- "object_uuid": p["uuid"],
- "event_type": "update",
- "properties": {
- "new_attributes": p
- }
- })
+ try:
+ proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
+ except:
+ logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+ self.cond.acquire()
+ self.processes.clear()
+ self.cond.notify()
+ self.cond.release()
+ finally:
+ self.stop_polling.set()
def get_uploaded(self):
return self.uploaded.copy()
tool.visit(self.check_writable)
- useruuid = self.api.users().current().execute()["uuid"]
- self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+ self.project_uuid = kwargs.get("project_uuid")
self.pipeline = None
make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
api_client=self.api,
keep_client=self.keep_client)
self.fs_access = make_fs_access(kwargs["basedir"])
- if kwargs.get("create_template"):
- tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
- tmpl.save()
- # cwltool.main will write our return value to stdout.
- return tmpl.uuid
-
- if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
- return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+ existing_uuid = kwargs.get("update_workflow")
+ if existing_uuid or kwargs.get("create_workflow"):
+ if self.work_api == "jobs":
+ tmpl = RunnerTemplate(self, tool, job_order,
+ kwargs.get("enable_reuse"),
+ uuid=existing_uuid)
+ tmpl.save()
+ # cwltool.main will write our return value to stdout.
+ return tmpl.uuid
+ else:
+ return upload_workflow(self, tool, job_order,
+ self.project_uuid,
+ uuid=existing_uuid)
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
loopperf.__enter__()
for runnable in jobiter:
loopperf.__exit__()
+
+ if self.stop_polling.is_set():
+ break
+
if runnable:
with Perf(metrics, "run"):
runnable.run(**kwargs)
if sys.exc_info()[0] is KeyboardInterrupt:
logger.error("Interrupted, marking pipeline as failed")
else:
- logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
default=True, dest="submit")
exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
default=True, dest="submit")
- exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
- exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
- exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
+ exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
+ dest="create_workflow")
+ exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
+ exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
job_order_object = None
arvargs = parser.parse_args(args)
- if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+
+ if arvargs.update_workflow:
+ if arvargs.update_workflow.find('-7fd4e-') == 5:
+ want_api = 'containers'
+ elif arvargs.update_workflow.find('-p5p6p-') == 5:
+ want_api = 'jobs'
+ else:
+ want_api = None
+ if want_api and arvargs.work_api and want_api != arvargs.work_api:
+ logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+ arvargs.update_workflow, want_api, arvargs.work_api))
+ return 1
+ arvargs.work_api = want_api
+
+ if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
job_order_object = ({}, "")
add_arv_hints()
arvargs.conformance_test = None
arvargs.use_container = True
+ arvargs.relax_path_checks = True
return cwltool.main.main(args=arvargs,
stdout=stdout,
if record["output"]:
outputs = done.done(self, record, "/tmp", self.outdir, "/keep")
except WorkflowException as e:
- logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ logger.error("Error while collecting output for container %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
except Exception as e:
- logger.exception("Got unknown exception while collecting job outputs:")
+ logger.exception("Got unknown exception while collecting output for container %s:", self.name)
+ processStatus = "permanentFail"
+
+ # Note: Currently, on error output_callback is expecting an empty dict,
+ # anything else will fail.
+ if not isinstance(outputs, dict):
+ logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
+ outputs = {}
processStatus = "permanentFail"
self.output_callback(outputs, processStatus)
cwltool.docker.get_image(dockerRequirement, pull_image)
# Upload image to Arvados
- args = ["--project-uuid="+project_uuid, image_name]
+ args = []
+ if project_uuid:
+ args.append("--project-uuid="+project_uuid)
+ args.append(image_name)
if image_tag:
args.append(image_tag)
logger.info("Uploading Docker image %s", ":".join(args[1:]))
with Perf(metrics, "done %s" % self.name):
self.done(response)
except Exception as e:
- logger.error("Got error %s" % str(e))
+ logger.exception("Job %s error" % (self.name))
self.output_callback({}, "permanentFail")
def update_pipeline_component(self, record):
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
except WorkflowException as e:
- logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+ logger.error("Error while collecting output for job %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
- outputs = None
except Exception as e:
- logger.exception("Got unknown exception while collecting job outputs:")
+ logger.exception("Got unknown exception while collecting output for job %s:", self.name)
+ processStatus = "permanentFail"
+
+ # Note: Currently, on error output_callback is expecting an empty dict,
+ # anything else will fail.
+ if not isinstance(outputs, dict):
+ logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
+ outputs = {}
processStatus = "permanentFail"
- outputs = None
self.output_callback(outputs, processStatus)
finally:
'string': 'text',
}
- def __init__(self, runner, tool, job_order, enable_reuse):
+ def __init__(self, runner, tool, job_order, enable_reuse, uuid):
self.runner = runner
self.tool = tool
self.job = RunnerJob(
enable_reuse=enable_reuse,
output_name=None,
output_tags=None)
+ self.uuid = uuid
def pipeline_component_spec(self):
"""Return a component that Workbench and a-r-p-i will understand.
return spec
def save(self):
- job_spec = self.pipeline_component_spec()
- response = self.runner.api.pipeline_templates().create(body={
+ body = {
"components": {
- self.job.name: job_spec,
+ self.job.name: self.pipeline_component_spec(),
},
"name": self.job.name,
- "owner_uuid": self.runner.project_uuid,
- }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
- self.uuid = response["uuid"]
- logger.info("Created template %s", self.uuid)
+ }
+ if self.runner.project_uuid:
+ body["owner_uuid"] = self.runner.project_uuid
+ if self.uuid:
+ self.runner.api.pipeline_templates().update(
+ uuid=self.uuid, body=body).execute(
+ num_retries=self.runner.num_retries)
+ logger.info("Updated template %s", self.uuid)
+ else:
+ self.uuid = self.runner.api.pipeline_templates().create(
+ body=body, ensure_unique_name=True).execute(
+ num_retries=self.runner.num_retries)['uuid']
+ logger.info("Created template %s", self.uuid)
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
-def upload_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
+def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None):
upload_docker(arvRunner, tool)
document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
body = {
"workflow": {
- "owner_uuid": project_uuid,
"name": tool.tool.get("label", name),
"description": tool.tool.get("doc", ""),
"definition":yaml.safe_dump(packed)
}}
+ if project_uuid:
+ body["workflow"]["owner_uuid"] = project_uuid
- if update_uuid:
- return arvRunner.api.workflows().update(uuid=update_uuid, body=body).execute(num_retries=arvRunner.num_retries)["uuid"]
+ if uuid:
+ call = arvRunner.api.workflows().update(uuid=uuid, body=body)
else:
- return arvRunner.api.workflows().create(body=body).execute(num_retries=arvRunner.num_retries)["uuid"]
+ call = arvRunner.api.workflows().create(body=body)
+ return call.execute(num_retries=arvRunner.num_retries)["uuid"]
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
else:
processStatus = "permanentFail"
- outputs = None
+ outputs = {}
try:
try:
self.final_output = record["output"]
adjustFileObjs(outputs, keepify)
adjustDirObjs(outputs, keepify)
except Exception as e:
- logger.error("While getting final output object: %s", e)
+ logger.exception("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
finally:
del self.arvrunner.processes[record["uuid"]]
# Make sure to update arvados/build/run-build-packages.sh as well
# when updating the cwltool version pin.
install_requires=[
- 'cwltool==1.0.20161107145355',
+ 'cwltool==1.0.20161128202906',
'arvados-python-client>=0.1.20160826210445'
],
data_files=[
stubs.expect_pipeline_instance = {
'name': 'submit_wf.cwl',
'state': 'RunningOnServer',
+ 'owner_uuid': None,
"components": {
"cwl-runner": {
'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__},
}
},
'state': 'Committed',
- 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
stubs.api.workflows().create().execute.return_value = {
"uuid": stubs.expect_workflow_uuid,
}
+ def update_mock(**kwargs):
+ stubs.updated_uuid = kwargs.get('uuid')
+ return mock.DEFAULT
+ stubs.api.workflows().update.side_effect = update_mock
+ stubs.api.workflows().update().execute.side_effect = lambda **kwargs: {
+ "uuid": stubs.updated_uuid,
+ }
return func(self, stubs, *args, **kwargs)
return wrapped
'./tool d51232d96b6116d964a69bfb7e0c73bf+450 '
'0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
- 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'owner_uuid': None,
'name': 'submit_wf.cwl',
}, ensure_unique_name=True),
mock.call().execute(),
mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
'0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
'replication_desired': None,
'name': 'New collection'
}, ensure_unique_name=True),
mock.call(body={
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
- 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'owner_uuid': None,
'name': '#',
}, ensure_unique_name=True),
mock.call().execute()])
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
stubs.api.pipeline_instances().create.assert_called_with(
body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
stubs.api.pipeline_instances().create.assert_called_with(
body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
stubs.api.pipeline_instances().create.assert_called_with(
body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
- expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
stubs.api.pipeline_instances().create.assert_called_with(
body=expect_pipeline)
self.assertEqual(capture_stdout.getvalue(),
'./tool d51232d96b6116d964a69bfb7e0c73bf+450 '
'0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
- 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'owner_uuid': None,
'name': 'submit_wf.cwl',
}, ensure_unique_name=True),
mock.call().execute(),
mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
'0:0:blub.txt 0:0:submit_tool.cwl\n',
- 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
'name': 'New collection',
'replication_desired': None,
}, ensure_unique_name=True),
mock.call(body={
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
- 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'owner_uuid': None,
'name': '#',
}, ensure_unique_name=True),
mock.call().execute()])
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["owner_uuid"] = stubs.fake_user_uuid
stubs.api.container_requests().create.assert_called_with(
body=expect_container)
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--disable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["owner_uuid"] = stubs.fake_user_uuid
stubs.api.container_requests().create.assert_called_with(
body=expect_container)
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-name="+output_name, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["owner_uuid"] = stubs.fake_user_uuid
stubs.api.container_requests().create.assert_called_with(
body=expect_container)
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-tags="+output_tags, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["owner_uuid"] = stubs.fake_user_uuid
stubs.api.container_requests().create.assert_called_with(
body=expect_container)
self.assertEqual(capture_stdout.getvalue(),
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--create-template", "--debug",
+ ["--create-workflow", "--debug",
"--project-uuid", project_uuid,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
class TestCreateWorkflow(unittest.TestCase):
+ existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
+ expect_workflow = open("tests/wf/expect_packed.cwl").read()
+
@stubs
def test_create(self, stubs):
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
exited = arvados_cwl.main(
["--create-workflow", "--debug",
+ "--api=containers",
"--project-uuid", project_uuid,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
stubs.api.pipeline_templates().create.refute_called()
stubs.api.container_requests().create.refute_called()
- with open("tests/wf/expect_packed.cwl") as f:
- expect_workflow = f.read()
-
body = {
"workflow": {
"owner_uuid": project_uuid,
"name": "submit_wf.cwl",
"description": "",
- "definition": expect_workflow
- }
+ "definition": self.expect_workflow,
+ }
}
stubs.api.workflows().create.assert_called_with(
body=JsonDiffMatcher(body))
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_workflow_uuid + '\n')
+ @stubs
+ def test_incompatible_api(self, stubs):
+ capture_stderr = cStringIO.StringIO()
+ logging.getLogger('arvados.cwl-runner').addHandler(
+ logging.StreamHandler(capture_stderr))
+
+ exited = arvados_cwl.main(
+ ["--update-workflow", self.existing_workflow_uuid,
+ "--api=jobs",
+ "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ sys.stderr, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 1)
+ self.assertRegexpMatches(
+ capture_stderr.getvalue(),
+ "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
+
+ @stubs
+ def test_update(self, stubs):
+ capture_stdout = cStringIO.StringIO()
+
+ exited = arvados_cwl.main(
+ ["--update-workflow", self.existing_workflow_uuid,
+ "--debug",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api)
+ self.assertEqual(exited, 0)
+
+ body = {
+ "workflow": {
+ "name": "submit_wf.cwl",
+ "description": "",
+ "definition": self.expect_workflow,
+ }
+ }
+ stubs.api.workflows().update.assert_called_with(
+ uuid=self.existing_workflow_uuid,
+ body=JsonDiffMatcher(body))
+ self.assertEqual(capture_stdout.getvalue(),
+ self.existing_workflow_uuid + '\n')
+
class TestTemplateInputs(unittest.TestCase):
expect_template = {
cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- expect_template = copy.deepcopy(self.expect_template)
- expect_template["owner_uuid"] = stubs.fake_user_uuid
-
stubs.api.pipeline_templates().create.assert_called_with(
- body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
+ body=JsonDiffMatcher(self.expect_template), ensure_unique_name=True)
@stubs
def test_inputs(self, stubs):
cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
- self.expect_template["owner_uuid"] = stubs.fake_user_uuid
-
expect_template = copy.deepcopy(self.expect_template)
- expect_template["owner_uuid"] = stubs.fake_user_uuid
params = expect_template[
"components"]["inputs_test.cwl"]["script_parameters"]
params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
--- /dev/null
+package httpserver
+
+import (
+ "strconv"
+ "sync"
+ "time"
+)
+
+// IDGenerator generates alphanumeric strings suitable for use as
+// unique IDs (a given IDGenerator will never return the same ID
+// twice).
+type IDGenerator struct {
+ // Prefix is prepended to each returned ID.
+ Prefix string
+
+ lastID int64
+ mtx sync.Mutex
+}
+
+// Next returns a new ID string. It is safe to call Next from multiple
+// goroutines.
+func (g *IDGenerator) Next() string {
+ id := time.Now().UnixNano()
+ g.mtx.Lock()
+ if id <= g.lastID {
+ id = g.lastID + 1
+ }
+ g.lastID = id
+ g.mtx.Unlock()
+ return g.Prefix + strconv.FormatInt(id, 36)
+}
"net/http"
)
+// RequestCounter is an http.Handler that tracks the number of
+// requests in progress.
+type RequestCounter interface {
+ http.Handler
+
+ // Current() returns the number of requests in progress.
+ Current() int
+
+ // Max() returns the maximum number of concurrent requests
+ // that will be accepted.
+ Max() int
+}
+
type limiterHandler struct {
requests chan struct{}
handler http.Handler
}
-func NewRequestLimiter(maxRequests int, handler http.Handler) http.Handler {
+// NewRequestLimiter returns a RequestCounter that delegates up to
+// maxRequests at a time to the given handler, and responds 503 to all
+// incoming requests beyond that limit.
+func NewRequestLimiter(maxRequests int, handler http.Handler) RequestCounter {
return &limiterHandler{
requests: make(chan struct{}, maxRequests),
handler: handler,
}
}
+func (h *limiterHandler) Current() int {
+ return len(h.requests)
+}
+
+func (h *limiterHandler) Max() int {
+ return cap(h.requests)
+}
+
func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
select {
case h.requests <- struct{}{}:
return
self._keep.get(b)
except Exception:
- pass
+ _logger.exception("Exception doing block prefetch")
@synchronized
def start_get_threads(self):
with self.successful_copies_lock:
self.successful_copies += replicas_nr
self.response = response
+ with self.pending_tries_notification:
+ self.pending_tries_notification.notify_all()
- def write_fail(self, ks, status_code):
+ def write_fail(self, ks):
with self.pending_tries_notification:
self.pending_tries += 1
self.pending_tries_notification.notify()
def pending_copies(self):
with self.successful_copies_lock:
return self.wanted_copies - self.successful_copies
-
-
+
+ def get_next_task(self):
+ with self.pending_tries_notification:
+ while True:
+ if self.pending_copies() < 1:
+ # This notify_all() is unnecessary --
+ # write_success() already called notify_all()
+ # when pending<1 became true, so it's not
+ # possible for any other thread to be in
+ # wait() now -- but it's cheap insurance
+ # against deadlock so we do it anyway:
+ self.pending_tries_notification.notify_all()
+ # Drain the queue and then raise Queue.Empty
+ while True:
+ self.get_nowait()
+ self.task_done()
+ elif self.pending_tries > 0:
+ service, service_root = self.get_nowait()
+ if service.finished():
+ self.task_done()
+ continue
+ self.pending_tries -= 1
+ return service, service_root
+ elif self.empty():
+ self.pending_tries_notification.notify_all()
+ raise Queue.Empty
+ else:
+ self.pending_tries_notification.wait()
+
+
class KeepWriterThreadPool(object):
def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
self.total_task_nr = 0
worker.start()
# Wait for finished work
self.queue.join()
- with self.queue.pending_tries_notification:
- self.queue.pending_tries_notification.notify_all()
- for worker in self.workers:
- worker.join()
def response(self):
return self.queue.response
class KeepWriterThread(threading.Thread):
+ TaskFailed = RuntimeError()
+
def __init__(self, queue, data, data_hash, timeout=None):
super(KeepClient.KeepWriterThread, self).__init__()
self.timeout = timeout
self.queue = queue
self.data = data
self.data_hash = data_hash
-
+ self.daemon = True
+
def run(self):
- while not self.queue.empty():
- if self.queue.pending_copies() > 0:
- # Avoid overreplication, wait for some needed re-attempt
- with self.queue.pending_tries_notification:
- if self.queue.pending_tries <= 0:
- self.queue.pending_tries_notification.wait()
- continue # try again when awake
- self.queue.pending_tries -= 1
-
- # Get to work
- try:
- service, service_root = self.queue.get_nowait()
- except Queue.Empty:
- continue
- if service.finished():
- self.queue.task_done()
- continue
- success = bool(service.put(self.data_hash,
- self.data,
- timeout=self.timeout))
- result = service.last_result()
- if success:
- _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
- str(threading.current_thread()),
- self.data_hash,
- len(self.data),
- service_root)
- try:
- replicas_stored = int(result['headers']['x-keep-replicas-stored'])
- except (KeyError, ValueError):
- replicas_stored = 1
-
- self.queue.write_success(result['body'].strip(), replicas_stored)
- else:
- if result.get('status_code', None):
- _logger.debug("Request fail: PUT %s => %s %s",
- self.data_hash,
- result['status_code'],
- result['body'])
- self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
- # Mark as done so the queue can be join()ed
- self.queue.task_done()
+ while True:
+ try:
+ service, service_root = self.queue.get_next_task()
+ except Queue.Empty:
+ return
+ try:
+ locator, copies = self.do_task(service, service_root)
+ except Exception as e:
+ if e is not self.TaskFailed:
+ _logger.exception("Exception in KeepWriterThread")
+ self.queue.write_fail(service)
else:
- # Remove the task from the queue anyways
- try:
- self.queue.get_nowait()
- # Mark as done so the queue can be join()ed
- self.queue.task_done()
- except Queue.Empty:
- continue
+ self.queue.write_success(locator, copies)
+ finally:
+ self.queue.task_done()
+
+ def do_task(self, service, service_root):
+ success = bool(service.put(self.data_hash,
+ self.data,
+ timeout=self.timeout))
+ result = service.last_result()
+
+ if not success:
+ if result.get('status_code', None):
+ _logger.debug("Request fail: PUT %s => %s %s",
+ self.data_hash,
+ result['status_code'],
+ result['body'])
+ raise self.TaskFailed
+
+ _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+ str(threading.current_thread()),
+ self.data_hash,
+ len(self.data),
+ service_root)
+ try:
+ replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+ except (KeyError, ValueError):
+ replicas_stored = 1
+
+ return result['body'].strip(), replicas_stored
def __init__(self, api_client=None, proxy=None,
TIME_FUTURE = time.time()+3600
MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
+ TEST_TIMEOUT = 10.0
+
def setUp(self):
self.ws = None
@mock.patch('arvados.events._EventClient')
def test_run_forever_survives_reconnects(self, websocket_client):
- connection_cond = threading.Condition()
- def ws_connect():
- with connection_cond:
- connection_cond.notify_all()
- websocket_client().connect.side_effect = ws_connect
+ connected = threading.Event()
+ websocket_client().connect.side_effect = connected.set
client = arvados.events.EventClient(
self.MOCK_WS_URL, [], lambda event: None, None)
- with connection_cond:
- forever_thread = threading.Thread(target=client.run_forever)
- forever_thread.start()
- # Simulate an unexpected disconnect, and wait for reconnect.
- close_thread = threading.Thread(target=client.on_closed)
- close_thread.start()
- connection_cond.wait()
+ forever_thread = threading.Thread(target=client.run_forever)
+ forever_thread.start()
+ # Simulate an unexpected disconnect, and wait for reconnect.
+ close_thread = threading.Thread(target=client.on_closed)
+ close_thread.start()
+ self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT))
close_thread.join()
run_forever_alive = forever_thread.is_alive()
client.close()
class PollClientTestCase(unittest.TestCase):
+ TEST_TIMEOUT = 10.0
+
class MockLogs(object):
+
def __init__(self):
self.logs = []
self.lock = threading.Lock()
self.logs = []
return {'items': retval, 'items_available': len(retval)}
-
def setUp(self):
self.logs = self.MockLogs()
self.arv = mock.MagicMock(name='arvados.api()')
self.arv.logs().list().execute.side_effect = self.logs.return_list
- self.callback_cond = threading.Condition()
+ self.callback_called = threading.Event()
self.recv_events = []
def tearDown(self):
self.client.close(timeout=None)
def callback(self, event):
- with self.callback_cond:
- self.recv_events.append(event)
- self.callback_cond.notify_all()
+ self.recv_events.append(event)
+ self.callback_called.set()
def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
if filters is None:
test_log = {'id': 12345, 'testkey': 'testtext'}
self.logs.add({'id': 123})
self.build_client(poll_time=.01)
- with self.callback_cond:
- self.client.start()
- self.callback_cond.wait()
- self.logs.add(test_log.copy())
- self.callback_cond.wait()
+ self.client.start()
+ self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
+ self.callback_called.clear()
+ self.logs.add(test_log.copy())
+ self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
self.client.close(timeout=None)
self.assertIn(test_log, self.recv_events)
client_filter = ['kind', '=', 'arvados#test']
self.build_client()
self.client.subscribe([client_filter[:]])
- with self.callback_cond:
- self.client.start()
- self.callback_cond.wait()
+ self.client.start()
+ self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
self.client.close(timeout=None)
self.assertTrue(self.was_filter_used(client_filter))
def test_run_forever(self):
self.build_client()
- with self.callback_cond:
- self.client.start()
- forever_thread = threading.Thread(target=self.client.run_forever)
- forever_thread.start()
- self.callback_cond.wait()
+ self.client.start()
+ forever_thread = threading.Thread(target=self.client.run_forever)
+ forever_thread.start()
+ self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
self.assertTrue(forever_thread.is_alive())
self.client.close()
forever_thread.join()
self.check_exception(copies=2, num_retries=3)
-class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-
-
+class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
+
class FakeKeepService(object):
- def __init__(self, delay, will_succeed, replicas=1):
+ def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
self.delay = delay
- self.success = will_succeed
+ self.will_succeed = will_succeed
+ self.will_raise = will_raise
self._result = {}
self._result['headers'] = {}
self._result['headers']['x-keep-replicas-stored'] = str(replicas)
self._result['body'] = 'foobar'
-
+
def put(self, data_hash, data, timeout):
time.sleep(self.delay)
- return self.success
-
+ if self.will_raise is not None:
+ raise self.will_raise
+ return self.will_succeed
+
def last_result(self):
- return self._result
-
+ if self.will_succeed:
+ return self._result
+
def finished(self):
return False
-
- def test_only_write_enough_on_success(self):
- copies = 3
- pool = arvados.KeepClient.KeepWriterThreadPool(
+ def setUp(self):
+ self.copies = 3
+ self.pool = arvados.KeepClient.KeepWriterThreadPool(
data = 'foo',
data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
- max_service_replicas = copies,
- copies = copies
+ max_service_replicas = self.copies,
+ copies = self.copies
)
+
+ def test_only_write_enough_on_success(self):
for i in range(10):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
- pool.add_task(ks, None)
- pool.join()
- self.assertEqual(pool.done(), copies)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
def test_only_write_enough_on_partial_success(self):
- copies = 3
- pool = arvados.KeepClient.KeepWriterThreadPool(
- data = 'foo',
- data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
- max_service_replicas = copies,
- copies = copies
- )
for i in range(5):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
- pool.add_task(ks, None)
+ self.pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_only_write_enough_when_some_crash(self):
+ for i in range(5):
+ ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+ self.pool.add_task(ks, None)
+ ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies)
+
+ def test_fail_when_too_many_crash(self):
+ for i in range(self.copies+1):
+ ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+ self.pool.add_task(ks, None)
+ for i in range(self.copies-1):
ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
- pool.add_task(ks, None)
- pool.join()
- self.assertEqual(pool.done(), copies)
+ self.pool.add_task(ks, None)
+ self.pool.join()
+ self.assertEqual(self.pool.done(), self.copies-1)
@tutil.skip_sleep
accept_attribute_as_json :runtime_constraints, Hash
accept_attribute_as_json :command, Array
accept_attribute_as_json :filters, Array
+ accept_attribute_as_json :scheduling_parameters, Hash
end
accept_attribute_as_json :mounts, Hash
accept_attribute_as_json :runtime_constraints, Hash
accept_attribute_as_json :command, Array
+ accept_attribute_as_json :scheduling_parameters, Hash
skip_before_filter :find_object_by_uuid, only: [:current]
skip_before_filter :render_404_if_no_object, only: [:current]
Collection,
Human, Specimen, Trait]
- table_names = klasses.map(&:table_name)
+ table_names = Hash[klasses.collect { |k| [k, k.table_name] }]
+
+ disabled_methods = Rails.configuration.disable_api_methods
+ avail_klasses = table_names.select{|k, t| !disabled_methods.include?(t+'.index')}
+ klasses = avail_klasses.keys
+
request_filters.each do |col, op, val|
- if col.index('.') && !table_names.include?(col.split('.', 2)[0])
+ if col.index('.') && !table_names.values.include?(col.split('.', 2)[0])
raise ArgumentError.new("Invalid attribute '#{col}' in filter")
end
end
t.add :runtime_constraints
t.add :state
t.add :use_existing
+ t.add :output_uuid
+ t.add :log_uuid
t.add :scheduling_parameters
end
# Finalize the container request after the container has
# finished/cancelled.
def finalize!
- update_attributes!(state: Final)
+ out_coll = nil
+ log_coll = nil
c = Container.find_by_uuid(container_uuid)
['output', 'log'].each do |out_type|
pdh = c.send(out_type)
next if pdh.nil?
manifest = Collection.where(portable_data_hash: pdh).first.manifest_text
- Collection.create!(owner_uuid: owner_uuid,
+ coll = Collection.create!(owner_uuid: owner_uuid,
manifest_text: manifest,
portable_data_hash: pdh,
name: "Container #{out_type} for request #{uuid}",
'type' => out_type,
'container_request' => uuid,
})
+ if out_type == 'output'
+ out_coll = coll.uuid
+ else
+ log_coll = coll.uuid
+ end
end
+ update_attributes!(state: Final, output_uuid: out_coll, log_uuid: log_coll)
end
protected
permitted.push :command, :container_image, :cwd, :description, :environment,
:filters, :mounts, :name, :output_path, :properties,
:requesting_container_uuid, :runtime_constraints,
- :state, :container_uuid, :scheduling_parameters
+ :state, :container_uuid, :use_existing, :scheduling_parameters
end
when Final
errors.add :state, "of container request can only be set to Final by system."
end
- if self.state_changed? || self.name_changed? || self.description_changed?
- permitted.push :state, :name, :description
+ if self.state_changed? || self.name_changed? || self.description_changed? || self.output_uuid_changed? || self.log_uuid_changed?
+ permitted.push :state, :name, :description, :output_uuid, :log_uuid
else
errors.add :state, "does not allow updates"
end
--- /dev/null
+require 'has_uuid'
+
+class AddOutputAndLogUuidToContainerRequest < ActiveRecord::Migration
+ extend HasUuid::ClassMethods
+
+ def up
+ add_column :container_requests, :output_uuid, :string
+ add_column :container_requests, :log_uuid, :string
+
+ no_such_out_coll = Server::Application.config.uuid_prefix + '-' + '4zz18' + '-xxxxxxxxxxxxxxx'
+ no_such_log_coll = Server::Application.config.uuid_prefix + '-' + '4zz18' + '-yyyyyyyyyyyyyyy'
+
+ update_sql <<-EOS
+update container_requests set output_uuid = ('#{no_such_out_coll}'), log_uuid = ('#{no_such_log_coll}');
+EOS
+ end
+
+ def down
+ remove_column :container_requests, :log_uuid
+ remove_column :container_requests, :output_uuid
+ end
+end
--- /dev/null
+class AddOutputAndLogUuidsToContainerRequestSearchIndex < ActiveRecord::Migration
+ def up
+ begin
+ remove_index :container_requests, :name => 'container_requests_search_index'
+ rescue
+ end
+ add_index :container_requests,
+ ["uuid", "owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "name", "state", "requesting_container_uuid", "container_uuid", "container_image", "cwd", "output_path", "output_uuid", "log_uuid"],
+ name: "container_requests_search_index"
+ end
+
+ def down
+ begin
+ remove_index :container_requests, :name => 'container_requests_search_index'
+ rescue
+ end
+ add_index :container_requests,
+ ["uuid", "owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "name", "state", "requesting_container_uuid", "container_uuid", "container_image", "cwd", "output_path"],
+ name: "container_requests_search_index"
+ end
+end
updated_at timestamp without time zone NOT NULL,
container_count integer DEFAULT 0,
use_existing boolean DEFAULT true,
- scheduling_parameters text
+ scheduling_parameters text,
+ output_uuid character varying(255),
+ log_uuid character varying(255)
);
-- Name: container_requests_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX container_requests_search_index ON container_requests USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, name, state, requesting_container_uuid, container_uuid, container_image, cwd, output_path);
+CREATE INDEX container_requests_search_index ON container_requests USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, name, state, requesting_container_uuid, container_uuid, container_image, cwd, output_path, output_uuid, log_uuid);
--
INSERT INTO schema_migrations (version) VALUES ('20161019171346');
-INSERT INTO schema_migrations (version) VALUES ('20161111143147');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20161111143147');
+
+INSERT INTO schema_migrations (version) VALUES ('20161115171221');
+
+INSERT INTO schema_migrations (version) VALUES ('20161115174218');
\ No newline at end of file
--- /dev/null
+require 'test_helper'
+
+class Arvados::V1::ContainerRequestsControllerTest < ActionController::TestCase
+ test 'create with scheduling parameters' do
+ authorize_with :system_user
+
+ sp = {'partitions' => ['test1', 'test2']}
+ post :create, {
+ container_request: {
+ command: ['echo', 'hello'],
+ container_image: 'test',
+ output_path: 'test',
+ scheduling_parameters: sp,
+ },
+ }
+ assert_response :success
+
+ cr = JSON.parse(@response.body)
+ assert_not_nil cr, 'Expected container request'
+ assert_equal sp, cr['scheduling_parameters']
+ end
+end
assert_equal 0, json_response['items_available']
end
- def check_project_contents_response
+ def check_project_contents_response disabled_kinds=[]
assert_response :success
assert_operator 2, :<=, json_response['items_available']
assert_operator 2, :<=, json_response['items'].count
kinds = json_response['items'].collect { |i| i['kind'] }.uniq
- expect_kinds = %w'arvados#group arvados#specimen arvados#pipelineTemplate arvados#job'
+ expect_kinds = %w'arvados#group arvados#specimen arvados#pipelineTemplate arvados#job' - disabled_kinds
assert_equal expect_kinds, (expect_kinds & kinds)
json_response['items'].each do |i|
"group#contents returned a non-project group")
end
end
+
+ disabled_kinds.each do |d|
+ assert_equal true, !kinds.include?(d)
+ end
end
test 'get group-owned objects' do
end
end
end
+
+ test 'get contents with jobs and pipeline instances disabled' do
+ Rails.configuration.disable_api_methods = ['jobs.index', 'pipeline_instances.index']
+
+ authorize_with :active
+ get :contents, {
+ id: groups(:aproject).uuid,
+ format: :json,
+ }
+ check_project_contents_response %w'arvados#pipelineInstance arvados#job'
+ end
end
cr.reload
assert_equal "Committed", cr.state
+ output_pdh = '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'
+ log_pdh = 'fa7aeb5140e2848d39b416daeef4ffc5+45'
act_as_system_user do
c.update_attributes!(state: Container::Complete,
- output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
- log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+ output: output_pdh,
+ log: log_pdh)
end
cr.reload
owner_uuid: project.uuid).count,
"Container #{out_type} should be copied to #{project.uuid}")
end
+ assert_not_nil cr.output_uuid
+ assert_not_nil cr.log_uuid
+ output = Collection.find_by_uuid cr.output_uuid
+ assert_equal output_pdh, output.portable_data_hash
+ log = Collection.find_by_uuid cr.log_uuid
+ assert_equal log_pdh, log.portable_data_hash
end
test "Container makes container request, then is cancelled" do
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"os"
"regexp"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
"github.com/curoverse/azure-sdk-for-go/storage"
)
"flag"
"fmt"
"io/ioutil"
- "log"
"math/rand"
"net"
"net/http"
"testing"
"time"
+ log "github.com/Sirupsen/logrus"
"github.com/curoverse/azure-sdk-for-go/storage"
)
package main
import (
- "log"
"sync"
"sync/atomic"
"time"
+
+ log "github.com/Sirupsen/logrus"
)
type bufferPool struct {
"encoding/json"
"fmt"
"io/ioutil"
- "log"
"strings"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
)
type Config struct {
Debug bool
Listen string
+ LogFormat string
+
PIDFile string
MaxBuffers int
var theConfig = DefaultConfig()
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
// DefaultConfig returns the default configuration.
func DefaultConfig() *Config {
return &Config{
Listen: ":25107",
+ LogFormat: "json",
MaxBuffers: 128,
RequireSignatures: true,
BlobSignatureTTL: arvados.Duration(14 * 24 * time.Hour),
// fields, and before using the config.
func (cfg *Config) Start() error {
if cfg.Debug {
+ log.SetLevel(log.DebugLevel)
cfg.debugLogf = log.Printf
cfg.debugLogf("debugging enabled")
} else {
cfg.debugLogf = func(string, ...interface{}) {}
}
+ switch strings.ToLower(cfg.LogFormat) {
+ case "text":
+ log.SetFormatter(&log.TextFormatter{
+ FullTimestamp: true,
+ TimestampFormat: rfc3339NanoFixed,
+ })
+ case "json":
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ })
+ default:
+ return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
+ }
+
if cfg.MaxBuffers < 0 {
return fmt.Errorf("MaxBuffers must be greater than zero")
}
package main
import (
- "log"
+ log "github.com/Sirupsen/logrus"
)
func init() {
--- /dev/null
+package main
+
+import (
+ "io"
+)
+
+// NewCountingWriter returns an io.WriteCloser that forwards each Write
+// to w and then calls f with the number of bytes w reported written.
+// Closing the returned value closes w if w implements io.Closer.
+func NewCountingWriter(w io.Writer, f func(uint64)) io.WriteCloser {
+ return &countingReadWriter{
+ writer: w,
+ counter: f,
+ }
+}
+
+// NewCountingReader returns an io.ReadCloser that forwards each Read to
+// r and then calls f with the number of bytes r reported read.
+func NewCountingReader(r io.Reader, f func(uint64)) io.ReadCloser {
+ return &countingReadWriter{
+ reader: r,
+ counter: f,
+ }
+}
+
+// countingReadWriter wraps a reader or a writer (each constructor in
+// this file sets exactly one of the two) and reports the byte count of
+// every Read or Write call to counter.
+type countingReadWriter struct {
+ reader io.Reader
+ writer io.Writer
+ // counter is called once per Read/Write with the number of bytes
+ // transferred by that call.
+ counter func(uint64)
+}
+
+// Read reads from the wrapped reader into buf and reports the number
+// of bytes read to the counter, whether or not the read also returned
+// an error.
+func (crw *countingReadWriter) Read(buf []byte) (n int, err error) {
+	n, err = crw.reader.Read(buf)
+	crw.counter(uint64(n))
+	return
+}
+
+// Write writes buf to the wrapped writer and reports the number of
+// bytes written to the counter, whether or not the write also returned
+// an error.
+func (crw *countingReadWriter) Write(buf []byte) (n int, err error) {
+	n, err = crw.writer.Write(buf)
+	crw.counter(uint64(n))
+	return
+}
+
+func (crw *countingReadWriter) Close() error {
+ if c, ok := crw.writer.(io.Closer); ok {
+ return c.Close()
+ }
+ return nil
+}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+ (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
ok <- struct{}{}
}()
"fmt"
"github.com/gorilla/mux"
"io"
- "log"
"net/http"
"os"
"regexp"
"strings"
"sync"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ log "github.com/Sirupsen/logrus"
)
-// MakeRESTRouter returns a new mux.Router that forwards all Keep
-// requests to the appropriate handlers.
-//
-func MakeRESTRouter() *mux.Router {
+// router is a mux.Router with an optional request limiter attached.
+// When limiter is non-nil, readNodeStatus reports its current and
+// maximum request counts.
+type router struct {
+ *mux.Router
+ limiter httpserver.RequestCounter
+}
+
+// MakeRESTRouter returns a new router that forwards all Keep requests
+// to the appropriate handlers.
+func MakeRESTRouter() *router {
rest := mux.NewRouter()
+ rtr := &router{Router: rest}
rest.HandleFunc(
`/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
// Privileged client only.
rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
+ // Internals/debugging info (runtime.MemStats)
+ rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
+
// List volumes: path, device number, bytes used/avail.
- rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
+ rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
// Replace the current pull queue.
rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
// 400 Bad Request.
rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
- return rest
+ return rtr
}
// BadRequestHandler is a HandleFunc to address bad requests.
resp.Write([]byte{'\n'})
}
-// StatusHandler
-// Responds to /status.json requests with the current node status,
-// described in a JSON structure.
-//
-// The data given in a status.json response includes:
-// volumes - a list of Keep volumes currently in use by this server
-// each volume is an object with the following fields:
-// * mount_point
-// * device_num (an integer identifying the underlying filesystem)
-// * bytes_free
-// * bytes_used
-
// PoolStatus struct
type PoolStatus struct {
Alloc uint64 `json:"BytesAllocated"`
Len int `json:"BuffersInUse"`
}
+// volumeStatusEnt is the per-volume entry in NodeStatus.Volumes.
+type volumeStatusEnt struct {
+ // Label is the volume's String() value.
+ Label string
+ // Status is the volume's Status() value.
+ Status *VolumeStatus `json:",omitempty"`
+ VolumeStats *ioStats `json:",omitempty"`
+ // InternalStats is the volume's InternalStats() value if the
+ // volume implements InternalStatser, otherwise nil.
+ InternalStats interface{} `json:",omitempty"`
+}
+
// NodeStatus struct
type NodeStatus struct {
- Volumes []*VolumeStatus `json:"volumes"`
- BufferPool PoolStatus
- PullQueue WorkQueueStatus
- TrashQueue WorkQueueStatus
- Memory runtime.MemStats
+ Volumes []*volumeStatusEnt
+ BufferPool PoolStatus
+ PullQueue WorkQueueStatus
+ TrashQueue WorkQueueStatus
+ RequestsCurrent int
+ RequestsMax int
}
var st NodeStatus
var stLock sync.Mutex
+// DebugHandler addresses /debug.json requests. It responds with a JSON
+// object whose MemStats field holds the current runtime.MemStats.
+func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
+ type debugStats struct {
+ MemStats runtime.MemStats
+ }
+ var ds debugStats
+ runtime.ReadMemStats(&ds.MemStats)
+ err := json.NewEncoder(resp).Encode(&ds)
+ if err != nil {
+ // NOTE(review): if part of the body was already written, this
+ // presumably cannot change the response status -- confirm.
+ http.Error(resp, err.Error(), 500)
+ }
+}
+
// StatusHandler addresses /status.json requests.
-func StatusHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
- readNodeStatus(&st)
+ rtr.readNodeStatus(&st)
jstat, err := json.Marshal(&st)
stLock.Unlock()
if err == nil {
}
// populate the given NodeStatus struct with current values.
-func readNodeStatus(st *NodeStatus) {
+func (rtr *router) readNodeStatus(st *NodeStatus) {
vols := KeepVM.AllReadable()
if cap(st.Volumes) < len(vols) {
- st.Volumes = make([]*VolumeStatus, len(vols))
+ st.Volumes = make([]*volumeStatusEnt, len(vols))
}
st.Volumes = st.Volumes[:0]
for _, vol := range vols {
- if s := vol.Status(); s != nil {
- st.Volumes = append(st.Volumes, s)
+ var internalStats interface{}
+ if vol, ok := vol.(InternalStatser); ok {
+ internalStats = vol.InternalStats()
}
+ st.Volumes = append(st.Volumes, &volumeStatusEnt{
+ Label: vol.String(),
+ Status: vol.Status(),
+ InternalStats: internalStats,
+ //VolumeStats: KeepVM.VolumeStats(vol),
+ })
}
st.BufferPool.Alloc = bufs.Alloc()
st.BufferPool.Cap = bufs.Cap()
st.BufferPool.Len = bufs.Len()
st.PullQueue = getWorkQueueStatus(pullq)
st.TrashQueue = getWorkQueueStatus(trashq)
- runtime.ReadMemStats(&st.Memory)
+ if rtr.limiter != nil {
+ st.RequestsCurrent = rtr.limiter.Current()
+ st.RequestsMax = rtr.limiter.Max()
+ }
}
// return a WorkQueueStatus for the given queue. If q is nil (which
import (
"flag"
"fmt"
- "log"
"net"
"net/http"
"os"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ log "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
)
}
err = theConfig.Start()
+ if err != nil {
+ log.Fatal(err)
+ }
if pidfile := theConfig.PIDFile; pidfile != "" {
f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
// Middleware stack: logger, MaxRequests limiter, method handlers
- http.Handle("/", &LoggingRESTRouter{
- httpserver.NewRequestLimiter(theConfig.MaxRequests,
- MakeRESTRouter()),
- })
+ router := MakeRESTRouter()
+ limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
+ router.limiter = limiter
+ http.Handle("/", &LoggingRESTRouter{router: limiter})
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
// LoggingResponseWriter
import (
- "log"
+ "context"
+ "fmt"
"net/http"
"strings"
"time"
+
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ log "github.com/Sirupsen/logrus"
)
// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
// LoggingRESTRouter is used to add logging capabilities to mux.Router
type LoggingRESTRouter struct {
- router http.Handler
+ router http.Handler
+ idGenerator httpserver.IDGenerator
}
func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
- t0 := time.Now()
+ tStart := time.Now()
+
+ // Attach a requestID-aware logger to the request context.
+ lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
+ ctx := context.WithValue(req.Context(), "logger", lgr)
+ req = req.WithContext(ctx)
+
+ lgr = lgr.WithFields(log.Fields{
+ "remoteAddr": req.RemoteAddr,
+ "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+ "reqMethod": req.Method,
+ "reqPath": req.URL.Path[1:],
+ "reqBytes": req.ContentLength,
+ })
+ lgr.Debug("request")
+
resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
loggingRouter.router.ServeHTTP(&resp, req)
+ tDone := time.Now()
+
statusText := http.StatusText(resp.Status)
if resp.Status >= 400 {
statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
}
- now := time.Now()
- tTotal := now.Sub(t0)
- tLatency := resp.sentHdr.Sub(t0)
- tResponse := now.Sub(resp.sentHdr)
- log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), resp.Status, resp.Length, statusText)
+ if resp.sentHdr == zeroTime {
+ // Nobody changed status or wrote any data, i.e., we
+ // returned a 200 response with no body.
+ resp.sentHdr = tDone
+ }
+
+ lgr.WithFields(log.Fields{
+ "timeTotal": loggedDuration(tDone.Sub(tStart)),
+ "timeToStatus": loggedDuration(resp.sentHdr.Sub(tStart)),
+ "timeWriteBody": loggedDuration(tDone.Sub(resp.sentHdr)),
+ "respStatusCode": resp.Status,
+ "respStatus": statusText,
+ "respBytes": resp.Length,
+ }).Info("response")
+}
+
+// loggedDuration is a time.Duration that is rendered as a number of
+// seconds.
+type loggedDuration time.Duration
+
+// MarshalJSON formats a duration as a number of seconds, using
+// fixed-point notation with exactly 6 decimal places. The digits are
+// emitted without surrounding quotes.
+func (d loggedDuration) MarshalJSON() ([]byte, error) {
+ return []byte(d.String()), nil
+}
+// String formats a duration as a number of seconds, using
+// fixed-point notation with exactly 6 decimal places.
+func (d loggedDuration) String() string {
+ return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
}
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"io"
"io/ioutil"
- "log"
"time"
+
+ log "github.com/Sirupsen/logrus"
)
// RunPullWorker is used by Keepstore to initiate pull worker channel goroutine.
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"os"
"regexp"
"strings"
"sync"
+ "sync/atomic"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
+ log "github.com/Sirupsen/logrus"
)
const (
ReadOnly bool
UnsafeDelete bool
- bucket *s3.Bucket
+ bucket *s3bucket
startOnce sync.Once
}
client := s3.New(auth, region)
client.ConnectTimeout = time.Duration(v.ConnectTimeout)
client.ReadTimeout = time.Duration(v.ReadTimeout)
- v.bucket = &s3.Bucket{
- S3: client,
- Name: v.Bucket,
+ v.bucket = &s3bucket{
+ Bucket: &s3.Bucket{
+ S3: client,
+ Name: v.Bucket,
+ },
}
return nil
}
if err == nil || !os.IsNotExist(err) {
return
}
+
_, err = v.bucket.Head("recent/"+loc, nil)
err = v.translateError(err)
if err != nil {
err = os.ErrNotExist
return
}
+
rdr, err = v.bucket.GetReader(loc)
if err != nil {
log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
- Bucket: v.bucket,
+ Bucket: v.bucket.Bucket,
Prefix: prefix,
PageSize: v.IndexPageSize,
}
recentL := s3Lister{
- Bucket: v.bucket,
+ Bucket: v.bucket.Bucket,
Prefix: "recent/" + prefix,
PageSize: v.IndexPageSize,
}
+ v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+ v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
+ v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
if data.Key >= "g" {
// Conveniently, "recent/*" and "trash/*" are
// lexically greater than all hex-encoded data
for recent != nil {
if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
recent = recentL.Next()
+ v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
continue
} else if cmp == 0 {
stamp = recent
recent = recentL.Next()
+ v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
break
} else {
// recent/X marker is missing: we'll
if !s3UnsafeDelete {
return ErrS3TrashDisabled
}
- return v.bucket.Del(loc)
+ return v.translateError(v.bucket.Del(loc))
}
err := v.checkRaceWindow(loc)
if err != nil {
}
}
+// InternalStats returns bucket I/O and API call counters.
+//
+// The returned value is a point-in-time snapshot (a fresh
+// *s3bucketStats), not a pointer to the live counters. Callers hand
+// the result to json.Marshal; marshaling the live ErrorCodes map while
+// tickErr updates it from another goroutine would be an unsynchronized
+// concurrent map read and write.
+func (v *S3Volume) InternalStats() interface{} {
+	live := &v.bucket.stats
+	snap := &s3bucketStats{
+		Errors:   atomic.LoadUint64(&live.Errors),
+		Ops:      atomic.LoadUint64(&live.Ops),
+		GetOps:   atomic.LoadUint64(&live.GetOps),
+		PutOps:   atomic.LoadUint64(&live.PutOps),
+		HeadOps:  atomic.LoadUint64(&live.HeadOps),
+		DelOps:   atomic.LoadUint64(&live.DelOps),
+		ListOps:  atomic.LoadUint64(&live.ListOps),
+		InBytes:  atomic.LoadUint64(&live.InBytes),
+		OutBytes: atomic.LoadUint64(&live.OutBytes),
+	}
+	// ErrorCodes is guarded by lock (see tickErr). Leave it nil when
+	// empty so the JSON output is unchanged (the field is omitempty).
+	live.lock.Lock()
+	defer live.lock.Unlock()
+	if len(live.ErrorCodes) > 0 {
+		snap.ErrorCodes = make(map[string]uint64, len(live.ErrorCodes))
+		for code, n := range live.ErrorCodes {
+			snap.ErrorCodes[code] = n
+		}
+	}
+	return snap
+}
+
// String implements fmt.Stringer.
func (v *S3Volume) String() string {
- return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
+ return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
}
// Writable returns false if all future Put, Mtime, and Delete calls
// Use a merge sort to find matching sets of trash/X and recent/X.
trashL := s3Lister{
- Bucket: v.bucket,
+ Bucket: v.bucket.Bucket,
Prefix: "trash/",
PageSize: v.IndexPageSize,
}
v.fixRace(loc)
v.Touch(loc)
continue
- } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
+ }
+ _, err := v.bucket.Head(loc, nil)
+ if os.IsNotExist(err) {
log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
v.fixRace(loc)
continue
}
return
}
+
+// s3bucket wraps s3.Bucket and counts I/O and API usage stats.
+type s3bucket struct {
+ *s3.Bucket
+ stats s3bucketStats
+}
+
+// GetReader calls the wrapped Bucket.GetReader, counts the call as one
+// Op and one GetOp, records err (if any) via tickErr, and returns a
+// reader that adds every byte read through it to InBytes.
+//
+// NOTE(review): the returned reader is a non-nil wrapper even when err
+// is non-nil, so callers must check err rather than the reader.
+func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
+ rdr, err := b.Bucket.GetReader(path)
+ b.stats.tick(&b.stats.Ops, &b.stats.GetOps)
+ b.stats.tickErr(err)
+ return NewCountingReader(rdr, b.stats.tickInBytes), err
+}
+
+// Head calls the wrapped Bucket.Head, counts the call as one Op and
+// one HeadOp, and records err (if any) via tickErr.
+func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
+ resp, err := b.Bucket.Head(path, headers)
+ b.stats.tick(&b.stats.Ops, &b.stats.HeadOps)
+ b.stats.tickErr(err)
+ return resp, err
+}
+
+// PutReader calls the wrapped Bucket.PutReader with r wrapped in a
+// counting reader, so every byte read from r is added to OutBytes. The
+// call is counted as one Op and one PutOp, and err (if any) is
+// recorded via tickErr.
+func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
+ err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options)
+ b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+ b.stats.tickErr(err)
+ return err
+}
+
+// Put uploads data to path. It is the counting PutReader applied to an
+// in-memory buffer holding data: the bytes read from that buffer are
+// added to OutBytes, the call is counted as one Op and one PutOp, and
+// any error is recorded via tickErr.
+func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
+	return b.PutReader(path, bytes.NewBuffer(data), int64(len(data)), contType, perm, options)
+}
+
+// Del calls the wrapped Bucket.Del, counts the call as one Op and one
+// DelOp, and records err (if any) via tickErr.
+func (b *s3bucket) Del(path string) error {
+ err := b.Bucket.Del(path)
+ b.stats.tick(&b.stats.Ops, &b.stats.DelOps)
+ b.stats.tickErr(err)
+ return err
+}
+
+// s3bucketStats holds the usage counters for one s3bucket. The uint64
+// fields are incremented with sync/atomic by the tick* methods;
+// ErrorCodes is only modified by tickErr while holding lock.
+type s3bucketStats struct {
+ Errors uint64
+ Ops uint64
+ GetOps uint64
+ PutOps uint64
+ HeadOps uint64
+ DelOps uint64
+ ListOps uint64
+ InBytes uint64
+ OutBytes uint64
+
+ // ErrorCodes counts errors by a key built from the error's Go type
+ // and, for *s3.Error, its status code and error code.
+ ErrorCodes map[string]uint64 `json:",omitempty"`
+
+ // lock guards ErrorCodes.
+ lock sync.Mutex
+}
+
+// tickInBytes atomically adds n to InBytes.
+func (s *s3bucketStats) tickInBytes(n uint64) {
+ atomic.AddUint64(&s.InBytes, n)
+}
+
+// tickOutBytes atomically adds n to OutBytes.
+func (s *s3bucketStats) tickOutBytes(n uint64) {
+ atomic.AddUint64(&s.OutBytes, n)
+}
+
+// tick adds 1 to each of the given counters. Each increment is atomic
+// on its own; the set of increments as a whole is not.
+func (s *s3bucketStats) tick(counters ...*uint64) {
+ for _, counter := range counters {
+ atomic.AddUint64(counter, 1)
+ }
+}
+
+// tickErr records one failed operation. A nil err is ignored.
+// Otherwise Errors is incremented and the ErrorCodes entry for the
+// error is bumped; the entry's key is the error's Go type, followed by
+// the HTTP status code and S3 error code when err is an *s3.Error.
+func (s *s3bucketStats) tickErr(err error) {
+	if err == nil {
+		return
+	}
+	atomic.AddUint64(&s.Errors, 1)
+
+	key := fmt.Sprintf("%T", err)
+	if s3err, ok := err.(*s3.Error); ok {
+		key += fmt.Sprintf(" %d %s", s3err.StatusCode, s3err.Code)
+	}
+
+	s.lock.Lock()
+	defer s.lock.Unlock()
+	// The map is created lazily so an error-free bucket has no
+	// ErrorCodes at all.
+	if s.ErrorCodes == nil {
+		s.ErrorCodes = make(map[string]uint64)
+	}
+	s.ErrorCodes[key]++
+}
"bytes"
"context"
"crypto/md5"
+ "encoding/json"
"fmt"
"io/ioutil"
- "log"
"os"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
+ log "github.com/Sirupsen/logrus"
check "gopkg.in/check.v1"
)
}
}
+// TestStats checks the JSON form of the volume's InternalStats() after
+// a failed Get, a Put, and two successful Gets.
+func (s *StubbedS3Suite) TestStats(c *check.C) {
+ v := s.newTestableVolume(c, 5*time.Minute, false, 2)
+ // stats returns the current InternalStats() as a JSON string.
+ stats := func() string {
+ buf, err := json.Marshal(v.InternalStats())
+ c.Check(err, check.IsNil)
+ return string(buf)
+ }
+
+ c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+
+ // Get before anything has been Put: expected to fail, and to be
+ // counted as an operation and as a 404 error with no bytes read.
+ loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+ _, err := v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.NotNil)
+ c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+ c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
+ c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+ err = v.Put(context.Background(), loc, []byte("foo"))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+ c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
+
+ // Two successful 3-byte reads should add up to 6 bytes in.
+ _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ _, err = v.Get(context.Background(), loc, make([]byte, 3))
+ c.Check(err, check.IsNil)
+ c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
defer func(tl, bs arvados.Duration) {
theConfig.TrashLifetime = tl
import (
"errors"
- "log"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ log "github.com/Sirupsen/logrus"
)
// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
"address" is a host IP address or name and "port" is a port number
or name.
+LogFormat:
+
+ Format of request/response and error logs: "json" or "text".
+
PIDFile:
Path to write PID file during startup. This file is kept open and
// with more free space, etc.
NextWritable() Volume
+ // VolumeStats returns the ioStats used for tracking stats for
+ // the given Volume.
+ VolumeStats(Volume) *ioStats
+
// Close shuts down the volume manager cleanly.
Close()
}
readables []Volume
writables []Volume
counter uint32
+ iostats map[Volume]*ioStats
}
// MakeRRVolumeManager initializes RRVolumeManager
func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
- vm := &RRVolumeManager{}
+ vm := &RRVolumeManager{
+ iostats: make(map[Volume]*ioStats),
+ }
for _, v := range volumes {
+ vm.iostats[v] = &ioStats{}
vm.readables = append(vm.readables, v)
if v.Writable() {
vm.writables = append(vm.writables, v)
return vm.writables[i%uint32(len(vm.writables))]
}
+// VolumeStats returns the ioStats that MakeRRVolumeManager created for
+// the given volume, or nil if v has no entry in the manager's iostats
+// map.
+func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
+ return vm.iostats[v]
+}
+
// Close the RRVolumeManager
func (vm *RRVolumeManager) Close() {
}
-// VolumeStatus provides status information of the volume consisting of:
-// * mount_point
-// * device_num (an integer identifying the underlying storage system)
-// * bytes_free
-// * bytes_used
+// VolumeStatus describes the current condition of a volume
type VolumeStatus struct {
- MountPoint string `json:"mount_point"`
- DeviceNum uint64 `json:"device_num"`
- BytesFree uint64 `json:"bytes_free"`
- BytesUsed uint64 `json:"bytes_used"`
+ MountPoint string
+ DeviceNum uint64
+ BytesFree uint64
+ BytesUsed uint64
+}
+
+// ioStats tracks I/O statistics for a volume or server
+type ioStats struct {
+ Errors uint64
+ Ops uint64
+ CompareOps uint64
+ GetOps uint64
+ PutOps uint64
+ TouchOps uint64
+ InBytes uint64
+ OutBytes uint64
+}
+
+// InternalStatser is implemented by anything that can report
+// implementation-specific statistics through InternalStats.
+type InternalStatser interface {
+ InternalStats() interface{}
}
"fmt"
"io"
"io/ioutil"
- "log"
"os"
"path/filepath"
"regexp"
"sync"
"syscall"
"time"
+
+ log "github.com/Sirupsen/logrus"
)
type unixVolumeAdder struct {
// uses fs.Blocks - fs.Bfree.
free := fs.Bavail * uint64(fs.Bsize)
used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
- return &VolumeStatus{v.Root, devnum, free, used}
+ return &VolumeStatus{
+ MountPoint: v.Root,
+ DeviceNum: devnum,
+ BytesFree: free,
+ BytesUsed: used,
+ }
}
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
tar -C /usr/local -xjf /tmp/$PJS.tar.bz2 && \
ln -s ../$PJS/bin/phantomjs /usr/local/bin/
+RUN pip install -U setuptools
+
ARG arvados_version
RUN echo arvados_version is git commit $arvados_version
+export PATH=${PATH}:/usr/local/go/bin:/var/lib/gems/bin
+export GEM_HOME=/var/lib/gems
+export GEM_PATH=/var/lib/gems
+
if test -s /var/run/localip_override ; then
localip=$(cat /var/run/localip_override)
else