Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import time
7
8 from collections import deque
9 from stat import *
10
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
12 from keep import *
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 from .safeapi import SafeApi
16 import config
17 import errors
18 import util
19 import events
20 from arvados.retry import retry_method
21
22 _logger = logging.getLogger('arvados.collection')
23
24 class CollectionBase(object):
25     def __enter__(self):
26         return self
27
28     def __exit__(self, exc_type, exc_value, traceback):
29         pass
30
31     def _my_keep(self):
32         if self._keep_client is None:
33             self._keep_client = KeepClient(api_client=self._api_client,
34                                            num_retries=self.num_retries)
35         return self._keep_client
36
37     def stripped_manifest(self):
38         """
39         Return the manifest for the current collection with all
40         non-portable hints (i.e., permission signatures and other
41         hints other than size hints) removed from the locators.
42         """
43         raw = self.manifest_text()
44         clean = []
45         for line in raw.split("\n"):
46             fields = line.split()
47             if fields:
48                 clean_fields = fields[:1] + [
49                     (re.sub(r'\+[^\d][^\+]*', '', x)
50                      if re.match(util.keep_locator_pattern, x)
51                      else x)
52                     for x in fields[1:]]
53                 clean += [' '.join(clean_fields), "\n"]
54         return ''.join(clean)
55
56
57 class CollectionReader(CollectionBase):
58     def __init__(self, manifest_locator_or_text, api_client=None,
59                  keep_client=None, num_retries=0):
60         """Instantiate a CollectionReader.
61
62         This class parses Collection manifests to provide a simple interface
63         to read its underlying files.
64
65         Arguments:
66         * manifest_locator_or_text: One of a Collection UUID, portable data
67           hash, or full manifest text.
68         * api_client: The API client to use to look up Collections.  If not
69           provided, CollectionReader will build one from available Arvados
70           configuration.
71         * keep_client: The KeepClient to use to download Collection data.
72           If not provided, CollectionReader will build one from available
73           Arvados configuration.
74         * num_retries: The default number of times to retry failed
75           service requests.  Default 0.  You may change this value
76           after instantiation, but note those changes may not
77           propagate to related objects like the Keep client.
78         """
79         self._api_client = api_client
80         self._keep_client = keep_client
81         self.num_retries = num_retries
82         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
83             self._manifest_locator = manifest_locator_or_text
84             self._manifest_text = None
85         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
86             self._manifest_locator = manifest_locator_or_text
87             self._manifest_text = None
88         elif re.match(util.manifest_pattern, manifest_locator_or_text):
89             self._manifest_text = manifest_locator_or_text
90             self._manifest_locator = None
91         else:
92             raise errors.ArgumentError(
93                 "Argument to CollectionReader must be a manifest or a collection UUID")
94         self._api_response = None
95         self._streams = None
96
97     def _populate_from_api_server(self):
98         # As in KeepClient itself, we must wait until the last
99         # possible moment to instantiate an API client, in order to
100         # avoid tripping up clients that don't have access to an API
101         # server.  If we do build one, make sure our Keep client uses
102         # it.  If instantiation fails, we'll fall back to the except
103         # clause, just like any other Collection lookup
104         # failure. Return an exception, or None if successful.
105         try:
106             if self._api_client is None:
107                 self._api_client = arvados.api('v1')
108                 self._keep_client = None  # Make a new one with the new api.
109             self._api_response = self._api_client.collections().get(
110                 uuid=self._manifest_locator).execute(
111                 num_retries=self.num_retries)
112             self._manifest_text = self._api_response['manifest_text']
113             return None
114         except Exception as e:
115             return e
116
117     def _populate_from_keep(self):
118         # Retrieve a manifest directly from Keep. This has a chance of
119         # working if [a] the locator includes a permission signature
120         # or [b] the Keep services are operating in world-readable
121         # mode. Return an exception, or None if successful.
122         try:
123             self._manifest_text = self._my_keep().get(
124                 self._manifest_locator, num_retries=self.num_retries)
125         except Exception as e:
126             return e
127
128     def _populate(self):
129         error_via_api = None
130         error_via_keep = None
131         should_try_keep = ((self._manifest_text is None) and
132                            util.keep_locator_pattern.match(
133                 self._manifest_locator))
134         if ((self._manifest_text is None) and
135             util.signed_locator_pattern.match(self._manifest_locator)):
136             error_via_keep = self._populate_from_keep()
137         if self._manifest_text is None:
138             error_via_api = self._populate_from_api_server()
139             if error_via_api is not None and not should_try_keep:
140                 raise error_via_api
141         if ((self._manifest_text is None) and
142             not error_via_keep and
143             should_try_keep):
144             # Looks like a keep locator, and we didn't already try keep above
145             error_via_keep = self._populate_from_keep()
146         if self._manifest_text is None:
147             # Nothing worked!
148             raise arvados.errors.NotFoundError(
149                 ("Failed to retrieve collection '{}' " +
150                  "from either API server ({}) or Keep ({})."
151                  ).format(
152                     self._manifest_locator,
153                     error_via_api,
154                     error_via_keep))
155         self._streams = [sline.split()
156                          for sline in self._manifest_text.split("\n")
157                          if sline]
158
159     def _populate_first(orig_func):
160         # Decorator for methods that read actual Collection data.
161         @functools.wraps(orig_func)
162         def wrapper(self, *args, **kwargs):
163             if self._streams is None:
164                 self._populate()
165             return orig_func(self, *args, **kwargs)
166         return wrapper
167
168     @_populate_first
169     def api_response(self):
170         """api_response() -> dict or None
171
172         Returns information about this Collection fetched from the API server.
173         If the Collection exists in Keep but not the API server, currently
174         returns None.  Future versions may provide a synthetic response.
175         """
176         return self._api_response
177
178     @_populate_first
179     def normalize(self):
180         # Rearrange streams
181         streams = {}
182         for s in self.all_streams():
183             for f in s.all_files():
184                 streamname, filename = split(s.name() + "/" + f.name())
185                 if streamname not in streams:
186                     streams[streamname] = {}
187                 if filename not in streams[streamname]:
188                     streams[streamname][filename] = []
189                 for r in f.segments:
190                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
191
192         self._streams = [normalize_stream(s, streams[s])
193                          for s in sorted(streams)]
194
195         # Regenerate the manifest text based on the normalized streams
196         self._manifest_text = ''.join(
197             [StreamReader(stream, keep=self._my_keep()).manifest_text()
198              for stream in self._streams])
199
200     @_populate_first
201     def open(self, streampath, filename=None):
202         """open(streampath[, filename]) -> file-like object
203
204         Pass in the path of a file to read from the Collection, either as a
205         single string or as two separate stream name and file name arguments.
206         This method returns a file-like object to read that file.
207         """
208         if filename is None:
209             streampath, filename = split(streampath)
210         keep_client = self._my_keep()
211         for stream_s in self._streams:
212             stream = StreamReader(stream_s, keep_client,
213                                   num_retries=self.num_retries)
214             if stream.name() == streampath:
215                 break
216         else:
217             raise ValueError("stream '{}' not found in Collection".
218                              format(streampath))
219         try:
220             return stream.files()[filename]
221         except KeyError:
222             raise ValueError("file '{}' not found in Collection stream '{}'".
223                              format(filename, streampath))
224
225     @_populate_first
226     def all_streams(self):
227         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
228                 for s in self._streams]
229
230     def all_files(self):
231         for s in self.all_streams():
232             for f in s.all_files():
233                 yield f
234
235     @_populate_first
236     def manifest_text(self, strip=False, normalize=False):
237         if normalize:
238             cr = CollectionReader(self.manifest_text())
239             cr.normalize()
240             return cr.manifest_text(strip=strip, normalize=False)
241         elif strip:
242             return self.stripped_manifest()
243         else:
244             return self._manifest_text
245
246
247 class _WriterFile(ArvadosFileBase):
248     def __init__(self, coll_writer, name):
249         super(_WriterFile, self).__init__(name, 'wb')
250         self.dest = coll_writer
251
252     def close(self):
253         super(_WriterFile, self).close()
254         self.dest.finish_current_file()
255
256     @ArvadosFileBase._before_close
257     def write(self, data):
258         self.dest.write(data)
259
260     @ArvadosFileBase._before_close
261     def writelines(self, seq):
262         for data in seq:
263             self.write(data)
264
265     @ArvadosFileBase._before_close
266     def flush(self):
267         self.dest.flush_data()
268
269
270 class CollectionWriter(CollectionBase):
271     def __init__(self, api_client=None, num_retries=0):
272         """Instantiate a CollectionWriter.
273
274         CollectionWriter lets you build a new Arvados Collection from scratch.
275         Write files to it.  The CollectionWriter will upload data to Keep as
276         appropriate, and provide you with the Collection manifest text when
277         you're finished.
278
279         Arguments:
280         * api_client: The API client to use to look up Collections.  If not
281           provided, CollectionReader will build one from available Arvados
282           configuration.
283         * num_retries: The default number of times to retry failed
284           service requests.  Default 0.  You may change this value
285           after instantiation, but note those changes may not
286           propagate to related objects like the Keep client.
287         """
288         self._api_client = api_client
289         self.num_retries = num_retries
290         self._keep_client = None
291         self._data_buffer = []
292         self._data_buffer_len = 0
293         self._current_stream_files = []
294         self._current_stream_length = 0
295         self._current_stream_locators = []
296         self._current_stream_name = '.'
297         self._current_file_name = None
298         self._current_file_pos = 0
299         self._finished_streams = []
300         self._close_file = None
301         self._queued_file = None
302         self._queued_dirents = deque()
303         self._queued_trees = deque()
304         self._last_open = None
305
306     def __exit__(self, exc_type, exc_value, traceback):
307         if exc_type is None:
308             self.finish()
309
310     def do_queued_work(self):
311         # The work queue consists of three pieces:
312         # * _queued_file: The file object we're currently writing to the
313         #   Collection.
314         # * _queued_dirents: Entries under the current directory
315         #   (_queued_trees[0]) that we want to write or recurse through.
316         #   This may contain files from subdirectories if
317         #   max_manifest_depth == 0 for this directory.
318         # * _queued_trees: Directories that should be written as separate
319         #   streams to the Collection.
320         # This function handles the smallest piece of work currently queued
321         # (current file, then current directory, then next directory) until
322         # no work remains.  The _work_THING methods each do a unit of work on
323         # THING.  _queue_THING methods add a THING to the work queue.
324         while True:
325             if self._queued_file:
326                 self._work_file()
327             elif self._queued_dirents:
328                 self._work_dirents()
329             elif self._queued_trees:
330                 self._work_trees()
331             else:
332                 break
333
334     def _work_file(self):
335         while True:
336             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
337             if not buf:
338                 break
339             self.write(buf)
340         self.finish_current_file()
341         if self._close_file:
342             self._queued_file.close()
343         self._close_file = None
344         self._queued_file = None
345
346     def _work_dirents(self):
347         path, stream_name, max_manifest_depth = self._queued_trees[0]
348         if stream_name != self.current_stream_name():
349             self.start_new_stream(stream_name)
350         while self._queued_dirents:
351             dirent = self._queued_dirents.popleft()
352             target = os.path.join(path, dirent)
353             if os.path.isdir(target):
354                 self._queue_tree(target,
355                                  os.path.join(stream_name, dirent),
356                                  max_manifest_depth - 1)
357             else:
358                 self._queue_file(target, dirent)
359                 break
360         if not self._queued_dirents:
361             self._queued_trees.popleft()
362
363     def _work_trees(self):
364         path, stream_name, max_manifest_depth = self._queued_trees[0]
365         d = util.listdir_recursive(
366             path, max_depth = (None if max_manifest_depth == 0 else 0))
367         if d:
368             self._queue_dirents(stream_name, d)
369         else:
370             self._queued_trees.popleft()
371
372     def _queue_file(self, source, filename=None):
373         assert (self._queued_file is None), "tried to queue more than one file"
374         if not hasattr(source, 'read'):
375             source = open(source, 'rb')
376             self._close_file = True
377         else:
378             self._close_file = False
379         if filename is None:
380             filename = os.path.basename(source.name)
381         self.start_new_file(filename)
382         self._queued_file = source
383
384     def _queue_dirents(self, stream_name, dirents):
385         assert (not self._queued_dirents), "tried to queue more than one tree"
386         self._queued_dirents = deque(sorted(dirents))
387
388     def _queue_tree(self, path, stream_name, max_manifest_depth):
389         self._queued_trees.append((path, stream_name, max_manifest_depth))
390
391     def write_file(self, source, filename=None):
392         self._queue_file(source, filename)
393         self.do_queued_work()
394
395     def write_directory_tree(self,
396                              path, stream_name='.', max_manifest_depth=-1):
397         self._queue_tree(path, stream_name, max_manifest_depth)
398         self.do_queued_work()
399
400     def write(self, newdata):
401         if hasattr(newdata, '__iter__'):
402             for s in newdata:
403                 self.write(s)
404             return
405         self._data_buffer.append(newdata)
406         self._data_buffer_len += len(newdata)
407         self._current_stream_length += len(newdata)
408         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
409             self.flush_data()
410
411     def open(self, streampath, filename=None):
412         """open(streampath[, filename]) -> file-like object
413
414         Pass in the path of a file to write to the Collection, either as a
415         single string or as two separate stream name and file name arguments.
416         This method returns a file-like object you can write to add it to the
417         Collection.
418
419         You may only have one file object from the Collection open at a time,
420         so be sure to close the object when you're done.  Using the object in
421         a with statement makes that easy::
422
423           with cwriter.open('./doc/page1.txt') as outfile:
424               outfile.write(page1_data)
425           with cwriter.open('./doc/page2.txt') as outfile:
426               outfile.write(page2_data)
427         """
428         if filename is None:
429             streampath, filename = split(streampath)
430         if self._last_open and not self._last_open.closed:
431             raise errors.AssertionError(
432                 "can't open '{}' when '{}' is still open".format(
433                     filename, self._last_open.name))
434         if streampath != self.current_stream_name():
435             self.start_new_stream(streampath)
436         self.set_current_file_name(filename)
437         self._last_open = _WriterFile(self, filename)
438         return self._last_open
439
440     def flush_data(self):
441         data_buffer = ''.join(self._data_buffer)
442         if data_buffer:
443             self._current_stream_locators.append(
444                 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
445             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
446             self._data_buffer_len = len(self._data_buffer[0])
447
448     def start_new_file(self, newfilename=None):
449         self.finish_current_file()
450         self.set_current_file_name(newfilename)
451
452     def set_current_file_name(self, newfilename):
453         if re.search(r'[\t\n]', newfilename):
454             raise errors.AssertionError(
455                 "Manifest filenames cannot contain whitespace: %s" %
456                 newfilename)
457         elif re.search(r'\x00', newfilename):
458             raise errors.AssertionError(
459                 "Manifest filenames cannot contain NUL characters: %s" %
460                 newfilename)
461         self._current_file_name = newfilename
462
463     def current_file_name(self):
464         return self._current_file_name
465
466     def finish_current_file(self):
467         if self._current_file_name is None:
468             if self._current_file_pos == self._current_stream_length:
469                 return
470             raise errors.AssertionError(
471                 "Cannot finish an unnamed file " +
472                 "(%d bytes at offset %d in '%s' stream)" %
473                 (self._current_stream_length - self._current_file_pos,
474                  self._current_file_pos,
475                  self._current_stream_name))
476         self._current_stream_files.append([
477                 self._current_file_pos,
478                 self._current_stream_length - self._current_file_pos,
479                 self._current_file_name])
480         self._current_file_pos = self._current_stream_length
481         self._current_file_name = None
482
483     def start_new_stream(self, newstreamname='.'):
484         self.finish_current_stream()
485         self.set_current_stream_name(newstreamname)
486
487     def set_current_stream_name(self, newstreamname):
488         if re.search(r'[\t\n]', newstreamname):
489             raise errors.AssertionError(
490                 "Manifest stream names cannot contain whitespace")
491         self._current_stream_name = '.' if newstreamname=='' else newstreamname
492
493     def current_stream_name(self):
494         return self._current_stream_name
495
496     def finish_current_stream(self):
497         self.finish_current_file()
498         self.flush_data()
499         if not self._current_stream_files:
500             pass
501         elif self._current_stream_name is None:
502             raise errors.AssertionError(
503                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
504                 (self._current_stream_length, len(self._current_stream_files)))
505         else:
506             if not self._current_stream_locators:
507                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
508             self._finished_streams.append([self._current_stream_name,
509                                            self._current_stream_locators,
510                                            self._current_stream_files])
511         self._current_stream_files = []
512         self._current_stream_length = 0
513         self._current_stream_locators = []
514         self._current_stream_name = None
515         self._current_file_pos = 0
516         self._current_file_name = None
517
518     def finish(self):
519         # Store the manifest in Keep and return its locator.
520         return self._my_keep().put(self.manifest_text())
521
522     def portable_data_hash(self):
523         stripped = self.stripped_manifest()
524         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
525
526     def manifest_text(self):
527         self.finish_current_stream()
528         manifest = ''
529
530         for stream in self._finished_streams:
531             if not re.search(r'^\.(/.*)?$', stream[0]):
532                 manifest += './'
533             manifest += stream[0].replace(' ', '\\040')
534             manifest += ' ' + ' '.join(stream[1])
535             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
536             manifest += "\n"
537
538         return manifest
539
540     def data_locators(self):
541         ret = []
542         for name, locators, files in self._finished_streams:
543             ret += locators
544         return ret
545
546
547 class ResumableCollectionWriter(CollectionWriter):
548     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
549                    '_current_stream_locators', '_current_stream_name',
550                    '_current_file_name', '_current_file_pos', '_close_file',
551                    '_data_buffer', '_dependencies', '_finished_streams',
552                    '_queued_dirents', '_queued_trees']
553
554     def __init__(self, api_client=None, num_retries=0):
555         self._dependencies = {}
556         super(ResumableCollectionWriter, self).__init__(
557             api_client, num_retries=num_retries)
558
559     @classmethod
560     def from_state(cls, state, *init_args, **init_kwargs):
561         # Try to build a new writer from scratch with the given state.
562         # If the state is not suitable to resume (because files have changed,
563         # been deleted, aren't predictable, etc.), raise a
564         # StaleWriterStateError.  Otherwise, return the initialized writer.
565         # The caller is responsible for calling writer.do_queued_work()
566         # appropriately after it's returned.
567         writer = cls(*init_args, **init_kwargs)
568         for attr_name in cls.STATE_PROPS:
569             attr_value = state[attr_name]
570             attr_class = getattr(writer, attr_name).__class__
571             # Coerce the value into the same type as the initial value, if
572             # needed.
573             if attr_class not in (type(None), attr_value.__class__):
574                 attr_value = attr_class(attr_value)
575             setattr(writer, attr_name, attr_value)
576         # Check dependencies before we try to resume anything.
577         if any(KeepLocator(ls).permission_expired()
578                for ls in writer._current_stream_locators):
579             raise errors.StaleWriterStateError(
580                 "locators include expired permission hint")
581         writer.check_dependencies()
582         if state['_current_file'] is not None:
583             path, pos = state['_current_file']
584             try:
585                 writer._queued_file = open(path, 'rb')
586                 writer._queued_file.seek(pos)
587             except IOError as error:
588                 raise errors.StaleWriterStateError(
589                     "failed to reopen active file {}: {}".format(path, error))
590         return writer
591
592     def check_dependencies(self):
593         for path, orig_stat in self._dependencies.items():
594             if not S_ISREG(orig_stat[ST_MODE]):
595                 raise errors.StaleWriterStateError("{} not file".format(path))
596             try:
597                 now_stat = tuple(os.stat(path))
598             except OSError as error:
599                 raise errors.StaleWriterStateError(
600                     "failed to stat {}: {}".format(path, error))
601             if ((not S_ISREG(now_stat[ST_MODE])) or
602                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
603                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
604                 raise errors.StaleWriterStateError("{} changed".format(path))
605
606     def dump_state(self, copy_func=lambda x: x):
607         state = {attr: copy_func(getattr(self, attr))
608                  for attr in self.STATE_PROPS}
609         if self._queued_file is None:
610             state['_current_file'] = None
611         else:
612             state['_current_file'] = (os.path.realpath(self._queued_file.name),
613                                       self._queued_file.tell())
614         return state
615
616     def _queue_file(self, source, filename=None):
617         try:
618             src_path = os.path.realpath(source)
619         except Exception:
620             raise errors.AssertionError("{} not a file path".format(source))
621         try:
622             path_stat = os.stat(src_path)
623         except OSError as stat_error:
624             path_stat = None
625         super(ResumableCollectionWriter, self)._queue_file(source, filename)
626         fd_stat = os.fstat(self._queued_file.fileno())
627         if not S_ISREG(fd_stat.st_mode):
628             # We won't be able to resume from this cache anyway, so don't
629             # worry about further checks.
630             self._dependencies[source] = tuple(fd_stat)
631         elif path_stat is None:
632             raise errors.AssertionError(
633                 "could not stat {}: {}".format(source, stat_error))
634         elif path_stat.st_ino != fd_stat.st_ino:
635             raise errors.AssertionError(
636                 "{} changed between open and stat calls".format(source))
637         else:
638             self._dependencies[src_path] = tuple(fd_stat)
639
640     def write(self, data):
641         if self._queued_file is None:
642             raise errors.AssertionError(
643                 "resumable writer can't accept unsourced data")
644         return super(ResumableCollectionWriter, self).write(data)
645
646 ADD = "add"
647 DEL = "del"
648 MOD = "mod"
649
650 class SynchronizedCollectionBase(CollectionBase):
651     """Base class for Collections and Subcollections.  Implements the majority of
652     functionality relating to accessing items in the Collection."""
653
654     def __init__(self, parent=None):
655         self.parent = parent
656         self._modified = True
657         self._items = {}
658
659     def _my_api(self):
660         raise NotImplementedError()
661
662     def _my_keep(self):
663         raise NotImplementedError()
664
665     def _my_block_manager(self):
666         raise NotImplementedError()
667
668     def _populate(self):
669         raise NotImplementedError()
670
671     def sync_mode(self):
672         raise NotImplementedError()
673
674     def root_collection(self):
675         raise NotImplementedError()
676
677     def notify(self, event, collection, name, item):
678         raise NotImplementedError()
679
680     @synchronized
681     def find(self, path, create=False, create_collection=False):
682         """Recursively search the specified file path.  May return either a Collection
683         or ArvadosFile.
684
685         :create:
686           If true, create path components (i.e. Collections) that are
687           missing.  If "create" is False, return None if a path component is
688           not found.
689
690         :create_collection:
691           If the path is not found, "create" is True, and
692           "create_collection" is False, then create and return a new
693           ArvadosFile for the last path component.  If "create_collection" is
694           True, then create and return a new Collection for the last path
695           component.
696
697         """
698         if create and self.sync_mode() == SYNC_READONLY:
699             raise IOError((errno.EROFS, "Collection is read only"))
700
701         p = path.split("/")
702         if p[0] == '.':
703             del p[0]
704
705         if p and p[0]:
706             item = self._items.get(p[0])
707             if len(p) == 1:
708                 # item must be a file
709                 if item is None and create:
710                     # create new file
711                     if create_collection:
712                         item = Subcollection(self)
713                     else:
714                         item = ArvadosFile(self)
715                     self._items[p[0]] = item
716                     self._modified = True
717                     self.notify(ADD, self, p[0], item)
718                 return item
719             else:
720                 if item is None and create:
721                     # create new collection
722                     item = Subcollection(self)
723                     self._items[p[0]] = item
724                     self._modified = True
725                     self.notify(ADD, self, p[0], item)
726                 del p[0]
727                 if isinstance(item, SynchronizedCollectionBase):
728                     return item.find("/".join(p), create=create)
729                 else:
730                     raise errors.ArgumentError("Interior path components must be subcollection")
731         else:
732             return self
733
734     def open(self, path, mode):
735         """Open a file-like object for access.
736
737         :path:
738           path to a file in the collection
739         :mode:
740           one of "r", "r+", "w", "w+", "a", "a+"
741           :"r":
742             opens for reading
743           :"r+":
744             opens for reading and writing.  Reads/writes share a file pointer.
745           :"w", "w+":
746             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
747           :"a", "a+":
748             opens for reading and writing.  All writes are appended to
749             the end of the file.  Writing does not affect the file pointer for
750             reading.
751         """
752         mode = mode.replace("b", "")
753         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
754             raise ArgumentError("Bad mode '%s'" % mode)
755         create = (mode != "r")
756
757         if create and self.sync_mode() == SYNC_READONLY:
758             raise IOError((errno.EROFS, "Collection is read only"))
759
760         f = self.find(path, create=create)
761
762         if f is None:
763             raise IOError((errno.ENOENT, "File not found"))
764         if not isinstance(f, ArvadosFile):
765             raise IOError((errno.EISDIR, "Path must refer to a file."))
766
767         if mode[0] == "w":
768             f.truncate(0)
769
770         if mode == "r":
771             return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
772         else:
773             return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
774
775     @synchronized
776     def modified(self):
777         """Test if the collection (or any subcollection or file) has been modified
778         since it was created."""
779         if self._modified:
780             return True
781         for k,v in self._items.items():
782             if v.modified():
783                 return True
784         return False
785
786     @synchronized
787     def set_unmodified(self):
788         """Recursively clear modified flag"""
789         self._modified = False
790         for k,v in self._items.items():
791             v.set_unmodified()
792
793     @synchronized
794     def __iter__(self):
795         """Iterate over names of files and collections contained in this collection."""
796         return self._items.keys().__iter__()
797
798     @synchronized
799     def iterkeys(self):
800         """Iterate over names of files and collections directly contained in this collection."""
801         return self._items.keys()
802
803     @synchronized
804     def __getitem__(self, k):
805         """Get a file or collection that is directly contained by this collection.  If
806         you want to search a path, use `find()` instead.
807         """
808         return self._items[k]
809
810     @synchronized
811     def __contains__(self, k):
812         """If there is a file or collection a directly contained by this collection
813         with name "k"."""
814         return k in self._items
815
816     @synchronized
817     def __len__(self):
818         """Get the number of items directly contained in this collection"""
819         return len(self._items)
820
821     @must_be_writable
822     @synchronized
823     def __delitem__(self, p):
824         """Delete an item by name which is directly contained by this collection."""
825         del self._items[p]
826         self._modified = True
827         self.notify(DEL, self, p, None)
828
829     @synchronized
830     def keys(self):
831         """Get a list of names of files and collections directly contained in this collection."""
832         return self._items.keys()
833
834     @synchronized
835     def values(self):
836         """Get a list of files and collection objects directly contained in this collection."""
837         return self._items.values()
838
839     @synchronized
840     def items(self):
841         """Get a list of (name, object) tuples directly contained in this collection."""
842         return self._items.items()
843
844     def exists(self, path):
845         """Test if there is a file or collection at "path" """
846         return self.find(path) != None
847
848     @must_be_writable
849     @synchronized
850     def remove(self, path, rm_r=False):
851         """Remove the file or subcollection (directory) at `path`.
852         :rm_r:
853           Specify whether to remove non-empty subcollections (True), or raise an error (False).
854         """
855         p = path.split("/")
856         if p[0] == '.':
857             # Remove '.' from the front of the path
858             del p[0]
859
860         if len(p) > 0:
861             item = self._items.get(p[0])
862             if item is None:
863                 raise IOError((errno.ENOENT, "File not found"))
864             if len(p) == 1:
865                 if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
866                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
867                 d = self._items[p[0]]
868                 del self._items[p[0]]
869                 self._modified = True
870                 self.notify(DEL, self, p[0], d)
871             else:
872                 del p[0]
873                 item.remove("/".join(p))
874         else:
875             raise IOError((errno.ENOENT, "File not found"))
876
877     def _cloneinto(self, target):
878         for k,v in self._items.items():
879             target._items[k] = v.clone(target)
880
881     def clone(self):
882         raise NotImplementedError()
883
884     @must_be_writable
885     @synchronized
886     def copy(self, source, target_path, source_collection=None, overwrite=False):
887         """Copy a file or subcollection to a new path in this collection.
888
889         :source:
890           An ArvadosFile, Subcollection, or string with a path to source file or subcollection
891
892         :target_path:
893           Destination file or path.  If the target path already exists and is a
894           subcollection, the item will be placed inside the subcollection.  If
895           the target path already exists and is a file, this will raise an error
896           unless you specify `overwrite=True`.
897
898         :source_collection:
899           Collection to copy `source_path` from (default `self`)
900
901         :overwrite:
902           Whether to overwrite target file if it already exists.
903         """
904         if source_collection is None:
905             source_collection = self
906
907         # Find the object to copy
908         if isinstance(source, basestring):
909             source_obj = source_collection.find(source)
910             if source_obj is None:
911                 raise IOError((errno.ENOENT, "File not found"))
912             sp = source.split("/")
913         else:
914             source_obj = source
915             sp = None
916
917         # Find parent collection the target path
918         tp = target_path.split("/")
919
920         # Determine the name to use.
921         target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
922
923         if not target_name:
924             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
925
926         target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
927
928         with target_dir.lock:
929             if target_name in target_dir:
930                 if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
931                     target_dir = target_dir[target_name]
932                     target_name = sp[-1]
933                 elif not overwrite:
934                     raise IOError((errno.EEXIST, "File already exists"))
935
936             mod = None
937             if target_name in target_dir:
938                 mod = target_dir[target_name]
939
940             # Actually make the copy.
941             dup = source_obj.clone(target_dir)
942             target_dir._items[target_name] = dup
943             target_dir._modified = True
944
945         if mod:
946             self.notify(MOD, target_dir, target_name, (mod, dup))
947         else:
948             self.notify(ADD, target_dir, target_name, dup)
949
950     @synchronized
951     def manifest_text(self, strip=False, normalize=False):
952         """Get the manifest text for this collection, sub collections and files.
953
954         :strip:
955           If True, remove signing tokens from block locators if present.
956           If False, block locators are left unchanged.
957
958         :normalize:
959           If True, always export the manifest text in normalized form
960           even if the Collection is not modified.  If False and the collection
961           is not modified, return the original manifest text even if it is not
962           in normalized form.
963
964         """
965         if self.modified() or self._manifest_text is None or normalize:
966             return export_manifest(self, stream_name=".", portable_locators=strip)
967         else:
968             if strip:
969                 return self.stripped_manifest()
970             else:
971                 return self._manifest_text
972
973     @synchronized
974     def diff(self, end_collection, prefix=".", holding_collection=None):
975         """
976         Generate list of add/modify/delete actions which, when given to `apply`, will
977         change `self` to match `end_collection`
978         """
979         changes = []
980         if holding_collection is None:
981             holding_collection = CollectionRoot(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
982         for k in self:
983             if k not in end_collection:
984                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
985         for k in end_collection:
986             if k in self:
987                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
988                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
989                 elif end_collection[k] != self[k]:
990                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
991             else:
992                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
993         return changes
994
995     @must_be_writable
996     @synchronized
997     def apply(self, changes):
998         """
999         Apply changes from `diff`.  If a change conflicts with a local change, it
1000         will be saved to an alternate path indicating the conflict.
1001         """
1002         for c in changes:
1003             path = c[1]
1004             initial = c[2]
1005             local = self.find(path)
1006             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
1007                                                                     time.gmtime()))
1008             if c[0] == ADD:
1009                 if local is None:
1010                     # No local file at path, safe to copy over new file
1011                     self.copy(initial, path)
1012                 elif local is not None and local != initial:
1013                     # There is already local file and it is different:
1014                     # save change to conflict file.
1015                     self.copy(initial, conflictpath)
1016             elif c[0] == MOD:
1017                 if local == initial:
1018                     # Local matches the "initial" item so it has not
1019                     # changed locally and is safe to update.
1020                     if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
1021                         # Replace contents of local file with new contents
1022                         local.replace_contents(c[3])
1023                     else:
1024                         # Overwrite path with new item; this can happen if
1025                         # path was a file and is now a collection or vice versa
1026                         self.copy(c[3], path, overwrite=True)
1027                 else:
1028                     # Local is missing (presumably deleted) or local doesn't
1029                     # match the "start" value, so save change to conflict file
1030                     self.copy(c[3], conflictpath)
1031             elif c[0] == DEL:
1032                 if local == initial:
1033                     # Local item matches "initial" value, so it is safe to remove.
1034                     self.remove(path, rm_r=True)
1035                 # else, the file is modified or already removed, in either
1036                 # case we don't want to try to remove it.
1037
1038     def portable_data_hash(self):
1039         """Get the portable data hash for this collection's manifest."""
1040         stripped = self.manifest_text(strip=True)
1041         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1042
1043     @synchronized
1044     def __eq__(self, other):
1045         if other is self:
1046             return True
1047         if not isinstance(other, SynchronizedCollectionBase):
1048             return False
1049         if len(self._items) != len(other):
1050             return False
1051         for k in self._items:
1052             if k not in other:
1053                 return False
1054             if self._items[k] != other[k]:
1055                 return False
1056         return True
1057
1058     def __ne__(self, other):
1059         return not self.__eq__(other)
1060
1061 class CollectionRoot(SynchronizedCollectionBase):
1062     """Represents the root of an Arvados Collection, which may be associated with
1063     an API server Collection record.
1064
1065     Brief summary of useful methods:
1066
1067     :To read an existing file:
1068       `c.open("myfile", "r")`
1069
1070     :To write a new file:
1071       `c.open("myfile", "w")`
1072
1073     :To determine if a file exists:
1074       `c.find("myfile") is not None`
1075
1076     :To copy a file:
1077       `c.copy("source", "dest")`
1078
1079     :To delete a file:
1080       `c.remove("myfile")`
1081
1082     :To save to an existing collection record:
1083       `c.save()`
1084
1085     :To save a new collection record:
1086     `c.save_new()`
1087
1088     :To merge remote changes into this object:
1089       `c.update()`
1090
1091     This class is threadsafe.  The root collection object, all subcollections
1092     and files are protected by a single lock (i.e. each access locks the entire
1093     collection).
1094
1095     """
1096
1097     def __init__(self, manifest_locator_or_text=None,
1098                  parent=None,
1099                  apiconfig=None,
1100                  api_client=None,
1101                  keep_client=None,
1102                  num_retries=None,
1103                  block_manager=None,
1104                  sync=None):
1105         """:manifest_locator_or_text:
1106           One of Arvados collection UUID, block locator of
1107           a manifest, raw manifest text, or None (to create an empty collection).
1108         :parent:
1109           the parent Collection, may be None.
1110         :apiconfig:
1111           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1112           Prefer this over supplying your own api_client and keep_client (except in testing).
1113           Will use default config settings if not specified.
1114         :api_client:
1115           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1116         :keep_client:
1117           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1118         :num_retries:
1119           the number of retries for API and Keep requests.
1120         :block_manager:
1121           the block manager to use.  If not specified, create one.
1122         :sync:
1123           Set synchronization policy with API server collection record.
1124           :SYNC_READONLY:
1125             Collection is read only.  No synchronization.  This mode will
1126             also forego locking, which gives better performance.
1127           :SYNC_EXPLICIT:
1128             Collection is writable.  Synchronize on explicit request via `update()` or `save()`
1129           :SYNC_LIVE:
1130             Collection is writable.  Synchronize with server in response to
1131             background websocket events, on block write, or on file close.
1132
1133         """
1134         super(CollectionRoot, self).__init__(parent)
1135         self._api_client = api_client
1136         self._keep_client = keep_client
1137         self._block_manager = block_manager
1138
1139         if apiconfig:
1140             self._config = apiconfig
1141         else:
1142             self._config = config.settings()
1143
1144         self.num_retries = num_retries
1145         self._manifest_locator = None
1146         self._manifest_text = None
1147         self._api_response = None
1148
1149         if sync is None:
1150             raise errors.ArgumentError("Must specify sync mode")
1151
1152         self._sync = sync
1153         self.lock = threading.RLock()
1154         self.callbacks = []
1155         self.events = None
1156
1157         if manifest_locator_or_text:
1158             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1159                 self._manifest_locator = manifest_locator_or_text
1160             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1161                 self._manifest_locator = manifest_locator_or_text
1162             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1163                 self._manifest_text = manifest_locator_or_text
1164             else:
1165                 raise errors.ArgumentError(
1166                     "Argument to CollectionReader must be a manifest or a collection UUID")
1167
1168             self._populate()
1169
1170             if self._sync == SYNC_LIVE:
1171                 if not self._has_collection_uuid():
1172                     raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
1173                 self.events = events.subscribe(arvados.api(apiconfig=self._config),
1174                                                [["object_uuid", "=", self._manifest_locator]],
1175                                                self.on_message)
1176
1177
1178     def root_collection(self):
1179         return self
1180
1181     def sync_mode(self):
1182         return self._sync
1183
1184     def on_message(self, event):
1185         if event.get("object_uuid") == self._manifest_locator:
1186             self.update()
1187
1188     @staticmethod
1189     def create(name, owner_uuid=None, sync=SYNC_EXPLICIT, apiconfig=None):
1190         """Create a new empty Collection with associated collection record."""
1191         c = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
1192         c.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1193         if sync == SYNC_LIVE:
1194             c.events = events.subscribe(arvados.api(apiconfig=self._config), [["object_uuid", "=", c._manifest_locator]], c.on_message)
1195         return c
1196
1197     @synchronized
1198     @retry_method
1199     def update(self, other=None, num_retries=None):
1200         if other is None:
1201             if self._manifest_locator is None:
1202                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1203             n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1204             other = import_collection(n["manifest_text"])
1205         baseline = import_collection(self._manifest_text)
1206         self.apply(other.diff(baseline))
1207
1208     @synchronized
1209     def _my_api(self):
1210         if self._api_client is None:
1211             self._api_client = arvados.SafeApi(self._config)
1212             self._keep_client = self._api_client.keep
1213         return self._api_client
1214
1215     @synchronized
1216     def _my_keep(self):
1217         if self._keep_client is None:
1218             if self._api_client is None:
1219                 self._my_api()
1220             else:
1221                 self._keep_client = KeepClient(api=self._api_client)
1222         return self._keep_client
1223
1224     @synchronized
1225     def _my_block_manager(self):
1226         if self._block_manager is None:
1227             self._block_manager = BlockManager(self._my_keep())
1228         return self._block_manager
1229
1230     def _populate_from_api_server(self):
1231         # As in KeepClient itself, we must wait until the last
1232         # possible moment to instantiate an API client, in order to
1233         # avoid tripping up clients that don't have access to an API
1234         # server.  If we do build one, make sure our Keep client uses
1235         # it.  If instantiation fails, we'll fall back to the except
1236         # clause, just like any other Collection lookup
1237         # failure. Return an exception, or None if successful.
1238         try:
1239             self._api_response = self._my_api().collections().get(
1240                 uuid=self._manifest_locator).execute(
1241                     num_retries=self.num_retries)
1242             self._manifest_text = self._api_response['manifest_text']
1243             return None
1244         except Exception as e:
1245             return e
1246
1247     def _populate_from_keep(self):
1248         # Retrieve a manifest directly from Keep. This has a chance of
1249         # working if [a] the locator includes a permission signature
1250         # or [b] the Keep services are operating in world-readable
1251         # mode. Return an exception, or None if successful.
1252         try:
1253             self._manifest_text = self._my_keep().get(
1254                 self._manifest_locator, num_retries=self.num_retries)
1255         except Exception as e:
1256             return e
1257
1258     def _populate(self):
1259         if self._manifest_locator is None and self._manifest_text is None:
1260             return
1261         error_via_api = None
1262         error_via_keep = None
1263         should_try_keep = ((self._manifest_text is None) and
1264                            util.keep_locator_pattern.match(
1265                                self._manifest_locator))
1266         if ((self._manifest_text is None) and
1267             util.signed_locator_pattern.match(self._manifest_locator)):
1268             error_via_keep = self._populate_from_keep()
1269         if self._manifest_text is None:
1270             error_via_api = self._populate_from_api_server()
1271             if error_via_api is not None and not should_try_keep:
1272                 raise error_via_api
1273         if ((self._manifest_text is None) and
1274             not error_via_keep and
1275             should_try_keep):
1276             # Looks like a keep locator, and we didn't already try keep above
1277             error_via_keep = self._populate_from_keep()
1278         if self._manifest_text is None:
1279             # Nothing worked!
1280             raise arvados.errors.NotFoundError(
1281                 ("Failed to retrieve collection '{}' " +
1282                  "from either API server ({}) or Keep ({})."
1283                  ).format(
1284                     self._manifest_locator,
1285                     error_via_api,
1286                     error_via_keep))
1287         # populate
1288         self._baseline_manifest = self._manifest_text
1289         import_manifest(self._manifest_text, self)
1290
1291         if self._sync == SYNC_READONLY:
1292             # Now that we're populated, knowing that this will be readonly,
1293             # forego any further locking.
1294             self.lock = NoopLock()
1295
1296     def _has_collection_uuid(self):
1297         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1298
1299     def __enter__(self):
1300         return self
1301
1302     def __exit__(self, exc_type, exc_value, traceback):
1303         """Support scoped auto-commit in a with: block"""
1304         if self._sync != SYNC_READONLY and self._has_collection_uuid():
1305             self.save()
1306         if self._block_manager is not None:
1307             self._block_manager.stop_threads()
1308
1309     @synchronized
1310     def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
1311         if new_config is None:
1312             new_config = self._config
1313         c = CollectionRoot(parent=new_parent, apiconfig=new_config, sync=new_sync)
1314         if new_sync == SYNC_READONLY:
1315             c.lock = NoopLock()
1316         c._items = {}
1317         self._cloneinto(c)
1318         return c
1319
1320     @synchronized
1321     def api_response(self):
1322         """
1323         api_response() -> dict or None
1324
1325         Returns information about this Collection fetched from the API server.
1326         If the Collection exists in Keep but not the API server, currently
1327         returns None.  Future versions may provide a synthetic response.
1328         """
1329         return self._api_response
1330
1331     @must_be_writable
1332     @synchronized
1333     @retry_method
1334     def save(self, merge=True, num_retries=None):
1335         """Commit pending buffer blocks to Keep, merge with remote record (if
1336         update=True), write the manifest to Keep, and update the collection
1337         record.  Will raise AssertionError if not associated with a collection
1338         record on the API server.  If you want to save a manifest to Keep only,
1339         see `save_new()`.
1340
1341         :update:
1342           Update and merge remote changes before saving.  Otherwise, any
1343           remote changes will be ignored and overwritten.
1344
1345         """
1346         if self.modified():
1347             if not self._has_collection_uuid():
1348                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
1349             self._my_block_manager().commit_all()
1350             if merge:
1351                 self.update()
1352             self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1353
1354             mt = self.manifest_text(strip=False)
1355             self._api_response = self._my_api().collections().update(
1356                 uuid=self._manifest_locator,
1357                 body={'manifest_text': mt}
1358                 ).execute(
1359                     num_retries=num_retries)
1360             self._manifest_text = mt
1361             self.set_unmodified()
1362
1363     @must_be_writable
1364     @synchronized
1365     @retry_method
1366     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
1367         """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
1368         a new collection record (if create_collection_record True).  After
1369         creating a new collection record, this Collection object will be
1370         associated with the new record for `save()` and SYNC_LIVE updates.
1371
1372         :name:
1373           The collection name.
1374
1375         :keep_only:
1376           Only save the manifest to keep, do not create a collection record.
1377
1378         :owner_uuid:
1379           the user, or project uuid that will own this collection.
1380           If None, defaults to the current user.
1381
1382         :ensure_unique_name:
1383           If True, ask the API server to rename the collection
1384           if it conflicts with a collection with the same name and owner.  If
1385           False, a name conflict will result in an error.
1386
1387         """
1388         self._my_block_manager().commit_all()
1389         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1390         mt = self.manifest_text(strip=False)
1391
1392         if create_collection_record:
1393             if name is None:
1394                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1395
1396             body = {"manifest_text": mt,
1397                     "name": name}
1398             if owner_uuid:
1399                 body["owner_uuid"] = owner_uuid
1400
1401             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1402
1403             if self.events:
1404                 self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1405
1406             self._manifest_locator = self._api_response["uuid"]
1407
1408             if self.events:
1409                 self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1410
1411         self._manifest_text = mt
1412         self.set_unmodified()
1413
1414     @synchronized
1415     def subscribe(self, callback):
1416         self.callbacks.append(callback)
1417
1418     @synchronized
1419     def unsubscribe(self, callback):
1420         self.callbacks.remove(callback)
1421
1422     @synchronized
1423     def notify(self, event, collection, name, item):
1424         for c in self.callbacks:
1425             c(event, collection, name, item)
1426
1427 def ReadOnlyCollection(*args, **kwargs):
1428     kwargs["sync"] = SYNC_READONLY
1429     return CollectionRoot(*args, **kwargs)
1430
1431 def WritableCollection(*args, **kwargs):
1432     kwargs["sync"] = SYNC_EXPLICIT
1433     return CollectionRoot(*args, **kwargs)
1434
1435 def LiveCollection(*args, **kwargs):
1436     kwargs["sync"] = SYNC_LIVE
1437     return CollectionRoot(*args, **kwargs)
1438
1439
1440 class Subcollection(SynchronizedCollectionBase):
1441     """This is a subdirectory within a collection that doesn't have its own API
1442     server record.  It falls under the umbrella of the root collection."""
1443
1444     def __init__(self, parent):
1445         super(Subcollection, self).__init__(parent)
1446         self.lock = self.root_collection().lock
1447
1448     def root_collection(self):
1449         return self.parent.root_collection()
1450
1451     def sync_mode(self):
1452         return self.root_collection().sync_mode()
1453
1454     def _my_api(self):
1455         return self.root_collection()._my_api()
1456
1457     def _my_keep(self):
1458         return self.root_collection()._my_keep()
1459
1460     def _my_block_manager(self):
1461         return self.root_collection()._my_block_manager()
1462
1463     def _populate(self):
1464         self.root_collection()._populate()
1465
1466     def notify(self, event, collection, name, item):
1467         return self.root_collection().notify(event, collection, name, item)
1468
1469     @synchronized
1470     def clone(self, new_parent):
1471         c = Subcollection(new_parent)
1472         self._cloneinto(c)
1473         return c
1474
1475 def import_manifest(manifest_text,
1476                     into_collection=None,
1477                     api_client=None,
1478                     keep=None,
1479                     num_retries=None,
1480                     sync=SYNC_READONLY):
1481     """Import a manifest into a `Collection`.
1482
1483     :manifest_text:
1484       The manifest text to import from.
1485
1486     :into_collection:
1487       The `Collection` that will be initialized (must be empty).
1488       If None, create a new `Collection` object.
1489
1490     :api_client:
1491       The API client object that will be used when creating a new `Collection` object.
1492
1493     :keep:
1494       The keep client object that will be used when creating a new `Collection` object.
1495
1496     :num_retries:
1497       the default number of api client and keep retries on error.
1498
1499     :sync:
1500       Collection sync mode (only if into_collection is None)
1501     """
1502     if into_collection is not None:
1503         if len(into_collection) > 0:
1504             raise ArgumentError("Can only import manifest into an empty collection")
1505         c = into_collection
1506     else:
1507         c = CollectionRoot(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
1508
1509     save_sync = c.sync_mode()
1510     c._sync = None
1511
1512     STREAM_NAME = 0
1513     BLOCKS = 1
1514     SEGMENTS = 2
1515
1516     stream_name = None
1517     state = STREAM_NAME
1518
1519     for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1520         tok = n.group(1)
1521         sep = n.group(2)
1522
1523         if state == STREAM_NAME:
1524             # starting a new stream
1525             stream_name = tok.replace('\\040', ' ')
1526             blocks = []
1527             segments = []
1528             streamoffset = 0L
1529             state = BLOCKS
1530             continue
1531
1532         if state == BLOCKS:
1533             s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1534             if s:
1535                 blocksize = long(s.group(1))
1536                 blocks.append(Range(tok, streamoffset, blocksize))
1537                 streamoffset += blocksize
1538             else:
1539                 state = SEGMENTS
1540
1541         if state == SEGMENTS:
1542             s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1543             if s:
1544                 pos = long(s.group(1))
1545                 size = long(s.group(2))
1546                 name = s.group(3).replace('\\040', ' ')
1547                 f = c.find("%s/%s" % (stream_name, name), create=True)
1548                 f.add_segment(blocks, pos, size)
1549             else:
1550                 # error!
1551                 raise errors.SyntaxError("Invalid manifest format")
1552
1553         if sep == "\n":
1554             stream_name = None
1555             state = STREAM_NAME
1556
1557     c.set_unmodified()
1558     c._sync = save_sync
1559     return c
1560
1561 def export_manifest(item, stream_name=".", portable_locators=False):
1562     """
1563     :item:
1564       Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).  If
1565       `item` is a is a `Collection`, this will also export subcollections.
1566
1567     :stream_name:
1568       the name of the stream when exporting `item`.
1569
1570     :portable_locators:
1571       If True, strip any permission hints on block locators.
1572       If False, use block locators as-is.
1573     """
1574     buf = ""
1575     if isinstance(item, SynchronizedCollectionBase):
1576         stream = {}
1577         sorted_keys = sorted(item.keys())
1578         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1579             v = item[k]
1580             st = []
1581             for s in v.segments():
1582                 loc = s.locator
1583                 if loc.startswith("bufferblock"):
1584                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1585                 if portable_locators:
1586                     loc = KeepLocator(loc).stripped()
1587                 st.append(LocatorAndRange(loc, locator_block_size(loc),
1588                                      s.segment_offset, s.range_size))
1589             stream[k] = st
1590         if stream:
1591             buf += ' '.join(normalize_stream(stream_name, stream))
1592             buf += "\n"
1593         for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1594             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1595     elif isinstance(item, ArvadosFile):
1596         st = []
1597         for s in item.segments:
1598             loc = s.locator
1599             if loc.startswith("bufferblock"):
1600                 loc = item._bufferblocks[loc].calculate_locator()
1601             if portable_locators:
1602                 loc = KeepLocator(loc).stripped()
1603             st.append(LocatorAndRange(loc, locator_block_size(loc),
1604                                  s.segment_offset, s.range_size))
1605         stream[stream_name] = st
1606         buf += ' '.join(normalize_stream(stream_name, stream))
1607         buf += "\n"
1608     return buf