X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/78af1220d9e2ddf4d933d9a9487397414d8a3909..3aaefcb3c76ff470b475d950398d01255e87712a:/sdk/go/manifest/manifest.go

diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go
index f104d9a103..954fb710c0 100644
--- a/sdk/go/manifest/manifest.go
+++ b/sdk/go/manifest/manifest.go
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 /* Deals with parsing Manifest Text. */
 
 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
@@ -7,20 +11,19 @@ package manifest
 import (
 	"errors"
 	"fmt"
-	"git.curoverse.com/arvados.git/sdk/go/blockdigest"
-	"log"
+	"git.arvados.org/arvados.git/sdk/go/blockdigest"
+	"path"
 	"regexp"
+	"sort"
 	"strconv"
 	"strings"
 )
 
 var ErrInvalidToken = errors.New("Invalid token")
 
-var LocatorPattern = regexp.MustCompile(
-	"^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
-
 type Manifest struct {
 	Text string
+	Err  error
 }
 
 type BlockLocator struct {
@@ -29,12 +32,6 @@ type BlockLocator struct {
 	Hints  []string
 }
 
-type DataSegment struct {
-	BlockLocator
-	Locator      string
-	StreamOffset uint64
-}
-
 // FileSegment is a portion of a file that is contained within a
 // single block.
 type FileSegment struct {
@@ -44,13 +41,31 @@ type FileSegment struct {
 	Len int
 }
 
-// Represents a single line from a manifest.
+// FileStreamSegment is a portion of a file described as a segment of a stream.
+type FileStreamSegment struct {
+	SegPos uint64
+	SegLen uint64
+	Name   string
+}
+
+// ManifestStream represents a single line from a manifest.
 type ManifestStream struct {
-	StreamName string
-	Blocks     []string
-	FileTokens []string
+	StreamName         string
+	Blocks             []string
+	blockOffsets       []uint64
+	FileStreamSegments []FileStreamSegment
+	Err                error
 }
 
+// Array of segments referencing file content
+type segmentedFile []FileSegment
+
+// Map of files to list of file segments referencing file content
+type segmentedStream map[string]segmentedFile
+
+// Map of streams
+type segmentedManifest map[string]segmentedStream
+
 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
 
 func unescapeSeq(seq string) string {
@@ -65,16 +80,30 @@ func unescapeSeq(seq string) string {
 	return string([]byte{byte(i)})
 }
 
+func EscapeName(s string) string {
+	raw := []byte(s)
+	escaped := make([]byte, 0, len(s))
+	for _, c := range raw {
+		if c <= 32 {
+			oct := fmt.Sprintf("\\%03o", c)
+			escaped = append(escaped, []byte(oct)...)
+		} else {
+			escaped = append(escaped, c)
+		}
+	}
+	return string(escaped)
+}
+
 func UnescapeName(s string) string {
 	return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
 }
 
 func ParseBlockLocator(s string) (b BlockLocator, err error) {
-	if !LocatorPattern.MatchString(s) {
+	if !blockdigest.LocatorPattern.MatchString(s) {
 		err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
 			"\"%s\".",
 			s,
-			LocatorPattern.String())
+			blockdigest.LocatorPattern.String())
 	} else {
 		tokens := strings.Split(s, "+")
 		var blockSize int64
@@ -96,26 +125,26 @@ func ParseBlockLocator(s string) (b BlockLocator, err error) {
 	return
 }
 
-func parseFileToken(tok string) (segPos, segLen uint64, name string, err error) {
+func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
 	parts := strings.SplitN(tok, ":", 3)
 	if len(parts) != 3 {
 		err = ErrInvalidToken
 		return
 	}
-	segPos, err = strconv.ParseUint(parts[0], 10, 64)
+	ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
 	if err != nil {
 		return
 	}
-	segLen, err = strconv.ParseUint(parts[1], 10, 64)
+	ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
 	if err != nil {
 		return
 	}
-	name = UnescapeName(parts[2])
+	ft.Name = UnescapeName(parts[2])
 	return
 }
 
 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
-	ch := make(chan *FileSegment)
+	ch := make(chan *FileSegment, 64)
 	go func() {
 		s.sendFileSegmentIterByName(filepath, ch)
 		close(ch)
@@ -123,16 +152,44 @@ func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegm
 	return ch
 }
 
+func firstBlock(offsets []uint64, rangeStart uint64) int {
+	// rangeStart/blockStart is the inclusive lower bound
+	// rangeEnd/blockEnd is the exclusive upper bound
+
+	hi := len(offsets) - 1
+	var lo int
+	i := ((hi + lo) / 2)
+	blockStart := offsets[i]
+	blockEnd := offsets[i+1]
+
+	// perform a binary search for the first block
+	// assumes that all of the blocks are contiguous, so rangeStart is guaranteed
+	// to either fall into the range of a block or be outside the block range entirely
+	for !(rangeStart >= blockStart && rangeStart < blockEnd) {
+		if lo == i {
+			// must be out of range, fail
+			return -1
+		}
+		if rangeStart > blockStart {
+			lo = i
+		} else {
+			hi = i
+		}
+		i = ((hi + lo) / 2)
+		blockStart = offsets[i]
+		blockEnd = offsets[i+1]
+	}
+	return i
+}
+
 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
-	blockLens := make([]int, 0, len(s.Blocks))
 	// This is what streamName+"/"+fileName will look like:
-	target := "./" + filepath
-	for _, fTok := range s.FileTokens {
-		wantPos, wantLen, name, err := parseFileToken(fTok)
-		if err != nil {
-			// Skip (!) invalid file tokens.
-			continue
-		}
+	target := fixStreamName(filepath)
+	for _, fTok := range s.FileStreamSegments {
+		wantPos := fTok.SegPos
+		wantLen := fTok.SegLen
+		name := fTok.Name
+
 		if s.StreamName+"/"+name != target {
 			continue
 		}
@@ -140,59 +197,298 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
 			ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
 			continue
 		}
-		// Linear search for blocks containing data for this
-		// file
-		var blockPos uint64 = 0 // position of block in stream
-		for i, loc := range s.Blocks {
+
+		// Binary search to determine first block in the stream
+		i := firstBlock(s.blockOffsets, wantPos)
+		if i == -1 {
+			// Shouldn't happen, file segments are checked in parseManifestStream
+			panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
+		}
+		for ; i < len(s.Blocks); i++ {
+			blockPos := s.blockOffsets[i]
+			blockEnd := s.blockOffsets[i+1]
+			if blockEnd <= wantPos {
+				// Shouldn't happen, FirstBlock() should start
+				// us on the right block, so if this triggers
+				// that means there is a bug.
+				panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
+			}
 			if blockPos >= wantPos+wantLen {
+				// current block comes after current file span
 				break
 			}
-			if len(blockLens) <= i {
-				blockLens = blockLens[:i+1]
-				b, err := ParseBlockLocator(loc)
-				if err != nil {
-					// Unparseable locator -> unusable
-					// stream.
-					ch <- nil
-					return
-				}
-				blockLens[i] = b.Size
-			}
-			blockLen := uint64(blockLens[i])
-			if blockPos+blockLen <= wantPos {
-				blockPos += blockLen
-				continue
-			}
+
 			fseg := FileSegment{
-				Locator: loc,
+				Locator: s.Blocks[i],
 				Offset:  0,
-				Len:     blockLens[i],
+				Len:     int(blockEnd - blockPos),
 			}
 			if blockPos < wantPos {
 				fseg.Offset = int(wantPos - blockPos)
 				fseg.Len -= fseg.Offset
 			}
-			if blockPos+blockLen > wantPos+wantLen {
+			if blockEnd > wantPos+wantLen {
 				fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
 			}
 			ch <- &fseg
-			blockPos += blockLen
 		}
 	}
 }
 
 func parseManifestStream(s string) (m ManifestStream) {
 	tokens := strings.Split(s, " ")
+
 	m.StreamName = UnescapeName(tokens[0])
+	if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
+		m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
+		return
+	}
+
 	tokens = tokens[1:]
 	var i int
-	for i = range tokens {
+	for i = 0; i < len(tokens); i++ {
 		if !blockdigest.IsBlockLocator(tokens[i]) {
 			break
 		}
 	}
 	m.Blocks = tokens[:i]
-	m.FileTokens = tokens[i:]
+	fileTokens := tokens[i:]
+
+	if len(m.Blocks) == 0 {
+		m.Err = fmt.Errorf("No block locators found")
+		return
+	}
+
+	m.blockOffsets = make([]uint64, len(m.Blocks)+1)
+	var streamoffset uint64
+	for i, b := range m.Blocks {
+		bl, err := ParseBlockLocator(b)
+		if err != nil {
+			m.Err = err
+			return
+		}
+		m.blockOffsets[i] = streamoffset
+		streamoffset += uint64(bl.Size)
+	}
+	m.blockOffsets[len(m.Blocks)] = streamoffset
+
+	if len(fileTokens) == 0 {
+		m.Err = fmt.Errorf("No file tokens found")
+		return
+	}
+
+	for _, ft := range fileTokens {
+		pft, err := parseFileStreamSegment(ft)
+		if err != nil {
+			m.Err = fmt.Errorf("Invalid file token: %s", ft)
+			break
+		}
+		if pft.SegPos+pft.SegLen > streamoffset {
+			m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
+			break
+		}
+		m.FileStreamSegments = append(m.FileStreamSegments, pft)
+	}
+
+	return
+}
+
+func fixStreamName(sn string) string {
+	sn = path.Clean(sn)
+	if strings.HasPrefix(sn, "/") {
+		sn = "." + sn
+	} else if sn != "." {
+		sn = "./" + sn
+	}
+	return sn
+}
+
+func splitPath(srcpath string) (streamname, filename string) {
+	pathIdx := strings.LastIndex(srcpath, "/")
+	if pathIdx >= 0 {
+		streamname = srcpath[0:pathIdx]
+		filename = srcpath[pathIdx+1:]
+	} else {
+		streamname = srcpath
+		filename = ""
+	}
+	return
+}
+
+func (m *Manifest) segment() (*segmentedManifest, error) {
+	files := make(segmentedManifest)
+
+	for stream := range m.StreamIter() {
+		if stream.Err != nil {
+			// Stream has an error
+			return nil, stream.Err
+		}
+		currentStreamfiles := make(map[string]bool)
+		for _, f := range stream.FileStreamSegments {
+			sn := stream.StreamName
+			if strings.HasSuffix(sn, "/") {
+				sn = sn[0 : len(sn)-1]
+			}
+			path := sn + "/" + f.Name
+			streamname, filename := splitPath(path)
+			if files[streamname] == nil {
+				files[streamname] = make(segmentedStream)
+			}
+			if !currentStreamfiles[path] {
+				segs := files[streamname][filename]
+				for seg := range stream.FileSegmentIterByName(path) {
+					if seg.Len > 0 {
+						segs = append(segs, *seg)
+					}
+				}
+				files[streamname][filename] = segs
+				currentStreamfiles[path] = true
+			}
+		}
+	}
+
+	return &files, nil
+}
+
+func (stream segmentedStream) normalizedText(name string) string {
+	var sortedfiles []string
+	for k := range stream {
+		sortedfiles = append(sortedfiles, k)
+	}
+	sort.Strings(sortedfiles)
+
+	streamTokens := []string{EscapeName(name)}
+
+	blocks := make(map[blockdigest.BlockDigest]int64)
+	var streamoffset int64
+
+	// Go through each file and add each referenced block exactly once.
+	for _, streamfile := range sortedfiles {
+		for _, segment := range stream[streamfile] {
+			b, _ := ParseBlockLocator(segment.Locator)
+			if _, ok := blocks[b.Digest]; !ok {
+				streamTokens = append(streamTokens, segment.Locator)
+				blocks[b.Digest] = streamoffset
+				streamoffset += int64(b.Size)
+			}
+		}
+	}
+
+	if len(streamTokens) == 1 {
+		streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0")
+	}
+
+	for _, streamfile := range sortedfiles {
+		// Add in file segments
+		spanStart := int64(-1)
+		spanEnd := int64(0)
+		fout := EscapeName(streamfile)
+		for _, segment := range stream[streamfile] {
+			// Collapse adjacent segments
+			b, _ := ParseBlockLocator(segment.Locator)
+			streamoffset = blocks[b.Digest] + int64(segment.Offset)
+			if spanStart == -1 {
+				spanStart = streamoffset
+				spanEnd = streamoffset + int64(segment.Len)
+			} else {
+				if streamoffset == spanEnd {
+					spanEnd += int64(segment.Len)
+				} else {
+					streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
+					spanStart = streamoffset
+					spanEnd = streamoffset + int64(segment.Len)
+				}
+			}
+		}
+
+		if spanStart != -1 {
+			streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
+		}
+
+		if len(stream[streamfile]) == 0 {
+			streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout))
+		}
+	}
+
+	return strings.Join(streamTokens, " ") + "\n"
+}
+
+func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
+	srcpath = fixStreamName(srcpath)
+
+	var suffix string
+	if strings.HasSuffix(relocate, "/") {
+		suffix = "/"
+	}
+	relocate = fixStreamName(relocate) + suffix
+
+	streamname, filename := splitPath(srcpath)
+
+	if stream, ok := m[streamname]; ok {
+		// check if it refers to a single file in a stream
+		filesegs, okfile := stream[filename]
+		if okfile {
+			newstream := make(segmentedStream)
+			relocateStream, relocateFilename := splitPath(relocate)
+			if relocateFilename == "" {
+				relocateFilename = filename
+			}
+			newstream[relocateFilename] = filesegs
+			return newstream.normalizedText(relocateStream)
+		}
+	}
+
+	// Going to extract multiple streams
+	prefix := srcpath + "/"
+
+	if strings.HasSuffix(relocate, "/") {
+		relocate = relocate[0 : len(relocate)-1]
+	}
+
+	var sortedstreams []string
+	for k := range m {
+		sortedstreams = append(sortedstreams, k)
+	}
+	sort.Strings(sortedstreams)
+
+	manifest := ""
+	for _, k := range sortedstreams {
+		if strings.HasPrefix(k, prefix) || k == srcpath {
+			manifest += m[k].normalizedText(relocate + k[len(srcpath):])
+		}
+	}
+	return manifest
+}
+
+// Extract extracts some or all of the manifest and returns the extracted
+// portion as a normalized manifest. This is a Swiss Army knife function that
+// can be used in several ways:
+//
+// If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
+// in normalized form.
+//
+//   Extract(".", ".")  // return entire normalized manifest text
+//
+// If 'srcpath' points to a single file, it will return manifest text for just that file.
+// The value of "relocate" can be used to rename the file or set the file stream.
+//
+//   Extract("./foo", ".")          // extract file "foo" and put it in stream "."
+//   Extract("./foo", "./bar")      // extract file "foo", rename it to "bar" in stream "."
+//   Extract("./foo", "./bar/")     // extract file "foo", rename it to "./bar/foo"
+//   Extract("./foo", "./bar/baz")  // extract file "foo", rename it to "./bar/baz"
+//
+// Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
+// them under the path in "relocate".
+//
+//   Extract("./stream", ".")      // extract "./stream" to "." and "./stream/subdir" to "./subdir"
+//   Extract("./stream", "./bar")  // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir"
func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
+	segmented, err := m.segment()
+	if err != nil {
+		ret.Err = err
+		return
+	}
+	ret.Text = segmented.manifestTextForPath(srcpath, relocate)
 	return
 }
 
@@ -219,10 +515,11 @@ func (m *Manifest) StreamIter() <-chan ManifestStream {
 }
 
 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
-	ch := make(chan *FileSegment)
+	ch := make(chan *FileSegment, 64)
+	filepath = fixStreamName(filepath)
 	go func() {
 		for stream := range m.StreamIter() {
-			if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
+			if !strings.HasPrefix(filepath, stream.StreamName+"/") {
 				continue
 			}
 			stream.sendFileSegmentIterByName(filepath, ch)
@@ -232,18 +529,26 @@ func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
 	return ch
 }
 
-// Blocks may appear mulitple times within the same manifest if they
+// BlockIterWithDuplicates iterates over the block locators of a manifest.
+//
+// Blocks may appear multiple times within the same manifest if they
 // are used by multiple files. In that case this Iterator will output
 // the same block multiple times.
+//
+// In order to detect parse errors, caller must check m.Err after the returned channel closes.
 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
 	blockChannel := make(chan blockdigest.BlockLocator)
 	go func(streamChannel <-chan ManifestStream) {
-		for m := range streamChannel {
-			for _, block := range m.Blocks {
+		for ms := range streamChannel {
+			if ms.Err != nil {
+				m.Err = ms.Err
+				continue
+			}
+			for _, block := range ms.Blocks {
 				if b, err := blockdigest.ParseBlockLocator(block); err == nil {
 					blockChannel <- b
 				} else {
-					log.Printf("ERROR: Failed to parse block: %v", err)
+					m.Err = err
 				}
 			}
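
Usage sketch (illustrative, not part of the diff above): the program below shows how a caller might exercise the parsing and iteration API introduced here, using only the exported names visible in the new code (Manifest, FileSegmentIterByName, BlockIterWithDuplicates, Err). The manifest line, file name, and locator are invented example data.

package main

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/manifest"
)

func main() {
	// One stream "." holding a single 3-byte file "foo.txt".
	// acbd18db4cc2f85cedef654fccc4a4d8 is md5("foo"); any valid locator works.
	m := manifest.Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"}

	// Walk the block/offset/length segments that make up one file.
	for seg := range m.FileSegmentIterByName("./foo.txt") {
		fmt.Printf("locator=%s offset=%d len=%d\n", seg.Locator, seg.Offset, seg.Len)
	}

	// Walk every block locator referenced by the manifest.
	for b := range m.BlockIterWithDuplicates() {
		fmt.Println(b.Digest, b.Size)
	}
	// Per the comment on BlockIterWithDuplicates, parse errors are reported
	// through m.Err once the channel has closed.
	if m.Err != nil {
		fmt.Println("manifest error:", m.Err)
	}
}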
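A second sketch, also illustrative rather than authoritative, shows the Extract calling conventions documented in the new doc comment: extracting one file into another stream and re-normalizing a whole manifest. The two-stream manifest text and the "./copied/" destination are made-up examples.

package main

import (
	"fmt"
	"log"

	"git.arvados.org/arvados.git/sdk/go/manifest"
)

func main() {
	// Two streams: "." holding readme.txt and "./data" holding a.dat.
	src := manifest.Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:readme.txt\n" +
		"./data 37b51d194a7513e45b56f6524f2d51f2+3 0:3:a.dat\n"}

	// Extract a single file and relocate it into stream "./copied",
	// keeping its name (the trailing "/" form described in the comment).
	one := src.Extract("./readme.txt", "./copied/")
	if one.Err != nil {
		log.Fatal(one.Err) // parse problems surface via Manifest.Err
	}
	fmt.Print(one.Text)

	// Extract(".", ".") returns the whole manifest in normalized form.
	fmt.Print(src.Extract(".", ".").Text)
}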
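Finally, a minimal sketch of the EscapeName/UnescapeName pair added above, which apply the manifest quoting rule (bytes with value 32 or lower become three-digit octal escapes such as \040). The file name is an arbitrary example.

package main

import (
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/manifest"
)

func main() {
	name := "my file\twith whitespace.txt"
	escaped := manifest.EscapeName(name) // "my\040file\011with\040whitespace.txt"
	fmt.Println(escaped)
	fmt.Println(manifest.UnescapeName(escaped) == name) // true: the escape round-trips
}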