From: Peter Amstutz Date: Tue, 14 Jul 2015 17:39:36 +0000 (-0400) Subject: Merge branch 'master' into github-3408-production-datamanager X-Git-Tag: 1.1.0~1505^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/0e57453d2b637a3d105d4e3d67031f3915f9d302 Merge branch 'master' into github-3408-production-datamanager Conflicts: services/datamanager/keep/keep.go --- 0e57453d2b637a3d105d4e3d67031f3915f9d302 diff --cc services/datamanager/keep/keep.go index c666337d0d,912fcf5359..c2346cd154 --- a/services/datamanager/keep/keep.go +++ b/services/datamanager/keep/keep.go @@@ -10,7 -10,9 +10,8 @@@ import "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/blockdigest" "git.curoverse.com/arvados.git/sdk/go/logger" - "git.curoverse.com/arvados.git/sdk/go/manifest" "git.curoverse.com/arvados.git/services/datamanager/loggerutil" + "io" "io/ioutil" "log" "net/http" @@@ -327,12 -328,36 +328,36 @@@ func ReadServerResponse(arvLogger *logg response.Address = keepServer response.Contents.BlockDigestToInfo = - make(map[blockdigest.BlockDigest]BlockInfo) + make(map[blockdigest.DigestWithSize]BlockInfo) - scanner := bufio.NewScanner(resp.Body) + reader := bufio.NewReader(resp.Body) numLines, numDuplicates, numSizeDisagreements := 0, 0, 0 - for scanner.Scan() { + for { numLines++ - blockInfo, err := parseBlockInfoFromIndexLine(scanner.Text()) + line, err := reader.ReadString('\n') + if err == io.EOF { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Index from %s truncated at line %d", + keepServer.String(), numLines)) + } else if err != nil { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Error reading index response from %s at line %d: %v", + keepServer.String(), numLines, err)) + } + if line == "\n" { + if _, err := reader.Peek(1); err == nil { + extra, _ := reader.ReadString('\n') + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s", + keepServer.String(), numLines+1, extra)) + } else if err != io.EOF { + loggerutil.FatalWithMessage(arvLogger, + fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v", + keepServer.String(), numLines, err)) + } + numLines-- + break + } + blockInfo, err := parseBlockInfoFromIndexLine(line) if err != nil { loggerutil.FatalWithMessage(arvLogger, fmt.Sprintf("Error parsing BlockInfo from index line "+ @@@ -352,31 -402,25 +377,25 @@@ response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo } } - if err := scanner.Err(); err != nil { - loggerutil.FatalWithMessage(arvLogger, - fmt.Sprintf("Received error scanning index response from %s: %v", - keepServer.String(), - err)) - } else { - log.Printf("%s index contained %d lines with %d duplicates with "+ - "%d size disagreements", - keepServer.String(), - numLines, - numDuplicates, - numSizeDisagreements) - - if arvLogger != nil { - now := time.Now() - arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { - keepInfo := logger.GetOrCreateMap(p, "keep_info") - serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) - - serverInfo["processing_finished_at"] = now - serverInfo["lines_received"] = numLines - serverInfo["duplicates_seen"] = numDuplicates - serverInfo["size_disagreements_seen"] = numSizeDisagreements - }) - } + + log.Printf("%s index contained %d lines with %d duplicates with "+ + "%d size disagreements", + keepServer.String(), + numLines, + numDuplicates, + numSizeDisagreements) + + if arvLogger != nil { + now := time.Now() + arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) { - keepInfo := p["keep_info"].(map[string]interface{}) ++ keepInfo := logger.GetOrCreateMap(p, "keep_info") + serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{}) + + serverInfo["processing_finished_at"] = now + serverInfo["lines_received"] = numLines + serverInfo["duplicates_seen"] = numDuplicates + serverInfo["size_disagreements_seen"] = numSizeDisagreements + }) } resp.Body.Close() return