1 /* Deals with getting Keep Server blocks from API Server and Keep Servers. */
10 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
11 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
12 "git.curoverse.com/arvados.git/sdk/go/logger"
13 "git.curoverse.com/arvados.git/sdk/go/manifest"
14 "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
// ServerAddress identifies a keep server by host and port. The JSON tags
// bind the fields to the "service_host" and "service_port" keys. The type
// is comparable, which lets it serve as a map key.
type ServerAddress struct {
	Host string `json:"service_host"`
	Port int    `json:"service_port"`
}
29 // Info about a particular block returned by the server
// NOTE(review): other code in this file reads and writes a Size field on
// BlockInfo; its declaration is not among the lines shown here (the listing
// numbers jump from 31 to 33).
30 type BlockInfo struct {
// Digest is taken from the parsed block locator of an index line.
31 Digest blockdigest.BlockDigest
// Mtime is parsed from the second token of an index line as a base-10 int64.
33 Mtime int64 // TODO(misha): Replace this with a timestamp.
36 // Info about a specified block given by a server
// NOTE(review): values of this type are constructed elsewhere in this file
// with ServerIndex and Size fields as well; those declarations are not among
// the lines shown here (the listing numbers jump from 37 to 40).
37 type BlockServerInfo struct {
// Mtime is copied from the Mtime of the BlockInfo the server reported.
40 Mtime int64 // TODO(misha): Replace this with a timestamp.
43 type ServerContents struct {
44 BlockDigestToInfo map[blockdigest.BlockDigest]BlockInfo
// ServerResponse is the value each index-fetching goroutine sends back over
// the response channel.
// NOTE(review): code in this file also sets and reads an Address field on
// this type; its declaration is not among the lines shown here (the listing
// numbers jump from 47 to 49).
47 type ServerResponse struct {
// Contents is the set of blocks parsed from the server's index response.
49 Contents ServerContents
// ReadServers aggregates what was read from all keep servers.
// NOTE(review): the listing numbers jump from 52 to 54, so one line after
// the struct header is not shown here.
52 type ReadServers struct {
// KeepServerIndexToAddress lists the server addresses; a server's index is
// its position in this slice.
54 KeepServerIndexToAddress []ServerAddress
// KeepServerAddressToIndex is the inverse of KeepServerIndexToAddress.
55 KeepServerAddressToIndex map[ServerAddress]int
// ServerToContents maps each server to the contents it reported.
56 ServerToContents map[ServerAddress]ServerContents
// BlockToServers maps a block digest to one entry per server that reported
// that block.
57 BlockToServers map[blockdigest.BlockDigest][]BlockServerInfo
// BlockReplicationCounts maps a replication level (number of entries a
// block has in BlockToServers) to the number of blocks at that level.
58 BlockReplicationCounts map[int]int
// GetKeepServersParams bundles the inputs to GetKeepServers.
// NOTE(review): GetKeepServers also reads Logger and Limit from this struct;
// those declarations are not among the lines shown here (the listing numbers
// stop at 62 for this type).
61 type GetKeepServersParams struct {
// Client is used to list "keep_services" from the API server.
62 Client arvadosclient.ArvadosClient
67 type KeepServiceList struct {
68 ItemsAvailable int `json:"items_available"`
69 KeepServers []ServerAddress `json:"items"`
72 // Methods to implement util.SdkListResponse Interface
73 func (k KeepServiceList) NumItemsAvailable() (numAvailable int, err error) {
74 return k.ItemsAvailable, nil
77 func (k KeepServiceList) NumItemsContained() (numContained int, err error) {
78 return len(k.KeepServers), nil
var (
	// Don't access the token directly, use getDataManagerToken() to
	// make sure it's been read.
	dataManagerToken string
	// dataManagerTokenFile is the path of the file the token is read from.
	dataManagerTokenFile string
	// dataManagerTokenFileReadOnce guards the one-time read of the token file.
	dataManagerTokenFileReadOnce sync.Once
)
// Registers the "data-manager-token-file" command-line flag; the parsed
// value is stored in dataManagerTokenFile.
// NOTE(review): the enclosing function header and the flag's default value
// (listing line 92) are not among the lines shown here.
90 flag.StringVar(&dataManagerTokenFile,
91 "data-manager-token-file",
93 "File with the API token we should use to contact keep servers.")
96 func (s ServerAddress) String() string {
97 return fmt.Sprintf("%s:%d", s.Host, s.Port)
// getDataManagerToken returns the data manager API token. The token file
// named by dataManagerTokenFile is read through dataManagerTokenFileReadOnce,
// so the read closure runs at most once; later calls return the cached value.
// Problems are reported through loggerutil.FatalWithMessage.
// NOTE(review): listing lines 103, 106, 108, 112-113 and 115-117 are not
// shown here, so the exact branch structure cannot be confirmed.
100 func getDataManagerToken(arvLogger *logger.Logger) string {
101 readDataManagerToken := func() {
// An empty path means no token file was configured.
102 if dataManagerTokenFile == "" {
104 loggerutil.FatalWithMessage(arvLogger,
105 "Data Manager Token needed, but data manager token file not specified.")
107 rawRead, err := ioutil.ReadFile(dataManagerTokenFile)
109 loggerutil.FatalWithMessage(arvLogger,
110 fmt.Sprintf("Unexpected error reading token file %s: %v",
111 dataManagerTokenFile,
// Surrounding whitespace (such as a trailing newline) is stripped from the
// file contents before the token is stored.
114 dataManagerToken = strings.TrimSpace(string(rawRead))
118 dataManagerTokenFileReadOnce.Do(readDataManagerToken)
119 return dataManagerToken
122 func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
123 results = GetKeepServers(params)
124 log.Printf("Returned %d keep disks", len(results.ServerToContents))
126 ComputeBlockReplicationCounts(&results)
127 log.Printf("Replication level distribution: %v",
128 results.BlockReplicationCounts)
133 func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
134 if ¶ms.Client == nil {
135 log.Fatalf("params.Client passed to GetKeepServers() should " +
136 "contain a valid ArvadosClient, but instead it is nil.")
139 sdkParams := arvadosclient.Dict{
140 "filters": [][]string{[]string{"service_type", "=", "disk"}},
142 if params.Limit > 0 {
143 sdkParams["limit"] = params.Limit
146 var sdkResponse KeepServiceList
147 err := params.Client.List("keep_services", sdkParams, &sdkResponse)
150 loggerutil.FatalWithMessage(params.Logger,
151 fmt.Sprintf("Error requesting keep disks from API server: %v", err))
154 if params.Logger != nil {
155 params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
156 keepInfo := make(map[string]interface{})
158 keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
159 keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
160 keepInfo["keep_servers"] = sdkResponse.KeepServers
162 p["keep_info"] = keepInfo
166 log.Printf("Received keep services list: %+v", sdkResponse)
168 if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
169 loggerutil.FatalWithMessage(params.Logger,
170 fmt.Sprintf("Did not receive all available keep servers: %+v", sdkResponse))
173 results.KeepServerIndexToAddress = sdkResponse.KeepServers
174 results.KeepServerAddressToIndex = make(map[ServerAddress]int)
175 for i, address := range results.KeepServerIndexToAddress {
176 results.KeepServerAddressToIndex[address] = i
179 log.Printf("Got Server Addresses: %v", results)
181 // This is safe for concurrent use
182 client := http.Client{}
184 // Send off all the index requests concurrently
185 responseChan := make(chan ServerResponse)
186 for _, keepServer := range sdkResponse.KeepServers {
187 // The above keepsServer variable is reused for each iteration, so
188 // it would be shared across all goroutines. This would result in
189 // us querying one server n times instead of n different servers
190 // as we intended. To avoid this we add it as an explicit
191 // parameter which gets copied. This bug and solution is described
192 // in https://golang.org/doc/effective_go.html#channels
193 go func(keepServer ServerAddress) {
194 responseChan <- GetServerContents(params.Logger,
200 results.ServerToContents = make(map[ServerAddress]ServerContents)
201 results.BlockToServers = make(map[blockdigest.BlockDigest][]BlockServerInfo)
203 // Read all the responses
204 for i := range sdkResponse.KeepServers {
205 _ = i // Here to prevent go from complaining.
206 response := <-responseChan
207 log.Printf("Received channel response from %v containing %d files",
209 len(response.Contents.BlockDigestToInfo))
210 results.ServerToContents[response.Address] = response.Contents
211 serverIndex := results.KeepServerAddressToIndex[response.Address]
212 for _, blockInfo := range response.Contents.BlockDigestToInfo {
213 results.BlockToServers[blockInfo.Digest] = append(
214 results.BlockToServers[blockInfo.Digest],
215 BlockServerInfo{ServerIndex: serverIndex,
216 Size: blockInfo.Size,
217 Mtime: blockInfo.Mtime})
223 func GetServerContents(arvLogger *logger.Logger,
224 keepServer ServerAddress,
225 client http.Client) (response ServerResponse) {
227 GetServerStatus(arvLogger, keepServer, client)
229 req := CreateIndexRequest(arvLogger, keepServer)
230 resp, err := client.Do(req)
232 loggerutil.FatalWithMessage(arvLogger,
233 fmt.Sprintf("Error fetching %s: %v", req.URL.String(), err))
236 return ReadServerResponse(arvLogger, keepServer, resp)
// GetServerStatus fetches /status.json from keepServer, decodes the JSON
// body, and (when arvLogger is non-nil) records request/response timestamps
// and the decoded status under the server's entry in the "keep_info" log map.
// Request, non-200 and decode failures are reported through
// loggerutil.FatalWithMessage.
// NOTE(review): several listing lines (243-245, 263, 269, ...) are not shown
// here, including part of an error message and one statement between the
// decoder's creation and its use.
239 func GetServerStatus(arvLogger *logger.Logger,
240 keepServer ServerAddress,
241 client http.Client) {
242 url := fmt.Sprintf("http://%s:%d/status.json",
246 if arvLogger != nil {
247 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
// The unchecked type assertion panics unless p["keep_info"] already holds a
// map; GetKeepServers stores one when its logger is non-nil.
248 keepInfo := p["keep_info"].(map[string]interface{})
249 serverInfo := make(map[string]interface{})
250 serverInfo["time_status_request_sent"] = time.Now()
// Creates this server's log entry, keyed by "host:port"; later updates in
// this file look the entry up under the same key.
252 keepInfo[keepServer.String()] = serverInfo
256 resp, err := client.Get(url)
258 loggerutil.FatalWithMessage(arvLogger,
259 fmt.Sprintf("Error getting keep status from %s: %v", url, err))
260 } else if resp.StatusCode != 200 {
261 loggerutil.FatalWithMessage(arvLogger,
262 fmt.Sprintf("Received error code %d in response to request "+
264 resp.StatusCode, url, resp.Status))
// The status document is decoded generically; its schema is not fixed here.
267 var keepStatus map[string]interface{}
268 decoder := json.NewDecoder(resp.Body)
270 err = decoder.Decode(&keepStatus)
272 loggerutil.FatalWithMessage(arvLogger,
273 fmt.Sprintf("Error decoding keep status from %s: %v", url, err))
276 if arvLogger != nil {
277 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
278 keepInfo := p["keep_info"].(map[string]interface{})
279 serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
280 serverInfo["time_status_response_processed"] = time.Now()
281 serverInfo["status"] = keepStatus
286 func CreateIndexRequest(arvLogger *logger.Logger,
287 keepServer ServerAddress) (req *http.Request) {
288 url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
289 log.Println("About to fetch keep server contents from " + url)
291 if arvLogger != nil {
292 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
293 keepInfo := p["keep_info"].(map[string]interface{})
294 serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
295 serverInfo["time_index_request_sent"] = time.Now()
299 req, err := http.NewRequest("GET", url, nil)
301 loggerutil.FatalWithMessage(arvLogger,
302 fmt.Sprintf("Error building http request for %s: %v", url, err))
305 req.Header.Add("Authorization",
306 fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
// ReadServerResponse parses a keep server's index response into a
// ServerResponse: the body is scanned with a bufio.Scanner, each scanned
// text is parsed with parseBlockInfoFromIndexLine, and the resulting
// BlockInfo values are stored by digest. When the same digest appears more
// than once, the larger block wins, with the newer Mtime breaking size ties.
// Non-200 responses, parse errors and scan errors are reported through
// loggerutil.FatalWithMessage. When arvLogger is non-nil, timestamps,
// counters and size-disagreement messages are recorded under the server's
// entry in the "keep_info" log map.
// NOTE(review): many listing lines are not shown here (for example 317,
// 335-336, 342-345, 353-357, 390-392, 404-409), including message text, the
// scan loop header and the end of the function.
310 func ReadServerResponse(arvLogger *logger.Logger,
311 keepServer ServerAddress,
312 resp *http.Response) (response ServerResponse) {
314 if resp.StatusCode != 200 {
315 loggerutil.FatalWithMessage(arvLogger,
316 fmt.Sprintf("Received error code %d in response to request "+
318 resp.StatusCode, keepServer.String(), resp.Status))
321 if arvLogger != nil {
322 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
// Unchecked assertions: both the "keep_info" map and this server's entry
// must already exist, or these panic.
323 keepInfo := p["keep_info"].(map[string]interface{})
324 serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
326 serverInfo["time_index_response_received"] = time.Now()
330 response.Address = keepServer
331 response.Contents.BlockDigestToInfo =
332 make(map[blockdigest.BlockDigest]BlockInfo)
333 scanner := bufio.NewScanner(resp.Body)
334 numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
337 blockInfo, err := parseBlockInfoFromIndexLine(scanner.Text())
339 loggerutil.FatalWithMessage(arvLogger,
340 fmt.Sprintf("Error parsing BlockInfo from index line "+
341 "received from %s: %v",
346 if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
347 // This server returned multiple lines containing the same block digest.
349 if storedBlock.Size != blockInfo.Size {
350 numSizeDisagreements += 1
351 // TODO(misha): Consider failing here.
352 message := fmt.Sprintf("Saw different sizes for the same block "+
358 if arvLogger != nil {
359 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
360 keepInfo := p["keep_info"].(map[string]interface{})
361 serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
// Appends the message to the server's "error_list" entry, starting from a
// nil slice when no list has been stored yet.
362 var error_list []string
363 read_error_list, has_list := serverInfo["error_list"]
365 error_list = read_error_list.([]string)
366 } // If we didn't have the list, error_list is already an empty list
367 serverInfo["error_list"] = append(error_list, message)
371 // Keep the block that is bigger, or the block that's newer in
372 // the case of a size tie.
373 if storedBlock.Size < blockInfo.Size ||
374 (storedBlock.Size == blockInfo.Size &&
375 storedBlock.Mtime < blockInfo.Mtime) {
376 response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
// First sighting of this digest on this server: store it as-is.
379 response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
// A Scanner stops silently on read errors, so the error has to be checked
// once scanning is over.
382 if err := scanner.Err(); err != nil {
383 loggerutil.FatalWithMessage(arvLogger,
384 fmt.Sprintf("Received error scanning index response from %s: %v",
388 log.Printf("%s index contained %d lines with %d duplicates with "+
389 "%d size disagreements",
393 numSizeDisagreements)
395 if arvLogger != nil {
396 arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
397 keepInfo := p["keep_info"].(map[string]interface{})
398 serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
400 serverInfo["time_processing_finished"] = time.Now()
401 serverInfo["lines_received"] = numLines
402 serverInfo["duplicates_seen"] = numDuplicates
403 serverInfo["size_disagreements_seen"] = numSizeDisagreements
// parseBlockInfoFromIndexLine turns one index line into a BlockInfo. The
// line is split on whitespace and must yield exactly two tokens: a block
// locator (parsed with manifest.ParseBlockLocator, and required to carry no
// hints) followed by a base-10 int64 stored as Mtime. Digest and Size are
// copied from the parsed locator. A non-nil err is set when the token count
// is wrong, the locator fails to parse, or the locator has hints.
// NOTE(review): several listing lines (416-418, 421-422, 425-429, 431-433,
// 436-437) are not shown here, including part of an error message and the
// handling of the ParseInt error.
411 func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err error) {
// strings.Fields splits on any run of whitespace, so the amount of spacing
// between the two tokens does not matter.
412 tokens := strings.Fields(indexLine)
413 if len(tokens) != 2 {
414 err = fmt.Errorf("Expected 2 tokens per line but received a "+
415 "line containing %v instead.",
419 var locator manifest.BlockLocator
420 if locator, err = manifest.ParseBlockLocator(tokens[0]); err != nil {
423 if len(locator.Hints) > 0 {
424 err = fmt.Errorf("Block locator in index line should not contain hints "+
430 blockInfo.Mtime, err = strconv.ParseInt(tokens[1], 10, 64)
434 blockInfo.Digest = locator.Digest
435 blockInfo.Size = locator.Size
439 func ComputeBlockReplicationCounts(readServers *ReadServers) {
440 readServers.BlockReplicationCounts = make(map[int]int)
441 for _, infos := range readServers.BlockToServers {
442 replication := len(infos)
443 readServers.BlockReplicationCounts[replication] += 1