5614: Add Collection#each_file to Ruby SDK.
[arvados.git] / sdk / ruby / lib / arvados / collection.rb
1 require "arvados/keep"
2
3 module Arv
4   class Collection
5     def initialize(manifest_text="")
6       @manifest_text = manifest_text
7       @modified = false
8       @root = CollectionRoot.new
9       manifest = Keep::Manifest.new(manifest_text)
10       manifest.each_line do |stream_root, locators, file_specs|
11         if stream_root.empty? or locators.empty? or file_specs.empty?
12           raise ArgumentError.new("manifest text includes malformed line")
13         end
14         loc_list = LocatorList.new(locators)
15         file_specs.map { |s| manifest.split_file_token(s) }.
16             each do |file_start, file_len, file_path|
17           @root.file_at(normalize_path(stream_root, file_path)).
18             add_segment(loc_list.segment(file_start, file_len))
19         end
20       end
21     end
22
23     def manifest_text
24       @manifest_text ||= @root.manifest_text
25     end
26
27     def modified?
28       @modified
29     end
30
31     def unmodified
32       @modified = false
33       self
34     end
35
36     def normalize
37       @manifest_text = @root.manifest_text
38       self
39     end
40
41     def cp_r(source, target, source_collection=nil)
42       opts = {descend_target: !source.end_with?("/")}
43       copy(:merge, source.chomp("/"), target, source_collection, opts)
44     end
45
46     def each_file_path(&block)
47       @root.each_file_path(&block)
48     end
49
50     def exist?(path)
51       begin
52         substream, item = find(path)
53         not (substream.leaf? or substream[item].nil?)
54       rescue Errno::ENOENT, Errno::ENOTDIR
55         false
56       end
57     end
58
59     def rename(source, target)
60       copy(:add_copy, source, target) { rm_r(source) }
61     end
62
63     def rm(source)
64       remove(source)
65     end
66
67     def rm_r(source)
68       remove(source, recursive: true)
69     end
70
71     protected
72
73     def find(*parts)
74       @root.find(normalize_path(*parts))
75     end
76
77     private
78
79     def modified
80       @manifest_text = nil
81       @modified = true
82       self
83     end
84
85     def normalize_path(*parts)
86       path = File.join(*parts)
87       if path.empty?
88         raise ArgumentError.new("empty path")
89       elsif (path == ".") or path.start_with?("./")
90         path
91       else
92         "./#{path}"
93       end
94     end
95
96     def copy(copy_method, source, target, source_collection=nil, opts={})
97       # Find the item at path `source` in `source_collection`, find the
98       # destination stream at path `target`, and use `copy_method` to copy
99       # the found object there.  If a block is passed in, it will be called
100       # right before we do the actual copy, after we confirm that everything
101       # is found and can be copied.
102       source_collection = self if source_collection.nil?
103       src_stream, src_tail = source_collection.find(source)
104       dst_stream, dst_tail = find(target)
105       if (source_collection.equal?(self) and
106           (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
107         return self
108       end
109       src_item = src_stream[src_tail]
110       dst_tail ||= src_tail
111       check_method = "check_can_#{copy_method}".to_sym
112       target_name = nil
113       if opts.fetch(:descend_target, true)
114         begin
115           # Find out if `target` refers to a stream we should copy into.
116           tail_stream = dst_stream[dst_tail]
117           tail_stream.send(check_method, src_item, src_tail)
118           # Yes it does.  Copy the item at `source` into it with the same name.
119           dst_stream = tail_stream
120           target_name = src_tail
121         rescue Errno::ENOENT, Errno::ENOTDIR
122           # It does not.  We'll fall back to writing to `target` below.
123         end
124       end
125       if target_name.nil?
126         dst_stream.send(check_method, src_item, dst_tail)
127         target_name = dst_tail
128       end
129       # At this point, we know the operation will work.  Call any block as
130       # a pre-copy hook.
131       if block_given?
132         yield
133         # Re-find the destination stream, in case the block removed
134         # the original (that's how rename is implemented).
135         dst_stream = @root.stream_at(dst_stream.path)
136       end
137       dst_stream.send(copy_method, src_item, target_name)
138       modified
139     end
140
141     def remove(path, opts={})
142       stream, name = find(path)
143       stream.delete(name, opts)
144       modified
145     end
146
147     LocatorSegment = Struct.new(:locators, :start_pos, :length)
148
149     class LocatorRange < Range
150       attr_reader :locator
151
152       def initialize(loc_s, start)
153         @locator = loc_s
154         range_end = start + Keep::Locator.parse(loc_s).size.to_i
155         super(start, range_end, false)
156       end
157     end
158
159     class LocatorList
160       # LocatorList efficiently builds LocatorSegments from a stream manifest.
161       def initialize(locators)
162         next_start = 0
163         @ranges = locators.map do |loc_s|
164           new_range = LocatorRange.new(loc_s, next_start)
165           next_start = new_range.end
166           new_range
167         end
168       end
169
170       def segment(start_pos, length)
171         # Return a LocatorSegment that captures `length` bytes from `start_pos`.
172         start_index = search_for_byte(start_pos)
173         if length == 0
174           end_index = start_index
175         else
176           end_index = search_for_byte(start_pos + length - 1, start_index)
177         end
178         seg_ranges = @ranges[start_index..end_index]
179         LocatorSegment.new(seg_ranges.map(&:locator),
180                            start_pos - seg_ranges.first.begin,
181                            length)
182       end
183
184       private
185
186       def search_for_byte(target, start_index=0)
187         # Do a binary search for byte `target` in the list of locators,
188         # starting from `start_index`.  Return the index of the range in
189         # @ranges that contains the byte.
190         lo = start_index
191         hi = @ranges.size
192         loop do
193           ii = (lo + hi) / 2
194           range = @ranges[ii]
195           if range.include?(target)
196             return ii
197           elsif ii == lo
198             raise RangeError.new("%i not in segment" % target)
199           elsif target < range.begin
200             hi = ii
201           else
202             lo = ii
203           end
204         end
205       end
206     end
207
208     class CollectionItem
209       attr_reader :path, :name
210
211       def initialize(path)
212         @path = path
213         @name = File.basename(path)
214       end
215     end
216
217     class CollectionFile < CollectionItem
218       def initialize(path)
219         super
220         @segments = []
221       end
222
223       def self.human_name
224         "file"
225       end
226
227       def file?
228         true
229       end
230
231       def leaf?
232         true
233       end
234
235       def add_segment(segment)
236         @segments << segment
237       end
238
239       def each_segment(&block)
240         @segments.each(&block)
241       end
242
243       def check_can_add_copy(src_item, name)
244         raise Errno::ENOTDIR.new(path)
245       end
246
247       alias_method :check_can_merge, :check_can_add_copy
248
249       def copy_named(copy_path)
250         copy = self.class.new(copy_path)
251         each_segment { |segment| copy.add_segment(segment) }
252         copy
253       end
254     end
255
256     class CollectionStream < CollectionItem
257       def initialize(path)
258         super
259         @items = {}
260       end
261
262       def self.human_name
263         "stream"
264       end
265
266       def file?
267         false
268       end
269
270       def leaf?
271         items.empty?
272       end
273
274       def [](key)
275         items[key] or
276           raise Errno::ENOENT.new("%p not found in %p" % [key, path])
277       end
278
279       def delete(name, opts={})
280         item = self[name]
281         if item.file? or opts[:recursive]
282           items.delete(name)
283         else
284           raise Errno::EISDIR.new(path)
285         end
286       end
287
288       def each_file_path
289         return to_enum(__method__) unless block_given?
290         items.each_value do |item|
291           if item.file?
292             yield item.path
293           else
294             item.each_file_path { |path| yield path }
295           end
296         end
297       end
298
299       def find(find_path)
300         # Given a POSIX-style path, return the CollectionStream that
301         # contains the object at that path, and the name of the object
302         # inside it.
303         components = find_path.split("/")
304         tail = components.pop
305         [components.reduce(self, :[]), tail]
306       end
307
308       def stream_at(find_path)
309         key, rest = find_path.split("/", 2)
310         next_stream = get_or_new(key, CollectionStream)
311         if rest.nil?
312           next_stream
313         else
314           next_stream.stream_at(rest)
315         end
316       end
317
318       def file_at(find_path)
319         stream_path, _, file_name = find_path.rpartition("/")
320         if stream_path.empty?
321           get_or_new(file_name, CollectionFile)
322         else
323           stream_at(stream_path).file_at(file_name)
324         end
325       end
326
327       def manifest_text
328         # Return a string with the normalized manifest text for this stream,
329         # including all substreams.
330         file_keys, stream_keys = items.keys.sort.partition do |key|
331           items[key].file?
332         end
333         my_line = StreamManifest.new(path)
334         file_keys.each do |file_name|
335           my_line.add_file(items[file_name])
336         end
337         sub_lines = stream_keys.map do |sub_name|
338           items[sub_name].manifest_text
339         end
340         my_line.to_s + sub_lines.join("")
341       end
342
343       def check_can_add_copy(src_item, key)
344         if existing = check_can_merge(src_item, key) and not existing.leaf?
345           raise Errno::ENOTEMPTY.new(existing.path)
346         end
347       end
348
349       def check_can_merge(src_item, key)
350         if existing = items[key] and (existing.class != src_item.class)
351           raise Errno::ENOTDIR.new(existing.path)
352         end
353         existing
354       end
355
356       def add_copy(src_item, key)
357         self[key] = src_item.copy_named("#{path}/#{key}")
358       end
359
360       def merge(src_item, key)
361         # Do a recursive copy of the collection item `src_item` to destination
362         # `key`.  If a simple copy is safe, do that; otherwise, recursively
363         # merge the contents of the stream `src_item` into the stream at
364         # `key`.
365         begin
366           check_can_add_copy(src_item, key)
367           add_copy(src_item, key)
368         rescue Errno::ENOTEMPTY
369           dest = self[key]
370           error = nil
371           # Copy as much as possible, then raise any error encountered.
372           # Start with streams for a depth-first merge.
373           src_items = src_item.items.each_pair.sort_by do |_, sub_item|
374             (sub_item.file?) ? 1 : 0
375           end
376           src_items.each do |sub_key, sub_item|
377             begin
378               dest.merge(sub_item, sub_key)
379             rescue Errno::ENOTDIR => error
380             end
381           end
382           raise error unless error.nil?
383         end
384       end
385
386       def copy_named(copy_path)
387         copy = self.class.new(copy_path)
388         items.each_pair do |key, item|
389           copy.add_copy(item, key)
390         end
391         copy
392       end
393
394       protected
395
396       attr_reader :items
397
398       private
399
400       def []=(key, item)
401         items[key] = item
402       end
403
404       def get_or_new(key, klass)
405         # Return the collection item at `key` and ensure that it's a `klass`.
406         # If `key` does not exist, create a new `klass` there.
407         # If the value for `key` is not a `klass`, raise an ArgumentError.
408         item = items[key]
409         if item.nil?
410           self[key] = klass.new("#{path}/#{key}")
411         elsif not item.is_a?(klass)
412           raise ArgumentError.
413             new("in stream %p, %p is a %s, not a %s" %
414                 [path, key, items[key].class.human_name, klass.human_name])
415         else
416           item
417         end
418       end
419     end
420
421     class CollectionRoot < CollectionStream
422       def initialize
423         super("")
424         setup
425       end
426
427       def delete(name, opts={})
428         super
429         # If that didn't fail, it deleted the . stream.  Recreate it.
430         setup
431       end
432
433       def check_can_merge(src_item, key)
434         if items.include?(key)
435           super
436         else
437           raise_root_write_error(key)
438         end
439       end
440
441       private
442
443       def setup
444         items["."] = CollectionStream.new(".")
445       end
446
447       def raise_root_write_error(key)
448         raise ArgumentError.new("can't write to %p at collection root" % key)
449       end
450
451       def []=(key, item)
452         raise_root_write_error(key)
453       end
454     end
455
456     class StreamManifest
457       # Build a manifest text for a single stream, without substreams.
458       # The manifest includes files in the order they're added.  If you want
459       # a normalized manifest, add files in lexical order by name.
460
461       def initialize(name)
462         @name = name
463         @loc_ranges = {}
464         @loc_range_start = 0
465         @file_specs = []
466       end
467
468       def add_file(coll_file)
469         coll_file.each_segment do |segment|
470           extend_locator_ranges(segment.locators)
471           extend_file_specs(coll_file.name, segment)
472         end
473       end
474
475       def to_s
476         if @file_specs.empty?
477           ""
478         else
479           "%s %s %s\n" % [escape_name(@name),
480                           @loc_ranges.keys.join(" "),
481                           @file_specs.join(" ")]
482         end
483       end
484
485       private
486
487       def extend_locator_ranges(locators)
488         locators.
489             select { |loc_s| not @loc_ranges.include?(loc_s) }.
490             each do |loc_s|
491           @loc_ranges[loc_s] = LocatorRange.new(loc_s, @loc_range_start)
492           @loc_range_start = @loc_ranges[loc_s].end
493         end
494       end
495
496       def extend_file_specs(filename, segment)
497         # Given a filename and a LocatorSegment, add the smallest
498         # possible array of file spec strings to @file_specs that
499         # builds the file from available locators.
500         filename = escape_name(filename)
501         start_pos = segment.start_pos
502         length = segment.length
503         start_loc = segment.locators.first
504         prev_loc = start_loc
505         # Build a list of file specs by iterating through the segment's
506         # locators and preparing a file spec for each contiguous range.
507         segment.locators[1..-1].each do |loc_s|
508           range = @loc_ranges[loc_s]
509           if range.begin != @loc_ranges[prev_loc].end
510             range_start, range_length =
511               start_and_length_at(start_loc, prev_loc, start_pos, length)
512             @file_specs << "#{range_start}:#{range_length}:#{filename}"
513             start_pos = 0
514             length -= range_length
515             start_loc = loc_s
516           end
517           prev_loc = loc_s
518         end
519         range_start, range_length =
520           start_and_length_at(start_loc, prev_loc, start_pos, length)
521         @file_specs << "#{range_start}:#{range_length}:#{filename}"
522       end
523
524       def escape_name(name)
525         name.gsub(/\\/, "\\\\\\\\").gsub(/\s/) do |s|
526           s.each_byte.map { |c| "\\%03o" % c }.join("")
527         end
528       end
529
530       def start_and_length_at(start_key, end_key, start_pos, length)
531         range_begin = @loc_ranges[start_key].begin + start_pos
532         range_length = [@loc_ranges[end_key].end - range_begin, length].min
533         [range_begin, range_length]
534       end
535     end
536   end
537 end