X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d005c38bea17107e7be3c5338ad31ba54085df61..d03f44d2d2c93841a50cbfca1a74600bc504b593:/services/datamanager/keep/keep.go?ds=sidebyside diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go index c49a0211a8..206a9c43fd 100644 --- a/services/datamanager/keep/keep.go +++ b/services/datamanager/keep/keep.go @@ -6,6 +6,7 @@ import ( "bufio" "encoding/json" "errors" + "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/blockdigest" @@ -76,6 +77,15 @@ type ServiceList struct { KeepServers []ServerAddress `json:"items"` } +var serviceType string + +func init() { + flag.StringVar(&serviceType, + "service-type", + "disk", + "Operate only on keep_services with the specified service_type, ignoring all others.") +} + // String // TODO(misha): Change this to include the UUID as well. func (s ServerAddress) String() string { @@ -121,19 +131,26 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error return } - // Currently, only "disk" types are supported. Stop if any other service types are found. + var keepServers []ServerAddress for _, server := range sdkResponse.KeepServers { - if server.ServiceType != "disk" { - return results, fmt.Errorf("Unsupported service type %q found for: %v", server.ServiceType, server) + if server.ServiceType == serviceType { + keepServers = append(keepServers, server) + } else { + log.Printf("Skipping keep_service %q because its service_type %q does not match -service-type=%q", server, server.ServiceType, serviceType) } } + if len(keepServers) == 0 { + return results, fmt.Errorf("Found no keepservices with the service type %v", serviceType) + } + if params.Logger != nil { params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := logger.GetOrCreateMap(p, "keep_info") keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers) keepInfo["keep_servers"] = sdkResponse.KeepServers + keepInfo["indexable_keep_servers"] = keepServers }) } @@ -143,7 +160,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse) } - results.KeepServerIndexToAddress = sdkResponse.KeepServers + results.KeepServerIndexToAddress = keepServers results.KeepServerAddressToIndex = make(map[ServerAddress]int) for i, address := range results.KeepServerIndexToAddress { results.KeepServerAddressToIndex[address] = i @@ -153,7 +170,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error // Send off all the index requests concurrently responseChan := make(chan ServerResponse) - for _, keepServer := range sdkResponse.KeepServers { + for _, keepServer := range results.KeepServerIndexToAddress { // The above keepsServer variable is reused for each iteration, so // it would be shared across all goroutines. This would result in // us querying one server n times instead of n different servers @@ -171,7 +188,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error results.BlockToServers = make(map[blockdigest.DigestWithSize][]BlockServerInfo) // Read all the responses - for i := range sdkResponse.KeepServers { + for i := range results.KeepServerIndexToAddress { _ = i // Here to prevent go from complaining. response := <-responseChan @@ -456,57 +473,61 @@ func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map client := kc.Client for url, v := range spl { + if arvLogger != nil { + // We need a local variable because Update doesn't call our mutator func until later, + // when our list variable might have been reused by the next loop iteration. + url := url + trashLen := len(v) + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + trashListInfo := logger.GetOrCreateMap(p, "trash_list_len") + trashListInfo[url] = trashLen + }) + } + if dryRun { - if arvLogger != nil { - for url, v := range spl { - arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { - trashListInfo := logger.GetOrCreateMap(p, "trash_list") - trashListInfo["started_at"] = time.Now() - trashListInfo[url] = v - }) - } - } - } else { - count++ - log.Printf("Sending trash list to %v", url) - - go (func(url string, v TrashList) { - pipeReader, pipeWriter := io.Pipe() - go (func() { - enc := json.NewEncoder(pipeWriter) - enc.Encode(v) - pipeWriter.Close() - })() - - req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader) - if err != nil { - log.Printf("Error creating trash list request for %v error: %v", url, err.Error()) - barrier <- err - return - } - - req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken) - - // Make the request - var resp *http.Response - if resp, err = client.Do(req); err != nil { - log.Printf("Error sending trash list to %v error: %v", url, err.Error()) - barrier <- err - return - } - - log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status) - - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - - if resp.StatusCode != 200 { - barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode)) - } else { - barrier <- nil - } - })(url, v) + log.Printf("dry run, not sending trash list to service %s with %d blocks", url, len(v)) + continue } + + count++ + log.Printf("Sending trash list to %v", url) + + go (func(url string, v TrashList) { + pipeReader, pipeWriter := io.Pipe() + go (func() { + enc := json.NewEncoder(pipeWriter) + enc.Encode(v) + pipeWriter.Close() + })() + + req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader) + if err != nil { + log.Printf("Error creating trash list request for %v error: %v", url, err.Error()) + barrier <- err + return + } + + req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken) + + // Make the request + var resp *http.Response + if resp, err = client.Do(req); err != nil { + log.Printf("Error sending trash list to %v error: %v", url, err.Error()) + barrier <- err + return + } + + log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status) + + io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + + if resp.StatusCode != 200 { + barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode)) + } else { + barrier <- nil + } + })(url, v) } for i := 0; i < count; i++ {