Merge branch '14714-keep-balance-config'
[arvados.git] / services / keep-balance / balance.go
index fd39ee693e2e64b6857e860b9bc63a8328ddf560..e50b0b505aee471f30918772f66f18ec0838e31d 100644 (file)
@@ -8,12 +8,16 @@ import (
        "bytes"
        "crypto/md5"
        "fmt"
+       "io"
+       "io/ioutil"
        "log"
        "math"
+       "os"
        "runtime"
        "sort"
        "strings"
        "sync"
+       "syscall"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -35,6 +39,8 @@ type Balancer struct {
        Dumper  logrus.FieldLogger
        Metrics *metrics
 
+       LostBlocksFile string
+
        *BlockStateMap
        KeepServices       map[string]*KeepService
        DefaultReplication int
@@ -48,6 +54,7 @@ type Balancer struct {
        errors        []error
        stats         balancerStats
        mutex         sync.Mutex
+       lostBlocks    io.Writer
 }
 
 // Run performs a balance operation using the given config and
@@ -59,29 +66,49 @@ type Balancer struct {
 // Typical usage:
 //
-//   runOptions, err = (&Balancer{}).Run(config, runOptions)
+//   runOptions, err = (&Balancer{}).Run(client, cluster, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
        defer bal.time("sweep", "wall clock time to run one full sweep")()
 
-       if len(config.KeepServiceList.Items) > 0 {
-               err = bal.SetKeepServices(config.KeepServiceList)
+       var lbFile *os.File
+       if bal.LostBlocksFile != "" {
+               tmpfn := bal.LostBlocksFile + ".tmp"
+               lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
+               if err != nil {
+                       return
+               }
+               defer lbFile.Close()
+               err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+               if err != nil {
+                       return
+               }
+               defer func() {
+                       // Remove the tempfile only if we didn't get
+                       // as far as successfully renaming it.
+                       if lbFile != nil {
+                               os.Remove(tmpfn)
+                       }
+               }()
+               bal.lostBlocks = lbFile
        } else {
-               err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
+               bal.lostBlocks = ioutil.Discard
        }
+
+       err = bal.DiscoverKeepServices(client)
        if err != nil {
                return
        }
 
        for _, srv := range bal.KeepServices {
-               err = srv.discoverMounts(&config.Client)
+               err = srv.discoverMounts(client)
                if err != nil {
                        return
                }
        }
        bal.cleanupMounts()
 
-       if err = bal.CheckSanityEarly(&config.Client); err != nil {
+       if err = bal.CheckSanityEarly(client); err != nil {
                return
        }
        rs := bal.rendezvousState()
@@ -90,7 +117,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                        bal.logf("notice: KeepServices list has changed since last run")
                }
                bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-               if err = bal.ClearTrashLists(&config.Client); err != nil {
+               if err = bal.ClearTrashLists(client); err != nil {
                        return
                }
                // The current rendezvous state becomes "safe" (i.e.,
@@ -99,7 +126,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                // succeed in clearing existing trash lists.
                nextRunOptions.SafeRendezvousState = rs
        }
-       if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
+       if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -107,15 +134,26 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
        if err = bal.CheckSanityLate(); err != nil {
                return
        }
+       if lbFile != nil {
+               err = lbFile.Sync()
+               if err != nil {
+                       return
+               }
+               err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
+               if err != nil {
+                       return
+               }
+               lbFile = nil
+       }
        if runOptions.CommitPulls {
-               err = bal.CommitPulls(&config.Client)
+               err = bal.CommitPulls(client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
        if runOptions.CommitTrash {
-               err = bal.CommitTrash(&config.Client)
+               err = bal.CommitTrash(client)
        }
        return
 }
@@ -134,15 +172,11 @@ func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
 
 // DiscoverKeepServices sets the list of KeepServices by calling the
 // API to get a list of all services, and selecting the ones whose
-// ServiceType is in okTypes.
-func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+// ServiceType is "disk".
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
        bal.KeepServices = make(map[string]*KeepService)
-       ok := make(map[string]bool)
-       for _, t := range okTypes {
-               ok[t] = true
-       }
        return c.EachKeepService(func(srv arvados.KeepService) error {
-               if ok[srv.ServiceType] {
+               if srv.ServiceType == "disk" {
                        bal.KeepServices[srv.UUID] = &KeepService{
                                KeepService: srv,
                                ChangeSet:   &ChangeSet{},
@@ -407,8 +441,14 @@ func (bal *Balancer) addCollection(coll arvados.Collection) error {
        if coll.ReplicationDesired != nil {
                repl = *coll.ReplicationDesired
        }
-       debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
-       bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
+       bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+       // Pass pdh to IncreaseDesired only if LostBlocksFile is being
+       // written -- otherwise it's just a waste of memory.
+       pdh := ""
+       if bal.LostBlocksFile != "" {
+               pdh = coll.PortableDataHash
+       }
+       bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
        return nil
 }
 
@@ -475,7 +515,7 @@ func (bal *Balancer) setupLookupTables() {
                                bal.mountsByClass["default"][mnt] = true
                                continue
                        }
-                       for _, class := range mnt.StorageClasses {
+                       for class := range mnt.StorageClasses {
                                if mbc := bal.mountsByClass[class]; mbc == nil {
                                        bal.classes = append(bal.classes, class)
                                        bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
@@ -518,7 +558,7 @@ type balanceResult struct {
 // balanceBlock compares current state to desired state for a single
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
-       debugf("balanceBlock: %v %+v", blkid, blk)
+       bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
 
        type slot struct {
                mnt  *KeepMount // never nil
@@ -882,6 +922,11 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                        s.lost.replicas -= surplus
                        s.lost.blocks++
                        s.lost.bytes += bytes * int64(-surplus)
+                       fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
+                       for pdh := range result.blk.Refs {
+                               fmt.Fprintf(bal.lostBlocks, " %s", pdh)
+                       }
+                       fmt.Fprint(bal.lostBlocks, "\n")
                case surplus < 0:
                        s.underrep.replicas -= surplus
                        s.underrep.blocks++