Fix a bunch of misspellings in our Go code (all in comments).
[arvados.git] / services / datamanager / summary / pull_list.go
index fb1fed17f6014cd028b557bd6afbee1a195c4daa..d7fb3eb8f7cb3953d0f40e0e67626376f0d2941a 100644 (file)
@@ -1,4 +1,5 @@
 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
+
 package summary
 
 import (
@@ -8,43 +9,49 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
        "strings"
 )
 
-type Locator struct {
-       Digest blockdigest.BlockDigest
-       // TODO(misha): Add size field to the Locator (and MarshalJSON() below)
-}
+// Locator is a block digest
+type Locator blockdigest.DigestWithSize
 
+// MarshalJSON encoding
 func (l Locator) MarshalJSON() ([]byte, error) {
-       return []byte("\"" + l.Digest.String() + "\""), nil
+       return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
 }
 
-// One entry in the Pull List
-type PullListEntry struct {
+// PullRequest represents one entry in the Pull List
+type PullRequest struct {
        Locator Locator  `json:"locator"`
        Servers []string `json:"servers"`
 }
 
-// The Pull List for a particular server
-type PullList struct {
-       Entries []PullListEntry `json:"blocks"`
-}
+// PullList for a particular server
+type PullList []PullRequest
 
-// EntriesByDigest implements sort.Interface for []PullListEntry
-// based on the Digest.
-type EntriesByDigest []PullListEntry
+// PullListByLocator implements sort.Interface for PullList based on
+// the Digest.
+type PullListByLocator PullList
 
-func (a EntriesByDigest) Len() int      { return len(a) }
-func (a EntriesByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a EntriesByDigest) Less(i, j int) bool {
+func (a PullListByLocator) Len() int      { return len(a) }
+func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a PullListByLocator) Less(i, j int) bool {
        di, dj := a[i].Locator.Digest, a[j].Locator.Digest
-       return di.H < dj.H || (di.H == dj.H && di.L < dj.L)
+       if di.H < dj.H {
+               return true
+       } else if di.H == dj.H {
+               if di.L < dj.L {
+                       return true
+               } else if di.L == dj.L {
+                       return a[i].Locator.Size < a[j].Locator.Size
+               }
+       }
+       return false
 }
 
+// PullServers struct
 // For a given under-replicated block, this structure represents which
 // servers should pull the specified block and which servers they can
 // pull it from.
@@ -53,53 +60,64 @@ type PullServers struct {
        From []string // Servers that already contain the specified block
 }
 
-// Creates a map from block locator to PullServers with one entry for
-// each under-replicated block.
+// ComputePullServers creates a map from block locator to PullServers
+// with one entry for each under-replicated block.
+//
+// This method ignores zero-replica blocks since there are no servers
+// to pull them from, so callers should feel free to omit them, but
+// this function will ignore them if they are provided.
 func ComputePullServers(kc *keepclient.KeepClient,
        keepServerInfo *keep.ReadServers,
-       blockToDesiredReplication map[blockdigest.BlockDigest]int,
+       blockToDesiredReplication map[blockdigest.DigestWithSize]int,
        underReplicated BlockSet) (m map[Locator]PullServers) {
        m = map[Locator]PullServers{}
-       // We use CanonicalString to avoid filling memory with dupicate
+       // We use CanonicalString to avoid filling memory with duplicate
        // copies of the same string.
        var cs CanonicalString
 
-       for block, _ := range underReplicated {
+       // Servers that are writeable
+       writableServers := map[string]struct{}{}
+       for _, url := range kc.WritableLocalRoots() {
+               writableServers[cs.Get(url)] = struct{}{}
+       }
+
+       for block := range underReplicated {
                serversStoringBlock := keepServerInfo.BlockToServers[block]
                numCopies := len(serversStoringBlock)
                numCopiesMissing := blockToDesiredReplication[block] - numCopies
                if numCopiesMissing > 0 {
                        // We expect this to always be true, since the block was listed
                        // in underReplicated.
-                       // TODO(misha): Consider asserting the above conditional.
 
                        if numCopies > 0 {
-                               // I believe that we should expect this to always be true.
+                               // Not much we can do with blocks with no copies.
 
                                // A server's host-port string appears as a key in this map
                                // iff it contains the block.
                                serverHasBlock := map[string]struct{}{}
                                for _, info := range serversStoringBlock {
                                        sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
-                                       serverHasBlock[sa.HostPort()] = struct{}{}
+                                       serverHasBlock[cs.Get(sa.URL())] = struct{}{}
                                }
 
-                               roots := keepclient.NewRootSorter(kc.ServiceRoots(),
+                               roots := keepclient.NewRootSorter(kc.LocalRoots(),
                                        block.String()).GetSortedRoots()
 
-                               l := Locator{Digest: block}
-                               m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing)
+                               l := Locator(block)
+                               m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
+                                       roots, numCopiesMissing)
                        }
                }
        }
        return m
 }
 
-// Creates a pull list in which the To and From fields preserve the
-// ordering of sorted servers and the contents are all canonical
-// strings.
+// CreatePullServers creates a pull list in which the To and From
+// fields preserve the ordering of sorted servers and the contents
+// are all canonical strings.
 func CreatePullServers(cs CanonicalString,
        serverHasBlock map[string]struct{},
+       writableServers map[string]struct{},
        sortedServers []string,
        maxToFields int) (ps PullServers) {
 
@@ -111,24 +129,28 @@ func CreatePullServers(cs CanonicalString,
        for _, host := range sortedServers {
                // Strip the protocol portion of the url.
                // Use the canonical copy of the string to avoid memory waste.
-               server := cs.Get(RemoveProtocolPrefix(host))
+               server := cs.Get(host)
                _, hasBlock := serverHasBlock[server]
                if hasBlock {
-                       ps.From = append(ps.From, server)
+                       // The from field should include the protocol.
+                       ps.From = append(ps.From, cs.Get(host))
                } else if len(ps.To) < maxToFields {
-                       ps.To = append(ps.To, server)
+                       _, writable := writableServers[host]
+                       if writable {
+                               ps.To = append(ps.To, server)
+                       }
                }
        }
 
        return
 }
 
-// Strips the protocol prefix from a url.
+// RemoveProtocolPrefix strips the protocol prefix from a url.
 func RemoveProtocolPrefix(url string) string {
        return url[(strings.LastIndex(url, "/") + 1):]
 }
 
-// Produces a PullList for each keep server.
+// BuildPullLists produces a PullList for each keep server.
 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
        spl = map[string]PullList{}
        // We don't worry about canonicalizing our strings here, because we
@@ -138,40 +160,56 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
                for _, destination := range pullServers.To {
                        pullList, pullListExists := spl[destination]
                        if !pullListExists {
-                               pullList = PullList{Entries: []PullListEntry{}}
-                               spl[destination] = pullList
+                               pullList = PullList{}
                        }
-                       pullList.Entries = append(pullList.Entries,
-                               PullListEntry{Locator: locator, Servers: pullServers.From})
-                       spl[destination] = pullList
+                       spl[destination] = append(pullList,
+                               PullRequest{Locator: locator, Servers: pullServers.From})
                }
        }
        return
 }
 
