# The Arvados crunchstat-summary tool
fpm_build_virtualenv "crunchstat-summary" "tools/crunchstat-summary"
+# The cwltest package, which lives out of tree
+cd "$WORKSPACE"
+if [[ -e "$WORKSPACE/cwltest" ]]; then
+ rm -rf "$WORKSPACE/cwltest"
+fi
+git clone https://github.com/common-workflow-language/cwltest.git
+# signal to our build script that we want a cwltest executable installed in /usr/bin/
+mkdir cwltest/bin && touch cwltest/bin/cwltest
+fpm_build_virtualenv "cwltest" "cwltest"
+rm -rf "$WORKSPACE/cwltest"
+
# Build the API server package
test_rails_package_presence arvados-api-server "$WORKSPACE/services/api"
if [[ "$?" == "0" ]]; then
# Get the list of packages from the repos
if [[ "$FORMAT" == "deb" ]]; then
- debian_distros="jessie precise stretch trusty wheezy xenial bionic"
-
- for D in ${debian_distros}; do
- if [ ${pkgname:0:3} = "lib" ]; then
- repo_subdir=${pkgname:0:4}
- else
- repo_subdir=${pkgname:0:1}
- fi
+ declare -A dd
+ dd[debian8]=jessie
+ dd[debian9]=stretch
+ dd[debian10]=buster
+ dd[ubuntu1404]=trusty
+ dd[ubuntu1604]=xenial
+ dd[ubuntu1804]=bionic
+ D=${dd[$TARGET]}
+ if [ ${pkgname:0:3} = "lib" ]; then
+ repo_subdir=${pkgname:0:4}
+ else
+ repo_subdir=${pkgname:0:1}
+ fi
- repo_pkg_list=$(curl -s -o - http://apt.arvados.org/pool/${D}/main/${repo_subdir}/)
- echo ${repo_pkg_list} |grep -q ${complete_pkgname}
- if [ $? -eq 0 ] ; then
- echo "Package $complete_pkgname exists, not rebuilding!"
- curl -s -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
- return 1
- elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
- echo "Package $complete_pkgname exists, not rebuilding!"
- return 1
- else
- echo "Package $complete_pkgname not found, building"
- return 0
- fi
- done
+ repo_pkg_list=$(curl -s -o - http://apt.arvados.org/pool/${D}/main/${repo_subdir}/)
+ echo ${repo_pkg_list} |grep -q ${complete_pkgname}
+ if [ $? -eq 0 ] ; then
+ echo "Package $complete_pkgname exists, not rebuilding!"
+ curl -s -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
+ return 1
+ elif test -f "$WORKSPACE/packages/$TARGET/processed/${complete_pkgname}" ; then
+ echo "Package $complete_pkgname exists, not rebuilding!"
+ return 1
+ else
+ echo "Package $complete_pkgname not found, building"
+ return 0
+ fi
else
centos_repo="http://rpm.arvados.org/CentOS/7/dev/x86_64/"
echo " $pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
exit 1
fi
- if ! $python setup.py $DASHQ_UNLESS_DEBUG sdist; then
+ # filter a useless warning (when building the cwltest package) from the stderr output
+ if ! $python setup.py $DASHQ_UNLESS_DEBUG sdist 2> >(grep -v 'warning: no previously-included files matching'); then
echo "Error, unable to run $python setup.py sdist for $PKG"
exit 1
fi
rm -rf build
rm -f $PYTHON_PKG*deb
-
+ echo "virtualenv version: `virtualenv --version`"
virtualenv_command="virtualenv --python `which $python` $DASHQ_UNLESS_DEBUG build/usr/share/$python/dist/$PYTHON_PKG"
if ! $virtualenv_command; then
echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U pip"
exit 1
fi
+ echo "pip version: `build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip --version`"
+
if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools; then
echo "Error, unable to upgrade setuptools with"
echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U setuptools"
exit 1
fi
+ echo "setuptools version: `build/usr/share/$python/dist/$PYTHON_PKG/bin/$python -c 'import setuptools; print(setuptools.__version__)'`"
+
if ! build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel; then
echo "Error, unable to upgrade wheel with"
echo " build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG -U wheel"
exit 1
fi
+ echo "wheel version: `build/usr/share/$python/dist/$PYTHON_PKG/bin/wheel version`"
if [[ "$TARGET" != "centos7" ]] || [[ "$PYTHON_PKG" != "python-arvados-fuse" ]]; then
build/usr/share/$python/dist/$PYTHON_PKG/bin/$pip install $DASHQ_UNLESS_DEBUG $CACHE_FLAG $PACKAGE_PATH
table(table table-bordered table-condensed).
|_. Argument |_. Type |_. Description |_. Location |_. Example |
|group|object||query||
+|async|boolean (default false)|Defer the permissions graph update by a configured number of seconds. (By default, @async_permissions_update_interval@ is 20 seconds). On success, the response is 202 (Accepted).|query|@true@|
h3. delete
|_. Argument |_. Type |_. Description |_. Location |_. Example |
{background:#ccffcc}.|uuid|string|The UUID of the Group in question.|path||
|group|object||query||
+|async|boolean (default false)|Defer the permissions graph update by a configured number of seconds. (By default, @async_permissions_update_interval@ is 20 seconds). On success, the response is 202 (Accepted).|query|@true@|
h3. untrash
# SPDX-License-Identifier: Apache-2.0
# Based on Debian Stretch
-FROM debian:stretch
+FROM debian:stretch-slim
MAINTAINER Ward Vandewege <wvandewege@veritasgenetics.com>
ENV DEBIAN_FRONTEND noninteractive
}
func (si stubInstance) Tags() cloud.InstanceTags {
- return si.tags
+ // Return a copy to ensure a caller can't change our saved
+ // tags just by writing to the returned map.
+ return copyTags(si.tags)
}
func (si stubInstance) String() string {
pool.SetIdleBehavior(heldInstanceID, IdleBehaviorHold)
}
}
+ // Wait for the tags to save to the cloud provider
+ deadline := time.Now().Add(time.Second)
+ for !func() bool {
+ pool.mtx.RLock()
+ defer pool.mtx.RUnlock()
+ for _, wkr := range pool.workers {
+ if wkr.instType == type2 {
+ return wkr.instance.Tags()[tagKeyIdleBehavior] == string(IdleBehaviorHold)
+ }
+ }
+ return false
+ }() {
+ if time.Now().After(deadline) {
+ c.Fatal("timeout")
+ }
+ time.Sleep(time.Millisecond * 10)
+ }
pool.Stop()
c.Log("------- starting new pool, waiting to recover state")
'--no-pull', action='store_false', dest='pull',
help="Use locally installed image only, don't pull image from Docker registry (default)")
-keepdocker_parser.add_argument(
- 'image', nargs='?',
- help="Docker image to upload: repo, repo:tag, or hash")
-keepdocker_parser.add_argument(
- 'tag', nargs='?',
- help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
-
# Combine keepdocker options listed above with run_opts options of arv-put.
# The options inherited from arv-put include --name, --project-uuid,
# --progress/--no-progress/--batch-progress and --resume/--no-resume.
description="Upload or list Docker images in Arvados",
parents=[keepdocker_parser, arv_put.run_opts, arv_cmd.retry_opt])
+arg_parser.add_argument(
+ 'image', nargs='?',
+ help="Docker image to upload: repo, repo:tag, or hash")
+arg_parser.add_argument(
+ 'tag', nargs='?',
+ help="Tag of the Docker image to upload (default 'latest'), if image is given as an untagged repo name")
+
class DockerError(Exception):
pass
# Call arv-put with switches we inherited from it
# (a.k.a., switches that aren't our own).
+ if arguments is None:
+ arguments = sys.argv[1:]
+ arguments = [i for i in arguments if i not in (args.image, args.tag, image_repo_tag)]
put_args = keepdocker_parser.parse_known_args(arguments)[1]
if args.name is None:
overall throughput.
""")
+upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
+ action='append', help="""
+Exclude files and directories whose names match the given glob pattern. When
+using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
+directory, relative to the provided input dirs will be excluded.
+When using a filename pattern like '*.txt', any text file will be excluded
+no matter where it is placed.
+For the special case of needing to exclude only files or dirs directly below
+the given input directory, you can use a pattern like './exclude_this.gif'.
+You can specify multiple patterns by using this argument more than once.
+""")
+
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--follow-links', action='store_true', default=True,
+ dest='follow_links', help="""
+Follow file and directory symlinks (default).
+""")
+_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+ help="""
+Do not follow file and directory symlinks.
+""")
+
+
run_opts = argparse.ArgumentParser(add_help=False)
run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Save the collection with the specified name.
""")
-run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
- action='append', help="""
-Exclude files and directories whose names match the given glob pattern. When
-using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
-directory, relative to the provided input dirs will be excluded.
-When using a filename pattern like '*.txt', any text file will be excluded
-no matter where is placed.
-For the special case of needing to exclude only files or dirs directly below
-the given input directory, you can use a pattern like './exclude_this.gif'.
-You can specify multiple patterns by using this argument more than once.
-""")
-
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
help="""
Do not continue interrupted uploads from cached state.
""")
-_group = run_opts.add_mutually_exclusive_group()
-_group.add_argument('--follow-links', action='store_true', default=True,
- dest='follow_links', help="""
-Follow file and directory symlinks (default).
-""")
-_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
- help="""
-Do not follow file and directory symlinks.
-""")
-
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
help="""
send_error("Path not found", status: 404)
end
+ # Reply 202 Accepted with a JSON body of {accepted: true}. Used by the
+ # async create/update paths below to tell the client the object was
+ # saved but the permission graph refresh has been deferred.
+ def render_accepted
+ send_json ({accepted: true}), status: 202
+ end
+
protected
def send_error(*args)
params
end
+ # Advertise the optional 'async' boolean query parameter (default
+ # false) on create requests in the API discovery document.
+ def self._create_requires_parameters
+ super.merge(
+ {
+ async: {
+ required: false,
+ type: 'boolean',
+ location: 'query',
+ default: false,
+ description: 'defer permissions update'
+ }
+ }
+ )
+ end
+
+ # Advertise the optional 'async' boolean query parameter (default
+ # false) on update requests in the API discovery document.
+ def self._update_requires_parameters
+ super.merge(
+ {
+ async: {
+ required: false,
+ type: 'boolean',
+ location: 'query',
+ default: false,
+ description: 'defer permissions update'
+ }
+ }
+ )
+ end
+
+ # When the 'async' query parameter is true, save the new object with
+ # async_permissions_update set (which defers the permission graph
+ # refresh — see invalidate_permissions_cache) and reply 202 Accepted.
+ # Otherwise fall through to the default create behavior.
+ def create
+ if params[:async]
+ @object = model_class.new(resource_attrs.merge({async_permissions_update: true}))
+ @object.save!
+ render_accepted
+ else
+ super
+ end
+ end
+
+ # When the 'async' query parameter is true, apply the update with
+ # async_permissions_update set (deferring the permission graph
+ # refresh) and reply 202 Accepted; otherwise fall through to the
+ # default update behavior. :kind/:etag/:href are read-only attributes
+ # and are stripped before updating.
+ # NOTE(review): @object.save! immediately after update_attributes!
+ # looks redundant — update_attributes! already persists; confirm
+ # whether the extra save! is intentional.
+ def update
+ if params[:async]
+ attrs_to_update = resource_attrs.reject { |k, v|
+ [:kind, :etag, :href].index k
+ }.merge({async_permissions_update: true})
+ @object.update_attributes!(attrs_to_update)
+ @object.save!
+ render_accepted
+ else
+ super
+ end
+ end
+
def render_404_if_no_object
if params[:action] == 'contents'
if !params[:uuid]
class_name: 'Link',
primary_key: :uuid)
+ # If async is true at create or update, the permission graph update is
+ # deferred, allowing multiple calls to be made without incurring the
+ # performance penalty each time.
+ attr_accessor :async_permissions_update
+
class PermissionDeniedError < RequestError
def http_status
403
def invalidate_permissions_cache
# Ensure a new group can be accessed by the appropriate users
# immediately after being created.
- User.invalidate_permissions_cache db_current_time.to_i
+ User.invalidate_permissions_cache self.async_permissions_update
end
def assign_name
# permissions for head_uuid and tail_uuid, and invalidate the
# cache for only those users. (This would require a browseable
# cache.)
- User.invalidate_permissions_cache db_current_time.to_i
+ User.invalidate_permissions_cache
end
end
true
end
- def self.invalidate_permissions_cache(timestamp=nil)
- if Rails.configuration.async_permissions_update
- timestamp = DbCurrentTime::db_current_time.to_i if timestamp.nil?
- connection.execute "NOTIFY invalidate_permissions_cache, '#{timestamp}'"
- else
- refresh_permission_view
- end
+ def self.invalidate_permissions_cache(async=false)
+ refresh_permission_view(async)
end
- def invalidate_permissions_cache(timestamp=nil)
+ def invalidate_permissions_cache
User.invalidate_permissions_cache
end
# arrived, and deleted if their delete_at time has arrived.
trash_sweep_interval: 60
+ # Interval (seconds) between asynchronous permission view updates. Any
+ # permission-updating API called with the 'async' parameter schedules an
+ # update on the permission view in the future, if not already scheduled.
+ async_permissions_update_interval: 20
+
# Maximum characters of (JSON-encoded) query parameters to include
# in each request log entry. When params exceed this size, they will
# be JSON-encoded, truncated to this size, and logged as
# (included in vendor packages).
package_version: false
- # Enable asynchronous permission graph rebuild. Must run
- # script/permission-updater.rb as a separate process. When the permission
- # cache is invalidated, the background process will update the permission
- # graph cache. This feature is experimental!
- async_permissions_update: false
-
# Default value for container_count_max for container requests. This is the
# number of times Arvados will create a new container to satisfy a container
# request. If a container is cancelled it will retry a new container if
PERMISSION_VIEW = "materialized_permission_view"
-def refresh_permission_view
+def do_refresh_permission_view
ActiveRecord::Base.transaction do
ActiveRecord::Base.connection.execute("LOCK TABLE permission_refresh_lock")
ActiveRecord::Base.connection.execute("REFRESH MATERIALIZED VIEW #{PERMISSION_VIEW}")
end
end
+
+# Refresh the materialized permission view. With async=false (the
+# default) the view is rebuilt synchronously. With async=true and a
+# positive async_permissions_update_interval, a single deferred rebuild
+# is scheduled in a background thread and this call returns immediately;
+# the Rails.cache entry deduplicates concurrent requests so at most one
+# refresh is pending per interval.
+def refresh_permission_view(async=false)
+ if async and Rails.configuration.async_permissions_update_interval > 0
+ exp = Rails.configuration.async_permissions_update_interval.seconds
+ need = false
+ # cache.fetch only runs the block (setting need=true) when no entry
+ # exists, i.e. when no refresh is already scheduled in this window.
+ Rails.cache.fetch('AsyncRefreshPermissionView', expires_in: exp) do
+ need = true
+ end
+ if need
+ # Schedule a new permission update and return immediately
+ Thread.new do
+ Thread.current.abort_on_exception = false
+ begin
+ sleep(exp)
+ Rails.cache.delete('AsyncRefreshPermissionView')
+ do_refresh_permission_view
+ rescue => e
+ Rails.logger.error "Updating permission view: #{e}\n#{e.backtrace.join("\n\t")}"
+ ensure
+ # Return this thread's DB connection to the pool.
+ ActiveRecord::Base.connection.close
+ end
+ end
+ true
+ end
+ else
+ do_refresh_permission_view
+ end
+end
+++ /dev/null
-#!/usr/bin/env ruby
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
-require File.dirname(__FILE__) + '/../config/boot'
-require File.dirname(__FILE__) + '/../config/environment'
-include DbCurrentTime
-
-def update_permissions
- timestamp = DbCurrentTime::db_current_time.to_i
- Rails.logger.info "Begin updating permission cache"
- User.all.each do |u|
- u.calculate_group_permissions
- end
- Rails.cache.write "last_updated_permissions", timestamp
- Rails.logger.info "Permission cache updated"
-end
-
-ActiveRecord::Base.connection_pool.with_connection do |connection|
- conn = connection.instance_variable_get(:@connection)
- begin
- conn.async_exec "LISTEN invalidate_permissions_cache"
-
- # Initial refresh of permissions graph
- update_permissions
-
- while true
- # wait_for_notify will block until there is a change
- # notification from Postgres about the permission cache,
- # and then rebuild the permission cache.
- conn.wait_for_notify do |channel, pid, payload|
- last_updated = Rails.cache.read("last_updated_permissions")
- Rails.logger.info "Got notify #{payload} last update #{last_updated}"
- if last_updated.nil? || last_updated.to_i <= payload.to_i
- update_permissions
- end
- end
- end
- ensure
- # Don't want the connection to still be listening once we return
- # it to the pool - could result in weird behavior for the next
- # thread to check it out.
- conn.async_exec "UNLISTEN *"
- end
-end
assert_includes coll_uuids, collections(:foo_collection_in_aproject).uuid
assert_not_includes coll_uuids, collections(:expired_collection).uuid
end
+
+ test "create request with async=true defers permissions update" do
+ Rails.configuration.async_permissions_update_interval = 1 # seconds
+ name = "Random group #{rand(1000)}"
+ assert_equal nil, Group.find_by_name(name)
+ # Create with async=true: expect 202 Accepted, and the row to exist
+ # in the database even though the permission view is still stale.
+ post "/arvados/v1/groups", {
+ group: {
+ name: name
+ },
+ async: true
+ }, auth(:active)
+ assert_response 202
+ g = Group.find_by_name(name)
+ assert_not_nil g
+ # Listing goes through the permission view, which has not been
+ # refreshed yet, so the new group must not be visible.
+ get "/arvados/v1/groups", {
+ filters: [["name", "=", name]].to_json,
+ limit: 10
+ }, auth(:active)
+ assert_response 200
+ assert_equal 0, json_response['items_available']
+
+ # Unblock the thread doing the permissions update
+ ActiveRecord::Base.clear_active_connections!
+
+ # NOTE(review): timing-based wait (interval is 1s, sleep 3s) — may be
+ # flaky on a slow CI host; confirm this margin is sufficient.
+ sleep(3)
+ # After the deferred refresh runs, the group becomes visible.
+ get "/arvados/v1/groups", {
+ filters: [["name", "=", name]].to_json,
+ limit: 10
+ }, auth(:active)
+ assert_response 200
+ assert_equal 1, json_response['items_available']
+ end
end
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/Azure/azure-sdk-for-go/storage"
+ "github.com/prometheus/client_golang/prometheus"
)
const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
}
// Start implements Volume.
-func (v *AzureBlobVolume) Start() error {
+func (v *AzureBlobVolume) Start(vm *volumeMetricsVecs) error {
if v.ContainerName == "" {
return errors.New("no container name given")
}
} else if !ok {
return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
return nil
}
}
func (c *azureContainer) Exists() (bool, error) {
+ c.stats.TickOps("exists")
c.stats.Tick(&c.stats.Ops)
ok, err := c.ctr.Exists()
c.stats.TickErr(err)
}
func (c *azureContainer) GetBlobMetadata(bname string) (storage.BlobMetadata, error) {
+ c.stats.TickOps("get_metadata")
c.stats.Tick(&c.stats.Ops, &c.stats.GetMetadataOps)
b := c.ctr.GetBlobReference(bname)
err := b.GetMetadata(nil)
}
func (c *azureContainer) GetBlobProperties(bname string) (*storage.BlobProperties, error) {
+ c.stats.TickOps("get_properties")
c.stats.Tick(&c.stats.Ops, &c.stats.GetPropertiesOps)
b := c.ctr.GetBlobReference(bname)
err := b.GetProperties(nil)
}
func (c *azureContainer) GetBlob(bname string) (io.ReadCloser, error) {
+ c.stats.TickOps("get")
c.stats.Tick(&c.stats.Ops, &c.stats.GetOps)
b := c.ctr.GetBlobReference(bname)
rdr, err := b.Get(nil)
}
func (c *azureContainer) GetBlobRange(bname string, start, end int, opts *storage.GetBlobOptions) (io.ReadCloser, error) {
+ c.stats.TickOps("get_range")
c.stats.Tick(&c.stats.Ops, &c.stats.GetRangeOps)
b := c.ctr.GetBlobReference(bname)
rdr, err := b.GetRange(&storage.GetBlobRangeOptions{
}
func (c *azureContainer) CreateBlockBlobFromReader(bname string, size int, rdr io.Reader, opts *storage.PutBlobOptions) error {
+ c.stats.TickOps("create")
c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
if size != 0 {
rdr = &readerWithAzureLen{
}
func (c *azureContainer) SetBlobMetadata(bname string, m storage.BlobMetadata, opts *storage.SetBlobMetadataOptions) error {
+ c.stats.TickOps("set_metadata")
c.stats.Tick(&c.stats.Ops, &c.stats.SetMetadataOps)
b := c.ctr.GetBlobReference(bname)
b.Metadata = m
}
func (c *azureContainer) ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error) {
+ c.stats.TickOps("list")
c.stats.Tick(&c.stats.Ops, &c.stats.ListOps)
resp, err := c.ctr.ListBlobs(params)
c.stats.TickErr(err)
}
func (c *azureContainer) DeleteBlob(bname string, opts *storage.DeleteBlobOptions) error {
+ c.stats.TickOps("delete")
c.stats.Tick(&c.stats.Ops, &c.stats.DelOps)
b := c.ctr.GetBlobReference(bname)
err := b.Delete(opts)
"github.com/Azure/azure-sdk-for-go/storage"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
v.azStub.Close()
}
+func (v *TestableAzureBlobVolume) ReadWriteOperationLabelValues() (r, w string) {
+ return "get", "create"
+}
+
+func (v *TestableAzureBlobVolume) DeviceID() string {
+ // Dummy device id for testing purposes
+ return "azure://azure_blob_volume_test"
+}
+
+func (v *TestableAzureBlobVolume) Start(vm *volumeMetricsVecs) error {
+ // Override original Start() to be able to assign CounterVecs with a dummy DeviceID
+ v.container.stats.opsCounters, v.container.stats.errCounters, v.container.stats.ioBytes = vm.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
+ return nil
+}
+
// makeEtag returns a pseudo-random hex string (e.g. "0x1a2b…") used as a
// fake blob ETag in these tests.
func makeEtag() string {
	return fmt.Sprintf("0x%x", rand.Int63())
}
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
// Start should be called exactly once: after setting all public
// fields, and before using the config.
-func (cfg *Config) Start() error {
+func (cfg *Config) Start(reg *prometheus.Registry) error {
if cfg.Debug {
log.Level = logrus.DebugLevel
cfg.debugLogf = log.Printf
return fmt.Errorf("no volumes found")
}
}
+ vm := newVolumeMetricsVecs(reg)
for _, v := range cfg.Volumes {
- if err := v.Start(); err != nil {
+ if err := v.Start(vm); err != nil {
return fmt.Errorf("volume %s: %s", v, err)
}
log.Printf("Using volume %v (writable=%v)", v, v.Writable())
for _, factory := range VolumeTypes {
t := factory().Type()
if _, ok := typeMap[t]; ok {
- log.Fatal("volume type %+q is claimed by multiple VolumeTypes")
+ log.Fatalf("volume type %+q is claimed by multiple VolumeTypes", t)
}
typeMap[t] = factory
}
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/prometheus/client_golang/prometheus"
)
var testCluster = &arvados.Cluster{
if rt.apiToken != "" {
req.Header.Set("Authorization", "OAuth2 "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter(testCluster)
+ loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
loggingRouter.ServeHTTP(response, req)
return response
}
if rt.apiToken != "" {
req.Header.Set("Authorization", "Bearer "+rt.apiToken)
}
- loggingRouter := MakeRESTRouter(testCluster)
+ loggingRouter := MakeRESTRouter(testCluster, prometheus.NewRegistry())
loggingRouter.ServeHTTP(response, req)
return response
}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- MakeRESTRouter(testCluster).ServeHTTP(resp, req)
+ MakeRESTRouter(testCluster, prometheus.NewRegistry()).ServeHTTP(resp, req)
ok <- struct{}{}
}()
"git.curoverse.com/arvados.git/sdk/go/health"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/gorilla/mux"
+ "github.com/prometheus/client_golang/prometheus"
)
type router struct {
limiter httpserver.RequestCounter
cluster *arvados.Cluster
remoteProxy remoteProxy
+ metrics *nodeMetrics
}
// MakeRESTRouter returns a new router that forwards all Keep requests
// to the appropriate handlers.
-func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
+func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
rtr := &router{
Router: mux.NewRouter(),
cluster: cluster,
+ metrics: &nodeMetrics{reg: reg},
}
rtr.HandleFunc(
rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
+ rtr.metrics.setupBufferPoolMetrics(bufs)
+ rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
+ rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
+ rtr.metrics.setupRequestMetrics(rtr.limiter)
- instrumented := httpserver.Instrument(nil, nil,
+ instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
}
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"github.com/coreos/go-systemd/daemon"
+ "github.com/prometheus/client_golang/prometheus"
)
var version = "dev"
log.Printf("keepstore %s started", version)
- err = theConfig.Start()
+ metricsRegistry := prometheus.NewRegistry()
+
+ err = theConfig.Start(metricsRegistry)
if err != nil {
log.Fatal(err)
}
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
// Middleware/handler stack
- router := MakeRESTRouter(cluster)
+ router := MakeRESTRouter(cluster, metricsRegistry)
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "fmt"
+
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+type nodeMetrics struct {
+ reg *prometheus.Registry
+}
+
+func (m *nodeMetrics) setupBufferPoolMetrics(b *bufferPool) {
+ m.reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "bufferpool_allocated_bytes",
+ Help: "Number of bytes allocated to buffers",
+ },
+ func() float64 { return float64(b.Alloc()) },
+ ))
+ m.reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "bufferpool_max_buffers",
+ Help: "Maximum number of buffers allowed",
+ },
+ func() float64 { return float64(b.Cap()) },
+ ))
+ m.reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "bufferpool_inuse_buffers",
+ Help: "Number of buffers in use",
+ },
+ func() float64 { return float64(b.Len()) },
+ ))
+}
+
+// setupWorkQueueMetrics registers two gauges for the given work queue
+// (qName is "pull" or "trash" at the call sites): entries in progress
+// and entries still queued, both read via getWorkQueueStatus on scrape.
+func (m *nodeMetrics) setupWorkQueueMetrics(q *WorkQueue, qName string) {
+ m.reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: fmt.Sprintf("%s_queue_inprogress_entries", qName),
+ Help: fmt.Sprintf("Number of %s requests in progress", qName),
+ },
+ func() float64 { return float64(getWorkQueueStatus(q).InProgress) },
+ ))
+ m.reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: fmt.Sprintf("%s_queue_pending_entries", qName),
+ Help: fmt.Sprintf("Number of queued %s requests", qName),
+ },
+ func() float64 { return float64(getWorkQueueStatus(q).Queued) },
+ ))
+}
+
+func (m *nodeMetrics) setupRequestMetrics(rc httpserver.RequestCounter) {
+ m.reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "concurrent_requests",
+ Help: "Number of requests in progress",
+ },
+ func() float64 { return float64(rc.Current()) },
+ ))
+ m.reg.MustRegister(prometheus.NewGaugeFunc(
+ prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "max_concurrent_requests",
+ Help: "Maximum number of concurrent requests",
+ },
+ func() float64 { return float64(rc.Max()) },
+ ))
+}
+
+type volumeMetricsVecs struct {
+ ioBytes *prometheus.CounterVec
+ errCounters *prometheus.CounterVec
+ opsCounters *prometheus.CounterVec
+}
+
+// newVolumeMetricsVecs creates and registers the three per-volume
+// counter vectors — operations, errors, and I/O bytes — each labeled by
+// device_id plus a second dimension (operation / error_type /
+// direction). Volumes obtain curried copies via getCounterVecsFor.
+func newVolumeMetricsVecs(reg *prometheus.Registry) *volumeMetricsVecs {
+ m := &volumeMetricsVecs{}
+ m.opsCounters = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_operations",
+ Help: "Number of volume operations",
+ },
+ []string{"device_id", "operation"},
+ )
+ reg.MustRegister(m.opsCounters)
+ m.errCounters = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_errors",
+ Help: "Number of volume errors",
+ },
+ []string{"device_id", "error_type"},
+ )
+ reg.MustRegister(m.errCounters)
+ m.ioBytes = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "arvados",
+ Subsystem: "keepstore",
+ Name: "volume_io_bytes",
+ Help: "Volume I/O traffic in bytes",
+ },
+ []string{"device_id", "direction"},
+ )
+ reg.MustRegister(m.ioBytes)
+
+ return m
+}
+
+// getCounterVecsFor returns the three counter vectors curried with the
+// given labels (the callers pass {"device_id": …}), so each volume
+// increments only its own label set.
+func (vm *volumeMetricsVecs) getCounterVecsFor(lbls prometheus.Labels) (opsCV, errCV, ioCV *prometheus.CounterVec) {
+ opsCV = vm.opsCounters.MustCurryWith(lbls)
+ errCV = vm.errCounters.MustCurryWith(lbls)
+ ioCV = vm.ioBytes.MustCurryWith(lbls)
+ return
+}
"net/http/httptest"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
theConfig.ManagementToken = arvadostest.ManagementToken
- theConfig.Start()
- s.rtr = MakeRESTRouter(testCluster)
+ r := prometheus.NewRegistry()
+ theConfig.Start(r)
+ s.rtr = MakeRESTRouter(testCluster, r)
}
func (s *MountsSuite) TearDownTest(c *check.C) {
s.vm.Close()
KeepVM = nil
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
}
func (s *MountsSuite) TestMounts(c *check.C) {
}
json.NewDecoder(resp.Body).Decode(&j)
found := make(map[string]bool)
+ names := map[string]bool{}
for _, g := range j {
+ names[g.Name] = true
for _, m := range g.Metric {
if len(m.Label) == 2 && m.Label[0].Name == "code" && m.Label[0].Value == "200" && m.Label[1].Name == "method" && m.Label[1].Value == "put" {
c.Check(m.Summary.SampleCount, check.Equals, "2")
}
c.Check(found["request_duration_seconds"], check.Equals, true)
c.Check(found["time_to_status_seconds"], check.Equals, true)
+
+ metricsNames := []string{
+ "arvados_keepstore_bufferpool_inuse_buffers",
+ "arvados_keepstore_bufferpool_max_buffers",
+ "arvados_keepstore_bufferpool_allocated_bytes",
+ "arvados_keepstore_pull_queue_inprogress_entries",
+ "arvados_keepstore_pull_queue_pending_entries",
+ "arvados_keepstore_concurrent_requests",
+ "arvados_keepstore_max_concurrent_requests",
+ "arvados_keepstore_trash_queue_inprogress_entries",
+ "arvados_keepstore_trash_queue_pending_entries",
+ "request_duration_seconds",
+ "time_to_status_seconds",
+ }
+ for _, m := range metricsNames {
+ _, ok := names[m]
+ c.Check(ok, check.Equals, true)
+ }
}
func (s *MountsSuite) call(method, path, tok string, body []byte) *httptest.ResponseRecorder {
}
}
-// putWithPipe invokes putter with a new pipe, and and copies data
+// putWithPipe invokes putter with a new pipe, and copies data
// from buf into the pipe. If ctx is done before all data is copied,
// putWithPipe closes the pipe with an error, and returns early with
// an error.
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/auth"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
theConfig.blobSigningKey = []byte(knownKey)
- theConfig.Start()
- s.rtr = MakeRESTRouter(s.cluster)
+ r := prometheus.NewRegistry()
+ theConfig.Start(r)
+ s.rtr = MakeRESTRouter(s.cluster, r)
}
func (s *ProxyRemoteSuite) TearDownTest(c *check.C) {
s.vm.Close()
KeepVM = nil
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
s.remoteAPI.Close()
s.remoteKeepproxy.Close()
}
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
. "gopkg.in/check.v1"
)
pullq = nil
teardown()
theConfig = DefaultConfig()
- theConfig.Start()
+ theConfig.Start(prometheus.NewRegistry())
}
var firstPullList = []byte(`[
"git.curoverse.com/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
+ "github.com/prometheus/client_golang/prometheus"
)
const (
// Start populates private fields and verifies the configuration is
// valid.
-func (v *S3Volume) Start() error {
+func (v *S3Volume) Start(vm *volumeMetricsVecs) error {
region, ok := aws.Regions[v.Region]
if v.Endpoint == "" {
if !ok {
Name: v.Bucket,
},
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
return nil
}
}
func (lister *s3Lister) getPage() {
+ lister.Stats.TickOps("list")
lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
lister.nextMarker = ""
func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
rdr, err := b.Bucket.GetReader(path)
+ b.stats.TickOps("get")
b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
b.stats.TickErr(err)
return NewCountingReader(rdr, b.stats.TickInBytes), err
func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
resp, err := b.Bucket.Head(path, headers)
+ b.stats.TickOps("head")
b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
b.stats.TickErr(err)
return resp, err
r = NewCountingReader(r, b.stats.TickOutBytes)
}
err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+ b.stats.TickOps("put")
b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
b.stats.TickErr(err)
return err
func (b *s3bucket) Del(path string) error {
err := b.Bucket.Del(path)
+ b.stats.TickOps("delete")
b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
b.stats.TickErr(err)
return err
"github.com/AdRoll/goamz/s3"
"github.com/AdRoll/goamz/s3/s3test"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
vol := *v.S3Volume
vol.Endpoint = srv.URL
v = &TestableS3Volume{S3Volume: &vol}
- v.Start()
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ v.Start(metrics)
ctx, cancel := context.WithCancel(context.Background())
server: srv,
serverClock: clock,
}
- v.Start()
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ v.Start(metrics)
err = v.bucket.PutBucket(s3.ACL("private"))
c.Assert(err, check.IsNil)
return v
c.Check(cfg.Volumes[0].GetStorageClasses(), check.DeepEquals, []string{"class_a", "class_b"})
}
-func (v *TestableS3Volume) Start() error {
+func (v *TestableS3Volume) Start(vm *volumeMetricsVecs) error {
tmp, err := ioutil.TempFile("", "keepstore")
v.c.Assert(err, check.IsNil)
defer os.Remove(tmp.Name())
v.S3Volume.AccessKeyFile = tmp.Name()
v.S3Volume.SecretKeyFile = tmp.Name()
- v.c.Assert(v.S3Volume.Start(), check.IsNil)
+ v.c.Assert(v.S3Volume.Start(vm), check.IsNil)
return nil
}
func (v *TestableS3Volume) Teardown() {
v.server.Quit()
}
+
+// ReadWriteOperationLabelValues returns the "operation" label values
+// the S3 driver ticks for a read ("get") and a write ("put"), so the
+// generic volume tests can check that the matching counters moved.
+func (v *TestableS3Volume) ReadWriteOperationLabelValues() (r, w string) {
+	return "get", "put"
+}
import (
"sync"
"sync/atomic"
+
+ "github.com/prometheus/client_golang/prometheus"
)
type statsTicker struct {
ErrorCodes map[string]uint64 `json:",omitempty"`
lock sync.Mutex
+
+ opsCounters *prometheus.CounterVec
+ errCounters *prometheus.CounterVec
+ ioBytes *prometheus.CounterVec
}
// Tick increments each of the given counters by 1 using
}
s.ErrorCodes[errType]++
s.lock.Unlock()
+ if s.errCounters != nil {
+ s.errCounters.With(prometheus.Labels{"error_type": errType}).Inc()
+ }
}
// TickInBytes increments the incoming byte counter by n.
func (s *statsTicker) TickInBytes(n uint64) {
+ if s.ioBytes != nil {
+ s.ioBytes.With(prometheus.Labels{"direction": "in"}).Add(float64(n))
+ }
atomic.AddUint64(&s.InBytes, n)
}
// TickOutBytes increments the outgoing byte counter by n.
func (s *statsTicker) TickOutBytes(n uint64) {
+ if s.ioBytes != nil {
+ s.ioBytes.With(prometheus.Labels{"direction": "out"}).Add(float64(n))
+ }
atomic.AddUint64(&s.OutBytes, n)
}
+
+// TickOps increments the counter of the listed operations by 1.
+// Each name becomes the "operation" label value on opsCounters.
+// It is a no-op when opsCounters is nil, i.e. when the volume was
+// started without prometheus metrics wiring (e.g. in some tests).
+func (s *statsTicker) TickOps(operations ...string) {
+	if s.opsCounters == nil {
+		return
+	}
+	for _, opType := range operations {
+		s.opsCounters.With(prometheus.Labels{"operation": opType}).Inc()
+	}
+}
"sync/atomic"
"syscall"
"time"
+
+ "github.com/prometheus/client_golang/prometheus"
)
type unixVolumeAdder struct {
}
// String implements flag.Value
-func (s *unixVolumeAdder) String() string {
+func (vs *unixVolumeAdder) String() string {
return "-"
}
}
// Start implements Volume
-func (v *UnixVolume) Start() error {
+func (v *UnixVolume) Start(vm *volumeMetricsVecs) error {
if v.Serialize {
v.locker = &sync.Mutex{}
}
if v.DirectoryReplication == 0 {
v.DirectoryReplication = 1
}
+ // Set up prometheus metrics
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
+ v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = vm.getCounterVecsFor(lbls)
+
_, err := v.os.Stat(v.Root)
+
return err
}
}
defer v.unlockfile(f)
ts := syscall.NsecToTimespec(time.Now().UnixNano())
+ v.os.stats.TickOps("utimes")
v.os.stats.Tick(&v.os.stats.UtimesOps)
err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
v.os.stats.TickErr(err)
return putWithPipe(ctx, loc, block, v)
}
-// ReadBlock implements BlockWriter.
+// WriteBlock implements BlockWriter.
func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
if v.ReadOnly {
return MethodDisabledError
return err
}
defer rootdir.Close()
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
names, err := rootdir.Readdirnames(1)
lastErr = err
continue
}
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
for {
fileInfo, err := blockdir.Readdir(1)
return MethodDisabledError
}
+ v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
files, err := ioutil.ReadDir(v.blockDir(loc))
if err != nil {
// lockfile and unlockfile use flock(2) to manage kernel file locks.
func (v *UnixVolume) lockfile(f *os.File) error {
+ v.os.stats.TickOps("flock")
v.os.stats.Tick(&v.os.stats.FlockOps)
err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
v.os.stats.TickErr(err)
}
func (o *osWithStats) Open(name string) (*os.File, error) {
+ o.stats.TickOps("open")
o.stats.Tick(&o.stats.OpenOps)
f, err := os.Open(name)
o.stats.TickErr(err)
}
func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
+ o.stats.TickOps("open")
o.stats.Tick(&o.stats.OpenOps)
f, err := os.OpenFile(name, flag, perm)
o.stats.TickErr(err)
}
func (o *osWithStats) Remove(path string) error {
+ o.stats.TickOps("unlink")
o.stats.Tick(&o.stats.UnlinkOps)
err := os.Remove(path)
o.stats.TickErr(err)
}
func (o *osWithStats) Rename(a, b string) error {
+ o.stats.TickOps("rename")
o.stats.Tick(&o.stats.RenameOps)
err := os.Rename(a, b)
o.stats.TickErr(err)
}
func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
+ o.stats.TickOps("stat")
o.stats.Tick(&o.stats.StatOps)
fi, err := os.Stat(path)
o.stats.TickErr(err)
}
func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
+ o.stats.TickOps("create")
o.stats.Tick(&o.stats.CreateOps)
f, err := ioutil.TempFile(dir, base)
o.stats.TickErr(err)
"time"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
}
}
+// ReadWriteOperationLabelValues returns the "operation" label values
+// the unix driver ticks for a read ("open") and a write ("create"),
+// so the generic volume tests can check that the matching counters
+// moved.
+func (v *TestableUnixVolume) ReadWriteOperationLabelValues() (r, w string) {
+	return "open", "create"
+}
+
// serialize = false; readonly = false
func TestUnixVolumeWithGenericTests(t *testing.T) {
DoGenericVolumeTests(t, func(t TB) TestableVolume {
Root: "/",
ReadOnly: true,
}
- if err := v.Start(); err != nil {
+ metrics := newVolumeMetricsVecs(prometheus.NewRegistry())
+ if err := v.Start(metrics); err != nil {
t.Error(err)
}
if got := v.Replication(); got != 1 {
// Do whatever private setup tasks and configuration checks
// are needed. Return non-nil if the volume is unusable (e.g.,
// invalid config).
- Start() error
+ Start(vm *volumeMetricsVecs) error
// Get a block: copy the block data into buf, and return the
// number of bytes copied.
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "github.com/prometheus/client_golang/prometheus"
+ dto "github.com/prometheus/client_model/go"
)
type TB interface {
testStatus(t, factory)
+ testMetrics(t, factory)
+
testString(t, factory)
testUpdateReadOnly(t, factory)
}
}
+// getValueFrom returns the current value of the counter in cv
+// identified by lbls.
+// NOTE(review): errors from GetMetricWith and Write are discarded;
+// an unknown label set therefore reads as 0 rather than failing.
+func getValueFrom(cv *prometheus.CounterVec, lbls prometheus.Labels) float64 {
+	c, _ := cv.GetMetricWith(lbls)
+	pb := &dto.Metric{}
+	c.Write(pb)
+	return pb.GetCounter().GetValue()
+}
+
+// testMetrics checks that a volume driver ticks its prometheus
+// counters: the per-operation counters must increase after Put/Get,
+// and the io bytes counters (direction=in/out) must be zero before
+// any I/O and non-zero afterwards.
+func testMetrics(t TB, factory TestableVolumeFactory) {
+	var err error
+
+	v := factory(t)
+	defer v.Teardown()
+	// Fresh registry per run so counts from other tests can't leak in.
+	reg := prometheus.NewRegistry()
+	vm := newVolumeMetricsVecs(reg)
+
+	err = v.Start(vm)
+	if err != nil {
+		t.Error("Failed Start(): ", err)
+	}
+	opsC, _, ioC := vm.getCounterVecsFor(prometheus.Labels{"device_id": v.DeviceID()})
+
+	if ioC == nil {
+		t.Error("ioBytes CounterVec is nil")
+		return
+	}
+
+	// Before any I/O, both byte counters must read zero.
+	if getValueFrom(ioC, prometheus.Labels{"direction": "out"})+
+		getValueFrom(ioC, prometheus.Labels{"direction": "in"}) > 0 {
+		t.Error("ioBytes counter should be zero")
+	}
+
+	if opsC == nil {
+		t.Error("opsCounter CounterVec is nil")
+		return
+	}
+
+	var c, writeOpCounter, readOpCounter float64
+
+	// Each driver reports which "operation" label values it uses for
+	// reads and writes (e.g. unix: open/create, s3: get/put).
+	readOpType, writeOpType := v.ReadWriteOperationLabelValues()
+	writeOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
+	readOpCounter = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
+
+	// Test Put if volume is writable
+	if v.Writable() {
+		err = v.Put(context.Background(), TestHash, TestBlock)
+		if err != nil {
+			t.Errorf("Got err putting block %q: %q, expected nil", TestBlock, err)
+		}
+		// Check that the write operations counter increased
+		c = getValueFrom(opsC, prometheus.Labels{"operation": writeOpType})
+		if c <= writeOpCounter {
+			t.Error("Operation(s) not counted on Put")
+		}
+		// Check that bytes counter is > 0
+		if getValueFrom(ioC, prometheus.Labels{"direction": "out"}) == 0 {
+			t.Error("ioBytes{direction=out} counter shouldn't be zero")
+		}
+	} else {
+		// Read-only volume: seed the block out-of-band so the Get
+		// below has data to read (PutRaw bypasses the counters).
+		v.PutRaw(TestHash, TestBlock)
+	}
+
+	buf := make([]byte, BlockSize)
+	_, err = v.Get(context.Background(), TestHash, buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that the operations counter increased
+	c = getValueFrom(opsC, prometheus.Labels{"operation": readOpType})
+	if c <= readOpCounter {
+		t.Error("Operation(s) not counted on Get")
+	}
+	// Check that the bytes "in" counter is > 0
+	if getValueFrom(ioC, prometheus.Labels{"direction": "in"}) == 0 {
+		t.Error("ioBytes{direction=in} counter shouldn't be zero")
+	}
+}
+
// Invoke String for the volume; expect non-empty result
// Test should pass for both writable and read-only volumes
func testString(t TB, factory TestableVolumeFactory) {
// impractical to achieve with a sequence of normal Volume operations.
type TestableVolume interface {
Volume
+
// [Over]write content for a locator with the given data,
// bypassing all constraints like readonly and serialize.
PutRaw(locator string, data []byte)
+ // Returns the strings that a driver uses to record read/write operations.
+ ReadWriteOperationLabelValues() (r, w string)
+
// Specify the value Mtime() should return, until the next
// call to Touch, TouchWithDate, or Put.
TouchWithDate(locator string, lastPut time.Time)
return "Mock"
}
-func (v *MockVolume) Start() error {
+func (v *MockVolume) Start(vm *volumeMetricsVecs) error {
return nil
}