X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/139200027a3192260b5ea7c2d0c93a8eb5f8eb7e..4600343d1bff7ac4f7b9f08486541444c31af8b6:/services/keep-balance/balance.go diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 2a2480cc31..9389f19ed8 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -6,6 +6,7 @@ import ( "math" "os" "runtime" + "sort" "strings" "sync" "time" @@ -50,11 +51,17 @@ type Balancer struct { } // Run performs a balance operation using the given config and -// runOptions. It should only be called once on a given Balancer -// object. Typical usage: +// runOptions, and returns RunOptions suitable for passing to a +// subsequent balance operation. // -// err = (&Balancer{}).Run(config, runOptions) -func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) { +// Run should only be called once on a given Balancer object. +// +// Typical usage: +// +// runOptions, err = (&Balancer{}).Run(config, runOptions) +func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) { + nextRunOptions = runOptions + bal.Dumper = runOptions.Dumper bal.Logger = runOptions.Logger if bal.Logger == nil { @@ -75,12 +82,22 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) { if err = bal.CheckSanityEarly(&config.Client); err != nil { return } - if runOptions.CommitTrash { + rs := bal.rendezvousState() + if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState { + if runOptions.SafeRendezvousState != "" { + bal.logf("notice: KeepServices list has changed since last run") + } + bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run") if err = bal.ClearTrashLists(&config.Client); err != nil { return } + // The current rendezvous state becomes "safe" (i.e., + // OK to compute changes for that state without + // clearing existing trash lists) only now, after we + // succeed in clearing existing trash lists. + nextRunOptions.SafeRendezvousState = rs } - if err = bal.GetCurrentState(&config.Client); err != nil { + if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil { return } bal.ComputeChangeSets() @@ -158,6 +175,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error { return nil } +// rendezvousState returns a fingerprint (e.g., a sorted list of +// UUID+host+port) of the current set of keep services. +func (bal *Balancer) rendezvousState() string { + srvs := make([]string, 0, len(bal.KeepServices)) + for _, srv := range bal.KeepServices { + srvs = append(srvs, srv.String()) + } + sort.Strings(srvs) + return strings.Join(srvs, "; ") +} + // ClearTrashLists sends an empty trash list to each keep // service. Calling this before GetCurrentState avoids races. // @@ -190,7 +218,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error { // collection manifests in the database (API server). // // It encodes the resulting information in BlockStateMap. -func (bal *Balancer) GetCurrentState(c *arvados.Client) error { +func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error { defer timeMe(bal.Logger, "GetCurrentState")() bal.BlockStateMap = NewBlockStateMap() @@ -199,7 +227,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { return err } bal.DefaultReplication = dd.DefaultCollectionReplication - bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL + bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9 errs := make(chan error, 2+len(bal.KeepServices)) wg := sync.WaitGroup{} @@ -216,6 +244,12 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { errs <- fmt.Errorf("%s: %v", srv, err) return } + if len(errs) > 0 { + // Some other goroutine encountered an + // error -- any further effort here + // will be wasted. + return + } bal.logf("%s: add %d replicas to map", srv, len(idx)) bal.BlockStateMap.AddReplicas(srv, idx) bal.logf("%s: done", srv) @@ -224,10 +258,8 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { // collQ buffers incoming collections so we can start fetching // the next page without waiting for the current page to - // finish processing. (1000 happens to match the page size - // used by (*arvados.Client)EachCollection(), but it's OK if - // they don't match.) - collQ := make(chan arvados.Collection, 1000) + // finish processing. + collQ := make(chan arvados.Collection, bufs) // Start a goroutine to process collections. (We could use a // worker pool here, but even with a single worker we already @@ -252,7 +284,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { wg.Add(1) go func() { defer wg.Done() - err = EachCollection(c, + err = EachCollection(c, pageSize, func(coll arvados.Collection) error { collQ <- coll if len(errs) > 0 { @@ -272,14 +304,11 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error { } }() - go func() { - // Send a nil error when all goroutines finish. If - // this is the first error sent to errs, then - // everything worked. - wg.Wait() - errs <- nil - }() - return <-errs + wg.Wait() + if len(errs) > 0 { + return <-errs + } + return nil } func (bal *Balancer) addCollection(coll arvados.Collection) error { @@ -621,7 +650,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke }(srv) } var lastErr error - for _ = range bal.KeepServices { + for range bal.KeepServices { if err := <-errs; err != nil { bal.logf("%v", err) lastErr = err