4823: More tests and fixes for updating and merging from remote api record.
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import time
7
8 from collections import deque
9 from stat import *
10
11 from .arvfile import split, FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
12 from keep import *
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 from .safeapi import ThreadSafeApiCache
16 import config
17 import errors
18 import util
19 import events
20 from arvados.retry import retry_method
21
22 _logger = logging.getLogger('arvados.collection')
23
24 class CollectionBase(object):
25     def __enter__(self):
26         return self
27
28     def __exit__(self, exc_type, exc_value, traceback):
29         pass
30
31     def _my_keep(self):
32         if self._keep_client is None:
33             self._keep_client = KeepClient(api_client=self._api_client,
34                                            num_retries=self.num_retries)
35         return self._keep_client
36
37     def stripped_manifest(self):
38         """
39         Return the manifest for the current collection with all
40         non-portable hints (i.e., permission signatures and other
41         hints other than size hints) removed from the locators.
42         """
43         raw = self.manifest_text()
44         clean = []
45         for line in raw.split("\n"):
46             fields = line.split()
47             if fields:
48                 clean_fields = fields[:1] + [
49                     (re.sub(r'\+[^\d][^\+]*', '', x)
50                      if re.match(util.keep_locator_pattern, x)
51                      else x)
52                     for x in fields[1:]]
53                 clean += [' '.join(clean_fields), "\n"]
54         return ''.join(clean)
55
56
57 class CollectionReader(CollectionBase):
58     def __init__(self, manifest_locator_or_text, api_client=None,
59                  keep_client=None, num_retries=0):
60         """Instantiate a CollectionReader.
61
62         This class parses Collection manifests to provide a simple interface
63         to read its underlying files.
64
65         Arguments:
66         * manifest_locator_or_text: One of a Collection UUID, portable data
67           hash, or full manifest text.
68         * api_client: The API client to use to look up Collections.  If not
69           provided, CollectionReader will build one from available Arvados
70           configuration.
71         * keep_client: The KeepClient to use to download Collection data.
72           If not provided, CollectionReader will build one from available
73           Arvados configuration.
74         * num_retries: The default number of times to retry failed
75           service requests.  Default 0.  You may change this value
76           after instantiation, but note those changes may not
77           propagate to related objects like the Keep client.
78         """
79         self._api_client = api_client
80         self._keep_client = keep_client
81         self.num_retries = num_retries
82         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
83             self._manifest_locator = manifest_locator_or_text
84             self._manifest_text = None
85         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
86             self._manifest_locator = manifest_locator_or_text
87             self._manifest_text = None
88         elif re.match(util.manifest_pattern, manifest_locator_or_text):
89             self._manifest_text = manifest_locator_or_text
90             self._manifest_locator = None
91         else:
92             raise errors.ArgumentError(
93                 "Argument to CollectionReader must be a manifest or a collection UUID")
94         self._api_response = None
95         self._streams = None
96
97     def _populate_from_api_server(self):
98         # As in KeepClient itself, we must wait until the last
99         # possible moment to instantiate an API client, in order to
100         # avoid tripping up clients that don't have access to an API
101         # server.  If we do build one, make sure our Keep client uses
102         # it.  If instantiation fails, we'll fall back to the except
103         # clause, just like any other Collection lookup
104         # failure. Return an exception, or None if successful.
105         try:
106             if self._api_client is None:
107                 self._api_client = arvados.api('v1')
108                 self._keep_client = None  # Make a new one with the new api.
109             self._api_response = self._api_client.collections().get(
110                 uuid=self._manifest_locator).execute(
111                 num_retries=self.num_retries)
112             self._manifest_text = self._api_response['manifest_text']
113             return None
114         except Exception as e:
115             return e
116
117     def _populate_from_keep(self):
118         # Retrieve a manifest directly from Keep. This has a chance of
119         # working if [a] the locator includes a permission signature
120         # or [b] the Keep services are operating in world-readable
121         # mode. Return an exception, or None if successful.
122         try:
123             self._manifest_text = self._my_keep().get(
124                 self._manifest_locator, num_retries=self.num_retries)
125         except Exception as e:
126             return e
127
128     def _populate(self):
129         error_via_api = None
130         error_via_keep = None
131         should_try_keep = ((self._manifest_text is None) and
132                            util.keep_locator_pattern.match(
133                 self._manifest_locator))
134         if ((self._manifest_text is None) and
135             util.signed_locator_pattern.match(self._manifest_locator)):
136             error_via_keep = self._populate_from_keep()
137         if self._manifest_text is None:
138             error_via_api = self._populate_from_api_server()
139             if error_via_api is not None and not should_try_keep:
140                 raise error_via_api
141         if ((self._manifest_text is None) and
142             not error_via_keep and
143             should_try_keep):
144             # Looks like a keep locator, and we didn't already try keep above
145             error_via_keep = self._populate_from_keep()
146         if self._manifest_text is None:
147             # Nothing worked!
148             raise arvados.errors.NotFoundError(
149                 ("Failed to retrieve collection '{}' " +
150                  "from either API server ({}) or Keep ({})."
151                  ).format(
152                     self._manifest_locator,
153                     error_via_api,
154                     error_via_keep))
155         self._streams = [sline.split()
156                          for sline in self._manifest_text.split("\n")
157                          if sline]
158
159     def _populate_first(orig_func):
160         # Decorator for methods that read actual Collection data.
161         @functools.wraps(orig_func)
162         def populate_first_wrapper(self, *args, **kwargs):
163             if self._streams is None:
164                 self._populate()
165             return orig_func(self, *args, **kwargs)
166         return populate_first_wrapper
167
168     @_populate_first
169     def api_response(self):
170         """api_response() -> dict or None
171
172         Returns information about this Collection fetched from the API server.
173         If the Collection exists in Keep but not the API server, currently
174         returns None.  Future versions may provide a synthetic response.
175         """
176         return self._api_response
177
178     @_populate_first
179     def normalize(self):
180         # Rearrange streams
181         streams = {}
182         for s in self.all_streams():
183             for f in s.all_files():
184                 streamname, filename = split(s.name() + "/" + f.name())
185                 if streamname not in streams:
186                     streams[streamname] = {}
187                 if filename not in streams[streamname]:
188                     streams[streamname][filename] = []
189                 for r in f.segments:
190                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
191
192         self._streams = [normalize_stream(s, streams[s])
193                          for s in sorted(streams)]
194
195         # Regenerate the manifest text based on the normalized streams
196         self._manifest_text = ''.join(
197             [StreamReader(stream, keep=self._my_keep()).manifest_text()
198              for stream in self._streams])
199
200     @_populate_first
201     def open(self, streampath, filename=None):
202         """open(streampath[, filename]) -> file-like object
203
204         Pass in the path of a file to read from the Collection, either as a
205         single string or as two separate stream name and file name arguments.
206         This method returns a file-like object to read that file.
207         """
208         if filename is None:
209             streampath, filename = split(streampath)
210         keep_client = self._my_keep()
211         for stream_s in self._streams:
212             stream = StreamReader(stream_s, keep_client,
213                                   num_retries=self.num_retries)
214             if stream.name() == streampath:
215                 break
216         else:
217             raise ValueError("stream '{}' not found in Collection".
218                              format(streampath))
219         try:
220             return stream.files()[filename]
221         except KeyError:
222             raise ValueError("file '{}' not found in Collection stream '{}'".
223                              format(filename, streampath))
224
225     @_populate_first
226     def all_streams(self):
227         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
228                 for s in self._streams]
229
230     def all_files(self):
231         for s in self.all_streams():
232             for f in s.all_files():
233                 yield f
234
235     @_populate_first
236     def manifest_text(self, strip=False, normalize=False):
237         if normalize:
238             cr = CollectionReader(self.manifest_text())
239             cr.normalize()
240             return cr.manifest_text(strip=strip, normalize=False)
241         elif strip:
242             return self.stripped_manifest()
243         else:
244             return self._manifest_text
245
246
247 class _WriterFile(FileLikeObjectBase):
248     def __init__(self, coll_writer, name):
249         super(_WriterFile, self).__init__(name, 'wb')
250         self.dest = coll_writer
251
252     def close(self):
253         super(_WriterFile, self).close()
254         self.dest.finish_current_file()
255
256     @FileLikeObjectBase._before_close
257     def write(self, data):
258         self.dest.write(data)
259
260     @FileLikeObjectBase._before_close
261     def writelines(self, seq):
262         for data in seq:
263             self.write(data)
264
265     @FileLikeObjectBase._before_close
266     def flush(self):
267         self.dest.flush_data()
268
269
270 class CollectionWriter(CollectionBase):
271     def __init__(self, api_client=None, num_retries=0, replication=None):
272         """Instantiate a CollectionWriter.
273
274         CollectionWriter lets you build a new Arvados Collection from scratch.
275         Write files to it.  The CollectionWriter will upload data to Keep as
276         appropriate, and provide you with the Collection manifest text when
277         you're finished.
278
279         Arguments:
280         * api_client: The API client to use to look up Collections.  If not
281           provided, CollectionReader will build one from available Arvados
282           configuration.
283         * num_retries: The default number of times to retry failed
284           service requests.  Default 0.  You may change this value
285           after instantiation, but note those changes may not
286           propagate to related objects like the Keep client.
287         * replication: The number of copies of each block to store.
288           If this argument is None or not supplied, replication is
289           the server-provided default if available, otherwise 2.
290         """
291         self._api_client = api_client
292         self.num_retries = num_retries
293         self.replication = (2 if replication is None else replication)
294         self._keep_client = None
295         self._data_buffer = []
296         self._data_buffer_len = 0
297         self._current_stream_files = []
298         self._current_stream_length = 0
299         self._current_stream_locators = []
300         self._current_stream_name = '.'
301         self._current_file_name = None
302         self._current_file_pos = 0
303         self._finished_streams = []
304         self._close_file = None
305         self._queued_file = None
306         self._queued_dirents = deque()
307         self._queued_trees = deque()
308         self._last_open = None
309
310     def __exit__(self, exc_type, exc_value, traceback):
311         if exc_type is None:
312             self.finish()
313
314     def do_queued_work(self):
315         # The work queue consists of three pieces:
316         # * _queued_file: The file object we're currently writing to the
317         #   Collection.
318         # * _queued_dirents: Entries under the current directory
319         #   (_queued_trees[0]) that we want to write or recurse through.
320         #   This may contain files from subdirectories if
321         #   max_manifest_depth == 0 for this directory.
322         # * _queued_trees: Directories that should be written as separate
323         #   streams to the Collection.
324         # This function handles the smallest piece of work currently queued
325         # (current file, then current directory, then next directory) until
326         # no work remains.  The _work_THING methods each do a unit of work on
327         # THING.  _queue_THING methods add a THING to the work queue.
328         while True:
329             if self._queued_file:
330                 self._work_file()
331             elif self._queued_dirents:
332                 self._work_dirents()
333             elif self._queued_trees:
334                 self._work_trees()
335             else:
336                 break
337
338     def _work_file(self):
339         while True:
340             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
341             if not buf:
342                 break
343             self.write(buf)
344         self.finish_current_file()
345         if self._close_file:
346             self._queued_file.close()
347         self._close_file = None
348         self._queued_file = None
349
350     def _work_dirents(self):
351         path, stream_name, max_manifest_depth = self._queued_trees[0]
352         if stream_name != self.current_stream_name():
353             self.start_new_stream(stream_name)
354         while self._queued_dirents:
355             dirent = self._queued_dirents.popleft()
356             target = os.path.join(path, dirent)
357             if os.path.isdir(target):
358                 self._queue_tree(target,
359                                  os.path.join(stream_name, dirent),
360                                  max_manifest_depth - 1)
361             else:
362                 self._queue_file(target, dirent)
363                 break
364         if not self._queued_dirents:
365             self._queued_trees.popleft()
366
367     def _work_trees(self):
368         path, stream_name, max_manifest_depth = self._queued_trees[0]
369         d = util.listdir_recursive(
370             path, max_depth = (None if max_manifest_depth == 0 else 0))
371         if d:
372             self._queue_dirents(stream_name, d)
373         else:
374             self._queued_trees.popleft()
375
376     def _queue_file(self, source, filename=None):
377         assert (self._queued_file is None), "tried to queue more than one file"
378         if not hasattr(source, 'read'):
379             source = open(source, 'rb')
380             self._close_file = True
381         else:
382             self._close_file = False
383         if filename is None:
384             filename = os.path.basename(source.name)
385         self.start_new_file(filename)
386         self._queued_file = source
387
388     def _queue_dirents(self, stream_name, dirents):
389         assert (not self._queued_dirents), "tried to queue more than one tree"
390         self._queued_dirents = deque(sorted(dirents))
391
392     def _queue_tree(self, path, stream_name, max_manifest_depth):
393         self._queued_trees.append((path, stream_name, max_manifest_depth))
394
395     def write_file(self, source, filename=None):
396         self._queue_file(source, filename)
397         self.do_queued_work()
398
399     def write_directory_tree(self,
400                              path, stream_name='.', max_manifest_depth=-1):
401         self._queue_tree(path, stream_name, max_manifest_depth)
402         self.do_queued_work()
403
404     def write(self, newdata):
405         if hasattr(newdata, '__iter__'):
406             for s in newdata:
407                 self.write(s)
408             return
409         self._data_buffer.append(newdata)
410         self._data_buffer_len += len(newdata)
411         self._current_stream_length += len(newdata)
412         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
413             self.flush_data()
414
415     def open(self, streampath, filename=None):
416         """open(streampath[, filename]) -> file-like object
417
418         Pass in the path of a file to write to the Collection, either as a
419         single string or as two separate stream name and file name arguments.
420         This method returns a file-like object you can write to add it to the
421         Collection.
422
423         You may only have one file object from the Collection open at a time,
424         so be sure to close the object when you're done.  Using the object in
425         a with statement makes that easy::
426
427           with cwriter.open('./doc/page1.txt') as outfile:
428               outfile.write(page1_data)
429           with cwriter.open('./doc/page2.txt') as outfile:
430               outfile.write(page2_data)
431         """
432         if filename is None:
433             streampath, filename = split(streampath)
434         if self._last_open and not self._last_open.closed:
435             raise errors.AssertionError(
436                 "can't open '{}' when '{}' is still open".format(
437                     filename, self._last_open.name))
438         if streampath != self.current_stream_name():
439             self.start_new_stream(streampath)
440         self.set_current_file_name(filename)
441         self._last_open = _WriterFile(self, filename)
442         return self._last_open
443
444     def flush_data(self):
445         data_buffer = ''.join(self._data_buffer)
446         if data_buffer:
447             self._current_stream_locators.append(
448                 self._my_keep().put(
449                     data_buffer[0:config.KEEP_BLOCK_SIZE],
450                     copies=self.replication))
451             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
452             self._data_buffer_len = len(self._data_buffer[0])
453
454     def start_new_file(self, newfilename=None):
455         self.finish_current_file()
456         self.set_current_file_name(newfilename)
457
458     def set_current_file_name(self, newfilename):
459         if re.search(r'[\t\n]', newfilename):
460             raise errors.AssertionError(
461                 "Manifest filenames cannot contain whitespace: %s" %
462                 newfilename)
463         elif re.search(r'\x00', newfilename):
464             raise errors.AssertionError(
465                 "Manifest filenames cannot contain NUL characters: %s" %
466                 newfilename)
467         self._current_file_name = newfilename
468
469     def current_file_name(self):
470         return self._current_file_name
471
472     def finish_current_file(self):
473         if self._current_file_name is None:
474             if self._current_file_pos == self._current_stream_length:
475                 return
476             raise errors.AssertionError(
477                 "Cannot finish an unnamed file " +
478                 "(%d bytes at offset %d in '%s' stream)" %
479                 (self._current_stream_length - self._current_file_pos,
480                  self._current_file_pos,
481                  self._current_stream_name))
482         self._current_stream_files.append([
483                 self._current_file_pos,
484                 self._current_stream_length - self._current_file_pos,
485                 self._current_file_name])
486         self._current_file_pos = self._current_stream_length
487         self._current_file_name = None
488
489     def start_new_stream(self, newstreamname='.'):
490         self.finish_current_stream()
491         self.set_current_stream_name(newstreamname)
492
493     def set_current_stream_name(self, newstreamname):
494         if re.search(r'[\t\n]', newstreamname):
495             raise errors.AssertionError(
496                 "Manifest stream names cannot contain whitespace")
497         self._current_stream_name = '.' if newstreamname=='' else newstreamname
498
499     def current_stream_name(self):
500         return self._current_stream_name
501
502     def finish_current_stream(self):
503         self.finish_current_file()
504         self.flush_data()
505         if not self._current_stream_files:
506             pass
507         elif self._current_stream_name is None:
508             raise errors.AssertionError(
509                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
510                 (self._current_stream_length, len(self._current_stream_files)))
511         else:
512             if not self._current_stream_locators:
513                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
514             self._finished_streams.append([self._current_stream_name,
515                                            self._current_stream_locators,
516                                            self._current_stream_files])
517         self._current_stream_files = []
518         self._current_stream_length = 0
519         self._current_stream_locators = []
520         self._current_stream_name = None
521         self._current_file_pos = 0
522         self._current_file_name = None
523
524     def finish(self):
525         """Store the manifest in Keep and return its locator.
526
527         This is useful for storing manifest fragments (task outputs)
528         temporarily in Keep during a Crunch job.
529
530         In other cases you should make a collection instead, by
531         sending manifest_text() to the API server's "create
532         collection" endpoint.
533         """
534         return self._my_keep().put(self.manifest_text(), copies=self.replication)
535
536     def portable_data_hash(self):
537         stripped = self.stripped_manifest()
538         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
539
540     def manifest_text(self):
541         self.finish_current_stream()
542         manifest = ''
543
544         for stream in self._finished_streams:
545             if not re.search(r'^\.(/.*)?$', stream[0]):
546                 manifest += './'
547             manifest += stream[0].replace(' ', '\\040')
548             manifest += ' ' + ' '.join(stream[1])
549             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
550             manifest += "\n"
551
552         return manifest
553
554     def data_locators(self):
555         ret = []
556         for name, locators, files in self._finished_streams:
557             ret += locators
558         return ret
559
560
561 class ResumableCollectionWriter(CollectionWriter):
562     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
563                    '_current_stream_locators', '_current_stream_name',
564                    '_current_file_name', '_current_file_pos', '_close_file',
565                    '_data_buffer', '_dependencies', '_finished_streams',
566                    '_queued_dirents', '_queued_trees']
567
568     def __init__(self, api_client=None, **kwargs):
569         self._dependencies = {}
570         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
571
572     @classmethod
573     def from_state(cls, state, *init_args, **init_kwargs):
574         # Try to build a new writer from scratch with the given state.
575         # If the state is not suitable to resume (because files have changed,
576         # been deleted, aren't predictable, etc.), raise a
577         # StaleWriterStateError.  Otherwise, return the initialized writer.
578         # The caller is responsible for calling writer.do_queued_work()
579         # appropriately after it's returned.
580         writer = cls(*init_args, **init_kwargs)
581         for attr_name in cls.STATE_PROPS:
582             attr_value = state[attr_name]
583             attr_class = getattr(writer, attr_name).__class__
584             # Coerce the value into the same type as the initial value, if
585             # needed.
586             if attr_class not in (type(None), attr_value.__class__):
587                 attr_value = attr_class(attr_value)
588             setattr(writer, attr_name, attr_value)
589         # Check dependencies before we try to resume anything.
590         if any(KeepLocator(ls).permission_expired()
591                for ls in writer._current_stream_locators):
592             raise errors.StaleWriterStateError(
593                 "locators include expired permission hint")
594         writer.check_dependencies()
595         if state['_current_file'] is not None:
596             path, pos = state['_current_file']
597             try:
598                 writer._queued_file = open(path, 'rb')
599                 writer._queued_file.seek(pos)
600             except IOError as error:
601                 raise errors.StaleWriterStateError(
602                     "failed to reopen active file {}: {}".format(path, error))
603         return writer
604
605     def check_dependencies(self):
606         for path, orig_stat in self._dependencies.items():
607             if not S_ISREG(orig_stat[ST_MODE]):
608                 raise errors.StaleWriterStateError("{} not file".format(path))
609             try:
610                 now_stat = tuple(os.stat(path))
611             except OSError as error:
612                 raise errors.StaleWriterStateError(
613                     "failed to stat {}: {}".format(path, error))
614             if ((not S_ISREG(now_stat[ST_MODE])) or
615                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
616                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
617                 raise errors.StaleWriterStateError("{} changed".format(path))
618
619     def dump_state(self, copy_func=lambda x: x):
620         state = {attr: copy_func(getattr(self, attr))
621                  for attr in self.STATE_PROPS}
622         if self._queued_file is None:
623             state['_current_file'] = None
624         else:
625             state['_current_file'] = (os.path.realpath(self._queued_file.name),
626                                       self._queued_file.tell())
627         return state
628
629     def _queue_file(self, source, filename=None):
630         try:
631             src_path = os.path.realpath(source)
632         except Exception:
633             raise errors.AssertionError("{} not a file path".format(source))
634         try:
635             path_stat = os.stat(src_path)
636         except OSError as stat_error:
637             path_stat = None
638         super(ResumableCollectionWriter, self)._queue_file(source, filename)
639         fd_stat = os.fstat(self._queued_file.fileno())
640         if not S_ISREG(fd_stat.st_mode):
641             # We won't be able to resume from this cache anyway, so don't
642             # worry about further checks.
643             self._dependencies[source] = tuple(fd_stat)
644         elif path_stat is None:
645             raise errors.AssertionError(
646                 "could not stat {}: {}".format(source, stat_error))
647         elif path_stat.st_ino != fd_stat.st_ino:
648             raise errors.AssertionError(
649                 "{} changed between open and stat calls".format(source))
650         else:
651             self._dependencies[src_path] = tuple(fd_stat)
652
653     def write(self, data):
654         if self._queued_file is None:
655             raise errors.AssertionError(
656                 "resumable writer can't accept unsourced data")
657         return super(ResumableCollectionWriter, self).write(data)
658
659 ADD = "add"
660 DEL = "del"
661 MOD = "mod"
662 FILE = "file"
663 COLLECTION = "collection"
664
665 class SynchronizedCollectionBase(CollectionBase):
666     """Base class for Collections and Subcollections.
667
668     Implements the majority of functionality relating to accessing items in the
669     Collection.
670
671     """
672
673     def __init__(self, parent=None):
674         self.parent = parent
675         self._modified = True
676         self._items = {}
677
678     def _my_api(self):
679         raise NotImplementedError()
680
681     def _my_keep(self):
682         raise NotImplementedError()
683
684     def _my_block_manager(self):
685         raise NotImplementedError()
686
687     def _populate(self):
688         raise NotImplementedError()
689
690     def sync_mode(self):
691         raise NotImplementedError()
692
693     def root_collection(self):
694         raise NotImplementedError()
695
696     def notify(self, event, collection, name, item):
697         raise NotImplementedError()
698
699     @must_be_writable
700     @synchronized
701     def find_or_create(self, path, create_type):
702         """Recursively search the specified file path.
703
704         May return either a `Collection` or `ArvadosFile`.  If not found, will
705         create a new item at the specified path based on `create_type`.  Will
706         create intermediate subcollections needed to contain the final item in
707         the path.
708
709         :create_type:
710           One of `arvado.collection.FILE` or
711           `arvado.collection.COLLECTION`.  If the path is not found, and value
712           of create_type is FILE then create and return a new ArvadosFile for
713           the last path component.  If COLLECTION, then create and return a new
714           Collection for the last path component.
715
716         """
717
718         if self.sync_mode() == SYNC_READONLY:
719             raise IOError((errno.EROFS, "Collection is read only"))
720
721         pathcomponents = path.split("/")
722         if pathcomponents[0] == '.':
723             del pathcomponents[0]
724
725         if pathcomponents and pathcomponents[0]:
726             item = self._items.get(pathcomponents[0])
727             if len(pathcomponents) == 1:
728                 # item must be a file
729                 if item is None:
730                     # create new file
731                     if create_type == COLLECTION:
732                         item = Subcollection(self)
733                     else:
734                         item = ArvadosFile(self)
735                     self._items[pathcomponents[0]] = item
736                     self._modified = True
737                     self.notify(ADD, self, pathcomponents[0], item)
738                 return item
739             else:
740                 if item is None:
741                     # create new collection
742                     item = Subcollection(self)
743                     self._items[pathcomponents[0]] = item
744                     self._modified = True
745                     self.notify(ADD, self, pathcomponents[0], item)
746                 del pathcomponents[0]
747                 if isinstance(item, SynchronizedCollectionBase):
748                     return item.find_or_create("/".join(pathcomponents), create_type)
749                 else:
750                     raise errors.ArgumentError("Interior path components must be subcollection")
751         else:
752             return self
753
754     @synchronized
755     def find(self, path):
756         """Recursively search the specified file path.
757
758         May return either a Collection or ArvadosFile.  Return None if not
759         found.
760
761         """
762         pathcomponents = path.split("/")
763         if pathcomponents[0] == '.':
764             del pathcomponents[0]
765
766         if pathcomponents and pathcomponents[0]:
767             item = self._items.get(pathcomponents[0])
768             if len(pathcomponents) == 1:
769                 # item must be a file
770                 return item
771             else:
772                 del pathcomponents[0]
773                 if isinstance(item, SynchronizedCollectionBase):
774                     return item.find("/".join(pathcomponents))
775                 else:
776                     raise errors.ArgumentError("Interior path components must be subcollection")
777         else:
778             return self
779
780     def mkdirs(path):
781         """Recursive subcollection create.
782
783         Like `os.mkdirs()`.  Will create intermediate subcollections needed to
784         contain the leaf subcollection path.
785
786         """
787         return self.find_or_create(path, COLLECTION)
788
789     def open(self, path, mode):
790         """Open a file-like object for access.
791
792         :path:
793           path to a file in the collection
794         :mode:
795           one of "r", "r+", "w", "w+", "a", "a+"
796           :"r":
797             opens for reading
798           :"r+":
799             opens for reading and writing.  Reads/writes share a file pointer.
800           :"w", "w+":
801             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
802           :"a", "a+":
803             opens for reading and writing.  All writes are appended to
804             the end of the file.  Writing does not affect the file pointer for
805             reading.
806         """
807         mode = mode.replace("b", "")
808         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
809             raise ArgumentError("Bad mode '%s'" % mode)
810         create = (mode != "r")
811
812         if create and self.sync_mode() == SYNC_READONLY:
813             raise IOError((errno.EROFS, "Collection is read only"))
814
815         if create:
816             arvfile = self.find_or_create(path, FILE)
817         else:
818             arvfile = self.find(path)
819
820         if arvfile is None:
821             raise IOError((errno.ENOENT, "File not found"))
822         if not isinstance(arvfile, ArvadosFile):
823             raise IOError((errno.EISDIR, "Path must refer to a file."))
824
825         if mode[0] == "w":
826             arvfile.truncate(0)
827
828         if mode == "r":
829             return ArvadosFileReader(arvfile, path, mode, num_retries=self.num_retries)
830         else:
831             return ArvadosFileWriter(arvfile, path, mode, num_retries=self.num_retries)
832
833     @synchronized
834     def modified(self):
835         """Test if the collection (or any subcollection or file) has been modified
836         since it was created."""
837         if self._modified:
838             return True
839         for k,v in self._items.items():
840             if v.modified():
841                 return True
842         return False
843
844     @synchronized
845     def set_unmodified(self):
846         """Recursively clear modified flag."""
847         self._modified = False
848         for k,v in self._items.items():
849             v.set_unmodified()
850
851     @synchronized
852     def __iter__(self):
853         """Iterate over names of files and collections contained in this collection."""
854         return iter(self._items.keys())
855
856     @synchronized
857     def iterkeys(self):
858         """Iterate over names of files and collections directly contained in this collection."""
859         return self._items.keys()
860
861     @synchronized
862     def __getitem__(self, k):
863         """Get a file or collection that is directly contained by this collection.  If
864         you want to search a path, use `find()` instead.
865         """
866         return self._items[k]
867
868     @synchronized
869     def __contains__(self, k):
870         """If there is a file or collection a directly contained by this collection
871         with name `k`."""
872         return k in self._items
873
874     @synchronized
875     def __len__(self):
876         """Get the number of items directly contained in this collection."""
877         return len(self._items)
878
879     @must_be_writable
880     @synchronized
881     def __delitem__(self, p):
882         """Delete an item by name which is directly contained by this collection."""
883         del self._items[p]
884         self._modified = True
885         self.notify(DEL, self, p, None)
886
887     @synchronized
888     def keys(self):
889         """Get a list of names of files and collections directly contained in this collection."""
890         return self._items.keys()
891
892     @synchronized
893     def values(self):
894         """Get a list of files and collection objects directly contained in this collection."""
895         return self._items.values()
896
897     @synchronized
898     def items(self):
899         """Get a list of (name, object) tuples directly contained in this collection."""
900         return self._items.items()
901
902     def exists(self, path):
903         """Test if there is a file or collection at `path`."""
904         return self.find(path) != None
905
906     @must_be_writable
907     @synchronized
908     def remove(self, path, recursive=False):
909         """Remove the file or subcollection (directory) at `path`.
910
911         :recursive:
912           Specify whether to remove non-empty subcollections (True), or raise an error (False).
913         """
914         pathcomponents = path.split("/")
915         if pathcomponents[0] == '.':
916             # Remove '.' from the front of the path
917             del pathcomponents[0]
918
919         if len(pathcomponents) > 0:
920             item = self._items.get(pathcomponents[0])
921             if item is None:
922                 raise IOError((errno.ENOENT, "File not found"))
923             if len(pathcomponents) == 1:
924                 if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
925                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
926                 deleteditem = self._items[pathcomponents[0]]
927                 del self._items[pathcomponents[0]]
928                 self._modified = True
929                 self.notify(DEL, self, pathcomponents[0], deleteditem)
930             else:
931                 del pathcomponents[0]
932                 item.remove("/".join(pathcomponents))
933         else:
934             raise IOError((errno.ENOENT, "File not found"))
935
936     def _cloneinto(self, target):
937         for k,v in self._items.items():
938             target._items[k] = v.clone(target)
939
940     def clone(self):
941         raise NotImplementedError()
942
943     @must_be_writable
944     @synchronized
945     def copy(self, source, target_path, source_collection=None, overwrite=False):
946         """Copy a file or subcollection to a new path in this collection.
947
948         :source:
949           An ArvadosFile, Subcollection, or string with a path to source file or subcollection
950
951         :target_path:
952           Destination file or path.  If the target path already exists and is a
953           subcollection, the item will be placed inside the subcollection.  If
954           the target path already exists and is a file, this will raise an error
955           unless you specify `overwrite=True`.
956
957         :source_collection:
958           Collection to copy `source_path` from (default `self`)
959
960         :overwrite:
961           Whether to overwrite target file if it already exists.
962         """
963         if source_collection is None:
964             source_collection = self
965
966         # Find the object to copy
967         if isinstance(source, basestring):
968             source_obj = source_collection.find(source)
969             if source_obj is None:
970                 raise IOError((errno.ENOENT, "File not found"))
971             sourcecomponents = source.split("/")
972         else:
973             source_obj = source
974             sourcecomponents = None
975
976         # Find parent collection the target path
977         targetcomponents = target_path.split("/")
978
979         # Determine the name to use.
980         target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
981
982         if not target_name:
983             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
984
985         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
986
987         with target_dir.lock:
988             if target_name in target_dir:
989                 if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
990                     target_dir = target_dir[target_name]
991                     target_name = sourcecomponents[-1]
992                 elif not overwrite:
993                     raise IOError((errno.EEXIST, "File already exists"))
994
995             modified_from = None
996             if target_name in target_dir:
997                 modified_from = target_dir[target_name]
998
999             # Actually make the copy.
1000             dup = source_obj.clone(target_dir)
1001             target_dir._items[target_name] = dup
1002             target_dir._modified = True
1003
1004         if modified_from:
1005             self.notify(MOD, target_dir, target_name, (modified_from, dup))
1006         else:
1007             self.notify(ADD, target_dir, target_name, dup)
1008
1009     @synchronized
1010     def manifest_text(self, strip=False, normalize=False):
1011         """Get the manifest text for this collection, sub collections and files.
1012
1013         :strip:
1014           If True, remove signing tokens from block locators if present.
1015           If False, block locators are left unchanged.
1016
1017         :normalize:
1018           If True, always export the manifest text in normalized form
1019           even if the Collection is not modified.  If False and the collection
1020           is not modified, return the original manifest text even if it is not
1021           in normalized form.
1022
1023         """
1024         if self.modified() or self._manifest_text is None or normalize:
1025             return export_manifest(self, stream_name=".", portable_locators=strip)
1026         else:
1027             if strip:
1028                 return self.stripped_manifest()
1029             else:
1030                 return self._manifest_text
1031
1032     @synchronized
1033     def diff(self, end_collection, prefix=".", holding_collection=None):
1034         """
1035         Generate list of add/modify/delete actions which, when given to `apply`, will
1036         change `self` to match `end_collection`
1037         """
1038         changes = []
1039         if holding_collection is None:
1040             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_EXPLICIT)
1041         for k in self:
1042             if k not in end_collection:
1043                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
1044         for k in end_collection:
1045             if k in self:
1046                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1047                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1048                 elif end_collection[k] != self[k]:
1049                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
1050             else:
1051                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
1052         return changes
1053
1054     @must_be_writable
1055     @synchronized
1056     def apply(self, changes):
1057         """Apply changes from `diff`.
1058
1059         If a change conflicts with a local change, it will be saved to an
1060         alternate path indicating the conflict.
1061
1062         """
1063         for change in changes:
1064             event_type = change[0]
1065             path = change[1]
1066             initial = change[2]
1067             local = self.find(path)
1068             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
1069                                                                     time.gmtime()))
1070             if event_type == ADD:
1071                 if local is None:
1072                     # No local file at path, safe to copy over new file
1073                     self.copy(initial, path)
1074                 elif local is not None and local != initial:
1075                     # There is already local file and it is different:
1076                     # save change to conflict file.
1077                     self.copy(initial, conflictpath)
1078             elif event_type == MOD:
1079                 final = change[3]
1080                 if local == initial:
1081                     # Local matches the "initial" item so it has not
1082                     # changed locally and is safe to update.
1083                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1084                         # Replace contents of local file with new contents
1085                         local.replace_contents(final)
1086                     else:
1087                         # Overwrite path with new item; this can happen if
1088                         # path was a file and is now a collection or vice versa
1089                         self.copy(final, path, overwrite=True)
1090                 else:
1091                     # Local is missing (presumably deleted) or local doesn't
1092                     # match the "start" value, so save change to conflict file
1093                     self.copy(final, conflictpath)
1094             elif event_type == DEL:
1095                 if local == initial:
1096                     # Local item matches "initial" value, so it is safe to remove.
1097                     self.remove(path, recursive=True)
1098                 # else, the file is modified or already removed, in either
1099                 # case we don't want to try to remove it.
1100
1101     def portable_data_hash(self):
1102         """Get the portable data hash for this collection's manifest."""
1103         stripped = self.manifest_text(strip=True)
1104         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1105
1106     @synchronized
1107     def __eq__(self, other):
1108         if other is self:
1109             return True
1110         if not isinstance(other, SynchronizedCollectionBase):
1111             return False
1112         if len(self._items) != len(other):
1113             return False
1114         for k in self._items:
1115             if k not in other:
1116                 return False
1117             if self._items[k] != other[k]:
1118                 return False
1119         return True
1120
1121     def __ne__(self, other):
1122         return not self.__eq__(other)
1123
1124 class Collection(SynchronizedCollectionBase):
1125     """Represents the root of an Arvados Collection, which may be associated with
1126     an API server Collection record.
1127
1128     Brief summary of useful methods:
1129
1130     :To read an existing file:
1131       `c.open("myfile", "r")`
1132
1133     :To write a new file:
1134       `c.open("myfile", "w")`
1135
1136     :To determine if a file exists:
1137       `c.find("myfile") is not None`
1138
1139     :To copy a file:
1140       `c.copy("source", "dest")`
1141
1142     :To delete a file:
1143       `c.remove("myfile")`
1144
1145     :To save to an existing collection record:
1146       `c.save()`
1147
1148     :To save a new collection record:
1149     `c.save_new()`
1150
1151     :To merge remote changes into this object:
1152       `c.update()`
1153
1154     This class is threadsafe.  The root collection object, all subcollections
1155     and files are protected by a single lock (i.e. each access locks the entire
1156     collection).
1157
1158     """
1159
1160     def __init__(self, manifest_locator_or_text=None,
1161                  parent=None,
1162                  apiconfig=None,
1163                  api_client=None,
1164                  keep_client=None,
1165                  num_retries=None,
1166                  block_manager=None,
1167                  sync=None):
1168         """Collection constructor.
1169
1170         :manifest_locator_or_text:
1171           One of Arvados collection UUID, block locator of
1172           a manifest, raw manifest text, or None (to create an empty collection).
1173         :parent:
1174           the parent Collection, may be None.
1175         :apiconfig:
1176           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1177           Prefer this over supplying your own api_client and keep_client (except in testing).
1178           Will use default config settings if not specified.
1179         :api_client:
1180           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1181         :keep_client:
1182           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1183         :num_retries:
1184           the number of retries for API and Keep requests.
1185         :block_manager:
1186           the block manager to use.  If not specified, create one.
1187         :sync:
1188           Set synchronization policy with API server collection record.
1189           :SYNC_READONLY:
1190             Collection is read only.  No synchronization.  This mode will
1191             also forego locking, which gives better performance.
1192           :SYNC_EXPLICIT:
1193             Collection is writable.  Synchronize on explicit request via `update()` or `save()`
1194           :SYNC_LIVE:
1195             Collection is writable.  Synchronize with server in response to
1196             background websocket events, on block write, or on file close.
1197
1198         """
1199         super(Collection, self).__init__(parent)
1200         self._api_client = api_client
1201         self._keep_client = keep_client
1202         self._block_manager = block_manager
1203
1204         if apiconfig:
1205             self._config = apiconfig
1206         else:
1207             self._config = config.settings()
1208
1209         self.num_retries = num_retries if num_retries is not None else 2
1210         self._manifest_locator = None
1211         self._manifest_text = None
1212         self._api_response = None
1213
1214         if sync is None:
1215             raise errors.ArgumentError("Must specify sync mode")
1216
1217         self._sync = sync
1218         self.lock = threading.RLock()
1219         self.callbacks = []
1220         self.events = None
1221
1222         if manifest_locator_or_text:
1223             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1224                 self._manifest_locator = manifest_locator_or_text
1225             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1226                 self._manifest_locator = manifest_locator_or_text
1227             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1228                 self._manifest_text = manifest_locator_or_text
1229             else:
1230                 raise errors.ArgumentError(
1231                     "Argument to CollectionReader must be a manifest or a collection UUID")
1232
1233             self._populate()
1234             self._subscribe_events()
1235
1236
1237     def root_collection(self):
1238         return self
1239
1240     def sync_mode(self):
1241         return self._sync
1242
1243     def _subscribe_events(self):
1244         if self._sync == SYNC_LIVE and self.events is None:
1245             if not self._has_collection_uuid():
1246                 raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
1247             self.events = events.subscribe(arvados.api(apiconfig=self._config),
1248                                            [["object_uuid", "=", self._manifest_locator]],
1249                                            self.on_message)
1250
1251     def on_message(self, event):
1252         if event.get("object_uuid") == self._manifest_locator:
1253             self.update()
1254
1255     @synchronized
1256     @retry_method
1257     def update(self, other=None, num_retries=None):
1258         """Fetch the latest collection record on the API server and merge it with the
1259         current collection contents.
1260
1261         """
1262         if other is None:
1263             if self._manifest_locator is None:
1264                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1265             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1266             other = import_manifest(response["manifest_text"])
1267         baseline = import_manifest(self._manifest_text)
1268         self.apply(baseline.diff(other))
1269
1270     @synchronized
1271     def _my_api(self):
1272         if self._api_client is None:
1273             self._api_client = ThreadSafeApiCache(self._config)
1274             self._keep_client = self._api_client.keep
1275         return self._api_client
1276
1277     @synchronized
1278     def _my_keep(self):
1279         if self._keep_client is None:
1280             if self._api_client is None:
1281                 self._my_api()
1282             else:
1283                 self._keep_client = KeepClient(api=self._api_client)
1284         return self._keep_client
1285
1286     @synchronized
1287     def _my_block_manager(self):
1288         if self._block_manager is None:
1289             self._block_manager = BlockManager(self._my_keep())
1290         return self._block_manager
1291
1292     def _populate_from_api_server(self):
1293         # As in KeepClient itself, we must wait until the last
1294         # possible moment to instantiate an API client, in order to
1295         # avoid tripping up clients that don't have access to an API
1296         # server.  If we do build one, make sure our Keep client uses
1297         # it.  If instantiation fails, we'll fall back to the except
1298         # clause, just like any other Collection lookup
1299         # failure. Return an exception, or None if successful.
1300         try:
1301             self._api_response = self._my_api().collections().get(
1302                 uuid=self._manifest_locator).execute(
1303                     num_retries=self.num_retries)
1304             self._manifest_text = self._api_response['manifest_text']
1305             return None
1306         except Exception as e:
1307             return e
1308
1309     def _populate_from_keep(self):
1310         # Retrieve a manifest directly from Keep. This has a chance of
1311         # working if [a] the locator includes a permission signature
1312         # or [b] the Keep services are operating in world-readable
1313         # mode. Return an exception, or None if successful.
1314         try:
1315             self._manifest_text = self._my_keep().get(
1316                 self._manifest_locator, num_retries=self.num_retries)
1317         except Exception as e:
1318             return e
1319
1320     def _populate(self):
1321         if self._manifest_locator is None and self._manifest_text is None:
1322             return
1323         error_via_api = None
1324         error_via_keep = None
1325         should_try_keep = ((self._manifest_text is None) and
1326                            util.keep_locator_pattern.match(
1327                                self._manifest_locator))
1328         if ((self._manifest_text is None) and
1329             util.signed_locator_pattern.match(self._manifest_locator)):
1330             error_via_keep = self._populate_from_keep()
1331         if self._manifest_text is None:
1332             error_via_api = self._populate_from_api_server()
1333             if error_via_api is not None and not should_try_keep:
1334                 raise error_via_api
1335         if ((self._manifest_text is None) and
1336             not error_via_keep and
1337             should_try_keep):
1338             # Looks like a keep locator, and we didn't already try keep above
1339             error_via_keep = self._populate_from_keep()
1340         if self._manifest_text is None:
1341             # Nothing worked!
1342             raise arvados.errors.NotFoundError(
1343                 ("Failed to retrieve collection '{}' " +
1344                  "from either API server ({}) or Keep ({})."
1345                  ).format(
1346                     self._manifest_locator,
1347                     error_via_api,
1348                     error_via_keep))
1349         # populate
1350         self._baseline_manifest = self._manifest_text
1351         import_manifest(self._manifest_text, self)
1352
1353         if self._sync == SYNC_READONLY:
1354             # Now that we're populated, knowing that this will be readonly,
1355             # forego any further locking.
1356             self.lock = NoopLock()
1357
1358     def _has_collection_uuid(self):
1359         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1360
1361     def __enter__(self):
1362         return self
1363
1364     def __exit__(self, exc_type, exc_value, traceback):
1365         """Support scoped auto-commit in a with: block."""
1366         if self._sync != SYNC_READONLY and self._has_collection_uuid():
1367             self.save()
1368         if self._block_manager is not None:
1369             self._block_manager.stop_threads()
1370
1371     @synchronized
1372     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
1373         if new_config is None:
1374             new_config = self._config
1375         newcollection = Collection(parent=new_parent, apiconfig=new_config, sync=SYNC_EXPLICIT)
1376         if new_sync == SYNC_READONLY:
1377             newcollection.lock = NoopLock()
1378         self._cloneinto(newcollection)
1379         newcollection._sync = new_sync
1380         return newcollection
1381
1382     @synchronized
1383     def api_response(self):
1384         """Returns information about this Collection fetched from the API server.
1385
1386         If the Collection exists in Keep but not the API server, currently
1387         returns None.  Future versions may provide a synthetic response.
1388
1389         """
1390         return self._api_response
1391
1392     @must_be_writable
1393     @synchronized
1394     @retry_method
1395     def save(self, merge=True, num_retries=None):
1396         """Commit pending buffer blocks to Keep, merge with remote record (if
1397         update=True), write the manifest to Keep, and update the collection
1398         record.
1399
1400         Will raise AssertionError if not associated with a collection record on
1401         the API server.  If you want to save a manifest to Keep only, see
1402         `save_new()`.
1403
1404         :update:
1405           Update and merge remote changes before saving.  Otherwise, any
1406           remote changes will be ignored and overwritten.
1407
1408         """
1409         if self.modified():
1410             if not self._has_collection_uuid():
1411                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
1412             self._my_block_manager().commit_all()
1413             if merge:
1414                 self.update()
1415             self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1416
1417             text = self.manifest_text(strip=False)
1418             self._api_response = self._my_api().collections().update(
1419                 uuid=self._manifest_locator,
1420                 body={'manifest_text': text}
1421                 ).execute(
1422                     num_retries=num_retries)
1423             self._manifest_text = text
1424             self.set_unmodified()
1425
1426     @must_be_writable
1427     @synchronized
1428     @retry_method
1429     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
1430         """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
1431         a new collection record (if create_collection_record True).
1432
1433         After creating a new collection record, this Collection object will be
1434         associated with the new record for `save()` and SYNC_LIVE updates.
1435
1436         :name:
1437           The collection name.
1438
1439         :keep_only:
1440           Only save the manifest to keep, do not create a collection record.
1441
1442         :owner_uuid:
1443           the user, or project uuid that will own this collection.
1444           If None, defaults to the current user.
1445
1446         :ensure_unique_name:
1447           If True, ask the API server to rename the collection
1448           if it conflicts with a collection with the same name and owner.  If
1449           False, a name conflict will result in an error.
1450
1451         """
1452         self._my_block_manager().commit_all()
1453         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1454         text = self.manifest_text(strip=False)
1455
1456         if create_collection_record:
1457             if name is None:
1458                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1459
1460             body = {"manifest_text": text,
1461                     "name": name}
1462             if owner_uuid:
1463                 body["owner_uuid"] = owner_uuid
1464
1465             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1466
1467             if self.events:
1468                 self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1469
1470             self._manifest_locator = self._api_response["uuid"]
1471
1472             if self.events:
1473                 self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1474
1475         self._manifest_text = text
1476         self.set_unmodified()
1477
1478     @synchronized
1479     def subscribe(self, callback):
1480         self.callbacks.append(callback)
1481
1482     @synchronized
1483     def unsubscribe(self, callback):
1484         self.callbacks.remove(callback)
1485
1486     @synchronized
1487     def notify(self, event, collection, name, item):
1488         for c in self.callbacks:
1489             c(event, collection, name, item)
1490
1491 def ReadOnlyCollection(*args, **kwargs):
1492     """Create a read-only collection object from an api collection record locator,
1493     a portable data hash of a manifest, or raw manifest text.
1494
1495     See `Collection` constructor for detailed options.
1496
1497     """
1498     kwargs["sync"] = SYNC_READONLY
1499     return Collection(*args, **kwargs)
1500
1501 def WritableCollection(*args, **kwargs):
1502     """Create a writable collection object from an api collection record locator,
1503     a portable data hash of a manifest, or raw manifest text.
1504
1505     See `Collection` constructor for detailed options.
1506
1507     """
1508
1509     kwargs["sync"] = SYNC_EXPLICIT
1510     return Collection(*args, **kwargs)
1511
1512 def LiveCollection(*args, **kwargs):
1513     """Create a writable, live updating collection object representing an existing
1514     collection record on the API server.
1515
1516     See `Collection` constructor for detailed options.
1517
1518     """
1519     kwargs["sync"] = SYNC_LIVE
1520     return Collection(*args, **kwargs)
1521
1522 def createWritableCollection(name, owner_uuid=None, apiconfig=None):
1523     """Create an empty, writable collection object and create an associated api
1524     collection record.
1525
1526     :name:
1527       The collection name
1528
1529     :owner_uuid:
1530       The parent project.
1531
1532     :apiconfig:
1533       Optional alternate api configuration to use (to specify alternate API
1534       host or token than the default.)
1535
1536     """
1537     newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
1538     newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1539     return newcollection
1540
1541 def createLiveCollection(name, owner_uuid=None, apiconfig=None):
1542     """Create an empty, writable, live updating Collection object and create an
1543     associated collection record on the API server.
1544
1545     :name:
1546       The collection name
1547
1548     :owner_uuid:
1549       The parent project.
1550
1551     :apiconfig:
1552       Optional alternate api configuration to use (to specify alternate API
1553       host or token than the default.)
1554
1555     """
1556     newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
1557     newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1558     newcollection._sync = SYNC_LIVE
1559     newcollection._subscribe_events()
1560     return newcollection
1561
1562 class Subcollection(SynchronizedCollectionBase):
1563     """This is a subdirectory within a collection that doesn't have its own API
1564     server record.
1565
1566     It falls under the umbrella of the root collection.
1567
1568     """
1569
1570     def __init__(self, parent):
1571         super(Subcollection, self).__init__(parent)
1572         self.lock = self.root_collection().lock
1573
1574     def root_collection(self):
1575         return self.parent.root_collection()
1576
1577     def sync_mode(self):
1578         return self.root_collection().sync_mode()
1579
1580     def _my_api(self):
1581         return self.root_collection()._my_api()
1582
1583     def _my_keep(self):
1584         return self.root_collection()._my_keep()
1585
1586     def _my_block_manager(self):
1587         return self.root_collection()._my_block_manager()
1588
1589     def _populate(self):
1590         self.root_collection()._populate()
1591
1592     def notify(self, event, collection, name, item):
1593         return self.root_collection().notify(event, collection, name, item)
1594
1595     @synchronized
1596     def clone(self, new_parent):
1597         c = Subcollection(new_parent)
1598         self._cloneinto(c)
1599         return c
1600
1601 def import_manifest(manifest_text,
1602                     into_collection=None,
1603                     api_client=None,
1604                     keep=None,
1605                     num_retries=None,
1606                     sync=SYNC_READONLY):
1607     """Import a manifest into a `Collection`.
1608
1609     :manifest_text:
1610       The manifest text to import from.
1611
1612     :into_collection:
1613       The `Collection` that will be initialized (must be empty).
1614       If None, create a new `Collection` object.
1615
1616     :api_client:
1617       The API client object that will be used when creating a new `Collection` object.
1618
1619     :keep:
1620       The keep client object that will be used when creating a new `Collection` object.
1621
1622     :num_retries:
1623       the default number of api client and keep retries on error.
1624
1625     :sync:
1626       Collection sync mode (only if into_collection is None)
1627     """
1628     if into_collection is not None:
1629         if len(into_collection) > 0:
1630             raise ArgumentError("Can only import manifest into an empty collection")
1631     else:
1632         into_collection = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
1633
1634     save_sync = into_collection.sync_mode()
1635     into_collection._sync = None
1636
1637     STREAM_NAME = 0
1638     BLOCKS = 1
1639     SEGMENTS = 2
1640
1641     stream_name = None
1642     state = STREAM_NAME
1643
1644     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1645         tok = n.group(1)
1646         sep = n.group(2)
1647
1648         if state == STREAM_NAME:
1649             # starting a new stream
1650             stream_name = tok.replace('\\040', ' ')
1651             blocks = []
1652             segments = []
1653             streamoffset = 0L
1654             state = BLOCKS
1655             continue
1656
1657         if state == BLOCKS:
1658             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1659             if s:
1660                 blocksize = long(s.group(1))
1661                 blocks.append(Range(tok, streamoffset, blocksize))
1662                 streamoffset += blocksize
1663             else:
1664                 state = SEGMENTS
1665
1666         if state == SEGMENTS:
1667             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1668             if s:
1669                 pos = long(s.group(1))
1670                 size = long(s.group(2))
1671                 name = s.group(3).replace('\\040', ' ')
1672                 f = into_collection.find_or_create("%s/%s" % (stream_name, name), FILE)
1673                 f.add_segment(blocks, pos, size)
1674             else:
1675                 # error!
1676                 raise errors.SyntaxError("Invalid manifest format")
1677
1678         if sep == "\n":
1679             stream_name = None
1680             state = STREAM_NAME
1681
1682     into_collection.set_unmodified()
1683     into_collection._sync = save_sync
1684     return into_collection
1685
1686 def export_manifest(item, stream_name=".", portable_locators=False):
1687     """Export a manifest from the contents of a SynchronizedCollectionBase.
1688
1689     :item:
1690       Create a manifest for `item` (must be a `SynchronizedCollectionBase` or `ArvadosFile`).  If
1691       `item` is a is a `Collection`, this will also export subcollections.
1692
1693     :stream_name:
1694       the name of the stream when exporting `item`.
1695
1696     :portable_locators:
1697       If True, strip any permission hints on block locators.
1698       If False, use block locators as-is.
1699
1700     """
1701     buf = ""
1702     if isinstance(item, SynchronizedCollectionBase):
1703         stream = {}
1704         sorted_keys = sorted(item.keys())
1705         for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1706             # Create a stream per file `k`
1707             arvfile = item[filename]
1708             filestream = []
1709             for segment in arvfile.segments():
1710                 loc = segment.locator
1711                 if loc.startswith("bufferblock"):
1712                     loc = arvfile.parent._my_block_manager()._bufferblocks[loc].locator()
1713                 if portable_locators:
1714                     loc = KeepLocator(loc).stripped()
1715                 filestream.append(LocatorAndRange(loc, locator_block_size(loc),
1716                                      segment.segment_offset, segment.range_size))
1717             stream[filename] = filestream
1718         if stream:
1719             buf += ' '.join(normalize_stream(stream_name, stream))
1720             buf += "\n"
1721         for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1722             buf += export_manifest(item[dirname], stream_name=os.path.join(stream_name, dirname), portable_locators=portable_locators)
1723     elif isinstance(item, ArvadosFile):
1724         filestream = []
1725         for segment in item.segments:
1726             loc = segment.locator
1727             if loc.startswith("bufferblock"):
1728                 loc = item._bufferblocks[loc].calculate_locator()
1729             if portable_locators:
1730                 loc = KeepLocator(loc).stripped()
1731             filestream.append(LocatorAndRange(loc, locator_block_size(loc),
1732                                  segment.segment_offset, segment.range_size))
1733         stream[stream_name] = filestream
1734         buf += ' '.join(normalize_stream(stream_name, stream))
1735         buf += "\n"
1736     return buf