5 def initialize(manifest_text="")
6 @manifest_text = manifest_text
8 @root = CollectionRoot.new
9 manifest = Keep::Manifest.new(manifest_text)
10 manifest.each_line do |stream_root, locators, file_specs|
11 if stream_root.empty? or locators.empty? or file_specs.empty?
12 raise ArgumentError.new("manifest text includes malformed line")
14 loc_list = LocatorList.new(locators)
15 file_specs.map { |s| manifest.split_file_token(s) }.
16 each do |file_start, file_len, file_path|
18 @root.file_at(normalize_path(stream_root, file_path)).
19 add_segment(loc_list.segment(file_start, file_len))
20 rescue Errno::ENOTDIR, Errno::EISDIR => error
21 raise ArgumentError.new("%p is both a stream and file" %
22 error.to_s.partition(" - ").last)
29 @manifest_text ||= @root.manifest_text
42 @manifest_text = @root.manifest_text
46 def cp_r(source, target, source_collection=nil)
47 opts = {:descend_target => !source.end_with?("/")}
48 copy(:merge, source.chomp("/"), target, source_collection, opts)
51 def each_file_path(&block)
52 @root.each_file_path(&block)
57 substream, item = find(path)
58 not (substream.leaf? or substream[item].nil?)
59 rescue Errno::ENOENT, Errno::ENOTDIR
64 def rename(source, target)
65 copy(:add_copy, source, target) { rm_r(source) }
73 remove(source, :recursive => true)
79 @root.find(normalize_path(*parts))
90 def normalize_path(*parts)
91 path = File.join(*parts)
93 raise ArgumentError.new("empty path")
94 elsif (path == ".") or path.start_with?("./")
101 def copy(copy_method, source, target, source_collection=nil, opts={})
102 # Find the item at path `source` in `source_collection`, find the
103 # destination stream at path `target`, and use `copy_method` to copy
104 # the found object there. If a block is passed in, it will be called
105 # right before we do the actual copy, after we confirm that everything
106 # is found and can be copied.
107 source_collection = self if source_collection.nil?
108 src_stream, src_tail = source_collection.find(source)
109 dst_stream_path, _, dst_tail = normalize_path(target).rpartition("/")
110 if dst_stream_path.empty?
111 dst_stream, dst_tail = @root.find(dst_tail)
112 dst_tail ||= src_tail
114 dst_stream = @root.stream_at(dst_stream_path)
115 dst_tail = src_tail if dst_tail.empty?
117 if (source_collection.equal?(self) and
118 (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
121 src_item = src_stream[src_tail]
122 check_method = "check_can_#{copy_method}".to_sym
124 if opts.fetch(:descend_target, true)
126 # Find out if `target` refers to a stream we should copy into.
127 tail_stream = dst_stream[dst_tail]
128 tail_stream.send(check_method, src_item, src_tail)
129 # Yes it does. Copy the item at `source` into it with the same name.
130 dst_stream = tail_stream
131 target_name = src_tail
132 rescue Errno::ENOENT, Errno::ENOTDIR
133 # It does not. We'll fall back to writing to `target` below.
137 dst_stream.send(check_method, src_item, dst_tail)
138 target_name = dst_tail
140 # At this point, we know the operation will work. Call any block as
144 # Re-find the destination stream, in case the block removed
145 # the original (that's how rename is implemented).
146 dst_stream = @root.stream_at(dst_stream.path)
148 dst_stream.send(copy_method, src_item, target_name)
152 def remove(path, opts={})
153 stream, name = find(path)
154 stream.delete(name, opts)
158 Struct.new("LocatorSegment", :locators, :start_pos, :length)
160 class LocatorRange < Range
163 def initialize(loc_s, start)
165 range_end = start + Keep::Locator.parse(loc_s).size.to_i
166 super(start, range_end, false)
171 # LocatorList efficiently builds LocatorSegments from a stream manifest.
172 def initialize(locators)
174 @ranges = locators.map do |loc_s|
175 new_range = LocatorRange.new(loc_s, next_start)
176 next_start = new_range.end
181 def segment(start_pos, length)
182 # Return a LocatorSegment that captures `length` bytes from `start_pos`.
183 start_index = search_for_byte(start_pos)
185 end_index = start_index
187 end_index = search_for_byte(start_pos + length - 1, start_index)
189 seg_ranges = @ranges[start_index..end_index]
190 Struct::LocatorSegment.new(seg_ranges.map(&:locator),
191 start_pos - seg_ranges.first.begin,
197 def search_for_byte(target, start_index=0)
198 # Do a binary search for byte `target` in the list of locators,
199 # starting from `start_index`. Return the index of the range in
200 # @ranges that contains the byte.
206 if range.include?(target)
209 raise RangeError.new("%i not in segment" % target)
210 elsif target < range.begin
220 attr_reader :path, :name
224 @name = File.basename(path)
228 class CollectionFile < CollectionItem
246 def add_segment(segment)
250 def each_segment(&block)
251 @segments.each(&block)
254 def check_can_add_copy(src_item, name)
255 raise Errno::ENOTDIR.new(path)
258 alias_method :check_can_merge, :check_can_add_copy
260 def copy_named(copy_path)
261 copy = self.class.new(copy_path)
262 each_segment { |segment| copy.add_segment(segment) }
267 class CollectionStream < CollectionItem
287 raise Errno::ENOENT.new("%p not found in %p" % [key, path])
290 def delete(name, opts={})
292 if item.file? or opts[:recursive]
295 raise Errno::EISDIR.new(path)
300 return to_enum(__method__) unless block_given?
301 items.each_value do |item|
305 item.each_file_path { |path| yield path }
311 # Given a POSIX-style path, return the CollectionStream that
312 # contains the object at that path, and the name of the object
314 components = find_path.split("/")
315 tail = components.pop
316 [components.reduce(self, :[]), tail]
319 def stream_at(find_path)
320 key, rest = find_path.split("/", 2)
321 next_stream = get_or_new(key, CollectionStream, Errno::ENOTDIR)
325 next_stream.stream_at(rest)
329 def file_at(find_path)
330 stream_path, _, file_name = find_path.rpartition("/")
331 if stream_path.empty?
332 get_or_new(file_name, CollectionFile, Errno::EISDIR)
334 stream_at(stream_path).file_at(file_name)
339 # Return a string with the normalized manifest text for this stream,
340 # including all substreams.
341 file_keys, stream_keys = items.keys.sort.partition do |key|
344 my_line = StreamManifest.new(path)
345 file_keys.each do |file_name|
346 my_line.add_file(items[file_name])
348 sub_lines = stream_keys.map do |sub_name|
349 items[sub_name].manifest_text
351 my_line.to_s + sub_lines.join("")
354 def check_can_add_copy(src_item, key)
355 if existing = check_can_merge(src_item, key) and not existing.leaf?
356 raise Errno::ENOTEMPTY.new(existing.path)
360 def check_can_merge(src_item, key)
361 if existing = items[key] and (existing.class != src_item.class)
362 raise Errno::ENOTDIR.new(existing.path)
367 def add_copy(src_item, key)
368 self[key] = src_item.copy_named("#{path}/#{key}")
371 def merge(src_item, key)
372 # Do a recursive copy of the collection item `src_item` to destination
373 # `key`. If a simple copy is safe, do that; otherwise, recursively
374 # merge the contents of the stream `src_item` into the stream at
377 check_can_add_copy(src_item, key)
378 add_copy(src_item, key)
379 rescue Errno::ENOTEMPTY
382 # Copy as much as possible, then raise any error encountered.
383 # Start with streams for a depth-first merge.
384 src_items = src_item.items.each_pair.sort_by do |_, sub_item|
385 (sub_item.file?) ? 1 : 0
387 src_items.each do |sub_key, sub_item|
389 dest.merge(sub_item, sub_key)
390 rescue Errno::ENOTDIR => error
393 raise error unless error.nil?
397 def copy_named(copy_path)
398 copy = self.class.new(copy_path)
399 items.each_pair do |key, item|
400 copy.add_copy(item, key)
415 def get_or_new(key, klass, err_class)
416 # Return the collection item at `key` and ensure that it's a `klass`.
417 # If `key` does not exist, create a new `klass` there.
418 # If the value for `key` is not a `klass`, raise an `err_class`.
421 self[key] = klass.new("#{path}/#{key}")
422 elsif not item.is_a?(klass)
423 raise err_class.new(item.path)
430 class CollectionRoot < CollectionStream
436 def delete(name, opts={})
438 # If that didn't fail, it deleted the . stream. Recreate it.
442 def check_can_merge(src_item, key)
443 if items.include?(key)
446 raise_root_write_error(key)
453 items["."] = CollectionStream.new(".")
456 def raise_root_write_error(key)
457 raise ArgumentError.new("can't write to %p at collection root" % key)
461 raise_root_write_error(key)
466 # Build a manifest text for a single stream, without substreams.
467 # The manifest includes files in the order they're added. If you want
468 # a normalized manifest, add files in lexical order by name.
477 def add_file(coll_file)
478 coll_file.each_segment do |segment|
479 extend_locator_ranges(segment.locators)
480 extend_file_specs(coll_file.name, segment)
485 if @file_specs.empty?
488 "%s %s %s\n" % [escape_name(@name),
489 @loc_ranges.keys.join(" "),
490 @file_specs.join(" ")]
496 def extend_locator_ranges(locators)
498 select { |loc_s| not @loc_ranges.include?(loc_s) }.
500 @loc_ranges[loc_s] = LocatorRange.new(loc_s, @loc_range_start)
501 @loc_range_start = @loc_ranges[loc_s].end
505 def extend_file_specs(filename, segment)
506 # Given a filename and a LocatorSegment, add the smallest
507 # possible array of file spec strings to @file_specs that
508 # builds the file from available locators.
509 filename = escape_name(filename)
510 start_pos = segment.start_pos
511 length = segment.length
512 start_loc = segment.locators.first
514 # Build a list of file specs by iterating through the segment's
515 # locators and preparing a file spec for each contiguous range.
516 segment.locators[1..-1].each do |loc_s|
517 range = @loc_ranges[loc_s]
518 if range.begin != @loc_ranges[prev_loc].end
519 range_start, range_length =
520 start_and_length_at(start_loc, prev_loc, start_pos, length)
521 @file_specs << "#{range_start}:#{range_length}:#{filename}"
523 length -= range_length
528 range_start, range_length =
529 start_and_length_at(start_loc, prev_loc, start_pos, length)
530 @file_specs << "#{range_start}:#{range_length}:#{filename}"
533 def escape_name(name)
534 name.gsub(/\\/, "\\\\\\\\").gsub(/\s/) do |s|
535 s.each_byte.map { |c| "\\%03o" % c }.join("")
539 def start_and_length_at(start_key, end_key, start_pos, length)
540 range_begin = @loc_ranges[start_key].begin + start_pos
541 range_length = [@loc_ranges[end_key].end - range_begin, length].min
542 [range_begin, range_length]