Merge branch 'master' into 7490-datamanager-dont-die-return-error
author radhika <radhika@curoverse.com>
Wed, 11 Nov 2015 17:34:04 +0000 (12:34 -0500)
committer radhika <radhika@curoverse.com>
Wed, 11 Nov 2015 17:34:04 +0000 (12:34 -0500)
services/datamanager/collection/collection.go
services/datamanager/datamanager.go
services/datamanager/datamanager_test.go
services/datamanager/keep/keep.go
services/datamanager/loggerutil/loggerutil.go
services/datamanager/summary/pull_list.go

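diff --git a/services/datamanager/collection/collection.go b/services/datamanager/collection/collection.go
index ca03627405e0742ad419b1d1dd6daf64fba7a341..55b0b3711a258ef4aa89de27fd585f11577749b5 100644 (file)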
@@ -79,26 +79,29 @@ func init() {
 // to call multiple times in a single run.
 // Otherwise we would see cumulative numbers as explained here:
 // https://groups.google.com/d/msg/golang-nuts/ZyHciRglQYc/2nh4Ndu2fZcJ
-func WriteHeapProfile() {
+func WriteHeapProfile() error {
        if heapProfileFilename != "" {
-
                heapProfile, err := os.Create(heapProfileFilename)
                if err != nil {
-                       log.Fatal(err)
+                       return err
                }
 
                defer heapProfile.Close()
 
                err = pprof.WriteHeapProfile(heapProfile)
-               if err != nil {
-                       log.Fatal(err)
-               }
+               return err
        }
+
+       return nil
 }
 
 // GetCollectionsAndSummarize gets collections from api and summarizes
-func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollections) {
-       results = GetCollections(params)
+func GetCollectionsAndSummarize(arvLogger *logger.Logger, params GetCollectionsParams) (results ReadCollections) {
+       results, err := GetCollections(params)
+       if err != nil {
+               loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetCollections with params %v: %v", params, err))
+       }
+
        results.Summarize(params.Logger)
 
        log.Printf("Uuid to Size used: %v", results.OwnerToCollectionSize)
@@ -116,9 +119,9 @@ func GetCollectionsAndSummarize(params GetCollectionsParams) (results ReadCollec
 }
 
 // GetCollections gets collections from api
-func GetCollections(params GetCollectionsParams) (results ReadCollections) {
+func GetCollections(params GetCollectionsParams) (results ReadCollections, err error) {
        if &params.Client == nil {
-               log.Fatalf("params.Client passed to GetCollections() should " +
+               err = fmt.Errorf("params.Client passed to GetCollections() should " +
                        "contain a valid ArvadosClient, but instead it is nil.")
        }
 
@@ -139,26 +142,25 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
 
        var defaultReplicationLevel int
        {
-               value, err := params.Client.Discovery("defaultCollectionReplication")
+               var value interface{}
+               value, err = params.Client.Discovery("defaultCollectionReplication")
                if err != nil {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Error querying default collection replication: %v", err))
+                       return
                }
 
                defaultReplicationLevel = int(value.(float64))
                if defaultReplicationLevel <= 0 {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Default collection replication returned by arvados SDK "+
-                                       "should be a positive integer but instead it was %d.",
-                                       defaultReplicationLevel))
+                       err = fmt.Errorf("Default collection replication returned by arvados SDK "+
+                               "should be a positive integer but instead it was %d.",
+                               defaultReplicationLevel)
+                       return
                }
        }
 
        initialNumberOfCollectionsAvailable, err :=
                util.NumberItemsAvailable(params.Client, "collections")
        if err != nil {
-               loggerutil.FatalWithMessage(params.Logger,
-                       fmt.Sprintf("Error querying collection count: %v", err))
+               return
        }
        // Include a 1% margin for collections added while we're reading so
        // that we don't have to grow the map in most cases.
@@ -183,22 +185,28 @@ func GetCollections(params GetCollectionsParams) (results ReadCollections) {
                // We're still finding new collections
 
                // Write the heap profile for examining memory usage
-               WriteHeapProfile()
+               err = WriteHeapProfile()
+               if err != nil {
+                       return
+               }
 
                // Get next batch of collections.
                var collections SdkCollectionList
-               err := params.Client.List("collections", sdkParams, &collections)
+               err = params.Client.List("collections", sdkParams, &collections)
                if err != nil {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Error querying collections: %v", err))
+                       return
                }
 
                // Process collection and update our date filter.
-               sdkParams["filters"].([][]string)[0][2] =
-                       ProcessCollections(params.Logger,
-                               collections.Items,
-                               defaultReplicationLevel,
-                               results.UUIDToCollection).Format(time.RFC3339)
+               var latestModificationDate time.Time
+               latestModificationDate, err = ProcessCollections(params.Logger,
+                       collections.Items,
+                       defaultReplicationLevel,
+                       results.UUIDToCollection)
+               if err != nil {
+                       return
+               }
+               sdkParams["filters"].([][]string)[0][2] = latestModificationDate.Format(time.RFC3339)
 
                // update counts
                previousTotalCollections = totalCollections
@@ -240,7 +248,7 @@ func StrCopy(s string) string {
 func ProcessCollections(arvLogger *logger.Logger,
        receivedCollections []SdkCollectionInfo,
        defaultReplicationLevel int,
-       UUIDToCollection map[string]Collection) (latestModificationDate time.Time) {
+       UUIDToCollection map[string]Collection) (latestModificationDate time.Time, err error) {
        for _, sdkCollection := range receivedCollections {
                collection := Collection{UUID: StrCopy(sdkCollection.UUID),
                        OwnerUUID:         StrCopy(sdkCollection.OwnerUUID),
@@ -248,13 +256,12 @@ func ProcessCollections(arvLogger *logger.Logger,
                        BlockDigestToSize: make(map[blockdigest.BlockDigest]int)}
 
                if sdkCollection.ModifiedAt.IsZero() {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf(
-                                       "Arvados SDK collection returned with unexpected zero "+
-                                               "modification date. This probably means that either we failed to "+
-                                               "parse the modification date or the API server has changed how "+
-                                               "it returns modification dates: %+v",
-                                       collection))
+                       return latestModificationDate, fmt.Errorf(
+                               "Arvados SDK collection returned with unexpected zero "+
+                                       "modification date. This probably means that either we failed to "+
+                                       "parse the modification date or the API server has changed how "+
+                                       "it returns modification dates: %+v",
+                               collection)
                }
 
                if sdkCollection.ModifiedAt.After(latestModificationDate) {
@@ -278,13 +285,13 @@ func ProcessCollections(arvLogger *logger.Logger,
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
                        if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
-                               message := fmt.Sprintf(
+                               err = fmt.Errorf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
                                        collection.UUID,
                                        storedSize,
                                        block.Size,
                                        block.Digest)
-                               loggerutil.FatalWithMessage(arvLogger, message)
+                               return
                        }
                        collection.BlockDigestToSize[block.Digest] = block.Size
                }
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index a9306ce83a6011002cef96b86eb6caf700feda23..a19d01f6cb954eb8978cb596aa266b1c7800eff9 100644 (file)
@@ -42,7 +42,11 @@ func init() {
 func main() {
        flag.Parse()
        if minutesBetweenRuns == 0 {
-               err := singlerun(makeArvadosClient())
+               arv, err := makeArvadosClient()
+               if err != nil {
+                       log.Fatalf("makeArvadosClient: %v", err)
+               }
+               err = singlerun(arv)
                if err != nil {
                        log.Fatalf("singlerun: %v", err)
                }
@@ -50,7 +54,11 @@ func main() {
                waitTime := time.Minute * time.Duration(minutesBetweenRuns)
                for {
                        log.Println("Beginning Run")
-                       err := singlerun(makeArvadosClient())
+                       arv, err := makeArvadosClient()
+                       if err != nil {
+                               log.Fatalf("makeArvadosClient: %v", err)
+                       }
+                       err = singlerun(arv)
                        if err != nil {
                                log.Printf("singlerun: %v", err)
                        }
@@ -60,12 +68,8 @@ func main() {
        }
 }
 
-func makeArvadosClient() arvadosclient.ArvadosClient {
-       arv, err := arvadosclient.MakeArvadosClient()
-       if err != nil {
-               log.Fatalf("Error setting up arvados client: %s", err)
-       }
-       return arv
+func makeArvadosClient() (arvadosclient.ArvadosClient, error) {
+       return arvadosclient.MakeArvadosClient()
 }
 
 func singlerun(arv arvadosclient.ArvadosClient) error {
@@ -126,8 +130,7 @@ func singlerun(arv arvadosclient.ArvadosClient) error {
 
        kc, err := keepclient.MakeKeepClient(&arv)
        if err != nil {
-               loggerutil.FatalWithMessage(arvLogger,
-                       fmt.Sprintf("Error setting up keep client %s", err.Error()))
+               return fmt.Errorf("Error setting up keep client %v", err.Error())
        }
 
        // Log that we're finished. We force the recording, since go will
@@ -154,7 +157,10 @@ func singlerun(arv arvadosclient.ArvadosClient) error {
                &keepServerInfo,
                replicationSummary.KeepBlocksNotInCollections)
 
-       summary.WritePullLists(arvLogger, pullLists)
+       err = summary.WritePullLists(arvLogger, pullLists)
+       if err != nil {
+               return err
+       }
 
        if trashErr != nil {
                return err
@@ -173,18 +179,24 @@ func BuildDataFetcher(arv arvadosclient.ArvadosClient) summary.DataFetcher {
 
                go func() {
                        collectionChannel <- collection.GetCollectionsAndSummarize(
+                               arvLogger,
                                collection.GetCollectionsParams{
                                        Client:    arv,
                                        Logger:    arvLogger,
                                        BatchSize: 50})
                }()
 
-               *keepServerInfo = keep.GetKeepServersAndSummarize(
+               var err error
+               *keepServerInfo, err = keep.GetKeepServersAndSummarize(
                        keep.GetKeepServersParams{
                                Client: arv,
                                Logger: arvLogger,
                                Limit:  1000})
 
+               if err != nil {
+                       return
+               }
+
                *readCollections = <-collectionChannel
        }
 }
diff --git a/services/datamanager/datamanager_test.go b/services/datamanager/datamanager_test.go
index 685f94c88f3a35c33f6aa986e85701a5853e5d32..26a2fbf92ba124c8cbe4e534c39931107f084dca 100644 (file)
@@ -28,7 +28,11 @@ func SetupDataManagerTest(t *testing.T) {
        arvadostest.StartAPI()
        arvadostest.StartKeep(2, false)
 
-       arv = makeArvadosClient()
+       var err error
+       arv, err = makeArvadosClient()
+       if err != nil {
+               t.Fatalf("Error making arvados client: %s", err)
+       }
        arv.ApiToken = arvadostest.DataManagerToken
 
        // keep client
@@ -40,7 +44,7 @@ func SetupDataManagerTest(t *testing.T) {
        }
 
        // discover keep services
-       if err := keepClient.DiscoverKeepServers(); err != nil {
+       if err = keepClient.DiscoverKeepServers(); err != nil {
                t.Fatalf("Error discovering keep services: %s", err)
        }
        keepServers = []string{}
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index 3a9c21a43855a472c4f5b43aa9b651fd85f506d4..20244fac3573024af0a34c2a8d8de5415f70104a 100644 (file)
@@ -91,8 +91,8 @@ func (s ServerAddress) URL() string {
 }
 
 // GetKeepServersAndSummarize gets keep servers from api
-func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers) {
-       results = GetKeepServers(params)
+func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServers, err error) {
+       results, err = GetKeepServers(params)
        log.Printf("Returned %d keep disks", len(results.ServerToContents))
 
        results.Summarize(params.Logger)
@@ -103,7 +103,7 @@ func GetKeepServersAndSummarize(params GetKeepServersParams) (results ReadServer
 }
 
 // GetKeepServers from api server
-func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
+func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
        sdkParams := arvadosclient.Dict{
                "filters": [][]string{[]string{"service_type", "!=", "proxy"}},
        }
@@ -112,18 +112,16 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
        }
 
        var sdkResponse ServiceList
-       err := params.Client.List("keep_services", sdkParams, &sdkResponse)
+       err = params.Client.List("keep_services", sdkParams, &sdkResponse)
 
        if err != nil {
-               loggerutil.FatalWithMessage(params.Logger,
-                       fmt.Sprintf("Error requesting keep disks from API server: %v", err))
+               return
        }
 
        // Currently, only "disk" types are supported. Stop if any other service types are found.
        for _, server := range sdkResponse.KeepServers {
                if server.ServiceType != "disk" {
-                       loggerutil.FatalWithMessage(params.Logger,
-                               fmt.Sprintf("Unsupported service type %q found for: %v", server.ServiceType, server))
+                       return results, fmt.Errorf("Unsupported service type %q found for: %v", server.ServiceType, server)
                }
        }
 
@@ -139,8 +137,7 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
        log.Printf("Received keep services list: %+v", sdkResponse)
 
        if len(sdkResponse.KeepServers) < sdkResponse.ItemsAvailable {
-               loggerutil.FatalWithMessage(params.Logger,
-                       fmt.Sprintf("Did not receive all available keep servers: %+v", sdkResponse))
+               return results, fmt.Errorf("Did not receive all available keep servers: %+v", sdkResponse)
        }
 
        results.KeepServerIndexToAddress = sdkResponse.KeepServers
@@ -174,6 +171,12 @@ func GetKeepServers(params GetKeepServersParams) (results ReadServers) {
        for i := range sdkResponse.KeepServers {
                _ = i // Here to prevent go from complaining.
                response := <-responseChan
+
+               // There might have been an error during GetServerContents; so check if the response is empty
+               if response.Address.Host == "" {
+                       return results, fmt.Errorf("Error during GetServerContents; no host info found")
+               }
+
                log.Printf("Received channel response from %v containing %d files",
                        response.Address,
                        len(response.Contents.BlockDigestToInfo))
@@ -194,25 +197,42 @@ func GetServerContents(arvLogger *logger.Logger,
        keepServer ServerAddress,
        arv arvadosclient.ArvadosClient) (response ServerResponse) {
 
-       GetServerStatus(arvLogger, keepServer, arv)
+       err := GetServerStatus(arvLogger, keepServer, arv)
+       if err != nil {
+               loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error during GetServerStatus: %v", err))
+               return ServerResponse{}
+       }
+
+       req, err := CreateIndexRequest(arvLogger, keepServer, arv)
+       if err != nil {
+               loggerutil.LogErrorMessage(arvLogger, fmt.Sprintf("Error building CreateIndexRequest: %v", err))
+               return ServerResponse{}
+       }
 
-       req := CreateIndexRequest(arvLogger, keepServer, arv)
        resp, err := arv.Client.Do(req)
        if err != nil {
-               loggerutil.FatalWithMessage(arvLogger,
+               loggerutil.LogErrorMessage(arvLogger,
                        fmt.Sprintf("Error fetching %s: %v. Response was %+v",
                                req.URL.String(),
                                err,
                                resp))
+               return ServerResponse{}
+       }
+
+       response, err = ReadServerResponse(arvLogger, keepServer, resp)
+       if err != nil {
+               loggerutil.LogErrorMessage(arvLogger,
+                       fmt.Sprintf("Error during ReadServerResponse %v", err))
+               return ServerResponse{}
        }
 
-       return ReadServerResponse(arvLogger, keepServer, resp)
+       return
 }
 
 // GetServerStatus get keep server status by invoking /status.json
 func GetServerStatus(arvLogger *logger.Logger,
        keepServer ServerAddress,
-       arv arvadosclient.ArvadosClient) {
+       arv arvadosclient.ArvadosClient) error {
        url := fmt.Sprintf("http://%s:%d/status.json",
                keepServer.Host,
                keepServer.Port)
@@ -232,13 +252,11 @@ func GetServerStatus(arvLogger *logger.Logger,
 
        resp, err := arv.Client.Get(url)
        if err != nil {
-               loggerutil.FatalWithMessage(arvLogger,
-                       fmt.Sprintf("Error getting keep status from %s: %v", url, err))
+               return fmt.Errorf("Error getting keep status from %s: %v", url, err)
        } else if resp.StatusCode != 200 {
-               loggerutil.FatalWithMessage(arvLogger,
-                       fmt.Sprintf("Received error code %d in response to request "+
-                               "for %s status: %s",
-                               resp.StatusCode, url, resp.Status))
+               return fmt.Errorf("Received error code %d in response to request "+
+                       "for %s status: %s",
+                       resp.StatusCode, url, resp.Status)
        }
 
        var keepStatus map[string]interface{}
@@ -246,8 +264,7 @@ func GetServerStatus(arvLogger *logger.Logger,
        decoder.UseNumber()
        err = decoder.Decode(&keepStatus)
        if err != nil {
-               loggerutil.FatalWithMessage(arvLogger,
-                       fmt.Sprintf("Error decoding keep status from %s: %v", url, err))
+               return fmt.Errorf("Error decoding keep status from %s: %v", url, err)
        }
 
        if arvLogger != nil {
@@ -259,12 +276,14 @@ func GetServerStatus(arvLogger *logger.Logger,
                        serverInfo["status"] = keepStatus
                })
        }
+
+       return nil
 }
 
 // CreateIndexRequest to the keep server
 func CreateIndexRequest(arvLogger *logger.Logger,
        keepServer ServerAddress,
-       arv arvadosclient.ArvadosClient) (req *http.Request) {
+       arv arvadosclient.ArvadosClient) (req *http.Request, err error) {
        url := fmt.Sprintf("http://%s:%d/index", keepServer.Host, keepServer.Port)
        log.Println("About to fetch keep server contents from " + url)
 
@@ -277,26 +296,24 @@ func CreateIndexRequest(arvLogger *logger.Logger,
                })
        }
 
-       req, err := http.NewRequest("GET", url, nil)
+       req, err = http.NewRequest("GET", url, nil)
        if err != nil {
-               loggerutil.FatalWithMessage(arvLogger,
-                       fmt.Sprintf("Error building http request for %s: %v", url, err))
+               return req, fmt.Errorf("Error building http request for %s: %v", url, err)
        }
 
        req.Header.Add("Authorization", "OAuth2 "+arv.ApiToken)
-       return
+       return req, err
 }
 
 // ReadServerResponse reads reasponse from keep server
 func ReadServerResponse(arvLogger *logger.Logger,
        keepServer ServerAddress,
-       resp *http.Response) (response ServerResponse) {
+       resp *http.Response) (response ServerResponse, err error) {
 
        if resp.StatusCode != 200 {
-               loggerutil.FatalWithMessage(arvLogger,
-                       fmt.Sprintf("Received error code %d in response to request "+
-                               "for %s index: %s",
-                               resp.StatusCode, keepServer.String(), resp.Status))
+               return response, fmt.Errorf("Received error code %d in response to request "+
+                       "for %s index: %s",
+                       resp.StatusCode, keepServer.String(), resp.Status)
        }
 
        if arvLogger != nil {
@@ -317,35 +334,30 @@ func ReadServerResponse(arvLogger *logger.Logger,
                numLines++
                line, err := reader.ReadString('\n')
                if err == io.EOF {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Index from %s truncated at line %d",
-                                       keepServer.String(), numLines))
+                       return response, fmt.Errorf("Index from %s truncated at line %d",
+                               keepServer.String(), numLines)
                } else if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Error reading index response from %s at line %d: %v",
-                                       keepServer.String(), numLines, err))
+                       return response, fmt.Errorf("Error reading index response from %s at line %d: %v",
+                               keepServer.String(), numLines, err)
                }
                if line == "\n" {
                        if _, err := reader.Peek(1); err == nil {
                                extra, _ := reader.ReadString('\n')
-                               loggerutil.FatalWithMessage(arvLogger,
-                                       fmt.Sprintf("Index from %s had trailing data at line %d after EOF marker: %s",
-                                               keepServer.String(), numLines+1, extra))
+                               return response, fmt.Errorf("Index from %s had trailing data at line %d after EOF marker: %s",
+                                       keepServer.String(), numLines+1, extra)
                        } else if err != io.EOF {
-                               loggerutil.FatalWithMessage(arvLogger,
-                                       fmt.Sprintf("Index from %s had read error after EOF marker at line %d: %v",
-                                               keepServer.String(), numLines, err))
+                               return response, fmt.Errorf("Index from %s had read error after EOF marker at line %d: %v",
+                                       keepServer.String(), numLines, err)
                        }
                        numLines--
                        break
                }
                blockInfo, err := parseBlockInfoFromIndexLine(line)
                if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Error parsing BlockInfo from index line "+
-                                       "received from %s: %v",
-                                       keepServer.String(),
-                                       err))
+                       return response, fmt.Errorf("Error parsing BlockInfo from index line "+
+                               "received from %s: %v",
+                               keepServer.String(),
+                               err)
                }
 
                if storedBlock, ok := response.Contents.BlockDigestToInfo[blockInfo.Digest]; ok {
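diff --git a/services/datamanager/loggerutil/loggerutil.go b/services/datamanager/loggerutil/loggerutil.go
index 8c655cd5ff68a981146493bfa21fd71693ba12c0..453f7e16bc340cd286f97abfa9ac926a71110cca 100644 (file)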
@@ -50,3 +50,12 @@ func FatalWithMessage(arvLogger *logger.Logger, message string) {
 
        log.Fatalf(message)
 }
+
+func LogErrorMessage(arvLogger *logger.Logger, message string) {
+       if arvLogger != nil {
+               arvLogger.Update(func(p map[string]interface{}, e map[string]interface{}) {
+                       runInfo := logger.GetOrCreateMap(p, "run_info")
+                       runInfo["ERROR"] = message
+               })
+       }
+}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
index cc01249a624a7f4947cdcfc8dafd73dd7e347377..107abf66db63f997e60e6053590a8a921c16158d 100644 (file)
@@ -9,7 +9,6 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/logger"
        "git.curoverse.com/arvados.git/services/datamanager/keep"
-       "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
        "log"
        "os"
        "strings"
@@ -176,23 +175,22 @@ func BuildPullLists(lps map[Locator]PullServers) (spl map[string]PullList) {
 // This is just a hack for prototyping, it is not expected to be used
 // in production.
 func WritePullLists(arvLogger *logger.Logger,
-       pullLists map[string]PullList) {
+       pullLists map[string]PullList) error {
        r := strings.NewReplacer(":", ".")
        for host, list := range pullLists {
                filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
                pullListFile, err := os.Create(filename)
                if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to open %s: %v", filename, err))
+                       return err
                }
                defer pullListFile.Close()
 
                enc := json.NewEncoder(pullListFile)
                err = enc.Encode(list)
                if err != nil {
-                       loggerutil.FatalWithMessage(arvLogger,
-                               fmt.Sprintf("Failed to write pull list to %s: %v", filename, err))
+                       return err
                }
                log.Printf("Wrote pull list to %s.", filename)
        }
+       return nil
 }