15699: Fix handling of streams with multiple refs to a block ID.
[arvados.git] / sdk / ruby / lib / arvados / collection.rb
index ec0f443daabaea6ce1062bb106b44844851a1387..796d1785aed99b346c16f4fbdd877af28378b3a8 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 require "arvados/keep"
 
 module Arv
@@ -14,8 +18,13 @@ module Arv
         loc_list = LocatorList.new(locators)
         file_specs.map { |s| manifest.split_file_token(s) }.
             each do |file_start, file_len, file_path|
-          @root.file_at(normalize_path(stream_root, file_path)).
-            add_segment(loc_list.segment(file_start, file_len))
+          begin
+            @root.file_at(normalize_path(stream_root, file_path)).
+              add_segment(loc_list.segment(file_start, file_len))
+          rescue Errno::ENOTDIR, Errno::EISDIR => error
+            raise ArgumentError.new("%p is both a stream and file" %
+                                    error.to_s.partition(" - ").last)
+          end
         end
       end
     end
@@ -39,10 +48,23 @@ module Arv
     end
 
     def cp_r(source, target, source_collection=nil)
-      opts = {descend_target: !source.end_with?("/")}
+      opts = {:descend_target => !source.end_with?("/")}  # a trailing "/" on source turns :descend_target off
       copy(:merge, source.chomp("/"), target, source_collection, opts)
     end
 
+    def each_file_path(&block)  # Public entry point: hands the caller's block to @root.each_file_path.
+      @root.each_file_path(&block)
+    end
+
+    def exist?(path)  # Returns true when find(path) resolves to an existing entry, false otherwise.
+      begin
+        substream, item = find(path)  # the containing stream and the name to look up inside it
+        not (substream.leaf? or substream[item].nil?)  # false if the container is a leaf or has no entry for item
+      rescue Errno::ENOENT, Errno::ENOTDIR  # a failed lookup is reported as "does not exist" instead of raising
+        false
+      end
+    end
+
     def rename(source, target)
       copy(:add_copy, source, target) { rm_r(source) }
     end
@@ -52,7 +74,7 @@ module Arv
     end
 
     def rm_r(source)
-      remove(source, recursive: true)
+      remove(source, :recursive => true)  # recursive variant: delegates to remove with :recursive set
     end
 
     protected
@@ -88,13 +110,19 @@ module Arv
       # is found and can be copied.
       source_collection = self if source_collection.nil?
       src_stream, src_tail = source_collection.find(source)
