d610c3509d672b6ddb828249e594bca1f9c80b1e
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace")
311         self._current_stream_name = '.' if newstreamname=='' else newstreamname
312
313     def current_stream_name(self):
314         return self._current_stream_name
315
316     def finish_current_stream(self):
317         self.finish_current_file()
318         self.flush_data()
319         if not self._current_stream_files:
320             pass
321         elif self._current_stream_name is None:
322             raise errors.AssertionError(
323                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
324                 (self._current_stream_length, len(self._current_stream_files)))
325         else:
326             if not self._current_stream_locators:
327                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
328             self._finished_streams.append([self._current_stream_name,
329                                            self._current_stream_locators,
330                                            self._current_stream_files])
331         self._current_stream_files = []
332         self._current_stream_length = 0
333         self._current_stream_locators = []
334         self._current_stream_name = None
335         self._current_file_pos = 0
336         self._current_file_name = None
337
338     def finish(self):
339         """Store the manifest in Keep and return its locator.
340
341         This is useful for storing manifest fragments (task outputs)
342         temporarily in Keep during a Crunch job.
343
344         In other cases you should make a collection instead, by
345         sending manifest_text() to the API server's "create
346         collection" endpoint.
347         """
348         return self._my_keep().put(self.manifest_text(), copies=self.replication)
349
350     def portable_data_hash(self):
351         stripped = self.stripped_manifest()
352         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
353
354     def manifest_text(self):
355         self.finish_current_stream()
356         manifest = ''
357
358         for stream in self._finished_streams:
359             if not re.search(r'^\.(/.*)?$', stream[0]):
360                 manifest += './'
361             manifest += stream[0].replace(' ', '\\040')
362             manifest += ' ' + ' '.join(stream[1])
363             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
364             manifest += "\n"
365
366         return manifest
367
368     def data_locators(self):
369         ret = []
370         for name, locators, files in self._finished_streams:
371             ret += locators
372         return ret
373
374
375 class ResumableCollectionWriter(CollectionWriter):
376     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
377                    '_current_stream_locators', '_current_stream_name',
378                    '_current_file_name', '_current_file_pos', '_close_file',
379                    '_data_buffer', '_dependencies', '_finished_streams',
380                    '_queued_dirents', '_queued_trees']
381
382     def __init__(self, api_client=None, **kwargs):
383         self._dependencies = {}
384         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
385
386     @classmethod
387     def from_state(cls, state, *init_args, **init_kwargs):
388         # Try to build a new writer from scratch with the given state.
389         # If the state is not suitable to resume (because files have changed,
390         # been deleted, aren't predictable, etc.), raise a
391         # StaleWriterStateError.  Otherwise, return the initialized writer.
392         # The caller is responsible for calling writer.do_queued_work()
393         # appropriately after it's returned.
394         writer = cls(*init_args, **init_kwargs)
395         for attr_name in cls.STATE_PROPS:
396             attr_value = state[attr_name]
397             attr_class = getattr(writer, attr_name).__class__
398             # Coerce the value into the same type as the initial value, if
399             # needed.
400             if attr_class not in (type(None), attr_value.__class__):
401                 attr_value = attr_class(attr_value)
402             setattr(writer, attr_name, attr_value)
403         # Check dependencies before we try to resume anything.
404         if any(KeepLocator(ls).permission_expired()
405                for ls in writer._current_stream_locators):
406             raise errors.StaleWriterStateError(
407                 "locators include expired permission hint")
408         writer.check_dependencies()
409         if state['_current_file'] is not None:
410             path, pos = state['_current_file']
411             try:
412                 writer._queued_file = open(path, 'rb')
413                 writer._queued_file.seek(pos)
414             except IOError as error:
415                 raise errors.StaleWriterStateError(
416                     "failed to reopen active file {}: {}".format(path, error))
417         return writer
418
419     def check_dependencies(self):
420         for path, orig_stat in self._dependencies.items():
421             if not S_ISREG(orig_stat[ST_MODE]):
422                 raise errors.StaleWriterStateError("{} not file".format(path))
423             try:
424                 now_stat = tuple(os.stat(path))
425             except OSError as error:
426                 raise errors.StaleWriterStateError(
427                     "failed to stat {}: {}".format(path, error))
428             if ((not S_ISREG(now_stat[ST_MODE])) or
429                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
430                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
431                 raise errors.StaleWriterStateError("{} changed".format(path))
432
433     def dump_state(self, copy_func=lambda x: x):
434         state = {attr: copy_func(getattr(self, attr))
435                  for attr in self.STATE_PROPS}
436         if self._queued_file is None:
437             state['_current_file'] = None
438         else:
439             state['_current_file'] = (os.path.realpath(self._queued_file.name),
440                                       self._queued_file.tell())
441         return state
442
443     def _queue_file(self, source, filename=None):
444         try:
445             src_path = os.path.realpath(source)
446         except Exception:
447             raise errors.AssertionError("{} not a file path".format(source))
448         try:
449             path_stat = os.stat(src_path)
450         except OSError as stat_error:
451             path_stat = None
452         super(ResumableCollectionWriter, self)._queue_file(source, filename)
453         fd_stat = os.fstat(self._queued_file.fileno())
454         if not S_ISREG(fd_stat.st_mode):
455             # We won't be able to resume from this cache anyway, so don't
456             # worry about further checks.
457             self._dependencies[source] = tuple(fd_stat)
458         elif path_stat is None:
459             raise errors.AssertionError(
460                 "could not stat {}: {}".format(source, stat_error))
461         elif path_stat.st_ino != fd_stat.st_ino:
462             raise errors.AssertionError(
463                 "{} changed between open and stat calls".format(source))
464         else:
465             self._dependencies[src_path] = tuple(fd_stat)
466
467     def write(self, data):
468         if self._queued_file is None:
469             raise errors.AssertionError(
470                 "resumable writer can't accept unsourced data")
471         return super(ResumableCollectionWriter, self).write(data)
472
473
474 ADD = "add"
475 DEL = "del"
476 MOD = "mod"
477 FILE = "file"
478 COLLECTION = "collection"
479
480 class RichCollectionBase(CollectionBase):
481     """Base class for Collections and Subcollections.
482
483     Implements the majority of functionality relating to accessing items in the
484     Collection.
485
486     """
487
488     def __init__(self, parent=None):
489         self.parent = parent
490         self._modified = True
491         self._callback = None
492         self._items = {}
493
494     def _my_api(self):
495         raise NotImplementedError()
496
497     def _my_keep(self):
498         raise NotImplementedError()
499
500     def _my_block_manager(self):
501         raise NotImplementedError()
502
503     def writable(self):
504         raise NotImplementedError()
505
506     def root_collection(self):
507         raise NotImplementedError()
508
509     def notify(self, event, collection, name, item):
510         raise NotImplementedError()
511
512     def stream_name(self):
513         raise NotImplementedError()
514
515     @must_be_writable
516     @synchronized
517     def find_or_create(self, path, create_type):
518         """Recursively search the specified file path.
519
520         May return either a `Collection` or `ArvadosFile`.  If not found, will
521         create a new item at the specified path based on `create_type`.  Will
522         create intermediate subcollections needed to contain the final item in
523         the path.
524
525         :create_type:
526           One of `arvados.collection.FILE` or
527           `arvados.collection.COLLECTION`.  If the path is not found, and value
528           of create_type is FILE then create and return a new ArvadosFile for
529           the last path component.  If COLLECTION, then create and return a new
530           Collection for the last path component.
531
532         """
533
534         pathcomponents = path.split("/", 1)
535         if pathcomponents[0]:
536             item = self._items.get(pathcomponents[0])
537             if len(pathcomponents) == 1:
538                 if item is None:
539                     # create new file
540                     if create_type == COLLECTION:
541                         item = Subcollection(self, pathcomponents[0])
542                     else:
543                         item = ArvadosFile(self, pathcomponents[0])
544                     self._items[pathcomponents[0]] = item
545                     self._modified = True
546                     self.notify(ADD, self, pathcomponents[0], item)
547                 return item
548             else:
549                 if item is None:
550                     # create new collection
551                     item = Subcollection(self, pathcomponents[0])
552                     self._items[pathcomponents[0]] = item
553                     self._modified = True
554                     self.notify(ADD, self, pathcomponents[0], item)
555                 if isinstance(item, RichCollectionBase):
556                     return item.find_or_create(pathcomponents[1], create_type)
557                 else:
558                     raise IOError(errno.ENOTDIR, "Interior path components must be subcollection")
559         else:
560             return self
561
562     @synchronized
563     def find(self, path):
564         """Recursively search the specified file path.
565
566         May return either a Collection or ArvadosFile.  Return None if not
567         found.
568
569         """
570         if not path:
571             raise errors.ArgumentError("Parameter 'path' must not be empty.")
572
573         pathcomponents = path.split("/", 1)
574         item = self._items.get(pathcomponents[0])
575         if len(pathcomponents) == 1:
576             return item
577         else:
578             if isinstance(item, RichCollectionBase):
579                 if pathcomponents[1]:
580                     return item.find(pathcomponents[1])
581                 else:
582                     return item
583             else:
584                 raise IOError(errno.ENOTDIR, "Interior path components must be subcollection")
585
586     def mkdirs(self, path):
587         """Recursive subcollection create.
588
589         Like `os.mkdirs()`.  Will create intermediate subcollections needed to
590         contain the leaf subcollection path.
591
592         """
593         return self.find_or_create(path, COLLECTION)
594
595     def open(self, path, mode="r"):
596         """Open a file-like object for access.
597
598         :path:
599           path to a file in the collection
600         :mode:
601           one of "r", "r+", "w", "w+", "a", "a+"
602           :"r":
603             opens for reading
604           :"r+":
605             opens for reading and writing.  Reads/writes share a file pointer.
606           :"w", "w+":
607             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
608           :"a", "a+":
609             opens for reading and writing.  All writes are appended to
610             the end of the file.  Writing does not affect the file pointer for
611             reading.
612         """
613         mode = mode.replace("b", "")
614         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
615             raise errors.ArgumentError("Bad mode '%s'" % mode)
616         create = (mode != "r")
617
618         if create and not self.writable():
619             raise IOError(errno.EROFS, "Collection is read only")
620
621         if create:
622             arvfile = self.find_or_create(path, FILE)
623         else:
624             arvfile = self.find(path)
625
626         if arvfile is None:
627             raise IOError(errno.ENOENT, "File not found")
628         if not isinstance(arvfile, ArvadosFile):
629             raise IOError(errno.EISDIR, "Path must refer to a file.")
630
631         if mode[0] == "w":
632             arvfile.truncate(0)
633
634         name = os.path.basename(path)
635
636         if mode == "r":
637             return ArvadosFileReader(arvfile, mode, num_retries=self.num_retries)
638         else:
639             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
640
641     @synchronized
642     def modified(self):
643         """Test if the collection (or any subcollection or file) has been modified."""
644         if self._modified:
645             return True
646         for k,v in self._items.items():
647             if v.modified():
648                 return True
649         return False
650
651     @synchronized
652     def set_unmodified(self):
653         """Recursively clear modified flag."""
654         self._modified = False
655         for k,v in self._items.items():
656             v.set_unmodified()
657
658     @synchronized
659     def __iter__(self):
660         """Iterate over names of files and collections contained in this collection."""
661         return iter(self._items.keys())
662
663     @synchronized
664     def __getitem__(self, k):
665         """Get a file or collection that is directly contained by this collection.
666
667         If you want to search a path, use `find()` instead.
668
669         """
670         return self._items[k]
671
672     @synchronized
673     def __contains__(self, k):
674         """Test if there is a file or collection a directly contained by this collection."""
675         return k in self._items
676
677     @synchronized
678     def __len__(self):
679         """Get the number of items directly contained in this collection."""
680         return len(self._items)
681
682     @must_be_writable
683     @synchronized
684     def __delitem__(self, p):
685         """Delete an item by name which is directly contained by this collection."""
686         del self._items[p]
687         self._modified = True
688         self.notify(DEL, self, p, None)
689
690     @synchronized
691     def keys(self):
692         """Get a list of names of files and collections directly contained in this collection."""
693         return self._items.keys()
694
695     @synchronized
696     def values(self):
697         """Get a list of files and collection objects directly contained in this collection."""
698         return self._items.values()
699
700     @synchronized
701     def items(self):
702         """Get a list of (name, object) tuples directly contained in this collection."""
703         return self._items.items()
704
705     def exists(self, path):
706         """Test if there is a file or collection at `path`."""
707         return self.find(path) is not None
708
709     @must_be_writable
710     @synchronized
711     def remove(self, path, recursive=False):
712         """Remove the file or subcollection (directory) at `path`.
713
714         :recursive:
715           Specify whether to remove non-empty subcollections (True), or raise an error (False).
716         """
717
718         if not path:
719             raise errors.ArgumentError("Parameter 'path' must not be empty.")
720
721         pathcomponents = path.split("/", 1)
722         item = self._items.get(pathcomponents[0])
723         if item is None:
724             raise IOError(errno.ENOENT, "File not found")
725         if len(pathcomponents) == 1:
726             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
727                 raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
728             deleteditem = self._items[pathcomponents[0]]
729             del self._items[pathcomponents[0]]
730             self._modified = True
731             self.notify(DEL, self, pathcomponents[0], deleteditem)
732         else:
733             item.remove(pathcomponents[1])
734
735     def _clonefrom(self, source):
736         for k,v in source.items():
737             self._items[k] = v.clone(self, k)
738
739     def clone(self):
740         raise NotImplementedError()
741
742     @must_be_writable
743     @synchronized
744     def add(self, source_obj, target_name, overwrite=False):
745         """Copy a file or subcollection to this collection.
746
747         :source_obj:
748           An ArvadosFile, or Subcollection object
749
750         :target_name:
751           Destination item name.  If the target name already exists and is a
752           file, this will raise an error unless you specify `overwrite=True`.
753
754         :overwrite:
755           Whether to overwrite target file if it already exists.
756
757         """
758
759         if target_name in self and not overwrite:
760             raise IOError(errno.EEXIST, "File already exists")
761
762         modified_from = None
763         if target_name in self:
764             modified_from = self[target_name]
765
766         # Actually make the copy.
767         dup = source_obj.clone(self, target_name)
768         self._items[target_name] = dup
769         self._modified = True
770
771         if modified_from:
772             self.notify(MOD, self, target_name, (modified_from, dup))
773         else:
774             self.notify(ADD, self, target_name, dup)
775
776     @must_be_writable
777     @synchronized
778     def copy(self, source, target_path, source_collection=None, overwrite=False):
779         """Copy a file or subcollection to a new path in this collection.
780
781         :source:
782           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
783
784         :target_path:
785           Destination file or path.  If the target path already exists and is a
786           subcollection, the item will be placed inside the subcollection.  If
787           the target path already exists and is a file, this will raise an error
788           unless you specify `overwrite=True`.
789
790         :source_collection:
791           Collection to copy `source_path` from (default `self`)
792
793         :overwrite:
794           Whether to overwrite target file if it already exists.
795         """
796         if source_collection is None:
797             source_collection = self
798
799         # Find the object to copy
800         if isinstance(source, basestring):
801             source_obj = source_collection.find(source)
802             if source_obj is None:
803                 raise IOError(errno.ENOENT, "File not found")
804             sourcecomponents = source.split("/")
805         else:
806             source_obj = source
807             sourcecomponents = None
808
809         # Find parent collection the target path
810         targetcomponents = target_path.split("/")
811
812         # Determine the name to use.
813         target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
814
815         if not target_name:
816             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
817
818         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
819
820         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
821             target_dir = target_dir[target_name]
822             target_name = sourcecomponents[-1]
823
824         target_dir.add(source_obj, target_name, overwrite)
825
826     @synchronized
827     def manifest_text(self, stream_name=".", strip=False, normalize=False):
828         """Get the manifest text for this collection, sub collections and files.
829
830         :stream_name:
831           Name of the stream (directory)
832
833         :strip:
834           If True, remove signing tokens from block locators if present.
835           If False (default), block locators are left unchanged.
836
837         :normalize:
838           If True, always export the manifest text in normalized form
839           even if the Collection is not modified.  If False (default) and the collection
840           is not modified, return the original manifest text even if it is not
841           in normalized form.
842
843         """
844
845         if self.modified() or self._manifest_text is None or normalize:
846             stream = {}
847             buf = []
848             sorted_keys = sorted(self.keys())
849             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
850                 # Create a stream per file `k`
851                 arvfile = self[filename]
852                 filestream = []
853                 for segment in arvfile.segments():
854                     loc = segment.locator
855                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
856                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
857                     if strip:
858                         loc = KeepLocator(loc).stripped()
859                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
860                                          segment.segment_offset, segment.range_size))
861                 stream[filename] = filestream
862             if stream:
863                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
864             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
865                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
866             return "".join(buf)
867         else:
868             if strip:
869                 return self.stripped_manifest()
870             else:
871                 return self._manifest_text
872
873     @synchronized
874     def diff(self, end_collection, prefix=".", holding_collection=None):
875         """Generate list of add/modify/delete actions.
876
877         When given to `apply`, will change `self` to match `end_collection`
878
879         """
880         changes = []
881         if holding_collection is None:
882             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
883         for k in self:
884             if k not in end_collection:
885                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
886         for k in end_collection:
887             if k in self:
888                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
889                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
890                 elif end_collection[k] != self[k]:
891                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
892             else:
893                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
894         return changes
895
896     @must_be_writable
897     @synchronized
898     def apply(self, changes):
899         """Apply changes from `diff`.
900
901         If a change conflicts with a local change, it will be saved to an
902         alternate path indicating the conflict.
903
904         """
905         for change in changes:
906             event_type = change[0]
907             path = change[1]
908             initial = change[2]
909             local = self.find(path)
910             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
911                                                                     time.gmtime()))
912             if event_type == ADD:
913                 if local is None:
914                     # No local file at path, safe to copy over new file
915                     self.copy(initial, path)
916                 elif local is not None and local != initial:
917                     # There is already local file and it is different:
918                     # save change to conflict file.
919                     self.copy(initial, conflictpath)
920             elif event_type == MOD:
921                 final = change[3]
922                 if local == initial:
923                     # Local matches the "initial" item so it has not
924                     # changed locally and is safe to update.
925                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
926                         # Replace contents of local file with new contents
927                         local.replace_contents(final)
928                     else:
929                         # Overwrite path with new item; this can happen if
930                         # path was a file and is now a collection or vice versa
931                         self.copy(final, path, overwrite=True)
932                 else:
933                     # Local is missing (presumably deleted) or local doesn't
934                     # match the "start" value, so save change to conflict file
935                     self.copy(final, conflictpath)
936             elif event_type == DEL:
937                 if local == initial:
938                     # Local item matches "initial" value, so it is safe to remove.
939                     self.remove(path, recursive=True)
940                 # else, the file is modified or already removed, in either
941                 # case we don't want to try to remove it.
942
943     def portable_data_hash(self):
944         """Get the portable data hash for this collection's manifest."""
945         stripped = self.manifest_text(strip=True)
946         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
947
948     @synchronized
949     def subscribe(self, callback):
950         if self._callback is None:
951             self._callback = callback
952         else:
953             raise errors.ArgumentError("A callback is already set on this collection.")
954
955     @synchronized
956     def unsubscribe(self):
957         if self._callback is not None:
958             self._callback = None
959
960     @synchronized
961     def notify(self, event, collection, name, item):
962         if self._callback:
963             self._callback(event, collection, name, item)
964         self.root_collection().notify(event, collection, name, item)
965
966     @synchronized
967     def __eq__(self, other):
968         if other is self:
969             return True
970         if not isinstance(other, RichCollectionBase):
971             return False
972         if len(self._items) != len(other):
973             return False
974         for k in self._items:
975             if k not in other:
976                 return False
977             if self._items[k] != other[k]:
978                 return False
979         return True
980
981     def __ne__(self, other):
982         return not self.__eq__(other)
983
984
985 class Collection(RichCollectionBase):
986     """Represents the root of an Arvados Collection.
987
988     This class is threadsafe.  The root collection object, all subcollections
989     and files are protected by a single lock (i.e. each access locks the entire
990     collection).
991
992     Brief summary of
993     useful methods:
994
995     :To read an existing file:
996       `c.open("myfile", "r")`
997
998     :To write a new file:
999       `c.open("myfile", "w")`
1000
1001     :To determine if a file exists:
1002       `c.find("myfile") is not None`
1003
1004     :To copy a file:
1005       `c.copy("source", "dest")`
1006
1007     :To delete a file:
1008       `c.remove("myfile")`
1009
1010     :To save to an existing collection record:
1011       `c.save()`
1012
1013     :To save a new collection record:
1014     `c.save_new()`
1015
1016     :To merge remote changes into this object:
1017       `c.update()`
1018
1019     Must be associated with an API server Collection record (during
1020     initialization, or using `save_new`) to use `save` or `update`
1021
1022     """
1023
1024     def __init__(self, manifest_locator_or_text=None,
1025                  api_client=None,
1026                  keep_client=None,
1027                  num_retries=None,
1028                  parent=None,
1029                  apiconfig=None,
1030                  block_manager=None):
1031         """Collection constructor.
1032
1033         :manifest_locator_or_text:
1034           One of Arvados collection UUID, block locator of
1035           a manifest, raw manifest text, or None (to create an empty collection).
1036         :parent:
1037           the parent Collection, may be None.
1038         :apiconfig:
1039           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1040           Prefer this over supplying your own api_client and keep_client (except in testing).
1041           Will use default config settings if not specified.
1042         :api_client:
1043           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1044         :keep_client:
1045           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1046         :num_retries:
1047           the number of retries for API and Keep requests.
1048         :block_manager:
1049           the block manager to use.  If not specified, create one.
1050
1051         """
1052         super(Collection, self).__init__(parent)
1053         self._api_client = api_client
1054         self._keep_client = keep_client
1055         self._block_manager = block_manager
1056
1057         if apiconfig:
1058             self._config = apiconfig
1059         else:
1060             self._config = config.settings()
1061
1062         self.num_retries = num_retries if num_retries is not None else 0
1063         self._manifest_locator = None
1064         self._manifest_text = None
1065         self._api_response = None
1066
1067         self.lock = threading.RLock()
1068         self.events = None
1069
1070         if manifest_locator_or_text:
1071             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1072                 self._manifest_locator = manifest_locator_or_text
1073             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1074                 self._manifest_locator = manifest_locator_or_text
1075             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1076                 self._manifest_text = manifest_locator_or_text
1077             else:
1078                 raise errors.ArgumentError(
1079                     "Argument to CollectionReader must be a manifest or a collection UUID")
1080
1081             try:
1082                 self._populate()
1083             except (IOError, errors.SyntaxError) as e:
1084                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1085
1086     def root_collection(self):
1087         return self
1088
1089     def stream_name(self):
1090         return "."
1091
1092     def writable(self):
1093         return True
1094
1095     @synchronized
1096     @retry_method
1097     def update(self, other=None, num_retries=None):
1098         """Merge the latest collection on the API server with the current collection."""
1099
1100         if other is None:
1101             if self._manifest_locator is None:
1102                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1103             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1104             other = CollectionReader(response["manifest_text"])
1105         baseline = CollectionReader(self._manifest_text)
1106         self.apply(baseline.diff(other))
1107
1108     @synchronized
1109     def _my_api(self):
1110         if self._api_client is None:
1111             self._api_client = ThreadSafeApiCache(self._config)
1112             self._keep_client = self._api_client.keep
1113         return self._api_client
1114
1115     @synchronized
1116     def _my_keep(self):
1117         if self._keep_client is None:
1118             if self._api_client is None:
1119                 self._my_api()
1120             else:
1121                 self._keep_client = KeepClient(api_client=self._api_client)
1122         return self._keep_client
1123
1124     @synchronized
1125     def _my_block_manager(self):
1126         if self._block_manager is None:
1127             self._block_manager = _BlockManager(self._my_keep())
1128         return self._block_manager
1129
1130     def _populate_from_api_server(self):
1131         # As in KeepClient itself, we must wait until the last
1132         # possible moment to instantiate an API client, in order to
1133         # avoid tripping up clients that don't have access to an API
1134         # server.  If we do build one, make sure our Keep client uses
1135         # it.  If instantiation fails, we'll fall back to the except
1136         # clause, just like any other Collection lookup
1137         # failure. Return an exception, or None if successful.
1138         try:
1139             self._api_response = self._my_api().collections().get(
1140                 uuid=self._manifest_locator).execute(
1141                     num_retries=self.num_retries)
1142             self._manifest_text = self._api_response['manifest_text']
1143             return None
1144         except Exception as e:
1145             return e
1146
1147     def _populate_from_keep(self):
1148         # Retrieve a manifest directly from Keep. This has a chance of
1149         # working if [a] the locator includes a permission signature
1150         # or [b] the Keep services are operating in world-readable
1151         # mode. Return an exception, or None if successful.
1152         try:
1153             self._manifest_text = self._my_keep().get(
1154                 self._manifest_locator, num_retries=self.num_retries)
1155         except Exception as e:
1156             return e
1157
1158     def _populate(self):
1159         if self._manifest_locator is None and self._manifest_text is None:
1160             return
1161         error_via_api = None
1162         error_via_keep = None
1163         should_try_keep = ((self._manifest_text is None) and
1164                            util.keep_locator_pattern.match(
1165                                self._manifest_locator))
1166         if ((self._manifest_text is None) and
1167             util.signed_locator_pattern.match(self._manifest_locator)):
1168             error_via_keep = self._populate_from_keep()
1169         if self._manifest_text is None:
1170             error_via_api = self._populate_from_api_server()
1171             if error_via_api is not None and not should_try_keep:
1172                 raise error_via_api
1173         if ((self._manifest_text is None) and
1174             not error_via_keep and
1175             should_try_keep):
1176             # Looks like a keep locator, and we didn't already try keep above
1177             error_via_keep = self._populate_from_keep()
1178         if self._manifest_text is None:
1179             # Nothing worked!
1180             raise errors.NotFoundError(
1181                 ("Failed to retrieve collection '{}' " +
1182                  "from either API server ({}) or Keep ({})."
1183                  ).format(
1184                     self._manifest_locator,
1185                     error_via_api,
1186                     error_via_keep))
1187         # populate
1188         self._baseline_manifest = self._manifest_text
1189         self._import_manifest(self._manifest_text)
1190
1191
1192     def _has_collection_uuid(self):
1193         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1194
1195     def __enter__(self):
1196         return self
1197
1198     def __exit__(self, exc_type, exc_value, traceback):
1199         """Support scoped auto-commit in a with: block."""
1200         if exc_type is not None:
1201             if self.writable() and self._has_collection_uuid():
1202                 self.save()
1203         if self._block_manager is not None:
1204             self._block_manager.stop_threads()
1205
1206     @synchronized
1207     def manifest_locator(self):
1208         """Get the manifest locator, if any.
1209
1210         The manifest locator will be set when the collection is loaded from an
1211         API server record or the portable data hash of a manifest.
1212
1213         The manifest locator will be None if the collection is newly created or
1214         was created directly from manifest text.  The method `save_new()` will
1215         assign a manifest locator.
1216
1217         """
1218         return self._manifest_locator
1219
1220     @synchronized
1221     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1222         if new_config is None:
1223             new_config = self._config
1224         if readonly:
1225             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1226         else:
1227             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1228
1229         newcollection._clonefrom(self)
1230         return newcollection
1231
1232     @synchronized
1233     def api_response(self):
1234         """Returns information about this Collection fetched from the API server.
1235
1236         If the Collection exists in Keep but not the API server, currently
1237         returns None.  Future versions may provide a synthetic response.
1238
1239         """
1240         return self._api_response
1241
1242     def find_or_create(self, path, create_type):
1243         """See `RichCollectionBase.find_or_create`"""
1244         if path == ".":
1245             return self
1246         else:
1247             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1248
1249     def find(self, path):
1250         """See `RichCollectionBase.find`"""
1251         if path == ".":
1252             return self
1253         else:
1254             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1255
1256     def remove(self, path, recursive=False):
1257         """See `RichCollectionBase.remove`"""
1258         if path == ".":
1259             raise errors.ArgumentError("Cannot remove '.'")
1260         else:
1261             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1262
1263     @must_be_writable
1264     @synchronized
1265     @retry_method
1266     def save(self, merge=True, num_retries=None):
1267         """Save collection to an existing collection record.
1268
1269         Commit pending buffer blocks to Keep, merge with remote record (if
1270         merge=True, the default), write the manifest to Keep, and update the
1271         collection record.
1272
1273         Will raise AssertionError if not associated with a collection record on
1274         the API server.  If you want to save a manifest to Keep only, see
1275         `save_new()`.
1276
1277         :merge:
1278           Update and merge remote changes before saving.  Otherwise, any
1279           remote changes will be ignored and overwritten.
1280
1281         :num_retries:
1282           Retry count on API calls (if None,  use the collection default)
1283
1284         """
1285         if self.modified():
1286             if not self._has_collection_uuid():
1287                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
1288             self._my_block_manager().commit_all()
1289             if merge:
1290                 self.update()
1291             self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1292
1293             text = self.manifest_text(strip=False)
1294             self._api_response = self._my_api().collections().update(
1295                 uuid=self._manifest_locator,
1296                 body={'manifest_text': text}
1297                 ).execute(
1298                     num_retries=num_retries)
1299             self._manifest_text = self._api_response["manifest_text"]
1300             self.set_unmodified()
1301
1302
1303     @must_be_writable
1304     @synchronized
1305     @retry_method
1306     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
1307         """Save collection to a new collection record.
1308
1309         Commit pending buffer blocks to Keep, write the manifest to Keep, and
1310         create a new collection record (if create_collection_record True).
1311         After creating a new collection record, this Collection object will be
1312         associated with the new record used by `save()`.
1313
1314         :name:
1315           The collection name.
1316
1317         :create_collection_record:
1318           If True, create a collection record.  If False, only save the manifest to keep.
1319
1320         :owner_uuid:
1321           the user, or project uuid that will own this collection.
1322           If None, defaults to the current user.
1323
1324         :ensure_unique_name:
1325           If True, ask the API server to rename the collection
1326           if it conflicts with a collection with the same name and owner.  If
1327           False, a name conflict will result in an error.
1328
1329         :num_retries:
1330           Retry count on API calls (if None,  use the collection default)
1331
1332         """
1333         self._my_block_manager().commit_all()
1334         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1335         text = self.manifest_text(strip=False)
1336
1337         if create_collection_record:
1338             if name is None:
1339                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1340
1341             body = {"manifest_text": text,
1342                     "name": name}
1343             if owner_uuid:
1344                 body["owner_uuid"] = owner_uuid
1345
1346             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1347             text = self._api_response["manifest_text"]
1348
1349             self._manifest_locator = self._api_response["uuid"]
1350
1351         self._manifest_text = text
1352         self.set_unmodified()
1353
1354     @synchronized
1355     def _import_manifest(self, manifest_text):
1356         """Import a manifest into a `Collection`.
1357
1358         :manifest_text:
1359           The manifest text to import from.
1360
1361         """
1362         if len(self) > 0:
1363             raise ArgumentError("Can only import manifest into an empty collection")
1364
1365         STREAM_NAME = 0
1366         BLOCKS = 1
1367         SEGMENTS = 2
1368
1369         stream_name = None
1370         state = STREAM_NAME
1371
1372         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1373             tok = token_and_separator.group(1)
1374             sep = token_and_separator.group(2)
1375
1376             if state == STREAM_NAME:
1377                 # starting a new stream
1378                 stream_name = tok.replace('\\040', ' ')
1379                 blocks = []
1380                 segments = []
1381                 streamoffset = 0L
1382                 state = BLOCKS
1383                 self.mkdirs(stream_name)
1384                 continue
1385
1386             if state == BLOCKS:
1387                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1388                 if block_locator:
1389                     blocksize = long(block_locator.group(1))
1390                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1391                     streamoffset += blocksize
1392                 else:
1393                     state = SEGMENTS
1394
1395             if state == SEGMENTS:
1396                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1397                 if file_segment:
1398                     pos = long(file_segment.group(1))
1399                     size = long(file_segment.group(2))
1400                     name = file_segment.group(3).replace('\\040', ' ')
1401                     filepath = os.path.join(stream_name, name)
1402                     afile = self.find_or_create(filepath, FILE)
1403                     if isinstance(afile, ArvadosFile):
1404                         afile.add_segment(blocks, pos, size)
1405                     else:
1406                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1407                 else:
1408                     # error!
1409                     raise errors.SyntaxError("Invalid manifest format")
1410
1411             if sep == "\n":
1412                 stream_name = None
1413                 state = STREAM_NAME
1414
1415         self.set_unmodified()
1416
1417     @synchronized
1418     def notify(self, event, collection, name, item):
1419         if self._callback:
1420             self._callback(event, collection, name, item)
1421
1422
1423 class Subcollection(RichCollectionBase):
1424     """This is a subdirectory within a collection that doesn't have its own API
1425     server record.
1426
1427     It falls under the umbrella of the root collection.
1428
1429     """
1430
1431     def __init__(self, parent, name):
1432         super(Subcollection, self).__init__(parent)
1433         self.lock = self.root_collection().lock
1434         self._manifest_text = None
1435         self.name = name
1436         self.num_retries = parent.num_retries
1437
1438     def root_collection(self):
1439         return self.parent.root_collection()
1440
1441     def writable(self):
1442         return self.root_collection().writable()
1443
1444     def _my_api(self):
1445         return self.root_collection()._my_api()
1446
1447     def _my_keep(self):
1448         return self.root_collection()._my_keep()
1449
1450     def _my_block_manager(self):
1451         return self.root_collection()._my_block_manager()
1452
1453     def stream_name(self):
1454         return os.path.join(self.parent.stream_name(), self.name)
1455
1456     @synchronized
1457     def clone(self, new_parent, new_name):
1458         c = Subcollection(new_parent, new_name)
1459         c._clonefrom(self)
1460         return c
1461
1462
1463 class CollectionReader(Collection):
1464     """A read-only collection object.
1465
1466     Initialize from an api collection record locator, a portable data hash of a
1467     manifest, or raw manifest text.  See `Collection` constructor for detailed
1468     options.
1469
1470     """
1471     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1472         self._in_init = True
1473         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1474         self._in_init = False
1475
1476         # Forego any locking since it should never change once initialized.
1477         self.lock = NoopLock()
1478
1479         # Backwards compatability with old CollectionReader
1480         # all_streams() and all_files()
1481         self._streams = None
1482
1483     def writable(self):
1484         return self._in_init
1485
1486     def _populate_streams(orig_func):
1487         @functools.wraps(orig_func)
1488         def populate_streams_wrapper(self, *args, **kwargs):
1489             # Defer populating self._streams until needed since it creates a copy of the manifest.
1490             if self._streams is None:
1491                 if self._manifest_text:
1492                     self._streams = [sline.split()
1493                                      for sline in self._manifest_text.split("\n")
1494                                      if sline]
1495                 else:
1496                     self._streams = []
1497             return orig_func(self, *args, **kwargs)
1498         return populate_streams_wrapper
1499
1500     @_populate_streams
1501     def normalize(self):
1502         """Normalize the streams returned by `all_streams`.
1503
1504         This method is kept for backwards compatability and only affects the
1505         behavior of `all_streams()` and `all_files()`
1506
1507         """
1508
1509         # Rearrange streams
1510         streams = {}
1511         for s in self.all_streams():
1512             for f in s.all_files():
1513                 streamname, filename = split(s.name() + "/" + f.name())
1514                 if streamname not in streams:
1515                     streams[streamname] = {}
1516                 if filename not in streams[streamname]:
1517                     streams[streamname][filename] = []
1518                 for r in f.segments:
1519                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1520
1521         self._streams = [normalize_stream(s, streams[s])
1522                          for s in sorted(streams)]
1523     @_populate_streams
1524     def all_streams(self):
1525         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1526                 for s in self._streams]
1527
1528     @_populate_streams
1529     def all_files(self):
1530         for s in self.all_streams():
1531             for f in s.all_files():
1532                 yield f