1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
7 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "git.curoverse.com/arvados.git/sdk/go/logger"
10 "git.curoverse.com/arvados.git/services/datamanager/keep"
11 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
18 Digest blockdigest.BlockDigest
19 // TODO(misha): Add size field to the Locator (and MarshalJSON() below)
22 func (l Locator) MarshalJSON() ([]byte, error) {
23 return []byte("\"" + l.Digest.String() + "\""), nil
26 // One entry in the Pull List
27 type PullRequest struct {
28 Locator Locator `json:"locator"`
29 Servers []string `json:"servers"`
32 // The Pull List for a particular server
33 type PullList []PullRequest
35 // PullListByDigest implements sort.Interface for PullList based on
37 type PullListByDigest PullList
39 func (a PullListByDigest) Len() int { return len(a) }
40 func (a PullListByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
41 func (a PullListByDigest) Less(i, j int) bool {
42 di, dj := a[i].Locator.Digest, a[j].Locator.Digest
43 return di.H < dj.H || (di.H == dj.H && di.L < dj.L)
// PullServers describes, for a single under-replicated block, which
// servers should pull a copy of it (To) and which servers already hold it
// and can therefore act as sources (From).
type PullServers struct {
	To   []string // servers that should pull the block
	From []string // servers that already store the block
}
54 // Creates a map from block locator to PullServers with one entry for
55 // each under-replicated block.
56 func ComputePullServers(kc *keepclient.KeepClient,
57 keepServerInfo *keep.ReadServers,
58 blockToDesiredReplication map[blockdigest.BlockDigest]int,
59 underReplicated BlockSet) (m map[Locator]PullServers) {
60 m = map[Locator]PullServers{}
61 // We use CanonicalString to avoid filling memory with dupicate
62 // copies of the same string.
63 var cs CanonicalString
65 for block, _ := range underReplicated {
66 serversStoringBlock := keepServerInfo.BlockToServers[block]
67 numCopies := len(serversStoringBlock)
68 numCopiesMissing := blockToDesiredReplication[block] - numCopies
69 if numCopiesMissing > 0 {
70 // We expect this to always be true, since the block was listed
71 // in underReplicated.
72 // TODO(misha): Consider asserting the above conditional.
75 // I believe that we should expect this to always be true.
77 // A server's host-port string appears as a key in this map
78 // iff it contains the block.
79 serverHasBlock := map[string]struct{}{}
80 for _, info := range serversStoringBlock {
81 sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
82 serverHasBlock[sa.HostPort()] = struct{}{}
85 roots := keepclient.NewRootSorter(kc.LocalRoots(),
86 block.String()).GetSortedRoots()
88 l := Locator{Digest: block}
89 m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing)
96 // Creates a pull list in which the To and From fields preserve the
97 // ordering of sorted servers and the contents are all canonical
99 func CreatePullServers(cs CanonicalString,
100 serverHasBlock map[string]struct{},
101 sortedServers []string,
102 maxToFields int) (ps PullServers) {
105 To: make([]string, 0, maxToFields),
106 From: make([]string, 0, len(serverHasBlock)),
109 for _, host := range sortedServers {
110 // Strip the protocol portion of the url.
111 // Use the canonical copy of the string to avoid memory waste.
112 server := cs.Get(RemoveProtocolPrefix(host))
113 _, hasBlock := serverHasBlock[server]
115 ps.From = append(ps.From, server)
116 } else if len(ps.To) < maxToFields {
117 ps.To = append(ps.To, server)
// RemoveProtocolPrefix strips the protocol prefix from a server URL. For
// example, "http://keep0.example:25107" becomes "keep0.example:25107".
//
// Everything up to and including the last "/" is removed. A single
// trailing slash is dropped first, so "http://host:port/" yields
// "host:port" rather than the empty string. A string containing no "/" is
// returned unchanged. The input is assumed to have no path after the
// host; otherwise the last path segment is returned.
func RemoveProtocolPrefix(url string) string {
	trimmed := strings.TrimSuffix(url, "/")
	return trimmed[strings.LastIndex(trimmed, "/")+1:]
}
129 // Produces a PullList for each keep server.
130 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
131 spl = map[string]PullList{}
132 // We don't worry about canonicalizing our strings here, because we
133 // assume lps was created by ComputePullServers() which already
134 // canonicalized the strings for us.
135 for locator, pullServers := range lps {
136 for _, destination := range pullServers.To {
137 pullList, pullListExists := spl[destination]
139 pullList = PullList{}
140 spl[destination] = pullList
142 pullList = append(pullList,
143 PullRequest{Locator: locator, Servers: pullServers.From})
144 spl[destination] = pullList
150 // Writes each pull list to a file.
151 // The filename is based on the hostname.
153 // This is just a hack for prototyping, it is not expected to be used
155 func WritePullLists(arvLogger *logger.Logger,
156 pullLists map[string]PullList) {
157 r := strings.NewReplacer(":", ".")
158 for host, list := range pullLists {
159 filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
160 pullListFile, err := os.Create(filename)
162 loggerutil.FatalWithMessage(arvLogger,
163 fmt.Sprintf("Failed to open %s: %v", filename, err))
165 defer pullListFile.Close()
167 enc := json.NewEncoder(pullListFile)
168 err = enc.Encode(list)
170 loggerutil.FatalWithMessage(arvLogger,
171 fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
173 log.Printf("Wrote pull list to %s.", filename)