7709: Merge branch 'master' into 7709-api-rails4
[arvados.git] / sdk / ruby / lib / arvados / collection.rb
1 require "arvados/keep"
2
3 module Arv
4   class Collection
5     def initialize(manifest_text="")
6       @manifest_text = manifest_text
7       @modified = false
8       @root = CollectionRoot.new
9       manifest = Keep::Manifest.new(manifest_text)
10       manifest.each_line do |stream_root, locators, file_specs|
11         if stream_root.empty? or locators.empty? or file_specs.empty?
12           raise ArgumentError.new("manifest text includes malformed line")
13         end
14         loc_list = LocatorList.new(locators)
15         file_specs.map { |s| manifest.split_file_token(s) }.
16             each do |file_start, file_len, file_path|
17           begin
18             @root.file_at(normalize_path(stream_root, file_path)).
19               add_segment(loc_list.segment(file_start, file_len))
20           rescue Errno::ENOTDIR, Errno::EISDIR => error
21             raise ArgumentError.new("%p is both a stream and file" %
22                                     error.to_s.partition(" - ").last)
23           end
24         end
25       end
26     end
27
28     def manifest_text
29       @manifest_text ||= @root.manifest_text
30     end
31
32     def modified?
33       @modified
34     end
35
36     def unmodified
37       @modified = false
38       self
39     end
40
41     def normalize
42       @manifest_text = @root.manifest_text
43       self
44     end
45
46     def cp_r(source, target, source_collection=nil)
47       opts = {:descend_target => !source.end_with?("/")}
48       copy(:merge, source.chomp("/"), target, source_collection, opts)
49     end
50
51     def each_file_path(&block)
52       @root.each_file_path(&block)
53     end
54
55     def exist?(path)
56       begin
57         substream, item = find(path)
58         not (substream.leaf? or substream[item].nil?)
59       rescue Errno::ENOENT, Errno::ENOTDIR
60         false
61       end
62     end
63
64     def rename(source, target)
65       copy(:add_copy, source, target) { rm_r(source) }
66     end
67
68     def rm(source)
69       remove(source)
70     end
71
72     def rm_r(source)
73       remove(source, :recursive => true)
74     end
75
76     protected
77
78     def find(*parts)
79       @root.find(normalize_path(*parts))
80     end
81
82     private
83
84     def modified
85       @manifest_text = nil
86       @modified = true
87       self
88     end
89
90     def normalize_path(*parts)
91       path = File.join(*parts)
92       if path.empty?
93         raise ArgumentError.new("empty path")
94       elsif (path == ".") or path.start_with?("./")
95         path
96       else
97         "./#{path}"
98       end
99     end
100
101     def copy(copy_method, source, target, source_collection=nil, opts={})
102       # Find the item at path `source` in `source_collection`, find the
103       # destination stream at path `target`, and use `copy_method` to copy
104       # the found object there.  If a block is passed in, it will be called
105       # right before we do the actual copy, after we confirm that everything
106       # is found and can be copied.
107       source_collection = self if source_collection.nil?
108       src_stream, src_tail = source_collection.find(source)
109       dst_stream_path, _, dst_tail = normalize_path(target).rpartition("/")
110       if dst_stream_path.empty?
111         dst_stream, dst_tail = @root.find(dst_tail)
112         dst_tail ||= src_tail
113       else
114         dst_stream = @root.stream_at(dst_stream_path)
115         dst_tail = src_tail if dst_tail.empty?
116       end
117       if (source_collection.equal?(self) and
118           (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
119         return self
120       end
121       src_item = src_stream[src_tail]
122       check_method = "check_can_#{copy_method}".to_sym
123       target_name = nil
124       if opts.fetch(:descend_target, true)
125         begin
126           # Find out if `target` refers to a stream we should copy into.
127           tail_stream = dst_stream[dst_tail]
128           tail_stream.send(check_method, src_item, src_tail)
129           # Yes it does.  Copy the item at `source` into it with the same name.
130           dst_stream = tail_stream
131           target_name = src_tail
132         rescue Errno::ENOENT, Errno::ENOTDIR
133           # It does not.  We'll fall back to writing to `target` below.
134         end
135       end
136       if target_name.nil?
137         dst_stream.send(check_method, src_item, dst_tail)
138         target_name = dst_tail
139       end
140       # At this point, we know the operation will work.  Call any block as
141       # a pre-copy hook.
142       if block_given?
143         yield
144         # Re-find the destination stream, in case the block removed
145         # the original (that's how rename is implemented).
146         dst_stream = @root.stream_at(dst_stream.path)
147       end
148       dst_stream.send(copy_method, src_item, target_name)
149       modified
150     end
151
152     def remove(path, opts={})
153       stream, name = find(path)
154       stream.delete(name, opts)
155       modified
156     end
157
158     Struct.new("LocatorSegment", :locators, :start_pos, :length)
159
160     class LocatorRange < Range
161       attr_reader :locator
162
163       def initialize(loc_s, start)
164         @locator = loc_s
165         range_end = start + Keep::Locator.parse(loc_s).size.to_i
166         super(start, range_end, false)
167       end
168     end
169
170     class LocatorList
171       # LocatorList efficiently builds LocatorSegments from a stream manifest.
172       def initialize(locators)
173         next_start = 0
174         @ranges = locators.map do |loc_s|
175           new_range = LocatorRange.new(loc_s, next_start)
176           next_start = new_range.end
177           new_range
178         end
179       end
180
181       def segment(start_pos, length)
182         # Return a LocatorSegment that captures `length` bytes from `start_pos`.
183         start_index = search_for_byte(start_pos)
184         if length == 0
185           end_index = start_index
186         else
187           end_index = search_for_byte(start_pos + length - 1, start_index)
188         end
189         seg_ranges = @ranges[start_index..end_index]
190         Struct::LocatorSegment.new(seg_ranges.map(&:locator),
191                                    start_pos - seg_ranges.first.begin,
192                                    length)
193       end
194
195       private
196
197       def search_for_byte(target, start_index=0)
198         # Do a binary search for byte `target` in the list of locators,
199         # starting from `start_index`.  Return the index of the range in
200         # @ranges that contains the byte.
201         lo = start_index
202         hi = @ranges.size
203         loop do
204           ii = (lo + hi) / 2
205           range = @ranges[ii]
206           if range.include?(target)
207             return ii
208           elsif ii == lo
209             raise RangeError.new("%i not in segment" % target)
210           elsif target < range.begin
211             hi = ii
212           else
213             lo = ii
214           end
215         end
216       end
217     end
218
219     class CollectionItem
220       attr_reader :path, :name
221
222       def initialize(path)
223         @path = path
224         @name = File.basename(path)
225       end
226     end
227
228     class CollectionFile < CollectionItem
229       def initialize(path)
230         super
231         @segments = []
232       end
233
234       def self.human_name
235         "file"
236       end
237
238       def file?
239         true
240       end
241
242       def leaf?
243         true
244       end
245
246       def add_segment(segment)
247         @segments << segment
248       end
249
250       def each_segment(&block)
251         @segments.each(&block)
252       end
253
254       def check_can_add_copy(src_item, name)
255         raise Errno::ENOTDIR.new(path)
256       end
257
258       alias_method :check_can_merge, :check_can_add_copy
259
260       def copy_named(copy_path)
261         copy = self.class.new(copy_path)
262         each_segment { |segment| copy.add_segment(segment) }
263         copy
264       end
265     end
266
267     class CollectionStream < CollectionItem
268       def initialize(path)
269         super
270         @items = {}
271       end
272
273       def self.human_name
274         "stream"
275       end
276
277       def file?
278         false
279       end
280
281       def leaf?
282         items.empty?
283       end
284
285       def [](key)
286         items[key] or
287           raise Errno::ENOENT.new("%p not found in %p" % [key, path])
288       end
289
290       def delete(name, opts={})
291         item = self[name]
292         if item.file? or opts[:recursive]
293           items.delete(name)
294         else
295           raise Errno::EISDIR.new(path)
296         end
297       end
298
299       def each_file_path
300         return to_enum(__method__) unless block_given?
301         items.each_value do |item|
302           if item.file?
303             yield item.path
304           else
305             item.each_file_path { |path| yield path }
306           end
307         end
308       end
309
310       def find(find_path)
311         # Given a POSIX-style path, return the CollectionStream that
312         # contains the object at that path, and the name of the object
313         # inside it.
314         components = find_path.split("/")
315         tail = components.pop
316         [components.reduce(self, :[]), tail]
317       end
318
319       def stream_at(find_path)
320         key, rest = find_path.split("/", 2)
321         next_stream = get_or_new(key, CollectionStream, Errno::ENOTDIR)
322         if rest.nil?
323           next_stream
324         else
325           next_stream.stream_at(rest)
326         end
327       end
328
329       def file_at(find_path)
330         stream_path, _, file_name = find_path.rpartition("/")
331         if stream_path.empty?
332           get_or_new(file_name, CollectionFile, Errno::EISDIR)
333         else
334           stream_at(stream_path).file_at(file_name)
335         end
336       end
337
338       def manifest_text
339         # Return a string with the normalized manifest text for this stream,
340         # including all substreams.
341         file_keys, stream_keys = items.keys.sort.partition do |key|
342           items[key].file?
343         end
344         my_line = StreamManifest.new(path)
345         file_keys.each do |file_name|
346           my_line.add_file(items[file_name])
347         end
348         sub_lines = stream_keys.map do |sub_name|
349           items[sub_name].manifest_text
350         end
351         my_line.to_s + sub_lines.join("")
352       end
353
354       def check_can_add_copy(src_item, key)
355         if existing = check_can_merge(src_item, key) and not existing.leaf?
356           raise Errno::ENOTEMPTY.new(existing.path)
357         end
358       end
359
360       def check_can_merge(src_item, key)
361         if existing = items[key] and (existing.class != src_item.class)
362           raise Errno::ENOTDIR.new(existing.path)
363         end
364         existing
365       end
366
367       def add_copy(src_item, key)
368         self[key] = src_item.copy_named("#{path}/#{key}")
369       end
370
371       def merge(src_item, key)
372         # Do a recursive copy of the collection item `src_item` to destination
373         # `key`.  If a simple copy is safe, do that; otherwise, recursively
374         # merge the contents of the stream `src_item` into the stream at
375         # `key`.
376         begin
377           check_can_add_copy(src_item, key)
378           add_copy(src_item, key)
379         rescue Errno::ENOTEMPTY
380           dest = self[key]
381           error = nil
382           # Copy as much as possible, then raise any error encountered.
383           # Start with streams for a depth-first merge.
384           src_items = src_item.items.each_pair.sort_by do |_, sub_item|
385             (sub_item.file?) ? 1 : 0
386           end
387           src_items.each do |sub_key, sub_item|
388             begin
389               dest.merge(sub_item, sub_key)
390             rescue Errno::ENOTDIR => error
391             end
392           end
393           raise error unless error.nil?
394         end
395       end
396
397       def copy_named(copy_path)
398         copy = self.class.new(copy_path)
399         items.each_pair do |key, item|
400           copy.add_copy(item, key)
401         end
402         copy
403       end
404
405       protected
406
407       attr_reader :items
408
409       private
410
411       def []=(key, item)
412         items[key] = item
413       end
414
415       def get_or_new(key, klass, err_class)
416         # Return the collection item at `key` and ensure that it's a `klass`.
417         # If `key` does not exist, create a new `klass` there.
418         # If the value for `key` is not a `klass`, raise an `err_class`.
419         item = items[key]
420         if item.nil?
421           self[key] = klass.new("#{path}/#{key}")
422         elsif not item.is_a?(klass)
423           raise err_class.new(item.path)
424         else
425           item
426         end
427       end
428     end
429
430     class CollectionRoot < CollectionStream
431       def initialize
432         super("")
433         setup
434       end
435
436       def delete(name, opts={})
437         super
438         # If that didn't fail, it deleted the . stream.  Recreate it.
439         setup
440       end
441
442       def check_can_merge(src_item, key)
443         if items.include?(key)
444           super
445         else
446           raise_root_write_error(key)
447         end
448       end
449
450       private
451
452       def setup
453         items["."] = CollectionStream.new(".")
454       end
455
456       def raise_root_write_error(key)
457         raise ArgumentError.new("can't write to %p at collection root" % key)
458       end
459
460       def []=(key, item)
461         raise_root_write_error(key)
462       end
463     end
464
465     class StreamManifest
466       # Build a manifest text for a single stream, without substreams.
467       # The manifest includes files in the order they're added.  If you want
468       # a normalized manifest, add files in lexical order by name.
469
470       def initialize(name)
471         @name = name
472         @loc_ranges = {}
473         @loc_range_start = 0
474         @file_specs = []
475       end
476
477       def add_file(coll_file)
478         coll_file.each_segment do |segment|
479           extend_locator_ranges(segment.locators)
480           extend_file_specs(coll_file.name, segment)
481         end
482       end
483
484       def to_s
485         if @file_specs.empty?
486           ""
487         else
488           "%s %s %s\n" % [escape_name(@name),
489                           @loc_ranges.keys.join(" "),
490                           @file_specs.join(" ")]
491         end
492       end
493
494       private
495
496       def extend_locator_ranges(locators)
497         locators.
498             select { |loc_s| not @loc_ranges.include?(loc_s) }.
499             each do |loc_s|
500           @loc_ranges[loc_s] = LocatorRange.new(loc_s, @loc_range_start)
501           @loc_range_start = @loc_ranges[loc_s].end
502         end
503       end
504
505       def extend_file_specs(filename, segment)
506         # Given a filename and a LocatorSegment, add the smallest
507         # possible array of file spec strings to @file_specs that
508         # builds the file from available locators.
509         filename = escape_name(filename)
510         start_pos = segment.start_pos
511         length = segment.length
512         start_loc = segment.locators.first
513         prev_loc = start_loc
514         # Build a list of file specs by iterating through the segment's
515         # locators and preparing a file spec for each contiguous range.
516         segment.locators[1..-1].each do |loc_s|
517           range = @loc_ranges[loc_s]
518           if range.begin != @loc_ranges[prev_loc].end
519             range_start, range_length =
520               start_and_length_at(start_loc, prev_loc, start_pos, length)
521             @file_specs << "#{range_start}:#{range_length}:#{filename}"
522             start_pos = 0
523             length -= range_length
524             start_loc = loc_s
525           end
526           prev_loc = loc_s
527         end
528         range_start, range_length =
529           start_and_length_at(start_loc, prev_loc, start_pos, length)
530         @file_specs << "#{range_start}:#{range_length}:#{filename}"
531       end
532
533       def escape_name(name)
534         name.gsub(/\\/, "\\\\\\\\").gsub(/\s/) do |s|
535           s.each_byte.map { |c| "\\%03o" % c }.join("")
536         end
537       end
538
539       def start_and_length_at(start_key, end_key, start_pos, length)
540         range_begin = @loc_ranges[start_key].begin + start_pos
541         range_length = [@loc_ranges[end_key].end - range_begin, length].min
542         [range_begin, range_length]
543       end
544     end
545   end
546 end