+++ /dev/null
-// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
-package summary
-
-import (
- "encoding/json"
- "fmt"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/logger"
- "git.curoverse.com/arvados.git/services/datamanager/keep"
- "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
- "log"
- "os"
- "strings"
-)
-
-type Locator blockdigest.DigestWithSize
-
-func (l Locator) MarshalJSON() ([]byte, error) {
- return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
-}
-
-// One entry in the Pull List
-type PullRequest struct {
- Locator Locator `json:"locator"`
- Servers []string `json:"servers"`
-}
-
-// The Pull List for a particular server
-type PullList []PullRequest
-
-// PullListByLocator implements sort.Interface for PullList based on
-// the Digest.
-type PullListByLocator PullList
-
-func (a PullListByLocator) Len() int { return len(a) }
-func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
-func (a PullListByLocator) Less(i, j int) bool {
- di, dj := a[i].Locator.Digest, a[j].Locator.Digest
- if di.H < dj.H {
- return true
- } else if di.H == dj.H {
- if di.L < dj.L {
- return true
- } else if di.L == dj.L {
- return a[i].Locator.Size < a[j].Locator.Size
- }
- }
- return false
-}
-
-// For a given under-replicated block, this structure represents which
-// servers should pull the specified block and which servers they can
-// pull it from.
-type PullServers struct {
- To []string // Servers that should pull the specified block
- From []string // Servers that already contain the specified block
-}
-
-// Creates a map from block locator to PullServers with one entry for
-// each under-replicated block.
-//
-// This method ignores zero-replica blocks since there are no servers
-// to pull them from, so callers should feel free to omit them, but
-// this function will ignore them if they are provided.
-func ComputePullServers(kc *keepclient.KeepClient,
- keepServerInfo *keep.ReadServers,
- blockToDesiredReplication map[blockdigest.DigestWithSize]int,
- underReplicated BlockSet) (m map[Locator]PullServers) {
- m = map[Locator]PullServers{}
- // We use CanonicalString to avoid filling memory with dupicate
- // copies of the same string.
- var cs CanonicalString
-
- // Servers that are writeable
- writableServers := map[string]struct{}{}
- for _, url := range kc.WritableLocalRoots() {
- writableServers[cs.Get(url)] = struct{}{}
- }
-
- for block, _ := range underReplicated {
- serversStoringBlock := keepServerInfo.BlockToServers[block]
- numCopies := len(serversStoringBlock)
- numCopiesMissing := blockToDesiredReplication[block] - numCopies
- if numCopiesMissing > 0 {
- // We expect this to always be true, since the block was listed
- // in underReplicated.
-
- if numCopies > 0 {
- // Not much we can do with blocks with no copies.
-
- // A server's host-port string appears as a key in this map
- // iff it contains the block.
- serverHasBlock := map[string]struct{}{}
- for _, info := range serversStoringBlock {
- sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
- serverHasBlock[cs.Get(sa.HostPort())] = struct{}{}
- }
-
- roots := keepclient.NewRootSorter(kc.LocalRoots(),
- block.String()).GetSortedRoots()
-
- l := Locator(block)
- m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
- roots, numCopiesMissing)
- }
- }
- }
- return m
-}
-
-// Creates a pull list in which the To and From fields preserve the
-// ordering of sorted servers and the contents are all canonical
-// strings.
-func CreatePullServers(cs CanonicalString,
- serverHasBlock map[string]struct{},
- writableServers map[string]struct{},
- sortedServers []string,
- maxToFields int) (ps PullServers) {
-
- ps = PullServers{
- To: make([]string, 0, maxToFields),
- From: make([]string, 0, len(serverHasBlock)),
- }
-
- for _, host := range sortedServers {
- // Strip the protocol portion of the url.
- // Use the canonical copy of the string to avoid memory waste.
- server := cs.Get(RemoveProtocolPrefix(host))
- _, hasBlock := serverHasBlock[server]
- if hasBlock {
- // The from field should include the protocol.
- ps.From = append(ps.From, cs.Get(host))
- } else if len(ps.To) < maxToFields {
- _, writable := writableServers[host]
- if writable {
- ps.To = append(ps.To, server)
- }
- }
- }
-
- return
-}
-
-// Strips the protocol prefix from a url.
-func RemoveProtocolPrefix(url string) string {
- return url[(strings.LastIndex(url, "/") + 1):]
-}
-
-// Produces a PullList for each keep server.
-func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
- spl = map[string]PullList{}
- // We don't worry about canonicalizing our strings here, because we
- // assume lps was created by ComputePullServers() which already
- // canonicalized the strings for us.
- for locator, pullServers := range lps {
- for _, destination := range pullServers.To {
- pullList, pullListExists := spl[destination]
- if !pullListExists {
- pullList = PullList{}
- }
- spl[destination] = append(pullList,
- PullRequest{Locator: locator, Servers: pullServers.From})
- }
- }
- return
-}
-
-// Writes each pull list to a file.
-// The filename is based on the hostname.
-//
-// This is just a hack for prototyping, it is not expected to be used
-// in production.
-func WritePullLists(arvLogger *logger.Logger,
- pullLists map[string]PullList) {
- r := strings.NewReplacer(":", ".")
- for host, list := range pullLists {
- filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
- pullListFile, err := os.Create(filename)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to open %s: %v", filename, err))
- }
- defer pullListFile.Close()
-
- enc := json.NewEncoder(pullListFile)
- err = enc.Encode(list)
- if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
- }
- log.Printf("Wrote pull list to %s.", filename)
- }
-}