Merge remote-tracking branch 'origin/master' into 14645-fuse-operations-reporting
author Eric Biagiotti <ebiagiotti@veritasgenetics.com>
Wed, 13 Mar 2019 17:49:48 +0000 (13:49 -0400)
committer Eric Biagiotti <ebiagiotti@veritasgenetics.com>
Wed, 13 Mar 2019 17:49:48 +0000 (13:49 -0400)
refs #14645

Arvados-DCO-1.1-Signed-off-by: Eric Biagiotti <ebiagiotti@veritasgenetics.com>

37 files changed:
build/run-build-packages.sh
build/run-library.sh
doc/api/methods/groups.html.textile.liquid
docker/jobs/Dockerfile
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool_test.go
sdk/python/arvados/commands/keepdocker.py
sdk/python/arvados/commands/put.py
services/api/app/controllers/application_controller.rb
services/api/app/controllers/arvados/v1/groups_controller.rb
services/api/app/models/arvados_model.rb
services/api/app/models/group.rb
services/api/app/models/link.rb
services/api/app/models/user.rb
services/api/config/application.default.yml
services/api/lib/refresh_permission_view.rb
services/api/script/permission-updater.rb [deleted file]
services/api/test/integration/groups_test.rb
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go
services/keepstore/config.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/metrics.go [new file with mode: 0644]
services/keepstore/mounts_test.go
services/keepstore/pipe_adapters.go
services/keepstore/proxy_remote_test.go
services/keepstore/pull_worker_test.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/stats_ticker.go
services/keepstore/unix_volume.go [moved from services/keepstore/volume_unix.go with 96% similarity]
services/keepstore/unix_volume_test.go [moved from services/keepstore/volume_unix_test.go with 97% similarity]
services/keepstore/volume.go
services/keepstore/volume_generic_test.go
services/keepstore/volume_test.go

index 6264e93f0f1e9b3d2d6634de35a76a0a55fd2588..b800d43e08a060e1231cf1bfdfee5facdca02ed9 100755 (executable)
@@ -345,6 +345,17 @@ fpm_build_virtualenv "arvados-docker-cleaner" "services/dockercleaner" "python3"
 # The Arvados crunchstat-summary tool
 fpm_build_virtualenv "crunchstat-summary" "tools/crunchstat-summary"
 
+# The cwltest package, which lives out of tree
+cd "$WORKSPACE"
+if [[ -e "$WORKSPACE/cwltest" ]]; then
+       rm -rf "$WORKSPACE/cwltest"
+fi
+git clone https://github.com/common-workflow-language/cwltest.git
+# signal to our build script that we want a cwltest executable installed in /usr/bin/
+mkdir cwltest/bin && touch cwltest/bin/cwltest
+fpm_build_virtualenv "cwltest" "cwltest"
+rm -rf "$WORKSPACE/cwltest"
+
 # Build the API server package
 test_rails_package_presence arvados-api-server "$WORKSPACE/services/api"
 if [[ "$?" == "0" ]]; then
index 40589fd565c258240fed5fe1057fad5ab38993b1..de9d67d41cbe3f94a6bb9ca1c9ac1a819051dc35 100755 (executable)
@@ -261,29 +261,33 @@ test_package_presence() {
     # Get the list of packages from the repos
 
     if [[ "$FORMAT" == "deb" ]]; then
-      debian_distros="jessie precise stretch trusty wheezy xenial bionic"
-
-      for D in ${debian_distros}; do
-        if [ ${pkgname:0:3} = "lib" ]; then
-          repo_subdir=${pkgname:0:4}
-        else
-          repo_subdir=${pkgname:0:1}
-        fi
+      declare -A dd
+      dd[debian8]=jessie
+      dd[debian9]=stretch
+      dd[debian10]=buster
+      dd[ubuntu1404]=trusty
+      dd[ubuntu1604]=xenial
+      dd[ubuntu1804]=bionic
+      D=${dd[$TARGET]}
+      if [ ${pkgname:0:3} = "lib" ]; then
+        repo_subdir=${pkgname:0:4}
+      else
+        repo_subdir=${pkgname:0:1}
+      fi
 
-        repo_pkg_list=$(curl -s -o - http://apt.arvados.org/pool/${D}/main/${repo_subdir}/)
-        echo ${repo_pkg_list} |grep -q ${complete_pkgname}
-        if [ $? -eq 0 ] ; then
-          echo "Package $complete_pkgname exists, not rebuilding!"
-          curl -s -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
-          return 1
-        elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
-          echo "Package $complete_pkgname exists, not rebuilding!"
-          return 1
-        else
-          echo "Package $complete_pkgname not found, building"
-          return 0
-        fi
-      done
+      repo_pkg_list=$(curl -s -o - http://apt.arvados.org/pool/${D}/main/${repo_subdir}/)
+      echo ${repo_pkg_list} |grep -q ${complete_pkgname}
+      if [ $? -eq 0 ] ; then
+        echo "Package $complete_pkgname exists, not rebuilding!"
+        curl -s -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
+        return 1
+      elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
+        echo "Package $complete_pkgname exists, not rebuilding!"
+        return 1
+      else
+        echo "Package $complete_pkgname not found, building"
+        return 0
+      fi
     else
       centos_repo="http://rpm.arvados.org/CentOS/7/dev/x86_64/"
 
@@ -423,7 +427,8 @@ fpm_build_virtualenv () {
     echo "  $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
     exit 1
   fi
-  if ! $python setup.py $DASHQ_UNLESS_DEBUG sdist; then
+  # filter a useless warning (when building the cwltest package) from the stderr output
+  if ! $python setup.py $DASHQ_UNLESS_DEBUG sdist 2> >(grep -v 'warning: no previously-included files matching'); then
     echo "Error, unable to run $python setup.py sdist for $PKG"
     exit 1
   fi
@@ -450,7 +455,7 @@ fpm_build_virtualenv () {
 
   rm -rf build
   rm -f $PYTHON_PKG*deb
-
+  echo "virtualenv version: `virtualenv --version`"
   virtualenv_command="virtualenv --python `which $python` $DASHQ_UNLESS_DEBUG build/usr/share/$python/dist/$PYTHON_PKG"
 
   if ! $virtualenv_command; then
@@ -464,16 +469,21 @@ fpm_build_virtualenv () {
     echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip"
     exit 1
   fi
+  echo "pip version:        `build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip --version`"
+
   if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools; then
     echo "Error, unable to upgrade setuptools with"
     echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
     exit 1
   fi
+  echo "setuptools version: `build/usr/share/$python/dist/$PYTHON_PKG/bin/$python -c 'import setuptools; print(setuptools.__version__)'`"
+
   if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then
     echo "Error, unable to upgrade wheel with"
     echo "  build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel"
     exit 1
   fi
+  echo "wheel version:      `build/usr/share/$python/dist/$PYTHON_PKG/bin/wheel version`"
 
   if [[ "$TARGET" != "centos7" ]] || [[ "$PYTHON_PKG" != "python-arvados-fuse" ]]; then
     build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
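
Aside: the test_package_presence rewrite above drops the loop over every Debian/Ubuntu codename in favor of a single lookup keyed by the build TARGET, so each run checks the apt pool only for its own distro. A rough Go sketch of the same presence check (illustration only, not part of the commit; the URL layout is taken from the script):

    // Sketch of the per-target package presence check in run-library.sh.
    package apt

    import (
    	"fmt"
    	"io/ioutil"
    	"net/http"
    	"strings"
    )

    var codenames = map[string]string{
    	"debian8": "jessie", "debian9": "stretch", "debian10": "buster",
    	"ubuntu1404": "trusty", "ubuntu1604": "xenial", "ubuntu1804": "bionic",
    }

    // packageExists mirrors the shell logic: map TARGET to a codename, pick
    // the pool subdirectory from the package name prefix, scan the listing.
    func packageExists(target, pkgname, completePkgname string) (bool, error) {
    	repoSubdir := pkgname[:1]
    	if strings.HasPrefix(pkgname, "lib") {
    		repoSubdir = pkgname[:4]
    	}
    	url := fmt.Sprintf("http://apt.arvados.org/pool/%s/main/%s/",
    		codenames[target], repoSubdir)
    	resp, err := http.Get(url)
    	if err != nil {
    		return false, err
    	}
    	defer resp.Body.Close()
    	body, err := ioutil.ReadAll(resp.Body)
    	if err != nil {
    		return false, err
    	}
    	return strings.Contains(string(body), completePkgname), nil
    }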
index c6f78b92978e366454928d68cce9f9d044afe792..9c75fa8ec7aec59d9ffea144cbf67d9a6caae7b0 100644 (file)
@@ -68,6 +68,7 @@ Arguments:
 table(table table-bordered table-condensed).
 |_. Argument |_. Type |_. Description |_. Location |_. Example |
 |group|object||query||
+|async|boolean (default false)|Defer the permission graph update by a configured number of seconds (@async_permissions_update_interval@, 20 seconds by default). On success, the response status is 202 (Accepted).|query|@true@|
 
 h3. delete
 
@@ -115,6 +116,7 @@ table(table table-bordered table-condensed).
 |_. Argument |_. Type |_. Description |_. Location |_. Example |
 {background:#ccffcc}.|uuid|string|The UUID of the Group in question.|path||
 |group|object||query||
+|async|boolean (default false)|Defer the permission graph update by a configured number of seconds (@async_permissions_update_interval@, 20 seconds by default). On success, the response status is 202 (Accepted).|query|@true@|
 
 h3. untrash
 
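Hedged usage sketch of the documented parameter (hostname and token are placeholders, not from this page): a client passing async=true should treat HTTP 202 as "accepted, permission graph not yet updated".

    // Create a group with async=true and expect 202 Accepted.
    package main

    import (
    	"fmt"
    	"net/http"
    	"net/url"
    	"strings"
    )

    func main() {
    	form := url.Values{}
    	form.Set("group", `{"name":"example group"}`)
    	form.Set("async", "true")
    	req, _ := http.NewRequest("POST", "https://arvados.example/arvados/v1/groups",
    		strings.NewReader(form.Encode()))
    	req.Header.Set("Authorization", "OAuth2 placeholder-token")
    	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    	resp, err := http.DefaultClient.Do(req)
    	if err != nil {
    		panic(err)
    	}
    	defer resp.Body.Close()
    	// The permission graph catches up within
    	// async_permissions_update_interval (20s by default).
    	fmt.Println("accepted:", resp.StatusCode == http.StatusAccepted)
    }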
index 02a1c3829d432e284a770d11459aaa111bff57db..079276e52c56cc9bda844b8db13607a16c86bd86 100644 (file)
@@ -3,7 +3,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 # Based on Debian Stretch
-FROM debian:stretch
+FROM debian:stretch-slim
 MAINTAINER Ward Vandewege <wvandewege@veritasgenetics.com>
 
 ENV DEBIAN_FRONTEND noninteractive
index 5873e492213b86f58eaa98850c5c00c073cd2aee..4df39d0c46ac7dc538e5c7d9949cd884df8768b4 100644 (file)
@@ -371,7 +371,9 @@ func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
 }
 
 func (si stubInstance) Tags() cloud.InstanceTags {
-       return si.tags
+       // Return a copy to ensure a caller can't change our saved
+       // tags just by writing to the returned map.
+       return copyTags(si.tags)
 }
 
 func (si stubInstance) String() string {
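
copyTags is defined elsewhere in this package; presumably it is a defensive-copy helper along these lines (cloud.InstanceTags is a map[string]string):

    // Hypothetical sketch of the copyTags helper referenced above.
    func copyTags(src cloud.InstanceTags) cloud.InstanceTags {
    	dst := cloud.InstanceTags{}
    	for k, v := range src {
    		dst[k] = v // copy entries so callers can't mutate the saved map
    	}
    	return dst
    }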
index da9e650b8121889511886e9b16dc8eb827fcee14..fc33a7ab235d7a733903903219302a81c8fc44d0 100644 (file)
@@ -105,6 +105,23 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                        pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
                }
        }
+       // Wait for the tags to be saved to the cloud provider
+       deadline := time.Now().Add(time.Second)
+       for !func() bool {
+               pool.mtx.RLock()
+               defer pool.mtx.RUnlock()
+               for _, wkr := range pool.workers {
+                       if wkr.instType == type2 {
+                               return wkr.instance.Tags()[tagKeyIdleBehavior] == string(IdleBehaviorHold)
+                       }
+               }
+               return false
+       }() {
+               if time.Now().After(deadline) {
+                       c.Fatal("timeout")
+               }
+               time.Sleep(time.Millisecond * 10)
+       }
        pool.Stop()
 
        c.Log("------- starting new pool, waiting to recover state")
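
The added wait is a standard poll-until-deadline loop; the same idea as a reusable helper (sketch, not in the commit):

    // eventually polls cond every interval until it returns true or the
    // timeout expires; the test above inlines exactly this pattern.
    func eventually(timeout, interval time.Duration, cond func() bool) bool {
    	deadline := time.Now().Add(timeout)
    	for !cond() {
    		if time.Now().After(deadline) {
    			return false
    		}
    		time.Sleep(interval)
    	}
    	return true
    }
    // e.g.: if !eventually(time.Second, 10*time.Millisecond, tagSaved) { c.Fatal("timeout") }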
index ec2a9942a6794153ea69138ba467a20f8b1ae6a6..d4fecc47b4d23fbe3b35d734f7c2428ebbe622e1 100644 (file)
@@ -64,13 +64,6 @@ _group.add_argument(
     '--no-pull', action='store_false', dest='pull',
     help="Use locally installed image only, don't pull image from Docker registry (default)")
 
-keepdocker_parser.add_argument(
-    'image', nargs='?',
-    help="Docker image to upload: repo, repo:tag, or hash")
-keepdocker_parser.add_argument(
-    'tag', nargs='?',
-    help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
-
 # Combine keepdocker options listed above with run_opts options of arv-put.
 # The options inherited from arv-put include --name, --project-uuid,
 # --progress/--no-progress/--batch-progress and --resume/--no-resume.
@@ -78,6 +71,13 @@ arg_parser = argparse.ArgumentParser(
         description="Upload or list Docker images in Arvados",
         parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
 
+arg_parser.add_argument(
+    'image', nargs='?',
+    help="Docker image to upload: repo, repo:tag, or hash")
+arg_parser.add_argument(
+    'tag', nargs='?',
+    help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
+
 class DockerError(Exception):
     pass
 
@@ -492,6 +492,9 @@ def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None)
 
         # Call arv-put with switches we inherited from it
         # (a.k.a., switches that aren't our own).
+        if arguments is None:
+            arguments = sys.argv[1:]
+        arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
         put_args = keepdocker_parser.parse_known_args(arguments)[1]
 
         if args.name is None:
index 54fa356d3a4537364ab770175addc742ade4826d..afe75b31056c5c7d872256875a74265012de3b91 100644 (file)
@@ -154,6 +154,29 @@ On high latency installations, using a greater number will improve
 overall throughput.
 """)
 
+upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
+                      action='append', help="""
+Exclude files and directories whose names match the given glob pattern. When
+using a path-like pattern like 'subdir/*.txt', all text files inside the
+'subdir' directory (relative to the provided input dirs) will be excluded.
+When using a filename pattern like '*.txt', any text file will be excluded
+no matter where it is placed.
+For the special case of needing to exclude only files or dirs directly below
+the given input directory, you can use a pattern like './exclude_this.gif'.
+You can specify multiple patterns by using this argument more than once.
+""")
+
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--follow-links', action='store_true', default=True,
+                    dest='follow_links', help="""
+Follow file and directory symlinks (default).
+""")
+_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+                    help="""
+Do not follow file and directory symlinks.
+""")
+
+
 run_opts = argparse.ArgumentParser(add_help=False)
 
 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
@@ -165,18 +188,6 @@ run_opts.add_argument('--name', help="""
 Save the collection with the specified name.
 """)
 
-run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
-                      action='append', help="""
-Exclude files and directories whose names match the given glob pattern. When
-using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
-directory, relative to the provided input dirs will be excluded.
-When using a filename pattern like '*.txt', any text file will be excluded
-no matter where is placed.
-For the special case of needing to exclude only files or dirs directly below
-the given input directory, you can use a pattern like './exclude_this.gif'.
-You can specify multiple patterns by using this argument more than once.
-""")
-
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--progress', action='store_true',
                     help="""
@@ -213,16 +224,6 @@ _group.add_argument('--no-resume', action='store_false', dest='resume',
 Do not continue interrupted uploads from cached state.
 """)
 
-_group = run_opts.add_mutually_exclusive_group()
-_group.add_argument('--follow-links', action='store_true', default=True,
-                    dest='follow_links', help="""
-Follow file and directory symlinks (default).
-""")
-_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
-                    help="""
-Do not follow file and directory symlinks.
-""")
-
 _group = run_opts.add_mutually_exclusive_group()
 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                     help="""
index 3cfe5b54fda2a2a90afa168938ab5ff9e4760064..6dbba1a24b088ad036bdd78af93816cdcaca9970 100644 (file)
@@ -164,6 +164,10 @@ class ApplicationController < ActionController::Base
     send_error("Path not found", status: 404)
   end
 
+  def render_accepted
+    send_json ({accepted: true}), status: 202
+  end
+
   protected
 
   def send_error(*args)
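
For reference, render_accepted produces HTTP 202 with a one-field JSON body; an equivalent handler fragment in Go (illustration only):

    package handlers

    import (
    	"encoding/json"
    	"net/http"
    )

    // renderAccepted mirrors the Rails helper above: 202 plus {"accepted": true}.
    func renderAccepted(w http.ResponseWriter) {
    	w.Header().Set("Content-Type", "application/json")
    	w.WriteHeader(http.StatusAccepted) // 202
    	json.NewEncoder(w).Encode(map[string]bool{"accepted": true})
    }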
index 98989db079db085816d2964b4db40a137d2955ff..6163f893ce7d3c71040f08f2a66d25731bb1ff85 100644 (file)
@@ -33,6 +33,57 @@ class Arvados::V1::GroupsController < ApplicationController
     params
   end
 
+  def self._create_requires_parameters
+    super.merge(
+      {
+        async: {
+          required: false,
+          type: 'boolean',
+          location: 'query',
+          default: false,
+          description: 'defer permissions update'
+        }
+      }
+    )
+  end
+
+  def self._update_requires_parameters
+    super.merge(
+      {
+        async: {
+          required: false,
+          type: 'boolean',
+          location: 'query',
+          default: false,
+          description: 'defer permissions update'
+        }
+      }
+    )
+  end
+
+  def create
+    if params[:async]
+      @object = model_class.new(resource_attrs.merge({async_permissions_update: true}))
+      @object.save!
+      render_accepted
+    else
+      super
+    end
+  end
+
+  def update
+    if params[:async]
+      attrs_to_update = resource_attrs.reject { |k, v|
+        [:kind, :etag, :href].index k
+      }.merge({async_permissions_update: true})
+      @object.update_attributes!(attrs_to_update)
+      @object.save!
+      render_accepted
+    else
+      super
+    end
+  end
+
   def render_404_if_no_object
     if params[:action] == 'contents'
       if !params[:uuid]
index eea95e2be1aad7c7535f48e430d5662d40149e04..2002e90acbde7d0375f36f9a3926bb01fb02b24d 100644 (file)
@@ -41,6 +41,11 @@ class ArvadosModel < ActiveRecord::Base
            class_name: 'Link',
            primary_key: :uuid)
 
+  # If async_permissions_update is true at create or update time, the
+  # permission graph update is deferred, allowing multiple calls to be made
+  # without paying the performance penalty each time.
+  attr_accessor :async_permissions_update
+
   class PermissionDeniedError < RequestError
     def http_status
       403
index 7a7f0a3a600643cd43afe4d0eca3a2f66ef2a2b1..46bb447d10f507b6e4fe03b7983e6cfa25111016 100644 (file)
@@ -40,7 +40,7 @@ class Group < ArvadosModel
   def invalidate_permissions_cache
     # Ensure a new group can be accessed by the appropriate users
     # immediately after being created.
-    User.invalidate_permissions_cache db_current_time.to_i
+    User.invalidate_permissions_cache self.async_permissions_update
   end
 
   def assign_name
index bf21cf4b672263b784d24b4f2cfcb00d65c0b195..ac3efe310dc6435ae5b4b303991778c84838eb41 100644 (file)
@@ -68,7 +68,7 @@ class Link < ArvadosModel
       # permissions for head_uuid and tail_uuid, and invalidate the
       # cache for only those users. (This would require a browseable
       # cache.)
-      User.invalidate_permissions_cache db_current_time.to_i
+      User.invalidate_permissions_cache
     end
   end
 
index e621505418a585f55d7cc49160ff561b9f1ed0d4..8ed97e6b14c7b5ea313c856a4c548cbc591917ac 100644 (file)
@@ -141,16 +141,11 @@ class User < ArvadosModel
     true
   end
 
-  def self.invalidate_permissions_cache(timestamp=nil)
-    if Rails.configuration.async_permissions_update
-      timestamp = DbCurrentTime::db_current_time.to_i if timestamp.nil?
-      connection.execute "NOTIFY invalidate_permissions_cache, '#{timestamp}'"
-    else
-      refresh_permission_view
-    end
+  def self.invalidate_permissions_cache(async=false)
+    refresh_permission_view(async)
   end
 
-  def invalidate_permissions_cache(timestamp=nil)
+  def invalidate_permissions_cache
     User.invalidate_permissions_cache
   end
 
index d0f3a4caeb11d9f931772a8b1036fb257c2632fe..8f0dbf4e496b15e73d37083c5d2ac84dd1e8f61d 100644 (file)
@@ -189,6 +189,11 @@ common:
   # arrived, and deleted if their delete_at time has arrived.
   trash_sweep_interval: 60
 
+  # Interval (seconds) between asynchronous permission view updates. Any
+  # permission-updating API called with the 'async' parameter schedules an
+  # update of the permission view in the future, if not already scheduled.
+  async_permissions_update_interval: 20
+
   # Maximum characters of (JSON-encoded) query parameters to include
   # in each request log entry. When params exceed this size, they will
   # be JSON-encoded, truncated to this size, and logged as
@@ -491,12 +496,6 @@ common:
   # (included in vendor packages).
   package_version: false
 
-  # Enable asynchronous permission graph rebuild.  Must run
-  # script/permission-updater.rb as a separate process.  When the permission
-  # cache is invalidated, the background process will update the permission
-  # graph cache.  This feature is experimental!
-  async_permissions_update: false
-
   # Default value for container_count_max for container requests.  This is the
   # number of times Arvados will create a new container to satisfy a container
   # request.  If a container is cancelled it will retry a new container if
index 4ee45ab088af1faea093a3de5af033b68955002a..25be3c08d4d40d8f7dbc7307e76bc7d2423997a6 100644 (file)
@@ -4,9 +4,37 @@
 
 PERMISSION_VIEW = "materialized_permission_view"
 
-def refresh_permission_view
+def do_refresh_permission_view
   ActiveRecord::Base.transaction do
     ActiveRecord::Base.connection.execute("LOCK TABLE permission_refresh_lock")
     ActiveRecord::Base.connection.execute("REFRESH MATERIALIZED VIEW #{PERMISSION_VIEW}")
   end
 end
+
+def refresh_permission_view(async=false)
+  if async and Rails.configuration.async_permissions_update_interval > 0
+    exp = Rails.configuration.async_permissions_update_interval.seconds
+    need = false
+    Rails.cache.fetch('AsyncRefreshPermissionView', expires_in: exp) do
+      need = true
+    end
+    if need
+      # Schedule a new permission update and return immediately
+      Thread.new do
+        Thread.current.abort_on_exception = false
+        begin
+          sleep(exp)
+          Rails.cache.delete('AsyncRefreshPermissionView')
+          do_refresh_permission_view
+        rescue => e
+          Rails.logger.error "Updating permission view: #{e}\n#{e.backtrace.join("\n\t")}"
+        ensure
+          ActiveRecord::Base.connection.close
+        end
+      end
+      true
+    end
+  else
+    do_refresh_permission_view
+  end
+end
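
The Ruby code uses Rails.cache.fetch with an expiry as a once-per-interval latch: the first async caller schedules a deferred refresh in a background thread, and later callers within the interval do nothing. The same debounce pattern in Go (a sketch with assumed names, not part of the commit):

    // Debounce sketch: the first async caller in each interval schedules
    // one deferred refresh; later callers are no-ops, mirroring the
    // Rails.cache.fetch latch above.
    package permview

    import (
    	"sync"
    	"time"
    )

    type debouncer struct {
    	mtx       sync.Mutex
    	scheduled bool
    }

    func (d *debouncer) RefreshAsync(interval time.Duration, refresh func()) {
    	d.mtx.Lock()
    	defer d.mtx.Unlock()
    	if d.scheduled {
    		return // an update is already pending; nothing to do
    	}
    	d.scheduled = true
    	time.AfterFunc(interval, func() {
    		d.mtx.Lock()
    		d.scheduled = false // let the next caller schedule a fresh update
    		d.mtx.Unlock()
    		refresh() // e.g. REFRESH MATERIALIZED VIEW under the refresh lock
    	})
    }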
diff --git a/services/api/script/permission-updater.rb b/services/api/script/permission-updater.rb
deleted file mode 100755 (executable)
index 985aa05..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
-require File.dirname(__FILE__) + '/../config/boot'
-require File.dirname(__FILE__) + '/../config/environment'
-include DbCurrentTime
-
-def update_permissions
-  timestamp = DbCurrentTime::db_current_time.to_i
-  Rails.logger.info "Begin updating permission cache"
-  User.all.each do |u|
-    u.calculate_group_permissions
-  end
-  Rails.cache.write "last_updated_permissions", timestamp
-  Rails.logger.info "Permission cache updated"
-end
-
-ActiveRecord::Base.connection_pool.with_connection do |connection|
-  conn = connection.instance_variable_get(:@connection)
-  begin
-    conn.async_exec "LISTEN invalidate_permissions_cache"
-
-    # Initial refresh of permissions graph
-    update_permissions
-
-    while true
-      # wait_for_notify will block until there is a change
-      # notification from Postgres about the permission cache,
-      # and then rebuild the permission cache.
-      conn.wait_for_notify do |channel, pid, payload|
-        last_updated = Rails.cache.read("last_updated_permissions")
-        Rails.logger.info "Got notify #{payload} last update #{last_updated}"
-        if last_updated.nil? || last_updated.to_i <= payload.to_i
-          update_permissions
-        end
-      end
-    end
-  ensure
-    # Don't want the connection to still be listening once we return
-    # it to the pool - could result in weird behavior for the next
-    # thread to check it out.
-    conn.async_exec "UNLISTEN *"
-  end
-end
index c4ab3cffc8877ffea777424b432122252f60440a..6b1bf795ed7eced1e872809358150c324e234239 100644 (file)
@@ -122,4 +122,36 @@ class GroupsTest < ActionDispatch::IntegrationTest
     assert_includes coll_uuids, collections(:foo_collection_in_aproject).uuid
     assert_not_includes coll_uuids, collections(:expired_collection).uuid
   end
+
+  test "create request with async=true defers permissions update" do
+    Rails.configuration.async_permissions_update_interval = 1 # seconds
+    name = "Random group #{rand(1000)}"
+    assert_equal nil, Group.find_by_name(name)
+    post "/arvados/v1/groups", {
+      group: {
+        name: name
+      },
+      async: true
+    }, auth(:active)
+    assert_response 202
+    g = Group.find_by_name(name)
+    assert_not_nil g
+    get "/arvados/v1/groups", {
+      filters: [["name", "=", name]].to_json,
+      limit: 10
+    }, auth(:active)
+    assert_response 200
+    assert_equal 0, json_response['items_available']
+
+    # Unblock the thread doing the permissions update
+    ActiveRecord::Base.clear_active_connections!
+
+    sleep(3)
+    get "/arvados/v1/groups", {
+      filters: [["name", "=", name]].to_json,
+      limit: 10
+    }, auth(:active)
+    assert_response 200
+    assert_equal 1, json_response['items_available']
+  end
 end
index 4f7339facf4ace001ac886a5076afc217e040c18..6b5b233c2a6701912ce06b1356fdb864778d0cf8 100644 (file)
@@ -23,6 +23,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/Azure/azure-sdk-for-go/storage"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
@@ -147,7 +148,7 @@ func (v *AzureBlobVolume) Type() string {
 }
 
 // Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error {
        if v.ContainerName == "" {
                return errors.New("no container name given")
        }
@@ -183,6 +184,10 @@ func (v *AzureBlobVolume) Start() error {
        } else if !ok {
                return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        return nil
 }
 
@@ -727,6 +732,7 @@ type azureContainer struct {
 }
 
 func (c *azureContainer) Exists() (bool, error) {
+       c.stats.TickOps("exists")
        c.stats.Tick(&c.stats.Ops)
        ok, err := c.ctr.Exists()
        c.stats.TickErr(err)
@@ -734,6 +740,7 @@ func (c *azureContainer) Exists() (bool, error) {
 }
 
 func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+       c.stats.TickOps("get_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetMetadata(nil)
@@ -742,6 +749,7 @@ func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, er
 }
 
 func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+       c.stats.TickOps("get_properties")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.GetProperties(nil)
@@ -750,6 +758,7 @@ func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobPropertie
 }
 
 func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+       c.stats.TickOps("get")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.Get(nil)
@@ -758,6 +767,7 @@ func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
 }
 
 func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+       c.stats.TickOps("get_range")
        c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
        b := c.ctr.GetBlobReference(bname)
        rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
@@ -785,6 +795,7 @@ func (r *readerWithAzureLen) Len() int {
 }
 
 func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+       c.stats.TickOps("create")
        c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
        if size != 0 {
                rdr = &readerWithAzureLen{
@@ -799,6 +810,7 @@ func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr i
 }
 
 func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+       c.stats.TickOps("set_metadata")
        c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
        b := c.ctr.GetBlobReference(bname)
        b.Metadata = m
@@ -808,6 +820,7 @@ func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, o
 }
 
 func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+       c.stats.TickOps("list")
        c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
        resp, err := c.ctr.ListBlobs(params)
        c.stats.TickErr(err)
@@ -815,6 +828,7 @@ func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.
 }
 
 func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+       c.stats.TickOps("delete")
        c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
        b := c.ctr.GetBlobReference(bname)
        err := b.Delete(opts)
index 85d0a1eea4ee7136668debc0fcbbdcd86aed30a5..cfad7577c59d850d25e9f2281a4ad374a60295af 100644 (file)
@@ -29,6 +29,7 @@ import (
 
        "github.com/Azure/azure-sdk-for-go/storage"
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -745,6 +746,21 @@ func (v *TestableAzureBlobVolume) Teardown() {
        v.azStub.Close()
 }
 
+func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
+       return "get", "create"
+}
+
+func (v *TestableAzureBlobVolume) DeviceID() string {
+       // Dummy device id for testing purposes
+       return "azure://azure_blob_volume_test"
+}
+
+func (v *TestableAzureBlobVolume) Start(vm *volumeMetricsVecs) error {
+       // Override original Start() to be able to assign CounterVecs with a dummy DeviceID
+       v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
+       return nil
+}
+
 func makeEtag() string {
        return fmt.Sprintf("0x%x", rand.Int63())
 }
index 2bd989de30c1bffc020777ba1ecbb895591570cf..43a2191111376fbf86c6943ffffff6c22668aa38 100644 (file)
@@ -13,6 +13,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
@@ -81,7 +82,7 @@ func DefaultConfig() *Config {
 
 // Start should be called exactly once: after setting all public
 // fields, and before using the config.
-func (cfg *Config) Start() error {
+func (cfg *Config) Start(reg *prometheus.Registry) error {
        if cfg.Debug {
                log.Level = logrus.DebugLevel
                cfg.debugLogf = log.Printf
@@ -143,8 +144,9 @@ func (cfg *Config) Start() error {
                        return fmt.Errorf("no volumes found")
                }
        }
+       vm := newVolumeMetricsVecs(reg)
        for _, v := range cfg.Volumes {
-               if err := v.Start(); err != nil {
+               if err := v.Start(vm); err != nil {
                        return fmt.Errorf("volume %s: %s", v, err)
                }
                log.Printf("Using volume %v (writable=%v)", v, v.Writable())
@@ -165,7 +167,7 @@ func (vl *VolumeList) UnmarshalJSON(data []byte) error {
        for _, factory := range VolumeTypes {
                t := factory().Type()
                if _, ok := typeMap[t]; ok {
-                       log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
+                       log.Fatalf("volume type %+q is claimed by multiple VolumeTypes", t)
                }
                typeMap[t] = factory
        }
index 32b360b1276940c9da69bc4b44b02785ffefc97f..ad907ef10138f213e3831223d867fd3c114736d9 100644 (file)
@@ -28,6 +28,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var testCluster = &arvados.Cluster{
@@ -845,7 +846,7 @@ func IssueRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter(testCluster)
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -857,7 +858,7 @@ func IssueHealthCheckRequest(rt *RequestTester) *httptest.ResponseRecorder {
        if rt.apiToken != "" {
                req.Header.Set("Authorization", "Bearer "+rt.apiToken)
        }
-       loggingRouter := MakeRESTRouter(testCluster)
+       loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
        loggingRouter.ServeHTTP(response, req)
        return response
 }
@@ -997,7 +998,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+               MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index 2a1bbc972ffa6e4fe0675291b0c923efc4d4ac8d..51dd73a513c1d4c729a6743aaabe0cefa1202c4b 100644 (file)
@@ -24,6 +24,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/health"
        "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "github.com/gorilla/mux"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type router struct {
@@ -31,14 +32,16 @@ type router struct {
        limiter     httpserver.RequestCounter
        cluster     *arvados.Cluster
        remoteProxy remoteProxy
+       metrics     *nodeMetrics
 }
 
 // MakeRESTRouter returns a new router that forwards all Keep requests
 // to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
        rtr := &router{
                Router:  mux.NewRouter(),
                cluster: cluster,
+               metrics: &nodeMetrics{reg: reg},
        }
 
        rtr.HandleFunc(
@@ -85,8 +88,12 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
        rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
 
        rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+       rtr.metrics.setupBufferPoolMetrics(bufs)
+       rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+       rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+       rtr.metrics.setupRequestMetrics(rtr.limiter)
 
-       instrumented := httpserver.Instrument(nil, nil,
+       instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
                httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
        return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
 }
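
httpserver.Instrument is Arvados-internal; with a bare prometheus.Registry, the standard client library would expose the same metrics like this (sketch; not how keepstore itself wires its /metrics endpoint):

    package metricsdemo

    import (
    	"net/http"

    	"github.com/prometheus/client_golang/prometheus"
    	"github.com/prometheus/client_golang/prometheus/promhttp"
    )

    // metricsHandler serves everything registered on reg in the
    // Prometheus text exposition format.
    func metricsHandler(reg *prometheus.Registry) http.Handler {
    	return promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
    }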
index a6c8cd99545c24fdc2a56f6c2ff1866682a6ed6d..fcbdddacb1d585e995c8f23a0528be2ce8c1723c 100644 (file)
@@ -18,6 +18,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "github.com/coreos/go-systemd/daemon"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 var version = "dev"
@@ -121,7 +122,9 @@ func main() {
 
        log.Printf("keepstore %s started", version)
 
-       err = theConfig.Start()
+       metricsRegistry := prometheus.NewRegistry()
+
+       err = theConfig.Start(metricsRegistry)
        if err != nil {
                log.Fatal(err)
        }
@@ -174,7 +177,7 @@ func main() {
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
        // Middleware/handler stack
-       router := MakeRESTRouter(cluster)
+       router := MakeRESTRouter(cluster, metricsRegistry)
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/metrics.go b/services/keepstore/metrics.go
new file mode 100644 (file)
index 0000000..235c418
--- /dev/null
@@ -0,0 +1,137 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+type nodeMetrics struct {
+       reg *prometheus.Registry
+}
+
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_allocated_bytes",
+                       Help:      "Number of bytes allocated to buffers",
+               },
+               func() float64 { return float64(b.Alloc()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_max_buffers",
+                       Help:      "Maximum number of buffers allowed",
+               },
+               func() float64 { return float64(b.Cap()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "bufferpool_inuse_buffers",
+                       Help:      "Number of buffers in use",
+               },
+               func() float64 { return float64(b.Len()) },
+       ))
+}
+
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      fmt.Sprintf("%s_queue_inprogress_entries", qName),
+                       Help:      fmt.Sprintf("Number of %s requests in progress", qName),
+               },
+               func() float64 { return float64(getWorkQueueStatus(q).InProgress) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      fmt.Sprintf("%s_queue_pending_entries", qName),
+                       Help:      fmt.Sprintf("Number of queued %s requests", qName),
+               },
+               func() float64 { return float64(getWorkQueueStatus(q).Queued) },
+       ))
+}
+
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "concurrent_requests",
+                       Help:      "Number of requests in progress",
+               },
+               func() float64 { return float64(rc.Current()) },
+       ))
+       m.reg.MustRegister(prometheus.NewGaugeFunc(
+               prometheus.GaugeOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "max_concurrent_requests",
+                       Help:      "Maximum number of concurrent requests",
+               },
+               func() float64 { return float64(rc.Max()) },
+       ))
+}
+
+type volumeMetricsVecs struct {
+       ioBytes     *prometheus.CounterVec
+       errCounters *prometheus.CounterVec
+       opsCounters *prometheus.CounterVec
+}
+
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
+       m := &volumeMetricsVecs{}
+       m.opsCounters = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_operations",
+                       Help:      "Number of volume operations",
+               },
+               []string{"device_id", "operation"},
+       )
+       reg.MustRegister(m.opsCounters)
+       m.errCounters = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_errors",
+                       Help:      "Number of volume errors",
+               },
+               []string{"device_id", "error_type"},
+       )
+       reg.MustRegister(m.errCounters)
+       m.ioBytes = prometheus.NewCounterVec(
+               prometheus.CounterOpts{
+                       Namespace: "arvados",
+                       Subsystem: "keepstore",
+                       Name:      "volume_io_bytes",
+                       Help:      "Volume I/O traffic in bytes",
+               },
+               []string{"device_id", "direction"},
+       )
+       reg.MustRegister(m.ioBytes)
+
+       return m
+}
+
+func (vm *volumeMetricsVecs) getCounterVecsFor(lbls prometheus.Labels) (opsCV, errCV, ioCV *prometheus.CounterVec) {
+       opsCV = vm.opsCounters.MustCurryWith(lbls)
+       errCV = vm.errCounters.MustCurryWith(lbls)
+       ioCV = vm.ioBytes.MustCurryWith(lbls)
+       return
+}
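
getCounterVecsFor relies on label currying: the vectors are declared with two labels, device_id is bound once per volume at Start, and each call site then supplies only the remaining label (as in stats_ticker.go below). Minimal usage sketch, with a placeholder device id:

    package metricsdemo

    import "github.com/prometheus/client_golang/prometheus"

    func example() {
    	reg := prometheus.NewRegistry()
    	ops := prometheus.NewCounterVec(
    		prometheus.CounterOpts{Name: "volume_operations", Help: "Number of volume operations"},
    		[]string{"device_id", "operation"},
    	)
    	reg.MustRegister(ops)
    	// Bind device_id once per volume...
    	perVolume := ops.MustCurryWith(prometheus.Labels{"device_id": "example-device"})
    	// ...so each operation only fills in the remaining label.
    	perVolume.With(prometheus.Labels{"operation": "get"}).Inc()
    }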
index 31b1a684fe6a077ebbbfebf7bb846f6f508a00b5..7c932ee023b2a188433e34bbf773cc0eb8b64b08 100644 (file)
@@ -12,6 +12,7 @@ import (
        "net/http/httptest"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -28,15 +29,16 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.ManagementToken = arvadostest.ManagementToken
-       theConfig.Start()
-       s.rtr = MakeRESTRouter(testCluster)
+       r := prometheus.NewRegistry()
+       theConfig.Start(r)
+       s.rtr = MakeRESTRouter(testCluster, r)
 }
 
 func (s *MountsSuite) TearDownTest(c *check.C) {
        s.vm.Close()
        KeepVM = nil
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
 }
 
 func (s *MountsSuite) TestMounts(c *check.C) {
@@ -131,7 +133,9 @@ func (s *MountsSuite) TestMetrics(c *check.C) {
        }
        json.NewDecoder(resp.Body).Decode(&j)
        found := make(map[string]bool)
+       names := map[string]bool{}
        for _, g := range j {
+               names[g.Name] = true
                for _, m := range g.Metric {
                        if len(m.Label) == 2 && m.Label[0].Name == "code" && m.Label[0].Value == "200" && m.Label[1].Name == "method" && m.Label[1].Value == "put" {
                                c.Check(m.Summary.SampleCount, check.Equals, "2")
@@ -143,6 +147,24 @@ func (s *MountsSuite) TestMetrics(c *check.C) {
        }
        c.Check(found["request_duration_seconds"], check.Equals, true)
        c.Check(found["time_to_status_seconds"], check.Equals, true)
+
+       metricsNames := []string{
+               "arvados_keepstore_bufferpool_inuse_buffers",
+               "arvados_keepstore_bufferpool_max_buffers",
+               "arvados_keepstore_bufferpool_allocated_bytes",
+               "arvados_keepstore_pull_queue_inprogress_entries",
+               "arvados_keepstore_pull_queue_pending_entries",
+               "arvados_keepstore_concurrent_requests",
+               "arvados_keepstore_max_concurrent_requests",
+               "arvados_keepstore_trash_queue_inprogress_entries",
+               "arvados_keepstore_trash_queue_pending_entries",
+               "request_duration_seconds",
+               "time_to_status_seconds",
+       }
+       for _, m := range metricsNames {
+               _, ok := names[m]
+               c.Check(ok, check.Equals, true)
+       }
 }
 
 func (s *MountsSuite) call(method, path, tok string, body []byte) *httptest.ResponseRecorder {
index e4a5865a43dd10d20215eb9cebd13bd6fe2271b9..69ed6d2ff5f1f8d80bd6c6e6ebe7d75f7e4ff259 100644 (file)
@@ -39,7 +39,7 @@ func getWithPipe(ctx context.Context, loc string, buf []byte, br BlockReader) (i
        }
 }
 
-// putWithPipe invokes putter with a new pipe, and and copies data
+// putWithPipe invokes putter with a new pipe, and copies data
 // from buf into the pipe. If ctx is done before all data is copied,
 // putWithPipe closes the pipe with an error, and returns early with
 // an error.
index 6e720b8499f366c5d931729047de7a3b1b632faf..6c22d1d32aa2f0745a2cc424cdfeef4d4d76ca75 100644 (file)
@@ -20,6 +20,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/auth"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -100,15 +101,16 @@ func (s *ProxyRemoteSuite) SetUpTest(c *check.C) {
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
        theConfig.blobSigningKey = []byte(knownKey)
-       theConfig.Start()
-       s.rtr = MakeRESTRouter(s.cluster)
+       r := prometheus.NewRegistry()
+       theConfig.Start(r)
+       s.rtr = MakeRESTRouter(s.cluster, r)
 }
 
 func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
        s.vm.Close()
        KeepVM = nil
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
        s.remoteAPI.Close()
        s.remoteKeepproxy.Close()
 }
index 7b5077c1a7f70dad794e14fc148ae1da7fc60d04..8e667e048f47ff3f3ac91df65c960ac94511e8b3 100644 (file)
@@ -14,6 +14,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        . "gopkg.in/check.v1"
 )
 
@@ -58,7 +59,7 @@ func (s *PullWorkerTestSuite) TearDownTest(c *C) {
        pullq = nil
        teardown()
        theConfig = DefaultConfig()
-       theConfig.Start()
+       theConfig.Start(prometheus.NewRegistry())
 }
 
 var firstPullList = []byte(`[
index fb978fe2ba41fbdf9895c0c718d2ca6c925d5f9c..4c39dcd5c4f12fc9a8b8ad36d165545af952fb7a 100644 (file)
@@ -25,6 +25,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "github.com/AdRoll/goamz/aws"
        "github.com/AdRoll/goamz/s3"
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 const (
@@ -198,7 +199,7 @@ func (*S3Volume) Type() string {
 
 // Start populates private fields and verifies the configuration is
 // valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(vm *volumeMetricsVecs) error {
        region, ok := aws.Regions[v.Region]
        if v.Endpoint == "" {
                if !ok {
@@ -248,6 +249,10 @@ func (v *S3Volume) Start() error {
                        Name: v.Bucket,
                },
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        return nil
 }
 
@@ -929,6 +934,7 @@ func (lister *s3Lister) Error() error {
 }
 
 func (lister *s3Lister) getPage() {
+       lister.Stats.TickOps("list")
        lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
        resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
        lister.nextMarker = ""
@@ -965,6 +971,7 @@ type s3bucket struct {
 
 func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
        rdr, err := b.Bucket.GetReader(path)
+       b.stats.TickOps("get")
        b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
        b.stats.TickErr(err)
        return NewCountingReader(rdr, b.stats.TickInBytes), err
@@ -972,6 +979,7 @@ func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
 
 func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
        resp, err := b.Bucket.Head(path, headers)
+       b.stats.TickOps("head")
        b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
        b.stats.TickErr(err)
        return resp, err
@@ -990,6 +998,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
                r = NewCountingReader(r, b.stats.TickOutBytes)
        }
        err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+       b.stats.TickOps("put")
        b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
        b.stats.TickErr(err)
        return err
@@ -997,6 +1006,7 @@ func (b *s3bucket) PutReader(path string, r io.Reader, length int64, contType st
 
 func (b *s3bucket) Del(path string) error {
        err := b.Bucket.Del(path)
+       b.stats.TickOps("delete")
        b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
        b.stats.TickErr(err)
        return err
index 10c71125df39acb3feadc4e69e4d2190d53a10fe..6377420ff4b381cba49b07d2813fb4803f03aa62 100644 (file)
@@ -20,6 +20,7 @@ import (
        "github.com/AdRoll/goamz/s3"
        "github.com/AdRoll/goamz/s3/s3test"
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -170,7 +171,8 @@ func (s *StubbedS3Suite) testContextCancel(c *check.C, testFunc func(context.Con
        vol := *v.S3Volume
        vol.Endpoint = srv.URL
        v = &TestableS3Volume{S3Volume: &vol}
-       v.Start()
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       v.Start(metrics)
 
        ctx, cancel := context.WithCancel(context.Background())
 
@@ -430,7 +432,8 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, raceWindow time.Duration,
                server:      srv,
                serverClock: clock,
        }
-       v.Start()
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       v.Start(metrics)
        err = v.bucket.PutBucket(s3.ACL("private"))
        c.Assert(err, check.IsNil)
        return v
@@ -448,7 +451,7 @@ Volumes:
        c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
 }
 
-func (v *TestableS3Volume) Start() error {
+func (v *TestableS3Volume) Start(vm *volumeMetricsVecs) error {
        tmp, err := ioutil.TempFile("", "keepstore")
        v.c.Assert(err, check.IsNil)
        defer os.Remove(tmp.Name())
@@ -459,7 +462,7 @@ func (v *TestableS3Volume) Start() error {
        v.S3Volume.AccessKeyFile = tmp.Name()
        v.S3Volume.SecretKeyFile = tmp.Name()
 
-       v.c.Assert(v.S3Volume.Start(), check.IsNil)
+       v.c.Assert(v.S3Volume.Start(vm), check.IsNil)
        return nil
 }
 
@@ -490,3 +493,7 @@ func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
 func (v *TestableS3Volume) Teardown() {
        v.server.Quit()
 }
+
+func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
+       return "get", "put"
+}
index 377a53675783b890fa7863dd98ea50681074697b..342b9e32058e23a1f09fc305d8fdc37caf104198 100644 (file)
@@ -7,6 +7,8 @@ package main
 import (
        "sync"
        "sync/atomic"
+
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type statsTicker struct {
@@ -16,6 +18,10 @@ type statsTicker struct {
 
        ErrorCodes map[string]uint64 `json:",omitempty"`
        lock       sync.Mutex
+
+       opsCounters *prometheus.CounterVec
+       errCounters *prometheus.CounterVec
+       ioBytes     *prometheus.CounterVec
 }
 
 // Tick increments each of the given counters by 1 using
@@ -41,14 +47,33 @@ func (s *statsTicker) TickErr(err error, errType string) {
        }
        s.ErrorCodes[errType]++
        s.lock.Unlock()
+       if s.errCounters != nil {
+               s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
+       }
 }
 
 // TickInBytes increments the incoming byte counter by n.
 func (s *statsTicker) TickInBytes(n uint64) {
+       if s.ioBytes != nil {
+               s.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(n))
+       }
        atomic.AddUint64(&s.InBytes, n)
 }
 
 // TickOutBytes increments the outgoing byte counter by n.
 func (s *statsTicker) TickOutBytes(n uint64) {
+       if s.ioBytes != nil {
+               s.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
+       }
        atomic.AddUint64(&s.OutBytes, n)
 }
+
+// TickOps increments the counter of the listed operations by 1.
+func (s *statsTicker) TickOps(operations ...string) {
+       if s.opsCounters == nil {
+               return
+       }
+       for _, opType := range operations {
+               s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
+       }
+}
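
All the Tick* methods treat unset CounterVecs as no-ops, so the in-memory JSON stats keep working when no Prometheus registry has been wired in. Usage sketch (types from this file; an errors import is assumed):

    func exampleTicks() {
    	var s statsTicker
    	s.TickInBytes(512)       // counts toward s.InBytes; ioBytes is nil, so no metric is emitted
    	s.TickOps("get", "list") // safe no-op on the Prometheus side
    	s.TickErr(errors.New("boom"), "500") // still tallies ErrorCodes["500"]
    }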
similarity index 96%
rename from services/keepstore/volume_unix.go
rename to services/keepstore/unix_volume.go
index 23d675359244942097072d88e1bd98daf9d46c6c..96f458720d38b56b97fa51fd63e76faa798987bf 100644 (file)
@@ -21,6 +21,8 @@ import (
        "sync/atomic"
        "syscall"
        "time"
+
+       "github.com/prometheus/client_golang/prometheus"
 )
 
 type unixVolumeAdder struct {
@@ -28,7 +30,7 @@ type unixVolumeAdder struct {
 }
 
 // String implements flag.Value
-func (s *unixVolumeAdder) String() string {
+func (vs *unixVolumeAdder) String() string {
        return "-"
 }
 
@@ -218,7 +220,7 @@ func (v *UnixVolume) Type() string {
 }
 
 // Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(vm *volumeMetricsVecs) error {
        if v.Serialize {
                v.locker = &sync.Mutex{}
        }
@@ -228,7 +230,12 @@ func (v *UnixVolume) Start() error {
        if v.DirectoryReplication == 0 {
                v.DirectoryReplication = 1
        }
+       // Set up prometheus metrics
+       lbls := prometheus.Labels{"device_id": v.DeviceID()}
+       v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
        _, err := v.os.Stat(v.Root)
+
        return err
 }
 
@@ -252,6 +259,7 @@ func (v *UnixVolume) Touch(loc string) error {
        }
        defer v.unlockfile(f)
        ts := syscall.NsecToTimespec(time.Now().UnixNano())
+       v.os.stats.TickOps("utimes")
        v.os.stats.Tick(&v.os.stats.UtimesOps)
        err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
        v.os.stats.TickErr(err)
@@ -339,7 +347,7 @@ func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
        return putWithPipe(ctx, loc, block, v)
 }
 
-// ReadBlock implements BlockWriter.
+// WriteBlock implements BlockWriter.
 func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
        if v.ReadOnly {
                return MethodDisabledError
@@ -439,6 +447,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                return err
        }
        defer rootdir.Close()
+       v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        for {
                names, err := rootdir.Readdirnames(1)
@@ -461,6 +470,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        lastErr = err
                        continue
                }
+               v.os.stats.TickOps("readdir")
                v.os.stats.Tick(&v.os.stats.ReaddirOps)
                for {
                        fileInfo, err := blockdir.Readdir(1)
@@ -549,6 +559,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) {
                return MethodDisabledError
        }
 
+       v.os.stats.TickOps("readdir")
        v.os.stats.Tick(&v.os.stats.ReaddirOps)
        files, err := ioutil.ReadDir(v.blockDir(loc))
        if err != nil {
@@ -695,6 +706,7 @@ func (v *UnixVolume) unlock() {
 
 // lockfile and unlockfile use flock(2) to manage kernel file locks.
 func (v *UnixVolume) lockfile(f *os.File) error {
+       v.os.stats.TickOps("flock")
        v.os.stats.Tick(&v.os.stats.FlockOps)
        err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
        v.os.stats.TickErr(err)
@@ -813,6 +825,7 @@ type osWithStats struct {
 }
 
 func (o *osWithStats) Open(name string) (*os.File, error) {
+       o.stats.TickOps("open")
        o.stats.Tick(&o.stats.OpenOps)
        f, err := os.Open(name)
        o.stats.TickErr(err)
@@ -820,6 +833,7 @@ func (o *osWithStats) Open(name string) (*os.File, error) {
 }
 
 func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+       o.stats.TickOps("open")
        o.stats.Tick(&o.stats.OpenOps)
        f, err := os.OpenFile(name, flag, perm)
        o.stats.TickErr(err)
@@ -827,6 +841,7 @@ func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.Fil
 }
 
 func (o *osWithStats) Remove(path string) error {
+       o.stats.TickOps("unlink")
        o.stats.Tick(&o.stats.UnlinkOps)
        err := os.Remove(path)
        o.stats.TickErr(err)
@@ -834,6 +849,7 @@ func (o *osWithStats) Remove(path string) error {
 }
 
 func (o *osWithStats) Rename(a, b string) error {
+       o.stats.TickOps("rename")
        o.stats.Tick(&o.stats.RenameOps)
        err := os.Rename(a, b)
        o.stats.TickErr(err)
@@ -841,6 +857,7 @@ func (o *osWithStats) Rename(a, b string) error {
 }
 
 func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+       o.stats.TickOps("stat")
        o.stats.Tick(&o.stats.StatOps)
        fi, err := os.Stat(path)
        o.stats.TickErr(err)
@@ -848,6 +865,7 @@ func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
 }
 
 func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+       o.stats.TickOps("create")
        o.stats.Tick(&o.stats.CreateOps)
        f, err := ioutil.TempFile(dir, base)
        o.stats.TickErr(err)
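
Note on the TickOps additions above: each wrapped syscall is counted once under an "operation" label, alongside the pre-existing atomic counters. A minimal sketch of the stats receiver these calls assume; the actual statsTicker in this commit may differ in detail:

    import (
            "sync/atomic"

            "github.com/prometheus/client_golang/prometheus"
    )

    // Hypothetical sketch, not the committed implementation.
    type statsTicker struct {
            opsCounters *prometheus.CounterVec // per-operation counts, labeled {"operation": ...}
    }

    // TickOps counts one occurrence of a low-level operation such as
    // "open", "stat", "rename", or "flock".
    func (s *statsTicker) TickOps(operation string) {
            if s.opsCounters == nil {
                    return // metrics not wired up (volume not started yet)
            }
            s.opsCounters.With(prometheus.Labels{"operation": operation}).Inc()
    }

    // Tick increments a legacy in-memory counter (pre-existing behavior).
    func (s *statsTicker) Tick(counter *uint64) {
            atomic.AddUint64(counter, 1)
    }
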
similarity index 97%
rename from services/keepstore/volume_unix_test.go
rename to services/keepstore/unix_volume_test.go
index 7f1cd219644ab241f2c0a8a0e2353c8f4c16844f..872f408cf8cd68571705d240cb6b6184fce70a1d 100644 (file)
@@ -20,6 +20,7 @@ import (
        "time"
 
        "github.com/ghodss/yaml"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -73,6 +74,10 @@ func (v *TestableUnixVolume) Teardown() {
        }
 }
 
+func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
+       return "open", "create"
+}
+
 // serialize = false; readonly = false
 func TestUnixVolumeWithGenericTests(t *testing.T) {
        DoGenericVolumeTests(t, func(t TB) TestableVolume {
@@ -115,7 +120,8 @@ func TestReplicationDefault1(t *testing.T) {
                Root:     "/",
                ReadOnly: true,
        }
-       if err := v.Start(); err != nil {
+       metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+       if err := v.Start(metrics); err != nil {
                t.Error(err)
        }
        if got := v.Replication(); got != 1 {
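
The fresh prometheus.NewRegistry() per test matters: registering the same metric names twice on one registry fails, so sharing the default registry across tests would break every test after the first. The wiring this test assumes (newVolumeMetricsVecs is expected to register its counter vectors on the registry it is given):

    reg := prometheus.NewRegistry()          // isolated per-test registry
    metrics := newVolumeMetricsVecs(reg)     // registers ops/err/ioBytes vectors on reg
    v := &UnixVolume{Root: "/", ReadOnly: true}
    if err := v.Start(metrics); err != nil { // Start binds v's counters via getCounterVecsFor
            t.Error(err)
    }
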
index 6bce05bec033fbda6c759b6b4266bcbff0f3e051..52b9b1b244c0a7032c66a2ea12b8d867c2384940 100644 (file)
@@ -39,7 +39,7 @@ type Volume interface {
        // Do whatever private setup tasks and configuration checks
        // are needed. Return non-nil if the volume is unusable (e.g.,
        // invalid config).
-       Start() error
+       Start(vm *volumeMetricsVecs) error
 
        // Get a block: copy the block data into buf, and return the
        // number of bytes copied.
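
With this signature change, every driver receives the shared metric vectors at startup. A hedged sketch of the minimum a driver's Start is now expected to do, mirroring the UnixVolume hunk above (ExampleVolume and its stats field are hypothetical):

    func (v *ExampleVolume) Start(vm *volumeMetricsVecs) error {
            // Bind this volume's stats to counters labeled with its device ID,
            // so per-device operation/error/byte counts show up in one scrape.
            lbls := prometheus.Labels{"device_id": v.DeviceID()}
            v.stats.opsCounters, v.stats.errCounters, v.stats.ioBytes = vm.getCounterVecsFor(lbls)
            return nil
    }
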
index 23a17fd0998ebff4b8b3781dcf590534e88bc8a0..d5a413693f6c46c1d8241838a1ea87581191f9a4 100644 (file)
@@ -18,6 +18,8 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "github.com/prometheus/client_golang/prometheus"
+       dto "github.com/prometheus/client_model/go"
 )
 
 type TB interface {
@@ -75,6 +77,8 @@ func DoGenericVolumeTests(t TB, factory TestableVolumeFactory) {
 
        testStatus(t, factory)
 
+       testMetrics(t, factory)
+
        testString(t, factory)
 
        testUpdateReadOnly(t, factory)
@@ -533,6 +537,84 @@ func testStatus(t TB, factory TestableVolumeFactory) {
        }
 }
 
+func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
+       c, _ := cv.GetMetricWith(lbls) // errors deliberately ignored in this test helper
+       pb := &dto.Metric{}
+       c.Write(pb)
+       return pb.GetCounter().GetValue()
+}
+
+func testMetrics(t TB, factory TestableVolumeFactory) {
+       var err error
+
+       v := factory(t)
+       defer v.Teardown()
+       reg := prometheus.NewRegistry()
+       vm := newVolumeMetricsVecs(reg)
+
+       err = v.Start(vm)
+       if err != nil {
+               t.Error("Failed Start(): ", err)
+       }
+       opsC, _, ioC := vm.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
+
+       if ioC == nil {
+               t.Error("ioBytes CounterVec is nil")
+               return
+       }
+
+       if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
+               getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
+               t.Error("ioBytes counter should be zero")
+       }
+
+       if opsC == nil {
+               t.Error("opsCounter CounterVec is nil")
+               return
+       }
+
+       var c, writeOpCounter, readOpCounter float64
+
+       readOpType, writeOpType := v.ReadWriteOperationLabelValues()
+       writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
+       readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
+
+       // Test Put if volume is writable
+       if v.Writable() {
+               err = v.Put(context.Background(), TestHash, TestBlock)
+               if err != nil {
+                       t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+               }
+               // Check that the write operations counter increased
+               c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
+               if c <= writeOpCounter {
+                       t.Error("Operation(s) not counted on Put")
+               }
+               // Check that the bytes "out" counter is > 0
+               if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
+                       t.Error("ioBytes{direction=out} counter shouldn't be zero")
+               }
+       } else {
+               v.PutRaw(TestHash, TestBlock)
+       }
+
+       buf := make([]byte, BlockSize)
+       _, err = v.Get(context.Background(), TestHash, buf)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       // Check that the read operations counter increased
+       c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
+       if c <= readOpCounter {
+               t.Error("Operation(s) not counted on Get")
+       }
+       // Check that the bytes "in" counter is > 0
+       if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
+               t.Error("ioBytes{direction=in} counter shouldn't be zero")
+       }
+}
+
 // Invoke String for the volume; expect non-empty result
 // Test should pass for both writable and read-only volumes
 func testString(t TB, factory TestableVolumeFactory) {
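
getValueFrom above drops errors for brevity, which is fine in a test helper; a variant with error handling uses the same client_golang/client_model calls (counterValue is illustrative, not part of the commit):

    func counterValue(cv *prometheus.CounterVec, lbls prometheus.Labels) (float64, error) {
            c, err := cv.GetMetricWith(lbls) // look up the child counter for this label set
            if err != nil {
                    return 0, err
            }
            m := &dto.Metric{}
            if err := c.Write(m); err != nil { // snapshot the current value into a dto.Metric
                    return 0, err
            }
            return m.GetCounter().GetValue(), nil
    }
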
index 046f3fac2e0c8c27081c22fea69a0aae7f02acda..0b8af330fb2d86f771926f07f5f38a34cf09b8ef 100644 (file)
@@ -22,10 +22,14 @@ import (
 // impractical to achieve with a sequence of normal Volume operations.
 type TestableVolume interface {
        Volume
+
        // [Over]write content for a locator with the given data,
        // bypassing all constraints like readonly and serialize.
        PutRaw(locator string, data []byte)
 
+       // Returns the operation label values the driver uses to record reads (r) and writes (w).
+       ReadWriteOperationLabelValues() (r, w string)
+
        // Specify the value Mtime() should return, until the next
        // call to Touch, TouchWithDate, or Put.
        TouchWithDate(locator string, lastPut time.Time)
@@ -212,7 +216,7 @@ func (v *MockVolume) Type() string {
        return "Mock"
 }
 
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(vm *volumeMetricsVecs) error {
        return nil
 }
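
Each driver's testable wrapper now has to report which operation labels its reads and writes tick, so the generic testMetrics can assert on them; the unix volume returns ("open", "create") above. A hypothetical example for another driver, where the label values are assumptions for illustration, not the commit's actual choices:

    func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
            return "get", "put" // whatever this driver's Get/Put paths pass to TickOps
    }
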