6260: most golint suggestions are addressed.
[arvados.git] / services / datamanager / summary / pull_list.go
1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
2
3 package summary
4
5 import (
6         "encoding/json"
7         "fmt"
8         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
9         "git.curoverse.com/arvados.git/sdk/go/keepclient"
10         "git.curoverse.com/arvados.git/sdk/go/logger"
11         "git.curoverse.com/arvados.git/services/datamanager/keep"
12         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
13         "log"
14         "os"
15         "strings"
16 )
17
18 // Locator is a block digest
19 type Locator blockdigest.DigestWithSize
20
21 // MarshalJSON encoding
22 func (l Locator) MarshalJSON() ([]byte, error) {
23         return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
24 }
25
26 // PullRequest represents one entry in the Pull List
27 type PullRequest struct {
28         Locator Locator  `json:"locator"`
29         Servers []string `json:"servers"`
30 }
31
32 // PullList for a particular server
33 type PullList []PullRequest
34
35 // PullListByLocator implements sort.Interface for PullList based on
36 // the Digest.
37 type PullListByLocator PullList
38
39 func (a PullListByLocator) Len() int      { return len(a) }
40 func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
41 func (a PullListByLocator) Less(i, j int) bool {
42         di, dj := a[i].Locator.Digest, a[j].Locator.Digest
43         if di.H < dj.H {
44                 return true
45         } else if di.H == dj.H {
46                 if di.L < dj.L {
47                         return true
48                 } else if di.L == dj.L {
49                         return a[i].Locator.Size < a[j].Locator.Size
50                 }
51         }
52         return false
53 }
54
55 // PullServers struct
56 // For a given under-replicated block, this structure represents which
57 // servers should pull the specified block and which servers they can
58 // pull it from.
59 type PullServers struct {
60         To   []string // Servers that should pull the specified block
61         From []string // Servers that already contain the specified block
62 }
63
64 // ComputePullServers creates a map from block locator to PullServers
65 // with one entry for each under-replicated block.
66 //
67 // This method ignores zero-replica blocks since there are no servers
68 // to pull them from, so callers should feel free to omit them, but
69 // this function will ignore them if they are provided.
70 func ComputePullServers(kc *keepclient.KeepClient,
71         keepServerInfo *keep.ReadServers,
72         blockToDesiredReplication map[blockdigest.DigestWithSize]int,
73         underReplicated BlockSet) (m map[Locator]PullServers) {
74         m = map[Locator]PullServers{}
75         // We use CanonicalString to avoid filling memory with dupicate
76         // copies of the same string.
77         var cs CanonicalString
78
79         // Servers that are writeable
80         writableServers := map[string]struct{}{}
81         for _, url := range kc.WritableLocalRoots() {
82                 writableServers[cs.Get(url)] = struct{}{}
83         }
84
85         for block := range underReplicated {
86                 serversStoringBlock := keepServerInfo.BlockToServers[block]
87                 numCopies := len(serversStoringBlock)
88                 numCopiesMissing := blockToDesiredReplication[block] - numCopies
89                 if numCopiesMissing > 0 {
90                         // We expect this to always be true, since the block was listed
91                         // in underReplicated.
92
93                         if numCopies > 0 {
94                                 // Not much we can do with blocks with no copies.
95
96                                 // A server's host-port string appears as a key in this map
97                                 // iff it contains the block.
98                                 serverHasBlock := map[string]struct{}{}
99                                 for _, info := range serversStoringBlock {
100                                         sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
101                                         serverHasBlock[cs.Get(sa.URL())] = struct{}{}
102                                 }
103
104                                 roots := keepclient.NewRootSorter(kc.LocalRoots(),
105                                         block.String()).GetSortedRoots()
106
107                                 l := Locator(block)
108                                 m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
109                                         roots, numCopiesMissing)
110                         }
111                 }
112         }
113         return m
114 }
115
116 // CreatePullServers creates a pull list in which the To and From
117 // fields preserve the ordering of sorted servers and the contents
118 // are all canonical strings.
119 func CreatePullServers(cs CanonicalString,
120         serverHasBlock map[string]struct{},
121         writableServers map[string]struct{},
122         sortedServers []string,
123         maxToFields int) (ps PullServers) {
124
125         ps = PullServers{
126                 To:   make([]string, 0, maxToFields),
127                 From: make([]string, 0, len(serverHasBlock)),
128         }
129
130         for _, host := range sortedServers {
131                 // Strip the protocol portion of the url.
132                 // Use the canonical copy of the string to avoid memory waste.
133                 server := cs.Get(host)
134                 _, hasBlock := serverHasBlock[server]
135                 if hasBlock {
136                         // The from field should include the protocol.
137                         ps.From = append(ps.From, cs.Get(host))
138                 } else if len(ps.To) < maxToFields {
139                         _, writable := writableServers[host]
140                         if writable {
141                                 ps.To = append(ps.To, server)
142                         }
143                 }
144         }
145
146         return
147 }
148
149 // RemoveProtocolPrefix strips the protocol prefix from a url.
150 func RemoveProtocolPrefix(url string) string {
151         return url[(strings.LastIndex(url, "/") + 1):]
152 }
153
154 // BuildPullLists produces a PullList for each keep server.
155 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
156         spl = map[string]PullList{}
157         // We don't worry about canonicalizing our strings here, because we
158         // assume lps was created by ComputePullServers() which already
159         // canonicalized the strings for us.
160         for locator, pullServers := range lps {
161                 for _, destination := range pullServers.To {
162                         pullList, pullListExists := spl[destination]
163                         if !pullListExists {
164                                 pullList = PullList{}
165                         }
166                         spl[destination] = append(pullList,
167                                 PullRequest{Locator: locator, Servers: pullServers.From})
168                 }
169         }
170         return
171 }
172
173 // WritePullLists writes each pull list to a file.
174 // The filename is based on the hostname.
175 //
176 // This is just a hack for prototyping, it is not expected to be used
177 // in production.
178 func WritePullLists(arvLogger *logger.Logger,
179         pullLists map[string]PullList) {
180         r := strings.NewReplacer(":", ".")
181         for host, list := range pullLists {
182                 filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
183                 pullListFile, err := os.Create(filename)
184                 if err != nil {
185                         loggerutil.FatalWithMessage(arvLogger,
186                                 fmt.Sprintf("Failed to open %s: %v", filename, err))
187                 }
188                 defer pullListFile.Close()
189
190                 enc := json.NewEncoder(pullListFile)
191                 err = enc.Encode(list)
192                 if err != nil {
193                         loggerutil.FatalWithMessage(arvLogger,
194                                 fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
195                 }
196                 log.Printf("Wrote pull list to %s.", filename)
197         }
198 }