14285: Merge branch 'master' into 14285-keep-balance-metrics
author Tom Clegg <tclegg@veritasgenetics.com>
Mon, 15 Oct 2018 15:23:18 +0000 (11:23 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Mon, 15 Oct 2018 15:23:18 +0000 (11:23 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

25 files changed:
doc/admin/health-checks.html.textile.liquid
doc/admin/metrics.html.textile.liquid
doc/install/install-keep-balance.html.textile.liquid
doc/install/install-keepstore.html.textile.liquid
sdk/go/arvados/config.go
sdk/go/auth/auth.go
sdk/go/auth/handlers.go [new file with mode: 0644]
sdk/go/health/aggregator.go
sdk/go/health/aggregator_test.go
sdk/go/httpserver/metrics.go
services/arv-git-httpd/auth_handler.go
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/integration_test.go
services/keep-balance/main.go
services/keep-balance/metrics.go [new file with mode: 0644]
services/keep-balance/server.go [new file with mode: 0644]
services/keep-balance/time_me.go [deleted file]
services/keep-balance/usage.go
services/keep-web/handler.go
services/keep-web/server.go
services/keep-web/server_test.go
services/keepstore/config.go
services/keepstore/handlers.go
services/keepstore/mounts_test.go

index 630c6a178f1cbd39db459c7344ca081bc460604c..eb71fda5e628b13d0fb77153e673861edc5d6c20 100644 (file)
@@ -39,32 +39,35 @@ The healthcheck aggregator uses the @NodeProfile@ section of the cluster-wide @a
 Cluster:
   # The cluster uuid prefix
   zzzzz:
+    ManagementToken: xyzzy
     NodeProfile:
       # For each node, the profile name corresponds to a
       # locally-resolvable hostname, and describes which Arvados
       # services are available on that machine.
       api:
         arvados-controller:
-          Listen: 8000
+          Listen: :8000
         arvados-api-server:
-          Listen: 8001
+          Listen: :8001
       manage:
        arvados-node-manager:
-         Listen: 8002
+         Listen: :8002
       workbench:
        arvados-workbench:
-         Listen: 8003
+         Listen: :8003
        arvados-ws:
-         Listen: 8004
+         Listen: :8004
       keep:
        keep-web:
-         Listen: 8005
+         Listen: :8005
        keepproxy:
-         Listen: 8006
+         Listen: :8006
+       keep-balance:
+         Listen: :9005
       keep0:
         keepstore:
-         Listen: 25701
+         Listen: :25107
       keep1:
         keepstore:
-         Listen: 25701
+         Listen: :25107
 </pre>
index 45b9ece8c9926e0147313c05504f33cad994a8ef..d553263d560b6e5d5706dfc8a71e822a138a0ebd 100644 (file)
@@ -146,6 +146,31 @@ h3. Example response
 }
 </pre>
 
+h2. Keep-balance
+
+Keep-balance exports metrics at @/metrics@ -- e.g., @http://keep.zzzzz.arvadosapi.com:9005/metrics@.
+
+table(table table-bordered table-condensed).
+|_. Name|_. Type|_. Description|
+|arvados_keep_total_{replicas,blocks,bytes}|gauge|total data (stored on backend volumes, whether referenced or not)|
+|arvados_keep_garbage_{replicas,blocks,bytes}|gauge|garbage data (unreferenced, and old enough to trash)|
+|arvados_keep_transient_{replicas,blocks,bytes}|gauge|transient data (unreferenced, but too new to trash)|
+|arvados_keep_overreplicated_{replicas,blocks,bytes}|gauge|overreplicated data (more replicas exist than are needed)|
+|arvados_keep_underreplicated_{replicas,blocks,bytes}|gauge|underreplicated data (fewer replicas exist than are needed)|
+|arvados_keep_lost_{replicas,blocks,bytes}|gauge|lost data (referenced by collections, but not found on any backend volume)|
+|arvados_keepbalance_get_state_seconds|summary|time to get all collections and keepstore volume indexes for one iteration|
+|arvados_keepbalance_changeset_compute_seconds|summary|time to compute changesets for one iteration|
+|arvados_keepbalance_send_pull_lists_seconds|summary|time to send pull lists to all keepstore servers for one iteration|
+|arvados_keepbalance_send_trash_lists_seconds|summary|time to send trash lists to all keepstore servers for one iteration|
+|arvados_keepbalance_sweep_seconds|summary|time to complete one iteration|
+
+Each @arvados_keep_@ storage state statistic above is presented as a set of three metrics:
+
+table(table table-bordered table-condensed).
+|*_blocks|distinct block hashes|
+|*_bytes|bytes stored on backend volumes|
+|*_replicas|objects/files stored on backend volumes|
+
 h2. Node manager
 
 The node manager status end point provides a snapshot of internal status at the time of the most recent wishlist update.
index 3a8dce078dd092bfe687639f912415b2553bf14c..3b8b3c05331ff13aaaaf1d03deb29de66f5b656f 100644 (file)
@@ -75,11 +75,13 @@ h3. Create a keep-balance configuration file
 On the host running keep-balance, create @/etc/arvados/keep-balance/keep-balance.yml@ using the token you generated above.  Follow this YAML format:
 
 <notextile>
-<pre><code>Client:
+<pre><code>Listen: :9005
+Client:
   APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
   AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
 KeepServiceTypes:
   - disk
+ManagementToken: <span class="userinput">xyzzy</span>
 RunPeriod: 10m
 CollectionBatchSize: 100000
 CollectionBuffers: 1000
index 943c9bae36b1c7e2358a83f9636bb3eb3ddf3cd3..fc4914efdbcd67b636a30d7c535a17900fbf1c05 100644 (file)
@@ -88,9 +88,9 @@ Listen: :25107
 # Format of request/response and error logs: "json" or "text".
 LogFormat: json
 
-# The secret key that must be provided by monitoring services
-# wishing to access the health check endpoint (/_health).
-ManagementToken: ""
+# The secret key that must be provided by monitoring services when
+# using the health check and metrics endpoints (/_health, /metrics).
+ManagementToken: xyzzy
 
 # Maximum RAM to use for data buffers, given in multiples of block
 # size (64 MiB). When this limit is reached, HTTP requests requiring
index c723be7d10d4013b04e7a8b75ff7176bfb8f58f3..e2e9907d5d115ebb61a53ae873249511b3732a50 100644 (file)
@@ -161,6 +161,7 @@ func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
 type NodeProfile struct {
        Controller  SystemServiceInstance `json:"arvados-controller"`
        Health      SystemServiceInstance `json:"arvados-health"`
+       Keepbalance SystemServiceInstance `json:"keep-balance"`
        Keepproxy   SystemServiceInstance `json:"keepproxy"`
        Keepstore   SystemServiceInstance `json:"keepstore"`
        Keepweb     SystemServiceInstance `json:"keep-web"`
@@ -178,6 +179,7 @@ const (
        ServiceNameNodemanager ServiceName = "arvados-node-manager"
        ServiceNameWorkbench   ServiceName = "arvados-workbench"
        ServiceNameWebsocket   ServiceName = "arvados-ws"
+       ServiceNameKeepbalance ServiceName = "keep-balance"
        ServiceNameKeepweb     ServiceName = "keep-web"
        ServiceNameKeepproxy   ServiceName = "keepproxy"
        ServiceNameKeepstore   ServiceName = "keepstore"
@@ -192,6 +194,7 @@ func (np *NodeProfile) ServicePorts() map[ServiceName]string {
                ServiceNameNodemanager: np.Nodemanager.Listen,
                ServiceNameWorkbench:   np.Workbench.Listen,
                ServiceNameWebsocket:   np.Websocket.Listen,
+               ServiceNameKeepbalance: np.Keepbalance.Listen,
                ServiceNameKeepweb:     np.Keepweb.Listen,
                ServiceNameKeepproxy:   np.Keepproxy.Listen,
                ServiceNameKeepstore:   np.Keepstore.Listen,
index ad1d398c763d7eaacefefcde8993e39044582f2a..3c266e0d3afda2254df6b3c7ccad7157a121bc6c 100644 (file)
@@ -19,7 +19,11 @@ func NewCredentials() *Credentials {
        return &Credentials{Tokens: []string{}}
 }
 
-func NewCredentialsFromHTTPRequest(r *http.Request) *Credentials {
+func CredentialsFromRequest(r *http.Request) *Credentials {
+       if c, ok := r.Context().Value(contextKeyCredentials).(*Credentials); ok {
+               // preloaded by middleware
+               return c
+       }
        c := NewCredentials()
        c.LoadTokensFromHTTPRequest(r)
        return c
diff --git a/sdk/go/auth/handlers.go b/sdk/go/auth/handlers.go
new file mode 100644 (file)
index 0000000..7b1760f
--- /dev/null
@@ -0,0 +1,47 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package auth
+
+import (
+       "context"
+       "net/http"
+)
+
+type contextKey string
+
+var contextKeyCredentials contextKey = "credentials"
+
+// LoadToken wraps the next handler, adding credentials to the request
+// context so subsequent handlers can access them efficiently via
+// CredentialsFromRequest.
+func LoadToken(next http.Handler) http.Handler {
+       return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), contextKeyCredentials, CredentialsFromRequest(r))))
+       })
+}
+
+// RequireLiteralToken wraps the next handler, rejecting any request
+// that doesn't supply the given token. If the given token is empty,
+// RequireLiteralToken returns next (i.e., no auth checks are
+// performed).
+func RequireLiteralToken(token string, next http.Handler) http.Handler {
+       if token == "" {
+               return next
+       }
+       return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               c := CredentialsFromRequest(r)
+               if len(c.Tokens) == 0 {
+                       http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
+                       return
+               }
+               for _, t := range c.Tokens {
+                       if t == token {
+                               next.ServeHTTP(w, r)
+                               return
+                       }
+               }
+               http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
+       })
+}
index a6cb8798aa328a468c1db98c3c3e5bf38773f15c..564331327a8d53ad250b044112f25e1b07730444 100644 (file)
@@ -217,7 +217,7 @@ func (agg *Aggregator) ping(url string, cluster *arvados.Cluster) (result CheckR
 }
 
 func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
