Merge branch 'master' into 3408-production-datamanager
[arvados.git] / services / datamanager / summary / pull_list.go
1 // Code for generating pull lists as described in https://arvados.org/projects/arvados/wiki/Keep_Design_Doc#Pull-List
2 package summary
3
4 import (
5         "encoding/json"
6         "fmt"
7         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
8         "git.curoverse.com/arvados.git/sdk/go/keepclient"
9         "git.curoverse.com/arvados.git/sdk/go/logger"
10         "git.curoverse.com/arvados.git/services/datamanager/keep"
11         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
12         "log"
13         "os"
14         "strings"
15 )
16
17 type Locator struct {
18         Digest blockdigest.BlockDigest
19         // TODO(misha): Add size field to the Locator (and MarshalJSON() below)
20 }
21
22 func (l Locator) MarshalJSON() ([]byte, error) {
23         return []byte("\"" + l.Digest.String() + "\""), nil
24 }
25
26 // One entry in the Pull List
27 type PullListEntry struct {
28         Locator Locator  `json:"locator"`
29         Servers []string `json:"servers"`
30 }
31
32 // The Pull List for a particular server
33 type PullList struct {
34         Entries []PullListEntry `json:"blocks"`
35 }
36
37 // EntriesByDigest implements sort.Interface for []PullListEntry
38 // based on the Digest.
39 type EntriesByDigest []PullListEntry
40
41 func (a EntriesByDigest) Len() int      { return len(a) }
42 func (a EntriesByDigest) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
43 func (a EntriesByDigest) Less(i, j int) bool {
44         di, dj := a[i].Locator.Digest, a[j].Locator.Digest
45         return di.H < dj.H || (di.H == dj.H && di.L < dj.L)
46 }
47
48 // For a given under-replicated block, this structure represents which
49 // servers should pull the specified block and which servers they can
50 // pull it from.
51 type PullServers struct {
52         To   []string // Servers that should pull the specified block
53         From []string // Servers that already contain the specified block
54 }
55
56 // Creates a map from block locator to PullServers with one entry for
57 // each under-replicated block.
58 func ComputePullServers(kc *keepclient.KeepClient,
59         keepServerInfo *keep.ReadServers,
60         blockToDesiredReplication map[blockdigest.BlockDigest]int,
61         underReplicated BlockSet) (m map[Locator]PullServers) {
62         m = map[Locator]PullServers{}
63         // We use CanonicalString to avoid filling memory with dupicate
64         // copies of the same string.
65         var cs CanonicalString
66
67         for block, _ := range underReplicated {
68                 serversStoringBlock := keepServerInfo.BlockToServers[block]
69                 numCopies := len(serversStoringBlock)
70                 numCopiesMissing := blockToDesiredReplication[block] - numCopies
71                 if numCopiesMissing > 0 {
72                         // We expect this to always be true, since the block was listed
73                         // in underReplicated.
74                         // TODO(misha): Consider asserting the above conditional.
75
76                         if numCopies > 0 {
77                                 // I believe that we should expect this to always be true.
78
79                                 // A server's host-port string appears as a key in this map
80                                 // iff it contains the block.
81                                 serverHasBlock := map[string]struct{}{}
82                                 for _, info := range serversStoringBlock {
83                                         sa := keepServerInfo.KeepServerIndexToAddress[info.ServerIndex]
84                                         serverHasBlock[sa.HostPort()] = struct{}{}
85                                 }
86
87                                 roots := keepclient.NewRootSorter(kc.ServiceRoots(),
88                                         block.String()).GetSortedRoots()
89
90                                 l := Locator{Digest: block}
91                                 m[l] = CreatePullServers(cs, serverHasBlock, roots, numCopiesMissing)
92                         }
93                 }
94         }
95         return m
96 }
97
98 // Creates a pull list in which the To and From fields preserve the
99 // ordering of sorted servers and the contents are all canonical
100 // strings.
101 func CreatePullServers(cs CanonicalString,
102         serverHasBlock map[string]struct{},
103         sortedServers []string,
104         maxToFields int) (ps PullServers) {
105
106         ps = PullServers{
107                 To:   make([]string, 0, maxToFields),
108                 From: make([]string, 0, len(serverHasBlock)),
109         }
110
111         for _, host := range sortedServers {
112                 // Strip the protocol portion of the url.
113                 // Use the canonical copy of the string to avoid memory waste.
114                 server := cs.Get(RemoveProtocolPrefix(host))
115                 _, hasBlock := serverHasBlock[server]
116                 if hasBlock {
117                         ps.From = append(ps.From, server)
118                 } else if len(ps.To) < maxToFields {
119                         ps.To = append(ps.To, server)
120                 }
121         }
122
123         return
124 }
125
126 // Strips the protocol prefix from a url.
127 func RemoveProtocolPrefix(url string) string {
128         return url[(strings.LastIndex(url, "/") + 1):]
129 }
130
131 // Produces a PullList for each keep server.
132 func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
133         spl = map[string]PullList{}
134         // We don't worry about canonicalizing our strings here, because we
135         // assume lps was created by ComputePullServers() which already
136         // canonicalized the strings for us.
137         for locator, pullServers := range lps {
138                 for _, destination := range pullServers.To {
139                         pullList, pullListExists := spl[destination]
140                         if !pullListExists {
141                                 pullList = PullList{Entries: []PullListEntry{}}
142                                 spl[destination] = pullList
143                         }
144                         pullList.Entries = append(pullList.Entries,
145                                 PullListEntry{Locator: locator, Servers: pullServers.From})
146                         spl[destination] = pullList
147                 }
148         }
149         return
150 }
151
152 // Writes each pull list to a file.
153 // The filename is based on the hostname.
154 //
155 // This is just a hack for prototyping, it is not expected to be used
156 // in production.
157 func WritePullLists(arvLogger *logger.Logger,
158         pullLists map[string]PullList) {
159         r := strings.NewReplacer(":", ".")
160         for host, list := range pullLists {
161                 filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
162                 pullListFile, err := os.Create(filename)
163                 if err != nil {
164                         loggerutil.FatalWithMessage(arvLogger,
165                                 fmt.Sprintf("Failed to open %s: %v", filename, err))
166                 }
167                 defer pullListFile.Close()
168
169                 enc := json.NewEncoder(pullListFile)
170                 err = enc.Encode(list)
171                 if err != nil {
172                         loggerutil.FatalWithMessage(arvLogger,
173                                 fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
174                 }
175                 log.Printf("Wrote pull list to %s.", filename)
176         }
177 }