Merge branch '6429-crunch2-api' closes #6429
[arvados.git] / services / datamanager / summary / pull_list.go
1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
2
3 package summary
4
5 import (
6         "encoding/json"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/services/datamanager/keep"
12         "log"
13         "os"
14         "strings"
15 )
16
17 // Locator is a block digest
18 type Locator blockdigest.DigestWithSize
19
20 // MarshalJSON encoding
21 func (l Locator) MarshalJSON() ([]byte, error) {
22         return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
23 }
24
25 // PullRequest represents one entry in the Pull List
26 type PullRequest struct {
27         Locator Locator  `json:"locator"`
28         Servers []string `json:"servers"`
29 }
30
31 // PullList for a particular server
32 type PullList []PullRequest
33
34 // PullListByLocator implements sort.Interface for PullList based on
35 // the Digest.
36 type PullListByLocator PullList
37
38 func (a PullListByLocator) Len() int      { return len(a) }
39 func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
40 func (a PullListByLocator) Less(i, j int) bool {
41         di, dj := a[i].Locator.Digest, a[j].Locator.Digest
42         if di.H < dj.H {
43                 return true
44         } else if di.H == dj.H {
45                 if di.L < dj.L {
46                         return true
47                 } else if di.L == dj.L {
48                         return a[i].Locator.Size < a[j].Locator.Size
49                 }
50         }
51         return false
52 }
53
54 // PullServers struct
55 // For a given under-replicated block, this structure represents which
56 // servers should pull the specified block and which servers they can
57 // pull it from.
58 type PullServers struct {
59         To   []string // Servers that should pull the specified block
60         From []string // Servers that already contain the specified block
61 }
62
63 // ComputePullServers creates a map from block locator to PullServers
64 // with one entry for each under-replicated block.
65 //
66 // This method ignores zero-replica blocks since there are no servers
67 // to pull them from, so callers should feel free to omit them, but
68 // this function will ignore them if they are provided.
69 func ComputePullServers(kc *keepclient.KeepClient,
70         keepServerInfo *keep.ReadServers,
71         blockToDesiredReplication map[blockdigest.DigestWithSize]int,
72         underReplicated BlockSet) (m map[Locator]PullServers) {
73         m = map[Locator]PullServers{}
74         // We use CanonicalString to avoid filling memory with dupicate
75         // copies of the same string.
76         var cs CanonicalString
77
78         // Servers that are writeable
79         writableServers := map[string]struct{}{}
80         for _, url := range kc.WritableLocalRoots() {
81                 writableServers[cs.Get(url)] = struct{}{}
82         }
83
84         for block := range underReplicated {
85                 serversStoringBlock := keepServerInfo.BlockToServers[block]
86                 numCopies := len(serversStoringBlock)
87                 numCopiesMissing := blockToDesiredReplication[block] - numCopies
88                 if numCopiesMissing > 0 {
89                         // We expect this to always be true, since the block was listed
90                         // in underReplicated.
91
92                         if numCopies > 0 {
93                                 // Not much we can do with blocks with no copies.
94
95                                 // A server's host-port string appears as a key in this map
96                                 // iff it contains the block.
97                                 serverHasBlock := map[string]struct{}{}
98                                 for _, info := range serversStoringBlock {
99                                         sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
100                                         serverHasBlock[cs.Get(sa.URL())] = struct{}{}
101                                 }
102
103                                 roots := keepclient.NewRootSorter(kc.LocalRoots(),
104                                         block.String()).GetSortedRoots()
105
106                                 l := Locator(block)
107                                 m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
108                                         roots, numCopiesMissing)
109                         }
110                 }
111         }
112         return m
113 }
114
115 // CreatePullServers creates a pull list in which the To and From
116 // fields preserve the ordering of sorted servers and the contents
117 // are all canonical strings.
118 func CreatePullServers(cs CanonicalString,
119         serverHasBlock map[string]struct{},
120         writableServers map[string]struct{},
121         sortedServers []string,
122         maxToFields int) (ps PullServers) {
123
124         ps = PullServers{
125                 To:   make([]string, 0, maxToFields),
126                 From: make([]string, 0, len(serverHasBlock)),
127         }
128
129         for _, host := range sortedServers {
130                 // Strip the protocol portion of the url.
131                 // Use the canonical copy of the string to avoid memory waste.
132                 server := cs.Get(host)
133                 _, hasBlock := serverHasBlock[server]
134                 if hasBlock {
135                         // The from field should include the protocol.
136                         ps.From = append(ps.From, cs.Get(host))
137                 } else if len(ps.To) < maxToFields {
138                         _, writable := writableServers[host]
139                         if writable {
140                                 ps.To = append(ps.To, server)
141                         }
142                 }
143         }
144
145         return
146 }
147
148 // RemoveProtocolPrefix strips the protocol prefix from a url.
149 func RemoveProtocolPrefix(url string) string {
150         return url[(strings.LastIndex(url, "/") + 1):]
151 }
152
153 // BuildPullLists produces a PullList for each keep server.
154 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
155         spl = map[string]PullList{}
156         // We don't worry about canonicalizing our strings here, because we
157         // assume lps was created by ComputePullServers() which already
158         // canonicalized the strings for us.
159         for locator, pullServers := range lps {
160                 for _, destination := range pullServers.To {
161                         pullList, pullListExists := spl[destination]
162                         if !pullListExists {
163                                 pullList = PullList{}
164                         }
165                         spl[destination] = append(pullList,
166                                 PullRequest{Locator: locator, Servers: pullServers.From})
167                 }
168         }
169         return
170 }
171
172 // WritePullLists writes each pull list to a file.
173 // The filename is based on the hostname.
174 //
175 // This is just a hack for prototyping, it is not expected to be used
176 // in production.
177 func WritePullLists(arvLogger *logger.Logger,
178         pullLists map[string]PullList,
179         dryRun bool) error {
180         r := strings.NewReplacer(":", ".")
181
182         for host, list := range pullLists {
183                 if arvLogger != nil {
184                         // We need a local variable because Update doesn't call our mutator func until later,
185                         // when our list variable might have been reused by the next loop iteration.
186                         host := host
187                         listLen := len(list)
188                         arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
189                                 pullListInfo := logger.GetOrCreateMap(p, "pull_list_len")
190                                 pullListInfo[host] = listLen
191                         })
192                 }
193
194                 if dryRun {
195                         log.Print("dry run, not sending pull list to service %s with %d blocks", host, len(list))
196                         continue
197                 }
198
199                 filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
200                 pullListFile, err := os.Create(filename)
201                 if err != nil {
202                         return err
203                 }
204                 defer pullListFile.Close()
205
206                 enc := json.NewEncoder(pullListFile)
207                 err = enc.Encode(list)
208                 if err != nil {
209                         return err
210                 }
211                 log.Printf("Wrote pull list to %s.", filename)
212         }
213
214         return nil
215 }