+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main

import (
	"fmt"
	"log"
	"math"
	"os"
	"runtime"
+	"sort"
	"strings"
	"sync"
	"time"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
// Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
+//
+// Run should only be called once on a given Balancer object.
//
-// err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Typical usage:
+//
+// runOptions, err = (&Balancer{}).Run(config, runOptions)
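+//
+// For illustration (a hypothetical caller; "interval" is not part of
+// this change), a long-running service might feed each run's result
+// into the next run so SafeRendezvousState carries over:
+//
+//	for {
+//		runOptions, err = (&Balancer{}).Run(config, runOptions)
+//		if err != nil {
+//			log.Print(err)
+//		}
+//		time.Sleep(interval)
+//	}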
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+ nextRunOptions = runOptions
+
bal.Dumper = runOptions.Dumper
bal.Logger = runOptions.Logger
	if bal.Logger == nil {
		// No logger provided; fall back to logging on stderr.
		bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
	}
if err != nil {
return
}
+ for _, srv := range bal.KeepServices {
+ err = srv.discoverMounts(&config.Client)
+ if err != nil {
+ return
+ }
+ }
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
}
- if runOptions.CommitTrash {
+ rs := bal.rendezvousState()
+ if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+ if runOptions.SafeRendezvousState != "" {
+ bal.logf("notice: KeepServices list has changed since last run")
+ }
+ bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
if err = bal.ClearTrashLists(&config.Client); err != nil {
return
}
+ // The current rendezvous state becomes "safe" (i.e.,
+ // OK to compute changes for that state without
+ // clearing existing trash lists) only now, after we
+ // succeed in clearing existing trash lists.
+ nextRunOptions.SafeRendezvousState = rs
}
- if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize); err != nil {
+ if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
return
}
bal.ComputeChangeSets()
-	return nil
+	return
}
+// rendezvousState returns a fingerprint (e.g., a sorted list of
+// UUID+host+port) of the current set of keep services.
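+//
+// The fingerprint changes whenever services are added, removed, or
+// readdressed -- which is exactly when trash lists computed under the
+// previous rendezvous order may no longer be safe.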
+func (bal *Balancer) rendezvousState() string {
+ srvs := make([]string, 0, len(bal.KeepServices))
+ for _, srv := range bal.KeepServices {
+ srvs = append(srvs, srv.String())
+ }
+ sort.Strings(srvs)
+ return strings.Join(srvs, "; ")
+}
+
// ClearTrashLists sends an empty trash list to each keep
// service. Calling this before GetCurrentState avoids races.
//
// collection manifests in the database (API server).
//
// It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize int) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
defer timeMe(bal.Logger, "GetCurrentState")()
bal.BlockStateMap = NewBlockStateMap()
return err
}
bal.DefaultReplication = dd.DefaultCollectionReplication
- bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+ bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
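+	// Note the units: BlobSignatureTTL is in seconds, while the
+	// Mtimes in keepstore indexes are nanoseconds, hence the 1e9
+	// factor here.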
errs := make(chan error, 2+len(bal.KeepServices))
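+	// errs is buffered so senders never block; len(errs) is
+	// checked below to notice failures without draining the
+	// channel.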
wg := sync.WaitGroup{}
wg.Add(1)
go func(srv *KeepService) {
defer wg.Done()
- bal.logf("%s: retrieve index", srv)
- idx, err := srv.Index(c, "")
- if err != nil {
- errs <- fmt.Errorf("%s: %v", srv, err)
- return
+ bal.logf("%s: retrieve indexes", srv)
+ for _, mount := range srv.mounts {
+ bal.logf("%s: retrieve index", mount)
+ idx, err := srv.IndexMount(c, mount.UUID, "")
+ if err != nil {
+ errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
+ return
+ }
+ if len(errs) > 0 {
+ // Some other goroutine encountered an
+ // error -- any further effort here
+ // will be wasted.
+ return
+ }
+ bal.logf("%s: add %d replicas to map", mount, len(idx))
+ bal.BlockStateMap.AddReplicas(mount, idx)
+ bal.logf("%s: done", mount)
}
- bal.logf("%s: add %d replicas to map", srv, len(idx))
- bal.BlockStateMap.AddReplicas(srv, idx)
bal.logf("%s: done", srv)
}(srv)
}
// collQ buffers incoming collections so we can start fetching
// the next page without waiting for the current page to
// finish processing.
- collQ := make(chan arvados.Collection, 1000)
+ collQ := make(chan arvados.Collection, bufs)
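+	// (The buffer size comes from the CollectionBuffers config,
+	// via the bufs argument.)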
// Start a goroutine to process collections. (We could use a
// worker pool here, but even with a single worker we already
}
}()
- go func() {
- // Send a nil error when all goroutines finish. If
- // this is the first error sent to errs, then
- // everything worked.
- wg.Wait()
- errs <- nil
- }()
- return <-errs
+ wg.Wait()
+ if len(errs) > 0 {
+ return <-errs
+ }
+ return nil
}
func (bal *Balancer) addCollection(coll arvados.Collection) error {
// block, and makes the appropriate ChangeSet calls.
func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
debugf("balanceBlock: %v %+v", blkid, blk)
+
+ // A slot is somewhere a replica could potentially be trashed
+ // from, pulled from, or pulled to. Each KeepService gets
+ // either one empty slot, or one or more non-empty slots.
+ type slot struct {
+ srv *KeepService // never nil
+ repl *Replica // nil if none found
+ }
+
+ // First, we build an ordered list of all slots worth
+ // considering (including all slots where replicas have been
+ // found, as well as all of the optimal slots for this block).
+ // Then, when we consider each slot in that order, we will
+ // have all of the information we need to make a decision
+ // about that slot.
+
uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
- hasRepl := make(map[string]Replica, len(bal.serviceRoots))
- for _, repl := range blk.Replicas {
- hasRepl[repl.UUID] = repl
- // TODO: when multiple copies are on one server, use
- // the oldest one that doesn't have a timestamp
- // collision with other replicas.
+ rendezvousOrder := make(map[*KeepService]int, len(uuids))
+ slots := make([]slot, len(uuids))
+ for i, uuid := range uuids {
+ srv := bal.KeepServices[uuid]
+ rendezvousOrder[srv] = i
+ slots[i].srv = srv
+ }
+ for ri := range blk.Replicas {
+ // TODO: when multiple copies are on one server,
+ // prefer one on a readonly mount, or the oldest one
+ // that doesn't have a timestamp collision with other
+ // replicas.
+ repl := &blk.Replicas[ri]
+ srv := repl.KeepService
+ slotIdx := rendezvousOrder[srv]
+ if slots[slotIdx].repl != nil {
+ // Additional replicas on a single server are
+ // considered non-optimal. Within this
+ // category, we don't try to optimize layout:
+ // we just say the optimal order is the order
+ // we encounter them.
+ slotIdx = len(slots)
+ slots = append(slots, slot{srv: srv})
+ }
+ slots[slotIdx].repl = repl
}
+
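+	// For example (hypothetical layout): with services A, B, C in
+	// rendezvous order, one replica on B and two on C, slots would
+	// be [{A,nil}, {B,r1}, {C,r2}, {C,r3}]; the second replica on
+	// C lands in an appended, non-optimal slot.
+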
// number of replicas already found in positions better than
// the position we're contemplating now.
reportedBestRepl := 0
// requested on rendezvous positions M<N will be successful.)
pulls := 0
var changes []string
- for _, uuid := range uuids {
+ for _, slot := range slots {
change := changeNone
- srv := bal.KeepServices[uuid]
+ srv, repl := slot.srv, slot.repl
// TODO: request a Touch if Mtime is duplicated.
- repl, ok := hasRepl[srv.UUID]
- if ok {
+ if repl != nil {
// This service has a replica. We should
// delete it if [1] we already have enough
// distinct replicas in better rendezvous
// distinct from all of the better replicas'
// Mtimes.
if !srv.ReadOnly &&
+ !repl.KeepMount.ReadOnly &&
repl.Mtime < bal.MinMtime &&
len(uniqueBestRepl) >= blk.Desired &&
!uniqueBestRepl[repl.Mtime] {
				change = changeTrash
}
if bal.Dumper != nil {
- changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+ var mtime int64
+ if repl != nil {
+ mtime = repl.Mtime
+ }
+ changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime))
}
}
if bal.Dumper != nil {
}(srv)
}
var lastErr error
- for _ = range bal.KeepServices {
+ for range bal.KeepServices {
if err := <-errs; err != nil {
bal.logf("%v", err)
lastErr = err