9456: Do not clear trash lists between runs when the list of keep services has not...
authorTom Clegg <tom@curoverse.com>
Thu, 7 Jul 2016 15:43:17 +0000 (11:43 -0400)
committerTom Clegg <tom@curoverse.com>
Thu, 7 Jul 2016 15:43:20 +0000 (11:43 -0400)
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/integration_test.go
services/keep-balance/main.go

index d22074e000cb2ae6d3c99ec7afdda65980e5f80a..25b474b9ca0a475f81cc08db02ef007e9fe009f0 100644 (file)
@@ -6,6 +6,7 @@ import (
        "math"
        "os"
        "runtime"
+       "sort"
        "strings"
        "sync"
        "time"
@@ -50,11 +51,17 @@ type Balancer struct {
 }
 
 // Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
 //
-//   err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Run should only be called once on a given Balancer object.
+//
+// Typical usage:
+//
+//   runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+       nextRunOptions = runOptions
+
        bal.Dumper = runOptions.Dumper
        bal.Logger = runOptions.Logger
        if bal.Logger == nil {
@@ -75,10 +82,20 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
        if err = bal.CheckSanityEarly(&config.Client); err != nil {
                return
        }
-       if runOptions.CommitTrash {
+       rs := bal.rendezvousState()
+       if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+               if runOptions.SafeRendezvousState != "" {
+                       bal.logf("notice: KeepServices list has changed since last run")
+               }
+               bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
                if err = bal.ClearTrashLists(&config.Client); err != nil {
                        return
                }
+               // The current rendezvous state becomes "safe" (i.e.,
+               // OK to compute changes for that state without
+               // clearing existing trash lists) only now, after we
+               // succeed in clearing existing trash lists.
+               nextRunOptions.SafeRendezvousState = rs
        }
        if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
                return
@@ -158,6 +175,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
        return nil
 }
 
+// rendezvousState returns a fingerprint (e.g., a sorted list of
+// UUID+host+port) of the current set of keep services.
+func (bal *Balancer) rendezvousState() string {
+       srvs := make([]string, 0, len(bal.KeepServices))
+       for _, srv := range bal.KeepServices {
+               srvs = append(srvs, srv.String())
+       }
+       sort.Strings(srvs)
+       return strings.Join(srvs, "; ")
+}
+
 // ClearTrashLists sends an empty trash list to each keep
 // service. Calling this before GetCurrentState avoids races.
 //
index a138d911a3352a6edf261c6295110d1091b3d98a..edc88aa26286bac6f131a3c890b1c3fb36c75ec6 100644 (file)
@@ -236,7 +236,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       err := (&Balancer{}).Run(s.config, opts)
+       _, err := (&Balancer{}).Run(s.config, opts)
        c.Check(err, check.ErrorMatches, "received zero collections")
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -254,7 +254,7 @@ func (s *runSuite) TestServiceTypes(c *check.C) {
        s.stub.serveFourDiskKeepServices()
        indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
-       err := (&Balancer{}).Run(s.config, opts)
+       _, err := (&Balancer{}).Run(s.config, opts)
        c.Check(err, check.IsNil)
        c.Check(indexReqs.Count(), check.Equals, 0)
        c.Check(trashReqs.Count(), check.Equals, 0)
@@ -271,7 +271,7 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        s.stub.serveFourDiskKeepServices()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       err := (&Balancer{}).Run(s.config, opts)
+       _, err := (&Balancer{}).Run(s.config, opts)
        c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -289,7 +289,7 @@ func (s *runSuite) TestDetectSkippedCollections(c *check.C) {
        s.stub.serveKeepstoreIndexFoo4Bar1()
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
-       err := (&Balancer{}).Run(s.config, opts)
+       _, err := (&Balancer{}).Run(s.config, opts)
        c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -308,7 +308,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        var bal Balancer
-       err := bal.Run(s.config, opts)
+       _, err := bal.Run(s.config, opts)
        c.Check(err, check.IsNil)
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -332,7 +332,7 @@ func (s *runSuite) TestCommit(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        var bal Balancer
-       err := bal.Run(s.config, opts)
+       _, err := bal.Run(s.config, opts)
        c.Check(err, check.IsNil)
        c.Check(trashReqs.Count(), check.Equals, 8)
        c.Check(pullReqs.Count(), check.Equals, 4)
@@ -362,13 +362,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
        s.config.RunPeriod = arvados.Duration(time.Millisecond)
        go RunForever(s.config, opts, stop)
 
-       // Each run should send 4 clear trash lists + 4 pull lists + 4
-       // trash lists. We should complete four runs in much less than
+       // Each run should send 4 pull lists + 4 trash lists. The
+       // first run should also send 4 empty trash lists at
+       // startup. We should complete all four runs in much less than
        // a second.
        for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
                time.Sleep(time.Millisecond)
        }
        stop <- true
        c.Check(pullReqs.Count() >= 16, check.Equals, true)
-       c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count())
+       c.Check(trashReqs.Count(), check.Equals, pullReqs.Count() + 4)
 }
index b090614607ceed2a6e2bb7e66354644843186a34..0793889259eedb8c2abde3c4d22e9929281c678b 100644 (file)
@@ -78,8 +78,10 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                        CommitTrash: true,
                        Logger:      log.New(logBuf, "", log.LstdFlags),
                }
-               err := (&Balancer{}).Run(s.config, opts)
+               nextOpts, err := (&Balancer{}).Run(s.config, opts)
                c.Check(err, check.IsNil)
+               c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
+               c.Check(nextOpts.CommitPulls, check.Equals, true)
                if iter == 0 {
                        c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
                        c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
index 364bb3ffd3f6b437ec1d46edf52223539fb1161b..da4fb62a6ac8f0772e550c5eb2a103de7904ad7e 100644 (file)
@@ -51,6 +51,12 @@ type RunOptions struct {
        CommitTrash bool
        Logger      *log.Logger
        Dumper      *log.Logger
+
+       // SafeRendezvousState from the most recent balance operation,
+       // or "" if unknown. If this changes from one run to the next,
+       // we need to watch out for races. See
+       // (*Balancer)ClearTrashLists.
+       SafeRendezvousState string
 }
 
 var debugf = func(string, ...interface{}) {}
@@ -98,7 +104,7 @@ func main() {
        if err != nil {
                // (don't run)
        } else if runOptions.Once {
-               err = (&Balancer{}).Run(config, runOptions)
+               _, err = (&Balancer{}).Run(config, runOptions)
        } else {
                err = RunForever(config, runOptions, nil)
        }
@@ -138,7 +144,9 @@ func RunForever(config Config, runOptions RunOptions, stop <-chan interface{}) e
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               err := (&Balancer{}).Run(config, runOptions)
+               bal := &Balancer{}
+               var err error
+               runOptions, err = bal.Run(config, runOptions)
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {