X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/27f948d88bbdd5be848011871d2b592d7057ece1..8e96f8cacd728b1a1b4316ecb2fb7cd233a97144:/services/keep-balance/balance.go

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 2a2480cc31..9338075d01 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -6,6 +10,7 @@ import (
 	"math"
 	"os"
 	"runtime"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -50,11 +55,17 @@ type Balancer struct {
 }
 
 // Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
+//
+// Run should only be called once on a given Balancer object.
 //
-//   err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Typical usage:
+//
+//   runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+	nextRunOptions = runOptions
+
 	bal.Dumper = runOptions.Dumper
 	bal.Logger = runOptions.Logger
 	if bal.Logger == nil {
@@ -71,16 +82,32 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
 	if err != nil {
 		return
 	}
+	for _, srv := range bal.KeepServices {
+		err = srv.discoverMounts(&config.Client)
+		if err != nil {
+			return
+		}
+	}
 
 	if err = bal.CheckSanityEarly(&config.Client); err != nil {
 		return
 	}
-	if runOptions.CommitTrash {
+	rs := bal.rendezvousState()
+	if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+		if runOptions.SafeRendezvousState != "" {
+			bal.logf("notice: KeepServices list has changed since last run")
+		}
+		bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
 		if err = bal.ClearTrashLists(&config.Client); err != nil {
 			return
 		}
+		// The current rendezvous state becomes "safe" (i.e.,
+		// OK to compute changes for that state without
+		// clearing existing trash lists) only now, after we
+		// succeed in clearing existing trash lists.
+		nextRunOptions.SafeRendezvousState = rs
 	}
-	if err = bal.GetCurrentState(&config.Client); err != nil {
+	if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
 		return
 	}
 	bal.ComputeChangeSets()
@@ -158,6 +185,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
 	return nil
 }
 
+// rendezvousState returns a fingerprint (e.g., a sorted list of
+// UUID+host+port) of the current set of keep services.
+func (bal *Balancer) rendezvousState() string {
+	srvs := make([]string, 0, len(bal.KeepServices))
+	for _, srv := range bal.KeepServices {
+		srvs = append(srvs, srv.String())
+	}
+	sort.Strings(srvs)
+	return strings.Join(srvs, "; ")
+}
+
 // ClearTrashLists sends an empty trash list to each keep
 // service. Calling this before GetCurrentState avoids races.
 //
@@ -190,7 +228,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
 	defer timeMe(bal.Logger, "GetCurrentState")()
 	bal.BlockStateMap = NewBlockStateMap()
 
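// The rendezvousState/SafeRendezvousState handshake introduced above
// works like this: each run fingerprints the current set of keep
// services, and clears all existing trash lists whenever the
// fingerprint differs from the one recorded by the last successful
// run (a changed service set changes the rendezvous order, which
// invalidates previously computed trash decisions). Below is a
// minimal, self-contained sketch of that idea -- the Service type and
// the map of services are hypothetical stand-ins, not the Arvados API.
package main

import (
	"fmt"
	"sort"
	"strings"
)

type Service struct{ UUID, Addr string }

func (s Service) String() string { return s.UUID + " " + s.Addr }

// fingerprint returns a string that changes exactly when the set of
// services (or their addresses) changes. Sorting makes the result
// independent of map iteration order.
func fingerprint(services map[string]Service) string {
	ids := make([]string, 0, len(services))
	for _, s := range services {
		ids = append(ids, s.String())
	}
	sort.Strings(ids)
	return strings.Join(ids, "; ")
}

func main() {
	services := map[string]Service{
		"svc-1": {UUID: "svc-1", Addr: "keep0:25107"},
		"svc-2": {UUID: "svc-2", Addr: "keep1:25107"},
	}
	safeState := "" // fingerprint recorded by the previous run, if any
	if rs := fingerprint(services); rs != safeState {
		fmt.Println("service set changed; clearing existing trash lists")
		// Only record the new fingerprint as safe after the
		// trash lists have actually been cleared, mirroring
		// nextRunOptions.SafeRendezvousState in Run() above.
		safeState = rs
	}
	fmt.Println("safe state:", safeState)
}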
@@ -199,7 +237,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 		return err
 	}
 	bal.DefaultReplication = dd.DefaultCollectionReplication
-	bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+	bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
 
 	errs := make(chan error, 2+len(bal.KeepServices))
 	wg := sync.WaitGroup{}
@@ -210,24 +248,32 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 		wg.Add(1)
 		go func(srv *KeepService) {
 			defer wg.Done()
-			bal.logf("%s: retrieve index", srv)
-			idx, err := srv.Index(c, "")
-			if err != nil {
-				errs <- fmt.Errorf("%s: %v", srv, err)
-				return
+			bal.logf("%s: retrieve indexes", srv)
+			for _, mount := range srv.mounts {
+				bal.logf("%s: retrieve index", mount)
+				idx, err := srv.IndexMount(c, mount.UUID, "")
+				if err != nil {
+					errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
+					return
+				}
+				if len(errs) > 0 {
+					// Some other goroutine encountered an
+					// error -- any further effort here
+					// will be wasted.
+					return
+				}
+				bal.logf("%s: add %d replicas to map", mount, len(idx))
+				bal.BlockStateMap.AddReplicas(mount, idx)
+				bal.logf("%s: done", mount)
 			}
-			bal.logf("%s: add %d replicas to map", srv, len(idx))
-			bal.BlockStateMap.AddReplicas(srv, idx)
 			bal.logf("%s: done", srv)
 		}(srv)
 	}
 
 	// collQ buffers incoming collections so we can start fetching
 	// the next page without waiting for the current page to
-	// finish processing. (1000 happens to match the page size
-	// used by (*arvados.Client)EachCollection(), but it's OK if
-	// they don't match.)
-	collQ := make(chan arvados.Collection, 1000)
+	// finish processing.
+	collQ := make(chan arvados.Collection, bufs)
 
 	// Start a goroutine to process collections. (We could use a
 	// worker pool here, but even with a single worker we already
@@ -252,7 +298,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		err = EachCollection(c,
+		err = EachCollection(c, pageSize,
 			func(coll arvados.Collection) error {
 				collQ <- coll
 				if len(errs) > 0 {
@@ -272,14 +318,11 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
 		}
 	}()
 
-	go func() {
-		// Send a nil error when all goroutines finish. If
-		// this is the first error sent to errs, then
-		// everything worked.
-		wg.Wait()
-		errs <- nil
-	}()
-	return <-errs
+	wg.Wait()
+	if len(errs) > 0 {
+		return <-errs
+	}
+	return nil
 }
 
 func (bal *Balancer) addCollection(coll arvados.Collection) error {
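// GetCurrentState above overlaps three kinds of work: per-mount index
// fetches, paged collection fetches, and manifest processing. Two
// details are worth calling out: collQ's buffer (now sized by the
// CollectionBuffers config) lets the fetcher run ahead of the
// consumer, and the buffered errs channel doubles as a soft
// cancellation signal -- goroutines poll len(errs) > 0 and stop early
// once any of them has failed. A minimal self-contained sketch of the
// pattern follows; the work items and the simulated failure are
// invented for illustration, not taken from Arvados.
package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 3
	errs := make(chan error, workers) // buffered so senders never block
	work := make(chan int, 10)        // like collQ: lets the producer run ahead
	var wg sync.WaitGroup

	// Producer: stop enqueueing as soon as any worker has failed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(work)
		for i := 0; i < 100; i++ {
			if len(errs) > 0 {
				// Some other goroutine encountered an
				// error -- any further effort here
				// will be wasted.
				return
			}
			work <- i
		}
	}()

	// Consumers.
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for item := range work {
				if item == 42 { // simulated failure
					errs <- fmt.Errorf("worker %d: item %d failed", w, item)
					return
				}
			}
		}(w)
	}

	// As in the rewritten GetCurrentState: wait for all goroutines,
	// then report the first error, if any.
	wg.Wait()
	if len(errs) > 0 {
		fmt.Println("error:", <-errs)
		return
	}
	fmt.Println("ok")
}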
@@ -362,14 +405,50 @@ var changeName = map[int]string{
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	debugf("balanceBlock: %v %+v", blkid, blk)
+
+	// A slot is somewhere a replica could potentially be trashed
+	// from, pulled from, or pulled to. Each KeepService gets
+	// either one empty slot, or one or more non-empty slots.
+	type slot struct {
+		srv  *KeepService // never nil
+		repl *Replica     // nil if none found
+	}
+
+	// First, we build an ordered list of all slots worth
+	// considering (including all slots where replicas have been
+	// found, as well as all of the optimal slots for this block).
+	// Then, when we consider each slot in that order, we will
+	// have all of the information we need to make a decision
+	// about that slot.
+
 	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
-	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
-	for _, repl := range blk.Replicas {
-		hasRepl[repl.UUID] = repl
-		// TODO: when multiple copies are on one server, use
-		// the oldest one that doesn't have a timestamp
-		// collision with other replicas.
+	rendezvousOrder := make(map[*KeepService]int, len(uuids))
+	slots := make([]slot, len(uuids))
+	for i, uuid := range uuids {
+		srv := bal.KeepServices[uuid]
+		rendezvousOrder[srv] = i
+		slots[i].srv = srv
+	}
+	for ri := range blk.Replicas {
+		// TODO: when multiple copies are on one server,
+		// prefer one on a readonly mount, or the oldest one
+		// that doesn't have a timestamp collision with other
+		// replicas.
+		repl := &blk.Replicas[ri]
+		srv := repl.KeepService
+		slotIdx := rendezvousOrder[srv]
+		if slots[slotIdx].repl != nil {
+			// Additional replicas on a single server are
+			// considered non-optimal. Within this
+			// category, we don't try to optimize layout:
+			// we just say the optimal order is the order
+			// we encounter them.
+			slotIdx = len(slots)
+			slots = append(slots, slot{srv: srv})
+		}
+		slots[slotIdx].repl = repl
 	}
+
 	// number of replicas already found in positions better than
 	// the position we're contemplating now.
 	reportedBestRepl := 0
@@ -385,12 +464,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	// requested on rendezvous positions M<N
 	pulls := 0
 	var changes []string
-	for _, uuid := range uuids {
+	for _, slot := range slots {
 		change := changeNone
-		srv := bal.KeepServices[uuid]
+		srv, repl := slot.srv, slot.repl
 		// TODO: request a Touch if Mtime is duplicated.
-		repl, ok := hasRepl[srv.UUID]
-		if ok {
+		if repl != nil {
 			if repl.Mtime < bal.MinMtime &&
 				len(uniqueBestRepl) >= blk.Desired &&
 				!uniqueBestRepl[repl.Mtime] {
@@ -426,7 +505,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 			change = changePull
 		}
 		if bal.Dumper != nil {
-			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+			var mtime int64
+			if repl != nil {
+				mtime = repl.Mtime
+			}
+			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime))
 		}
 	}
 	if bal.Dumper != nil {
@@ -621,7 +704,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
 		}(srv)
 	}
 	var lastErr error
-	for _ = range bal.KeepServices {
+	for range bal.KeepServices {
 		if err := <-errs; err != nil {
 			bal.logf("%v", err)
 			lastErr = err
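// The slot bookkeeping added to balanceBlock above boils down to:
// rank the servers in rendezvous order for this block, give each
// server one slot at its rank, then file every known replica into its
// server's slot, appending extra "non-optimal" slots when one server
// holds more than one copy. A self-contained sketch of the same
// bookkeeping follows; hashing server+block with md5 is a stand-in
// for keepclient.NewRootSorter, and the replica/slot types here are
// simplified, not the Arvados ones.
package main

import (
	"crypto/md5"
	"fmt"
	"sort"
)

type replica struct {
	server string
	mtime  int64
}

type slot struct {
	server string
	repl   *replica // nil if this server has no (further) copy
}

func main() {
	servers := []string{"keep0", "keep1", "keep2"}
	blkid := "d41d8cd98f00b204e9800998ecf8427e"
	replicas := []replica{
		{server: "keep2", mtime: 100},
		{server: "keep2", mtime: 200}, // second copy on the same server
	}

	// Rendezvous order: sort servers by hash(server+block), so every
	// run considers the same block in the same server order.
	sort.Slice(servers, func(i, j int) bool {
		hi := md5.Sum([]byte(servers[i] + blkid))
		hj := md5.Sum([]byte(servers[j] + blkid))
		return string(hi[:]) < string(hj[:])
	})

	order := make(map[string]int, len(servers))
	slots := make([]slot, len(servers))
	for i, srv := range servers {
		order[srv] = i
		slots[i].server = srv
	}

	// File each replica into its server's slot; extra copies on one
	// server become appended overflow slots, which the later
	// trash/pull pass treats as least desirable.
	for ri := range replicas {
		repl := &replicas[ri]
		idx := order[repl.server]
		if slots[idx].repl != nil {
			idx = len(slots)
			slots = append(slots, slot{server: repl.server})
		}
		slots[idx].repl = repl
	}

	for i, s := range slots {
		if s.repl != nil {
			fmt.Printf("slot %d: %s holds a copy (mtime %d)\n", i, s.server, s.repl.mtime)
		} else {
			fmt.Printf("slot %d: %s is empty (pull candidate)\n", i, s.server)
		}
	}
}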