1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
7 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "git.curoverse.com/arvados.git/sdk/go/logger"
10 "git.curoverse.com/arvados.git/services/datamanager/keep"
11 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
17 type Locator blockdigest.DigestWithSize
19 func (l Locator) MarshalJSON() ([]byte, error) {
20 return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
23 // One entry in the Pull List
24 type PullRequest struct {
25 Locator Locator `json:"locator"`
26 Servers []string `json:"servers"`
29 // The Pull List for a particular server
30 type PullList []PullRequest
32 // PullListByLocator implements sort.Interface for PullList based on
34 type PullListByLocator PullList
36 func (a PullListByLocator) Len() int { return len(a) }
37 func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
38 func (a PullListByLocator) Less(i, j int) bool {
39 di, dj := a[i].Locator.Digest, a[j].Locator.Digest
42 } else if di.H == dj.H {
45 } else if di.L == dj.L {
46 return a[i].Locator.Size < a[j].Locator.Size
// PullServers represents, for a given under-replicated block, which
// servers should pull the specified block and which servers already hold
// a copy of it.
type PullServers struct {
	To   []string // Servers that should pull the specified block
	From []string // Servers that already contain the specified block
}
60 // Creates a map from block locator to PullServers with one entry for
61 // each under-replicated block.
63 // This method ignores zero-replica blocks since there are no servers
64 // to pull them from, so callers should feel free to omit them, but
65 // this function will ignore them if they are provided.
66 func ComputePullServers(kc *keepclient.KeepClient,
67 keepServerInfo *keep.ReadServers,
68 blockToDesiredReplication map[blockdigest.DigestWithSize]int,
69 underReplicated BlockSet) (m map[Locator]PullServers) {
70 m = map[Locator]PullServers{}
71 // We use CanonicalString to avoid filling memory with dupicate
72 // copies of the same string.
73 var cs CanonicalString
75 // Servers that are writeable
76 writableServers := map[string]struct{}{}
77 for _, url := range kc.WritableLocalRoots() {
78 writableServers[cs.Get(url)] = struct{}{}
81 for block, _ := range underReplicated {
82 serversStoringBlock := keepServerInfo.BlockToServers[block]
83 numCopies := len(serversStoringBlock)
84 numCopiesMissing := blockToDesiredReplication[block] - numCopies
85 if numCopiesMissing > 0 {
86 // We expect this to always be true, since the block was listed
87 // in underReplicated.
90 // Not much we can do with blocks with no copies.
92 // A server's host-port string appears as a key in this map
93 // iff it contains the block.
94 serverHasBlock := map[string]struct{}{}
95 for _, info := range serversStoringBlock {
96 sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
97 serverHasBlock[cs.Get(sa.URL())] = struct{}{}
100 roots := keepclient.NewRootSorter(kc.LocalRoots(),
101 block.String()).GetSortedRoots()
104 m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
105 roots, numCopiesMissing)
112 // Creates a pull list in which the To and From fields preserve the
113 // ordering of sorted servers and the contents are all canonical
115 func CreatePullServers(cs CanonicalString,
116 serverHasBlock map[string]struct{},
117 writableServers map[string]struct{},
118 sortedServers []string,
119 maxToFields int) (ps PullServers) {
122 To: make([]string, 0, maxToFields),
123 From: make([]string, 0, len(serverHasBlock)),
126 for _, host := range sortedServers {
127 // Strip the protocol portion of the url.
128 // Use the canonical copy of the string to avoid memory waste.
129 server := cs.Get(host)
130 _, hasBlock := serverHasBlock[server]
132 // The from field should include the protocol.
133 ps.From = append(ps.From, cs.Get(host))
134 } else if len(ps.To) < maxToFields {
135 _, writable := writableServers[host]
137 ps.To = append(ps.To, server)
// RemoveProtocolPrefix strips the protocol prefix from a url by returning
// everything after the last "/" in it. A url that contains no "/" is
// returned unchanged, and a url that ends in "/" yields the empty string.
func RemoveProtocolPrefix(url string) string {
	// LastIndex returns -1 when there is no "/", which makes the slice
	// start at 0 and keeps the whole string.
	return url[strings.LastIndex(url, "/")+1:]
}
150 // Produces a PullList for each keep server.
151 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
152 spl = map[string]PullList{}
153 // We don't worry about canonicalizing our strings here, because we
154 // assume lps was created by ComputePullServers() which already
155 // canonicalized the strings for us.
156 for locator, pullServers := range lps {
157 for _, destination := range pullServers.To {
158 pullList, pullListExists := spl[destination]
160 pullList = PullList{}
162 spl[destination] = append(pullList,
163 PullRequest{Locator: locator, Servers: pullServers.From})
169 // Writes each pull list to a file.
170 // The filename is based on the hostname.
172 // This is just a hack for prototyping, it is not expected to be used
174 func WritePullLists(arvLogger *logger.Logger,
175 pullLists map[string]PullList) {
176 r := strings.NewReplacer(":", ".")
177 for host, list := range pullLists {
178 filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
179 pullListFile, err := os.Create(filename)
181 loggerutil.FatalWithMessage(arvLogger,
182 fmt.Sprintf("Failed to open %s: %v", filename, err))
184 defer pullListFile.Close()
186 enc := json.NewEncoder(pullListFile)
187 err = enc.Encode(list)
189 loggerutil.FatalWithMessage(arvLogger,
190 fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
192 log.Printf("Wrote pull list to %s.", filename)