Clusters:
# The cluster uuid prefix
zzzzz:
+ ManagementToken: xyzzy
NodeProfiles:
# For each node, the profile name corresponds to a
# locally-resolvable hostname, and describes which Arvados
# services are available on that machine.
api:
arvados-controller:
- Listen: 8000
+ Listen: :8000
arvados-api-server:
- Listen: 8001
+ Listen: :8001
manage:
arvados-node-manager:
- Listen: 8002
+ Listen: :8002
workbench:
arvados-workbench:
- Listen: 8003
+ Listen: :8003
arvados-ws:
- Listen: 8004
+ Listen: :8004
keep:
keep-web:
- Listen: 8005
+ Listen: :8005
keepproxy:
- Listen: 8006
+ Listen: :8006
+ keep-balance:
+ Listen: :9005
keep0:
keepstore:
- Listen: 25701
+ Listen: :25107
keep1:
keepstore:
- Listen: 25701
+ Listen: :25107
</pre>
}
</pre>
+h2. Keep-balance
+
+Keep-balance exports metrics at @/metrics@ -- e.g., @http://keep.zzzzz.arvadosapi.com:9005/metrics@.
+
+table(table table-bordered table-condensed).
+|_. Name|_. Type|_. Description|
+|arvados_keep_total_{replicas,blocks,bytes}|gauge|stored data (stored in backend volumes, whether referenced or not)|
+|arvados_keep_garbage_{replicas,blocks,bytes}|gauge|garbage data (unreferenced, and old enough to trash)|
+|arvados_keep_transient_{replicas,blocks,bytes}|gauge|transient data (unreferenced, but too new to trash)|
+|arvados_keep_overreplicated_{replicas,blocks,bytes}|gauge|overreplicated data (more replicas exist than are needed)|
+|arvados_keep_underreplicated_{replicas,blocks,bytes}|gauge|underreplicated data (fewer replicas exist than are needed)|
+|arvados_keep_lost_{replicas,blocks,bytes}|gauge|lost data (referenced by collections, but not found on any backend volume)|
+|arvados_keepbalance_get_state_seconds|summary|time to get all collections and keepstore volume indexes for one iteration|
+|arvados_keepbalance_changeset_compute_seconds|summary|time to compute changesets for one iteration|
+|arvados_keepbalance_send_pull_list_seconds|summary|time to send pull lists to all keepstore servers for one iteration|
+|arvados_keepbalance_send_trash_list_seconds|summary|time to send trash lists to all keepstore servers for one iteration|
+|arvados_keepbalance_sweep_seconds|summary|time to complete one iteration|
+
+Each @arvados_keep_@ storage state statistic above is presented as a set of three metrics:
+
+table(table table-bordered table-condensed).
+|*_blocks|distinct block hashes|
+|*_bytes|bytes stored on backend volumes|
+|*_replicas|objects/files stored on backend volumes|
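+
+These metrics can be scraped by Prometheus or polled by any HTTP client. A minimal Go sketch of a poller (assuming the example host name above and the management token @xyzzy@ used elsewhere on this page):
+
+<notextile>
+<pre><code>package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+)
+
+func main() {
+	// The management token can be sent as a Bearer token; a query
+	// parameter (?api_token=...) also works.
+	req, _ := http.NewRequest("GET", "http://keep.zzzzz.arvadosapi.com:9005/metrics", nil)
+	req.Header.Set("Authorization", "Bearer xyzzy")
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+	body, _ := ioutil.ReadAll(resp.Body)
+	fmt.Printf("%s", body)
+}
+</code></pre>
+</notextile>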
+
h2. Node manager
The node manager status endpoint provides a snapshot of internal status at the time of the most recent wishlist update.
On the host running keep-balance, create @/etc/arvados/keep-balance/keep-balance.yml@ using the token you generated above. Follow this YAML format:
<notextile>
<pre><code>Client:
APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
KeepServiceTypes:
- disk
+Listen: :9005
+ManagementToken: <span class="userinput">xyzzy</span>
RunPeriod: 10m
CollectionBatchSize: 100000
CollectionBuffers: 1000
# Format of request/response and error logs: "json" or "text".
LogFormat: json
-# The secret key that must be provided by monitoring services
-# wishing to access the health check endpoint (/_health).
-ManagementToken: ""
+# The secret key that must be provided by monitoring services when
+# using the health check and metrics endpoints (/_health, /metrics).
+ManagementToken: xyzzy
# Maximum RAM to use for data buffers, given in multiples of block
# size (64 MiB). When this limit is reached, HTTP requests requiring
type NodeProfile struct {
Controller SystemServiceInstance `json:"arvados-controller"`
Health SystemServiceInstance `json:"arvados-health"`
+ Keepbalance SystemServiceInstance `json:"keep-balance"`
Keepproxy SystemServiceInstance `json:"keepproxy"`
Keepstore SystemServiceInstance `json:"keepstore"`
Keepweb SystemServiceInstance `json:"keep-web"`
ServiceNameNodemanager ServiceName = "arvados-node-manager"
ServiceNameWorkbench ServiceName = "arvados-workbench"
ServiceNameWebsocket ServiceName = "arvados-ws"
+ ServiceNameKeepbalance ServiceName = "keep-balance"
ServiceNameKeepweb ServiceName = "keep-web"
ServiceNameKeepproxy ServiceName = "keepproxy"
ServiceNameKeepstore ServiceName = "keepstore"
ServiceNameNodemanager: np.Nodemanager.Listen,
ServiceNameWorkbench: np.Workbench.Listen,
ServiceNameWebsocket: np.Websocket.Listen,
+ ServiceNameKeepbalance: np.Keepbalance.Listen,
ServiceNameKeepweb: np.Keepweb.Listen,
ServiceNameKeepproxy: np.Keepproxy.Listen,
ServiceNameKeepstore: np.Keepstore.Listen,
return &Credentials{Tokens: []string{}}
}
-func NewCredentialsFromHTTPRequest(r *http.Request) *Credentials {
+func CredentialsFromRequest(r *http.Request) *Credentials {
+ if c, ok := r.Context().Value(contextKeyCredentials).(*Credentials); ok {
+ // preloaded by middleware
+ return c
+ }
c := NewCredentials()
c.LoadTokensFromHTTPRequest(r)
return c
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package auth
+
+import (
+ "context"
+ "net/http"
+)
+
+type contextKey string
+
+var contextKeyCredentials contextKey = "credentials"
+
+// LoadToken wraps the next handler, adding credentials to the request
+// context so subsequent handlers can access them efficiently via
+// CredentialsFromRequest.
+func LoadToken(next http.Handler) http.Handler {
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), contextKeyCredentials, CredentialsFromRequest(r))))
+ })
+}
+
+// RequireLiteralToken wraps the next handler, rejecting any request
+// that doesn't supply the given token. If the given token is empty,
+// RequireLiteralToken returns next (i.e., no auth checks are
+// performed).
+func RequireLiteralToken(token string, next http.Handler) http.Handler {
+ if token == "" {
+ return next
+ }
+ return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ c := CredentialsFromRequest(r)
+ if len(c.Tokens) == 0 {
+ http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
+ return
+ }
+ for _, t := range c.Tokens {
+ if t == token {
+ next.ServeHTTP(w, r)
+ return
+ }
+ }
+ http.Error(w, http.StatusText(http.StatusForbidden), http.StatusForbidden)
+ })
+}
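These two wrappers are designed to compose: LoadToken parses the request's tokens once and caches them in the context, and RequireLiteralToken (or anything else that calls CredentialsFromRequest downstream) reuses that cached result. A minimal sketch of the wiring, using a hypothetical mux and the example token xyzzy:

package main

import (
	"log"
	"net/http"

	"git.curoverse.com/arvados.git/sdk/go/auth"
)

func main() {
	mux := http.NewServeMux()
	// Requests without the literal token get 401 (no token) or 403
	// (wrong token); matching requests reach the wrapped handler.
	mux.Handle("/metrics", auth.RequireLiteralToken("xyzzy", http.NotFoundHandler()))
	// LoadToken stashes parsed credentials in the request context so
	// headers are only parsed once per request.
	log.Fatal(http.ListenAndServe(":9005", auth.LoadToken(mux)))
}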
}
func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
- creds := auth.NewCredentialsFromHTTPRequest(req)
+ creds := auth.CredentialsFromRequest(req)
for _, token := range creds.Tokens {
if token != "" && token == cluster.ManagementToken {
return true
defer srv.Close()
s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
Controller: arvados.SystemServiceInstance{Listen: listen},
+ Keepbalance: arvados.SystemServiceInstance{Listen: listen},
Keepproxy: arvados.SystemServiceInstance{Listen: listen},
Keepstore: arvados.SystemServiceInstance{Listen: listen},
Keepweb: arvados.SystemServiceInstance{Listen: listen},
defer srvU.Close()
s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
Controller: arvados.SystemServiceInstance{Listen: listenH},
+ Keepbalance: arvados.SystemServiceInstance{Listen: listenH},
Keepproxy: arvados.SystemServiceInstance{Listen: listenH},
Keepstore: arvados.SystemServiceInstance{Listen: listenH},
Keepweb: arvados.SystemServiceInstance{Listen: listenH},
"strings"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
"git.curoverse.com/arvados.git/sdk/go/stats"
"github.com/Sirupsen/logrus"
"github.com/gogo/protobuf/jsonpb"
// Returns an http.Handler that serves the Handler's metrics
// data at /metrics and /metrics.json, and passes other
// requests through to next.
- ServeAPI(next http.Handler) http.Handler
+ ServeAPI(token string, next http.Handler) http.Handler
}
type metrics struct {
// metrics API endpoints (currently "GET /metrics(.json)?") and passes
// other requests through to next.
//
+// If the given token is not empty, that token must be supplied by a
+// client in order to access the metrics endpoints.
+//
// Typical example:
//
// m := Instrument(...)
-// srv := http.Server{Handler: m.ServeAPI(m)}
-func (m *metrics) ServeAPI(next http.Handler) http.Handler {
+// srv := http.Server{Handler: m.ServeAPI("secrettoken", m)}
+func (m *metrics) ServeAPI(token string, next http.Handler) http.Handler {
+ jsonMetrics := auth.RequireLiteralToken(token, http.HandlerFunc(m.exportJSON))
+ plainMetrics := auth.RequireLiteralToken(token, m.exportProm)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
switch {
case req.Method != "GET" && req.Method != "HEAD":
next.ServeHTTP(w, req)
case req.URL.Path == "/metrics.json":
- m.exportJSON(w, req)
+ jsonMetrics.ServeHTTP(w, req)
case req.URL.Path == "/metrics":
- m.exportProm.ServeHTTP(w, req)
+ plainMetrics.ServeHTTP(w, req)
default:
next.ServeHTTP(w, req)
}
httpserver.Log(r.RemoteAddr, passwordToLog, w.WroteStatus(), statusText, repoName, r.Method, r.URL.Path)
}()
- creds := auth.NewCredentialsFromHTTPRequest(r)
+ creds := auth.CredentialsFromRequest(r)
if len(creds.Tokens) == 0 {
statusCode, statusText = http.StatusUnauthorized, "no credentials provided"
w.Header().Add("WWW-Authenticate", "Basic realm=\"git\"")
"fmt"
"log"
"math"
- "os"
"runtime"
"sort"
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/Sirupsen/logrus"
)
-// CheckConfig returns an error if anything is wrong with the given
-// config and runOptions.
-func CheckConfig(config Config, runOptions RunOptions) error {
- if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
- return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
- }
- if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
- return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
- }
- return nil
-}
-
// Balancer compares the contents of keepstore servers with the
// collections stored in Arvados, and issues pull/trash requests
// needed to get (closer to) the optimal data layout.
// BlobSignatureTTL; and all N existing replicas of a given data block
// are in the N best positions in rendezvous probe order.
type Balancer struct {
+ Logger *logrus.Logger
+ Dumper *logrus.Logger
+ Metrics *metrics
+
*BlockStateMap
KeepServices map[string]*KeepService
DefaultReplication int
- Logger *log.Logger
- Dumper *log.Logger
MinMtime int64
classes []string
func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
nextRunOptions = runOptions
- bal.Dumper = runOptions.Dumper
- bal.Logger = runOptions.Logger
- if bal.Logger == nil {
- bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
- }
-
- defer timeMe(bal.Logger, "Run")()
+ defer bal.time("sweep", "wall clock time to run one full sweep")()
if len(config.KeepServiceList.Items) > 0 {
err = bal.SetKeepServices(config.KeepServiceList)
//
// It encodes the resulting information in BlockStateMap.
func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
- defer timeMe(bal.Logger, "GetCurrentState")()
+ defer bal.time("get_state", "wall clock time to get current state")()
bal.BlockStateMap = NewBlockStateMap()
dd, err := c.DiscoveryDocument()
func (bal *Balancer) ComputeChangeSets() {
// This just calls balanceBlock() once for each block, using a
// pool of worker goroutines.
- defer timeMe(bal.Logger, "ComputeChangeSets")()
+ defer bal.time("changeset_compute", "wall clock time to compute changesets")()
bal.setupLookupTables()
type balanceTask struct {
s.trashes += len(srv.ChangeSet.Trashes)
}
bal.stats = s
+ bal.Metrics.UpdateStats(s)
}
// PrintStatistics writes statistics about the computed changes to
// existing blocks that are either underreplicated or poorly
// distributed according to rendezvous hashing.
func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+ defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
return bal.commitAsync(c, "send pull list",
func(srv *KeepService) error {
return srv.CommitPulls(c)
// keepstore servers. This has the effect of deleting blocks that are
// overreplicated or unreferenced.
func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+ defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
return bal.commitAsync(c, "send trash list",
func(srv *KeepService) error {
return srv.CommitTrash(c)
var err error
defer func() { errs <- err }()
label := fmt.Sprintf("%s: %v", srv, label)
- defer timeMe(bal.Logger, label)()
err = f(srv)
if err != nil {
err = fmt.Errorf("%s: %v", label, err)
}
}
+func (bal *Balancer) time(name, help string) func() {
+ observer := bal.Metrics.DurationObserver(name+"_seconds", help)
+ t0 := time.Now()
+ bal.Logger.Printf("%s: start", name)
+ return func() {
+ dur := time.Since(t0)
+ observer.Observe(dur.Seconds())
+ bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
+ }
+}
+
// Rendezvous hash sort function. Less efficient than sorting on
// precomputed rendezvous hashes, but also rarely used.
func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"net/http/httptest"
"strings"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/Sirupsen/logrus"
check "gopkg.in/check.v1"
)
}
-// make a log.Logger that writes to the current test's c.Log().
+// make a logrus.Logger that writes to the current test's c.Log().
-func (s *runSuite) logger(c *check.C) *log.Logger {
+func (s *runSuite) logger(c *check.C) *logrus.Logger {
r, w := io.Pipe()
go func() {
buf := make([]byte, 10000)
}
}
}()
- return log.New(w, "", log.LstdFlags)
+ logger := logrus.New()
+ logger.Out = w
+ return logger
}
func (s *runSuite) SetUpTest(c *check.C) {
AuthToken: "xyzzy",
APIHost: "zzzzz.arvadosapi.com",
Client: s.stub.Start()},
- KeepServiceTypes: []string{"disk"}}
+ KeepServiceTypes: []string{"disk"},
+ RunPeriod: arvados.Duration(time.Second),
+ }
s.stub.serveDiscoveryDoc()
s.stub.logf = c.Logf
}
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- _, err := (&Balancer{}).Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ _, err = srv.Run()
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveKeepstoreMounts()
indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
- _, err := (&Balancer{}).Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ _, err = srv.Run()
c.Check(err, check.IsNil)
c.Check(indexReqs.Count(), check.Equals, 0)
c.Check(trashReqs.Count(), check.Equals, 0)
s.stub.serveKeepstoreMounts()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- _, err := (&Balancer{}).Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ _, err = srv.Run()
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- _, err := (&Balancer{}).Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ _, err = srv.Run()
c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- var bal Balancer
- _, err := bal.Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ bal, err := srv.Run()
c.Check(err, check.IsNil)
for _, req := range collReqs.reqs {
c.Check(req.Form.Get("include_trash"), check.Equals, "true")
}
func (s *runSuite) TestCommit(c *check.C) {
+ s.config.Listen = ":"
+ s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- var bal Balancer
- _, err := bal.Run(s.config, opts)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+ bal, err := srv.Run()
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
// "bar" block is underreplicated by 1, and its only copy is
// in a poor rendezvous position
c.Check(bal.stats.pulls, check.Equals, 2)
+
+ metrics := s.getMetrics(c, srv)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+ c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count 1\n.*`)
}
func (s *runSuite) TestRunForever(c *check.C) {
+ s.config.Listen = ":"
+ s.config.ManagementToken = "xyzzy"
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
stop := make(chan interface{})
s.config.RunPeriod = arvados.Duration(time.Millisecond)
- go RunForever(s.config, opts, stop)
+ srv, err := NewServer(s.config, opts)
+ c.Assert(err, check.IsNil)
+
+ done := make(chan bool)
+ go func() {
+ srv.RunForever(stop)
+ close(done)
+ }()
// Each run should send 4 pull lists + 4 trash lists. The
// first run should also send 4 empty trash lists at
time.Sleep(time.Millisecond)
}
stop <- true
+ <-done
c.Check(pullReqs.Count() >= 16, check.Equals, true)
c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
+ c.Check(s.getMetrics(c, srv), check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
+}
+
+func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
+ resp, err := http.Get("http://" + srv.listening + "/metrics")
+ c.Assert(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+
+ resp, err = http.Get("http://" + srv.listening + "/metrics?api_token=xyzzy")
+ c.Assert(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ buf, err := ioutil.ReadAll(resp.Body)
+ c.Check(err, check.IsNil)
+ return string(buf)
}
import (
"bytes"
- "log"
"os"
"strings"
"testing"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "github.com/Sirupsen/logrus"
check "gopkg.in/check.v1"
)
Insecure: true,
},
KeepServiceTypes: []string{"disk"},
+ RunPeriod: arvados.Duration(time.Second),
}
}
var logBuf *bytes.Buffer
for iter := 0; iter < 20; iter++ {
logBuf := &bytes.Buffer{}
+ logger := logrus.New()
+ logger.Out = logBuf
opts := RunOptions{
CommitPulls: true,
CommitTrash: true,
- Logger: log.New(logBuf, "", log.LstdFlags),
+ Logger: logger,
}
- nextOpts, err := (&Balancer{}).Run(s.config, opts)
+
+ bal := &Balancer{
+ Logger: logger,
+ Metrics: newMetrics(),
+ }
+ nextOpts, err := bal.Run(s.config, opts)
c.Check(err, check.IsNil)
c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
c.Check(nextOpts.CommitPulls, check.Equals, true)
"log"
"net/http"
"os"
- "os/signal"
- "syscall"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/config"
+ "github.com/Sirupsen/logrus"
)
-var version = "dev"
-
-const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
-
-// Config specifies site configuration, like API credentials and the
-// choice of which servers are to be balanced.
-//
-// Config is loaded from a JSON config file (see usage()).
-type Config struct {
- // Arvados API endpoint and credentials.
- Client arvados.Client
-
- // List of service types (e.g., "disk") to balance.
- KeepServiceTypes []string
-
- KeepServiceList arvados.KeepServiceList
-
- // How often to check
- RunPeriod arvados.Duration
-
- // Number of collections to request in each API call
- CollectionBatchSize int
-
- // Max collections to buffer in memory (bigger values consume
- // more memory, but can reduce store-and-forward latency when
- // fetching pages)
- CollectionBuffers int
-
- // Timeout for outgoing http request/response cycle.
- RequestTimeout arvados.Duration
-}
-
-// RunOptions controls runtime behavior. The flags/options that belong
-// here are the ones that are useful for interactive use. For example,
-// "CommitTrash" is a runtime option rather than a config item because
-// it invokes a troubleshooting feature rather than expressing how
-// balancing is meant to be done at a given site.
-//
-// RunOptions fields are controlled by command line flags.
-type RunOptions struct {
- Once bool
- CommitPulls bool
- CommitTrash bool
- Logger *log.Logger
- Dumper *log.Logger
-
- // SafeRendezvousState from the most recent balance operation,
- // or "" if unknown. If this changes from one run to the next,
- // we need to watch out for races. See
- // (*Balancer)ClearTrashLists.
- SafeRendezvousState string
-}
-
var debugf = func(string, ...interface{}) {}
func main() {
}
}
if *dumpFlag {
- runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+ runOptions.Dumper = logrus.New()
+ runOptions.Dumper.Out = os.Stdout
+ runOptions.Dumper.Formatter = &logrus.TextFormatter{}
}
- err := CheckConfig(cfg, runOptions)
+ srv, err := NewServer(cfg, runOptions)
if err != nil {
// (don't run)
} else if runOptions.Once {
- _, err = (&Balancer{}).Run(cfg, runOptions)
+ _, err = srv.Run()
} else {
- err = RunForever(cfg, runOptions, nil)
+ err = srv.RunForever(nil)
}
if err != nil {
log.Fatal(err)
log.Fatal(err)
}
}
-
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
- if runOptions.Logger == nil {
- runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
- }
- logger := runOptions.Logger
-
- ticker := time.NewTicker(time.Duration(config.RunPeriod))
-
- // The unbuffered channel here means we only hear SIGUSR1 if
- // it arrives while we're waiting in select{}.
- sigUSR1 := make(chan os.Signal)
- signal.Notify(sigUSR1, syscall.SIGUSR1)
-
- logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
-
- for {
- if !runOptions.CommitPulls && !runOptions.CommitTrash {
- logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
- logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
- }
-
- bal := &Balancer{}
- var err error
- runOptions, err = bal.Run(config, runOptions)
- if err != nil {
- logger.Print("run failed: ", err)
- } else {
- logger.Print("run succeeded")
- }
-
- select {
- case <-stop:
- signal.Stop(sigUSR1)
- return nil
- case <-ticker.C:
- logger.Print("timer went off")
- case <-sigUSR1:
- logger.Print("received SIGUSR1, resetting timer")
- // Reset the timer so we don't start the N+1st
- // run too soon after the Nth run is triggered
- // by SIGUSR1.
- ticker.Stop()
- ticker = time.NewTicker(time.Duration(config.RunPeriod))
- }
- logger.Print("starting next run")
- }
-}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "fmt"
+ "net/http"
+ "sync"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+type observer interface{ Observe(float64) }
+type setter interface{ Set(float64) }
+
+type metrics struct {
+ reg *prometheus.Registry
+ statsGauges map[string]setter
+ observers map[string]observer
+ setupOnce sync.Once
+ mtx sync.Mutex
+}
+
+func newMetrics() *metrics {
+ return &metrics{
+ reg: prometheus.NewRegistry(),
+ statsGauges: map[string]setter{},
+ observers: map[string]observer{},
+ }
+}
+
+func (m *metrics) DurationObserver(name, help string) observer {
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+ if obs, ok := m.observers[name]; ok {
+ return obs
+ }
+ summary := prometheus.NewSummary(prometheus.SummaryOpts{
+ Namespace: "arvados",
+ Name: name,
+ Subsystem: "keepbalance",
+ Help: help,
+ })
+ m.reg.MustRegister(summary)
+ m.observers[name] = summary
+ return summary
+}
+
+// UpdateStats updates prometheus metrics using the given
+// balancerStats. It creates and registers the needed gauges on its
+// first invocation.
+func (m *metrics) UpdateStats(s balancerStats) {
+ type gauge struct {
+ Value interface{}
+ Help string
+ }
+ s2g := map[string]gauge{
+ "total": {s.current, "current backend storage usage"},
+ "garbage": {s.garbage, "garbage (unreferenced, old)"},
+ "transient": {s.unref, "transient (unreferenced, new)"},
+ "overreplicated": {s.overrep, "overreplicated"},
+ "underreplicated": {s.underrep, "underreplicated"},
+ "lost": {s.lost, "lost"},
+ }
+ m.setupOnce.Do(func() {
+ // Register gauge(s) for each balancerStats field.
+ addGauge := func(name, help string) {
+ g := prometheus.NewGauge(prometheus.GaugeOpts{
+ Namespace: "arvados",
+ Name: name,
+ Subsystem: "keep",
+ Help: help,
+ })
+ m.reg.MustRegister(g)
+ m.statsGauges[name] = g
+ }
+ for name, gauge := range s2g {
+ switch gauge.Value.(type) {
+ case blocksNBytes:
+ for _, sub := range []string{"blocks", "bytes", "replicas"} {
+ addGauge(name+"_"+sub, sub+" of "+gauge.Help)
+ }
+ case int, int64:
+ addGauge(name, gauge.Help)
+ default:
+ panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
+ }
+ }
+ })
+ // Set gauges to values from s.
+ for name, gauge := range s2g {
+ switch val := gauge.Value.(type) {
+ case blocksNBytes:
+ m.statsGauges[name+"_blocks"].Set(float64(val.blocks))
+ m.statsGauges[name+"_bytes"].Set(float64(val.bytes))
+ m.statsGauges[name+"_replicas"].Set(float64(val.replicas))
+ case int:
+ m.statsGauges[name].Set(float64(val))
+ case int64:
+ m.statsGauges[name].Set(float64(val))
+ default:
+ panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
+ }
+ }
+}
+
+func (m *metrics) Handler(log promhttp.Logger) http.Handler {
+ return promhttp.HandlerFor(m.reg, promhttp.HandlerOpts{
+ ErrorLog: log,
+ })
+}
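For orientation, here is how the rest of this package uses the metrics type: bal.time wraps DurationObserver around each balancing phase, ComputeChangeSets feeds UpdateStats, and Server mounts Handler on the management listener. A sketch in the same package (imports elided; the zero balancerStats is just a placeholder):

func exampleMetricsUsage() {
	m := newMetrics()

	// Summaries are created on first use and memoized by name, which
	// is what (*Balancer)time does around each balancing phase.
	obs := m.DurationObserver("sweep_seconds", "wall clock time to run one full sweep")
	t0 := time.Now()
	// ... one balancing sweep would run here ...
	obs.Observe(time.Since(t0).Seconds())

	// Gauges are registered on the first call, then just updated.
	m.UpdateStats(balancerStats{})

	// Serve everything registered so far in Prometheus text format.
	http.Handle("/metrics", m.Handler(nil))
}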
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "fmt"
+ "net/http"
+ "os"
+ "os/signal"
+ "syscall"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "github.com/Sirupsen/logrus"
+)
+
+var version = "dev"
+
+const (
+ defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+ rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+)
+
+// Config specifies site configuration, like API credentials and the
+// choice of which servers are to be balanced.
+//
+// Config is loaded from a JSON config file (see usage()).
+type Config struct {
+ // Arvados API endpoint and credentials.
+ Client arvados.Client
+
+ // List of service types (e.g., "disk") to balance.
+ KeepServiceTypes []string
+
+ KeepServiceList arvados.KeepServiceList
+
+ // address, address:port, or :port for management interface
+ Listen string
+
+ // token for management APIs
+ ManagementToken string
+
+ // How often to check
+ RunPeriod arvados.Duration
+
+ // Number of collections to request in each API call
+ CollectionBatchSize int
+
+ // Max collections to buffer in memory (bigger values consume
+ // more memory, but can reduce store-and-forward latency when
+ // fetching pages)
+ CollectionBuffers int
+
+ // Timeout for outgoing http request/response cycle.
+ RequestTimeout arvados.Duration
+}
+
+// RunOptions controls runtime behavior. The flags/options that belong
+// here are the ones that are useful for interactive use. For example,
+// "CommitTrash" is a runtime option rather than a config item because
+// it invokes a troubleshooting feature rather than expressing how
+// balancing is meant to be done at a given site.
+//
+// RunOptions fields are controlled by command line flags.
+type RunOptions struct {
+ Once bool
+ CommitPulls bool
+ CommitTrash bool
+ Logger *logrus.Logger
+ Dumper *logrus.Logger
+
+ // SafeRendezvousState from the most recent balance operation,
+ // or "" if unknown. If this changes from one run to the next,
+ // we need to watch out for races. See
+ // (*Balancer)ClearTrashLists.
+ SafeRendezvousState string
+}
+
+type Server struct {
+ config Config
+ runOptions RunOptions
+ metrics *metrics
+ listening string // for tests
+
+ Logger *logrus.Logger
+ Dumper *logrus.Logger
+}
+
+// NewServer returns a new Server that runs Balancers using the given
+// config and runOptions.
+func NewServer(config Config, runOptions RunOptions) (*Server, error) {
+ if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+ return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+ }
+ if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
+ return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+ }
+
+ if runOptions.Logger == nil {
+ log := logrus.New()
+ log.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ }
+ log.Out = os.Stderr
+ runOptions.Logger = log
+ }
+
+ srv := &Server{
+ config: config,
+ runOptions: runOptions,
+ metrics: newMetrics(),
+ Logger: runOptions.Logger,
+ Dumper: runOptions.Dumper,
+ }
+ return srv, srv.start()
+}
+
+func (srv *Server) start() error {
+ if srv.config.Listen == "" {
+ return nil
+ }
+ server := &httpserver.Server{
+ Server: http.Server{
+ Handler: httpserver.LogRequests(srv.Logger,
+ auth.RequireLiteralToken(srv.config.ManagementToken,
+ srv.metrics.Handler(srv.Logger))),
+ },
+ Addr: srv.config.Listen,
+ }
+ err := server.Start()
+ if err != nil {
+ return err
+ }
+ srv.Logger.Printf("listening at %s", server.Addr)
+ srv.listening = server.Addr
+ return nil
+}
+
+func (srv *Server) Run() (*Balancer, error) {
+ bal := &Balancer{
+ Logger: srv.Logger,
+ Dumper: srv.Dumper,
+ Metrics: srv.metrics,
+ }
+ var err error
+ srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+ return bal, err
+}
+
+// RunForever runs forever, or (for testing purposes) until the given
+// stop channel is ready to receive.
+func (srv *Server) RunForever(stop <-chan interface{}) error {
+ logger := srv.runOptions.Logger
+
+ ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
+
+ // The unbuffered channel here means we only hear SIGUSR1 if
+ // it arrives while we're waiting in select{}.
+ sigUSR1 := make(chan os.Signal)
+ signal.Notify(sigUSR1, syscall.SIGUSR1)
+
+ logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
+
+ for {
+ if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
+ logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
+ logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
+ }
+
+ _, err := srv.Run()
+ if err != nil {
+ logger.Print("run failed: ", err)
+ } else {
+ logger.Print("run succeeded")
+ }
+
+ select {
+ case <-stop:
+ signal.Stop(sigUSR1)
+ return nil
+ case <-ticker.C:
+ logger.Print("timer went off")
+ case <-sigUSR1:
+ logger.Print("received SIGUSR1, resetting timer")
+ // Reset the timer so we don't start the N+1st
+ // run too soon after the Nth run is triggered
+ // by SIGUSR1.
+ ticker.Stop()
+ ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
+ }
+ logger.Print("starting next run")
+ }
+}
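Taken together with the main() changes above, daemon startup reduces to roughly this flow (a condensed sketch; runDaemon is a hypothetical name):

func runDaemon(cfg Config, runOptions RunOptions) error {
	// NewServer validates the config, sets up JSON logging, and starts
	// the metrics/management listener when cfg.Listen is set.
	srv, err := NewServer(cfg, runOptions)
	if err != nil {
		return err
	}
	if runOptions.Once {
		// One balancing sweep, then exit.
		_, err = srv.Run()
		return err
	}
	// Otherwise sweep every RunPeriod and on each SIGUSR1; a nil stop
	// channel means run until the process is killed.
	return srv.RunForever(nil)
}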
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "log"
- "time"
-)
-
-func timeMe(logger *log.Logger, label string) func() {
- t0 := time.Now()
- logger.Printf("%s: start", label)
- return func() {
- logger.Printf("%s: took %v", label, time.Since(t0))
- }
-}
Insecure: false
KeepServiceTypes:
- disk
+Listen: ":9005"
+ManagementToken: xyzzy
RunPeriod: 600s
CollectionBatchSize: 100000
CollectionBuffers: 1000
if useSiteFS {
if tokens == nil {
- tokens = auth.NewCredentialsFromHTTPRequest(r).Tokens
+ tokens = auth.CredentialsFromRequest(r).Tokens
}
h.serveSiteFS(w, r, tokens, credentialsOK, attachment)
return
if tokens == nil {
if credentialsOK {
- reqTokens = auth.NewCredentialsFromHTTPRequest(r).Tokens
+ reqTokens = auth.CredentialsFromRequest(r).Tokens
}
tokens = append(reqTokens, h.Config.AnonymousTokens...)
}
reg := prometheus.NewRegistry()
h.Config.Cache.registry = reg
mh := httpserver.Instrument(reg, nil, httpserver.AddRequestIDs(httpserver.LogRequests(nil, h)))
- h.MetricsAPI = mh.ServeAPI(http.NotFoundHandler())
+ h.MetricsAPI = mh.ServeAPI(h.Config.ManagementToken, http.NotFoundHandler())
srv.Handler = mh
srv.Addr = srv.Config.Listen
return srv.Server.Start()
req, _ = http.NewRequest("GET", origin+"/metrics.json", nil)
resp, err = http.DefaultClient.Do(req)
c.Assert(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusUnauthorized)
+
+ req, _ = http.NewRequest("GET", origin+"/metrics.json", nil)
+ req.Header.Set("Authorization", "Bearer badtoken")
+ resp, err = http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusForbidden)
+
+ req, _ = http.NewRequest("GET", origin+"/metrics.json", nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+ resp, err = http.DefaultClient.Do(req)
+ c.Assert(err, check.IsNil)
c.Check(resp.StatusCode, check.Equals, http.StatusOK)
type summary struct {
SampleCount string `json:"sample_count"`
Insecure: true,
}
cfg.Listen = "127.0.0.1:0"
+ cfg.ManagementToken = arvadostest.ManagementToken
s.testServer = &server{Config: cfg}
err := s.testServer.Start()
c.Assert(err, check.Equals, nil)
systemAuthToken string
debugLogf func(string, ...interface{})
- ManagementToken string `doc: The secret key that must be provided by monitoring services
-wishing to access the health check endpoint (/_health).`
+ ManagementToken string
}
var (
rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
- stack := httpserver.Instrument(nil, nil,
+ instrumented := httpserver.Instrument(nil, nil,
httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
- return stack.ServeAPI(stack)
+ return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
}
// BadRequestHandler is a HandleFunc to address bad requests.
KeepVM = s.vm
theConfig = DefaultConfig()
theConfig.systemAuthToken = arvadostest.DataManagerToken
+ theConfig.ManagementToken = arvadostest.ManagementToken
theConfig.Start()
s.rtr = MakeRESTRouter(testCluster)
}
s.call("PUT", "/"+TestHash, "", TestBlock)
s.call("PUT", "/"+TestHash2, "", TestBlock2)
resp := s.call("GET", "/metrics.json", "", nil)
+ c.Check(resp.Code, check.Equals, http.StatusUnauthorized)
+ resp = s.call("GET", "/metrics.json", "foobar", nil)
+ c.Check(resp.Code, check.Equals, http.StatusForbidden)
+ resp = s.call("GET", "/metrics.json", arvadostest.ManagementToken, nil)
c.Check(resp.Code, check.Equals, http.StatusOK)
var j []struct {
Name string
resp := httptest.NewRecorder()
req, _ := http.NewRequest(method, path, bytes.NewReader(body))
if tok != "" {
- req.Header.Set("Authorization", "OAuth2 "+tok)
+ req.Header.Set("Authorization", "Bearer "+tok)
}
s.rtr.ServeHTTP(resp, req)
return resp