12483: Simplify extent packing, reduce type casting.
[arvados.git] / sdk / go / manifest / manifest.go
index 362baf88eaeb6476d5fef0db0910f755f72525be..a517c064fb475e2102943a9b4b1b0356b6447d92 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 /* Deals with parsing Manifest Text. */
 
 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
@@ -8,6 +12,7 @@ import (
        "errors"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+       "path"
        "regexp"
        "sort"
        "strconv"
@@ -47,19 +52,19 @@ type FileStreamSegment struct {
 type ManifestStream struct {
        StreamName         string
        Blocks             []string
-       BlockOffsets       []uint64
+       blockOffsets       []uint64
        FileStreamSegments []FileStreamSegment
        Err                error
 }
 
 // Array of segments referencing file content
-type SegmentedFile []FileSegment
+type segmentedFile []FileSegment
 
 // Map of files to list of file segments referencing file content
-type SegmentedStream map[string]SegmentedFile
+type segmentedStream map[string]segmentedFile
 
 // Map of streams
-type SegmentedManifest map[string]SegmentedStream
+type segmentedManifest map[string]segmentedStream
 
 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
 
@@ -76,7 +81,17 @@ func unescapeSeq(seq string) string {
 }
 
 func EscapeName(s string) string {
-       return strings.Replace(s, " ", `\040`, -1)
+       raw := []byte(s)
+       escaped := make([]byte, 0, len(s))
+       for _, c := range raw {
+               if c <= 32 {
+                       oct := fmt.Sprintf("\\%03o", c)
+                       escaped = append(escaped, []byte(oct)...)
+               } else {
+                       escaped = append(escaped, c)
+               }
+       }
+       return string(escaped)
 }
 
 func UnescapeName(s string) string {
@@ -137,7 +152,7 @@ func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegm
        return ch
 }
 
-func FirstBlock(offsets []uint64, range_start uint64) int {
+func firstBlock(offsets []uint64, range_start uint64) int {
        // range_start/block_start is the inclusive lower bound
        // range_end/block_end is the exclusive upper bound
 
@@ -159,20 +174,17 @@ func FirstBlock(offsets []uint64, range_start uint64) int {
                        lo = i
                } else {
                        hi = i
-                       i = ((hi + lo) / 2)
-                       block_start = offsets[i]
-                       block_end = offsets[i+1]
                }
+               i = ((hi + lo) / 2)
+               block_start = offsets[i]
+               block_end = offsets[i+1]
        }
        return i
 }
 
 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
        // This is what streamName+"/"+fileName will look like:
-       target := filepath
-       if !strings.HasPrefix(target, "./") {
-               target = "./" + target
-       }
+       target := fixStreamName(filepath)
        for _, fTok := range s.FileStreamSegments {
                wantPos := fTok.SegPos
                wantLen := fTok.SegLen
@@ -187,19 +199,19 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
                }
 
                // Binary search to determine first block in the stream
-               i := FirstBlock(s.BlockOffsets, wantPos)
+               i := firstBlock(s.blockOffsets, wantPos)
                if i == -1 {
-                       // error
-                       break
+                       // Shouldn't happen, file segments are checked in parseManifestStream
+                       panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
                }
-               for i < len(s.Blocks) {
-                       blockPos := s.BlockOffsets[i]
-                       blockEnd := s.BlockOffsets[i+1]
+               for ; i < len(s.Blocks); i++ {
+                       blockPos := s.blockOffsets[i]
+                       blockEnd := s.blockOffsets[i+1]
                        if blockEnd <= wantPos {
-                               // current block comes before current file span
-                               // (shouldn't happen, FirstBlock() should start us
-                               // on the right block)
-                               break
+                               // Shouldn't happen, FirstBlock() should start
+                               // us on the right block, so if this triggers
+                               // that means there is a bug.
+                               panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
                        }
                        if blockPos >= wantPos+wantLen {
                                // current block comes after current file span
@@ -219,7 +231,6 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
                                fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
                        }
                        ch <- &fseg
-                       i += 1
                }
        }
 }
@@ -248,7 +259,7 @@ func parseManifestStream(s string) (m ManifestStream) {
                return
        }
 
-       m.BlockOffsets = make([]uint64, len(m.Blocks)+1)
+       m.blockOffsets = make([]uint64, len(m.Blocks)+1)
        var streamoffset uint64
        for i, b := range m.Blocks {
                bl, err := ParseBlockLocator(b)
@@ -256,10 +267,10 @@ func parseManifestStream(s string) (m ManifestStream) {
                        m.Err = err
                        return
                }
-               m.BlockOffsets[i] = streamoffset
+               m.blockOffsets[i] = streamoffset
                streamoffset += uint64(bl.Size)
        }
-       m.BlockOffsets[len(m.Blocks)] = streamoffset
+       m.blockOffsets[len(m.Blocks)] = streamoffset
 
        if len(fileTokens) == 0 {
                m.Err = fmt.Errorf("No file tokens found")
@@ -272,46 +283,63 @@ func parseManifestStream(s string) (m ManifestStream) {
                        m.Err = fmt.Errorf("Invalid file token: %s", ft)
                        break
                }
+               if pft.SegPos+pft.SegLen > streamoffset {
+                       m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
+                       break
+               }
                m.FileStreamSegments = append(m.FileStreamSegments, pft)
        }
 
        return
 }
 
