Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace")
311         self._current_stream_name = '.' if newstreamname=='' else newstreamname
312
313     def current_stream_name(self):
314         return self._current_stream_name
315
316     def finish_current_stream(self):
317         self.finish_current_file()
318         self.flush_data()
319         if not self._current_stream_files:
320             pass
321         elif self._current_stream_name is None:
322             raise errors.AssertionError(
323                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
324                 (self._current_stream_length, len(self._current_stream_files)))
325         else:
326             if not self._current_stream_locators:
327                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
328             self._finished_streams.append([self._current_stream_name,
329                                            self._current_stream_locators,
330                                            self._current_stream_files])
331         self._current_stream_files = []
332         self._current_stream_length = 0
333         self._current_stream_locators = []
334         self._current_stream_name = None
335         self._current_file_pos = 0
336         self._current_file_name = None
337
338     def finish(self):
339         """Store the manifest in Keep and return its locator.
340
341         This is useful for storing manifest fragments (task outputs)
342         temporarily in Keep during a Crunch job.
343
344         In other cases you should make a collection instead, by
345         sending manifest_text() to the API server's "create
346         collection" endpoint.
347         """
348         return self._my_keep().put(self.manifest_text(), copies=self.replication)
349
350     def portable_data_hash(self):
351         stripped = self.stripped_manifest()
352         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
353
354     def manifest_text(self):
355         self.finish_current_stream()
356         manifest = ''
357
358         for stream in self._finished_streams:
359             if not re.search(r'^\.(/.*)?$', stream[0]):
360                 manifest += './'
361             manifest += stream[0].replace(' ', '\\040')
362             manifest += ' ' + ' '.join(stream[1])
363             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
364             manifest += "\n"
365
366         return manifest
367
368     def data_locators(self):
369         ret = []
370         for name, locators, files in self._finished_streams:
371             ret += locators
372         return ret
373
374
375 class ResumableCollectionWriter(CollectionWriter):
376     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
377                    '_current_stream_locators', '_current_stream_name',
378                    '_current_file_name', '_current_file_pos', '_close_file',
379                    '_data_buffer', '_dependencies', '_finished_streams',
380                    '_queued_dirents', '_queued_trees']
381
382     def __init__(self, api_client=None, **kwargs):
383         self._dependencies = {}
384         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
385
386     @classmethod
387     def from_state(cls, state, *init_args, **init_kwargs):
388         # Try to build a new writer from scratch with the given state.
389         # If the state is not suitable to resume (because files have changed,
390         # been deleted, aren't predictable, etc.), raise a
391         # StaleWriterStateError.  Otherwise, return the initialized writer.
392         # The caller is responsible for calling writer.do_queued_work()
393         # appropriately after it's returned.
394         writer = cls(*init_args, **init_kwargs)
395         for attr_name in cls.STATE_PROPS:
396             attr_value = state[attr_name]
397             attr_class = getattr(writer, attr_name).__class__
398             # Coerce the value into the same type as the initial value, if
399             # needed.
400             if attr_class not in (type(None), attr_value.__class__):
401                 attr_value = attr_class(attr_value)
402             setattr(writer, attr_name, attr_value)
403         # Check dependencies before we try to resume anything.
404         if any(KeepLocator(ls).permission_expired()
405                for ls in writer._current_stream_locators):
406             raise errors.StaleWriterStateError(
407                 "locators include expired permission hint")
408         writer.check_dependencies()
409         if state['_current_file'] is not None:
410             path, pos = state['_current_file']
411             try:
412                 writer._queued_file = open(path, 'rb')
413                 writer._queued_file.seek(pos)
414             except IOError as error:
415                 raise errors.StaleWriterStateError(
416                     "failed to reopen active file {}: {}".format(path, error))
417         return writer
418
419     def check_dependencies(self):
420         for path, orig_stat in self._dependencies.items():
421             if not S_ISREG(orig_stat[ST_MODE]):
422                 raise errors.StaleWriterStateError("{} not file".format(path))
423             try:
424                 now_stat = tuple(os.stat(path))
425             except OSError as error:
426                 raise errors.StaleWriterStateError(
427                     "failed to stat {}: {}".format(path, error))
428             if ((not S_ISREG(now_stat[ST_MODE])) or
429                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
430                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
431                 raise errors.StaleWriterStateError("{} changed".format(path))
432
433     def dump_state(self, copy_func=lambda x: x):
434         state = {attr: copy_func(getattr(self, attr))
435                  for attr in self.STATE_PROPS}
436         if self._queued_file is None:
437             state['_current_file'] = None
438         else:
439             state['_current_file'] = (os.path.realpath(self._queued_file.name),
440                                       self._queued_file.tell())
441         return state
442
443     def _queue_file(self, source, filename=None):
444         try:
445             src_path = os.path.realpath(source)
446         except Exception:
447             raise errors.AssertionError("{} not a file path".format(source))
448         try:
449             path_stat = os.stat(src_path)
450         except OSError as stat_error:
451             path_stat = None
452         super(ResumableCollectionWriter, self)._queue_file(source, filename)
453         fd_stat = os.fstat(self._queued_file.fileno())
454         if not S_ISREG(fd_stat.st_mode):
455             # We won't be able to resume from this cache anyway, so don't
456             # worry about further checks.
457             self._dependencies[source] = tuple(fd_stat)
458         elif path_stat is None:
459             raise errors.AssertionError(
460                 "could not stat {}: {}".format(source, stat_error))
461         elif path_stat.st_ino != fd_stat.st_ino:
462             raise errors.AssertionError(
463                 "{} changed between open and stat calls".format(source))
464         else:
465             self._dependencies[src_path] = tuple(fd_stat)
466
467     def write(self, data):
468         if self._queued_file is None:
469             raise errors.AssertionError(
470                 "resumable writer can't accept unsourced data")
471         return super(ResumableCollectionWriter, self).write(data)
472
473 ADD = "add"
474 DEL = "del"
475 MOD = "mod"
476 FILE = "file"
477 COLLECTION = "collection"
478
479 class SynchronizedCollectionBase(CollectionBase):
480     """Base class for Collections and Subcollections.
481
482     Implements the majority of functionality relating to accessing items in the
483     Collection.
484
485     """
486
487     def __init__(self, parent=None):
488         self.parent = parent
489         self._modified = True
490         self._items = {}
491
492     def _my_api(self):
493         raise NotImplementedError()
494
495     def _my_keep(self):
496         raise NotImplementedError()
497
498     def _my_block_manager(self):
499         raise NotImplementedError()
500
501     def writable(self):
502         raise NotImplementedError()
503
504     def root_collection(self):
505         raise NotImplementedError()
506
507     def notify(self, event, collection, name, item):
508         raise NotImplementedError()
509
510     def stream_name(self):
511         raise NotImplementedError()
512
513     @must_be_writable
514     @synchronized
515     def find_or_create(self, path, create_type):
516         """Recursively search the specified file path.
517
518         May return either a `Collection` or `ArvadosFile`.  If not found, will
519         create a new item at the specified path based on `create_type`.  Will
520         create intermediate subcollections needed to contain the final item in
521         the path.
522
523         :create_type:
524           One of `arvados.collection.FILE` or
525           `arvados.collection.COLLECTION`.  If the path is not found, and value
526           of create_type is FILE then create and return a new ArvadosFile for
527           the last path component.  If COLLECTION, then create and return a new
528           Collection for the last path component.
529
530         """
531
532         pathcomponents = path.split("/", 1)
533         if pathcomponents[0]:
534             item = self._items.get(pathcomponents[0])
535             if len(pathcomponents) == 1:
536                 if item is None:
537                     # create new file
538                     if create_type == COLLECTION:
539                         item = Subcollection(self)
540                     else:
541                         item = ArvadosFile(self)
542                     self._items[pathcomponents[0]] = item
543                     self._modified = True
544                     self.notify(ADD, self, pathcomponents[0], item)
545                 return item
546             else:
547                 if item is None:
548                     # create new collection
549                     item = Subcollection(self)
550                     self._items[pathcomponents[0]] = item
551                     self._modified = True
552                     self.notify(ADD, self, pathcomponents[0], item)
553                 if isinstance(item, SynchronizedCollectionBase):
554                     return item.find_or_create(pathcomponents[1], create_type)
555                 else:
556                     raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
557         else:
558             return self
559
560     @synchronized
561     def find(self, path):
562         """Recursively search the specified file path.
563
564         May return either a Collection or ArvadosFile.  Return None if not
565         found.
566
567         """
568         if not path:
569             raise errors.ArgumentError("Parameter 'path' must not be empty.")
570
571         pathcomponents = path.split("/", 1)
572         item = self._items.get(pathcomponents[0])
573         if len(pathcomponents) == 1:
574             return item
575         else:
576             if isinstance(item, SynchronizedCollectionBase):
577                 if pathcomponents[1]:
578                     return item.find(pathcomponents[1])
579                 else:
580                     return item
581             else:
582                 raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
583
584     def mkdirs(path):
585         """Recursive subcollection create.
586
587         Like `os.mkdirs()`.  Will create intermediate subcollections needed to
588         contain the leaf subcollection path.
589
590         """
591         return self.find_or_create(path, COLLECTION)
592
593     def open(self, path, mode="r"):
594         """Open a file-like object for access.
595
596         :path:
597           path to a file in the collection
598         :mode:
599           one of "r", "r+", "w", "w+", "a", "a+"
600           :"r":
601             opens for reading
602           :"r+":
603             opens for reading and writing.  Reads/writes share a file pointer.
604           :"w", "w+":
605             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
606           :"a", "a+":
607             opens for reading and writing.  All writes are appended to
608             the end of the file.  Writing does not affect the file pointer for
609             reading.
610         """
611         mode = mode.replace("b", "")
612         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
613             raise errors.ArgumentError("Bad mode '%s'" % mode)
614         create = (mode != "r")
615
616         if create and not self.writable():
617             raise IOError((errno.EROFS, "Collection is read only"))
618
619         if create:
620             arvfile = self.find_or_create(path, FILE)
621         else:
622             arvfile = self.find(path)
623
624         if arvfile is None:
625             raise IOError((errno.ENOENT, "File not found"))
626         if not isinstance(arvfile, ArvadosFile):
627             raise IOError((errno.EISDIR, "Path must refer to a file."))
628
629         if mode[0] == "w":
630             arvfile.truncate(0)
631
632         name = os.path.basename(path)
633
634         if mode == "r":
635             return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
636         else:
637             return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
638
639     @synchronized
640     def modified(self):
641         """Test if the collection (or any subcollection or file) has been modified."""
642         if self._modified:
643             return True
644         for k,v in self._items.items():
645             if v.modified():
646                 return True
647         return False
648
649     @synchronized
650     def set_unmodified(self):
651         """Recursively clear modified flag."""
652         self._modified = False
653         for k,v in self._items.items():
654             v.set_unmodified()
655
656     @synchronized
657     def __iter__(self):
658         """Iterate over names of files and collections contained in this collection."""
659         return iter(self._items.keys())
660
661     @synchronized
662     def __getitem__(self, k):
663         """Get a file or collection that is directly contained by this collection.
664
665         If you want to search a path, use `find()` instead.
666
667         """
668         return self._items[k]
669
670     @synchronized
671     def __contains__(self, k):
672         """Test if there is a file or collection a directly contained by this collection."""
673         return k in self._items
674
675     @synchronized
676     def __len__(self):
677         """Get the number of items directly contained in this collection."""
678         return len(self._items)
679
680     @must_be_writable
681     @synchronized
682     def __delitem__(self, p):
683         """Delete an item by name which is directly contained by this collection."""
684         del self._items[p]
685         self._modified = True
686         self.notify(DEL, self, p, None)
687
688     @synchronized
689     def keys(self):
690         """Get a list of names of files and collections directly contained in this collection."""
691         return self._items.keys()
692
693     @synchronized
694     def values(self):
695         """Get a list of files and collection objects directly contained in this collection."""
696         return self._items.values()
697
698     @synchronized
699     def items(self):
700         """Get a list of (name, object) tuples directly contained in this collection."""
701         return self._items.items()
702
703     def exists(self, path):
704         """Test if there is a file or collection at `path`."""
705         return self.find(path) != None
706
707     @must_be_writable
708     @synchronized
709     def remove(self, path, recursive=False):
710         """Remove the file or subcollection (directory) at `path`.
711
712         :recursive:
713           Specify whether to remove non-empty subcollections (True), or raise an error (False).
714         """
715
716         if not path:
717             raise errors.ArgumentError("Parameter 'path' must not be empty.")
718
719         pathcomponents = path.split("/", 1)
720         item = self._items.get(pathcomponents[0])
721         if item is None:
722             raise IOError((errno.ENOENT, "File not found"))
723         if len(pathcomponents) == 1:
724             if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
725                 raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
726             deleteditem = self._items[pathcomponents[0]]
727             del self._items[pathcomponents[0]]
728             self._modified = True
729             self.notify(DEL, self, pathcomponents[0], deleteditem)
730         else:
731             item.remove(pathcomponents[1])
732
733     def _clonefrom(self, source):
734         for k,v in source.items():
735             self._items[k] = v.clone(self)
736
737     def clone(self):
738         raise NotImplementedError()
739
740     @must_be_writable
741     @synchronized
742     def copy(self, source, target_path, source_collection=None, overwrite=False):
743         """Copy a file or subcollection to a new path in this collection.
744
745         :source:
746           An ArvadosFile, Subcollection, or string with a path to source file or subcollection
747
748         :target_path:
749           Destination file or path.  If the target path already exists and is a
750           subcollection, the item will be placed inside the subcollection.  If
751           the target path already exists and is a file, this will raise an error
752           unless you specify `overwrite=True`.
753
754         :source_collection:
755           Collection to copy `source_path` from (default `self`)
756
757         :overwrite:
758           Whether to overwrite target file if it already exists.
759         """
760         if source_collection is None:
761             source_collection = self
762
763         # Find the object to copy
764         if isinstance(source, basestring):
765             source_obj = source_collection.find(source)
766             if source_obj is None:
767                 raise IOError((errno.ENOENT, "File not found"))
768             sourcecomponents = source.split("/")
769         else:
770             source_obj = source
771             sourcecomponents = None
772
773         # Find parent collection the target path
774         targetcomponents = target_path.split("/")
775
776         # Determine the name to use.
777         target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
778
779         if not target_name:
780             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
781
782         target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
783
784         if target_name in target_dir:
785             if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
786                 target_dir = target_dir[target_name]
787                 target_name = sourcecomponents[-1]
788             elif not overwrite:
789                 raise IOError((errno.EEXIST, "File already exists"))
790
791         modified_from = None
792         if target_name in target_dir:
793             modified_from = target_dir[target_name]
794
795         # Actually make the copy.
796         dup = source_obj.clone(target_dir)
797         target_dir._items[target_name] = dup
798         target_dir._modified = True
799
800         if modified_from:
801             self.notify(MOD, target_dir, target_name, (modified_from, dup))
802         else:
803             self.notify(ADD, target_dir, target_name, dup)
804
805     @synchronized
806     def manifest_text(self, stream_name=".", strip=False, normalize=False):
807         """Get the manifest text for this collection, sub collections and files.
808
809         :stream_name:
810           Name of the stream (directory)
811
812         :strip:
813           If True, remove signing tokens from block locators if present.
814           If False (default), block locators are left unchanged.
815
816         :normalize:
817           If True, always export the manifest text in normalized form
818           even if the Collection is not modified.  If False (default) and the collection
819           is not modified, return the original manifest text even if it is not
820           in normalized form.
821
822         """
823
824         if self.modified() or self._manifest_text is None or normalize:
825             item  = self
826             stream = {}
827             buf = []
828             sorted_keys = sorted(item.keys())
829             for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
830                 # Create a stream per file `k`
831                 arvfile = item[filename]
832                 filestream = []
833                 for segment in arvfile.segments():
834                     loc = segment.locator
835                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
836                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
837                     if strip:
838                         loc = KeepLocator(loc).stripped()
839                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
840                                          segment.segment_offset, segment.range_size))
841                 stream[filename] = filestream
842             if stream:
843                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
844             for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
845                 buf.append(item[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip))
846             return "".join(buf)
847         else:
848             if strip:
849                 return self.stripped_manifest()
850             else:
851                 return self._manifest_text
852
853     @synchronized
854     def diff(self, end_collection, prefix=".", holding_collection=None):
855         """Generate list of add/modify/delete actions.
856
857         When given to `apply`, will change `self` to match `end_collection`
858
859         """
860         changes = []
861         if holding_collection is None:
862             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
863         for k in self:
864             if k not in end_collection:
865                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
866         for k in end_collection:
867             if k in self:
868                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
869                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
870                 elif end_collection[k] != self[k]:
871                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
872             else:
873                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
874         return changes
875
876     @must_be_writable
877     @synchronized
878     def apply(self, changes):
879         """Apply changes from `diff`.
880
881         If a change conflicts with a local change, it will be saved to an
882         alternate path indicating the conflict.
883
884         """
885         for change in changes:
886             event_type = change[0]
887             path = change[1]
888             initial = change[2]
889             local = self.find(path)
890             conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
891                                                                     time.gmtime()))
892             if event_type == ADD:
893                 if local is None:
894                     # No local file at path, safe to copy over new file
895                     self.copy(initial, path)
896                 elif local is not None and local != initial:
897                     # There is already local file and it is different:
898                     # save change to conflict file.
899                     self.copy(initial, conflictpath)
900             elif event_type == MOD:
901                 final = change[3]
902                 if local == initial:
903                     # Local matches the "initial" item so it has not
904                     # changed locally and is safe to update.
905                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
906                         # Replace contents of local file with new contents
907                         local.replace_contents(final)
908                     else:
909                         # Overwrite path with new item; this can happen if
910                         # path was a file and is now a collection or vice versa
911                         self.copy(final, path, overwrite=True)
912                 else:
913                     # Local is missing (presumably deleted) or local doesn't
914                     # match the "start" value, so save change to conflict file
915                     self.copy(final, conflictpath)
916             elif event_type == DEL:
917                 if local == initial:
918                     # Local item matches "initial" value, so it is safe to remove.
919                     self.remove(path, recursive=True)
920                 # else, the file is modified or already removed, in either
921                 # case we don't want to try to remove it.
922
923     def portable_data_hash(self):
924         """Get the portable data hash for this collection's manifest."""
925         stripped = self.manifest_text(strip=True)
926         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
927
928     @synchronized
929     def __eq__(self, other):
930         if other is self:
931             return True
932         if not isinstance(other, SynchronizedCollectionBase):
933             return False
934         if len(self._items) != len(other):
935             return False
936         for k in self._items:
937             if k not in other:
938                 return False
939             if self._items[k] != other[k]:
940                 return False
941         return True
942
943     def __ne__(self, other):
944         return not self.__eq__(other)
945
946
947 class Collection(SynchronizedCollectionBase):
948     """Represents the root of an Arvados Collection.
949
950     This class is threadsafe.  The root collection object, all subcollections
951     and files are protected by a single lock (i.e. each access locks the entire
952     collection).
953
954     Brief summary of
955     useful methods:
956
957     :To read an existing file:
958       `c.open("myfile", "r")`
959
960     :To write a new file:
961       `c.open("myfile", "w")`
962
963     :To determine if a file exists:
964       `c.find("myfile") is not None`
965
966     :To copy a file:
967       `c.copy("source", "dest")`
968
969     :To delete a file:
970       `c.remove("myfile")`
971
972     :To save to an existing collection record:
973       `c.save()`
974
975     :To save a new collection record:
976     `c.save_new()`
977
978     :To merge remote changes into this object:
979       `c.update()`
980
981     Must be associated with an API server Collection record (during
982     initialization, or using `save_new`) to use `save` or `update`
983
984     """
985
986     def __init__(self, manifest_locator_or_text=None,
987                  api_client=None,
988                  keep_client=None,
989                  num_retries=None,
990                  parent=None,
991                  apiconfig=None,
992                  block_manager=None):
993         """Collection constructor.
994
995         :manifest_locator_or_text:
996           One of Arvados collection UUID, block locator of
997           a manifest, raw manifest text, or None (to create an empty collection).
998         :parent:
999           the parent Collection, may be None.
1000         :apiconfig:
1001           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1002           Prefer this over supplying your own api_client and keep_client (except in testing).
1003           Will use default config settings if not specified.
1004         :api_client:
1005           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1006         :keep_client:
1007           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1008         :num_retries:
1009           the number of retries for API and Keep requests.
1010         :block_manager:
1011           the block manager to use.  If not specified, create one.
1012
1013         """
1014         super(Collection, self).__init__(parent)
1015         self._api_client = api_client
1016         self._keep_client = keep_client
1017         self._block_manager = block_manager
1018
1019         if apiconfig:
1020             self._config = apiconfig
1021         else:
1022             self._config = config.settings()
1023
1024         self.num_retries = num_retries if num_retries is not None else 0
1025         self._manifest_locator = None
1026         self._manifest_text = None
1027         self._api_response = None
1028
1029         self.lock = threading.RLock()
1030         self.callbacks = []
1031         self.events = None
1032
1033         if manifest_locator_or_text:
1034             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1035                 self._manifest_locator = manifest_locator_or_text
1036             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1037                 self._manifest_locator = manifest_locator_or_text
1038             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1039                 self._manifest_text = manifest_locator_or_text
1040             else:
1041                 raise errors.ArgumentError(
1042                     "Argument to CollectionReader must be a manifest or a collection UUID")
1043
1044             try:
1045                 self._populate()
1046             except (IOError, errors.SyntaxError) as e:
1047                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1048
1049     def root_collection(self):
1050         return self
1051
1052     def stream_name(self):
1053         return "."
1054
1055     def writable(self):
1056         return True
1057
1058     @synchronized
1059     @retry_method
1060     def update(self, other=None, num_retries=None):
1061         """Merge the latest collection on the API server with the current collection."""
1062
1063         if other is None:
1064             if self._manifest_locator is None:
1065                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1066             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1067             other = CollectionReader(response["manifest_text"])
1068         baseline = CollectionReader(self._manifest_text)
1069         self.apply(baseline.diff(other))
1070
1071     @synchronized
1072     def _my_api(self):
1073         if self._api_client is None:
1074             self._api_client = ThreadSafeApiCache(self._config)
1075             self._keep_client = self._api_client.keep
1076         return self._api_client
1077
1078     @synchronized
1079     def _my_keep(self):
1080         if self._keep_client is None:
1081             if self._api_client is None:
1082                 self._my_api()
1083             else:
1084                 self._keep_client = KeepClient(api_client=self._api_client)
1085         return self._keep_client
1086
1087     @synchronized
1088     def _my_block_manager(self):
1089         if self._block_manager is None:
1090             self._block_manager = _BlockManager(self._my_keep())
1091         return self._block_manager
1092
1093     def _populate_from_api_server(self):
1094         # As in KeepClient itself, we must wait until the last
1095         # possible moment to instantiate an API client, in order to
1096         # avoid tripping up clients that don't have access to an API
1097         # server.  If we do build one, make sure our Keep client uses
1098         # it.  If instantiation fails, we'll fall back to the except
1099         # clause, just like any other Collection lookup
1100         # failure. Return an exception, or None if successful.
1101         try:
1102             self._api_response = self._my_api().collections().get(
1103                 uuid=self._manifest_locator).execute(
1104                     num_retries=self.num_retries)
1105             self._manifest_text = self._api_response['manifest_text']
1106             return None
1107         except Exception as e:
1108             return e
1109
1110     def _populate_from_keep(self):
1111         # Retrieve a manifest directly from Keep. This has a chance of
1112         # working if [a] the locator includes a permission signature
1113         # or [b] the Keep services are operating in world-readable
1114         # mode. Return an exception, or None if successful.
1115         try:
1116             self._manifest_text = self._my_keep().get(
1117                 self._manifest_locator, num_retries=self.num_retries)
1118         except Exception as e:
1119             return e
1120
1121     def _populate(self):
1122         if self._manifest_locator is None and self._manifest_text is None:
1123             return
1124         error_via_api = None
1125         error_via_keep = None
1126         should_try_keep = ((self._manifest_text is None) and
1127                            util.keep_locator_pattern.match(
1128                                self._manifest_locator))
1129         if ((self._manifest_text is None) and
1130             util.signed_locator_pattern.match(self._manifest_locator)):
1131             error_via_keep = self._populate_from_keep()
1132         if self._manifest_text is None:
1133             error_via_api = self._populate_from_api_server()
1134             if error_via_api is not None and not should_try_keep:
1135                 raise error_via_api
1136         if ((self._manifest_text is None) and
1137             not error_via_keep and
1138             should_try_keep):
1139             # Looks like a keep locator, and we didn't already try keep above
1140             error_via_keep = self._populate_from_keep()
1141         if self._manifest_text is None:
1142             # Nothing worked!
1143             raise errors.NotFoundError(
1144                 ("Failed to retrieve collection '{}' " +
1145                  "from either API server ({}) or Keep ({})."
1146                  ).format(
1147                     self._manifest_locator,
1148                     error_via_api,
1149                     error_via_keep))
1150         # populate
1151         self._baseline_manifest = self._manifest_text
1152         self._import_manifest(self._manifest_text)
1153
1154
1155     def _has_collection_uuid(self):
1156         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1157
1158     def __enter__(self):
1159         return self
1160
1161     def __exit__(self, exc_type, exc_value, traceback):
1162         """Support scoped auto-commit in a with: block."""
1163         if exc_type is not None:
1164             if self.writable() and self._has_collection_uuid():
1165                 self.save()
1166         if self._block_manager is not None:
1167             self._block_manager.stop_threads()
1168
1169     @synchronized
1170     def clone(self, new_parent=None, readonly=False, new_config=None):
1171         if new_config is None:
1172             new_config = self._config
1173         if readonly:
1174             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1175         else:
1176             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1177
1178         newcollection._clonefrom(self)
1179         return newcollection
1180
1181     @synchronized
1182     def api_response(self):
1183         """Returns information about this Collection fetched from the API server.
1184
1185         If the Collection exists in Keep but not the API server, currently
1186         returns None.  Future versions may provide a synthetic response.
1187
1188         """
1189         return self._api_response
1190
1191     def find_or_create(self, path, create_type):
1192         """See `SynchronizedCollectionBase.find_or_create`"""
1193         if path == ".":
1194             return self
1195         else:
1196             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1197
1198     def find(self, path):
1199         """See `SynchronizedCollectionBase.find`"""
1200         if path == ".":
1201             return self
1202         else:
1203             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1204
1205     def remove(self, path, recursive=False):
1206         """See `SynchronizedCollectionBase.remove`"""
1207         if path == ".":
1208             raise errors.ArgumentError("Cannot remove '.'")
1209         else:
1210             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1211
1212     @must_be_writable
1213     @synchronized
1214     @retry_method
1215     def save(self, merge=True, num_retries=None):
1216         """Save collection to an existing collection record.
1217
1218         Commit pending buffer blocks to Keep, merge with remote record (if
1219         update=True), write the manifest to Keep, and update the collection
1220         record.
1221
1222         Will raise AssertionError if not associated with a collection record on
1223         the API server.  If you want to save a manifest to Keep only, see
1224         `save_new()`.
1225
1226         :update:
1227           Update and merge remote changes before saving.  Otherwise, any
1228           remote changes will be ignored and overwritten.
1229
1230         """
1231         if self.modified():
1232             if not self._has_collection_uuid():
1233                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
1234             self._my_block_manager().commit_all()
1235             if merge:
1236                 self.update()
1237             self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1238
1239             text = self.manifest_text(strip=False)
1240             self._api_response = self._my_api().collections().update(
1241                 uuid=self._manifest_locator,
1242                 body={'manifest_text': text}
1243                 ).execute(
1244                     num_retries=num_retries)
1245             self._manifest_text = self._api_response["manifest_text"]
1246             self.set_unmodified()
1247
1248
1249     @must_be_writable
1250     @synchronized
1251     @retry_method
1252     def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
1253         """Save collection to a new collection record.
1254
1255         Commit pending buffer blocks to Keep, write the manifest to Keep, and
1256         create a new collection record (if create_collection_record True).
1257         After creating a new collection record, this Collection object will be
1258         associated with the new record used by `save()`.
1259
1260         :name:
1261           The collection name.
1262
1263         :keep_only:
1264           Only save the manifest to keep, do not create a collection record.
1265
1266         :owner_uuid:
1267           the user, or project uuid that will own this collection.
1268           If None, defaults to the current user.
1269
1270         :ensure_unique_name:
1271           If True, ask the API server to rename the collection
1272           if it conflicts with a collection with the same name and owner.  If
1273           False, a name conflict will result in an error.
1274
1275         """
1276         self._my_block_manager().commit_all()
1277         self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1278         text = self.manifest_text(strip=False)
1279
1280         if create_collection_record:
1281             if name is None:
1282                 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1283
1284             body = {"manifest_text": text,
1285                     "name": name}
1286             if owner_uuid:
1287                 body["owner_uuid"] = owner_uuid
1288
1289             self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1290             text = self._api_response["manifest_text"]
1291
1292             self._manifest_locator = self._api_response["uuid"]
1293
1294         self._manifest_text = text
1295         self.set_unmodified()
1296
1297     @synchronized
1298     def subscribe(self, callback):
1299         self.callbacks.append(callback)
1300
1301     @synchronized
1302     def unsubscribe(self, callback):
1303         self.callbacks.remove(callback)
1304
1305     @synchronized
1306     def notify(self, event, collection, name, item):
1307         for c in self.callbacks:
1308             c(event, collection, name, item)
1309
1310     @synchronized
1311     def _import_manifest(self, manifest_text):
1312         """Import a manifest into a `Collection`.
1313
1314         :manifest_text:
1315           The manifest text to import from.
1316
1317         """
1318         if len(self) > 0:
1319             raise ArgumentError("Can only import manifest into an empty collection")
1320
1321         STREAM_NAME = 0
1322         BLOCKS = 1
1323         SEGMENTS = 2
1324
1325         stream_name = None
1326         state = STREAM_NAME
1327
1328         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1329             tok = token_and_separator.group(1)
1330             sep = token_and_separator.group(2)
1331
1332             if state == STREAM_NAME:
1333                 # starting a new stream
1334                 stream_name = tok.replace('\\040', ' ')
1335                 blocks = []
1336                 segments = []
1337                 streamoffset = 0L
1338                 state = BLOCKS
1339                 continue
1340
1341             if state == BLOCKS:
1342                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1343                 if block_locator:
1344                     blocksize = long(block_locator.group(1))
1345                     blocks.append(Range(tok, streamoffset, blocksize))
1346                     streamoffset += blocksize
1347                 else:
1348                     state = SEGMENTS
1349
1350             if state == SEGMENTS:
1351                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1352                 if file_segment:
1353                     pos = long(file_segment.group(1))
1354                     size = long(file_segment.group(2))
1355                     name = file_segment.group(3).replace('\\040', ' ')
1356                     filepath = os.path.join(stream_name, name)
1357                     afile = self.find_or_create(filepath, FILE)
1358                     if isinstance(afile, ArvadosFile):
1359                         afile.add_segment(blocks, pos, size)
1360                     else:
1361                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1362                 else:
1363                     # error!
1364                     raise errors.SyntaxError("Invalid manifest format")
1365
1366             if sep == "\n":
1367                 stream_name = None
1368                 state = STREAM_NAME
1369
1370         self.set_unmodified()
1371
1372
1373 class Subcollection(SynchronizedCollectionBase):
1374     """This is a subdirectory within a collection that doesn't have its own API
1375     server record.
1376
1377     It falls under the umbrella of the root collection.
1378
1379     """
1380
1381     def __init__(self, parent):
1382         super(Subcollection, self).__init__(parent)
1383         self.lock = self.root_collection().lock
1384         self._manifest_text = None
1385
1386     def root_collection(self):
1387         return self.parent.root_collection()
1388
1389     def writable(self):
1390         return self.root_collection().writable()
1391
1392     def _my_api(self):
1393         return self.root_collection()._my_api()
1394
1395     def _my_keep(self):
1396         return self.root_collection()._my_keep()
1397
1398     def _my_block_manager(self):
1399         return self.root_collection()._my_block_manager()
1400
1401     def notify(self, event, collection, name, item):
1402         return self.root_collection().notify(event, collection, name, item)
1403
1404     def stream_name(self):
1405         for k, v in self.parent.items():
1406             if v is self:
1407                 return os.path.join(self.parent.stream_name(), k)
1408         return '.'
1409
1410     @synchronized
1411     def clone(self, new_parent):
1412         c = Subcollection(new_parent)
1413         c._clonefrom(self)
1414         return c
1415
1416
1417 class CollectionReader(Collection):
1418     """A read-only collection object.
1419
1420     Initialize from an api collection record locator, a portable data hash of a
1421     manifest, or raw manifest text.  See `Collection` constructor for detailed
1422     options.
1423
1424     """
1425     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1426         self._in_init = True
1427         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1428         self._in_init = False
1429
1430         # Forego any locking since it should never change once initialized.
1431         self.lock = NoopLock()
1432
1433         # Backwards compatability with old CollectionReader
1434         # all_streams() and all_files()
1435         self._streams = None
1436
1437     def writable(self):
1438         return self._in_init
1439
1440     def _populate_streams(orig_func):
1441         @functools.wraps(orig_func)
1442         def populate_streams_wrapper(self, *args, **kwargs):
1443             # Defer populating self._streams until needed since it creates a copy of the manifest.
1444             if self._streams is None:
1445                 if self._manifest_text:
1446                     self._streams = [sline.split()
1447                                      for sline in self._manifest_text.split("\n")
1448                                      if sline]
1449                 else:
1450                     self._streams = []
1451             return orig_func(self, *args, **kwargs)
1452         return populate_streams_wrapper
1453
1454     @_populate_streams
1455     def normalize(self):
1456         """Normalize the streams returned by `all_streams`.
1457
1458         This method is kept for backwards compatability and only affects the
1459         behavior of `all_streams()` and `all_files()`
1460
1461         """
1462
1463         # Rearrange streams
1464         streams = {}
1465         for s in self.all_streams():
1466             for f in s.all_files():
1467                 streamname, filename = split(s.name() + "/" + f.name())
1468                 if streamname not in streams:
1469                     streams[streamname] = {}
1470                 if filename not in streams[streamname]:
1471                     streams[streamname][filename] = []
1472                 for r in f.segments:
1473                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1474
1475         self._streams = [normalize_stream(s, streams[s])
1476                          for s in sorted(streams)]
1477     @_populate_streams
1478     def all_streams(self):
1479         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1480                 for s in self._streams]
1481
1482     @_populate_streams
1483     def all_files(self):
1484         for s in self.all_streams():
1485             for f in s.all_files():
1486                 yield f