10165: Add --output-name option to set name of output collection.
[arvados.git] / sdk / go / manifest / manifest.go
index ad0948847e03778be0f3d8b854c10d5fa7146a90..22b1c974e634cd8229b645421ecb09480807c000 100644 (file)
 package manifest
 
 import (
-       "bufio"
+       "errors"
        "fmt"
-       "log"
+       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "regexp"
        "strconv"
        "strings"
 )
 
+var ErrInvalidToken = errors.New("Invalid token")
+
 var LocatorPattern = regexp.MustCompile(
        "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
 
 type Manifest struct {
        Text string
+       Err  error
 }
 
 type BlockLocator struct {
-       Digest  string
-       Size    int
-       Hints   []string
+       Digest blockdigest.BlockDigest
+       Size   int
+       Hints  []string
+}
+
+type DataSegment struct {
+       BlockLocator
+       Locator      string
+       StreamOffset uint64
+}
+
+// FileSegment is a portion of a file that is contained within a
+// single block.
+type FileSegment struct {
+       Locator string
+       // Offset (within this block) of this data segment
+       Offset int
+       Len    int
+}
+
+// FileStreamSegment is a portion of a file described as a segment of a stream.
+type FileStreamSegment struct {
+       SegPos uint64
+       SegLen uint64
+       Name   string
 }
 
-type ManifestLine struct {
-       StreamName  string
-       Blocks       []string
-       Files        []string
+// Represents a single line from a manifest.
+type ManifestStream struct {
+       StreamName         string
+       Blocks             []string
+       FileStreamSegments []FileStreamSegment
+       Err                error
 }
 
-func parseBlockLocator(s string) (b BlockLocator, err error) {
+var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
+
+func unescapeSeq(seq string) string {
+       if seq == `\\` {
+               return `\`
+       }
+       i, err := strconv.ParseUint(seq[1:], 8, 8)
+       if err != nil {
+               // Invalid escape sequence: can't unescape.
+               return seq
+       }
+       return string([]byte{byte(i)})
+}
+
+func UnescapeName(s string) string {
+       return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
+}
+
+func ParseBlockLocator(s string) (b BlockLocator, err error) {
        if !LocatorPattern.MatchString(s) {
-               fmt.Errorf("String \"%s\" does not match BlockLocator pattern \"%s\".",
+               err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
+                       "\"%s\".",
                        s,
                        LocatorPattern.String())
        } else {
                tokens := strings.Split(s, "+")
                var blockSize int64
-               // We expect ParseInt to succeed since LocatorPattern restricts
-               // tokens[1] to contain exclusively digits.
+               var blockDigest blockdigest.BlockDigest
+               // We expect both of the following to succeed since LocatorPattern
+               // restricts the strings appropriately.
+               blockDigest, err = blockdigest.FromString(tokens[0])
+               if err != nil {
+                       return
+               }
                blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
-               if err == nil {
-                       b.Digest = tokens[0]
-                       b.Size = int(blockSize)
-                       b.Hints = tokens[2:]
+               if err != nil {
+                       return
                }
+               b.Digest = blockDigest
+               b.Size = int(blockSize)
+               b.Hints = tokens[2:]
        }
        return
 }
 
-func parseManifestLine(s string) (m ManifestLine) {
+func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
+       parts := strings.SplitN(tok, ":", 3)
+       if len(parts) != 3 {
+               err = ErrInvalidToken
+               return
+       }
+       ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
+       if err != nil {
+               return
+       }
+       ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
+       if err != nil {
+               return
+       }
+       ft.Name = UnescapeName(parts[2])
+       return
+}
+
+func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
+       ch := make(chan *FileSegment)
+       go func() {
+               s.sendFileSegmentIterByName(filepath, ch)
+               close(ch)
+       }()
+       return ch
+}
+
+func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
+       blockLens := make([]int, 0, len(s.Blocks))
+       // This is what streamName+"/"+fileName will look like:
+       target := "./" + filepath
+       for _, fTok := range s.FileStreamSegments {
+               wantPos := fTok.SegPos
+               wantLen := fTok.SegLen
+               name := fTok.Name
+
+               if s.StreamName+"/"+name != target {
+                       continue
+               }
+               if wantLen == 0 {
+                       ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
+                       continue
+               }
+               // Linear search for blocks containing data for this
+               // file
+               var blockPos uint64 = 0 // position of block in stream
+               for i, loc := range s.Blocks {
+                       if blockPos >= wantPos+wantLen {
+                               break
+                       }
+                       if len(blockLens) <= i {
+                               blockLens = blockLens[:i+1]
+                               b, err := ParseBlockLocator(loc)
+                               if err != nil {
+                                       // Unparseable locator -> unusable
+                                       // stream.
+                                       ch <- nil
+                                       return
+                               }
+                               blockLens[i] = b.Size
+                       }
+                       blockLen := uint64(blockLens[i])
+                       if blockPos+blockLen <= wantPos {
+                               blockPos += blockLen
+                               continue
+                       }
+                       fseg := FileSegment{
+                               Locator: loc,
+                               Offset:  0,
+                               Len:     blockLens[i],
+                       }
+                       if blockPos < wantPos {
+                               fseg.Offset = int(wantPos - blockPos)
+                               fseg.Len -= fseg.Offset
+                       }
+                       if blockPos+blockLen > wantPos+wantLen {
+                               fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
+                       }
+                       ch <- &fseg
+                       blockPos += blockLen
+               }
+       }
+}
+
+func parseManifestStream(s string) (m ManifestStream) {
        tokens := strings.Split(s, " ")
-       m.StreamName = tokens[0]
+
+       m.StreamName = UnescapeName(tokens[0])
+       if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
+               m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
+               return
+       }
+
        tokens = tokens[1:]
        var i int
-       for i = range tokens {
-               if !LocatorPattern.MatchString(tokens[i]) {
+       for i = 0; i < len(tokens); i++ {
+               if !blockdigest.IsBlockLocator(tokens[i]) {
                        break
                }
        }
        m.Blocks = tokens[:i]
-       m.Files = tokens[i:]
+       fileTokens := tokens[i:]
+
+       if len(m.Blocks) == 0 {
+               m.Err = fmt.Errorf("No block locators found")
+               return
+       }
+
+       if len(fileTokens) == 0 {
+               m.Err = fmt.Errorf("No file tokens found")
+               return
+       }
+
+       for _, ft := range fileTokens {
+               pft, err := parseFileStreamSegment(ft)
+               if err != nil {
+                       m.Err = fmt.Errorf("Invalid file token: %s", ft)
+                       break
+               }
+               m.FileStreamSegments = append(m.FileStreamSegments, pft)
+       }
+
        return
 }
 
-func (m *Manifest) LineIter() <-chan ManifestLine {
-       ch := make(chan ManifestLine)
+func (m *Manifest) StreamIter() <-chan ManifestStream {
+       ch := make(chan ManifestStream)
        go func(input string) {
-               scanner := bufio.NewScanner(strings.NewReader(input))
-               for scanner.Scan() {
-                       // We parse one line at a time, to save effort if we only need
-                       // the first few lines.
-                       ch <- parseManifestLine(scanner.Text())
+               // This slice holds the current line and the remainder of the
+               // manifest.  We parse one line at a time, to save effort if we
+               // only need the first few lines.
+               lines := []string{"", input}
+               for {
+                       lines = strings.SplitN(lines[1], "\n", 2)
+                       if len(lines[0]) > 0 {
+                               // Only parse non-blank lines
+                               ch <- parseManifestStream(lines[0])
+                       }
+                       if len(lines) == 1 {
+                               break
+                       }
                }
                close(ch)
        }(m.Text)
        return ch
 }
 
+func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
+       ch := make(chan *FileSegment)
+       go func() {
+               for stream := range m.StreamIter() {
+                       if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
+                               continue
+                       }
+                       stream.sendFileSegmentIterByName(filepath, ch)
+               }
+               close(ch)
+       }()
+       return ch
+}
 
-// Blocks may appear mulitple times within the same manifest if they
+// Blocks may appear multiple times within the same manifest if they
 // are used by multiple files. In that case this Iterator will output
 // the same block multiple times.
-func (m *Manifest) BlockIterWithDuplicates() <-chan BlockLocator {
-       blockChannel := make(chan BlockLocator)
-       go func(lineChannel <-chan ManifestLine) {
-               for m := range lineChannel {
-                       for _, block := range m.Blocks {
-                               if b, err := parseBlockLocator(block); err == nil {
+//
+// In order to detect parse errors, caller must check m.Err after the returned channel closes.
+func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
+       blockChannel := make(chan blockdigest.BlockLocator)
+       go func(streamChannel <-chan ManifestStream) {
+               for ms := range streamChannel {
+                       if ms.Err != nil {
+                               m.Err = ms.Err
+                               continue
+                       }
+                       for _, block := range ms.Blocks {
+                               if b, err := blockdigest.ParseBlockLocator(block); err == nil {
                                        blockChannel <- b
                                } else {
-                                       log.Printf("ERROR: Failed to parse block: %v", err)
+                                       m.Err = err
                                }
                        }
                }
                close(blockChannel)
-       }(m.LineIter())
+       }(m.StreamIter())
        return blockChannel
 }