X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d7635fbe751b2d00dd722a038723577f344406e1..628f2f2e1bfabdb7221badab3d5189011aee5a54:/services/datamanager/summary/pull_list.go diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go index 726f2c69a7..d7fb3eb8f7 100644 --- a/services/datamanager/summary/pull_list.go +++ b/services/datamanager/summary/pull_list.go @@ -1,4 +1,5 @@ // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List + package summary import ( @@ -8,41 +9,49 @@ import ( "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/logger" "git.curoverse.com/arvados.git/services/datamanager/keep" - "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "log" "os" "strings" ) -type Locator struct { - Digest blockdigest.BlockDigest - // TODO(misha): Add size field to the Locator (and MarshalJSON() below) -} +// Locator is a block digest +type Locator blockdigest.DigestWithSize +// MarshalJSON encoding func (l Locator) MarshalJSON() ([]byte, error) { - return []byte("\"" + l.Digest.String() + "\""), nil + return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil } -// One entry in the Pull List +// PullRequest represents one entry in the Pull List type PullRequest struct { Locator Locator `json:"locator"` Servers []string `json:"servers"` } -// The Pull List for a particular server +// PullList for a particular server type PullList []PullRequest -// PullListByDigest implements sort.Interface for PullList based on +// PullListByLocator implements sort.Interface for PullList based on // the Digest. -type PullListByDigest PullList +type PullListByLocator PullList -func (a PullListByDigest) Len() int { return len(a) } -func (a PullListByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a PullListByDigest) Less(i, j int) bool { +func (a PullListByLocator) Len() int { return len(a) } +func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a PullListByLocator) Less(i, j int) bool { di, dj := a[i].Locator.Digest, a[j].Locator.Digest - return di.H < dj.H || (di.H == dj.H && di.L < dj.L) + if di.H < dj.H { + return true + } else if di.H == dj.H { + if di.L < dj.L { + return true + } else if di.L == dj.L { + return a[i].Locator.Size < a[j].Locator.Size + } + } + return false } +// PullServers struct // For a given under-replicated block, this structure represents which // servers should pull the specified block and which servers they can // pull it from. @@ -51,47 +60,50 @@ type PullServers struct { From []string // Servers that already contain the specified block } -// Creates a map from block locator to PullServers with one entry for -// each under-replicated block. +// ComputePullServers creates a map from block locator to PullServers +// with one entry for each under-replicated block. +// +// This method ignores zero-replica blocks since there are no servers +// to pull them from, so callers should feel free to omit them, but +// this function will ignore them if they are provided. func ComputePullServers(kc *keepclient.KeepClient, keepServerInfo *keep.ReadServers, - blockToDesiredReplication map[blockdigest.BlockDigest]int, + blockToDesiredReplication map[blockdigest.DigestWithSize]int, underReplicated BlockSet) (m map[Locator]PullServers) { m = map[Locator]PullServers{} - // We use CanonicalString to avoid filling memory with dupicate + // We use CanonicalString to avoid filling memory with duplicate // copies of the same string. var cs CanonicalString // Servers that are writeable writableServers := map[string]struct{}{} for _, url := range kc.WritableLocalRoots() { - writableServers[cs.Get(RemoveProtocolPrefix(url))] = struct{}{} + writableServers[cs.Get(url)] = struct{}{} } - for block, _ := range underReplicated { + for block := range underReplicated { serversStoringBlock := keepServerInfo.BlockToServers[block] numCopies := len(serversStoringBlock) numCopiesMissing := blockToDesiredReplication[block] - numCopies if numCopiesMissing > 0 { // We expect this to always be true, since the block was listed // in underReplicated. - // TODO(misha): Consider asserting the above conditional. if numCopies > 0 { - // I believe that we should expect this to always be true. + // Not much we can do with blocks with no copies. // A server's host-port string appears as a key in this map // iff it contains the block. serverHasBlock := map[string]struct{}{} for _, info := range serversStoringBlock { sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex] - serverHasBlock[cs.Get(sa.HostPort())] = struct{}{} + serverHasBlock[cs.Get(sa.URL())] = struct{}{} } roots := keepclient.NewRootSorter(kc.LocalRoots(), block.String()).GetSortedRoots() - l := Locator{Digest: block} + l := Locator(block) m[l] = CreatePullServers(cs, serverHasBlock, writableServers, roots, numCopiesMissing) } @@ -100,9 +112,9 @@ func ComputePullServers(kc *keepclient.KeepClient, return m } -// Creates a pull list in which the To and From fields preserve the -// ordering of sorted servers and the contents are all canonical -// strings. +// CreatePullServers creates a pull list in which the To and From +// fields preserve the ordering of sorted servers and the contents +// are all canonical strings. func CreatePullServers(cs CanonicalString, serverHasBlock map[string]struct{}, writableServers map[string]struct{}, @@ -117,12 +129,13 @@ func CreatePullServers(cs CanonicalString, for _, host := range sortedServers { // Strip the protocol portion of the url. // Use the canonical copy of the string to avoid memory waste. - server := cs.Get(RemoveProtocolPrefix(host)) + server := cs.Get(host) _, hasBlock := serverHasBlock[server] if hasBlock { - ps.From = append(ps.From, server) + // The from field should include the protocol. + ps.From = append(ps.From, cs.Get(host)) } else if len(ps.To) < maxToFields { - _, writable := writableServers[server] + _, writable := writableServers[host] if writable { ps.To = append(ps.To, server) } @@ -132,12 +145,12 @@ func CreatePullServers(cs CanonicalString, return } -// Strips the protocol prefix from a url. +// RemoveProtocolPrefix strips the protocol prefix from a url. func RemoveProtocolPrefix(url string) string { return url[(strings.LastIndex(url, "/") + 1):] } -// Produces a PullList for each keep server. +// BuildPullLists produces a PullList for each keep server. func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) { spl = map[string]PullList{} // We don't worry about canonicalizing our strings here, because we @@ -156,29 +169,47 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) { return } -// Writes each pull list to a file. +// WritePullLists writes each pull list to a file. // The filename is based on the hostname. // // This is just a hack for prototyping, it is not expected to be used // in production. func WritePullLists(arvLogger *logger.Logger, - pullLists map[string]PullList) { + pullLists map[string]PullList, + dryRun bool) error { r := strings.NewReplacer(":", ".") + for host, list := range pullLists { - filename := fmt.Sprintf("pull_list.%s", r.Replace(host)) + if arvLogger != nil { + // We need a local variable because Update doesn't call our mutator func until later, + // when our list variable might have been reused by the next loop iteration. + host := host + listLen := len(list) + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + pullListInfo := logger.GetOrCreateMap(p, "pull_list_len") + pullListInfo[host] = listLen + }) + } + + if dryRun { + log.Print("dry run, not sending pull list to service %s with %d blocks", host, len(list)) + continue + } + + filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host))) pullListFile, err := os.Create(filename) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Failed to open %s: %v", filename, err)) + return err } defer pullListFile.Close() enc := json.NewEncoder(pullListFile) err = enc.Encode(list) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Failed to write pull list to %s: %v", filename, err)) + return err } log.Printf("Wrote pull list to %s.", filename) } + + return nil }