X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0c0f18dfbcdcf552889258b76563315fbe2eb060..84d86c1721bf549c2dc38df95f29f3579b36a5ae:/sdk/go/manifest/manifest.go diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go index 49faa01b4d..22b1c974e6 100644 --- a/sdk/go/manifest/manifest.go +++ b/sdk/go/manifest/manifest.go @@ -20,6 +20,7 @@ var LocatorPattern = regexp.MustCompile( type Manifest struct { Text string + Err error } type BlockLocator struct { @@ -43,12 +44,19 @@ type FileSegment struct { Len int } +// FileStreamSegment is a portion of a file described as a segment of a stream. +type FileStreamSegment struct { + SegPos uint64 + SegLen uint64 + Name string +} + // Represents a single line from a manifest. type ManifestStream struct { - StreamName string - Blocks []string - FileTokens []string - Err error + StreamName string + Blocks []string + FileStreamSegments []FileStreamSegment + Err error } var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`) @@ -96,21 +104,21 @@ func ParseBlockLocator(s string) (b BlockLocator, err error) { return } -func parseFileToken(tok string) (segPos, segLen uint64, name string, err error) { +func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) { parts := strings.SplitN(tok, ":", 3) if len(parts) != 3 { err = ErrInvalidToken return } - segPos, err = strconv.ParseUint(parts[0], 10, 64) + ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64) if err != nil { return } - segLen, err = strconv.ParseUint(parts[1], 10, 64) + ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64) if err != nil { return } - name = UnescapeName(parts[2]) + ft.Name = UnescapeName(parts[2]) return } @@ -127,12 +135,11 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F blockLens := make([]int, 0, len(s.Blocks)) // This is what streamName+"/"+fileName will look like: target := "./" + filepath - for _, fTok := range s.FileTokens { - wantPos, wantLen, name, err := parseFileToken(fTok) - if err != nil { - // Skip (!) invalid file tokens. - continue - } + for _, fTok := range s.FileStreamSegments { + wantPos := fTok.SegPos + wantLen := fTok.SegLen + name := fTok.Name + if s.StreamName+"/"+name != target { continue } @@ -183,28 +190,40 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F func parseManifestStream(s string) (m ManifestStream) { tokens := strings.Split(s, " ") + m.StreamName = UnescapeName(tokens[0]) + if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") { + m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName) + return + } + tokens = tokens[1:] var i int - for i = range tokens { + for i = 0; i < len(tokens); i++ { if !blockdigest.IsBlockLocator(tokens[i]) { break } } m.Blocks = tokens[:i] - m.FileTokens = tokens[i:] + fileTokens := tokens[i:] - if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") { - m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName) + if len(m.Blocks) == 0 { + m.Err = fmt.Errorf("No block locators found") return } - for _, ft := range m.FileTokens { - _, _, _, err := parseFileToken(ft) + if len(fileTokens) == 0 { + m.Err = fmt.Errorf("No file tokens found") + return + } + + for _, ft := range fileTokens { + pft, err := parseFileStreamSegment(ft) if err != nil { m.Err = fmt.Errorf("Invalid file token: %s", ft) break } + m.FileStreamSegments = append(m.FileStreamSegments, pft) } return @@ -246,28 +265,24 @@ func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment { return ch } -type ManifestBlockLocator struct { - Locator blockdigest.BlockLocator - Err error -} - -// Blocks may appear mulitple times within the same manifest if they +// Blocks may appear multiple times within the same manifest if they // are used by multiple files. In that case this Iterator will output // the same block multiple times. -func (m *Manifest) BlockIterWithDuplicates() <-chan ManifestBlockLocator { - blockChannel := make(chan ManifestBlockLocator) +// +// In order to detect parse errors, caller must check m.Err after the returned channel closes. +func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator { + blockChannel := make(chan blockdigest.BlockLocator) go func(streamChannel <-chan ManifestStream) { for ms := range streamChannel { if ms.Err != nil { - blockChannel <- ManifestBlockLocator{Locator: blockdigest.BlockLocator{}, Err: ms.Err} + m.Err = ms.Err continue } for _, block := range ms.Blocks { - b, err := blockdigest.ParseBlockLocator(block) - if err == nil { - blockChannel <- ManifestBlockLocator{b, nil} + if b, err := blockdigest.ParseBlockLocator(block); err == nil { + blockChannel <- b } else { - blockChannel <- ManifestBlockLocator{Locator: blockdigest.BlockLocator{}, Err: err} + m.Err = err } } }