X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c79792f08d8faf712f70c67ed1216feabc5d9408..d03f44d2d2c93841a50cbfca1a74600bc504b593:/services/datamanager/keep/keep.go diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go index 20244fac35..206a9c43fd 100644 --- a/services/datamanager/keep/keep.go +++ b/services/datamanager/keep/keep.go @@ -6,12 +6,12 @@ import ( "bufio" "encoding/json" "errors" + "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/logger" - "git.curoverse.com/arvados.git/services/datamanager/loggerutil" "io" "io/ioutil" "log" @@ -51,6 +51,7 @@ type ServerContents struct { type ServerResponse struct { Address ServerAddress Contents ServerContents + Err error } // ReadServers struct @@ -76,6 +77,15 @@ type ServiceList struct { KeepServers []ServerAddress `json:"items"` } +var serviceType string + +func init() { + flag.StringVar(&serviceType, + "service-type", + "disk", + "Operate only on keep_services with the specified service_type, ignoring all others.") +} + // String // TODO(misha): Change this to include the UUID as well. func (s ServerAddress) String() string { @@ -93,6 +103,9 @@ func (s ServerAddress) URL() string { // GetKeepServersAndSummarize gets keep servers from api func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers, err error) { results, err = GetKeepServers(params) + if err != nil { + return + } log.Printf("Returned %d keep disks", len(results.ServerToContents)) results.Summarize(params.Logger) @@ -118,19 +131,26 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error return } - // Currently, only "disk" types are supported. Stop if any other service types are found. + var keepServers []ServerAddress for _, server := range sdkResponse.KeepServers { - if server.ServiceType != "disk" { - return results, fmt.Errorf("Unsupported service type %q found for: %v", server.ServiceType, server) + if server.ServiceType == serviceType { + keepServers = append(keepServers, server) + } else { + log.Printf("Skipping keep_service %q because its service_type %q does not match -service-type=%q", server, server.ServiceType, serviceType) } } + if len(keepServers) == 0 { + return results, fmt.Errorf("Found no keepservices with the service type %v", serviceType) + } + if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := logger.GetOrCreateMap(p, "keep_info") keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers) keepInfo["keep_servers"] = sdkResponse.KeepServers + keepInfo["indexable_keep_servers"] = keepServers }) } @@ -140,7 +160,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse) } - results.KeepServerIndexToAddress = sdkResponse.KeepServers + results.KeepServerIndexToAddress = keepServers results.KeepServerAddressToIndex = make(map[ServerAddress]int) for i, address := range results.KeepServerIndexToAddress { results.KeepServerAddressToIndex[address] = i @@ -150,7 +170,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error // Send off all the index requests concurrently responseChan := make(chan ServerResponse) - for _, keepServer := range sdkResponse.KeepServers { + for _, keepServer := range results.KeepServerIndexToAddress { // The above keepsServer variable is reused for each iteration, so // it would be shared across all goroutines. This would result in // us querying one server n times instead of n different servers @@ -168,13 +188,13 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error results.BlockToServers = make(map[blockdigest.DigestWithSize][]BlockServerInfo) // Read all the responses - for i := range sdkResponse.KeepServers { + for i := range results.KeepServerIndexToAddress { _ = i // Here to prevent go from complaining. response := <-responseChan - // There might have been an error during GetServerContents; so check if the response is empty - if response.Address.Host == "" { - return results, fmt.Errorf("Error during GetServerContents; no host info found") + // Check if there were any errors during GetServerContents + if response.Err != nil { + return results, response.Err } log.Printf("Received channel response from %v containing %d files", @@ -199,31 +219,26 @@ func GetServerContents(arvLogger *logger.Logger, err := GetServerStatus(arvLogger, keepServer, arv) if err != nil { - loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetServerStatus: %v", err)) - return ServerResponse{} + response.Err = err + return } req, err := CreateIndexRequest(arvLogger, keepServer, arv) if err != nil { - loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error building CreateIndexRequest: %v", err)) - return ServerResponse{} + response.Err = err + return } resp, err := arv.Client.Do(req) if err != nil { - loggerutil.LogErrorMessage(arvLogger, - fmt.Sprintf("Error fetching %s: %v. Response was %+v", - req.URL.String(), - err, - resp)) - return ServerResponse{} + response.Err = err + return } response, err = ReadServerResponse(arvLogger, keepServer, resp) if err != nil { - loggerutil.LogErrorMessage(arvLogger, - fmt.Sprintf("Error during ReadServerResponse %v", err)) - return ServerResponse{} + response.Err = err + return } return @@ -311,8 +326,7 @@ func ReadServerResponse(arvLogger *logger.Logger, resp *http.Response) (response ServerResponse, err error) { if resp.StatusCode != 200 { - return response, fmt.Errorf("Received error code %d in response to request "+ - "for %s index: %s", + return response, fmt.Errorf("Received error code %d in response to index request for %s: %s", resp.StatusCode, keepServer.String(), resp.Status) } @@ -399,19 +413,19 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err tokens := strings.Fields(indexLine) if len(tokens) != 2 { err = fmt.Errorf("Expected 2 tokens per line but received a "+ - "line containing %v instead.", + "line containing %#q instead.", tokens) } var locator blockdigest.BlockLocator if locator, err = blockdigest.ParseBlockLocator(tokens[0]); err != nil { - err = fmt.Errorf("%v Received error while parsing line \"%s\"", + err = fmt.Errorf("%v Received error while parsing line \"%#q\"", err, indexLine) return } if len(locator.Hints) > 0 { err = fmt.Errorf("Block locator in index line should not contain hints "+ - "but it does: %v", + "but it does: %#q", locator) return } @@ -452,13 +466,29 @@ type TrashRequest struct { type TrashList []TrashRequest // SendTrashLists to trash queue -func SendTrashLists(kc *keepclient.KeepClient, spl map[string]TrashList) (errs []error) { +func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList, dryRun bool) (errs []error) { count := 0 barrier := make(chan error) client := kc.Client for url, v := range spl { + if arvLogger != nil { + // We need a local variable because Update doesn't call our mutator func until later, + // when our list variable might have been reused by the next loop iteration. + url := url + trashLen := len(v) + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + trashListInfo := logger.GetOrCreateMap(p, "trash_list_len") + trashListInfo[url] = trashLen + }) + } + + if dryRun { + log.Printf("dry run, not sending trash list to service %s with %d blocks", url, len(v)) + continue + } + count++ log.Printf("Sending trash list to %v", url) @@ -498,7 +528,6 @@ func SendTrashLists(kc *keepclient.KeepClient, spl map[string]TrashList) (errs [ barrier <- nil } })(url, v) - } for i := 0; i < count; i++ {