5365: add @distinct handling to workbench select queries and use this to preload...
[arvados.git] / services / datamanager / keep / keep.go
index cf6803ac96376a27304841c72d6aee35118cad58..93246bc47227c345662fbdc0247d0548728c4aa6 100644 (file)
@@ -4,6 +4,7 @@ package keep
 
 import (
        "bufio"
+       "encoding/json"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -23,6 +24,7 @@ import (
 type ServerAddress struct {
        Host string `json:"service_host"`
        Port int    `json:"service_port"`
+       Uuid string `json:"uuid"` // decoded from the "uuid" JSON field; keys this server's entry in the keep_info log map
 }
 
 // Info about a particular block returned by the server
@@ -68,15 +70,6 @@ type KeepServiceList struct {
        KeepServers    []ServerAddress `json:"items"`
 }
 
-// Methods to implement util.SdkListResponse Interface
-func (k KeepServiceList) NumItemsAvailable() (numAvailable int, err error) {
-       return k.ItemsAvailable, nil
-}
-
-func (k KeepServiceList) NumItemsContained() (numContained int, err error) {
-       return len(k.KeepServers), nil
-}
-
 var (
        // Don't access the token directly, use getDataManagerToken() to
        // make sure it's been read.
@@ -92,6 +85,7 @@ func init() {
                "File with the API token we should use to contact keep servers.")
 }
 
+// String formats the address as "host:port". TODO(misha): Change this to include the UUID as well.
 func (s ServerAddress) String() string {
        return fmt.Sprintf("%s:%d", s.Host, s.Port)
 }
@@ -223,6 +217,8 @@ func GetServerContents(arvLogger *logger.Logger,
        keepServer ServerAddress,
        client http.Client) (response ServerResponse) {
 
+       GetServerStatus(arvLogger, keepServer, client)
+
        req := CreateIndexRequest(arvLogger, keepServer)
        resp, err := client.Do(req)
        if err != nil {
@@ -233,18 +229,68 @@ func GetServerContents(arvLogger *logger.Logger,
        return ReadServerResponse(arvLogger, keepServer, resp)
 }
 
+// GetServerStatus fetches /status.json from keepServer and, when arvLogger is
+// non-nil, records request/response times and the decoded status under
+// p["keep_info"][keepServer.Uuid]. Failures go to loggerutil.FatalWithMessage.
+func GetServerStatus(arvLogger *logger.Logger,
+       keepServer ServerAddress,
+       client http.Client) {
+       url := fmt.Sprintf("http://%s:%d/status.json", keepServer.Host, keepServer.Port)
+       if arvLogger != nil {
+               now := time.Now()
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       keepInfo := p["keep_info"].(map[string]interface{})
+                       serverInfo := make(map[string]interface{})
+                       serverInfo["status_request_sent_at"] = now
+                       serverInfo["host"] = keepServer.Host
+                       serverInfo["port"] = keepServer.Port
+                       keepInfo[keepServer.Uuid] = serverInfo
+               })
+       }
+
+       resp, err := client.Get(url)
+       if err != nil {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Error getting keep status from %s: %v", url, err))
+               return // presumably unreachable (Fatal exits); guards the nil resp below
+       }
+       // Close the body on every path below so the connection is released.
+       defer resp.Body.Close()
+       if resp.StatusCode != 200 {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Received error code %d in response to request for %s status: %s",
+                               resp.StatusCode, url, resp.Status))
+       }
+       var keepStatus map[string]interface{}
+       decoder := json.NewDecoder(resp.Body)
+       decoder.UseNumber()
+       if err = decoder.Decode(&keepStatus); err != nil {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Error decoding keep status from %s: %v", url, err))
+       }
+
+       if arvLogger != nil {
+               now := time.Now()
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       keepInfo := p["keep_info"].(map[string]interface{})
+                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo["status_response_processed_at"] = now
+                       serverInfo["status"] = keepStatus
+               })
+       }
+}
+
 func CreateIndexRequest(arvLogger *logger.Logger,
        keepServer ServerAddress) (req *http.Request) {
        url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
        log.Println("About to fetch keep server contents from " + url)
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
-                       serverInfo := make(map[string]interface{})
-                       serverInfo["request_sent"] = time.Now()
-
-                       keepInfo[keepServer.String()] = serverInfo
+                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo["index_request_sent_at"] = now
                })
        }
 
@@ -271,11 +317,11 @@ func ReadServerResponse(arvLogger *logger.Logger,
        }
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
-                       serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
-
-                       serverInfo["response_received"] = time.Now()
+                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo["index_response_received_at"] = now
                })
        }
 
@@ -302,7 +348,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
                                numSizeDisagreements += 1
                                // TODO(misha): Consider failing here.
                                message := fmt.Sprintf("Saw different sizes for the same block "+
-                                       "on %s: %v %v",
+                                       "on %s: %+v %+v",
                                        keepServer.String(),
                                        storedBlock,
                                        blockInfo)
@@ -310,7 +356,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
                                if arvLogger != nil {
                                        arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                                                keepInfo := p["keep_info"].(map[string]interface{})
-                                               serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
+                                               serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
                                                var error_list []string
                                                read_error_list, has_list := serverInfo["error_list"]
                                                if has_list {
@@ -345,11 +391,12 @@ func ReadServerResponse(arvLogger *logger.Logger,
                        numSizeDisagreements)
 
                if arvLogger != nil {
+                       now := time.Now()
                        arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                                keepInfo := p["keep_info"].(map[string]interface{})
-                               serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
+                               serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
 
-                               serverInfo["processing_finished"] = time.Now()
+                               serverInfo["processing_finished_at"] = now
                                serverInfo["lines_received"] = numLines
                                serverInfo["duplicates_seen"] = numDuplicates
                                serverInfo["size_disagreements_seen"] = numSizeDisagreements