X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4df1175e30c21850af394fcd60c9bb7ca3d981a5..9daf42fbdb868939653c6e3ca8a4fffd1cf94e31:/services/datamanager/keep/keep.go diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go index cf6803ac96..93246bc472 100644 --- a/services/datamanager/keep/keep.go +++ b/services/datamanager/keep/keep.go @@ -4,6 +4,7 @@ package keep import ( "bufio" + "encoding/json" "flag" "fmt" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" @@ -23,6 +24,7 @@ import ( type ServerAddress struct { Host string `json:"service_host"` Port int `json:"service_port"` + Uuid string `json:"uuid"` } // Info about a particular block returned by the server @@ -68,15 +70,6 @@ type KeepServiceList struct { KeepServers []ServerAddress `json:"items"` } -// Methods to implement util.SdkListResponse Interface -func (k KeepServiceList) NumItemsAvailable() (numAvailable int, err error) { - return k.ItemsAvailable, nil -} - -func (k KeepServiceList) NumItemsContained() (numContained int, err error) { - return len(k.KeepServers), nil -} - var ( // Don't access the token directly, use getDataManagerToken() to // make sure it's been read. @@ -92,6 +85,7 @@ func init() { "File with the API token we should use to contact keep servers.") } +// TODO(misha): Change this to include the UUID as well. func (s ServerAddress) String() string { return fmt.Sprintf("%s:%d", s.Host, s.Port) } @@ -223,6 +217,8 @@ func GetServerContents(arvLogger *logger.Logger, keepServer ServerAddress, client http.Client) (response ServerResponse) { + GetServerStatus(arvLogger, keepServer, client) + req := CreateIndexRequest(arvLogger, keepServer) resp, err := client.Do(req) if err != nil { @@ -233,18 +229,68 @@ func GetServerContents(arvLogger *logger.Logger, return ReadServerResponse(arvLogger, keepServer, resp) } +func GetServerStatus(arvLogger *logger.Logger, + keepServer ServerAddress, + client http.Client) { + url := fmt.Sprintf("http://%s:%d/status.json", + keepServer.Host, + keepServer.Port) + + if arvLogger != nil { + now := time.Now() + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + keepInfo := p["keep_info"].(map[string]interface{}) + serverInfo := make(map[string]interface{}) + serverInfo["status_request_sent_at"] = now + serverInfo["host"] = keepServer.Host + serverInfo["port"] = keepServer.Port + + keepInfo[keepServer.Uuid] = serverInfo + }) + } + + resp, err := client.Get(url) + if err != nil { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Error getting keep status from %s: %v", url, err)) + } else if resp.StatusCode != 200 { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Received error code %d in response to request "+ + "for %s status: %s", + resp.StatusCode, url, resp.Status)) + } + + var keepStatus map[string]interface{} + decoder := json.NewDecoder(resp.Body) + decoder.UseNumber() + err = decoder.Decode(&keepStatus) + if err != nil { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Error decoding keep status from %s: %v", url, err)) + } + + if arvLogger != nil { + now := time.Now() + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { + keepInfo := p["keep_info"].(map[string]interface{}) + serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) + serverInfo["status_response_processed_at"] = now + serverInfo["status"] = keepStatus + }) + } +} + func CreateIndexRequest(arvLogger *logger.Logger, keepServer ServerAddress) (req *http.Request) { url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port) log.Println("About to fetch keep server contents from " + url) if arvLogger != nil { + now := time.Now() arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := p["keep_info"].(map[string]interface{}) - serverInfo := make(map[string]interface{}) - serverInfo["request_sent"] = time.Now() - - keepInfo[keepServer.String()] = serverInfo + serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) + serverInfo["index_request_sent_at"] = now }) } @@ -271,11 +317,11 @@ func ReadServerResponse(arvLogger *logger.Logger, } if arvLogger != nil { + now := time.Now() arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := p["keep_info"].(map[string]interface{}) - serverInfo := keepInfo[keepServer.String()].(map[string]interface{}) - - serverInfo["response_received"] = time.Now() + serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) + serverInfo["index_response_received_at"] = now }) } @@ -302,7 +348,7 @@ func ReadServerResponse(arvLogger *logger.Logger, numSizeDisagreements += 1 // TODO(misha): Consider failing here. message := fmt.Sprintf("Saw different sizes for the same block "+ - "on %s: %v %v", + "on %s: %+v %+v", keepServer.String(), storedBlock, blockInfo) @@ -310,7 +356,7 @@ func ReadServerResponse(arvLogger *logger.Logger, if arvLogger != nil { arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := p["keep_info"].(map[string]interface{}) - serverInfo := keepInfo[keepServer.String()].(map[string]interface{}) + serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) var error_list []string read_error_list, has_list := serverInfo["error_list"] if has_list { @@ -345,11 +391,12 @@ func ReadServerResponse(arvLogger *logger.Logger, numSizeDisagreements) if arvLogger != nil { + now := time.Now() arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { keepInfo := p["keep_info"].(map[string]interface{}) - serverInfo := keepInfo[keepServer.String()].(map[string]interface{}) + serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) - serverInfo["processing_finished"] = time.Now() + serverInfo["processing_finished_at"] = now serverInfo["lines_received"] = numLines serverInfo["duplicates_seen"] = numDuplicates serverInfo["size_disagreements_seen"] = numSizeDisagreements