package manifest
import (
+ "errors"
"fmt"
- "log"
+ "git.curoverse.com/arvados.git/sdk/go/blockdigest"
"regexp"
"strconv"
"strings"
)
+var ErrInvalidToken = errors.New("Invalid token") // ErrInvalidToken is returned by parseFileStreamSegment when a file token does not split into three ":"-separated parts.
+
// LocatorPattern matches a complete block locator string: 32 hexadecimal
// digits, "+", a decimal number, then zero or more "+"-prefixed hints that
// each begin with an uppercase letter.
var LocatorPattern = regexp.MustCompile(
"^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
// Manifest carries a manifest's Text together with the last parse error
// recorded while iterating over it.
type Manifest struct {
Text string
+ Err error // overwritten by BlockIterWithDuplicates whenever a stream or block locator fails to parse
}
// BlockLocator is the parsed form of a locator string, as produced by
// ParseBlockLocator.
type BlockLocator struct {
- Digest string
- Size int
- Hints []string
+ Digest blockdigest.BlockDigest // parsed from the locator's first "+"-separated token
+ Size int // decimal value of the second token
+ Hints []string // all remaining tokens, in order
+}
+// DataSegment embeds a BlockLocator and adds a Locator string and a StreamOffset. NOTE(review): nothing in the visible code populates it; confirm the field semantics with its users.
+type DataSegment struct {
+ BlockLocator
+ Locator string
+ StreamOffset uint64
+}
+
+// FileSegment is a portion of a file that is contained within a
+// single block.
+type FileSegment struct {
+ Locator string // block locator token copied verbatim from the stream's block list (or a fixed size-0 locator for empty file segments)
+ // Offset (within this block) of this data segment
+ Offset int
+ Len int // length of this data segment, counted from Offset
+}
+
+// FileStreamSegment is a portion of a file described as a segment of a stream.
+type FileStreamSegment struct {
+ SegPos uint64 // start of the segment within the stream, in the same units as block sizes
+ SegLen uint64 // length of the segment, in the same units as block sizes
+ Name string // file name with escape sequences already decoded by UnescapeName
}
-type ManifestLine struct {
- StreamName string
- Blocks []string
- Files []string
+// ManifestStream represents a single line from a manifest.
+type ManifestStream struct {
+ StreamName string // first token of the line, unescaped; parseManifestStream sets Err unless it is "." or starts with "./"
+ Blocks []string // leading tokens that blockdigest.IsBlockLocator accepts, kept as raw strings
+ FileStreamSegments []FileStreamSegment // parsed from the tokens that follow the block locators
+ Err error // set by parseManifestStream when the line is invalid
+}
+// escapeSeq matches a backslash followed by either three decimal digits or a second backslash.
+var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
+// unescapeSeq decodes one escapeSeq match: a doubled backslash yields a single backslash, a backslash plus three digits yields the byte with that octal value, and anything that fails to parse as an 8-bit octal number is returned unchanged.
+func unescapeSeq(seq string) string {
+ if seq == `\\` {
+ return `\`
+ }
+ i, err := strconv.ParseUint(seq[1:], 8, 8) // skip the leading backslash; base 8, result must fit in 8 bits
+ if err != nil {
+ // Invalid escape sequence: can't unescape.
+ return seq
+ }
+ return string([]byte{byte(i)})
+}
+// UnescapeName returns s with every escapeSeq match replaced by its unescapeSeq decoding.
+func UnescapeName(s string) string {
+ return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
}
// ParseBlockLocator parses s into a BlockLocator. It returns an error, and
// leaves the result unpopulated, when s does not match LocatorPattern or
// when the digest or size token cannot be converted.
func ParseBlockLocator(s string) (b BlockLocator, err error) {
if !LocatorPattern.MatchString(s) {
- err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern " +
+ err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
"\"%s\".",
s,
LocatorPattern.String())
} else {
tokens := strings.Split(s, "+")
var blockSize int64
- // We expect ParseInt to succeed since LocatorPattern restricts
- // tokens[1] to contain exclusively digits.
+ var blockDigest blockdigest.BlockDigest
+ // We expect both of the following to succeed since LocatorPattern
+ // restricts the strings appropriately.
+ blockDigest, err = blockdigest.FromString(tokens[0])
+ if err != nil {
+ return
+ }
blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
- if err == nil {
- b.Digest = tokens[0]
- b.Size = int(blockSize)
- b.Hints = tokens[2:]
+ if err != nil { // still reachable: LocatorPattern does not bound the digit count, so the size may not fit in an int
+ return
}
+ b.Digest = blockDigest
+ b.Size = int(blockSize)
+ b.Hints = tokens[2:]
}
return
}
-func parseManifestLine(s string) (m ManifestLine) {
+func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) { // parses one "position:length:name" file token
+ parts := strings.SplitN(tok, ":", 3) // at most three parts, so the name itself may contain ":"
+ if len(parts) != 3 {
+ err = ErrInvalidToken
+ return
+ }
+ ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
+ if err != nil {
+ return
+ }
+ ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
+ if err != nil {
+ return
+ }
+ ft.Name = UnescapeName(parts[2])
+ return
+}
+// FileSegmentIterByName starts a goroutine that sends the FileSegments for filepath (see sendFileSegmentIterByName) on the returned unbuffered channel, then closes it.
+func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
+ ch := make(chan *FileSegment)
+ go func() {
+ s.sendFileSegmentIterByName(filepath, ch) // each send blocks until the caller receives
+ close(ch)
+ }()
+ return ch
+}
+// sendFileSegmentIterByName sends on ch, in order of appearance, one FileSegment for each block portion backing the file "./"+filepath in this stream. A zero-length file token yields a single size-0 FileSegment; an unparseable block locator yields a nil value followed by an immediate return. It never closes ch.
+func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
+ blockLens := make([]int, 0, len(s.Blocks)) // cache of parsed block sizes, filled lazily and shared by all file tokens
+ // This is what streamName+"/"+fileName will look like:
+ target := "./" + filepath
+ for _, fTok := range s.FileStreamSegments {
+ wantPos := fTok.SegPos
+ wantLen := fTok.SegLen
+ name := fTok.Name
+
+ if s.StreamName+"/"+name != target {
+ continue
+ }
+ if wantLen == 0 {
+ ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
+ continue
+ }
+ // Linear search for blocks containing data for this
+ // file
+ var blockPos uint64 = 0 // position of block in stream
+ for i, loc := range s.Blocks {
+ if blockPos >= wantPos+wantLen {
+ break // this block and all later ones start at or past the end of the wanted range
+ }
+ if len(blockLens) <= i {
+ blockLens = blockLens[:i+1] // grow within the preallocated capacity
+ b, err := ParseBlockLocator(loc)
+ if err != nil {
+ // Unparseable locator -> unusable
+ // stream.
+ ch <- nil
+ return
+ }
+ blockLens[i] = b.Size
+ }
+ blockLen := uint64(blockLens[i])
+ if blockPos+blockLen <= wantPos { // block ends before the wanted range starts
+ blockPos += blockLen
+ continue
+ }
+ fseg := FileSegment{
+ Locator: loc,
+ Offset: 0,
+ Len: blockLens[i],
+ }
+ if blockPos < wantPos { // wanted range starts partway into this block
+ fseg.Offset = int(wantPos - blockPos)
+ fseg.Len -= fseg.Offset
+ }
+ if blockPos+blockLen > wantPos+wantLen { // wanted range ends before this block does
+ fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
+ }
+ ch <- &fseg
+ blockPos += blockLen
+ }
+ }
+}
+// parseManifestStream parses one manifest line: an escaped stream name, then block locators, then file tokens, separated by single spaces. On invalid input it sets m.Err and returns whatever was parsed up to that point.
+func parseManifestStream(s string) (m ManifestStream) {
tokens := strings.Split(s, " ")
- m.StreamName = tokens[0]
+
+ m.StreamName = UnescapeName(tokens[0])
+ if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
+ m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
+ return
+ }
+
tokens = tokens[1:]
var i int
- for i = range tokens {
- if !LocatorPattern.MatchString(tokens[i]) {
+ for i = 0; i < len(tokens); i++ { // i ends at the index of the first token that is not a block locator, or at len(tokens)
+ if !blockdigest.IsBlockLocator(tokens[i]) {
break
}
}
m.Blocks = tokens[:i]
- m.Files = tokens[i:]
+ fileTokens := tokens[i:]
+
+ if len(m.Blocks) == 0 {
+ m.Err = fmt.Errorf("No block locators found")
+ return
+ }
+
+ if len(fileTokens) == 0 {
+ m.Err = fmt.Errorf("No file tokens found")
+ return
+ }
+
+ for _, ft := range fileTokens {
+ pft, err := parseFileStreamSegment(ft)
+ if err != nil {
+ m.Err = fmt.Errorf("Invalid file token: %s", ft)
+ break // stop at the first bad token; segments parsed so far stay in m
+ }
+ m.FileStreamSegments = append(m.FileStreamSegments, pft)
+ }
+
return
}
-func (m *Manifest) LineIter() <-chan ManifestLine {
- ch := make(chan ManifestLine)
+func (m *Manifest) StreamIter() <-chan ManifestStream {
+ ch := make(chan ManifestStream)
go func(input string) {
// This slice holds the current line and the remainder of the
// manifest. We parse one line at a time, to save effort if we
lines = strings.SplitN(lines[1], "\n", 2)
if len(lines[0]) > 0 {
// Only parse non-blank lines
- ch <- parseManifestLine(lines[0])
+ ch <- parseManifestStream(lines[0])
}
if len(lines) == 1 {
break
return ch
}
+func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment { // sends the file's segments from every matching stream on the returned channel, then closes it
+ ch := make(chan *FileSegment)
+ go func() {
+ for stream := range m.StreamIter() {
+ if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") { // skip streams whose name is not a directory prefix of the target path
+ continue
+ }
+ stream.sendFileSegmentIterByName(filepath, ch) // NOTE(review): stream.Err is not inspected before the stream is used
+ }
+ close(ch)
+ }()
+ return ch
+}
// Blocks may appear multiple times within the same manifest if they
// are used by multiple files. In that case this Iterator will output
// the same block multiple times.
-func (m *Manifest) BlockIterWithDuplicates() <-chan BlockLocator {
- blockChannel := make(chan BlockLocator)
- go func(lineChannel <-chan ManifestLine) {
- for m := range lineChannel {
- for _, block := range m.Blocks {
- if b, err := ParseBlockLocator(block); err == nil {
+//
+// In order to detect parse errors, caller must check m.Err after the returned channel closes.
+func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
+ blockChannel := make(chan blockdigest.BlockLocator)
+ go func(streamChannel <-chan ManifestStream) {
+ for ms := range streamChannel {
+ if ms.Err != nil {
+ m.Err = ms.Err // overwrites any earlier error, so only the most recent one is kept
+ continue // none of this stream's blocks are emitted
+ }
+ for _, block := range ms.Blocks {
+ if b, err := blockdigest.ParseBlockLocator(block); err == nil {
blockChannel <- b
} else {
- log.Printf("ERROR: Failed to parse block: %v", err)
+ m.Err = err // record the failure and keep iterating over the remaining blocks
}
}
}
close(blockChannel)
- }(m.LineIter())
+ }(m.StreamIter())
return blockChannel
}