// to call multiple times in a single run.
// Otherwise we would see cumulative numbers as explained here:
// https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
+//
+// It does nothing and returns nil when heapProfileFilename is empty;
+// otherwise it returns any error from creating or writing the profile file.
-func WriteHeapProfile() {
+func WriteHeapProfile() error {
if heapProfileFilename != "" {
-
heapProfile, err := os.Create(heapProfileFilename)
if err != nil {
- log.Fatal(err)
+ return err
}
+ // NOTE(review): the error returned by Close is discarded by this defer.
defer heapProfile.Close()
err = pprof.WriteHeapProfile(heapProfile)
- if err != nil {
- log.Fatal(err)
- }
+ return err
}
+
+ return nil
}
// GetCollectionsAndSummarize gets collections from api and summarizes
+// them. An error from GetCollections is logged via arvLogger; the
+// (possibly partial) results are still summarized and returned.
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
- results = GetCollections(params)
+func GetCollectionsAndSummarize(arvLogger *logger.Logger, params GetCollectionsParams) (results ReadCollections) {
+ results, err := GetCollections(params)
+ if err != nil {
+ // Do not format params with %v here: it embeds the API client,
+ // which may include the API token.
+ loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetCollections: %v", err))
+ }
+
results.Summarize(params.Logger)
log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
}
// GetCollections gets collections from api
+// It returns a non-nil error, instead of exiting the process, when the
+// API server cannot be queried or returns unexpected data.
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
+ // NOTE(review): the address of a struct field can never be nil, so this
+ // condition is always false (staticcheck SA4022); a meaningful check
+ // would have to inspect a field of params.Client.
if &params.Client == nil {
- log.Fatalf("params.Client passed to GetCollections() should " +
+ err = fmt.Errorf("params.Client passed to GetCollections() should " +
"contain a valid ArvadosClient, but instead it is nil.")
+ return
}
var defaultReplicationLevel int
{
- value, err := params.Client.Discovery("defaultCollectionReplication")
+ var value interface{}
+ value, err = params.Client.Discovery("defaultCollectionReplication")
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying default collection replication: %v", err))
+ err = fmt.Errorf("Error querying default collection replication: %v", err)
+ return
}
- defaultReplicationLevel = int(value.(float64))
+ // An unchecked type assertion would panic on an unexpected
+ // discovery value; report it as an error instead.
+ floatValue, ok := value.(float64)
+ if !ok {
+ err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+ "should be a number but instead it was %v (%T).",
+ value, value)
+ return
+ }
+ defaultReplicationLevel = int(floatValue)
if defaultReplicationLevel <= 0 {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Default collection replication returned by arvados SDK "+
- "should be a positive integer but instead it was %d.",
- defaultReplicationLevel))
+ err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+ "should be a positive integer but instead it was %d.",
+ defaultReplicationLevel)
+ return
}
}
initialNumberOfCollectionsAvailable, err :=
util.NumberItemsAvailable(params.Client, "collections")
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying collection count: %v", err))
+ err = fmt.Errorf("Error querying collection count: %v", err)
+ return
}
// Include a 1% margin for collections added while we're reading so
// that we don't have to grow the map in most cases.
// We're still finding new collections
// Write the heap profile for examining memory usage
- WriteHeapProfile()
+ err = WriteHeapProfile()
+ if err != nil {
+ return
+ }
// Get next batch of collections.
var collections SdkCollectionList
- err := params.Client.List("collections", sdkParams, &collections)
+ err = params.Client.List("collections", sdkParams, &collections)
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error querying collections: %v", err))
+ err = fmt.Errorf("Error querying collections: %v", err)
+ return
}
// Process collection and update our date filter.
- sdkParams["filters"].([][]string)[0][2] =
- ProcessCollections(params.Logger,
- collections.Items,
- defaultReplicationLevel,
- results.UUIDToCollection).Format(time.RFC3339)
+ var latestModificationDate time.Time
+ latestModificationDate, err = ProcessCollections(params.Logger,
+ collections.Items,
+ defaultReplicationLevel,
+ results.UUIDToCollection)
+ if err != nil {
+ return
+ }
+ sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
// update counts
previousTotalCollections = totalCollections
+// ProcessCollections builds a Collection for each received SDK collection,
+// recording the size of each block digest it references. It returns the
+// latest modification date it has seen (GetCollections uses it as the next
+// "filters" value) and a non-nil error if a collection has a zero
+// ModifiedAt or lists the same block digest with two different sizes.
func ProcessCollections(arvLogger *logger.Logger,
receivedCollections []SdkCollectionInfo,
defaultReplicationLevel int,
- UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
+ UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
for _, sdkCollection := range receivedCollections {
collection := Collection{UUID: StrCopy(sdkCollection.UUID),
OwnerUUID: StrCopy(sdkCollection.OwnerUUID),
BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
if sdkCollection.ModifiedAt.IsZero() {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf(
- "Arvados SDK collection returned with unexpected zero "+
- "modification date. This probably means that either we failed to "+
- "parse the modification date or the API server has changed how "+
- "it returns modification dates: %+v",
- collection))
+ return latestModificationDate, fmt.Errorf(
+ "Arvados SDK collection returned with unexpected zero "+
+ "modification date. This probably means that either we failed to "+
+ "parse the modification date or the API server has changed how "+
+ "it returns modification dates: %+v",
+ collection)
}
if sdkCollection.ModifiedAt.After(latestModificationDate) {
blockChannel := manifest.BlockIterWithDuplicates()
for block := range blockChannel {
+ // The iterator may yield the same digest more than once
+ // (BlockIterWithDuplicates); every occurrence must agree on size.
if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
- message := fmt.Sprintf(
+ err = fmt.Errorf(
"Collection %s contains multiple sizes (%d and %d) for block %s",
collection.UUID,
storedSize,
block.Size,
block.Digest)
- loggerutil.FatalWithMessage(arvLogger, message)
+ return
}
collection.BlockDigestToSize[block.Digest] = block.Size
}
}
// GetKeepServersAndSummarize gets keep servers from api
+// and summarizes them. If GetKeepServers fails, its error is returned
+// immediately without logging or summarizing the partial results.
-func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
- results = GetKeepServers(params)
+func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers, err error) {
+ results, err = GetKeepServers(params)
+ if err != nil {
+ return
+ }
log.Printf("Returned %d keep disks", len(results.ServerToContents))
results.Summarize(params.Logger)
}
// GetKeepServers from api server
+// It returns an error if the keep_services list cannot be fetched, if a
+// non-"disk" service is listed, if not every available service was
+// returned, or if GetServerContents failed for any server.
-func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
+func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
sdkParams := arvadosclient.Dict{
"filters": [][]string{[]string{"service_type", "!=", "proxy"}},
}
}
var sdkResponse ServiceList
- err := params.Client.List("keep_services", sdkParams, &sdkResponse)
+ err = params.Client.List("keep_services", sdkParams, &sdkResponse)
if err != nil {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Error requesting keep disks from API server: %v", err))
+ err = fmt.Errorf("Error requesting keep disks from API server: %v", err)
+ return
}
// Currently, only "disk" types are supported. Stop if any other service types are found.
for _, server := range sdkResponse.KeepServers {
if server.ServiceType != "disk" {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Unsupported service type %q found for: %v", server.ServiceType, server))
+ return results, fmt.Errorf("Unsupported service type %q found for: %v", server.ServiceType, server)
}
}
log.Printf("Received keep services list: %+v", sdkResponse)
if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
- loggerutil.FatalWithMessage(params.Logger,
- fmt.Sprintf("Did not receive all available keep servers: %+v", sdkResponse))
+ return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse)
}
results.KeepServerIndexToAddress = sdkResponse.KeepServers
for i := range sdkResponse.KeepServers {
_ = i // Here to prevent go from complaining.
response := <-responseChan
+
+ // There might have been an error during GetServerContents; so check if the response is empty
+ // NOTE(review): returning early stops receiving from responseChan; unless
+ // it is buffered for every server, the remaining senders will block
+ // forever. Confirm how responseChan is created.
+ if response.Address.Host == "" {
+ return results, fmt.Errorf("Error during GetServerContents; no host info found")
+ }
+
log.Printf("Received channel response from %v containing %d files",
response.Address,
len(response.Contents.BlockDigestToInfo))
keepServer ServerAddress,
arv arvadosclient.ArvadosClient) (response ServerResponse) {
+ // Failures below are logged rather than returned: the caller receives an
+ // empty ServerResponse, which GetKeepServers detects via an empty
+ // Address.Host.
- GetServerStatus(arvLogger, keepServer, arv)
+ err := GetServerStatus(arvLogger, keepServer, arv)
+ if err != nil {
+ loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetServerStatus: %v", err))
+ return ServerResponse{}
+ }
+
+ req, err := CreateIndexRequest(arvLogger, keepServer, arv)
+ if err != nil {
+ loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error building CreateIndexRequest: %v", err))
+ return ServerResponse{}
+ }
- req := CreateIndexRequest(arvLogger, keepServer, arv)
resp, err := arv.Client.Do(req)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
+ loggerutil.LogErrorMessage(arvLogger,
fmt.Sprintf("Error fetching %s: %v. Response was %+v",
req.URL.String(),
err,
resp))
+ return ServerResponse{}
+ }
+
+ response, err = ReadServerResponse(arvLogger, keepServer, resp)
+ if err != nil {
+ loggerutil.LogErrorMessage(arvLogger,
+ fmt.Sprintf("Error during ReadServerResponse %v", err))
+ return ServerResponse{}
}
- return ReadServerResponse(arvLogger, keepServer, resp)
+ return
}
-// GetServerStatus get keep server status by invoking /status.json
+// GetServerStatus gets keep server status by invoking /status.json.
+// It returns an error if the request fails, the server does not answer
+// with HTTP 200, or the response body cannot be decoded as JSON.
func GetServerStatus(arvLogger *logger.Logger,
keepServer ServerAddress,
- arv arvadosclient.ArvadosClient) {
+ arv arvadosclient.ArvadosClient) error {
url := fmt.Sprintf("http://%s:%d/status.json",
keepServer.Host,
keepServer.Port)
resp, err := arv.Client.Get(url)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error getting keep status from %s: %v", url, err))
+ return fmt.Errorf("Error getting keep status from %s: %v", url, err)
} else if resp.StatusCode != 200 {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Received error code %d in response to request "+
- "for %s status: %s",
- resp.StatusCode, url, resp.Status))
+ // The process no longer exits here, so release the response body
+ // (and its connection) before returning the error.
+ resp.Body.Close()
+ return fmt.Errorf("Received error code %d in response to request "+
+ "for %s status: %s",
+ resp.StatusCode, url, resp.Status)
}
var keepStatus map[string]interface{}
decoder.UseNumber()
err = decoder.Decode(&keepStatus)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error decoding keep status from %s: %v", url, err))
+ return fmt.Errorf("Error decoding keep status from %s: %v", url, err)
}
if arvLogger != nil {
serverInfo["status"] = keepStatus
})
}
+
+ return nil
}
// CreateIndexRequest to the keep server
+// It returns a GET request for http://<host>:<port>/index that carries
+// arv.ApiToken in an OAuth2 Authorization header, or an error if the
+// request cannot be built.
func CreateIndexRequest(arvLogger *logger.Logger,
keepServer ServerAddress,
- arv arvadosclient.ArvadosClient) (req *http.Request) {
+ arv arvadosclient.ArvadosClient) (req *http.Request, err error) {
url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
log.Println("About to fetch keep server contents from " + url)
})
}
- req, err := http.NewRequest("GET", url, nil)
+ req, err = http.NewRequest("GET", url, nil)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error building http request for %s: %v", url, err))
+ return req, fmt.Errorf("Error building http request for %s: %v", url, err)
}
req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
- return
+ // err is nil here: the non-nil case returned above.
+ return req, err
}
-// ReadServerResponse reads reasponse from keep server
+// ReadServerResponse reads response from keep server
+// It returns an error if the status is not 200, if the index cannot be
+// read or is truncated, if data follows the blank-line end marker, or if
+// an index line cannot be parsed.
func ReadServerResponse(arvLogger *logger.Logger,
keepServer ServerAddress,
- resp *http.Response) (response ServerResponse) {
+ resp *http.Response) (response ServerResponse, err error) {
if resp.StatusCode != 200 {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Received error code %d in response to request "+
- "for %s index: %s",
- resp.StatusCode, keepServer.String(), resp.Status))
+ return response, fmt.Errorf("Received error code %d in response to request "+
+ "for %s index: %s",
+ resp.StatusCode, keepServer.String(), resp.Status)
}
if arvLogger != nil {
numLines++
line, err := reader.ReadString('\n')
if err == io.EOF {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Index from %s truncated at line %d",
- keepServer.String(), numLines))
+ return response, fmt.Errorf("Index from %s truncated at line %d",
+ keepServer.String(), numLines)
} else if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error reading index response from %s at line %d: %v",
- keepServer.String(), numLines, err))
+ return response, fmt.Errorf("Error reading index response from %s at line %d: %v",
+ keepServer.String(), numLines, err)
}
+ // A bare newline is the end-of-index marker; nothing may follow it.
if line == "\n" {
if _, err := reader.Peek(1); err == nil {
extra, _ := reader.ReadString('\n')
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
- keepServer.String(), numLines+1, extra))
+ return response, fmt.Errorf("Index from %s had trailing data at line %d after EOF marker: %s",
+ keepServer.String(), numLines+1, extra)
} else if err != io.EOF {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
- keepServer.String(), numLines, err))
+ return response, fmt.Errorf("Index from %s had read error after EOF marker at line %d: %v",
+ keepServer.String(), numLines, err)
}
numLines--
break
}
blockInfo, err := parseBlockInfoFromIndexLine(line)
if err != nil {
- loggerutil.FatalWithMessage(arvLogger,
- fmt.Sprintf("Error parsing BlockInfo from index line "+
- "received from %s: %v",
- keepServer.String(),
- err))
+ return response, fmt.Errorf("Error parsing BlockInfo from index line "+
+ "received from %s: %v",
+ keepServer.String(),
+ err)
}
if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {