Merge branch 'master' into github-3408-production-datamanager
authorPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 14 Jul 2015 17:39:36 +0000 (13:39 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Tue, 14 Jul 2015 17:39:36 +0000 (13:39 -0400)
Conflicts:
services/datamanager/keep/keep.go

1  2 
services/datamanager/keep/keep.go

index c666337d0dd98da68b1a21e24dd06304b80f4930,912fcf535958fe4dbb97f6667666c9d753f0fc67..c2346cd154fd7fe8ecda1a6a1130061a64bfdb8e
@@@ -10,7 -10,9 +10,8 @@@ import 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "git.curoverse.com/arvados.git/sdk/go/logger"
 -      "git.curoverse.com/arvados.git/sdk/go/manifest"
        "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+       "io"
        "io/ioutil"
        "log"
        "net/http"
@@@ -327,12 -328,36 +328,36 @@@ func ReadServerResponse(arvLogger *logg
  
        response.Address = keepServer
        response.Contents.BlockDigestToInfo =
 -              make(map[blockdigest.BlockDigest]BlockInfo)
 +              make(map[blockdigest.DigestWithSize]BlockInfo)
-       scanner := bufio.NewScanner(resp.Body)
+       reader := bufio.NewReader(resp.Body)
        numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
-       for scanner.Scan() {
+       for {
                numLines++
-               blockInfo, err := parseBlockInfoFromIndexLine(scanner.Text())
+               line, err := reader.ReadString('\n')
+               if err == io.EOF {
+                       loggerutil.FatalWithMessage(arvLogger,
+                               fmt.Sprintf("Index from %s truncated at line %d",
+                                       keepServer.String(), numLines))
+               } else if err != nil {
+                       loggerutil.FatalWithMessage(arvLogger,
+                               fmt.Sprintf("Error reading index response from %s at line %d: %v",
+                                       keepServer.String(), numLines, err))
+               }
+               if line == "\n" {
+                       if _, err := reader.Peek(1); err == nil {
+                               extra, _ := reader.ReadString('\n')
+                               loggerutil.FatalWithMessage(arvLogger,
+                                       fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
+                                               keepServer.String(), numLines+1, extra))
+                       } else if err != io.EOF {
+                               loggerutil.FatalWithMessage(arvLogger,
+                                       fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
+                                               keepServer.String(), numLines, err))
+                       }
+                       numLines--
+                       break
+               }
+               blockInfo, err := parseBlockInfoFromIndexLine(line)
                if err != nil {
                        loggerutil.FatalWithMessage(arvLogger,
                                fmt.Sprintf("Error parsing BlockInfo from index line "+
                        response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
                }
        }
-       if err := scanner.Err(); err != nil {
-               loggerutil.FatalWithMessage(arvLogger,
-                       fmt.Sprintf("Received error scanning index response from %s: %v",
-                               keepServer.String(),
-                               err))
-       } else {
-               log.Printf("%s index contained %d lines with %d duplicates with "+
-                       "%d size disagreements",
-                       keepServer.String(),
-                       numLines,
-                       numDuplicates,
-                       numSizeDisagreements)
-               if arvLogger != nil {
-                       now := time.Now()
-                       arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
-                               keepInfo := logger.GetOrCreateMap(p, "keep_info")
-                               serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
-                               serverInfo["processing_finished_at"] = now
-                               serverInfo["lines_received"] = numLines
-                               serverInfo["duplicates_seen"] = numDuplicates
-                               serverInfo["size_disagreements_seen"] = numSizeDisagreements
-                       })
-               }
+       log.Printf("%s index contained %d lines with %d duplicates with "+
+               "%d size disagreements",
+               keepServer.String(),
+               numLines,
+               numDuplicates,
+               numSizeDisagreements)
+       if arvLogger != nil {
+               now := time.Now()
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
 -                      keepInfo := p["keep_info"].(map[string]interface{})
++                      keepInfo := logger.GetOrCreateMap(p, "keep_info")
+                       serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+                       serverInfo["processing_finished_at"] = now
+                       serverInfo["lines_received"] = numLines
+                       serverInfo["duplicates_seen"] = numDuplicates
+                       serverInfo["size_disagreements_seen"] = numSizeDisagreements
+               })
        }
        resp.Body.Close()
        return