Merge pull request #1 from curoverse/master
[arvados.git] / sdk / ruby / lib / arvados / collection.rb
1 require "arvados/keep"
2
3 module Arv
4   class Collection
5     def initialize(manifest_text="")
6       @manifest_text = manifest_text
7       @modified = false
8       @root = CollectionRoot.new
9       manifest = Keep::Manifest.new(manifest_text)
10       manifest.each_line do |stream_root, locators, file_specs|
11         if stream_root.empty? or locators.empty? or file_specs.empty?
12           raise ArgumentError.new("manifest text includes malformed line")
13         end
14         loc_list = LocatorList.new(locators)
15         file_specs.map { |s| manifest.split_file_token(s) }.
16             each do |file_start, file_len, file_path|
17           @root.file_at(normalize_path(stream_root, file_path)).
18             add_segment(loc_list.segment(file_start, file_len))
19         end
20       end
21     end
22
23     def manifest_text
24       @manifest_text ||= @root.manifest_text
25     end
26
27     def modified?
28       @modified
29     end
30
31     def unmodified
32       @modified = false
33       self
34     end
35
36     def normalize
37       @manifest_text = @root.manifest_text
38       self
39     end
40
41     def cp_r(source, target, source_collection=nil)
42       opts = {descend_target: !source.end_with?("/")}
43       copy(:merge, source.chomp("/"), target, source_collection, opts)
44     end
45
46     def rename(source, target)
47       copy(:add_copy, source, target) { rm_r(source) }
48     end
49
50     def rm(source)
51       remove(source)
52     end
53
54     def rm_r(source)
55       remove(source, recursive: true)
56     end
57
58     protected
59
60     def find(*parts)
61       @root.find(normalize_path(*parts))
62     end
63
64     private
65
66     def modified
67       @manifest_text = nil
68       @modified = true
69       self
70     end
71
72     def normalize_path(*parts)
73       path = File.join(*parts)
74       if path.empty?
75         raise ArgumentError.new("empty path")
76       elsif (path == ".") or path.start_with?("./")
77         path
78       else
79         "./#{path}"
80       end
81     end
82
83     def copy(copy_method, source, target, source_collection=nil, opts={})
84       # Find the item at path `source` in `source_collection`, find the
85       # destination stream at path `target`, and use `copy_method` to copy
86       # the found object there.  If a block is passed in, it will be called
87       # right before we do the actual copy, after we confirm that everything
88       # is found and can be copied.
89       source_collection = self if source_collection.nil?
90       src_stream, src_tail = source_collection.find(source)
91       dst_stream, dst_tail = find(target)
92       if (source_collection.equal?(self) and
93           (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
94         return self
95       end
96       src_item = src_stream[src_tail]
97       dst_tail ||= src_tail
98       check_method = "check_can_#{copy_method}".to_sym
99       target_name = nil
100       if opts.fetch(:descend_target, true)
101         begin
102           # Find out if `target` refers to a stream we should copy into.
103           tail_stream = dst_stream[dst_tail]
104           tail_stream.send(check_method, src_item, src_tail)
105           # Yes it does.  Copy the item at `source` into it with the same name.
106           dst_stream = tail_stream
107           target_name = src_tail
108         rescue Errno::ENOENT, Errno::ENOTDIR
109           # It does not.  We'll fall back to writing to `target` below.
110         end
111       end
112       if target_name.nil?
113         dst_stream.send(check_method, src_item, dst_tail)
114         target_name = dst_tail
115       end
116       # At this point, we know the operation will work.  Call any block as
117       # a pre-copy hook.
118       if block_given?
119         yield
120         # Re-find the destination stream, in case the block removed
121         # the original (that's how rename is implemented).
122         dst_stream = @root.stream_at(dst_stream.path)
123       end
124       dst_stream.send(copy_method, src_item, target_name)
125       modified
126     end
127
128     def remove(path, opts={})
129       stream, name = find(path)
130       stream.delete(name, opts)
131       modified
132     end
133
134     LocatorSegment = Struct.new(:locators, :start_pos, :length)
135
136     class LocatorRange < Range
137       attr_reader :locator
138
139       def initialize(loc_s, start)
140         @locator = loc_s
141         range_end = start + Keep::Locator.parse(loc_s).size.to_i
142         super(start, range_end, false)
143       end
144     end
145
146     class LocatorList
147       # LocatorList efficiently builds LocatorSegments from a stream manifest.
148       def initialize(locators)
149         next_start = 0
150         @ranges = locators.map do |loc_s|
151           new_range = LocatorRange.new(loc_s, next_start)
152           next_start = new_range.end
153           new_range
154         end
155       end
156
157       def segment(start_pos, length)
158         # Return a LocatorSegment that captures `length` bytes from `start_pos`.
159         start_index = search_for_byte(start_pos)
160         if length == 0
161           end_index = start_index
162         else
163           end_index = search_for_byte(start_pos + length - 1, start_index)
164         end
165         seg_ranges = @ranges[start_index..end_index]
166         LocatorSegment.new(seg_ranges.map(&:locator),
167                            start_pos - seg_ranges.first.begin,
168                            length)
169       end
170
171       private
172
173       def search_for_byte(target, start_index=0)
174         # Do a binary search for byte `target` in the list of locators,
175         # starting from `start_index`.  Return the index of the range in
176         # @ranges that contains the byte.
177         lo = start_index
178         hi = @ranges.size
179         loop do
180           ii = (lo + hi) / 2
181           range = @ranges[ii]
182           if range.include?(target)
183             return ii
184           elsif ii == lo
185             raise RangeError.new("%i not in segment" % target)
186           elsif target < range.begin
187             hi = ii
188           else
189             lo = ii
190           end
191         end
192       end
193     end
194
195     class CollectionItem
196       attr_reader :path, :name
197
198       def initialize(path)
199         @path = path
200         @name = File.basename(path)
201       end
202     end
203
204     class CollectionFile < CollectionItem
205       def initialize(path)
206         super
207         @segments = []
208       end
209
210       def self.human_name
211         "file"
212       end
213
214       def file?
215         true
216       end
217
218       def leaf?
219         true
220       end
221
222       def add_segment(segment)
223         @segments << segment
224       end
225
226       def each_segment(&block)
227         @segments.each(&block)
228       end
229
230       def check_can_add_copy(src_item, name)
231         raise Errno::ENOTDIR.new(path)
232       end
233
234       alias_method :check_can_merge, :check_can_add_copy
235
236       def copy_named(copy_path)
237         copy = self.class.new(copy_path)
238         each_segment { |segment| copy.add_segment(segment) }
239         copy
240       end
241     end
242
243     class CollectionStream < CollectionItem
244       def initialize(path)
245         super
246         @items = {}
247       end
248
249       def self.human_name
250         "stream"
251       end
252
253       def file?
254         false
255       end
256
257       def leaf?
258         items.empty?
259       end
260
261       def [](key)
262         items[key] or
263           raise Errno::ENOENT.new("%p not found in %p" % [key, path])
264       end
265
266       def delete(name, opts={})
267         item = self[name]
268         if item.file? or opts[:recursive]
269           items.delete(name)
270         else
271           raise Errno::EISDIR.new(path)
272         end
273       end
274
275       def find(find_path)
276         # Given a POSIX-style path, return the CollectionStream that
277         # contains the object at that path, and the name of the object
278         # inside it.
279         components = find_path.split("/")
280         tail = components.pop
281         [components.reduce(self, :[]), tail]
282       end
283
284       def stream_at(find_path)
285         key, rest = find_path.split("/", 2)
286         next_stream = get_or_new(key, CollectionStream)
287         if rest.nil?
288           next_stream
289         else
290           next_stream.stream_at(rest)
291         end
292       end
293
294       def file_at(find_path)
295         stream_path, _, file_name = find_path.rpartition("/")
296         if stream_path.empty?
297           get_or_new(file_name, CollectionFile)
298         else
299           stream_at(stream_path).file_at(file_name)
300         end
301       end
302
303       def manifest_text
304         # Return a string with the normalized manifest text for this stream,
305         # including all substreams.
306         file_keys, stream_keys = items.keys.sort.partition do |key|
307           items[key].file?
308         end
309         my_line = StreamManifest.new(path)
310         file_keys.each do |file_name|
311           my_line.add_file(items[file_name])
312         end
313         sub_lines = stream_keys.map do |sub_name|
314           items[sub_name].manifest_text
315         end
316         my_line.to_s + sub_lines.join("")
317       end
318
319       def check_can_add_copy(src_item, key)
320         if existing = check_can_merge(src_item, key) and not existing.leaf?
321           raise Errno::ENOTEMPTY.new(existing.path)
322         end
323       end
324
325       def check_can_merge(src_item, key)
326         if existing = items[key] and (existing.class != src_item.class)
327           raise Errno::ENOTDIR.new(existing.path)
328         end
329         existing
330       end
331
332       def add_copy(src_item, key)
333         self[key] = src_item.copy_named("#{path}/#{key}")
334       end
335
336       def merge(src_item, key)
337         # Do a recursive copy of the collection item `src_item` to destination
338         # `key`.  If a simple copy is safe, do that; otherwise, recursively
339         # merge the contents of the stream `src_item` into the stream at
340         # `key`.
341         begin
342           check_can_add_copy(src_item, key)
343           add_copy(src_item, key)
344         rescue Errno::ENOTEMPTY
345           dest = self[key]
346           error = nil
347           # Copy as much as possible, then raise any error encountered.
348           # Start with streams for a depth-first merge.
349           src_items = src_item.items.each_pair.sort_by do |_, sub_item|
350             (sub_item.file?) ? 1 : 0
351           end
352           src_items.each do |sub_key, sub_item|
353             begin
354               dest.merge(sub_item, sub_key)
355             rescue Errno::ENOTDIR => error
356             end
357           end
358           raise error unless error.nil?
359         end
360       end
361
362       def copy_named(copy_path)
363         copy = self.class.new(copy_path)
364         items.each_pair do |key, item|
365           copy.add_copy(item, key)
366         end
367         copy
368       end
369
370       protected
371
372       attr_reader :items
373
374       private
375
376       def []=(key, item)
377         items[key] = item
378       end
379
380       def get_or_new(key, klass)
381         # Return the collection item at `key` and ensure that it's a `klass`.
382         # If `key` does not exist, create a new `klass` there.
383         # If the value for `key` is not a `klass`, raise an ArgumentError.
384         item = items[key]
385         if item.nil?
386           self[key] = klass.new("#{path}/#{key}")
387         elsif not item.is_a?(klass)
388           raise ArgumentError.
389             new("in stream %p, %p is a %s, not a %s" %
390                 [path, key, items[key].class.human_name, klass.human_name])
391         else
392           item
393         end
394       end
395     end
396
397     class CollectionRoot < CollectionStream
398       def initialize
399         super("")
400         setup
401       end
402
403       def delete(name, opts={})
404         super
405         # If that didn't fail, it deleted the . stream.  Recreate it.
406         setup
407       end
408
409       def check_can_merge(src_item, key)
410         if items.include?(key)
411           super
412         else
413           raise_root_write_error(key)
414         end
415       end
416
417       private
418
419       def setup
420         items["."] = CollectionStream.new(".")
421       end
422
423       def raise_root_write_error(key)
424         raise ArgumentError.new("can't write to %p at collection root" % key)
425       end
426
427       def []=(key, item)
428         raise_root_write_error(key)
429       end
430     end
431
432     class StreamManifest
433       # Build a manifest text for a single stream, without substreams.
434       # The manifest includes files in the order they're added.  If you want
435       # a normalized manifest, add files in lexical order by name.
436
437       def initialize(name)
438         @name = name
439         @loc_ranges = {}
440         @loc_range_start = 0
441         @file_specs = []
442       end
443
444       def add_file(coll_file)
445         coll_file.each_segment do |segment|
446           extend_locator_ranges(segment.locators)
447           extend_file_specs(coll_file.name, segment)
448         end
449       end
450
451       def to_s
452         if @file_specs.empty?
453           ""
454         else
455           "%s %s %s\n" % [escape_name(@name),
456                           @loc_ranges.keys.join(" "),
457                           @file_specs.join(" ")]
458         end
459       end
460
461       private
462
463       def extend_locator_ranges(locators)
464         locators.
465             select { |loc_s| not @loc_ranges.include?(loc_s) }.
466             each do |loc_s|
467           @loc_ranges[loc_s] = LocatorRange.new(loc_s, @loc_range_start)
468           @loc_range_start = @loc_ranges[loc_s].end
469         end
470       end
471
472       def extend_file_specs(filename, segment)
473         # Given a filename and a LocatorSegment, add the smallest
474         # possible array of file spec strings to @file_specs that
475         # builds the file from available locators.
476         filename = escape_name(filename)
477         start_pos = segment.start_pos
478         length = segment.length
479         start_loc = segment.locators.first
480         prev_loc = start_loc
481         # Build a list of file specs by iterating through the segment's
482         # locators and preparing a file spec for each contiguous range.
483         segment.locators[1..-1].each do |loc_s|
484           range = @loc_ranges[loc_s]
485           if range.begin != @loc_ranges[prev_loc].end
486             range_start, range_length =
487               start_and_length_at(start_loc, prev_loc, start_pos, length)
488             @file_specs << "#{range_start}:#{range_length}:#{filename}"
489             start_pos = 0
490             length -= range_length
491             start_loc = loc_s
492           end
493           prev_loc = loc_s
494         end
495         range_start, range_length =
496           start_and_length_at(start_loc, prev_loc, start_pos, length)
497         @file_specs << "#{range_start}:#{range_length}:#{filename}"
498       end
499
500       def escape_name(name)
501         name.gsub(/\\/, "\\\\\\\\").gsub(/\s/) do |s|
502           s.each_byte.map { |c| "\\%03o" % c }.join("")
503         end
504       end
505
506       def start_and_length_at(start_key, end_key, start_pos, length)
507         range_begin = @loc_ranges[start_key].begin + start_pos
508         range_length = [@loc_ranges[end_key].end - range_begin, length].min
509         [range_begin, range_length]
510       end
511     end
512   end
513 end