Now reading default replication level from API server.
[arvados.git] / services / datamanager / keep / keep.go
index 93246bc47227c345662fbdc0247d0548728c4aa6..c666337d0dd98da68b1a21e24dd06304b80f4930 100644 (file)
@@ -10,7 +10,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "git.curoverse.com/arvados.git/sdk/go/logger"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "io/ioutil"
        "log"
@@ -29,20 +28,18 @@ type ServerAddress struct {
 
 // Info about a particular block returned by the server
 type BlockInfo struct {
-       Digest blockdigest.BlockDigest
-       Size   int
+       Digest blockdigest.DigestWithSize
        Mtime  int64 // TODO(misha): Replace this with a timestamp.
 }
 
 // Info about a specified block given by a server
 type BlockServerInfo struct {
        ServerIndex int
-       Size        int
        Mtime       int64 // TODO(misha): Replace this with a timestamp.
 }
 
 type ServerContents struct {
-       BlockDigestToInfo map[blockdigest.BlockDigest]BlockInfo
+       BlockDigestToInfo map[blockdigest.DigestWithSize]BlockInfo
 }
 
 type ServerResponse struct {
@@ -55,7 +52,7 @@ type ReadServers struct {
        KeepServerIndexToAddress []ServerAddress
        KeepServerAddressToIndex map[ServerAddress]int
        ServerToContents         map[ServerAddress]ServerContents
-       BlockToServers           map[blockdigest.BlockDigest][]BlockServerInfo
+       BlockToServers           map[blockdigest.DigestWithSize][]BlockServerInfo
        BlockReplicationCounts   map[int]int
 }
 
@@ -87,6 +84,10 @@ func init() {
 
 // TODO(misha): Change this to include the UUID as well.
 func (s ServerAddress) String() string {
+       return s.HostPort()
+}
+
+func (s ServerAddress) HostPort() string {
        return fmt.Sprintf("%s:%d", s.Host, s.Port)
 }
 
@@ -116,7 +117,7 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer
        results = GetKeepServers(params)
        log.Printf("Returned %d keep disks", len(results.ServerToContents))
 
-       ComputeBlockReplicationCounts(&results)
+       results.Summarize(params.Logger)
        log.Printf("Replication level distribution: %v",
                results.BlockReplicationCounts)
 
@@ -146,13 +147,10 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
 
        if params.Logger != nil {
                params.Logger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := make(map[string]interface{})
-
+                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
                        keepInfo["num_keep_servers_available"] = sdkResponse.ItemsAvailable
                        keepInfo["num_keep_servers_received"] = len(sdkResponse.KeepServers)
                        keepInfo["keep_servers"] = sdkResponse.KeepServers
-
-                       p["keep_info"] = keepInfo
                })
        }
 
@@ -191,7 +189,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
        }
 
        results.ServerToContents = make(map[ServerAddress]ServerContents)
-       results.BlockToServers = make(map[blockdigest.BlockDigest][]BlockServerInfo)
+       results.BlockToServers = make(map[blockdigest.DigestWithSize][]BlockServerInfo)
 
        // Read all the responses
        for i := range sdkResponse.KeepServers {
@@ -206,7 +204,6 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
                        results.BlockToServers[blockInfo.Digest] = append(
                                results.BlockToServers[blockInfo.Digest],
                                BlockServerInfo{ServerIndex: serverIndex,
-                                       Size:  blockInfo.Size,
                                        Mtime: blockInfo.Mtime})
                }
        }
@@ -223,7 +220,10 @@ func GetServerContents(arvLogger *logger.Logger,
        resp, err := client.Do(req)
        if err != nil {
                loggerutil.FatalWithMessage(arvLogger,
-                       fmt.Sprintf("Error fetching %s: %v", req.URL.String(), err))
+                       fmt.Sprintf("Error fetching %s: %v. Response was %+v",
+                               req.URL.String(),
+                               err,
+                               resp))
        }
 
        return ReadServerResponse(arvLogger, keepServer, resp)
