5365: add @distinct handling to workbench select queries and use this to preload...
[arvados.git] / services / datamanager / keep / keep.go
index 20a59316b727699d3715367e3881af2a8a8394a3..93246bc47227c345662fbdc0247d0548728c4aa6 100644 (file)
@@ -24,6 +24,7 @@ import (
 type ServerAddress struct {
        Host string `json:"service_host"`
        Port int    `json:"service_port"`
+       Uuid string `json:"uuid"` // used as the per-server key under "keep_info" in log records (see GetServerStatus)
 }
 
 // Info about a particular block returned by the server
@@ -69,15 +70,6 @@ type KeepServiceList struct {
        KeepServers    []ServerAddress `json:"items"`
 }
 
-// Methods to implement util.SdkListResponse Interface
-func (k KeepServiceList) NumItemsAvailable() (numAvailable int, err error) {
-       return k.ItemsAvailable, nil
-}
-
-func (k KeepServiceList) NumItemsContained() (numContained int, err error) {
-       return len(k.KeepServers), nil
-}
-
 var (
        // Don't access the token directly, use getDataManagerToken() to
        // make sure it's been read.
@@ -93,6 +85,7 @@ func init() {
                "File with the API token we should use to contact keep servers.")
 }
 
+// String returns the address formatted as "host:port". TODO(misha): Change this to include the UUID as well.
 func (s ServerAddress) String() string {
        return fmt.Sprintf("%s:%d", s.Host, s.Port)
 }
@@ -244,12 +237,15 @@ func GetServerStatus(arvLogger *logger.Logger,
                keepServer.Port)
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
                        serverInfo := make(map[string]interface{})
-                       serverInfo["time_status_request_sent"] = time.Now()
+                       serverInfo["status_request_sent_at"] = now
+                       serverInfo["host"] = keepServer.Host
+                       serverInfo["port"] = keepServer.Port
 
-                       keepInfo[keepServer.String()] = serverInfo
+                       keepInfo[keepServer.Uuid] = serverInfo
                })
        }
 
@@ -274,10 +270,11 @@ func GetServerStatus(arvLogger *logger.Logger,
        }
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
-                       serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
-                       serverInfo["time_status_response_processed"] = time.Now()
+                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo["status_response_processed_at"] = now
                        serverInfo["status"] = keepStatus
                })
        }
@@ -289,10 +286,11 @@ func CreateIndexRequest(arvLogger *logger.Logger,
        log.Println("About to fetch keep server contents from " + url)
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
-                       serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
-                       serverInfo["time_index_request_sent"] = time.Now()
+                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo["index_request_sent_at"] = now
                })
        }
 
@@ -319,11 +317,11 @@ func ReadServerResponse(arvLogger *logger.Logger,
        }
 
        if arvLogger != nil {
+               now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
-                       serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
-
-                       serverInfo["time_index_response_received"] = time.Now()
+                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo["index_response_received_at"] = now
                })
        }
 
@@ -358,7 +356,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
                                if arvLogger != nil {
                                        arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                                                keepInfo := p["keep_info"].(map[string]interface{})
-                                               serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
+                                               serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
                                                var error_list []string
                                                read_error_list, has_list := serverInfo["error_list"]
                                                if has_list {
@@ -393,11 +391,12 @@ func ReadServerResponse(arvLogger *logger.Logger,
                        numSizeDisagreements)
 
                if arvLogger != nil {
+                       now := time.Now()
                        arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                                keepInfo := p["keep_info"].(map[string]interface{})
-                               serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
+                               serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
 
-                               serverInfo["time_processing_finished"] = time.Now()
+                               serverInfo["processing_finished_at"] = now
                                serverInfo["lines_received"] = numLines
                                serverInfo["duplicates_seen"] = numDuplicates
                                serverInfo["size_disagreements_seen"] = numSizeDisagreements