+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
/* Deals with parsing Manifest Text. */
// Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
import (
"errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/blockdigest"
"path"
"regexp"
"sort"
"strconv"
"strings"
+
+ "git.arvados.org/arvados.git/sdk/go/blockdigest"
)
var ErrInvalidToken = errors.New("Invalid token")
Name string
}
-// Represents a single line from a manifest.
+// ManifestStream represents a single line from a manifest.
type ManifestStream struct {
StreamName string
Blocks []string
- BlockOffsets []uint64
+ blockOffsets []uint64
FileStreamSegments []FileStreamSegment
Err error
}
return ch
}
-func firstBlock(offsets []uint64, range_start uint64) int {
- // range_start/block_start is the inclusive lower bound
- // range_end/block_end is the exclusive upper bound
+func firstBlock(offsets []uint64, rangeStart uint64) int {
+ // rangeStart/blockStart is the inclusive lower bound
+ // rangeEnd/blockEnd is the exclusive upper bound
hi := len(offsets) - 1
var lo int
i := ((hi + lo) / 2)
- block_start := offsets[i]
- block_end := offsets[i+1]
+ blockStart := offsets[i]
+ blockEnd := offsets[i+1]
// perform a binary search for the first block
- // assumes that all of the blocks are contiguous, so range_start is guaranteed
+ // assumes that all of the blocks are contiguous, so rangeStart is guaranteed
// to either fall into the range of a block or be outside the block range entirely
- for !(range_start >= block_start && range_start < block_end) {
+ for !(rangeStart >= blockStart && rangeStart < blockEnd) {
if lo == i {
// must be out of range, fail
return -1
}
- if range_start > block_start {
+ if rangeStart > blockStart {
lo = i
} else {
hi = i
- i = ((hi + lo) / 2)
- block_start = offsets[i]
- block_end = offsets[i+1]
}
+ i = ((hi + lo) / 2)
+ blockStart = offsets[i]
+ blockEnd = offsets[i+1]
}
return i
}
}
// Binary search to determine first block in the stream
- i := firstBlock(s.BlockOffsets, wantPos)
+ i := firstBlock(s.blockOffsets, wantPos)
if i == -1 {
// Shouldn't happen, file segments are checked in parseManifestStream
panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
}
for ; i < len(s.Blocks); i++ {
- blockPos := s.BlockOffsets[i]
- blockEnd := s.BlockOffsets[i+1]
+ blockPos := s.blockOffsets[i]
+ blockEnd := s.blockOffsets[i+1]
if blockEnd <= wantPos {
// Shouldn't happen, FirstBlock() should start
// us on the right block, so if this triggers
return
}
- m.BlockOffsets = make([]uint64, len(m.Blocks)+1)
+ m.blockOffsets = make([]uint64, len(m.Blocks)+1)
var streamoffset uint64
for i, b := range m.Blocks {
bl, err := ParseBlockLocator(b)
m.Err = err
return
}
- m.BlockOffsets[i] = streamoffset
+ m.blockOffsets[i] = streamoffset
streamoffset += uint64(bl.Size)
}
- m.BlockOffsets[len(m.Blocks)] = streamoffset
+ m.blockOffsets[len(m.Blocks)] = streamoffset
if len(fileTokens) == 0 {
m.Err = fmt.Errorf("No file tokens found")
return
}
-func (m *Manifest) segment() *segmentedManifest {
+func (m *Manifest) segment() (*segmentedManifest, error) {
files := make(segmentedManifest)
for stream := range m.StreamIter() {
if stream.Err != nil {
- // Skip streams with errors
- continue
+ // Stream has an error
+ return nil, stream.Err
}
currentStreamfiles := make(map[string]bool)
for _, f := range stream.FileStreamSegments {
}
}
- return &files
+ return &files, nil
}
func (stream segmentedStream) normalizedText(name string) string {
var sortedfiles []string
- for k, _ := range stream {
+ for k := range stream {
sortedfiles = append(sortedfiles, k)
}
sort.Strings(sortedfiles)
- stream_tokens := []string{EscapeName(name)}
+ streamTokens := []string{EscapeName(name)}
- blocks := make(map[string]int64)
+ blocks := make(map[blockdigest.BlockDigest]int64)
var streamoffset int64
// Go through each file and add each referenced block exactly once.
for _, streamfile := range sortedfiles {
for _, segment := range stream[streamfile] {
- if _, ok := blocks[segment.Locator]; !ok {
- stream_tokens = append(stream_tokens, segment.Locator)
- blocks[segment.Locator] = streamoffset
- b, _ := ParseBlockLocator(segment.Locator)
+ b, _ := ParseBlockLocator(segment.Locator)
+ if _, ok := blocks[b.Digest]; !ok {
+ streamTokens = append(streamTokens, segment.Locator)
+ blocks[b.Digest] = streamoffset
streamoffset += int64(b.Size)
}
}
}
- if len(stream_tokens) == 1 {
- stream_tokens = append(stream_tokens, "d41d8cd98f00b204e9800998ecf8427e+0")
+ if len(streamTokens) == 1 {
+ streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0")
}
for _, streamfile := range sortedfiles {
// Add in file segments
- span_start := int64(-1)
- span_end := int64(0)
+ spanStart := int64(-1)
+ spanEnd := int64(0)
fout := EscapeName(streamfile)
for _, segment := range stream[streamfile] {
// Collapse adjacent segments
- streamoffset = blocks[segment.Locator] + int64(segment.Offset)
- if span_start == -1 {
- span_start = streamoffset
- span_end = streamoffset + int64(segment.Len)
+ b, _ := ParseBlockLocator(segment.Locator)
+ streamoffset = blocks[b.Digest] + int64(segment.Offset)
+ if spanStart == -1 {
+ spanStart = streamoffset
+ spanEnd = streamoffset + int64(segment.Len)
} else {
- if streamoffset == span_end {
- span_end += int64(segment.Len)
+ if streamoffset == spanEnd {
+ spanEnd += int64(segment.Len)
} else {
- stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
- span_start = streamoffset
- span_end = streamoffset + int64(segment.Len)
+ streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
+ spanStart = streamoffset
+ spanEnd = streamoffset + int64(segment.Len)
}
}
}
- if span_start != -1 {
- stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
+ if spanStart != -1 {
+ streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
}
if len(stream[streamfile]) == 0 {
- stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
+ streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout))
}
}
- return strings.Join(stream_tokens, " ") + "\n"
+ return strings.Join(streamTokens, " ") + "\n"
}
func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
filesegs, okfile := stream[filename]
if okfile {
newstream := make(segmentedStream)
- relocate_stream, relocate_filename := splitPath(relocate)
- if relocate_filename == "" {
- relocate_filename = filename
+ relocateStream, relocateFilename := splitPath(relocate)
+ if relocateFilename == "" {
+ relocateFilename = filename
}
- newstream[relocate_filename] = filesegs
- return newstream.normalizedText(relocate_stream)
+ newstream[relocateFilename] = filesegs
+ return newstream.normalizedText(relocateStream)
}
}
}
var sortedstreams []string
- for k, _ := range m {
+ for k := range m {
sortedstreams = append(sortedstreams, k)
}
sort.Strings(sortedstreams)
return manifest
}
-// ManifestTextForPath extracts some or all of the manifest and returns
-// normalized manifest text. This is a swiss army knife function that can be
-// used a couple of different ways:
+// Extract extracts some or all of the manifest and returns the extracted
+// portion as a normalized manifest. This is a swiss army knife function that
+// can be used in several ways:
+//
+// If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
+// in normalized form.
+//
+// Extract(".", ".") // return entire normalized manifest text
//
// If 'srcpath' points to a single file, it will return manifest text for just that file.
// The value of "relocate" is can be used to rename the file or set the file stream.
//
-// ManifestTextForPath("./foo", ".") (extract file "foo" and put it in stream ".")
-// ManifestTextForPath("./foo", "./bar") (extract file "foo", rename it to "bar" in stream ".")
-// ManifestTextForPath("./foo", "./bar/") (extract file "foo", rename it to "./bar/foo")
-// ManifestTextForPath("./foo", "./bar/baz") (extract file "foo", rename it to "./bar/baz")
+// Extract("./foo", ".") // extract file "foo" and put it in stream "."
+// Extract("./foo", "./bar") // extract file "foo", rename it to "bar" in stream "."
+// Extract("./foo", "./bar/") // extract file "foo", rename it to "./bar/foo"
+// Extract("./foo", "./bar/baz") // extract file "foo", rename it to "./bar/baz"
//
// Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
// them under the path in "relocate".
//
-// ManifestTextForPath(".", ".") (return entire normalized manfest text)
-// ManifestTextForPath("./stream", ".") (extract "./stream" to "." and "./stream/subdir" to "./subdir")
-// ManifestTextForPath("./stream", "./bar") (extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir")
-func (m *Manifest) ManifestTextForPath(srcpath, relocate string) string {
- return m.segment().manifestTextForPath(srcpath, relocate)
-}
-
-// NormalizedText returns the manifest text in normalized form.
-func (m *Manifest) NormalizedText() string {
- return m.ManifestTextForPath(".", ".")
+// Extract("./stream", ".") // extract "./stream" to "." and "./stream/subdir" to "./subdir"
+// Extract("./stream", "./bar") // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir"
+func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
+ segmented, err := m.segment()
+ if err != nil {
+ ret.Err = err
+ return
+ }
+ ret.Text = segmented.manifestTextForPath(srcpath, relocate)
+ return
}
func (m *Manifest) StreamIter() <-chan ManifestStream {
return ch
}
+// BlockIterWithDuplicates iterates over the block locators of a manifest.
+//
// Blocks may appear multiple times within the same manifest if they
// are used by multiple files. In that case this Iterator will output
// the same block multiple times.