Merge remote-tracking branch 'origin/master' into 14714-keep-balance-config
author Eric Biagiotti <ebiagiotti@veritasgenetics.com>
Mon, 30 Sep 2019 16:31:13 +0000 (12:31 -0400)
committer Eric Biagiotti <ebiagiotti@veritasgenetics.com>
Mon, 30 Sep 2019 16:31:13 +0000 (12:31 -0400)
refs #14714

Arvados-DCO-1.1-Signed-off-by: Eric Biagiotti <ebiagiotti@veritasgenetics.com>

24 files changed:
doc/_config.yml
doc/admin/collection-versioning.html.textile.liquid
doc/admin/config-migration.html.textile.liquid
doc/admin/keep-balance.html.textile.liquid [new file with mode: 0644]
doc/admin/upgrading.html.textile.liquid
doc/install/install-keep-balance.html.textile.liquid
lib/config/config.default.yml
lib/config/deprecated.go
lib/config/deprecated_test.go
lib/config/export.go
lib/config/generated_config.go
lib/config/load.go
sdk/go/arvados/config.go
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/balance_test.go
services/keep-balance/collection_test.go
services/keep-balance/integration_test.go
services/keep-balance/keep-balance.service
services/keep-balance/main.go
services/keep-balance/main_test.go [deleted file]
services/keep-balance/metrics.go
services/keep-balance/server.go
services/keep-balance/usage.go [deleted file]

index 0547c8ee9366f6788fbe87b34c5b319638a0734f..c4fad997f17afb31327a17e8a38695f22a701934 100644
@@ -172,6 +172,7 @@ navbar:
     - Data Management:
       - admin/collection-versioning.html.textile.liquid
       - admin/collection-managed-properties.html.textile.liquid
+      - admin/keep-balance.html.textile.liquid
     - Other:
       - admin/federation.html.textile.liquid
       - admin/controlling-container-reuse.html.textile.liquid
index 6da1756b5ce94bc2f1e624a290aabe3bfb3f720e..0a4d1fa769ac14b691d05bff673a9487df9559c1 100644
@@ -16,7 +16,7 @@ h3. API Server configuration
 
 There are 2 configuration settings that control this feature, both go on the @application.yml@ file.
 
-h4. Settting: @collection_versioning@ (Boolean. Default: false)
+h4. Setting: @collection_versioning@ (Boolean. Default: false)
 
 If @true@, collection versioning is enabled, meaning that new version records can be created. Note that if you set @collection_versioning@ to @false@ after being enabled, old versions will still be accessible, but further changes will not be versioned.
 
index b2ca4fd0b8709ce507add3ec54f2026d673e9301..d40cd3bbdc5feeb548693d60d820aa75fd3fee7b 100644
@@ -74,6 +74,9 @@ h2(#arv-git-httpd). arv-git-httpd
 
 The legacy arv-git-httpd config (loaded from @/etc/arvados/git-httpd/git-httpd.yml@ or a different location specified via -legacy-git-httpd-config command line argument) takes precedence over the centralized config. After you migrate everything from the legacy config to the centralized config, you should delete @/etc/arvados/git-httpd/git-httpd.yml@ and stop using the -legacy-git-httpd-config argument.
 
+h2(#keepbalance). keep-balance
+
+The legacy keep-balance config (loaded from @/etc/arvados/keep-balance/keep-balance.yml@ or a different location specified via -legacy-keepbalance-config command line argument) takes precedence over the centralized config. After you migrate everything from the legacy config to the centralized config, you should delete @/etc/arvados/keep-balance/keep-balance.yml@ and stop using the -legacy-keepbalance-config argument.
 
 h2. arvados-controller
 
diff --git a/doc/admin/keep-balance.html.textile.liquid b/doc/admin/keep-balance.html.textile.liquid
new file mode 100644
index 0000000..1a77cdd
--- /dev/null
@@ -0,0 +1,35 @@
+---
+layout: default
+navsection: admin
+title: Balancing Keep servers
+...
+
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+This page describes how to balance keepstore servers using keep-balance. Keep-balance creates new copies of under-replicated blocks, deletes excess copies of over-replicated and unreferenced blocks, and moves blocks to better positions (e.g. after adding new keepstore servers) so clients find them faster.
+
+See "the Keep-balance install docs":{{site.baseurl}}/install/install-keep-balance.html for installation instructions.
+
+h3. Scanning
+
+By default, keep-balance operates periodically: it does a scan/balance operation, sleeps, and repeats.
+
+The @Collections.BalancePeriod@ value in @/etc/arvados/config.yml@ determines the interval between start times of successive scan/balance operations. If an operation takes longer than the @Collections.BalancePeriod@, the next operation will follow it immediately. If SIGUSR1 is received during an idle period between operations, the next operation will start immediately.
+
+Keep-balance can also be run with the @-once@ flag to do a single scan/balance operation and then exit. The exit code will be zero if the operation was successful.
+
+h3. Committing
+
+Keep-balance computes and reports changes but does not implement them by sending pull and trash lists to the Keep services unless the @-commit-pulls@ and @-commit-trash@ flags are used.
+
+h3. Additional configuration
+
+For resource usage tuning and lost block reporting, see the @Collections.BlobMissingReport@, @Collections.BalanceCollectionBatch@, and @Collections.BalanceCollectionBuffers@ options in the "default config.yml file":{{site.baseurl}}/admin/config.html.
+
+h3. Limitations
+
+Keep-balance does not attempt to discover whether committed pull and trash requests ever get carried out -- only that they are accepted by the Keep services. If some services are full, new copies of under-replicated blocks might never get made, only repeatedly requested.
\ No newline at end of file
index 5faf79cde7a57f317b7f5efe9fbf6f3f17632c54..2ec636a639da5f17880ccb2e28e149b7970ed502 100644
@@ -52,6 +52,16 @@ $ arv --format=uuid keep_service list | xargs -n1 arv keep_service delete --uuid
 
 Once these old records are removed, @arv keep_service list@ will instead return the services listed under Services/Keepstore/InternalURLs and Services/Keepproxy/ExternalURL in your centralized configuration file.
 
+h4. Keep-balance configuration migration
+
+(feature "#14714":https://dev.arvados.org/issues/14714 ) The keep-balance service can now be configured using the centralized configuration file at @/etc/arvados/config.yml@. The following command line and configuration options have changed.
+
+You can no longer specify types of keep services to balance via the @KeepServiceTypes@ config option in the legacy config at @/etc/arvados/keep-balance/keep-balance.yml@. If you are still using the legacy config and @KeepServiceTypes@ has a value other than "disk", keep-balance will produce an error.
+
+You can no longer specify individual keep services to balance via the @config.KeepServiceList@ command line option or @KeepServiceList@ legacy config option. Instead, keep-balance operates on all configured volumes. If you are still using the legacy config, @KeepServiceList@ should be removed or keep-balance will produce an error.
+
+Please see the "config migration guide":{{site.baseurl}}/admin/config-migration.html and "keep-balance install guide":{{site.baseurl}}/install/install-keep-balance.html for more details.
+
 h4. Arv-git-httpd configuration migration
 
 (feature "#14712":https://dev.arvados.org/issues/14712 ) The arv-git-httpd package can now be configured using the centralized configuration file at @/etc/arvados/config.yml@. Configuration via individual command line arguments is no longer available. Please see "arv-git-httpd's config migration guide":{{site.baseurl}}/admin/config-migration.html#arv-git-httpd for more details.
index a7f31dfe5fd4f35de08df2b07d247812af4252e7..d29166459c95bd307e555eee42f6cbd82e1be323 100644
@@ -9,7 +9,7 @@ Copyright (C) The Arvados Authors. All rights reserved.
 SPDX-License-Identifier: CC-BY-SA-3.0
 {% endcomment %}
 
-Keep-balance deletes unreferenced and overreplicated blocks from Keep servers, makes additional copies of underreplicated blocks, and moves blocks into optimal locations as needed (e.g., after adding new servers).
+Keep-balance deletes unreferenced and overreplicated blocks from Keep servers, makes additional copies of underreplicated blocks, and moves blocks into optimal locations as needed (e.g., after adding new servers). See "Balancing Keep servers":{{site.baseurl}}/admin/keep-balance.html for usage details.
 
 {% include 'notebox_begin' %}
 
@@ -42,44 +42,32 @@ Verify that @keep-balance@ is functional:
 <notextile>
 <pre><code>~$ <span class="userinput">keep-balance -h</span>
 ...
-Usage: keep-balance [options]
-
-Options:
+Usage of ./keep-balance:
   -commit-pulls
-        send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)
+       send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)
   -commit-trash
-        send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)
+       send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)
 ...
 </code></pre>
 </notextile>
 
-h3. Create a keep-balance configuration file
+h3. Update the cluster config
 
-On the host running keep-balance, create @/etc/arvados/keep-balance/keep-balance.yml@ using the SystemRootToken from your cluster configuration file.  Follow this YAML format:
+Edit the cluster config at @/etc/arvados/config.yml@ and set @Services.Keepbalance.InternalURLs@. Replace @uuid_prefix@ with your cluster id.
 
 <notextile>
-<pre><code>Listen: :9005
-Client:
-  APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
-  AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
-KeepServiceTypes:
-  - disk
-ManagementToken: <span class="userinput">xyzzy</span>
-RunPeriod: 10m
-CollectionBatchSize: 100000
-CollectionBuffers: 1000
-LostBlocksFile: /tmp/keep-balance-lost-blocks.txt    # If given, this file will be updated atomically during each successful run.
+<pre><code>Clusters:
+  <span class="userinput">uuid_prefix</span>:
+    Services:
+      Keepbalance:
+        InternalURLs:
+          "http://localhost:9005/": {}
+    TLS:
+      Insecure: false
 </code></pre>
 </notextile>
 
-If your API server's SSL certificate is not signed by a recognized CA, add the @Insecure@ option to the @Client@ section:
-
-<notextile>
-<pre><code>Client:
-  <span class="userinput">Insecure: true</span>
-  APIHost: ...
-</code></pre>
-</notextile>
+Set @TLS.Insecure: true@ if your API server’s TLS certificate is not signed by a recognized CA.
 
 h3. Start the service (option 1: systemd)
 
index 572a2558eda3c291463e56515c0e1583c2a4adc7..c62a100cedb6c894bf842ecf8d8ef8f194e475af 100644
@@ -382,6 +382,36 @@ Clusters:
       # The default is 2 weeks.
       BlobSigningTTL: 336h
 
+      # When running keep-balance, this is the destination filename for
+      # the list of lost block hashes if there are any, one per line.
+      # Updated atomically during each successful run.
+      BlobMissingReport: ""
+
+      # keep-balance operates periodically, i.e.: do a
+      # scan/balance operation, sleep, repeat.
+      #
+      # BalancePeriod determines the interval between start times of
+      # successive scan/balance operations. If a scan/balance operation
+      # takes longer than BalancePeriod, the next one will follow it
+      # immediately.
+      #
+      # If SIGUSR1 is received during an idle period between operations,
+      # the next operation will start immediately.
+      BalancePeriod: 10m
+
+      # Limits the number of collections retrieved by keep-balance per
+      # API transaction. If this is zero, page size is
+      # determined by the API server's own page size limits (see
+      # API.MaxItemsPerResponse and API.MaxIndexDatabaseRead).
+      BalanceCollectionBatch: 0
+
+      # The size of keep-balance's internal queue of
+      # collections. Higher values use more memory and improve throughput
+      # by allowing keep-balance to fetch the next page of collections
+      # while the current page is still being processed. If this is zero
+      # or omitted, pages are processed serially.
+      BalanceCollectionBuffers: 1000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index d0e61dbca06454d3488b1f50473b55f67b2a66b1..e6b06c1314668a9662a60c74b710642f33ec5f83 100644
@@ -476,6 +476,81 @@ func (ldr *Loader) loadOldGitHttpdConfig(cfg *arvados.Config) error {
        return nil
 }
 
+const defaultKeepBalanceConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+
+type oldKeepBalanceConfig struct {
+       Client              *arvados.Client
+       Listen              *string
+       KeepServiceTypes    *[]string
+       KeepServiceList     *arvados.KeepServiceList
+       RunPeriod           *arvados.Duration
+       CollectionBatchSize *int
+       CollectionBuffers   *int
+       RequestTimeout      *arvados.Duration
+       LostBlocksFile      *string
+       ManagementToken     *string
+}
+
+func (ldr *Loader) loadOldKeepBalanceConfig(cfg *arvados.Config) error {
+       if ldr.KeepBalancePath == "" {
+               return nil
+       }
+       var oc oldKeepBalanceConfig
+       err := ldr.loadOldConfigHelper("keep-balance", ldr.KeepBalancePath, &oc)
+       if os.IsNotExist(err) && ldr.KeepBalancePath == defaultKeepBalanceConfigPath {
+               return nil
+       } else if err != nil {
+               return err
+       }
+
+       cluster, err := cfg.GetCluster("")
+       if err != nil {
+               return err
+       }
+
+       loadOldClientConfig(cluster, oc.Client)
+
+       if oc.Listen != nil {
+               cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: *oc.Listen}] = arvados.ServiceInstance{}
+       }
+       if oc.ManagementToken != nil {
+               cluster.ManagementToken = *oc.ManagementToken
+       }
+       if oc.RunPeriod != nil {
+               cluster.Collections.BalancePeriod = *oc.RunPeriod
+       }
+       if oc.LostBlocksFile != nil {
+               cluster.Collections.BlobMissingReport = *oc.LostBlocksFile
+       }
+       if oc.CollectionBatchSize != nil {
+               cluster.Collections.BalanceCollectionBatch = *oc.CollectionBatchSize
+       }
+       if oc.CollectionBuffers != nil {
+               cluster.Collections.BalanceCollectionBuffers = *oc.CollectionBuffers
+       }
+       if oc.RequestTimeout != nil {
+               cluster.API.KeepServiceRequestTimeout = *oc.RequestTimeout
+       }
+
+       msg := "The %s configuration option is no longer supported. Please remove it from your configuration file. Keep-balance will operate on all configured volumes."
+
+       // If KeepServiceTypes is empty or is exactly ["disk"], silently
+       // ignore it, since keep-balance now balances disk services anyway.
+       if oc.KeepServiceTypes != nil {
+               numTypes := len(*oc.KeepServiceTypes)
+               if numTypes != 0 && !(numTypes == 1 && (*oc.KeepServiceTypes)[0] == "disk") {
+                       return fmt.Errorf(msg, "KeepServiceTypes")
+               }
+       }
+
+       if oc.KeepServiceList != nil {
+               return fmt.Errorf(msg, "KeepServiceList")
+       }
+
+       cfg.Clusters[cluster.ClusterID] = *cluster
+       return nil
+}
+
 func (ldr *Loader) loadOldEnvironmentVariables(cfg *arvados.Config) error {
        if os.Getenv("ARVADOS_API_TOKEN") == "" && os.Getenv("ARVADOS_API_HOST") == "" {
                return nil
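The acceptance rule that `loadOldKeepBalanceConfig` applies to the legacy `KeepServiceTypes` option (absent, empty, or exactly `["disk"]` is accepted; anything else is an error) can be restated as a standalone predicate, which makes the cases exercised in `TestLegacyKeepBalanceConfig` easy to see at a glance. `legacyServiceTypesOK` is a hypothetical helper for illustration; the loader performs the check inline.

```go
package main

import "fmt"

// legacyServiceTypesOK reports whether a legacy KeepServiceTypes value
// can be migrated silently. A nil pointer means the option was absent.
// keep-balance now always balances "disk" services, so only an empty
// list or a list containing exactly "disk" is accepted.
func legacyServiceTypesOK(types *[]string) bool {
	if types == nil {
		return true
	}
	t := *types
	return len(t) == 0 || (len(t) == 1 && t[0] == "disk")
}

func main() {
	cases := [][]string{{}, {"disk"}, {"proxy"}, {"disk", "proxy"}}
	for i := range cases {
		fmt.Printf("%q -> %v\n", cases[i], legacyServiceTypesOK(&cases[i]))
	}
}
```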
index ea9b50d035483ab7fd0463669ab4dfda591bdc3f..ff1bb9434a42c8babc3cedef9165e7ad3d16d949 100644
@@ -216,3 +216,60 @@ func (s *LoadSuite) TestLegacyArvGitHttpdConfig(c *check.C) {
        c.Check(cluster.Git.Repositories, check.Equals, "/test/reporoot")
        c.Check(cluster.Services.Keepproxy.InternalURLs[arvados.URL{Host: ":9000"}], check.Equals, arvados.ServiceInstance{})
 }
+
+func (s *LoadSuite) TestLegacyKeepBalanceConfig(c *check.C) {
+       f := "-legacy-keepbalance-config"
+       content := []byte(fmtKeepBalanceConfig(""))
+       cluster, err := testLoadLegacyConfig(content, f, c)
+
+       c.Check(err, check.IsNil)
+       c.Check(cluster, check.NotNil)
+       c.Check(cluster.ManagementToken, check.Equals, "xyzzy")
+       c.Check(cluster.Services.Keepbalance.InternalURLs[arvados.URL{Host: ":80"}], check.Equals, arvados.ServiceInstance{})
+       c.Check(cluster.Collections.BalanceCollectionBuffers, check.Equals, 1000)
+       c.Check(cluster.Collections.BalanceCollectionBatch, check.Equals, 100000)
+       c.Check(cluster.Collections.BalancePeriod.String(), check.Equals, "10m")
+       c.Check(cluster.Collections.BlobMissingReport, check.Equals, "testfile")
+       c.Check(cluster.API.KeepServiceRequestTimeout.String(), check.Equals, "30m")
+
+       content = []byte(fmtKeepBalanceConfig(`"KeepServiceTypes":["disk"],`))
+       _, err = testLoadLegacyConfig(content, f, c)
+       c.Check(err, check.IsNil)
+
+       content = []byte(fmtKeepBalanceConfig(`"KeepServiceTypes":[],`))
+       _, err = testLoadLegacyConfig(content, f, c)
+       c.Check(err, check.IsNil)
+
+       content = []byte(fmtKeepBalanceConfig(`"KeepServiceTypes":["proxy"],`))
+       _, err = testLoadLegacyConfig(content, f, c)
+       c.Check(err, check.NotNil)
+
+       content = []byte(fmtKeepBalanceConfig(`"KeepServiceTypes":["disk", "proxy"],`))
+       _, err = testLoadLegacyConfig(content, f, c)
+       c.Check(err, check.NotNil)
+
+       content = []byte(fmtKeepBalanceConfig(`"KeepServiceList":{},`))
+       _, err = testLoadLegacyConfig(content, f, c)
+       c.Check(err, check.NotNil)
+}
+
+func fmtKeepBalanceConfig(param string) string {
+       return fmt.Sprintf(`
+{
+       "Client": {
+               "Scheme": "",
+               "APIHost": "example.com",
+               "AuthToken": "abcdefg",
+               "Insecure": false
+       },
+       "Listen": ":80",
+       %s
+       "RunPeriod": "10m",
+       "CollectionBatchSize": 100000,
+       "CollectionBuffers": 1000,
+       "RequestTimeout": "30m",
+       "ManagementToken": "xyzzy",
+       "LostBlocksFile": "testfile"
+}
+`, param)
+}
index 8df561c00fa0fce082ee9635f7f3fbf1ef067936..5437836f6fee05f3aded39954ea8d626d3c12f6e 100644
@@ -99,6 +99,10 @@ var whitelist = map[string]bool{
        "Collections.TrashSweepInterval":               false,
        "Collections.TrustAllContent":                  false,
        "Collections.WebDAVCache":                      false,
+       "Collections.BalanceCollectionBatch":           false,
+       "Collections.BalancePeriod":                    false,
+       "Collections.BlobMissingReport":                false,
+       "Collections.BalanceCollectionBuffers":         false,
        "Containers":                                   true,
        "Containers.CloudVMs":                          false,
        "Containers.CrunchRunCommand":                  false,
index 32c101a5a080836aa84a3f2a5d0fc7d244813dda..cd5b6bba14f8c3c70d9d5cd6a0ec035b26b2b106 100644
@@ -388,6 +388,36 @@ Clusters:
       # The default is 2 weeks.
       BlobSigningTTL: 336h
 
+      # When running keep-balance, this is the destination filename for
+      # the list of lost block hashes if there are any, one per line.
+      # Updated atomically during each successful run.
+      BlobMissingReport: ""
+
+      # keep-balance operates periodically, i.e.: do a
+      # scan/balance operation, sleep, repeat.
+      #
+      # BalancePeriod determines the interval between start times of
+      # successive scan/balance operations. If a scan/balance operation
+      # takes longer than BalancePeriod, the next one will follow it
+      # immediately.
+      #
+      # If SIGUSR1 is received during an idle period between operations,
+      # the next operation will start immediately.
+      BalancePeriod: 10m
+
+      # Limits the number of collections retrieved by keep-balance per
+      # API transaction. If this is zero, page size is
+      # determined by the API server's own page size limits (see
+      # API.MaxItemsPerResponse and API.MaxIndexDatabaseRead).
+      BalanceCollectionBatch: 0
+
+      # The size of keep-balance's internal queue of
+      # collections. Higher values use more memory and improve throughput
+      # by allowing keep-balance to fetch the next page of collections
+      # while the current page is still being processed. If this is zero
+      # or omitted, pages are processed serially.
+      BalanceCollectionBuffers: 1000
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index 61d80a3c58f535733ad65b9053b55283c9d08510..21d17227372d4d3f6776b2526581508d89c937ef 100644
@@ -37,6 +37,7 @@ type Loader struct {
        WebsocketPath           string
        KeepproxyPath           string
        GitHttpdPath            string
+       KeepBalancePath         string
 
        configdata []byte
 }
@@ -69,6 +70,7 @@ func (ldr *Loader) SetupFlags(flagset *flag.FlagSet) {
        flagset.StringVar(&ldr.WebsocketPath, "legacy-ws-config", defaultWebsocketConfigPath, "Legacy arvados-ws configuration `file`")
        flagset.StringVar(&ldr.KeepproxyPath, "legacy-keepproxy-config", defaultKeepproxyConfigPath, "Legacy keepproxy configuration `file`")
        flagset.StringVar(&ldr.GitHttpdPath, "legacy-git-httpd-config", defaultGitHttpdConfigPath, "Legacy arv-git-httpd configuration `file`")
+       flagset.StringVar(&ldr.KeepBalancePath, "legacy-keepbalance-config", defaultKeepBalanceConfigPath, "Legacy keep-balance configuration `file`")
        flagset.BoolVar(&ldr.SkipLegacy, "skip-legacy", false, "Don't load legacy config files")
 }
 
@@ -149,6 +151,9 @@ func (ldr *Loader) MungeLegacyConfigArgs(lgr logrus.FieldLogger, args []string,
        if legacyConfigArg != "-legacy-git-httpd-config" {
                ldr.GitHttpdPath = ""
        }
+       if legacyConfigArg != "-legacy-keepbalance-config" {
+               ldr.KeepBalancePath = ""
+       }
 
        return munged
 }
@@ -252,6 +257,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
                        ldr.loadOldWebsocketConfig(&cfg),
                        ldr.loadOldKeepproxyConfig(&cfg),
                        ldr.loadOldGitHttpdConfig(&cfg),
+                       ldr.loadOldKeepBalanceConfig(&cfg),
                } {
                        if err != nil {
                                return nil, err
index 076a3c44d7701c63e691d1ff54a1bae4be8e6dab..7c1c3538094869ff82a510226575a2dbbd0491ab 100644
@@ -119,6 +119,11 @@ type Cluster struct {
                TrashSweepInterval    Duration
                TrustAllContent       bool
 
+               BlobMissingReport        string
+               BalancePeriod            Duration
+               BalanceCollectionBatch   int
+               BalanceCollectionBuffers int
+
                WebDAVCache WebDAVCacheConfig
        }
        Git struct {
index 9f814a20d3572b58ea5c65333a94a6d0136f62c2..3471d50fe356157ff85b762d5d650303ac6930b8 100644
@@ -66,7 +66,7 @@ type Balancer struct {
 // Typical usage:
 //
 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
        defer bal.time("sweep", "wall clock time to run one full sweep")()
@@ -95,24 +95,21 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                bal.lostBlocks = ioutil.Discard
        }
 
-       if len(config.KeepServiceList.Items) > 0 {
-               err = bal.SetKeepServices(config.KeepServiceList)
-       } else {
-               err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
-       }
+       diskService := []string{"disk"}
+       err = bal.DiscoverKeepServices(client, diskService)
        if err != nil {
                return
        }
 
        for _, srv := range bal.KeepServices {
-               err = srv.discoverMounts(&config.Client)
+               err = srv.discoverMounts(client)
                if err != nil {
                        return
                }
        }
        bal.cleanupMounts()
 
-       if err = bal.CheckSanityEarly(&config.Client); err != nil {
+       if err = bal.CheckSanityEarly(client); err != nil {
                return
        }
        rs := bal.rendezvousState()
@@ -121,7 +118,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                        bal.logf("notice: KeepServices list has changed since last run")
                }
                bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-               if err = bal.ClearTrashLists(&config.Client); err != nil {
+               if err = bal.ClearTrashLists(client); err != nil {
                        return
                }
                // The current rendezvous state becomes "safe" (i.e.,
@@ -130,7 +127,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                // succeed in clearing existing trash lists.
                nextRunOptions.SafeRendezvousState = rs
        }
-       if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
+       if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -150,14 +147,14 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                lbFile = nil
        }
        if runOptions.CommitPulls {
-               err = bal.CommitPulls(&config.Client)
+               err = bal.CommitPulls(client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
        if runOptions.CommitTrash {
-               err = bal.CommitTrash(&config.Client)
+               err = bal.CommitTrash(client)
        }
        return
 }
@@ -449,7 +446,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
        if coll.ReplicationDesired != nil {
                repl = *coll.ReplicationDesired
        }
-       debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+       bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
        // Pass pdh to IncreaseDesired only if LostBlocksFile is being
        // written -- otherwise it's just a waste of memory.
        pdh := ""
@@ -566,7 +563,7 @@ type balanceResult struct {
 // balanceBlock compares current state to desired state for a single
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
-       debugf("balanceBlock: %v %+v", blkid, blk)
+       bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
 
        type slot struct {
                mnt  *KeepMount // never nil
index db530bc4926de88502132730f35c816ec3cf92b6..5e2e7524848e76343b6e02dca1dd3447b1277c21 100644
@@ -16,8 +16,11 @@ import (
        "sync"
        "time"
 
+       "git.curoverse.com/arvados.git/lib/config"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "github.com/sirupsen/logrus"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
 
@@ -303,41 +306,37 @@ func (s *stubServer) serveKeepstorePull() *reqTracker {
 
 type runSuite struct {
        stub   stubServer
-       config Config
-}
-
-// make a log.Logger that writes to the current test's c.Log().
-func (s *runSuite) logger(c *check.C) *logrus.Logger {
-       r, w := io.Pipe()
-       go func() {
-               buf := make([]byte, 10000)
-               for {
-                       n, err := r.Read(buf)
-                       if n > 0 {
-                               if buf[n-1] == '\n' {
-                                       n--
-                               }
-                               c.Log(string(buf[:n]))
-                       }
-                       if err != nil {
-                               break
-                       }
-               }
-       }()
-       logger := logrus.New()
-       logger.Out = w
-       return logger
+       config *arvados.Cluster
+       client *arvados.Client
+}
+
+func (s *runSuite) newServer(options *RunOptions) *Server {
+       srv := &Server{
+               Cluster:    s.config,
+               ArvClient:  s.client,
+               RunOptions: *options,
+               Metrics:    newMetrics(prometheus.NewRegistry()),
+               Logger:     options.Logger,
+               Dumper:     options.Dumper,
+       }
+       srv.setup()
+       return srv
 }
 
 func (s *runSuite) SetUpTest(c *check.C) {
-       s.config = Config{
-               Client: arvados.Client{
-                       AuthToken: "xyzzy",
-                       APIHost:   "zzzzz.arvadosapi.com",
-                       Client:    s.stub.Start()},
-               KeepServiceTypes: []string{"disk"},
-               RunPeriod:        arvados.Duration(time.Second),
-       }
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.Equals, nil)
+       s.config, err = cfg.GetCluster("")
+       c.Assert(err, check.Equals, nil)
+
+       s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
+       arvadostest.SetServiceURL(&s.config.Services.Keepbalance, "http://localhost:/")
+
+       s.client = &arvados.Client{
+               AuthToken: "xyzzy",
+               APIHost:   "zzzzz.arvadosapi.com",
+               Client:    s.stub.Start()}
+
        s.stub.serveDiscoveryDoc()
        s.stub.logf = c.Logf
 }
@@ -350,7 +349,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
-               Logger:      s.logger(c),
+               Logger:      ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveZeroCollections()
@@ -359,40 +358,18 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       srv, err := NewServer(s.config, opts)
-       c.Assert(err, check.IsNil)
-       _, err = srv.Run()
+       srv := s.newServer(&opts)
+       _, err := srv.runOnce()
        c.Check(err, check.ErrorMatches, "received zero collections")
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
 }
 
-func (s *runSuite) TestServiceTypes(c *check.C) {
-       opts := RunOptions{
-               CommitPulls: true,
-               CommitTrash: true,
-               Logger:      s.logger(c),
-       }
-       s.config.KeepServiceTypes = []string{"unlisted-type"}
-       s.stub.serveCurrentUserAdmin()
-       s.stub.serveFooBarFileCollections()
-       s.stub.serveKeepServices(stubServices)
-       s.stub.serveKeepstoreMounts()
-       indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
-       trashReqs := s.stub.serveKeepstoreTrash()
-       srv, err := NewServer(s.config, opts)
-       c.Assert(err, check.IsNil)
-       _, err = srv.Run()
-       c.Check(err, check.IsNil)
-       c.Check(indexReqs.Count(), check.Equals, 0)
-       c.Check(trashReqs.Count(), check.Equals, 0)
-}
-
 func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
-               Logger:      s.logger(c),
+               Logger:      ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserNotAdmin()
        s.stub.serveZeroCollections()
@@ -400,9 +377,8 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        s.stub.serveKeepstoreMounts()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       srv, err := NewServer(s.config, opts)
-       c.Assert(err, check.IsNil)
-       _, err = srv.Run()
+       srv := s.newServer(&opts)
+       _, err := srv.runOnce()
        c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -412,7 +388,7 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
-               Logger:      s.logger(c),
+               Logger:      ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveCollectionsButSkipOne()
@@ -421,9 +397,8 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       srv, err := NewServer(s.config, opts)
-       c.Assert(err, check.IsNil)
-       _, err = srv.Run()
+       srv := s.newServer(&opts)
+       _, err := srv.runOnce()
        c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -432,12 +407,12 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
 func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
        c.Assert(err, check.IsNil)
-       s.config.LostBlocksFile = lostf.Name()
+       s.config.Collections.BlobMissingReport = lostf.Name()
        defer os.Remove(lostf.Name())
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
-               Logger:      s.logger(c),
+               Logger:      ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
@@ -446,9 +421,9 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        s.stub.serveKeepstoreIndexFoo1()
        s.stub.serveKeepstoreTrash()
        s.stub.serveKeepstorePull()
-       srv, err := NewServer(s.config, opts)
+       srv := s.newServer(&opts)
        c.Assert(err, check.IsNil)
-       _, err = srv.Run()
+       _, err = srv.runOnce()
        c.Check(err, check.IsNil)
        lost, err := ioutil.ReadFile(lostf.Name())
        c.Assert(err, check.IsNil)
@@ -459,7 +434,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
        opts := RunOptions{
                CommitPulls: false,
                CommitTrash: false,
-               Logger:      s.logger(c),
+               Logger:      ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        collReqs := s.stub.serveFooBarFileCollections()
@@ -468,9 +443,8 @@ func (s *runSuite) TestDryRun(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       srv, err := NewServer(s.config, opts)
-       c.Assert(err, check.IsNil)
-       bal, err := srv.Run()
+       srv := s.newServer(&opts)
+       bal, err := srv.runOnce()
        c.Check(err, check.IsNil)
        for _, req := range collReqs.reqs {
                c.Check(req.Form.Get("include_trash"), check.Equals, "true")
@@ -486,16 +460,15 @@ func (s *runSuite) TestDryRun(c *check.C) {
 func (s *runSuite) TestCommit(c *check.C) {
        lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-")
        c.Assert(err, check.IsNil)
-       s.config.LostBlocksFile = lostf.Name()
+       s.config.Collections.BlobMissingReport = lostf.Name()
        defer os.Remove(lostf.Name())
 
-       s.config.Listen = ":"
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
-               Logger:      s.logger(c),
-               Dumper:      s.logger(c),
+               Logger:      ctxlog.TestLogger(c),
+               Dumper:      ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
@@ -504,9 +477,8 @@ func (s *runSuite) TestCommit(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       srv, err := NewServer(s.config, opts)
-       c.Assert(err, check.IsNil)
-       bal, err := srv.Run()
+       srv := s.newServer(&opts)
+       bal, err := srv.runOnce()
        c.Check(err, check.IsNil)
        c.Check(trashReqs.Count(), check.Equals, 8)
        c.Check(pullReqs.Count(), check.Equals, 4)
@@ -529,13 +501,12 @@ func (s *runSuite) TestCommit(c *check.C) {
 }
 
 func (s *runSuite) TestRunForever(c *check.C) {
-       s.config.Listen = ":"
        s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
-               Logger:      s.logger(c),
-               Dumper:      s.logger(c),
+               Logger:      ctxlog.TestLogger(c),
+               Dumper:      ctxlog.TestLogger(c),
        }
        s.stub.serveCurrentUserAdmin()
        s.stub.serveFooBarFileCollections()
@@ -546,13 +517,12 @@ func (s *runSuite) TestRunForever(c *check.C) {
        pullReqs := s.stub.serveKeepstorePull()
 
        stop := make(chan interface{})
-       s.config.RunPeriod = arvados.Duration(time.Millisecond)
-       srv, err := NewServer(s.config, opts)
-       c.Assert(err, check.IsNil)
+       s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
+       srv := s.newServer(&opts)
 
        done := make(chan bool)
        go func() {
-               srv.RunForever(stop)
+               srv.runForever(stop)
                close(done)
        }()
 
@@ -571,13 +541,16 @@ func (s *runSuite) TestRunForever(c *check.C) {
 }
 
 func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
-       resp, err := http.Get("http://" + srv.listening + "/metrics")
-       c.Assert(err, check.IsNil)
-       c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+       req := httptest.NewRequest("GET", "/metrics", nil)
+       resp := httptest.NewRecorder()
+       srv.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+
+       req = httptest.NewRequest("GET", "/metrics?api_token=xyzzy", nil)
+       resp = httptest.NewRecorder()
+       srv.ServeHTTP(resp, req)
+       c.Check(resp.Code, check.Equals, http.StatusOK)
 
-       resp, err = http.Get("http://" + srv.listening + "/metrics?api_token=xyzzy")
-       c.Assert(err, check.IsNil)
-       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
        buf, err := ioutil.ReadAll(resp.Body)
        c.Check(err, check.IsNil)
        return string(buf)
index e372d37841a7b095cc659216bb11b87b7bc793dd..6cffa8ded4dbad6975225949e871852e5ca2d50e 100644
@@ -13,6 +13,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        check "gopkg.in/check.v1"
 )
 
@@ -69,6 +70,7 @@ func (bal *balancerSuite) SetUpSuite(c *check.C) {
        }
 
        bal.signatureTTL = 3600
+       bal.Logger = ctxlog.TestLogger(c)
 }
 
 func (bal *balancerSuite) SetUpTest(c *check.C) {
index 6aaf07abae395241fdbd5f26be8ae111f14aac1f..a2200e1db90a4ddf69fd65112c432df9bbcba2c6 100644
@@ -29,7 +29,7 @@ func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
                        longestStreak := 0
                        var lastMod time.Time
                        sawUUID := make(map[string]bool)
-                       err := EachCollection(&s.config.Client, pageSize, func(c arvados.Collection) error {
+                       err := EachCollection(s.client, pageSize, func(c arvados.Collection) error {
                                if c.ModifiedAt == nil {
                                        return nil
                                }
index a79779c7dc8f9fdb5eb7316a74c28fb614d9da52..5b0dc123ae49a627a98c4f5254ff6a1649e869e6 100644
@@ -11,10 +11,13 @@ import (
        "testing"
        "time"
 
+       "git.curoverse.com/arvados.git/lib/config"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
@@ -22,7 +25,8 @@ import (
 var _ = check.Suite(&integrationSuite{})
 
 type integrationSuite struct {
-       config     Config
+       config     *arvados.Cluster
+       client     *arvados.Client
        keepClient *keepclient.KeepClient
 }
 
@@ -59,14 +63,16 @@ func (s *integrationSuite) TearDownSuite(c *check.C) {
 }
 
 func (s *integrationSuite) SetUpTest(c *check.C) {
-       s.config = Config{
-               Client: arvados.Client{
-                       APIHost:   os.Getenv("ARVADOS_API_HOST"),
-                       AuthToken: arvadostest.DataManagerToken,
-                       Insecure:  true,
-               },
-               KeepServiceTypes: []string{"disk"},
-               RunPeriod:        arvados.Duration(time.Second),
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.Equals, nil)
+       s.config, err = cfg.GetCluster("")
+       c.Assert(err, check.Equals, nil)
+       s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
+
+       s.client = &arvados.Client{
+               APIHost:   os.Getenv("ARVADOS_API_HOST"),
+               AuthToken: arvadostest.DataManagerToken,
+               Insecure:  true,
        }
 }
 
@@ -84,9 +90,9 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
 
                bal := &Balancer{
                        Logger:  logger,
-                       Metrics: newMetrics(),
+                       Metrics: newMetrics(prometheus.NewRegistry()),
                }
-               nextOpts, err := bal.Run(s.config, opts)
+               nextOpts, err := bal.Run(s.client, s.config, opts)
                c.Check(err, check.IsNil)
                c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
                c.Check(nextOpts.CommitPulls, check.Equals, true)
index 563871607874f9ad44a07315ce08bfd68274a23b..1b71fb4e44350bac913961e598494d0c01a333ab 100644
@@ -6,7 +6,6 @@
 Description=Arvados Keep Balance
 Documentation=https://doc.arvados.org/
 After=network.target
-AssertPathExists=/etc/arvados/keep-balance/keep-balance.yml
 
 # systemd==229 (ubuntu:xenial) obeys StartLimitInterval in the [Unit] section
 StartLimitInterval=0
index 84516a821060da1b795da1b40655a9a62157fd52..cf844ab050043c1662c3cf5cd62ef9ef238a6789 100644
@@ -5,97 +5,85 @@
 package main
 
 import (
-       "encoding/json"
+       "context"
        "flag"
        "fmt"
-       "log"
-       "net/http"
+       "io"
        "os"
-       "time"
 
+       "git.curoverse.com/arvados.git/lib/config"
+       "git.curoverse.com/arvados.git/lib/service"
        "git.curoverse.com/arvados.git/sdk/go/arvados"
-       "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+       "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
 )
 
-var debugf = func(string, ...interface{}) {}
-
 func main() {
-       var cfg Config
-       var runOptions RunOptions
+       os.Exit(runCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
+}
+
+func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       logger := ctxlog.FromContext(context.Background())
 
-       configPath := flag.String("config", defaultConfigPath,
-               "`path` of JSON or YAML configuration file")
-       serviceListPath := flag.String("config.KeepServiceList", "",
-               "`path` of JSON or YAML file with list of keep services to balance, as given by \"arv keep_service list\" "+
-                       "(default: config[\"KeepServiceList\"], or if none given, get all available services and filter by config[\"KeepServiceTypes\"])")
-       flag.BoolVar(&runOptions.Once, "once", false,
+       var options RunOptions
+       flags := flag.NewFlagSet(prog, flag.ExitOnError)
+       flags.BoolVar(&options.Once, "once", false,
                "balance once and then exit")
-       flag.BoolVar(&runOptions.CommitPulls, "commit-pulls", false,
+       flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
                "send pull requests (make more replicas of blocks that are underreplicated or are not in optimal rendezvous probe order)")
-       flag.BoolVar(&runOptions.CommitTrash, "commit-trash", false,
+       flags.BoolVar(&options.CommitTrash, "commit-trash", false,
                "send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
-       dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
-       dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
-       debugFlag := flag.Bool("debug", false, "enable debug messages")
-       getVersion := flag.Bool("version", false, "Print version information and exit.")
-       flag.Usage = usage
-       flag.Parse()
+       flags.Bool("version", false, "Write version information to stdout and exit 0")
+       dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
 
-       // Print version information if requested
-       if *getVersion {
-               fmt.Printf("keep-balance %s\n", version)
-               return
-       }
+       loader := config.NewLoader(stdin, logger)
+       loader.SetupFlags(flags)
 
-       mustReadConfig(&cfg, *configPath)
-       if *serviceListPath != "" {
-               mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
-       }
+       munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config")
+       flags.Parse(munged)
 
-       if *dumpConfig {
-               log.Fatal(config.DumpAndExit(cfg))
-       }
-
-       to := time.Duration(cfg.RequestTimeout)
-       if to == 0 {
-               to = 30 * time.Minute
-       }
-       arvados.DefaultSecureClient.Timeout = to
-       arvados.InsecureHTTPClient.Timeout = to
-       http.DefaultClient.Timeout = to
-
-       log.Printf("keep-balance %s started", version)
-
-       if *debugFlag {
-               debugf = log.Printf
-               if j, err := json.Marshal(cfg); err != nil {
-                       log.Fatal(err)
-               } else {
-                       log.Printf("config is %s", j)
-               }
-       }
        if *dumpFlag {
                dumper := logrus.New()
                dumper.Out = os.Stdout
                dumper.Formatter = &logrus.TextFormatter{}
-               runOptions.Dumper = dumper
-       }
-       srv, err := NewServer(cfg, runOptions)
-       if err != nil {
-               // (don't run)
-       } else if runOptions.Once {
-               _, err = srv.Run()
-       } else {
-               err = srv.RunForever(nil)
-       }
-       if err != nil {
-               log.Fatal(err)
+               options.Dumper = dumper
        }
-}
 
-func mustReadConfig(dst interface{}, path string) {
-       if err := config.LoadFile(dst, path); err != nil {
-               log.Fatal(err)
-       }
+       // Only pass along the version flag, which gets handled in RunCommand
+       args = nil
+       flags.Visit(func(f *flag.Flag) {
+               if f.Name == "version" {
+                               args = append(args, "-"+f.Name+"="+f.Value.String())
+               }
+       })
+
+       return service.Command(arvados.ServiceNameKeepbalance,
+               func(ctx context.Context, cluster *arvados.Cluster, token string, registry *prometheus.Registry) service.Handler {
+                       if !options.Once && cluster.Collections.BalancePeriod == arvados.Duration(0) {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("cannot start service: Collections.BalancePeriod is zero (if you want to run once and then exit, use the -once flag)"))
+                       }
+
+                       ac, err := arvados.NewClientFromConfig(cluster)
+                       if err != nil {
+                               return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
+                       }
+                       ac.AuthToken = token
+
+                       if options.Logger == nil {
+                               options.Logger = ctxlog.FromContext(ctx)
+                       }
+
+                       srv := &Server{
+                               Cluster:    cluster,
+                               ArvClient:  ac,
+                               RunOptions: options,
+                               Metrics:    newMetrics(registry),
+                               Logger:     options.Logger,
+                               Dumper:     options.Dumper,
+                       }
+
+                       srv.setup()
+                       go srv.run()
+                       return srv
+               }).RunCommand(prog, args, stdin, stdout, stderr)
 }
diff --git a/services/keep-balance/main_test.go b/services/keep-balance/main_test.go
deleted file mode 100644
index a280434..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "time"
-
-       "github.com/ghodss/yaml"
-       check "gopkg.in/check.v1"
-)
-
-var _ = check.Suite(&mainSuite{})
-
-type mainSuite struct{}
-
-func (s *mainSuite) TestExampleJSON(c *check.C) {
-       var config Config
-       c.Check(yaml.Unmarshal(exampleConfigFile, &config), check.IsNil)
-       c.Check(config.KeepServiceTypes, check.DeepEquals, []string{"disk"})
-       c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
-       c.Check(time.Duration(config.RunPeriod), check.Equals, 600*time.Second)
-}
-
-func (s *mainSuite) TestConfigJSONWithKeepServiceList(c *check.C) {
-       var config Config
-       c.Check(yaml.Unmarshal([]byte(`{
-                   "Client": {
-                       "APIHost": "zzzzz.arvadosapi.com:443",
-                       "AuthToken": "xyzzy",
-                       "Insecure": false
-                   },
-                   "KeepServiceList": {
-                       "items": [
-                           {"uuid":"zzzzz-bi64l-abcdefghijklmno", "service_type":"disk", "service_host":"a.zzzzz.arvadosapi.com", "service_port":12345},
-                           {"uuid":"zzzzz-bi64l-bcdefghijklmnop", "service_type":"blob", "service_host":"b.zzzzz.arvadosapi.com", "service_port":12345}
-                       ]
-                   },
-                   "RunPeriod": "600s"
-               }`), &config), check.IsNil)
-       c.Assert(len(config.KeepServiceList.Items), check.Equals, 2)
-       c.Check(config.KeepServiceList.Items[0].UUID, check.Equals, "zzzzz-bi64l-abcdefghijklmno")
-       c.Check(config.KeepServiceList.Items[0].ServicePort, check.Equals, 12345)
-       c.Check(config.Client.AuthToken, check.Equals, "xyzzy")
-}
index 5f3c98723d02a82e9410053657d3262dca3af1be..ce1b1811cc69f28f3fad955a5525b35a666baf3a 100644
@@ -24,9 +24,9 @@ type metrics struct {
        mtx         sync.Mutex
 }
 
-func newMetrics() *metrics {
+func newMetrics(registry *prometheus.Registry) *metrics {
        return &metrics{
-               reg:         prometheus.NewRegistry(),
+               reg:         registry,
                statsGauges: map[string]setter{},
                observers:   map[string]observer{},
        }
index e2f13a425ed8dfabc729649d98aa7e4ed977899a..9f192d6355cacc791e3ac2f71cf3afc66da2cadc 100644
@@ -5,8 +5,6 @@
 package main
 
 import (
-       "context"
-       "fmt"
        "net/http"
        "os"
        "os/signal"
@@ -15,56 +13,11 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/auth"
-       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/julienschmidt/httprouter"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
        "github.com/sirupsen/logrus"
 )
 
-var version = "dev"
-
-const (
-       defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
-       rfc3339NanoFixed  = "2006-01-02T15:04:05.000000000Z07:00"
-)
-
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
-       // Arvados API endpoint and credentials.
-       Client arvados.Client
-
-       // List of service types (e.g., "disk") to balance.
-       KeepServiceTypes []string
-
-       KeepServiceList arvados.KeepServiceList
-
-       // address, address:port, or :port for management interface
-       Listen string
-
-       // token for management APIs
-       ManagementToken string
-
-       // How often to check
-       RunPeriod arvados.Duration
-
-       // Number of collections to request in each API call
-       CollectionBatchSize int
-
-       // Max collections to buffer in memory (bigger values consume
-       // more memory, but can reduce store-and-forward latency when
-       // fetching pages)
-       CollectionBuffers int
-
-       // Timeout for outgoing http request/response cycle.
-       RequestTimeout arvados.Duration
-
-       // Destination filename for the list of lost block hashes, one
-       // per line. Updated atomically during each successful run.
-       LostBlocksFile string
-}
-
 // RunOptions controls runtime behavior. The flags/options that belong
 // here are the ones that are useful for interactive use. For example,
 // "CommitTrash" is a runtime option rather than a config item because
@@ -87,100 +40,88 @@ type RunOptions struct {
 }
 
 type Server struct {
-       config     Config
-       runOptions RunOptions
-       metrics    *metrics
-       listening  string // for tests
+       Cluster    *arvados.Cluster
+       ArvClient  *arvados.Client
+       RunOptions RunOptions
+       Metrics    *metrics
+
+       httpHandler http.Handler
 
        Logger logrus.FieldLogger
        Dumper logrus.FieldLogger
 }
 
-// NewServer returns a new Server that runs Balancers using the given
-// config and runOptions.
-func NewServer(config Config, runOptions RunOptions) (*Server, error) {
-       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
-               return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
-       }
-       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
-               return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
-       }
+// ServeHTTP implements service.Handler.
+func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       srv.httpHandler.ServeHTTP(w, r)
+}
 
-       if runOptions.Logger == nil {
-               log := logrus.New()
-               log.Formatter = &logrus.JSONFormatter{
-                       TimestampFormat: rfc3339NanoFixed,
-               }
-               log.Out = os.Stderr
-               runOptions.Logger = log
-       }
+// CheckHealth implements service.Handler.
+func (srv *Server) CheckHealth() error {
+       return nil
+}
 
-       srv := &Server{
-               config:     config,
-               runOptions: runOptions,
-               metrics:    newMetrics(),
-               Logger:     runOptions.Logger,
-               Dumper:     runOptions.Dumper,
+func (srv *Server) setup() {
+       if srv.Cluster.ManagementToken == "" {
+               srv.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+                       http.Error(w, "Management API authentication is not configured", http.StatusForbidden)
+               })
+       } else {
+               mux := httprouter.New()
+               metricsH := promhttp.HandlerFor(srv.Metrics.reg, promhttp.HandlerOpts{
+                       ErrorLog: srv.Logger,
+               })
+               mux.Handler("GET", "/metrics", metricsH)
+               mux.Handler("GET", "/metrics.json", metricsH)
+               srv.httpHandler = auth.RequireLiteralToken(srv.Cluster.ManagementToken, mux)
        }
-       return srv, srv.start()
 }
 
-func (srv *Server) start() error {
-       if srv.config.Listen == "" {
-               return nil
-       }
-       ctx := ctxlog.Context(context.Background(), srv.Logger)
-       server := &httpserver.Server{
-               Server: http.Server{
-                       Handler: httpserver.HandlerWithContext(ctx,
-                               httpserver.LogRequests(
-                                       auth.RequireLiteralToken(srv.config.ManagementToken,
-                                               srv.metrics.Handler(srv.Logger)))),
-               },
-               Addr: srv.config.Listen,
+func (srv *Server) run() {
+       var err error
+       if srv.RunOptions.Once {
+               _, err = srv.runOnce()
+       } else {
+               err = srv.runForever(nil)
        }
-       err := server.Start()
        if err != nil {
-               return err
+               srv.Logger.Error(err)
        }
-       srv.Logger.Printf("listening at %s", server.Addr)
-       srv.listening = server.Addr
-       return nil
 }
 
-func (srv *Server) Run() (*Balancer, error) {
+func (srv *Server) runOnce() (*Balancer, error) {
        bal := &Balancer{
                Logger:         srv.Logger,
                Dumper:         srv.Dumper,
-               Metrics:        srv.metrics,
-               LostBlocksFile: srv.config.LostBlocksFile,
+               Metrics:        srv.Metrics,
+               LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
        }
        var err error
-       srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+       srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
        return bal, err
 }
 
 // RunForever runs forever, or (for testing purposes) until the given
 // stop channel is ready to receive.
-func (srv *Server) RunForever(stop <-chan interface{}) error {
-       logger := srv.runOptions.Logger
+func (srv *Server) runForever(stop <-chan interface{}) error {
+       logger := srv.Logger
 
-       ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
+       ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
 
        // The unbuffered channel here means we only hear SIGUSR1 if
        // it arrives while we're waiting in select{}.
        sigUSR1 := make(chan os.Signal)
        signal.Notify(sigUSR1, syscall.SIGUSR1)
 
-       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
+       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
        for {
-               if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
+               if !srv.RunOptions.CommitPulls && !srv.RunOptions.CommitTrash {
                        logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               _, err := srv.Run()
+               _, err := srv.runOnce()
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
@@ -199,7 +140,7 @@ func (srv *Server) RunForever(stop <-chan interface{}) error {
                        // run too soon after the Nth run is triggered
                        // by SIGUSR1.
                        ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
+                       ticker = time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
                }
                logger.Print("starting next run")
        }
diff --git a/services/keep-balance/usage.go b/services/keep-balance/usage.go
deleted file mode 100644
index b39e839..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "flag"
-       "fmt"
-       "os"
-)
-
-var exampleConfigFile = []byte(`
-Client:
-    APIHost: zzzzz.arvadosapi.com:443
-    AuthToken: xyzzy
-    Insecure: false
-KeepServiceTypes:
-    - disk
-Listen: ":9005"
-ManagementToken: xyzzy
-RunPeriod: 600s
-CollectionBatchSize: 100000
-CollectionBuffers: 1000
-RequestTimeout: 30m`)
-
-func usage() {
-       fmt.Fprintf(os.Stderr, `
-
-keep-balance rebalances a set of keepstore servers. It creates new
-copies of underreplicated blocks, deletes excess copies of
-overreplicated and unreferenced blocks, and moves blocks to better
-positions (according to the rendezvous hash algorithm) so clients find
-them faster.
-
-Usage: keep-balance [options]
-
-Options:
-`)
-       flag.PrintDefaults()
-       fmt.Fprintf(os.Stderr, `
-Example config file:
-%s
-
-    Client.AuthToken must be recognized by Arvados as an admin token,
-    and must be recognized by all Keep services as a "data manager
-    key".
-
-    Client.Insecure should be true if your Arvados API endpoint uses
-    an unverifiable SSL/TLS certificate.
-
-Periodic scanning:
-
-    By default, keep-balance operates periodically, i.e.: do a
-    scan/balance operation, sleep, repeat.
-
-    RunPeriod determines the interval between start times of
-    successive scan/balance operations. If a scan/balance operation
-    takes longer than RunPeriod, the next one will follow it
-    immediately.
-
-    If SIGUSR1 is received during an idle period between operations,
-    the next operation will start immediately.
-
-One-time scanning:
-
-    Use the -once flag to do a single operation and then exit. The
-    exit code will be zero if the operation was successful.
-
-Committing:
-
-    By default, keep-balance computes and reports changes but does not
-    implement them by sending pull and trash lists to the Keep
-    services.
-
-    Use the -commit-pulls and -commit-trash flags to implement the
-    computed changes.
-
-Tuning resource usage:
-
-    CollectionBatchSize limits the number of collections retrieved per
-    API transaction. If this is zero or omitted, page size is
-    determined by the API server's own page size limits (see
-    max_items_per_response and max_index_database_read configs).
-
-    CollectionBuffers sets the size of an internal queue of
-    collections. Higher values use more memory, and improve throughput
-    by allowing keep-balance to fetch the next page of collections
-    while the current page is still being processed. If this is zero
-    or omitted, pages are processed serially.
-
-    RequestTimeout is the maximum time keep-balance will spend on a
-    single HTTP request (getting a page of collections, getting the
-    block index from a keepstore server, or sending a trash or pull
-    list to a keepstore server). Defaults to 30 minutes.
-
-Limitations:
-
-    keep-balance does not attempt to discover whether committed pull
-    and trash requests ever get carried out -- only that they are
-    accepted by the Keep services. If some services are full, new
-    copies of underreplicated blocks might never get made, only
-    repeatedly requested.
-
-`, exampleConfigFile)
-}