+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
/* Deals with parsing Manifest Text. */
// Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
"errors"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
+ "path"
"regexp"
"sort"
"strconv"
type ManifestStream struct {
StreamName string
Blocks []string
- BlockOffsets []uint64
+ blockOffsets []uint64
FileStreamSegments []FileStreamSegment
Err error
}
// Array of segments referencing file content
-type SegmentedFile []FileSegment
+type segmentedFile []FileSegment
// Map of files to list of file segments referencing file content
-type SegmentedStream map[string]SegmentedFile
+type segmentedStream map[string]segmentedFile
// Map of streams
-type SegmentedManifest map[string]SegmentedStream
+type segmentedManifest map[string]segmentedStream
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
}
func EscapeName(s string) string {
- return strings.Replace(s, " ", `\040`, -1)
+ raw := []byte(s)
+ escaped := make([]byte, 0, len(s))
+ for _, c := range raw {
+ if c <= 32 {
+ oct := fmt.Sprintf("\\%03o", c)
+ escaped = append(escaped, []byte(oct)...)
+ } else {
+ escaped = append(escaped, c)
+ }
+ }
+ return string(escaped)
}
func UnescapeName(s string) string {
return ch
}
-func FirstBlock(offsets []uint64, range_start uint64) int {
+func firstBlock(offsets []uint64, range_start uint64) int {
// range_start/block_start is the inclusive lower bound
// range_end/block_end is the exclusive upper bound
lo = i
} else {
hi = i
- i = ((hi + lo) / 2)
- block_start = offsets[i]
- block_end = offsets[i+1]
}
+ i = ((hi + lo) / 2)
+ block_start = offsets[i]
+ block_end = offsets[i+1]
}
return i
}
func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
// This is what streamName+"/"+fileName will look like:
- target := filepath
- if !strings.HasPrefix(target, "./") {
- target = "./" + target
- }
+ target := fixStreamName(filepath)
for _, fTok := range s.FileStreamSegments {
wantPos := fTok.SegPos
wantLen := fTok.SegLen
}
// Binary search to determine first block in the stream
- i := FirstBlock(s.BlockOffsets, wantPos)
+ i := firstBlock(s.blockOffsets, wantPos)
if i == -1 {
- // error
- break
+ // Shouldn't happen, file segments are checked in parseManifestStream
+ panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
}
- for i < len(s.Blocks) {
- blockPos := s.BlockOffsets[i]
- blockEnd := s.BlockOffsets[i+1]
+ for ; i < len(s.Blocks); i++ {
+ blockPos := s.blockOffsets[i]
+ blockEnd := s.blockOffsets[i+1]
if blockEnd <= wantPos {
- // current block comes before current file span
- // (shouldn't happen, FirstBlock() should start us
- // on the right block)
- break
+ // Shouldn't happen, firstBlock() should start
+ // us on the right block, so if this triggers
+ // that means there is a bug.
+ panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
}
if blockPos >= wantPos+wantLen {
// current block comes after current file span
fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
}
ch <- &fseg
- i += 1
}
}
}
return
}
- m.BlockOffsets = make([]uint64, len(m.Blocks)+1)
+ m.blockOffsets = make([]uint64, len(m.Blocks)+1)
var streamoffset uint64
for i, b := range m.Blocks {
bl, err := ParseBlockLocator(b)
m.Err = err
return
}
- m.BlockOffsets[i] = streamoffset
+ m.blockOffsets[i] = streamoffset
streamoffset += uint64(bl.Size)
}
- m.BlockOffsets[len(m.Blocks)] = streamoffset
+ m.blockOffsets[len(m.Blocks)] = streamoffset
if len(fileTokens) == 0 {
m.Err = fmt.Errorf("No file tokens found")
m.Err = fmt.Errorf("Invalid file token: %s", ft)
break
}
+ if pft.SegPos+pft.SegLen > streamoffset {
+ m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
+ break
+ }
m.FileStreamSegments = append(m.FileStreamSegments, pft)
}
return
}
-func SplitPath(path string) (streamname, filename string) {
- pathIdx := strings.LastIndex(path, "/")
+func fixStreamName(sn string) string {
+ sn = path.Clean(sn)
+ if strings.HasPrefix(sn, "/") {
+ sn = "." + sn
+ } else if sn != "." {
+ sn = "./" + sn
+ }
+ return sn
+}
+
+func splitPath(srcpath string) (streamname, filename string) {
+ pathIdx := strings.LastIndex(srcpath, "/")
if pathIdx >= 0 {
- streamname = path[0:pathIdx]
- filename = path[pathIdx+1:]
+ streamname = srcpath[0:pathIdx]
+ filename = srcpath[pathIdx+1:]
} else {
- streamname = path
+ streamname = srcpath
filename = ""
}
return
}
-func (m *Manifest) SegmentManifest() *SegmentedManifest {
- files := make(SegmentedManifest)
+func (m *Manifest) segment() (*segmentedManifest, error) {
+ files := make(segmentedManifest)
for stream := range m.StreamIter() {
+ if stream.Err != nil {
+ // Stream has an error
+ return nil, stream.Err
+ }
currentStreamfiles := make(map[string]bool)
for _, f := range stream.FileStreamSegments {
sn := stream.StreamName
- if sn != "." && !strings.HasPrefix(sn, "./") {
- sn = "./" + sn
- }
if strings.HasSuffix(sn, "/") {
sn = sn[0 : len(sn)-1]
}
path := sn + "/" + f.Name
- streamname, filename := SplitPath(path)
+ streamname, filename := splitPath(path)
if files[streamname] == nil {
- files[streamname] = make(SegmentedStream)
+ files[streamname] = make(segmentedStream)
}
if !currentStreamfiles[path] {
segs := files[streamname][filename]
for seg := range stream.FileSegmentIterByName(path) {
- segs = append(segs, *seg)
+ if seg.Len > 0 {
+ segs = append(segs, *seg)
+ }
}
files[streamname][filename] = segs
currentStreamfiles[path] = true
}
}
- return &files
+ return &files, nil
}
-func (stream *SegmentedStream) NormalizeStream(name string) string {
+func (stream segmentedStream) normalizedText(name string) string {
var sortedfiles []string
- for k, _ := range *stream {
+ for k := range stream {
sortedfiles = append(sortedfiles, k)
}
sort.Strings(sortedfiles)
stream_tokens := []string{EscapeName(name)}
- blocks := make(map[string]int64)
+ blocks := make(map[blockdigest.BlockDigest]int64)
var streamoffset int64
// Go through each file and add each referenced block exactly once.
for _, streamfile := range sortedfiles {
- for _, segment := range (*stream)[streamfile] {
- if _, ok := blocks[segment.Locator]; !ok {
+ for _, segment := range stream[streamfile] {
+ b, _ := ParseBlockLocator(segment.Locator)
+ if _, ok := blocks[b.Digest]; !ok {
stream_tokens = append(stream_tokens, segment.Locator)
- blocks[segment.Locator] = streamoffset
- b, _ := ParseBlockLocator(segment.Locator)
+ blocks[b.Digest] = streamoffset
streamoffset += int64(b.Size)
}
}
span_start := int64(-1)
span_end := int64(0)
fout := EscapeName(streamfile)
- for _, segment := range (*stream)[streamfile] {
+ for _, segment := range stream[streamfile] {
// Collapse adjacent segments
- streamoffset = blocks[segment.Locator] + int64(segment.Offset)
+ b, _ := ParseBlockLocator(segment.Locator)
+ streamoffset = blocks[b.Digest] + int64(segment.Offset)
if span_start == -1 {
span_start = streamoffset
span_end = streamoffset + int64(segment.Len)
stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
}
- if len((*stream)[streamfile]) == 0 {
+ if len(stream[streamfile]) == 0 {
stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
}
}
return strings.Join(stream_tokens, " ") + "\n"
}
-func (m *Manifest) NormalizeManifest() string {
- segments := m.SegmentManifest()
+func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
+ srcpath = fixStreamName(srcpath)
+
+ var suffix string
+ if strings.HasSuffix(relocate, "/") {
+ suffix = "/"
+ }
+ relocate = fixStreamName(relocate) + suffix
+
+ streamname, filename := splitPath(srcpath)
+
+ if stream, ok := m[streamname]; ok {
+ // check if it refers to a single file in a stream
+ filesegs, okfile := stream[filename]
+ if okfile {
+ newstream := make(segmentedStream)
+ relocate_stream, relocate_filename := splitPath(relocate)
+ if relocate_filename == "" {
+ relocate_filename = filename
+ }
+ newstream[relocate_filename] = filesegs
+ return newstream.normalizedText(relocate_stream)
+ }
+ }
+
+ // Going to extract multiple streams
+ prefix := srcpath + "/"
+
+ if strings.HasSuffix(relocate, "/") {
+ relocate = relocate[0 : len(relocate)-1]
+ }
var sortedstreams []string
- for k, _ := range *segments {
+ for k := range m {
sortedstreams = append(sortedstreams, k)
}
sort.Strings(sortedstreams)
- var manifest string
+ manifest := ""
for _, k := range sortedstreams {
- stream := (*segments)[k]
- manifest += stream.NormalizeStream(k)
+ if strings.HasPrefix(k, prefix) || k == srcpath {
+ manifest += m[k].normalizedText(relocate + k[len(srcpath):])
+ }
}
return manifest
}
-func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
- if path == "" {
- path = "."
- }
- if relocate == "" {
- relocate = "."
- }
-
- streamname, filename := SplitPath(path)
- var relocate_stream, relocate_filename string
- relocate_stream, relocate_filename = SplitPath(relocate)
-
- if stream, ok := (*m)[path]; ok {
- // refers to a single stream
- return stream.NormalizeStream(relocate)
- } else if stream, ok := (*m)[streamname]; ok {
- // refers to a single file in a stream
- newstream := make(SegmentedStream)
- if relocate_filename == "" {
- relocate_filename = filename
- }
- newstream[relocate_filename] = stream[filename]
- return newstream.NormalizeStream(relocate_stream)
- } else {
- // refers to multiple streams
- manifest := ""
- prefix := path
- if !strings.HasSuffix(prefix, "/") {
- prefix += "/"
- }
- if !strings.HasSuffix(relocate, "/") {
- relocate += "/"
- }
-
- var sortedstreams []string
- for k, _ := range *m {
- sortedstreams = append(sortedstreams, k)
- }
- sort.Strings(sortedstreams)
-
- for _, k := range sortedstreams {
- if strings.HasPrefix(k, prefix) {
- v := (*m)[k]
- manifest += v.NormalizeStream(relocate + k[len(prefix):])
- }
- }
- return manifest
+// Extract extracts some or all of the manifest and returns the extracted
+// portion as a normalized manifest. This is a swiss army knife function that
+// can be used in several ways:
+//
+// If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
+// in normalized form.
+//
+// Extract(".", ".") // return entire normalized manifest text
+//
+// If 'srcpath' points to a single file, it will return manifest text for just that file.
+// The value of "relocate" can be used to rename the file or set the file stream.
+//
+// Extract("./foo", ".") // extract file "foo" and put it in stream "."
+// Extract("./foo", "./bar") // extract file "foo", rename it to "bar" in stream "."
+// Extract("./foo", "./bar/") // extract file "foo", rename it to "./bar/foo"
+// Extract("./foo", "./bar/baz") // extract file "foo", rename it to "./bar/baz"
+//
+// Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
+// them under the path in "relocate".
+//
+// Extract("./stream", ".") // extract "./stream" to "." and "./stream/subdir" to "./subdir"
+// Extract("./stream", "./bar") // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir"
+func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
+ segmented, err := m.segment()
+ if err != nil {
+ ret.Err = err
+ return
}
-}
-
-func (m *Manifest) ManifestForPath(path, relocate string) string {
- return m.SegmentManifest().ManifestForPath(path, relocate)
+ ret.Text = segmented.manifestTextForPath(srcpath, relocate)
+ return
}
func (m *Manifest) StreamIter() <-chan ManifestStream {
func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
ch := make(chan *FileSegment, 64)
- if !strings.HasPrefix(filepath, "./") {
- filepath = "./" + filepath
- }
+ filepath = fixStreamName(filepath)
go func() {
for stream := range m.StreamIter() {
if !strings.HasPrefix(filepath, stream.StreamName+"/") {