12483: Avoid write/marshal race. Remove dead code.
[arvados.git] / sdk / ruby / lib / arvados / collection.rb
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 require "arvados/keep"
6
7 module Arv
8   class Collection
9     def initialize(manifest_text="")
10       @manifest_text = manifest_text
11       @modified = false
12       @root = CollectionRoot.new
13       manifest = Keep::Manifest.new(manifest_text)
14       manifest.each_line do |stream_root, locators, file_specs|
15         if stream_root.empty? or locators.empty? or file_specs.empty?
16           raise ArgumentError.new("manifest text includes malformed line")
17         end
18         loc_list = LocatorList.new(locators)
19         file_specs.map { |s| manifest.split_file_token(s) }.
20             each do |file_start, file_len, file_path|
21           begin
22             @root.file_at(normalize_path(stream_root, file_path)).
23               add_segment(loc_list.segment(file_start, file_len))
24           rescue Errno::ENOTDIR, Errno::EISDIR => error
25             raise ArgumentError.new("%p is both a stream and file" %
26                                     error.to_s.partition(" - ").last)
27           end
28         end
29       end
30     end
31
32     def manifest_text
33       @manifest_text ||= @root.manifest_text
34     end
35
36     def modified?
37       @modified
38     end
39
40     def unmodified
41       @modified = false
42       self
43     end
44
45     def normalize
46       @manifest_text = @root.manifest_text
47       self
48     end
49
50     def cp_r(source, target, source_collection=nil)
51       opts = {:descend_target => !source.end_with?("/")}
52       copy(:merge, source.chomp("/"), target, source_collection, opts)
53     end
54
55     def each_file_path(&block)
56       @root.each_file_path(&block)
57     end
58
59     def exist?(path)
60       begin
61         substream, item = find(path)
62         not (substream.leaf? or substream[item].nil?)
63       rescue Errno::ENOENT, Errno::ENOTDIR
64         false
65       end
66     end
67
68     def rename(source, target)
69       copy(:add_copy, source, target) { rm_r(source) }
70     end
71
72     def rm(source)
73       remove(source)
74     end
75
76     def rm_r(source)
77       remove(source, :recursive => true)
78     end
79
80     protected
81
82     def find(*parts)
83       @root.find(normalize_path(*parts))
84     end
85
86     private
87
88     def modified
89       @manifest_text = nil
90       @modified = true
91       self
92     end
93
94     def normalize_path(*parts)
95       path = File.join(*parts)
96       if path.empty?
97         raise ArgumentError.new("empty path")
98       elsif (path == ".") or path.start_with?("./")
99         path
100       else
101         "./#{path}"
102       end
103     end
104
105     def copy(copy_method, source, target, source_collection=nil, opts={})
106       # Find the item at path `source` in `source_collection`, find the
107       # destination stream at path `target`, and use `copy_method` to copy
108       # the found object there.  If a block is passed in, it will be called
109       # right before we do the actual copy, after we confirm that everything
110       # is found and can be copied.
111       source_collection = self if source_collection.nil?
112       src_stream, src_tail = source_collection.find(source)
113       dst_stream_path, _, dst_tail = normalize_path(target).rpartition("/")
114       if dst_stream_path.empty?
115         dst_stream, dst_tail = @root.find(dst_tail)
116         dst_tail ||= src_tail
117       else
118         dst_stream = @root.stream_at(dst_stream_path)
119         dst_tail = src_tail if dst_tail.empty?
120       end
121       if (source_collection.equal?(self) and
122           (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
123         return self
124       end
125       src_item = src_stream[src_tail]
126       check_method = "check_can_#{copy_method}".to_sym
127       target_name = nil
128       if opts.fetch(:descend_target, true)
129         begin
130           # Find out if `target` refers to a stream we should copy into.
131           tail_stream = dst_stream[dst_tail]
132           tail_stream.send(check_method, src_item, src_tail)
133           # Yes it does.  Copy the item at `source` into it with the same name.
134           dst_stream = tail_stream
135           target_name = src_tail
136         rescue Errno::ENOENT, Errno::ENOTDIR
137           # It does not.  We'll fall back to writing to `target` below.
138         end
139       end
140       if target_name.nil?
141         dst_stream.send(check_method, src_item, dst_tail)
142         target_name = dst_tail
143       end
144       # At this point, we know the operation will work.  Call any block as
145       # a pre-copy hook.
146       if block_given?
147         yield
148         # Re-find the destination stream, in case the block removed
149         # the original (that's how rename is implemented).
150         dst_stream = @root.stream_at(dst_stream.path)
151       end
152       dst_stream.send(copy_method, src_item, target_name)
153       modified
154     end
155
156     def remove(path, opts={})
157       stream, name = find(path)
158       stream.delete(name, opts)
159       modified
160     end
161
162     Struct.new("LocatorSegment", :locators, :start_pos, :length)
163
164     class LocatorRange < Range
165       attr_reader :locator
166
167       def initialize(loc_s, start)
168         @locator = loc_s
169         range_end = start + Keep::Locator.parse(loc_s).size.to_i
170         super(start, range_end, false)
171       end
172     end
173
174     class LocatorList
175       # LocatorList efficiently builds LocatorSegments from a stream manifest.
176       def initialize(locators)
177         next_start = 0
178         @ranges = locators.map do |loc_s|
179           new_range = LocatorRange.new(loc_s, next_start)
180           next_start = new_range.end
181           new_range
182         end
183       end
184
185       def segment(start_pos, length)
186         # Return a LocatorSegment that captures `length` bytes from `start_pos`.
187         start_index = search_for_byte(start_pos)
188         if length == 0
189           end_index = start_index
190         else
191           end_index = search_for_byte(start_pos + length - 1, start_index)
192         end
193         seg_ranges = @ranges[start_index..end_index]
194         Struct::LocatorSegment.new(seg_ranges.map(&:locator),
195                                    start_pos - seg_ranges.first.begin,
196                                    length)
197       end
198
199       private
200
201       def search_for_byte(target, start_index=0)
202         # Do a binary search for byte `target` in the list of locators,
203         # starting from `start_index`.  Return the index of the range in
204         # @ranges that contains the byte.
205         lo = start_index
206         hi = @ranges.size
207         loop do
208           ii = (lo + hi) / 2
209           range = @ranges[ii]
210           if range.include?(target)
211             return ii
212           elsif ii == lo
213             raise RangeError.new("%i not in segment" % target)
214           elsif target < range.begin
215             hi = ii
216           else
217             lo = ii
218           end
219         end
220       end
221     end
222
223     class CollectionItem
224       attr_reader :path, :name
225
226       def initialize(path)
227         @path = path
228         @name = File.basename(path)
229       end
230     end
231
232     class CollectionFile < CollectionItem
233       def initialize(path)
234         super
235         @segments = []
236       end
237
238       def self.human_name
239         "file"
240       end
241
242       def file?
243         true
244       end
245
246       def leaf?
247         true
248       end
249
250       def add_segment(segment)
251         @segments << segment
252       end
253
254       def each_segment(&block)
255         @segments.each(&block)
256       end
257
258       def check_can_add_copy(src_item, name)
259         raise Errno::ENOTDIR.new(path)
260       end
261
262       alias_method :check_can_merge, :check_can_add_copy
263
264       def copy_named(copy_path)
265         copy = self.class.new(copy_path)
266         each_segment { |segment| copy.add_segment(segment) }
267         copy
268       end
269     end
270
271     class CollectionStream < CollectionItem
272       def initialize(path)
273         super
274         @items = {}
275       end
276
277       def self.human_name
278         "stream"
279       end
280
281       def file?
282         false
283       end
284
285       def leaf?
286         items.empty?
287       end
288
289       def [](key)
290         items[key] or
291           raise Errno::ENOENT.new("%p not found in %p" % [key, path])
292       end
293
294       def delete(name, opts={})
295         item = self[name]
296         if item.file? or opts[:recursive]
297           items.delete(name)
298         else
299           raise Errno::EISDIR.new(path)
300         end
301       end
302
303       def each_file_path
304         return to_enum(__method__) unless block_given?
305         items.each_value do |item|
306           if item.file?
307             yield item.path
308           else
309             item.each_file_path { |path| yield path }
310           end
311         end
312       end
313
314       def find(find_path)
315         # Given a POSIX-style path, return the CollectionStream that
316         # contains the object at that path, and the name of the object
317         # inside it.
318         components = find_path.split("/")
319         tail = components.pop
320         [components.reduce(self, :[]), tail]
321       end
322
323       def stream_at(find_path)
324         key, rest = find_path.split("/", 2)
325         next_stream = get_or_new(key, CollectionStream, Errno::ENOTDIR)
326         if rest.nil?
327           next_stream
328         else
329           next_stream.stream_at(rest)
330         end
331       end
332
333       def file_at(find_path)
334         stream_path, _, file_name = find_path.rpartition("/")
335         if stream_path.empty?
336           get_or_new(file_name, CollectionFile, Errno::EISDIR)
337         else
338           stream_at(stream_path).file_at(file_name)
339         end
340       end
341
342       def manifest_text
343         # Return a string with the normalized manifest text for this stream,
344         # including all substreams.
345         file_keys, stream_keys = items.keys.sort.partition do |key|
346           items[key].file?
347         end
348         my_line = StreamManifest.new(path)
349         file_keys.each do |file_name|
350           my_line.add_file(items[file_name])
351         end
352         sub_lines = stream_keys.map do |sub_name|
353           items[sub_name].manifest_text
354         end
355         my_line.to_s + sub_lines.join("")
356       end
357
358       def check_can_add_copy(src_item, key)
359         if existing = check_can_merge(src_item, key) and not existing.leaf?
360           raise Errno::ENOTEMPTY.new(existing.path)
361         end
362       end
363
364       def check_can_merge(src_item, key)
365         if existing = items[key] and (existing.class != src_item.class)
366           raise Errno::ENOTDIR.new(existing.path)
367         end
368         existing
369       end
370
371       def add_copy(src_item, key)
372         self[key] = src_item.copy_named("#{path}/#{key}")
373       end
374
375       def merge(src_item, key)
376         # Do a recursive copy of the collection item `src_item` to destination
377         # `key`.  If a simple copy is safe, do that; otherwise, recursively
378         # merge the contents of the stream `src_item` into the stream at
379         # `key`.
380         begin
381           check_can_add_copy(src_item, key)
382           add_copy(src_item, key)
383         rescue Errno::ENOTEMPTY
384           dest = self[key]
385           error = nil
386           # Copy as much as possible, then raise any error encountered.
387           # Start with streams for a depth-first merge.
388           src_items = src_item.items.each_pair.sort_by do |_, sub_item|
389             (sub_item.file?) ? 1 : 0
390           end
391           src_items.each do |sub_key, sub_item|
392             begin
393               dest.merge(sub_item, sub_key)
394             rescue Errno::ENOTDIR => error
395             end
396           end
397           raise error unless error.nil?
398         end
399       end
400
401       def copy_named(copy_path)
402         copy = self.class.new(copy_path)
403         items.each_pair do |key, item|
404           copy.add_copy(item, key)
405         end
406         copy
407       end
408
409       protected
410
411       attr_reader :items
412
413       private
414
415       def []=(key, item)
416         items[key] = item
417       end
418
419       def get_or_new(key, klass, err_class)
420         # Return the collection item at `key` and ensure that it's a `klass`.
421         # If `key` does not exist, create a new `klass` there.
422         # If the value for `key` is not a `klass`, raise an `err_class`.
423         item = items[key]
424         if item.nil?
425           self[key] = klass.new("#{path}/#{key}")
426         elsif not item.is_a?(klass)
427           raise err_class.new(item.path)
428         else
429           item
430         end
431       end
432     end
433
434     class CollectionRoot < CollectionStream
435       def initialize
436         super("")
437         setup
438       end
439
440       def delete(name, opts={})
441         super
442         # If that didn't fail, it deleted the . stream.  Recreate it.
443         setup
444       end
445
446       def check_can_merge(src_item, key)
447         if items.include?(key)
448           super
449         else
450           raise_root_write_error(key)
451         end
452       end
453
454       private
455
456       def setup
457         items["."] = CollectionStream.new(".")
458       end
459
460       def raise_root_write_error(key)
461         raise ArgumentError.new("can't write to %p at collection root" % key)
462       end
463
464       def []=(key, item)
465         raise_root_write_error(key)
466       end
467     end
468
469     class StreamManifest
470       # Build a manifest text for a single stream, without substreams.
471       # The manifest includes files in the order they're added.  If you want
472       # a normalized manifest, add files in lexical order by name.
473
474       def initialize(name)
475         @name = name
476         @loc_ranges = {}
477         @loc_range_start = 0
478         @file_specs = []
479       end
480
481       def add_file(coll_file)
482         coll_file.each_segment do |segment|
483           extend_locator_ranges(segment.locators)
484           extend_file_specs(coll_file.name, segment)
485         end
486       end
487
488       def to_s
489         if @file_specs.empty?
490           ""
491         else
492           "%s %s %s\n" % [escape_name(@name),
493                           @loc_ranges.keys.join(" "),
494                           @file_specs.join(" ")]
495         end
496       end
497
498       private
499
500       def extend_locator_ranges(locators)
501         locators.
502             select { |loc_s| not @loc_ranges.include?(loc_s) }.
503             each do |loc_s|
504           @loc_ranges[loc_s] = LocatorRange.new(loc_s, @loc_range_start)
505           @loc_range_start = @loc_ranges[loc_s].end
506         end
507       end
508
509       def extend_file_specs(filename, segment)
510         # Given a filename and a LocatorSegment, add the smallest
511         # possible array of file spec strings to @file_specs that
512         # builds the file from available locators.
513         filename = escape_name(filename)
514         start_pos = segment.start_pos
515         length = segment.length
516         start_loc = segment.locators.first
517         prev_loc = start_loc
518         # Build a list of file specs by iterating through the segment's
519         # locators and preparing a file spec for each contiguous range.
520         segment.locators[1..-1].each do |loc_s|
521           range = @loc_ranges[loc_s]
522           if range.begin != @loc_ranges[prev_loc].end
523             range_start, range_length =
524               start_and_length_at(start_loc, prev_loc, start_pos, length)
525             @file_specs << "#{range_start}:#{range_length}:#{filename}"
526             start_pos = 0
527             length -= range_length
528             start_loc = loc_s
529           end
530           prev_loc = loc_s
531         end
532         range_start, range_length =
533           start_and_length_at(start_loc, prev_loc, start_pos, length)
534         @file_specs << "#{range_start}:#{range_length}:#{filename}"
535       end
536
537       def escape_name(name)
538         name.gsub(/\\/, "\\\\\\\\").gsub(/\s/) do |s|
539           s.each_byte.map { |c| "\\%03o" % c }.join("")
540         end
541       end
542
543       def start_and_length_at(start_key, end_key, start_pos, length)
544         range_begin = @loc_ranges[start_key].begin + start_pos
545         range_length = [@loc_ranges[end_key].end - range_begin, length].min
546         [range_begin, range_length]
547       end
548     end
549   end
550 end