1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
# Build the collection tree from `manifest_text` (default: empty string).
# Keep::Manifest#each_line yields a stream root, its locators, and its file
# specs for every manifest line. Each file spec is split into a start
# offset, a length, and a path; the matching segment from the line's
# LocatorList is added to the file at the normalized path under @root.
# Raises ArgumentError when a line has an empty stream root, locator list,
# or file-spec list, and when ENOTDIR/EISDIR is raised while adding a file.
# NOTE(review): this excerpt omits some source lines (block terminators
# among them), so statements not shown here may also belong to this method.
9 def initialize(manifest_text="")
10 @manifest_text = manifest_text
12 @root = CollectionRoot.new
13 manifest = Keep::Manifest.new(manifest_text)
14 manifest.each_line do |stream_root, locators, file_specs|
15 if stream_root.empty? or locators.empty? or file_specs.empty?
16 raise ArgumentError.new("manifest text includes malformed line")
# One LocatorList per manifest line serves every file spec on that line.
18 loc_list = LocatorList.new(locators)
19 file_specs.map { |s| manifest.split_file_token(s) }.
20 each do |file_start, file_len, file_path|
22 @root.file_at(normalize_path(stream_root, file_path)).
23 add_segment(loc_list.segment(file_start, file_len))
# Path-type conflicts are reported as ArgumentError, quoting the part of
# the Errno message that follows " - ".
24 rescue Errno::ENOTDIR, Errno::EISDIR => error
25 raise ArgumentError.new("%p is both a stream and file" %
26 error.to_s.partition(" - ").last)
33 @manifest_text ||= @root.manifest_text
46 @manifest_text = @root.manifest_text
# Copy `source` to `target` using the :merge copy method.
# A trailing "/" on `source` is stripped before the copy and switches the
# :descend_target option off; without a trailing "/" the option is on.
# `source_collection` is handed to `copy` unchanged (nil by default).
def cp_r(source, target, source_collection=nil)
  descend = !source.end_with?("/")
  trimmed_source = source.chomp("/")
  copy(:merge, trimmed_source, target, source_collection,
       {:descend_target => descend})
end
# Forward to the root stream's each_file_path, passing along any block
# the caller supplied.
def each_file_path(&visitor)
  @root.each_file_path(&visitor)
end
61 substream, item = find(path)
62 not (substream.leaf? or substream[item].nil?)
63 rescue Errno::ENOENT, Errno::ENOTDIR
# Move `source` to `target`: run `copy` with the :add_copy method and hand
# it a block that removes `source` with rm_r.
def rename(source, target)
  copy(:add_copy, source, target) do
    rm_r(source)
  end
