7931: Don't bother trying to trash replicas on readonly mounts.
[arvados.git] / services / keep-balance / balance.go
index 2a2480cc31b6a3eb9dedd6452e42e793c79e1c08..9338075d01f466f332dcf63b98b65ec353530d7f 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
@@ -6,6 +10,7 @@ import (
        "math"
        "os"
        "runtime"
+       "sort"
        "strings"
        "sync"
        "time"
@@ -50,11 +55,17 @@ type Balancer struct {
 }
 
 // Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
+//
+// Run should only be called once on a given Balancer object.
 //
-//   err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Typical usage:
+//
+//   runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+       nextRunOptions = runOptions
+
        bal.Dumper = runOptions.Dumper
        bal.Logger = runOptions.Logger
        if bal.Logger == nil {
@@ -71,16 +82,32 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
        if err != nil {
                return
        }
+       for _, srv := range bal.KeepServices {
+               err = srv.discoverMounts(&config.Client)
+               if err != nil {
+                       return
+               }
+       }
 
        if err = bal.CheckSanityEarly(&config.Client); err != nil {
                return
        }
-       if runOptions.CommitTrash {
+       rs := bal.rendezvousState()
+       if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+               if runOptions.SafeRendezvousState != "" {
+                       bal.logf("notice: KeepServices list has changed since last run")
+               }
+               bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
                if err = bal.ClearTrashLists(&config.Client); err != nil {
                        return
                }
+               // The current rendezvous state becomes "safe" (i.e.,
+               // OK to compute changes for that state without
+               // clearing existing trash lists) only now, after we
+               // succeed in clearing existing trash lists.
+               nextRunOptions.SafeRendezvousState = rs
        }
-       if err = bal.GetCurrentState(&config.Client); err != nil {
+       if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -158,6 +185,17 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
        return nil
 }
 
+// rendezvousState returns a fingerprint (e.g., a sorted list of
+// UUID+host+port) of the current set of keep services.
+func (bal *Balancer) rendezvousState() string {
+       srvs := make([]string, 0, len(bal.KeepServices))
+       for _, srv := range bal.KeepServices {
+               srvs = append(srvs, srv.String())
+       }
+       sort.Strings(srvs)
+       return strings.Join(srvs, "; ")
+}
+
 // ClearTrashLists sends an empty trash list to each keep
 // service. Calling this before GetCurrentState avoids races.
 //
@@ -190,7 +228,7 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
+func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
        defer timeMe(bal.Logger, "GetCurrentState")()
        bal.BlockStateMap = NewBlockStateMap()
 
@@ -199,7 +237,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
                return err
        }
        bal.DefaultReplication = dd.DefaultCollectionReplication
-       bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+       bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
 
        errs := make(chan error, 2+len(bal.KeepServices))
        wg := sync.WaitGroup{}
@@ -210,24 +248,32 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
                wg.Add(1)
                go func(srv *KeepService) {
                        defer wg.Done()
-                       bal.logf("%s: retrieve index", srv)
-                       idx, err := srv.Index(c, "")
-                       if err != nil {
-                               errs <- fmt.Errorf("%s: %v", srv, err)
-                               return
+                       bal.logf("%s: retrieve indexes", srv)
+                       for _, mount := range srv.mounts {
+                               bal.logf("%s: retrieve index", mount)
+                               idx, err := srv.IndexMount(c, mount.UUID, "")
+                               if err != nil {
+                                       errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
+                                       return
+                               }
+                               if len(errs) > 0 {
+                                       // Some other goroutine encountered an
+                                       // error -- any further effort here
+                                       // will be wasted.
+                                       return
+                               }
+                               bal.logf("%s: add %d replicas to map", mount, len(idx))
+                               bal.BlockStateMap.AddReplicas(mount, idx)
+                               bal.logf("%s: done", mount)
                        }
-                       bal.logf("%s: add %d replicas to map", srv, len(idx))
-                       bal.BlockStateMap.AddReplicas(srv, idx)
                        bal.logf("%s: done", srv)
                }(srv)
        }
 
        // collQ buffers incoming collections so we can start fetching
        // the next page without waiting for the current page to
-       // finish processing. (1000 happens to match the page size
-       // used by (*arvados.Client)EachCollection(), but it's OK if
-       // they don't match.)
-       collQ := make(chan arvados.Collection, 1000)
+       // finish processing.
+       collQ := make(chan arvados.Collection, bufs)
 
        // Start a goroutine to process collections. (We could use a
        // worker pool here, but even with a single worker we already
@@ -252,7 +298,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
        wg.Add(1)
        go func() {
                defer wg.Done()
-               err = EachCollection(c,
+               err = EachCollection(c, pageSize,
                        func(coll arvados.Collection) error {
                                collQ <- coll
                                if len(errs) > 0 {
@@ -272,14 +318,11 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
                }
        }()
 
-       go func() {
-               // Send a nil error when all goroutines finish. If
-               // this is the first error sent to errs, then
-               // everything worked.
-               wg.Wait()
-               errs <- nil
-       }()
-       return <-errs
+       wg.Wait()
+       if len(errs) > 0 {
+               return <-errs
+       }
+       return nil
 }
 
 func (bal *Balancer) addCollection(coll arvados.Collection) error {
@@ -362,14 +405,50 @@ var changeName = map[int]string{
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
        debugf("balanceBlock: %v %+v", blkid, blk)
+
+       // A slot is somewhere a replica could potentially be trashed
+       // from, pulled from, or pulled to. Each KeepService gets
+       // either one empty slot, or one or more non-empty slots.
+       type slot struct {
+               srv  *KeepService // never nil
+               repl *Replica     // nil if none found
+       }
+
+       // First, we build an ordered list of all slots worth
+       // considering (including all slots where replicas have been
+       // found, as well as all of the optimal slots for this block).
+       // Then, when we consider each slot in that order, we will
+       // have all of the information we need to make a decision
+       // about that slot.
+
        uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
-       hasRepl := make(map[string]Replica, len(bal.serviceRoots))
-       for _, repl := range blk.Replicas {
-               hasRepl[repl.UUID] = repl
-               // TODO: when multiple copies are on one server, use
-               // the oldest one that doesn't have a timestamp
-               // collision with other replicas.
+       rendezvousOrder := make(map[*KeepService]int, len(uuids))
+       slots := make([]slot, len(uuids))
+       for i, uuid := range uuids {
+               srv := bal.KeepServices[uuid]
+               rendezvousOrder[srv] = i
+               slots[i].srv = srv
+       }
+       for ri := range blk.Replicas {
+               // TODO: when multiple copies are on one server,
+               // prefer one on a readonly mount, or the oldest one
+               // that doesn't have a timestamp collision with other
+               // replicas.
+               repl := &blk.Replicas[ri]
+               srv := repl.KeepService
+               slotIdx := rendezvousOrder[srv]
+               if slots[slotIdx].repl != nil {
+                       // Additional replicas on a single server are
+                       // considered non-optimal. Within this
+                       // category, we don't try to optimize layout:
+                       // we just say the optimal order is the order
+                       // we encounter them.
+                       slotIdx = len(slots)
+                       slots = append(slots, slot{srv: srv})
+               }
+               slots[slotIdx].repl = repl
        }
+
        // number of replicas already found in positions better than
        // the position we're contemplating now.
        reportedBestRepl := 0
@@ -385,12 +464,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
        // requested on rendezvous positions M<N will be successful.)
        pulls := 0
        var changes []string
-       for _, uuid := range uuids {
+       for _, slot := range slots {
                change := changeNone
-               srv := bal.KeepServices[uuid]
+               srv, repl := slot.srv, slot.repl
                // TODO: request a Touch if Mtime is duplicated.
-               repl, ok := hasRepl[srv.UUID]
-               if ok {
+               if repl != nil {
                        // This service has a replica. We should
                        // delete it if [1] we already have enough
                        // distinct replicas in better rendezvous
@@ -398,6 +476,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
                        // distinct from all of the better replicas'
                        // Mtimes.
                        if !srv.ReadOnly &&
+                               !repl.KeepMount.ReadOnly &&
                                repl.Mtime < bal.MinMtime &&
                                len(uniqueBestRepl) >= blk.Desired &&
                                !uniqueBestRepl[repl.Mtime] {
@@ -426,7 +505,11 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
                        change = changePull
                }
                if bal.Dumper != nil {
-                       changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
+                       var mtime int64
+                       if repl != nil {
+                               mtime = repl.Mtime
+                       }
+                       changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], mtime))
                }
        }
        if bal.Dumper != nil {
@@ -621,7 +704,7 @@ func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *Ke
                }(srv)
        }
        var lastErr error
-       for _ = range bal.KeepServices {
+       for range bal.KeepServices {
                if err := <-errs; err != nil {
                        bal.logf("%v", err)
                        lastErr = err