X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2b7834020290b28d797333f90fcb87e5da67d616..42bf31f017a009585eaac2fe44a83b2596b3e5c8:/sdk/go/manifest/manifest.go diff --git a/sdk/go/manifest/manifest.go b/sdk/go/manifest/manifest.go index 208cfdd411..954fb710c0 100644 --- a/sdk/go/manifest/manifest.go +++ b/sdk/go/manifest/manifest.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + /* Deals with parsing Manifest Text. */ // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb @@ -7,7 +11,7 @@ package manifest import ( "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/blockdigest" + "git.arvados.org/arvados.git/sdk/go/blockdigest" "path" "regexp" "sort" @@ -44,11 +48,11 @@ type FileStreamSegment struct { Name string } -// Represents a single line from a manifest. +// ManifestStream represents a single line from a manifest. type ManifestStream struct { StreamName string Blocks []string - BlockOffsets []uint64 + blockOffsets []uint64 FileStreamSegments []FileStreamSegment Err error } @@ -148,32 +152,32 @@ func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegm return ch } -func firstBlock(offsets []uint64, range_start uint64) int { - // range_start/block_start is the inclusive lower bound - // range_end/block_end is the exclusive upper bound +func firstBlock(offsets []uint64, rangeStart uint64) int { + // rangeStart/blockStart is the inclusive lower bound + // rangeEnd/blockEnd is the exclusive upper bound hi := len(offsets) - 1 var lo int i := ((hi + lo) / 2) - block_start := offsets[i] - block_end := offsets[i+1] + blockStart := offsets[i] + blockEnd := offsets[i+1] // perform a binary search for the first block - // assumes that all of the blocks are contiguous, so range_start is guaranteed + // assumes that all of the blocks are contiguous, so rangeStart is guaranteed // to either fall into the range of a block or be outside the block range entirely - for !(range_start >= block_start && range_start < block_end) { + for !(rangeStart >= blockStart && rangeStart < blockEnd) { if lo == i { // must be out of range, fail return -1 } - if range_start > block_start { + if rangeStart > blockStart { lo = i } else { hi = i - i = ((hi + lo) / 2) - block_start = offsets[i] - block_end = offsets[i+1] } + i = ((hi + lo) / 2) + blockStart = offsets[i] + blockEnd = offsets[i+1] } return i } @@ -195,14 +199,14 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F } // Binary search to determine first block in the stream - i := firstBlock(s.BlockOffsets, wantPos) + i := firstBlock(s.blockOffsets, wantPos) if i == -1 { // Shouldn't happen, file segments are checked in parseManifestStream panic(fmt.Sprintf("File segment %v extends past end of stream", fTok)) } - for i < len(s.Blocks) { - blockPos := s.BlockOffsets[i] - blockEnd := s.BlockOffsets[i+1] + for ; i < len(s.Blocks); i++ { + blockPos := s.blockOffsets[i] + blockEnd := s.blockOffsets[i+1] if blockEnd <= wantPos { // Shouldn't happen, FirstBlock() should start // us on the right block, so if this triggers @@ -227,7 +231,6 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset } ch <- &fseg - i += 1 } } } @@ -256,7 +259,7 @@ func parseManifestStream(s string) (m ManifestStream) { return } - m.BlockOffsets = make([]uint64, len(m.Blocks)+1) + m.blockOffsets = make([]uint64, len(m.Blocks)+1) var streamoffset uint64 for i, b := range m.Blocks { bl, err := ParseBlockLocator(b) @@ -264,10 +267,10 @@ func parseManifestStream(s string) (m ManifestStream) { m.Err = err return } - m.BlockOffsets[i] = streamoffset + m.blockOffsets[i] = streamoffset streamoffset += uint64(bl.Size) } - m.BlockOffsets[len(m.Blocks)] = streamoffset + m.blockOffsets[len(m.Blocks)] = streamoffset if len(fileTokens) == 0 { m.Err = fmt.Errorf("No file tokens found") @@ -312,13 +315,13 @@ func splitPath(srcpath string) (streamname, filename string) { return } -func (m *Manifest) segment() *segmentedManifest { +func (m *Manifest) segment() (*segmentedManifest, error) { files := make(segmentedManifest) for stream := range m.StreamIter() { if stream.Err != nil { - // Skip streams with errors - continue + // Stream has an error + return nil, stream.Err } currentStreamfiles := make(map[string]bool) for _, f := range stream.FileStreamSegments { @@ -344,69 +347,70 @@ func (m *Manifest) segment() *segmentedManifest { } } - return &files + return &files, nil } func (stream segmentedStream) normalizedText(name string) string { var sortedfiles []string - for k, _ := range stream { + for k := range stream { sortedfiles = append(sortedfiles, k) } sort.Strings(sortedfiles) - stream_tokens := []string{EscapeName(name)} + streamTokens := []string{EscapeName(name)} - blocks := make(map[string]int64) + blocks := make(map[blockdigest.BlockDigest]int64) var streamoffset int64 // Go through each file and add each referenced block exactly once. for _, streamfile := range sortedfiles { for _, segment := range stream[streamfile] { - if _, ok := blocks[segment.Locator]; !ok { - stream_tokens = append(stream_tokens, segment.Locator) - blocks[segment.Locator] = streamoffset - b, _ := ParseBlockLocator(segment.Locator) + b, _ := ParseBlockLocator(segment.Locator) + if _, ok := blocks[b.Digest]; !ok { + streamTokens = append(streamTokens, segment.Locator) + blocks[b.Digest] = streamoffset streamoffset += int64(b.Size) } } } - if len(stream_tokens) == 1 { - stream_tokens = append(stream_tokens, "d41d8cd98f00b204e9800998ecf8427e+0") + if len(streamTokens) == 1 { + streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0") } for _, streamfile := range sortedfiles { // Add in file segments - span_start := int64(-1) - span_end := int64(0) + spanStart := int64(-1) + spanEnd := int64(0) fout := EscapeName(streamfile) for _, segment := range stream[streamfile] { // Collapse adjacent segments - streamoffset = blocks[segment.Locator] + int64(segment.Offset) - if span_start == -1 { - span_start = streamoffset - span_end = streamoffset + int64(segment.Len) + b, _ := ParseBlockLocator(segment.Locator) + streamoffset = blocks[b.Digest] + int64(segment.Offset) + if spanStart == -1 { + spanStart = streamoffset + spanEnd = streamoffset + int64(segment.Len) } else { - if streamoffset == span_end { - span_end += int64(segment.Len) + if streamoffset == spanEnd { + spanEnd += int64(segment.Len) } else { - stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout)) - span_start = streamoffset - span_end = streamoffset + int64(segment.Len) + streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout)) + spanStart = streamoffset + spanEnd = streamoffset + int64(segment.Len) } } } - if span_start != -1 { - stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout)) + if spanStart != -1 { + streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout)) } if len(stream[streamfile]) == 0 { - stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout)) + streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout)) } } - return strings.Join(stream_tokens, " ") + "\n" + return strings.Join(streamTokens, " ") + "\n" } func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string { @@ -425,12 +429,12 @@ func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string filesegs, okfile := stream[filename] if okfile { newstream := make(segmentedStream) - relocate_stream, relocate_filename := splitPath(relocate) - if relocate_filename == "" { - relocate_filename = filename + relocateStream, relocateFilename := splitPath(relocate) + if relocateFilename == "" { + relocateFilename = filename } - newstream[relocate_filename] = filesegs - return newstream.normalizedText(relocate_stream) + newstream[relocateFilename] = filesegs + return newstream.normalizedText(relocateStream) } } @@ -442,7 +446,7 @@ func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string } var sortedstreams []string - for k, _ := range m { + for k := range m { sortedstreams = append(sortedstreams, k) } sort.Strings(sortedstreams) @@ -456,31 +460,36 @@ func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string return manifest } -// ManifestTextForPath extracts some or all of the manifest and returns -// normalized manifest text. This is a swiss army knife function that can be -// used a couple of different ways: +// Extract extracts some or all of the manifest and returns the extracted +// portion as a normalized manifest. This is a swiss army knife function that +// can be several ways: +// +// If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest +// in normalized form. +// +// Extract(".", ".") // return entire normalized manfest text // // If 'srcpath' points to a single file, it will return manifest text for just that file. // The value of "relocate" is can be used to rename the file or set the file stream. // -// ManifestTextForPath("./foo", ".") (extract file "foo" and put it in stream ".") -// ManifestTextForPath("./foo", "./bar") (extract file "foo", rename it to "bar" in stream ".") -// ManifestTextForPath("./foo", "./bar/") (extract file "foo", rename it to "./bar/foo") -// ManifestTextForPath("./foo", "./bar/baz") (extract file "foo", rename it to "./bar/baz") +// Extract("./foo", ".") // extract file "foo" and put it in stream "." +// Extract("./foo", "./bar") // extract file "foo", rename it to "bar" in stream "." +// Extract("./foo", "./bar/") // extract file "foo", rename it to "./bar/foo" +// Extract("./foo", "./bar/baz") // extract file "foo", rename it to "./bar/baz") // // Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place // them under the path in "relocate". // -// ManifestTextForPath(".", ".") (return entire normalized manfest text) -// ManifestTextForPath("./stream", ".") (extract "./stream" to "." and "./stream/subdir" to "./subdir") -// ManifestTextForPath("./stream", "./bar") (extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir") -func (m *Manifest) ManifestTextForPath(srcpath, relocate string) string { - return m.segment().manifestTextForPath(srcpath, relocate) -} - -// NormalizedText returns the manifest text in normalized form. -func (m *Manifest) NormalizedText() string { - return m.ManifestTextForPath(".", ".") +// Extract("./stream", ".") // extract "./stream" to "." and "./stream/subdir" to "./subdir") +// Extract("./stream", "./bar") // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir") +func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) { + segmented, err := m.segment() + if err != nil { + ret.Err = err + return + } + ret.Text = segmented.manifestTextForPath(srcpath, relocate) + return } func (m *Manifest) StreamIter() <-chan ManifestStream { @@ -520,6 +529,8 @@ func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment { return ch } +// BlockIterWithDuplicates iterates over the block locators of a manifest. +// // Blocks may appear multiple times within the same manifest if they // are used by multiple files. In that case this Iterator will output // the same block multiple times.