-// Writes each pull list to a file.
+// WritePullLists writes each pull list to a file.
 // The filename is based on the hostname.
 //
 // This is just a hack for prototyping, it is not expected to be used
 // in production.
 func WritePullLists(arvLogger *logger.Logger,
-       pullLists map[string]PullList) {
+       pullLists map[string]PullList,
+       dryRun bool) error {
        r := strings.NewReplacer(":", ".")
+
        for host, list := range pullLists {
-               filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+               if arvLogger != nil {
+                       // We need a local variable because Update doesn't call our mutator func until later,
+                       // when our list variable might have been reused by the next loop iteration.
+                       host := host
+                       listLen := len(list)
+                       arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                               pullListInfo := logger.GetOrCreateMap(p, "pull_list_len")
+                               pullListInfo[host] = listLen
+                       })
+               }
+
+               if dryRun {
+                       log.Print("dry run, not sending pull list to service %s with %d blocks", host, len(list))
+                       continue
+               }
+
+               filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
                pullListFile, err := os.Create(filename)
                if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to open %s: %v", filename, err))
+                       return err
                }
                defer pullListFile.Close()
 
                enc := json.NewEncoder(pullListFile)
                err = enc.Encode(list)
                if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
+                       return err
                }
                log.Printf("Wrote pull list to %s.", filename)
        }
+
+       return nil
 }