Added string to error message to help with debugging.
[arvados.git] / services / datamanager / keep / keep.go
1 /* Deals with getting Keep Server blocks from API Server and Keep Servers. */
2
3 package keep
4
5 import (
6         "bufio"
7         "encoding/json"
8         "flag"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
12         "git.curoverse.com/arvados.git/sdk/go/logger"
13         "git.curoverse.com/arvados.git/sdk/go/manifest"
14         "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
15         "io/ioutil"
16         "log"
17         "net/http"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22 )
23
// ServerAddress identifies one keep server. The JSON tags name the fields
// (service_host, service_port, uuid) it is decoded from when it appears in a
// KeepServiceList. Values of this type are used as map keys in ReadServers.
type ServerAddress struct {
	Host string `json:"service_host"`
	Port int    `json:"service_port"`
	// Uuid is also used as the per-server key under "keep_info" in the
	// logger updates made elsewhere in this file.
	Uuid string `json:"uuid"`
}
29
// BlockInfo holds what a single keep server's index reports about one block:
// its digest, its size, and its modification time.
type BlockInfo struct {
	Digest blockdigest.BlockDigest
	Size   int
	// Mtime is the base-10 integer parsed from the second token of an
	// index line (see parseBlockInfoFromIndexLine).
	Mtime  int64 // TODO(misha): Replace this with a timestamp.
}
36
// BlockServerInfo records one server's copy of a block: which server reported
// it, together with the size and mtime that server reported.
type BlockServerInfo struct {
	// ServerIndex is the server's position in
	// ReadServers.KeepServerIndexToAddress.
	ServerIndex int
	Size        int
	Mtime       int64 // TODO(misha): Replace this with a timestamp.
}
43
// ServerContents is the set of blocks read from one keep server's index,
// keyed by block digest. ReadServerResponse stores at most one BlockInfo per
// digest.
type ServerContents struct {
	BlockDigestToInfo map[blockdigest.BlockDigest]BlockInfo
}
47
// ServerResponse pairs a keep server's address with the contents read from
// that server's index. It is the value sent back by each per-server goroutine
// started in GetKeepServers.
type ServerResponse struct {
	Address  ServerAddress
	Contents ServerContents
}
52
// ReadServers is the combined result of querying every keep server: the
// servers themselves, what each one holds, and a per-block view of which
// servers hold it.
type ReadServers struct {
	// NOTE(review): ReadAllServers is not set anywhere in the code visible
	// here; confirm its meaning against callers.
	ReadAllServers           bool
	// KeepServerIndexToAddress is the server list in the order it was
	// received from the API server.
	KeepServerIndexToAddress []ServerAddress
	// KeepServerAddressToIndex is the inverse of KeepServerIndexToAddress.
	KeepServerAddressToIndex map[ServerAddress]int
	// ServerToContents holds the index contents read from each server.
	ServerToContents         map[ServerAddress]ServerContents
	// BlockToServers lists, for each block digest, one entry per server
	// that reported the block.
	BlockToServers           map[blockdigest.BlockDigest][]BlockServerInfo
	// BlockReplicationCounts maps a replication level (number of servers
	// holding a block) to the number of blocks at that level. It is filled
	// in by Summarize.
	BlockReplicationCounts   map[int]int
}
61
// GetKeepServersParams bundles the inputs of GetKeepServers.
type GetKeepServersParams struct {
	// Client is used to list "keep_services" on the API server.
	Client arvadosclient.ArvadosClient
	// Logger may be nil; the Update calls in this file are skipped when it is.
	Logger *logger.Logger
	// Limit, when greater than zero, is sent as the "limit" parameter of
	// the keep_services list request.
	Limit  int
}
67
// KeepServiceList is the decoded response of the "keep_services" list request.
// GetKeepServers compares ItemsAvailable with len(KeepServers) to detect a
// truncated list.
type KeepServiceList struct {
	ItemsAvailable int             `json:"items_available"`
	KeepServers    []ServerAddress `json:"items"`
}
72
var (
	// Don't access the token directly, use getDataManagerToken() to
	// make sure it's been read.
	dataManagerToken             string
	// Path of the file holding the token; bound to the
	// "data-manager-token-file" command-line flag in init().
	dataManagerTokenFile         string
	// Guards the one-time read of the token file in getDataManagerToken().
	dataManagerTokenFileReadOnce sync.Once
)
80
81 func init() {
82         flag.StringVar(&dataManagerTokenFile,
83                 "data-manager-token-file",
84                 "",
85                 "File with the API token we should use to contact keep servers.")
86 }
87
88 // TODO(misha): Change this to include the UUID as well.
89 func (s ServerAddress) String() string {
90         return s.HostPort()
91 }
92
93 func (s ServerAddress) HostPort() string {
94         return fmt.Sprintf("%s:%d", s.Host, s.Port)
95 }
96
97 func getDataManagerToken(arvLogger *logger.Logger) string {
98         readDataManagerToken := func() {
99                 if dataManagerTokenFile == "" {
100                         flag.Usage()
101                         loggerutil.FatalWithMessage(arvLogger,
102                                 "Data Manager Token needed, but data manager token file not specified.")
103                 } else {
104                         rawRead, err := ioutil.ReadFile(dataManagerTokenFile)
105                         if err != nil {
106                                 loggerutil.FatalWithMessage(arvLogger,
107                                         fmt.Sprintf("Unexpected error reading token file %s: %v",
108                                                 dataManagerTokenFile,
109                                                 err))
110                         }
111                         dataManagerToken = strings.TrimSpace(string(rawRead))
112                 }
113         }
114
115         dataManagerTokenFileReadOnce.Do(readDataManagerToken)
116         return dataManagerToken
117 }
118
119 func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
120         results = GetKeepServers(params)
121         log.Printf("Returned %d keep disks", len(results.ServerToContents))
122
123         results.Summarize(params.Logger)
124         log.Printf("Replication level distribution: %v",
125                 results.BlockReplicationCounts)
126
127         return
128 }
129
130 func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
131         if &params.Client == nil {
132                 log.Fatalf("params.Client passed to GetKeepServers() should " +
133                         "contain a valid ArvadosClient, but instead it is nil.")
134         }
135
136         sdkParams := arvadosclient.Dict{
137                 "filters": [][]string{[]string{"service_type", "=", "disk"}},
138         }
139         if params.Limit > 0 {
140                 sdkParams["limit"] = params.Limit
141         }
142
143         var sdkResponse KeepServiceList
144         err := params.Client.List("keep_services", sdkParams, &sdkResponse)
145
146         if err != nil {
147                 loggerutil.FatalWithMessage(params.Logger,
148                         fmt.Sprintf("Error requesting keep disks from API server: %v", err))
149         }
150
151         if params.Logger != nil {
152                 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
153                         keepInfo := logger.GetOrCreateMap(p, "keep_info")
154                         keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
155                         keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
156                         keepInfo["keep_servers"] = sdkResponse.KeepServers
157                 })
158         }
159
160         log.Printf("Received keep services list: %+v", sdkResponse)
161
162         if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
163                 loggerutil.FatalWithMessage(params.Logger,
164                         fmt.Sprintf("Did not receive all available keep servers: %+v", sdkResponse))
165         }
166
167         results.KeepServerIndexToAddress = sdkResponse.KeepServers
168         results.KeepServerAddressToIndex = make(map[ServerAddress]int)
169         for i, address := range results.KeepServerIndexToAddress {
170                 results.KeepServerAddressToIndex[address] = i
171         }
172
173         log.Printf("Got Server Addresses: %v", results)
174
175         // This is safe for concurrent use
176         client := http.Client{}
177
178         // Send off all the index requests concurrently
179         responseChan := make(chan ServerResponse)
180         for _, keepServer := range sdkResponse.KeepServers {
181                 // The above keepsServer variable is reused for each iteration, so
182                 // it would be shared across all goroutines. This would result in
183                 // us querying one server n times instead of n different servers
184                 // as we intended. To avoid this we add it as an explicit
185                 // parameter which gets copied. This bug and solution is described
186                 // in https://golang.org/doc/effective_go.html#channels
187                 go func(keepServer ServerAddress) {
188                         responseChan <- GetServerContents(params.Logger,
189                                 keepServer,
190                                 client)
191                 }(keepServer)
192         }
193
194         results.ServerToContents = make(map[ServerAddress]ServerContents)
195         results.BlockToServers = make(map[blockdigest.BlockDigest][]BlockServerInfo)
196
197         // Read all the responses
198         for i := range sdkResponse.KeepServers {
199                 _ = i // Here to prevent go from complaining.
200                 response := <-responseChan
201                 log.Printf("Received channel response from %v containing %d files",
202                         response.Address,
203                         len(response.Contents.BlockDigestToInfo))
204                 results.ServerToContents[response.Address] = response.Contents
205                 serverIndex := results.KeepServerAddressToIndex[response.Address]
206                 for _, blockInfo := range response.Contents.BlockDigestToInfo {
207                         results.BlockToServers[blockInfo.Digest] = append(
208                                 results.BlockToServers[blockInfo.Digest],
209                                 BlockServerInfo{ServerIndex: serverIndex,
210                                         Size:  blockInfo.Size,
211                                         Mtime: blockInfo.Mtime})
212                 }
213         }
214         return
215 }
216
217 func GetServerContents(arvLogger *logger.Logger,
218         keepServer ServerAddress,
219         client http.Client) (response ServerResponse) {
220
221         GetServerStatus(arvLogger, keepServer, client)
222
223         req := CreateIndexRequest(arvLogger, keepServer)
224         resp, err := client.Do(req)
225         if err != nil {
226                 loggerutil.FatalWithMessage(arvLogger,
227                         fmt.Sprintf("Error fetching %s: %v. Response was %+v",
228                                 req.URL.String(),
229                                 err,
230                                 resp))
231         }
232
233         return ReadServerResponse(arvLogger, keepServer, resp)
234 }
235
236 func GetServerStatus(arvLogger *logger.Logger,
237         keepServer ServerAddress,
238         client http.Client) {
239         url := fmt.Sprintf("http://%s:%d/status.json",
240                 keepServer.Host,
241                 keepServer.Port)
242
243         if arvLogger != nil {
244                 now := time.Now()
245                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
246                         keepInfo := logger.GetOrCreateMap(p, "keep_info")
247                         serverInfo := make(map[string]interface{})
248                         serverInfo["status_request_sent_at"] = now
249                         serverInfo["host"] = keepServer.Host
250                         serverInfo["port"] = keepServer.Port
251
252                         keepInfo[keepServer.Uuid] = serverInfo
253                 })
254         }
255
256         resp, err := client.Get(url)
257         if err != nil {
258                 loggerutil.FatalWithMessage(arvLogger,
259                         fmt.Sprintf("Error getting keep status from %s: %v", url, err))
260         } else if resp.StatusCode != 200 {
261                 loggerutil.FatalWithMessage(arvLogger,
262                         fmt.Sprintf("Received error code %d in response to request "+
263                                 "for %s status: %s",
264                                 resp.StatusCode, url, resp.Status))
265         }
266
267         var keepStatus map[string]interface{}
268         decoder := json.NewDecoder(resp.Body)
269         decoder.UseNumber()
270         err = decoder.Decode(&keepStatus)
271         if err != nil {
272                 loggerutil.FatalWithMessage(arvLogger,
273                         fmt.Sprintf("Error decoding keep status from %s: %v", url, err))
274         }
275
276         if arvLogger != nil {
277                 now := time.Now()
278                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
279                         keepInfo := logger.GetOrCreateMap(p, "keep_info")
280                         serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
281                         serverInfo["status_response_processed_at"] = now
282                         serverInfo["status"] = keepStatus
283                 })
284         }
285 }
286
287 func CreateIndexRequest(arvLogger *logger.Logger,
288         keepServer ServerAddress) (req *http.Request) {
289         url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
290         log.Println("About to fetch keep server contents from " + url)
291
292         if arvLogger != nil {
293                 now := time.Now()
294                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
295                         keepInfo := logger.GetOrCreateMap(p, "keep_info")
296                         serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
297                         serverInfo["index_request_sent_at"] = now
298                 })
299         }
300
301         req, err := http.NewRequest("GET", url, nil)
302         if err != nil {
303                 loggerutil.FatalWithMessage(arvLogger,
304                         fmt.Sprintf("Error building http request for %s: %v", url, err))
305         }
306
307         req.Header.Add("Authorization",
308                 fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
309         return
310 }
311
312 func ReadServerResponse(arvLogger *logger.Logger,
313         keepServer ServerAddress,
314         resp *http.Response) (response ServerResponse) {
315
316         if resp.StatusCode != 200 {
317                 loggerutil.FatalWithMessage(arvLogger,
318                         fmt.Sprintf("Received error code %d in response to request "+
319                                 "for %s index: %s",
320                                 resp.StatusCode, keepServer.String(), resp.Status))
321         }
322
323         if arvLogger != nil {
324                 now := time.Now()
325                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
326                         keepInfo := logger.GetOrCreateMap(p, "keep_info")
327                         serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
328                         serverInfo["index_response_received_at"] = now
329                 })
330         }
331
332         response.Address = keepServer
333         response.Contents.BlockDigestToInfo =
334                 make(map[blockdigest.BlockDigest]BlockInfo)
335         scanner := bufio.NewScanner(resp.Body)
336         numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
337         for scanner.Scan() {
338                 numLines++
339                 blockInfo, err := parseBlockInfoFromIndexLine(scanner.Text())
340                 if err != nil {
341                         loggerutil.FatalWithMessage(arvLogger,
342                                 fmt.Sprintf("Error parsing BlockInfo from index line "+
343                                         "received from %s: %v",
344                                         keepServer.String(),
345                                         err))
346                 }
347
348                 if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
349                         // This server returned multiple lines containing the same block digest.
350                         numDuplicates += 1
351                         if storedBlock.Size != blockInfo.Size {
352                                 numSizeDisagreements += 1
353                                 // TODO(misha): Consider failing here.
354                                 message := fmt.Sprintf("Saw different sizes for the same block "+
355                                         "on %s: %+v %+v",
356                                         keepServer.String(),
357                                         storedBlock,
358                                         blockInfo)
359                                 log.Println(message)
360                                 if arvLogger != nil {
361                                         arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
362                                                 keepInfo := logger.GetOrCreateMap(p, "keep_info")
363                                                 serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
364                                                 var error_list []string
365                                                 read_error_list, has_list := serverInfo["error_list"]
366                                                 if has_list {
367                                                         error_list = read_error_list.([]string)
368                                                 } // If we didn't have the list, error_list is already an empty list
369                                                 serverInfo["error_list"] = append(error_list, message)
370                                         })
371                                 }
372                         }
373                         // Keep the block that is bigger, or the block that's newer in
374                         // the case of a size tie.
375                         if storedBlock.Size < blockInfo.Size ||
376                                 (storedBlock.Size == blockInfo.Size &&
377                                         storedBlock.Mtime < blockInfo.Mtime) {
378                                 response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
379                         }
380                 } else {
381                         response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
382                 }
383         }
384         if err := scanner.Err(); err != nil {
385                 loggerutil.FatalWithMessage(arvLogger,
386                         fmt.Sprintf("Received error scanning index response from %s: %v",
387                                 keepServer.String(),
388                                 err))
389         } else {
390                 log.Printf("%s index contained %d lines with %d duplicates with "+
391                         "%d size disagreements",
392                         keepServer.String(),
393                         numLines,
394                         numDuplicates,
395                         numSizeDisagreements)
396
397                 if arvLogger != nil {
398                         now := time.Now()
399                         arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
400                                 keepInfo := logger.GetOrCreateMap(p, "keep_info")
401                                 serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
402
403                                 serverInfo["processing_finished_at"] = now
404                                 serverInfo["lines_received"] = numLines
405                                 serverInfo["duplicates_seen"] = numDuplicates
406                                 serverInfo["size_disagreements_seen"] = numSizeDisagreements
407                         })
408                 }
409         }
410         resp.Body.Close()
411         return
412 }
413
414 func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err error) {
415         tokens := strings.Fields(indexLine)
416         if len(tokens) != 2 {
417                 err = fmt.Errorf("Expected 2 tokens per line but received a "+
418                         "line containing %v instead.",
419                         tokens)
420         }
421
422         var locator manifest.BlockLocator
423         if locator, err = manifest.ParseBlockLocator(tokens[0]); err != nil {
424                 err = fmt.Errorf("%v Received error while parsing line \"%s\"",
425                         err, indexLine)
426                 return
427         }
428         if len(locator.Hints) > 0 {
429                 err = fmt.Errorf("Block locator in index line should not contain hints "+
430                         "but it does: %v",
431                         locator)
432                 return
433         }
434
435         blockInfo.Mtime, err = strconv.ParseInt(tokens[1], 10, 64)
436         if err != nil {
437                 return
438         }
439         blockInfo.Digest = locator.Digest
440         blockInfo.Size = locator.Size
441         return
442 }
443
444 func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
445         readServers.BlockReplicationCounts = make(map[int]int)
446         for _, infos := range readServers.BlockToServers {
447                 replication := len(infos)
448                 readServers.BlockReplicationCounts[replication] += 1
449         }
450
451         if arvLogger != nil {
452                 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
453                         keepInfo := logger.GetOrCreateMap(p, "keep_info")
454                         keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
455                 })
456         }
457
458 }