@@ -239,7 +239,7 @@ func GetServerStatus(arvLogger *logger.Logger,
        if arvLogger != nil {
                now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := p["keep_info"].(map[string]interface{})
+                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
                        serverInfo := make(map[string]interface{})
                        serverInfo["status_request_sent_at"] = now
                        serverInfo["host"] = keepServer.Host
@@ -272,7 +272,7 @@ func GetServerStatus(arvLogger *logger.Logger,
        if arvLogger != nil {
                now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := p["keep_info"].(map[string]interface{})
+                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
                        serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
                        serverInfo["status_response_processed_at"] = now
                        serverInfo["status"] = keepStatus
@@ -288,7 +288,7 @@ func CreateIndexRequest(arvLogger *logger.Logger,
        if arvLogger != nil {
                now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := p["keep_info"].(map[string]interface{})
+                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
                        serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
                        serverInfo["index_request_sent_at"] = now
                })
@@ -319,7 +319,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
        if arvLogger != nil {
                now := time.Now()
                arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                       keepInfo := p["keep_info"].(map[string]interface{})
+                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
                        serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
                        serverInfo["index_response_received_at"] = now
                })
@@ -327,7 +327,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
 
        response.Address = keepServer
        response.Contents.BlockDigestToInfo =
-               make(map[blockdigest.BlockDigest]BlockInfo)
+               make(map[blockdigest.DigestWithSize]BlockInfo)
        scanner := bufio.NewScanner(resp.Body)
        numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
        for scanner.Scan() {
@@ -344,33 +344,8 @@ func ReadServerResponse(arvLogger *logger.Logger,
                if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
                        // This server returned multiple lines containing the same block digest.
                        numDuplicates += 1
-                       if storedBlock.Size != blockInfo.Size {
-                               numSizeDisagreements += 1
-                               // TODO(misha): Consider failing here.
-                               message := fmt.Sprintf("Saw different sizes for the same block "+
-                                       "on %s: %+v %+v",
-                                       keepServer.String(),
-                                       storedBlock,
-                                       blockInfo)
-                               log.Println(message)
-                               if arvLogger != nil {
-                                       arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                                               keepInfo := p["keep_info"].(map[string]interface{})
-                                               serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
-                                               var error_list []string
-                                               read_error_list, has_list := serverInfo["error_list"]
-                                               if has_list {
-                                                       error_list = read_error_list.([]string)
-                                               } // If we didn't have the list, error_list is already an empty list
-                                               serverInfo["error_list"] = append(error_list, message)
-                                       })
-                               }
-                       }
-                       // Keep the block that is bigger, or the block that's newer in
-                       // the case of a size tie.
-                       if storedBlock.Size < blockInfo.Size ||
-                               (storedBlock.Size == blockInfo.Size &&
-                                       storedBlock.Mtime < blockInfo.Mtime) {
+                       // Keep the block that's newer.
+                       if storedBlock.Mtime < blockInfo.Mtime {
                                response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
                        }
                } else {
@@ -393,7 +368,7 @@ func ReadServerResponse(arvLogger *logger.Logger,
                if arvLogger != nil {
                        now := time.Now()
                        arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               keepInfo := p["keep_info"].(map[string]interface{})
+                               keepInfo := logger.GetOrCreateMap(p, "keep_info")
                                serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
 
                                serverInfo["processing_finished_at"] = now
@@ -415,8 +390,10 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err
                        tokens)
        }
 
-       var locator manifest.BlockLocator
-       if locator, err = manifest.ParseBlockLocator(tokens[0]); err != nil {
+       var locator blockdigest.BlockLocator
+       if locator, err = blockdigest.ParseBlockLocator(tokens[0]); err != nil {
+               err = fmt.Errorf("%v Received error while parsing line \"%s\"",
+                       err, indexLine)
                return
        }
        if len(locator.Hints) > 0 {
@@ -430,15 +407,24 @@ func parseBlockInfoFromIndexLine(indexLine string) (blockInfo BlockInfo, err err
        if err != nil {
                return
        }
-       blockInfo.Digest = locator.Digest
-       blockInfo.Size = locator.Size
+       blockInfo.Digest =
+               blockdigest.DigestWithSize{Digest: locator.Digest,
+                       Size: uint32(locator.Size)}
        return
 }
 
-func ComputeBlockReplicationCounts(readServers *ReadServers) {
+func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
        readServers.BlockReplicationCounts = make(map[int]int)
        for _, infos := range readServers.BlockToServers {
                replication := len(infos)
                readServers.BlockReplicationCounts[replication] += 1
        }
+
+       if arvLogger != nil {
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       keepInfo := logger.GetOrCreateMap(p, "keep_info")
+                       keepInfo["distinct_blocks_stored"] = len(readServers.BlockToServers)
+               })
+       }
+
 }