/* Deals with parsing Manifest Text. */ // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb package manifest import ( "errors" "fmt" "git.curoverse.com/arvados.git/sdk/go/blockdigest" "regexp" "strconv" "strings" ) var ErrInvalidToken = errors.New("Invalid token") var LocatorPattern = regexp.MustCompile( "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$") type Manifest struct { Text string Err error } type BlockLocator struct { Digest blockdigest.BlockDigest Size int Hints []string } type DataSegment struct { BlockLocator Locator string StreamOffset uint64 } // FileSegment is a portion of a file that is contained within a // single block. type FileSegment struct { Locator string // Offset (within this block) of this data segment Offset int Len int } // FileStreamSegment is a portion of a file described as a segment of a stream. type FileStreamSegment struct { SegPos uint64 SegLen uint64 Name string } // Represents a single line from a manifest. type ManifestStream struct { StreamName string Blocks []string FileStreamSegments []FileStreamSegment Err error } var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`) func unescapeSeq(seq string) string { if seq == `\\` { return `\` } i, err := strconv.ParseUint(seq[1:], 8, 8) if err != nil { // Invalid escape sequence: can't unescape. return seq } return string([]byte{byte(i)}) } func UnescapeName(s string) string { return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq) } func ParseBlockLocator(s string) (b BlockLocator, err error) { if !LocatorPattern.MatchString(s) { err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+ "\"%s\".", s, LocatorPattern.String()) } else { tokens := strings.Split(s, "+") var blockSize int64 var blockDigest blockdigest.BlockDigest // We expect both of the following to succeed since LocatorPattern // restricts the strings appropriately. blockDigest, err = blockdigest.FromString(tokens[0]) if err != nil { return } blockSize, err = strconv.ParseInt(tokens[1], 10, 0) if err != nil { return } b.Digest = blockDigest b.Size = int(blockSize) b.Hints = tokens[2:] } return } func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) { parts := strings.SplitN(tok, ":", 3) if len(parts) != 3 { err = ErrInvalidToken return } ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64) if err != nil { return } ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64) if err != nil { return } ft.Name = UnescapeName(parts[2]) return } func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment { ch := make(chan *FileSegment) go func() { s.sendFileSegmentIterByName(filepath, ch) close(ch) }() return ch } func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) { blockLens := make([]int, 0, len(s.Blocks)) // This is what streamName+"/"+fileName will look like: target := "./" + filepath for _, fTok := range s.FileStreamSegments { wantPos := fTok.SegPos wantLen := fTok.SegLen name := fTok.Name if s.StreamName+"/"+name != target { continue } if wantLen == 0 { ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0} continue } // Linear search for blocks containing data for this // file var blockPos uint64 = 0 // position of block in stream for i, loc := range s.Blocks { if blockPos >= wantPos+wantLen { break } if len(blockLens) <= i { blockLens = blockLens[:i+1] b, err := ParseBlockLocator(loc) if err != nil { // Unparseable locator -> unusable // stream. ch <- nil return } blockLens[i] = b.Size } blockLen := uint64(blockLens[i]) if blockPos+blockLen <= wantPos { blockPos += blockLen continue } fseg := FileSegment{ Locator: loc, Offset: 0, Len: blockLens[i], } if blockPos < wantPos { fseg.Offset = int(wantPos - blockPos) fseg.Len -= fseg.Offset } if blockPos+blockLen > wantPos+wantLen { fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset } ch <- &fseg blockPos += blockLen } } } func parseManifestStream(s string) (m ManifestStream) { tokens := strings.Split(s, " ") m.StreamName = UnescapeName(tokens[0]) if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") { m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName) return } tokens = tokens[1:] var i int for i = 0; i < len(tokens); i++ { if !blockdigest.IsBlockLocator(tokens[i]) { break } } m.Blocks = tokens[:i] fileTokens := tokens[i:] if len(m.Blocks) == 0 { m.Err = fmt.Errorf("No block locators found") return } if len(fileTokens) == 0 { m.Err = fmt.Errorf("No file tokens found") return } for _, ft := range fileTokens { pft, err := parseFileStreamSegment(ft) if err != nil { m.Err = fmt.Errorf("Invalid file token: %s", ft) break } m.FileStreamSegments = append(m.FileStreamSegments, pft) } return } func (m *Manifest) StreamIter() <-chan ManifestStream { ch := make(chan ManifestStream) go func(input string) { // This slice holds the current line and the remainder of the // manifest. We parse one line at a time, to save effort if we // only need the first few lines. lines := []string{"", input} for { lines = strings.SplitN(lines[1], "\n", 2) if len(lines[0]) > 0 { // Only parse non-blank lines ch <- parseManifestStream(lines[0]) } if len(lines) == 1 { break } } close(ch) }(m.Text) return ch } func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment { ch := make(chan *FileSegment) go func() { for stream := range m.StreamIter() { if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") { continue } stream.sendFileSegmentIterByName(filepath, ch) } close(ch) }() return ch } // Blocks may appear mulitple times within the same manifest if they // are used by multiple files. In that case this Iterator will output // the same block multiple times. // // In order to detect parse errors, caller must check m.Err after the returned channel closes. func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator { blockChannel := make(chan blockdigest.BlockLocator) go func(streamChannel <-chan ManifestStream) { for ms := range streamChannel { if ms.Err != nil { m.Err = ms.Err continue } for _, block := range ms.Blocks { if b, err := blockdigest.ParseBlockLocator(block); err == nil { blockChannel <- b } else { m.Err = err } } } close(blockChannel) }(m.StreamIter()) return blockChannel }