From c2ed4aab77cb1fa25487be495b6971107e69aab4 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Wed, 24 Apr 2019 16:42:06 -0400 Subject: [PATCH] Merge branch '15112-save-lost-blocks-file' refs #15112 refs #15148 Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- .../install-keep-balance.html.textile.liquid | 2 +- services/keep-balance/balance.go | 43 ++++++++++++++ services/keep-balance/balance_run_test.go | 58 +++++++++++++++++++ services/keep-balance/server.go | 11 +++- 4 files changed, 110 insertions(+), 4 deletions(-) diff --git a/doc/install/install-keep-balance.html.textile.liquid b/doc/install/install-keep-balance.html.textile.liquid index 68bf07a4ae..4a35f448e2 100644 --- a/doc/install/install-keep-balance.html.textile.liquid +++ b/doc/install/install-keep-balance.html.textile.liquid @@ -81,11 +81,11 @@ Client: AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz KeepServiceTypes: - disk -Listen: :9005 ManagementToken: xyzzy RunPeriod: 10m CollectionBatchSize: 100000 CollectionBuffers: 1000 +LostBlocksFile: /tmp/keep-balance-lost-blocks.txt # If given, this file will be updated atomically during each successful run. diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index ab8aadca0f..adf9b9b3cc 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -8,12 +8,16 @@ import ( "bytes" "crypto/md5" "fmt" + "io" + "io/ioutil" "log" "math" + "os" "runtime" "sort" "strings" "sync" + "syscall" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -35,6 +39,8 @@ type Balancer struct { Dumper *logrus.Logger Metrics *metrics + LostBlocksFile string + *BlockStateMap KeepServices map[string]*KeepService DefaultReplication int @@ -48,6 +54,7 @@ type Balancer struct { errors []error stats balancerStats mutex sync.Mutex + lostBlocks io.Writer } // Run performs a balance operation using the given config and @@ -64,6 +71,30 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R defer bal.time("sweep", "wall clock time to run one full sweep")() + var lbFile *os.File + if bal.LostBlocksFile != "" { + tmpfn := bal.LostBlocksFile + ".tmp" + lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return + } + defer lbFile.Close() + err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + return + } + defer func() { + // Remove the tempfile only if we didn't get + // as far as successfully renaming it. + if lbFile != nil { + os.Remove(tmpfn) + } + }() + bal.lostBlocks = lbFile + } else { + bal.lostBlocks = ioutil.Discard + } + if len(config.KeepServiceList.Items) > 0 { err = bal.SetKeepServices(config.KeepServiceList) } else { @@ -107,6 +138,17 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R if err = bal.CheckSanityLate(); err != nil { return } + if lbFile != nil { + err = lbFile.Sync() + if err != nil { + return + } + err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile) + if err != nil { + return + } + lbFile = nil + } if runOptions.CommitPulls { err = bal.CommitPulls(&config.Client) if err != nil { @@ -885,6 +927,7 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) { s.lost.replicas -= surplus s.lost.blocks++ s.lost.bytes += bytes * int64(-surplus) + fmt.Fprintf(bal.lostBlocks, "%s\n", strings.SplitN(string(result.blkid), "+", 2)[0]) case surplus < 0: s.underrep.replicas -= surplus s.underrep.blocks++ diff --git a/services/keep-balance/balance_run_test.go b/services/keep-balance/balance_run_test.go index 545aeca860..de368239b6 100644 --- a/services/keep-balance/balance_run_test.go +++ b/services/keep-balance/balance_run_test.go @@ -11,6 +11,7 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "os" "strings" "sync" "time" @@ -270,6 +271,28 @@ func (s *stubServer) serveKeepstoreIndexFoo4Bar1() *reqTracker { return rt } +func (s *stubServer) serveKeepstoreIndexFoo1() *reqTracker { + rt := &reqTracker{} + s.mux.HandleFunc("/index/", func(w http.ResponseWriter, r *http.Request) { + rt.Add(r) + io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n") + }) + for _, mounts := range stubMounts { + for i, mnt := range mounts { + i := i + s.mux.HandleFunc(fmt.Sprintf("/mounts/%s/blocks", mnt.UUID), func(w http.ResponseWriter, r *http.Request) { + rt.Add(r) + if i == 0 { + io.WriteString(w, "acbd18db4cc2f85cedef654fccc4a4d8+3 12345678\n\n") + } else { + io.WriteString(w, "\n") + } + }) + } + } + return rt +} + func (s *stubServer) serveKeepstoreTrash() *reqTracker { return s.serveStatic("/trash", `{}`) } @@ -406,6 +429,32 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) { c.Check(pullReqs.Count(), check.Equals, 0) } +func (s *runSuite) TestWriteLostBlocks(c *check.C) { + lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-") + c.Assert(err, check.IsNil) + s.config.LostBlocksFile = lostf.Name() + defer os.Remove(lostf.Name()) + opts := RunOptions{ + CommitPulls: true, + CommitTrash: true, + Logger: s.logger(c), + } + s.stub.serveCurrentUserAdmin() + s.stub.serveFooBarFileCollections() + s.stub.serveKeepServices(stubServices) + s.stub.serveKeepstoreMounts() + s.stub.serveKeepstoreIndexFoo1() + s.stub.serveKeepstoreTrash() + s.stub.serveKeepstorePull() + srv, err := NewServer(s.config, opts) + c.Assert(err, check.IsNil) + _, err = srv.Run() + c.Check(err, check.IsNil) + lost, err := ioutil.ReadFile(lostf.Name()) + c.Assert(err, check.IsNil) + c.Check(string(lost), check.Equals, "37b51d194a7513e45b56f6524f2d51f2\n") +} + func (s *runSuite) TestDryRun(c *check.C) { opts := RunOptions{ CommitPulls: false, @@ -435,6 +484,11 @@ func (s *runSuite) TestDryRun(c *check.C) { } func (s *runSuite) TestCommit(c *check.C) { + lostf, err := ioutil.TempFile("", "keep-balance-lost-blocks-test-") + c.Assert(err, check.IsNil) + s.config.LostBlocksFile = lostf.Name() + defer os.Remove(lostf.Name()) + s.config.Listen = ":" s.config.ManagementToken = "xyzzy" opts := RunOptions{ @@ -462,6 +516,10 @@ func (s *runSuite) TestCommit(c *check.C) { // in a poor rendezvous position c.Check(bal.stats.pulls, check.Equals, 2) + lost, err := ioutil.ReadFile(lostf.Name()) + c.Assert(err, check.IsNil) + c.Check(string(lost), check.Equals, "") + metrics := s.getMetrics(c, srv) c.Check(metrics, check.Matches, `(?ms).*\narvados_keep_total_bytes 15\n.*`) c.Check(metrics, check.Matches, `(?ms).*\narvados_keepbalance_changeset_compute_seconds_sum [0-9\.]+\n.*`) diff --git a/services/keep-balance/server.go b/services/keep-balance/server.go index ad13be7511..4a5f4acb3e 100644 --- a/services/keep-balance/server.go +++ b/services/keep-balance/server.go @@ -57,6 +57,10 @@ type Config struct { // Timeout for outgoing http request/response cycle. RequestTimeout arvados.Duration + + // Destination filename for the list of lost block hashes, one + // per line. Updated atomically during each successful run. + LostBlocksFile string } // RunOptions controls runtime behavior. The flags/options that belong @@ -142,9 +146,10 @@ func (srv *Server) start() error { func (srv *Server) Run() (*Balancer, error) { bal := &Balancer{ - Logger: srv.Logger, - Dumper: srv.Dumper, - Metrics: srv.metrics, + Logger: srv.Logger, + Dumper: srv.Dumper, + Metrics: srv.metrics, + LostBlocksFile: srv.config.LostBlocksFile, } var err error srv.runOptions, err = bal.Run(srv.config, srv.runOptions) -- 2.30.2