"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
"git.curoverse.com/arvados.git/sdk/go/logger"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+ "io"
"io/ioutil"
"log"
"net/http"
response.Address = keepServer
response.Contents.BlockDigestToInfo =
- make(map[blockdigest.BlockDigest]BlockInfo)
+ make(map[blockdigest.DigestWithSize]BlockInfo)
- scanner := bufio.NewScanner(resp.Body)
+ reader := bufio.NewReader(resp.Body)
numLines, numDuplicates, numSizeDisagreements := 0, 0, 0
- for scanner.Scan() {
+ for {
numLines++
- blockInfo, err := parseBlockInfoFromIndexLine(scanner.Text())
+ line, err := reader.ReadString('\n')
+ if err == io.EOF {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s truncated at line %d",
+ keepServer.String(), numLines))
+ } else if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Error reading index response from %s at line %d: %v",
+ keepServer.String(), numLines, err))
+ }
+ if line == "\n" {
+ if _, err := reader.Peek(1); err == nil {
+ extra, _ := reader.ReadString('\n')
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
+ keepServer.String(), numLines+1, extra))
+ } else if err != io.EOF {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
+ keepServer.String(), numLines, err))
+ }
+ numLines--
+ break
+ }
+ blockInfo, err := parseBlockInfoFromIndexLine(line)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
fmt.Sprintf("Error parsing BlockInfo from index line "+
response.Contents.BlockDigestToInfo[blockInfo.Digest] = blockInfo
}
}
- if err := scanner.Err(); err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Received error scanning index response from %s: %v",
- keepServer.String(),
- err))
- } else {
- log.Printf("%s index contained %d lines with %d duplicates with "+
- "%d size disagreements",
- keepServer.String(),
- numLines,
- numDuplicates,
- numSizeDisagreements)
-
- if arvLogger != nil {
- now := time.Now()
- arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := logger.GetOrCreateMap(p, "keep_info")
- serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
-
- serverInfo["processing_finished_at"] = now
- serverInfo["lines_received"] = numLines
- serverInfo["duplicates_seen"] = numDuplicates
- serverInfo["size_disagreements_seen"] = numSizeDisagreements
- })
- }
+
+ log.Printf("%s index contained %d lines with %d duplicates with "+
+ "%d size disagreements",
+ keepServer.String(),
+ numLines,
+ numDuplicates,
+ numSizeDisagreements)
+
+ if arvLogger != nil {
+ now := time.Now()
+ arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
- keepInfo := p["keep_info"].(map[string]interface{})
++ keepInfo := logger.GetOrCreateMap(p, "keep_info")
+ serverInfo := keepInfo[keepServer.Uuid].(map[string]interface{})
+
+ serverInfo["processing_finished_at"] = now
+ serverInfo["lines_received"] = numLines
+ serverInfo["duplicates_seen"] = numDuplicates
+ serverInfo["size_disagreements_seen"] = numSizeDisagreements
+ })
}
resp.Body.Close()
return