Now fetch Keep Server Status and record it to the log. Renamed some fields and added...
authormishaz <misha@curoverse.com>
Fri, 30 Jan 2015 01:25:11 +0000 (01:25 +0000)
committerTom Clegg <tom@curoverse.com>
Fri, 13 Feb 2015 21:25:31 +0000 (16:25 -0500)
sdk/go/logger/logger.go
services/datamanager/keep/keep.go

index ca344be045a79d554cdb0226d9c0066ae055146a..a53ab3cd8d2b4d1f792e75d2b211a4a75811d654 100644 (file)
@@ -173,6 +173,14 @@ func (l *Logger) write() {
        l.entry["event_type"] = l.params.EventType
 
        // Write the log entry.
+       // This is a network write and will take a while, which is bad
+       // because we're holding a lock and all other goroutines will back
+       // up behind it.
+       //
+       // TODO(misha): Consider rewriting this so that we can encode l.data
+       // into a string, release the lock, write the string, and then
+       // acquire the lock again to note that we succeeded in writing. This
+       // will be tricky and will require support in the client.
        err := l.params.Client.Create("logs", l.data, nil)
        if err != nil {
                log.Printf("Attempted to log: %v", l.data)
index 39ac30a1b9938247ee537fe77824bcf07647d066..20a59316b727699d3715367e3881af2a8a8394a3 100644 (file)
@@ -4,6 +4,7 @@ package keep
 
 import (
        "bufio"
+       "encoding/json"
        "flag"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
@@ -223,6 +224,8 @@ func GetServerContents(arvLogger *logger.Logger,
        keepServer ServerAddress,
        client http.Client) (response ServerResponse) {
 
+       GetServerStatus(arvLogger, keepServer, client)
+
        req := CreateIndexRequest(arvLogger, keepServer)
        resp, err := client.Do(req)
        if err != nil {
@@ -233,6 +236,53 @@ func GetServerContents(arvLogger *logger.Logger,
        return ReadServerResponse(arvLogger, keepServer, resp)
 }
 
+func GetServerStatus(arvLogger *logger.Logger,
+       keepServer ServerAddress,
+       client http.Client) {
+       url := fmt.Sprintf("http://%s:%d/status.json",
+               keepServer.Host,
+               keepServer.Port)
+
+       if arvLogger != nil {
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       keepInfo := p["keep_info"].(map[string]interface{})
+                       serverInfo := make(map[string]interface{})
+                       serverInfo["time_status_request_sent"] = time.Now()
+
+                       keepInfo[keepServer.String()] = serverInfo
+               })
+       }
+
+       resp, err := client.Get(url)
+       if err != nil {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Error getting keep status from %s: %v", url, err))
+       } else if resp.StatusCode != 200 {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Received error code %d in response to request "+
+                               "for %s status: %s",
+                               resp.StatusCode, url, resp.Status))
+       }
+
+       var keepStatus map[string]interface{}
+       decoder := json.NewDecoder(resp.Body)
+       decoder.UseNumber()
+       err = decoder.Decode(&keepStatus)
+       if err != nil {
+               loggerutil.FatalWithMessage(arvLogger,
+                       fmt.Sprintf("Error decoding keep status from %s: %v", url, err))
+       }
+
+       if arvLogger != nil {
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       keepInfo := p["keep_info"].(map[string]interface{})
+                       serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
+                       serverInfo["time_status_response_processed"] = time.Now()
+                       serverInfo["status"] = keepStatus
+               })
+       }
+}
+
 func CreateIndexRequest(arvLogger *logger.Logger,
        keepServer ServerAddress) (req *http.Request) {
        url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
@@ -241,10 +291,8 @@ func CreateIndexRequest(arvLogger *logger.Logger,
        if arvLogger != nil {
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
                        keepInfo := p["keep_info"].(map[string]interface{})
-                       serverInfo := make(map[string]interface{})
-                       serverInfo["request_sent"] = time.Now()
-
-                       keepInfo[keepServer.String()] = serverInfo
+                       serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
+                       serverInfo["time_index_request_sent"] = time.Now()
                })
        }
 
@@ -275,7 +323,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
                        keepInfo := p["keep_info"].(map[string]interface{})
                        serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
 
-                       serverInfo["response_received"] = time.Now()
+                       serverInfo["time_index_response_received"] = time.Now()
                })
        }
 
@@ -349,7 +397,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
                                keepInfo := p["keep_info"].(map[string]interface{})
                                serverInfo := keepInfo[keepServer.String()].(map[string]interface{})
 
-                               serverInfo["processing_finished"] = time.Now()
+                               serverInfo["time_processing_finished"] = time.Now()
                                serverInfo["lines_received"] = numLines
                                serverInfo["duplicates_seen"] = numDuplicates
                                serverInfo["size_disagreements_seen"] = numSizeDisagreements