5692: Add flush flag to manifest_text() which calls commit_all(). Added
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace")
311         self._current_stream_name = '.' if newstreamname=='' else newstreamname
312
313     def current_stream_name(self):
314         return self._current_stream_name
315
316     def finish_current_stream(self):
317         self.finish_current_file()
318         self.flush_data()
319         if not self._current_stream_files:
320             pass
321         elif self._current_stream_name is None:
322             raise errors.AssertionError(
323                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
324                 (self._current_stream_length, len(self._current_stream_files)))
325         else:
326             if not self._current_stream_locators:
327                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
328             self._finished_streams.append([self._current_stream_name,
329                                            self._current_stream_locators,
330                                            self._current_stream_files])
331         self._current_stream_files = []
332         self._current_stream_length = 0
333         self._current_stream_locators = []
334         self._current_stream_name = None
335         self._current_file_pos = 0
336         self._current_file_name = None
337
338     def finish(self):
339         """Store the manifest in Keep and return its locator.
340
341         This is useful for storing manifest fragments (task outputs)
342         temporarily in Keep during a Crunch job.
343
344         In other cases you should make a collection instead, by
345         sending manifest_text() to the API server's "create
346         collection" endpoint.
347         """
348         return self._my_keep().put(self.manifest_text(), copies=self.replication)
349
350     def portable_data_hash(self):
351         stripped = self.stripped_manifest()
352         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
353
354     def manifest_text(self):
355         self.finish_current_stream()
356         manifest = ''
357
358         for stream in self._finished_streams:
359             if not re.search(r'^\.(/.*)?$', stream[0]):
360                 manifest += './'
361             manifest += stream[0].replace(' ', '\\040')
362             manifest += ' ' + ' '.join(stream[1])
363             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
364             manifest += "\n"
365
366         return manifest
367
368     def data_locators(self):
369         ret = []
370         for name, locators, files in self._finished_streams:
371             ret += locators
372         return ret
373
374
375 class ResumableCollectionWriter(CollectionWriter):
376     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
377                    '_current_stream_locators', '_current_stream_name',
378                    '_current_file_name', '_current_file_pos', '_close_file',
379                    '_data_buffer', '_dependencies', '_finished_streams',
380                    '_queued_dirents', '_queued_trees']
381
382     def __init__(self, api_client=None, **kwargs):
383         self._dependencies = {}
384         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
385
386     @classmethod
387     def from_state(cls, state, *init_args, **init_kwargs):
388         # Try to build a new writer from scratch with the given state.
389         # If the state is not suitable to resume (because files have changed,
390         # been deleted, aren't predictable, etc.), raise a
391         # StaleWriterStateError.  Otherwise, return the initialized writer.
392         # The caller is responsible for calling writer.do_queued_work()
393         # appropriately after it's returned.
394         writer = cls(*init_args, **init_kwargs)
395         for attr_name in cls.STATE_PROPS:
396             attr_value = state[attr_name]
397             attr_class = getattr(writer, attr_name).__class__
398             # Coerce the value into the same type as the initial value, if
399             # needed.
400             if attr_class not in (type(None), attr_value.__class__):
401                 attr_value = attr_class(attr_value)
402             setattr(writer, attr_name, attr_value)
403         # Check dependencies before we try to resume anything.
404         if any(KeepLocator(ls).permission_expired()
405                for ls in writer._current_stream_locators):
406             raise errors.StaleWriterStateError(
407                 "locators include expired permission hint")
408         writer.check_dependencies()
409         if state['_current_file'] is not None:
410             path, pos = state['_current_file']
411             try:
412                 writer._queued_file = open(path, 'rb')
413                 writer._queued_file.seek(pos)
414             except IOError as error:
415                 raise errors.StaleWriterStateError(
416                     "failed to reopen active file {}: {}".format(path, error))
417         return writer
418
419     def check_dependencies(self):
420         for path, orig_stat in self._dependencies.items():
421             if not S_ISREG(orig_stat[ST_MODE]):
422                 raise errors.StaleWriterStateError("{} not file".format(path))
423             try:
424                 now_stat = tuple(os.stat(path))
425             except OSError as error:
426                 raise errors.StaleWriterStateError(
427                     "failed to stat {}: {}".format(path, error))
428             if ((not S_ISREG(now_stat[ST_MODE])) or
429                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
430                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
431                 raise errors.StaleWriterStateError("{} changed".format(path))
432
433     def dump_state(self, copy_func=lambda x: x):
434         state = {attr: copy_func(getattr(self, attr))
435                  for attr in self.STATE_PROPS}
436         if self._queued_file is None:
437             state['_current_file'] = None
438         else:
439             state['_current_file'] = (os.path.realpath(self._queued_file.name),
440                                       self._queued_file.tell())
441         return state
442
443     def _queue_file(self, source, filename=None):
444         try:
445             src_path = os.path.realpath(source)
446         except Exception:
447             raise errors.AssertionError("{} not a file path".format(source))
448         try:
449             path_stat = os.stat(src_path)
450         except OSError as stat_error:
451             path_stat = None
452         super(ResumableCollectionWriter, self)._queue_file(source, filename)
453         fd_stat = os.fstat(self._queued_file.fileno())
454         if not S_ISREG(fd_stat.st_mode):
455             # We won't be able to resume from this cache anyway, so don't
456             # worry about further checks.
457             self._dependencies[source] = tuple(fd_stat)
458         elif path_stat is None:
459             raise errors.AssertionError(
460                 "could not stat {}: {}".format(source, stat_error))
461         elif path_stat.st_ino != fd_stat.st_ino:
462             raise errors.AssertionError(
463                 "{} changed between open and stat calls".format(source))
464         else:
465             self._dependencies[src_path] = tuple(fd_stat)
466
467     def write(self, data):
468         if self._queued_file is None:
469             raise errors.AssertionError(
470                 "resumable writer can't accept unsourced data")
471         return super(ResumableCollectionWriter, self).write(data)
472
473
474 ADD = "add"
475 DEL = "del"
476 MOD = "mod"
477 FILE = "file"
478 COLLECTION = "collection"
479
480 class RichCollectionBase(CollectionBase):
481     """Base class for Collections and Subcollections.
482
483     Implements the majority of functionality relating to accessing items in the
484     Collection.
485
486     """
487
488     def __init__(self, parent=None):
489         self.parent = parent
490         self._modified = True
491         self._items = {}
492
493     def _my_api(self):
494         raise NotImplementedError()
495
496     def _my_keep(self):
497         raise NotImplementedError()
498
499     def _my_block_manager(self):
500         raise NotImplementedError()
501
502     def writable(self):
503         raise NotImplementedError()
504
505     def root_collection(self):
506         raise NotImplementedError()
507
508     def notify(self, event, collection, name, item):
509         raise NotImplementedError()
510
511     def stream_name(self):
512         raise NotImplementedError()
513
514     @must_be_writable
515     @synchronized
516     def find_or_create(self, path, create_type):
517         """Recursively search the specified file path.
518
519         May return either a `Collection` or `ArvadosFile`.  If not found, will
520         create a new item at the specified path based on `create_type`.  Will
521         create intermediate subcollections needed to contain the final item in
522         the path.
523
524         :create_type:
525           One of `arvados.collection.FILE` or
526           `arvados.collection.COLLECTION`.  If the path is not found, and value
527           of create_type is FILE then create and return a new ArvadosFile for
528           the last path component.  If COLLECTION, then create and return a new
529           Collection for the last path component.
530
531         """
532
533         pathcomponents = path.split("/", 1)
534         if pathcomponents[0]:
535             item = self._items.get(pathcomponents[0])
536             if len(pathcomponents) == 1:
537                 if item is None:
538                     # create new file
539                     if create_type == COLLECTION:
540                         item = Subcollection(self)
541                     else:
542                         item = ArvadosFile(self)
543                     self._items[pathcomponents[0]] = item
544                     self._modified = True
545                     self.notify(ADD, self, pathcomponents[0], item)
546                 return item
547             else:
548                 if item is None:
549                     # create new collection
550                     item = Subcollection(self)
551                     self._items[pathcomponents[0]] = item
552                     self._modified = True
553                     self.notify(ADD, self, pathcomponents[0], item)
554                 if isinstance(item, RichCollectionBase):
555                     return item.find_or_create(pathcomponents[1], create_type)
556                 else:
557                     raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
558         else:
559             return self
560
561     @synchronized
562     def find(self, path):
563         """Recursively search the specified file path.
564
565         May return either a Collection or ArvadosFile.  Return None if not
566         found.
567
568         """
569         if not path:
570             raise errors.ArgumentError("Parameter 'path' must not be empty.")
571
572         pathcomponents = path.split("/", 1)
573         item = self._items.get(pathcomponents[0])
574         if len(pathcomponents) == 1:
575             return item
576         else:
577             if isinstance(item, RichCollectionBase):
578                 if pathcomponents[1]:
579                     return item.find(pathcomponents[1])
580                 else:
581                     return item
582             else:
583                 raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
584
585     def mkdirs(path):
586         """Recursive subcollection create.
587
588         Like `os.mkdirs()`.  Will create intermediate subcollections needed to
589         contain the leaf subcollection path.
590
591         """
592         return self.find_or_create(path, COLLECTION)
593
594     def open(self, path, mode="r"):
595         """Open a file-like object for access.
596
597         :path:
598           path to a file in the collection
599         :mode:
600           one of "r", "r+", "w", "w+", "a", "a+"
601           :"r":
602             opens for reading
603           :"r+":
604             opens for reading and writing.  Reads/writes share a file pointer.
605           :"w", "w+":
606             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
607           :"a", "a+":
608             opens for reading and writing.  All writes are appended to
609             the end of the file.  Writing does not affect the file pointer for
610             reading.
611         """
612         mode = mode.replace("b", "")
613         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
614             raise errors.ArgumentError("Bad mode '%s'" % mode)
615         create = (mode != "r")
616
617         if create and not self.writable():
618             raise IOError((errno.EROFS, "Collection is read only"))
619
620         if create:
621             arvfile = self.find_or_create(path, FILE)
622         else:
623             arvfile = self.find(path)
624
625         if arvfile is None:
626             raise IOError((errno.ENOENT, "File not found"))
627         if not isinstance(arvfile, ArvadosFile):
628             raise IOError((errno.EISDIR, "Path must refer to a file."))
629
630         if mode[0] == "w":
631             arvfile.truncate(0)
632
633         name = os.path.basename(path)
634
635         if mode == "r":
636             return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
637         else:
638             return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
639
640     @synchronized
641     def modified(self):
642         """Test if the collection (or any subcollection or file) has been modified."""
643         if self._modified:
644             return True
645         for k,v in self._items.items():
646             if v.modified():
647                 return True
648         return False
649
650     @synchronized
651     def set_unmodified(self):
652         """Recursively clear modified flag."""
653         self._modified = False
654         for k,v in self._items.items():
655             v.set_unmodified()
656
657     @synchronized
658     def __iter__(self):
659         """Iterate over names of files and collections contained in this collection."""
660         return iter(self._items.keys())
661
662     @synchronized
663     def __getitem__(self, k):
664         """Get a file or collection that is directly contained by this collection.
665
666         If you want to search a path, use `find()` instead.
667
668         """
669         return self._items[k]
670
671     @synchronized
672     def __contains__(self, k):
673         """Test if there is a file or collection a directly contained by this collection."""
674         return k in self._items
675
676     @synchronized
677     def __len__(self):
678         """Get the number of items directly contained in this collection."""
679         return len(self._items)
680
681     @must_be_writable
682     @synchronized
683     def __delitem__(self, p):
684         """Delete an item by name which is directly contained by this collection."""
685         del self._items[p]
686         self._modified = True
687         self.notify(DEL, self, p, None)
688
689     @synchronized
690     def keys(self):
691         """Get a list of names of files and collections directly contained in this collection."""
692         return self._items.keys()
693
694     @synchronized
695     def values(self):
696         """Get a list of files and collection objects directly contained in this collection."""
697         return self._items.values()
698
699     @synchronized
700     def items(self):
701         """Get a list of (name, object) tuples directly contained in this collection."""
702         return self._items.items()
703
704     def exists(self, path):
705         """Test if there is a file or collection at `path`."""
706         return self.find(path) is not None
707
708     @must_be_writable
709     @synchronized
710     def remove(self, path, recursive=False):
711         """Remove the file or subcollection (directory) at `path`.
712
713         :recursive:
714           Specify whether to remove non-empty subcollections (True), or raise an error (False).
715         """
716
717         if not path:
718             raise errors.ArgumentError("Parameter 'path' must not be empty.")
719
720         pathcomponents = path.split("/", 1)
721         item = self._items.get(pathcomponents[0])
722         if item is None:
723             raise IOError((errno.ENOENT, "File not found"))
724         if len(pathcomponents) == 1:
725             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
726                 raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
727             deleteditem = self._items[pathcomponents[0]]
728             del self._items[pathcomponents[0]]
729             self._modified = True
730             self.notify(DEL, self, pathcomponents[0], deleteditem)
731         else:
732             item.remove(pathcomponents[1])
733
734     def _clonefrom(self, source):
735         for k,v in source.items():
736             self._items[k] = v.clone(self)
737
738     def clone(self):
739         raise NotImplementedError()
740
741     @must_be_writable
742     @synchronized
743     def add(self, source_obj, target_name, overwrite=False):
744         """Copy a file or subcollection to this collection.
745
746         :source_obj:
747           An ArvadosFile, or Subcollection object
748
749         :target_name:
750           Destination item name.  If the target name already exists and is a
751           file, this will raise an error unless you specify `overwrite=True`.
752
753         :overwrite:
754           Whether to overwrite target file if it already exists.
755
756         """
757
758         if target_name in self and not overwrite:
759             raise IOError((errno.EEXIST, "File already exists"))
760
761         modified_from = None
762         if target_name in self:
763             modified_from = self[target_name]
764
765         # Actually make the copy.
766         dup = source_obj.clone(self)
767         self._items[target_name] = dup
768         self._modified = True
769
770         if modified_from:
771             self.notify(MOD, self, target_name, (modified_from, dup))
772         else:
773             self.notify(ADD, self, target_name, dup)
774
775     @must_be_writable
776     @synchronized
777     def copy(self, source, target_path, source_collection=None, overwrite=False):
778         """Copy a file or subcollection to a new path in this collection.
779
780         :source:
781           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
782
783         :target_path:
784           Destination file or path.  If the target path already exists and is a
785           subcollection, the item will be placed inside the subcollection.  If
786           the target path already exists and is a file, this will raise an error
787           unless you specify `overwrite=True`.
788
789         :source_collection:
790           Collection to copy `source_path` from (default `self`)
791
792         :overwrite:
793           Whether to overwrite target file if it already exists.
794         """
795         if source_collection is None:
796             source_collection = self
797
798         # Find the object to copy
799         if isinstance(source, basestring):
800             source_obj = source_collection.find(source)
801             if source_obj is None:
802                 raise IOError((errno.ENOENT, "File not found"))
803             sourcecomponents = source.split("/")
804         else:
805             source_obj = source
806             sourcecomponents = None
807
808         # Find parent collection the target path
809         targetcomponents = target_path.split("/")
810
811         # Determine the name to use.
812         target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
813
814         if not target_name:
815             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
816
817         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
818
819         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
820             target_dir = target_dir[target_name]
821             target_name = sourcecomponents[-1]
822
823         target_dir.add(source_obj, target_name, overwrite)
824
825     def portable_manifest_text(self, stream_name="."):
826         """Get the manifest text for this collection, sub collections and files.
827
828         This method does not flush outstanding to Keep.  It will return a
829         normalized manifest with access tokens stripped.
830
831         :stream_name:
832           Name to use for this stream (directory)
833
834         """
835         return self.manifest_text(stream_name, strip=True, normalize=True, flush=False)
836
837     @synchronized
838     def manifest_text(self, stream_name=".", strip=False, normalize=False, flush=True):
839         """Get the manifest text for this collection, sub collections and files.
840
841         By default, this method will flush outstanding blocksto Keep.  By
842         default it will not normalize the manifest or strip access tokens.
843
844         :stream_name:
845           Name to use for this stream (directory)
846
847         :strip:
848           If True, remove signing tokens from block locators if present.
849           If False (default), block locators are left unchanged.
850
851         :normalize:
852           If True, always export the manifest text in normalized form
853           even if the Collection is not modified.  If False (default) and the collection
854           is not modified, return the original manifest text even if it is not
855           in normalized form.
856
857         :flush:
858           If true (default) write any outstanding blocks.
859
860         """
861
862         if self.modified() or self._manifest_text is None or normalize:
863             if flush:
864                 self._my_block_manager().commit_all()
865
866             stream = {}
867             buf = []
868             sorted_keys = sorted(self.keys())
869             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
870                 # Create a stream per file `k`
871                 arvfile = self[filename]
872                 filestream = []
873                 for segment in arvfile.segments():
874                     loc = segment.locator
875                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
876                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
877                     if strip:
878                         loc = KeepLocator(loc).stripped()
879                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
880                                          segment.segment_offset, segment.range_size))
881                 stream[filename] = filestream
882             if stream:
883                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
884             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
885                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
886             return "".join(buf)
887         else:
888             if strip:
889                 return self.stripped_manifest()
890             else:
891                 return self._manifest_text
892
893     @synchronized
894     def diff(self, end_collection, prefix=".", holding_collection=None):
895         """Generate list of add/modify/delete actions.
896
897         When given to `apply`, will change `self` to match `end_collection`
898
899         """
900         changes = []
901         if holding_collection is None:
902             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
903         for k in self:
904             if k not in end_collection:
905                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
906         for k in end_collection:
907             if k in self:
908                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
909                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
910                 elif end_collection[k] != self[k]:
911                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
912             else:
913                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
914         return changes
915
916     @must_be_writable
917     @synchronized
918     def apply(self, changes):
919         """Apply changes from `diff`.
920
921         If a change conflicts with a local change, it will be saved to an
922         alternate path indicating the conflict.
923
924         """
925         for change in changes:
926             event_type = change[0]
927             path = change[1]
928             initial = change[2]
929             local = self.find(path)
930             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
931                                                                     time.gmtime()))
932             if event_type == ADD:
933                 if local is None:
934                     # No local file at path, safe to copy over new file
935                     self.copy(initial, path)
936                 elif local is not None and local != initial:
937                     # There is already local file and it is different:
938                     # save change to conflict file.
939                     self.copy(initial, conflictpath)
940             elif event_type == MOD:
941                 final = change[3]
942                 if local == initial:
943                     # Local matches the "initial" item so it has not
944                     # changed locally and is safe to update.
945                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
946                         # Replace contents of local file with new contents
947                         local.replace_contents(final)
948                     else:
949                         # Overwrite path with new item; this can happen if
950                         # path was a file and is now a collection or vice versa
951                         self.copy(final, path, overwrite=True)
952                 else:
953                     # Local is missing (presumably deleted) or local doesn't
954                     # match the "start" value, so save change to conflict file
955                     self.copy(final, conflictpath)
956             elif event_type == DEL:
957                 if local == initial:
958                     # Local item matches "initial" value, so it is safe to remove.
959                     self.remove(path, recursive=True)
960                 # else, the file is modified or already removed, in either
961                 # case we don't want to try to remove it.
962
963     def portable_data_hash(self):
964         """Get the portable data hash for this collection's manifest."""
965         stripped = self.portable_manifest_text()
966         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
967
968     @synchronized
969     def __eq__(self, other):
970         if other is self:
971             return True
972         if not isinstance(other, RichCollectionBase):
973             return False
974         if len(self._items) != len(other):
975             return False
976         for k in self._items:
977             if k not in other:
978                 return False
979             if self._items[k] != other[k]:
980                 return False
981         return True
982
983     def __ne__(self, other):
984         return not self.__eq__(other)
985
986
987 class Collection(RichCollectionBase):
988     """Represents the root of an Arvados Collection.
989
990     This class is threadsafe.  The root collection object, all subcollections
991     and files are protected by a single lock (i.e. each access locks the entire
992     collection).
993
994     Brief summary of
995     useful methods:
996
997     :To read an existing file:
998       `c.open("myfile", "r")`
999
1000     :To write a new file:
1001       `c.open("myfile", "w")`
1002
1003     :To determine if a file exists:
1004       `c.find("myfile") is not None`
1005
1006     :To copy a file:
1007       `c.copy("source", "dest")`
1008
1009     :To delete a file:
1010       `c.remove("myfile")`
1011
1012     :To save to an existing collection record:
1013       `c.save()`
1014
1015     :To save a new collection record:
1016     `c.save_new()`
1017
1018     :To merge remote changes into this object:
1019       `c.update()`
1020
1021     Must be associated with an API server Collection record (during
1022     initialization, or using `save_new`) to use `save` or `update`
1023
1024     """
1025
1026     def __init__(self, manifest_locator_or_text=None,
1027                  api_client=None,
1028                  keep_client=None,
1029                  num_retries=None,
1030                  parent=None,
1031                  apiconfig=None,
1032                  block_manager=None):
1033         """Collection constructor.
1034
1035         :manifest_locator_or_text:
1036           One of Arvados collection UUID, block locator of
1037           a manifest, raw manifest text, or None (to create an empty collection).
1038         :parent:
1039           the parent Collection, may be None.
1040         :apiconfig:
1041           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1042           Prefer this over supplying your own api_client and keep_client (except in testing).
1043           Will use default config settings if not specified.
1044         :api_client:
1045           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1046         :keep_client:
1047           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1048         :num_retries:
1049           the number of retries for API and Keep requests.
1050         :block_manager:
1051           the block manager to use.  If not specified, create one.
1052
1053         """
1054         super(Collection, self).__init__(parent)
1055         self._api_client = api_client
1056         self._keep_client = keep_client
1057         self._block_manager = block_manager
1058
1059         if apiconfig:
1060             self._config = apiconfig
1061         else:
1062             self._config = config.settings()
1063
1064         self.num_retries = num_retries if num_retries is not None else 0
1065         self._manifest_locator = None
1066         self._manifest_text = None
1067         self._api_response = None
1068
1069         self.lock = threading.RLock()
1070         self.callbacks = []
1071         self.events = None
1072
1073         if manifest_locator_or_text:
1074             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1075                 self._manifest_locator = manifest_locator_or_text
1076             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1077                 self._manifest_locator = manifest_locator_or_text
1078             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1079                 self._manifest_text = manifest_locator_or_text
1080             else:
1081                 raise errors.ArgumentError(
1082                     "Argument to CollectionReader must be a manifest or a collection UUID")
1083
1084             try:
1085                 self._populate()
1086             except (IOError, errors.SyntaxError) as e:
1087                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1088
1089     def root_collection(self):
1090         return self
1091
1092     def stream_name(self):
1093         return "."
1094
1095     def writable(self):
1096         return True
1097
1098     @synchronized
1099     @retry_method
1100     def update(self, other=None, num_retries=None):
1101         """Merge the latest collection on the API server with the current collection."""
1102
1103         if other is None:
1104             if self._manifest_locator is None:
1105                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1106             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1107             other = CollectionReader(response["manifest_text"])
1108         baseline = CollectionReader(self._manifest_text)
1109         self.apply(baseline.diff(other))
1110
1111     @synchronized
1112     def _my_api(self):
1113         if self._api_client is None:
1114             self._api_client = ThreadSafeApiCache(self._config)
1115             self._keep_client = self._api_client.keep
1116         return self._api_client
1117
1118     @synchronized
1119     def _my_keep(self):
1120         if self._keep_client is None:
1121             if self._api_client is None:
1122                 self._my_api()
1123             else:
1124                 self._keep_client = KeepClient(api_client=self._api_client)
1125         return self._keep_client
1126
1127     @synchronized
1128     def _my_block_manager(self):
1129         if self._block_manager is None:
1130             self._block_manager = _BlockManager(self._my_keep())
1131         return self._block_manager
1132
1133     def _populate_from_api_server(self):
1134         # As in KeepClient itself, we must wait until the last
1135         # possible moment to instantiate an API client, in order to
1136         # avoid tripping up clients that don't have access to an API
1137         # server.  If we do build one, make sure our Keep client uses
1138         # it.  If instantiation fails, we'll fall back to the except
1139         # clause, just like any other Collection lookup
1140         # failure. Return an exception, or None if successful.
1141         try:
1142             self._api_response = self._my_api().collections().get(
1143                 uuid=self._manifest_locator).execute(
1144                     num_retries=self.num_retries)
1145             self._manifest_text = self._api_response['manifest_text']
1146             return None
1147         except Exception as e:
1148             return e
1149
1150     def _populate_from_keep(self):
1151         # Retrieve a manifest directly from Keep. This has a chance of
1152         # working if [a] the locator includes a permission signature
1153         # or [b] the Keep services are operating in world-readable
1154         # mode. Return an exception, or None if successful.
1155         try:
1156             self._manifest_text = self._my_keep().get(
1157                 self._manifest_locator, num_retries=self.num_retries)
1158         except Exception as e:
1159             return e
1160
1161     def _populate(self):
1162         if self._manifest_locator is None and self._manifest_text is None:
1163             return
1164         error_via_api = None
1165         error_via_keep = None
1166         should_try_keep = ((self._manifest_text is None) and
1167                            util.keep_locator_pattern.match(
1168                                self._manifest_locator))
1169         if ((self._manifest_text is None) and
1170             util.signed_locator_pattern.match(self._manifest_locator)):
1171             error_via_keep = self._populate_from_keep()
1172         if self._manifest_text is None:
1173             error_via_api = self._populate_from_api_server()
1174             if error_via_api is not None and not should_try_keep:
1175                 raise error_via_api
1176         if ((self._manifest_text is None) and
1177             not error_via_keep and
1178             should_try_keep):
1179             # Looks like a keep locator, and we didn't already try keep above
1180             error_via_keep = self._populate_from_keep()
1181         if self._manifest_text is None:
1182             # Nothing worked!
1183             raise errors.NotFoundError(
1184                 ("Failed to retrieve collection '{}' " +
1185                  "from either API server ({}) or Keep ({})."
1186                  ).format(
1187                     self._manifest_locator,
1188                     error_via_api,
1189                     error_via_keep))
1190         # populate
1191         self._baseline_manifest = self._manifest_text
1192         self._import_manifest(self._manifest_text)
1193
1194
1195     def _has_collection_uuid(self):
1196         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1197
1198     def __enter__(self):
1199         return self
1200
1201     def __exit__(self, exc_type, exc_value, traceback):
1202         """Support scoped auto-commit in a with: block."""
1203         if exc_type is not None:
1204             if self.writable() and self._has_collection_uuid():
1205                 self.save()
1206         if self._block_manager is not None:
1207             self._block_manager.stop_threads()
1208
1209     @synchronized
1210     def manifest_locator(self):
1211         """Get the manifest locator, if any.
1212
1213         The manifest locator will be set when the collection is loaded from an
1214         API server record or the portable data hash of a manifest.
1215
1216         The manifest locator will be None if the collection is newly created or
1217         was created directly from manifest text.  The method `save_new()` will
1218         assign a manifest locator.
1219
1220         """
1221         return self._manifest_locator
1222
1223     @synchronized
1224     def clone(self, new_parent=None, readonly=False, new_config=None):
1225         if new_config is None:
1226             new_config = self._config
1227         if readonly:
1228             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1229         else:
1230             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1231
1232         newcollection._clonefrom(self)
1233         return newcollection
1234
1235     @synchronized
1236     def api_response(self):
1237         """Returns information about this Collection fetched from the API server.
1238
1239         If the Collection exists in Keep but not the API server, currently
1240         returns None.  Future versions may provide a synthetic response.
1241
1242         """
1243         return self._api_response
1244
1245     def find_or_create(self, path, create_type):
1246         """See `RichCollectionBase.find_or_create`"""
1247         if path == ".":
1248             return self
1249         else:
1250             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1251
1252     def find(self, path):
1253         """See `RichCollectionBase.find`"""
1254         if path == ".":
1255             return self
1256         else:
1257             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1258
1259     def remove(self, path, recursive=False):
1260         """See `RichCollectionBase.remove`"""
1261         if path == ".":
1262             raise errors.ArgumentError("Cannot remove '.'")
1263         else:
1264             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1265
1266     @must_be_writable
1267     @synchronized
1268     @retry_method
1269     def save(self, merge=True, num_retries=None):
1270         """Save collection to an existing collection record.
1271
1272         Commit pending buffer blocks to Keep, merge with remote record (if
1273         merge=True, the default), and update the collection record.  Returns
1274         the current manifest text.
1275
1276         Will raise AssertionError if not associated with a collection record on
1277         the API server.  If you want to save a manifest to Keep only, see
1278         `save_new()`.
1279
1280         :merge:
1281           Update and merge remote changes before saving.  Otherwise, any
1282           remote changes will be ignored and overwritten.
1283
1284         :num_retries:
1285           Retry count on API calls (if None,  use the collection default)
1286
1287         """
1288         if self.modified():
1289             if not self._has_collection_uuid():
1290                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
1291
1292             self._my_block_manager().commit_all()
1293
1294             if merge:
1295                 self.update()
1296
1297             text = self.manifest_text(strip=False)
1298             self._api_response = self._my_api().collections().update(
1299                 uuid=self._manifest_locator,
1300                 body={'manifest_text': text}
1301                 ).execute(
1302                     num_retries=num_retries)
1303             self._manifest_text = self._api_response["manifest_text"]
1304             self.set_unmodified()
1305
1306         return self._manifest_text
1307
1308
1309     @must_be_writable
1310     @synchronized
1311     @retry_method
1312     def save_new(self, name=None,
1313                  create_collection_record=True,
1314                  owner_uuid=None,
1315                  ensure_unique_name=False,
1316                  num_retries=None):
1317         """Save collection to a new collection record.
1318
1319         Commit pending buffer blocks to Keep and, when create_collection_record
1320         is True (default), create a new collection record.  After creating a
1321         new collection record, this Collection object will be associated with
1322         the new record used by `save()`.  Returns the current manifest text.
1323
1324         :name:
1325           The collection name.
1326
1327         :create_collection_record:
1328            If True, create a collection record on the API server.
1329            If False, only commit blocks to Keep and return the manifest text.
1330
1331         :owner_uuid:
1332           the user, or project uuid that will own this collection.
1333           If None, defaults to the current user.
1334
1335         :ensure_unique_name:
1336           If True, ask the API server to rename the collection
1337           if it conflicts with a collection with the same name and owner.  If
1338           False, a name conflict will result in an error.
1339
1340         :num_retries:
1341           Retry count on API calls (if None,  use the collection default)
1342
1343         """
1344         self._my_block_manager().commit_all()
1345         text = self.manifest_text(strip=False)
1346
1347         if create_collection_record:
1348             if name is None:
1349                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1350
1351             body = {"manifest_text": text,
1352                     "name": name}
1353             if owner_uuid:
1354                 body["owner_uuid"] = owner_uuid
1355
1356             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1357             text = self._api_response["manifest_text"]
1358
1359             self._manifest_locator = self._api_response["uuid"]
1360
1361             self._manifest_text = text
1362             self.set_unmodified()
1363
1364         return text
1365
1366     @synchronized
1367     def subscribe(self, callback):
1368         self.callbacks.append(callback)
1369
1370     @synchronized
1371     def unsubscribe(self, callback):
1372         self.callbacks.remove(callback)
1373
1374     @synchronized
1375     def notify(self, event, collection, name, item):
1376         for c in self.callbacks:
1377             c(event, collection, name, item)
1378
1379     @synchronized
1380     def _import_manifest(self, manifest_text):
1381         """Import a manifest into a `Collection`.
1382
1383         :manifest_text:
1384           The manifest text to import from.
1385
1386         """
1387         if len(self) > 0:
1388             raise ArgumentError("Can only import manifest into an empty collection")
1389
1390         STREAM_NAME = 0
1391         BLOCKS = 1
1392         SEGMENTS = 2
1393
1394         stream_name = None
1395         state = STREAM_NAME
1396
1397         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1398             tok = token_and_separator.group(1)
1399             sep = token_and_separator.group(2)
1400
1401             if state == STREAM_NAME:
1402                 # starting a new stream
1403                 stream_name = tok.replace('\\040', ' ')
1404                 blocks = []
1405                 segments = []
1406                 streamoffset = 0L
1407                 state = BLOCKS
1408                 continue
1409
1410             if state == BLOCKS:
1411                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1412                 if block_locator:
1413                     blocksize = long(block_locator.group(1))
1414                     blocks.append(Range(tok, streamoffset, blocksize))
1415                     streamoffset += blocksize
1416                 else:
1417                     state = SEGMENTS
1418
1419             if state == SEGMENTS:
1420                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1421                 if file_segment:
1422                     pos = long(file_segment.group(1))
1423                     size = long(file_segment.group(2))
1424                     name = file_segment.group(3).replace('\\040', ' ')
1425                     filepath = os.path.join(stream_name, name)
1426                     afile = self.find_or_create(filepath, FILE)
1427                     if isinstance(afile, ArvadosFile):
1428                         afile.add_segment(blocks, pos, size)
1429                     else:
1430                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1431                 else:
1432                     # error!
1433                     raise errors.SyntaxError("Invalid manifest format")
1434
1435             if sep == "\n":
1436                 stream_name = None
1437                 state = STREAM_NAME
1438
1439         self.set_unmodified()
1440
1441
1442 class Subcollection(RichCollectionBase):
1443     """This is a subdirectory within a collection that doesn't have its own API
1444     server record.
1445
1446     It falls under the umbrella of the root collection.
1447
1448     """
1449
1450     def __init__(self, parent):
1451         super(Subcollection, self).__init__(parent)
1452         self.lock = self.root_collection().lock
1453         self._manifest_text = None
1454
1455     def root_collection(self):
1456         return self.parent.root_collection()
1457
1458     def writable(self):
1459         return self.root_collection().writable()
1460
1461     def _my_api(self):
1462         return self.root_collection()._my_api()
1463
1464     def _my_keep(self):
1465         return self.root_collection()._my_keep()
1466
1467     def _my_block_manager(self):
1468         return self.root_collection()._my_block_manager()
1469
1470     def notify(self, event, collection, name, item):
1471         return self.root_collection().notify(event, collection, name, item)
1472
1473     def stream_name(self):
1474         for k, v in self.parent.items():
1475             if v is self:
1476                 return os.path.join(self.parent.stream_name(), k)
1477         return '.'
1478
1479     @synchronized
1480     def clone(self, new_parent):
1481         c = Subcollection(new_parent)
1482         c._clonefrom(self)
1483         return c
1484
1485
1486 class CollectionReader(Collection):
1487     """A read-only collection object.
1488
1489     Initialize from an api collection record locator, a portable data hash of a
1490     manifest, or raw manifest text.  See `Collection` constructor for detailed
1491     options.
1492
1493     """
1494     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1495         self._in_init = True
1496         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1497         self._in_init = False
1498
1499         # Forego any locking since it should never change once initialized.
1500         self.lock = NoopLock()
1501
1502         # Backwards compatability with old CollectionReader
1503         # all_streams() and all_files()
1504         self._streams = None
1505
1506     def writable(self):
1507         return self._in_init
1508
1509     def _populate_streams(orig_func):
1510         @functools.wraps(orig_func)
1511         def populate_streams_wrapper(self, *args, **kwargs):
1512             # Defer populating self._streams until needed since it creates a copy of the manifest.
1513             if self._streams is None:
1514                 if self._manifest_text:
1515                     self._streams = [sline.split()
1516                                      for sline in self._manifest_text.split("\n")
1517                                      if sline]
1518                 else:
1519                     self._streams = []
1520             return orig_func(self, *args, **kwargs)
1521         return populate_streams_wrapper
1522
1523     @_populate_streams
1524     def normalize(self):
1525         """Normalize the streams returned by `all_streams`.
1526
1527         This method is kept for backwards compatability and only affects the
1528         behavior of `all_streams()` and `all_files()`
1529
1530         """
1531
1532         # Rearrange streams
1533         streams = {}
1534         for s in self.all_streams():
1535             for f in s.all_files():
1536                 streamname, filename = split(s.name() + "/" + f.name())
1537                 if streamname not in streams:
1538                     streams[streamname] = {}
1539                 if filename not in streams[streamname]:
1540                     streams[streamname][filename] = []
1541                 for r in f.segments:
1542                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1543
1544         self._streams = [normalize_stream(s, streams[s])
1545                          for s in sorted(streams)]
1546     @_populate_streams
1547     def all_streams(self):
1548         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1549                 for s in self._streams]
1550
1551     @_populate_streams
1552     def all_files(self):
1553         for s in self.all_streams():
1554             for f in s.all_files():
1555                 yield f