14285: Export stats as prometheus metrics.
author Tom Clegg <tclegg@veritasgenetics.com>
Mon, 8 Oct 2018 20:43:40 +0000 (16:43 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Mon, 8 Oct 2018 20:43:40 +0000 (16:43 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

doc/install/install-keep-balance.html.textile.liquid
sdk/go/arvados/config.go
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/integration_test.go
services/keep-balance/main.go
services/keep-balance/metrics.go [new file with mode: 0644]
services/keep-balance/time_me.go [deleted file]

index 3a8dce078dd092bfe687639f912415b2553bf14c..043f3ebfd279d4034125852c7a08de7812a442d6 100644 (file)
@@ -75,7 +75,8 @@ h3. Create a keep-balance configuration file
 On the host running keep-balance, create @/etc/arvados/keep-balance/keep-balance.yml@ using the token you generated above.  Follow this YAML format:
 
 <notextile>
-<pre><code>Client:
+<pre><code>Listen: :9005
+Client:
   APIHost: <span class="userinput">uuid_prefix.your.domain</span>:443
   AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
 KeepServiceTypes:
index c723be7d10d4013b04e7a8b75ff7176bfb8f58f3..e2e9907d5d115ebb61a53ae873249511b3732a50 100644 (file)
@@ -161,6 +161,7 @@ func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
 type NodeProfile struct {
        Controller  SystemServiceInstance `json:"arvados-controller"`
        Health      SystemServiceInstance `json:"arvados-health"`
+       Keepbalance SystemServiceInstance `json:"keep-balance"`
        Keepproxy   SystemServiceInstance `json:"keepproxy"`
        Keepstore   SystemServiceInstance `json:"keepstore"`
        Keepweb     SystemServiceInstance `json:"keep-web"`
@@ -178,6 +179,7 @@ const (
        ServiceNameNodemanager ServiceName = "arvados-node-manager"
        ServiceNameWorkbench   ServiceName = "arvados-workbench"
        ServiceNameWebsocket   ServiceName = "arvados-ws"
+       ServiceNameKeepbalance ServiceName = "keep-balance"
        ServiceNameKeepweb     ServiceName = "keep-web"
        ServiceNameKeepproxy   ServiceName = "keepproxy"
        ServiceNameKeepstore   ServiceName = "keepstore"
@@ -192,6 +194,7 @@ func (np *NodeProfile) ServicePorts() map[ServiceName]string {
                ServiceNameNodemanager: np.Nodemanager.Listen,
                ServiceNameWorkbench:   np.Workbench.Listen,
                ServiceNameWebsocket:   np.Websocket.Listen,
+               ServiceNameKeepbalance: np.Keepbalance.Listen,
                ServiceNameKeepweb:     np.Keepweb.Listen,
                ServiceNameKeepproxy:   np.Keepproxy.Listen,
                ServiceNameKeepstore:   np.Keepstore.Listen,
index d86234a936cc96702f3a79d12c10d04548c0faa2..333a4fbde99b8470ed25fb45503baf2f2d5241a1 100644 (file)
@@ -10,7 +10,6 @@ import (
        "fmt"
        "log"
        "math"
-       "os"
        "runtime"
        "sort"
        "strings"
@@ -19,20 +18,9 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/Sirupsen/logrus"
 )
 
-// CheckConfig returns an error if anything is wrong with the given
-// config and runOptions.
-func CheckConfig(config Config, runOptions RunOptions) error {
-       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
-               return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
-       }
-       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
-               return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
-       }
-       return nil
-}
-
 // Balancer compares the contents of keepstore servers with the
 // collections stored in Arvados, and issues pull/trash requests
 // needed to get (closer to) the optimal data layout.
@@ -43,11 +31,13 @@ func CheckConfig(config Config, runOptions RunOptions) error {
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
+       Logger  *logrus.Logger
+       Dumper  *logrus.Logger
+       Metrics *metrics
+
        *BlockStateMap
        KeepServices       map[string]*KeepService
        DefaultReplication int
-       Logger             *log.Logger
-       Dumper             *log.Logger
        MinMtime           int64
 
        classes       []string
@@ -72,13 +62,7 @@ type Balancer struct {
 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
-       bal.Dumper = runOptions.Dumper
-       bal.Logger = runOptions.Logger
-       if bal.Logger == nil {
-               bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
-       }
-
-       defer timeMe(bal.Logger, "Run")()
+       defer bal.time("sweep", "wall clock time to run one full sweep")()
 
        if len(config.KeepServiceList.Items) > 0 {
                err = bal.SetKeepServices(config.KeepServiceList)
@@ -269,7 +253,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 //
 // It encodes the resulting information in BlockStateMap.
 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
-       defer timeMe(bal.Logger, "GetCurrentState")()
+       defer bal.time("get_state", "wall clock time to get current state")()
        bal.BlockStateMap = NewBlockStateMap()
 
        dd, err := c.DiscoveryDocument()
@@ -413,7 +397,7 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
 func (bal *Balancer) ComputeChangeSets() {
        // This just calls balanceBlock() once for each block, using a
        // pool of worker goroutines.
-       defer timeMe(bal.Logger, "ComputeChangeSets")()
+       defer bal.time("changeset_compute", "wall clock time to compute changesets")()
        bal.setupLookupTables()
 
        type balanceTask struct {
@@ -893,6 +877,7 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                s.trashes += len(srv.ChangeSet.Trashes)
        }
        bal.stats = s
+       bal.Metrics.UpdateStats(s)
 }
 
 // PrintStatistics writes statistics about the computed changes to
@@ -986,6 +971,7 @@ func (bal *Balancer) CheckSanityLate() error {
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+       defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
        return bal.commitAsync(c, "send pull list",
                func(srv *KeepService) error {
                        return srv.CommitPulls(c)
@@ -996,6 +982,7 @@ func (bal *Balancer) CommitPulls(c *arvados.Client) error {
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+       defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
        return bal.commitAsync(c, "send trash list",
                func(srv *KeepService) error {
                        return srv.CommitTrash(c)
@@ -1009,7 +996,6 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
                        var err error
                        defer func() { errs <- err }()
                        label := fmt.Sprintf("%s: %v", srv, label)
-                       defer timeMe(bal.Logger, label)()
                        err = f(srv)
                        if err != nil {
                                err = fmt.Errorf("%s: %v", label, err)
@@ -1033,6 +1019,17 @@ func (bal *Balancer) logf(f string, args ...interface{}) {
        }
 }
 
+func (bal *Balancer) time(name, help string) func() {
+       observer := bal.Metrics.DurationObserver(name+"_seconds", help)
+       t0 := time.Now()
+       bal.Logger.Printf("%s: start", name)
+       return func() {
+               dur := time.Since(t0)
+               observer.Observe(dur.Seconds())
+               bal.Logger.Printf("%s: took %vs", name, dur.Seconds())
+       }
+}
+
 // Rendezvous hash sort function. Less efficient than sorting on
 // precomputed rendezvous hashes, but also rarely used.
 func rendezvousLess(i, j string, blkid arvados.SizedDigest) bool {
index 28776abc47c600ce8540949d8b6fdd7ed63708ff..26aee213dfa3ba488ff38e855333ad216c5225b2 100644 (file)
@@ -9,7 +9,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net/http"
        "net/http/httptest"
        "strings"
@@ -17,6 +16,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/Sirupsen/logrus"
 
        check "gopkg.in/check.v1"
 )
@@ -282,7 +282,7 @@ type runSuite struct {
 }
 
 // make a log.Logger that writes to the current test's c.Log().
-func (s *runSuite) logger(c *check.C) *log.Logger {
+func (s *runSuite) logger(c *check.C) *logrus.Logger {
        r, w := io.Pipe()
        go func() {
                buf := make([]byte, 10000)
@@ -299,7 +299,9 @@ func (s *runSuite) logger(c *check.C) *log.Logger {
                        }
                }
        }()
-       return log.New(w, "", log.LstdFlags)
+       logger := logrus.New()
+       logger.Out = w
+       return logger
 }
 
 func (s *runSuite) SetUpTest(c *check.C) {
@@ -308,7 +310,9 @@ func (s *runSuite) SetUpTest(c *check.C) {
                        AuthToken: "xyzzy",
                        APIHost:   "zzzzz.arvadosapi.com",
                        Client:    s.stub.Start()},
-               KeepServiceTypes: []string{"disk"}}
+               KeepServiceTypes: []string{"disk"},
+               RunPeriod:        arvados.Duration(time.Second),
+       }
        s.stub.serveDiscoveryDoc()
        s.stub.logf = c.Logf
 }
@@ -330,7 +334,9 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       _, err := (&Balancer{}).Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       _, err = srv.Run()
        c.Check(err, check.ErrorMatches, "received zero collections")
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -349,7 +355,9 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
        s.stub.serveKeepstoreMounts()
        indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
-       _, err := (&Balancer{}).Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       _, err = srv.Run()
        c.Check(err, check.IsNil)
        c.Check(indexReqs.Count(), check.Equals, 0)
        c.Check(trashReqs.Count(), check.Equals, 0)
@@ -367,7 +375,9 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        s.stub.serveKeepstoreMounts()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       _, err := (&Balancer{}).Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       _, err = srv.Run()
        c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -386,7 +396,9 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       _, err := (&Balancer{}).Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       _, err = srv.Run()
        c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -405,8 +417,9 @@ func (s *runSuite) TestDryRun(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       var bal Balancer
-       _, err := bal.Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       bal, err := srv.Run()
        c.Check(err, check.IsNil)
        for _, req := range collReqs.reqs {
                c.Check(req.Form.Get("include_trash"), check.Equals, "true")
@@ -419,6 +432,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
 }
 
 func (s *runSuite) TestCommit(c *check.C) {
+       s.config.Listen = ":"
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
@@ -432,8 +446,9 @@ func (s *runSuite) TestCommit(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       var bal Balancer
-       _, err := bal.Run(s.config, opts)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+       bal, err := srv.Run()
        c.Check(err, check.IsNil)
        c.Check(trashReqs.Count(), check.Equals, 8)
        c.Check(pullReqs.Count(), check.Equals, 4)
@@ -442,9 +457,15 @@ func (s *runSuite) TestCommit(c *check.C) {
        // "bar" block is underreplicated by 1, and its only copy is
        // in a poor rendezvous position
        c.Check(bal.stats.pulls, check.Equals, 2)
+
+       metrics := s.getMetrics(c, srv)
+       c.Check(metrics, check.Matches, `(?ms).*\nkeep_total_bytes 15\n.*`)
+       c.Check(metrics, check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`)
+       c.Check(metrics, check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_count 1\n.*`)
 }
 
 func (s *runSuite) TestRunForever(c *check.C) {
+       s.config.Listen = ":"
        opts := RunOptions{
                CommitPulls: true,
                CommitTrash: true,
@@ -461,7 +482,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
 
        stop := make(chan interface{})
        s.config.RunPeriod = arvados.Duration(time.Millisecond)
-       go RunForever(s.config, opts, stop)
+       srv, err := NewServer(s.config, opts)
+       c.Assert(err, check.IsNil)
+
+       done := make(chan bool)
+       go func() {
+               srv.RunForever(stop)
+               close(done)
+       }()
 
        // Each run should send 4 pull lists + 4 trash lists. The
        // first run should also send 4 empty trash lists at
@@ -471,6 +499,16 @@ func (s *runSuite) TestRunForever(c *check.C) {
                time.Sleep(time.Millisecond)
        }
        stop <- true
+       <-done
        c.Check(pullReqs.Count() >= 16, check.Equals, true)
        c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
+       c.Check(s.getMetrics(c, srv), check.Matches, `(?ms).*\nkeepbalance_changeset_compute_seconds_count `+fmt.Sprintf("%d", pullReqs.Count()/4)+`\n.*`)
+}
+
+func (s *runSuite) getMetrics(c *check.C, srv *Server) string {
+       resp, err := http.Get("http://" + srv.listening + "/metrics")
+       c.Assert(err, check.IsNil)
+       buf, err := ioutil.ReadAll(resp.Body)
+       c.Check(err, check.IsNil)
+       return string(buf)
 }
index 9fc47623e73f40157ad14d0056ae1b21f85e1e1e..5280b40c9123012210010585308dd883e3d7962a 100644 (file)
@@ -6,7 +6,6 @@ package main
 
 import (
        "bytes"
-       "log"
        "os"
        "strings"
        "testing"
@@ -16,6 +15,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "github.com/Sirupsen/logrus"
 
        check "gopkg.in/check.v1"
 )
@@ -67,6 +67,7 @@ func (s *integrationSuite) SetUpTest(c *check.C) {
                        Insecure:  true,
                },
                KeepServiceTypes: []string{"disk"},
+               RunPeriod:        arvados.Duration(time.Second),
        }
 }
 
@@ -74,12 +75,19 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
        var logBuf *bytes.Buffer
        for iter := 0; iter < 20; iter++ {
                logBuf := &bytes.Buffer{}
+               logger := logrus.New()
+               logger.Out = logBuf
                opts := RunOptions{
                        CommitPulls: true,
                        CommitTrash: true,
-                       Logger:      log.New(logBuf, "", log.LstdFlags),
+                       Logger:      logger,
                }
-               nextOpts, err := (&Balancer{}).Run(s.config, opts)
+
+               bal := &Balancer{
+                       Logger:  logger,
+                       Metrics: newMetrics(),
+               }
+               nextOpts, err := bal.Run(s.config, opts)
                c.Check(err, check.IsNil)
                c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
                c.Check(nextOpts.CommitPulls, check.Equals, true)
index 90235cbf3188d91bc274412ddd5522dc639fa812..eb741fa7e949803df99c67bab02936ad66f17457 100644 (file)
@@ -17,11 +17,16 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/config"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
+       "github.com/Sirupsen/logrus"
 )
 
 var version = "dev"
 
-const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+const (
+       defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
+       rfc3339NanoFixed  = "2006-01-02T15:04:05.000000000Z07:00"
+)
 
 // Config specifies site configuration, like API credentials and the
 // choice of which servers are to be balanced.
@@ -36,6 +41,9 @@ type Config struct {
 
        KeepServiceList arvados.KeepServiceList
 
+       // address, address:port, or :port for management interface
+       Listen string
+
        // How often to check
        RunPeriod arvados.Duration
 
@@ -62,8 +70,8 @@ type RunOptions struct {
        Once        bool
        CommitPulls bool
        CommitTrash bool
-       Logger      *log.Logger
-       Dumper      *log.Logger
+       Logger      *logrus.Logger
+       Dumper      *logrus.Logger
 
        // SafeRendezvousState from the most recent balance operation,
        // or "" if unknown. If this changes from one run to the next,
@@ -130,15 +138,17 @@ func main() {
                }
        }
        if *dumpFlag {
-               runOptions.Dumper = log.New(os.Stdout, "", log.LstdFlags)
+               runOptions.Dumper = logrus.New()
+               runOptions.Dumper.Out = os.Stdout
+               runOptions.Dumper.Formatter = &logrus.TextFormatter{}
        }
-       err := CheckConfig(cfg, runOptions)
+       srv, err := NewServer(cfg, runOptions)
        if err != nil {
                // (don't run)
        } else if runOptions.Once {
-               _, err = (&Balancer{}).Run(cfg, runOptions)
+               _, err = srv.Run()
        } else {
-               err = RunForever(cfg, runOptions, nil)
+               err = srv.RunForever(nil)
        }
        if err != nil {
                log.Fatal(err)
@@ -151,32 +161,96 @@ func mustReadConfig(dst interface{}, path string) {
        }
 }
 
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) error {
+type Server struct {
+       config     Config
+       runOptions RunOptions
+       metrics    *metrics
+       listening  string // for tests
+
+       Logger *logrus.Logger
+       Dumper *logrus.Logger
+}
+
+// NewServer returns a new Server that runs Balancers using the given
+// config and runOptions.
+func NewServer(config Config, runOptions RunOptions) (*Server, error) {
+       if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
+               return nil, fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
+       }
+       if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
+               return nil, fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
+       }
+
        if runOptions.Logger == nil {
-               runOptions.Logger = log.New(os.Stderr, "", log.LstdFlags)
+               log := logrus.New()
+               log.Formatter = &logrus.JSONFormatter{
+                       TimestampFormat: rfc3339NanoFixed,
+               }
+               log.Out = os.Stderr
+               runOptions.Logger = log
+       }
+
+       srv := &Server{
+               config:     config,
+               runOptions: runOptions,
+               metrics:    newMetrics(),
+               Logger:     runOptions.Logger,
+               Dumper:     runOptions.Dumper,
        }
-       logger := runOptions.Logger
+       return srv, srv.start()
+}
+
+func (srv *Server) start() error {
+       if srv.config.Listen == "" {
+               return nil
+       }
+       server := &httpserver.Server{
+               Server: http.Server{
+                       Handler: httpserver.LogRequests(srv.Logger, srv.metrics.Handler(srv.Logger)),
+               },
+               Addr: srv.config.Listen,
+       }
+       err := server.Start()
+       if err != nil {
+               return err
+       }
+       srv.Logger.Printf("listening at %s", server.Addr)
+       srv.listening = server.Addr
+       return nil
+}
+
+func (srv *Server) Run() (*Balancer, error) {
+       bal := &Balancer{
+               Logger:  srv.Logger,
+               Dumper:  srv.Dumper,
+               Metrics: srv.metrics,
+       }
+       var err error
+       srv.runOptions, err = bal.Run(srv.config, srv.runOptions)
+       return bal, err
+}
+
+// RunForever runs forever, or (for testing purposes) until the given
+// stop channel is ready to receive.
+func (srv *Server) RunForever(stop <-chan interface{}) error {
+       logger := srv.runOptions.Logger
 
-       ticker := time.NewTicker(time.Duration(config.RunPeriod))
+       ticker := time.NewTicker(time.Duration(srv.config.RunPeriod))
 
        // The unbuffered channel here means we only hear SIGUSR1 if
        // it arrives while we're waiting in select{}.
        sigUSR1 := make(chan os.Signal)
        signal.Notify(sigUSR1, syscall.SIGUSR1)
 
-       logger.Printf("starting up: will scan every %v and on SIGUSR1", config.RunPeriod)
+       logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.config.RunPeriod)
 
        for {
-               if !runOptions.CommitPulls && !runOptions.CommitTrash {
+               if !srv.runOptions.CommitPulls && !srv.runOptions.CommitTrash {
                        logger.Print("WARNING: Will scan periodically, but no changes will be committed.")
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               bal := &Balancer{}
-               var err error
-               runOptions, err = bal.Run(config, runOptions)
+               _, err := srv.Run()
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
@@ -195,7 +269,7 @@ func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) e
                        // run too soon after the Nth run is triggered
                        // by SIGUSR1.
                        ticker.Stop()
-                       ticker = time.NewTicker(time.Duration(config.RunPeriod))
+                       ticker = time.NewTicker(time.Duration(srv.config.RunPeriod))
                }
                logger.Print("starting next run")
        }
diff --git a/services/keep-balance/metrics.go b/services/keep-balance/metrics.go
new file mode 100644 (file)
index 0000000..96ee66c
--- /dev/null
@@ -0,0 +1,112 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "fmt"
+       "net/http"
+       "sync"
+
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+type observer interface{ Observe(float64) }
+type setter interface{ Set(float64) }
+
+type metrics struct {
+       reg         *prometheus.Registry
+       statsGauges map[string]setter
+       observers   map[string]observer
+       setupOnce   sync.Once
+       mtx         sync.Mutex
+}
+
+func newMetrics() *metrics {
+       return &metrics{
+               reg:         prometheus.NewRegistry(),
+               statsGauges: map[string]setter{},
+               observers:   map[string]observer{},
+       }
+}
+
+func (m *metrics) DurationObserver(name, help string) observer {
+       m.mtx.Lock()
+       defer m.mtx.Unlock()
+       if obs, ok := m.observers[name]; ok {
+               return obs
+       }
+       summary := prometheus.NewSummary(prometheus.SummaryOpts{
+               Name:      name,
+               Subsystem: "keepbalance",
+               Help:      help,
+       })
+       m.reg.MustRegister(summary)
+       m.observers[name] = summary
+       return summary
+}
+
+// UpdateStats updates prometheus metrics using the given
+// balancerStats. It creates and registers the needed gauges on its
+// first invocation.
+func (m *metrics) UpdateStats(s balancerStats) {
+       type gauge struct {
+               Value interface{}
+               Help  string
+       }
+       s2g := map[string]gauge{
+               "total":           {s.current, "current backend storage usage"},
+               "garbage":         {s.garbage, "garbage (unreferenced, old)"},
+               "transient":       {s.unref, "transient (unreferenced, new)"},
+               "overreplicated":  {s.overrep, "overreplicated"},
+               "underreplicated": {s.underrep, "underreplicated"},
+               "lost":            {s.lost, "lost"},
+       }
+       m.setupOnce.Do(func() {
+               // Register gauge(s) for each balancerStats field.
+               addGauge := func(name, help string) {
+                       g := prometheus.NewGauge(prometheus.GaugeOpts{
+                               Name:      name,
+                               Subsystem: "keep",
+                               Help:      help,
+                       })
+                       m.reg.MustRegister(g)
+                       m.statsGauges[name] = g
+               }
+               for name, gauge := range s2g {
+                       switch gauge.Value.(type) {
+                       case blocksNBytes:
+                               for _, sub := range []string{"blocks", "bytes", "replicas"} {
+                                       addGauge(name+"_"+sub, sub+" of "+gauge.Help)
+                               }
+                       case int, int64:
+                               addGauge(name, gauge.Help)
+                       default:
+                               panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
+                       }
+               }
+       })
+       // Set gauges to values from s.
+       for name, gauge := range s2g {
+               switch val := gauge.Value.(type) {
+               case blocksNBytes:
+                       m.statsGauges[name+"_blocks"].Set(float64(val.blocks))
+                       m.statsGauges[name+"_bytes"].Set(float64(val.bytes))
+                       m.statsGauges[name+"_replicas"].Set(float64(val.replicas))
+               case int:
+                       m.statsGauges[name].Set(float64(val))
+               case int64:
+                       m.statsGauges[name].Set(float64(val))
+               default:
+                       panic(fmt.Sprintf("bad gauge type %T", gauge.Value))
+               }
+       }
+}
+
+func (m *metrics) Handler(log promhttp.Logger) http.Handler {
+       return promhttp.HandlerFor(m.reg, promhttp.HandlerOpts{
+               ErrorLog: log,
+       })
+}
diff --git a/services/keep-balance/time_me.go b/services/keep-balance/time_me.go
deleted file mode 100644 (file)
index 06d727d..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "log"
-       "time"
-)
-
-func timeMe(logger *log.Logger, label string) func() {
-       t0 := time.Now()
-       logger.Printf("%s: start", label)
-       return func() {
-               logger.Printf("%s: took %v", label, time.Since(t0))
-       }
-}