Merge branch 'master' into 6260-test-datamanager
[arvados.git] / services / datamanager / summary / pull_list.go
index 726f2c69a71a3ad9ecdd1564f5103d29412defa5..b326c9521ab0d7b545fd52c340c2b17455ea5aa5 100644 (file)
@@ -14,13 +14,10 @@ import (
        "strings"
 )
 
-type Locator struct {
-       Digest blockdigest.BlockDigest
-       // TODO(misha): Add size field to the Locator (and MarshalJSON() below)
-}
+type Locator blockdigest.DigestWithSize
 
 func (l Locator) MarshalJSON() ([]byte, error) {
-       return []byte("\"" + l.Digest.String() + "\""), nil
+       return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
 }
 
 // One entry in the Pull List
@@ -32,15 +29,24 @@ type PullRequest struct {
 // The Pull List for a particular server
 type PullList []PullRequest
 
-// PullListByDigest implements sort.Interface for PullList based on
+// PullListByLocator implements sort.Interface for PullList based on
 // the Digest.
-type PullListByDigest PullList
+type PullListByLocator PullList
 
-func (a PullListByDigest) Len() int      { return len(a) }
-func (a PullListByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a PullListByDigest) Less(i, j int) bool {
+func (a PullListByLocator) Len() int      { return len(a) }
+func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a PullListByLocator) Less(i, j int) bool {
        di, dj := a[i].Locator.Digest, a[j].Locator.Digest
-       return di.H < dj.H || (di.H == dj.H && di.L < dj.L)
+       if di.H < dj.H {
+               return true
+       } else if di.H == dj.H {
+               if di.L < dj.L {
+                       return true
+               } else if di.L == dj.L {
+                       return a[i].Locator.Size < a[j].Locator.Size
+               }
+       }
+       return false
 }
 
 // For a given under-replicated block, this structure represents which
@@ -53,9 +59,13 @@ type PullServers struct {
 
 // Creates a map from block locator to PullServers with one entry for
 // each under-replicated block.
+//
+// This method ignores zero-replica blocks since there are no servers
+// to pull them from, so callers should feel free to omit them, but
+// this function will ignore them if they are provided.
 func ComputePullServers(kc *keepclient.KeepClient,
        keepServerInfo *keep.ReadServers,
-       blockToDesiredReplication map[blockdigest.BlockDigest]int,
+       blockToDesiredReplication map[blockdigest.DigestWithSize]int,
        underReplicated BlockSet) (m map[Locator]PullServers) {
        m = map[Locator]PullServers{}
        // We use CanonicalString to avoid filling memory with dupicate
@@ -65,7 +75,7 @@ func ComputePullServers(kc *keepclient.KeepClient,
        // Servers that are writeable
        writableServers := map[string]struct{}{}
        for _, url := range kc.WritableLocalRoots() {
-               writableServers[cs.Get(RemoveProtocolPrefix(url))] = struct{}{}
+               writableServers[cs.Get(url)] = struct{}{}
        }
 
        for block, _ := range underReplicated {
@@ -75,23 +85,22 @@ func ComputePullServers(kc *keepclient.KeepClient,
                if numCopiesMissing > 0 {
                        // We expect this to always be true, since the block was listed
                        // in underReplicated.
-                       // TODO(misha): Consider asserting the above conditional.
 
                        if numCopies > 0 {
-                               // I believe that we should expect this to always be true.
+                               // Not much we can do with blocks with no copies.
 
                                // A server's host-port string appears as a key in this map
                                // iff it contains the block.
                                serverHasBlock := map[string]struct{}{}
                                for _, info := range serversStoringBlock {
                                        sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
-                                       serverHasBlock[cs.Get(sa.HostPort())] = struct{}{}
+                                       serverHasBlock[cs.Get(sa.URL())] = struct{}{}
                                }
 
                                roots := keepclient.NewRootSorter(kc.LocalRoots(),
                                        block.String()).GetSortedRoots()
 
-                               l := Locator{Digest: block}
+                               l := Locator(block)
                                m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
                                        roots, numCopiesMissing)
                        }
@@ -117,12 +126,13 @@ func CreatePullServers(cs CanonicalString,
        for _, host := range sortedServers {
                // Strip the protocol portion of the url.
                // Use the canonical copy of the string to avoid memory waste.
-               server := cs.Get(RemoveProtocolPrefix(host))
+               server := cs.Get(host)
                _, hasBlock := serverHasBlock[server]
                if hasBlock {
-                       ps.From = append(ps.From, server)
+                       // The from field should include the protocol.
+                       ps.From = append(ps.From, cs.Get(host))
                } else if len(ps.To) < maxToFields {
-                       _, writable := writableServers[server]
+                       _, writable := writableServers[host]
                        if writable {
                                ps.To = append(ps.To, server)
                        }
@@ -165,7 +175,7 @@ func WritePullLists(arvLogger *logger.Logger,
        pullLists map[string]PullList) {
        r := strings.NewReplacer(":", ".")
        for host, list := range pullLists {
-               filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+               filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
                pullListFile, err := os.Create(filename)
                if err != nil {
                        loggerutil.FatalWithMessage(arvLogger,