X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/a23722793aa13e0e8dd37aa91e16111dba452ba0..282d7a0c9d8279fa0a1293573e4313c8223be1b5:/services/keep-balance/balance.go diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 333a4fbde9..08a6c5881c 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -8,17 +8,21 @@ import ( "bytes" "crypto/md5" "fmt" + "io" + "io/ioutil" "log" "math" + "os" "runtime" "sort" "strings" "sync" + "syscall" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/keepclient" - "github.com/Sirupsen/logrus" + "github.com/sirupsen/logrus" ) // Balancer compares the contents of keepstore servers with the @@ -31,10 +35,12 @@ import ( // BlobSignatureTTL; and all N existing replicas of a given data block // are in the N best positions in rendezvous probe order. type Balancer struct { - Logger *logrus.Logger - Dumper *logrus.Logger + Logger logrus.FieldLogger + Dumper logrus.FieldLogger Metrics *metrics + LostBlocksFile string + *BlockStateMap KeepServices map[string]*KeepService DefaultReplication int @@ -48,6 +54,7 @@ type Balancer struct { errors []error stats balancerStats mutex sync.Mutex + lostBlocks io.Writer } // Run performs a balance operation using the given config and @@ -64,6 +71,30 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R defer bal.time("sweep", "wall clock time to run one full sweep")() + var lbFile *os.File + if bal.LostBlocksFile != "" { + tmpfn := bal.LostBlocksFile + ".tmp" + lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777) + if err != nil { + return + } + defer lbFile.Close() + err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB) + if err != nil { + return + } + defer func() { + // Remove the tempfile only if we didn't get + // as far as successfully renaming it. + if lbFile != nil { + os.Remove(tmpfn) + } + }() + bal.lostBlocks = lbFile + } else { + bal.lostBlocks = ioutil.Discard + } + if len(config.KeepServiceList.Items) > 0 { err = bal.SetKeepServices(config.KeepServiceList) } else { @@ -107,6 +138,17 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R if err = bal.CheckSanityLate(); err != nil { return } + if lbFile != nil { + err = lbFile.Sync() + if err != nil { + return + } + err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile) + if err != nil { + return + } + lbFile = nil + } if runOptions.CommitPulls { err = bal.CommitPulls(&config.Client) if err != nil { @@ -206,6 +248,24 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error { return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv) } } + + var checkPage arvados.CollectionList + if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{ + Limit: new(int), + Count: "exact", + IncludeTrash: true, + IncludeOldVersions: true, + Filters: []arvados.Filter{{ + Attr: "modified_at", + Operator: "=", + Operand: nil, + }}, + }); err != nil { + return err + } else if n := checkPage.ItemsAvailable; n > 0 { + return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n) + } + return nil } @@ -263,7 +323,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro bal.DefaultReplication = dd.DefaultCollectionReplication bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9 - errs := make(chan error, 2+len(bal.KeepServices)) + errs := make(chan error, 1) wg := sync.WaitGroup{} // When a device is mounted more than once, we will get its @@ -298,7 +358,10 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService) idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "") if err != nil { - errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err) + select { + case errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err): + default: + } return } if len(errs) > 0 { @@ -308,9 +371,9 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro return } for _, mount := range mounts { - bal.logf("%s: add %d replicas to map", mount, len(idx)) + bal.logf("%s: add %d entries to map", mount, len(idx)) bal.BlockStateMap.AddReplicas(mount, idx) - bal.logf("%s: added %d replicas", mount, len(idx)) + bal.logf("%s: added %d entries to map at %dx (%d replicas)", mount, len(idx), mount.Replication, len(idx)*mount.Replication) } bal.logf("mount %s: index done", mounts[0]) }(mounts) @@ -329,8 +392,11 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro defer wg.Done() for coll := range collQ { err := bal.addCollection(coll) - if err != nil { - errs <- err + if err != nil || len(errs) > 0 { + select { + case errs <- err: + default: + } for range collQ { } return @@ -360,7 +426,10 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro }) close(collQ) if err != nil { - errs <- err + select { + case errs <- err: + default: + } } }() @@ -374,17 +443,20 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro func (bal *Balancer) addCollection(coll arvados.Collection) error { blkids, err := coll.SizedDigests() if err != nil { - bal.mutex.Lock() - bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err)) - bal.mutex.Unlock() - return nil + return fmt.Errorf("%v: %v", coll.UUID, err) } repl := bal.DefaultReplication if coll.ReplicationDesired != nil { repl = *coll.ReplicationDesired } debugf("%v: %d block x%d", coll.UUID, len(blkids), repl) - bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids) + // Pass pdh to IncreaseDesired only if LostBlocksFile is being + // written -- otherwise it's just a waste of memory. + pdh := "" + if bal.LostBlocksFile != "" { + pdh = coll.PortableDataHash + } + bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids) return nil } @@ -435,7 +507,7 @@ func (bal *Balancer) ComputeChangeSets() { func (bal *Balancer) setupLookupTables() { bal.serviceRoots = make(map[string]string) - bal.classes = []string{"default"} + bal.classes = defaultClasses bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}} bal.mounts = 0 for _, srv := range bal.KeepServices { @@ -520,7 +592,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba slots = append(slots, slot{ mnt: mnt, repl: repl, - want: repl != nil && (mnt.ReadOnly || repl.Mtime >= bal.MinMtime), + want: repl != nil && mnt.ReadOnly, }) } } @@ -568,14 +640,14 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba // Prefer a mount that satisfies the // desired class. return bal.mountsByClass[class][si.mnt] - } else if wanti, wantj := si.want, si.want; wanti != wantj { + } else if si.want != sj.want { // Prefer a mount that will have a // replica no matter what we do here // -- either because it already has an // untrashable replica, or because we // already need it to satisfy a // different storage class. - return slots[i].want + return si.want } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj { // Prefer a better rendezvous // position. @@ -716,24 +788,24 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba // TODO: request a Touch if Mtime is duplicated. var change int switch { - case !underreplicated && slot.repl != nil && !slot.want && !unsafeToDelete[slot.repl.Mtime]: + case !underreplicated && !slot.want && slot.repl != nil && slot.repl.Mtime < bal.MinMtime && !unsafeToDelete[slot.repl.Mtime]: slot.mnt.KeepService.AddTrash(Trash{ SizedDigest: blkid, Mtime: slot.repl.Mtime, From: slot.mnt, }) change = changeTrash - case len(blk.Replicas) == 0: - change = changeNone - case slot.repl == nil && slot.want && !slot.mnt.ReadOnly: + case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly: slot.mnt.KeepService.AddPull(Pull{ SizedDigest: blkid, From: blk.Replicas[0].KeepMount.KeepService, To: slot.mnt, }) change = changePull - default: + case slot.repl != nil: change = changeStay + default: + change = changeNone } if bal.Dumper != nil { var mtime int64 @@ -745,7 +817,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba } } if bal.Dumper != nil { - bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " ")) + bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes) } return balanceResult{ blk: blk, @@ -780,6 +852,26 @@ type balancerStats struct { trashes int replHistogram []int classStats map[string]replicationStats + + // collectionBytes / collectionBlockBytes = deduplication ratio + collectionBytes int64 // sum(bytes in referenced blocks) across all collections + collectionBlockBytes int64 // sum(block size) across all blocks referenced by collections + collectionBlockRefs int64 // sum(number of blocks referenced) across all collections + collectionBlocks int64 // number of blocks referenced by any collection +} + +func (s *balancerStats) dedupByteRatio() float64 { + if s.collectionBlockBytes == 0 { + return 0 + } + return float64(s.collectionBytes) / float64(s.collectionBlockBytes) +} + +func (s *balancerStats) dedupBlockRatio() float64 { + if s.collectionBlocks == 0 { + return 0 + } + return float64(s.collectionBlockRefs) / float64(s.collectionBlocks) } type replicationStats struct { @@ -803,6 +895,13 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) { surplus := result.have - result.want bytes := result.blkid.Size() + if rc := int64(result.blk.RefCount); rc > 0 { + s.collectionBytes += rc * bytes + s.collectionBlockBytes += bytes + s.collectionBlockRefs += rc + s.collectionBlocks++ + } + for class, state := range result.classState { cs := s.classStats[class] if state.unachievable { @@ -831,6 +930,11 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) { s.lost.replicas -= surplus s.lost.blocks++ s.lost.bytes += bytes * int64(-surplus) + fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0]) + for pdh := range result.blk.Refs { + fmt.Fprintf(bal.lostBlocks, " %s", pdh) + } + fmt.Fprint(bal.lostBlocks, "\n") case surplus < 0: s.underrep.replicas -= surplus s.underrep.blocks++