// Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
8 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
9 "git.curoverse.com/arvados.git/sdk/go/keepclient"
10 "git.curoverse.com/arvados.git/sdk/go/logger"
11 "git.curoverse.com/arvados.git/services/datamanager/keep"
12 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
18 // Locator is a block digest
19 type Locator blockdigest.DigestWithSize
21 // MarshalJSON encoding
22 func (l Locator) MarshalJSON() ([]byte, error) {
23 return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
26 // PullRequest represents one entry in the Pull List
27 type PullRequest struct {
28 Locator Locator `json:"locator"`
29 Servers []string `json:"servers"`
32 // PullList for a particular server
33 type PullList []PullRequest
35 // PullListByLocator implements sort.Interface for PullList based on
37 type PullListByLocator PullList
39 func (a PullListByLocator) Len() int { return len(a) }
40 func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
41 func (a PullListByLocator) Less(i, j int) bool {
42 di, dj := a[i].Locator.Digest, a[j].Locator.Digest
45 } else if di.H == dj.H {
48 } else if di.L == dj.L {
49 return a[i].Locator.Size < a[j].Locator.Size
// PullServers describes, for a given under-replicated block, which
// servers should pull the specified block and which servers they can
// pull it from.
type PullServers struct {
	To   []string // Servers that should pull the specified block
	From []string // Servers that already contain the specified block
}
64 // ComputePullServers creates a map from block locator to PullServers
65 // with one entry for each under-replicated block.
67 // This method ignores zero-replica blocks since there are no servers
68 // to pull them from, so callers should feel free to omit them, but
69 // this function will ignore them if they are provided.
70 func ComputePullServers(kc *keepclient.KeepClient,
71 keepServerInfo *keep.ReadServers,
72 blockToDesiredReplication map[blockdigest.DigestWithSize]int,
73 underReplicated BlockSet) (m map[Locator]PullServers) {
74 m = map[Locator]PullServers{}
75 // We use CanonicalString to avoid filling memory with dupicate
76 // copies of the same string.
77 var cs CanonicalString
79 // Servers that are writeable
80 writableServers := map[string]struct{}{}
81 for _, url := range kc.WritableLocalRoots() {
82 writableServers[cs.Get(url)] = struct{}{}
85 for block := range underReplicated {
86 serversStoringBlock := keepServerInfo.BlockToServers[block]
87 numCopies := len(serversStoringBlock)
88 numCopiesMissing := blockToDesiredReplication[block] - numCopies
89 if numCopiesMissing > 0 {
90 // We expect this to always be true, since the block was listed
91 // in underReplicated.
94 // Not much we can do with blocks with no copies.
96 // A server's host-port string appears as a key in this map
97 // iff it contains the block.
98 serverHasBlock := map[string]struct{}{}
99 for _, info := range serversStoringBlock {
100 sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
101 serverHasBlock[cs.Get(sa.URL())] = struct{}{}
104 roots := keepclient.NewRootSorter(kc.LocalRoots(),
105 block.String()).GetSortedRoots()
108 m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
109 roots, numCopiesMissing)
116 // CreatePullServers creates a pull list in which the To and From
117 // fields preserve the ordering of sorted servers and the contents
118 // are all canonical strings.
119 func CreatePullServers(cs CanonicalString,
120 serverHasBlock map[string]struct{},
121 writableServers map[string]struct{},
122 sortedServers []string,
123 maxToFields int) (ps PullServers) {
126 To: make([]string, 0, maxToFields),
127 From: make([]string, 0, len(serverHasBlock)),
130 for _, host := range sortedServers {
131 // Strip the protocol portion of the url.
132 // Use the canonical copy of the string to avoid memory waste.
133 server := cs.Get(host)
134 _, hasBlock := serverHasBlock[server]
136 // The from field should include the protocol.
137 ps.From = append(ps.From, cs.Get(host))
138 } else if len(ps.To) < maxToFields {
139 _, writable := writableServers[host]
141 ps.To = append(ps.To, server)
// RemoveProtocolPrefix strips the protocol prefix from a url.
//
// It does so by returning everything after the last "/" in url, so for
// a url of the form "scheme://host:port" the result is "host:port". A
// url without any "/" is returned unchanged, and a url that ends in
// "/" yields the empty string.
func RemoveProtocolPrefix(url string) string {
	// LastIndex returns -1 when there is no "/", which makes the slice
	// start at 0 and return the whole string.
	lastSlash := strings.LastIndex(url, "/")
	return url[lastSlash+1:]
}
154 // BuildPullLists produces a PullList for each keep server.
155 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
156 spl = map[string]PullList{}
157 // We don't worry about canonicalizing our strings here, because we
158 // assume lps was created by ComputePullServers() which already
159 // canonicalized the strings for us.
160 for locator, pullServers := range lps {
161 for _, destination := range pullServers.To {
162 pullList, pullListExists := spl[destination]
164 pullList = PullList{}
166 spl[destination] = append(pullList,
167 PullRequest{Locator: locator, Servers: pullServers.From})
173 // WritePullLists writes each pull list to a file.
174 // The filename is based on the hostname.
176 // This is just a hack for prototyping, it is not expected to be used
178 func WritePullLists(arvLogger *logger.Logger,
179 pullLists map[string]PullList) {
180 r := strings.NewReplacer(":", ".")
181 for host, list := range pullLists {
182 filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
183 pullListFile, err := os.Create(filename)
185 loggerutil.FatalWithMessage(arvLogger,
186 fmt.Sprintf("Failed to open %s: %v", filename, err))
188 defer pullListFile.Close()
190 enc := json.NewEncoder(pullListFile)
191 err = enc.Encode(list)
193 loggerutil.FatalWithMessage(arvLogger,
194 fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
196 log.Printf("Wrote pull list to %s.", filename)