Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """
42         Return the manifest for the current collection with all
43         non-portable hints (i.e., permission signatures and other
44         hints other than size hints) removed from the locators.
45         """
46         raw = self.manifest_text()
47         clean = []
48         for line in raw.split("\n"):
49             fields = line.split()
50             if fields:
51                 clean_fields = fields[:1] + [
52                     (re.sub(r'\+[^\d][^\+]*', '', x)
53                      if re.match(util.keep_locator_pattern, x)
54                      else x)
55                     for x in fields[1:]]
56                 clean += [' '.join(clean_fields), "\n"]
57         return ''.join(clean)
58
59
60 class _WriterFile(_FileLikeObjectBase):
61     def __init__(self, coll_writer, name):
62         super(_WriterFile, self).__init__(name, 'wb')
63         self.dest = coll_writer
64
65     def close(self):
66         super(_WriterFile, self).close()
67         self.dest.finish_current_file()
68
69     @_FileLikeObjectBase._before_close
70     def write(self, data):
71         self.dest.write(data)
72
73     @_FileLikeObjectBase._before_close
74     def writelines(self, seq):
75         for data in seq:
76             self.write(data)
77
78     @_FileLikeObjectBase._before_close
79     def flush(self):
80         self.dest.flush_data()
81
82
83 class CollectionWriter(CollectionBase):
84     def __init__(self, api_client=None, num_retries=0, replication=None):
85         """Instantiate a CollectionWriter.
86
87         CollectionWriter lets you build a new Arvados Collection from scratch.
88         Write files to it.  The CollectionWriter will upload data to Keep as
89         appropriate, and provide you with the Collection manifest text when
90         you're finished.
91
92         Arguments:
93         * api_client: The API client to use to look up Collections.  If not
94           provided, CollectionReader will build one from available Arvados
95           configuration.
96         * num_retries: The default number of times to retry failed
97           service requests.  Default 0.  You may change this value
98           after instantiation, but note those changes may not
99           propagate to related objects like the Keep client.
100         * replication: The number of copies of each block to store.
101           If this argument is None or not supplied, replication is
102           the server-provided default if available, otherwise 2.
103         """
104         self._api_client = api_client
105         self.num_retries = num_retries
106         self.replication = (2 if replication is None else replication)
107         self._keep_client = None
108         self._data_buffer = []
109         self._data_buffer_len = 0
110         self._current_stream_files = []
111         self._current_stream_length = 0
112         self._current_stream_locators = []
113         self._current_stream_name = '.'
114         self._current_file_name = None
115         self._current_file_pos = 0
116         self._finished_streams = []
117         self._close_file = None
118         self._queued_file = None
119         self._queued_dirents = deque()
120         self._queued_trees = deque()
121         self._last_open = None
122
123     def __exit__(self, exc_type, exc_value, traceback):
124         if exc_type is None:
125             self.finish()
126
127     def do_queued_work(self):
128         # The work queue consists of three pieces:
129         # * _queued_file: The file object we're currently writing to the
130         #   Collection.
131         # * _queued_dirents: Entries under the current directory
132         #   (_queued_trees[0]) that we want to write or recurse through.
133         #   This may contain files from subdirectories if
134         #   max_manifest_depth == 0 for this directory.
135         # * _queued_trees: Directories that should be written as separate
136         #   streams to the Collection.
137         # This function handles the smallest piece of work currently queued
138         # (current file, then current directory, then next directory) until
139         # no work remains.  The _work_THING methods each do a unit of work on
140         # THING.  _queue_THING methods add a THING to the work queue.
141         while True:
142             if self._queued_file:
143                 self._work_file()
144             elif self._queued_dirents:
145                 self._work_dirents()
146             elif self._queued_trees:
147                 self._work_trees()
148             else:
149                 break
150
151     def _work_file(self):
152         while True:
153             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
154             if not buf:
155                 break
156             self.write(buf)
157         self.finish_current_file()
158         if self._close_file:
159             self._queued_file.close()
160         self._close_file = None
161         self._queued_file = None
162
163     def _work_dirents(self):
164         path, stream_name, max_manifest_depth = self._queued_trees[0]
165         if stream_name != self.current_stream_name():
166             self.start_new_stream(stream_name)
167         while self._queued_dirents:
168             dirent = self._queued_dirents.popleft()
169             target = os.path.join(path, dirent)
170             if os.path.isdir(target):
171                 self._queue_tree(target,
172                                  os.path.join(stream_name, dirent),
173                                  max_manifest_depth - 1)
174             else:
175                 self._queue_file(target, dirent)
176                 break
177         if not self._queued_dirents:
178             self._queued_trees.popleft()
179
180     def _work_trees(self):
181         path, stream_name, max_manifest_depth = self._queued_trees[0]
182         d = util.listdir_recursive(
183             path, max_depth = (None if max_manifest_depth == 0 else 0))
184         if d:
185             self._queue_dirents(stream_name, d)
186         else:
187             self._queued_trees.popleft()
188
189     def _queue_file(self, source, filename=None):
190         assert (self._queued_file is None), "tried to queue more than one file"
191         if not hasattr(source, 'read'):
192             source = open(source, 'rb')
193             self._close_file = True
194         else:
195             self._close_file = False
196         if filename is None:
197             filename = os.path.basename(source.name)
198         self.start_new_file(filename)
199         self._queued_file = source
200
201     def _queue_dirents(self, stream_name, dirents):
202         assert (not self._queued_dirents), "tried to queue more than one tree"
203         self._queued_dirents = deque(sorted(dirents))
204
205     def _queue_tree(self, path, stream_name, max_manifest_depth):
206         self._queued_trees.append((path, stream_name, max_manifest_depth))
207
208     def write_file(self, source, filename=None):
209         self._queue_file(source, filename)
210         self.do_queued_work()
211
212     def write_directory_tree(self,
213                              path, stream_name='.', max_manifest_depth=-1):
214         self._queue_tree(path, stream_name, max_manifest_depth)
215         self.do_queued_work()
216
217     def write(self, newdata):
218         if hasattr(newdata, '__iter__'):
219             for s in newdata:
220                 self.write(s)
221             return
222         self._data_buffer.append(newdata)
223         self._data_buffer_len += len(newdata)
224         self._current_stream_length += len(newdata)
225         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
226             self.flush_data()
227
228     def open(self, streampath, filename=None):
229         """open(streampath[, filename]) -> file-like object
230
231         Pass in the path of a file to write to the Collection, either as a
232         single string or as two separate stream name and file name arguments.
233         This method returns a file-like object you can write to add it to the
234         Collection.
235
236         You may only have one file object from the Collection open at a time,
237         so be sure to close the object when you're done.  Using the object in
238         a with statement makes that easy::
239
240           with cwriter.open('./doc/page1.txt') as outfile:
241               outfile.write(page1_data)
242           with cwriter.open('./doc/page2.txt') as outfile:
243               outfile.write(page2_data)
244         """
245         if filename is None:
246             streampath, filename = split(streampath)
247         if self._last_open and not self._last_open.closed:
248             raise errors.AssertionError(
249                 "can't open '{}' when '{}' is still open".format(
250                     filename, self._last_open.name))
251         if streampath != self.current_stream_name():
252             self.start_new_stream(streampath)
253         self.set_current_file_name(filename)
254         self._last_open = _WriterFile(self, filename)
255         return self._last_open
256
257     def flush_data(self):
258         data_buffer = ''.join(self._data_buffer)
259         if data_buffer:
260             self._current_stream_locators.append(
261                 self._my_keep().put(
262                     data_buffer[0:config.KEEP_BLOCK_SIZE],
263                     copies=self.replication))
264             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
265             self._data_buffer_len = len(self._data_buffer[0])
266
267     def start_new_file(self, newfilename=None):
268         self.finish_current_file()
269         self.set_current_file_name(newfilename)
270
271     def set_current_file_name(self, newfilename):
272         if re.search(r'[\t\n]', newfilename):
273             raise errors.AssertionError(
274                 "Manifest filenames cannot contain whitespace: %s" %
275                 newfilename)
276         elif re.search(r'\x00', newfilename):
277             raise errors.AssertionError(
278                 "Manifest filenames cannot contain NUL characters: %s" %
279                 newfilename)
280         self._current_file_name = newfilename
281
282     def current_file_name(self):
283         return self._current_file_name
284
285     def finish_current_file(self):
286         if self._current_file_name is None:
287             if self._current_file_pos == self._current_stream_length:
288                 return
289             raise errors.AssertionError(
290                 "Cannot finish an unnamed file " +
291                 "(%d bytes at offset %d in '%s' stream)" %
292                 (self._current_stream_length - self._current_file_pos,
293                  self._current_file_pos,
294                  self._current_stream_name))
295         self._current_stream_files.append([
296                 self._current_file_pos,
297                 self._current_stream_length - self._current_file_pos,
298                 self._current_file_name])
299         self._current_file_pos = self._current_stream_length
300         self._current_file_name = None
301
302     def start_new_stream(self, newstreamname='.'):
303         self.finish_current_stream()
304         self.set_current_stream_name(newstreamname)
305
306     def set_current_stream_name(self, newstreamname):
307         if re.search(r'[\t\n]', newstreamname):
308             raise errors.AssertionError(
309                 "Manifest stream names cannot contain whitespace")
310         self._current_stream_name = '.' if newstreamname=='' else newstreamname
311
312     def current_stream_name(self):
313         return self._current_stream_name
314
315     def finish_current_stream(self):
316         self.finish_current_file()
317         self.flush_data()
318         if not self._current_stream_files:
319             pass
320         elif self._current_stream_name is None:
321             raise errors.AssertionError(
322                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
323                 (self._current_stream_length, len(self._current_stream_files)))
324         else:
325             if not self._current_stream_locators:
326                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
327             self._finished_streams.append([self._current_stream_name,
328                                            self._current_stream_locators,
329                                            self._current_stream_files])
330         self._current_stream_files = []
331         self._current_stream_length = 0
332         self._current_stream_locators = []
333         self._current_stream_name = None
334         self._current_file_pos = 0
335         self._current_file_name = None
336
337     def finish(self):
338         """Store the manifest in Keep and return its locator.
339
340         This is useful for storing manifest fragments (task outputs)
341         temporarily in Keep during a Crunch job.
342
343         In other cases you should make a collection instead, by
344         sending manifest_text() to the API server's "create
345         collection" endpoint.
346         """
347         return self._my_keep().put(self.manifest_text(), copies=self.replication)
348
349     def portable_data_hash(self):
350         stripped = self.stripped_manifest()
351         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
352
353     def manifest_text(self):
354         self.finish_current_stream()
355         manifest = ''
356
357         for stream in self._finished_streams:
358             if not re.search(r'^\.(/.*)?$', stream[0]):
359                 manifest += './'
360             manifest += stream[0].replace(' ', '\\040')
361             manifest += ' ' + ' '.join(stream[1])
362             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
363             manifest += "\n"
364
365         return manifest
366
367     def data_locators(self):
368         ret = []
369         for name, locators, files in self._finished_streams:
370             ret += locators
371         return ret
372
373
374 class ResumableCollectionWriter(CollectionWriter):
375     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
376                    '_current_stream_locators', '_current_stream_name',
377                    '_current_file_name', '_current_file_pos', '_close_file',
378                    '_data_buffer', '_dependencies', '_finished_streams',
379                    '_queued_dirents', '_queued_trees']
380
381     def __init__(self, api_client=None, **kwargs):
382         self._dependencies = {}
383         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
384
385     @classmethod
386     def from_state(cls, state, *init_args, **init_kwargs):
387         # Try to build a new writer from scratch with the given state.
388         # If the state is not suitable to resume (because files have changed,
389         # been deleted, aren't predictable, etc.), raise a
390         # StaleWriterStateError.  Otherwise, return the initialized writer.
391         # The caller is responsible for calling writer.do_queued_work()
392         # appropriately after it's returned.
393         writer = cls(*init_args, **init_kwargs)
394         for attr_name in cls.STATE_PROPS:
395             attr_value = state[attr_name]
396             attr_class = getattr(writer, attr_name).__class__
397             # Coerce the value into the same type as the initial value, if
398             # needed.
399             if attr_class not in (type(None), attr_value.__class__):
400                 attr_value = attr_class(attr_value)
401             setattr(writer, attr_name, attr_value)
402         # Check dependencies before we try to resume anything.
403         if any(KeepLocator(ls).permission_expired()
404                for ls in writer._current_stream_locators):
405             raise errors.StaleWriterStateError(
406                 "locators include expired permission hint")
407         writer.check_dependencies()
408         if state['_current_file'] is not None:
409             path, pos = state['_current_file']
410             try:
411                 writer._queued_file = open(path, 'rb')
412                 writer._queued_file.seek(pos)
413             except IOError as error:
414                 raise errors.StaleWriterStateError(
415                     "failed to reopen active file {}: {}".format(path, error))
416         return writer
417
418     def check_dependencies(self):
419         for path, orig_stat in self._dependencies.items():
420             if not S_ISREG(orig_stat[ST_MODE]):
421                 raise errors.StaleWriterStateError("{} not file".format(path))
422             try:
423                 now_stat = tuple(os.stat(path))
424             except OSError as error:
425                 raise errors.StaleWriterStateError(
426                     "failed to stat {}: {}".format(path, error))
427             if ((not S_ISREG(now_stat[ST_MODE])) or
428                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
429                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
430                 raise errors.StaleWriterStateError("{} changed".format(path))
431
432     def dump_state(self, copy_func=lambda x: x):
433         state = {attr: copy_func(getattr(self, attr))
434                  for attr in self.STATE_PROPS}
435         if self._queued_file is None:
436             state['_current_file'] = None
437         else:
438             state['_current_file'] = (os.path.realpath(self._queued_file.name),
439                                       self._queued_file.tell())
440         return state
441
442     def _queue_file(self, source, filename=None):
443         try:
444             src_path = os.path.realpath(source)
445         except Exception:
446             raise errors.AssertionError("{} not a file path".format(source))
447         try:
448             path_stat = os.stat(src_path)
449         except OSError as stat_error:
450             path_stat = None
451         super(ResumableCollectionWriter, self)._queue_file(source, filename)
452         fd_stat = os.fstat(self._queued_file.fileno())
453         if not S_ISREG(fd_stat.st_mode):
454             # We won't be able to resume from this cache anyway, so don't
455             # worry about further checks.
456             self._dependencies[source] = tuple(fd_stat)
457         elif path_stat is None:
458             raise errors.AssertionError(
459                 "could not stat {}: {}".format(source, stat_error))
460         elif path_stat.st_ino != fd_stat.st_ino:
461             raise errors.AssertionError(
462                 "{} changed between open and stat calls".format(source))
463         else:
464             self._dependencies[src_path] = tuple(fd_stat)
465
466     def write(self, data):
467         if self._queued_file is None:
468             raise errors.AssertionError(
469                 "resumable writer can't accept unsourced data")
470         return super(ResumableCollectionWriter, self).write(data)
471
472 ADD = "add"
473 DEL = "del"
474 MOD = "mod"
475 FILE = "file"
476 COLLECTION = "collection"
477
478 class SynchronizedCollectionBase(CollectionBase):
479     """Base class for Collections and Subcollections.
480
481     Implements the majority of functionality relating to accessing items in the
482     Collection.
483
484     """
485
486     def __init__(self, parent=None):
487         self.parent = parent
488         self._modified = True
489         self._items = {}
490
491     def _my_api(self):
492         raise NotImplementedError()
493
494     def _my_keep(self):
495         raise NotImplementedError()
496
497     def _my_block_manager(self):
498         raise NotImplementedError()
499
500     def sync_mode(self):
501         raise NotImplementedError()
502
503     def root_collection(self):
504         raise NotImplementedError()
505
506     def notify(self, event, collection, name, item):
507         raise NotImplementedError()
508
509     def stream_name(self):
510         raise NotImplementedError()
511
512     @must_be_writable
513     @synchronized
514     def find_or_create(self, path, create_type):
515         """Recursively search the specified file path.
516
517         May return either a `Collection` or `ArvadosFile`.  If not found, will
518         create a new item at the specified path based on `create_type`.  Will
519         create intermediate subcollections needed to contain the final item in
520         the path.
521
522         :create_type:
523           One of `arvado.collection.FILE` or
524           `arvado.collection.COLLECTION`.  If the path is not found, and value
525           of create_type is FILE then create and return a new ArvadosFile for
526           the last path component.  If COLLECTION, then create and return a new
527           Collection for the last path component.
528
529         """
530
531         pathcomponents = path.split("/")
532
533         if pathcomponents and pathcomponents[0]:
534             item = self._items.get(pathcomponents[0])
535             if len(pathcomponents) == 1:
536                 # item must be a file
537                 if item is None:
538                     # create new file
539                     if create_type == COLLECTION:
540                         item = Subcollection(self)
541                     else:
542                         item = ArvadosFile(self)
543                     self._items[pathcomponents[0]] = item
544                     self._modified = True
545                     self.notify(ADD, self, pathcomponents[0], item)
546                 return item
547             else:
548                 if item is None:
549                     # create new collection
550                     item = Subcollection(self)
551                     self._items[pathcomponents[0]] = item
552                     self._modified = True
553                     self.notify(ADD, self, pathcomponents[0], item)
554                 del pathcomponents[0]
555                 if isinstance(item, SynchronizedCollectionBase):
556                     return item.find_or_create("/".join(pathcomponents), create_type)
557                 else:
558                     raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
559         else:
560             return self
561
562     @synchronized
563     def find(self, path):
564         """Recursively search the specified file path.
565
566         May return either a Collection or ArvadosFile.  Return None if not
567         found.
568
569         """
570         pathcomponents = path.split("/")
571
572         if pathcomponents and pathcomponents[0]:
573             item = self._items.get(pathcomponents[0])
574             if len(pathcomponents) == 1:
575                 # item must be a file
576                 return item
577             else:
578                 del pathcomponents[0]
579                 if isinstance(item, SynchronizedCollectionBase):
580                     return item.find("/".join(pathcomponents))
581                 else:
582                     raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
583         else:
584             return self
585
586     def mkdirs(path):
587         """Recursive subcollection create.
588
589         Like `os.mkdirs()`.  Will create intermediate subcollections needed to
590         contain the leaf subcollection path.
591
592         """
593         return self.find_or_create(path, COLLECTION)
594
595     def open(self, path, mode="r"):
596         """Open a file-like object for access.
597
598         :path:
599           path to a file in the collection
600         :mode:
601           one of "r", "r+", "w", "w+", "a", "a+"
602           :"r":
603             opens for reading
604           :"r+":
605             opens for reading and writing.  Reads/writes share a file pointer.
606           :"w", "w+":
607             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
608           :"a", "a+":
609             opens for reading and writing.  All writes are appended to
610             the end of the file.  Writing does not affect the file pointer for
611             reading.
612         """
613         mode = mode.replace("b", "")
614         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
615             raise errors.ArgumentError("Bad mode '%s'" % mode)
616         create = (mode != "r")
617
618         if create and self.sync_mode() == SYNC_READONLY:
619             raise IOError((errno.EROFS, "Collection is read only"))
620
621         if create:
622             arvfile = self.find_or_create(path, FILE)
623         else:
624             arvfile = self.find(path)
625
626         if arvfile is None:
627             raise IOError((errno.ENOENT, "File not found"))
628         if not isinstance(arvfile, ArvadosFile):
629             raise IOError((errno.EISDIR, "Path must refer to a file."))
630
631         if mode[0] == "w":
632             arvfile.truncate(0)
633
634         name = os.path.basename(path)
635
636         if mode == "r":
637             return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
638         else:
639             return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
640
641     @synchronized
642     def modified(self):
643         """Test if the collection (or any subcollection or file) has been modified
644         since it was created."""
645         if self._modified:
646             return True
647         for k,v in self._items.items():
648             if v.modified():
649                 return True
650         return False
651
652     @synchronized
653     def set_unmodified(self):
654         """Recursively clear modified flag."""
655         self._modified = False
656         for k,v in self._items.items():
657             v.set_unmodified()
658
659     @synchronized
660     def __iter__(self):
661         """Iterate over names of files and collections contained in this collection."""
662         return iter(self._items.keys())
663
664     @synchronized
665     def iterkeys(self):
666         """Iterate over names of files and collections directly contained in this collection."""
667         return self._items.keys()
668
669     @synchronized
670     def __getitem__(self, k):
671         """Get a file or collection that is directly contained by this collection.  If
672         you want to search a path, use `find()` instead.
673         """
674         return self._items[k]
675
676     @synchronized
677     def __contains__(self, k):
678         """If there is a file or collection a directly contained by this collection
679         with name `k`."""
680         return k in self._items
681
682     @synchronized
683     def __len__(self):
684         """Get the number of items directly contained in this collection."""
685         return len(self._items)
686
687     @must_be_writable
688     @synchronized
689     def __delitem__(self, p):
690         """Delete an item by name which is directly contained by this collection."""
691         del self._items[p]
692         self._modified = True
693         self.notify(DEL, self, p, None)
694
695     @synchronized
696     def keys(self):
697         """Get a list of names of files and collections directly contained in this collection."""
698         return self._items.keys()
699
700     @synchronized
701     def values(self):
702         """Get a list of files and collection objects directly contained in this collection."""
703         return self._items.values()
704
705     @synchronized
706     def items(self):
707         """Get a list of (name, object) tuples directly contained in this collection."""
708         return self._items.items()
709
710     def exists(self, path):
711         """Test if there is a file or collection at `path`."""
712         return self.find(path) != None
713
714     @must_be_writable
715     @synchronized
716     def remove(self, path, recursive=False):
717         """Remove the file or subcollection (directory) at `path`.
718
719         :recursive:
720           Specify whether to remove non-empty subcollections (True), or raise an error (False).
721         """
722         pathcomponents = path.split("/")
723
724         if len(pathcomponents) > 0:
725             item = self._items.get(pathcomponents[0])
726             if item is None:
727                 raise IOError((errno.ENOENT, "File not found"))
728             if len(pathcomponents) == 1:
729                 if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
730                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
731                 deleteditem = self._items[pathcomponents[0]]
732                 del self._items[pathcomponents[0]]
733                 self._modified = True
734                 self.notify(DEL, self, pathcomponents[0], deleteditem)
735             else:
736                 del pathcomponents[0]
737                 item.remove("/".join(pathcomponents))
738         else:
739             raise IOError((errno.ENOENT, "File not found"))
740
741     def _cloneinto(self, target):
742         for k,v in self._items.items():
743             target._items[k] = v.clone(target)
744
745     def clone(self):
746         raise NotImplementedError()
747
748     @must_be_writable
749     @synchronized
750     def copy(self, source, target_path, source_collection=None, overwrite=False):
751         """Copy a file or subcollection to a new path in this collection.
752
753         :source:
754           An ArvadosFile, Subcollection, or string with a path to source file or subcollection
755
756         :target_path:
757           Destination file or path.  If the target path already exists and is a
758           subcollection, the item will be placed inside the subcollection.  If
759           the target path already exists and is a file, this will raise an error
760           unless you specify `overwrite=True`.
761
762         :source_collection:
763           Collection to copy `source_path` from (default `self`)
764
765         :overwrite:
766           Whether to overwrite target file if it already exists.
767         """
768         if source_collection is None:
769             source_collection = self
770
771         # Find the object to copy
772         if isinstance(source, basestring):
773             source_obj = source_collection.find(source)
774             if source_obj is None:
775                 raise IOError((errno.ENOENT, "File not found"))
776             sourcecomponents = source.split("/")
777         else:
778             source_obj = source
779             sourcecomponents = None
780
781         # Find parent collection the target path
782         targetcomponents = target_path.split("/")
783
784         # Determine the name to use.
785         target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
786
787         if not target_name:
788             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
789
790         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
791
792         if target_name in target_dir:
793             if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
794                 target_dir = target_dir[target_name]
795                 target_name = sourcecomponents[-1]
796             elif not overwrite:
797                 raise IOError((errno.EEXIST, "File already exists"))
798
799         modified_from = None
800         if target_name in target_dir:
801             modified_from = target_dir[target_name]
802
803         # Actually make the copy.
804         dup = source_obj.clone(target_dir)
805         target_dir._items[target_name] = dup
806         target_dir._modified = True
807
808         if modified_from:
809             self.notify(MOD, target_dir, target_name, (modified_from, dup))
810         else:
811             self.notify(ADD, target_dir, target_name, dup)
812
813     @synchronized
814     def manifest_text(self, stream_name=".", strip=False, normalize=False):
815         """Get the manifest text for this collection, sub collections and files.
816
817         :stream_name:
818           Name of the stream (directory)
819
820         :strip:
821           If True, remove signing tokens from block locators if present.
822           If False (default), block locators are left unchanged.
823
824         :normalize:
825           If True, always export the manifest text in normalized form
826           even if the Collection is not modified.  If False (default) and the collection
827           is not modified, return the original manifest text even if it is not
828           in normalized form.
829
830         """
831
832         if self.modified() or self._manifest_text is None or normalize:
833             item  = self
834             stream = {}
835             buf = ""
836             sorted_keys = sorted(item.keys())
837             for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
838                 # Create a stream per file `k`
839                 arvfile = item[filename]
840                 filestream = []
841                 for segment in arvfile.segments():
842                     loc = segment.locator
843                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
844                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
845                     if strip:
846                         loc = KeepLocator(loc).stripped()
847                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
848                                          segment.segment_offset, segment.range_size))
849                 stream[filename] = filestream
850             if stream:
851                 buf += ' '.join(normalize_stream(stream_name, stream))
852                 buf += "\n"
853             for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
854                 buf += item[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip)
855             return buf
856         else:
857             if strip:
858                 return self.stripped_manifest()
859             else:
860                 return self._manifest_text
861
862     @synchronized
863     def diff(self, end_collection, prefix=".", holding_collection=None):
864         """
865         Generate list of add/modify/delete actions which, when given to `apply`, will
866         change `self` to match `end_collection`
867         """
868         changes = []
869         if holding_collection is None:
870             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
871         for k in self:
872             if k not in end_collection:
873                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
874         for k in end_collection:
875             if k in self:
876                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
877                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
878                 elif end_collection[k] != self[k]:
879                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
880             else:
881                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
882         return changes
883
884     @must_be_writable
885     @synchronized
886     def apply(self, changes):
887         """Apply changes from `diff`.
888
889         If a change conflicts with a local change, it will be saved to an
890         alternate path indicating the conflict.
891
892         """
893         for change in changes:
894             event_type = change[0]
895             path = change[1]
896             initial = change[2]
897             local = self.find(path)
898             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
899                                                                     time.gmtime()))
900             if event_type == ADD:
901                 if local is None:
902                     # No local file at path, safe to copy over new file
903                     self.copy(initial, path)
904                 elif local is not None and local != initial:
905                     # There is already local file and it is different:
906                     # save change to conflict file.
907                     self.copy(initial, conflictpath)
908             elif event_type == MOD:
909                 final = change[3]
910                 if local == initial:
911                     # Local matches the "initial" item so it has not
912                     # changed locally and is safe to update.
913                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
914                         # Replace contents of local file with new contents
915                         local.replace_contents(final)
916                     else:
917                         # Overwrite path with new item; this can happen if
918                         # path was a file and is now a collection or vice versa
919                         self.copy(final, path, overwrite=True)
920                 else:
921                     # Local is missing (presumably deleted) or local doesn't
922                     # match the "start" value, so save change to conflict file
923                     self.copy(final, conflictpath)
924             elif event_type == DEL:
925                 if local == initial:
926                     # Local item matches "initial" value, so it is safe to remove.
927                     self.remove(path, recursive=True)
928                 # else, the file is modified or already removed, in either
929                 # case we don't want to try to remove it.
930
931     def portable_data_hash(self):
932         """Get the portable data hash for this collection's manifest."""
933         stripped = self.manifest_text(strip=True)
934         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
935
936     @synchronized
937     def __eq__(self, other):
938         if other is self:
939             return True
940         if not isinstance(other, SynchronizedCollectionBase):
941             return False
942         if len(self._items) != len(other):
943             return False
944         for k in self._items:
945             if k not in other:
946                 return False
947             if self._items[k] != other[k]:
948                 return False
949         return True
950
951     def __ne__(self, other):
952         return not self.__eq__(other)
953
954
955 class Collection(SynchronizedCollectionBase):
956     """Represents the root of an Arvados Collection, which may be associated with
957     an API server Collection record.
958
959     Brief summary of useful methods:
960
961     :To read an existing file:
962       `c.open("myfile", "r")`
963
964     :To write a new file:
965       `c.open("myfile", "w")`
966
967     :To determine if a file exists:
968       `c.find("myfile") is not None`
969
970     :To copy a file:
971       `c.copy("source", "dest")`
972
973     :To delete a file:
974       `c.remove("myfile")`
975
976     :To save to an existing collection record:
977       `c.save()`
978
979     :To save a new collection record:
980     `c.save_new()`
981
982     :To merge remote changes into this object:
983       `c.update()`
984
985     This class is threadsafe.  The root collection object, all subcollections
986     and files are protected by a single lock (i.e. each access locks the entire
987     collection).
988
989     """
990
991     def __init__(self, manifest_locator_or_text=None,
992                  api_client=None,
993                  keep_client=None,
994                  num_retries=None,
995                  parent=None,
996                  apiconfig=None,
997                  block_manager=None):
998         """Collection constructor.
999
1000         :manifest_locator_or_text:
1001           One of Arvados collection UUID, block locator of
1002           a manifest, raw manifest text, or None (to create an empty collection).
1003         :parent:
1004           the parent Collection, may be None.
1005         :apiconfig:
1006           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1007           Prefer this over supplying your own api_client and keep_client (except in testing).
1008           Will use default config settings if not specified.
1009         :api_client:
1010           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1011         :keep_client:
1012           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1013         :num_retries:
1014           the number of retries for API and Keep requests.
1015         :block_manager:
1016           the block manager to use.  If not specified, create one.
1017
1018         """
1019         super(Collection, self).__init__(parent)
1020         self._api_client = api_client
1021         self._keep_client = keep_client
1022         self._block_manager = block_manager
1023
1024         if apiconfig:
1025             self._config = apiconfig
1026         else:
1027             self._config = config.settings()
1028
1029         self.num_retries = num_retries if num_retries is not None else 0
1030         self._manifest_locator = None
1031         self._manifest_text = None
1032         self._api_response = None
1033
1034         self._sync = SYNC_EXPLICIT
1035         self.lock = threading.RLock()
1036         self.callbacks = []
1037         self.events = None
1038
1039         if manifest_locator_or_text:
1040             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1041                 self._manifest_locator = manifest_locator_or_text
1042             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1043                 self._manifest_locator = manifest_locator_or_text
1044             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1045                 self._manifest_text = manifest_locator_or_text
1046             else:
1047                 raise errors.ArgumentError(
1048                     "Argument to CollectionReader must be a manifest or a collection UUID")
1049
1050             self._populate()
1051
1052
1053     def root_collection(self):
1054         return self
1055
1056     def stream_name(self):
1057         return "."
1058
1059     def sync_mode(self):
1060         return self._sync
1061
1062     @synchronized
1063     @retry_method
1064     def update(self, other=None, num_retries=None):
1065         """Fetch the latest collection record on the API server and merge it with the
1066         current collection contents.
1067
1068         """
1069         if other is None:
1070             if self._manifest_locator is None:
1071                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1072             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1073             other = CollectionReader(response["manifest_text"])
1074         baseline = CollectionReader(self._manifest_text)
1075         self.apply(baseline.diff(other))
1076
1077     @synchronized
1078     def _my_api(self):
1079         if self._api_client is None:
1080             self._api_client = ThreadSafeApiCache(self._config)
1081             self._keep_client = self._api_client.keep
1082         return self._api_client
1083
1084     @synchronized
1085     def _my_keep(self):
1086         if self._keep_client is None:
1087             if self._api_client is None:
1088                 self._my_api()
1089             else:
1090                 self._keep_client = KeepClient(api_client=self._api_client)
1091         return self._keep_client
1092
1093     @synchronized
1094     def _my_block_manager(self):
1095         if self._block_manager is None:
1096             self._block_manager = _BlockManager(self._my_keep())
1097         return self._block_manager
1098
1099     def _populate_from_api_server(self):
1100         # As in KeepClient itself, we must wait until the last
1101         # possible moment to instantiate an API client, in order to
1102         # avoid tripping up clients that don't have access to an API
1103         # server.  If we do build one, make sure our Keep client uses
1104         # it.  If instantiation fails, we'll fall back to the except
1105         # clause, just like any other Collection lookup
1106         # failure. Return an exception, or None if successful.
1107         try:
1108             self._api_response = self._my_api().collections().get(
1109                 uuid=self._manifest_locator).execute(
1110                     num_retries=self.num_retries)
1111             self._manifest_text = self._api_response['manifest_text']
1112             return None
1113         except Exception as e:
1114             return e
1115
1116     def _populate_from_keep(self):
1117         # Retrieve a manifest directly from Keep. This has a chance of
1118         # working if [a] the locator includes a permission signature
1119         # or [b] the Keep services are operating in world-readable
1120         # mode. Return an exception, or None if successful.
1121         try:
1122             self._manifest_text = self._my_keep().get(
1123                 self._manifest_locator, num_retries=self.num_retries)
1124         except Exception as e:
1125             return e
1126
1127     def _populate(self):
1128         if self._manifest_locator is None and self._manifest_text is None:
1129             return
1130         error_via_api = None
1131         error_via_keep = None
1132         should_try_keep = ((self._manifest_text is None) and
1133                            util.keep_locator_pattern.match(
1134                                self._manifest_locator))
1135         if ((self._manifest_text is None) and
1136             util.signed_locator_pattern.match(self._manifest_locator)):
1137             error_via_keep = self._populate_from_keep()
1138         if self._manifest_text is None:
1139             error_via_api = self._populate_from_api_server()
1140             if error_via_api is not None and not should_try_keep:
1141                 raise error_via_api
1142         if ((self._manifest_text is None) and
1143             not error_via_keep and
1144             should_try_keep):
1145             # Looks like a keep locator, and we didn't already try keep above
1146             error_via_keep = self._populate_from_keep()
1147         if self._manifest_text is None:
1148             # Nothing worked!
1149             raise arvados.errors.NotFoundError(
1150                 ("Failed to retrieve collection '{}' " +
1151                  "from either API server ({}) or Keep ({})."
1152                  ).format(
1153                     self._manifest_locator,
1154                     error_via_api,
1155                     error_via_keep))
1156         # populate
1157         self._baseline_manifest = self._manifest_text
1158         self._import_manifest(self._manifest_text)
1159
1160
1161     def _has_collection_uuid(self):
1162         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1163
1164     def __enter__(self):
1165         return self
1166
1167     def __exit__(self, exc_type, exc_value, traceback):
1168         """Support scoped auto-commit in a with: block."""
1169         if self._sync != SYNC_READONLY and self._has_collection_uuid():
1170             self.save()
1171         if self._block_manager is not None:
1172             self._block_manager.stop_threads()
1173
1174     @synchronized
1175     def clone(self, new_parent=None, readonly=False, new_config=None):
1176         if new_config is None:
1177             new_config = self._config
1178         if readonly:
1179             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1180         else:
1181             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1182
1183         newcollection._sync = None
1184         self._cloneinto(newcollection)
1185         newcollection._sync = SYNC_READONLY if readonly else SYNC_EXPLICIT
1186         return newcollection
1187
1188     @synchronized
1189     def api_response(self):
1190         """Returns information about this Collection fetched from the API server.
1191
1192         If the Collection exists in Keep but not the API server, currently
1193         returns None.  Future versions may provide a synthetic response.
1194
1195         """
1196         return self._api_response
1197
1198     def find_or_create(self, path, create_type):
1199         """See `SynchronizedCollectionBase.find_or_create`"""
1200         if path == ".":
1201             return self
1202         else:
1203             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1204
1205     def find(self, path):
1206         """See `SynchronizedCollectionBase.find`"""
1207         if path == ".":
1208             return self
1209         else:
1210             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1211
1212     def remove(self, path, recursive=False):
1213         """See `SynchronizedCollectionBase.remove`"""
1214         if path == ".":
1215             raise errors.ArgumentError("Cannot remove '.'")
1216         else:
1217             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1218
1219     @must_be_writable
1220     @synchronized
1221     @retry_method
1222     def save(self, merge=True, num_retries=None):
1223         """Commit pending buffer blocks to Keep, merge with remote record (if
1224         update=True), write the manifest to Keep, and update the collection
1225         record.
1226
1227         Will raise AssertionError if not associated with a collection record on
1228         the API server.  If you want to save a manifest to Keep only, see
1229         `save_new()`.
1230
1231         :update:
1232           Update and merge remote changes before saving.  Otherwise, any
1233           remote changes will be ignored and overwritten.
1234
1235         """
1236         if self.modified():
1237             if not self._has_collection_uuid():
1238                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
1239             self._my_block_manager().commit_all()
1240             if merge:
1241                 self.update()
1242             self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1243
1244             text = self.manifest_text(strip=False)
1245             self._api_response = self._my_api().collections().update(
1246                 uuid=self._manifest_locator,
1247                 body={'manifest_text': text}
1248                 ).execute(
1249                     num_retries=num_retries)
1250             self._manifest_text = self._api_response["manifest_text"]
1251             self.set_unmodified()
1252
1253
1254     @must_be_writable
1255     @synchronized
1256     @retry_method
1257     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
1258         """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
1259         a new collection record (if create_collection_record True).
1260
1261         After creating a new collection record, this Collection object will be
1262         associated with the new record used by `save()`.
1263
1264         :name:
1265           The collection name.
1266
1267         :keep_only:
1268           Only save the manifest to keep, do not create a collection record.
1269
1270         :owner_uuid:
1271           the user, or project uuid that will own this collection.
1272           If None, defaults to the current user.
1273
1274         :ensure_unique_name:
1275           If True, ask the API server to rename the collection
1276           if it conflicts with a collection with the same name and owner.  If
1277           False, a name conflict will result in an error.
1278
1279         """
1280         self._my_block_manager().commit_all()
1281         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1282         text = self.manifest_text(strip=False)
1283
1284         if create_collection_record:
1285             if name is None:
1286                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1287
1288             body = {"manifest_text": text,
1289                     "name": name}
1290             if owner_uuid:
1291                 body["owner_uuid"] = owner_uuid
1292
1293             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1294             text = self._api_response["manifest_text"]
1295
1296             self._manifest_locator = self._api_response["uuid"]
1297
1298         self._manifest_text = text
1299         self.set_unmodified()
1300
1301     @synchronized
1302     def subscribe(self, callback):
1303         self.callbacks.append(callback)
1304
1305     @synchronized
1306     def unsubscribe(self, callback):
1307         self.callbacks.remove(callback)
1308
1309     @synchronized
1310     def notify(self, event, collection, name, item):
1311         for c in self.callbacks:
1312             c(event, collection, name, item)
1313
1314     @synchronized
1315     def _import_manifest(self, manifest_text):
1316         """Import a manifest into a `Collection`.
1317
1318         :manifest_text:
1319           The manifest text to import from.
1320
1321         """
1322         if len(self) > 0:
1323             raise ArgumentError("Can only import manifest into an empty collection")
1324
1325         save_sync = self.sync_mode()
1326         self._sync = None
1327
1328         STREAM_NAME = 0
1329         BLOCKS = 1
1330         SEGMENTS = 2
1331
1332         stream_name = None
1333         state = STREAM_NAME
1334
1335         for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1336             tok = n.group(1)
1337             sep = n.group(2)
1338
1339             if state == STREAM_NAME:
1340                 # starting a new stream
1341                 stream_name = tok.replace('\\040', ' ')
1342                 blocks = []
1343                 segments = []
1344                 streamoffset = 0L
1345                 state = BLOCKS
1346                 continue
1347
1348             if state == BLOCKS:
1349                 s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1350                 if s:
1351                     blocksize = long(s.group(1))
1352                     blocks.append(Range(tok, streamoffset, blocksize))
1353                     streamoffset += blocksize
1354                 else:
1355                     state = SEGMENTS
1356
1357             if state == SEGMENTS:
1358                 s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1359                 if s:
1360                     pos = long(s.group(1))
1361                     size = long(s.group(2))
1362                     name = s.group(3).replace('\\040', ' ')
1363                     filepath = os.path.join(stream_name, name)
1364                     f = self.find_or_create(filepath, FILE)
1365                     if isinstance(f, ArvadosFile):
1366                         f.add_segment(blocks, pos, size)
1367                     else:
1368                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1369                 else:
1370                     # error!
1371                     raise errors.SyntaxError("Invalid manifest format")
1372
1373             if sep == "\n":
1374                 stream_name = None
1375                 state = STREAM_NAME
1376
1377         self.set_unmodified()
1378         self._sync = save_sync
1379
1380
1381 class Subcollection(SynchronizedCollectionBase):
1382     """This is a subdirectory within a collection that doesn't have its own API
1383     server record.
1384
1385     It falls under the umbrella of the root collection.
1386
1387     """
1388
1389     def __init__(self, parent):
1390         super(Subcollection, self).__init__(parent)
1391         self.lock = self.root_collection().lock
1392         self._manifest_text = None
1393
1394     def root_collection(self):
1395         return self.parent.root_collection()
1396
1397     def sync_mode(self):
1398         return self.root_collection().sync_mode()
1399
1400     def _my_api(self):
1401         return self.root_collection()._my_api()
1402
1403     def _my_keep(self):
1404         return self.root_collection()._my_keep()
1405
1406     def _my_block_manager(self):
1407         return self.root_collection()._my_block_manager()
1408
1409     def notify(self, event, collection, name, item):
1410         return self.root_collection().notify(event, collection, name, item)
1411
1412     def stream_name(self):
1413         for k, v in self.parent.items():
1414             if v is self:
1415                 return os.path.join(self.parent.stream_name(), k)
1416         return '.'
1417
1418     @synchronized
1419     def clone(self, new_parent):
1420         c = Subcollection(new_parent)
1421         self._cloneinto(c)
1422         return c
1423
1424
1425 class CollectionReader(Collection):
1426     """A read-only collection object from an api collection record locator,
1427     a portable data hash of a manifest, or raw manifest text.
1428
1429     See `Collection` constructor for detailed options.
1430
1431     """
1432     def __init__(self, *args, **kwargs):
1433         if not args and not kwargs.get("manifest_locator_or_text"):
1434             raise errors.ArgumentError("Must provide manifest locator or text to initialize ReadOnlyCollection")
1435
1436         super(CollectionReader, self).__init__(*args, **kwargs)
1437
1438         # Forego any locking since it should never change once initialized.
1439         self.lock = NoopLock()
1440         self._sync = SYNC_READONLY
1441
1442         # Backwards compatability with old CollectionReader
1443         # all_streams() and all_files()
1444         self._streams = None
1445
1446     def _populate_streams(orig_func):
1447         @functools.wraps(orig_func)
1448         def populate_streams_wrapper(self, *args, **kwargs):
1449             # Defer populating self._streams until needed since it creates a copy of the manifest.
1450             if self._streams is None:
1451                 if self._manifest_text:
1452                     self._streams = [sline.split()
1453                                      for sline in self._manifest_text.split("\n")
1454                                      if sline]
1455                 else:
1456                     self._streams = []
1457             return orig_func(self, *args, **kwargs)
1458         return populate_streams_wrapper
1459
1460     @_populate_streams
1461     def normalize(self):
1462         """Normalize the streams returned by `all_streams`.
1463
1464         This method is kept for backwards compatability and only affects the
1465         behavior of `all_streams()` and `all_files()`
1466
1467         """
1468
1469         # Rearrange streams
1470         streams = {}
1471         for s in self.all_streams():
1472             for f in s.all_files():
1473                 streamname, filename = split(s.name() + "/" + f.name())
1474                 if streamname not in streams:
1475                     streams[streamname] = {}
1476                 if filename not in streams[streamname]:
1477                     streams[streamname][filename] = []
1478                 for r in f.segments:
1479                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1480
1481         self._streams = [normalize_stream(s, streams[s])
1482                          for s in sorted(streams)]
1483     @_populate_streams
1484     def all_streams(self):
1485         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1486                 for s in self._streams]
1487
1488     @_populate_streams
1489     def all_files(self):
1490         for s in self.all_streams():
1491             for f in s.all_files():
1492                 yield f