More reformatting of unittests to improve readability.
[arvados.git] / services / datamanager / summary / pull_list.go
1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
2 package summary
3
4 import (
5         "encoding/json"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/sdk/go/logger"
10         "git.curoverse.com/arvados.git/services/datamanager/keep"
11         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
12         "log"
13         "os"
14         "strings"
15 )
16
17 type Locator struct {
18         Digest blockdigest.BlockDigest
19         // TODO(misha): Add size field to the Locator (and MarshalJSON() below)
20 }
21
22 func (l Locator) MarshalJSON() ([]byte, error) {
23         return []byte("\"" + l.Digest.String() + "\""), nil
24 }
25
26 // One entry in the Pull List
27 type PullRequest struct {
28         Locator Locator  `json:"locator"`
29         Servers []string `json:"servers"`
30 }
31
32 // The Pull List for a particular server
33 type PullList []PullRequest
34
35 // PullListByDigest implements sort.Interface for PullList based on
36 // the Digest.
37 type PullListByDigest PullList
38
39 func (a PullListByDigest) Len() int      { return len(a) }
40 func (a PullListByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
41 func (a PullListByDigest) Less(i, j int) bool {
42         di, dj := a[i].Locator.Digest, a[j].Locator.Digest
43         return di.H < dj.H || (di.H == dj.H && di.L < dj.L)
44 }
45
46 // For a given under-replicated block, this structure represents which
47 // servers should pull the specified block and which servers they can
48 // pull it from.
49 type PullServers struct {
50         To   []string // Servers that should pull the specified block
51         From []string // Servers that already contain the specified block
52 }
53
54 // Creates a map from block locator to PullServers with one entry for
55 // each under-replicated block.
56 func ComputePullServers(kc *keepclient.KeepClient,
57         keepServerInfo *keep.ReadServers,
58         blockToDesiredReplication map[blockdigest.BlockDigest]int,
59         underReplicated BlockSet) (m map[Locator]PullServers) {
60         m = map[Locator]PullServers{}
61         // We use CanonicalString to avoid filling memory with dupicate
62         // copies of the same string.
63         var cs CanonicalString
64
65         for block, _ := range underReplicated {
66                 serversStoringBlock := keepServerInfo.BlockToServers[block]
67                 numCopies := len(serversStoringBlock)
68                 numCopiesMissing := blockToDesiredReplication[block] - numCopies
69                 if numCopiesMissing > 0 {
70                         // We expect this to always be true, since the block was listed
71                         // in underReplicated.
72                         // TODO(misha): Consider asserting the above conditional.
73
74                         if numCopies > 0 {
75                                 // I believe that we should expect this to always be true.
76
77                                 // A server's host-port string appears as a key in this map
78                                 // iff it contains the block.
79                                 serverHasBlock := map[string]struct{}{}
80                                 for _, info := range serversStoringBlock {
81                                         sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
82                                         serverHasBlock[sa.HostPort()] = struct{}{}
83                                 }
84
85                                 roots := keepclient.NewRootSorter(kc.LocalRoots(),
86                                         block.String()).GetSortedRoots()
87
88                                 l := Locator{Digest: block}
89                                 m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing)
90                         }
91                 }
92         }
93         return m
94 }
95
96 // Creates a pull list in which the To and From fields preserve the
97 // ordering of sorted servers and the contents are all canonical
98 // strings.
99 func CreatePullServers(cs CanonicalString,
100         serverHasBlock map[string]struct{},
101         sortedServers []string,
102         maxToFields int) (ps PullServers) {
103
104         ps = PullServers{
105                 To:   make([]string, 0, maxToFields),
106                 From: make([]string, 0, len(serverHasBlock)),
107         }
108
109         for _, host := range sortedServers {
110                 // Strip the protocol portion of the url.
111                 // Use the canonical copy of the string to avoid memory waste.
112                 server := cs.Get(RemoveProtocolPrefix(host))
113                 _, hasBlock := serverHasBlock[server]
114                 if hasBlock {
115                         ps.From = append(ps.From, server)
116                 } else if len(ps.To) < maxToFields {
117                         ps.To = append(ps.To, server)
118                 }
119         }
120
121         return
122 }
123
124 // Strips the protocol prefix from a url.
125 func RemoveProtocolPrefix(url string) string {
126         return url[(strings.LastIndex(url, "/") + 1):]
127 }
128
129 // Produces a PullList for each keep server.
130 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
131         spl = map[string]PullList{}
132         // We don't worry about canonicalizing our strings here, because we
133         // assume lps was created by ComputePullServers() which already
134         // canonicalized the strings for us.
135         for locator, pullServers := range lps {
136                 for _, destination := range pullServers.To {
137                         pullList, pullListExists := spl[destination]
138                         if !pullListExists {
139                                 pullList = PullList{}
140                                 spl[destination] = pullList
141                         }
142                         pullList = append(pullList,
143                                 PullRequest{Locator: locator, Servers: pullServers.From})
144                         spl[destination] = pullList
145                 }
146         }
147         return
148 }
149
150 // Writes each pull list to a file.
151 // The filename is based on the hostname.
152 //
153 // This is just a hack for prototyping, it is not expected to be used
154 // in production.
155 func WritePullLists(arvLogger *logger.Logger,
156         pullLists map[string]PullList) {
157         r := strings.NewReplacer(":", ".")
158         for host, list := range pullLists {
159                 filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
160                 pullListFile, err := os.Create(filename)
161                 if err != nil {
162                         loggerutil.FatalWithMessage(arvLogger,
163                                 fmt.Sprintf("Failed to open %s: %v", filename, err))
164                 }
165                 defer pullListFile.Close()
166
167                 enc := json.NewEncoder(pullListFile)
168                 err = enc.Encode(list)
169                 if err != nil {
170                         loggerutil.FatalWithMessage(arvLogger,
171                                 fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
172                 }
173                 log.Printf("Wrote pull list to %s.", filename)
174         }
175 }