end
77 remove(source, :recursive => true)
83 @root.find(normalize_path(*parts))
# Join `parts` into one path with File.join and normalize the result.
# Raises ArgumentError ("empty path") in the first branch shown below; the
# condition guarding it is elided from this excerpt (presumably an
# empty-path check -- confirm against the full file).
# A path equal to "." or starting with "./" takes a separate branch whose
# body is likewise not shown here.
94 def normalize_path(*parts)
95 path = File.join(*parts)
97 raise ArgumentError.new("empty path")
98 elsif (path == ".") or path.start_with?("./")
# Shared implementation behind the copy-style operations in this class
# (cp_r passes :merge, rename passes :add_copy).
# Parameters:
#   copy_method       - symbol sent to the destination stream to perform
#                       the copy; "check_can_<copy_method>" is sent first.
#   source            - path of the item to copy, looked up with `find`
#                       on `source_collection`.
#   target            - destination path, run through normalize_path.
#   source_collection - collection to read from; nil means self.
#   opts              - :descend_target (default true) enables the check
#                       for whether `target` names a stream to copy into.
# NOTE(review): several lines are elided in this excerpt, so the return
# value and some control-flow keywords are not visible here.
105 def copy(copy_method, source, target, source_collection=nil, opts={})
106 # Find the item at path `source` in `source_collection`, find the
107 # destination stream at path `target`, and use `copy_method` to copy
108 # the found object there. If a block is passed in, it will be called
109 # right before we do the actual copy, after we confirm that everything
110 # is found and can be copied.
111 source_collection = self if source_collection.nil?
112 src_stream, src_tail = source_collection.find(source)
# Split the normalized target at its last "/" into parent path and name.
113 dst_stream_path, _, dst_tail = normalize_path(target).rpartition("/")
114 if dst_stream_path.empty?
115 dst_stream, dst_tail = @root.find(dst_tail)
116 dst_tail ||= src_tail
118 dst_stream = @root.stream_at(dst_stream_path)
119 dst_tail = src_tail if dst_tail.empty?
# Detect the case where source and destination are the very same item.
121 if (source_collection.equal?(self) and
122 (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
125 src_item = src_stream[src_tail]
126 check_method = "check_can_#{copy_method}".to_sym
128 if opts.fetch(:descend_target, true)
130 # Find out if `target` refers to a stream we should copy into.
131 tail_stream = dst_stream[dst_tail]
132 tail_stream.send(check_method, src_item, src_tail)
133 # Yes it does. Copy the item at `source` into it with the same name.
134 dst_stream = tail_stream
135 target_name = src_tail
136 rescue Errno::ENOENT, Errno::ENOTDIR
137 # It does not. We'll fall back to writing to `target` below.
# Fallback: validate writing the item to `dst_tail` in the parent stream.
141 dst_stream.send(check_method, src_item, dst_tail)
142 target_name = dst_tail
144 # At this point, we know the operation will work. Call any block as
148 # Re-find the destination stream, in case the block removed
149 # the original (that's how rename is implemented).
150 dst_stream = @root.stream_at(dst_stream.path)
152 dst_stream.send(copy_method, src_item, target_name)
# Delete the item at `path`: `find` returns the containing stream and the
# item's name, and `opts` is passed through to that stream's `delete`
# (another call in this file passes :recursive => true).
# NOTE(review): lines following the delete call are elided in this
# excerpt, so the method may do more (and return something else) than
# what is visible here.
156 def remove(path, opts={})
157 stream, name = find(path)
158 stream.delete(name, opts)
162 Struct.new("LocatorSegment", :locators, :start_pos, :length)
# A Range running from `start` to `start` plus the size parsed from the
# locator string `loc_s`.
# NOTE(review): other members of this class are elided from this excerpt.
164 class LocatorRange < Range
# loc_s - locator string, parsed with Keep::Locator.parse to get its size.
# start - value where this range begins.
167 def initialize(loc_s, start)
169 range_end = start + Keep::Locator.parse(loc_s).size.to_i
# The third argument to Range#initialize is exclude_end; false means the
# range includes range_end itself.
170 super(start, range_end, false)
175 # LocatorList efficiently builds LocatorSegments from a stream manifest.
# Build @ranges with one LocatorRange per entry of `locators`; each new
# range starts at the `end` of the range created before it.
# NOTE(review): the initial value of next_start is set on a line that is
# not shown in this excerpt.
176 def initialize(locators)
178 @ranges = locators.map do |loc_s|
179 new_range = LocatorRange.new(loc_s, next_start)
180 next_start = new_range.end
# Parameters:
#   start_pos - position of the first byte to capture.
#   length    - number of bytes to capture.
# Returns a Struct::LocatorSegment whose first two members are the
# locators of the ranges involved and `start_pos` rebased to the first of
# those ranges.
# NOTE(review): some lines are elided here, including the condition that
# selects between the two end_index assignments and the final constructor
# argument.
185 def segment(start_pos, length)
186 # Return a LocatorSegment that captures `length` bytes from `start_pos`.
187 start_index = search_for_byte(start_pos)
189 end_index = start_index
# The last byte wanted is start_pos + length - 1; the search for it starts
# at start_index.
191 end_index = search_for_byte(start_pos + length - 1, start_index)
193 seg_ranges = @ranges[start_index..end_index]
194 Struct::LocatorSegment.new(seg_ranges.map(&:locator),
195 start_pos - seg_ranges.first.begin,
# Parameters:
#   target      - byte position to look for.
#   start_index - index in @ranges where the search starts (default 0).
# Raises RangeError ("<target> not in segment") on the path shown below.
# NOTE(review): most of the search loop is elided from this excerpt; only
# three of its lines are visible.
201 def search_for_byte(target, start_index=0)
202 # Do a binary search for byte `target` in the list of locators,
203 # starting from `start_index`. Return the index of the range in
204 # @ranges that contains the byte.
# A hit needs the range to include `target`; a target equal to range.end
# only counts when ii == hi-1 (presumably the last candidate index --
# confirm against the full loop).
210 if range.include?(target) && (target < range.end || ii == hi-1)
213 raise RangeError.new("%i not in segment" % target)
214 elsif target < range.begin
224 attr_reader :path, :name
228 @name = File.basename(path)
232 class CollectionFile < CollectionItem
250 def add_segment(segment)
# Iterate over this file's segments by forwarding any given block to
# @segments.each.
def each_segment(&visitor)
  @segments.each(&visitor)
end
# Nothing can be copied underneath a file: unconditionally raises
# Errno::ENOTDIR carrying this item's path. Both arguments are ignored.
def check_can_add_copy(src_item, name)
  raise Errno::ENOTDIR, path
end
262 alias_method :check_can_merge, :check_can_add_copy
# Create a new item of this object's class at `copy_path` and add every
# segment of this file to it.
# NOTE(review): the line that supplies the return value is elided from
# this excerpt (presumably the new copy -- confirm).
264 def copy_named(copy_path)
265 copy = self.class.new(copy_path)
266 each_segment { |segment| copy.add_segment(segment) }
271 class CollectionStream < CollectionItem
291 raise Errno::ENOENT.new("%p not found in %p" % [key, path])
294 def delete(name, opts={})
296 if item.file? or opts[:recursive]
299 raise Errno::EISDIR.new(path)
304 return to_enum(__method__) unless block_given?
305 items.each_value do |item|
309 item.each_file_path { |path| yield path }
315 # Given a POSIX-style path, return the CollectionStream that
316 # contains the object at that path, and the name of the object
318 components = find_path.split("/")
319 tail = components.pop
320 [components.reduce(self, :[]), tail]
# Resolve `find_path`, relative to this stream, to a stream.
# The first path component is fetched or created through get_or_new as a
# CollectionStream, with Errno::ENOTDIR as the error class for an entry of
# another kind; the rest of the path is resolved by calling stream_at on
# that stream.
# NOTE(review): the branch that ends the recursion is elided from this
# excerpt.
323 def stream_at(find_path)
# split("/", 2) yields at most two pieces: the first component and the rest.
324 key, rest = find_path.split("/", 2)
325 next_stream = get_or_new(key, CollectionStream, Errno::ENOTDIR)
329 next_stream.stream_at(rest)
# Resolve `find_path`, relative to this stream, to a file.
# When the path has no stream part (nothing before the last "/"), the file
# is fetched or created here through get_or_new as a CollectionFile, with
# Errno::EISDIR as the error class for an entry of another kind.
# The remaining call resolves the stream part with stream_at and asks that
# stream for the file name; presumably it is the `else` branch, which is
# elided from this excerpt.
333 def file_at(find_path)
334 stream_path, _, file_name = find_path.rpartition("/")
335 if stream_path.empty?
336 get_or_new(file_name, CollectionFile, Errno::EISDIR)
338 stream_at(stream_path).file_at(file_name)
343 # Return a string with the normalized manifest text for this stream,
344 # including all substreams.
345 file_keys, stream_keys = items.keys.sort.partition do |key|
348 my_line = StreamManifest.new(path)
349 file_keys.each do |file_name|
350 my_line.add_file(items[file_name])
352 sub_lines = stream_keys.map do |sub_name|
353 items[sub_name].manifest_text
355 my_line.to_s + sub_lines.join("")
# Confirm that `src_item` can be copied to `key` in this stream.
# Runs check_can_merge first; if that returns an existing item that is
# not a leaf, raises Errno::ENOTEMPTY with that item's path.
# Returns nil when the copy is allowed.
def check_can_add_copy(src_item, key)
  existing = check_can_merge(src_item, key)
  return nil if !existing || existing.leaf?
  raise Errno::ENOTEMPTY, existing.path
end
# Check whether `src_item` may be merged in under `key`.
# Raises Errno::ENOTDIR (with the existing item's path) when an item is
# already stored at `key` and its class differs from src_item's class.
# NOTE(review): the line supplying the return value is elided from this
# excerpt; check_can_add_copy treats the result as the existing item.
364 def check_can_merge(src_item, key)
365 if existing = items[key] and (existing.class != src_item.class)
366 raise Errno::ENOTDIR.new(existing.path)
# Store under `key` a copy of `src_item` produced by its copy_named.
# Two assignments are visible: one names the copy with this stream's own
# path, the other with "<path>/<key>". The condition that chooses between
# them is elided from this excerpt.
371 def add_copy(src_item, key)
373 self[key] = src_item.copy_named("#{path}")
375 self[key] = src_item.copy_named("#{path}/#{key}")
# Parameters:
#   src_item - collection item to copy into this stream.
#   key      - name to store it under.
# NOTE(review): several lines are elided from this excerpt, including the
# one that assigns `dest`.
379 def merge(src_item, key)
380 # Do a recursive copy of the collection item `src_item` to destination
381 # `key`. If a simple copy is safe, do that; otherwise, recursively
382 # merge the contents of the stream `src_item` into the stream at
385 check_can_add_copy(src_item, key)
386 add_copy(src_item, key)
# ENOTEMPTY from the check means a simple copy is not possible here.
387 rescue Errno::ENOTEMPTY
390 # Copy as much as possible, then raise any error encountered.
391 # Start with streams for a depth-first merge.
# Sort key is 1 for files and 0 for everything else, so files come last.
392 src_items = src_item.items.each_pair.sort_by do |_, sub_item|
393 (sub_item.file?) ? 1 : 0
395 src_items.each do |sub_key, sub_item|
397 dest.merge(sub_item, sub_key)
# An ENOTDIR from a sub-merge is captured in `error` rather than aborting
# the loop; the raise below re-raises a captured error.
398 rescue Errno::ENOTDIR => error
401 raise error unless error.nil?
# Create a new item of this object's class at `copy_path` and add a copy
# of every entry in `items` to it under the same key.
# NOTE(review): the line that supplies the return value is elided from
# this excerpt (presumably the new copy -- confirm).
405 def copy_named(copy_path)
406 copy = self.class.new(copy_path)
407 items.each_pair do |key, item|
408 copy.add_copy(item, key)
# Parameters:
#   key       - name of the entry in this stream.
#   klass     - class the entry must be; also used to create a missing one.
#   err_class - exception class raised, with the existing item's path, when
#               the entry is not a `klass`.
# NOTE(review): the lookup of `item` and the branch conditions other than
# the `elsif` are elided from this excerpt.
423 def get_or_new(key, klass, err_class)
424 # Return the collection item at `key` and ensure that it's a `klass`.
425 # If `key` does not exist, create a new `klass` there.
426 # If the value for `key` is not a `klass`, raise an `err_class`.
# A newly created item gets the path "<this stream's path>/<key>".
429 self[key] = klass.new("#{path}/#{key}")
430 elsif not item.is_a?(klass)
431 raise err_class.new(item.path)
438 class CollectionRoot < CollectionStream
444 def delete(name, opts={})
446 # If that didn't fail, it deleted the . stream. Recreate it.
450 def check_can_merge(src_item, key)
451 if items.include?(key)
454 raise_root_write_error(key)
461 items["."] = CollectionStream.new(".")
# Copies added at the collection root are stored in the "." stream: look
# that stream up in `items` and delegate add_copy to it.
def add_copy(src_item, key)
  dot_stream = items["."]
  dot_stream.add_copy(src_item, key)
end
# Raise the ArgumentError used for rejected writes at the collection root.
# `key` is rendered with %p (its #inspect form) inside the message.
# Kernel#format is used instead of String#% so that an Array `key` is
# formatted as one value rather than being spread across the format
# arguments, which would report only its first element.
# Raises ArgumentError always; never returns.
def raise_root_write_error(key)
  raise ArgumentError, format("can't write to %p at collection root", key)
end
473 raise_root_write_error(key)
478 # Build a manifest text for a single stream, without substreams.
479 # The manifest includes files in the order they're added. If you want
480 # a normalized manifest, add files in lexical order by name.
# Add every segment of `coll_file` to this stream manifest, passing the
# file's name and each segment to extend_file_specs.
def add_file(coll_file)
  coll_file.each_segment { |seg| extend_file_specs(coll_file.name, seg) }
end
496 if @file_specs.empty?
499 "%s %s %s\n" % [escape_name(@name),
500 @loc_ranges.collect(&:locator).join(" "),
501 @file_specs.join(" ")]
# Append a file spec for one segment of `filename` to @file_specs.
# Parameters:
#   filename - file name; escaped with escape_name in the emitted spec.
#   segment  - object responding to `locators`, `start_pos`, and `length`.
# Each emitted spec has the form "<offset>:<length>:<escaped name>".
# NOTE(review): several lines are elided from this excerpt, including loop
# exits and the line that stores each new LocatorRange.
507 def extend_file_specs(filename, segment)
508 found_overlap = false
509 # Find the longest prefix of segment.locators that's a suffix
510 # of the existing @loc_ranges. If we find one, drop those
511 # locators (they'll be added back below, when we're handling
512 # the normal/no-overlap case).
513 (1..segment.locators.length).each do |overlap|
514 if @loc_ranges.length >= overlap && @loc_ranges[-overlap..-1].collect(&:locator) == segment.locators[0..overlap-1]
# Dropping a range rewinds @loc_range_start by that range's span.
516 discarded = @loc_ranges.pop
517 @loc_range_start -= (discarded.end - discarded.begin)
524 # If there was no overlap at the end of our existing
525 # @loc_ranges, check whether the full set of segment.locators
526 # appears earlier in @loc_ranges. If so, use those instead of
527 # appending the same locators again.
528 if !found_overlap && segment.locators.length < @loc_ranges.length
530 (0..@loc_ranges.length-1).each do |ri|
531 if @loc_ranges[ri..ri+segment.locators.length-1].collect(&:locator) == segment.locators
# Reuse the earlier run: the offset is relative to the start of range ri.
532 @file_specs << "#{segment.start_pos + @loc_ranges[ri].begin}:#{segment.length}:#{escape_name(filename)}"
# Normal case: remember where this segment's first locator starts, then
# advance @loc_range_start past each locator in turn.
538 segment_start = @loc_range_start
539 segment.locators.each do |loc_s|
540 r = LocatorRange.new(loc_s, @loc_range_start)
542 @loc_range_start = r.end
544 @file_specs << "#{segment.start_pos + segment_start}:#{segment.length}:#{escape_name(filename)}"
# Escape `name` for use in manifest text.
# Every backslash is doubled, then every whitespace character matched by
# /\s/ is replaced by a backslash followed by the three-digit octal value
# of each of its bytes (a space becomes \040). Returns a new string.
def escape_name(name)
  # Double the backslashes first so the octal escapes added next are not
  # themselves re-escaped.
  doubled = name.gsub("\\") { "\\\\" }
  doubled.gsub(/\s/) do |blank|
    blank.each_byte.map { |byte| format("\\%03o", byte) }.join("")
  end
end