Merge branch '14714-keep-balance-config'
[arvados.git] / services / keep-balance / balance.go
index e1b207805b58a81837eab51c80f8ce5e5e8df186..e50b0b505aee471f30918772f66f18ec0838e31d 100644 (file)
@@ -8,17 +8,21 @@ import (
        "bytes"
        "crypto/md5"
        "fmt"
+       "io"
+       "io/ioutil"
        "log"
        "math"
+       "os"
        "runtime"
        "sort"
        "strings"
        "sync"
+       "syscall"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "github.com/Sirupsen/logrus"
+       "github.com/sirupsen/logrus"
 )
 
 // Balancer compares the contents of keepstore servers with the
@@ -31,10 +35,12 @@ import (
 // BlobSignatureTTL; and all N existing replicas of a given data block
 // are in the N best positions in rendezvous probe order.
 type Balancer struct {
-       Logger  *logrus.Logger
-       Dumper  *logrus.Logger
+       Logger  logrus.FieldLogger
+       Dumper  logrus.FieldLogger
        Metrics *metrics
 
+       LostBlocksFile string
+
        *BlockStateMap
        KeepServices       map[string]*KeepService
        DefaultReplication int
@@ -48,6 +54,7 @@ type Balancer struct {
        errors        []error
        stats         balancerStats
        mutex         sync.Mutex
+       lostBlocks    io.Writer
 }
 
 // Run performs a balance operation using the given config and
@@ -59,29 +66,49 @@ type Balancer struct {
 // Typical usage:
 //
 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
        defer bal.time("sweep", "wall clock time to run one full sweep")()
 
-       if len(config.KeepServiceList.Items) > 0 {
-               err = bal.SetKeepServices(config.KeepServiceList)
+       var lbFile *os.File
+       if bal.LostBlocksFile != "" {
+               tmpfn := bal.LostBlocksFile + ".tmp"
+               lbFile, err = os.OpenFile(tmpfn, os.O_CREATE|os.O_WRONLY, 0777)
+               if err != nil {
+                       return
+               }
+               defer lbFile.Close()
+               err = syscall.Flock(int(lbFile.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+               if err != nil {
+                       return
+               }
+               defer func() {
+                       // Remove the tempfile only if we didn't get
+                       // as far as successfully renaming it.
+                       if lbFile != nil {
+                               os.Remove(tmpfn)
+                       }
+               }()
+               bal.lostBlocks = lbFile
        } else {
-               err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
+               bal.lostBlocks = ioutil.Discard
        }
+
+       err = bal.DiscoverKeepServices(client)
        if err != nil {
                return
        }
 
        for _, srv := range bal.KeepServices {
-               err = srv.discoverMounts(&config.Client)
+               err = srv.discoverMounts(client)
                if err != nil {
                        return
                }
        }
        bal.cleanupMounts()
 
-       if err = bal.CheckSanityEarly(&config.Client); err != nil {
+       if err = bal.CheckSanityEarly(client); err != nil {
                return
        }
        rs := bal.rendezvousState()
@@ -90,7 +117,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                        bal.logf("notice: KeepServices list has changed since last run")
                }
                bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-               if err = bal.ClearTrashLists(&config.Client); err != nil {
+               if err = bal.ClearTrashLists(client); err != nil {
                        return
                }
                // The current rendezvous state becomes "safe" (i.e.,
@@ -99,7 +126,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                // succeed in clearing existing trash lists.
                nextRunOptions.SafeRendezvousState = rs
        }
-       if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
+       if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -107,15 +134,26 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
        if err = bal.CheckSanityLate(); err != nil {
                return
        }
+       if lbFile != nil {
+               err = lbFile.Sync()
+               if err != nil {
+                       return
+               }
+               err = os.Rename(bal.LostBlocksFile+".tmp", bal.LostBlocksFile)
+               if err != nil {
+                       return
+               }
+               lbFile = nil
+       }
        if runOptions.CommitPulls {
-               err = bal.CommitPulls(&config.Client)
+               err = bal.CommitPulls(client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
        if runOptions.CommitTrash {
-               err = bal.CommitTrash(&config.Client)
+               err = bal.CommitTrash(client)
        }
        return
 }
@@ -134,15 +172,11 @@ func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
 
 // DiscoverKeepServices sets the list of KeepServices by calling the
 // API to get a list of all services, and selecting the ones whose
-// ServiceType is in okTypes.
-func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
+// ServiceType is "disk"
+func (bal *Balancer) DiscoverKeepServices(c *arvados.Client) error {
        bal.KeepServices = make(map[string]*KeepService)
-       ok := make(map[string]bool)
-       for _, t := range okTypes {
-               ok[t] = true
-       }
        return c.EachKeepService(func(srv arvados.KeepService) error {
-               if ok[srv.ServiceType] {
+               if srv.ServiceType == "disk" {
                        bal.KeepServices[srv.UUID] = &KeepService{
                                KeepService: srv,
                                ChangeSet:   &ChangeSet{},
@@ -206,6 +240,24 @@ func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
                        return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
                }
        }
+
+       var checkPage arvados.CollectionList
+       if err = c.RequestAndDecode(&checkPage, "GET", "arvados/v1/collections", nil, arvados.ResourceListParams{
+               Limit:              new(int),
+               Count:              "exact",
+               IncludeTrash:       true,
+               IncludeOldVersions: true,
+               Filters: []arvados.Filter{{
+                       Attr:     "modified_at",
+                       Operator: "=",
+                       Operand:  nil,
+               }},
+       }); err != nil {
+               return err
+       } else if n := checkPage.ItemsAvailable; n > 0 {
+               return fmt.Errorf("%d collections exist with null modified_at; cannot fetch reliably", n)
+       }
+
        return nil
 }
 
@@ -332,7 +384,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
                defer wg.Done()
                for coll := range collQ {
                        err := bal.addCollection(coll)
-                       if err != nil {
+                       if err != nil || len(errs) > 0 {
                                select {
                                case errs <- err:
                                default:
@@ -383,17 +435,20 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
 func (bal *Balancer) addCollection(coll arvados.Collection) error {
        blkids, err := coll.SizedDigests()
        if err != nil {
-               bal.mutex.Lock()
-               bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
-               bal.mutex.Unlock()
-               return nil
+               return fmt.Errorf("%v: %v", coll.UUID, err)
        }
        repl := bal.DefaultReplication
        if coll.ReplicationDesired != nil {
                repl = *coll.ReplicationDesired
        }
-       debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
-       bal.BlockStateMap.IncreaseDesired(coll.StorageClassesDesired, repl, blkids)
+       bal.Logger.Debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
+       // Pass pdh to IncreaseDesired only if LostBlocksFile is being
+       // written -- otherwise it's just a waste of memory.
+       pdh := ""
+       if bal.LostBlocksFile != "" {
+               pdh = coll.PortableDataHash
+       }
+       bal.BlockStateMap.IncreaseDesired(pdh, coll.StorageClassesDesired, repl, blkids)
        return nil
 }
 
@@ -444,7 +499,7 @@ func (bal *Balancer) ComputeChangeSets() {
 
 func (bal *Balancer) setupLookupTables() {
        bal.serviceRoots = make(map[string]string)
-       bal.classes = []string{"default"}
+       bal.classes = defaultClasses
        bal.mountsByClass = map[string]map[*KeepMount]bool{"default": {}}
        bal.mounts = 0
        for _, srv := range bal.KeepServices {
@@ -460,7 +515,7 @@ func (bal *Balancer) setupLookupTables() {
                                bal.mountsByClass["default"][mnt] = true
                                continue
                        }
-                       for _, class := range mnt.StorageClasses {
+                       for class := range mnt.StorageClasses {
                                if mbc := bal.mountsByClass[class]; mbc == nil {
                                        bal.classes = append(bal.classes, class)
                                        bal.mountsByClass[class] = map[*KeepMount]bool{mnt: true}
@@ -503,7 +558,7 @@ type balanceResult struct {
 // balanceBlock compares current state to desired state for a single
 // block, and makes the appropriate ChangeSet calls.
 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) balanceResult {
-       debugf("balanceBlock: %v %+v", blkid, blk)
+       bal.Logger.Debugf("balanceBlock: %v %+v", blkid, blk)
 
        type slot struct {
                mnt  *KeepMount // never nil
@@ -732,17 +787,17 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                                From:        slot.mnt,
                        })
                        change = changeTrash
-               case len(blk.Replicas) == 0:
-                       change = changeNone
-               case slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
+               case len(blk.Replicas) > 0 && slot.repl == nil && slot.want && !slot.mnt.ReadOnly:
                        slot.mnt.KeepService.AddPull(Pull{
                                SizedDigest: blkid,
                                From:        blk.Replicas[0].KeepMount.KeepService,
                                To:          slot.mnt,
                        })
                        change = changePull
-               default:
+               case slot.repl != nil:
                        change = changeStay
+               default:
+                       change = changeNone
                }
                if bal.Dumper != nil {
                        var mtime int64
@@ -754,7 +809,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                }
        }
        if bal.Dumper != nil {
-               bal.Dumper.Printf("%s have=%d want=%v %s", blkid, have, want, strings.Join(changes, " "))
+               bal.Dumper.Printf("%s refs=%d have=%d want=%v %v %v", blkid, blk.RefCount, have, want, blk.Desired, changes)
        }
        return balanceResult{
                blk:        blk,
@@ -867,6 +922,11 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                        s.lost.replicas -= surplus
                        s.lost.blocks++
                        s.lost.bytes += bytes * int64(-surplus)
+                       fmt.Fprintf(bal.lostBlocks, "%s", strings.SplitN(string(result.blkid), "+", 2)[0])
+                       for pdh := range result.blk.Refs {
+                               fmt.Fprintf(bal.lostBlocks, " %s", pdh)
+                       }
+                       fmt.Fprint(bal.lostBlocks, "\n")
                case surplus < 0:
                        s.underrep.replicas -= surplus
                        s.underrep.blocks++