Merge branch 'master' into 6277-manifest-validation-api
[arvados.git] / services / datamanager / keep / keep.go
1 /* Deals with getting Keep Server blocks from API Server and Keep Servers. */
2
3 package keep
4
5 import (
6         "bufio"
7         "encoding/json"
8         "flag"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
12         "git.curoverse.com/arvados.git/sdk/go/logger"
13         "git.curoverse.com/arvados.git/sdk/go/manifest"
14         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
15         "io"
16         "io/ioutil"
17         "log"
18         "net/http"
19         "strconv"
20         "strings"
21         "sync"
22         "time"
23 )
24
25 type ServerAddress struct {
26         Host string `json:"service_host"`
27         Port int    `json:"service_port"`
28         Uuid string `json:"uuid"`
29 }
30
31 // Info about a particular block returned by the server
32 type BlockInfo struct {
33         Digest blockdigest.BlockDigest
34         Size   int
35         Mtime  int64 // TODO(misha): Replace this with a timestamp.
36 }
37
38 // Info about a specified block given by a server
39 type BlockServerInfo struct {
40         ServerIndex int
41         Size        int
42         Mtime       int64 // TODO(misha): Replace this with a timestamp.
43 }
44
45 type ServerContents struct {
46         BlockDigestToInfo map[blockdigest.BlockDigest]BlockInfo
47 }
48
49 type ServerResponse struct {
50         Address  ServerAddress
51         Contents ServerContents
52 }
53
54 type ReadServers struct {
55         ReadAllServers           bool
56         KeepServerIndexToAddress []ServerAddress
57         KeepServerAddressToIndex map[ServerAddress]int
58         ServerToContents         map[ServerAddress]ServerContents
59         BlockToServers           map[blockdigest.BlockDigest][]BlockServerInfo
60         BlockReplicationCounts   map[int]int
61 }
62
63 type GetKeepServersParams struct {
64         Client arvadosclient.ArvadosClient
65         Logger *logger.Logger
66         Limit  int
67 }
68
69 type KeepServiceList struct {
70         ItemsAvailable int             `json:"items_available"`
71         KeepServers    []ServerAddress `json:"items"`
72 }
73
74 var (
75         // Don't access the token directly, use getDataManagerToken() to
76         // make sure it's been read.
77         dataManagerToken             string
78         dataManagerTokenFile         string
79         dataManagerTokenFileReadOnce sync.Once
80 )
81
82 func init() {
83         flag.StringVar(&dataManagerTokenFile,
84                 "data-manager-token-file",
85                 "",
86                 "File with the API token we should use to contact keep servers.")
87 }
88
89 // TODO(misha): Change this to include the UUID as well.
90 func (s ServerAddress) String() string {
91         return fmt.Sprintf("%s:%d", s.Host, s.Port)
92 }
93
94 func getDataManagerToken(arvLogger *logger.Logger) string {
95         readDataManagerToken := func() {
96                 if dataManagerTokenFile == "" {
97                         flag.Usage()
98                         loggerutil.FatalWithMessage(arvLogger,
99                                 "Data Manager Token needed, but data manager token file not specified.")
100                 } else {
101                         rawRead, err := ioutil.ReadFile(dataManagerTokenFile)
102                         if err != nil {
103                                 loggerutil.FatalWithMessage(arvLogger,
104                                         fmt.Sprintf("Unexpected error reading token file %s: %v",
105                                                 dataManagerTokenFile,
106                                                 err))
107                         }
108                         dataManagerToken = strings.TrimSpace(string(rawRead))
109                 }
110         }
111
112         dataManagerTokenFileReadOnce.Do(readDataManagerToken)
113         return dataManagerToken
114 }
115
116 func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
117         results = GetKeepServers(params)
118         log.Printf("Returned %d keep disks", len(results.ServerToContents))
119
120         ComputeBlockReplicationCounts(&results)
121         log.Printf("Replication level distribution: %v",
122                 results.BlockReplicationCounts)
123
124         return
125 }
126
127 func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
128         if &params.Client == nil {
129                 log.Fatalf("params.Client passed to GetKeepServers() should " +
130                         "contain a valid ArvadosClient, but instead it is nil.")
131         }
132
133         sdkParams := arvadosclient.Dict{
134                 "filters": [][]string{[]string{"service_type", "=", "disk"}},
135         }
136         if params.Limit > 0 {
137                 sdkParams["limit"] = params.Limit
138         }
139
140         var sdkResponse KeepServiceList
141         err := params.Client.List("keep_services", sdkParams, &sdkResponse)
142
143         if err != nil {
144                 loggerutil.FatalWithMessage(params.Logger,
145                         fmt.Sprintf("Error requesting keep disks from API server: %v", err))
146         }
147
148         if params.Logger != nil {
149                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
150                         keepInfo := make(map[string]interface{})
151
152                         keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
153                         keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
154                         keepInfo["keep_servers"] = sdkResponse.KeepServers
155
156                         p["keep_info"] = keepInfo
157                 })
158         }
159
160         log.Printf("Received keep services list: %+v", sdkResponse)
161
162         if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
163                 loggerutil.FatalWithMessage(params.Logger,
164                         fmt.Sprintf("Did not receive all available keep servers: %+v", sdkResponse))
165         }
166
167         results.KeepServerIndexToAddress = sdkResponse.KeepServers
168         results.KeepServerAddressToIndex = make(map[ServerAddress]int)
169         for i, address := range results.KeepServerIndexToAddress {
170                 results.KeepServerAddressToIndex[address] = i
171         }
172
173         log.Printf("Got Server Addresses: %v", results)
174
175         // This is safe for concurrent use
176         client := http.Client{}
177
178         // Send off all the index requests concurrently
179         responseChan := make(chan ServerResponse)
180         for _, keepServer := range sdkResponse.KeepServers {
181                 // The above keepsServer variable is reused for each iteration, so
182                 // it would be shared across all goroutines. This would result in
183                 // us querying one server n times instead of n different servers
184                 // as we intended. To avoid this we add it as an explicit
185                 // parameter which gets copied. This bug and solution is described
186                 // in https://golang.org/doc/effective_go.html#channels
187                 go func(keepServer ServerAddress) {
188                         responseChan <- GetServerContents(params.Logger,
189                                 keepServer,
190                                 client)
191                 }(keepServer)
192         }
193
194         results.ServerToContents = make(map[ServerAddress]ServerContents)
195         results.BlockToServers = make(map[blockdigest.BlockDigest][]BlockServerInfo)
196
197         // Read all the responses
198         for i := range sdkResponse.KeepServers {
199                 _ = i // Here to prevent go from complaining.
200                 response := <-responseChan
201                 log.Printf("Received channel response from %v containing %d files",
202                         response.Address,
203                         len(response.Contents.BlockDigestToInfo))
204                 results.ServerToContents[response.Address] = response.Contents
205                 serverIndex := results.KeepServerAddressToIndex[response.Address]
206                 for _, blockInfo := range response.Contents.BlockDigestToInfo {
207                         results.BlockToServers[blockInfo.Digest] = append(
208                                 results.BlockToServers[blockInfo.Digest],
209                                 BlockServerInfo{ServerIndex: serverIndex,
210                                         Size:  blockInfo.Size,
211                                         Mtime: blockInfo.Mtime})
212                 }
213         }
214         return
215 }
216
217 func GetServerContents(arvLogger *logger.Logger,
218         keepServer ServerAddress,
219         client http.Client) (response ServerResponse) {
220
221         GetServerStatus(arvLogger, keepServer, client)
222
223         req := CreateIndexRequest(arvLogger, keepServer)
224         resp, err := client.Do(req)
225         if err != nil {
226                 loggerutil.FatalWithMessage(arvLogger,
227                         fmt.Sprintf("Error fetching %s: %v", req.URL.String(), err))
228         }
229
230         return ReadServerResponse(arvLogger, keepServer, resp)
231 }
232
233 func GetServerStatus(arvLogger *logger.Logger,
234         keepServer ServerAddress,
235         client http.Client) {
236         url := fmt.Sprintf("http://%s:%d/status.json",
237                 keepServer.Host,
238                 keepServer.Port)
239
240         if arvLogger != nil {
241                 now := time.Now()
242                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
243                         keepInfo := p["keep_info"].(map[string]interface{})
244                         serverInfo := make(map[string]interface{})
245                         serverInfo["status_request_sent_at"] = now
246                         serverInfo["host"] = keepServer.Host
247                         serverInfo["port"] = keepServer.Port
248
249                         keepInfo[keepServer.Uuid] = serverInfo
250                 })
251         }
252
253         resp, err := client.Get(url)
254         if err != nil {
255                 loggerutil.FatalWithMessage(arvLogger,
256                         fmt.Sprintf("Error getting keep status from %s: %v", url, err))
257         } else if resp.StatusCode != 200 {
258                 loggerutil.FatalWithMessage(arvLogger,
259                         fmt.Sprintf("Received error code %d in response to request "+
260                                 "for %s status: %s",
261                                 resp.StatusCode, url, resp.Status))
262         }
263
264         var keepStatus map[string]interface{}
265         decoder := json.NewDecoder(resp.Body)
266         decoder.UseNumber()
267         err = decoder.Decode(&keepStatus)
268         if err != nil {
269                 loggerutil.FatalWithMessage(arvLogger,
270                         fmt.Sprintf("Error decoding keep status from %s: %v", url, err))
271         }
272
273         if arvLogger != nil {
274                 now := time.Now()
275                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
276                         keepInfo := p["keep_info"].(map[string]interface{})
277                         serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
278                         serverInfo["status_response_processed_at"] = now
279                         serverInfo["status"] = keepStatus
280                 })
281         }
282 }
283
284 func CreateIndexRequest(arvLogger *logger.Logger,
285         keepServer ServerAddress) (req *http.Request) {
286         url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
287         log.Println("About to fetch keep server contents from " + url)
288
289         if arvLogger != nil {
290                 now := time.Now()
291                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
292                         keepInfo := p["keep_info"].(map[string]interface{})
293                         serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
294                         serverInfo["index_request_sent_at"] = now
295                 })
296         }
297
298         req, err := http.NewRequest("GET", url, nil)
299         if err != nil {
300                 loggerutil.FatalWithMessage(arvLogger,
301                         fmt.Sprintf("Error building http request for %s: %v", url, err))
302         }
303
304         req.Header.Add("Authorization",
305                 fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
306         return
307 }
308
309 func ReadServerResponse(arvLogger *logger.Logger,
310         keepServer ServerAddress,
311         resp *http.Response) (response ServerResponse) {
312
313         if resp.StatusCode != 200 {
314                 loggerutil.FatalWithMessage(arvLogger,
315                         fmt.Sprintf("Received error code %d in response to request "+
316                                 "for %s index: %s",
317                                 resp.StatusCode, keepServer.String(), resp.Status))
318         }
319
320         if arvLogger != nil {
321                 now := time.Now()
322                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
323                         keepInfo := p["keep_info"].(map[string]interface{})
324                         serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
325                         serverInfo["index_response_received_at"] = now
326                 })
327         }
328
329         response.Address = keepServer
330         response.Contents.BlockDigestToInfo =
331                 make(map[blockdigest.BlockDigest]BlockInfo)
332         reader := bufio.NewReader(resp.Body)
333         numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
334         for {
335                 numLines++
336                 line, err := reader.ReadString('\n')
337                 if err == io.EOF {
338                         loggerutil.FatalWithMessage(arvLogger,
339                                 fmt.Sprintf("Index from %s truncated at line %d",
340                                         keepServer.String(), numLines))
341                 } else if err != nil {
342                         loggerutil.FatalWithMessage(arvLogger,
343                                 fmt.Sprintf("Error reading index response from %s at line %d: %v",
344                                         keepServer.String(), numLines, err))
345                 }
346                 if line == "\n" {
347                         if _, err := reader.Peek(1); err == nil {
348                                 extra, _ := reader.ReadString('\n')
349                                 loggerutil.FatalWithMessage(arvLogger,
350                                         fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
351                                                 keepServer.String(), numLines+1, extra))
352                         } else if err != io.EOF {
353                                 loggerutil.FatalWithMessage(arvLogger,
354                                         fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
355                                                 keepServer.String(), numLines, err))
356                         }
357                         numLines--
358                         break
359                 }
360                 blockInfo, err := parseBlockInfoFromIndexLine(line)
361                 if err != nil {
362                         loggerutil.FatalWithMessage(arvLogger,
363                                 fmt.Sprintf("Error parsing BlockInfo from index line "+
364                                         "received from %s: %v",
365                                         keepServer.String(),
366                                         err))
367                 }
368
369                 if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
370                         // This server returned multiple lines containing the same block digest.
371                         numDuplicates += 1
372                         if storedBlock.Size != blockInfo.Size {
373                                 numSizeDisagreements += 1
374                                 // TODO(misha): Consider failing here.
375                                 message := fmt.Sprintf("Saw different sizes for the same block "+
376                                         "on %s: %+v %+v",
377                                         keepServer.String(),
378                                         storedBlock,
379                                         blockInfo)
380                                 log.Println(message)
381                                 if arvLogger != nil {
382                                         arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
383                                                 keepInfo := p["keep_info"].(map[string]interface{})
384                                                 serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
385                                                 var error_list []string
386                                                 read_error_list, has_list := serverInfo["error_list"]
387                                                 if has_list {
388                                                         error_list = read_error_list.([]string)
389                                                 } // If we didn't have the list, error_list is already an empty list
390                                                 serverInfo["error_list"] = append(error_list, message)
391                                         })
392                                 }
393                         }
394                         // Keep the block that is bigger, or the block that's newer in
395                         // the case of a size tie.
396                         if storedBlock.Size < blockInfo.Size ||
397                                 (storedBlock.Size == blockInfo.Size &&
398                                         storedBlock.Mtime < blockInfo.Mtime) {
399                                 response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
400                         }
401                 } else {
402                         response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
403                 }
404         }
405
406         log.Printf("%s index contained %d lines with %d duplicates with "+
407                 "%d size disagreements",
408                 keepServer.String(),
409                 numLines,
410                 numDuplicates,
411                 numSizeDisagreements)
412
413         if arvLogger != nil {
414                 now := time.Now()
415                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
416                         keepInfo := p["keep_info"].(map[string]interface{})
417                         serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
418
419                         serverInfo["processing_finished_at"] = now
420                         serverInfo["lines_received"] = numLines
421                         serverInfo["duplicates_seen"] = numDuplicates
422                         serverInfo["size_disagreements_seen"] = numSizeDisagreements
423                 })
424         }
425         resp.Body.Close()
426         return
427 }
428
429 func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err error) {
430         tokens := strings.Fields(indexLine)
431         if len(tokens) != 2 {
432                 err = fmt.Errorf("Expected 2 tokens per line but received a "+
433                         "line containing %v instead.",
434                         tokens)
435         }
436
437         var locator manifest.BlockLocator
438         if locator, err = manifest.ParseBlockLocator(tokens[0]); err != nil {
439                 return
440         }
441         if len(locator.Hints) > 0 {
442                 err = fmt.Errorf("Block locator in index line should not contain hints "+
443                         "but it does: %v",
444                         locator)
445                 return
446         }
447
448         blockInfo.Mtime, err = strconv.ParseInt(tokens[1], 10, 64)
449         if err != nil {
450                 return
451         }
452         blockInfo.Digest = locator.Digest
453         blockInfo.Size = locator.Size
454         return
455 }
456
457 func ComputeBlockReplicationCounts(readServers *ReadServers) {
458         readServers.BlockReplicationCounts = make(map[int]int)
459         for _, infos := range readServers.BlockToServers {
460                 replication := len(infos)
461                 readServers.BlockReplicationCounts[replication] += 1
462         }
463 }