1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
8 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/services/datamanager/keep"
17 // Locator is a block digest
18 type Locator blockdigest.DigestWithSize
20 // MarshalJSON encoding
21 func (l Locator) MarshalJSON() ([]byte, error) {
22 return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
25 // PullRequest represents one entry in the Pull List
26 type PullRequest struct {
27 Locator Locator `json:"locator"`
28 Servers []string `json:"servers"`
31 // PullList for a particular server
32 type PullList []PullRequest
34 // PullListByLocator implements sort.Interface for PullList based on
36 type PullListByLocator PullList
38 func (a PullListByLocator) Len() int { return len(a) }
39 func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
40 func (a PullListByLocator) Less(i, j int) bool {
41 di, dj := a[i].Locator.Digest, a[j].Locator.Digest
44 } else if di.H == dj.H {
47 } else if di.L == dj.L {
48 return a[i].Locator.Size < a[j].Locator.Size
// PullServers describes, for a single under-replicated block, which
// servers should pull a copy of it and which servers already hold it
// and can therefore act as sources.
type PullServers struct {
	// To lists the servers that should pull the block.
	To []string
	// From lists the servers that already contain the block.
	From []string
}
63 // ComputePullServers creates a map from block locator to PullServers
64 // with one entry for each under-replicated block.
66 // This method ignores zero-replica blocks since there are no servers
67 // to pull them from, so callers should feel free to omit them, but
68 // this function will ignore them if they are provided.
69 func ComputePullServers(kc *keepclient.KeepClient,
70 keepServerInfo *keep.ReadServers,
71 blockToDesiredReplication map[blockdigest.DigestWithSize]int,
72 underReplicated BlockSet) (m map[Locator]PullServers) {
73 m = map[Locator]PullServers{}
74 // We use CanonicalString to avoid filling memory with duplicate
75 // copies of the same string.
76 var cs CanonicalString
78 // Servers that are writeable
79 writableServers := map[string]struct{}{}
80 for _, url := range kc.WritableLocalRoots() {
81 writableServers[cs.Get(url)] = struct{}{}
84 for block := range underReplicated {
85 serversStoringBlock := keepServerInfo.BlockToServers[block]
86 numCopies := len(serversStoringBlock)
87 numCopiesMissing := blockToDesiredReplication[block] - numCopies
88 if numCopiesMissing > 0 {
89 // We expect this to always be true, since the block was listed
90 // in underReplicated.
93 // Not much we can do with blocks with no copies.
95 // A server's host-port string appears as a key in this map
96 // iff it contains the block.
97 serverHasBlock := map[string]struct{}{}
98 for _, info := range serversStoringBlock {
99 sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
100 serverHasBlock[cs.Get(sa.URL())] = struct{}{}
103 roots := keepclient.NewRootSorter(kc.LocalRoots(),
104 block.String()).GetSortedRoots()
107 m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
108 roots, numCopiesMissing)
115 // CreatePullServers creates a pull list in which the To and From
116 // fields preserve the ordering of sorted servers and the contents
117 // are all canonical strings.
118 func CreatePullServers(cs CanonicalString,
119 serverHasBlock map[string]struct{},
120 writableServers map[string]struct{},
121 sortedServers []string,
122 maxToFields int) (ps PullServers) {
125 To: make([]string, 0, maxToFields),
126 From: make([]string, 0, len(serverHasBlock)),
129 for _, host := range sortedServers {
130 // Strip the protocol portion of the url.
131 // Use the canonical copy of the string to avoid memory waste.
132 server := cs.Get(host)
133 _, hasBlock := serverHasBlock[server]
135 // The from field should include the protocol.
136 ps.From = append(ps.From, cs.Get(host))
137 } else if len(ps.To) < maxToFields {
138 _, writable := writableServers[host]
140 ps.To = append(ps.To, server)
// RemoveProtocolPrefix strips the protocol prefix from a url.
//
// It returns the text after the last "/" once any trailing slashes
// have been removed, so both "http://host:port" and "http://host:port/"
// yield "host:port". A string containing no "/" is returned unchanged.
// Previously a trailing slash produced "", which made distinct servers
// collapse onto the same derived name.
func RemoveProtocolPrefix(url string) string {
	trimmed := strings.TrimRight(url, "/")
	// LastIndex returns -1 when there is no slash, so +1 keeps the
	// whole string in that case.
	return trimmed[strings.LastIndex(trimmed, "/")+1:]
}
153 // BuildPullLists produces a PullList for each keep server.
154 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
155 spl = map[string]PullList{}
156 // We don't worry about canonicalizing our strings here, because we
157 // assume lps was created by ComputePullServers() which already
158 // canonicalized the strings for us.
159 for locator, pullServers := range lps {
160 for _, destination := range pullServers.To {
161 pullList, pullListExists := spl[destination]
163 pullList = PullList{}
165 spl[destination] = append(pullList,
166 PullRequest{Locator: locator, Servers: pullServers.From})
172 // WritePullLists writes each pull list to a file.
173 // The filename is based on the hostname.
175 // This is just a hack for prototyping, it is not expected to be used
177 func WritePullLists(arvLogger *logger.Logger,
178 pullLists map[string]PullList,
180 r := strings.NewReplacer(":", ".")
182 for host, list := range pullLists {
183 if arvLogger != nil {
184 // We need a local variable because Update doesn't call our mutator func until later,
185 // when our list variable might have been reused by the next loop iteration.
188 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
189 pullListInfo := logger.GetOrCreateMap(p, "pull_list_len")
190 pullListInfo[host] = listLen
195 log.Print("dry run, not sending pull list to service %s with %d blocks", host, len(list))
199 filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
200 pullListFile, err := os.Create(filename)
204 defer pullListFile.Close()
206 enc := json.NewEncoder(pullListFile)
207 err = enc.Encode(list)
211 log.Printf("Wrote pull list to %s.", filename)