X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6eb3d1fb8fe71623fa63da46c250184cf2e4fbb8..44321d4b5e7677df2ad70d88db6358b2778b58e5:/services/datamanager/summary/pull_list.go diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go index fb1fed17f6..b326c9521a 100644 --- a/services/datamanager/summary/pull_list.go +++ b/services/datamanager/summary/pull_list.go @@ -14,35 +14,39 @@ import ( "strings" ) -type Locator struct { - Digest blockdigest.BlockDigest - // TODO(misha): Add size field to the Locator (and MarshalJSON() below) -} +type Locator blockdigest.DigestWithSize func (l Locator) MarshalJSON() ([]byte, error) { - return []byte("\"" + l.Digest.String() + "\""), nil + return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil } // One entry in the Pull List -type PullListEntry struct { +type PullRequest struct { Locator Locator `json:"locator"` Servers []string `json:"servers"` } // The Pull List for a particular server -type PullList struct { - Entries []PullListEntry `json:"blocks"` -} +type PullList []PullRequest -// EntriesByDigest implements sort.Interface for []PullListEntry -// based on the Digest. -type EntriesByDigest []PullListEntry +// PullListByLocator implements sort.Interface for PullList based on +// the Digest. 
+type PullListByLocator PullList -func (a EntriesByDigest) Len() int { return len(a) } -func (a EntriesByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a EntriesByDigest) Less(i, j int) bool { +func (a PullListByLocator) Len() int { return len(a) } +func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a PullListByLocator) Less(i, j int) bool { di, dj := a[i].Locator.Digest, a[j].Locator.Digest - return di.H < dj.H || (di.H == dj.H && di.L < dj.L) + if di.H < dj.H { + return true + } else if di.H == dj.H { + if di.L < dj.L { + return true + } else if di.L == dj.L { + return a[i].Locator.Size < a[j].Locator.Size + } + } + return false } // For a given under-replicated block, this structure represents which @@ -55,15 +59,25 @@ type PullServers struct { // Creates a map from block locator to PullServers with one entry for // each under-replicated block. +// +// This function skips blocks with zero replicas, since there is no +// server to pull them from; callers may omit such blocks, and any +// that are provided are ignored. func ComputePullServers(kc *keepclient.KeepClient, keepServerInfo *keep.ReadServers, - blockToDesiredReplication map[blockdigest.BlockDigest]int, + blockToDesiredReplication map[blockdigest.DigestWithSize]int, underReplicated BlockSet) (m map[Locator]PullServers) { m = map[Locator]PullServers{} // We use CanonicalString to avoid filling memory with dupicate // copies of the same string.
var cs CanonicalString + // Servers that are writeable + writableServers := map[string]struct{}{} + for _, url := range kc.WritableLocalRoots() { + writableServers[cs.Get(url)] = struct{}{} + } + for block, _ := range underReplicated { serversStoringBlock := keepServerInfo.BlockToServers[block] numCopies := len(serversStoringBlock) @@ -71,24 +85,24 @@ func ComputePullServers(kc *keepclient.KeepClient, if numCopiesMissing > 0 { // We expect this to always be true, since the block was listed // in underReplicated. - // TODO(misha): Consider asserting the above conditional. if numCopies > 0 { - // I believe that we should expect this to always be true. + // Not much we can do with blocks with no copies. // A server's host-port string appears as a key in this map // iff it contains the block. serverHasBlock := map[string]struct{}{} for _, info := range serversStoringBlock { sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex] - serverHasBlock[sa.HostPort()] = struct{}{} + serverHasBlock[cs.Get(sa.URL())] = struct{}{} } - roots := keepclient.NewRootSorter(kc.ServiceRoots(), + roots := keepclient.NewRootSorter(kc.LocalRoots(), block.String()).GetSortedRoots() - l := Locator{Digest: block} - m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing) + l := Locator(block) + m[l] = CreatePullServers(cs, serverHasBlock, writableServers, + roots, numCopiesMissing) } } } @@ -100,6 +114,7 @@ func ComputePullServers(kc *keepclient.KeepClient, // strings. func CreatePullServers(cs CanonicalString, serverHasBlock map[string]struct{}, + writableServers map[string]struct{}, sortedServers []string, maxToFields int) (ps PullServers) { @@ -111,12 +126,16 @@ func CreatePullServers(cs CanonicalString, for _, host := range sortedServers { // Strip the protocol portion of the url. // Use the canonical copy of the string to avoid memory waste. 
- server := cs.Get(RemoveProtocolPrefix(host)) + server := cs.Get(host) _, hasBlock := serverHasBlock[server] if hasBlock { - ps.From = append(ps.From, server) + // The from field should include the protocol. + ps.From = append(ps.From, cs.Get(host)) } else if len(ps.To) < maxToFields { - ps.To = append(ps.To, server) + _, writable := writableServers[host] + if writable { + ps.To = append(ps.To, server) + } } } @@ -138,12 +157,10 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) { for _, destination := range pullServers.To { pullList, pullListExists := spl[destination] if !pullListExists { - pullList = PullList{Entries: []PullListEntry{}} - spl[destination] = pullList + pullList = PullList{} } - pullList.Entries = append(pullList.Entries, - PullListEntry{Locator: locator, Servers: pullServers.From}) - spl[destination] = pullList + spl[destination] = append(pullList, + PullRequest{Locator: locator, Servers: pullServers.From}) } } return @@ -158,7 +175,7 @@ func WritePullLists(arvLogger *logger.Logger, pullLists map[string]PullList) { r := strings.NewReplacer(":", ".") for host, list := range pullLists { - filename := fmt.Sprintf("pull_list.%s", r.Replace(host)) + filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host))) pullListFile, err := os.Create(filename) if err != nil { loggerutil.FatalWithMessage(arvLogger,