X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ab6a70e86dd041f3b4da167c59e3e91309f14365..6ce00fb7121813f187b555435a3f01c2aa380f93:/sdk/go/manifest/manifest.go?ds=sidebyside diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go index 362baf88ea..a517c064fb 100644 --- a/sdk/go/manifest/manifest.go +++ b/sdk/go/manifest/manifest.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + /* Deals with parsing Manifest Text. */ // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb @@ -8,6 +12,7 @@ import ( "errors" "fmt" "git.curoverse.com/arvados.git/sdk/go/blockdigest" + "path" "regexp" "sort" "strconv" @@ -47,19 +52,19 @@ type FileStreamSegment struct { type ManifestStream struct { StreamName string Blocks []string - BlockOffsets []uint64 + blockOffsets []uint64 FileStreamSegments []FileStreamSegment Err error } // Array of segments referencing file content -type SegmentedFile []FileSegment +type segmentedFile []FileSegment // Map of files to list of file segments referencing file content -type SegmentedStream map[string]SegmentedFile +type segmentedStream map[string]segmentedFile // Map of streams -type SegmentedManifest map[string]SegmentedStream +type segmentedManifest map[string]segmentedStream var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`) @@ -76,7 +81,17 @@ func unescapeSeq(seq string) string { } func EscapeName(s string) string { - return strings.Replace(s, " ", `\040`, -1) + raw := []byte(s) + escaped := make([]byte, 0, len(s)) + for _, c := range raw { + if c <= 32 { + oct := fmt.Sprintf("\\%03o", c) + escaped = append(escaped, []byte(oct)...) 
+ } else { + escaped = append(escaped, c) + } + } + return string(escaped) } func UnescapeName(s string) string { @@ -137,7 +152,7 @@ func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegm return ch } -func FirstBlock(offsets []uint64, range_start uint64) int { +func firstBlock(offsets []uint64, range_start uint64) int { // range_start/block_start is the inclusive lower bound // range_end/block_end is the exclusive upper bound @@ -159,20 +174,17 @@ func FirstBlock(offsets []uint64, range_start uint64) int { lo = i } else { hi = i - i = ((hi + lo) / 2) - block_start = offsets[i] - block_end = offsets[i+1] } + i = ((hi + lo) / 2) + block_start = offsets[i] + block_end = offsets[i+1] } return i } func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) { // This is what streamName+"/"+fileName will look like: - target := filepath - if !strings.HasPrefix(target, "./") { - target = "./" + target - } + target := fixStreamName(filepath) for _, fTok := range s.FileStreamSegments { wantPos := fTok.SegPos wantLen := fTok.SegLen @@ -187,19 +199,19 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F } // Binary search to determine first block in the stream - i := FirstBlock(s.BlockOffsets, wantPos) + i := firstBlock(s.blockOffsets, wantPos) if i == -1 { - // error - break + // Shouldn't happen, file segments are checked in parseManifestStream + panic(fmt.Sprintf("File segment %v extends past end of stream", fTok)) } - for i < len(s.Blocks) { - blockPos := s.BlockOffsets[i] - blockEnd := s.BlockOffsets[i+1] + for ; i < len(s.Blocks); i++ { + blockPos := s.blockOffsets[i] + blockEnd := s.blockOffsets[i+1] if blockEnd <= wantPos { - // current block comes before current file span - // (shouldn't happen, FirstBlock() should start us - // on the right block) - break + // Shouldn't happen, FirstBlock() should start + // us on the right block, so if this triggers + // that means there is a 
bug. + panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos)) } if blockPos >= wantPos+wantLen { // current block comes after current file span @@ -219,7 +231,6 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset } ch <- &fseg - i += 1 } } } @@ -248,7 +259,7 @@ func parseManifestStream(s string) (m ManifestStream) { return } - m.BlockOffsets = make([]uint64, len(m.Blocks)+1) + m.blockOffsets = make([]uint64, len(m.Blocks)+1) var streamoffset uint64 for i, b := range m.Blocks { bl, err := ParseBlockLocator(b) @@ -256,10 +267,10 @@ func parseManifestStream(s string) (m ManifestStream) { m.Err = err return } - m.BlockOffsets[i] = streamoffset + m.blockOffsets[i] = streamoffset streamoffset += uint64(bl.Size) } - m.BlockOffsets[len(m.Blocks)] = streamoffset + m.blockOffsets[len(m.Blocks)] = streamoffset if len(fileTokens) == 0 { m.Err = fmt.Errorf("No file tokens found") @@ -272,46 +283,63 @@ func parseManifestStream(s string) (m ManifestStream) { m.Err = fmt.Errorf("Invalid file token: %s", ft) break } + if pft.SegPos+pft.SegLen > streamoffset { + m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset) + break + } m.FileStreamSegments = append(m.FileStreamSegments, pft) } return } -func SplitPath(path string) (streamname, filename string) { - pathIdx := strings.LastIndex(path, "/") +func fixStreamName(sn string) string { + sn = path.Clean(sn) + if strings.HasPrefix(sn, "/") { + sn = "." + sn + } else if sn != "." 
{ + sn = "./" + sn + } + return sn +} + +func splitPath(srcpath string) (streamname, filename string) { + pathIdx := strings.LastIndex(srcpath, "/") if pathIdx >= 0 { - streamname = path[0:pathIdx] - filename = path[pathIdx+1:] + streamname = srcpath[0:pathIdx] + filename = srcpath[pathIdx+1:] } else { - streamname = path + streamname = srcpath filename = "" } return } -func (m *Manifest) SegmentManifest() *SegmentedManifest { - files := make(SegmentedManifest) +func (m *Manifest) segment() (*segmentedManifest, error) { + files := make(segmentedManifest) for stream := range m.StreamIter() { + if stream.Err != nil { + // Stream has an error + return nil, stream.Err + } currentStreamfiles := make(map[string]bool) for _, f := range stream.FileStreamSegments { sn := stream.StreamName - if sn != "." && !strings.HasPrefix(sn, "./") { - sn = "./" + sn - } if strings.HasSuffix(sn, "/") { sn = sn[0 : len(sn)-1] } path := sn + "/" + f.Name - streamname, filename := SplitPath(path) + streamname, filename := splitPath(path) if files[streamname] == nil { - files[streamname] = make(SegmentedStream) + files[streamname] = make(segmentedStream) } if !currentStreamfiles[path] { segs := files[streamname][filename] for seg := range stream.FileSegmentIterByName(path) { - segs = append(segs, *seg) + if seg.Len > 0 { + segs = append(segs, *seg) + } } files[streamname][filename] = segs currentStreamfiles[path] = true @@ -319,28 +347,28 @@ func (m *Manifest) SegmentManifest() *SegmentedManifest { } } - return &files + return &files, nil } -func (stream *SegmentedStream) NormalizeStream(name string) string { +func (stream segmentedStream) normalizedText(name string) string { var sortedfiles []string - for k, _ := range *stream { + for k := range stream { sortedfiles = append(sortedfiles, k) } sort.Strings(sortedfiles) stream_tokens := []string{EscapeName(name)} - blocks := make(map[string]int64) + blocks := make(map[blockdigest.BlockDigest]int64) var streamoffset int64 // Go through each 
file and add each referenced block exactly once. for _, streamfile := range sortedfiles { - for _, segment := range (*stream)[streamfile] { - if _, ok := blocks[segment.Locator]; !ok { + for _, segment := range stream[streamfile] { + b, _ := ParseBlockLocator(segment.Locator) + if _, ok := blocks[b.Digest]; !ok { stream_tokens = append(stream_tokens, segment.Locator) - blocks[segment.Locator] = streamoffset - b, _ := ParseBlockLocator(segment.Locator) + blocks[b.Digest] = streamoffset streamoffset += int64(b.Size) } } @@ -355,9 +383,10 @@ func (stream *SegmentedStream) NormalizeStream(name string) string { span_start := int64(-1) span_end := int64(0) fout := EscapeName(streamfile) - for _, segment := range (*stream)[streamfile] { + for _, segment := range stream[streamfile] { // Collapse adjacent segments - streamoffset = blocks[segment.Locator] + int64(segment.Offset) + b, _ := ParseBlockLocator(segment.Locator) + streamoffset = blocks[b.Digest] + int64(segment.Offset) if span_start == -1 { span_start = streamoffset span_end = streamoffset + int64(segment.Len) @@ -376,7 +405,7 @@ func (stream *SegmentedStream) NormalizeStream(name string) string { stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout)) } - if len((*stream)[streamfile]) == 0 { + if len(stream[streamfile]) == 0 { stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout)) } } @@ -384,75 +413,83 @@ func (stream *SegmentedStream) NormalizeStream(name string) string { return strings.Join(stream_tokens, " ") + "\n" } -func (m *Manifest) NormalizeManifest() string { - segments := m.SegmentManifest() +func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string { + srcpath = fixStreamName(srcpath) + + var suffix string + if strings.HasSuffix(relocate, "/") { + suffix = "/" + } + relocate = fixStreamName(relocate) + suffix + + streamname, filename := splitPath(srcpath) + + if stream, ok := m[streamname]; ok { + // check if it refers to 
a single file in a stream + filesegs, okfile := stream[filename] + if okfile { + newstream := make(segmentedStream) + relocate_stream, relocate_filename := splitPath(relocate) + if relocate_filename == "" { + relocate_filename = filename + } + newstream[relocate_filename] = filesegs + return newstream.normalizedText(relocate_stream) + } + } + + // Going to extract multiple streams + prefix := srcpath + "/" + + if strings.HasSuffix(relocate, "/") { + relocate = relocate[0 : len(relocate)-1] + } var sortedstreams []string - for k, _ := range *segments { + for k := range m { sortedstreams = append(sortedstreams, k) } sort.Strings(sortedstreams) - var manifest string + manifest := "" for _, k := range sortedstreams { - stream := (*segments)[k] - manifest += stream.NormalizeStream(k) + if strings.HasPrefix(k, prefix) || k == srcpath { + manifest += m[k].normalizedText(relocate + k[len(srcpath):]) + } } return manifest } -func (m *SegmentedManifest) ManifestForPath(path, relocate string) string { - if path == "" { - path = "." - } - if relocate == "" { - relocate = "." 
- } - - streamname, filename := SplitPath(path) - var relocate_stream, relocate_filename string - relocate_stream, relocate_filename = SplitPath(relocate) - - if stream, ok := (*m)[path]; ok { - // refers to a single stream - return stream.NormalizeStream(relocate) - } else if stream, ok := (*m)[streamname]; ok { - // refers to a single file in a stream - newstream := make(SegmentedStream) - if relocate_filename == "" { - relocate_filename = filename - } - newstream[relocate_filename] = stream[filename] - return newstream.NormalizeStream(relocate_stream) - } else { - // refers to multiple streams - manifest := "" - prefix := path - if !strings.HasSuffix(prefix, "/") { - prefix += "/" - } - if !strings.HasSuffix(relocate, "/") { - relocate += "/" - } - - var sortedstreams []string - for k, _ := range *m { - sortedstreams = append(sortedstreams, k) - } - sort.Strings(sortedstreams) - - for _, k := range sortedstreams { - if strings.HasPrefix(k, prefix) { - v := (*m)[k] - manifest += v.NormalizeStream(relocate + k[len(prefix):]) - } - } - return manifest +// Extract extracts some or all of the manifest and returns the extracted +// portion as a normalized manifest. This is a swiss army knife function that +// can be used several ways: +// +// If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest +// in normalized form. +// +// Extract(".", ".") // return entire normalized manifest text +// +// If 'srcpath' points to a single file, it will return manifest text for just that file. +// The value of "relocate" can be used to rename the file or set the file stream. +// +// Extract("./foo", ".") // extract file "foo" and put it in stream "." +// Extract("./foo", "./bar") // extract file "foo", rename it to "bar" in stream "." 
+// Extract("./foo", "./bar/") // extract file "foo", rename it to "./bar/foo" +// Extract("./foo", "./bar/baz") // extract file "foo", rename it to "./bar/baz" +// +// Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place +// them under the path in "relocate". +// +// Extract("./stream", ".") // extract "./stream" to "." and "./stream/subdir" to "./subdir" +// Extract("./stream", "./bar") // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir" +func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) { + segmented, err := m.segment() + if err != nil { + ret.Err = err + return } -} - -func (m *Manifest) ManifestForPath(path, relocate string) string { - return m.SegmentManifest().ManifestForPath(path, relocate) + ret.Text = segmented.manifestTextForPath(srcpath, relocate) + return } func (m *Manifest) StreamIter() <-chan ManifestStream { @@ -479,9 +516,7 @@ func (m *Manifest) StreamIter() <-chan ManifestStream { func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment { ch := make(chan *FileSegment, 64) - if !strings.HasPrefix(filepath, "./") { - filepath = "./" + filepath - } + filepath = fixStreamName(filepath) go func() { for stream := range m.StreamIter() { if !strings.HasPrefix(filepath, stream.StreamName+"/") {