10181: Merge branch 'master' into 10181-incremental-log
[arvados.git] / services / keep-balance / balance.go
index 328e6234221f157f8ac18be525b5460d3a0ada93..d86234a936cc96702f3a79d12c10d04548c0faa2 100644 (file)
@@ -95,7 +95,7 @@ func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions R
                        return
                }
        }
-       bal.dedupDevices()
+       bal.cleanupMounts()
 
        if err = bal.CheckSanityEarly(&config.Client); err != nil {
                return
@@ -170,7 +170,7 @@ func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) e
        })
 }
 
-func (bal *Balancer) dedupDevices() {
+func (bal *Balancer) cleanupMounts() {
        rwdev := map[string]*KeepService{}
        for _, srv := range bal.KeepServices {
                for _, mnt := range srv.mounts {
@@ -192,6 +192,14 @@ func (bal *Balancer) dedupDevices() {
                }
                srv.mounts = dedup
        }
+       for _, srv := range bal.KeepServices {
+               for _, mnt := range srv.mounts {
+                       if mnt.Replication <= 0 {
+                               log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
+                               mnt.Replication = 1
+                       }
+               }
+       }
 }
 
 // CheckSanityEarly checks for configuration and runtime errors that
@@ -274,32 +282,54 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
        errs := make(chan error, 2+len(bal.KeepServices))
        wg := sync.WaitGroup{}
 
-       // Start one goroutine for each KeepService: retrieve the
-       // index, and add the returned blocks to BlockStateMap.
+       // When a device is mounted more than once, we will get its
+       // index only once, and call AddReplicas on all of the mounts.
+       // equivMount keys are the mounts that will be indexed, and
+       // each value is a list of mounts to apply the received index
+       // to.
+       equivMount := map[*KeepMount][]*KeepMount{}
+       // deviceMount maps each device ID to the one mount that will
+       // be indexed for that device.
+       deviceMount := map[string]*KeepMount{}
        for _, srv := range bal.KeepServices {
+               for _, mnt := range srv.mounts {
+                       equiv := deviceMount[mnt.DeviceID]
+                       if equiv == nil {
+                               equiv = mnt
+                               if mnt.DeviceID != "" {
+                                       deviceMount[mnt.DeviceID] = equiv
+                               }
+                       }
+                       equivMount[equiv] = append(equivMount[equiv], mnt)
+               }
+       }
+
+       // Start one goroutine for each (non-redundant) mount:
+       // retrieve the index, and add the returned blocks to
+       // BlockStateMap.
+       for _, mounts := range equivMount {
                wg.Add(1)
-               go func(srv *KeepService) {
+               go func(mounts []*KeepMount) {
                        defer wg.Done()
-                       bal.logf("%s: retrieve indexes", srv)
-                       for _, mount := range srv.mounts {
-                               bal.logf("%s: retrieve index", mount)
-                               idx, err := srv.IndexMount(c, mount.UUID, "")
-                               if err != nil {
-                                       errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
-                                       return
-                               }
-                               if len(errs) > 0 {
-                                       // Some other goroutine encountered an
-                                       // error -- any further effort here
-                                       // will be wasted.
-                                       return
-                               }
+                       bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
+                       idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+                       if err != nil {
+                               errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
+                               return
+                       }
+                       if len(errs) > 0 {
+                               // Some other goroutine encountered an
+                               // error -- any further effort here
+                               // will be wasted.
+                               return
+                       }
+                       for _, mount := range mounts {
                                bal.logf("%s: add %d replicas to map", mount, len(idx))
                                bal.BlockStateMap.AddReplicas(mount, idx)
-                               bal.logf("%s: done", mount)
+                               bal.logf("%s: added %d replicas", mount, len(idx))
                        }
-                       bal.logf("%s: done", srv)
-               }(srv)
+                       bal.logf("mount %s: index done", mounts[0])
+               }(mounts)
        }
 
        // collQ buffers incoming collections so we can start fetching
@@ -532,7 +562,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                have := 0
                for _, slot := range slots {
                        if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
-                               have++
+                               have += slot.mnt.Replication
                                if slot.mnt.DeviceID != "" {
                                        countedDev[slot.mnt.DeviceID] = true
                                }
@@ -592,30 +622,36 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                // trashing replicas that aren't optimal positions for
                // any storage class.
                protMnt := map[*KeepMount]bool{}
+               // Replication planned so far (corresponds to wantMnt).
+               replWant := 0
+               // Protected replication (corresponds to protMnt).
+               replProt := 0
 
                // trySlot tries using a slot to meet requirements,
                // and returns true if all requirements are met.
                trySlot := func(i int) bool {
                        slot := slots[i]
-                       if wantDev[slot.mnt.DeviceID] {
+                       if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
                                // Already allocated a replica to this
                                // backend device, possibly on a
                                // different server.
                                return false
                        }
-                       if len(protMnt) < desired && slot.repl != nil {
+                       if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
                                unsafeToDelete[slot.repl.Mtime] = true
                                protMnt[slot.mnt] = true
+                               replProt += slot.mnt.Replication
                        }
-                       if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+                       if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
                                slots[i].want = true
                                wantSrv[slot.mnt.KeepService] = true
                                wantMnt[slot.mnt] = true
                                if slot.mnt.DeviceID != "" {
                                        wantDev[slot.mnt.DeviceID] = true
                                }
+                               replWant += slot.mnt.Replication
                        }
-                       return len(protMnt) >= desired && len(wantMnt) >= desired
+                       return replProt >= desired && replWant >= desired
                }
 
                // First try to achieve desired replication without
@@ -642,7 +678,7 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
                                if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
                                        continue
                                }
-                               if safe++; safe >= desired {
+                               if safe += slot.mnt.Replication; safe >= desired {
                                        break
                                }
                        }
@@ -677,14 +713,17 @@ func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) ba
        countedDev := map[string]bool{}
        var have, want int
        for _, slot := range slots {
+               if countedDev[slot.mnt.DeviceID] {
+                       continue
+               }
                if slot.want {
-                       want++
+                       want += slot.mnt.Replication
                }
-               if slot.repl != nil && !countedDev[slot.mnt.DeviceID] {
-                       have++
-                       if slot.mnt.DeviceID != "" {
-                               countedDev[slot.mnt.DeviceID] = true
-                       }
+               if slot.repl != nil {
+                       have += slot.mnt.Replication
+               }
+               if slot.mnt.DeviceID != "" {
+                       countedDev[slot.mnt.DeviceID] = true
                }
        }
 
@@ -826,7 +865,7 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                case surplus > 0:
                        s.overrep.replicas += surplus
                        s.overrep.blocks++
-                       s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
+                       s.overrep.bytes += bytes * int64(result.have-result.want)
                default:
                        s.justright.replicas += result.want
                        s.justright.blocks++
@@ -838,16 +877,16 @@ func (bal *Balancer) collectStatistics(results <-chan balanceResult) {
                        s.desired.blocks++
                        s.desired.bytes += bytes * int64(result.want)
                }
-               if len(result.blk.Replicas) > 0 {
-                       s.current.replicas += len(result.blk.Replicas)
+               if result.have > 0 {
+                       s.current.replicas += result.have
                        s.current.blocks++
-                       s.current.bytes += bytes * int64(len(result.blk.Replicas))
+                       s.current.bytes += bytes * int64(result.have)
                }
 
-               for len(s.replHistogram) <= len(result.blk.Replicas) {
+               for len(s.replHistogram) <= result.have {
                        s.replHistogram = append(s.replHistogram, 0)
                }
-               s.replHistogram[len(result.blk.Replicas)]++
+               s.replHistogram[result.have]++
        }
        for _, srv := range bal.KeepServices {
                s.pulls += len(srv.ChangeSet.Pulls)