X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/d44a5c508cfa664134daad806d7be9a7cb0bd6ee..f8192fbfdac74e88ac60dc8c1e652745873471a7:/sdk/go/manifest/manifest.go?ds=sidebyside diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go index 882b4ff45d..f104d9a103 100644 --- a/sdk/go/manifest/manifest.go +++ b/sdk/go/manifest/manifest.go @@ -5,14 +5,17 @@ package manifest import ( - "bufio" + "errors" "fmt" + "git.curoverse.com/arvados.git/sdk/go/blockdigest" "log" "regexp" "strconv" "strings" ) +var ErrInvalidToken = errors.New("Invalid token") + var LocatorPattern = regexp.MustCompile( "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$") @@ -21,77 +24,223 @@ type Manifest struct { } type BlockLocator struct { - Digest string - Size int - Hints []string + Digest blockdigest.BlockDigest + Size int + Hints []string +} + +type DataSegment struct { + BlockLocator + Locator string + StreamOffset uint64 +} + +// FileSegment is a portion of a file that is contained within a +// single block. +type FileSegment struct { + Locator string + // Offset (within this block) of this data segment + Offset int + Len int +} + +// Represents a single line from a manifest. +type ManifestStream struct { + StreamName string + Blocks []string + FileTokens []string +} + +var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`) + +func unescapeSeq(seq string) string { + if seq == `\\` { + return `\` + } + i, err := strconv.ParseUint(seq[1:], 8, 8) + if err != nil { + // Invalid escape sequence: can't unescape. + return seq + } + return string([]byte{byte(i)}) } -type ManifestLine struct { - StreamName string - Blocks []string - Files []string +func UnescapeName(s string) string { + return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq) } -func parseBlockLocator(s string) (b BlockLocator, err error) { +func ParseBlockLocator(s string) (b BlockLocator, err error) { if !LocatorPattern.MatchString(s) { - err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern " + + err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+ "\"%s\".", s, LocatorPattern.String()) } else { tokens := strings.Split(s, "+") var blockSize int64 - // We expect ParseInt to succeed since LocatorPattern restricts - // tokens[1] to contain exclusively digits. + var blockDigest blockdigest.BlockDigest + // We expect both of the following to succeed since LocatorPattern + // restricts the strings appropriately. + blockDigest, err = blockdigest.FromString(tokens[0]) + if err != nil { + return + } blockSize, err = strconv.ParseInt(tokens[1], 10, 0) - if err == nil { - b.Digest = tokens[0] - b.Size = int(blockSize) - b.Hints = tokens[2:] + if err != nil { + return } + b.Digest = blockDigest + b.Size = int(blockSize) + b.Hints = tokens[2:] } return } -func parseManifestLine(s string) (m ManifestLine) { +func parseFileToken(tok string) (segPos, segLen uint64, name string, err error) { + parts := strings.SplitN(tok, ":", 3) + if len(parts) != 3 { + err = ErrInvalidToken + return + } + segPos, err = strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return + } + segLen, err = strconv.ParseUint(parts[1], 10, 64) + if err != nil { + return + } + name = UnescapeName(parts[2]) + return +} + +func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment { + ch := make(chan *FileSegment) + go func() { + s.sendFileSegmentIterByName(filepath, ch) + close(ch) + }() + return ch +} + +func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) { + blockLens := make([]int, 0, len(s.Blocks)) + // This is what streamName+"/"+fileName will look like: + target := "./" + filepath + for _, fTok := range s.FileTokens { + wantPos, wantLen, name, err := parseFileToken(fTok) + if err != nil { + // Skip (!) invalid file tokens. + continue + } + if s.StreamName+"/"+name != target { + continue + } + if wantLen == 0 { + ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0} + continue + } + // Linear search for blocks containing data for this + // file + var blockPos uint64 = 0 // position of block in stream + for i, loc := range s.Blocks { + if blockPos >= wantPos+wantLen { + break + } + if len(blockLens) <= i { + blockLens = blockLens[:i+1] + b, err := ParseBlockLocator(loc) + if err != nil { + // Unparseable locator -> unusable + // stream. + ch <- nil + return + } + blockLens[i] = b.Size + } + blockLen := uint64(blockLens[i]) + if blockPos+blockLen <= wantPos { + blockPos += blockLen + continue + } + fseg := FileSegment{ + Locator: loc, + Offset: 0, + Len: blockLens[i], + } + if blockPos < wantPos { + fseg.Offset = int(wantPos - blockPos) + fseg.Len -= fseg.Offset + } + if blockPos+blockLen > wantPos+wantLen { + fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset + } + ch <- &fseg + blockPos += blockLen + } + } +} + +func parseManifestStream(s string) (m ManifestStream) { tokens := strings.Split(s, " ") - m.StreamName = tokens[0] + m.StreamName = UnescapeName(tokens[0]) tokens = tokens[1:] var i int for i = range tokens { - if !LocatorPattern.MatchString(tokens[i]) { + if !blockdigest.IsBlockLocator(tokens[i]) { break } } m.Blocks = tokens[:i] - m.Files = tokens[i:] + m.FileTokens = tokens[i:] return } -func (m *Manifest) LineIter() <-chan ManifestLine { - ch := make(chan ManifestLine) +func (m *Manifest) StreamIter() <-chan ManifestStream { + ch := make(chan ManifestStream) go func(input string) { - scanner := bufio.NewScanner(strings.NewReader(input)) - for scanner.Scan() { - // We parse one line at a time, to save effort if we only need - // the first few lines. - ch <- parseManifestLine(scanner.Text()) + // This slice holds the current line and the remainder of the + // manifest. We parse one line at a time, to save effort if we + // only need the first few lines. + lines := []string{"", input} + for { + lines = strings.SplitN(lines[1], "\n", 2) + if len(lines[0]) > 0 { + // Only parse non-blank lines + ch <- parseManifestStream(lines[0]) + } + if len(lines) == 1 { + break + } } close(ch) }(m.Text) return ch } +func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment { + ch := make(chan *FileSegment) + go func() { + for stream := range m.StreamIter() { + if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") { + continue + } + stream.sendFileSegmentIterByName(filepath, ch) + } + close(ch) + }() + return ch +} // Blocks may appear mulitple times within the same manifest if they // are used by multiple files. In that case this Iterator will output // the same block multiple times. -func (m *Manifest) BlockIterWithDuplicates() <-chan BlockLocator { - blockChannel := make(chan BlockLocator) - go func(lineChannel <-chan ManifestLine) { - for m := range lineChannel { +func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator { + blockChannel := make(chan blockdigest.BlockLocator) + go func(streamChannel <-chan ManifestStream) { + for m := range streamChannel { for _, block := range m.Blocks { - if b, err := parseBlockLocator(block); err == nil { + if b, err := blockdigest.ParseBlockLocator(block); err == nil { blockChannel <- b } else { log.Printf("ERROR: Failed to parse block: %v", err) @@ -99,6 +248,6 @@ func (m *Manifest) BlockIterWithDuplicates() <-chan BlockLocator { } } close(blockChannel) - }(m.LineIter()) + }(m.StreamIter()) return blockChannel }