X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/21006cfaf6d4d0ac3884a72803a8723bc4bb76fb..f9e3d32c92920a806d50548dbb9b6c0aab7d8c54:/services/datamanager/keep/keep.go diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go index 3a9c21a438..e7843ea02f 100644 --- a/services/datamanager/keep/keep.go +++ b/services/datamanager/keep/keep.go @@ -6,12 +6,12 @@ import ( "bufio" "encoding/json" "errors" + "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/logger" - "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "io" "io/ioutil" "log" @@ -51,6 +51,7 @@ type ServerContents struct { type ServerResponse struct { Address ServerAddress Contents ServerContents + Err error } // ReadServers struct @@ -76,6 +77,15 @@ type ServiceList struct { KeepServers []ServerAddress `json:"items"` } +var serviceType string + +func init() { + flag.StringVar(&serviceType, + "service-type", + "disk", + "Operate only on keep_services with the specified service_type, ignoring all others.") +} + // String // TODO(misha): Change this to include the UUID as well. func (s ServerAddress) String() string { @@ -91,8 +101,11 @@ func (s ServerAddress) URL() string { } // GetKeepServersAndSummarize gets keep servers from api -func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) { - results = GetKeepServers(params) +func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers, err error) { + results, err = GetKeepServers(params) + if err != nil { + return + } log.Printf("Returned %d keep disks", len(results.ServerToContents)) results.Summarize(params.Logger) @@ -103,47 +116,51 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer } // GetKeepServers from api server -func GetKeepServers(params GetKeepServersParams) (results ReadServers) { +func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) { sdkParams := arvadosclient.Dict{ - "filters": [][]string{[]string{"service_type", "!=", "proxy"}}, + "filters": [][]string{{"service_type", "!=", "proxy"}}, } if params.Limit > 0 { sdkParams["limit"] = params.Limit } var sdkResponse ServiceList - err := params.Client.List("keep_services", sdkParams, &sdkResponse) + err = params.Client.List("keep_services", sdkParams, &sdkResponse) if err != nil { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Error requesting keep disks from API server: %v", err)) + return } - // Currently, only "disk" types are supported. Stop if any other service types are found. + var keepServers []ServerAddress for _, server := range sdkResponse.KeepServers { - if server.ServiceType != "disk" { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Unsupported service type %q found for: %v", server.ServiceType, server)) + if server.ServiceType == serviceType { + keepServers = append(keepServers, server) + } else { + log.Printf("Skipping keep_service %q because its service_type %q does not match -service-type=%q", server, server.ServiceType, serviceType) } } + if len(keepServers) == 0 { + return results, fmt.Errorf("Found no keepservices with the service type %v", serviceType) + } + if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := logger.GetOrCreateMap(p, "keep_info") keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers) keepInfo["keep_servers"] = sdkResponse.KeepServers + keepInfo["indexable_keep_servers"] = keepServers }) } log.Printf("Received keep services list: %+v", sdkResponse) if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable { - loggerutil.FatalWithMessage(params.Logger, - fmt.Sprintf("Did not receive all available keep servers: %+v", sdkResponse)) + return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse) } - results.KeepServerIndexToAddress = sdkResponse.KeepServers + results.KeepServerIndexToAddress = keepServers results.KeepServerAddressToIndex = make(map[ServerAddress]int) for i, address := range results.KeepServerIndexToAddress { results.KeepServerAddressToIndex[address] = i @@ -153,7 +170,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) { // Send off all the index requests concurrently responseChan := make(chan ServerResponse) - for _, keepServer := range sdkResponse.KeepServers { + for _, keepServer := range results.KeepServerIndexToAddress { // The above keepsServer variable is reused for each iteration, so // it would be shared across all goroutines. This would result in // us querying one server n times instead of n different servers @@ -171,9 +188,15 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) { results.BlockToServers = make(map[blockdigest.DigestWithSize][]BlockServerInfo) // Read all the responses - for i := range sdkResponse.KeepServers { + for i := range results.KeepServerIndexToAddress { _ = i // Here to prevent go from complaining. response := <-responseChan + + // Check if there were any errors during GetServerContents + if response.Err != nil { + return results, response.Err + } + log.Printf("Received channel response from %v containing %d files", response.Address, len(response.Contents.BlockDigestToInfo)) @@ -194,25 +217,37 @@ func GetServerContents(arvLogger *logger.Logger, keepServer ServerAddress, arv arvadosclient.ArvadosClient) (response ServerResponse) { - GetServerStatus(arvLogger, keepServer, arv) + err := GetServerStatus(arvLogger, keepServer, arv) + if err != nil { + response.Err = err + return + } + + req, err := CreateIndexRequest(arvLogger, keepServer, arv) + if err != nil { + response.Err = err + return + } - req := CreateIndexRequest(arvLogger, keepServer, arv) resp, err := arv.Client.Do(req) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Error fetching %s: %v. Response was %+v", - req.URL.String(), - err, - resp)) + response.Err = err + return + } + + response, err = ReadServerResponse(arvLogger, keepServer, resp) + if err != nil { + response.Err = err + return } - return ReadServerResponse(arvLogger, keepServer, resp) + return } // GetServerStatus get keep server status by invoking /status.json func GetServerStatus(arvLogger *logger.Logger, keepServer ServerAddress, - arv arvadosclient.ArvadosClient) { + arv arvadosclient.ArvadosClient) error { url := fmt.Sprintf("http://%s:%d/status.json", keepServer.Host, keepServer.Port) @@ -232,13 +267,11 @@ func GetServerStatus(arvLogger *logger.Logger, resp, err := arv.Client.Get(url) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Error getting keep status from %s: %v", url, err)) + return fmt.Errorf("Error getting keep status from %s: %v", url, err) } else if resp.StatusCode != 200 { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Received error code %d in response to request "+ - "for %s status: %s", - resp.StatusCode, url, resp.Status)) + return fmt.Errorf("Received error code %d in response to request "+ + "for %s status: %s", + resp.StatusCode, url, resp.Status) } var keepStatus map[string]interface{} @@ -246,8 +279,7 @@ func GetServerStatus(arvLogger *logger.Logger, decoder.UseNumber() err = decoder.Decode(&keepStatus) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Error decoding keep status from %s: %v", url, err)) + return fmt.Errorf("Error decoding keep status from %s: %v", url, err) } if arvLogger != nil { @@ -259,12 +291,14 @@ func GetServerStatus(arvLogger *logger.Logger, serverInfo["status"] = keepStatus }) } + + return nil } // CreateIndexRequest to the keep server func CreateIndexRequest(arvLogger *logger.Logger, keepServer ServerAddress, - arv arvadosclient.ArvadosClient) (req *http.Request) { + arv arvadosclient.ArvadosClient) (req *http.Request, err error) { url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port) log.Println("About to fetch keep server contents from " + url) @@ -277,26 +311,23 @@ func CreateIndexRequest(arvLogger *logger.Logger, }) } - req, err := http.NewRequest("GET", url, nil) + req, err = http.NewRequest("GET", url, nil) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Error building http request for %s: %v", url, err)) + return req, fmt.Errorf("Error building http request for %s: %v", url, err) } req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken) - return + return req, err } // ReadServerResponse reads reasponse from keep server func ReadServerResponse(arvLogger *logger.Logger, keepServer ServerAddress, - resp *http.Response) (response ServerResponse) { + resp *http.Response) (response ServerResponse, err error) { if resp.StatusCode != 200 { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Received error code %d in response to request "+ - "for %s index: %s", - resp.StatusCode, keepServer.String(), resp.Status)) + return response, fmt.Errorf("Received error code %d in response to index request for %s: %s", + resp.StatusCode, keepServer.String(), resp.Status) } if arvLogger != nil { @@ -317,35 +348,30 @@ func ReadServerResponse(arvLogger *logger.Logger, numLines++ line, err := reader.ReadString('\n') if err == io.EOF { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Index from %s truncated at line %d", - keepServer.String(), numLines)) + return response, fmt.Errorf("Index from %s truncated at line %d", + keepServer.String(), numLines) } else if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Error reading index response from %s at line %d: %v", - keepServer.String(), numLines, err)) + return response, fmt.Errorf("Error reading index response from %s at line %d: %v", + keepServer.String(), numLines, err) } if line == "\n" { if _, err := reader.Peek(1); err == nil { extra, _ := reader.ReadString('\n') - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s", - keepServer.String(), numLines+1, extra)) + return response, fmt.Errorf("Index from %s had trailing data at line %d after EOF marker: %s", + keepServer.String(), numLines+1, extra) } else if err != io.EOF { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v", - keepServer.String(), numLines, err)) + return response, fmt.Errorf("Index from %s had read error after EOF marker at line %d: %v", + keepServer.String(), numLines, err) } numLines-- break } blockInfo, err := parseBlockInfoFromIndexLine(line) if err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Error parsing BlockInfo from index line "+ - "received from %s: %v", - keepServer.String(), - err)) + return response, fmt.Errorf("Error parsing BlockInfo from index line "+ + "received from %s: %v", + keepServer.String(), + err) } if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok { @@ -387,19 +413,19 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err tokens := strings.Fields(indexLine) if len(tokens) != 2 { err = fmt.Errorf("Expected 2 tokens per line but received a "+ - "line containing %v instead.", + "line containing %#q instead.", tokens) } var locator blockdigest.BlockLocator if locator, err = blockdigest.ParseBlockLocator(tokens[0]); err != nil { - err = fmt.Errorf("%v Received error while parsing line \"%s\"", + err = fmt.Errorf("%v Received error while parsing line \"%#q\"", err, indexLine) return } if len(locator.Hints) > 0 { err = fmt.Errorf("Block locator in index line should not contain hints "+ - "but it does: %v", + "but it does: %#q", locator) return } @@ -440,13 +466,29 @@ type TrashRequest struct { type TrashList []TrashRequest // SendTrashLists to trash queue -func SendTrashLists(kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) { +func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList, dryRun bool) (errs []error) { count := 0 barrier := make(chan error) client := kc.Client for url, v := range spl { + if arvLogger != nil { + // We need a local variable because Update doesn't call our mutator func until later, + // when our list variable might have been reused by the next loop iteration. + url := url + trashLen := len(v) + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + trashListInfo := logger.GetOrCreateMap(p, "trash_list_len") + trashListInfo[url] = trashLen + }) + } + + if dryRun { + log.Printf("dry run, not sending trash list to service %s with %d blocks", url, len(v)) + continue + } + count++ log.Printf("Sending trash list to %v", url) @@ -486,7 +528,6 @@ func SendTrashLists(kc *keepclient.KeepClient, spl map[string]TrashList) (errs [ barrier <- nil } })(url, v) - } for i := 0; i < count; i++ {