X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ecda32d08a4f6d80a2f02ae305fdb43e141672ce..8e96f8cacd728b1a1b4316ecb2fb7cd233a97144:/services/keep-balance/balance.go

diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go
index 25b474b9ca..9338075d01 100644
--- a/services/keep-balance/balance.go
+++ b/services/keep-balance/balance.go
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -78,6 +82,12 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
 	if err != nil {
 		return
 	}
+	for _, srv := range bal.KeepServices {
+		err = srv.discoverMounts(&config.Client)
+		if err != nil {
+			return
+		}
+	}
 
 	if err = bal.CheckSanityEarly(&config.Client); err != nil {
 		return
@@ -238,14 +248,24 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		wg.Add(1)
 		go func(srv *KeepService) {
 			defer wg.Done()
-			bal.logf("%s: retrieve index", srv)
-			idx, err := srv.Index(c, "")
-			if err != nil {
-				errs <- fmt.Errorf("%s: %v", srv, err)
-				return
+			bal.logf("%s: retrieve indexes", srv)
+			for _, mount := range srv.mounts {
+				bal.logf("%s: retrieve index", mount)
+				idx, err := srv.IndexMount(c, mount.UUID, "")
+				if err != nil {
+					errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
+					return
+				}
+				if len(errs) > 0 {
+					// Some other goroutine encountered an
+					// error -- any further effort here
+					// will be wasted.
+					return
+				}
+				bal.logf("%s: add %d replicas to map", mount, len(idx))
+				bal.BlockStateMap.AddReplicas(mount, idx)
+				bal.logf("%s: done", mount)
 			}
-			bal.logf("%s: add %d replicas to map", srv, len(idx))
-			bal.BlockStateMap.AddReplicas(srv, idx)
 			bal.logf("%s: done", srv)
 		}(srv)
 	}
@@ -298,14 +318,11 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 		}
 	}()
 
-	go func() {
-		// Send a nil error when all goroutines finish. If
-		// this is the first error sent to errs, then
-		// everything worked.
-		wg.Wait()
-		errs <- nil
-	}()
-	return <-errs
+	wg.Wait()
+	if len(errs) > 0 {
+		return <-errs
+	}
+	return nil
 }
 
 func (bal *Balancer) addCollection(coll arvados.Collection) error {
@@ -388,14 +405,50 @@ var changeName = map[int]string{
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	debugf("balanceBlock: %v %+v", blkid, blk)
+
+	// A slot is somewhere a replica could potentially be trashed
+	// from, pulled from, or pulled to. Each KeepService gets
+	// either one empty slot, or one or more non-empty slots.
+	type slot struct {
+		srv  *KeepService // never nil
+		repl *Replica     // nil if none found
+	}
+
+	// First, we build an ordered list of all slots worth
+	// considering (including all slots where replicas have been
+	// found, as well as all of the optimal slots for this block).
+	// Then, when we consider each slot in that order, we will
+	// have all of the information we need to make a decision
+	// about that slot.
+
 	uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
-	hasRepl := make(map[string]Replica, len(bal.serviceRoots))
-	for _, repl := range blk.Replicas {
-		hasRepl[repl.UUID] = repl
-		// TODO: when multiple copies are on one server, use
-		// the oldest one that doesn't have a timestamp
-		// collision with other replicas.
+	rendezvousOrder := make(map[*KeepService]int, len(uuids))
+	slots := make([]slot, len(uuids))
+	for i, uuid := range uuids {
+		srv := bal.KeepServices[uuid]
+		rendezvousOrder[srv] = i
+		slots[i].srv = srv
+	}
+	for ri := range blk.Replicas {
+		// TODO: when multiple copies are on one server,
+		// prefer one on a readonly mount, or the oldest one
+		// that doesn't have a timestamp collision with other
+		// replicas.
+		repl := &blk.Replicas[ri]
+		srv := repl.KeepService
+		slotIdx := rendezvousOrder[srv]
+		if slots[slotIdx].repl != nil {
+			// Additional replicas on a single server are
+			// considered non-optimal. Within this
+			// category, we don't try to optimize layout:
+			// we just say the optimal order is the order
+			// we encounter them.
+			slotIdx = len(slots)
+			slots = append(slots, slot{srv: srv})
+		}
+		slots[slotIdx].repl = repl
 	}
+
 	// number of replicas already found in positions better than
 	// the position we're contemplating now.
 	reportedBestRepl := 0
@@ -411,12 +464,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
 	// requested on rendezvous positions M<N will be successful.)
 	pulls := 0
 	var changes []string
-	for _, uuid := range uuids {
+	for _, slot := range slots {
 		change := changeNone
-		srv := bal.KeepServices[uuid]
+		srv, repl := slot.srv, slot.repl
 		// TODO: request a Touch if Mtime is duplicated.
-		repl, ok := hasRepl[srv.UUID]
-		if ok {
+		if repl != nil {
 			if repl.Mtime < bal.MinMtime &&
 				len(uniqueBestRepl) >= blk.Desired &&
 				!uniqueBestRepl[repl.Mtime] {
@@ -452,7 +505,11 @@
 			change = changePull
 		}
 		if bal.Dumper != nil {
-			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+			var mtime int64
+			if repl != nil {
+				mtime = repl.Mtime
+			}
+			changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime))
 		}
 	}
 	if bal.Dumper != nil {
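
A note on the GetCurrentState hunks above: the rewritten error handling only works because errs is (as the len(errs) checks in the diff assume) a buffered channel with room for one error per goroutine, so a send never blocks, len(errs) can be polled cheaply to abandon work once another goroutine has failed, and a non-zero len(errs) after wg.Wait() means at least one worker failed. The following is a minimal, self-contained sketch of that pattern, not Arvados code; the run/tasks names are invented for illustration.

	package main

	import (
		"fmt"
		"sync"
	)

	// run fans tasks out to goroutines and collects failures on a
	// buffered channel, mirroring the GetCurrentState structure.
	func run(tasks []string) error {
		var wg sync.WaitGroup
		// One buffer slot per goroutine: a send never blocks,
		// even if nothing ever receives from the channel.
		errs := make(chan error, len(tasks))
		for _, task := range tasks {
			wg.Add(1)
			go func(task string) {
				defer wg.Done()
				if len(errs) > 0 {
					// Some other goroutine already failed;
					// further effort here would be wasted.
					return
				}
				if task == "bad" {
					errs <- fmt.Errorf("%s: failed", task)
				}
			}(task)
		}
		wg.Wait()
		// All senders are done, so len(errs) is now stable.
		if len(errs) > 0 {
			return <-errs // report the first queued error
		}
		return nil
	}

	func main() {
		fmt.Println(run([]string{"a", "bad", "c"})) // bad: failed
		fmt.Println(run([]string{"a", "b", "c"}))   // <nil>
	}

Only the first queued error is reported; any others stay buffered and are discarded with the channel, which is exactly the behavior the diff's "return <-errs" keeps.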
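The balanceBlock hunks above lean on keepclient.NewRootSorter, which puts the keepstore services into a rendezvous (highest-random-weight) order for each block: every service is scored by hashing the block ID together with the service, and sorting by score gives each block its own stable preference list, so the first slots are the block's optimal homes and additional replicas on one server are appended as non-optimal slots. Below is a rough illustration of rendezvous ordering, with md5 scoring and invented keep0/keep1/keep2 names; it is not the keepclient implementation.

	package main

	import (
		"bytes"
		"crypto/md5"
		"fmt"
		"sort"
	)

	// rendezvousOrder scores every server against the block ID and
	// sorts by score, so each block gets its own stable server
	// preference list, and adding or removing one server disturbs
	// the relative order of the others as little as possible.
	func rendezvousOrder(servers []string, blockID string) []string {
		order := append([]string(nil), servers...)
		score := func(srv string) []byte {
			sum := md5.Sum([]byte(srv + blockID))
			return sum[:]
		}
		sort.Slice(order, func(i, j int) bool {
			return bytes.Compare(score(order[i]), score(order[j])) < 0
		})
		return order
	}

	func main() {
		servers := []string{"keep0", "keep1", "keep2"}
		// Different blocks get different orderings; the same block
		// always gets the same one. slots[i].srv in balanceBlock
		// corresponds to entry i of an ordering like this.
		fmt.Println(rendezvousOrder(servers, "acbd18db4cc2f85cedef654fccc4a4d8"))
		fmt.Println(rendezvousOrder(servers, "37b51d194a7513e45b56f6524f2d51f2"))
	}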