Merge branch '6598-crunch-progress-stats' closes #6598
[arvados.git] / services / datamanager / summary / pull_list.go
1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
2 package summary
3
4 import (
5         "encoding/json"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/sdk/go/logger"
10         "git.curoverse.com/arvados.git/services/datamanager/keep"
11         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
12         "log"
13         "os"
14         "strings"
15 )
16
17 type Locator blockdigest.DigestWithSize
18
19 func (l Locator) MarshalJSON() ([]byte, error) {
20         return []byte("\"" + blockdigest.DigestWithSize(l).String() + "\""), nil
21 }
22
23 // One entry in the Pull List
24 type PullRequest struct {
25         Locator Locator  `json:"locator"`
26         Servers []string `json:"servers"`
27 }
28
29 // The Pull List for a particular server
30 type PullList []PullRequest
31
32 // PullListByLocator implements sort.Interface for PullList based on
33 // the Digest.
34 type PullListByLocator PullList
35
36 func (a PullListByLocator) Len() int      { return len(a) }
37 func (a PullListByLocator) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
38 func (a PullListByLocator) Less(i, j int) bool {
39         di, dj := a[i].Locator.Digest, a[j].Locator.Digest
40         if di.H < dj.H {
41                 return true
42         } else if di.H == dj.H {
43                 if di.L < dj.L {
44                         return true
45                 } else if di.L == dj.L {
46                         return a[i].Locator.Size < a[j].Locator.Size
47                 }
48         }
49         return false
50 }
51
52 // For a given under-replicated block, this structure represents which
53 // servers should pull the specified block and which servers they can
54 // pull it from.
55 type PullServers struct {
56         To   []string // Servers that should pull the specified block
57         From []string // Servers that already contain the specified block
58 }
59
60 // Creates a map from block locator to PullServers with one entry for
61 // each under-replicated block.
62 //
63 // This method ignores zero-replica blocks since there are no servers
64 // to pull them from, so callers should feel free to omit them, but
65 // this function will ignore them if they are provided.
66 func ComputePullServers(kc *keepclient.KeepClient,
67         keepServerInfo *keep.ReadServers,
68         blockToDesiredReplication map[blockdigest.DigestWithSize]int,
69         underReplicated BlockSet) (m map[Locator]PullServers) {
70         m = map[Locator]PullServers{}
71         // We use CanonicalString to avoid filling memory with dupicate
72         // copies of the same string.
73         var cs CanonicalString
74
75         // Servers that are writeable
76         writableServers := map[string]struct{}{}
77         for _, url := range kc.WritableLocalRoots() {
78                 writableServers[cs.Get(url)] = struct{}{}
79         }
80
81         for block, _ := range underReplicated {
82                 serversStoringBlock := keepServerInfo.BlockToServers[block]
83                 numCopies := len(serversStoringBlock)
84                 numCopiesMissing := blockToDesiredReplication[block] - numCopies
85                 if numCopiesMissing > 0 {
86                         // We expect this to always be true, since the block was listed
87                         // in underReplicated.
88
89                         if numCopies > 0 {
90                                 // Not much we can do with blocks with no copies.
91
92                                 // A server's host-port string appears as a key in this map
93                                 // iff it contains the block.
94                                 serverHasBlock := map[string]struct{}{}
95                                 for _, info := range serversStoringBlock {
96                                         sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
97                                         serverHasBlock[cs.Get(sa.HostPort())] = struct{}{}
98                                 }
99
100                                 roots := keepclient.NewRootSorter(kc.LocalRoots(),
101                                         block.String()).GetSortedRoots()
102
103                                 l := Locator(block)
104                                 m[l] = CreatePullServers(cs, serverHasBlock, writableServers,
105                                         roots, numCopiesMissing)
106                         }
107                 }
108         }
109         return m
110 }
111
112 // Creates a pull list in which the To and From fields preserve the
113 // ordering of sorted servers and the contents are all canonical
114 // strings.
115 func CreatePullServers(cs CanonicalString,
116         serverHasBlock map[string]struct{},
117         writableServers map[string]struct{},
118         sortedServers []string,
119         maxToFields int) (ps PullServers) {
120
121         ps = PullServers{
122                 To:   make([]string, 0, maxToFields),
123                 From: make([]string, 0, len(serverHasBlock)),
124         }
125
126         for _, host := range sortedServers {
127                 // Strip the protocol portion of the url.
128                 // Use the canonical copy of the string to avoid memory waste.
129                 server := cs.Get(RemoveProtocolPrefix(host))
130                 _, hasBlock := serverHasBlock[server]
131                 if hasBlock {
132                         // The from field should include the protocol.
133                         ps.From = append(ps.From, cs.Get(host))
134                 } else if len(ps.To) < maxToFields {
135                         _, writable := writableServers[host]
136                         if writable {
137                                 ps.To = append(ps.To, server)
138                         }
139                 }
140         }
141
142         return
143 }
144
145 // Strips the protocol prefix from a url.
146 func RemoveProtocolPrefix(url string) string {
147         return url[(strings.LastIndex(url, "/") + 1):]
148 }
149
150 // Produces a PullList for each keep server.
151 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
152         spl = map[string]PullList{}
153         // We don't worry about canonicalizing our strings here, because we
154         // assume lps was created by ComputePullServers() which already
155         // canonicalized the strings for us.
156         for locator, pullServers := range lps {
157                 for _, destination := range pullServers.To {
158                         pullList, pullListExists := spl[destination]
159                         if !pullListExists {
160                                 pullList = PullList{}
161                         }
162                         spl[destination] = append(pullList,
163                                 PullRequest{Locator: locator, Servers: pullServers.From})
164                 }
165         }
166         return
167 }
168
169 // Writes each pull list to a file.
170 // The filename is based on the hostname.
171 //
172 // This is just a hack for prototyping, it is not expected to be used
173 // in production.
174 func WritePullLists(arvLogger *logger.Logger,
175         pullLists map[string]PullList) {
176         r := strings.NewReplacer(":", ".")
177         for host, list := range pullLists {
178                 filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
179                 pullListFile, err := os.Create(filename)
180                 if err != nil {
181                         loggerutil.FatalWithMessage(arvLogger,
182                                 fmt.Sprintf("Failed to open %s: %v", filename, err))
183                 }
184                 defer pullListFile.Close()
185
186                 enc := json.NewEncoder(pullListFile)
187                 err = enc.Encode(list)
188                 if err != nil {
189                         loggerutil.FatalWithMessage(arvLogger,
190                                 fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
191                 }
192                 log.Printf("Wrote pull list to %s.", filename)
193         }
194 }