4823: Add root_collection() to make it easier to find the root. Adjusted
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import time
7
8 from collections import deque
9 from stat import *
10
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
12 from keep import *
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 from .safeapi import SafeApi
16 import config
17 import errors
18 import util
19 import events
20 from arvados.retry import retry_method
21
22 _logger = logging.getLogger('arvados.collection')
23
24 class CollectionBase(object):
25     def __enter__(self):
26         return self
27
28     def __exit__(self, exc_type, exc_value, traceback):
29         pass
30
31     def _my_keep(self):
32         if self._keep_client is None:
33             self._keep_client = KeepClient(api_client=self._api_client,
34                                            num_retries=self.num_retries)
35         return self._keep_client
36
37     def stripped_manifest(self):
38         """
39         Return the manifest for the current collection with all
40         non-portable hints (i.e., permission signatures and other
41         hints other than size hints) removed from the locators.
42         """
43         raw = self.manifest_text()
44         clean = []
45         for line in raw.split("\n"):
46             fields = line.split()
47             if fields:
48                 clean_fields = fields[:1] + [
49                     (re.sub(r'\+[^\d][^\+]*', '', x)
50                      if re.match(util.keep_locator_pattern, x)
51                      else x)
52                     for x in fields[1:]]
53                 clean += [' '.join(clean_fields), "\n"]
54         return ''.join(clean)
55
56
57 class CollectionReader(CollectionBase):
58     def __init__(self, manifest_locator_or_text, api_client=None,
59                  keep_client=None, num_retries=0):
60         """Instantiate a CollectionReader.
61
62         This class parses Collection manifests to provide a simple interface
63         to read its underlying files.
64
65         Arguments:
66         * manifest_locator_or_text: One of a Collection UUID, portable data
67           hash, or full manifest text.
68         * api_client: The API client to use to look up Collections.  If not
69           provided, CollectionReader will build one from available Arvados
70           configuration.
71         * keep_client: The KeepClient to use to download Collection data.
72           If not provided, CollectionReader will build one from available
73           Arvados configuration.
74         * num_retries: The default number of times to retry failed
75           service requests.  Default 0.  You may change this value
76           after instantiation, but note those changes may not
77           propagate to related objects like the Keep client.
78         """
79         self._api_client = api_client
80         self._keep_client = keep_client
81         self.num_retries = num_retries
82         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
83             self._manifest_locator = manifest_locator_or_text
84             self._manifest_text = None
85         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
86             self._manifest_locator = manifest_locator_or_text
87             self._manifest_text = None
88         elif re.match(util.manifest_pattern, manifest_locator_or_text):
89             self._manifest_text = manifest_locator_or_text
90             self._manifest_locator = None
91         else:
92             raise errors.ArgumentError(
93                 "Argument to CollectionReader must be a manifest or a collection UUID")
94         self._api_response = None
95         self._streams = None
96
97     def _populate_from_api_server(self):
98         # As in KeepClient itself, we must wait until the last
99         # possible moment to instantiate an API client, in order to
100         # avoid tripping up clients that don't have access to an API
101         # server.  If we do build one, make sure our Keep client uses
102         # it.  If instantiation fails, we'll fall back to the except
103         # clause, just like any other Collection lookup
104         # failure. Return an exception, or None if successful.
105         try:
106             if self._api_client is None:
107                 self._api_client = arvados.api('v1')
108                 self._keep_client = None  # Make a new one with the new api.
109             self._api_response = self._api_client.collections().get(
110                 uuid=self._manifest_locator).execute(
111                 num_retries=self.num_retries)
112             self._manifest_text = self._api_response['manifest_text']
113             return None
114         except Exception as e:
115             return e
116
117     def _populate_from_keep(self):
118         # Retrieve a manifest directly from Keep. This has a chance of
119         # working if [a] the locator includes a permission signature
120         # or [b] the Keep services are operating in world-readable
121         # mode. Return an exception, or None if successful.
122         try:
123             self._manifest_text = self._my_keep().get(
124                 self._manifest_locator, num_retries=self.num_retries)
125         except Exception as e:
126             return e
127
128     def _populate(self):
129         error_via_api = None
130         error_via_keep = None
131         should_try_keep = ((self._manifest_text is None) and
132                            util.keep_locator_pattern.match(
133                 self._manifest_locator))
134         if ((self._manifest_text is None) and
135             util.signed_locator_pattern.match(self._manifest_locator)):
136             error_via_keep = self._populate_from_keep()
137         if self._manifest_text is None:
138             error_via_api = self._populate_from_api_server()
139             if error_via_api is not None and not should_try_keep:
140                 raise error_via_api
141         if ((self._manifest_text is None) and
142             not error_via_keep and
143             should_try_keep):
144             # Looks like a keep locator, and we didn't already try keep above
145             error_via_keep = self._populate_from_keep()
146         if self._manifest_text is None:
147             # Nothing worked!
148             raise arvados.errors.NotFoundError(
149                 ("Failed to retrieve collection '{}' " +
150                  "from either API server ({}) or Keep ({})."
151                  ).format(
152                     self._manifest_locator,
153                     error_via_api,
154                     error_via_keep))
155         self._streams = [sline.split()
156                          for sline in self._manifest_text.split("\n")
157                          if sline]
158
159     def _populate_first(orig_func):
160         # Decorator for methods that read actual Collection data.
161         @functools.wraps(orig_func)
162         def wrapper(self, *args, **kwargs):
163             if self._streams is None:
164                 self._populate()
165             return orig_func(self, *args, **kwargs)
166         return wrapper
167
168     @_populate_first
169     def api_response(self):
170         """api_response() -> dict or None
171
172         Returns information about this Collection fetched from the API server.
173         If the Collection exists in Keep but not the API server, currently
174         returns None.  Future versions may provide a synthetic response.
175         """
176         return self._api_response
177
178     @_populate_first
179     def normalize(self):
180         # Rearrange streams
181         streams = {}
182         for s in self.all_streams():
183             for f in s.all_files():
184                 streamname, filename = split(s.name() + "/" + f.name())
185                 if streamname not in streams:
186                     streams[streamname] = {}
187                 if filename not in streams[streamname]:
188                     streams[streamname][filename] = []
189                 for r in f.segments:
190                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
191
192         self._streams = [normalize_stream(s, streams[s])
193                          for s in sorted(streams)]
194
195         # Regenerate the manifest text based on the normalized streams
196         self._manifest_text = ''.join(
197             [StreamReader(stream, keep=self._my_keep()).manifest_text()
198              for stream in self._streams])
199
200     @_populate_first
201     def open(self, streampath, filename=None):
202         """open(streampath[, filename]) -> file-like object
203
204         Pass in the path of a file to read from the Collection, either as a
205         single string or as two separate stream name and file name arguments.
206         This method returns a file-like object to read that file.
207         """
208         if filename is None:
209             streampath, filename = split(streampath)
210         keep_client = self._my_keep()
211         for stream_s in self._streams:
212             stream = StreamReader(stream_s, keep_client,
213                                   num_retries=self.num_retries)
214             if stream.name() == streampath:
215                 break
216         else:
217             raise ValueError("stream '{}' not found in Collection".
218                              format(streampath))
219         try:
220             return stream.files()[filename]
221         except KeyError:
222             raise ValueError("file '{}' not found in Collection stream '{}'".
223                              format(filename, streampath))
224
225     @_populate_first
226     def all_streams(self):
227         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
228                 for s in self._streams]
229
230     def all_files(self):
231         for s in self.all_streams():
232             for f in s.all_files():
233                 yield f
234
235     @_populate_first
236     def manifest_text(self, strip=False, normalize=False):
237         if normalize:
238             cr = CollectionReader(self.manifest_text())
239             cr.normalize()
240             return cr.manifest_text(strip=strip, normalize=False)
241         elif strip:
242             return self.stripped_manifest()
243         else:
244             return self._manifest_text
245
246
247 class _WriterFile(ArvadosFileBase):
248     def __init__(self, coll_writer, name):
249         super(_WriterFile, self).__init__(name, 'wb')
250         self.dest = coll_writer
251
252     def close(self):
253         super(_WriterFile, self).close()
254         self.dest.finish_current_file()
255
256     @ArvadosFileBase._before_close
257     def write(self, data):
258         self.dest.write(data)
259
260     @ArvadosFileBase._before_close
261     def writelines(self, seq):
262         for data in seq:
263             self.write(data)
264
265     @ArvadosFileBase._before_close
266     def flush(self):
267         self.dest.flush_data()
268
269
270 class CollectionWriter(CollectionBase):
271     def __init__(self, api_client=None, num_retries=0):
272         """Instantiate a CollectionWriter.
273
274         CollectionWriter lets you build a new Arvados Collection from scratch.
275         Write files to it.  The CollectionWriter will upload data to Keep as
276         appropriate, and provide you with the Collection manifest text when
277         you're finished.
278
279         Arguments:
280         * api_client: The API client to use to look up Collections.  If not
281           provided, CollectionReader will build one from available Arvados
282           configuration.
283         * num_retries: The default number of times to retry failed
284           service requests.  Default 0.  You may change this value
285           after instantiation, but note those changes may not
286           propagate to related objects like the Keep client.
287         """
288         self._api_client = api_client
289         self.num_retries = num_retries
290         self._keep_client = None
291         self._data_buffer = []
292         self._data_buffer_len = 0
293         self._current_stream_files = []
294         self._current_stream_length = 0
295         self._current_stream_locators = []
296         self._current_stream_name = '.'
297         self._current_file_name = None
298         self._current_file_pos = 0
299         self._finished_streams = []
300         self._close_file = None
301         self._queued_file = None
302         self._queued_dirents = deque()
303         self._queued_trees = deque()
304         self._last_open = None
305
306     def __exit__(self, exc_type, exc_value, traceback):
307         if exc_type is None:
308             self.finish()
309
310     def do_queued_work(self):
311         # The work queue consists of three pieces:
312         # * _queued_file: The file object we're currently writing to the
313         #   Collection.
314         # * _queued_dirents: Entries under the current directory
315         #   (_queued_trees[0]) that we want to write or recurse through.
316         #   This may contain files from subdirectories if
317         #   max_manifest_depth == 0 for this directory.
318         # * _queued_trees: Directories that should be written as separate
319         #   streams to the Collection.
320         # This function handles the smallest piece of work currently queued
321         # (current file, then current directory, then next directory) until
322         # no work remains.  The _work_THING methods each do a unit of work on
323         # THING.  _queue_THING methods add a THING to the work queue.
324         while True:
325             if self._queued_file:
326                 self._work_file()
327             elif self._queued_dirents:
328                 self._work_dirents()
329             elif self._queued_trees:
330                 self._work_trees()
331             else:
332                 break
333
334     def _work_file(self):
335         while True:
336             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
337             if not buf:
338                 break
339             self.write(buf)
340         self.finish_current_file()
341         if self._close_file:
342             self._queued_file.close()
343         self._close_file = None
344         self._queued_file = None
345
346     def _work_dirents(self):
347         path, stream_name, max_manifest_depth = self._queued_trees[0]
348         if stream_name != self.current_stream_name():
349             self.start_new_stream(stream_name)
350         while self._queued_dirents:
351             dirent = self._queued_dirents.popleft()
352             target = os.path.join(path, dirent)
353             if os.path.isdir(target):
354                 self._queue_tree(target,
355                                  os.path.join(stream_name, dirent),
356                                  max_manifest_depth - 1)
357             else:
358                 self._queue_file(target, dirent)
359                 break
360         if not self._queued_dirents:
361             self._queued_trees.popleft()
362
363     def _work_trees(self):
364         path, stream_name, max_manifest_depth = self._queued_trees[0]
365         d = util.listdir_recursive(
366             path, max_depth = (None if max_manifest_depth == 0 else 0))
367         if d:
368             self._queue_dirents(stream_name, d)
369         else:
370             self._queued_trees.popleft()
371
372     def _queue_file(self, source, filename=None):
373         assert (self._queued_file is None), "tried to queue more than one file"
374         if not hasattr(source, 'read'):
375             source = open(source, 'rb')
376             self._close_file = True
377         else:
378             self._close_file = False
379         if filename is None:
380             filename = os.path.basename(source.name)
381         self.start_new_file(filename)
382         self._queued_file = source
383
384     def _queue_dirents(self, stream_name, dirents):
385         assert (not self._queued_dirents), "tried to queue more than one tree"
386         self._queued_dirents = deque(sorted(dirents))
387
388     def _queue_tree(self, path, stream_name, max_manifest_depth):
389         self._queued_trees.append((path, stream_name, max_manifest_depth))
390
391     def write_file(self, source, filename=None):
392         self._queue_file(source, filename)
393         self.do_queued_work()
394
395     def write_directory_tree(self,
396                              path, stream_name='.', max_manifest_depth=-1):
397         self._queue_tree(path, stream_name, max_manifest_depth)
398         self.do_queued_work()
399
400     def write(self, newdata):
401         if hasattr(newdata, '__iter__'):
402             for s in newdata:
403                 self.write(s)
404             return
405         self._data_buffer.append(newdata)
406         self._data_buffer_len += len(newdata)
407         self._current_stream_length += len(newdata)
408         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
409             self.flush_data()
410
411     def open(self, streampath, filename=None):
412         """open(streampath[, filename]) -> file-like object
413
414         Pass in the path of a file to write to the Collection, either as a
415         single string or as two separate stream name and file name arguments.
416         This method returns a file-like object you can write to add it to the
417         Collection.
418
419         You may only have one file object from the Collection open at a time,
420         so be sure to close the object when you're done.  Using the object in
421         a with statement makes that easy::
422
423           with cwriter.open('./doc/page1.txt') as outfile:
424               outfile.write(page1_data)
425           with cwriter.open('./doc/page2.txt') as outfile:
426               outfile.write(page2_data)
427         """
428         if filename is None:
429             streampath, filename = split(streampath)
430         if self._last_open and not self._last_open.closed:
431             raise errors.AssertionError(
432                 "can't open '{}' when '{}' is still open".format(
433                     filename, self._last_open.name))
434         if streampath != self.current_stream_name():
435             self.start_new_stream(streampath)
436         self.set_current_file_name(filename)
437         self._last_open = _WriterFile(self, filename)
438         return self._last_open
439
440     def flush_data(self):
441         data_buffer = ''.join(self._data_buffer)
442         if data_buffer:
443             self._current_stream_locators.append(
444                 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
445             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
446             self._data_buffer_len = len(self._data_buffer[0])
447
448     def start_new_file(self, newfilename=None):
449         self.finish_current_file()
450         self.set_current_file_name(newfilename)
451
452     def set_current_file_name(self, newfilename):
453         if re.search(r'[\t\n]', newfilename):
454             raise errors.AssertionError(
455                 "Manifest filenames cannot contain whitespace: %s" %
456                 newfilename)
457         elif re.search(r'\x00', newfilename):
458             raise errors.AssertionError(
459                 "Manifest filenames cannot contain NUL characters: %s" %
460                 newfilename)
461         self._current_file_name = newfilename
462
463     def current_file_name(self):
464         return self._current_file_name
465
466     def finish_current_file(self):
467         if self._current_file_name is None:
468             if self._current_file_pos == self._current_stream_length:
469                 return
470             raise errors.AssertionError(
471                 "Cannot finish an unnamed file " +
472                 "(%d bytes at offset %d in '%s' stream)" %
473                 (self._current_stream_length - self._current_file_pos,
474                  self._current_file_pos,
475                  self._current_stream_name))
476         self._current_stream_files.append([
477                 self._current_file_pos,
478                 self._current_stream_length - self._current_file_pos,
479                 self._current_file_name])
480         self._current_file_pos = self._current_stream_length
481         self._current_file_name = None
482
483     def start_new_stream(self, newstreamname='.'):
484         self.finish_current_stream()
485         self.set_current_stream_name(newstreamname)
486
487     def set_current_stream_name(self, newstreamname):
488         if re.search(r'[\t\n]', newstreamname):
489             raise errors.AssertionError(
490                 "Manifest stream names cannot contain whitespace")
491         self._current_stream_name = '.' if newstreamname=='' else newstreamname
492
493     def current_stream_name(self):
494         return self._current_stream_name
495
496     def finish_current_stream(self):
497         self.finish_current_file()
498         self.flush_data()
499         if not self._current_stream_files:
500             pass
501         elif self._current_stream_name is None:
502             raise errors.AssertionError(
503                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
504                 (self._current_stream_length, len(self._current_stream_files)))
505         else:
506             if not self._current_stream_locators:
507                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
508             self._finished_streams.append([self._current_stream_name,
509                                            self._current_stream_locators,
510                                            self._current_stream_files])
511         self._current_stream_files = []
512         self._current_stream_length = 0
513         self._current_stream_locators = []
514         self._current_stream_name = None
515         self._current_file_pos = 0
516         self._current_file_name = None
517
518     def finish(self):
519         # Store the manifest in Keep and return its locator.
520         return self._my_keep().put(self.manifest_text())
521
522     def portable_data_hash(self):
523         stripped = self.stripped_manifest()
524         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
525
526     def manifest_text(self):
527         self.finish_current_stream()
528         manifest = ''
529
530         for stream in self._finished_streams:
531             if not re.search(r'^\.(/.*)?$', stream[0]):
532                 manifest += './'
533             manifest += stream[0].replace(' ', '\\040')
534             manifest += ' ' + ' '.join(stream[1])
535             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
536             manifest += "\n"
537
538         return manifest
539
540     def data_locators(self):
541         ret = []
542         for name, locators, files in self._finished_streams:
543             ret += locators
544         return ret
545
546
547 class ResumableCollectionWriter(CollectionWriter):
548     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
549                    '_current_stream_locators', '_current_stream_name',
550                    '_current_file_name', '_current_file_pos', '_close_file',
551                    '_data_buffer', '_dependencies', '_finished_streams',
552                    '_queued_dirents', '_queued_trees']
553
554     def __init__(self, api_client=None, num_retries=0):
555         self._dependencies = {}
556         super(ResumableCollectionWriter, self).__init__(
557             api_client, num_retries=num_retries)
558
559     @classmethod
560     def from_state(cls, state, *init_args, **init_kwargs):
561         # Try to build a new writer from scratch with the given state.
562         # If the state is not suitable to resume (because files have changed,
563         # been deleted, aren't predictable, etc.), raise a
564         # StaleWriterStateError.  Otherwise, return the initialized writer.
565         # The caller is responsible for calling writer.do_queued_work()
566         # appropriately after it's returned.
567         writer = cls(*init_args, **init_kwargs)
568         for attr_name in cls.STATE_PROPS:
569             attr_value = state[attr_name]
570             attr_class = getattr(writer, attr_name).__class__
571             # Coerce the value into the same type as the initial value, if
572             # needed.
573             if attr_class not in (type(None), attr_value.__class__):
574                 attr_value = attr_class(attr_value)
575             setattr(writer, attr_name, attr_value)
576         # Check dependencies before we try to resume anything.
577         if any(KeepLocator(ls).permission_expired()
578                for ls in writer._current_stream_locators):
579             raise errors.StaleWriterStateError(
580                 "locators include expired permission hint")
581         writer.check_dependencies()
582         if state['_current_file'] is not None:
583             path, pos = state['_current_file']
584             try:
585                 writer._queued_file = open(path, 'rb')
586                 writer._queued_file.seek(pos)
587             except IOError as error:
588                 raise errors.StaleWriterStateError(
589                     "failed to reopen active file {}: {}".format(path, error))
590         return writer
591
592     def check_dependencies(self):
593         for path, orig_stat in self._dependencies.items():
594             if not S_ISREG(orig_stat[ST_MODE]):
595                 raise errors.StaleWriterStateError("{} not file".format(path))
596             try:
597                 now_stat = tuple(os.stat(path))
598             except OSError as error:
599                 raise errors.StaleWriterStateError(
600                     "failed to stat {}: {}".format(path, error))
601             if ((not S_ISREG(now_stat[ST_MODE])) or
602                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
603                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
604                 raise errors.StaleWriterStateError("{} changed".format(path))
605
606     def dump_state(self, copy_func=lambda x: x):
607         state = {attr: copy_func(getattr(self, attr))
608                  for attr in self.STATE_PROPS}
609         if self._queued_file is None:
610             state['_current_file'] = None
611         else:
612             state['_current_file'] = (os.path.realpath(self._queued_file.name),
613                                       self._queued_file.tell())
614         return state
615
616     def _queue_file(self, source, filename=None):
617         try:
618             src_path = os.path.realpath(source)
619         except Exception:
620             raise errors.AssertionError("{} not a file path".format(source))
621         try:
622             path_stat = os.stat(src_path)
623         except OSError as stat_error:
624             path_stat = None
625         super(ResumableCollectionWriter, self)._queue_file(source, filename)
626         fd_stat = os.fstat(self._queued_file.fileno())
627         if not S_ISREG(fd_stat.st_mode):
628             # We won't be able to resume from this cache anyway, so don't
629             # worry about further checks.
630             self._dependencies[source] = tuple(fd_stat)
631         elif path_stat is None:
632             raise errors.AssertionError(
633                 "could not stat {}: {}".format(source, stat_error))
634         elif path_stat.st_ino != fd_stat.st_ino:
635             raise errors.AssertionError(
636                 "{} changed between open and stat calls".format(source))
637         else:
638             self._dependencies[src_path] = tuple(fd_stat)
639
640     def write(self, data):
641         if self._queued_file is None:
642             raise errors.AssertionError(
643                 "resumable writer can't accept unsourced data")
644         return super(ResumableCollectionWriter, self).write(data)
645
646 ADD = "add"
647 DEL = "del"
648 MOD = "mod"
649
650 class SynchronizedCollectionBase(CollectionBase):
651     def __init__(self, parent=None):
652         self.parent = parent
653         self._modified = True
654         self._items = {}
655
656     def _my_api(self):
657         raise NotImplementedError()
658
659     def _my_keep(self):
660         raise NotImplementedError()
661
662     def _my_block_manager(self):
663         raise NotImplementedError()
664
665     def _populate(self):
666         raise NotImplementedError()
667
668     def sync_mode(self):
669         raise NotImplementedError()
670
671     def root_collection(self):
672         raise NotImplementedError()
673
674     def notify(self, event, collection, name, item):
675         raise NotImplementedError()
676
677     @synchronized
678     def find(self, path, create=False, create_collection=False):
679         """Recursively search the specified file path.  May return either a Collection
680         or ArvadosFile.
681
682         :create:
683           If true, create path components (i.e. Collections) that are
684           missing.  If "create" is False, return None if a path component is
685           not found.
686
687         :create_collection:
688           If the path is not found, "create" is True, and
689           "create_collection" is False, then create and return a new
690           ArvadosFile for the last path component.  If "create_collection" is
691           True, then create and return a new Collection for the last path
692           component.
693
694         """
695         if create and self.sync_mode() == SYNC_READONLY:
696             raise IOError((errno.EROFS, "Collection is read only"))
697
698         p = path.split("/")
699         if p[0] == '.':
700             del p[0]
701
702         if p and p[0]:
703             item = self._items.get(p[0])
704             if len(p) == 1:
705                 # item must be a file
706                 if item is None and create:
707                     # create new file
708                     if create_collection:
709                         item = Subcollection(self)
710                     else:
711                         item = ArvadosFile(self)
712                     self._items[p[0]] = item
713                     self._modified = True
714                     self.notify(ADD, self, p[0], item)
715                 return item
716             else:
717                 if item is None and create:
718                     # create new collection
719                     item = Subcollection(self)
720                     self._items[p[0]] = item
721                     self._modified = True
722                     self.notify(ADD, self, p[0], item)
723                 del p[0]
724                 if isinstance(item, SynchronizedCollectionBase):
725                     return item.find("/".join(p), create=create)
726                 else:
727                     raise errors.ArgumentError("Interior path components must be subcollection")
728         else:
729             return self
730
731     def open(self, path, mode):
732         """Open a file-like object for access.
733
734         :path:
735           path to a file in the collection
736         :mode:
737           one of "r", "r+", "w", "w+", "a", "a+"
738           :"r":
739             opens for reading
740           :"r+":
741             opens for reading and writing.  Reads/writes share a file pointer.
742           :"w", "w+":
743             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
744           :"a", "a+":
745             opens for reading and writing.  All writes are appended to
746             the end of the file.  Writing does not affect the file pointer for
747             reading.
748         """
749         mode = mode.replace("b", "")
750         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
751             raise ArgumentError("Bad mode '%s'" % mode)
752         create = (mode != "r")
753
754         if create and self.sync_mode() == SYNC_READONLY:
755             raise IOError((errno.EROFS, "Collection is read only"))
756
757         f = self.find(path, create=create)
758
759         if f is None:
760             raise IOError((errno.ENOENT, "File not found"))
761         if not isinstance(f, ArvadosFile):
762             raise IOError((errno.EISDIR, "Path must refer to a file."))
763
764         if mode[0] == "w":
765             f.truncate(0)
766
767         if mode == "r":
768             return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
769         else:
770             return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
771
772     @synchronized
773     def modified(self):
774         """Test if the collection (or any subcollection or file) has been modified
775         since it was created."""
776         if self._modified:
777             return True
778         for k,v in self._items.items():
779             if v.modified():
780                 return True
781         return False
782
783     @synchronized
784     def set_unmodified(self):
785         """Recursively clear modified flag"""
786         self._modified = False
787         for k,v in self._items.items():
788             v.set_unmodified()
789
790     @synchronized
791     def __iter__(self):
792         """Iterate over names of files and collections contained in this collection."""
793         return self._items.keys().__iter__()
794
795     @synchronized
796     def iterkeys(self):
797         """Iterate over names of files and collections directly contained in this collection."""
798         return self._items.keys()
799
800     @synchronized
801     def __getitem__(self, k):
802         """Get a file or collection that is directly contained by this collection.  If
803         you want to search a path, use `find()` instead.
804         """
805         return self._items[k]
806
807     @synchronized
808     def __contains__(self, k):
809         """If there is a file or collection a directly contained by this collection
810         with name "k"."""
811         return k in self._items
812
813     @synchronized
814     def __len__(self):
815         """Get the number of items directly contained in this collection"""
816         return len(self._items)
817
818     @must_be_writable
819     @synchronized
820     def __delitem__(self, p):
821         """Delete an item by name which is directly contained by this collection."""
822         del self._items[p]
823         self._modified = True
824         self.notify(DEL, self, p, None)
825
826     @synchronized
827     def keys(self):
828         """Get a list of names of files and collections directly contained in this collection."""
829         return self._items.keys()
830
831     @synchronized
832     def values(self):
833         """Get a list of files and collection objects directly contained in this collection."""
834         return self._items.values()
835
836     @synchronized
837     def items(self):
838         """Get a list of (name, object) tuples directly contained in this collection."""
839         return self._items.items()
840
841     def exists(self, path):
842         """Test if there is a file or collection at "path" """
843         return self.find(path) != None
844
845     @must_be_writable
846     @synchronized
847     def remove(self, path, rm_r=False):
848         """Remove the file or subcollection (directory) at `path`.
849         :rm_r:
850           Specify whether to remove non-empty subcollections (True), or raise an error (False).
851         """
852         p = path.split("/")
853         if p[0] == '.':
854             # Remove '.' from the front of the path
855             del p[0]
856
857         if len(p) > 0:
858             item = self._items.get(p[0])
859             if item is None:
860                 raise IOError((errno.ENOENT, "File not found"))
861             if len(p) == 1:
862                 if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
863                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
864                 d = self._items[p[0]]
865                 del self._items[p[0]]
866                 self._modified = True
867                 self.notify(DEL, self, p[0], d)
868             else:
869                 del p[0]
870                 item.remove("/".join(p))
871         else:
872             raise IOError((errno.ENOENT, "File not found"))
873
874     def _cloneinto(self, target):
875         for k,v in self._items.items():
876             target._items[k] = v.clone(target)
877
878     def clone(self):
879         raise NotImplementedError()
880
881     @must_be_writable
882     @synchronized
883     def copy(self, source, target_path, source_collection=None, overwrite=False):
884         """Copy a file or subcollection to a new path in this collection.
885
886         :source:
887           An ArvadosFile, Subcollection, or string with a path to source file or subcollection
888
889         :target_path:
890           Destination file or path.  If the target path already exists and is a
891           subcollection, the item will be placed inside the subcollection.  If
892           the target path already exists and is a file, this will raise an error
893           unless you specify `overwrite=True`.
894
895         :source_collection:
896           Collection to copy `source_path` from (default `self`)
897
898         :overwrite:
899           Whether to overwrite target file if it already exists.
900         """
901         if source_collection is None:
902             source_collection = self
903
904         # Find the object to copy
905         if isinstance(source, basestring):
906             source_obj = source_collection.find(source)
907             if source_obj is None:
908                 raise IOError((errno.ENOENT, "File not found"))
909             sp = source.split("/")
910         else:
911             source_obj = source
912             sp = None
913
914         # Find parent collection the target path
915         tp = target_path.split("/")
916
917         # Determine the name to use.
918         target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
919
920         if not target_name:
921             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
922
923         target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
924
925         with target_dir.lock:
926             if target_name in target_dir:
927                 if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
928                     target_dir = target_dir[target_name]
929                     target_name = sp[-1]
930                 elif not overwrite:
931                     raise IOError((errno.EEXIST, "File already exists"))
932
933             mod = None
934             if target_name in target_dir:
935                 mod = target_dir[target_name]
936
937             # Actually make the copy.
938             dup = source_obj.clone(target_dir)
939             target_dir._items[target_name] = dup
940             target_dir._modified = True
941
942         if mod:
943             self.notify(MOD, target_dir, target_name, (mod, dup))
944         else:
945             self.notify(ADD, target_dir, target_name, dup)
946
947     @synchronized
948     def manifest_text(self, strip=False, normalize=False):
949         """Get the manifest text for this collection, sub collections and files.
950
951         :strip:
952           If True, remove signing tokens from block locators if present.
953           If False, block locators are left unchanged.
954
955         :normalize:
956           If True, always export the manifest text in normalized form
957           even if the Collection is not modified.  If False and the collection
958           is not modified, return the original manifest text even if it is not
959           in normalized form.
960
961         """
962         if self.modified() or self._manifest_text is None or normalize:
963             return export_manifest(self, stream_name=".", portable_locators=strip)
964         else:
965             if strip:
966                 return self.stripped_manifest()
967             else:
968                 return self._manifest_text
969
970     @synchronized
971     def diff(self, end_collection, prefix=".", holding_collection=None):
972         """
973         Generate list of add/modify/delete actions which, when given to `apply`, will
974         change `self` to match `end_collection`
975         """
976         changes = []
977         if holding_collection is None:
978             holding_collection = Collection()
979         for k in self:
980             if k not in end_collection:
981                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
982         for k in end_collection:
983             if k in self:
984                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
985                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
986                 elif end_collection[k] != self[k]:
987                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
988             else:
989                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
990         return changes
991
992     @must_be_writable
993     @synchronized
994     def apply(self, changes):
995         """
996         Apply changes from `diff`.  If a change conflicts with a local change, it
997         will be saved to an alternate path indicating the conflict.
998         """
999         for c in changes:
1000             path = c[1]
1001             initial = c[2]
1002             local = self.find(path)
1003             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
1004                                                                     time.gmtime()))
1005             if c[0] == ADD:
1006                 if local is None:
1007                     # No local file at path, safe to copy over new file
1008                     self.copy(initial, path)
1009                 elif local is not None and local != initial:
1010                     # There is already local file and it is different:
1011                     # save change to conflict file.
1012                     self.copy(initial, conflictpath)
1013             elif c[0] == MOD:
1014                 if local == initial:
1015                     # Local matches the "initial" item so assume it hasn't
1016                     # changed locally and is safe to update.
1017                     if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
1018                         # Replace contents of local file with new contents
1019                         local.replace_contents(c[3])
1020                     else:
1021                         # Overwrite path with new item; this can happen if if
1022                         # path was a file and is now a collection or vice versa
1023                         self.copy(c[3], path, overwrite=True)
1024                 else:
1025                     # Local is missing (presumably deleted) or local doesn't
1026                     # match the "start" value, so save change to conflict file
1027                     self.copy(c[3], conflictpath)
1028             elif c[0] == DEL:
1029                 if local == initial:
1030                     # Local item matches "initial" value, so it is safe to remove.
1031                     self.remove(path, rm_r=True)
1032                 # else, the file is modified or already removed, in either
1033                 # case we don't want to try to remove it.
1034
1035     def portable_data_hash(self):
1036         """Get the portable data hash for this collection's manifest."""
1037         stripped = self.manifest_text(strip=True)
1038         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1039
1040     @synchronized
1041     def __eq__(self, other):
1042         if other is self:
1043             return True
1044         if not isinstance(other, SynchronizedCollectionBase):
1045             return False
1046         if len(self._items) != len(other):
1047             return False
1048         for k in self._items:
1049             if k not in other:
1050                 return False
1051             if self._items[k] != other[k]:
1052                 return False
1053         return True
1054
1055     def __ne__(self, other):
1056         return not self.__eq__(other)
1057
1058 class Collection(SynchronizedCollectionBase):
1059     """Store an Arvados collection, consisting of a set of files and
1060     sub-collections.  This object
1061     """
1062
1063     def __init__(self, manifest_locator_or_text=None,
1064                  parent=None,
1065                  config=None,
1066                  api_client=None,
1067                  keep_client=None,
1068                  num_retries=None,
1069                  block_manager=None,
1070                  sync=SYNC_READONLY):
1071         """:manifest_locator_or_text:
1072           One of Arvados collection UUID, block locator of
1073           a manifest, raw manifest text, or None (to create an empty collection).
1074         :parent:
1075           the parent Collection, may be None.
1076         :config:
1077           the arvados configuration to get the hostname and api token.
1078           Prefer this over supplying your own api_client and keep_client (except in testing).
1079           Will use default config settings if not specified.
1080         :api_client:
1081           The API client object to use for requests.  If not specified, create one using `config`.
1082         :keep_client:
1083           the Keep client to use for requests.  If not specified, create one using `config`.
1084         :num_retries:
1085           the number of retries for API and Keep requests.
1086         :block_manager:
1087           the block manager to use.  If not specified, create one.
1088         :sync:
1089           Set synchronization policy with API server collection record.
1090           :SYNC_READONLY:
1091             Collection is read only.  No synchronization.  This mode will
1092             also forego locking, which gives better performance.
1093           :SYNC_EXPLICIT:
1094             Collection is writable.  Synchronize on explicit request via `update()` or `save()`
1095           :SYNC_LIVE:
1096             Collection is writable.  Synchronize with server in response to
1097             background websocket events, on block write, or on file close.
1098
1099         """
1100         super(Collection, self).__init__(parent)
1101         self._api_client = api_client
1102         self._keep_client = keep_client
1103         self._block_manager = block_manager
1104         self._config = config
1105         self.num_retries = num_retries
1106         self._manifest_locator = None
1107         self._manifest_text = None
1108         self._api_response = None
1109         self._sync = sync
1110         self.lock = threading.RLock()
1111         self.callbacks = []
1112         self.events = None
1113
1114         if manifest_locator_or_text:
1115             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1116                 self._manifest_locator = manifest_locator_or_text
1117             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1118                 self._manifest_locator = manifest_locator_or_text
1119             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1120                 self._manifest_text = manifest_locator_or_text
1121             else:
1122                 raise errors.ArgumentError(
1123                     "Argument to CollectionReader must be a manifest or a collection UUID")
1124
1125             self._populate()
1126
1127             if self._sync == SYNC_LIVE:
1128                 if not self._has_collection_uuid():
1129                     raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
1130                 self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
1131
1132     @staticmethod
1133     def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
1134         c = Collection(sync=sync)
1135         c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1136         return c
1137
1138     def root_collection(self):
1139         return self
1140
1141     def sync_mode(self):
1142         return self._sync
1143
1144     def on_message(self):
1145         self.update()
1146
1147     @synchronized
1148     @retry_method
1149     def update(self, other=None, num_retries=None):
1150         if other is None:
1151             if self._manifest_locator is None:
1152                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1153             n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1154             other = import_collection(n["manifest_text"])
1155         baseline = import_collection(self._manifest_text)
1156         self.apply(other.diff(baseline))
1157
1158     @synchronized
1159     def _my_api(self):
1160         if self._api_client is None:
1161             self._api_client = arvados.SafeApi(self._config)
1162             self._keep_client = self._api_client.keep
1163         return self._api_client
1164
1165     @synchronized
1166     def _my_keep(self):
1167         if self._keep_client is None:
1168             if self._api_client is None:
1169                 self._my_api()
1170             else:
1171                 self._keep_client = KeepClient(api=self._api_client)
1172         return self._keep_client
1173
1174     @synchronized
1175     def _my_block_manager(self):
1176         if self._block_manager is None:
1177             self._block_manager = BlockManager(self._my_keep())
1178         return self._block_manager
1179
1180     def _populate_from_api_server(self):
1181         # As in KeepClient itself, we must wait until the last
1182         # possible moment to instantiate an API client, in order to
1183         # avoid tripping up clients that don't have access to an API
1184         # server.  If we do build one, make sure our Keep client uses
1185         # it.  If instantiation fails, we'll fall back to the except
1186         # clause, just like any other Collection lookup
1187         # failure. Return an exception, or None if successful.
1188         try:
1189             self._api_response = self._my_api().collections().get(
1190                 uuid=self._manifest_locator).execute(
1191                     num_retries=self.num_retries)
1192             self._manifest_text = self._api_response['manifest_text']
1193             return None
1194         except Exception as e:
1195             return e
1196
1197     def _populate_from_keep(self):
1198         # Retrieve a manifest directly from Keep. This has a chance of
1199         # working if [a] the locator includes a permission signature
1200         # or [b] the Keep services are operating in world-readable
1201         # mode. Return an exception, or None if successful.
1202         try:
1203             self._manifest_text = self._my_keep().get(
1204                 self._manifest_locator, num_retries=self.num_retries)
1205         except Exception as e:
1206             return e
1207
1208     def _populate(self):
1209         if self._manifest_locator is None and self._manifest_text is None:
1210             return
1211         error_via_api = None
1212         error_via_keep = None
1213         should_try_keep = ((self._manifest_text is None) and
1214                            util.keep_locator_pattern.match(
1215                                self._manifest_locator))
1216         if ((self._manifest_text is None) and
1217             util.signed_locator_pattern.match(self._manifest_locator)):
1218             error_via_keep = self._populate_from_keep()
1219         if self._manifest_text is None:
1220             error_via_api = self._populate_from_api_server()
1221             if error_via_api is not None and not should_try_keep:
1222                 raise error_via_api
1223         if ((self._manifest_text is None) and
1224             not error_via_keep and
1225             should_try_keep):
1226             # Looks like a keep locator, and we didn't already try keep above
1227             error_via_keep = self._populate_from_keep()
1228         if self._manifest_text is None:
1229             # Nothing worked!
1230             raise arvados.errors.NotFoundError(
1231                 ("Failed to retrieve collection '{}' " +
1232                  "from either API server ({}) or Keep ({})."
1233                  ).format(
1234                     self._manifest_locator,
1235                     error_via_api,
1236                     error_via_keep))
1237         # populate
1238         self._baseline_manifest = self._manifest_text
1239         import_manifest(self._manifest_text, self)
1240
1241         if self._sync == SYNC_READONLY:
1242             # Now that we're populated, knowing that this will be readonly,
1243             # forego any further locking.
1244             self.lock = NoopLock()
1245
1246     def _has_collection_uuid(self):
1247         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1248
1249     def __enter__(self):
1250         return self
1251
1252     def __exit__(self, exc_type, exc_value, traceback):
1253         """Support scoped auto-commit in a with: block"""
1254         if self._sync != SYNC_READONLY and self._has_collection_uuid():
1255             self.save()
1256         if self._block_manager is not None:
1257             self._block_manager.stop_threads()
1258
1259     @synchronized
1260     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
1261         if new_config is None:
1262             new_config = self._config
1263         c = Collection(parent=new_parent, config=new_config, sync=new_sync)
1264         if new_sync == SYNC_READONLY:
1265             c.lock = NoopLock()
1266         c._items = {}
1267         self._cloneinto(c)
1268         return c
1269
1270     @synchronized
1271     def api_response(self):
1272         """
1273         api_response() -> dict or None
1274
1275         Returns information about this Collection fetched from the API server.
1276         If the Collection exists in Keep but not the API server, currently
1277         returns None.  Future versions may provide a synthetic response.
1278         """
1279         return self._api_response
1280
1281     @must_be_writable
1282     @synchronized
1283     @retry_method
1284     def save(self, merge=True, num_retries=None):
1285         """Commit pending buffer blocks to Keep, merge with remote record (if
1286         update=True), write the manifest to Keep, and update the collection
1287         record.  Will raise AssertionError if not associated with a collection
1288         record on the API server.  If you want to save a manifest to Keep only,
1289         see `save_new()`.
1290
1291         :update:
1292           Update and merge remote changes before saving.  Otherwise, any
1293           remote changes will be ignored and overwritten.
1294
1295         """
1296         if self.modified():
1297             if not self._has_collection_uuid():
1298                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
1299             self._my_block_manager().commit_all()
1300             if merge:
1301                 self.update()
1302             self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1303
1304             mt = self.manifest_text(strip=False)
1305             self._api_response = self._my_api().collections().update(
1306                 uuid=self._manifest_locator,
1307                 body={'manifest_text': mt}
1308                 ).execute(
1309                     num_retries=num_retries)
1310             self._manifest_text = mt
1311             self.set_unmodified()
1312
1313     @must_be_writable
1314     @synchronized
1315     @retry_method
1316     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
1317         """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
1318         a new collection record (if create_collection_record True).  After
1319         creating a new collection record, this Collection object will be
1320         associated with the new record for `save()` and SYNC_LIVE updates.
1321
1322         :name:
1323           The collection name.
1324
1325         :keep_only:
1326           Only save the manifest to keep, do not create a collection record.
1327
1328         :owner_uuid:
1329           the user, or project uuid that will own this collection.
1330           If None, defaults to the current user.
1331
1332         :ensure_unique_name:
1333           If True, ask the API server to rename the collection
1334           if it conflicts with a collection with the same name and owner.  If
1335           False, a name conflict will result in an error.
1336
1337         """
1338         self._my_block_manager().commit_all()
1339         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1340         mt = self.manifest_text(strip=False)
1341
1342         if create_collection_record:
1343             if name is None:
1344                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1345
1346             body = {"manifest_text": mt,
1347                     "name": name}
1348             if owner_uuid:
1349                 body["owner_uuid"] = owner_uuid
1350
1351             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1352
1353             if self.events:
1354                 self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1355
1356             self._manifest_locator = self._api_response["uuid"]
1357
1358             if self.events:
1359                 self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1360
1361         self._manifest_text = mt
1362         self.set_unmodified()
1363
1364     @synchronized
1365     def subscribe(self, callback):
1366         self.callbacks.append(callback)
1367
1368     @synchronized
1369     def unsubscribe(self, callback):
1370         self.callbacks.remove(callback)
1371
1372     @synchronized
1373     def notify(self, event, collection, name, item):
1374         for c in self.callbacks:
1375             c(event, collection, name, item)
1376
1377 class Subcollection(SynchronizedCollectionBase):
1378     """This is a subdirectory within a collection that doesn't have its own API
1379     server record.  It falls under the umbrella of the root collection."""
1380
1381     def __init__(self, parent):
1382         super(Subcollection, self).__init__(parent)
1383         self.lock = self.root_collection().lock
1384
1385     def root_collection(self):
1386         return self.parent.root_collection()
1387
1388     def sync_mode(self):
1389         return self.root_collection().sync_mode()
1390
1391     def _my_api(self):
1392         return self.root_collection()._my_api()
1393
1394     def _my_keep(self):
1395         return self.root_collection()._my_keep()
1396
1397     def _my_block_manager(self):
1398         return self.root_collection()._my_block_manager()
1399
1400     def _populate(self):
1401         self.root_collection()._populate()
1402
1403     def notify(self, event, collection, name, item):
1404         return self.root_collection().notify(event, collection, name, item)
1405
1406     @synchronized
1407     def clone(self, new_parent):
1408         c = Subcollection(new_parent)
1409         self._cloneinto(c)
1410         return c
1411
1412 def import_manifest(manifest_text,
1413                     into_collection=None,
1414                     api_client=None,
1415                     keep=None,
1416                     num_retries=None,
1417                     sync=SYNC_READONLY):
1418     """Import a manifest into a `Collection`.
1419
1420     :manifest_text:
1421       The manifest text to import from.
1422
1423     :into_collection:
1424       The `Collection` that will be initialized (must be empty).
1425       If None, create a new `Collection` object.
1426
1427     :api_client:
1428       The API client object that will be used when creating a new `Collection` object.
1429
1430     :keep:
1431       The keep client object that will be used when creating a new `Collection` object.
1432
1433     :num_retries:
1434       the default number of api client and keep retries on error.
1435
1436     :sync:
1437       Collection sync mode (only if into_collection is None)
1438     """
1439     if into_collection is not None:
1440         if len(into_collection) > 0:
1441             raise ArgumentError("Can only import manifest into an empty collection")
1442         c = into_collection
1443     else:
1444         c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
1445
1446     save_sync = c.sync_mode()
1447     c._sync = None
1448
1449     STREAM_NAME = 0
1450     BLOCKS = 1
1451     SEGMENTS = 2
1452
1453     stream_name = None
1454     state = STREAM_NAME
1455
1456     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1457         tok = n.group(1)
1458         sep = n.group(2)
1459
1460         if state == STREAM_NAME:
1461             # starting a new stream
1462             stream_name = tok.replace('\\040', ' ')
1463             blocks = []
1464             segments = []
1465             streamoffset = 0L
1466             state = BLOCKS
1467             continue
1468
1469         if state == BLOCKS:
1470             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1471             if s:
1472                 blocksize = long(s.group(1))
1473                 blocks.append(Range(tok, streamoffset, blocksize))
1474                 streamoffset += blocksize
1475             else:
1476                 state = SEGMENTS
1477
1478         if state == SEGMENTS:
1479             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1480             if s:
1481                 pos = long(s.group(1))
1482                 size = long(s.group(2))
1483                 name = s.group(3).replace('\\040', ' ')
1484                 f = c.find("%s/%s" % (stream_name, name), create=True)
1485                 f.add_segment(blocks, pos, size)
1486             else:
1487                 # error!
1488                 raise errors.SyntaxError("Invalid manifest format")
1489
1490         if sep == "\n":
1491             stream_name = None
1492             state = STREAM_NAME
1493
1494     c.set_unmodified()
1495     c._sync = save_sync
1496     return c
1497
1498 def export_manifest(item, stream_name=".", portable_locators=False):
1499     """
1500     :item:
1501       Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).  If
1502       `item` is a is a `Collection`, this will also export subcollections.
1503
1504     :stream_name:
1505       the name of the stream when exporting `item`.
1506
1507     :portable_locators:
1508       If True, strip any permission hints on block locators.
1509       If False, use block locators as-is.
1510     """
1511     buf = ""
1512     if isinstance(item, SynchronizedCollectionBase):
1513         stream = {}
1514         sorted_keys = sorted(item.keys())
1515         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1516             v = item[k]
1517             st = []
1518             for s in v.segments():
1519                 loc = s.locator
1520                 if loc.startswith("bufferblock"):
1521                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1522                 if portable_locators:
1523                     loc = KeepLocator(loc).stripped()
1524                 st.append(LocatorAndRange(loc, locator_block_size(loc),
1525                                      s.segment_offset, s.range_size))
1526             stream[k] = st
1527         if stream:
1528             buf += ' '.join(normalize_stream(stream_name, stream))
1529             buf += "\n"
1530         for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1531             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1532     elif isinstance(item, ArvadosFile):
1533         st = []
1534         for s in item.segments:
1535             loc = s.locator
1536             if loc.startswith("bufferblock"):
1537                 loc = item._bufferblocks[loc].calculate_locator()
1538             if portable_locators:
1539                 loc = KeepLocator(loc).stripped()
1540             st.append(LocatorAndRange(loc, locator_block_size(loc),
1541                                  s.segment_offset, s.range_size))
1542         stream[stream_name] = st
1543         buf += ' '.join(normalize_stream(stream_name, stream))
1544         buf += "\n"
1545     return buf