15699: Fix handling of streams with multiple refs to a block ID.
[arvados.git] / sdk / ruby / lib / arvados / collection.rb
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 require "arvados/keep"
6
7 module Arv
8   class Collection
9     def initialize(manifest_text="")
10       @manifest_text = manifest_text
11       @modified = false
12       @root = CollectionRoot.new
13       manifest = Keep::Manifest.new(manifest_text)
14       manifest.each_line do |stream_root, locators, file_specs|
15         if stream_root.empty? or locators.empty? or file_specs.empty?
16           raise ArgumentError.new("manifest text includes malformed line")
17         end
18         loc_list = LocatorList.new(locators)
19         file_specs.map { |s| manifest.split_file_token(s) }.
20             each do |file_start, file_len, file_path|
21           begin
22             @root.file_at(normalize_path(stream_root, file_path)).
23               add_segment(loc_list.segment(file_start, file_len))
24           rescue Errno::ENOTDIR, Errno::EISDIR => error
25             raise ArgumentError.new("%p is both a stream and file" %
26                                     error.to_s.partition(" - ").last)
27           end
28         end
29       end
30     end
31
32     def manifest_text
33       @manifest_text ||= @root.manifest_text
34     end
35
36     def modified?
37       @modified
38     end
39
40     def unmodified
41       @modified = false
42       self
43     end
44
45     def normalize
46       @manifest_text = @root.manifest_text
47       self
48     end
49
50     def cp_r(source, target, source_collection=nil)
51       opts = {:descend_target => !source.end_with?("/")}
52       copy(:merge, source.chomp("/"), target, source_collection, opts)
53     end
54
55     def each_file_path(&block)
56       @root.each_file_path(&block)
57     end
58
59     def exist?(path)
60       begin
61         substream, item = find(path)
62         not (substream.leaf? or substream[item].nil?)
63       rescue Errno::ENOENT, Errno::ENOTDIR
64         false
65       end
66     end
67
68     def rename(source, target)
69       copy(:add_copy, source, target) { rm_r(source) }
70     end
71
72     def rm(source)
73       remove(source)
74     end
75
76     def rm_r(source)
77       remove(source, :recursive => true)
78     end
79
80     protected
81
82     def find(*parts)
83       @root.find(normalize_path(*parts))
84     end
85
86     private
87
88     def modified
89       @manifest_text = nil
90       @modified = true
91       self
92     end
93
94     def normalize_path(*parts)
95       path = File.join(*parts)
96       if path.empty?
97         raise ArgumentError.new("empty path")
98       elsif (path == ".") or path.start_with?("./")
99         path
100       else
101         "./#{path}"
102       end
103     end
104
105     def copy(copy_method, source, target, source_collection=nil, opts={})
106       # Find the item at path `source` in `source_collection`, find the
107       # destination stream at path `target`, and use `copy_method` to copy
108       # the found object there.  If a block is passed in, it will be called
109       # right before we do the actual copy, after we confirm that everything
110       # is found and can be copied.
111       source_collection = self if source_collection.nil?
112       src_stream, src_tail = source_collection.find(source)
113       dst_stream_path, _, dst_tail = normalize_path(target).rpartition("/")
114       if dst_stream_path.empty?
115         dst_stream, dst_tail = @root.find(dst_tail)
116         dst_tail ||= src_tail
117       else
118         dst_stream = @root.stream_at(dst_stream_path)
119         dst_tail = src_tail if dst_tail.empty?
120       end
121       if (source_collection.equal?(self) and
122           (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
123         return self
124       end
125       src_item = src_stream[src_tail]
126       check_method = "check_can_#{copy_method}".to_sym
127       target_name = nil
128       if opts.fetch(:descend_target, true)
129         begin
130           # Find out if `target` refers to a stream we should copy into.
131           tail_stream = dst_stream[dst_tail]
132           tail_stream.send(check_method, src_item, src_tail)
133           # Yes it does.  Copy the item at `source` into it with the same name.
134           dst_stream = tail_stream
135           target_name = src_tail
136         rescue Errno::ENOENT, Errno::ENOTDIR
137           # It does not.  We'll fall back to writing to `target` below.
138         end
139       end
140       if target_name.nil?
141         dst_stream.send(check_method, src_item, dst_tail)
142         target_name = dst_tail
143       end
144       # At this point, we know the operation will work.  Call any block as
145       # a pre-copy hook.
146       if block_given?
147         yield
148         # Re-find the destination stream, in case the block removed
149         # the original (that's how rename is implemented).
150         dst_stream = @root.stream_at(dst_stream.path)
151       end
152       dst_stream.send(copy_method, src_item, target_name)
153       modified
154     end
155
156     def remove(path, opts={})
157       stream, name = find(path)
158       stream.delete(name, opts)
159       modified
160     end
161
162     Struct.new("LocatorSegment", :locators, :start_pos, :length)
163
164     class LocatorRange < Range
165       attr_reader :locator
166
167       def initialize(loc_s, start)
168         @locator = loc_s
169         range_end = start + Keep::Locator.parse(loc_s).size.to_i
170         super(start, range_end, false)
171       end
172     end
173
174     class LocatorList
175       # LocatorList efficiently builds LocatorSegments from a stream manifest.
176       def initialize(locators)
177         next_start = 0
178         @ranges = locators.map do |loc_s|
179           new_range = LocatorRange.new(loc_s, next_start)
180           next_start = new_range.end
181           new_range
182         end
183       end
184
185       def segment(start_pos, length)
186         # Return a LocatorSegment that captures `length` bytes from `start_pos`.
187         start_index = search_for_byte(start_pos)
188         if length == 0
189           end_index = start_index
190         else
191           end_index = search_for_byte(start_pos + length - 1, start_index)
192         end
193         seg_ranges = @ranges[start_index..end_index]
194         Struct::LocatorSegment.new(seg_ranges.map(&:locator),
195                                    start_pos - seg_ranges.first.begin,
196                                    length)
197       end
198
199       private
200
201       def search_for_byte(target, start_index=0)
202         # Do a binary search for byte `target` in the list of locators,
203         # starting from `start_index`.  Return the index of the range in
204         # @ranges that contains the byte.
205         lo = start_index
206         hi = @ranges.size
207         loop do
208           ii = (lo + hi) / 2
209           range = @ranges[ii]
210           if range.include?(target) && (target < range.end || ii == hi)
211             return ii
212           elsif ii == lo
213             raise RangeError.new("%i not in segment" % target)
214           elsif target < range.begin
215             hi = ii
216           else
217             lo = ii
218           end
219         end
220       end
221     end
222
223     class CollectionItem
224       attr_reader :path, :name
225
226       def initialize(path)
227         @path = path
228         @name = File.basename(path)
229       end
230     end
231
232     class CollectionFile < CollectionItem
233       def initialize(path)
234         super
235         @segments = []
236       end
237
238       def self.human_name
239         "file"
240       end
241
242       def file?
243         true
244       end
245
246       def leaf?
247         true
248       end
249
250       def add_segment(segment)
251         @segments << segment
252       end
253
254       def each_segment(&block)
255         @segments.each(&block)
256       end
257
258       def check_can_add_copy(src_item, name)
259         raise Errno::ENOTDIR.new(path)
260       end
261
262       alias_method :check_can_merge, :check_can_add_copy
263
264       def copy_named(copy_path)
265         copy = self.class.new(copy_path)
266         each_segment { |segment| copy.add_segment(segment) }
267         copy
268       end
269     end
270
271     class CollectionStream < CollectionItem
272       def initialize(path)
273         super
274         @items = {}
275       end
276
277       def self.human_name
278         "stream"
279       end
280
281       def file?
282         false
283       end
284
285       def leaf?
286         items.empty?
287       end
288
289       def [](key)
290         items[key] or
291           raise Errno::ENOENT.new("%p not found in %p" % [key, path])
292       end
293
294       def delete(name, opts={})
295         item = self[name]
296         if item.file? or opts[:recursive]
297           items.delete(name)
298         else
299           raise Errno::EISDIR.new(path)
300         end
301       end
302
303       def each_file_path
304         return to_enum(__method__) unless block_given?
305         items.each_value do |item|
306           if item.file?
307             yield item.path
308           else
309             item.each_file_path { |path| yield path }
310           end
311         end
312       end
313
314       def find(find_path)
315         # Given a POSIX-style path, return the CollectionStream that
316         # contains the object at that path, and the name of the object
317         # inside it.
318         components = find_path.split("/")
319         tail = components.pop
320         [components.reduce(self, :[]), tail]
321       end
322
323       def stream_at(find_path)
324         key, rest = find_path.split("/", 2)
325         next_stream = get_or_new(key, CollectionStream, Errno::ENOTDIR)
326         if rest.nil?
327           next_stream
328         else
329           next_stream.stream_at(rest)
330         end
331       end
332
333       def file_at(find_path)
334         stream_path, _, file_name = find_path.rpartition("/")
335         if stream_path.empty?
336           get_or_new(file_name, CollectionFile, Errno::EISDIR)
337         else
338           stream_at(stream_path).file_at(file_name)
339         end
340       end
341
342       def manifest_text
343         # Return a string with the normalized manifest text for this stream,
344         # including all substreams.
345         file_keys, stream_keys = items.keys.sort.partition do |key|
346           items[key].file?
347         end
348         my_line = StreamManifest.new(path)
349         file_keys.each do |file_name|
350           my_line.add_file(items[file_name])
351         end
352         sub_lines = stream_keys.map do |sub_name|
353           items[sub_name].manifest_text
354         end
355         my_line.to_s + sub_lines.join("")
356       end
357
358       def check_can_add_copy(src_item, key)
359         if existing = check_can_merge(src_item, key) and not existing.leaf?
360           raise Errno::ENOTEMPTY.new(existing.path)
361         end
362       end
363
364       def check_can_merge(src_item, key)
365         if existing = items[key] and (existing.class != src_item.class)
366           raise Errno::ENOTDIR.new(existing.path)
367         end
368         existing
369       end
370
371       def add_copy(src_item, key)
372         if key == "."
373           self[key] = src_item.copy_named("#{path}")
374         else
375           self[key] = src_item.copy_named("#{path}/#{key}")
376         end
377       end
378
379       def merge(src_item, key)
380         # Do a recursive copy of the collection item `src_item` to destination
381         # `key`.  If a simple copy is safe, do that; otherwise, recursively
382         # merge the contents of the stream `src_item` into the stream at
383         # `key`.
384         begin
385           check_can_add_copy(src_item, key)
386           add_copy(src_item, key)
387         rescue Errno::ENOTEMPTY
388           dest = self[key]
389           error = nil
390           # Copy as much as possible, then raise any error encountered.
391           # Start with streams for a depth-first merge.
392           src_items = src_item.items.each_pair.sort_by do |_, sub_item|
393             (sub_item.file?) ? 1 : 0
394           end
395           src_items.each do |sub_key, sub_item|
396             begin
397               dest.merge(sub_item, sub_key)
398             rescue Errno::ENOTDIR => error
399             end
400           end
401           raise error unless error.nil?
402         end
403       end
404
405       def copy_named(copy_path)
406         copy = self.class.new(copy_path)
407         items.each_pair do |key, item|
408           copy.add_copy(item, key)
409         end
410         copy
411       end
412
413       protected
414
415       attr_reader :items
416
417       private
418
419       def []=(key, item)
420         items[key] = item
421       end
422
423       def get_or_new(key, klass, err_class)
424         # Return the collection item at `key` and ensure that it's a `klass`.
425         # If `key` does not exist, create a new `klass` there.
426         # If the value for `key` is not a `klass`, raise an `err_class`.
427         item = items[key]
428         if item.nil?
429           self[key] = klass.new("#{path}/#{key}")
430         elsif not item.is_a?(klass)
431           raise err_class.new(item.path)
432         else
433           item
434         end
435       end
436     end
437
438     class CollectionRoot < CollectionStream
439       def initialize
440         super("")
441         setup
442       end
443
444       def delete(name, opts={})
445         super
446         # If that didn't fail, it deleted the . stream.  Recreate it.
447         setup
448       end
449
450       def check_can_merge(src_item, key)
451         if items.include?(key)
452           super
453         else
454           raise_root_write_error(key)
455         end
456       end
457
458       private
459
460       def setup
461         items["."] = CollectionStream.new(".")
462       end
463
464       def add_copy(src_item, key)
465         items["."].add_copy(src_item, key)
466       end
467
468       def raise_root_write_error(key)
469         raise ArgumentError.new("can't write to %p at collection root" % key)
470       end
471
472       def []=(key, item)
473         raise_root_write_error(key)
474       end
475     end
476
477     class StreamManifest
478       # Build a manifest text for a single stream, without substreams.
479       # The manifest includes files in the order they're added.  If you want
480       # a normalized manifest, add files in lexical order by name.
481
482       def initialize(name)
483         @name = name
484         @loc_ranges = []
485         @loc_range_start = 0
486         @file_specs = []
487       end
488
489       def add_file(coll_file)
490         coll_file.each_segment do |segment|
491           extend_file_specs(coll_file.name, segment)
492         end
493       end
494
495       def to_s
496         if @file_specs.empty?
497           ""
498         else
499           "%s %s %s\n" % [escape_name(@name),
500                           @loc_ranges.collect(&:locator).join(" "),
501                           @file_specs.join(" ")]
502         end
503       end
504
505       private
506
507       def extend_file_specs(filename, segment)
508         found_overlap = false
509         # Find the longest prefix of segment.locators that's a suffix
510         # of the existing @loc_ranges. If we find one, drop those
511         # locators (they'll be added back below, when we're handling
512         # the normal/no-overlap case).
513         (1..segment.locators.length).each do |overlap|
514           if @loc_ranges.length >= overlap && @loc_ranges[-overlap..-1].collect(&:locator) == segment.locators[0..overlap-1]
515             (1..overlap).each do
516               discarded = @loc_ranges.pop
517               @loc_range_start -= (discarded.end - discarded.begin)
518             end
519             found_overlap = true
520             break
521           end
522         end
523
524         # If there was no overlap at the end of our existing
525         # @loc_ranges, check whether the full set of segment.locators
526         # appears earlier in @loc_ranges. If so, use those instead of
527         # appending the same locators again.
528         if !found_overlap && segment.locators.length < @loc_ranges.length
529           segment_start = 0
530           (0..@loc_ranges.length-1).each do |ri|
531             if @loc_ranges[ri..ri+segment.locators.length-1].collect(&:locator) == segment.locators
532               @file_specs << "#{segment.start_pos + @loc_ranges[ri].begin}:#{segment.length}:#{escape_name(filename)}"
533               return
534             end
535           end
536         end
537
538         segment_start = @loc_range_start
539         segment_end = segment_start
540         segment.locators.each do |loc_s|
541           r = LocatorRange.new(loc_s, @loc_range_start)
542           @loc_ranges << r
543           @loc_range_start = r.end
544           segment_end += (r.end - r.begin)
545         end
546         @file_specs << "#{segment.start_pos + segment_start}:#{segment.length}:#{escape_name(filename)}"
547       end
548
549       def escape_name(name)
550         name.gsub(/\\/, "\\\\\\\\").gsub(/\s/) do |s|
551           s.each_byte.map { |c| "\\%03o" % c }.join("")
552         end
553       end
554     end
555   end
556 end