# Deploy with Capistrano
# gem 'capistrano'
-# To use debugger
-#gem 'byebug'
-
gem 'passenger', :group => :production
gem 'andand'
gem 'RedCloth'
i18n (>= 0.7, < 2)
minitest (~> 5.1)
tzinfo (~> 1.1)
- addressable (2.6.0)
- public_suffix (>= 2.0.2, < 4.0)
+ addressable (2.7.0)
+ public_suffix (>= 2.0.2, < 5.0)
andand (1.3.3)
angularjs-rails (1.3.15)
arel (7.1.4)
- arvados (1.3.1.20190320201707)
+ arvados (1.3.3.20190320201707)
activesupport (>= 3)
andand (~> 1.3, >= 1.3.3)
arvados-google-api-client (>= 0.7, < 0.8.9)
flamegraph (0.9.5)
globalid (0.4.2)
activesupport (>= 4.2.0)
- googleauth (0.8.1)
+ googleauth (0.9.0)
faraday (~> 0.12)
jwt (>= 1.4, < 3.0)
memoist (~> 0.16)
actionpack (>= 4)
less (~> 2.6.0)
sprockets (>= 2)
- libv8 (3.16.14.19)
+ libv8 (3.16.14.19-x86_64-linux)
lograge (0.10.0)
actionpack (>= 4)
activesupport (>= 4)
morrisjs-rails (0.5.1.2)
railties (> 3.1, < 6)
multi_json (1.13.1)
- multipart-post (2.0.0)
+ multipart-post (2.1.1)
net-scp (2.0.0)
net-ssh (>= 2.6.5, < 6.0.0)
net-sftp (2.1.2)
net-ssh-gateway (2.0.0)
net-ssh (>= 4.0.0)
nio4r (2.3.1)
- nokogiri (1.10.2)
+ nokogiri (1.10.4)
mini_portile2 (~> 2.4.0)
npm-rails (0.2.1)
rails (>= 3.2)
- oj (3.7.11)
- os (1.0.0)
+ oj (3.7.12)
+ os (1.0.1)
passenger (6.0.2)
rack
rake (>= 0.8.1)
cliver (~> 0.3.1)
multi_json (~> 1.0)
websocket-driver (>= 0.2.0)
- public_suffix (3.0.3)
+ public_suffix (4.0.1)
rack (2.0.7)
rack-mini-profiler (1.0.2)
rack (>= 1.2.0)
retriable (1.4.1)
ruby-debug-passenger (0.2.0)
ruby-prof (0.17.0)
- rubyzip (1.2.2)
+ rubyzip (1.3.0)
rvm-capistrano (1.5.6)
capistrano (~> 2.15.4)
safe_yaml (1.0.5)
return 1
fi
- go get -ldflags "-X main.version=${go_package_version}" "git.curoverse.com/arvados.git/$src_path"
+ go get -ldflags "-X git.curoverse.com/arvados.git/lib/cmd.version=${go_package_version} -X main.version=${go_package_version}" "git.curoverse.com/arvados.git/$src_path"
local -a switches=()
systemd_unit="$WORKSPACE/${src_path}/${prog}.service"
retry do_test_once ${@}
}
+go_ldflags() {
+ version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}
+ echo "-X git.curoverse.com/arvados.git/lib/cmd.version=${version} -X main.version=${version}"
+}
+
do_test_once() {
unset result
# before trying "go test". Otherwise, coverage-reporting
# mode makes Go show the wrong line numbers when reporting
# compilation errors.
- go get -ldflags "-X git.curoverse.com/arvados.git/lib/cmd.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1" && \
+ go get -ldflags "$(go_ldflags)" -t "git.curoverse.com/arvados.git/$1" && \
cd "$GOPATH/src/git.curoverse.com/arvados.git/$1" && \
if [[ -n "${testargs[$1]}" ]]
then
result=1
elif [[ "$2" == "go" ]]
then
- go get -ldflags "-X git.curoverse.com/arvados.git/lib/cmd.version=${ARVADOS_VERSION:-$(git log -n1 --format=%H)-dev}" -t "git.curoverse.com/arvados.git/$1"
+ go get -ldflags "$(go_ldflags)" -t "git.curoverse.com/arvados.git/$1"
elif [[ "$2" == "pip" ]]
then
# $3 can name a path directory for us to use, including trailing
- Data Management:
- admin/collection-versioning.html.textile.liquid
- admin/collection-managed-properties.html.textile.liquid
+ - admin/keep-balance.html.textile.liquid
- Other:
- admin/federation.html.textile.liquid
- admin/controlling-container-reuse.html.textile.liquid
There are 2 configuration settings that control this feature, both go on the @application.yml@ file.
-h4. Settting: @collection_versioning@ (Boolean. Default: false)
+h4. Setting: @collection_versioning@ (Boolean. Default: false)
If @true@, collection versioning is enabled, meaning that new version records can be created. Note that if you set @collection_versioning@ to @false@ after being enabled, old versions will still be accessible, but further changes will not be versioned.
The legacy arv-git-httpd config (loaded from @/etc/arvados/git-httpd/git-httpd.yml@ or a different location specified via -legacy-git-httpd-config command line argument) takes precedence over the centralized config. After you migrate everything from the legacy config to the centralized config, you should delete @/etc/arvados/git-httpd/git-httpd.yml@ and stop using the -legacy-git-httpd-config argument.
+h2(#keepbalance). keep-balance
+
+The legacy keep-balance config (loaded from @/etc/arvados/keep-balance/keep-balance.yml@ or a different location specified via -legacy-keepbalance-config command line argument) takes precedence over the centralized config. After you migrate everything from the legacy config to the centralized config, you should delete @/etc/arvados/keep-balance/keep-balance.yml@ and stop using the -legacy-keepbalance-config argument.
h2. arvados-controller
--- /dev/null
+---
+layout: default
+navsection: admin
+title: Balancing Keep servers
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+This page describes how to balance keepstore servers using keep-balance. Keep-balance creates new copies of under-replicated blocks, deletes excess copies of over-replicated and unreferenced blocks, and moves blocks to better positions (e.g. after adding new keepstore servers) so clients find them faster.
+
+See "the Keep-balance install docs":{{site.baseurl}}/install/install-keep-balance.html for installation instructions.
+
+h3. Data deletion
+
+The keep-balance service determines which blocks are candidates for deletion and instructs the keepstore to move those blocks to the trash. When a block is newly written, it is protected from deletion for the duration in @BlobSigningTTL@. During this time, it cannot be trashed or deleted.
+
+If keep-balance instructs keepstore to trash a block which is older than @BlobSigningTTL@, and @BlobTrashLifetime@ is non-zero, the block will be moved to "trash". A block which is in the trash is no longer accessible by read requests, but has not yet been permanently deleted. Blocks which are in the trash may be recovered using the "untrash" API endpoint. Blocks are permanently deleted after they have been in the trash for the duration in @BlobTrashLifetime@.
+
+Keep-balance is also responsible for balancing the distribution of blocks across keepstore servers by asking servers to pull blocks from other servers (as determined by their "storage class":{{site.baseurl}}/admin/storage-classes.html and "rendezvous hashing order":{{site.baseurl}}/api/storage.html). Pulling a block makes a copy. If a block is overreplicated (i.e. there are excess copies) after pulling, it will be subsequently trashed and deleted on the original server, subject to @BlobTrash@ and @BlobTrashLifetime@ settings.
+
+h3. Scanning
+
+By default, keep-balance operates periodically: it does a scan/balance operation, sleeps, and repeats.
+
+The @Collections.BalancePeriod@ value in @/etc/arvados/config.yml@ determines the interval between start times of successive scan/balance operations. If an operation takes longer than the @Collections.BalancePeriod@, the next operation will follow it immediately. If SIGUSR1 is received during an idle period between operations, the next operation will start immediately.
+
+Keep-balance can also be run with the @-once@ flag to do a single scan/balance operation and then exit. The exit code will be zero if the operation was successful.
+
+h3. Committing
+
+Keep-balance computes and reports changes but does not implement them by sending pull and trash lists to the Keep services unless the @-commit-pull@ and @-commit-trash@ flags are used.
+
+h3. Additional configuration
+
+For configuring resource usage tuning and lost block reporting, please see the @Collections.BlobMissingReport@, @Collections.BalanceCollectionBatch@, @Collections.BalanceCollectionBuffers@ options in the "default config.yml file":{{site.baseurl}}/admin/config.html.
+
+h3. Limitations
+
+Keep-balance does not attempt to discover whether committed pull and trash requests ever get carried out -- only that they are accepted by the Keep services. If some services are full, new copies of under-replicated blocks might never get made, only repeatedly requested.
\ No newline at end of file
Once these old records are removed, @arv keep_service list@ will instead return the services listed under Services/Keepstore/InternalURLs and Services/Keepproxy/ExternalURL in your centralized configuration file.
+h4. Keep-balance configuration migration
+
+(feature "#14714":https://dev.arvados.org/issues/14714 ) The keep-balance service can now be configured using the centralized configuration file at @/etc/arvados/config.yml@. The following command line and configuration options have changed.
+
+You can no longer specify types of keep services to balance via the @KeepServiceTypes@ config option in the legacy config at @/etc/arvados/keep-balance/keep-balance.yml@. If you are still using the legacy config and @KeepServiceTypes@ has a value other than "disk", keep-balance will produce an error.
+
+You can no longer specify individual keep services to balance via the @config.KeepServiceList@ command line option or @KeepServiceList@ legacy config option. Instead, keep-balance operates on all configured volumes. If you are still using the legacy config, @KeepServiceList@ should be removed or keep-balance will produce an error.
+
+Please see the "config migration guide":{{site.baseurl}}/admin/config-migration.html and "keep-balance install guide":{{site.baseurl}}/install/install-keep-balance.html for more details.
+
h4. Arv-git-httpd configuration migration
(feature "#14712":https://dev.arvados.org/issues/14712 ) The arv-git-httpd package can now be configured using the centralized configuration file at @/etc/arvados/config.yml@. Configuration via individual command line arguments is no longer available. Please see "arv-git-httpd's config migration guide":{{site.baseurl}}/admin/config-migration.html#arv-git-httpd for more details.
h3. delete
-Delete an existing Container.
+Delete a Container.
+
+This API requires admin privileges. In normal operation, it should not be used at all. API clients like Workbench might not work correctly when a container request references a container that has been deleted.
Arguments:
Driver: S3
DriverParameters:
- # The credentials to use to access the bucket.
- AccessKey: aaaaa
- SecretKey: aaaaa
+ # IAM role name to use when retrieving credentials from
+ # instance metadata. It can be omitted, in which case the
+ # role name itself will be retrieved from instance metadata
+ # -- but setting it explicitly may protect you from using
+ # the wrong credentials in the event of an
+ # installation/configuration error.
+ IAMRole: ""
+
+ # If you are not using an IAM role for authentication,
+ # specify access credentials here instead.
+ AccessKey: ""
+ SecretKey: ""
# Storage provider endpoint. For Amazon S3, use "" or
# omit. For Google Cloud Storage, use
SPDX-License-Identifier: CC-BY-SA-3.0
{% endcomment %}
-Keep-balance deletes unreferenced and overreplicated blocks from Keep servers, makes additional copies of underreplicated blocks, and moves blocks into optimal locations as needed (e.g., after adding new servers).
+Keep-balance deletes unreferenced and overreplicated blocks from Keep servers, makes additional copies of underreplicated blocks, and moves blocks into optimal locations as needed (e.g., after adding new servers). See "Balancing Keep servers":{{site.baseurl}}/admin/keep-balance.html for usage details.
{% include 'notebox_begin' %}
<notextile>
<pre><code>~$ <span class="userinput">keep-balance -h</span>
...
-Usage: keep-balance [options]
-
-Options:
+Usage of ./keep-balance:
-commit-pulls
- send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)
+ send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)
-commit-trash
- send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)
+ send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)
...
</code></pre>
</notextile>
-h3. Create a keep-balance configuration file
+h3. Update the cluster config
-On the host running keep-balance, create @/etc/arvados/keep-balance/keep-balance.yml@ using the SystemRootToken from your cluster configuration file. Follow this YAML format:
+Edit the cluster config at @/etc/arvados/config.yml@ and set @Services.Keepbalance.InternalURLs@. Replace @uuid_prefix@ with your cluster id.
<notextile>
-<pre><code>Listen: :9005
-Client:
- APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
- AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
-KeepServiceTypes:
- - disk
-ManagementToken: <span class="userinput">xyzzy</span>
-RunPeriod: 10m
-CollectionBatchSize: 100000
-CollectionBuffers: 1000
-LostBlocksFile: /tmp/keep-balance-lost-blocks.txt # If given, this file will be updated atomically during each successful run.
+<pre><code>Clusters:
+ <span class="userinput">uuid_prefix</span>:
+ Services:
+ Keepbalance:
+ InternalURLs:
+ "http://localhost:9005/": {}
+ TLS:
+ Insecure: false
</code></pre>
</notextile>
-If your API server's SSL certificate is not signed by a recognized CA, add the @Insecure@ option to the @Client@ section:
-
-<notextile>
-<pre><code>Client:
- <span class="userinput">Insecure: true</span>
- APIHost: ...
-</code></pre>
-</notextile>
+Set @TLS.Insecure: true@ if your API server’s TLS certificate is not signed by a recognized CA.
h3. Start the service (option 1: systemd)
</code></pre>
</notextile>
-h3. Notes on storage management
+h3. Note on storage management
-On its own, a keepstore server never deletes data. The "keep-balance":install-keep-balance.html service determines which blocks are candidates for deletion and instructs the keepstore to move those blocks to the trash.
-
-When a block is newly written, it is protected from deletion for the duration in @BlobSigningTTL@. During this time, it cannot be trashed or deleted.
-
-If keep-balance instructs keepstore to trash a block which is older than @BlobSigningTTL@, and @BlobTrashLifetime@ is non-zero, the block will be moved to "trash". A block which is in the trash is no longer accessible by read requests, but has not yet been permanently deleted. Blocks which are in the trash may be recovered using the "untrash" API endpoint. Blocks are permanently deleted after they have been in the trash for the duration in @BlobTrashLifetime@.
-
-Keep-balance is also responsible for balancing the distribution of blocks across keepstore servers by asking servers to pull blocks from other servers (as determined by their "storage class":{{site.baseurl}}/admin/storage-classes.html and "rendezvous hashing order":{{site.baseurl}}/api/storage.html). Pulling a block makes a copy. If a block is overreplicated (i.e. there are excess copies) after pulling, it will be subsequently trashed and deleted on the original server, subject to @BlobTrash@ and @BlobTrashLifetime@ settings.
+On its own, a keepstore server never deletes data. Instead, the keep-balance service determines which blocks are candidates for deletion and instructs the keepstore to move those blocks to the trash. Please see the "Balancing Keep servers":{{site.baseurl}}/admin/keep-balance.html for more details.
h3. Configure storage volumes
# collection's replication_desired attribute is nil.
DefaultReplication: 2
- # Lifetime (in seconds) of blob permission signatures generated by
- # the API server. This determines how long a client can take (after
- # retrieving a collection record) to retrieve the collection data
- # from Keep. If the client needs more time than that (assuming the
- # collection still has the same content and the relevant user/token
- # still has permission) the client can retrieve the collection again
- # to get fresh signatures.
+ # BlobSigningTTL determines the minimum lifetime of transient
+ # data, i.e., blocks that are not referenced by
+ # collections. Unreferenced blocks exist for two reasons:
+ #
+ # 1) A data block must be written to a disk/cloud backend device
+ # before a collection can be created/updated with a reference to
+ # it.
+ #
+ # 2) Deleting or updating a collection can remove the last
+ # remaining reference to a data block.
+ #
+ # If BlobSigningTTL is too short, long-running
+ # processes/containers will fail when they take too long (a)
+ # between writing blocks and writing collections that reference
+ # them, or (b) between reading collections and reading the
+ # referenced blocks.
+ #
+ # If BlobSigningTTL is too long, data will still be stored long
+ # after the referring collections are deleted, and you will
+ # needlessly fill up disks or waste money on cloud storage.
#
# Modifying BlobSigningTTL invalidates existing signatures; see
# BlobSigningKey note above.
# The default is 2 weeks.
BlobSigningTTL: 336h
+ # When running keep-balance, this is the destination filename for
+ # the list of lost block hashes if there are any, one per line.
+ # Updated atomically during each successful run.
+ BlobMissingReport: ""
+
+ # keep-balance operates periodically, i.e.: do a
+ # scan/balance operation, sleep, repeat.
+ #
+ # BalancePeriod determines the interval between start times of
+ # successive scan/balance operations. If a scan/balance operation
+ # takes longer than BalancePeriod, the next one will follow it
+ # immediately.
+ #
+ # If SIGUSR1 is received during an idle period between operations,
+ # the next operation will start immediately.
+ BalancePeriod: 10m
+
+ # Limits the number of collections retrieved by keep-balance per
+ # API transaction. If this is zero, page size is
+ # determined by the API server's own page size limits (see
+ # API.MaxItemsPerResponse and API.MaxIndexDatabaseRead).
+ BalanceCollectionBatch: 0
+
+ # The size of keep-balance's internal queue of
+ # collections. Higher values use more memory and improve throughput
+ # by allowing keep-balance to fetch the next page of collections
+ # while the current page is still being processed. If this is zero
+ # or omitted, pages are processed serially.
+ BalanceCollectionBuffers: 1000
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
# for s3 driver -- see
# https://doc.arvados.org/install/configure-s3-object-storage.html
+ IAMRole: aaaaa
AccessKey: aaaaa
SecretKey: aaaaa
Endpoint: ""
return nil
}
+const defaultKeepBalanceConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+
+type oldKeepBalanceConfig struct {
+ Client *arvados.Client
+ Listen *string
+ KeepServiceTypes *[]string
+ KeepServiceList *arvados.KeepServiceList
+ RunPeriod *arvados.Duration
+ CollectionBatchSize *int
+ CollectionBuffers *int
+ RequestTimeout *arvados.Duration
+ LostBlocksFile *string
+ ManagementToken *string
+}
+
+func (ldr *Loader) loadOldKeepBalanceConfig(cfg *arvados.Config) error {
+ if ldr.KeepBalancePath == "" {
+ return nil
+ }
+ var oc oldKeepBalanceConfig
+ err := ldr.loadOldConfigHelper("keep-balance", ldr.KeepBalancePath, &oc)
+ if os.IsNotExist(err) && ldr.KeepBalancePath == defaultKeepBalanceConfigPath {
+ return nil
+ } else if err != nil {
+ return err
+ }
+
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ return err
+ }
+
+ loadOldClientConfig(cluster, oc.Client)
+
+ if oc.Listen != nil {
+ cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: *oc.Listen}] = arvados.ServiceInstance{}
+ }
+ if oc.ManagementToken != nil {
+ cluster.ManagementToken = *oc.ManagementToken
+ }
+ if oc.RunPeriod != nil {
+ cluster.Collections.BalancePeriod = *oc.RunPeriod
+ }
+ if oc.LostBlocksFile != nil {
+ cluster.Collections.BlobMissingReport = *oc.LostBlocksFile
+ }
+ if oc.CollectionBatchSize != nil {
+ cluster.Collections.BalanceCollectionBatch = *oc.CollectionBatchSize
+ }
+ if oc.CollectionBuffers != nil {
+ cluster.Collections.BalanceCollectionBuffers = *oc.CollectionBuffers
+ }
+ if oc.RequestTimeout != nil {
+ cluster.API.KeepServiceRequestTimeout = *oc.RequestTimeout
+ }
+
+ msg := "The %s configuration option is no longer supported. Please remove it from your configuration file. Keep-balance will operate on all configured volumes."
+
+ // If the keep service type provided is "disk" silently ignore it, since
+ // this is what ends up being done anyway.
+ if oc.KeepServiceTypes != nil {
+ numTypes := len(*oc.KeepServiceTypes)
+ if numTypes != 0 && !(numTypes == 1 && (*oc.KeepServiceTypes)[0] == "disk") {
+ 			return fmt.Errorf(msg, "KeepServiceTypes")
+ }
+ }
+
+ if oc.KeepServiceList != nil {
+ return fmt.Errorf(msg, "KeepServiceList")
+ }
+
+ cfg.Clusters[cluster.ClusterID] = *cluster
+ return nil
+}
+
func (ldr *Loader) loadOldEnvironmentVariables(cfg *arvados.Config) error {
if os.Getenv("ARVADOS_API_TOKEN") == "" && os.Getenv("ARVADOS_API_HOST") == "" {
return nil
c.Check(cluster.Git.Repositories, check.Equals, "/test/reporoot")
c.Check(cluster.Services.Keepproxy.InternalURLs[arvados.URL{Host: ":9000"}], check.Equals, arvados.ServiceInstance{})
}
+
+func (s *LoadSuite) TestLegacyKeepBalanceConfig(c *check.C) {
+ f := "-legacy-keepbalance-config"
+ content := []byte(fmtKeepBalanceConfig(""))
+ cluster, err := testLoadLegacyConfig(content, f, c)
+
+ c.Check(err, check.IsNil)
+ c.Check(cluster, check.NotNil)
+ c.Check(cluster.ManagementToken, check.Equals, "xyzzy")
+ c.Check(cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: ":80"}], check.Equals, arvados.ServiceInstance{})
+ c.Check(cluster.Collections.BalanceCollectionBuffers, check.Equals, 1000)
+ c.Check(cluster.Collections.BalanceCollectionBatch, check.Equals, 100000)
+ c.Check(cluster.Collections.BalancePeriod.String(), check.Equals, "10m")
+ c.Check(cluster.Collections.BlobMissingReport, check.Equals, "testfile")
+ c.Check(cluster.API.KeepServiceRequestTimeout.String(), check.Equals, "30m")
+
+ content = []byte(fmtKeepBalanceConfig(`"KeepServiceTypes":["disk"],`))
+ _, err = testLoadLegacyConfig(content, f, c)
+ c.Check(err, check.IsNil)
+
+ content = []byte(fmtKeepBalanceConfig(`"KeepServiceTypes":[],`))
+ _, err = testLoadLegacyConfig(content, f, c)
+ c.Check(err, check.IsNil)
+
+ content = []byte(fmtKeepBalanceConfig(`"KeepServiceTypes":["proxy"],`))
+ _, err = testLoadLegacyConfig(content, f, c)
+ c.Check(err, check.NotNil)
+
+ content = []byte(fmtKeepBalanceConfig(`"KeepServiceTypes":["disk", "proxy"],`))
+ _, err = testLoadLegacyConfig(content, f, c)
+ c.Check(err, check.NotNil)
+
+ content = []byte(fmtKeepBalanceConfig(`"KeepServiceList":{},`))
+ _, err = testLoadLegacyConfig(content, f, c)
+ c.Check(err, check.NotNil)
+}
+
+func fmtKeepBalanceConfig(param string) string {
+ return fmt.Sprintf(`
+{
+ "Client": {
+ "Scheme": "",
+ "APIHost": "example.com",
+ "AuthToken": "abcdefg",
+ "Insecure": false
+ },
+ "Listen": ":80",
+ %s
+ "RunPeriod": "10m",
+ "CollectionBatchSize": 100000,
+ "CollectionBuffers": 1000,
+ "RequestTimeout": "30m",
+ "ManagementToken": "xyzzy",
+ "LostBlocksFile": "testfile"
+}
+`, param)
+}
"Collections.TrashSweepInterval": false,
"Collections.TrustAllContent": false,
"Collections.WebDAVCache": false,
+ "Collections.BalanceCollectionBatch": false,
+ "Collections.BalancePeriod": false,
+ "Collections.BlobMissingReport": false,
+ "Collections.BalanceCollectionBuffers": false,
"Containers": true,
"Containers.CloudVMs": false,
"Containers.CrunchRunCommand": false,
# collection's replication_desired attribute is nil.
DefaultReplication: 2
- # Lifetime (in seconds) of blob permission signatures generated by
- # the API server. This determines how long a client can take (after
- # retrieving a collection record) to retrieve the collection data
- # from Keep. If the client needs more time than that (assuming the
- # collection still has the same content and the relevant user/token
- # still has permission) the client can retrieve the collection again
- # to get fresh signatures.
+ # BlobSigningTTL determines the minimum lifetime of transient
+ # data, i.e., blocks that are not referenced by
+ # collections. Unreferenced blocks exist for two reasons:
+ #
+ # 1) A data block must be written to a disk/cloud backend device
+ # before a collection can be created/updated with a reference to
+ # it.
+ #
+ # 2) Deleting or updating a collection can remove the last
+ # remaining reference to a data block.
+ #
+ # If BlobSigningTTL is too short, long-running
+ # processes/containers will fail when they take too long (a)
+ # between writing blocks and writing collections that reference
+ # them, or (b) between reading collections and reading the
+ # referenced blocks.
+ #
+ # If BlobSigningTTL is too long, data will still be stored long
+ # after the referring collections are deleted, and you will
+ # needlessly fill up disks or waste money on cloud storage.
#
# Modifying BlobSigningTTL invalidates existing signatures; see
# BlobSigningKey note above.
# The default is 2 weeks.
BlobSigningTTL: 336h
+ # When running keep-balance, this is the destination filename for
+ # the list of lost block hashes if there are any, one per line.
+ # Updated atomically during each successful run.
+ BlobMissingReport: ""
+
+ # keep-balance operates periodically, i.e.: do a
+ # scan/balance operation, sleep, repeat.
+ #
+ # BalancePeriod determines the interval between start times of
+ # successive scan/balance operations. If a scan/balance operation
+ # takes longer than BalancePeriod, the next one will follow it
+ # immediately.
+ #
+ # If SIGUSR1 is received during an idle period between operations,
+ # the next operation will start immediately.
+ BalancePeriod: 10m
+
+ # Limits the number of collections retrieved by keep-balance per
+ # API transaction. If this is zero, page size is
+ # determined by the API server's own page size limits (see
+ # API.MaxItemsPerResponse and API.MaxIndexDatabaseRead).
+ BalanceCollectionBatch: 0
+
+ # The size of keep-balance's internal queue of
+ # collections. Higher values use more memory and improve throughput
+ # by allowing keep-balance to fetch the next page of collections
+ # while the current page is still being processed. If this is zero
+ # or omitted, pages are processed serially.
+ BalanceCollectionBuffers: 1000
+
# Default lifetime for ephemeral collections: 2 weeks. This must not
# be less than BlobSigningTTL.
DefaultTrashLifetime: 336h
# for s3 driver -- see
# https://doc.arvados.org/install/configure-s3-object-storage.html
+ IAMRole: aaaaa
AccessKey: aaaaa
SecretKey: aaaaa
Endpoint: ""
WebsocketPath string
KeepproxyPath string
GitHttpdPath string
+ KeepBalancePath string
configdata []byte
}
flagset.StringVar(&ldr.WebsocketPath, "legacy-ws-config", defaultWebsocketConfigPath, "Legacy arvados-ws configuration `file`")
flagset.StringVar(&ldr.KeepproxyPath, "legacy-keepproxy-config", defaultKeepproxyConfigPath, "Legacy keepproxy configuration `file`")
flagset.StringVar(&ldr.GitHttpdPath, "legacy-git-httpd-config", defaultGitHttpdConfigPath, "Legacy arv-git-httpd configuration `file`")
+ flagset.StringVar(&ldr.KeepBalancePath, "legacy-keepbalance-config", defaultKeepBalanceConfigPath, "Legacy keep-balance configuration `file`")
flagset.BoolVar(&ldr.SkipLegacy, "skip-legacy", false, "Don't load legacy config files")
}
if legacyConfigArg != "-legacy-git-httpd-config" {
ldr.GitHttpdPath = ""
}
+ if legacyConfigArg != "-legacy-keepbalance-config" {
+ ldr.KeepBalancePath = ""
+ }
return munged
}
ldr.loadOldWebsocketConfig(&cfg),
ldr.loadOldKeepproxyConfig(&cfg),
ldr.loadOldGitHttpdConfig(&cfg),
+ ldr.loadOldKeepBalanceConfig(&cfg),
} {
if err != nil {
return nil, err
})
ctx := ctxlog.Context(c.ctx, logger)
- listenURL, err := getListenAddr(cluster.Services, c.svcName)
+ listenURL, err := getListenAddr(cluster.Services, c.svcName, log)
if err != nil {
return 1
}
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-func getListenAddr(svcs arvados.Services, prog arvados.ServiceName) (arvados.URL, error) {
+func getListenAddr(svcs arvados.Services, prog arvados.ServiceName, log logrus.FieldLogger) (arvados.URL, error) {
svc, ok := svcs.Map()[prog]
if !ok {
return arvados.URL{}, fmt.Errorf("unknown service name %q", prog)
if err == nil {
listener.Close()
return url, nil
+ } else if strings.Contains(err.Error(), "cannot assign requested address") {
+ continue
+ } else if strings.Contains(err.Error(), "address already in use") {
+ return url, err
+ } else {
+ log.Warn(err)
}
}
return arvados.URL{}, fmt.Errorf("configuration does not enable the %s service on this host", prog)
hints:
- class: arv:RunInSingleContainer
- class: ResourceRequirement
- ramMin: $(inputs.count*32)
+ ramMin: $(96+inputs.count*32)
- class: arv:APIRequirement
scatter: count
run:
type: int
script: File
outputs: []
- arguments: [python, $(inputs.script), $(inputs.count * 32)]
+ arguments: [python, $(inputs.script), $(96+inputs.count * 32)]
outputs: []
hints:
- class: ResourceRequirement
- ramMin: $(inputs.count*32)
+ ramMin: $(96+inputs.count*32)
steps:
sleep1:
in:
type: int
script: File
outputs: []
- arguments: [python, $(inputs.script), $(inputs.count * 32)]
+ arguments: [python, $(inputs.script), $(96+inputs.count * 32)]
id: subtool
hints:
- class: ResourceRequirement
- ramMin: $(inputs.count*32)
+ ramMin: $(96+inputs.count*32)
inputs:
count:
type: int
script: File
outputs: []
- arguments: [python, $(inputs.script), $(inputs.count * 32)]
+ arguments: [python, $(inputs.script), $(96+inputs.count * 32)]
id: subtool
hints:
- class: ResourceRequirement
- ramMin: 32
+ ramMin: 128
inputs:
count:
type: int
script: File
outputs: []
- arguments: [python, $(inputs.script), "32"]
+ arguments: [python, $(inputs.script), "128"]
TrashSweepInterval Duration
TrustAllContent bool
+ BlobMissingReport string
+ BalancePeriod Duration
+ BalanceCollectionBatch int
+ BalanceCollectionBuffers int
+
WebDAVCache WebDAVCacheConfig
}
Git struct {
current_user.andand.is_admin
end
+ def permission_to_destroy
+ current_user.andand.is_admin
+ end
+
def ensure_owner_uuid_is_permitted
# validate_change ensures owner_uuid can't be changed at all --
# except during create, which requires admin privileges. Checking
end
end
+ test "user cannot delete" do
+ set_user_from_auth :active
+ c, _ = minimal_new
+ assert_raises ArvadosModel::PermissionDeniedError do
+ c.destroy
+ end
+ assert Container.find_by_uuid(c.uuid)
+ end
+
[
{state: Container::Complete, exit_code: 0, output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45'},
{state: Container::Cancelled},
// Typical usage:
//
// runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
nextRunOptions = runOptions
defer bal.time("sweep", "wall clock time to run one full sweep")()
bal.lostBlocks = ioutil.Discard
}
- if len(config.KeepServiceList.Items) > 0 {
- err = bal.SetKeepServices(config.KeepServiceList)
- } else {
- err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
- }
+ err = bal.DiscoverKeepServices(client)
if err != nil {
return
}
for _, srv := range bal.KeepServices {
- err = srv.discoverMounts(&config.Client)
+ err = srv.discoverMounts(client)
if err != nil {
return
}
}
bal.cleanupMounts()
- if err = bal.CheckSanityEarly(&config.Client); err != nil {
+ if err = bal.CheckSanityEarly(client); err != nil {
return
}
rs := bal.rendezvousState()
bal.logf("notice: KeepServices list has changed since last run")
}
bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
- if err = bal.ClearTrashLists(&config.Client); err != nil {
+ if err = bal.ClearTrashLists(client); err != nil {
return
}
// The current rendezvous state becomes "safe" (i.e.,
// succeed in clearing existing trash lists.
nextRunOptions.SafeRendezvousState = rs
}
- if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
+ if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
lbFile = nil
}
if runOptions.CommitPulls {
- err = bal.CommitPulls(&config.Client)
+ err = bal.CommitPulls(client)
if err != nil {
// Skip trash if we can't pull. (Too cautious?)
return
}
}
if runOptions.CommitTrash {
- err = bal.CommitTrash(&config.Client)
+ err = bal.CommitTrash(client)
}
return
}
// DiscoverKeepServices sets the list of KeepServices by calling the
// API to get a list of all services, and selecting the ones whose
-// ServiceType is in okTypes.
-func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+// ServiceType is "disk".
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
bal.KeepServices = make(map[string]*KeepService)
- ok := make(map[string]bool)
- for _, t := range okTypes {
- ok[t] = true
- }
return c.EachKeepService(func(srv arvados.KeepService) error {
- if ok[srv.ServiceType] {
+ if srv.ServiceType == "disk" {
bal.KeepServices[srv.UUID] = &KeepService{
KeepService: srv,
ChangeSet: &ChangeSet{},
if coll.ReplicationDesired != nil {
repl = *coll.ReplicationDesired
}
- debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+ bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
// Pass pdh to IncreaseDesired only if LostBlocksFile is being
// written -- otherwise it's just a waste of memory.
pdh := ""
// balanceBlock compares current state to desired state for a single
// block, and makes the appropriate ChangeSet calls.
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
- debugf("balanceBlock: %v %+v", blkid, blk)
+ bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
type slot struct {
mnt *KeepMount // never nil
package main
import (
+ "bytes"
"encoding/json"
"fmt"
"io"
"sync"
"time"
+ "git.curoverse.com/arvados.git/lib/config"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "github.com/sirupsen/logrus"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/common/expfmt"
check "gopkg.in/check.v1"
)
type runSuite struct {
stub stubServer
- config Config
-}
-
-// make a log.Logger that writes to the current test's c.Log().
-func (s *runSuite) logger(c *check.C) *logrus.Logger {
- r, w := io.Pipe()
- go func() {
- buf := make([]byte, 10000)
- for {
- n, err := r.Read(buf)
- if n > 0 {
- if buf[n-1] == '\n' {
- n--
- }
- c.Log(string(buf[:n]))
- }
- if err != nil {
- break
- }
- }
- }()
- logger := logrus.New()
- logger.Out = w
- return logger
+ config *arvados.Cluster
+ client *arvados.Client
+}
+
+func (s *runSuite) newServer(options *RunOptions) *Server {
+ srv := &Server{
+ Cluster: s.config,
+ ArvClient: s.client,
+ RunOptions: *options,
+ Metrics: newMetrics(prometheus.NewRegistry()),
+ Logger: options.Logger,
+ Dumper: options.Dumper,
+ }
+ return srv
}
func (s *runSuite) SetUpTest(c *check.C) {
- s.config = Config{
- Client: arvados.Client{
- AuthToken: "xyzzy",
- APIHost: "zzzzz.arvadosapi.com",
- Client: s.stub.Start()},
- KeepServiceTypes: []string{"disk"},
- RunPeriod: arvados.Duration(time.Second),
- }
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.Equals, nil)
+ s.config, err = cfg.GetCluster("")
+ c.Assert(err, check.Equals, nil)
+
+ s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
+ arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
+
+ s.client = &arvados.Client{
+ AuthToken: "xyzzy",
+ APIHost: "zzzzz.arvadosapi.com",
+ Client: s.stub.Start()}
+
s.stub.serveDiscoveryDoc()
s.stub.logf = c.Logf
}
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
- Logger: s.logger(c),
+ Logger: ctxlog.TestLogger(c),
}
s.stub.serveCurrentUserAdmin()
s.stub.serveZeroCollections()
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- _, err = srv.Run()
+ srv := s.newServer(&opts)
+ _, err := srv.runOnce()
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
}
-func (s *runSuite) TestServiceTypes(c *check.C) {
- opts := RunOptions{
- CommitPulls: true,
- CommitTrash: true,
- Logger: s.logger(c),
- }
- s.config.KeepServiceTypes = []string{"unlisted-type"}
- s.stub.serveCurrentUserAdmin()
- s.stub.serveFooBarFileCollections()
- s.stub.serveKeepServices(stubServices)
- s.stub.serveKeepstoreMounts()
- indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
- trashReqs := s.stub.serveKeepstoreTrash()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- _, err = srv.Run()
- c.Check(err, check.IsNil)
- c.Check(indexReqs.Count(), check.Equals, 0)
- c.Check(trashReqs.Count(), check.Equals, 0)
-}
-
func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
- Logger: s.logger(c),
+ Logger: ctxlog.TestLogger(c),
}
s.stub.serveCurrentUserNotAdmin()
s.stub.serveZeroCollections()
s.stub.serveKeepstoreMounts()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- _, err = srv.Run()
+ srv := s.newServer(&opts)
+ _, err := srv.runOnce()
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
- Logger: s.logger(c),
+ Logger: ctxlog.TestLogger(c),
}
s.stub.serveCurrentUserAdmin()
s.stub.serveCollectionsButSkipOne()
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- _, err = srv.Run()
+ srv := s.newServer(&opts)
+ _, err := srv.runOnce()
c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
func (s *runSuite) TestWriteLostBlocks(c *check.C) {
lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
c.Assert(err, check.IsNil)
- s.config.LostBlocksFile = lostf.Name()
+ s.config.Collections.BlobMissingReport = lostf.Name()
defer os.Remove(lostf.Name())
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
- Logger: s.logger(c),
+ Logger: ctxlog.TestLogger(c),
}
s.stub.serveCurrentUserAdmin()
s.stub.serveFooBarFileCollections()
s.stub.serveKeepstoreIndexFoo1()
s.stub.serveKeepstoreTrash()
s.stub.serveKeepstorePull()
-	srv, err := NewServer(s.config, opts)
+	srv := s.newServer(&opts)
-	c.Assert(err, check.IsNil)
-	_, err = srv.Run()
+	_, err = srv.runOnce()
c.Check(err, check.IsNil)
lost, err := ioutil.ReadFile(lostf.Name())
c.Assert(err, check.IsNil)
opts := RunOptions{
CommitPulls: false,
CommitTrash: false,
- Logger: s.logger(c),
+ Logger: ctxlog.TestLogger(c),
}
s.stub.serveCurrentUserAdmin()
collReqs := s.stub.serveFooBarFileCollections()
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- bal, err := srv.Run()
+ srv := s.newServer(&opts)
+ bal, err := srv.runOnce()
c.Check(err, check.IsNil)
for _, req := range collReqs.reqs {
c.Check(req.Form.Get("include_trash"), check.Equals, "true")
func (s *runSuite) TestCommit(c *check.C) {
lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
c.Assert(err, check.IsNil)
- s.config.LostBlocksFile = lostf.Name()
+ s.config.Collections.BlobMissingReport = lostf.Name()
defer os.Remove(lostf.Name())
- s.config.Listen = ":"
s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
- Logger: s.logger(c),
- Dumper: s.logger(c),
+ Logger: ctxlog.TestLogger(c),
+ Dumper: ctxlog.TestLogger(c),
}
s.stub.serveCurrentUserAdmin()
s.stub.serveFooBarFileCollections()
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
- bal, err := srv.Run()
+ srv := s.newServer(&opts)
+ bal, err := srv.runOnce()
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
c.Assert(err, check.IsNil)
c.Check(string(lost), check.Equals, "")
- metrics := s.getMetrics(c, srv)
- c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
- c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
- c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
- c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
- c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
+ buf, err := s.getMetrics(c, srv)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+ c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+ c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
+ c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_byte_ratio 1\.5\n.*`)
+ c.Check(buf, check.Matches, `(?ms).*\narvados_keep_dedup_block_ratio 1\.5\n.*`)
}
func (s *runSuite) TestRunForever(c *check.C) {
- s.config.Listen = ":"
s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
- Logger: s.logger(c),
- Dumper: s.logger(c),
+ Logger: ctxlog.TestLogger(c),
+ Dumper: ctxlog.TestLogger(c),
}
s.stub.serveCurrentUserAdmin()
s.stub.serveFooBarFileCollections()
pullReqs := s.stub.serveKeepstorePull()
stop := make(chan interface{})
- s.config.RunPeriod = arvados.Duration(time.Millisecond)
- srv, err := NewServer(s.config, opts)
- c.Assert(err, check.IsNil)
+ s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
+ srv := s.newServer(&opts)
done := make(chan bool)
go func() {
- srv.RunForever(stop)
+ srv.runForever(stop)
close(done)
}()
<-done
c.Check(pullReqs.Count() >= 16, check.Equals, true)
c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
- c.Check(s.getMetrics(c, srv), check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
+
+ buf, err := s.getMetrics(c, srv)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
}
-func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
- resp, err := http.Get("http://" + srv.listening + "/metrics")
- c.Assert(err, check.IsNil)
- c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+func (s *runSuite) getMetrics(c *check.C, srv *Server) (*bytes.Buffer, error) {
+ mfs, err := srv.Metrics.reg.Gather()
+ if err != nil {
+ return nil, err
+ }
- resp, err = http.Get("http://" + srv.listening + "/metrics?api_token=xyzzy")
- c.Assert(err, check.IsNil)
- c.Check(resp.StatusCode, check.Equals, http.StatusOK)
- buf, err := ioutil.ReadAll(resp.Body)
- c.Check(err, check.IsNil)
- return string(buf)
+ var buf bytes.Buffer
+ for _, mf := range mfs {
+ if _, err := expfmt.MetricFamilyToText(&buf, mf); err != nil {
+ return nil, err
+ }
+ }
+
+ return &buf, nil
}
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
check "gopkg.in/check.v1"
)
}
bal.signatureTTL = 3600
+ bal.Logger = ctxlog.TestLogger(c)
}
func (bal *balancerSuite) SetUpTest(c *check.C) {
longestStreak := 0
var lastMod time.Time
sawUUID := make(map[string]bool)
- err := EachCollection(&s.config.Client, pageSize, func(c arvados.Collection) error {
+ err := EachCollection(s.client, pageSize, func(c arvados.Collection) error {
if c.ModifiedAt == nil {
return nil
}
"testing"
"time"
+ "git.curoverse.com/arvados.git/lib/config"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&integrationSuite{})
type integrationSuite struct {
- config Config
+ config *arvados.Cluster
+ client *arvados.Client
keepClient *keepclient.KeepClient
}
}
func (s *integrationSuite) SetUpTest(c *check.C) {
- s.config = Config{
- Client: arvados.Client{
- APIHost: os.Getenv("ARVADOS_API_HOST"),
- AuthToken: arvadostest.DataManagerToken,
- Insecure: true,
- },
- KeepServiceTypes: []string{"disk"},
- RunPeriod: arvados.Duration(time.Second),
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.Equals, nil)
+ s.config, err = cfg.GetCluster("")
+ c.Assert(err, check.Equals, nil)
+ s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
+
+ s.client = &arvados.Client{
+ APIHost: os.Getenv("ARVADOS_API_HOST"),
+ AuthToken: arvadostest.DataManagerToken,
+ Insecure: true,
}
}
bal := &Balancer{
Logger: logger,
- Metrics: newMetrics(),
+ Metrics: newMetrics(prometheus.NewRegistry()),
}
- nextOpts, err := bal.Run(s.config, opts)
+ nextOpts, err := bal.Run(s.client, s.config, opts)
c.Check(err, check.IsNil)
c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
c.Check(nextOpts.CommitPulls, check.Equals, true)
Description=Arvados Keep Balance
Documentation=https://doc.arvados.org/
After=network.target
-AssertPathExists=/etc/arvados/keep-balance/keep-balance.yml
# systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
StartLimitInterval=0
package main
import (
- "encoding/json"
+ "context"
"flag"
"fmt"
- "log"
- "net/http"
+ "io"
"os"
- "time"
+ "git.curoverse.com/arvados.git/lib/config"
+ "git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/config"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
-var debugf = func(string, ...interface{}) {}
-
func main() {
- var cfg Config
- var runOptions RunOptions
+ os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
+}
+
+func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ logger := ctxlog.FromContext(context.Background())
- configPath := flag.String("config", defaultConfigPath,
- "`path` of JSON or YAML configuration file")
- serviceListPath := flag.String("config.KeepServiceList", "",
- "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
- "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
- flag.BoolVar(&runOptions.Once, "once", false,
+ var options RunOptions
+ flags := flag.NewFlagSet(prog, flag.ExitOnError)
+ flags.BoolVar(&options.Once, "once", false,
"balance once and then exit")
- flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+ flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
"send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
- flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+ flags.BoolVar(&options.CommitTrash, "commit-trash", false,
"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
- dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
- dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
- debugFlag := flag.Bool("debug", false, "enable debug messages")
- getVersion := flag.Bool("version", false, "Print version information and exit.")
- flag.Usage = usage
- flag.Parse()
+ flags.Bool("version", false, "Write version information to stdout and exit 0")
+ dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
- // Print version information if requested
- if *getVersion {
- fmt.Printf("keep-balance %s\n", version)
- return
- }
+ loader := config.NewLoader(os.Stdin, logger)
+ loader.SetupFlags(flags)
- mustReadConfig(&cfg, *configPath)
- if *serviceListPath != "" {
- mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
- }
+ munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config")
+ flags.Parse(munged)
- if *dumpConfig {
- log.Fatal(config.DumpAndExit(cfg))
- }
-
- to := time.Duration(cfg.RequestTimeout)
- if to == 0 {
- to = 30 * time.Minute
- }
- arvados.DefaultSecureClient.Timeout = to
- arvados.InsecureHTTPClient.Timeout = to
- http.DefaultClient.Timeout = to
-
- log.Printf("keep-balance %s started", version)
-
- if *debugFlag {
- debugf = log.Printf
- if j, err := json.Marshal(cfg); err != nil {
- log.Fatal(err)
- } else {
- log.Printf("config is %s", j)
- }
- }
if *dumpFlag {
dumper := logrus.New()
dumper.Out = os.Stdout
dumper.Formatter = &logrus.TextFormatter{}
- runOptions.Dumper = dumper
- }
- srv, err := NewServer(cfg, runOptions)
- if err != nil {
- // (don't run)
- } else if runOptions.Once {
- _, err = srv.Run()
- } else {
- err = srv.RunForever(nil)
- }
- if err != nil {
- log.Fatal(err)
+ options.Dumper = dumper
}
-}
-func mustReadConfig(dst interface{}, path string) {
- if err := config.LoadFile(dst, path); err != nil {
- log.Fatal(err)
- }
+ // Only pass along the version flag, which gets handled in RunCommand
+ args = nil
+ flags.Visit(func(f *flag.Flag) {
+ if f.Name == "version" {
+ args = append(args, "-"+f.Name, f.Value.String())
+ }
+ })
+
+ return service.Command(arvados.ServiceNameKeepbalance,
+ func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) service.Handler {
+ if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) {
+ return service.ErrorHandler(ctx, cluster, fmt.Errorf("cannot start service: Collections.BalancePeriod is zero (if you want to run once and then exit, use the -once flag)"))
+ }
+
+			ac, err := arvados.NewClientFromConfig(cluster)
+			// Check err before touching ac: NewClientFromConfig may
+			// return a nil client on failure, and dereferencing it
+			// here would panic instead of reaching ErrorHandler.
+			if err != nil {
+				return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+			}
+			ac.AuthToken = token
+
+ if options.Logger == nil {
+ options.Logger = ctxlog.FromContext(ctx)
+ }
+
+ srv := &Server{
+ Cluster: cluster,
+ ArvClient: ac,
+ RunOptions: options,
+ Metrics: newMetrics(registry),
+ Logger: options.Logger,
+ Dumper: options.Dumper,
+ }
+
+ go srv.run()
+ return srv
+ }).RunCommand(prog, args, stdin, stdout, stderr)
}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "time"
-
- "github.com/ghodss/yaml"
- check "gopkg.in/check.v1"
-)
-
-var _ = check.Suite(&mainSuite{})
-
-type mainSuite struct{}
-
-func (s *mainSuite) TestExampleJSON(c *check.C) {
- var config Config
- c.Check(yaml.Unmarshal(exampleConfigFile, &config), check.IsNil)
- c.Check(config.KeepServiceTypes, check.DeepEquals, []string{"disk"})
- c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
- c.Check(time.Duration(config.RunPeriod), check.Equals, 600*time.Second)
-}
-
-func (s *mainSuite) TestConfigJSONWithKeepServiceList(c *check.C) {
- var config Config
- c.Check(yaml.Unmarshal([]byte(`{
- "Client": {
- "APIHost": "zzzzz.arvadosapi.com:443",
- "AuthToken": "xyzzy",
- "Insecure": false
- },
- "KeepServiceList": {
- "items": [
- {"uuid":"zzzzz-bi64l-abcdefghijklmno", "service_type":"disk", "service_host":"a.zzzzz.arvadosapi.com", "service_port":12345},
- {"uuid":"zzzzz-bi64l-bcdefghijklmnop", "service_type":"blob", "service_host":"b.zzzzz.arvadosapi.com", "service_port":12345}
- ]
- },
- "RunPeriod": "600s"
- }`), &config), check.IsNil)
- c.Assert(len(config.KeepServiceList.Items), check.Equals, 2)
- c.Check(config.KeepServiceList.Items[0].UUID, check.Equals, "zzzzz-bi64l-abcdefghijklmno")
- c.Check(config.KeepServiceList.Items[0].ServicePort, check.Equals, 12345)
- c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
-}
mtx sync.Mutex
}
-func newMetrics() *metrics {
+func newMetrics(registry *prometheus.Registry) *metrics {
return &metrics{
- reg: prometheus.NewRegistry(),
+ reg: registry,
statsGauges: map[string]setter{},
observers: map[string]observer{},
}
package main
import (
- "context"
- "fmt"
"net/http"
"os"
"os/signal"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/auth"
- "git.curoverse.com/arvados.git/sdk/go/ctxlog"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/sirupsen/logrus"
)
-var version = "dev"
-
-const (
- defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
- rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-)
-
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
- // Arvados API endpoint and credentials.
- Client arvados.Client
-
- // List of service types (e.g., "disk") to balance.
- KeepServiceTypes []string
-
- KeepServiceList arvados.KeepServiceList
-
- // address, address:port, or :port for management interface
- Listen string
-
- // token for management APIs
- ManagementToken string
-
- // How often to check
- RunPeriod arvados.Duration
-
- // Number of collections to request in each API call
- CollectionBatchSize int
-
- // Max collections to buffer in memory (bigger values consume
- // more memory, but can reduce store-and-forward latency when
- // fetching pages)
- CollectionBuffers int
-
- // Timeout for outgoing http request/response cycle.
- RequestTimeout arvados.Duration
-
- // Destination filename for the list of lost block hashes, one
- // per line. Updated atomically during each successful run.
- LostBlocksFile string
-}
-
// RunOptions controls runtime behavior. The flags/options that belong
// here are the ones that are useful for interactive use. For example,
// "CommitTrash" is a runtime option rather than a config item because
}
type Server struct {
- config Config
- runOptions RunOptions
- metrics *metrics
- listening string // for tests
+ http.Handler
+
+ Cluster *arvados.Cluster
+ ArvClient *arvados.Client
+ RunOptions RunOptions
+ Metrics *metrics
Logger logrus.FieldLogger
Dumper logrus.FieldLogger
}
-// NewServer returns a new Server that runs Balancers using the given
-// config and runOptions.
-func NewServer(config Config, runOptions RunOptions) (*Server, error) {
- if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
- return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
- }
- if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
- return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
- }
-
- if runOptions.Logger == nil {
- log := logrus.New()
- log.Formatter = &logrus.JSONFormatter{
- TimestampFormat: rfc3339NanoFixed,
- }
- log.Out = os.Stderr
- runOptions.Logger = log
- }
-
- srv := &Server{
- config: config,
- runOptions: runOptions,
- metrics: newMetrics(),
- Logger: runOptions.Logger,
- Dumper: runOptions.Dumper,
- }
- return srv, srv.start()
+// CheckHealth implements service.Handler.
+func (srv *Server) CheckHealth() error {
+ return nil
}
-func (srv *Server) start() error {
- if srv.config.Listen == "" {
- return nil
- }
- ctx := ctxlog.Context(context.Background(), srv.Logger)
- server := &httpserver.Server{
- Server: http.Server{
- Handler: httpserver.HandlerWithContext(ctx,
- httpserver.LogRequests(
- auth.RequireLiteralToken(srv.config.ManagementToken,
- srv.metrics.Handler(srv.Logger)))),
- },
- Addr: srv.config.Listen,
+func (srv *Server) run() {
+ var err error
+ if srv.RunOptions.Once {
+ _, err = srv.runOnce()
+ } else {
+ err = srv.runForever(nil)
}
- err := server.Start()
if err != nil {
- return err
+ srv.Logger.Error(err)
+ os.Exit(1)
+ } else {
+ os.Exit(0)
}
- srv.Logger.Printf("listening at %s", server.Addr)
- srv.listening = server.Addr
- return nil
}
-func (srv *Server) Run() (*Balancer, error) {
+func (srv *Server) runOnce() (*Balancer, error) {
bal := &Balancer{
Logger: srv.Logger,
Dumper: srv.Dumper,
- Metrics: srv.metrics,
- LostBlocksFile: srv.config.LostBlocksFile,
+ Metrics: srv.Metrics,
+ LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
}
var err error
- srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+ srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
return bal, err
}
// RunForever runs forever, or (for testing purposes) until the given
// stop channel is ready to receive.
-func (srv *Server) RunForever(stop <-chan interface{}) error {
- logger := srv.runOptions.Logger
+func (srv *Server) runForever(stop <-chan interface{}) error {
+ logger := srv.Logger
- ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
+ ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
// The unbuffered channel here means we only hear SIGUSR1 if
// it arrives while we're waiting in select{}.
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
- logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
+ logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
for {
- if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
+ if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- _, err := srv.Run()
+ _, err := srv.runOnce()
if err != nil {
logger.Print("run failed: ", err)
} else {
// run too soon after the Nth run is triggered
// by SIGUSR1.
ticker.Stop()
- ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
+ ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
}
logger.Print("starting next run")
}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "flag"
- "fmt"
- "os"
-)
-
-var exampleConfigFile = []byte(`
-Client:
- APIHost: zzzzz.arvadosapi.com:443
- AuthToken: xyzzy
- Insecure: false
-KeepServiceTypes:
- - disk
-Listen: ":9005"
-ManagementToken: xyzzy
-RunPeriod: 600s
-CollectionBatchSize: 100000
-CollectionBuffers: 1000
-RequestTimeout: 30m`)
-
-func usage() {
- fmt.Fprintf(os.Stderr, `
-
-keep-balance rebalances a set of keepstore servers. It creates new
-copies of underreplicated blocks, deletes excess copies of
-overreplicated and unreferenced blocks, and moves blocks to better
-positions (according to the rendezvous hash algorithm) so clients find
-them faster.
-
-Usage: keep-balance [options]
-
-Options:
-`)
- flag.PrintDefaults()
- fmt.Fprintf(os.Stderr, `
-Example config file:
-%s
-
- Client.AuthToken must be recognized by Arvados as an admin token,
- and must be recognized by all Keep services as a "data manager
- key".
-
- Client.Insecure should be true if your Arvados API endpoint uses
- an unverifiable SSL/TLS certificate.
-
-Periodic scanning:
-
- By default, keep-balance operates periodically, i.e.: do a
- scan/balance operation, sleep, repeat.
-
- RunPeriod determines the interval between start times of
- successive scan/balance operations. If a scan/balance operation
- takes longer than RunPeriod, the next one will follow it
- immediately.
-
- If SIGUSR1 is received during an idle period between operations,
- the next operation will start immediately.
-
-One-time scanning:
-
- Use the -once flag to do a single operation and then exit. The
- exit code will be zero if the operation was successful.
-
-Committing:
-
- By default, keep-service computes and reports changes but does not
- implement them by sending pull and trash lists to the Keep
- services.
-
- Use the -commit-pull and -commit-trash flags to implement the
- computed changes.
-
-Tuning resource usage:
-
- CollectionBatchSize limits the number of collections retrieved per
- API transaction. If this is zero or omitted, page size is
- determined by the API server's own page size limits (see
- max_items_per_response and max_index_database_read configs).
-
- CollectionBuffers sets the size of an internal queue of
- collections. Higher values use more memory, and improve throughput
- by allowing keep-balance to fetch the next page of collections
- while the current page is still being processed. If this is zero
- or omitted, pages are processed serially.
-
- RequestTimeout is the maximum time keep-balance will spend on a
- single HTTP request (getting a page of collections, getting the
- block index from a keepstore server, or sending a trash or pull
- list to a keepstore server). Defaults to 30 minutes.
-
-Limitations:
-
- keep-balance does not attempt to discover whether committed pull
- and trash requests ever get carried out -- only that they are
- accepted by the Keep services. If some services are full, new
- copies of underreplicated blocks might never get made, only
- repeatedly requested.
-
-`, exampleConfigFile)
-}
package main
import (
+ "bufio"
"bytes"
"context"
"crypto/sha256"
}
func (v *S3Volume) check() error {
- if v.Bucket == "" || v.AccessKey == "" || v.SecretKey == "" {
- return errors.New("DriverParameters: Bucket, AccessKey, and SecretKey must be provided")
+ if v.Bucket == "" {
+ return errors.New("DriverParameters: Bucket must be provided")
}
if v.IndexPageSize == 0 {
v.IndexPageSize = 1000
return errors.New("DriverParameters: RaceWindow must not be negative")
}
- region, ok := aws.Regions[v.Region]
+ var ok bool
+ v.region, ok = aws.Regions[v.Region]
if v.Endpoint == "" {
if !ok {
return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
"specify empty endpoint or use a different region name", v.Region, v.Endpoint)
} else {
- region = aws.Region{
+ v.region = aws.Region{
Name: v.Region,
S3Endpoint: v.Endpoint,
S3LocationConstraint: v.LocationConstraint,
}
}
- auth := aws.Auth{
- AccessKey: v.AccessKey,
- SecretKey: v.SecretKey,
- }
-
// Zero timeouts mean "wait forever", which is a bad
// default. Default to long timeouts instead.
if v.ConnectTimeout == 0 {
v.ReadTimeout = s3DefaultReadTimeout
}
- client := s3.New(auth, region)
- if region.EC2Endpoint.Signer == aws.V4Signature {
- // Currently affects only eu-central-1
- client.Signature = aws.V4Signature
- }
- client.ConnectTimeout = time.Duration(v.ConnectTimeout)
- client.ReadTimeout = time.Duration(v.ReadTimeout)
v.bucket = &s3bucket{
- Bucket: &s3.Bucket{
- S3: client,
+ bucket: &s3.Bucket{
+ S3: v.newS3Client(),
Name: v.Bucket,
},
}
lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
+ err := v.bootstrapIAMCredentials()
+ if err != nil {
+ return fmt.Errorf("error getting IAM credentials: %s", err)
+ }
+
return nil
}
type S3Volume struct {
AccessKey string
SecretKey string
+ AuthToken string // populated automatically when IAMRole is used
+ AuthExpiration time.Time // populated automatically when IAMRole is used
+ IAMRole string
Endpoint string
Region string
Bucket string
logger logrus.FieldLogger
metrics *volumeMetricsVecs
bucket *s3bucket
+ region aws.Region
startOnce sync.Once
}
return "s3://" + v.Endpoint + "/" + v.Bucket
}
+func (v *S3Volume) bootstrapIAMCredentials() error {
+ if v.AccessKey != "" || v.SecretKey != "" {
+ if v.IAMRole != "" {
+ return errors.New("invalid DriverParameters: AccessKey and SecretKey must be blank if IAMRole is specified")
+ }
+ return nil
+ }
+ ttl, err := v.updateIAMCredentials()
+ if err != nil {
+ return err
+ }
+ go func() {
+ for {
+ time.Sleep(ttl)
+ ttl, err = v.updateIAMCredentials()
+ if err != nil {
+ v.logger.WithError(err).Warnf("failed to update credentials for IAM role %q", v.IAMRole)
+ ttl = time.Second
+ } else if ttl < time.Second {
+ v.logger.WithField("TTL", ttl).Warnf("received stale credentials for IAM role %q", v.IAMRole)
+ ttl = time.Second
+ }
+ }
+ }()
+ return nil
+}
+
+func (v *S3Volume) newS3Client() *s3.S3 {
+ auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
+ client := s3.New(*auth, v.region)
+ if v.region.EC2Endpoint.Signer == aws.V4Signature {
+ // Currently affects only eu-central-1
+ client.Signature = aws.V4Signature
+ }
+ client.ConnectTimeout = time.Duration(v.ConnectTimeout)
+ client.ReadTimeout = time.Duration(v.ReadTimeout)
+ return client
+}
+
+// iamCredentials is the response body returned by the AWS metadata
+// endpoint .../security-credentials/${rolename}
+type iamCredentials struct {
+ Code string
+ LastUpdated time.Time
+ Type string
+ AccessKeyID string
+ SecretAccessKey string
+ Token string
+ Expiration time.Time
+}
+
+// updateIAMCredentials fetches fresh credentials and returns their
+// TTL, i.e., time to sleep until next update.
+func (v *S3Volume) updateIAMCredentials() (time.Duration, error) {
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
+
+ metadataBaseURL := "http://169.254.169.254/latest/meta-data/iam/security-credentials/"
+
+ var url string
+ if strings.Contains(v.IAMRole, "://") {
+ // Configuration provides complete URL (used by tests)
+ url = v.IAMRole
+ } else if v.IAMRole != "" {
+ // Configuration provides IAM role name and we use the
+ // AWS metadata endpoint
+ url = metadataBaseURL + v.IAMRole
+ } else {
+ url = metadataBaseURL
+ v.logger.WithField("URL", url).Debug("looking up IAM role name")
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return 0, fmt.Errorf("error setting up request %s: %s", url, err)
+ }
+ resp, err := http.DefaultClient.Do(req.WithContext(ctx))
+ if err != nil {
+ return 0, fmt.Errorf("error getting %s: %s", url, err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode == http.StatusNotFound {
+ return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKey and SecretKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
+ } else if resp.StatusCode != http.StatusOK {
+ return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
+ }
+ body := bufio.NewReader(resp.Body)
+ var role string
+ _, err = fmt.Fscanf(body, "%s\n", &role)
+ if err != nil {
+ return 0, fmt.Errorf("error reading response from %s: %s", url, err)
+ }
+ if n, _ := body.Read(make([]byte, 64)); n > 0 {
+ v.logger.Warnf("ignoring additional data returned by metadata endpoint %s after the single role name that we expected", url)
+ }
+ v.logger.WithField("Role", role).Debug("looked up IAM role name")
+ url = url + role
+ }
+
+ v.logger.WithField("URL", url).Debug("getting credentials")
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return 0, fmt.Errorf("error setting up request %s: %s", url, err)
+ }
+ resp, err := http.DefaultClient.Do(req.WithContext(ctx))
+ if err != nil {
+ return 0, fmt.Errorf("error getting %s: %s", url, err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
+ }
+ var cred iamCredentials
+ err = json.NewDecoder(resp.Body).Decode(&cred)
+ if err != nil {
+ return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
+ }
+ v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
+ v.bucket.SetBucket(&s3.Bucket{
+ S3: v.newS3Client(),
+ Name: v.Bucket,
+ })
+ // TTL is time from now to expiration, minus 5m. "We make new
+ // credentials available at least five minutes before the
+ // expiration of the old credentials." --
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+ // (If that's not true, the returned ttl might be zero or
+ // negative, which the caller can handle.)
+ ttl := cred.Expiration.Sub(time.Now()) - 5*time.Minute
+ v.logger.WithFields(logrus.Fields{
+ "AccessKeyID": cred.AccessKeyID,
+ "LastUpdated": cred.LastUpdated,
+ "Expiration": cred.Expiration,
+ "TTL": arvados.Duration(ttl),
+ }).Debug("updated credentials")
+ return ttl, nil
+}
+
func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
ready := make(chan bool)
go func() {
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
- Bucket: v.bucket.Bucket,
+ Bucket: v.bucket.Bucket(),
Prefix: prefix,
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
recentL := s3Lister{
- Bucket: v.bucket.Bucket,
+ Bucket: v.bucket.Bucket(),
Prefix: "recent/" + prefix,
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
// (PutCopy returns 200 OK if the request was received, even if the
// copy failed).
func (v *S3Volume) safeCopy(dst, src string) error {
- resp, err := v.bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
+ resp, err := v.bucket.Bucket().PutCopy(dst, s3ACL, s3.CopyOptions{
ContentType: "application/octet-stream",
MetadataDirective: "REPLACE",
- }, v.bucket.Name+"/"+src)
+ }, v.bucket.Bucket().Name+"/"+src)
err = v.translateError(err)
if os.IsNotExist(err) {
return err
} else if err != nil {
- return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Name+"/"+src, err)
+ return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.Bucket().Name+"/"+src, err)
}
if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
}
trashL := s3Lister{
- Bucket: v.bucket.Bucket,
+ Bucket: v.bucket.Bucket(),
Prefix: "trash/",
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
return
}
-// s3bucket wraps s3.bucket and counts I/O and API usage stats.
+// s3bucket wraps s3.Bucket and counts I/O and API usage stats. The
+// wrapped bucket can be replaced atomically with SetBucket in order
+// to update credentials.
type s3bucket struct {
- *s3.Bucket
- stats s3bucketStats
+ bucket *s3.Bucket
+ stats s3bucketStats
+ mu sync.Mutex
+}
+
+func (b *s3bucket) Bucket() *s3.Bucket {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ return b.bucket
+}
+
+func (b *s3bucket) SetBucket(bucket *s3.Bucket) {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ b.bucket = bucket
}
func (b *s3bucket) GetReader(path string) (io.ReadCloser, error) {
- rdr, err := b.Bucket.GetReader(path)
+ rdr, err := b.Bucket().GetReader(path)
b.stats.TickOps("get")
b.stats.Tick(&b.stats.Ops, &b.stats.GetOps)
b.stats.TickErr(err)
}
func (b *s3bucket) Head(path string, headers map[string][]string) (*http.Response, error) {
- resp, err := b.Bucket.Head(path, headers)
+ resp, err := b.Bucket().Head(path, headers)
b.stats.TickOps("head")
b.stats.Tick(&b.stats.Ops, &b.stats.HeadOps)
b.stats.TickErr(err)
} else {
r = NewCountingReader(r, b.stats.TickOutBytes)
}
- err := b.Bucket.PutReader(path, r, length, contType, perm, options)
+ err := b.Bucket().PutReader(path, r, length, contType, perm, options)
b.stats.TickOps("put")
b.stats.Tick(&b.stats.Ops, &b.stats.PutOps)
b.stats.TickErr(err)
}
func (b *s3bucket) Del(path string) error {
- err := b.Bucket.Del(path)
+ err := b.Bucket().Del(path)
b.stats.TickOps("delete")
b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
b.stats.TickErr(err)
"crypto/md5"
"encoding/json"
"fmt"
+ "io"
"log"
"net/http"
"net/http/httptest"
type StubbedS3Suite struct {
s3server *httptest.Server
+ metadata *httptest.Server
cluster *arvados.Cluster
handler *handler
volumes []*TestableS3Volume
func (s *StubbedS3Suite) SetUpTest(c *check.C) {
s.s3server = nil
+ s.metadata = nil
s.cluster = testCluster(c)
s.cluster.Volumes = map[string]arvados.Volume{
"zzzzz-nyw5e-000000000000000": {Driver: "S3"},
}
}
+func (s *StubbedS3Suite) TestIAMRoleCredentials(c *check.C) {
+ s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ upd := time.Now().UTC().Add(-time.Hour).Format(time.RFC3339)
+ exp := time.Now().UTC().Add(time.Hour).Format(time.RFC3339)
+ // Literal example from
+ // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html#instance-metadata-security-credentials
+ // but with updated timestamps
+ io.WriteString(w, `{"Code":"Success","LastUpdated":"`+upd+`","Type":"AWS-HMAC","AccessKeyId":"ASIAIOSFODNN7EXAMPLE","SecretAccessKey":"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY","Token":"token","Expiration":"`+exp+`"}`)
+ }))
+ defer s.metadata.Close()
+
+ v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
+ c.Check(v.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+ c.Check(v.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+ c.Check(v.bucket.bucket.S3.Auth.AccessKey, check.Equals, "ASIAIOSFODNN7EXAMPLE")
+ c.Check(v.bucket.bucket.S3.Auth.SecretKey, check.Equals, "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+
+ s.metadata = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusNotFound)
+ }))
+ deadv := &S3Volume{
+ IAMRole: s.metadata.URL + "/fake-metadata/test-role",
+ Endpoint: "http://localhost:12345",
+ Region: "test-region-1",
+ Bucket: "test-bucket-name",
+ cluster: s.cluster,
+ logger: ctxlog.TestLogger(c),
+ metrics: newVolumeMetricsVecs(prometheus.NewRegistry()),
+ }
+ err := deadv.check()
+ c.Check(err, check.ErrorMatches, `.*/fake-metadata/test-role.*`)
+ c.Check(err, check.ErrorMatches, `.*404.*`)
+}
+
func (s *StubbedS3Suite) TestStats(c *check.C) {
v := s.newTestableVolume(c, s.cluster, arvados.Volume{Replication: 2}, newVolumeMetricsVecs(prometheus.NewRegistry()), 5*time.Minute)
stats := func() string {
return
}
v.serverClock.now = &t
- v.bucket.Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
+ v.bucket.Bucket().Put(key, data, "application/octet-stream", s3ACL, s3.Options{})
}
t0 := time.Now()
endpoint = s.s3server.URL
}
+ iamRole, accessKey, secretKey := "", "xxx", "xxx"
+ if s.metadata != nil {
+ iamRole, accessKey, secretKey = s.metadata.URL+"/fake-metadata/test-role", "", ""
+ }
+
v := &TestableS3Volume{
S3Volume: &S3Volume{
- AccessKey: "xxx",
- SecretKey: "xxx",
+ AccessKey: accessKey,
+ SecretKey: secretKey,
+ IAMRole: iamRole,
Bucket: TestBucketName,
Endpoint: endpoint,
Region: "test-region-1",
serverClock: clock,
}
c.Assert(v.S3Volume.check(), check.IsNil)
- c.Assert(v.bucket.PutBucket(s3.ACL("private")), check.IsNil)
+ c.Assert(v.bucket.Bucket().PutBucket(s3.ACL("private")), check.IsNil)
// We couldn't set RaceWindow until now because check()
// rejects negative values.
v.S3Volume.RaceWindow = arvados.Duration(raceWindow)
// PutRaw skips the ContentMD5 test
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
- err := v.bucket.Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("PutRaw: %s: %+v", loc, err)
}
- err = v.bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
log.Printf("PutRaw: recent/%s: %+v", loc, err)
}
// while we do this.
func (v *TestableS3Volume) TouchWithDate(locator string, lastPut time.Time) {
v.serverClock.now = &lastPut
- err := v.bucket.Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
+ err := v.bucket.Bucket().Put("recent/"+locator, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
panic(err)
}
PATH
remote: .
specs:
- arvados-login-sync (1.4.0.20190729193732)
+ arvados-login-sync (1.4.1.20190930204434)
arvados (~> 1.3.0, >= 1.3.0)
+ faraday (< 0.16)
GEM
remote: https://rubygems.org/
i18n (>= 0.7, < 2)
minitest (~> 5.1)
tzinfo (~> 1.1)
- addressable (2.6.0)
- public_suffix (>= 2.0.2, < 4.0)
+ addressable (2.7.0)
+ public_suffix (>= 2.0.2, < 5.0)
andand (1.3.3)
arvados (1.3.3.20190320201707)
activesupport (>= 3)
extlib (0.9.16)
faraday (0.15.4)
multipart-post (>= 1.2, < 3)
- googleauth (0.8.1)
+ googleauth (0.9.0)
faraday (~> 0.12)
jwt (>= 1.4, < 3.0)
memoist (~> 0.16)
multi_json (1.13.1)
multipart-post (2.1.1)
os (1.0.1)
- public_suffix (3.1.1)
+ public_suffix (4.0.1)
rake (12.3.2)
retriable (1.4.1)
signet (0.11.0)
cd /usr/src/composer
-npm -d install --prefix /usr/local --global yarn
+npm -d install --prefix /usr/local --global yarn@1.17.3
yarn install
cd /usr/src/workbench2
-npm -d install --prefix /usr/local --global yarn
+npm -d install --prefix /usr/local --global yarn@1.17.3
yarn install