// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
+
package summary
import (
"strings"
)
-type Locator struct {
- Digest blockdigest.BlockDigest
- // TODO(misha): Add size field to the Locator (and MarshalJSON() below)
-}
+// Locator identifies a block by digest and size. It is a distinct named type whose underlying type is blockdigest.DigestWithSize.
+type Locator blockdigest.DigestWithSize
+// MarshalJSON encodes the Locator as a JSON string: the String() form of
// the underlying blockdigest.DigestWithSize wrapped in double quotes. The
// returned error is always nil.
// NOTE(review): no JSON escaping is applied here; presumably String() never
// yields quotes or backslashes. Confirm against blockdigest.
func (l Locator) MarshalJSON() ([]byte, error) {
- return []byte("\"" + l.Digest.String() + "\""), nil
+ return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
}
-// One entry in the Pull List
-type PullListEntry struct {
+// PullRequest represents one entry in the Pull List: a block to pull and the servers it can be pulled from.
+type PullRequest struct {
// Locator identifies the block to pull; serialized under the "locator" key.
Locator Locator `json:"locator"`
// Servers lists the servers to pull the block from; serialized under the
// "servers" key. BuildPullLists fills it from PullServers.From.
Servers []string `json:"servers"`
}
-// The Pull List for a particular server
-type PullList struct {
- Entries []PullListEntry `json:"blocks"`
-}
+// PullList is the list of pull requests for a particular server.
+type PullList []PullRequest
-// EntriesByDigest implements sort.Interface for []PullListEntry
-// based on the Digest.
-type EntriesByDigest []PullListEntry
+// PullListByLocator implements sort.Interface for PullList, ordering
+// entries by Locator.Digest.H, then Locator.Digest.L, then Locator.Size.
+type PullListByLocator PullList
-func (a EntriesByDigest) Len() int { return len(a) }
-func (a EntriesByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a EntriesByDigest) Less(i, j int) bool {
// Len returns the number of entries in the list.
+func (a PullListByLocator) Len() int { return len(a) }
// Swap exchanges the entries at indexes i and j.
+func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Less reports whether entry i sorts strictly before entry j.
+func (a PullListByLocator) Less(i, j int) bool {
di, dj := a[i].Locator.Digest, a[j].Locator.Digest
- return di.H < dj.H || (di.H == dj.H && di.L < dj.L)
+ if di.H < dj.H {
+ return true
+ } else if di.H == dj.H {
+ if di.L < dj.L {
+ return true
+ } else if di.L == dj.L {
// Both digest halves are equal: break the tie on Size.
+ return a[i].Locator.Size < a[j].Locator.Size
+ }
+ }
// Reached only when entry i's digest is greater than entry j's.
+ return false
}
+// PullServers describes the pull sources and destinations for one block.
// For a given under-replicated block, this structure represents which
// servers should pull the specified block and which servers they can
// pull it from.
From []string // Servers that already contain the specified block
}
-// Creates a map from block locator to PullServers with one entry for
-// each under-replicated block.
+// ComputePullServers creates a map from block locator to PullServers
+// with one entry for each under-replicated block that has at least one
+// existing copy and is still short of its desired replication.
+//
+// Blocks with zero replicas are skipped, since there is no server to
+// pull them from; callers may omit them or pass them, to the same effect.
//
// kc supplies the writable roots and the local roots that are sorted for
// each block; keepServerInfo supplies which servers currently store each
// block; blockToDesiredReplication gives the target copy count per block;
// underReplicated is the set of blocks to consider.
func ComputePullServers(kc *keepclient.KeepClient,
keepServerInfo *keep.ReadServers,
- blockToDesiredReplication map[blockdigest.BlockDigest]int,
+ blockToDesiredReplication map[blockdigest.DigestWithSize]int,
underReplicated BlockSet) (m map[Locator]PullServers) {
m = map[Locator]PullServers{}
// We use CanonicalString to avoid filling memory with duplicate
// copies of the same string.
var cs CanonicalString
- for block, _ := range underReplicated {
+ // Set of writable server URLs, taken from kc.WritableLocalRoots().
+ writableServers := map[string]struct{}{}
+ for _, url := range kc.WritableLocalRoots() {
+ writableServers[cs.Get(url)] = struct{}{}
+ }
+
+ for block := range underReplicated {
serversStoringBlock := keepServerInfo.BlockToServers[block]
numCopies := len(serversStoringBlock)
numCopiesMissing := blockToDesiredReplication[block] - numCopies
if numCopiesMissing > 0 {
// We expect this to always be true, since the block was listed
// in underReplicated.
- // TODO(misha): Consider asserting the above conditional.
if numCopies > 0 {
- // I believe that we should expect this to always be true.
+ // Blocks with no copies are skipped: there is nowhere to pull them from.
// A server's URL string appears as a key in this map
// iff it contains the block.
serverHasBlock := map[string]struct{}{}
for _, info := range serversStoringBlock {
sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
- serverHasBlock[sa.HostPort()] = struct{}{}
+ serverHasBlock[cs.Get(sa.URL())] = struct{}{}
}
// Candidate servers in the order produced by the root sorter for
// this block's string form.
- roots := keepclient.NewRootSorter(kc.ServiceRoots(),
+ roots := keepclient.NewRootSorter(kc.LocalRoots(),
block.String()).GetSortedRoots()
- l := Locator{Digest: block}
- m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing)
+ l := Locator(block)
+ m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
+ roots, numCopiesMissing)
}
}
}
return m
}
-// Creates a pull list in which the To and From fields preserve the
-// ordering of sorted servers and the contents are all canonical
-// strings.
+// CreatePullServers creates a pull list in which the To and From
+// fields preserve the ordering of sorted servers and the contents
+// are all canonical strings.
//
// Walking sortedServers in order: a server present in serverHasBlock is
// appended to From; otherwise it is appended to To only while To holds
// fewer than maxToFields entries and the server is in writableServers.
// Any other server is left out of the result.
func CreatePullServers(cs CanonicalString,
serverHasBlock map[string]struct{},
+ writableServers map[string]struct{},
sortedServers []string,
maxToFields int) (ps PullServers) {
for _, host := range sortedServers {
// Servers are keyed by full URL (the removed line stripped the protocol).
// Use the canonical copy of the string to avoid memory waste.
- server := cs.Get(RemoveProtocolPrefix(host))
+ server := cs.Get(host)
_, hasBlock := serverHasBlock[server]
if hasBlock {
- ps.From = append(ps.From, server)
+ // The from field should include the protocol.
+ ps.From = append(ps.From, cs.Get(host))
} else if len(ps.To) < maxToFields {
- ps.To = append(ps.To, server)
// NOTE(review): this lookup uses host while the one above uses server;
// presumably cs.Get returns a string equal to its argument, making the
// two keys interchangeable. Confirm against CanonicalString.
+ _, writable := writableServers[host]
+ if writable {
+ ps.To = append(ps.To, server)
+ }
}
}
return
}
-// Strips the protocol prefix from a url.
+// RemoveProtocolPrefix strips the protocol prefix from a url by returning everything after its last "/".
// A url containing no "/" is returned unchanged, e.g. "http://host:port"
// yields "host:port" and "host:port" stays "host:port".
// NOTE(review): a url ending in "/" yields "", and a url with a path yields
// only its final path segment. Confirm callers never pass such urls.
func RemoveProtocolPrefix(url string) string {
return url[(strings.LastIndex(url, "/") + 1):]
}
-// Produces a PullList for each keep server.
+// BuildPullLists produces a PullList for each keep server.
func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
spl = map[string]PullList{}
// We don't worry about canonicalizing our strings here, because we
for _, destination := range pullServers.To {
pullList, pullListExists := spl[destination]
if !pullListExists {
- pullList = PullList{Entries: []PullListEntry{}}
- spl[destination] = pullList
+ pullList = PullList{}
}
- pullList.Entries = append(pullList.Entries,
- PullListEntry{Locator: locator, Servers: pullServers.From})
- spl[destination] = pullList
+ spl[destination] = append(pullList,
+ PullRequest{Locator: locator, Servers: pullServers.From})
}
}
return
}
-// Writes each pull list to a file.
+// WritePullLists writes each pull list to a file.
// The filename is based on the hostname.
//
// This is just a hack for prototyping, it is not expected to be used
pullLists map[string]PullList) {
r := strings.NewReplacer(":", ".")
for host, list := range pullLists {
- filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+ filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
pullListFile, err := os.Create(filename)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,