"bufio"
"encoding/json"
"errors"
+ "flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
KeepServers []ServerAddress `json:"items"`
}
+var serviceType string
+
+func init() {
+ flag.StringVar(&serviceType,
+ "service-type",
+ "disk",
+ "Operate only on keep_services with the specified service_type, ignoring all others.")
+}
+
// String
// TODO(misha): Change this to include the UUID as well.
func (s ServerAddress) String() string {
// GetKeepServers from api server
func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
sdkParams := arvadosclient.Dict{
- "filters": [][]string{[]string{"service_type", "!=", "proxy"}},
+ "filters": [][]string{{"service_type", "!=", "proxy"}},
}
if params.Limit > 0 {
sdkParams["limit"] = params.Limit
return
}
- // Currently, only "disk" types are supported. Stop if any other service types are found.
+ var keepServers []ServerAddress
for _, server := range sdkResponse.KeepServers {
- if server.ServiceType != "disk" {
- return results, fmt.Errorf("Unsupported service type %q found for: %v", server.ServiceType, server)
+ if server.ServiceType == serviceType {
+ keepServers = append(keepServers, server)
+ } else {
+ log.Printf("Skipping keep_service %q because its service_type %q does not match -service-type=%q", server, server.ServiceType, serviceType)
}
}
+ if len(keepServers) == 0 {
+ return results, fmt.Errorf("Found no keepservices with the service type %v", serviceType)
+ }
+
if params.Logger != nil {
params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
keepInfo := logger.GetOrCreateMap(p, "keep_info")
keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
keepInfo["keep_servers"] = sdkResponse.KeepServers
+ keepInfo["indexable_keep_servers"] = keepServers
})
}
return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse)
}
- results.KeepServerIndexToAddress = sdkResponse.KeepServers
+ results.KeepServerIndexToAddress = keepServers
results.KeepServerAddressToIndex = make(map[ServerAddress]int)
for i, address := range results.KeepServerIndexToAddress {
results.KeepServerAddressToIndex[address] = i
// Send off all the index requests concurrently
responseChan := make(chan ServerResponse)
- for _, keepServer := range sdkResponse.KeepServers {
+ for _, keepServer := range results.KeepServerIndexToAddress {
// The above keepsServer variable is reused for each iteration, so
// it would be shared across all goroutines. This would result in
// us querying one server n times instead of n different servers
results.BlockToServers = make(map[blockdigest.DigestWithSize][]BlockServerInfo)
// Read all the responses
- for i := range sdkResponse.KeepServers {
+ for i := range results.KeepServerIndexToAddress {
_ = i // Here to prevent go from complaining.
response := <-responseChan
client := kc.Client
for url, v := range spl {
+ if arvLogger != nil {
+ // We need a local variable because Update doesn't call our mutator func until later,
+ // when our list variable might have been reused by the next loop iteration.
+ url := url
+ trashLen := len(v)
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+ trashListInfo := logger.GetOrCreateMap(p, "trash_list_len")
+ trashListInfo[url] = trashLen
+ })
+ }
+
if dryRun {
- if arvLogger != nil {
- for url, v := range spl {
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- trashListInfo := logger.GetOrCreateMap(p, "trash_list")
- trashListInfo["started_at"] = time.Now()
- trashListInfo[url] = v
- })
- }
- }
- } else {
- count++
- log.Printf("Sending trash list to %v", url)
-
- go (func(url string, v TrashList) {
- pipeReader, pipeWriter := io.Pipe()
- go (func() {
- enc := json.NewEncoder(pipeWriter)
- enc.Encode(v)
- pipeWriter.Close()
- })()
-
- req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
- if err != nil {
- log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
- barrier <- err
- return
- }
-
- req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
-
- // Make the request
- var resp *http.Response
- if resp, err = client.Do(req); err != nil {
- log.Printf("Error sending trash list to %v error: %v", url, err.Error())
- barrier <- err
- return
- }
-
- log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
-
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
-
- if resp.StatusCode != 200 {
- barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode))
- } else {
- barrier <- nil
- }
- })(url, v)
+ log.Printf("dry run, not sending trash list to service %s with %d blocks", url, len(v))
+ continue
}
+
+ count++
+ log.Printf("Sending trash list to %v", url)
+
+ go (func(url string, v TrashList) {
+ pipeReader, pipeWriter := io.Pipe()
+ go (func() {
+ enc := json.NewEncoder(pipeWriter)
+ enc.Encode(v)
+ pipeWriter.Close()
+ })()
+
+ req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
+ if err != nil {
+ log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
+ barrier <- err
+ return
+ }
+
+ req.Header.Add("Authorization", "OAuth2 "+kc.Arvados.ApiToken)
+
+ // Make the request
+ var resp *http.Response
+ if resp, err = client.Do(req); err != nil {
+ log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+ barrier <- err
+ return
+ }
+
+ log.Printf("Sent trash list to %v: response was HTTP %v", url, resp.Status)
+
+ io.Copy(ioutil.Discard, resp.Body)
+ resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ barrier <- errors.New(fmt.Sprintf("Got HTTP code %v", resp.StatusCode))
+ } else {
+ barrier <- nil
+ }
+ })(url, v)
}
for i := 0; i < count; i++ {