1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
7 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
8 "git.curoverse.com/arvados.git/sdk/go/keepclient"
9 "git.curoverse.com/arvados.git/sdk/go/logger"
10 "git.curoverse.com/arvados.git/services/datamanager/keep"
11 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
18 Digest blockdigest.BlockDigest
19 // TODO(misha): Add size field to the Locator (and MarshalJSON() below)
22 func (l Locator) MarshalJSON() ([]byte, error) {
23 return []byte("\"" + l.Digest.String() + "\""), nil
26 // One entry in the Pull List
27 type PullListEntry struct {
28 Locator Locator `json:"locator"`
29 Servers []string `json:"servers"`
32 // The Pull List for a particular server
33 type PullList struct {
34 Entries []PullListEntry `json:"blocks"`
37 // EntriesByDigest implements sort.Interface for []PullListEntry
38 // based on the Digest.
39 type EntriesByDigest []PullListEntry
41 func (a EntriesByDigest) Len() int { return len(a) }
42 func (a EntriesByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
43 func (a EntriesByDigest) Less(i, j int) bool {
44 di, dj := a[i].Locator.Digest, a[j].Locator.Digest
45 return di.H < dj.H || (di.H == dj.H && di.L < dj.L)
// PullServers represents, for a given under-replicated block, which
// servers should pull the specified block and which servers they can
// pull it from.
type PullServers struct {
	To   []string // Servers that should pull the specified block
	From []string // Servers that already contain the specified block
}
56 // Creates a map from block locator to PullServers with one entry for
57 // each under-replicated block.
58 func ComputePullServers(kc *keepclient.KeepClient,
59 keepServerInfo *keep.ReadServers,
60 blockToDesiredReplication map[blockdigest.BlockDigest]int,
61 underReplicated BlockSet) (m map[Locator]PullServers) {
62 m = map[Locator]PullServers{}
63 // We use CanonicalString to avoid filling memory with dupicate
64 // copies of the same string.
65 var cs CanonicalString
67 for block, _ := range underReplicated {
68 serversStoringBlock := keepServerInfo.BlockToServers[block]
69 numCopies := len(serversStoringBlock)
70 numCopiesMissing := blockToDesiredReplication[block] - numCopies
71 if numCopiesMissing > 0 {
72 // We expect this to always be true, since the block was listed
73 // in underReplicated.
74 // TODO(misha): Consider asserting the above conditional.
77 // I believe that we should expect this to always be true.
79 // A server's host-port string appears as a key in this map
80 // iff it contains the block.
81 serverHasBlock := map[string]struct{}{}
82 for _, info := range serversStoringBlock {
83 sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
84 serverHasBlock[sa.HostPort()] = struct{}{}
87 roots := keepclient.NewRootSorter(kc.ServiceRoots(),
88 block.String()).GetSortedRoots()
90 l := Locator{Digest: block}
91 m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing)
98 // Creates a pull list in which the To and From fields preserve the
99 // ordering of sorted servers and the contents are all canonical
101 func CreatePullServers(cs CanonicalString,
102 serverHasBlock map[string]struct{},
103 sortedServers []string,
104 maxToFields int) (ps PullServers) {
107 To: make([]string, 0, maxToFields),
108 From: make([]string, 0, len(serverHasBlock)),
111 for _, host := range sortedServers {
112 // Strip the protocol portion of the url.
113 // Use the canonical copy of the string to avoid memory waste.
114 server := cs.Get(RemoveProtocolPrefix(host))
115 _, hasBlock := serverHasBlock[server]
117 ps.From = append(ps.From, server)
118 } else if len(ps.To) < maxToFields {
119 ps.To = append(ps.To, server)
// RemoveProtocolPrefix strips the protocol prefix from a url by
// returning everything after the last "/" in it.
//
// A single trailing "/" is ignored, so "http://host:port/" yields
// "host:port" rather than the empty string. A url containing no "/"
// is returned unchanged.
func RemoveProtocolPrefix(url string) string {
	trimmed := strings.TrimSuffix(url, "/")
	return trimmed[strings.LastIndex(trimmed, "/")+1:]
}
131 // Produces a PullList for each keep server.
132 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
133 spl = map[string]PullList{}
134 // We don't worry about canonicalizing our strings here, because we
135 // assume lps was created by ComputePullServers() which already
136 // canonicalized the strings for us.
137 for locator, pullServers := range lps {
138 for _, destination := range pullServers.To {
139 pullList, pullListExists := spl[destination]
141 pullList = PullList{Entries: []PullListEntry{}}
142 spl[destination] = pullList
144 pullList.Entries = append(pullList.Entries,
145 PullListEntry{Locator: locator, Servers: pullServers.From})
146 spl[destination] = pullList
152 // Writes each pull list to a file.
153 // The filename is based on the hostname.
155 // This is just a hack for prototyping, it is not expected to be used
157 func WritePullLists(arvLogger *logger.Logger,
158 pullLists map[string]PullList) {
159 r := strings.NewReplacer(":", ".")
160 for host, list := range pullLists {
161 filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
162 pullListFile, err := os.Create(filename)
164 loggerutil.FatalWithMessage(arvLogger,
165 fmt.Sprintf("Failed to open %s: %v", filename, err))
167 defer pullListFile.Close()
169 enc := json.NewEncoder(pullListFile)
170 err = enc.Encode(list)
172 loggerutil.FatalWithMessage(arvLogger,
173 fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
175 log.Printf("Wrote pull list to %s.", filename)