8460: Merge branch 'master' into 8460-websocket-go
author Tom Clegg <tom@curoverse.com>
Wed, 30 Nov 2016 20:09:13 +0000 (15:09 -0500)
committer Tom Clegg <tom@curoverse.com>
Wed, 30 Nov 2016 20:09:13 +0000 (15:09 -0500)
75 files changed:
apps/workbench/app/controllers/projects_controller.rb
apps/workbench/app/views/projects/_show_dashboard.html.erb
apps/workbench/app/views/projects/_show_processes.html.erb [new file with mode: 0644]
apps/workbench/app/views/projects/show.html.erb
apps/workbench/test/controllers/disabled_api_test.rb
apps/workbench/test/integration/application_layout_test.rb
apps/workbench/test/integration/pipeline_instances_test.rb
apps/workbench/test/integration/projects_test.rb
apps/workbench/test/integration/work_units_test.rb
apps/workbench/test/performance/browsing_test.rb
build/run-build-packages-one-target.sh
build/run-build-packages.sh
build/run-library.sh
doc/_includes/_container_runtime_constraints.liquid
doc/_includes/_container_scheduling_parameters.liquid [new file with mode: 0644]
doc/_includes/_crunch1only_begin.liquid [new file with mode: 0644]
doc/_includes/_crunch1only_end.liquid [new file with mode: 0644]
doc/_includes/_notebox_begin_warning.liquid [new file with mode: 0644]
doc/_includes/_pipeline_deprecation_notice.liquid
doc/api/methods/container_requests.html.textile.liquid
doc/api/methods/containers.html.textile.liquid
doc/images/upload-using-workbench.png
doc/images/workbench-dashboard.png
doc/images/workbench-move-selected.png
doc/install/install-keep-web.html.textile.liquid
doc/install/install-keepproxy.html.textile.liquid
doc/user/cwl/cwl-runner.html.textile.liquid
doc/user/getting_started/workbench.html.textile.liquid
doc/user/topics/arv-run.html.textile.liquid
doc/user/topics/running-pipeline-command-line.html.textile.liquid
doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid
sdk/cli/bin/crunch-job
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_submit.py
sdk/go/httpserver/id_generator.go [new file with mode: 0644]
sdk/go/httpserver/request_limiter.go
sdk/python/arvados/arvfile.py
sdk/python/arvados/keep.py
sdk/python/tests/test_events.py
sdk/python/tests/test_keep_client.py
services/api/app/controllers/arvados/v1/container_requests_controller.rb
services/api/app/controllers/arvados/v1/containers_controller.rb
services/api/app/controllers/arvados/v1/groups_controller.rb
services/api/app/models/container_request.rb
services/api/db/migrate/20161115171221_add_output_and_log_uuid_to_container_request.rb [new file with mode: 0644]
services/api/db/migrate/20161115174218_add_output_and_log_uuids_to_container_request_search_index.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/functional/arvados/v1/container_requests_controller_test.rb [new file with mode: 0644]
services/api/test/functional/arvados/v1/groups_controller_test.rb
services/api/test/unit/container_request_test.rb
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/bufferpool.go
services/keepstore/config.go
services/keepstore/config_test.go
services/keepstore/count.go [new file with mode: 0644]
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/logging_router.go
services/keepstore/pull_worker.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/trash_worker.go
services/keepstore/usage.go
services/keepstore/volume.go
services/keepstore/volume_unix.go
tools/arvbox/lib/arvbox/docker/Dockerfile.base
tools/arvbox/lib/arvbox/docker/common.sh

diff --git a/apps/workbench/app/controllers/projects_controller.rb b/apps/workbench/app/controllers/projects_controller.rb
index 0a2044a0e23e96b741d77658dfa91057fe57bdfa..48b2c421fb08385e648d95ba8cac809e948eb7b8 100644 (file)
@@ -55,8 +55,10 @@ class ProjectsController < ApplicationController
     pane_list = []
 
     procs = ["arvados#containerRequest"]
+    procs_pane_name = 'Processes'
     if PipelineInstance.api_exists?(:index)
       procs << "arvados#pipelineInstance"
+      procs_pane_name = 'Pipelines_and_processes'
     end
 
     workflows = ["arvados#workflow"]
@@ -76,7 +78,7 @@ class ProjectsController < ApplicationController
       }
     pane_list <<
       {
-        :name => 'Pipelines_and_processes',
+        :name => procs_pane_name,
         :filters => [%w(uuid is_a) + [procs]]
       }
     pane_list <<
diff --git a/apps/workbench/app/views/projects/_show_dashboard.html.erb b/apps/workbench/app/views/projects/_show_dashboard.html.erb
index e0093bf6de320a3aacd471e8dbc3c4128983aed5..ab6eb16f5153862061a40c6b59f85bd89819c4f5 100644 (file)
   preload_links_for_objects(collection_pdhs + collection_uuids)
 %>
 
+<%
+  if !PipelineInstance.api_exists?(:index)
+    recent_procs_title = 'Recent processes'
+    run_proc_title = 'Choose a workflow to run:'
+  else
+    recent_procs_title = 'Recent pipelines and processes'
+    run_proc_title = 'Choose a pipeline or workflow to run:'
+  end
+%>
+
   <div class="row">
     <div class="col-md-6">
       <div class="panel panel-default" style="min-height: 10.5em">
         <div class="panel-heading">
-          <span class="panel-title">Recent pipelines and processes</span>
+          <span class="panel-title"><%=recent_procs_title%></span>
           <% if current_user.andand.is_active %>
             <span class="pull-right recent-processes-actions">
               <span>
                 <%= link_to(
                 choose_work_unit_templates_path(
-                  title: 'Choose a pipeline or workflow to run:',
+                  title: run_proc_title,
                   action_name: 'Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i>',
                   action_href: work_units_path,
                   action_method: 'post',
                   action_data: {'selection_param' => 'work_unit[template_uuid]', 'work_unit[owner_uuid]' => current_user.uuid, 'success' => 'redirect-to-created-object'}.to_json),
                 { class: "btn btn-primary btn-xs", remote: true }) do %>
-                  <i class="fa fa-fw fa-gear"></i> Run a pipeline...
+                  <i class="fa fa-fw fa-gear"></i> Run a process...
                 <% end %>
               </span>
               <span>
diff --git a/apps/workbench/app/views/projects/_show_processes.html.erb b/apps/workbench/app/views/projects/_show_processes.html.erb
new file mode 100644 (file)
index 0000000..71f6a89
--- /dev/null
@@ -0,0 +1,5 @@
+<%= render_pane 'tab_contents', to_string: true, locals: {
+      limit: 50,
+      filters: [['uuid', 'is_a', ["arvados#containerRequest"]]],
+      sortable_columns: { 'name' => 'container_requests.name', 'description' => 'container_requests.description' }
+    }.merge(local_assigns) %>
diff --git a/apps/workbench/app/views/projects/show.html.erb b/apps/workbench/app/views/projects/show.html.erb
index e52d826cf60da778f9b343d1d44578c6cc5b0c7f..56055645170b8640b69c5691352ca858c45205ce 100644 (file)
@@ -9,6 +9,16 @@
   </h2>
 <% end %>
 
+<%
+  if !PipelineInstance.api_exists?(:index)
+    run_proc_title = 'Choose a workflow to run:'
+    run_proc_hover = 'Run a workflow in this project'
+  else
+    run_proc_title = 'Choose a pipeline or workflow to run:'
+    run_proc_hover = 'Run a pipeline or workflow in this project'
+  end
+%>
+
 <% content_for :tab_line_buttons do %>
   <% if @object.editable? %>
     <div class="btn-group btn-group-sm">
     </div>
     <%= link_to(
           choose_work_unit_templates_path(
-            title: 'Choose a pipeline or workflow to run:',
+            title: run_proc_title,
             action_name: 'Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i>',
             action_href: work_units_path,
             action_method: 'post',
             action_data: {'selection_param' => 'work_unit[template_uuid]', 'work_unit[owner_uuid]' => @object.uuid, 'success' => 'redirect-to-created-object'}.to_json),
-          { class: "btn btn-primary btn-sm", remote: true, title: "Run a pipeline or workflow in this project" }) do %>
-      <i class="fa fa-fw fa-gear"></i> Run a pipeline...
+          { class: "btn btn-primary btn-sm", remote: true, title: run_proc_hover }) do %>
+      <i class="fa fa-fw fa-gear"></i> Run a process...
     <% end %>
     <%= link_to projects_path({'project[owner_uuid]' => @object.uuid, 'options' => {'ensure_unique_name' => true}}), method: :post, title: "Add a subproject to this project", class: 'btn btn-sm btn-primary' do %>
       <i class="fa fa-fw fa-plus"></i>
diff --git a/apps/workbench/test/controllers/disabled_api_test.rb b/apps/workbench/test/controllers/disabled_api_test.rb
index a41d87f31ab34187804bbfb25beb12da310f9aea..47276c02e835419cf89601e78c153f4f5431df21 100644 (file)
@@ -12,6 +12,7 @@ class DisabledApiTest < ActionController::TestCase
     get :index, {}, session_for(:active)
     assert_includes @response.body, "zzzzz-xvhdp-cr4runningcntnr" # expect crs
     assert_not_includes @response.body, "zzzzz-d1hrv-"   # expect no pipelines
+    assert_includes @response.body, "Run a process"
   end
 
   [
@@ -33,6 +34,7 @@ class DisabledApiTest < ActionController::TestCase
   end
 
   [
+    :admin,
     :active,
     nil,
   ].each do |user|
@@ -58,6 +60,7 @@ class DisabledApiTest < ActionController::TestCase
       assert_includes resp, "href=\"#Pipelines_and_processes\""
       assert_includes resp, "href=\"#Workflows\""
       assert_not_includes resp, "href=\"#Pipeline_templates\""
+      assert_includes @response.body, "Run a process" if user == :admin
     end
   end
 end
diff --git a/apps/workbench/test/integration/application_layout_test.rb b/apps/workbench/test/integration/application_layout_test.rb
index c4eb941b08894bb4cf58c88f75ba7e721e120f3f..1d68b38d98ec79a1f755a7f6f1a4b7ee2d3284ec 100644 (file)
@@ -251,7 +251,7 @@ class ApplicationLayoutTest < ActionDispatch::IntegrationTest
 
       assert_text 'Recent pipelines and processes' # seeing dashboard now
       within('.recent-processes-actions') do
-        assert page.has_link?('Run a pipeline')
+        assert page.has_link?('Run a process')
         assert page.has_link?('All processes')
       end
 
diff --git a/apps/workbench/test/integration/pipeline_instances_test.rb b/apps/workbench/test/integration/pipeline_instances_test.rb
index 171580bbaa2bc9816a9ba1061e40142d0487c8e9..338280684ecd3a07a5f8e5f244c528a57d7b51b5 100644 (file)
@@ -391,7 +391,7 @@ class PipelineInstancesTest < ActionDispatch::IntegrationTest
     collection = api_fixture('collections', collection_fixture)
 
     # create a pipeline instance
-    find('.btn', text: 'Run a pipeline').click
+    find('.btn', text: 'Run a process').click
     within('.modal-dialog') do
       find('.selectable', text: template_name).click
       find('.btn', text: 'Next: choose inputs').click
diff --git a/apps/workbench/test/integration/projects_test.rb b/apps/workbench/test/integration/projects_test.rb
index e5877aca6d1e88824b2575ba571eda21de403ee3..27eac8ab566ca916d42569c52ceba15e41e7fcd1 100644 (file)
@@ -746,7 +746,7 @@ class ProjectsTest < ActionDispatch::IntegrationTest
       project = api_fixture('groups')['aproject']
       visit page_with_token 'active', '/projects/' + project['uuid']
 
-      find('.btn', text: 'Run a pipeline').click
+      find('.btn', text: 'Run a process').click
 
       # in the chooser, verify preview and click Next button
       within('.modal-dialog') do
diff --git a/apps/workbench/test/integration/work_units_test.rb b/apps/workbench/test/integration/work_units_test.rb
index f04616dd383ac49f927a544fb2e7d372c30b8acb..3f551a012ea62a692b5fdfcf0c99de0208b359bd 100644 (file)
@@ -118,7 +118,7 @@ class WorkUnitsTest < ActionDispatch::IntegrationTest
 
       within('.recent-processes-actions') do
         assert page.has_link?('All processes')
-        find('a', text: 'Run a pipeline').click
+        find('a', text: 'Run a process').click
       end
 
       # in the chooser, verify preview and click Next button
diff --git a/apps/workbench/test/performance/browsing_test.rb b/apps/workbench/test/performance/browsing_test.rb
index d068ee2aaf08f3e7451ae817599e7dcaaca0d535..dcfd7d86e050532319662f13666db55dd69bf06c 100644 (file)
@@ -19,7 +19,7 @@ class BrowsingTest < WorkbenchPerformanceTest
   test "home page" do
     visit_page_with_token
     assert_text 'Dashboard'
-    assert_selector 'a', text: 'Run a pipeline'
+    assert_selector 'a', text: 'Run a process'
   end
 
   test "search for hash" do
diff --git a/build/run-build-packages-one-target.sh b/build/run-build-packages-one-target.sh
index adcb87f34d79b6e344317e75fa8fca3897d166cc..60250c9d50b4b07befa49903645ae00cd0b8a4d6 100755 (executable)
@@ -14,7 +14,9 @@ Syntax:
     Run package install test script "test-packages-$target.sh"
 --debug
     Output debug information (default: false)
---only-test
+--only-build <package>
+    Build only a specific package
+--only-test <package>
     Test only a specific package
 
 WORKSPACE=path         Path to the Arvados source tree to build packages from
@@ -40,7 +42,7 @@ if ! [[ -d "$WORKSPACE" ]]; then
 fi
 
 PARSEDOPTS=$(getopt --name "$0" --longoptions \
-    help,debug,test-packages,target:,command:,only-test: \
+    help,debug,test-packages,target:,command:,only-test:,only-build: \
     -- "" "$@")
 if [ $? -ne 0 ]; then
     exit 1
@@ -62,8 +64,12 @@ while [ $# -gt 0 ]; do
             TARGET="$2"; shift
             ;;
         --only-test)
+            test_packages=1
             packages="$2"; shift
             ;;
+        --only-build)
+            ONLY_BUILD="$2"; shift
+            ;;
         --debug)
             DEBUG=" --debug"
             ;;
@@ -191,6 +197,7 @@ else
     if docker run --rm \
         "${docker_volume_args[@]}" \
         --env ARVADOS_DEBUG=1 \
+        --env "ONLY_BUILD=$ONLY_BUILD" \
         "$IMAGE" $COMMAND
     then
         echo
diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh
index 0a4559f95b3518aff58246fe4839052a7d3746b8..116d14bfe786e88429a5d1ca58d2aba9abcba418 100755 (executable)
@@ -15,8 +15,10 @@ Options:
     Build api server and workbench packages with vendor/bundle included
 --debug
     Output debug information (default: false)
---target
+--target <target>
     Distribution to build packages for (default: debian7)
+--only-build <package>
+    Build only a specific package (or $ONLY_BUILD from environment)
 --command
     Build command to execute (defaults to the run command defined in the
     Docker image)
@@ -31,7 +33,7 @@ TARGET=debian7
 COMMAND=
 
 PARSEDOPTS=$(getopt --name "$0" --longoptions \
-    help,build-bundle-packages,debug,target: \
+    help,build-bundle-packages,debug,target:,only-build: \
     -- "" "$@")
 if [ $? -ne 0 ]; then
     exit 1
@@ -48,6 +50,9 @@ while [ $# -gt 0 ]; do
         --target)
             TARGET="$2"; shift
             ;;
+        --only-build)
+            ONLY_BUILD="$2"; shift
+            ;;
         --debug)
             DEBUG=1
             ;;
@@ -248,6 +253,7 @@ fi
 # Perl packages
 debug_echo -e "\nPerl packages\n"
 
+if [[ -z "$ONLY_BUILD" ]] || [[ "libarvados-perl" = "$ONLY_BUILD" ]] ; then
 cd "$WORKSPACE/sdk/perl"
 
 if [[ -e Makefile ]]; then
@@ -263,6 +269,7 @@ perl Makefile.PL INSTALL_BASE=install >"$STDOUT_IF_DEBUG" && \
     "Curoverse, Inc." dir "$(version_from_git)" install/man/=/usr/share/man \
     "$WORKSPACE/LICENSE-2.0.txt=/usr/share/doc/libarvados-perl/LICENSE-2.0.txt" && \
     mv --no-clobber libarvados-perl*.$FORMAT "$WORKSPACE/packages/$TARGET/"
+fi
 
 # Ruby gems
 debug_echo -e "\nRuby gems\n"
@@ -469,7 +476,7 @@ fpm_build lockfile "" "" python 0.12.2 --epoch 1
 # So we build this thing separately.
 #
 # Ward, 2016-03-17
-fpm_build schema_salad "" "" python 1.18.20161005190847 --depends "${PYTHON2_PKG_PREFIX}-lockfile >= 1:0.12.2-2"
+fpm_build schema_salad "" "" python 1.20.20161122192122 --depends "${PYTHON2_PKG_PREFIX}-lockfile >= 1:0.12.2-2"
 
 # And schema_salad now depends on ruamel-yaml, which apparently has a braindead setup.py that requires special arguments to build (otherwise, it aborts with 'error: you have to install with "pip install ."'). Sigh.
 # Ward, 2016-05-26
@@ -480,7 +487,7 @@ fpm_build ruamel.yaml "" "" python 0.12.4 --python-setup-py-arguments "--single-
 fpm_build cwltest "" "" python 1.0.20160907111242
 
 # And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20161107145355
+fpm_build cwltool "" "" python 1.0.20161128202906
 
 # FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
 fpm_build rdflib-jsonld "" "" python 0.3.0
@@ -541,6 +548,11 @@ esac
 
 for deppkg in "${PYTHON_BACKPORTS[@]}"; do
     outname=$(echo "$deppkg" | sed -e 's/^python-//' -e 's/[<=>].*//' -e 's/_/-/g' -e "s/^/${PYTHON2_PKG_PREFIX}-/")
+
+    if [[ -n "$ONLY_BUILD" ]] && [[ "$outname" != "$ONLY_BUILD" ]] ; then
+        continue
+    fi
+
     case "$deppkg" in
         httplib2|google-api-python-client)
             # Work around 0640 permissions on some package files.
@@ -590,6 +602,7 @@ handle_rails_package arvados-api-server "$WORKSPACE/services/api" \
     --license="GNU Affero General Public License, version 3.0"
 
 # Build the workbench server package
+if [[ -z "$ONLY_BUILD" ]] || [[ "arvados-workbench" = "$ONLY_BUILD" ]] ; then
 (
     set -e
     cd "$WORKSPACE/apps/workbench"
@@ -614,6 +627,7 @@ handle_rails_package arvados-api-server "$WORKSPACE/services/api" \
     # Remove generated configuration files so they don't go in the package.
     rm config/application.yml config/environments/production.rb
 )
+fi
 
 if [[ "$?" != "0" ]]; then
   echo "ERROR: Asset precompilation failed"
diff --git a/build/run-library.sh b/build/run-library.sh
index f0b120f6bf1e4e011a69f9f811ee67ad55624938..541e684bf3ac202940742bd787a62baa93ffc7b9 100755 (executable)
@@ -69,6 +69,10 @@ handle_ruby_gem() {
     local gem_version="$(nohash_version_from_git)"
     local gem_src_dir="$(pwd)"
 
+    if [[ -n "$ONLY_BUILD" ]] && [[ "$gem_name" != "$ONLY_BUILD" ]] ; then
+        return 0
+    fi
+
     if ! [[ -e "${gem_name}-${gem_version}.gem" ]]; then
         find -maxdepth 1 -name "${gem_name}-*.gem" -delete
 
@@ -84,6 +88,10 @@ package_go_binary() {
     local description="$1"; shift
     local license_file="${1:-agpl-3.0.txt}"; shift
 
+    if [[ -n "$ONLY_BUILD" ]] && [[ "$prog" != "$ONLY_BUILD" ]] ; then
+        return 0
+    fi
+
     debug_echo "package_go_binary $src_path as $prog"
 
     local basename="${src_path##*/}"
@@ -143,6 +151,11 @@ _build_rails_package_scripts() {
 
 handle_rails_package() {
     local pkgname="$1"; shift
+
+    if [[ -n "$ONLY_BUILD" ]] && [[ "$pkgname" != "$ONLY_BUILD" ]] ; then
+        return 0
+    fi
+
     local srcdir="$1"; shift
     local license_path="$1"; shift
     local scripts_dir="$(mktemp --tmpdir -d "$pkgname-XXXXXXXX.scripts")" && \
@@ -208,6 +221,10 @@ fpm_build () {
   VERSION=$1
   shift
 
+  if [[ -n "$ONLY_BUILD" ]] && [[ "$PACKAGE_NAME" != "$ONLY_BUILD" ]] && [[ "$PACKAGE" != "$ONLY_BUILD" ]] ; then
+      return 0
+  fi
+
   local default_iteration_value="$(default_iteration "$PACKAGE" "$VERSION")"
 
   case "$PACKAGE_TYPE" in
diff --git a/doc/_includes/_container_runtime_constraints.liquid b/doc/_includes/_container_runtime_constraints.liquid
index 849db42e47827c7a3cc2ddea8a28f36d3434979e..d505bfd9e0ec9981b84a19f046ea8260f34577e4 100644 (file)
@@ -8,4 +8,3 @@ table(table table-bordered table-condensed).
 |vcpus|integer|Number of cores to be used to run this process.|Optional. However, a ContainerRequest that is in "Committed" state must provide this.|
 |keep_cache_ram|integer|Number of keep cache bytes to be used to run this process.|Optional.|
 |API|boolean|When set, ARVADOS_API_HOST and ARVADOS_API_TOKEN will be set, and container will have networking enabled to access the Arvados API server.|Optional.|
-|partition|array of strings|Specify the names of one or more compute partitions that may run this container.  If not provided, the system chooses where to run the container.|Optional.|
diff --git a/doc/_includes/_container_scheduling_parameters.liquid b/doc/_includes/_container_scheduling_parameters.liquid
new file mode 100644 (file)
index 0000000..ee2ca07
--- /dev/null
@@ -0,0 +1,7 @@
+Scheduling parameters
+
+Parameters to be passed to the container scheduler (e.g., SLURM) when running a container.
+
+table(table table-bordered table-condensed).
+|_. Key|_. Type|_. Description|_. Notes|
+|partitions|array of strings|The names of one or more compute partitions that may run this container. If not provided, the system will choose where to run the container.|Optional.|
diff --git a/doc/_includes/_crunch1only_begin.liquid b/doc/_includes/_crunch1only_begin.liquid
new file mode 100644 (file)
index 0000000..5c08d1e
--- /dev/null
@@ -0,0 +1,2 @@
+{% include 'notebox_begin_warning' %}
+This section assumes the legacy Jobs API is available. Some newer installations have already disabled the Jobs API in favor of the Containers API.
diff --git a/doc/_includes/_crunch1only_end.liquid b/doc/_includes/_crunch1only_end.liquid
new file mode 100644 (file)
index 0000000..f8b437a
--- /dev/null
@@ -0,0 +1 @@
+{% include 'notebox_end' %}
diff --git a/doc/_includes/_notebox_begin_warning.liquid b/doc/_includes/_notebox_begin_warning.liquid
new file mode 100644 (file)
index 0000000..9f97e74
--- /dev/null
@@ -0,0 +1,2 @@
+<div class="alert alert-block alert-warning">
+  <h4>Note:</h4>
diff --git a/doc/_includes/_pipeline_deprecation_notice.liquid b/doc/_includes/_pipeline_deprecation_notice.liquid
index 682511f70c37ac9f2d31960adc8a3d07a712efd5..a2747b0a5916b76c34df8466a9a08069676feb3c 100644 (file)
@@ -1,3 +1,3 @@
-{% include 'notebox_begin' %}
+{% include 'notebox_begin_warning' %}
 Arvados pipeline templates are deprecated.  The recommended way to develop new workflows for Arvados is using the "Common Workflow Language":{{site.baseurl}}/user/cwl/cwl-runner.html.
 {% include 'notebox_end' %}
diff --git a/doc/api/methods/container_requests.html.textile.liquid b/doc/api/methods/container_requests.html.textile.liquid
index 304226d5de59af541906b975b2b4cd2313e2290e..3ed5b358fef136a90b7840680097b3957b356ab9 100644 (file)
@@ -35,12 +35,15 @@ table(table table-bordered table-condensed).
   "vcpus":2,
   "API":true
 }</code></pre>See "Runtime constraints":#runtime_constraints for more details.|
+|scheduling_parameters|hash|Parameters to be passed to the container scheduler when running this container.|e.g.,<pre><code>{
+"partitions":["fastcpu","vfastcpu"]
+}</code></pre>See "Scheduling parameters":#scheduling_parameters for more details.|
 |container_image|string|Portable data hash of a collection containing the docker image to run the container.|Required.|
 |environment|hash|Environment variables and values that should be set in the container environment (@docker run --env@). This augments and (when conflicts exist) overrides environment variables given in the image's Dockerfile.||
 |cwd|string|Initial working directory, given as an absolute path (in the container) or a path relative to the WORKDIR given in the image's Dockerfile.|Required.|
 |command|array of strings|Command to execute in the container.|Required. e.g., @["echo","hello"]@|
 |output_path|string|Path to a directory or file inside the container that should be preserved as container's output when it finishes. This path must be, or be inside, one of the mount targets. For best performance, point output_path to a writable collection mount.|Required.|
-|priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to prevew the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
+|priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to preview the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
 |expires_at|datetime|After this time, priority is considered to be zero.|Not yet implemented.|
 |use_existing|boolean|If possible, use an existing (non-failed) container to satisfy the request instead of creating a new one.|Default is true|
 |filters|string|Additional constraints for satisfying the container_request, given in the same form as the filters parameter accepted by the container_requests.list API.|
@@ -49,6 +52,8 @@ h2(#mount_types). {% include 'mount_types' %}
 
 h2(#runtime_constraints). {% include 'container_runtime_constraints' %}
 
+h2(#scheduling_parameters). {% include 'container_scheduling_parameters' %}
+
 h2(#container_reuse). Container reuse
 
 When a container request is "Committed", the system will try to find and reuse any preexisting Container with the same exact command, cwd, environment, output_path, container_image, mounts, and runtime_constraints as this container request. The serialized fields environment, mounts and runtime_constraints are sorted to facilitate comparison.
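
A hypothetical sketch of how a client might exercise the new scheduling_parameters attribute, together with the priority-0 preview behaviour described above, through the Arvados Python SDK. This is not part of the commit; the container image hash, partition names and resource figures below are placeholders, and the call shape follows the container_requests().create(body=...) usage visible in the sdk/cwl tests in this merge.

    import arvados

    # Assumes ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the environment.
    api = arvados.api('v1')

    cr = api.container_requests().create(body={
        'name': 'scheduling_parameters example',
        'state': 'Committed',
        'priority': 0,  # zero priority: preview the satisfying container without running it
        'container_image': 'PLACEHOLDER_PORTABLE_DATA_HASH',  # collection containing the docker image
        'command': ['echo', 'hello'],
        'cwd': '/tmp',
        'output_path': '/out',
        'mounts': {'/out': {'kind': 'tmp'}},
        'runtime_constraints': {'vcpus': 1, 'ram': 268435456},
        'scheduling_parameters': {'partitions': ['fastcpu', 'vfastcpu']},  # new in this merge
    }).execute()
    print(cr['uuid'])
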
diff --git a/doc/api/methods/containers.html.textile.liquid b/doc/api/methods/containers.html.textile.liquid
index 221141cebc82537631b2c6b7fd53d18f3af36afa..7eed8b0d30f84aa07dddbc46edf0a35773b63977 100644 (file)
@@ -36,6 +36,9 @@ Generally this will contain additional keys that are not present in any correspo
   "vcpus":2,
   "API":true
 }</code></pre>See "Runtime constraints":#runtime_constraints for more details.|
+|scheduling_parameters|hash|Parameters to be passed to the container scheduler when running this container.|e.g.,<pre><code>{
+"partitions":["fastcpu","vfastcpu"]
+}</code></pre>See "Scheduling parameters":#scheduling_parameters for more details.|
 |output|string|Portable data hash of the output collection.|Null if the container is not yet finished.|
 |container_image|string|Portable data hash of a collection containing the docker image used to run the container.||
 |progress|number|A number between 0.0 and 1.0 describing the fraction of work done.||
@@ -58,6 +61,8 @@ h2(#mount_types). {% include 'mount_types' %}
 
 h2(#runtime_constraints). {% include 'container_runtime_constraints' %}
 
+h2(#scheduling_parameters). {% include 'container_scheduling_parameters' %}
+
 h2. Methods
 
 See "Common resource methods":{{site.baseurl}}/api/methods.html for more information about @create@, @delete@, @get@, @list@, and @update@.
diff --git a/doc/images/upload-using-workbench.png b/doc/images/upload-using-workbench.png
index de8dc9e477f53e1369da23f5705d57cfdc901f0a..3d67577e68412a7e11b8f3307ff162caaee30520 100644 (file)
Binary files a/doc/images/upload-using-workbench.png and b/doc/images/upload-using-workbench.png differ
diff --git a/doc/images/workbench-dashboard.png b/doc/images/workbench-dashboard.png
index 76df32c9e2b27aefdb96e6119b4e1bb216d18174..3cdf1e4a66cfd552fbcf41ce46c1e13687d673a2 100644 (file)
Binary files a/doc/images/workbench-dashboard.png and b/doc/images/workbench-dashboard.png differ
diff --git a/doc/images/workbench-move-selected.png b/doc/images/workbench-move-selected.png
index 5ed1ef53e1b4a29a18cb5cb1394f1750e3963df7..bba1a1c60176748f51fb653bfdb3918a7e485e7b 100644 (file)
Binary files a/doc/images/workbench-move-selected.png and b/doc/images/workbench-move-selected.png differ
diff --git a/doc/install/install-keep-web.html.textile.liquid b/doc/install/install-keep-web.html.textile.liquid
index 16d23e6df56d2fb58cf38974a1e31dbda82d8915..f1c38ea4826394c3b77474c1787a23be6514b476 100644 (file)
@@ -51,7 +51,7 @@ Usage of keep-web:
 </code></pre>
 </notextile>
 
-{% assign railscmd = "bundle exec ./script/get_anonymous_user_token.rb" %}
+{% assign railscmd = "bundle exec ./script/get_anonymous_user_token.rb --get" %}
 {% assign railsout = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" %}
 If you intend to use Keep-web to serve public data to anonymous clients, configure it with an anonymous token. You can use the same one you used when you set up your Keepproxy server, or use the following command on the <strong>API server</strong> to create another. {% include 'install_rails_command' %}
 
diff --git a/doc/install/install-keepproxy.html.textile.liquid b/doc/install/install-keepproxy.html.textile.liquid
index a6bb5d4bd9aeb3a6d2b276d20e1fc1e0505bf2c9..f1a2688014b54fc3da1db8309b231e766bbccfa8 100644 (file)
@@ -51,7 +51,7 @@ Usage of keepproxy:
 
 h3. Create an API token for the Keepproxy server
 
-{% assign railscmd = "bundle exec ./script/get_anonymous_user_token.rb" %}
+{% assign railscmd = "bundle exec ./script/get_anonymous_user_token.rb --get" %}
 {% assign railsout = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" %}
 The Keepproxy server needs a token to talk to the API server.  On the <strong>API server</strong>, use the following command to create the token.  {% include 'install_rails_command' %}
 
diff --git a/doc/user/cwl/cwl-runner.html.textile.liquid b/doc/user/cwl/cwl-runner.html.textile.liquid
index 6dac43aa65f87dbaa1e239d3c8dd0ccb836dbf4c..30069907432097120d9e18ec8054126a1887fbee 100644 (file)
@@ -141,12 +141,12 @@ If you reference a file in "arv-mount":{{site.baseurl}}/user/tutorials/tutorial-
 
 If you reference a local file which is not in @arv-mount@, then @arvados-cwl-runner@ will upload the file to Keep and use the Keep URI reference from the upload.
 
-h2. Registering a workflow with Workbench
+h2. Registering a workflow to use in Workbench
 
-Use @--create-template@ to register a CWL workflow with Arvados Workbench.  This enables you to run workflows by clicking on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a pipeline...</span> on the Workbench Dashboard.
+Use @--create-workflow@ to register a CWL workflow with Arvados.  This enables you to share workflows with other Arvados users, and run them by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard.
 
 <notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-template bwa-mem.cwl</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
 arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
 2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
 2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to qr1hi-4zz18-7e0hedrmkuyoei3
@@ -158,7 +158,7 @@ qr1hi-p5p6p-rjleou1dwr167v5
 You can provide a partial input file to set default values for the workflow input parameters:
 
 <notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-template bwa-mem.cwl bwa-mem-template.yml</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
 arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
 2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
 2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to qr1hi-4zz18-0f91qkovk4ml18o
diff --git a/doc/user/getting_started/workbench.html.textile.liquid b/doc/user/getting_started/workbench.html.textile.liquid
index 6e334ba0dd7583ddae43b48149a270d3d92ad458..a64727fed00fc7df4971da4c12058c65c53ac45c 100644 (file)
@@ -14,4 +14,4 @@ You may be asked to log in using a Google account.  Arvados uses only your name
 
 Once your account is active, logging in to the Workbench will present you with the Dashboard. This gives a summary of your projects and recent activity in the Arvados instance.  "You are now ready to run your first pipeline.":{{ site.baseurl }}/user/tutorials/tutorial-pipeline-workbench.html
 
-!{{ site.baseurl }}/images/workbench-dashboard.png!
+!{display: block;margin-left: 25px;margin-right: auto;border:1px solid lightgray;}{{ site.baseurl }}/images/workbench-dashboard.png!
diff --git a/doc/user/topics/arv-run.html.textile.liquid b/doc/user/topics/arv-run.html.textile.liquid
index 8d1aca63057f44f22abb5811c2ef73c8478e4129..93fc2c0f34e4a3da4388560b265d7c189da30fa3 100644 (file)
@@ -4,6 +4,10 @@ navsection: userguide
 title: "Using arv-run"
 ...
 
+{% include 'crunch1only_begin' %}
+On those sites, the features described here are not yet implemented.
+{% include 'crunch1only_end' %}
+
 The @arv-run@ command enables you create Arvados pipelines at the command line that fan out to multiple concurrent tasks across Arvados compute nodes.
 
 {% include 'tutorial_expectations' %}
diff --git a/doc/user/topics/running-pipeline-command-line.html.textile.liquid b/doc/user/topics/running-pipeline-command-line.html.textile.liquid
index 9a2e12c09677beb59495b95404caea2c8622be5a..14c88d1311194b1b576f94209a6dcbf56469d494 100644 (file)
@@ -4,6 +4,10 @@ navsection: userguide
 title: "Running an Arvados pipeline"
 ...
 
+{% include 'crunch1only_begin' %}
+If the Jobs API is not available, use the "Common Workflow Language":{{site.baseurl}}/user/cwl/cwl-runner.html instead.
+{% include 'crunch1only_end' %}
+
 This tutorial demonstrates how to use the command line to run the same pipeline as described in "running a pipeline using Workbench.":{{site.baseurl}}/user/tutorials/tutorial-pipeline-workbench.html
 
 {% include 'tutorial_expectations' %}
diff --git a/doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid b/doc/user/tutorials/tutorial-pipeline-workbench.html.textile.liquid
index fac573aca9bdbc30a1f93b8f75e4af10e0818cfe..37a575c045049a3ed03c8106e943cb15eac1736e 100644 (file)
@@ -4,6 +4,10 @@ navsection: userguide
 title: "Running a pipeline using Workbench"
 ...
 
+{% include 'crunch1only_begin' %}
+On those sites, the details will be slightly different and the example pipeline might not be available.
+{% include 'crunch1only_end' %}
+
 A "pipeline" (sometimes called a "workflow" in other systems) is a sequence of steps that apply various programs or tools to transform input data to output data.  Pipelines are the principal means of performing computation with Arvados.  This tutorial demonstrates how to run a single-stage pipeline to take a small data set of paired-end reads from a sample "exome":https://en.wikipedia.org/wiki/Exome in "FASTQ":https://en.wikipedia.org/wiki/FASTQ_format format and align them to "Chromosome 19":https://en.wikipedia.org/wiki/Chromosome_19_%28human%29 using the "bwa mem":http://bio-bwa.sourceforge.net/ tool, producing a "Sequence Alignment/Map (SAM)":https://samtools.github.io/ file.  This tutorial will introduce the following Arvados features:
 
 <div>
@@ -17,7 +21,7 @@ notextile. <div class="spaced-out">
 h3. Steps
 
 # Start from the *Workbench Dashboard*.  You can access the Dashboard by clicking on *<i class="fa fa-lg fa-fw fa-dashboard"></i> Dashboard* in the upper left corner of any Workbench page.
-# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a pipeline...</span> button.  This will open a dialog box titled *Choose a pipeline to run*.
+# Click on the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button.  This will open a dialog box titled *Choose a pipeline to run*.
 # In the search box, type in *Tutorial align using bwa mem*.
 # Select *<i class="fa fa-fw fa-gear"></i> Tutorial align using bwa mem* and click the <span class="btn btn-sm btn-primary" >Next: choose inputs <i class="fa fa-fw fa-arrow-circle-right"></i></span> button.  This will create a new pipeline in your *Home* project and will open it. You can now supply the inputs for the pipeline.
 # The first input parameter to the pipeline is *"reference_collection" parameter for run-command script in bwa-mem component*.  Click the <span class="btn btn-sm btn-primary">Choose</span> button beneath that header.  This will open a dialog box titled *Choose a dataset for "reference_collection" parameter for run-command script in bwa-mem component*.
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index be14be9d4adb6b2f76406912cf92057d055aea25..358743608b1f7f5e796e5d3f3d90e3c9dc6f8cb6 100755 (executable)
@@ -1509,7 +1509,7 @@ sub preprocess_stderr
     my $line = $1;
     substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
     Log ($jobstepidx, "stderr $line");
-    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
+    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/i) {
       # If the allocation is revoked, we can't possibly continue, so mark all
       # nodes as failed.  This will cause the overall exit code to be
       # EX_RETRY_UNLOCKED instead of failure so that crunch_dispatch can re-run
@@ -1519,14 +1519,14 @@ sub preprocess_stderr
         $st->{node}->{fail_count}++;
       }
     }
-    elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
+    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
       if (defined($job_slot_index)) {
         $slot[$job_slot_index]->{node}->{fail_count}++;
         ban_node_by_slot($job_slot_index);
       }
     }
-    elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
+    elsif ($line =~ /srun: error: (Unable to create job step|.*?: Communication connection failure)/i) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
       ban_node_by_slot($job_slot_index) if (defined($job_slot_index));
     }
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index b3d47dd8d05e5981ae4f645fa9c968ee7707e747..9cabedf7f77a15532ad1379d638d5ca9a4169e44 100644 (file)
@@ -139,34 +139,43 @@ class ArvCwlRunner(object):
         Runs in a separate thread.
         """
 
-        while True:
-            self.stop_polling.wait(15)
-            if self.stop_polling.is_set():
-                break
-            with self.lock:
-                keys = self.processes.keys()
-            if not keys:
-                continue
+        try:
+            while True:
+                self.stop_polling.wait(15)
+                if self.stop_polling.is_set():
+                    break
+                with self.lock:
+                    keys = self.processes.keys()
+                if not keys:
+                    continue
 
-            if self.work_api == "containers":
-                table = self.poll_api.containers()
-            elif self.work_api == "jobs":
-                table = self.poll_api.jobs()
+                if self.work_api == "containers":
+                    table = self.poll_api.containers()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
 
-            try:
-                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.warn("Error checking states on API server: %s", e)
-                continue
-
-            for p in proc_states["items"]:
-                self.on_message({
-                    "object_uuid": p["uuid"],
-                    "event_type": "update",
-                    "properties": {
-                        "new_attributes": p
-                    }
-                })
+                try:
+                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    logger.warn("Error checking states on API server: %s", e)
+                    continue
+
+                for p in proc_states["items"]:
+                    self.on_message({
+                        "object_uuid": p["uuid"],
+                        "event_type": "update",
+                        "properties": {
+                            "new_attributes": p
+                        }
+                    })
+        except:
+            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+            self.cond.acquire()
+            self.processes.clear()
+            self.cond.notify()
+            self.cond.release()
+        finally:
+            self.stop_polling.set()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -287,22 +296,26 @@ class ArvCwlRunner(object):
 
         tool.visit(self.check_writable)
 
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+        self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  api_client=self.api,
                                                                  keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
-        if kwargs.get("create_template"):
-            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
-            tmpl.save()
-            # cwltool.main will write our return value to stdout.
-            return tmpl.uuid
-
-        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
-            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+        existing_uuid = kwargs.get("update_workflow")
+        if existing_uuid or kwargs.get("create_workflow"):
+            if self.work_api == "jobs":
+                tmpl = RunnerTemplate(self, tool, job_order,
+                                      kwargs.get("enable_reuse"),
+                                      uuid=existing_uuid)
+                tmpl.save()
+                # cwltool.main will write our return value to stdout.
+                return tmpl.uuid
+            else:
+                return upload_workflow(self, tool, job_order,
+                                       self.project_uuid,
+                                       uuid=existing_uuid)
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -374,6 +387,10 @@ class ArvCwlRunner(object):
             loopperf.__enter__()
             for runnable in jobiter:
                 loopperf.__exit__()
+
+                if self.stop_polling.is_set():
+                    break
+
                 if runnable:
                     with Perf(metrics, "run"):
                         runnable.run(**kwargs)
@@ -395,7 +412,7 @@ class ArvCwlRunner(object):
             if sys.exc_info()[0] is KeyboardInterrupt:
                 logger.error("Interrupted, marking pipeline as failed")
             else:
-                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
@@ -488,9 +505,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="submit")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
-    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
-    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
-    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
+    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
+                         dest="create_workflow")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -528,7 +546,21 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+
+    if arvargs.update_workflow:
+        if arvargs.update_workflow.find('-7fd4e-') == 5:
+            want_api = 'containers'
+        elif arvargs.update_workflow.find('-p5p6p-') == 5:
+            want_api = 'jobs'
+        else:
+            want_api = None
+        if want_api and arvargs.work_api and want_api != arvargs.work_api:
+            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+                arvargs.update_workflow, want_api, arvargs.work_api))
+            return 1
+        arvargs.work_api = want_api
+
+    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     add_arv_hints()
@@ -554,6 +586,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     arvargs.conformance_test = None
     arvargs.use_container = True
+    arvargs.relax_path_checks = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
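
As an aside, the restructured poller above follows a common shutdown pattern: the polling thread signals a threading.Event on a fatal error, and the submission loop checks that event so it can stop promptly instead of waiting for work that will never be reported. A minimal standalone sketch of that pattern, not Arvados code and with illustrative names only:

    import threading
    import time

    class MiniRunner:
        def __init__(self):
            self.stop_polling = threading.Event()
            self.cond = threading.Condition()
            self.processes = {}

        def poll_states(self):
            try:
                # Event.wait() returns True once the flag is set, which ends the loop.
                while not self.stop_polling.wait(1):
                    with self.cond:
                        if not self.processes:
                            continue
                    # ... query the server for the state of self.processes here ...
            except Exception:
                # Fatal poller error: drop pending work and wake any waiters ...
                with self.cond:
                    self.processes.clear()
                    self.cond.notify()
            finally:
                # ... and tell the submission loop to stop handing out new work.
                self.stop_polling.set()

        def run(self, work_items):
            threading.Thread(target=self.poll_states, daemon=True).start()
            for item in work_items:
                if self.stop_polling.is_set():  # mirrors the new check in the job loop
                    break
                print("submitting", item)
                time.sleep(0.1)
            self.stop_polling.set()  # shut the poller down when submission finishes

    MiniRunner().run(['step1', 'step2'])
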
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index e7cd617baee8d063da303aac89a93f434234c551..1fda4122172c06f926ec89941868be4914be0b13 100644 (file)
@@ -151,10 +151,17 @@ class ArvadosContainer(object):
                 if record["output"]:
                     outputs = done.done(self, record, "/tmp", self.outdir, "/keep")
             except WorkflowException as e:
-                logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("Error while collecting output for container %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
             except Exception as e:
-                logger.exception("Got unknown exception while collecting job outputs:")
+                logger.exception("Got unknown exception while collecting output for container %s:", self.name)
+                processStatus = "permanentFail"
+
+            # Note: Currently, on error output_callback is expecting an empty dict,
+            # anything else will fail.
+            if not isinstance(outputs, dict):
+                logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
+                outputs = {}
                 processStatus = "permanentFail"
 
             self.output_callback(outputs, processStatus)
diff --git a/sdk/cwl/arvados_cwl/arvdocker.py b/sdk/cwl/arvados_cwl/arvdocker.py
index b9691d215c4d46e071d13740c2d3c90b9f7d1a81..7f6ab587d323a7dc65e39c00e8e1b38d019f009d 100644 (file)
@@ -36,7 +36,10 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         cwltool.docker.get_image(dockerRequirement, pull_image)
 
         # Upload image to Arvados
-        args = ["--project-uuid="+project_uuid, image_name]
+        args = []
+        if project_uuid:
+            args.append("--project-uuid="+project_uuid)
+        args.append(image_name)
         if image_tag:
             args.append(image_tag)
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index 4db23b98a961904675727a13c33bf91cd3aa1f55..1afb9afc0e4cf4a888ad34b73cb0a5d7713f5ef9 100644 (file)
@@ -139,7 +139,7 @@ class ArvadosJob(object):
                 with Perf(metrics, "done %s" % self.name):
                     self.done(response)
         except Exception as e:
-            logger.error("Got error %s" % str(e))
+            logger.exception("Job %s error" % (self.name))
             self.output_callback({}, "permanentFail")
 
     def update_pipeline_component(self, record):
@@ -204,13 +204,18 @@ class ArvadosJob(object):
                         outputs = done.done(self, record, dirs["tmpdir"],
                                             dirs["outdir"], dirs["keep"])
             except WorkflowException as e:
-                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                logger.error("Error while collecting output for job %s:\n%s", self.name, e, exc_info=(e if self.arvrunner.debug else False))
                 processStatus = "permanentFail"
-                outputs = None
             except Exception as e:
-                logger.exception("Got unknown exception while collecting job outputs:")
+                logger.exception("Got unknown exception while collecting output for job %s:", self.name)
+                processStatus = "permanentFail"
+
+            # Note: Currently, on error output_callback is expecting an empty dict,
+            # anything else will fail.
+            if not isinstance(outputs, dict):
+                logger.error("Unexpected output type %s '%s'", type(outputs), outputs)
+                outputs = {}
                 processStatus = "permanentFail"
-                outputs = None
 
             self.output_callback(outputs, processStatus)
         finally:
@@ -303,7 +308,7 @@ class RunnerTemplate(object):
         'string': 'text',
     }
 
-    def __init__(self, runner, tool, job_order, enable_reuse):
+    def __init__(self, runner, tool, job_order, enable_reuse, uuid):
         self.runner = runner
         self.tool = tool
         self.job = RunnerJob(
@@ -313,6 +318,7 @@ class RunnerTemplate(object):
             enable_reuse=enable_reuse,
             output_name=None,
             output_tags=None)
+        self.uuid = uuid
 
     def pipeline_component_spec(self):
         """Return a component that Workbench and a-r-p-i will understand.
@@ -375,13 +381,21 @@ class RunnerTemplate(object):
         return spec
 
     def save(self):
-        job_spec = self.pipeline_component_spec()
-        response = self.runner.api.pipeline_templates().create(body={
+        body = {
             "components": {
-                self.job.name: job_spec,
+                self.job.name: self.pipeline_component_spec(),
             },
             "name": self.job.name,
-            "owner_uuid": self.runner.project_uuid,
-        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
-        self.uuid = response["uuid"]
-        logger.info("Created template %s", self.uuid)
+        }
+        if self.runner.project_uuid:
+            body["owner_uuid"] = self.runner.project_uuid
+        if self.uuid:
+            self.runner.api.pipeline_templates().update(
+                uuid=self.uuid, body=body).execute(
+                    num_retries=self.runner.num_retries)
+            logger.info("Updated template %s", self.uuid)
+        else:
+            self.uuid = self.runner.api.pipeline_templates().create(
+                body=body, ensure_unique_name=True).execute(
+                    num_retries=self.runner.num_retries)['uuid']
+            logger.info("Created template %s", self.uuid)
diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py
index ce633d43285a537268f3bc96dc446696d17d06a6..f425ae8df9d2478a566c3dc47aea7e7e1c678d45 100644 (file)
@@ -18,7 +18,7 @@ from .perf import Perf
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
-def upload_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
+def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None):
     upload_docker(arvRunner, tool)
 
     document_loader, workflowobj, uri = (tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]), tool.tool["id"])
@@ -39,16 +39,18 @@ def upload_workflow(arvRunner, tool, job_order, project_uuid, update_uuid):
 
     body = {
         "workflow": {
-            "owner_uuid": project_uuid,
             "name": tool.tool.get("label", name),
             "description": tool.tool.get("doc", ""),
             "definition":yaml.safe_dump(packed)
         }}
+    if project_uuid:
+        body["workflow"]["owner_uuid"] = project_uuid
 
-    if update_uuid:
-        return arvRunner.api.workflows().update(uuid=update_uuid, body=body).execute(num_retries=arvRunner.num_retries)["uuid"]
+    if uuid:
+        call = arvRunner.api.workflows().update(uuid=uuid, body=body)
     else:
-        return arvRunner.api.workflows().create(body=body).execute(num_retries=arvRunner.num_retries)["uuid"]
+        call = arvRunner.api.workflows().create(body=body)
+    return call.execute(num_retries=arvRunner.num_retries)["uuid"]
 
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 5cc447e9a3bad9202d9e77fb53919dcc66b804c8..3bbcb8b091763cd565051c23a65ce13d1069f08f 100644 (file)
@@ -195,7 +195,7 @@ class Runner(object):
         else:
             processStatus = "permanentFail"
 
-        outputs = None
+        outputs = {}
         try:
             try:
                 self.final_output = record["output"]
@@ -212,7 +212,7 @@ class Runner(object):
                 adjustFileObjs(outputs, keepify)
                 adjustDirObjs(outputs, keepify)
             except Exception as e:
-                logger.error("While getting final output object: %s", e)
+                logger.exception("While getting final output object: %s", e)
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
             del self.arvrunner.processes[record["uuid"]]
diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py
index 9d9a1e1a7acf99f46d61d96de384681da114925a..7751644f51795e60271759287290381919f3dc4d 100644 (file)
@@ -48,7 +48,7 @@ setup(name='arvados-cwl-runner',
       # Make sure to update arvados/build/run-build-packages.sh as well
       # when updating the cwltool version pin.
       install_requires=[
-          'cwltool==1.0.20161107145355',
+          'cwltool==1.0.20161128202906',
           'arvados-python-client>=0.1.20160826210445'
       ],
       data_files=[
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 085509fbb17b334fc24d8c3bcdad99fa1068c416..c4b0ceab0e219856a5f3bdec125d3ce06bbb8921 100644 (file)
@@ -131,6 +131,7 @@ def stubs(func):
         stubs.expect_pipeline_instance = {
             'name': 'submit_wf.cwl',
             'state': 'RunningOnServer',
+            'owner_uuid': None,
             "components": {
                 "cwl-runner": {
                     'runtime_constraints': {'docker_image': 'arvados/jobs:'+arvados_cwl.__version__},
@@ -183,7 +184,7 @@ def stubs(func):
                 }
             },
             'state': 'Committed',
-            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+            'owner_uuid': None,
             'command': ['arvados-cwl-runner', '--local', '--api=containers', '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
@@ -200,6 +201,13 @@ def stubs(func):
         stubs.api.workflows().create().execute.return_value = {
             "uuid": stubs.expect_workflow_uuid,
         }
+        def update_mock(**kwargs):
+            stubs.updated_uuid = kwargs.get('uuid')
+            return mock.DEFAULT
+        stubs.api.workflows().update.side_effect = update_mock
+        stubs.api.workflows().update().execute.side_effect = lambda **kwargs: {
+            "uuid": stubs.updated_uuid,
+        }
 
         return func(self, stubs, *args, **kwargs)
     return wrapped
@@ -223,13 +231,12 @@ class TestSubmit(unittest.TestCase):
                 './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
                 '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
                 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
-                'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                'owner_uuid': None,
                 'name': 'submit_wf.cwl',
             }, ensure_unique_name=True),
             mock.call().execute(),
             mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
                             '0:0:blub.txt 0:0:submit_tool.cwl\n',
-                            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
                             'replication_desired': None,
                             'name': 'New collection'
             }, ensure_unique_name=True),
@@ -237,13 +244,12 @@ class TestSubmit(unittest.TestCase):
             mock.call(body={
                 'manifest_text':
                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
-                'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                'owner_uuid': None,
                 'name': '#',
             }, ensure_unique_name=True),
             mock.call().execute()])
 
         expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
-        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.pipeline_instances().create.assert_called_with(
             body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
@@ -263,7 +269,6 @@ class TestSubmit(unittest.TestCase):
         stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:enable_reuse"] = {"value": False}
 
         expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
-        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.pipeline_instances().create.assert_called_with(
             body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
@@ -284,7 +289,6 @@ class TestSubmit(unittest.TestCase):
         stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_name"] = output_name
 
         expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
-        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.pipeline_instances().create.assert_called_with(
             body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
@@ -305,7 +309,6 @@ class TestSubmit(unittest.TestCase):
         stubs.expect_pipeline_instance["components"]["cwl-runner"]["script_parameters"]["arv:output_tags"] = output_tags
 
         expect_pipeline = copy.deepcopy(stubs.expect_pipeline_instance)
-        expect_pipeline["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.pipeline_instances().create.assert_called_with(
             body=expect_pipeline)
         self.assertEqual(capture_stdout.getvalue(),
@@ -347,13 +350,12 @@ class TestSubmit(unittest.TestCase):
                 './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
                 '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
                 'cc2ffb940e60adf1b2b282c67587e43d+413 0:413:submit_wf.cwl\n',
-                'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                'owner_uuid': None,
                 'name': 'submit_wf.cwl',
             }, ensure_unique_name=True),
             mock.call().execute(),
             mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
                             '0:0:blub.txt 0:0:submit_tool.cwl\n',
-                            'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
                             'name': 'New collection',
                             'replication_desired': None,
             }, ensure_unique_name=True),
@@ -361,13 +363,12 @@ class TestSubmit(unittest.TestCase):
             mock.call(body={
                 'manifest_text':
                 '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
-                'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+                'owner_uuid': None,
                 'name': '#',
             }, ensure_unique_name=True),
             mock.call().execute()])
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
-        expect_container["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.container_requests().create.assert_called_with(
             body=expect_container)
         self.assertEqual(capture_stdout.getvalue(),
@@ -388,7 +389,6 @@ class TestSubmit(unittest.TestCase):
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--disable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
-        expect_container["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.container_requests().create.assert_called_with(
             body=expect_container)
         self.assertEqual(capture_stdout.getvalue(),
@@ -411,7 +411,6 @@ class TestSubmit(unittest.TestCase):
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-name="+output_name, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
-        expect_container["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.container_requests().create.assert_called_with(
             body=expect_container)
         self.assertEqual(capture_stdout.getvalue(),
@@ -434,7 +433,6 @@ class TestSubmit(unittest.TestCase):
         stubs.expect_container_spec["command"] = ['arvados-cwl-runner', '--local', '--api=containers', "--output-tags="+output_tags, '--enable-reuse', '/var/lib/cwl/workflow/submit_wf.cwl', '/var/lib/cwl/job/cwl.input.json']
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
-        expect_container["owner_uuid"] = stubs.fake_user_uuid
         stubs.api.container_requests().create.assert_called_with(
             body=expect_container)
         self.assertEqual(capture_stdout.getvalue(),
@@ -491,7 +489,7 @@ class TestCreateTemplate(unittest.TestCase):
         capture_stdout = cStringIO.StringIO()
 
         exited = arvados_cwl.main(
-            ["--create-template", "--debug",
+            ["--create-workflow", "--debug",
              "--project-uuid", project_uuid,
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             capture_stdout, sys.stderr, api_client=stubs.api)
@@ -533,6 +531,9 @@ class TestCreateTemplate(unittest.TestCase):
 
 
 class TestCreateWorkflow(unittest.TestCase):
+    existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
+    expect_workflow = open("tests/wf/expect_packed.cwl").read()
+
     @stubs
     def test_create(self, stubs):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
@@ -541,6 +542,7 @@ class TestCreateWorkflow(unittest.TestCase):
 
         exited = arvados_cwl.main(
             ["--create-workflow", "--debug",
+             "--api=containers",
              "--project-uuid", project_uuid,
              "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
             capture_stdout, sys.stderr, api_client=stubs.api)
@@ -549,16 +551,13 @@ class TestCreateWorkflow(unittest.TestCase):
         stubs.api.pipeline_templates().create.refute_called()
         stubs.api.container_requests().create.refute_called()
 
-        with open("tests/wf/expect_packed.cwl") as f:
-            expect_workflow = f.read()
-
         body = {
             "workflow": {
                 "owner_uuid": project_uuid,
                 "name": "submit_wf.cwl",
                 "description": "",
-                "definition": expect_workflow
-                }
+                "definition": self.expect_workflow,
+            }
         }
         stubs.api.workflows().create.assert_called_with(
             body=JsonDiffMatcher(body))
@@ -566,6 +565,47 @@ class TestCreateWorkflow(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_workflow_uuid + '\n')
 
+    @stubs
+    def test_incompatible_api(self, stubs):
+        capture_stderr = cStringIO.StringIO()
+        logging.getLogger('arvados.cwl-runner').addHandler(
+            logging.StreamHandler(capture_stderr))
+
+        exited = arvados_cwl.main(
+            ["--update-workflow", self.existing_workflow_uuid,
+             "--api=jobs",
+             "--debug",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            sys.stderr, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 1)
+        self.assertRegexpMatches(
+            capture_stderr.getvalue(),
+            "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
+
+    @stubs
+    def test_update(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+
+        exited = arvados_cwl.main(
+            ["--update-workflow", self.existing_workflow_uuid,
+             "--debug",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 0)
+
+        body = {
+            "workflow": {
+                "name": "submit_wf.cwl",
+                "description": "",
+                "definition": self.expect_workflow,
+            }
+        }
+        stubs.api.workflows().update.assert_called_with(
+            uuid=self.existing_workflow_uuid,
+            body=JsonDiffMatcher(body))
+        self.assertEqual(capture_stdout.getvalue(),
+                         self.existing_workflow_uuid + '\n')
+
 
 class TestTemplateInputs(unittest.TestCase):
     expect_template = {
@@ -622,11 +662,8 @@ class TestTemplateInputs(unittest.TestCase):
             cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
 
-        expect_template = copy.deepcopy(self.expect_template)
-        expect_template["owner_uuid"] = stubs.fake_user_uuid
-
         stubs.api.pipeline_templates().create.assert_called_with(
-            body=JsonDiffMatcher(expect_template), ensure_unique_name=True)
+            body=JsonDiffMatcher(self.expect_template), ensure_unique_name=True)
 
     @stubs
     def test_inputs(self, stubs):
@@ -636,10 +673,7 @@ class TestTemplateInputs(unittest.TestCase):
             cStringIO.StringIO(), sys.stderr, api_client=stubs.api)
         self.assertEqual(exited, 0)
 
-        self.expect_template["owner_uuid"] = stubs.fake_user_uuid
-
         expect_template = copy.deepcopy(self.expect_template)
-        expect_template["owner_uuid"] = stubs.fake_user_uuid
         params = expect_template[
             "components"]["inputs_test.cwl"]["script_parameters"]
         params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
diff --git a/sdk/go/httpserver/id_generator.go b/sdk/go/httpserver/id_generator.go
new file mode 100644 (file)
index 0000000..c2830f7
--- /dev/null
@@ -0,0 +1,31 @@
+package httpserver
+
+import (
+       "strconv"
+       "sync"
+       "time"
+)
+
+// IDGenerator generates alphanumeric strings suitable for use as
+// unique IDs (a given IDGenerator will never return the same ID
+// twice).
+type IDGenerator struct {
+       // Prefix is prepended to each returned ID.
+       Prefix string
+
+       lastID int64
+       mtx    sync.Mutex
+}
+
+// Next returns a new ID string. It is safe to call Next from multiple
+// goroutines.
+func (g *IDGenerator) Next() string {
+       id := time.Now().UnixNano()
+       g.mtx.Lock()
+       if id <= g.lastID {
+               id = g.lastID + 1
+       }
+       g.lastID = id
+       g.mtx.Unlock()
+       return g.Prefix + strconv.FormatInt(id, 36)
+}
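
Usage sketch for the new IDGenerator (not part of the commit; the prefix value is illustrative). A single generator can be shared across goroutines and never hands out the same ID twice:

package main

import (
	"fmt"

	"git.curoverse.com/arvados.git/sdk/go/httpserver"
)

func main() {
	// One shared generator; Next is safe to call from many goroutines
	// and returns a strictly increasing base-36 value with the prefix.
	gen := httpserver.IDGenerator{Prefix: "req-"}
	for i := 0; i < 3; i++ {
		fmt.Println(gen.Next())
	}
}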
index 178ffb90f4facbebdfd6809bb1448e84904bc82f..ee35f4748b78ecfabac5c431ac5ad73340e4f300 100644 (file)
@@ -4,18 +4,42 @@ import (
        "net/http"
 )
 
+// RequestCounter is an http.Handler that tracks the number of
+// requests in progress.
+type RequestCounter interface {
+       http.Handler
+
+       // Current() returns the number of requests in progress.
+       Current() int
+
+       // Max() returns the maximum number of concurrent requests
+       // that will be accepted.
+       Max() int
+}
+
 type limiterHandler struct {
        requests chan struct{}
        handler  http.Handler
 }
 
-func NewRequestLimiter(maxRequests int, handler http.Handler) http.Handler {
+// NewRequestLimiter returns a RequestCounter that delegates up to
+// maxRequests at a time to the given handler, and responds 503 to all
+// incoming requests beyond that limit.
+func NewRequestLimiter(maxRequests int, handler http.Handler) RequestCounter {
        return &limiterHandler{
                requests: make(chan struct{}, maxRequests),
                handler:  handler,
        }
 }
 
+func (h *limiterHandler) Current() int {
+       return len(h.requests)
+}
+
+func (h *limiterHandler) Max() int {
+       return cap(h.requests)
+}
+
 func (h *limiterHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
        select {
        case h.requests <- struct{}{}:
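
A minimal sketch of wrapping a handler in the new RequestLimiter (the backend handler and limit are illustrative, not from this commit). Requests beyond the limit are rejected with 503 rather than queued:

package main

import (
	"fmt"
	"net/http"

	"git.curoverse.com/arvados.git/sdk/go/httpserver"
)

func main() {
	backend := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})
	// Up to 100 requests run concurrently in backend; the rest get 503.
	limiter := httpserver.NewRequestLimiter(100, backend)

	// Current/Max expose in-flight vs. allowed requests for monitoring,
	// which keepstore's /status.json handler uses later in this commit.
	fmt.Printf("%d/%d requests in flight\n", limiter.Current(), limiter.Max())

	http.Handle("/", limiter)
}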
index 610fd7dc1317b6f0a6af7672d148766dbf9ce961..517d617d8c4f8403953b5d0b105808e0bd18ac0d 100644 (file)
@@ -516,7 +516,7 @@ class _BlockManager(object):
                     return
                 self._keep.get(b)
             except Exception:
-                pass
+                _logger.exception("Exception doing block prefetch")
 
     @synchronized
     def start_get_threads(self):
index db7835be3746f8f67eddd61d2aac505356e601f4..c98947945669338384147d9a8a0baf6917c43db9 100644 (file)
@@ -511,8 +511,10 @@ class KeepClient(object):
             with self.successful_copies_lock:
                 self.successful_copies += replicas_nr
                 self.response = response
+            with self.pending_tries_notification:
+                self.pending_tries_notification.notify_all()
         
-        def write_fail(self, ks, status_code):
+        def write_fail(self, ks):
             with self.pending_tries_notification:
                 self.pending_tries += 1
                 self.pending_tries_notification.notify()
@@ -520,8 +522,36 @@ class KeepClient(object):
         def pending_copies(self):
             with self.successful_copies_lock:
                 return self.wanted_copies - self.successful_copies
-    
-    
+
+        def get_next_task(self):
+            with self.pending_tries_notification:
+                while True:
+                    if self.pending_copies() < 1:
+                        # This notify_all() is unnecessary --
+                        # write_success() already called notify_all()
+                        # when pending<1 became true, so it's not
+                        # possible for any other thread to be in
+                        # wait() now -- but it's cheap insurance
+                        # against deadlock so we do it anyway:
+                        self.pending_tries_notification.notify_all()
+                        # Drain the queue and then raise Queue.Empty
+                        while True:
+                            self.get_nowait()
+                            self.task_done()
+                    elif self.pending_tries > 0:
+                        service, service_root = self.get_nowait()
+                        if service.finished():
+                            self.task_done()
+                            continue
+                        self.pending_tries -= 1
+                        return service, service_root
+                    elif self.empty():
+                        self.pending_tries_notification.notify_all()
+                        raise Queue.Empty
+                    else:
+                        self.pending_tries_notification.wait()
+
+
     class KeepWriterThreadPool(object):
         def __init__(self, data, data_hash, copies, max_service_replicas, timeout=None):
             self.total_task_nr = 0
@@ -551,74 +581,64 @@ class KeepClient(object):
                 worker.start()
             # Wait for finished work
             self.queue.join()
-            with self.queue.pending_tries_notification:
-                self.queue.pending_tries_notification.notify_all()
-            for worker in self.workers:
-                worker.join()
         
         def response(self):
             return self.queue.response
     
     
     class KeepWriterThread(threading.Thread):
+        TaskFailed = RuntimeError()
+
         def __init__(self, queue, data, data_hash, timeout=None):
             super(KeepClient.KeepWriterThread, self).__init__()
             self.timeout = timeout
             self.queue = queue
             self.data = data
             self.data_hash = data_hash
-        
+            self.daemon = True
+
         def run(self):
-            while not self.queue.empty():
-                if self.queue.pending_copies() > 0:
-                    # Avoid overreplication, wait for some needed re-attempt
-                    with self.queue.pending_tries_notification:
-                        if self.queue.pending_tries <= 0:
-                            self.queue.pending_tries_notification.wait()
-                            continue # try again when awake
-                        self.queue.pending_tries -= 1
-
-                    # Get to work
-                    try:
-                        service, service_root = self.queue.get_nowait()
-                    except Queue.Empty:
-                        continue
-                    if service.finished():
-                        self.queue.task_done()
-                        continue
-                    success = bool(service.put(self.data_hash,
-                                                self.data,
-                                                timeout=self.timeout))
-                    result = service.last_result()
-                    if success:
-                        _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
-                                      str(threading.current_thread()),
-                                      self.data_hash,
-                                      len(self.data),
-                                      service_root)
-                        try:
-                            replicas_stored = int(result['headers']['x-keep-replicas-stored'])
-                        except (KeyError, ValueError):
-                            replicas_stored = 1
-                        
-                        self.queue.write_success(result['body'].strip(), replicas_stored)
-                    else:
-                        if result.get('status_code', None):
-                            _logger.debug("Request fail: PUT %s => %s %s",
-                                          self.data_hash,
-                                          result['status_code'],
-                                          result['body'])
-                        self.queue.write_fail(service, result.get('status_code', None)) # Schedule a re-attempt with next service
-                    # Mark as done so the queue can be join()ed
-                    self.queue.task_done()
+            while True:
+                try:
+                    service, service_root = self.queue.get_next_task()
+                except Queue.Empty:
+                    return
+                try:
+                    locator, copies = self.do_task(service, service_root)
+                except Exception as e:
+                    if e is not self.TaskFailed:
+                        _logger.exception("Exception in KeepWriterThread")
+                    self.queue.write_fail(service)
                 else:
-                    # Remove the task from the queue anyways
-                    try:
-                        self.queue.get_nowait()
-                        # Mark as done so the queue can be join()ed
-                        self.queue.task_done()
-                    except Queue.Empty:
-                        continue
+                    self.queue.write_success(locator, copies)
+                finally:
+                    self.queue.task_done()
+
+        def do_task(self, service, service_root):
+            success = bool(service.put(self.data_hash,
+                                        self.data,
+                                        timeout=self.timeout))
+            result = service.last_result()
+
+            if not success:
+                if result.get('status_code', None):
+                    _logger.debug("Request fail: PUT %s => %s %s",
+                                  self.data_hash,
+                                  result['status_code'],
+                                  result['body'])
+                raise self.TaskFailed
+
+            _logger.debug("KeepWriterThread %s succeeded %s+%i %s",
+                          str(threading.current_thread()),
+                          self.data_hash,
+                          len(self.data),
+                          service_root)
+            try:
+                replicas_stored = int(result['headers']['x-keep-replicas-stored'])
+            except (KeyError, ValueError):
+                replicas_stored = 1
+
+            return result['body'].strip(), replicas_stored
 
 
     def __init__(self, api_client=None, proxy=None,
index 7ce4dc93fc7d070e6a5da40adf8fe26ce13c4b33..7e8c84ec11279495d55fd47770378847886ae76e 100644 (file)
@@ -17,6 +17,8 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
     TIME_FUTURE = time.time()+3600
     MOCK_WS_URL = 'wss://[{}]/'.format(arvados_testutil.TEST_HOST)
 
+    TEST_TIMEOUT = 10.0
+
     def setUp(self):
         self.ws = None
 
@@ -263,20 +265,16 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
     @mock.patch('arvados.events._EventClient')
     def test_run_forever_survives_reconnects(self, websocket_client):
-        connection_cond = threading.Condition()
-        def ws_connect():
-            with connection_cond:
-                connection_cond.notify_all()
-        websocket_client().connect.side_effect = ws_connect
+        connected = threading.Event()
+        websocket_client().connect.side_effect = connected.set
         client = arvados.events.EventClient(
             self.MOCK_WS_URL, [], lambda event: None, None)
-        with connection_cond:
-            forever_thread = threading.Thread(target=client.run_forever)
-            forever_thread.start()
-            # Simulate an unexpected disconnect, and wait for reconnect.
-            close_thread = threading.Thread(target=client.on_closed)
-            close_thread.start()
-            connection_cond.wait()
+        forever_thread = threading.Thread(target=client.run_forever)
+        forever_thread.start()
+        # Simulate an unexpected disconnect, and wait for reconnect.
+        close_thread = threading.Thread(target=client.on_closed)
+        close_thread.start()
+        self.assertTrue(connected.wait(timeout=self.TEST_TIMEOUT))
         close_thread.join()
         run_forever_alive = forever_thread.is_alive()
         client.close()
@@ -286,7 +284,10 @@ class WebsocketTest(run_test_server.TestCaseWithServers):
 
 
 class PollClientTestCase(unittest.TestCase):
+    TEST_TIMEOUT = 10.0
+
     class MockLogs(object):
+
         def __init__(self):
             self.logs = []
             self.lock = threading.Lock()
@@ -301,12 +302,11 @@ class PollClientTestCase(unittest.TestCase):
                 self.logs = []
             return {'items': retval, 'items_available': len(retval)}
 
-
     def setUp(self):
         self.logs = self.MockLogs()
         self.arv = mock.MagicMock(name='arvados.api()')
         self.arv.logs().list().execute.side_effect = self.logs.return_list
-        self.callback_cond = threading.Condition()
+        self.callback_called = threading.Event()
         self.recv_events = []
 
     def tearDown(self):
@@ -314,9 +314,8 @@ class PollClientTestCase(unittest.TestCase):
             self.client.close(timeout=None)
 
     def callback(self, event):
-        with self.callback_cond:
-            self.recv_events.append(event)
-            self.callback_cond.notify_all()
+        self.recv_events.append(event)
+        self.callback_called.set()
 
     def build_client(self, filters=None, callback=None, last_log_id=None, poll_time=99):
         if filters is None:
@@ -334,11 +333,11 @@ class PollClientTestCase(unittest.TestCase):
         test_log = {'id': 12345, 'testkey': 'testtext'}
         self.logs.add({'id': 123})
         self.build_client(poll_time=.01)
-        with self.callback_cond:
-            self.client.start()
-            self.callback_cond.wait()
-            self.logs.add(test_log.copy())
-            self.callback_cond.wait()
+        self.client.start()
+        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
+        self.callback_called.clear()
+        self.logs.add(test_log.copy())
+        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
         self.client.close(timeout=None)
         self.assertIn(test_log, self.recv_events)
 
@@ -346,9 +345,8 @@ class PollClientTestCase(unittest.TestCase):
         client_filter = ['kind', '=', 'arvados#test']
         self.build_client()
         self.client.subscribe([client_filter[:]])
-        with self.callback_cond:
-            self.client.start()
-            self.callback_cond.wait()
+        self.client.start()
+        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
         self.client.close(timeout=None)
         self.assertTrue(self.was_filter_used(client_filter))
 
@@ -363,11 +361,10 @@ class PollClientTestCase(unittest.TestCase):
 
     def test_run_forever(self):
         self.build_client()
-        with self.callback_cond:
-            self.client.start()
-            forever_thread = threading.Thread(target=self.client.run_forever)
-            forever_thread.start()
-            self.callback_cond.wait()
+        self.client.start()
+        forever_thread = threading.Thread(target=self.client.run_forever)
+        forever_thread.start()
+        self.assertTrue(self.callback_called.wait(self.TEST_TIMEOUT))
         self.assertTrue(forever_thread.is_alive())
         self.client.close()
         forever_thread.join()
index 908539b8cae010f1cf0f23046bdcaf1f15f136b0..85b5bc81f00902a2a816d606bbc2cecff06de289 100644 (file)
@@ -1081,58 +1081,74 @@ class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
             self.check_exception(copies=2, num_retries=3)
 
 
-class KeepClientAvoidClientOverreplicationTestCase(unittest.TestCase, tutil.ApiClientMock):
-    
-    
+class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
+
     class FakeKeepService(object):
-        def __init__(self, delay, will_succeed, replicas=1):
+        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
             self.delay = delay
-            self.success = will_succeed
+            self.will_succeed = will_succeed
+            self.will_raise = will_raise
             self._result = {}
             self._result['headers'] = {}
             self._result['headers']['x-keep-replicas-stored'] = str(replicas)
             self._result['body'] = 'foobar'
-        
+
         def put(self, data_hash, data, timeout):
             time.sleep(self.delay)
-            return self.success
-        
+            if self.will_raise is not None:
+                raise self.will_raise
+            return self.will_succeed
+
         def last_result(self):
-            return self._result
-        
+            if self.will_succeed:
+                return self._result
+
         def finished(self):
             return False
     
-    
-    def test_only_write_enough_on_success(self):
-        copies = 3
-        pool = arvados.KeepClient.KeepWriterThreadPool(
+    def setUp(self):
+        self.copies = 3
+        self.pool = arvados.KeepClient.KeepWriterThreadPool(
             data = 'foo',
             data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
-            max_service_replicas = copies,
-            copies = copies
+            max_service_replicas = self.copies,
+            copies = self.copies
         )
+
+    def test_only_write_enough_on_success(self):
         for i in range(10):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
-            pool.add_task(ks, None)
-        pool.join()
-        self.assertEqual(pool.done(), copies)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
 
     def test_only_write_enough_on_partial_success(self):
-        copies = 3
-        pool = arvados.KeepClient.KeepWriterThreadPool(
-            data = 'foo',
-            data_hash = 'acbd18db4cc2f85cedef654fccc4a4d8+3',
-            max_service_replicas = copies,
-            copies = copies
-        )
         for i in range(5):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=False)
-            pool.add_task(ks, None)
+            self.pool.add_task(ks, None)
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
+
+    def test_only_write_enough_when_some_crash(self):
+        for i in range(5):
+            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+            self.pool.add_task(ks, None)
+            ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies)
+
+    def test_fail_when_too_many_crash(self):
+        for i in range(self.copies+1):
+            ks = self.FakeKeepService(delay=i/10.0, will_raise=Exception())
+            self.pool.add_task(ks, None)
+        for i in range(self.copies-1):
             ks = self.FakeKeepService(delay=i/10.0, will_succeed=True)
-            pool.add_task(ks, None)
-        pool.join()
-        self.assertEqual(pool.done(), copies)
+            self.pool.add_task(ks, None)
+        self.pool.join()
+        self.assertEqual(self.pool.done(), self.copies-1)
     
 
 @tutil.skip_sleep
index 6e2848ceb53f34165379e3f1afa539d67dbb5651..ed04a4ba8f71c148d424db545c5d9bd7ddc69a7c 100644 (file)
@@ -4,4 +4,5 @@ class Arvados::V1::ContainerRequestsController < ApplicationController
   accept_attribute_as_json :runtime_constraints, Hash
   accept_attribute_as_json :command, Array
   accept_attribute_as_json :filters, Array
+  accept_attribute_as_json :scheduling_parameters, Hash
 end
index 7728ce6d536e90080be8a92d2b51753827e5342e..51f15ad84fd94c7c7834e952a14f5d75d6deaf04 100644 (file)
@@ -3,6 +3,7 @@ class Arvados::V1::ContainersController < ApplicationController
   accept_attribute_as_json :mounts, Hash
   accept_attribute_as_json :runtime_constraints, Hash
   accept_attribute_as_json :command, Array
+  accept_attribute_as_json :scheduling_parameters, Hash
 
   skip_before_filter :find_object_by_uuid, only: [:current]
   skip_before_filter :render_404_if_no_object, only: [:current]
index d6adbf08516a7c2c199cb240017b5e64ede195be..5d91a81074cdfe9e75df182132af4b17f1ff85e3 100644 (file)
@@ -68,9 +68,14 @@ class Arvados::V1::GroupsController < ApplicationController
      Collection,
      Human, Specimen, Trait]
 
-    table_names = klasses.map(&:table_name)
+    table_names = Hash[klasses.collect { |k| [k, k.table_name] }]
+
+    disabled_methods = Rails.configuration.disable_api_methods
+    avail_klasses = table_names.select{|k, t| !disabled_methods.include?(t+'.index')}
+    klasses = avail_klasses.keys
+
     request_filters.each do |col, op, val|
-      if col.index('.') && !table_names.include?(col.split('.', 2)[0])
+      if col.index('.') && !table_names.values.include?(col.split('.', 2)[0])
         raise ArgumentError.new("Invalid attribute '#{col}' in filter")
       end
     end
index 7dcfbe378b6700a88b41ae11b6a15e8a28a6fe20..f92fa21314824b4118ac8f725470fee2bee41ea6 100644 (file)
@@ -44,6 +44,8 @@ class ContainerRequest < ArvadosModel
     t.add :runtime_constraints
     t.add :state
     t.add :use_existing
+    t.add :output_uuid
+    t.add :log_uuid
     t.add :scheduling_parameters
   end
 
@@ -82,13 +84,14 @@ class ContainerRequest < ArvadosModel
   # Finalize the container request after the container has
   # finished/cancelled.
   def finalize!
-    update_attributes!(state: Final)
+    out_coll = nil
+    log_coll = nil
     c = Container.find_by_uuid(container_uuid)
     ['output', 'log'].each do |out_type|
       pdh = c.send(out_type)
       next if pdh.nil?
       manifest = Collection.where(portable_data_hash: pdh).first.manifest_text
-      Collection.create!(owner_uuid: owner_uuid,
+      coll = Collection.create!(owner_uuid: owner_uuid,
                          manifest_text: manifest,
                          portable_data_hash: pdh,
                          name: "Container #{out_type} for request #{uuid}",
@@ -96,7 +99,13 @@ class ContainerRequest < ArvadosModel
                            'type' => out_type,
                            'container_request' => uuid,
                          })
+      if out_type == 'output'
+        out_coll = coll.uuid
+      else
+        log_coll = coll.uuid
+      end
     end
+    update_attributes!(state: Final, output_uuid: out_coll, log_uuid: log_coll)
   end
 
   protected
@@ -279,7 +288,7 @@ class ContainerRequest < ArvadosModel
         permitted.push :command, :container_image, :cwd, :description, :environment,
                        :filters, :mounts, :name, :output_path, :properties,
                        :requesting_container_uuid, :runtime_constraints,
-                       :state, :container_uuid, :scheduling_parameters
+                       :state, :container_uuid, :use_existing, :scheduling_parameters
       end
 
     when Final
@@ -287,8 +296,8 @@ class ContainerRequest < ArvadosModel
         errors.add :state, "of container request can only be set to Final by system."
       end
 
-      if self.state_changed? || self.name_changed? || self.description_changed?
-          permitted.push :state, :name, :description
+      if self.state_changed? || self.name_changed? || self.description_changed? || self.output_uuid_changed? || self.log_uuid_changed?
+          permitted.push :state, :name, :description, :output_uuid, :log_uuid
       else
         errors.add :state, "does not allow updates"
       end
diff --git a/services/api/db/migrate/20161115171221_add_output_and_log_uuid_to_container_request.rb b/services/api/db/migrate/20161115171221_add_output_and_log_uuid_to_container_request.rb
new file mode 100644 (file)
index 0000000..e38bf7c
--- /dev/null
@@ -0,0 +1,22 @@
+require 'has_uuid'
+
+class AddOutputAndLogUuidToContainerRequest < ActiveRecord::Migration
+  extend HasUuid::ClassMethods
+
+  def up
+    add_column :container_requests, :output_uuid, :string
+    add_column :container_requests, :log_uuid, :string
+
+    no_such_out_coll = Server::Application.config.uuid_prefix + '-' + '4zz18' + '-xxxxxxxxxxxxxxx'
+    no_such_log_coll = Server::Application.config.uuid_prefix + '-' + '4zz18' + '-yyyyyyyyyyyyyyy'
+
+    update_sql <<-EOS
+update container_requests set output_uuid = ('#{no_such_out_coll}'), log_uuid = ('#{no_such_log_coll}');
+EOS
+  end
+
+  def down
+    remove_column :container_requests, :log_uuid
+    remove_column :container_requests, :output_uuid
+  end
+end
diff --git a/services/api/db/migrate/20161115174218_add_output_and_log_uuids_to_container_request_search_index.rb b/services/api/db/migrate/20161115174218_add_output_and_log_uuids_to_container_request_search_index.rb
new file mode 100644 (file)
index 0000000..b069d02
--- /dev/null
@@ -0,0 +1,21 @@
+class AddOutputAndLogUuidsToContainerRequestSearchIndex < ActiveRecord::Migration
+  def up
+    begin
+      remove_index :container_requests, :name => 'container_requests_search_index'
+    rescue
+    end
+    add_index :container_requests,
+              ["uuid", "owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "name", "state", "requesting_container_uuid", "container_uuid", "container_image", "cwd", "output_path", "output_uuid", "log_uuid"],
+              name: "container_requests_search_index"
+  end
+
+  def down
+    begin
+      remove_index :container_requests, :name => 'container_requests_search_index'
+    rescue
+    end
+    add_index :container_requests,
+              ["uuid", "owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "name", "state", "requesting_container_uuid", "container_uuid", "container_image", "cwd", "output_path"],
+              name: "container_requests_search_index"
+  end
+end
index 1d3d238c837611e2858dbfd0959cfc7373a41917..e715cd60c4fcc0bb56e7a3df0ca7bbf01362cf9a 100644 (file)
@@ -292,7 +292,9 @@ CREATE TABLE container_requests (
     updated_at timestamp without time zone NOT NULL,
     container_count integer DEFAULT 0,
     use_existing boolean DEFAULT true,
-    scheduling_parameters text
+    scheduling_parameters text,
+    output_uuid character varying(255),
+    log_uuid character varying(255)
 );
 
 
@@ -1525,7 +1527,7 @@ CREATE INDEX container_requests_full_text_search_idx ON container_requests USING
 -- Name: container_requests_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX container_requests_search_index ON container_requests USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, name, state, requesting_container_uuid, container_uuid, container_image, cwd, output_path);
+CREATE INDEX container_requests_search_index ON container_requests USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, name, state, requesting_container_uuid, container_uuid, container_image, cwd, output_path, output_uuid, log_uuid);
 
 
 --
@@ -2698,4 +2700,8 @@ INSERT INTO schema_migrations (version) VALUES ('20160926194129');
 
 INSERT INTO schema_migrations (version) VALUES ('20161019171346');
 
-INSERT INTO schema_migrations (version) VALUES ('20161111143147');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20161111143147');
+
+INSERT INTO schema_migrations (version) VALUES ('20161115171221');
+
+INSERT INTO schema_migrations (version) VALUES ('20161115174218');
\ No newline at end of file
diff --git a/services/api/test/functional/arvados/v1/container_requests_controller_test.rb b/services/api/test/functional/arvados/v1/container_requests_controller_test.rb
new file mode 100644 (file)
index 0000000..e54e15d
--- /dev/null
@@ -0,0 +1,22 @@
+require 'test_helper'
+
+class Arvados::V1::ContainerRequestsControllerTest < ActionController::TestCase
+  test 'create with scheduling parameters' do
+    authorize_with :system_user
+
+    sp = {'partitions' => ['test1', 'test2']}
+    post :create, {
+      container_request: {
+        command: ['echo', 'hello'],
+        container_image: 'test',
+        output_path: 'test',
+        scheduling_parameters: sp,
+      },
+    }
+    assert_response :success
+
+    cr = JSON.parse(@response.body)
+    assert_not_nil cr, 'Expected container request'
+    assert_equal sp, cr['scheduling_parameters']
+  end
+end
index 10534a70610a8188d35863992f2810ac29195937..e9abf9d495bbaf937532e6ad44ebc1449254c66e 100644 (file)
@@ -55,12 +55,12 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
     assert_equal 0, json_response['items_available']
   end
 
-  def check_project_contents_response
+  def check_project_contents_response disabled_kinds=[]
     assert_response :success
     assert_operator 2, :<=, json_response['items_available']
     assert_operator 2, :<=, json_response['items'].count
     kinds = json_response['items'].collect { |i| i['kind'] }.uniq
-    expect_kinds = %w'arvados#group arvados#specimen arvados#pipelineTemplate arvados#job'
+    expect_kinds = %w'arvados#group arvados#specimen arvados#pipelineTemplate arvados#job' - disabled_kinds
     assert_equal expect_kinds, (expect_kinds & kinds)
 
     json_response['items'].each do |i|
@@ -69,6 +69,10 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
                "group#contents returned a non-project group")
       end
     end
+
+    disabled_kinds.each do |d|
+      assert_equal true, !kinds.include?(d)
+    end
   end
 
   test 'get group-owned objects' do
@@ -448,4 +452,15 @@ class Arvados::V1::GroupsControllerTest < ActionController::TestCase
       end
     end
   end
+
+  test 'get contents with jobs and pipeline instances disabled' do
+    Rails.configuration.disable_api_methods = ['jobs.index', 'pipeline_instances.index']
+
+    authorize_with :active
+    get :contents, {
+      id: groups(:aproject).uuid,
+      format: :json,
+    }
+    check_project_contents_response %w'arvados#pipelineInstance arvados#job'
+  end
 end
index 1465c7180ad7b59cb947b4cfb825d2957ea17844..c4d1efec30802d75025c9c4560457815fea7f80b 100644 (file)
@@ -230,10 +230,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
     cr.reload
     assert_equal "Committed", cr.state
 
+    output_pdh = '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'
+    log_pdh = 'fa7aeb5140e2848d39b416daeef4ffc5+45'
     act_as_system_user do
       c.update_attributes!(state: Container::Complete,
-                           output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
-                           log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+                           output: output_pdh,
+                           log: log_pdh)
     end
 
     cr.reload
@@ -244,6 +246,12 @@ class ContainerRequestTest < ActiveSupport::TestCase
                                        owner_uuid: project.uuid).count,
                    "Container #{out_type} should be copied to #{project.uuid}")
     end
+    assert_not_nil cr.output_uuid
+    assert_not_nil cr.log_uuid
+    output = Collection.find_by_uuid cr.output_uuid
+    assert_equal output_pdh, output.portable_data_hash
+    log = Collection.find_by_uuid cr.log_uuid
+    assert_equal log_pdh, log.portable_data_hash
   end
 
   test "Container makes container request, then is cancelled" do
index 6ca31c38329ec7347631a03c802b4216bd05f167..43cf83a07ead3db94b2620be74375c738d4e5d08 100644 (file)
@@ -8,7 +8,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net/http"
        "os"
        "regexp"
@@ -18,6 +17,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       log "github.com/Sirupsen/logrus"
        "github.com/curoverse/azure-sdk-for-go/storage"
 )
 
index d636a5ee86887806372a14e2f291e5c4f2c11b33..c5dbc8f5831402aa3e223391c3ad0ece918de0a3 100644 (file)
@@ -9,7 +9,6 @@ import (
        "flag"
        "fmt"
        "io/ioutil"
-       "log"
        "math/rand"
        "net"
        "net/http"
@@ -22,6 +21,7 @@ import (
        "testing"
        "time"
 
+       log "github.com/Sirupsen/logrus"
        "github.com/curoverse/azure-sdk-for-go/storage"
 )
 
index 9a3509424a3b10a1b8361c06be8475ddaa31f832..38f97aff1183d0da0fdaa05b3022765277c9a954 100644 (file)
@@ -1,10 +1,11 @@
 package main
 
 import (
-       "log"
        "sync"
        "sync/atomic"
        "time"
+
+       log "github.com/Sirupsen/logrus"
 )
 
 type bufferPool struct {
index dc06ef549877ba0316294e4a0e7767393ef4436d..83dd84ecc09d3ebf41c4407344b2387b3dc5fd4b 100644 (file)
@@ -5,17 +5,19 @@ import (
        "encoding/json"
        "fmt"
        "io/ioutil"
-       "log"
        "strings"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       log "github.com/Sirupsen/logrus"
 )
 
 type Config struct {
        Debug  bool
        Listen string
 
+       LogFormat string
+
        PIDFile string
 
        MaxBuffers  int
@@ -38,10 +40,13 @@ type Config struct {
 
 var theConfig = DefaultConfig()
 
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
 // DefaultConfig returns the default configuration.
 func DefaultConfig() *Config {
        return &Config{
                Listen:             ":25107",
+               LogFormat:          "json",
                MaxBuffers:         128,
                RequireSignatures:  true,
                BlobSignatureTTL:   arvados.Duration(14 * 24 * time.Hour),
@@ -55,12 +60,27 @@ func DefaultConfig() *Config {
 // fields, and before using the config.
 func (cfg *Config) Start() error {
        if cfg.Debug {
+               log.SetLevel(log.DebugLevel)
                cfg.debugLogf = log.Printf
                cfg.debugLogf("debugging enabled")
        } else {
                cfg.debugLogf = func(string, ...interface{}) {}
        }
 
+       switch strings.ToLower(cfg.LogFormat) {
+       case "text":
+               log.SetFormatter(&log.TextFormatter{
+                       FullTimestamp:   true,
+                       TimestampFormat: rfc3339NanoFixed,
+               })
+       case "json":
+               log.SetFormatter(&log.JSONFormatter{
+                       TimestampFormat: rfc3339NanoFixed,
+               })
+       default:
+               return fmt.Errorf(`unsupported log format %q (try "text" or "json")`, cfg.LogFormat)
+       }
+
        if cfg.MaxBuffers < 0 {
                return fmt.Errorf("MaxBuffers must be greater than zero")
        }
index eaa09042484388dd8185b2d8c79679034c6951f7..a6d46e5e4a1166967f5d3be9c22581d3c1401ef9 100644 (file)
@@ -1,7 +1,7 @@
 package main
 
 import (
-       "log"
+       log "github.com/Sirupsen/logrus"
 )
 
 func init() {
diff --git a/services/keepstore/count.go b/services/keepstore/count.go
new file mode 100644 (file)
index 0000000..a9f7436
--- /dev/null
@@ -0,0 +1,44 @@
+package main
+
+import (
+       "io"
+)
+
+func NewCountingWriter(w io.Writer, f func(uint64)) io.WriteCloser {
+       return &countingReadWriter{
+               writer:  w,
+               counter: f,
+       }
+}
+
+func NewCountingReader(r io.Reader, f func(uint64)) io.ReadCloser {
+       return &countingReadWriter{
+               reader:  r,
+               counter: f,
+       }
+}
+
+type countingReadWriter struct {
+       reader  io.Reader
+       writer  io.Writer
+       counter func(uint64)
+}
+
+func (crw *countingReadWriter) Read(buf []byte) (int, error) {
+       n, err := crw.reader.Read(buf)
+       crw.counter(uint64(n))
+       return n, err
+}
+
+func (crw *countingReadWriter) Write(buf []byte) (int, error) {
+       n, err := crw.writer.Write(buf)
+       crw.counter(uint64(n))
+       return n, err
+}
+
+func (crw *countingReadWriter) Close() error {
+       if c, ok := crw.writer.(io.Closer); ok {
+               return c.Close()
+       }
+       return nil
+}
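
count.go itself carries no doc comments; a hedged sketch of how the counting wrappers can be used, assuming it lives in the keepstore package with "io" and "sync/atomic" imported (the byte counters are illustrative):

// Each Read/Write reports the number of bytes moved to the callback,
// so copying through the wrappers yields total bytes in and out.
func countedCopy(dst io.Writer, src io.Reader) (in, out uint64, err error) {
	cr := NewCountingReader(src, func(n uint64) { atomic.AddUint64(&in, n) })
	cw := NewCountingWriter(dst, func(n uint64) { atomic.AddUint64(&out, n) })
	_, err = io.Copy(cw, cr)
	return
}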
index 9708b4e6be32f96645d500dfcd4319972f213d47..40b4839e06cc96fc05cd8eb5a2be4e73707ac03d 100644 (file)
@@ -958,7 +958,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               (&LoggingRESTRouter{MakeRESTRouter()}).ServeHTTP(resp, req)
+               (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index 289dce15a06168572f5269d7fed82bdb31a75075..adaaa361e96177080a9df4e2b2f1d77aac98424d 100644 (file)
@@ -15,7 +15,6 @@ import (
        "fmt"
        "github.com/gorilla/mux"
        "io"
-       "log"
        "net/http"
        "os"
        "regexp"
@@ -24,13 +23,21 @@ import (
        "strings"
        "sync"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       log "github.com/Sirupsen/logrus"
 )
 
-// MakeRESTRouter returns a new mux.Router that forwards all Keep
-// requests to the appropriate handlers.
-//
-func MakeRESTRouter() *mux.Router {
+type router struct {
+       *mux.Router
+       limiter httpserver.RequestCounter
+}
+
+// MakeRESTRouter returns a new router that forwards all Keep requests
+// to the appropriate handlers.
+func MakeRESTRouter() *router {
        rest := mux.NewRouter()
+       rtr := &router{Router: rest}
 
        rest.HandleFunc(
                `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
@@ -46,8 +53,11 @@ func MakeRESTRouter() *mux.Router {
        // Privileged client only.
        rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
 
+       // Internals/debugging info (runtime.MemStats)
+       rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
+
        // List volumes: path, device number, bytes used/avail.
-       rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
+       rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
 
        // Replace the current pull queue.
        rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
@@ -62,7 +72,7 @@ func MakeRESTRouter() *mux.Router {
        // 400 Bad Request.
        rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
-       return rest
+       return rtr
 }
 
 // BadRequestHandler is a HandleFunc to address bad requests.
@@ -239,18 +249,6 @@ func IndexHandler(resp http.ResponseWriter, req *http.Request) {
        resp.Write([]byte{'\n'})
 }
 
-// StatusHandler
-//     Responds to /status.json requests with the current node status,
-//     described in a JSON structure.
-//
-//     The data given in a status.json response includes:
-//        volumes - a list of Keep volumes currently in use by this server
-//          each volume is an object with the following fields:
-//            * mount_point
-//            * device_num (an integer identifying the underlying filesystem)
-//            * bytes_free
-//            * bytes_used
-
 // PoolStatus struct
 type PoolStatus struct {
        Alloc uint64 `json:"BytesAllocated"`
@@ -258,22 +256,43 @@ type PoolStatus struct {
        Len   int    `json:"BuffersInUse"`
 }
 
+type volumeStatusEnt struct {
+       Label         string
+       Status        *VolumeStatus `json:",omitempty"`
+       VolumeStats   *ioStats      `json:",omitempty"`
+       InternalStats interface{}   `json:",omitempty"`
+}
+
 // NodeStatus struct
 type NodeStatus struct {
-       Volumes    []*VolumeStatus `json:"volumes"`
-       BufferPool PoolStatus
-       PullQueue  WorkQueueStatus
-       TrashQueue WorkQueueStatus
-       Memory     runtime.MemStats
+       Volumes         []*volumeStatusEnt
+       BufferPool      PoolStatus
+       PullQueue       WorkQueueStatus
+       TrashQueue      WorkQueueStatus
+       RequestsCurrent int
+       RequestsMax     int
 }
 
 var st NodeStatus
 var stLock sync.Mutex
 
+// DebugHandler addresses /debug.json requests.
+func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
+       type debugStats struct {
+               MemStats runtime.MemStats
+       }
+       var ds debugStats
+       runtime.ReadMemStats(&ds.MemStats)
+       err := json.NewEncoder(resp).Encode(&ds)
+       if err != nil {
+               http.Error(resp, err.Error(), 500)
+       }
+}
+
 // StatusHandler addresses /status.json requests.
-func StatusHandler(resp http.ResponseWriter, req *http.Request) {
+func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
-       readNodeStatus(&st)
+       rtr.readNodeStatus(&st)
        jstat, err := json.Marshal(&st)
        stLock.Unlock()
        if err == nil {
@@ -286,23 +305,33 @@ func StatusHandler(resp http.ResponseWriter, req *http.Request) {
 }
 
 // populate the given NodeStatus struct with current values.
-func readNodeStatus(st *NodeStatus) {
+func (rtr *router) readNodeStatus(st *NodeStatus) {
        vols := KeepVM.AllReadable()
        if cap(st.Volumes) < len(vols) {
-               st.Volumes = make([]*VolumeStatus, len(vols))
+               st.Volumes = make([]*volumeStatusEnt, len(vols))
        }
        st.Volumes = st.Volumes[:0]
        for _, vol := range vols {
-               if s := vol.Status(); s != nil {
-                       st.Volumes = append(st.Volumes, s)
+               var internalStats interface{}
+               if vol, ok := vol.(InternalStatser); ok {
+                       internalStats = vol.InternalStats()
                }
+               st.Volumes = append(st.Volumes, &volumeStatusEnt{
+                       Label:         vol.String(),
+                       Status:        vol.Status(),
+                       InternalStats: internalStats,
+                       //VolumeStats: KeepVM.VolumeStats(vol),
+               })
        }
        st.BufferPool.Alloc = bufs.Alloc()
        st.BufferPool.Cap = bufs.Cap()
        st.BufferPool.Len = bufs.Len()
        st.PullQueue = getWorkQueueStatus(pullq)
        st.TrashQueue = getWorkQueueStatus(trashq)
-       runtime.ReadMemStats(&st.Memory)
+       if rtr.limiter != nil {
+               st.RequestsCurrent = rtr.limiter.Current()
+               st.RequestsMax = rtr.limiter.Max()
+       }
 }
 
 // return a WorkQueueStatus for the given queue. If q is nil (which
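
With the limiter attached to the router (wired up in keepstore.go further down), /status.json reports request concurrency alongside volume and buffer-pool stats. A client-side sketch; the keepstore address is an assumption:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	resp, err := http.Get("http://keep0.example:25107/status.json")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// Decode only the concurrency fields from the NodeStatus JSON.
	var status struct {
		RequestsCurrent int
		RequestsMax     int
	}
	if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
		panic(err)
	}
	fmt.Printf("%d/%d requests in flight\n", status.RequestsCurrent, status.RequestsMax)
}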
index 2f5f8d43ea70c5953a71b0936b124c3b148f1ad5..54147959719183141a8e3137d5d1363ec9667e6b 100644 (file)
@@ -3,7 +3,6 @@ package main
 import (
        "flag"
        "fmt"
-       "log"
        "net"
        "net/http"
        "os"
@@ -15,6 +14,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
 )
@@ -114,6 +114,9 @@ func main() {
        }
 
        err = theConfig.Start()
+       if err != nil {
+               log.Fatal(err)
+       }
 
        if pidfile := theConfig.PIDFile; pidfile != "" {
                f, err := os.OpenFile(pidfile, os.O_RDWR|os.O_CREATE, 0777)
@@ -147,10 +150,10 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware stack: logger, MaxRequests limiter, method handlers
-       http.Handle("/", &LoggingRESTRouter{
-               httpserver.NewRequestLimiter(theConfig.MaxRequests,
-                       MakeRESTRouter()),
-       })
+       router := MakeRESTRouter()
+       limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
+       router.limiter = limiter
+       http.Handle("/", &LoggingRESTRouter{router: limiter})
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
index 0f556b538ac7ae15b1939f61bad13be3ed0404e5..bfd006ee8d2f3576332b8a3be4c6b040cce14214 100644 (file)
@@ -4,10 +4,14 @@ package main
 // LoggingResponseWriter
 
 import (
-       "log"
+       "context"
+       "fmt"
        "net/http"
        "strings"
        "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       log "github.com/Sirupsen/logrus"
 )
 
 // LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
@@ -57,21 +61,61 @@ func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
 
 // LoggingRESTRouter is used to add logging capabilities to mux.Router
 type LoggingRESTRouter struct {
-       router http.Handler
+       router      http.Handler
+       idGenerator httpserver.IDGenerator
 }
 
 func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
-       t0 := time.Now()
+       tStart := time.Now()
+
+       // Attach a requestID-aware logger to the request context.
+       lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
+       ctx := context.WithValue(req.Context(), "logger", lgr)
+       req = req.WithContext(ctx)
+
+       lgr = lgr.WithFields(log.Fields{
+               "remoteAddr":      req.RemoteAddr,
+               "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+               "reqMethod":       req.Method,
+               "reqPath":         req.URL.Path[1:],
+               "reqBytes":        req.ContentLength,
+       })
+       lgr.Debug("request")
+
        resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
        loggingRouter.router.ServeHTTP(&resp, req)
+       tDone := time.Now()
+
        statusText := http.StatusText(resp.Status)
        if resp.Status >= 400 {
                statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
        }
-       now := time.Now()
-       tTotal := now.Sub(t0)
-       tLatency := resp.sentHdr.Sub(t0)
-       tResponse := now.Sub(resp.sentHdr)
-       log.Printf("[%s] %s %s %d %.6fs %.6fs %.6fs %d %d \"%s\"", req.RemoteAddr, req.Method, req.URL.Path[1:], req.ContentLength, tTotal.Seconds(), tLatency.Seconds(), tResponse.Seconds(), resp.Status, resp.Length, statusText)
+       if resp.sentHdr == zeroTime {
+               // Nobody changed status or wrote any data, i.e., we
+               // returned a 200 response with no body.
+               resp.sentHdr = tDone
+       }
+
+       lgr.WithFields(log.Fields{
+               "timeTotal":      loggedDuration(tDone.Sub(tStart)),
+               "timeToStatus":   loggedDuration(resp.sentHdr.Sub(tStart)),
+               "timeWriteBody":  loggedDuration(tDone.Sub(resp.sentHdr)),
+               "respStatusCode": resp.Status,
+               "respStatus":     statusText,
+               "respBytes":      resp.Length,
+       }).Info("response")
+}
+
+type loggedDuration time.Duration
+
+// MarshalJSON formats a duration as a number of seconds, using
+// fixed-point notation with no more than 6 decimal places.
+func (d loggedDuration) MarshalJSON() ([]byte, error) {
+       return []byte(d.String()), nil
+}
 
+// String formats a duration as a number of seconds, using
+// fixed-point notation with no more than 6 decimal places.
+func (d loggedDuration) String() string {
+       return fmt.Sprintf("%.6f", time.Duration(d).Seconds())
 }
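
A small sketch of the JSON produced by loggedDuration, assuming it runs in the same package (the type is unexported) with "encoding/json" and "time" imported; the field name is illustrative:

// A 1.5ms duration marshals as the bare number 0.001500, so the
// structured log fields above come out as {"timeTotal":0.001500,...}.
func exampleLoggedDuration() (string, error) {
	buf, err := json.Marshal(map[string]loggedDuration{
		"timeTotal": loggedDuration(1500 * time.Microsecond),
	})
	return string(buf), err
}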
index 12860bb662d91a1e31191fed2bccace97bc2ac30..3c6278d478d3d897982b2b6f8c9a166c505e6433 100644 (file)
@@ -7,8 +7,9 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "io"
        "io/ioutil"
-       "log"
        "time"
+
+       log "github.com/Sirupsen/logrus"
 )
 
 // RunPullWorker is used by Keepstore to initiate pull worker channel goroutine.
index 17923f807dc8a8f11bc77ce8dc0732001a4a8ba8..ca5b1a2eb945cb2ae940c9599c955fb59d9e489a 100644 (file)
@@ -9,17 +9,18 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net/http"
        "os"
        "regexp"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
+       log "github.com/Sirupsen/logrus"
 )
 
 const (
@@ -148,7 +149,7 @@ type S3Volume struct {
        ReadOnly           bool
        UnsafeDelete       bool
 
-       bucket *s3.Bucket
+       bucket *s3bucket
 
        startOnce sync.Once
 }
@@ -230,9 +231,11 @@ func (v *S3Volume) Start() error {
        client := s3.New(auth, region)
        client.ConnectTimeout = time.Duration(v.ConnectTimeout)
        client.ReadTimeout = time.Duration(v.ReadTimeout)
-       v.bucket = &s3.Bucket{
-               S3:   client,
-               Name: v.Bucket,
+       v.bucket = &s3bucket{
+               Bucket: &s3.Bucket{
+                       S3:   client,
+                       Name: v.Bucket,
+               },
        }
        return nil
 }
@@ -269,6 +272,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
        if err == nil || !os.IsNotExist(err) {
                return
        }
+
        _, err = v.bucket.Head("recent/"+loc, nil)
        err = v.translateError(err)
        if err != nil {
@@ -280,6 +284,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
                err = os.ErrNotExist
                return
        }
+
        rdr, err = v.bucket.GetReader(loc)
        if err != nil {
                log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
@@ -442,16 +447,19 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        // Use a merge sort to find matching sets of X and recent/X.
        dataL := s3Lister{
-               Bucket:   v.bucket,
+               Bucket:   v.bucket.Bucket,
                Prefix:   prefix,
                PageSize: v.IndexPageSize,
        }
        recentL := s3Lister{
-               Bucket:   v.bucket,
+               Bucket:   v.bucket.Bucket,
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
        }
+       // The loop initializer below calls dataL.First() and
+       // recentL.First(), so count one list API call for each here.
+       v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
+       v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
        for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
+               v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                if data.Key >= "g" {
                        // Conveniently, "recent/*" and "trash/*" are
                        // lexically greater than all hex-encoded data
@@ -473,10 +481,12 @@ func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
                for recent != nil {
                        if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
                                recent = recentL.Next()
+                               v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                continue
                        } else if cmp == 0 {
                                stamp = recent
                                recent = recentL.Next()
+                               v.bucket.stats.tick(&v.bucket.stats.Ops, &v.bucket.stats.ListOps)
                                break
                        } else {
                                // recent/X marker is missing: we'll
@@ -508,7 +518,7 @@ func (v *S3Volume) Trash(loc string) error {
                if !s3UnsafeDelete {
                        return ErrS3TrashDisabled
                }
-               return v.bucket.Del(loc)
+               return v.translateError(v.bucket.Del(loc))
        }
        err := v.checkRaceWindow(loc)
        if err != nil {
@@ -611,9 +621,14 @@ func (v *S3Volume) Status() *VolumeStatus {
        }
 }
 
+// InternalStats returns bucket I/O and API call counters.
+func (v *S3Volume) InternalStats() interface{} {
+       return &v.bucket.stats
+}
+
 // String implements fmt.Stringer.
 func (v *S3Volume) String() string {
-       return fmt.Sprintf("s3-bucket:%+q", v.bucket.Name)
+       return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
 }
 
 // Writable returns false if all future Put, Mtime, and Delete calls
@@ -702,7 +717,7 @@ func (v *S3Volume) EmptyTrash() {
 
        // Use a merge sort to find matching sets of trash/X and recent/X.
        trashL := s3Lister{
-               Bucket:   v.bucket,
+               Bucket:   v.bucket.Bucket,
                Prefix:   "trash/",
                PageSize: v.IndexPageSize,
        }
@@ -752,7 +767,9 @@ func (v *S3Volume) EmptyTrash() {
                                v.fixRace(loc)
                                v.Touch(loc)
                                continue
-                       } else if _, err := v.bucket.Head(loc, nil); os.IsNotExist(err) {
+                       }
+                       _, err := v.bucket.Head(loc, nil)
+                       if os.IsNotExist(err) {
                                log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
                                v.fixRace(loc)
                                continue
@@ -846,3 +863,91 @@ func (lister *s3Lister) pop() (k *s3.Key) {
        }
        return
 }
+
+// s3bucket wraps s3.Bucket and tracks I/O and API usage stats.
+type s3bucket struct {
+       *s3.Bucket
+       stats s3bucketStats
+}
+
+func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
+       rdr, err := b.Bucket.GetReader(path)
+       b.stats.tick(&b.stats.Ops, &b.stats.GetOps)
+       b.stats.tickErr(err)
+       return NewCountingReader(rdr, b.stats.tickInBytes), err
+}
+
+func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
+       resp, err := b.Bucket.Head(path, headers)
+       b.stats.tick(&b.stats.Ops, &b.stats.HeadOps)
+       b.stats.tickErr(err)
+       return resp, err
+}
+
+func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType string, perm s3.ACL, options s3.Options) error {
+       err := b.Bucket.PutReader(path, NewCountingReader(r, b.stats.tickOutBytes), length, contType, perm, options)
+       b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+       b.stats.tickErr(err)
+       return err
+}
+
+func (b *s3bucket) Put(path string, data []byte, contType string, perm s3.ACL, options s3.Options) error {
+       // Use PutReader so the payload passes through a counting reader.
+       err := b.Bucket.PutReader(path, NewCountingReader(bytes.NewBuffer(data), b.stats.tickOutBytes), int64(len(data)), contType, perm, options)
+       b.stats.tick(&b.stats.Ops, &b.stats.PutOps)
+       b.stats.tickErr(err)
+       return err
+}
+
+func (b *s3bucket) Del(path string) error {
+       err := b.Bucket.Del(path)
+       b.stats.tick(&b.stats.Ops, &b.stats.DelOps)
+       b.stats.tickErr(err)
+       return err
+}
+
+// s3bucketStats counts I/O and API usage for one s3bucket.
+type s3bucketStats struct {
+       Errors   uint64
+       Ops      uint64
+       GetOps   uint64
+       PutOps   uint64
+       HeadOps  uint64
+       DelOps   uint64
+       ListOps  uint64
+       InBytes  uint64
+       OutBytes uint64
+
+       ErrorCodes map[string]uint64 `json:",omitempty"`
+
+       lock sync.Mutex
+}
+
+func (s *s3bucketStats) tickInBytes(n uint64) {
+       atomic.AddUint64(&s.InBytes, n)
+}
+
+func (s *s3bucketStats) tickOutBytes(n uint64) {
+       atomic.AddUint64(&s.OutBytes, n)
+}
+
+func (s *s3bucketStats) tick(counters ...*uint64) {
+       for _, counter := range counters {
+               atomic.AddUint64(counter, 1)
+       }
+}
+
+func (s *s3bucketStats) tickErr(err error) {
+       if err == nil {
+               return
+       }
+       atomic.AddUint64(&s.Errors, 1)
+       errStr := fmt.Sprintf("%T", err)
+       if err, ok := err.(*s3.Error); ok {
+               errStr = errStr + fmt.Sprintf(" %d %s", err.StatusCode, err.Code)
+       }
+       // Unlike the other counters, ErrorCodes is a map, so it is
+       // protected by a mutex rather than updated atomically.
+       s.lock.Lock()
+       if s.ErrorCodes == nil {
+               s.ErrorCodes = make(map[string]uint64)
+       }
+       s.ErrorCodes[errStr]++
+       s.lock.Unlock()
+}
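
    For illustration, a minimal, self-contained sketch of the byte-counting
    reader idea that NewCountingReader (defined in the new count.go, not shown
    in this excerpt) provides for the wrappers above; the type and names below
    are hypothetical stand-ins, not the real implementation. Each Read reports
    its byte count to a callback, which the s3bucket stats feed into atomic
    counters such as InBytes and OutBytes.

    package main

    import (
        "fmt"
        "io"
        "io/ioutil"
        "strings"
        "sync/atomic"
    )

    // countingReader calls counter(n) after every Read of n bytes.
    type countingReader struct {
        r       io.Reader
        counter func(uint64)
    }

    func (cr countingReader) Read(p []byte) (int, error) {
        n, err := cr.r.Read(p)
        cr.counter(uint64(n))
        return n, err
    }

    func main() {
        var inBytes uint64
        cr := countingReader{
            r:       strings.NewReader("foo"),
            counter: func(n uint64) { atomic.AddUint64(&inBytes, n) },
        }
        io.Copy(ioutil.Discard, cr)
        fmt.Println(inBytes) // 3
    }
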
index 63b186220c30a562e900722d63d38be50bde05d6..6389d503dfc5ccee32471d308bf81ee64b012135 100644 (file)
@@ -4,15 +4,16 @@ import (
        "bytes"
        "context"
        "crypto/md5"
+       "encoding/json"
        "fmt"
        "io/ioutil"
-       "log"
        "os"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
+       log "github.com/Sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
@@ -82,6 +83,35 @@ func (s *StubbedS3Suite) TestIndex(c *check.C) {
        }
 }
 
+func (s *StubbedS3Suite) TestStats(c *check.C) {
+       v := s.newTestableVolume(c, 5*time.Minute, false, 2)
+       stats := func() string {
+               buf, err := json.Marshal(v.InternalStats())
+               c.Check(err, check.IsNil)
+               return string(buf)
+       }
+
+       c.Check(stats(), check.Matches, `.*"Ops":0,.*`)
+
+       loc := "acbd18db4cc2f85cedef654fccc4a4d8"
+       _, err := v.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.NotNil)
+       c.Check(stats(), check.Matches, `.*"Ops":[^0],.*`)
+       c.Check(stats(), check.Matches, `.*"\*s3.Error 404 [^"]*":[^0].*`)
+       c.Check(stats(), check.Matches, `.*"InBytes":0,.*`)
+
+       err = v.Put(context.Background(), loc, []byte("foo"))
+       c.Check(err, check.IsNil)
+       c.Check(stats(), check.Matches, `.*"OutBytes":3,.*`)
+       c.Check(stats(), check.Matches, `.*"PutOps":2,.*`)
+
+       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.IsNil)
+       _, err = v.Get(context.Background(), loc, make([]byte, 3))
+       c.Check(err, check.IsNil)
+       c.Check(stats(), check.Matches, `.*"InBytes":6,.*`)
+}
+
 func (s *StubbedS3Suite) TestBackendStates(c *check.C) {
        defer func(tl, bs arvados.Duration) {
                theConfig.TrashLifetime = tl
index 27d6216d01633feca360de94f0a8febaabfb475a..696c3e53a60abbd352efc035f5a0bb1afaec737f 100644 (file)
@@ -2,10 +2,10 @@ package main
 
 import (
        "errors"
-       "log"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       log "github.com/Sirupsen/logrus"
 )
 
 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
index 29f89f567d7b64729d10ecde3a4699186f233711..887cfd3a9edf80f5ef9a620efa880b9bc93856d8 100644 (file)
@@ -48,6 +48,10 @@ Listen:
     "address" is a host IP address or name and "port" is a port number
     or name.
 
+LogFormat:
+
+    Format of request/response and error logs: "json" or "text".
+
 PIDFile:
 
    Path to write PID file during startup. This file is kept open and
index 57e18aba9f691ceb43f32a928a0e3a95e9d505ec..b72258d51a5e6358c190227db08d5bf8c419dfe4 100644 (file)
@@ -243,6 +243,10 @@ type VolumeManager interface {
        // with more free space, etc.
        NextWritable() Volume
 
+       // VolumeStats returns the ioStats used to track I/O and API
+       // call statistics for the given Volume.
+       VolumeStats(Volume) *ioStats
+
        // Close shuts down the volume manager cleanly.
        Close()
 }
@@ -254,12 +258,16 @@ type RRVolumeManager struct {
        readables []Volume
        writables []Volume
        counter   uint32
+       iostats   map[Volume]*ioStats
 }
 
 // MakeRRVolumeManager initializes RRVolumeManager
 func MakeRRVolumeManager(volumes []Volume) *RRVolumeManager {
-       vm := &RRVolumeManager{}
+       vm := &RRVolumeManager{
+               iostats: make(map[Volume]*ioStats),
+       }
        for _, v := range volumes {
+               vm.iostats[v] = &ioStats{}
                vm.readables = append(vm.readables, v)
                if v.Writable() {
                        vm.writables = append(vm.writables, v)
@@ -287,18 +295,35 @@ func (vm *RRVolumeManager) NextWritable() Volume {
        return vm.writables[i%uint32(len(vm.writables))]
 }
 
+// VolumeStats returns an ioStats for the given volume.
+func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
+       return vm.iostats[v]
+}
+
 // Close the RRVolumeManager
 func (vm *RRVolumeManager) Close() {
 }
 
-// VolumeStatus provides status information of the volume consisting of:
-//   * mount_point
-//   * device_num (an integer identifying the underlying storage system)
-//   * bytes_free
-//   * bytes_used
+// VolumeStatus describes the current condition of a volume
 type VolumeStatus struct {
-       MountPoint string `json:"mount_point"`
-       DeviceNum  uint64 `json:"device_num"`
-       BytesFree  uint64 `json:"bytes_free"`
-       BytesUsed  uint64 `json:"bytes_used"`
+       MountPoint string
+       DeviceNum  uint64
+       BytesFree  uint64
+       BytesUsed  uint64
+}
+
+// ioStats tracks I/O statistics for a volume or server
+type ioStats struct {
+       Errors     uint64
+       Ops        uint64
+       CompareOps uint64
+       GetOps     uint64
+       PutOps     uint64
+       TouchOps   uint64
+       InBytes    uint64
+       OutBytes   uint64
+}
+
+// InternalStatser is an optional interface for volumes that can
+// report implementation-specific I/O and API usage statistics.
+type InternalStatser interface {
+       InternalStats() interface{}
 }
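
    For illustration, a minimal, self-contained sketch (hypothetical names, not
    part of this commit) of how a caller such as a status handler can consume
    the new optional interface: a type assertion picks out volumes that
    implement InternalStatser, and volumes that don't are simply skipped.

    package main

    import "fmt"

    type Volume interface {
        String() string
    }

    type InternalStatser interface {
        InternalStats() interface{}
    }

    // statsVolume is a stand-in for a volume type (like S3Volume) that
    // implements the optional interface.
    type statsVolume struct{ ops uint64 }

    func (v *statsVolume) String() string             { return "stats-volume" }
    func (v *statsVolume) InternalStats() interface{} { return map[string]uint64{"Ops": v.ops} }

    // plainVolume is a stand-in for a volume with no internal stats.
    type plainVolume struct{}

    func (v *plainVolume) String() string { return "plain-volume" }

    func main() {
        for _, v := range []Volume{&statsVolume{ops: 3}, &plainVolume{}} {
            if is, ok := v.(InternalStatser); ok {
                fmt.Printf("%s: %v\n", v, is.InternalStats())
            }
        }
    }
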
index 5239ed37402c93f25af0d6c65c03f6a953597cda..fff02aac260f59a6fc46fc24cbebea57b27e5743 100644 (file)
@@ -7,7 +7,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
        "path/filepath"
        "regexp"
@@ -16,6 +15,8 @@ import (
        "sync"
        "syscall"
        "time"
+
+       log "github.com/Sirupsen/logrus"
 )
 
 type unixVolumeAdder struct {
@@ -322,7 +323,12 @@ func (v *UnixVolume) Status() *VolumeStatus {
        // uses fs.Blocks - fs.Bfree.
        free := fs.Bavail * uint64(fs.Bsize)
        used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
-       return &VolumeStatus{v.Root, devnum, free, used}
+       return &VolumeStatus{
+               MountPoint: v.Root,
+               DeviceNum:  devnum,
+               BytesFree:  free,
+               BytesUsed:  used,
+       }
 }
 
 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
index 6568a887b3c829681de54cc294b68389f3cc0001..a894350970f115594300befe9eab2798946cdfe4 100644 (file)
@@ -68,6 +68,8 @@ RUN set -e && \
  tar -C /usr/local -xjf /tmp/$PJS.tar.bz2 && \
  ln -s ../$PJS/bin/phantomjs /usr/local/bin/
 
+RUN pip install -U setuptools
+
 ARG arvados_version
 RUN echo arvados_version is git commit $arvados_version
 
index b3dfedcf83c4d22d346010c8ddbee138d67ad8fd..230a189a9a244c79d94380960f82d4ad52ddb6c3 100644 (file)
@@ -1,4 +1,8 @@
 
+export PATH=${PATH}:/usr/local/go/bin:/var/lib/gems/bin
+export GEM_HOME=/var/lib/gems
+export GEM_PATH=/var/lib/gems
+
 if test -s /var/run/localip_override ; then
     localip=$(cat /var/run/localip_override)
 else