7253: instead of ignoring errors during BlockIterWithDuplicates, send them to the...
author radhika <radhika@curoverse.com>
Wed, 25 Nov 2015 17:06:05 +0000 (12:06 -0500)
committer radhika <radhika@curoverse.com>
Wed, 25 Nov 2015 17:06:05 +0000 (12:06 -0500)
sdk/go/manifest/manifest.go
sdk/go/manifest/manifest_test.go
services/datamanager/collection/collection.go
services/datamanager/collection/collection_test.go

index f104d9a1035127c3f75502854e176c72a4017e1d..9ce3d820bbba9364483eb6ca8e65ba89e945a455 100644 (file)
@@ -8,7 +8,6 @@ import (
        "errors"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
-       "log"
        "regexp"
        "strconv"
        "strings"
@@ -49,6 +48,7 @@ type ManifestStream struct {
        StreamName string
        Blocks     []string
        FileTokens []string
+       Err        error
 }
 
 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
@@ -193,6 +193,21 @@ func parseManifestStream(s string) (m ManifestStream) {
        }
        m.Blocks = tokens[:i]
        m.FileTokens = tokens[i:]
+
+       if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
+               m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
+               return
+       }
+
+       fileTokens := m.FileTokens
+       for j := range m.FileTokens {
+               _, _, _, err := parseFileToken(fileTokens[j])
+               if err != nil {
+                       m.Err = fmt.Errorf("Invalid file token: %s", fileTokens[j])
+                       break
+               }
+       }
+
        return
 }
 
@@ -232,18 +247,28 @@ func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
        return ch
 }
 
+type ManifestBlockLocator struct { // one item sent on the channel returned by BlockIterWithDuplicates
+       Locator blockdigest.BlockLocator // parsed block locator; the zero value when Err is non-nil
+       Err     error                    // stream-level or locator parse error; nil when Locator is valid
+}
+
 // Blocks may appear mulitple times within the same manifest if they
 // are used by multiple files. In that case this Iterator will output
 // the same block multiple times.
