17395: Add OutputStorageClasses support to crunch-run
[arvados.git] / sdk / go / manifest / manifest.go
index e8be7a2308c089803d227c7d50c19255b981dbd5..954fb710c0596a4580f76bf2a78945e39203f2f6 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 /* Deals with parsing Manifest Text. */
 
 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
@@ -7,7 +11,7 @@ package manifest
 import (
        "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/blockdigest"
+       "git.arvados.org/arvados.git/sdk/go/blockdigest"
        "path"
        "regexp"
        "sort"
@@ -44,11 +48,11 @@ type FileStreamSegment struct {
        Name   string
 }
 
-// Represents a single line from a manifest.
+// ManifestStream represents a single line from a manifest.
 type ManifestStream struct {
        StreamName         string
        Blocks             []string
-       BlockOffsets       []uint64
+       blockOffsets       []uint64
        FileStreamSegments []FileStreamSegment
        Err                error
 }
@@ -148,32 +152,32 @@ func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegm
        return ch
 }
 
-func firstBlock(offsets []uint64, range_start uint64) int {
-       // range_start/block_start is the inclusive lower bound
-       // range_end/block_end is the exclusive upper bound
+func firstBlock(offsets []uint64, rangeStart uint64) int {
+       // rangeStart/blockStart is the inclusive lower bound
+       // rangeEnd/blockEnd is the exclusive upper bound
 
        hi := len(offsets) - 1
        var lo int
        i := ((hi + lo) / 2)
-       block_start := offsets[i]
-       block_end := offsets[i+1]
+       blockStart := offsets[i]
+       blockEnd := offsets[i+1]
 
        // perform a binary search for the first block
-       // assumes that all of the blocks are contiguous, so range_start is guaranteed
+       // assumes that all of the blocks are contiguous, so rangeStart is guaranteed
        // to either fall into the range of a block or be outside the block range entirely
-       for !(range_start >= block_start && range_start < block_end) {
+       for !(rangeStart >= blockStart && rangeStart < blockEnd) {
                if lo == i {
                        // must be out of range, fail
                        return -1
                }
-               if range_start > block_start {
+               if rangeStart > blockStart {
                        lo = i
                } else {
                        hi = i
-                       i = ((hi + lo) / 2)
-                       block_start = offsets[i]
-                       block_end = offsets[i+1]
                }
+               i = ((hi + lo) / 2)
+               blockStart = offsets[i]
+               blockEnd = offsets[i+1]
        }
        return i
 }
@@ -195,14 +199,14 @@ func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
                }
 
                // Binary search to determine first block in the stream
-               i := firstBlock(s.BlockOffsets, wantPos)
+               i := firstBlock(s.blockOffsets, wantPos)
                if i == -1 {
                        // Shouldn't happen, file segments are checked in parseManifestStream
                        panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
                }
                for ; i < len(s.Blocks); i++ {
-                       blockPos := s.BlockOffsets[i]
-                       blockEnd := s.BlockOffsets[i+1]
+                       blockPos := s.blockOffsets[i]
+                       blockEnd := s.blockOffsets[i+1]
                        if blockEnd <= wantPos {
                                // Shouldn't happen, FirstBlock() should start
                                // us on the right block, so if this triggers
@@ -255,7 +259,7 @@ func parseManifestStream(s string) (m ManifestStream) {
                return
        }
 
-       m.BlockOffsets = make([]uint64, len(m.Blocks)+1)
+       m.blockOffsets = make([]uint64, len(m.Blocks)+1)
        var streamoffset uint64
        for i, b := range m.Blocks {
                bl, err := ParseBlockLocator(b)
@@ -263,10 +267,10 @@ func parseManifestStream(s string) (m ManifestStream) {
                        m.Err = err
                        return
                }
-               m.BlockOffsets[i] = streamoffset
+               m.blockOffsets[i] = streamoffset
                streamoffset += uint64(bl.Size)
        }
-       m.BlockOffsets[len(m.Blocks)] = streamoffset
+       m.blockOffsets[len(m.Blocks)] = streamoffset
 
        if len(fileTokens) == 0 {
                m.Err = fmt.Errorf("No file tokens found")
@@ -311,13 +315,13 @@ func splitPath(srcpath string) (streamname, filename string) {
        return
 }
 
-func (m *Manifest) segment() *segmentedManifest {
+func (m *Manifest) segment() (*segmentedManifest, error) {
        files := make(segmentedManifest)
 
        for stream := range m.StreamIter() {
                if stream.Err != nil {
-                       // Skip streams with errors
-                       continue
+                       // Stream has an error
+                       return nil, stream.Err
                }
                currentStreamfiles := make(map[string]bool)
                for _, f := range stream.FileStreamSegments {
@@ -343,69 +347,70 @@ func (m *Manifest) segment() *segmentedManifest {
                }
        }
 
-       return &files
+       return &files, nil
 }
 
 func (stream segmentedStream) normalizedText(name string) string {
        var sortedfiles []string
-       for k, _ := range stream {
+       for k := range stream {
                sortedfiles = append(sortedfiles, k)
        }
        sort.Strings(sortedfiles)
 
-       stream_tokens := []string{EscapeName(name)}
+       streamTokens := []string{EscapeName(name)}
 
-       blocks := make(map[string]int64)
+       blocks := make(map[blockdigest.BlockDigest]int64)
        var streamoffset int64
 
        // Go through each file and add each referenced block exactly once.
        for _, streamfile := range sortedfiles {
                for _, segment := range stream[streamfile] {
-                       if _, ok := blocks[segment.Locator]; !ok {
-                               stream_tokens = append(stream_tokens, segment.Locator)
-                               blocks[segment.Locator] = streamoffset
-                               b, _ := ParseBlockLocator(segment.Locator)
+                       b, _ := ParseBlockLocator(segment.Locator)
+                       if _, ok := blocks[b.Digest]; !ok {
+                               streamTokens = append(streamTokens, segment.Locator)
+                               blocks[b.Digest] = streamoffset
                                streamoffset += int64(b.Size)
                        }
                }
        }
 
-       if len(stream_tokens) == 1 {
-               stream_tokens = append(stream_tokens, "d41d8cd98f00b204e9800998ecf8427e+0")
+       if len(streamTokens) == 1 {
+               streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0")
        }
 
        for _, streamfile := range sortedfiles {
                // Add in file segments
-               span_start := int64(-1)
-               span_end := int64(0)
+               spanStart := int64(-1)
+               spanEnd := int64(0)
                fout := EscapeName(streamfile)
                for _, segment := range stream[streamfile] {
                        // Collapse adjacent segments
-                       streamoffset = blocks[segment.Locator] + int64(segment.Offset)
-                       if span_start == -1 {
-                               span_start = streamoffset
-                               span_end = streamoffset + int64(segment.Len)
+                       b, _ := ParseBlockLocator(segment.Locator)
+                       streamoffset = blocks[b.Digest] + int64(segment.Offset)
+                       if spanStart == -1 {
+                               spanStart = streamoffset
+                               spanEnd = streamoffset + int64(segment.Len)
                        } else {
-                               if streamoffset == span_end {
-                                       span_end += int64(segment.Len)
+                               if streamoffset == spanEnd {
+                                       spanEnd += int64(segment.Len)
                                } else {
-                                       stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
-                                       span_start = streamoffset
-                                       span_end = streamoffset + int64(segment.Len)
+                                       streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
+                                       spanStart = streamoffset
+                                       spanEnd = streamoffset + int64(segment.Len)
                                }
                        }
                }
 
-               if span_start != -1 {
-                       stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
+               if spanStart != -1 {
+                       streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
                }
 
                if len(stream[streamfile]) == 0 {
-                       stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
+                       streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout))
                }
        }
 
-       return strings.Join(stream_tokens, " ") + "\n"
+       return strings.Join(streamTokens, " ") + "\n"
 }
 
 func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
@@ -424,12 +429,12 @@ func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string
                filesegs, okfile := stream[filename]
                if okfile {
                        newstream := make(segmentedStream)
-                       relocate_stream, relocate_filename := splitPath(relocate)
-                       if relocate_filename == "" {
-                               relocate_filename = filename
+                       relocateStream, relocateFilename := splitPath(relocate)
+                       if relocateFilename == "" {
+                               relocateFilename = filename
                        }
-                       newstream[relocate_filename] = filesegs
-                       return newstream.normalizedText(relocate_stream)
+                       newstream[relocateFilename] = filesegs
+                       return newstream.normalizedText(relocateStream)
                }
        }
 
@@ -441,7 +446,7 @@ func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string
        }
 
        var sortedstreams []string
-       for k, _ := range m {
+       for k := range m {
                sortedstreams = append(sortedstreams, k)
        }
        sort.Strings(sortedstreams)
@@ -455,31 +460,36 @@ func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string
        return manifest
 }
 
-// ManifestTextForPath extracts some or all of the manifest and returns
-// normalized manifest text.  This is a swiss army knife function that can be
-// used a couple of different ways:
+// Extract extracts some or all of the manifest and returns the extracted
+// portion as a normalized manifest.  This is a swiss army knife function that
+// can be several ways:
+//
+// If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
+// in normalized form.
+//
+//   Extract(".", ".")  // return entire normalized manfest text
 //
 // If 'srcpath' points to a single file, it will return manifest text for just that file.
 // The value of "relocate" is can be used to rename the file or set the file stream.
 //
-// ManifestTextForPath("./foo", ".")  (extract file "foo" and put it in stream ".")
-// ManifestTextForPath("./foo", "./bar")  (extract file "foo", rename it to "bar" in stream ".")
-// ManifestTextForPath("./foo", "./bar/") (extract file "foo", rename it to "./bar/foo")
-// ManifestTextForPath("./foo", "./bar/baz") (extract file "foo", rename it to "./bar/baz")
+//   Extract("./foo", ".")          // extract file "foo" and put it in stream "."
+//   Extract("./foo", "./bar")      // extract file "foo", rename it to "bar" in stream "."
+//   Extract("./foo", "./bar/")     // extract file "foo", rename it to "./bar/foo"
+//   Extract("./foo", "./bar/baz")  // extract file "foo", rename it to "./bar/baz")
 //
 // Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
 // them under the path in "relocate".
 //
-// ManifestTextForPath(".", ".")  (return entire normalized manfest text)
-// ManifestTextForPath("./stream", ".")  (extract "./stream" to "." and "./stream/subdir" to "./subdir")
-// ManifestTextForPath("./stream", "./bar")  (extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir")
-func (m *Manifest) ManifestTextForPath(srcpath, relocate string) string {
-       return m.segment().manifestTextForPath(srcpath, relocate)
-}
-
-// NormalizedText returns the manifest text in normalized form.
-func (m *Manifest) NormalizedText() string {
-       return m.ManifestTextForPath(".", ".")
+//   Extract("./stream", ".")      // extract "./stream" to "." and "./stream/subdir" to "./subdir")
+//   Extract("./stream", "./bar")  // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir")
+func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
+       segmented, err := m.segment()
+       if err != nil {
+               ret.Err = err
+               return
+       }
+       ret.Text = segmented.manifestTextForPath(srcpath, relocate)
+       return
 }
 
 func (m *Manifest) StreamIter() <-chan ManifestStream {
@@ -519,6 +529,8 @@ func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
        return ch
 }
 
+// BlockIterWithDuplicates iterates over the block locators of a manifest.
+//
 // Blocks may appear multiple times within the same manifest if they
 // are used by multiple files. In that case this Iterator will output
 // the same block multiple times.