- // Assign existing replicas to slots.
- for ri := range blk.Replicas {
- repl := &blk.Replicas[ri]
- srv := repl.KeepService
- slotIdx := rendezvousOrder[srv]
- if slots[slotIdx].repl != nil {
- // Additional replicas on a single server are
- // considered non-optimal. Within this
- // category, we don't try to optimize layout:
- // we just say the optimal order is the order
- // we encounter them.
- slotIdx = len(slots)
- slots = append(slots, slot{srv: srv})
- }
- slots[slotIdx].repl = repl
- }
-
- // number of replicas already found in positions better than
- // the position we're contemplating now.
- reportedBestRepl := 0
- // To be safe we assume two replicas with the same Mtime are
- // in fact the same replica being reported more than
- // once. len(uniqueBestRepl) is the number of distinct
- // replicas in the best rendezvous positions we've considered
- // so far.
- uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
- // pulls is the number of Pull changes we have already
- // requested. (For purposes of deciding whether to Pull to
- // rendezvous position N, we should assume all pulls we have
- // requested on rendezvous positions M<N will be successful.)
- pulls := 0
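+ // unsafeToDelete records the Mtimes of replicas that
+ // must be kept to maintain the desired replication
+ // for some storage class; any replica with a
+ // matching Mtime is assumed to be the same copy and
+ // is never trashed.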
+ unsafeToDelete := make(map[int64]bool, len(slots))
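+ // Plan the layout for each storage class in turn.
+ // Slots marked "want" while planning one class stay
+ // wanted when planning later classes.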
+ for _, class := range bal.classes {
+ desired := blk.Desired[class]
+ if desired == 0 {
+ continue
+ }
+
+ // Sort the slots by desirability.
+ sort.Slice(slots, func(i, j int) bool {
+ si, sj := slots[i], slots[j]
+ if classi, classj := bal.mountsByClass[class][si.mnt], bal.mountsByClass[class][sj.mnt]; classi != classj {
+ // Prefer a mount that satisfies the
+ // desired class.
+ return classi
+ } else if si.want != sj.want {
+ // Prefer a mount that will have a
+ // replica no matter what we do here
+ // -- either because it already has an
+ // untrashable replica, or because we
+ // already need it to satisfy a
+ // different storage class.
+ return si.want
+ } else if orderi, orderj := srvRendezvous[si.mnt.KeepService], srvRendezvous[sj.mnt.KeepService]; orderi != orderj {
+ // Prefer a better rendezvous
+ // position.
+ return orderi < orderj
+ } else if repli, replj := si.repl != nil, sj.repl != nil; repli != replj {
+ // Prefer a mount that already has a
+ // replica.
+ return repli
+ } else {
+ // If pull/trash turns out to be
+ // needed, distribute the
+ // new/remaining replicas uniformly
+ // across qualifying mounts on a given
+ // server.
+ return rendezvousLess(si.mnt.DeviceID, sj.mnt.DeviceID, blkid)
+ }
+ })
+
+ // Servers/mounts/devices (with or without existing
+ // replicas) that are part of the best achievable
+ // layout for this storage class.
+ wantSrv := map[*KeepService]bool{}
+ wantMnt := map[*KeepMount]bool{}
+ wantDev := map[string]bool{}
+ // Positions (with existing replicas) that have been
+ // protected (via unsafeToDelete) to ensure we don't
+ // reduce replication below the desired level when
+ // trashing replicas that aren't in optimal positions
+ // for any storage class.
+ protMnt := map[*KeepMount]bool{}
+ // Replication planned so far (corresponds to wantMnt).
+ replWant := 0
+ // Protected replication (corresponds to protMnt).
+ replProt := 0
+
+ // trySlot tries using a slot to meet requirements,
+ // and returns true if all requirements are met.
+ trySlot := func(i int) bool {
+ slot := slots[i]
+ if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
+ // Already allocated a replica to this
+ // backend device, possibly on a
+ // different server.
+ return false
+ }
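+ // Protect this existing replica if we still
+ // need more protected replication for this
+ // class.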
+ if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
+ unsafeToDelete[slot.repl.Mtime] = true
+ protMnt[slot.mnt] = true
+ replProt += slot.mnt.Replication
+ }
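+ // Plan to keep or add a replica here if we
+ // still need more replication and the slot is
+ // usable (it already has a replica, or its
+ // mount is writable).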
+ if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+ slots[i].want = true
+ wantSrv[slot.mnt.KeepService] = true
+ wantMnt[slot.mnt] = true
+ if slot.mnt.DeviceID != "" {
+ wantDev[slot.mnt.DeviceID] = true
+ }
+ replWant += slot.mnt.Replication
+ }
+ return replProt >= desired && replWant >= desired
+ }
+
+ // First try to achieve desired replication without
+ // using the same server twice.
+ done := false
+ for i := 0; i < len(slots) && !done; i++ {
+ if !wantSrv[slots[i].mnt.KeepService] {
+ done = trySlot(i)
+ }
+ }
+
+ // If that didn't suffice, do another pass without the
+ // "distinct services" restriction. (Achieving the
+ // desired volume replication on fewer than the
+ // desired number of services is better than
+ // underreplicating.)
+ for i := 0; i < len(slots) && !done; i++ {
+ done = trySlot(i)
+ }
+
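+ // If the replicas already present on mounts that
+ // satisfy this class don't add up to the desired
+ // replication, mark the block as underreplicated so
+ // none of its replicas are trashed below.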
+ if !underreplicated {
+ safe := 0
+ for _, slot := range slots {
+ if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
+ continue
+ }
+ if safe += slot.mnt.Replication; safe >= desired {
+ break
+ }
+ }
+ underreplicated = safe < desired
+ }
+
+ // Avoid deleting wanted replicas from devices that
+ // are mounted on multiple servers -- even if they
+ // haven't already been added to unsafeToDelete
+ // because the servers report different Mtimes.
+ for _, slot := range slots {
+ if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
+ unsafeToDelete[slot.repl.Mtime] = true
+ }
+ }
+ }
+
+ // TODO: If multiple replicas are trashable, prefer the oldest
+ // replica that doesn't have a timestamp collision with
+ // others.
+
+ for i, slot := range slots {
+ // Don't trash (1) any replicas of an underreplicated
+ // block, even if they're in the wrong positions, or
+ // (2) any replicas whose Mtimes are identical to
+ // needed replicas (in case we're really seeing the
+ // same copy via different mounts).
+ if slot.repl != nil && (underreplicated || unsafeToDelete[slot.repl.Mtime]) {
+ slots[i].want = true
+ }
+ }
+
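+ // Summarize the resulting replica state for each
+ // storage class, and for the block as a whole.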
+ classState := make(map[string]balancedBlockState, len(bal.classes))
+ for _, class := range bal.classes {
+ classState[class] = computeBlockState(slots, bal.mountsByClass[class], len(blk.Replicas), blk.Desired[class])
+ }
+ blockState := computeBlockState(slots, nil, len(blk.Replicas), 0)
+
+ var lost bool