15003: Load config over defaults.
[arvados.git] / services / keep-balance / balance.go
index 8b4924f0aaba517f3920bd474b20decff0483ff0..fd39ee693e2e64b6857e860b9bc63a8328ddf560 100644 (file)
@@ -18,7 +18,7 @@ import (
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "github.com/Sirupsen/logrus"
+       "github.com/sirupsen/logrus"
 )
 
 // Balancer compares the contents of keepstore servers with the
@@ -31,8 +31,8 @@ import (
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
-       Logger  *logrus.Logger
-       Dumper  *logrus.Logger
+       Logger  logrus.FieldLogger
+       Dumper  logrus.FieldLogger
        Metrics *metrics
 
        *BlockStateMap
@@ -206,6 +206,24 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
                        return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
                }
        }
+
+       var checkPage arvados.CollectionList
+       if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
+               Limit:              new(int),
+               Count:              "exact",
+               IncludeTrash:       true,
+               IncludeOldVersions: true,
+               Filters: []arvados.Filter{{
+                       Attr:     "modified_at",
+                       Operator: "=",
+                       Operand:  nil,
+               }},
+       }); err != nil {
+               return err
+       } else if n := checkPage.ItemsAvailable; n > 0 {
+               return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
+       }
+
        return nil
 }
 
@@ -311,9 +329,9 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                                return
                        }
                        for _, mount := range mounts {
-                               bal.logf("%s: add %d replicas to map", mount, len(idx))
+                               bal.logf("%s: add %d entries to map", mount, len(idx))
                                bal.BlockStateMap.AddReplicas(mount, idx)
-                               bal.logf("%s: added %d replicas", mount, len(idx))
+                               bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication)
                        }
                        bal.logf("mount %s: index done", mounts[0])
                }(mounts)
@@ -332,7 +350,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                defer wg.Done()
                for coll := range collQ {
                        err := bal.addCollection(coll)
-                       if err != nil {
+                       if err != nil || len(errs) > 0 {
                                select {
                                case errs <- err:
                                default:
@@ -383,10 +401,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 func (bal *Balancer) addCollection(coll arvados.Collection) error {
        blkids, err := coll.SizedDigests()
        if err != nil {
-               bal.mutex.Lock()
-               bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
-               bal.mutex.Unlock()
-               return nil
+               return fmt.Errorf("%v: %v", coll.UUID, err)
        }
        repl := bal.DefaultReplication
        if coll.ReplicationDesired != nil {
@@ -444,7 +459,7 @@ func (bal *Balancer) ComputeChangeSets() {
 
 func (bal *Balancer) setupLookupTables() {
        bal.serviceRoots = make(map[string]string)
-       bal.classes = []string{"default"}
+       bal.classes = defaultClasses
        bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
        bal.mounts = 0
        for _, srv := range bal.KeepServices {
@@ -529,7 +544,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                        slots = append(slots, slot{
                                mnt:  mnt,
                                repl: repl,
-                               want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime),
+                               want: repl != nil && mnt.ReadOnly,
                        })
                }
        }
@@ -577,14 +592,14 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                                // Prefer a mount that satisfies the
                                // desired class.
                                return bal.mountsByClass[class][si.mnt]
-                       } else if wanti, wantj := si.want, si.want; wanti != wantj {
+                       } else if si.want != sj.want {
                                // Prefer a mount that will have a
                                // replica no matter what we do here
                                // -- either because it already has an
                                // untrashable replica, or because we
                                // already need it to satisfy a
                                // different storage class.
-                               return slots[i].want
+                               return si.want
                        } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
                                // Prefer a better rendezvous
                                // position.
@@ -725,24 +740,24 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                // TODO: request a Touch if Mtime is duplicated.
                var change int
                switch {
-               case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]:
+               case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]:
                        slot.mnt.KeepService.AddTrash(Trash{
                                SizedDigest: blkid,
                                Mtime:       slot.repl.Mtime,
                                From:        slot.mnt,
                        })
                        change = changeTrash
-               case len(blk.Replicas) == 0:
-                       change = changeNone
-               case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
+               case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
                        slot.mnt.KeepService.AddPull(Pull{
                                SizedDigest: blkid,
                                From:        blk.Replicas[0].KeepMount.KeepService,
                                To:          slot.mnt,
                        })
                        change = changePull
-               default:
+               case slot.repl != nil:
                        change = changeStay
+               default:
+                       change = changeNone
                }
                if bal.Dumper != nil {
                        var mtime int64
@@ -754,7 +769,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                }
        }
        if bal.Dumper != nil {
-               bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
+               bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
        }
        return balanceResult{
                blk:        blk,