X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bdb92619b5f6d920119b8c32c3027cf4b751ed16..8e96f8cacd728b1a1b4316ecb2fb7cd233a97144:/services/keep-balance/balance.go diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 9389f19ed8..9338075d01 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -78,6 +82,12 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R if err != nil { return } + for _, srv := range bal.KeepServices { + err = srv.discoverMounts(&config.Client) + if err != nil { + return + } + } if err = bal.CheckSanityEarly(&config.Client); err != nil { return @@ -238,20 +248,24 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro wg.Add(1) go func(srv *KeepService) { defer wg.Done() - bal.logf("%s: retrieve index", srv) - idx, err := srv.Index(c, "") - if err != nil { - errs <- fmt.Errorf("%s: %v", srv, err) - return - } - if len(errs) > 0 { - // Some other goroutine encountered an - // error -- any further effort here - // will be wasted. - return + bal.logf("%s: retrieve indexes", srv) + for _, mount := range srv.mounts { + bal.logf("%s: retrieve index", mount) + idx, err := srv.IndexMount(c, mount.UUID, "") + if err != nil { + errs <- fmt.Errorf("%s: retrieve index: %v", mount, err) + return + } + if len(errs) > 0 { + // Some other goroutine encountered an + // error -- any further effort here + // will be wasted. + return + } + bal.logf("%s: add %d replicas to map", mount, len(idx)) + bal.BlockStateMap.AddReplicas(mount, idx) + bal.logf("%s: done", mount) } - bal.logf("%s: add %d replicas to map", srv, len(idx)) - bal.BlockStateMap.AddReplicas(srv, idx) bal.logf("%s: done", srv) }(srv) } @@ -391,14 +405,50 @@ var changeName = map[int]string{ // block, and makes the appropriate ChangeSet calls. func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) { debugf("balanceBlock: %v %+v", blkid, blk) + + // A slot is somewhere a replica could potentially be trashed + // from, pulled from, or pulled to. Each KeepService gets + // either one empty slot, or one or more non-empty slots. + type slot struct { + srv *KeepService // never nil + repl *Replica // nil if none found + } + + // First, we build an ordered list of all slots worth + // considering (including all slots where replicas have been + // found, as well as all of the optimal slots for this block). + // Then, when we consider each slot in that order, we will + // have all of the information we need to make a decision + // about that slot. + uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots() - hasRepl := make(map[string]Replica, len(bal.serviceRoots)) - for _, repl := range blk.Replicas { - hasRepl[repl.UUID] = repl - // TODO: when multiple copies are on one server, use - // the oldest one that doesn't have a timestamp - // collision with other replicas. + rendezvousOrder := make(map[*KeepService]int, len(uuids)) + slots := make([]slot, len(uuids)) + for i, uuid := range uuids { + srv := bal.KeepServices[uuid] + rendezvousOrder[srv] = i + slots[i].srv = srv + } + for ri := range blk.Replicas { + // TODO: when multiple copies are on one server, + // prefer one on a readonly mount, or the oldest one + // that doesn't have a timestamp collision with other + // replicas. + repl := &blk.Replicas[ri] + srv := repl.KeepService + slotIdx := rendezvousOrder[srv] + if slots[slotIdx].repl != nil { + // Additional replicas on a single server are + // considered non-optimal. Within this + // category, we don't try to optimize layout: + // we just say the optimal order is the order + // we encounter them. + slotIdx = len(slots) + slots = append(slots, slot{srv: srv}) + } + slots[slotIdx].repl = repl } + // number of replicas already found in positions better than // the position we're contemplating now. reportedBestRepl := 0 @@ -414,12 +464,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) { // requested on rendezvous positions M= blk.Desired && !uniqueBestRepl[repl.Mtime] { @@ -455,7 +505,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) { change = changePull } if bal.Dumper != nil { - changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime)) + var mtime int64 + if repl != nil { + mtime = repl.Mtime + } + changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime)) } } if bal.Dumper != nil {