-func SplitPath(path string) (streamname, filename string) {
-       pathIdx := strings.LastIndex(path, "/")
+func fixStreamName(sn string) string {
+       sn = path.Clean(sn)
+       if strings.HasPrefix(sn, "/") {
+               sn = "." + sn
+       } else if sn != "." {
+               sn = "./" + sn
+       }
+       return sn
+}
+
+func splitPath(srcpath string) (streamname, filename string) {
+       pathIdx := strings.LastIndex(srcpath, "/")
        if pathIdx >= 0 {
-               streamname = path[0:pathIdx]
-               filename = path[pathIdx+1:]
+               streamname = srcpath[0:pathIdx]
+               filename = srcpath[pathIdx+1:]
        } else {
-               streamname = path
+               streamname = srcpath
                filename = ""
        }
        return
 }
 
-func (m *Manifest) SegmentManifest() *SegmentedManifest {
-       files := make(SegmentedManifest)
+func (m *Manifest) segment() (*segmentedManifest, error) {
+       files := make(segmentedManifest)
 
        for stream := range m.StreamIter() {
+               if stream.Err != nil {
+                       // Stream has an error
+                       return nil, stream.Err
+               }
                currentStreamfiles := make(map[string]bool)
                for _, f := range stream.FileStreamSegments {
                        sn := stream.StreamName
-                       if sn != "." && !strings.HasPrefix(sn, "./") {
-                               sn = "./" + sn
-                       }
                        if strings.HasSuffix(sn, "/") {
                                sn = sn[0 : len(sn)-1]
                        }
                        path := sn + "/" + f.Name
-                       streamname, filename := SplitPath(path)
+                       streamname, filename := splitPath(path)
                        if files[streamname] == nil {
-                               files[streamname] = make(SegmentedStream)
+                               files[streamname] = make(segmentedStream)
                        }
                        if !currentStreamfiles[path] {
                                segs := files[streamname][filename]
                                for seg := range stream.FileSegmentIterByName(path) {
-                                       segs = append(segs, *seg)
+                                       if seg.Len > 0 {
+                                               segs = append(segs, *seg)
+                                       }
                                }
                                files[streamname][filename] = segs
                                currentStreamfiles[path] = true
@@ -319,28 +347,28 @@ func (m *Manifest) SegmentManifest() *SegmentedManifest {
                }
        }
 
-       return &files
+       return &files, nil
 }
 
-func (stream *SegmentedStream) NormalizeStream(name string) string {
+func (stream segmentedStream) normalizedText(name string) string {
        var sortedfiles []string
-       for k, _ := range *stream {
+       for k := range stream {
                sortedfiles = append(sortedfiles, k)
        }
        sort.Strings(sortedfiles)
 
        stream_tokens := []string{EscapeName(name)}
 
-       blocks := make(map[string]int64)
+       blocks := make(map[blockdigest.BlockDigest]int64)
        var streamoffset int64
 
        // Go through each file and add each referenced block exactly once.
        for _, streamfile := range sortedfiles {
-               for _, segment := range (*stream)[streamfile] {
-                       if _, ok := blocks[segment.Locator]; !ok {
+               for _, segment := range stream[streamfile] {
+                       b, _ := ParseBlockLocator(segment.Locator)
+                       if _, ok := blocks[b.Digest]; !ok {
                                stream_tokens = append(stream_tokens, segment.Locator)
-                               blocks[segment.Locator] = streamoffset
-                               b, _ := ParseBlockLocator(segment.Locator)
+                               blocks[b.Digest] = streamoffset
                                streamoffset += int64(b.Size)
                        }
                }
@@ -355,9 +383,10 @@ func (stream *SegmentedStream) NormalizeStream(name string) string {
                span_start := int64(-1)
                span_end := int64(0)
                fout := EscapeName(streamfile)
-               for _, segment := range (*stream)[streamfile] {
+               for _, segment := range stream[streamfile] {
                        // Collapse adjacent segments
-                       streamoffset = blocks[segment.Locator] + int64(segment.Offset)
+                       b, _ := ParseBlockLocator(segment.Locator)
+                       streamoffset = blocks[b.Digest] + int64(segment.Offset)
                        if span_start == -1 {
                                span_start = streamoffset
                                span_end = streamoffset + int64(segment.Len)
@@ -376,7 +405,7 @@ func (stream *SegmentedStream) NormalizeStream(name string) string {
                        stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
                }
 
-               if len((*stream)[streamfile]) == 0 {
+               if len(stream[streamfile]) == 0 {
                        stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
                }
        }
@@ -384,75 +413,83 @@ func (stream *SegmentedStream) NormalizeStream(name string) string {
        return strings.Join(stream_tokens, " ") + "\n"
 }
 
-func (m *Manifest) NormalizeManifest() string {
-       segments := m.SegmentManifest()
+func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
+       srcpath = fixStreamName(srcpath)
+
+       var suffix string
+       if strings.HasSuffix(relocate, "/") {
+               suffix = "/"
+       }
+       relocate = fixStreamName(relocate) + suffix
+
+       streamname, filename := splitPath(srcpath)
+
+       if stream, ok := m[streamname]; ok {
+               // check if it refers to a single file in a stream
+               filesegs, okfile := stream[filename]
+               if okfile {
+                       newstream := make(segmentedStream)
+                       relocate_stream, relocate_filename := splitPath(relocate)
+                       if relocate_filename == "" {
+                               relocate_filename = filename
+                       }
+                       newstream[relocate_filename] = filesegs
+                       return newstream.normalizedText(relocate_stream)
+               }
+       }
+
+       // Going to extract multiple streams
+       prefix := srcpath + "/"
+
+       if strings.HasSuffix(relocate, "/") {
+               relocate = relocate[0 : len(relocate)-1]
+       }
 
        var sortedstreams []string
-       for k, _ := range *segments {
+       for k := range m {
                sortedstreams = append(sortedstreams, k)
        }
        sort.Strings(sortedstreams)
 
-       var manifest string
+       manifest := ""
        for _, k := range sortedstreams {
-               stream := (*segments)[k]
-               manifest += stream.NormalizeStream(k)
+               if strings.HasPrefix(k, prefix) || k == srcpath {
+                       manifest += m[k].normalizedText(relocate + k[len(srcpath):])
+               }
        }
        return manifest
 }
 
-func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
-       if path == "" {
-               path = "."
-       }
-       if relocate == "" {
-               relocate = "."
-       }
-
-       streamname, filename := SplitPath(path)
-       var relocate_stream, relocate_filename string
-       relocate_stream, relocate_filename = SplitPath(relocate)
-
-       if stream, ok := (*m)[path]; ok {
-               // refers to a single stream
-               return stream.NormalizeStream(relocate)
-       } else if stream, ok := (*m)[streamname]; ok {
-               // refers to a single file in a stream
-               newstream := make(SegmentedStream)
-               if relocate_filename == "" {
-                       relocate_filename = filename
-               }
-               newstream[relocate_filename] = stream[filename]
-               return newstream.NormalizeStream(relocate_stream)
-       } else {
-               // refers to multiple streams
-               manifest := ""
-               prefix := path
-               if !strings.HasSuffix(prefix, "/") {
-                       prefix += "/"
-               }
-               if !strings.HasSuffix(relocate, "/") {
-                       relocate += "/"
-               }
-
-               var sortedstreams []string
-               for k, _ := range *m {
-                       sortedstreams = append(sortedstreams, k)
-               }
-               sort.Strings(sortedstreams)
-
-               for _, k := range sortedstreams {
-                       if strings.HasPrefix(k, prefix) {
-                               v := (*m)[k]
-                               manifest += v.NormalizeStream(relocate + k[len(prefix):])
-                       }
-               }
-               return manifest
+// Extract extracts some or all of the manifest and returns the extracted
+// portion as a normalized manifest.  This is a swiss army knife function that
+// can be several ways:
+//
+// If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
+// in normalized form.
+//
+//   Extract(".", ".")  // return entire normalized manfest text
+//
+// If 'srcpath' points to a single file, it will return manifest text for just that file.
+// The value of "relocate" is can be used to rename the file or set the file stream.
+//
+//   Extract("./foo", ".")          // extract file "foo" and put it in stream "."
+//   Extract("./foo", "./bar")      // extract file "foo", rename it to "bar" in stream "."
+//   Extract("./foo", "./bar/")     // extract file "foo", rename it to "./bar/foo"
+//   Extract("./foo", "./bar/baz")  // extract file "foo", rename it to "./bar/baz")
+//
+// Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
+// them under the path in "relocate".
+//
+//   Extract("./stream", ".")      // extract "./stream" to "." and "./stream/subdir" to "./subdir")
+//   Extract("./stream", "./bar")  // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir")
+func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
+       segmented, err := m.segment()
+       if err != nil {
+               ret.Err = err
+               return
        }
-}
-
-func (m *Manifest) ManifestForPath(path, relocate string) string {
-       return m.SegmentManifest().ManifestForPath(path, relocate)
+       ret.Text = segmented.manifestTextForPath(srcpath, relocate)
+       return
 }
 
 func (m *Manifest) StreamIter() <-chan ManifestStream {
@@ -479,9 +516,7 @@ func (m *Manifest) StreamIter() <-chan ManifestStream {
 
 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
        ch := make(chan *FileSegment, 64)
-       if !strings.HasPrefix(filepath, "./") {
-               filepath = "./" + filepath
-       }
+       filepath = fixStreamName(filepath)
        go func() {
                for stream := range m.StreamIter() {
                        if !strings.HasPrefix(filepath, stream.StreamName+"/") {