-      dst_stream, dst_tail = find(target)
+      dst_stream_path, _, dst_tail = normalize_path(target).rpartition("/")
+      if dst_stream_path.empty?
+        dst_stream, dst_tail = @root.find(dst_tail)
+        dst_tail ||= src_tail
+      else
+        dst_stream = @root.stream_at(dst_stream_path)
+        dst_tail = src_tail if dst_tail.empty?
+      end
       if (source_collection.equal?(self) and
           (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
         return self
       end
       src_item = src_stream[src_tail]
-      dst_tail ||= src_tail
       check_method = "check_can_#{copy_method}".to_sym
       target_name = nil
       if opts.fetch(:descend_target, true)
@@ -131,7 +159,7 @@ module Arv
       modified
     end
 
-    LocatorSegment = Struct.new(:locators, :start_pos, :length)
+    Struct.new("LocatorSegment", :locators, :start_pos, :length)  # string name registers the class as Struct::LocatorSegment
 
     class LocatorRange < Range
       attr_reader :locator
@@ -163,9 +191,9 @@ module Arv
           end_index = search_for_byte(start_pos + length - 1, start_index)
         end
         seg_ranges = @ranges[start_index..end_index]
-        LocatorSegment.new(seg_ranges.map(&:locator),
-                           start_pos - seg_ranges.first.begin,
-                           length)
+        Struct::LocatorSegment.new(seg_ranges.map(&:locator),
+                                   start_pos - seg_ranges.first.begin,
+                                   length)
       end
 
       private
@@ -179,7 +207,7 @@ module Arv
         loop do
           ii = (lo + hi) / 2
           range = @ranges[ii]
-          if range.include?(target)
+          if range.include?(target) && (target < range.end || ii == hi)
             return ii
           elsif ii == lo
             raise RangeError.new("%i not in segment" % target)
@@ -272,6 +300,17 @@ module Arv
         end
       end
 
+      def each_file_path  # Yields the path of each file item in this stream, recursing into non-file items.
+        return to_enum(__method__) unless block_given?  # without a block, return an Enumerator over the same paths
+        items.each_value do |item|
+          if item.file?
+            yield item.path
+          else
+            item.each_file_path { |path| yield path }  # non-file item: recurse and re-yield each path it produces
+          end
+        end
+      end
+
       def find(find_path)
         # Given a POSIX-style path, return the CollectionStream that
         # contains the object at that path, and the name of the object
@@ -283,7 +322,7 @@ module Arv
 
       def stream_at(find_path)
         key, rest = find_path.split("/", 2)
-        next_stream = get_or_new(key, CollectionStream)
+        next_stream = get_or_new(key, CollectionStream, Errno::ENOTDIR)
         if rest.nil?
           next_stream
         else
@@ -294,7 +333,7 @@ module Arv
       def file_at(find_path)
         stream_path, _, file_name = find_path.rpartition("/")
         if stream_path.empty?
-          get_or_new(file_name, CollectionFile)
+          get_or_new(file_name, CollectionFile, Errno::EISDIR)
         else
           stream_at(stream_path).file_at(file_name)
         end
@@ -330,7 +369,11 @@ module Arv
       end
 
       def add_copy(src_item, key)
-        self[key] = src_item.copy_named("#{path}/#{key}")
+        if key == "."  # for key ".", name the copy with this stream's own path instead of appending "/."
+          self[key] = src_item.copy_named("#{path}")
+        else  # any other key: the copy is named path + "/" + key
+          self[key] = src_item.copy_named("#{path}/#{key}")
+        end
       end
 
       def merge(src_item, key)
@@ -377,17 +420,15 @@ module Arv
         items[key] = item
       end
 
-      def get_or_new(key, klass)
+      def get_or_new(key, klass, err_class)
         # Return the collection item at `key` and ensure that it's a `klass`.
         # If `key` does not exist, create a new `klass` there.
-        # If the value for `key` is not a `klass`, raise an ArgumentError.
+        # If the value for `key` is not a `klass`, raise an `err_class`.
         item = items[key]
         if item.nil?
           self[key] = klass.new("#{path}/#{key}")
         elsif not item.is_a?(klass)
-          raise ArgumentError.
-            new("in stream %p, %p is a %s, not a %s" %
-                [path, key, items[key].class.human_name, klass.human_name])
+          raise err_class.new(item.path)
         else
           item
         end
@@ -420,6 +461,10 @@ module Arv
         items["."] = CollectionStream.new(".")
       end
 
+      def add_copy(src_item, key)  # At the root, copies are delegated to the "." stream held in items.
+        items["."].add_copy(src_item, key)
+      end
+
       def raise_root_write_error(key)
         raise ArgumentError.new("can't write to %p at collection root" % key)
       end
@@ -436,14 +481,13 @@ module Arv
 
       def initialize(name)
         @name = name
-        @loc_ranges = {}
+        @loc_ranges = []  # ordered list of LocatorRange objects; the same locator may appear more than once
         @loc_range_start = 0
         @file_specs = []
       end
 
       def add_file(coll_file)
         coll_file.each_segment do |segment|
-          extend_locator_ranges(segment.locators)
           extend_file_specs(coll_file.name, segment)
         end
       end
@@ -453,48 +497,53 @@ module Arv
           ""
         else
           "%s %s %s\n" % [escape_name(@name),
-                          @loc_ranges.keys.join(" "),
+                          @loc_ranges.collect(&:locator).join(" "),
                           @file_specs.join(" ")]
         end
       end
 
       private
 
-      def extend_locator_ranges(locators)
-        locators.
-            select { |loc_s| not @loc_ranges.include?(loc_s) }.
-            each do |loc_s|
-          @loc_ranges[loc_s] = LocatorRange.new(loc_s, @loc_range_start)
-          @loc_range_start = @loc_ranges[loc_s].end
+      def extend_file_specs(filename, segment)
+        # Given a filename and a LocatorSegment, append one file spec
+        # string to @file_specs, reusing locators already present in
+        # @loc_ranges where possible instead of listing them again.
+        found_overlap = false
+        # Find the longest prefix of segment.locators that's a suffix
+        # of the existing @loc_ranges (candidates are tried longest
+        # first). If we find one, drop those locators; they'll be added
+        # back below, when we're handling the normal/no-overlap case.
+        segment.locators.length.downto(1) do |overlap|
+          if @loc_ranges.length >= overlap && @loc_ranges[-overlap..-1].collect(&:locator) == segment.locators[0..overlap-1]
+            (1..overlap).each do
+              discarded = @loc_ranges.pop
+              @loc_range_start -= (discarded.end - discarded.begin)
+            end
+            found_overlap = true
+            break
+          end
         end
-      end
 
-      def extend_file_specs(filename, segment)
-        # Given a filename and a LocatorSegment, add the smallest
-        # possible array of file spec strings to @file_specs that
-        # builds the file from available locators.
-        filename = escape_name(filename)
-        start_pos = segment.start_pos
-        length = segment.length
-        start_loc = segment.locators.first
-        prev_loc = start_loc
-        # Build a list of file specs by iterating through the segment's
-        # locators and preparing a file spec for each contiguous range.
-        segment.locators[1..-1].each do |loc_s|
-          range = @loc_ranges[loc_s]
-          if range.begin != @loc_ranges[prev_loc].end
-            range_start, range_length =
-              start_and_length_at(start_loc, prev_loc, start_pos, length)
-            @file_specs << "#{range_start}:#{range_length}:#{filename}"
-            start_pos = 0
-            length -= range_length
-            start_loc = loc_s
+        # If there was no overlap at the end of our existing
+        # @loc_ranges, check whether the full set of segment.locators
+        # appears earlier in @loc_ranges. If so, use those instead of
+        # appending the same locators again.
+        if !found_overlap && segment.locators.length < @loc_ranges.length
+          (0..@loc_ranges.length-1).each do |ri|
+            if @loc_ranges[ri..ri+segment.locators.length-1].collect(&:locator) == segment.locators
+              @file_specs << "#{segment.start_pos + @loc_ranges[ri].begin}:#{segment.length}:#{escape_name(filename)}"
+              return
+            end
           end
-          prev_loc = loc_s
         end
-        range_start, range_length =
-          start_and_length_at(start_loc, prev_loc, start_pos, length)
-        @file_specs << "#{range_start}:#{range_length}:#{filename}"
+
+        segment_start = @loc_range_start
+        segment.locators.each do |loc_s|
+          r = LocatorRange.new(loc_s, @loc_range_start)
+          @loc_ranges << r
+          @loc_range_start = r.end
+        end
+        @file_specs << "#{segment.start_pos + segment_start}:#{segment.length}:#{escape_name(filename)}"
       end
 
       def escape_name(name)
@@ -502,12 +551,6 @@ module Arv
           s.each_byte.map { |c| "\\%03o" % c }.join("")
         end
       end
-
-      def start_and_length_at(start_key, end_key, start_pos, length)
-        range_begin = @loc_ranges[start_key].begin + start_pos
-        range_length = [@loc_ranges[end_key].end - range_begin, length].min
-        [range_begin, range_length]
-      end
     end
   end
 end