X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6617a2ba4323d2f47566c89961763625fce2e1ca..4600343d1bff7ac4f7b9f08486541444c31af8b6:/services/keep-balance/balance.go diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 2d1a59e890..9389f19ed8 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -6,6 +6,7 @@ import ( "math" "os" "runtime" + "sort" "strings" "sync" "time" @@ -50,11 +51,17 @@ type Balancer struct { } // Run performs a balance operation using the given config and -// runOptions. It should only be called once on a given Balancer -// object. Typical usage: +// runOptions, and returns RunOptions suitable for passing to a +// subsequent balance operation. // -// err = (&Balancer{}).Run(config, runOptions) -func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) { +// Run should only be called once on a given Balancer object. +// +// Typical usage: +// +// runOptions, err = (&Balancer{}).Run(config, runOptions) +func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) { + nextRunOptions = runOptions + bal.Dumper = runOptions.Dumper bal.Logger = runOptions.Logger if bal.Logger == nil { @@ -75,10 +82,20 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) { if err = bal.CheckSanityEarly(&config.Client); err != nil { return } - if runOptions.CommitTrash { + rs := bal.rendezvousState() + if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState { + if runOptions.SafeRendezvousState != "" { + bal.logf("notice: KeepServices list has changed since last run") + } + bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run") if err = bal.ClearTrashLists(&config.Client); err != nil { return } + // The current rendezvous state becomes "safe" (i.e., + // OK to compute changes for that state without + // clearing existing trash lists) only now, after we + // succeed in clearing existing trash lists. + nextRunOptions.SafeRendezvousState = rs } if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil { return @@ -158,6 +175,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error { return nil } +// rendezvousState returns a fingerprint (e.g., a sorted list of +// UUID+host+port) of the current set of keep services. +func (bal *Balancer) rendezvousState() string { + srvs := make([]string, 0, len(bal.KeepServices)) + for _, srv := range bal.KeepServices { + srvs = append(srvs, srv.String()) + } + sort.Strings(srvs) + return strings.Join(srvs, "; ") +} + // ClearTrashLists sends an empty trash list to each keep // service. Calling this before GetCurrentState avoids races. // @@ -199,7 +227,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro return err } bal.DefaultReplication = dd.DefaultCollectionReplication - bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL + bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9 errs := make(chan error, 2+len(bal.KeepServices)) wg := sync.WaitGroup{} @@ -216,6 +244,12 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro errs <- fmt.Errorf("%s: %v", srv, err) return } + if len(errs) > 0 { + // Some other goroutine encountered an + // error -- any further effort here + // will be wasted. + return + } bal.logf("%s: add %d replicas to map", srv, len(idx)) bal.BlockStateMap.AddReplicas(srv, idx) bal.logf("%s: done", srv) @@ -270,14 +304,11 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro } }() - go func() { - // Send a nil error when all goroutines finish. If - // this is the first error sent to errs, then - // everything worked. - wg.Wait() - errs <- nil - }() - return <-errs + wg.Wait() + if len(errs) > 0 { + return <-errs + } + return nil } func (bal *Balancer) addCollection(coll arvados.Collection) error { @@ -619,7 +650,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke }(srv) } var lastErr error - for _ = range bal.KeepServices { + for range bal.KeepServices { if err := <-errs; err != nil { bal.logf("%v", err) lastErr = err