-       creds := auth.NewCredentialsFromHTTPRequest(req)
+       creds := auth.CredentialsFromRequest(req)
        for _, token := range creds.Tokens {
                if token != "" && token == cluster.ManagementToken {
                        return true
index a96ed136cbd1539d986a1332a4914c61af335d6a..cb47c9e6705ea096087199813050e3c3095f4974 100644 (file)
@@ -108,6 +108,7 @@ func (s *AggregatorSuite) TestHealthy(c *check.C) {
        defer srv.Close()
        s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
                Controller:  arvados.SystemServiceInstance{Listen: listen},
+               Keepbalance: arvados.SystemServiceInstance{Listen: listen},
                Keepproxy:   arvados.SystemServiceInstance{Listen: listen},
                Keepstore:   arvados.SystemServiceInstance{Listen: listen},
                Keepweb:     arvados.SystemServiceInstance{Listen: listen},
@@ -132,6 +133,7 @@ func (s *AggregatorSuite) TestHealthyAndUnhealthy(c *check.C) {
        defer srvU.Close()
        s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
                Controller:  arvados.SystemServiceInstance{Listen: listenH},
+               Keepbalance: arvados.SystemServiceInstance{Listen: listenH},
                Keepproxy:   arvados.SystemServiceInstance{Listen: listenH},
                Keepstore:   arvados.SystemServiceInstance{Listen: listenH},
                Keepweb:     arvados.SystemServiceInstance{Listen: listenH},
index b52068e9571d8518a5eb5afee1267e9470821617..a0455f11b11b19ac2d4c88d87554d9d7c5794d2a 100644 (file)
@@ -10,6 +10,7 @@ import (
        "strings"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/auth"
        "git.curoverse.com/arvados.git/sdk/go/stats"
        "github.com/Sirupsen/logrus"
        "github.com/gogo/protobuf/jsonpb"
@@ -23,7 +24,7 @@ type Handler interface {
        // Returns an http.Handler that serves the Handler's metrics
        // data at /metrics and /metrics.json, and passes other
        // requests through to next.
-       ServeAPI(next http.Handler) http.Handler
+       ServeAPI(token string, next http.Handler) http.Handler
 }
 
 type metrics struct {
@@ -73,19 +74,24 @@ func (m *metrics) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 // metrics API endpoints (currently "GET /metrics(.json)?") and passes
 // other requests through to next.
 //
+// If the given token is not empty, that token must be supplied by a
+// client in order to access the metrics endpoints.
+//
 // Typical example:
 //
 //     m := Instrument(...)
-//     srv := http.Server{Handler: m.ServeAPI(m)}
-func (m *metrics) ServeAPI(next http.Handler) http.Handler {
+//     srv := http.Server{Handler: m.ServeAPI("secrettoken", m)}
+func (m *metrics) ServeAPI(token string, next http.Handler) http.Handler {
+       jsonMetrics := auth.RequireLiteralToken(token, http.HandlerFunc(m.exportJSON))
+       plainMetrics := auth.RequireLiteralToken(token, m.exportProm)
        return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
                switch {
                case req.Method != "GET" && req.Method != "HEAD":
                        next.ServeHTTP(w, req)
                case req.URL.Path == "/metrics.json":
-                       m.exportJSON(w, req)
+                       jsonMetrics.ServeHTTP(w, req)
                case req.URL.Path == "/metrics":
-                       m.exportProm.ServeHTTP(w, req)
+                       plainMetrics.ServeHTTP(w, req)
                default:
                        next.ServeHTTP(w, req)
                }
index b4dc58b24fc1cb1436cbb1db9dbc6b73deec373c..3b3032afda5d9707616ce474c431e10d2e629e37 100644 (file)
@@ -91,7 +91,7 @@ func (h *authHandler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                httpserver.Log(r.RemoteAddr, passwordToLog, w.WroteStatus(), statusText, repoName, r.Method, r.URL.Path)
        }()
 
-       creds := auth.NewCredentialsFromHTTPRequest(r)
+       creds := auth.CredentialsFromRequest(r)
        if len(creds.Tokens) == 0 {
                statusCode, statusText = http.StatusUnauthorized, "no credentials provided"
                w.Header().Add("WWW-Authenticate", "Basic realm=\"git\"")
index d86234a936cc96702f3a79d12c10d04548c0faa2..333a4fbde99b8470ed25fb45503baf2f2d5241a1 100644 (file)
@@ -10,7 +10,6 @@ import (
        "fmt"
        "log"
        "math"
-       "os"
        "runtime"
        "sort"
        "strings"
@@ -19,20 +18,9 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/Sirupsen/logrus"
 )
 
-// CheckConfig returns an error if anything is wrong with the given
-// config and runOptions.
-func CheckConfig(config Config, runOptions RunOptions) error {
-       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
-               return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
-       }
-       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
-               return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
-       }
-       return nil
-}
-
 // Balancer compares the contents of keepstore servers with the
 // collections stored in Arvados, and issues pull/trash requests
 // needed to get (closer to) the optimal data layout.
@@ -43,11 +31,13 @@ func CheckConfig(config Config, runOptions RunOptions) error {
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
+       Logger  *logrus.Logger
+       Dumper  *logrus.Logger
+       Metrics *metrics
+
        *BlockStateMap
        KeepServices       map[string]*KeepService
        DefaultReplication int
-       Logger             *log.Logger
-       Dumper             *log.Logger
        MinMtime           int64
 
        classes       []string
@@ -72,13 +62,7 @@ type Balancer struct {
 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
-       bal.Dumper = runOptions.Dumper
-       bal.Logger = runOptions.Logger
-       if bal.Logger == nil {
-               bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
-       }
-
-       defer timeMe(bal.Logger, "Run")()
+       defer bal.time("sweep", "wall clock time to run one full sweep")()
 
        if len(config.KeepServiceList.Items) > 0 {
                err = bal.SetKeepServices(config.KeepServiceList)
@@ -269,7 +253,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 //
 // It encodes the resulting information in BlockStateMap.
 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
-       defer timeMe(bal.Logger, "GetCurrentState")()
+       defer bal.time("get_state", "wall clock time to get current state")()
        bal.BlockStateMap = NewBlockStateMap()
 
        dd, err := c.DiscoveryDocument()
@@ -413,7 +397,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 func (bal *Balancer) ComputeChangeSets() {
        // This just calls balanceBlock() once for each block, using a
        // pool of worker goroutines.
-       defer timeMe(bal.Logger, "ComputeChangeSets")()
+       defer bal.time("changeset_compute", "wall clock time to compute changesets")()
        bal.setupLookupTables()
 
        type balanceTask struct {
@@ -893,6 +877,7 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                s.trashes += len(srv.ChangeSet.Trashes)
        }
        bal.stats = s
+       bal.Metrics.UpdateStats(s)
 }
 
 // PrintStatistics writes statistics about the computed changes to
@@ -986,6 +971,7 @@ func (bal *Balancer) CheckSanityLate() error {
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+       defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
        return bal.commitAsync(c, "send pull list",
                func(srv *KeepService) error {
                        return srv.CommitPulls(c)
@@ -996,6 +982,7 @@ func (bal *Balancer) CommitPulls(c *arvados.Client) error {
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+       defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
        return bal.commitAsync(c, "send trash list",
                func(srv *KeepService) error {
                        return srv.CommitTrash(c)
@@ -1009,7 +996,6 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
                        var err error
                        defer func() { errs <- err }()
                        label := fmt.Sprintf("%s: %v", srv, label)
-                       defer timeMe(bal.Logger, label)()
                        err = f(srv)
                        if err != nil {
                                err = fmt.Errorf("%s: %v", label, err)
@@ -1033,6 +1019,17 @@ func (bal *Balancer) logf(f string, args ...interface{}) {
        }
 }
 
+func (bal *Balancer) time(name, help string) func() {
+       observer := bal.Metrics.DurationObserver(name+"_seconds", help)
+       t0 := time.Now()
+       bal.Logger.Printf("%s: start", name)
+       return func() {
+               dur := time.Since(t0)
+               observer.Observe(dur.Seconds())
+               bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
+       }
+}
+
 // Rendezvous hash sort function. Less efficient than sorting on
 // precomputed rendezvous hashes, but also rarely used.
 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
index 28776abc47c600ce8540949d8b6fdd7ed63708ff..f42383297f3fd41067cd44766c10fa4c4066f5fc 100644 (file)
@@ -9,7 +9,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net/http"
        "net/http/httptest"
        "strings"
@@ -17,6 +16,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
 
        check "gopkg.in/check.v1"
 )
@@ -282,7 +282,7 @@ type runSuite struct {
 }
 
 // make a log.Logger that writes to the current test's c.Log().
-func (s *runSuite) logger(c *check.C) *log.Logger {
+func (s *runSuite) logger(c *check.C) *logrus.Logger {
        r, w := io.Pipe()
        go func() {
                buf := make([]byte, 10000)
@@ -299,7 +299,9 @@ func (s *runSuite) logger(c *check.C) *log.Logger {
                        }
                }
        }()
-       return log.New(w, "", log.LstdFlags)
+       logger := logrus.New()
+       logger.Out = w
+       return logger
 }
 
 func (s *runSuite) SetUpTest(c *check.C) {
@@ -308,7 +310,9 @@ func (s *runSuite) SetUpTest(c *check.C) {
                        AuthToken: "xyzzy",
                        APIHost:   "zzzzz.arvadosapi.com",
                        Client:    s.stub.Start()},
-               KeepServiceTypes: []string{"disk"}}
+               KeepServiceTypes: []string{"disk"},
+               RunPeriod:        arvados.Duration(time.Second),
+       }
        s.stub.serveDiscoveryDoc()
        s.stub.logf = c.Logf
 }
@@ -330,7 +334,9 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       _, err := (&Balancer{}).Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       _, err = srv.Run()
        c.Check(err, check.ErrorMatches, "received zero collections")
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -349,7 +355,9 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
        s.stub.serveKeepstoreMounts()
        indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
-       _, err := (&Balancer{}).Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       _, err = srv.Run()
        c.Check(err, check.IsNil)
        c.Check(indexReqs.Count(), check.Equals, 0)
        c.Check(trashReqs.Count(), check.Equals, 0)
@@ -367,7 +375,9 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        s.stub.serveKeepstoreMounts()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       _, err := (&Balancer{}).Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       _, err = srv.Run()
        c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -386,7 +396,9 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       _, err := (&Balancer{}).Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       _, err = srv.Run()
        c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -405,8 +417,9 @@ func (s *runSuite) TestDryRun(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       var bal Balancer
-       _, err := bal.Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       bal, err := srv.Run()
        c.Check(err, check.IsNil)
        for _, req := range collReqs.reqs {
                c.Check(req.Form.Get("include_trash"), check.Equals, "true")
@@ -419,6 +432,8 @@ func (s *runSuite) TestDryRun(c *check.C) {
 }
 
 func (s *runSuite) TestCommit(c *check.C) {
+       s.config.Listen = ":"
+       s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
@@ -432,8 +447,9 @@ func (s *runSuite) TestCommit(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       var bal Balancer
-       _, err := bal.Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       bal, err := srv.Run()
        c.Check(err, check.IsNil)
        c.Check(trashReqs.Count(), check.Equals, 8)
        c.Check(pullReqs.Count(), check.Equals, 4)
@@ -442,9 +458,16 @@ func (s *runSuite) TestCommit(c *check.C) {
        // "bar" block is underreplicated by 1, and its only copy is
        // in a poor rendezvous position
        c.Check(bal.stats.pulls, check.Equals, 2)
+
+       metrics := s.getMetrics(c, srv)
+       c.Check(metrics, check.Matches, `(?ms).*\nkeep_total_bytes 15\n.*`)
+       c.Check(metrics, check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+       c.Check(metrics, check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_count 1\n.*`)
 }
 
 func (s *runSuite) TestRunForever(c *check.C) {
+       s.config.Listen = ":"
+       s.config.ManagementToken = "xyzzy"
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
@@ -461,7 +484,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
 
        stop := make(chan interface{})
        s.config.RunPeriod = arvados.Duration(time.Millisecond)
-       go RunForever(s.config, opts, stop)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+
+       done := make(chan bool)
+       go func() {
+               srv.RunForever(stop)
+               close(done)
+       }()
 
        // Each run should send 4 pull lists + 4 trash lists. The
        // first run should also send 4 empty trash lists at
@@ -471,6 +501,21 @@ func (s *runSuite) TestRunForever(c *check.C) {
                time.Sleep(time.Millisecond)
        }
        stop <- true
+       <-done
        c.Check(pullReqs.Count() >= 16, check.Equals, true)
        c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
+       c.Check(s.getMetrics(c, srv), check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
+}
+
+func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
+       resp, err := http.Get("http://" + srv.listening + "/metrics")
+       c.Assert(err, check.IsNil)
+       c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+
+       resp, err = http.Get("http://" + srv.listening + "/metrics?api_token=xyzzy")
+       c.Assert(err, check.IsNil)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       buf, err := ioutil.ReadAll(resp.Body)
+       c.Check(err, check.IsNil)
+       return string(buf)
 }
index 9fc47623e73f40157ad14d0056ae1b21f85e1e1e..5280b40c9123012210010585308dd883e3d7962a 100644 (file)
@@ -6,7 +6,6 @@ package main
 
 import (
        "bytes"
-       "log"
        "os"
        "strings"
        "testing"
@@ -16,6 +15,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/Sirupsen/logrus"
 
        check "gopkg.in/check.v1"
 )
@@ -67,6 +67,7 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
                        Insecure:  true,
                },
                KeepServiceTypes: []string{"disk"},
+               RunPeriod:        arvados.Duration(time.Second),
        }
 }
 
@@ -74,12 +75,19 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
        var logBuf *bytes.Buffer
        for iter := 0; iter < 20; iter++ {
                logBuf := &bytes.Buffer{}
+               logger := logrus.New()
+               logger.Out = logBuf
                opts := RunOptions{
                        CommitPulls: true,
                        CommitTrash: true,
-                       Logger:      log.New(logBuf, "", log.LstdFlags),
+                       Logger:      logger,
                }
-               nextOpts, err := (&Balancer{}).Run(s.config, opts)
+
+               bal := &Balancer{
+                       Logger:  logger,
+                       Metrics: newMetrics(),
+               }
+               nextOpts, err := bal.Run(s.config, opts)
                c.Check(err, check.IsNil)
                c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
                c.Check(nextOpts.CommitPulls, check.Equals, true)
index 90235cbf3188d91bc274412ddd5522dc639fa812..e3e90d3581517c0ae8831f76be2881aaa0f1a44c 100644 (file)
@@ -11,67 +11,13 @@ import (
        "log"
        "net/http"
        "os"
-       "os/signal"
-       "syscall"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/config"
+       "github.com/Sirupsen/logrus"
 )
 
-var version = "dev"
-
-const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
-
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
-       // Arvados API endpoint and credentials.
-       Client arvados.Client
-
-       // List of service types (e.g., "disk") to balance.
-       KeepServiceTypes []string
-
-       KeepServiceList arvados.KeepServiceList
-
-       // How often to check
-       RunPeriod arvados.Duration
-
-       // Number of collections to request in each API call
-       CollectionBatchSize int
-
-       // Max collections to buffer in memory (bigger values consume
-       // more memory, but can reduce store-and-forward latency when
-       // fetching pages)
-       CollectionBuffers int
-
-       // Timeout for outgoing http request/response cycle.
-       RequestTimeout arvados.Duration
-}
-
-// RunOptions controls runtime behavior. The flags/options that belong
-// here are the ones that are useful for interactive use. For example,
-// "CommitTrash" is a runtime option rather than a config item because
-// it invokes a troubleshooting feature rather than expressing how
-// balancing is meant to be done at a given site.
-//
-// RunOptions fields are controlled by command line flags.
-type RunOptions struct {
-       Once        bool
-       CommitPulls bool
-       CommitTrash bool
-       Logger      *log.Logger
-       Dumper      *log.Logger
-
-       // SafeRendezvousState from the most recent balance operation,
-       // or "" if unknown. If this changes from one run to the next,
-       // we need to watch out for races. See
-       // (*Balancer)ClearTrashLists.
-       SafeRendezvousState string
-}
-
 var debugf = func(string, ...interface{}) {}
 
 func main() {
@@ -130,15 +76,17 @@ func main() {
                }
        }
        if *dumpFlag {
-               runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+               runOptions.Dumper = logrus.New()
+               runOptions.Dumper.Out = os.Stdout
+               runOptions.Dumper.Formatter = &logrus.TextFormatter{}
        }
-       err := CheckConfig(cfg, runOptions)
+       srv, err := NewServer(cfg, runOptions)
        if err != nil {
                // (don't run)
        } else if runOptions.Once {
-               _, err = (&Balancer{}).Run(cfg, runOptions)
+               _, err = srv.Run()
        } else {
-               err = RunForever(cfg, runOptions, nil)
+               err = srv.RunForever(nil)
        }
        if err != nil {
                log.Fatal(err)
@@ -150,53 +98,3 @@ func mustReadConfig(dst interface{}, path string) {
                log.Fatal(err)
        }
 }
-
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
-       if runOptions.Logger == nil {
-               runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
-       }
-       logger := runOptions.Logger
-
-       ticker := time.NewTicker(time.Duration(config.RunPeriod))
-
-       // The unbuffered channel here means we only hear SIGUSR1 if
-       // it arrives while we're waiting in select{}.
-       sigUSR1 := make(chan os.Signal)
-       signal.Notify(sigUSR1, syscall.SIGUSR1)
-
-       logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
-
-       for {
-               if !runOptions.CommitPulls && !runOptions.CommitTrash {
-                       logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
-                       logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
-               }
-
-               bal := &Balancer{}
-               var err error
-               runOptions, err = bal.Run(config, runOptions)
-               if err != nil {
-                       logger.Print("run failed: ", err)
-               } else {
-                       logger.Print("run succeeded")
-               }
-
-               select {
-               case <-stop:
-                       signal.Stop(sigUSR1)
-                       return nil
-               case <-ticker.C:
-                       logger.Print("timer went off")
-               case <-sigUSR1:
-                       logger.Print("received SIGUSR1, resetting timer")
-                       // Reset the timer so we don't start the N+1st
-                       // run too soon after the Nth run is triggered
-                       // by SIGUSR1.
-                       ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(config.RunPeriod))
-               }
-               logger.Print("starting next run")
-       }
-}
diff --git a/services/keep-balance/metrics.go b/services/keep-balance/metrics.go
new file mode 100644 (file)
index 0000000..96ee66c
--- /dev/null
@@ -0,0 +1,112 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+       "net/http"
+       "sync"
+
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+// observer is the part of a prometheus summary that callers of
+// DurationObserver need: recording one observed value.
+type observer interface {
+       Observe(float64)
+}
+
+// setter is the part of a prometheus gauge that UpdateStats needs:
+// replacing the gauge's current value.
+type setter interface {
+       Set(float64)
+}
+
+// metrics owns the prometheus registry used by keep-balance and the
+// collectors that have been registered in it so far.
+type metrics struct {
+       reg *prometheus.Registry
+
+       // setupOnce makes UpdateStats create and register the
+       // statsGauges entries only on its first call.
+       setupOnce   sync.Once
+       statsGauges map[string]setter
+
+       // mtx guards observers (see DurationObserver).
+       mtx       sync.Mutex
+       observers map[string]observer
+}
+
+func newMetrics() *metrics {
+       return &metrics{
+               reg:         prometheus.NewRegistry(),
+               statsGauges: map[string]setter{},
+               observers:   map[string]observer{},
+       }
+}
+
+func (m *metrics) DurationObserver(name, help string) observer {
+       m.mtx.Lock()
+       defer m.mtx.Unlock()
+       if obs, ok := m.observers[name]; ok {
+               return obs
+       }
+       summary := prometheus.NewSummary(prometheus.SummaryOpts{
+               Name:      name,
+               Subsystem: "keepbalance",
+               Help:      help,
+       })
+       m.reg.MustRegister(summary)
+       m.observers[name] = summary
+       return summary
+}
+
+// UpdateStats publishes the values in the given balancerStats as
+// prometheus gauges in the "keep" subsystem. The first call also
+// creates and registers the gauges:
+//   - a blocksNBytes field becomes three gauges, <name>_blocks,
+//     <name>_bytes and <name>_replicas;
+//   - an int or int64 field becomes a single gauge, <name>.
+// Any other field type is a programming error and panics.
+func (m *metrics) UpdateStats(s balancerStats) {
+       type statGauge struct {
+               Value interface{}
+               Help  string
+       }
+       stats := map[string]statGauge{
+               "total":           {s.current, "current backend storage usage"},
+               "garbage":         {s.garbage, "garbage (unreferenced, old)"},
+               "transient":       {s.unref, "transient (unreferenced, new)"},
+               "overreplicated":  {s.overrep, "overreplicated"},
+               "underreplicated": {s.underrep, "underreplicated"},
+               "lost":            {s.lost, "lost"},
+       }
+
+       m.setupOnce.Do(func() {
+               register := func(name, help string) {
+                       g := prometheus.NewGauge(prometheus.GaugeOpts{
+                               Subsystem: "keep",
+                               Name:      name,
+                               Help:      help,
+                       })
+                       m.reg.MustRegister(g)
+                       m.statsGauges[name] = g
+               }
+               for name, sg := range stats {
+                       switch sg.Value.(type) {
+                       case blocksNBytes:
+                               register(name+"_blocks", "blocks of "+sg.Help)
+                               register(name+"_bytes", "bytes of "+sg.Help)
+                               register(name+"_replicas", "replicas of "+sg.Help)
+                       case int, int64:
+                               register(name, sg.Help)
+                       default:
+                               panic(fmt.Sprintf("bad gauge type %T", sg.Value))
+                       }
+               }
+       })
+
+       // Every gauge looked up here was registered by the setup above.
+       set := func(name string, v float64) { m.statsGauges[name].Set(v) }
+       for name, sg := range stats {
+               switch v := sg.Value.(type) {
+               case blocksNBytes:
+                       set(name+"_blocks", float64(v.blocks))
+                       set(name+"_bytes", float64(v.bytes))
+                       set(name+"_replicas", float64(v.replicas))
+               case int:
+                       set(name, float64(v))
+               case int64:
+                       set(name, float64(v))
+               default:
+                       panic(fmt.Sprintf("bad gauge type %T", sg.Value))
+               }
+       }
+}
+
+func (m *metrics) Handler(log promhttp.Logger) http.Handler {
+       return promhttp.HandlerFor(m.reg, promhttp.HandlerOpts{
+               ErrorLog: log,
+       })
+}
diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go
new file mode 100644 (file)
index 0000000..ad13be7
--- /dev/null
@@ -0,0 +1,197 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+       "net/http"
+       "os"
+       "os/signal"
+       "syscall"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/auth"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/Sirupsen/logrus"
+)
+
+// version identifies this build. It defaults to "dev"; presumably
+// release builds override it at link time (-ldflags -X) -- confirm.
+var version = "dev"
+
+const (
+       // defaultConfigPath is the config file location used when no
+       // other path is given (presumably via a command line flag).
+       defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+
+       // rfc3339NanoFixed is like time.RFC3339Nano but always prints
+       // all nine fractional-second digits, so JSON log timestamps
+       // have a fixed width.
+       rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+)
+
+// Config specifies site configuration, like API credentials and the
+// choice of which servers are to be balanced.
+//
+// Config is loaded from a config file (see usage()). The default path
+// (defaultConfigPath) and the sample config use YAML.
+type Config struct {
+       // Arvados API endpoint and credentials.
+       Client arvados.Client
+
+       // List of service types (e.g., "disk") to balance. NewServer
+       // rejects a config that sets both this and KeepServiceList.
+       KeepServiceTypes []string
+
+       // Explicit list of keep services to balance. NewServer rejects
+       // a config that sets both this and KeepServiceTypes.
+       KeepServiceList arvados.KeepServiceList
+
+       // address, address:port, or :port for management interface.
+       // If empty, no management HTTP server is started.
+       Listen string
+
+       // token for management APIs: requests to the Listen address
+       // are checked with auth.RequireLiteralToken(ManagementToken).
+       ManagementToken string
+
+       // How often to check. Required (non-zero) unless the -once
+       // flag is used.
+       RunPeriod arvados.Duration
+
+       // Number of collections to request in each API call
+       CollectionBatchSize int
+
+       // Max collections to buffer in memory (bigger values consume
+       // more memory, but can reduce store-and-forward latency when
+       // fetching pages)
+       CollectionBuffers int
+
+       // Timeout for outgoing http request/response cycle.
+       RequestTimeout arvados.Duration
+}
+
+// RunOptions controls runtime behavior. The flags/options that belong
+// here are the ones that are useful for interactive use. For example,
+// "CommitTrash" is a runtime option rather than a config item because
+// it invokes a troubleshooting feature rather than expressing how
+// balancing is meant to be done at a given site.
+//
+// RunOptions fields are controlled by command line flags.
+type RunOptions struct {
+       // Once: run a single pass (Server.Run) instead of RunForever.
+       // CommitPulls/CommitTrash: whether computed pull/trash lists
+       // are committed; RunForever warns when both are false.
+       // Logger: log output; NewServer supplies a JSON logger on
+       // stderr if nil. Dumper: optional debug dump output (main
+       // sets a text logger on stdout for the dump flag).
+       Once        bool
+       CommitPulls bool
+       CommitTrash bool
+       Logger      *logrus.Logger
+       Dumper      *logrus.Logger
+
+       // SafeRendezvousState from the most recent balance operation,
+       // or "" if unknown. If this changes from one run to the next,
+       // we need to watch out for races. See
+       // (*Balancer).ClearTrashLists.
+       SafeRendezvousState string
+}
+
+// Server runs Balancers according to its Config and RunOptions. If
+// Config.Listen is set, NewServer also starts a management HTTP
+// server that serves the metrics collected during balancing runs.
+// Use NewServer to create one.
+type Server struct {
+       // listening holds the management server's address once
+       // start() has succeeded.
+       config     Config
+       runOptions RunOptions
+       metrics    *metrics
+       listening  string // for tests
+
+       // Copied from RunOptions by NewServer.
+       Logger *logrus.Logger
+       Dumper *logrus.Logger
+}
+
+// NewServer returns a new Server that runs Balancers using the given
+// config and runOptions.
+func NewServer(config Config, runOptions RunOptions) (*Server, error) {
+       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+               return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+       }
+       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
+               return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+       }
+
+       if runOptions.Logger == nil {
+               log := logrus.New()
+               log.Formatter = &logrus.JSONFormatter{
+                       TimestampFormat: rfc3339NanoFixed,
+               }
+               log.Out = os.Stderr
+               runOptions.Logger = log
+       }
+
+       srv := &Server{
+               config:     config,
+               runOptions: runOptions,
+               metrics:    newMetrics(),
+               Logger:     runOptions.Logger,
+               Dumper:     runOptions.Dumper,
+       }
+       return srv, srv.start()
+}
+
+func (srv *Server) start() error {
+       if srv.config.Listen == "" {
+               return nil
+       }
+       server := &httpserver.Server{
+               Server: http.Server{
+                       Handler: httpserver.LogRequests(srv.Logger,
+                               auth.RequireLiteralToken(srv.config.ManagementToken,
+                                       srv.metrics.Handler(srv.Logger))),
+               },
+               Addr: srv.config.Listen,
+       }
+       err := server.Start()
+       if err != nil {
+               return err
+       }
+       srv.Logger.Printf("listening at %s", server.Addr)
+       srv.listening = server.Addr
+       return nil
+}
+
+func (srv *Server) Run() (*Balancer, error) {
+       bal := &Balancer{
+               Logger:  srv.Logger,
+               Dumper:  srv.Dumper,
+               Metrics: srv.metrics,
+       }
+       var err error
+       srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+       return bal, err
+}
+
+// RunForever runs forever, or (for testing purposes) until the given
+// stop channel is ready to receive.
+func (srv *Server) RunForever(stop <-chan interface{}) error {
+       logger := srv.runOptions.Logger
+
+       ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
+
+       // The unbuffered channel here means we only hear SIGUSR1 if
+       // it arrives while we're waiting in select{}.
+       sigUSR1 := make(chan os.Signal)
+       signal.Notify(sigUSR1, syscall.SIGUSR1)
+
+       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
+
+       for {
+               if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
+                       logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
+                       logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
+               }
+
+               _, err := srv.Run()
+               if err != nil {
+                       logger.Print("run failed: ", err)
+               } else {
+                       logger.Print("run succeeded")
+               }
+
+               select {
+               case <-stop:
+                       signal.Stop(sigUSR1)
+                       return nil
+               case <-ticker.C:
+                       logger.Print("timer went off")
+               case <-sigUSR1:
+                       logger.Print("received SIGUSR1, resetting timer")
+                       // Reset the timer so we don't start the N+1st
+                       // run too soon after the Nth run is triggered
+                       // by SIGUSR1.
+                       ticker.Stop()
+                       ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
+               }
+               logger.Print("starting next run")
+       }
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
deleted file mode 100644 (file)
index 06d727d..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "log"
-       "time"
-)
-
-func timeMe(logger *log.Logger, label string) func() {
-       t0 := time.Now()
-       logger.Printf("%s: start", label)
-       return func() {
-               logger.Printf("%s: took %v", label, time.Since(t0))
-       }
-}
index 4c7d5067182fe89783e104c56063fdaf86545c1b..b39e83905d617f58b605c22c7e1a43cc8aa4c8cb 100644 (file)
@@ -17,6 +17,8 @@ Client:
     Insecure: false
 KeepServiceTypes:
     - disk
+Listen: ":9005"
+ManagementToken: xyzzy
 RunPeriod: 600s
 CollectionBatchSize: 100000
 CollectionBuffers: 1000
index 912398fa64db5d8b18605178f14a77884e234f1d..95948e32505f40112cff4da72c88692d7ea6edff 100644 (file)
@@ -320,7 +320,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 
        if useSiteFS {
                if tokens == nil {
-                       tokens = auth.NewCredentialsFromHTTPRequest(r).Tokens
+                       tokens = auth.CredentialsFromRequest(r).Tokens
                }
                h.serveSiteFS(w, r, tokens, credentialsOK, attachment)
                return
@@ -342,7 +342,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 
        if tokens == nil {
                if credentialsOK {
-                       reqTokens = auth.NewCredentialsFromHTTPRequest(r).Tokens
+                       reqTokens = auth.CredentialsFromRequest(r).Tokens
                }
                tokens = append(reqTokens, h.Config.AnonymousTokens...)
        }
index 68ff8a7b013c2d685299eae2dc7c7da1d84f5606..f70dd1a71f6ae92ecdc3f2979e2296f33238e28f 100644 (file)
@@ -21,7 +21,7 @@ func (srv *server) Start() error {
        reg := prometheus.NewRegistry()
        h.Config.Cache.registry = reg
        mh := httpserver.Instrument(reg, nil, httpserver.AddRequestIDs(httpserver.LogRequests(nil, h)))
-       h.MetricsAPI = mh.ServeAPI(http.NotFoundHandler())
+       h.MetricsAPI = mh.ServeAPI(h.Config.ManagementToken, http.NotFoundHandler())
        srv.Handler = mh
        srv.Addr = srv.Config.Listen
        return srv.Server.Start()
index 7e738cb9f3467a63c5da91cbac253429f0dc5cad..48c9726e3bee6ff8acb56a1a02192963b76556a3 100644 (file)
@@ -323,6 +323,18 @@ func (s *IntegrationSuite) TestMetrics(c *check.C) {
        req, _ = http.NewRequest("GET", origin+"/metrics.json", nil)
        resp, err = http.DefaultClient.Do(req)
        c.Assert(err, check.IsNil)
+       c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+
+       req, _ = http.NewRequest("GET", origin+"/metrics.json", nil)
+       req.Header.Set("Authorization", "Bearer badtoken")
+       resp, err = http.DefaultClient.Do(req)
+       c.Assert(err, check.IsNil)
+       c.Check(resp.StatusCode, check.Equals, http.StatusForbidden)
+
+       req, _ = http.NewRequest("GET", origin+"/metrics.json", nil)
+       req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+       resp, err = http.DefaultClient.Do(req)
+       c.Assert(err, check.IsNil)
        c.Check(resp.StatusCode, check.Equals, http.StatusOK)
        type summary struct {
                SampleCount string  `json:"sample_count"`
@@ -418,6 +430,7 @@ func (s *IntegrationSuite) SetUpTest(c *check.C) {
                Insecure: true,
        }
        cfg.Listen = "127.0.0.1:0"
+       cfg.ManagementToken = arvadostest.ManagementToken
        s.testServer = &server{Config: cfg}
        err := s.testServer.Start()
        c.Assert(err, check.Equals, nil)
index 1f8c7e31a2997ac2884ae2936ea174a0d859e017..2e3fe0a5b130fe5259550b450dea2bf0237cd295 100644 (file)
@@ -46,8 +46,7 @@ type Config struct {
        systemAuthToken string
        debugLogf       func(string, ...interface{})
 
-       ManagementToken string `doc: The secret key that must be provided by monitoring services
-wishing to access the health check endpoint (/_health).`
+       ManagementToken string
 }
 
 var (
index d84ede6ef6b599fbac4aea3c94f43ba8009ff035..e079b96784a16b985ed6ce47f99655e39a571ce9 100644 (file)
@@ -87,9 +87,9 @@ func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
 
        rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
 
-       stack := httpserver.Instrument(nil, nil,
+       instrumented := httpserver.Instrument(nil, nil,
                httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
-       return stack.ServeAPI(stack)
+       return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
 }
 
 // BadRequestHandler is a HandleFunc to address bad requests.
index 9fa0090aa739be1d640b0d2ba3a693a659087284..31b1a684fe6a077ebbbfebf7bb846f6f508a00b5 100644 (file)
@@ -27,6 +27,7 @@ func (s *MountsSuite) SetUpTest(c *check.C) {
        KeepVM = s.vm
        theConfig = DefaultConfig()
        theConfig.systemAuthToken = arvadostest.DataManagerToken
+       theConfig.ManagementToken = arvadostest.ManagementToken
        theConfig.Start()
        s.rtr = MakeRESTRouter(testCluster)
 }
@@ -104,6 +105,10 @@ func (s *MountsSuite) TestMetrics(c *check.C) {
        s.call("PUT", "/"+TestHash, "", TestBlock)
        s.call("PUT", "/"+TestHash2, "", TestBlock2)
        resp := s.call("GET", "/metrics.json", "", nil)
+       c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+       resp = s.call("GET", "/metrics.json", "foobar", nil)
+       c.Check(resp.Code, check.Equals, http.StatusForbidden)
+       resp = s.call("GET", "/metrics.json", arvadostest.ManagementToken, nil)
        c.Check(resp.Code, check.Equals, http.StatusOK)
        var j []struct {
                Name   string
@@ -144,7 +149,7 @@ func (s *MountsSuite) call(method, path, tok string, body []byte) *httptest.Resp
        resp := httptest.NewRecorder()
        req, _ := http.NewRequest(method, path, bytes.NewReader(body))
        if tok != "" {
-               req.Header.Set("Authorization", "OAuth2 "+tok)
+               req.Header.Set("Authorization", "Bearer "+tok)
        }
        s.rtr.ServeHTTP(resp, req)
        return resp