5104: Add Collection class to the Ruby SDK.
[arvados.git] / sdk / ruby / lib / arvados / collection.rb
1 require "arvados/keep"
2
3 module Arv
4   class Collection
5     def initialize(manifest_text="")
6       @tree = CollectionStream.new(".")
7       @manifest_text = ""
8       import_manifest!(manifest_text)
9     end
10
11     def manifest_text
12       @manifest_text ||= @tree.manifest_text
13     end
14
15     def import_manifest!(manifest_text)
16       manifest = Keep::Manifest.new(manifest_text)
17       manifest.each_line do |stream_root, locators, file_specs|
18         if stream_root.empty? or locators.empty? or file_specs.empty?
19           raise ArgumentError.new("manifest text includes malformed line")
20         end
21         file_specs.map { |s| manifest.split_file_token(s) }.
22             each do |file_start, file_len, file_path|
23           @tree.file_at(normalize_path(stream_root, file_path)).
24             add_range(locators, file_start, file_len)
25         end
26       end
27       if @manifest_text == ""
28         @manifest_text = manifest_text
29         self
30       else
31         modified!
32       end
33     end
34
35     def normalize!
36       # We generate normalized manifests, so all we have to do is force
37       # regeneration.
38       modified!
39     end
40
41     def copy!(source, target, source_collection=nil)
42       copy(:merge, source, target, source_collection)
43     end
44
45     def rename!(source, target)
46       copy(:add_copy, source, target) { remove!(source, recursive: true) }
47     end
48
49     def remove!(path, opts={})
50       stream, name = find(path)
51       if name.nil?
52         return self if @tree.leaf?
53         @tree = CollectionStream.new(".")
54       else
55         stream.delete(name, opts)
56       end
57       modified!
58     end
59
60     protected
61
62     def find(*parts)
63       normpath = normalize_path(*parts)
64       if normpath.empty?
65         [@tree, nil]
66       else
67         @tree.find(normpath)
68       end
69     end
70
71     private
72
73     def copy(copy_method, source, target, source_collection=nil)
74       # Find the item at path `source` in `source_collection`, find the
75       # destination stream at path `target`, and use `copy_method` to copy
76       # the found object there.  If a block is passed in, it will be called
77       # right before we do the actual copy, after we confirm that everything
78       # is found and can be copied.
79       source_collection = self if source_collection.nil?
80       src_stream, src_tail = source_collection.find(source)
81       dst_stream, dst_tail = find(target)
82       if (source_collection.equal?(self) and
83           (src_stream.path == dst_stream.path) and (src_tail == dst_tail))
84         return self
85       elsif src_tail.nil?
86         src_item = src_stream
87         src_tail = src_stream.name
88       else
89         src_item = src_stream[src_tail]
90       end
91       dst_tail ||= src_tail
92       check_method = "check_can_#{copy_method}".to_sym
93       begin
94         # Find out if `target` refers to a stream we should copy into.
95         tail_stream = dst_stream[dst_tail]
96         tail_stream.send(check_method, src_item, src_tail)
97       rescue Errno::ENOENT, Errno::ENOTDIR
98         # It does not.  Check that we can copy `source` to the full
99         # path specified by `target`.
100         dst_stream.send(check_method, src_item, dst_tail)
101         target_name = dst_tail
102       else
103         # Yes, `target` is a stream.  Copy the item at `source` into it with
104         # the same name.
105         dst_stream = tail_stream
106         target_name = src_tail
107       end
108       # At this point, we know the operation will work.  Call any block as
109       # a pre-copy hook.
110       if block_given?
111         yield
112         # Re-find the destination stream, in case the block removed
113         # the original (that's how rename is implemented).
114         dst_path = normalize_path(dst_stream.path)
115         if dst_path.empty?
116           dst_stream = @tree
117         else
118           dst_stream = @tree.stream_at(dst_path)
119         end
120       end
121       dst_stream.send(copy_method, src_item, target_name)
122       modified!
123     end
124
125     def modified!
126       @manifest_text = nil
127       self
128     end
129
130     def normalize_path(*parts)
131       path = File.join(*parts)
132       raise ArgumentError.new("empty path") if path.empty?
133       path.sub(/^\.(\/|$)/, "")
134     end
135
136     class CollectionItem
137       attr_reader :path, :name
138
139       def initialize(path)
140         @path = path
141         @name = File.basename(path)
142       end
143     end
144
145     LocatorRange = Struct.new(:locators, :start_pos, :length)
146
147     class CollectionFile < CollectionItem
148       def initialize(path)
149         super
150         @ranges = []
151       end
152
153       def self.human_name
154         "file"
155       end
156
157       def leaf?
158         true
159       end
160
161       def add_range(locators, start_pos, length)
162         # Given an array of locators, and this file's start position and
163         # length within them, store a LocatorRange with information about
164         # the locators actually used.
165         loc_sizes = locators.map { |s| Keep::Locator.parse(s).size.to_i }
166         start_index, start_pos = loc_size_index(loc_sizes, start_pos, 0, :>=)
167         end_index, _ = loc_size_index(loc_sizes, length, start_index, :>)
168         @ranges << LocatorRange.
169           new(locators[start_index..end_index], start_pos, length)
170       end
171
172       def each_range(&block)
173         @ranges.each(&block)
174       end
175
176       def check_can_add_copy(src_item, name)
177         raise Errno::ENOTDIR.new(path)
178       end
179
180       alias_method :check_can_merge, :check_can_add_copy
181
182       def copy_named(copy_path)
183         copy = self.class.new(copy_path)
184         each_range { |range| copy.add_range(*range) }
185         copy
186       end
187
188       private
189
190       def loc_size_index(loc_sizes, length, index, comp_op)
191         # Pass in an array of locator size hints (integers).  Starting from
192         # `index`, step through the size array until they provide a number
193         # of bytes that is `comp_op` (:>= or :>) to `length`.  Return the
194         # index of the end locator and the amount of data to read from it.
195         while length.send(comp_op, loc_sizes[index])
196           index += 1
197           length -= loc_sizes[index]
198         end
199         [index, length]
200       end
201     end
202
203     class CollectionStream < CollectionItem
204       def initialize(path)
205         super
206         @items = {}
207       end
208
209       def self.human_name
210         "stream"
211       end
212
213       def leaf?
214         items.empty?
215       end
216
217       def [](key)
218         items[key] or
219           raise Errno::ENOENT.new("%p not found in %p" % [key, path])
220       end
221
222       def delete(name, opts={})
223         item = self[name]
224         if item.leaf? or opts[:recursive]
225           items.delete(name)
226         else
227           raise Errno::ENOTEMPTY.new(path)
228         end
229       end
230
231       def find(find_path)
232         # Given a POSIX-style path, return the CollectionStream that
233         # contains the object at that path, and the name of the object
234         # inside it.
235         components = find_path.split("/")
236         tail = components.pop
237         [components.reduce(self, :[]), tail]
238       end
239
240       def stream_at(find_path)
241         key, rest = find_path.split("/", 2)
242         next_stream = get_or_new(key, CollectionStream)
243         if rest.nil?
244           next_stream
245         else
246           next_stream.stream_at(rest)
247         end
248       end
249
250       def file_at(find_path)
251         stream_path, _, file_name = find_path.rpartition("/")
252         if stream_path.empty?
253           get_or_new(file_name, CollectionFile)
254         else
255           stream_at(stream_path).file_at(file_name)
256         end
257       end
258
259       def manifest_text
260         # Return a string with the normalized manifest text for this stream,
261         # including all substreams.
262         file_keys, stream_keys = items.keys.sort.partition do |key|
263           items[key].is_a?(CollectionFile)
264         end
265         my_line = StreamManifest.new(path)
266         file_keys.each do |file_name|
267           my_line.add_file(items[file_name])
268         end
269         sub_lines = stream_keys.map do |sub_name|
270           items[sub_name].manifest_text
271         end
272         my_line.to_s + sub_lines.join("")
273       end
274
275       def check_can_add_copy(src_item, key)
276         if existing = check_can_merge(src_item, key) and not existing.leaf?
277           raise Errno::ENOTEMPTY.new(existing.path)
278         end
279       end
280
281       def check_can_merge(src_item, key)
282         if existing = items[key] and (existing.class != src_item.class)
283           raise Errno::ENOTDIR.new(existing.path)
284         end
285         existing
286       end
287
288       def add_copy(src_item, key)
289         items[key] = src_item.copy_named("#{path}/#{key}")
290       end
291
292       def merge(src_item, key)
293         # Do a recursive copy of the collection item `src_item` to destination
294         # `key`.  If a simple copy is safe, do that; otherwise, recursively
295         # merge the contents of the stream `src_item` into the stream at
296         # `key`.
297         begin
298           check_can_add_copy(src_item, key)
299           add_copy(src_item, key)
300         rescue Errno::ENOTEMPTY
301           dest = self[key]
302           error = nil
303           # Copy as much as possible, then raise any error encountered.
304           src_item.items.each_pair do |sub_key, sub_item|
305             begin
306               dest.merge(sub_item, sub_key)
307             rescue Errno::ENOTDIR => error
308             end
309           end
310           raise error unless error.nil?
311         end
312       end
313
314       def copy_named(copy_path)
315         copy = self.class.new(copy_path)
316         items.each_pair do |key, item|
317           copy.add_copy(item, key)
318         end
319         copy
320       end
321
322       protected
323
324       attr_reader :items
325
326       private
327
328       def get_or_new(key, klass)
329         # Return the collection item at `key` and ensure that it's a `klass`.
330         # If `key` does not exist, create a new `klass` there.
331         # If the value for `key` is not a `klass`, raise an ArgumentError.
332         item = items[key]
333         if item.nil?
334           items[key] = klass.new("#{path}/#{key}")
335         elsif not item.is_a?(klass)
336           raise ArgumentError.
337             new("in stream %p, %p is a %s, not a %s" %
338                 [path, key, items[key].class.human_name, klass.human_name])
339         else
340           item
341         end
342       end
343     end
344
345     class StreamManifest
346       # Build a manifest text for a single stream, without substreams.
347
348       def initialize(name)
349         @name = name
350         @locators = []
351         @loc_sizes = []
352         @file_specs = []
353       end
354
355       def add_file(coll_file)
356         coll_file.each_range do |range|
357           add(coll_file.name, *range)
358         end
359       end
360
361       def to_s
362         if @file_specs.empty?
363           ""
364         else
365           "%s %s %s\n" % [escape_name(@name), @locators.join(" "),
366                           @file_specs.join(" ")]
367         end
368       end
369
370       private
371
372       def add(file_name, loc_a, file_start, file_len)
373         # Ensure that the locators in loc_a appear in this locator in sequence,
374         # adding as few as possible.  Save a new file spec based on those
375         # locators' position.
376         loc_size = @locators.size
377         add_size = loc_a.size
378         loc_ii = 0
379         add_ii = 0
380         while (loc_ii < loc_size) and (add_ii < add_size)
381           if @locators[loc_ii] == loc_a[add_ii]
382             add_ii += 1
383           else
384             add_ii = 0
385           end
386           loc_ii += 1
387         end
388         loc_ii -= add_ii
389         to_add = loc_a[add_ii, add_size] || []
390         @locators += to_add
391         @loc_sizes += to_add.map { |s| Keep::Locator.parse(s).size.to_i }
392         start = @loc_sizes[0, loc_ii].reduce(0, &:+) + file_start
393         @file_specs << "#{start}:#{file_len}:#{escape_name(file_name)}"
394       end
395
396       def escape_name(name)
397         name.gsub(/\\/, "\\\\\\\\").gsub(/\s/) do |s|
398           s.each_byte.map { |c| "\\%03o" % c }.join("")
399         end
400       end
401     end
402   end
403 end