-func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
-       blockChannel := make(chan blockdigest.BlockLocator)
+func (m *Manifest) BlockIterWithDuplicates() <-chan ManifestBlockLocator {
+       blockChannel := make(chan ManifestBlockLocator)
        go func(streamChannel <-chan ManifestStream) {
                for m := range streamChannel {
+                       if m.Err != nil {
+                               blockChannel <- ManifestBlockLocator{Locator: blockdigest.BlockLocator{}, Err: m.Err}
+                               continue
+                       }
                        for _, block := range m.Blocks {
-                               if b, err := blockdigest.ParseBlockLocator(block); err == nil {
-                                       blockChannel <- b
+                               b, err := blockdigest.ParseBlockLocator(block)
+                               if err == nil {
+                                       blockChannel <- ManifestBlockLocator{b, nil}
                                } else {
-                                       log.Printf("ERROR: Failed to parse block: %v", err)
+                                       blockChannel <- ManifestBlockLocator{Locator: blockdigest.BlockLocator{}, Err: err}
                                }
                        }
                }
index 1483b361198a92516d786bfeef9390ea0e5c24ba..d8227ed687060ecd237131ee999d9237991a11ea 100644 (file)
@@ -3,6 +3,7 @@ package manifest
 import (
        "io/ioutil"
        "reflect"
+       "regexp"
        "runtime"
        "testing"
 
@@ -12,8 +13,8 @@ import (
 
 func getStackTrace() string {
        buf := make([]byte, 1000)
-       bytes_written := runtime.Stack(buf, false)
-       return "Stack Trace:\n" + string(buf[:bytes_written])
+       bytesWritten := runtime.Stack(buf, false) // false: only the calling goroutine's trace; output is cut off at len(buf)
+       return "Stack Trace:\n" + string(buf[:bytesWritten]) // keep just the bytes Stack reported writing
 }
 
 func expectFromChannel(t *testing.T, c <-chan string, expected string) {
@@ -121,21 +122,21 @@ func TestBlockIterLongManifest(t *testing.T) {
        blockChannel := manifest.BlockIterWithDuplicates()
 
        firstBlock := <-blockChannel
+
        expectBlockLocator(t,
-               firstBlock,
+               firstBlock.Locator,
                blockdigest.BlockLocator{Digest: blockdigest.AssertFromString("b746e3d2104645f2f64cd3cc69dd895d"),
                        Size:  15693477,
                        Hints: []string{"E2866e643690156651c03d876e638e674dcd79475@5441920c"}})
        blocksRead := 1
-       var lastBlock blockdigest.BlockLocator
+       var lastBlock ManifestBlockLocator
        for lastBlock = range blockChannel {
-               //log.Printf("Blocks Read: %d", blocksRead)
                blocksRead++
        }
        expectEqual(t, blocksRead, 853)
 
        expectBlockLocator(t,
-               lastBlock,
+               lastBlock.Locator,
                blockdigest.BlockLocator{Digest: blockdigest.AssertFromString("f9ce82f59e5908d2d70e18df9679b469"),
                        Size:  31367794,
                        Hints: []string{"E53f903684239bcc114f7bf8ff9bd6089f33058db@5441920c"}})
@@ -195,3 +196,40 @@ func TestFileSegmentIterByName(t *testing.T) {
                }
        }
 }
+
+// TestBlockIterWithBadManifest checks the first item BlockIterWithDuplicates
+// yields for each manifest: its Err must match the expected pattern (second
+// element of the test case), or be nil when that pattern is empty.
+func TestBlockIterWithBadManifest(t *testing.T) {
+       testCases := [][]string{
+               {"notavalidstreamname acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt", "Invalid stream name: notavalidstreamname"},
+               {". acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt", ""},
+               {". acbd18db4cc2f85cedef654fccc4a4d8+3 file1.txt", "Invalid file token: file1.txt"},
+               {". acbd18db4cc2f85cedef654fccc4a4d+3 0:1:file1.txt", "Invalid file token: acbd18db4cc2f85cedef654fccc4a4d.*"},
+               {". acbd18db4cc2f85cedef654fccc4a4d8 0:1:file1.txt", "Invalid file token: acbd18db4cc2f85cedef654fccc4a4d8"},
+               {". acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt file2.txt 1:2:file3.txt", "Invalid file token: file2.txt"},
+               {"/badstream acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt file2.txt 1:2:file3.txt", "Invalid stream name: /badstream"},
+               {"./goodstream acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt 1:2:file2.txt", ""},
+       }
+       for _, testCase := range testCases {
+               manifest := Manifest{testCase[0]}
+               blockChannel := manifest.BlockIterWithDuplicates()
+               block := <-blockChannel
+               if testCase[1] != "" { // expecting error
+                       if block.Err == nil {
+                               // Err.Error() below would panic on a nil error; report and move on.
+                               t.Errorf("Expected error")
+                               continue
+                       }
+                       matched, err := regexp.MatchString(testCase[1], block.Err.Error())
+                       if err != nil {
+                               t.Errorf("Got error verifying returned block locator error: %v", err)
+                       }
+                       if !matched {
+                               t.Errorf("Expected error not found. Expected: %v; Found = %v", testCase[1], block.Err.Error())
+                       }
+               } else if block.Err != nil {
+                       t.Errorf("Got error: %v", block.Err)
+               }
+       }
+}
index 7102ec3c054bb24df5e3e0979784b9c08b3949ed..0d583c2c99845ffcbd0841b73e28a78ab90d1592 100644 (file)
@@ -286,16 +286,20 @@ func ProcessCollections(arvLogger *logger.Logger,
 
                blockChannel := manifest.BlockIterWithDuplicates()
                for block := range blockChannel {
-                       if storedSize, stored := collection.BlockDigestToSize[block.Digest]; stored && storedSize != block.Size {
+                       if block.Err != nil {
+                               err = block.Err
+                               return
+                       }
+                       if storedSize, stored := collection.BlockDigestToSize[block.Locator.Digest]; stored && storedSize != block.Locator.Size {
                                err = fmt.Errorf(
                                        "Collection %s contains multiple sizes (%d and %d) for block %s",
                                        collection.UUID,
                                        storedSize,
-                                       block.Size,
-                                       block.Digest)
+                                       block.Locator.Size,
+                                       block.Locator.Digest)
                                return
                        }
-                       collection.BlockDigestToSize[block.Digest] = block.Size
+                       collection.BlockDigestToSize[block.Locator.Digest] = block.Locator.Size
                }
                collection.TotalSize = 0
                for _, size := range collection.BlockDigestToSize {
index 3938b2f3fc984421e688dc08715dba83429d1675..6bb7ae5ede252c9b76ca4b04afed26b03be17902 100644 (file)
@@ -152,14 +152,26 @@ func (s *MySuite) TestGetCollectionsAndSummarize_ApiErrorGetCollections(c *C) {
        testGetCollectionsAndSummarize(c, testData)
 }
 
-func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadManifest(c *C) {
+func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadStreamName(c *C) { // manifest_text is a single token that is neither "." nor "./"-prefixed
        dataMap := make(map[string]arvadostest.StatusAndBody)
        dataMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StatusAndBody{200, `{"defaultCollectionReplication":2}`}
-       dataMap["/arvados/v1/collections"] = arvadostest.StatusAndBody{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"thisisnotavalidmanifest"}]}`}
+       dataMap["/arvados/v1/collections"] = arvadostest.StatusAndBody{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"badstreamname"}]}`}
 
        testData := APITestData{}
        testData.data = dataMap
-       testData.expectedError = ".*invalid manifest format.*"
+       testData.expectedError = "Invalid stream name: badstreamname" // NOTE(review): presumably matched as a pattern against the returned error — confirm in testGetCollectionsAndSummarize
+
+       testGetCollectionsAndSummarize(c, testData)
+}
+
+func (s *MySuite) TestGetCollectionsAndSummarize_GetCollectionsBadFileToken(c *C) { // valid stream name and block, but the token "file2.txt" is expected to be rejected
+       dataMap := make(map[string]arvadostest.StatusAndBody)
+       dataMap["/discovery/v1/apis/arvados/v1/rest"] = arvadostest.StatusAndBody{200, `{"defaultCollectionReplication":2}`}
+       dataMap["/arvados/v1/collections"] = arvadostest.StatusAndBody{200, `{"items_available":1,"items":[{"modified_at":"2015-11-24T15:04:05Z","manifest_text":"./goodstream acbd18db4cc2f85cedef654fccc4a4d8+3 0:1:file1.txt file2.txt"}]}`}
+
+       testData := APITestData{}
+       testData.data = dataMap
+       testData.expectedError = "Invalid file token: file2.txt" // NOTE(review): presumably matched as a pattern against the returned error — confirm in testGetCollectionsAndSummarize
 
        testGetCollectionsAndSummarize(c, testData)
 }