4823: Implemented diff/apply/update methods for Collection synchronization. Needs...
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import time
7
8 from collections import deque
9 from stat import *
10
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
12 from keep import *
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 from .safeapi import SafeApi
16 import config
17 import errors
18 import util
19 import events
20
21 _logger = logging.getLogger('arvados.collection')
22
23 class CollectionBase(object):
24     def __enter__(self):
25         return self
26
27     def __exit__(self, exc_type, exc_value, traceback):
28         pass
29
30     def _my_keep(self):
31         if self._keep_client is None:
32             self._keep_client = KeepClient(api_client=self._api_client,
33                                            num_retries=self.num_retries)
34         return self._keep_client
35
36     def stripped_manifest(self):
37         """
38         Return the manifest for the current collection with all
39         non-portable hints (i.e., permission signatures and other
40         hints other than size hints) removed from the locators.
41         """
42         raw = self.manifest_text()
43         clean = []
44         for line in raw.split("\n"):
45             fields = line.split()
46             if fields:
47                 clean_fields = fields[:1] + [
48                     (re.sub(r'\+[^\d][^\+]*', '', x)
49                      if re.match(util.keep_locator_pattern, x)
50                      else x)
51                     for x in fields[1:]]
52                 clean += [' '.join(clean_fields), "\n"]
53         return ''.join(clean)
54
55
56 class CollectionReader(CollectionBase):
57     def __init__(self, manifest_locator_or_text, api_client=None,
58                  keep_client=None, num_retries=0):
59         """Instantiate a CollectionReader.
60
61         This class parses Collection manifests to provide a simple interface
62         to read its underlying files.
63
64         Arguments:
65         * manifest_locator_or_text: One of a Collection UUID, portable data
66           hash, or full manifest text.
67         * api_client: The API client to use to look up Collections.  If not
68           provided, CollectionReader will build one from available Arvados
69           configuration.
70         * keep_client: The KeepClient to use to download Collection data.
71           If not provided, CollectionReader will build one from available
72           Arvados configuration.
73         * num_retries: The default number of times to retry failed
74           service requests.  Default 0.  You may change this value
75           after instantiation, but note those changes may not
76           propagate to related objects like the Keep client.
77         """
78         self._api_client = api_client
79         self._keep_client = keep_client
80         self.num_retries = num_retries
81         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
82             self._manifest_locator = manifest_locator_or_text
83             self._manifest_text = None
84         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
85             self._manifest_locator = manifest_locator_or_text
86             self._manifest_text = None
87         elif re.match(util.manifest_pattern, manifest_locator_or_text):
88             self._manifest_text = manifest_locator_or_text
89             self._manifest_locator = None
90         else:
91             raise errors.ArgumentError(
92                 "Argument to CollectionReader must be a manifest or a collection UUID")
93         self._api_response = None
94         self._streams = None
95
96     def _populate_from_api_server(self):
97         # As in KeepClient itself, we must wait until the last
98         # possible moment to instantiate an API client, in order to
99         # avoid tripping up clients that don't have access to an API
100         # server.  If we do build one, make sure our Keep client uses
101         # it.  If instantiation fails, we'll fall back to the except
102         # clause, just like any other Collection lookup
103         # failure. Return an exception, or None if successful.
104         try:
105             if self._api_client is None:
106                 self._api_client = arvados.api('v1')
107                 self._keep_client = None  # Make a new one with the new api.
108             self._api_response = self._api_client.collections().get(
109                 uuid=self._manifest_locator).execute(
110                 num_retries=self.num_retries)
111             self._manifest_text = self._api_response['manifest_text']
112             return None
113         except Exception as e:
114             return e
115
116     def _populate_from_keep(self):
117         # Retrieve a manifest directly from Keep. This has a chance of
118         # working if [a] the locator includes a permission signature
119         # or [b] the Keep services are operating in world-readable
120         # mode. Return an exception, or None if successful.
121         try:
122             self._manifest_text = self._my_keep().get(
123                 self._manifest_locator, num_retries=self.num_retries)
124         except Exception as e:
125             return e
126
127     def _populate(self):
128         error_via_api = None
129         error_via_keep = None
130         should_try_keep = ((self._manifest_text is None) and
131                            util.keep_locator_pattern.match(
132                 self._manifest_locator))
133         if ((self._manifest_text is None) and
134             util.signed_locator_pattern.match(self._manifest_locator)):
135             error_via_keep = self._populate_from_keep()
136         if self._manifest_text is None:
137             error_via_api = self._populate_from_api_server()
138             if error_via_api is not None and not should_try_keep:
139                 raise error_via_api
140         if ((self._manifest_text is None) and
141             not error_via_keep and
142             should_try_keep):
143             # Looks like a keep locator, and we didn't already try keep above
144             error_via_keep = self._populate_from_keep()
145         if self._manifest_text is None:
146             # Nothing worked!
147             raise arvados.errors.NotFoundError(
148                 ("Failed to retrieve collection '{}' " +
149                  "from either API server ({}) or Keep ({})."
150                  ).format(
151                     self._manifest_locator,
152                     error_via_api,
153                     error_via_keep))
154         self._streams = [sline.split()
155                          for sline in self._manifest_text.split("\n")
156                          if sline]
157
158     def _populate_first(orig_func):
159         # Decorator for methods that read actual Collection data.
160         @functools.wraps(orig_func)
161         def wrapper(self, *args, **kwargs):
162             if self._streams is None:
163                 self._populate()
164             return orig_func(self, *args, **kwargs)
165         return wrapper
166
167     @_populate_first
168     def api_response(self):
169         """api_response() -> dict or None
170
171         Returns information about this Collection fetched from the API server.
172         If the Collection exists in Keep but not the API server, currently
173         returns None.  Future versions may provide a synthetic response.
174         """
175         return self._api_response
176
177     @_populate_first
178     def normalize(self):
179         # Rearrange streams
180         streams = {}
181         for s in self.all_streams():
182             for f in s.all_files():
183                 streamname, filename = split(s.name() + "/" + f.name())
184                 if streamname not in streams:
185                     streams[streamname] = {}
186                 if filename not in streams[streamname]:
187                     streams[streamname][filename] = []
188                 for r in f.segments:
189                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
190
191         self._streams = [normalize_stream(s, streams[s])
192                          for s in sorted(streams)]
193
194         # Regenerate the manifest text based on the normalized streams
195         self._manifest_text = ''.join(
196             [StreamReader(stream, keep=self._my_keep()).manifest_text()
197              for stream in self._streams])
198
199     @_populate_first
200     def open(self, streampath, filename=None):
201         """open(streampath[, filename]) -> file-like object
202
203         Pass in the path of a file to read from the Collection, either as a
204         single string or as two separate stream name and file name arguments.
205         This method returns a file-like object to read that file.
206         """
207         if filename is None:
208             streampath, filename = split(streampath)
209         keep_client = self._my_keep()
210         for stream_s in self._streams:
211             stream = StreamReader(stream_s, keep_client,
212                                   num_retries=self.num_retries)
213             if stream.name() == streampath:
214                 break
215         else:
216             raise ValueError("stream '{}' not found in Collection".
217                              format(streampath))
218         try:
219             return stream.files()[filename]
220         except KeyError:
221             raise ValueError("file '{}' not found in Collection stream '{}'".
222                              format(filename, streampath))
223
224     @_populate_first
225     def all_streams(self):
226         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
227                 for s in self._streams]
228
229     def all_files(self):
230         for s in self.all_streams():
231             for f in s.all_files():
232                 yield f
233
234     @_populate_first
235     def manifest_text(self, strip=False, normalize=False):
236         if normalize:
237             cr = CollectionReader(self.manifest_text())
238             cr.normalize()
239             return cr.manifest_text(strip=strip, normalize=False)
240         elif strip:
241             return self.stripped_manifest()
242         else:
243             return self._manifest_text
244
245
246 class _WriterFile(ArvadosFileBase):
247     def __init__(self, coll_writer, name):
248         super(_WriterFile, self).__init__(name, 'wb')
249         self.dest = coll_writer
250
251     def close(self):
252         super(_WriterFile, self).close()
253         self.dest.finish_current_file()
254
255     @ArvadosFileBase._before_close
256     def write(self, data):
257         self.dest.write(data)
258
259     @ArvadosFileBase._before_close
260     def writelines(self, seq):
261         for data in seq:
262             self.write(data)
263
264     @ArvadosFileBase._before_close
265     def flush(self):
266         self.dest.flush_data()
267
268
269 class CollectionWriter(CollectionBase):
270     def __init__(self, api_client=None, num_retries=0):
271         """Instantiate a CollectionWriter.
272
273         CollectionWriter lets you build a new Arvados Collection from scratch.
274         Write files to it.  The CollectionWriter will upload data to Keep as
275         appropriate, and provide you with the Collection manifest text when
276         you're finished.
277
278         Arguments:
279         * api_client: The API client to use to look up Collections.  If not
280           provided, CollectionReader will build one from available Arvados
281           configuration.
282         * num_retries: The default number of times to retry failed
283           service requests.  Default 0.  You may change this value
284           after instantiation, but note those changes may not
285           propagate to related objects like the Keep client.
286         """
287         self._api_client = api_client
288         self.num_retries = num_retries
289         self._keep_client = None
290         self._data_buffer = []
291         self._data_buffer_len = 0
292         self._current_stream_files = []
293         self._current_stream_length = 0
294         self._current_stream_locators = []
295         self._current_stream_name = '.'
296         self._current_file_name = None
297         self._current_file_pos = 0
298         self._finished_streams = []
299         self._close_file = None
300         self._queued_file = None
301         self._queued_dirents = deque()
302         self._queued_trees = deque()
303         self._last_open = None
304
305     def __exit__(self, exc_type, exc_value, traceback):
306         if exc_type is None:
307             self.finish()
308
309     def do_queued_work(self):
310         # The work queue consists of three pieces:
311         # * _queued_file: The file object we're currently writing to the
312         #   Collection.
313         # * _queued_dirents: Entries under the current directory
314         #   (_queued_trees[0]) that we want to write or recurse through.
315         #   This may contain files from subdirectories if
316         #   max_manifest_depth == 0 for this directory.
317         # * _queued_trees: Directories that should be written as separate
318         #   streams to the Collection.
319         # This function handles the smallest piece of work currently queued
320         # (current file, then current directory, then next directory) until
321         # no work remains.  The _work_THING methods each do a unit of work on
322         # THING.  _queue_THING methods add a THING to the work queue.
323         while True:
324             if self._queued_file:
325                 self._work_file()
326             elif self._queued_dirents:
327                 self._work_dirents()
328             elif self._queued_trees:
329                 self._work_trees()
330             else:
331                 break
332
333     def _work_file(self):
334         while True:
335             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
336             if not buf:
337                 break
338             self.write(buf)
339         self.finish_current_file()
340         if self._close_file:
341             self._queued_file.close()
342         self._close_file = None
343         self._queued_file = None
344
345     def _work_dirents(self):
346         path, stream_name, max_manifest_depth = self._queued_trees[0]
347         if stream_name != self.current_stream_name():
348             self.start_new_stream(stream_name)
349         while self._queued_dirents:
350             dirent = self._queued_dirents.popleft()
351             target = os.path.join(path, dirent)
352             if os.path.isdir(target):
353                 self._queue_tree(target,
354                                  os.path.join(stream_name, dirent),
355                                  max_manifest_depth - 1)
356             else:
357                 self._queue_file(target, dirent)
358                 break
359         if not self._queued_dirents:
360             self._queued_trees.popleft()
361
362     def _work_trees(self):
363         path, stream_name, max_manifest_depth = self._queued_trees[0]
364         d = util.listdir_recursive(
365             path, max_depth = (None if max_manifest_depth == 0 else 0))
366         if d:
367             self._queue_dirents(stream_name, d)
368         else:
369             self._queued_trees.popleft()
370
371     def _queue_file(self, source, filename=None):
372         assert (self._queued_file is None), "tried to queue more than one file"
373         if not hasattr(source, 'read'):
374             source = open(source, 'rb')
375             self._close_file = True
376         else:
377             self._close_file = False
378         if filename is None:
379             filename = os.path.basename(source.name)
380         self.start_new_file(filename)
381         self._queued_file = source
382
383     def _queue_dirents(self, stream_name, dirents):
384         assert (not self._queued_dirents), "tried to queue more than one tree"
385         self._queued_dirents = deque(sorted(dirents))
386
387     def _queue_tree(self, path, stream_name, max_manifest_depth):
388         self._queued_trees.append((path, stream_name, max_manifest_depth))
389
390     def write_file(self, source, filename=None):
391         self._queue_file(source, filename)
392         self.do_queued_work()
393
394     def write_directory_tree(self,
395                              path, stream_name='.', max_manifest_depth=-1):
396         self._queue_tree(path, stream_name, max_manifest_depth)
397         self.do_queued_work()
398
399     def write(self, newdata):
400         if hasattr(newdata, '__iter__'):
401             for s in newdata:
402                 self.write(s)
403             return
404         self._data_buffer.append(newdata)
405         self._data_buffer_len += len(newdata)
406         self._current_stream_length += len(newdata)
407         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
408             self.flush_data()
409
410     def open(self, streampath, filename=None):
411         """open(streampath[, filename]) -> file-like object
412
413         Pass in the path of a file to write to the Collection, either as a
414         single string or as two separate stream name and file name arguments.
415         This method returns a file-like object you can write to add it to the
416         Collection.
417
418         You may only have one file object from the Collection open at a time,
419         so be sure to close the object when you're done.  Using the object in
420         a with statement makes that easy::
421
422           with cwriter.open('./doc/page1.txt') as outfile:
423               outfile.write(page1_data)
424           with cwriter.open('./doc/page2.txt') as outfile:
425               outfile.write(page2_data)
426         """
427         if filename is None:
428             streampath, filename = split(streampath)
429         if self._last_open and not self._last_open.closed:
430             raise errors.AssertionError(
431                 "can't open '{}' when '{}' is still open".format(
432                     filename, self._last_open.name))
433         if streampath != self.current_stream_name():
434             self.start_new_stream(streampath)
435         self.set_current_file_name(filename)
436         self._last_open = _WriterFile(self, filename)
437         return self._last_open
438
439     def flush_data(self):
440         data_buffer = ''.join(self._data_buffer)
441         if data_buffer:
442             self._current_stream_locators.append(
443                 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
444             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
445             self._data_buffer_len = len(self._data_buffer[0])
446
447     def start_new_file(self, newfilename=None):
448         self.finish_current_file()
449         self.set_current_file_name(newfilename)
450
451     def set_current_file_name(self, newfilename):
452         if re.search(r'[\t\n]', newfilename):
453             raise errors.AssertionError(
454                 "Manifest filenames cannot contain whitespace: %s" %
455                 newfilename)
456         elif re.search(r'\x00', newfilename):
457             raise errors.AssertionError(
458                 "Manifest filenames cannot contain NUL characters: %s" %
459                 newfilename)
460         self._current_file_name = newfilename
461
462     def current_file_name(self):
463         return self._current_file_name
464
465     def finish_current_file(self):
466         if self._current_file_name is None:
467             if self._current_file_pos == self._current_stream_length:
468                 return
469             raise errors.AssertionError(
470                 "Cannot finish an unnamed file " +
471                 "(%d bytes at offset %d in '%s' stream)" %
472                 (self._current_stream_length - self._current_file_pos,
473                  self._current_file_pos,
474                  self._current_stream_name))
475         self._current_stream_files.append([
476                 self._current_file_pos,
477                 self._current_stream_length - self._current_file_pos,
478                 self._current_file_name])
479         self._current_file_pos = self._current_stream_length
480         self._current_file_name = None
481
482     def start_new_stream(self, newstreamname='.'):
483         self.finish_current_stream()
484         self.set_current_stream_name(newstreamname)
485
486     def set_current_stream_name(self, newstreamname):
487         if re.search(r'[\t\n]', newstreamname):
488             raise errors.AssertionError(
489                 "Manifest stream names cannot contain whitespace")
490         self._current_stream_name = '.' if newstreamname=='' else newstreamname
491
492     def current_stream_name(self):
493         return self._current_stream_name
494
495     def finish_current_stream(self):
496         self.finish_current_file()
497         self.flush_data()
498         if not self._current_stream_files:
499             pass
500         elif self._current_stream_name is None:
501             raise errors.AssertionError(
502                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
503                 (self._current_stream_length, len(self._current_stream_files)))
504         else:
505             if not self._current_stream_locators:
506                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
507             self._finished_streams.append([self._current_stream_name,
508                                            self._current_stream_locators,
509                                            self._current_stream_files])
510         self._current_stream_files = []
511         self._current_stream_length = 0
512         self._current_stream_locators = []
513         self._current_stream_name = None
514         self._current_file_pos = 0
515         self._current_file_name = None
516
517     def finish(self):
518         # Store the manifest in Keep and return its locator.
519         return self._my_keep().put(self.manifest_text())
520
521     def portable_data_hash(self):
522         stripped = self.stripped_manifest()
523         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
524
525     def manifest_text(self):
526         self.finish_current_stream()
527         manifest = ''
528
529         for stream in self._finished_streams:
530             if not re.search(r'^\.(/.*)?$', stream[0]):
531                 manifest += './'
532             manifest += stream[0].replace(' ', '\\040')
533             manifest += ' ' + ' '.join(stream[1])
534             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
535             manifest += "\n"
536
537         return manifest
538
539     def data_locators(self):
540         ret = []
541         for name, locators, files in self._finished_streams:
542             ret += locators
543         return ret
544
545
546 class ResumableCollectionWriter(CollectionWriter):
547     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
548                    '_current_stream_locators', '_current_stream_name',
549                    '_current_file_name', '_current_file_pos', '_close_file',
550                    '_data_buffer', '_dependencies', '_finished_streams',
551                    '_queued_dirents', '_queued_trees']
552
553     def __init__(self, api_client=None, num_retries=0):
554         self._dependencies = {}
555         super(ResumableCollectionWriter, self).__init__(
556             api_client, num_retries=num_retries)
557
558     @classmethod
559     def from_state(cls, state, *init_args, **init_kwargs):
560         # Try to build a new writer from scratch with the given state.
561         # If the state is not suitable to resume (because files have changed,
562         # been deleted, aren't predictable, etc.), raise a
563         # StaleWriterStateError.  Otherwise, return the initialized writer.
564         # The caller is responsible for calling writer.do_queued_work()
565         # appropriately after it's returned.
566         writer = cls(*init_args, **init_kwargs)
567         for attr_name in cls.STATE_PROPS:
568             attr_value = state[attr_name]
569             attr_class = getattr(writer, attr_name).__class__
570             # Coerce the value into the same type as the initial value, if
571             # needed.
572             if attr_class not in (type(None), attr_value.__class__):
573                 attr_value = attr_class(attr_value)
574             setattr(writer, attr_name, attr_value)
575         # Check dependencies before we try to resume anything.
576         if any(KeepLocator(ls).permission_expired()
577                for ls in writer._current_stream_locators):
578             raise errors.StaleWriterStateError(
579                 "locators include expired permission hint")
580         writer.check_dependencies()
581         if state['_current_file'] is not None:
582             path, pos = state['_current_file']
583             try:
584                 writer._queued_file = open(path, 'rb')
585                 writer._queued_file.seek(pos)
586             except IOError as error:
587                 raise errors.StaleWriterStateError(
588                     "failed to reopen active file {}: {}".format(path, error))
589         return writer
590
591     def check_dependencies(self):
592         for path, orig_stat in self._dependencies.items():
593             if not S_ISREG(orig_stat[ST_MODE]):
594                 raise errors.StaleWriterStateError("{} not file".format(path))
595             try:
596                 now_stat = tuple(os.stat(path))
597             except OSError as error:
598                 raise errors.StaleWriterStateError(
599                     "failed to stat {}: {}".format(path, error))
600             if ((not S_ISREG(now_stat[ST_MODE])) or
601                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
602                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
603                 raise errors.StaleWriterStateError("{} changed".format(path))
604
605     def dump_state(self, copy_func=lambda x: x):
606         state = {attr: copy_func(getattr(self, attr))
607                  for attr in self.STATE_PROPS}
608         if self._queued_file is None:
609             state['_current_file'] = None
610         else:
611             state['_current_file'] = (os.path.realpath(self._queued_file.name),
612                                       self._queued_file.tell())
613         return state
614
615     def _queue_file(self, source, filename=None):
616         try:
617             src_path = os.path.realpath(source)
618         except Exception:
619             raise errors.AssertionError("{} not a file path".format(source))
620         try:
621             path_stat = os.stat(src_path)
622         except OSError as stat_error:
623             path_stat = None
624         super(ResumableCollectionWriter, self)._queue_file(source, filename)
625         fd_stat = os.fstat(self._queued_file.fileno())
626         if not S_ISREG(fd_stat.st_mode):
627             # We won't be able to resume from this cache anyway, so don't
628             # worry about further checks.
629             self._dependencies[source] = tuple(fd_stat)
630         elif path_stat is None:
631             raise errors.AssertionError(
632                 "could not stat {}: {}".format(source, stat_error))
633         elif path_stat.st_ino != fd_stat.st_ino:
634             raise errors.AssertionError(
635                 "{} changed between open and stat calls".format(source))
636         else:
637             self._dependencies[src_path] = tuple(fd_stat)
638
639     def write(self, data):
640         if self._queued_file is None:
641             raise errors.AssertionError(
642                 "resumable writer can't accept unsourced data")
643         return super(ResumableCollectionWriter, self).write(data)
644
645 ADD = "add"
646 DEL = "del"
647 MOD = "mod"
648
649 class SynchronizedCollectionBase(CollectionBase):
650     def __init__(self, parent=None):
651         self.parent = parent
652         self._items = {}
653
654     def _my_api(self):
655         raise NotImplementedError()
656
657     def _my_keep(self):
658         raise NotImplementedError()
659
660     def _my_block_manager(self):
661         raise NotImplementedError()
662
663     def _root_lock(self):
664         raise NotImplementedError()
665
666     def _populate(self):
667         raise NotImplementedError()
668
669     def sync_mode(self):
670         raise NotImplementedError()
671
672     def notify(self, collection, event, name, item):
673         raise NotImplementedError()
674
675     @_synchronized
676     def find(self, path, create=False, create_collection=False):
677         """Recursively search the specified file path.  May return either a Collection
678         or ArvadosFile.
679
680         :create:
681           If true, create path components (i.e. Collections) that are
682           missing.  If "create" is False, return None if a path component is
683           not found.
684
685         :create_collection:
686           If the path is not found, "create" is True, and
687           "create_collection" is False, then create and return a new
688           ArvadosFile for the last path component.  If "create_collection" is
689           True, then create and return a new Collection for the last path
690           component.
691
692         """
693         if create and self.sync_mode() == SYNC_READONLY:
694             raise IOError((errno.EROFS, "Collection is read only"))
695
696         p = path.split("/")
697         if p[0] == '.':
698             del p[0]
699
700         if p and p[0]:
701             item = self._items.get(p[0])
702             if len(p) == 1:
703                 # item must be a file
704                 if item is None and create:
705                     # create new file
706                     if create_collection:
707                         item = Subcollection(self)
708                     else:
709                         item = ArvadosFile(self)
710                     self._items[p[0]] = item
711                     self.notify(self, ADD, p[0], item)
712                 return item
713             else:
714                 if item is None and create:
715                     # create new collection
716                     item = Subcollection(self)
717                     self._items[p[0]] = item
718                     self.notify(self, ADD, p[0], item)
719                 del p[0]
720                 if isinstance(item, SynchronizedCollectionBase):
721                     return item.find("/".join(p), create=create)
722                 else:
723                     raise errors.ArgumentError("Interior path components must be subcollection")
724         else:
725             return self
726
727     def open(self, path, mode):
728         """Open a file-like object for access.
729
730         :path:
731           path to a file in the collection
732         :mode:
733           one of "r", "r+", "w", "w+", "a", "a+"
734           :"r":
735             opens for reading
736           :"r+":
737             opens for reading and writing.  Reads/writes share a file pointer.
738           :"w", "w+":
739             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
740           :"a", "a+":
741             opens for reading and writing.  All writes are appended to
742             the end of the file.  Writing does not affect the file pointer for
743             reading.
744         """
745         mode = mode.replace("b", "")
746         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
747             raise ArgumentError("Bad mode '%s'" % mode)
748         create = (mode != "r")
749
750         if create and self.sync_mode() == SYNC_READONLY:
751             raise IOError((errno.EROFS, "Collection is read only"))
752
753         f = self.find(path, create=create)
754
755         if f is None:
756             raise IOError((errno.ENOENT, "File not found"))
757         if not isinstance(f, ArvadosFile):
758             raise IOError((errno.EISDIR, "Path must refer to a file."))
759
760         if mode[0] == "w":
761             f.truncate(0)
762
763         if mode == "r":
764             return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
765         else:
766             return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
767
768     @_synchronized
769     def modified(self):
770         """Test if the collection (or any subcollection or file) has been modified
771         since it was created."""
772         for k,v in self._items.items():
773             if v.modified():
774                 return True
775         return False
776
777     @_synchronized
778     def set_unmodified(self):
779         """Recursively clear modified flag"""
780         for k,v in self._items.items():
781             v.set_unmodified()
782
783     @_synchronized
784     def __iter__(self):
785         """Iterate over names of files and collections contained in this collection."""
786         return self._items.keys()
787
788     @_synchronized
789     def iterkeys(self):
790         """Iterate over names of files and collections directly contained in this collection."""
791         return self._items.keys()
792
793     @_synchronized
794     def __getitem__(self, k):
795         """Get a file or collection that is directly contained by this collection.  If
796         you want to search a path, use `find()` instead.
797         """
798         return self._items[k]
799
800     @_synchronized
801     def __contains__(self, k):
802         """If there is a file or collection a directly contained by this collection
803         with name "k"."""
804         return k in self._items
805
806     @_synchronized
807     def __len__(self):
808         """Get the number of items directly contained in this collection"""
809         return len(self._items)
810
811     @_must_be_writable
812     @_synchronized
813     def __delitem__(self, p):
814         """Delete an item by name which is directly contained by this collection."""
815         del self._items[p]
816         self.notify(self, DEL, p, None)
817
818     @_synchronized
819     def keys(self):
820         """Get a list of names of files and collections directly contained in this collection."""
821         return self._items.keys()
822
823     @_synchronized
824     def values(self):
825         """Get a list of files and collection objects directly contained in this collection."""
826         return self._items.values()
827
828     @_synchronized
829     def items(self):
830         """Get a list of (name, object) tuples directly contained in this collection."""
831         return self._items.items()
832
833     def exists(self, path):
834         """Test if there is a file or collection at "path" """
835         return self.find(path) != None
836
837     @_must_be_writable
838     @_synchronized
839     def remove(self, path, rm_r=False):
840         """Remove the file or subcollection (directory) at `path`.
841         :rm_r:
842           Specify whether to remove non-empty subcollections (True), or raise an error (False).
843         """
844         p = path.split("/")
845         if p[0] == '.':
846             # Remove '.' from the front of the path
847             del p[0]
848
849         if len(p) > 0:
850             item = self._items.get(p[0])
851             if item is None:
852                 raise IOError((errno.ENOENT, "File not found"))
853             if len(p) == 1:
854                 if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
855                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
856                 del self._items[p[0]]
857                 self.notify(self, DEL, p[0], None)
858             else:
859                 del p[0]
860                 item.remove("/".join(p))
861         else:
862             raise IOError((errno.ENOENT, "File not found"))
863
864     def _cloneinto(self, target):
865         for k,v in self._items.items():
866             target._items[k] = v.clone(target)
867
868     def clone(self):
869         raise NotImplementedError()
870
871     @_must_be_writable
872     @_synchronized
873     def copy(self, source, target_path, source_collection=None, overwrite=False):
874         """Copy a file or subcollection to a new path in this collection.
875
876         :source:
877           An ArvadosFile, Subcollection, or string with a path to source file or subcollection
878
879         :target_path:
880           Destination file or path.  If the target path already exists and is a
881           subcollection, the item will be placed inside the subcollection.  If
882           the target path already exists and is a file, this will raise an error
883           unless you specify `overwrite=True`.
884
885         :source_collection:
886           Collection to copy `source_path` from (default `self`)
887
888         :overwrite:
889           Whether to overwrite target file if it already exists.
890         """
891         if source_collection is None:
892             source_collection = self
893
894         # Find the object to copy
895         if isinstance(source, basestring):
896             source_obj = source_collection.find(source)
897             if source_obj is None:
898                 raise IOError((errno.ENOENT, "File not found"))
899             sp = source.split("/")
900         else:
901             source_obj = source
902             sp = None
903
904         # Find parent collection the target path
905         tp = target_path.split("/")
906
907         # Determine the name to use.
908         target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
909
910         if not target_name:
911             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
912
913         target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
914
915         if target_name in target_dir:
916             if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
917                 target_dir = target_dir[target_name]
918                 target_name = sp[-1]
919             elif not overwrite:
920                 raise IOError((errno.EEXIST, "File already exists"))
921
922         # Actually make the copy.
923         dup = source_obj.clone(target_dir)
924         with target_dir.lock:
925             target_dir._items[target_name] = dup
926
927         self.notify(target_dir, ADD, target_name, dup)
928
929
930     @_synchronized
931     def manifest_text(self, strip=False, normalize=False):
932         """Get the manifest text for this collection, sub collections and files.
933
934         :strip:
935           If True, remove signing tokens from block locators if present.
936           If False, block locators are left unchanged.
937
938         :normalize:
939           If True, always export the manifest text in normalized form
940           even if the Collection is not modified.  If False and the collection
941           is not modified, return the original manifest text even if it is not
942           in normalized form.
943
944         """
945         if self.modified() or self._manifest_text is None or normalize:
946             return export_manifest(self, stream_name=".", portable_locators=strip)
947         else:
948             if strip:
949                 return self.stripped_manifest()
950             else:
951                 return self._manifest_text
952
953     @_synchronized
954     def diff(self, start_collection, prefix="."):
955         """
956         Generate list of add/delete actions which change `start_collection` to result in `self`
957         """
958         changes = []
959         for k in start_collection:
960             if k not in self:
961                changes.append((DEL, os.path.join(prefix, k), start_collection[k]))
962         for k in self:
963             if k in start_collection:
964                 if isinstance(self[k], Subcollection) and isinstance(start_collection[k], Subcollection):
965                     changes.extend(self[k].diff(start_collection[k], os.path.join(prefix, k)))
966                 elif self[k] != start_collection[k]:
967                     changes.append((MOD, os.path.join(prefix, k), start_collection[k], self[k]))
968             else:
969                 changes.append((ADD, os.path.join(prefix, k), self[k]))
970         return changes
971
972     @_must_be_writable
973     @_synchronized
974     def apply(self, changes):
975         """
976         Apply changes from `diff`.  If a change conflicts with a local change, it
977         will be saved to an alternate path indicating the conflict.
978         """
979         for c in changes:
980             path = c[1]
981             initial = c[2]
982             local = self.find(path)
983             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
984                                                                     time.gmtime()))
985             if c[0] == ADD:
986                 if local is None:
987                     # No local file at path, safe to copy over new file
988                     self.copy(initial, path)
989                 elif local is not None and local != initial:
990                     # There is already local file and it is different:
991                     # save change to conflict file.
992                     self.copy(initial, conflictpath)
993             elif c[1] == MOD:
994                 if local == initial:
995                     # Local matches the "initial" item so assume it hasn't
996                     # changed locally and is safe to update.
997                     if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
998                         # Replace contents of local file with new contents
999                         local.replace_contents(c[3])
1000                     else:
1001                         # Overwrite path with new item; this can happen if if
1002                         # path was a file and is now a collection or vice versa
1003                         self.copy(c[3], path, overwrite=True)
1004                 else:
1005                     # Local is missing (presumably deleted) or local doesn't
1006                     # match the "start" value, so save change to conflict file
1007                     self.copy(c[3], conflictpath)
1008             elif c[1] == DEL:
1009                 if local == initial
1010                     # Local item matches "initial" value, so it is safe to remove.
1011                     self.remove(path, rm_r=True)
1012                 # else, the file is modified or already removed, in either
1013                 # case we don't want to try to remove it.
1014
1015     def portable_data_hash(self):
1016         """Get the portable data hash for this collection's manifest."""
1017         stripped = self.manifest_text(strip=True)
1018         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1019
1020
1021 class Collection(SynchronizedCollectionBase):
1022     """Store an Arvados collection, consisting of a set of files and
1023     sub-collections.
1024     """
1025
1026     def __init__(self, manifest_locator_or_text=None,
1027                  parent=None,
1028                  config=None,
1029                  api_client=None,
1030                  keep_client=None,
1031                  num_retries=None,
1032                  block_manager=None,
1033                  sync=SYNC_READONLY):
1034         """:manifest_locator_or_text:
1035           One of Arvados collection UUID, block locator of
1036           a manifest, raw manifest text, or None (to create an empty collection).
1037         :parent:
1038           the parent Collection, may be None.
1039         :config:
1040           the arvados configuration to get the hostname and api token.
1041           Prefer this over supplying your own api_client and keep_client (except in testing).
1042           Will use default config settings if not specified.
1043         :api_client:
1044           The API client object to use for requests.  If not specified, create one using `config`.
1045         :keep_client:
1046           the Keep client to use for requests.  If not specified, create one using `config`.
1047         :num_retries:
1048           the number of retries for API and Keep requests.
1049         :block_manager:
1050           the block manager to use.  If not specified, create one.
1051         :sync:
1052           Set synchronization policy with API server collection record.
1053           :SYNC_READONLY:
1054             Collection is read only.  No synchronization.  This mode will
1055             also forego locking, which gives better performance.
1056           :SYNC_EXPLICIT:
1057             Synchronize on explicit request via `update()` or `save()`
1058           :SYNC_LIVE:
1059             Synchronize with server in response to background websocket events,
1060             on block write, or on file close.
1061
1062         """
1063         super(Collection, self).__init__(parent)
1064         self._api_client = api_client
1065         self._keep_client = keep_client
1066         self._block_manager = block_manager
1067         self._config = config
1068         self.num_retries = num_retries
1069         self._manifest_locator = None
1070         self._manifest_text = None
1071         self._api_response = None
1072         self._sync = sync
1073         self.lock = threading.RLock()
1074         self.callbacks = []
1075         self.events = None
1076         self._baseline_manifest
1077
1078         if manifest_locator_or_text:
1079             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1080                 self._manifest_locator = manifest_locator_or_text
1081             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1082                 self._manifest_locator = manifest_locator_or_text
1083             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1084                 self._manifest_text = manifest_locator_or_text
1085             else:
1086                 raise errors.ArgumentError(
1087                     "Argument to CollectionReader must be a manifest or a collection UUID")
1088
1089             self._populate()
1090
1091             if self._sync == SYNC_LIVE:
1092                 if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
1093                     raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
1094                 self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
1095
1096     @staticmethod
1097     def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
1098         c = Collection(sync=sync)
1099         c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1100         return c
1101
1102     def _root_lock(self):
1103         return self.lock
1104
1105     def sync_mode(self):
1106         return self._sync
1107
1108     def on_message(self):
1109         self.update()
1110
1111     @_synchronized
1112     def update(self):
1113         n = self._my_api().collections().get(uuid=self._manifest_locator).execute()
1114         other = import_collection(n["manifest_text"])
1115         baseline = import_collection(self._baseline_manifest)
1116         self.apply(other.diff(baseline))
1117
1118     @_synchronized
1119     def _my_api(self):
1120         if self._api_client is None:
1121             self._api_client = arvados.SafeApi(self._config)
1122             self._keep_client = self._api_client.keep
1123         return self._api_client
1124
1125     @_synchronized
1126     def _my_keep(self):
1127         if self._keep_client is None:
1128             if self._api_client is None:
1129                 self._my_api()
1130             else:
1131                 self._keep_client = KeepClient(api=self._api_client)
1132         return self._keep_client
1133
1134     @_synchronized
1135     def _my_block_manager(self):
1136         if self._block_manager is None:
1137             self._block_manager = BlockManager(self._my_keep())
1138         return self._block_manager
1139
1140     def _populate_from_api_server(self):
1141         # As in KeepClient itself, we must wait until the last
1142         # possible moment to instantiate an API client, in order to
1143         # avoid tripping up clients that don't have access to an API
1144         # server.  If we do build one, make sure our Keep client uses
1145         # it.  If instantiation fails, we'll fall back to the except
1146         # clause, just like any other Collection lookup
1147         # failure. Return an exception, or None if successful.
1148         try:
1149             self._api_response = self._my_api().collections().get(
1150                 uuid=self._manifest_locator).execute(
1151                     num_retries=self.num_retries)
1152             self._manifest_text = self._api_response['manifest_text']
1153             return None
1154         except Exception as e:
1155             return e
1156
1157     def _populate_from_keep(self):
1158         # Retrieve a manifest directly from Keep. This has a chance of
1159         # working if [a] the locator includes a permission signature
1160         # or [b] the Keep services are operating in world-readable
1161         # mode. Return an exception, or None if successful.
1162         try:
1163             self._manifest_text = self._my_keep().get(
1164                 self._manifest_locator, num_retries=self.num_retries)
1165         except Exception as e:
1166             return e
1167
1168     def _populate(self):
1169         if self._manifest_locator is None and self._manifest_text is None:
1170             return
1171         error_via_api = None
1172         error_via_keep = None
1173         should_try_keep = ((self._manifest_text is None) and
1174                            util.keep_locator_pattern.match(
1175                                self._manifest_locator))
1176         if ((self._manifest_text is None) and
1177             util.signed_locator_pattern.match(self._manifest_locator)):
1178             error_via_keep = self._populate_from_keep()
1179         if self._manifest_text is None:
1180             error_via_api = self._populate_from_api_server()
1181             if error_via_api is not None and not should_try_keep:
1182                 raise error_via_api
1183         if ((self._manifest_text is None) and
1184             not error_via_keep and
1185             should_try_keep):
1186             # Looks like a keep locator, and we didn't already try keep above
1187             error_via_keep = self._populate_from_keep()
1188         if self._manifest_text is None:
1189             # Nothing worked!
1190             raise arvados.errors.NotFoundError(
1191                 ("Failed to retrieve collection '{}' " +
1192                  "from either API server ({}) or Keep ({})."
1193                  ).format(
1194                     self._manifest_locator,
1195                     error_via_api,
1196                     error_via_keep))
1197         # populate
1198         self._baseline_manifest = self._manifest_text
1199         import_manifest(self._manifest_text, self)
1200
1201         if self._sync == SYNC_READONLY:
1202             # Now that we're populated, knowing that this will be readonly,
1203             # forego any further locking.
1204             self.lock = NoopLock()
1205
1206     def __enter__(self):
1207         return self
1208
1209     def __exit__(self, exc_type, exc_value, traceback):
1210         """Support scoped auto-commit in a with: block"""
1211         if self._sync != SYNC_READONLY:
1212             self.save(allow_no_locator=True)
1213         if self._block_manager is not None:
1214             self._block_manager.stop_threads()
1215
1216     @_synchronized
1217     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
1218         if new_config is None:
1219             new_config = self._config
1220         c = Collection(parent=new_parent, config=new_config, sync=new_sync)
1221         if new_sync == SYNC_READONLY:
1222             c.lock = NoopLock()
1223         c._items = {}
1224         self._cloneinto(c)
1225         return c
1226
1227     @_synchronized
1228     def api_response(self):
1229         """
1230         api_response() -> dict or None
1231
1232         Returns information about this Collection fetched from the API server.
1233         If the Collection exists in Keep but not the API server, currently
1234         returns None.  Future versions may provide a synthetic response.
1235         """
1236         return self._api_response
1237
1238     @_must_be_writable
1239     @_synchronized
1240     def save(self, allow_no_locator=False):
1241         """Commit pending buffer blocks to Keep, write the manifest to Keep, and
1242         update the collection record to Keep.
1243
1244         :allow_no_locator:
1245           If there is no collection uuid associated with this
1246           Collection and `allow_no_locator` is False, raise an error.  If True,
1247           do not raise an error.
1248         """
1249         if self.modified():
1250             self._my_block_manager().commit_all()
1251             self._my_keep().put(self.manifest_text(strip=True))
1252             if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
1253                 self._api_response = self._my_api().collections().update(
1254                     uuid=self._manifest_locator,
1255                     body={'manifest_text': self.manifest_text(strip=False)}
1256                     ).execute(
1257                         num_retries=self.num_retries)
1258             elif not allow_no_locator:
1259                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
1260             self.set_unmodified()
1261
1262     @_must_be_writable
1263     @_synchronized
1264     def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
1265         """Save a new collection record.
1266
1267         :name:
1268           The collection name.
1269
1270         :owner_uuid:
1271           the user, or project uuid that will own this collection.
1272           If None, defaults to the current user.
1273
1274         :ensure_unique_name:
1275           If True, ask the API server to rename the collection
1276           if it conflicts with a collection with the same name and owner.  If
1277           False, a name conflict will result in an error.
1278
1279         """
1280         self._my_block_manager().commit_all()
1281         self._my_keep().put(self.manifest_text(strip=True))
1282         body = {"manifest_text": self.manifest_text(strip=False),
1283                 "name": name}
1284         if owner_uuid:
1285             body["owner_uuid"] = owner_uuid
1286         self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
1287
1288         if self.events:
1289             self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1290
1291         self._manifest_locator = self._api_response["uuid"]
1292
1293         if self.events:
1294             self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1295
1296         self.set_unmodified()
1297
1298     @_synchronized
1299     def subscribe(self, callback):
1300         self.callbacks.append(callback)
1301
1302     @_synchronized
1303     def unsubscribe(self, callback):
1304         self.callbacks.remove(callback)
1305
1306     @_synchronized
1307     def notify(self, collection, event, name, item):
1308         for c in self.callbacks:
1309             c(collection, event, name, item)
1310
1311 class Subcollection(SynchronizedCollectionBase):
1312     """This is a subdirectory within a collection that doesn't have its own API
1313     server record.  It falls under the umbrella of the root collection."""
1314
1315     def __init__(self, parent):
1316         super(Subcollection, self).__init__(parent)
1317         self.lock = parent._root_lock()
1318
1319     def _root_lock(self):
1320         return self.parent._root_lock()
1321
1322     def sync_mode(self):
1323         return self.parent.sync_mode()
1324
1325     def _my_api(self):
1326         return self.parent._my_api()
1327
1328     def _my_keep(self):
1329         return self.parent._my_keep()
1330
1331     def _my_block_manager(self):
1332         return self.parent._my_block_manager()
1333
1334     def _populate(self):
1335         self.parent._populate()
1336
1337     def notify(self, collection, event, name, item):
1338         self.parent.notify(collection, event, name, item)
1339
1340     @_synchronized
1341     def clone(self, new_parent):
1342         c = Subcollection(new_parent)
1343         c._items = {}
1344         self._cloneinto(c)
1345         return c
1346
1347 def import_manifest(manifest_text,
1348                     into_collection=None,
1349                     api_client=None,
1350                     keep=None,
1351                     num_retries=None,
1352                     sync=SYNC_READONLY):
1353     """Import a manifest into a `Collection`.
1354
1355     :manifest_text:
1356       The manifest text to import from.
1357
1358     :into_collection:
1359       The `Collection` that will be initialized (must be empty).
1360       If None, create a new `Collection` object.
1361
1362     :api_client:
1363       The API client object that will be used when creating a new `Collection` object.
1364
1365     :keep:
1366       The keep client object that will be used when creating a new `Collection` object.
1367
1368     :num_retries:
1369       the default number of api client and keep retries on error.
1370
1371     :sync:
1372       Collection sync mode (only if into_collection is None)
1373     """
1374     if into_collection is not None:
1375         if len(into_collection) > 0:
1376             raise ArgumentError("Can only import manifest into an empty collection")
1377         c = into_collection
1378     else:
1379         c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
1380
1381     save_sync = c.sync_mode()
1382     c._sync = None
1383
1384     STREAM_NAME = 0
1385     BLOCKS = 1
1386     SEGMENTS = 2
1387
1388     stream_name = None
1389     state = STREAM_NAME
1390
1391     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1392         tok = n.group(1)
1393         sep = n.group(2)
1394
1395         if state == STREAM_NAME:
1396             # starting a new stream
1397             stream_name = tok.replace('\\040', ' ')
1398             blocks = []
1399             segments = []
1400             streamoffset = 0L
1401             state = BLOCKS
1402             continue
1403
1404         if state == BLOCKS:
1405             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1406             if s:
1407                 blocksize = long(s.group(1))
1408                 blocks.append(Range(tok, streamoffset, blocksize))
1409                 streamoffset += blocksize
1410             else:
1411                 state = SEGMENTS
1412
1413         if state == SEGMENTS:
1414             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1415             if s:
1416                 pos = long(s.group(1))
1417                 size = long(s.group(2))
1418                 name = s.group(3).replace('\\040', ' ')
1419                 f = c.find("%s/%s" % (stream_name, name), create=True)
1420                 f.add_segment(blocks, pos, size)
1421             else:
1422                 # error!
1423                 raise errors.SyntaxError("Invalid manifest format")
1424
1425         if sep == "\n":
1426             stream_name = None
1427             state = STREAM_NAME
1428
1429     c.set_unmodified()
1430     c._sync = save_sync
1431     return c
1432
1433 def export_manifest(item, stream_name=".", portable_locators=False):
1434     """
1435     :item:
1436       Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).  If
1437       `item` is a is a `Collection`, this will also export subcollections.
1438
1439     :stream_name:
1440       the name of the stream when exporting `item`.
1441
1442     :portable_locators:
1443       If True, strip any permission hints on block locators.
1444       If False, use block locators as-is.
1445     """
1446     buf = ""
1447     if isinstance(item, SynchronizedCollectionBase):
1448         stream = {}
1449         sorted_keys = sorted(item.keys())
1450         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1451             v = item[k]
1452             st = []
1453             for s in v.segments():
1454                 loc = s.locator
1455                 if loc.startswith("bufferblock"):
1456                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1457                 if portable_locators:
1458                     loc = KeepLocator(loc).stripped()
1459                 st.append(LocatorAndRange(loc, locator_block_size(loc),
1460                                      s.segment_offset, s.range_size))
1461             stream[k] = st
1462         if stream:
1463             buf += ' '.join(normalize_stream(stream_name, stream))
1464             buf += "\n"
1465         for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1466             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1467     elif isinstance(item, ArvadosFile):
1468         st = []
1469         for s in item.segments:
1470             loc = s.locator
1471             if loc.startswith("bufferblock"):
1472                 loc = item._bufferblocks[loc].calculate_locator()
1473             if portable_locators:
1474                 loc = KeepLocator(loc).stripped()
1475             st.append(LocatorAndRange(loc, locator_block_size(loc),
1476                                  s.segment_offset, s.range_size))
1477         stream[stream_name] = st
1478         buf += ' '.join(normalize_stream(stream_name, stream))
1479         buf += "\n"
1480     return buf