1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
21 from collections import deque
24 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
25 from .keep import KeepLocator, KeepClient
26 from .stream import StreamReader
27 from ._normalize_stream import normalize_stream
28 from ._ranges import Range, LocatorAndRange
29 from .safeapi import ThreadSafeApiCache
30 import arvados.config as config
31 import arvados.errors as errors
33 import arvados.events as events
34 from arvados.retry import retry_method
36 _logger = logging.getLogger('arvados.collection')
# NOTE(review): this listing embeds original line numbers and is subsampled —
# several source lines are absent (e.g. the `def _my_keep` line before 48 and
# the `fields = line.split()` line before 65). Fragments kept verbatim.
38 class CollectionBase(object):
39 """Abstract base class for Collection classes."""
# Context-manager exit; body not visible in this listing.
44 def __exit__(self, exc_type, exc_value, traceback):
# Lazily construct and cache a KeepClient bound to this collection's API
# client and retry count.
48 if self._keep_client is None:
49 self._keep_client = KeepClient(api_client=self._api_client,
50 num_retries=self.num_retries)
51 return self._keep_client
53 def stripped_manifest(self):
54 """Get the manifest with locator hints stripped.
56 Return the manifest for the current collection with all
57 non-portable hints (i.e., permission signatures and other
58 hints other than size hints) removed from the locators.
60 raw = self.manifest_text()
62 for line in raw.split("\n"):
# Keep the stream name (first field) as-is; strip non-size hints
# (anything after `+` not starting with a digit) from locator fields.
65 clean_fields = fields[:1] + [
66 (re.sub(r'\+[^\d][^\+]*', '', x)
67 if re.match(arvados.util.keep_locator_pattern, x)
70 clean += [' '.join(clean_fields), "\n"]
# NOTE(review): subsampled listing — the `def close`, `def flush` lines and
# the write/writelines bodies are missing here; do not assume contiguity.
# File-like wrapper returned by CollectionWriter.open(); writes are forwarded
# to the owning CollectionWriter.
74 class _WriterFile(_FileLikeObjectBase):
75 def __init__(self, coll_writer, name):
76 super(_WriterFile, self).__init__(name, 'wb')
77 self.dest = coll_writer
# Closing the handle finalizes the current file in the writer's manifest.
80 super(_WriterFile, self).close()
81 self.dest.finish_current_file()
83 @_FileLikeObjectBase._before_close
84 def write(self, data):
87 @_FileLikeObjectBase._before_close
88 def writelines(self, seq):
92 @_FileLikeObjectBase._before_close
94 self.dest.flush_data()
# NOTE(review): subsampled listing — `if`/`elif` bodies in do_queued_work,
# loop/else lines in _work_file/_work_dirents/_work_trees/_queue_file are
# absent from this view. Fragments kept verbatim.
97 class CollectionWriter(CollectionBase):
98 """Deprecated, use Collection instead."""
100 def __init__(self, api_client=None, num_retries=0, replication=None):
101 """Instantiate a CollectionWriter.
103 CollectionWriter lets you build a new Arvados Collection from scratch.
104 Write files to it. The CollectionWriter will upload data to Keep as
105 appropriate, and provide you with the Collection manifest text when
109 * api_client: The API client to use to look up Collections. If not
110 provided, CollectionReader will build one from available Arvados
112 * num_retries: The default number of times to retry failed
113 service requests. Default 0. You may change this value
114 after instantiation, but note those changes may not
115 propagate to related objects like the Keep client.
116 * replication: The number of copies of each block to store.
117 If this argument is None or not supplied, replication is
118 the server-provided default if available, otherwise 2.
120 self._api_client = api_client
121 self.num_retries = num_retries
122 self.replication = (2 if replication is None else replication)
123 self._keep_client = None
# Buffered data not yet written to Keep, and its total length.
124 self._data_buffer = []
125 self._data_buffer_len = 0
# Per-stream manifest state: file entries, byte length, block locators.
126 self._current_stream_files = []
127 self._current_stream_length = 0
128 self._current_stream_locators = []
129 self._current_stream_name = '.'
130 self._current_file_name = None
131 self._current_file_pos = 0
132 self._finished_streams = []
# Work-queue state (see do_queued_work below).
133 self._close_file = None
134 self._queued_file = None
135 self._queued_dirents = deque()
136 self._queued_trees = deque()
137 self._last_open = None
139 def __exit__(self, exc_type, exc_value, traceback):
143 def do_queued_work(self):
144 # The work queue consists of three pieces:
145 # * _queued_file: The file object we're currently writing to the
147 # * _queued_dirents: Entries under the current directory
148 # (_queued_trees[0]) that we want to write or recurse through.
149 # This may contain files from subdirectories if
150 # max_manifest_depth == 0 for this directory.
151 # * _queued_trees: Directories that should be written as separate
152 # streams to the Collection.
153 # This function handles the smallest piece of work currently queued
154 # (current file, then current directory, then next directory) until
155 # no work remains. The _work_THING methods each do a unit of work on
156 # THING. _queue_THING methods add a THING to the work queue.
158 if self._queued_file:
160 elif self._queued_dirents:
162 elif self._queued_trees:
167 def _work_file(self):
# Stream the queued file into the collection in Keep-block-sized reads.
169 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
173 self.finish_current_file()
175 self._queued_file.close()
176 self._close_file = None
177 self._queued_file = None
179 def _work_dirents(self):
180 path, stream_name, max_manifest_depth = self._queued_trees[0]
181 if stream_name != self.current_stream_name():
182 self.start_new_stream(stream_name)
183 while self._queued_dirents:
184 dirent = self._queued_dirents.popleft()
185 target = os.path.join(path, dirent)
186 if os.path.isdir(target):
# Subdirectories are requeued as trees one level deeper.
187 self._queue_tree(target,
188 os.path.join(stream_name, dirent),
189 max_manifest_depth - 1)
191 self._queue_file(target, dirent)
193 if not self._queued_dirents:
194 self._queued_trees.popleft()
196 def _work_trees(self):
197 path, stream_name, max_manifest_depth = self._queued_trees[0]
198 d = arvados.util.listdir_recursive(
199 path, max_depth = (None if max_manifest_depth == 0 else 0))
201 self._queue_dirents(stream_name, d)
203 self._queued_trees.popleft()
205 def _queue_file(self, source, filename=None):
206 assert (self._queued_file is None), "tried to queue more than one file"
# Accept either a path or an already-open file-like object; only close
# handles we opened ourselves (_close_file).
207 if not hasattr(source, 'read'):
208 source = open(source, 'rb')
209 self._close_file = True
211 self._close_file = False
213 filename = os.path.basename(source.name)
214 self.start_new_file(filename)
215 self._queued_file = source
def _queue_dirents(self, stream_name, dirents):
    """Queue one directory's entries for writing.

    `stream_name` is accepted for signature compatibility; the stream is
    actually selected later when the entries are worked (_work_dirents).
    """
    # Only a single directory's entries may be pending at any time.
    assert (not self._queued_dirents), "tried to queue more than one tree"
    ordered_entries = sorted(dirents)
    self._queued_dirents = deque(ordered_entries)
def _queue_tree(self, path, stream_name, max_manifest_depth):
    """Append one directory to the queue of trees to write as streams."""
    work_item = (path, stream_name, max_manifest_depth)
    self._queued_trees.append(work_item)
def write_file(self, source, filename=None):
    """Write a single file into the collection.

    `source` may be a filesystem path or a file-like object; `filename`
    overrides the name recorded in the manifest (defaults to the source's
    basename — see _queue_file).
    """
    # Queue the file, then drain the work queue synchronously.
    self._queue_file(source, filename)
    self.do_queued_work()
def write_directory_tree(self,
                         path, stream_name='.', max_manifest_depth=-1):
    """Recursively write the directory at `path` into the collection.

    `stream_name` names the manifest stream for the top-level directory.
    `max_manifest_depth` controls how many sublevels become separate
    streams (see do_queued_work for the queue semantics).
    """
    # Queue the tree, then drain the work queue synchronously.
    self._queue_tree(path, stream_name, max_manifest_depth)
    self.do_queued_work()
# NOTE(review): subsampled listing — branch bodies in write(), the `try:` and
# Keep-put call in flush_data(), continuation lines of error messages, and a
# `return` in finish_current_file() are absent. Fragments kept verbatim.
233 def write(self, newdata):
# Accept bytes as-is, encode str, and (per the missing branch) presumably
# iterate other iterables — TODO confirm against full source.
234 if isinstance(newdata, bytes):
236 elif isinstance(newdata, str):
237 newdata = newdata.encode()
238 elif hasattr(newdata, '__iter__'):
242 self._data_buffer.append(newdata)
243 self._data_buffer_len += len(newdata)
244 self._current_stream_length += len(newdata)
# Flush whole Keep blocks as soon as enough data is buffered.
245 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
248 def open(self, streampath, filename=None):
249 """open(streampath[, filename]) -> file-like object
251 Pass in the path of a file to write to the Collection, either as a
252 single string or as two separate stream name and file name arguments.
253 This method returns a file-like object you can write to add it to the
256 You may only have one file object from the Collection open at a time,
257 so be sure to close the object when you're done. Using the object in
258 a with statement makes that easy::
260 with cwriter.open('./doc/page1.txt') as outfile:
261 outfile.write(page1_data)
262 with cwriter.open('./doc/page2.txt') as outfile:
263 outfile.write(page2_data)
266 streampath, filename = split(streampath)
# Enforce the one-open-file-at-a-time rule documented above.
267 if self._last_open and not self._last_open.closed:
268 raise errors.AssertionError(
269 "can't open '{}' when '{}' is still open".format(
270 filename, self._last_open.name))
271 if streampath != self.current_stream_name():
272 self.start_new_stream(streampath)
273 self.set_current_file_name(filename)
274 self._last_open = _WriterFile(self, filename)
275 return self._last_open
277 def flush_data(self):
278 data_buffer = b''.join(self._data_buffer)
# Push one Keep block and keep any remainder buffered.
280 self._current_stream_locators.append(
282 data_buffer[0:config.KEEP_BLOCK_SIZE],
283 copies=self.replication))
284 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
285 self._data_buffer_len = len(self._data_buffer[0])
287 def start_new_file(self, newfilename=None):
288 self.finish_current_file()
289 self.set_current_file_name(newfilename)
291 def set_current_file_name(self, newfilename):
# Manifest format forbids whitespace and NUL in file names.
292 if re.search(r'[\t\n]', newfilename):
293 raise errors.AssertionError(
294 "Manifest filenames cannot contain whitespace: %s" %
296 elif re.search(r'\x00', newfilename):
297 raise errors.AssertionError(
298 "Manifest filenames cannot contain NUL characters: %s" %
300 self._current_file_name = newfilename
302 def current_file_name(self):
303 return self._current_file_name
305 def finish_current_file(self):
# Data written without a name is only acceptable if there is none.
306 if self._current_file_name is None:
307 if self._current_file_pos == self._current_stream_length:
309 raise errors.AssertionError(
310 "Cannot finish an unnamed file " +
311 "(%d bytes at offset %d in '%s' stream)" %
312 (self._current_stream_length - self._current_file_pos,
313 self._current_file_pos,
314 self._current_stream_name))
# Record [offset, size, name] for the finished file and reset state.
315 self._current_stream_files.append([
316 self._current_file_pos,
317 self._current_stream_length - self._current_file_pos,
318 self._current_file_name])
319 self._current_file_pos = self._current_stream_length
320 self._current_file_name = None
def start_new_stream(self, newstreamname='.'):
    """Finish the stream in progress, then begin one named `newstreamname`."""
    # Order matters: the current stream must be finalized before renaming.
    self.finish_current_stream()
    self.set_current_stream_name(newstreamname)
# NOTE(review): subsampled listing — error-message continuation lines, early
# `return`s in finish_current_stream, and the `def` line of the manifest
# Keep-put method (before 359) are absent. Fragments kept verbatim.
326 def set_current_stream_name(self, newstreamname):
# Manifest stream names may not contain whitespace.
327 if re.search(r'[\t\n]', newstreamname):
328 raise errors.AssertionError(
329 "Manifest stream names cannot contain whitespace: '%s'" %
331 self._current_stream_name = '.' if newstreamname=='' else newstreamname
333 def current_stream_name(self):
334 return self._current_stream_name
336 def finish_current_stream(self):
337 self.finish_current_file()
339 if not self._current_stream_files:
341 elif self._current_stream_name is None:
342 raise errors.AssertionError(
343 "Cannot finish an unnamed stream (%d bytes in %d files)" %
344 (self._current_stream_length, len(self._current_stream_files)))
# An empty stream still needs a locator to be a valid manifest line.
346 if not self._current_stream_locators:
347 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
348 self._finished_streams.append([self._current_stream_name,
349 self._current_stream_locators,
350 self._current_stream_files])
# Reset all per-stream state for the next stream.
351 self._current_stream_files = []
352 self._current_stream_length = 0
353 self._current_stream_locators = []
354 self._current_stream_name = None
355 self._current_file_pos = 0
356 self._current_file_name = None
359 """Store the manifest in Keep and return its locator.
361 This is useful for storing manifest fragments (task outputs)
362 temporarily in Keep during a Crunch job.
364 In other cases you should make a collection instead, by
365 sending manifest_text() to the API server's "create
366 collection" endpoint.
368 return self._my_keep().put(self.manifest_text().encode(),
369 copies=self.replication)
def portable_data_hash(self):
    """Return the portable data hash ("<md5>+<size>") of the stripped manifest."""
    manifest_bytes = self.stripped_manifest().encode()
    digest = hashlib.md5(manifest_bytes).hexdigest()
    return '{}+{}'.format(digest, len(manifest_bytes))
# NOTE(review): subsampled listing — manifest initialization/normalization
# lines, the return of manifest_text, the body of data_locators, and the
# `body={...}` opening of save_new are absent. Fragments kept verbatim.
375 def manifest_text(self):
376 self.finish_current_stream()
# Build one manifest line per finished stream: name, locators, file tokens.
379 for stream in self._finished_streams:
380 if not re.search(r'^\.(/.*)?$', stream[0]):
# Spaces in names are escaped as \040 per manifest format.
382 manifest += stream[0].replace(' ', '\\040')
383 manifest += ' ' + ' '.join(stream[1])
384 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
389 def data_locators(self):
391 for name, locators, files in self._finished_streams:
395 def save_new(self, name=None):
# Create a new collection record on the API server from this manifest.
396 return self._api_client.collections().create(
397 ensure_unique_name=True,
400 'manifest_text': self.manifest_text(),
401 }).execute(num_retries=self.num_retries)
# NOTE(review): subsampled listing; lines kept verbatim with numbers.
404 class ResumableCollectionWriter(CollectionWriter):
405 """Deprecated, use Collection instead."""
# Instance attributes captured by dump_state()/restored by from_state().
407 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
408 '_current_stream_locators', '_current_stream_name',
409 '_current_file_name', '_current_file_pos', '_close_file',
410 '_data_buffer', '_dependencies', '_finished_streams',
411 '_queued_dirents', '_queued_trees']
413 def __init__(self, api_client=None, **kwargs):
# Map of source path -> stat tuple, used to detect stale state on resume.
414 self._dependencies = {}
415 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
# NOTE(review): subsampled listing — the @classmethod decorator, `try:`/
# `else:` lines in check_dependencies/dump_state/_queue_file, and a few
# other lines are absent from this view. Fragments kept verbatim.
418 def from_state(cls, state, *init_args, **init_kwargs):
419 # Try to build a new writer from scratch with the given state.
420 # If the state is not suitable to resume (because files have changed,
421 # been deleted, aren't predictable, etc.), raise a
422 # StaleWriterStateError. Otherwise, return the initialized writer.
423 # The caller is responsible for calling writer.do_queued_work()
424 # appropriately after it's returned.
425 writer = cls(*init_args, **init_kwargs)
426 for attr_name in cls.STATE_PROPS:
427 attr_value = state[attr_name]
428 attr_class = getattr(writer, attr_name).__class__
429 # Coerce the value into the same type as the initial value, if
431 if attr_class not in (type(None), attr_value.__class__):
432 attr_value = attr_class(attr_value)
433 setattr(writer, attr_name, attr_value)
434 # Check dependencies before we try to resume anything.
435 if any(KeepLocator(ls).permission_expired()
436 for ls in writer._current_stream_locators):
437 raise errors.StaleWriterStateError(
438 "locators include expired permission hint")
439 writer.check_dependencies()
440 if state['_current_file'] is not None:
441 path, pos = state['_current_file']
# Reopen the in-progress source file and seek to the saved offset.
443 writer._queued_file = open(path, 'rb')
444 writer._queued_file.seek(pos)
445 except IOError as error:
446 raise errors.StaleWriterStateError(
447 "failed to reopen active file {}: {}".format(path, error))
450 def check_dependencies(self):
# Every recorded dependency must still be a regular file with the same
# mtime and size; otherwise the saved state is stale.
451 for path, orig_stat in listitems(self._dependencies):
452 if not S_ISREG(orig_stat[ST_MODE]):
453 raise errors.StaleWriterStateError("{} not file".format(path))
455 now_stat = tuple(os.stat(path))
456 except OSError as error:
457 raise errors.StaleWriterStateError(
458 "failed to stat {}: {}".format(path, error))
459 if ((not S_ISREG(now_stat[ST_MODE])) or
460 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
461 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
462 raise errors.StaleWriterStateError("{} changed".format(path))
464 def dump_state(self, copy_func=lambda x: x):
# Snapshot all resumable attributes; copy_func lets callers deep-copy.
465 state = {attr: copy_func(getattr(self, attr))
466 for attr in self.STATE_PROPS}
467 if self._queued_file is None:
468 state['_current_file'] = None
470 state['_current_file'] = (os.path.realpath(self._queued_file.name),
471 self._queued_file.tell())
474 def _queue_file(self, source, filename=None):
# Resumable writers only accept real file paths (not file objects),
# so dependencies can be re-checked on resume.
476 src_path = os.path.realpath(source)
478 raise errors.AssertionError("{} not a file path".format(source))
480 path_stat = os.stat(src_path)
481 except OSError as stat_error:
483 super(ResumableCollectionWriter, self)._queue_file(source, filename)
484 fd_stat = os.fstat(self._queued_file.fileno())
485 if not S_ISREG(fd_stat.st_mode):
486 # We won't be able to resume from this cache anyway, so don't
487 # worry about further checks.
488 self._dependencies[source] = tuple(fd_stat)
489 elif path_stat is None:
490 raise errors.AssertionError(
491 "could not stat {}: {}".format(source, stat_error))
492 elif path_stat.st_ino != fd_stat.st_ino:
493 raise errors.AssertionError(
494 "{} changed between open and stat calls".format(source))
496 self._dependencies[src_path] = tuple(fd_stat)
def write(self, data):
    """Write `data` for the file currently being resumed.

    Raises arvados.errors.AssertionError when no source file is queued,
    because data with no file source cannot be resumed from saved state.
    """
    if self._queued_file is not None:
        return super(ResumableCollectionWriter, self).write(data)
    raise errors.AssertionError(
        "resumable writer can't accept unsourced data")
# create_type token for find_or_create(): make a subcollection (directory).
# NOTE(review): the companion FILE constant is not visible in this listing.
510 COLLECTION = "collection"
# NOTE(review): subsampled listing — the `def` lines for several abstract
# stubs (before 527, 530, 536) are absent. Fragments kept verbatim.
512 class RichCollectionBase(CollectionBase):
513 """Base class for Collections and Subcollections.
515 Implements the majority of functionality relating to accessing items in the
520 def __init__(self, parent=None):
# Dirty flag and change-notification callback shared by all subclasses.
522 self._committed = False
523 self._callback = None
# Abstract hooks that concrete subclasses must provide.
527 raise NotImplementedError()
530 raise NotImplementedError()
532 def _my_block_manager(self):
533 raise NotImplementedError()
536 raise NotImplementedError()
538 def root_collection(self):
539 raise NotImplementedError()
541 def notify(self, event, collection, name, item):
542 raise NotImplementedError()
544 def stream_name(self):
545 raise NotImplementedError()
# NOTE(review): subsampled listing — decorators, `else:` branches and several
# `return` lines in find_or_create()/find() are absent. Fragments verbatim.
549 def find_or_create(self, path, create_type):
550 """Recursively search the specified file path.
552 May return either a `Collection` or `ArvadosFile`. If not found, will
553 create a new item at the specified path based on `create_type`. Will
554 create intermediate subcollections needed to contain the final item in
558 One of `arvados.collection.FILE` or
559 `arvados.collection.COLLECTION`. If the path is not found, and value
560 of create_type is FILE then create and return a new ArvadosFile for
561 the last path component. If COLLECTION, then create and return a new
562 Collection for the last path component.
566 pathcomponents = path.split("/", 1)
567 if pathcomponents[0]:
568 item = self._items.get(pathcomponents[0])
# Single remaining component: create the leaf item if missing.
569 if len(pathcomponents) == 1:
572 if create_type == COLLECTION:
573 item = Subcollection(self, pathcomponents[0])
575 item = ArvadosFile(self, pathcomponents[0])
576 self._items[pathcomponents[0]] = item
577 self.set_committed(False)
578 self.notify(ADD, self, pathcomponents[0], item)
582 # create new collection
583 item = Subcollection(self, pathcomponents[0])
584 self._items[pathcomponents[0]] = item
585 self.set_committed(False)
586 self.notify(ADD, self, pathcomponents[0], item)
# Recurse into the intermediate subcollection for the rest of the path.
587 if isinstance(item, RichCollectionBase):
588 return item.find_or_create(pathcomponents[1], create_type)
590 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
595 def find(self, path):
596 """Recursively search the specified file path.
598 May return either a Collection or ArvadosFile. Return None if not
600 If path is invalid (ex: starts with '/'), an IOError exception will be
605 raise errors.ArgumentError("Parameter 'path' is empty.")
607 pathcomponents = path.split("/", 1)
608 if pathcomponents[0] == '':
609 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
611 item = self._items.get(pathcomponents[0])
614 elif len(pathcomponents) == 1:
617 if isinstance(item, RichCollectionBase):
618 if pathcomponents[1]:
619 return item.find(pathcomponents[1])
623 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
def mkdirs(self, path):
    """Recursive subcollection create.

    Like `os.makedirs()`.  Will create intermediate subcollections needed
    to contain the leaf subcollection path.

    Raises IOError(EEXIST) if a file or directory already exists at `path`.
    """
    # Fix: compare against None with an identity check (`is not None`)
    # rather than `!= None` — PEP 8, and find() may return objects with
    # custom __eq__/__ne__ implementations.
    if self.find(path) is not None:
        raise IOError(errno.EEXIST, "Directory or file exists", path)

    return self.find_or_create(path, COLLECTION)
# NOTE(review): subsampled listing — synchronization decorators, the `def`
# lines of modified()/committed()/__iter__/__len__/keys()/values()/items(),
# and several branch bodies are absent. Fragments kept verbatim.
639 def open(self, path, mode="r"):
640 """Open a file-like object for access.
643 path to a file in the collection
645 a string consisting of "r", "w", or "a", optionally followed
646 by "b" or "t", optionally followed by "+".
648 binary mode: write() accepts bytes, read() returns bytes.
650 text mode (default): write() accepts strings, read() returns strings.
654 opens for reading and writing. Reads/writes share a file pointer.
656 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
658 opens for reading and writing. All writes are appended to
659 the end of the file. Writing does not affect the file pointer for
# Validate the mode string up front.
663 if not re.search(r'^[rwa][bt]?\+?$', mode):
664 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
# Read-only opens look the file up; writable opens may create it.
666 if mode[0] == 'r' and '+' not in mode:
667 fclass = ArvadosFileReader
668 arvfile = self.find(path)
669 elif not self.writable():
670 raise IOError(errno.EROFS, "Collection is read only")
672 fclass = ArvadosFileWriter
673 arvfile = self.find_or_create(path, FILE)
676 raise IOError(errno.ENOENT, "File not found", path)
677 if not isinstance(arvfile, ArvadosFile):
678 raise IOError(errno.EISDIR, "Is a directory", path)
683 return fclass(arvfile, mode=mode, num_retries=self.num_retries)
686 """Determine if the collection has been modified since last commited."""
687 return not self.committed()
691 """Determine if the collection has been committed to the API server."""
692 return self._committed
695 def set_committed(self, value=True):
696 """Recursively set committed flag.
698 If value is True, set committed to be True for this and all children.
700 If value is False, set committed to be False for this and all parents.
702 if value == self._committed:
# True propagates downward to children; False propagates upward to parents.
705 for k,v in listitems(self._items):
706 v.set_committed(True)
707 self._committed = True
709 self._committed = False
710 if self.parent is not None:
711 self.parent.set_committed(False)
715 """Iterate over names of files and collections contained in this collection."""
716 return iter(viewkeys(self._items))
719 def __getitem__(self, k):
720 """Get a file or collection that is directly contained by this collection.
722 If you want to search a path, use `find()` instead.
725 return self._items[k]
728 def __contains__(self, k):
729 """Test if there is a file or collection a directly contained by this collection."""
730 return k in self._items
734 """Get the number of items directly contained in this collection."""
735 return len(self._items)
739 def __delitem__(self, p):
740 """Delete an item by name which is directly contained by this collection."""
742 self.set_committed(False)
743 self.notify(DEL, self, p, None)
747 """Get a list of names of files and collections directly contained in this collection."""
748 return self._items.keys()
752 """Get a list of files and collection objects directly contained in this collection."""
753 return listvalues(self._items)
757 """Get a list of (name, object) tuples directly contained in this collection."""
758 return listitems(self._items)
def exists(self, path):
    """Return True when `path` names an existing file or subcollection."""
    found = self.find(path)
    return found is not None
# NOTE(review): subsampled listing — decorators, docstring fragments, `else:`
# branches and a few condition lines in remove()/add()/_get_src_target() are
# absent. Fragments kept verbatim.
766 def remove(self, path, recursive=False):
767 """Remove the file or subcollection (directory) at `path`.
770 Specify whether to remove non-empty subcollections (True), or raise an error (False).
774 raise errors.ArgumentError("Parameter 'path' is empty.")
776 pathcomponents = path.split("/", 1)
777 item = self._items.get(pathcomponents[0])
779 raise IOError(errno.ENOENT, "File not found", path)
780 if len(pathcomponents) == 1:
# Refuse to delete a non-empty directory unless recursive=True.
781 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
782 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
783 deleteditem = self._items[pathcomponents[0]]
784 del self._items[pathcomponents[0]]
785 self.set_committed(False)
786 self.notify(DEL, self, pathcomponents[0], deleteditem)
# Otherwise delegate removal of the rest of the path to the child.
788 item.remove(pathcomponents[1])
790 def _clonefrom(self, source):
# Deep-copy every child of `source` into this collection.
791 for k,v in listitems(source):
792 self._items[k] = v.clone(self, k)
795 raise NotImplementedError()
799 def add(self, source_obj, target_name, overwrite=False, reparent=False):
800 """Copy or move a file or subcollection to this collection.
803 An ArvadosFile, or Subcollection object
806 Destination item name. If the target name already exists and is a
807 file, this will raise an error unless you specify `overwrite=True`.
810 Whether to overwrite target file if it already exists.
813 If True, source_obj will be moved from its parent collection to this collection.
814 If False, source_obj will be copied and the parent collection will be
819 if target_name in self and not overwrite:
820 raise IOError(errno.EEXIST, "File already exists", target_name)
823 if target_name in self:
824 modified_from = self[target_name]
826 # Actually make the move or copy.
828 source_obj._reparent(self, target_name)
831 item = source_obj.clone(self, target_name)
833 self._items[target_name] = item
834 self.set_committed(False)
# Fire MOD when replacing an existing item, ADD otherwise.
837 self.notify(MOD, self, target_name, (modified_from, item))
839 self.notify(ADD, self, target_name, item)
841 def _get_src_target(self, source, target_path, source_collection, create_dest):
842 if source_collection is None:
843 source_collection = self
# A string source is resolved against source_collection; an object
# source is used directly (sourcecomponents stays None).
846 if isinstance(source, basestring):
847 source_obj = source_collection.find(source)
848 if source_obj is None:
849 raise IOError(errno.ENOENT, "File not found", source)
850 sourcecomponents = source.split("/")
853 sourcecomponents = None
855 # Find parent collection the target path
856 targetcomponents = target_path.split("/")
858 # Determine the name to use.
859 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
862 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
# create_dest controls whether intermediate target dirs are created.
865 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
867 if len(targetcomponents) > 1:
868 target_dir = self.find("/".join(targetcomponents[0:-1]))
872 if target_dir is None:
873 raise IOError(errno.ENOENT, "Target directory not found", target_name)
# Copying into an existing directory target keeps the source's name.
875 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
876 target_dir = target_dir[target_name]
877 target_name = sourcecomponents[-1]
879 return (source_obj, target_dir, target_name)
def copy(self, source, target_path, source_collection=None, overwrite=False):
    """Copy a file or subcollection to a new path in this collection.

    :source:
      A string path to the source file or subcollection, or an actual
      ArvadosFile or Subcollection object.

    :target_path:
      Destination file or path.  If the target path already exists and is
      a subcollection, the item is placed inside it; if it exists and is
      a file, an error is raised unless `overwrite=True`.

    :source_collection:
      Collection to copy `source` from (default `self`).

    :overwrite:
      Whether to overwrite a target file that already exists.
    """
    # Resolve source object plus destination directory/name, creating
    # intermediate destination directories (create_dest=True).
    resolved = self._get_src_target(source, target_path, source_collection, True)
    source_obj, target_dir, target_name = resolved
    # reparent=False: add a clone, leaving the source in place.
    target_dir.add(source_obj, target_name, overwrite, False)
def rename(self, source, target_path, source_collection=None, overwrite=False):
    """Move a file or subcollection from `source_collection` to a new path in this collection.

    :source:
      A string path to the source file or subcollection.

    :target_path:
      Destination file or path.  An existing subcollection target receives
      the item inside it; an existing file target raises an error unless
      `overwrite=True`.

    :source_collection:
      Collection to move `source` from (default `self`).

    :overwrite:
      Whether to overwrite a target file that already exists.
    """
    # Resolve without creating destination directories (create_dest=False).
    resolved = self._get_src_target(source, target_path, source_collection, False)
    source_obj, target_dir, target_name = resolved
    # A move mutates the source's parent, so the source must be writable.
    if not source_obj.writable():
        raise IOError(errno.EROFS, "Source collection is read only", source)
    # reparent=True: detach from the old parent and attach here.
    target_dir.add(source_obj, target_name, overwrite, True)
def portable_manifest_text(self, stream_name="."):
    """Get the manifest text for this collection, sub collections and files.

    Does not flush outstanding blocks to Keep.  Returns a normalized
    manifest with access tokens stripped.

    :stream_name:
      Name to use for this stream (directory).
    """
    # strip=True, normalize=True yields the portable form.
    return self._get_manifest_text(stream_name, True, True)
def manifest_text(self, stream_name=".", strip=False, normalize=False,
                  only_committed=False):
    """Get the manifest text for this collection, sub collections and files.

    Flushes outstanding blocks to Keep unless `only_committed` is True.

    :stream_name:
      Name to use for this stream (directory).

    :strip:
      If True, remove signing tokens from block locators if present.
      If False (default), block locators are left unchanged.

    :normalize:
      If True, always export the manifest text in normalized form even if
      the Collection is not modified.  If False (default), an unmodified
      collection may return its original manifest text.

    :only_committed:
      If True, don't commit pending blocks.
    """
    if only_committed:
        # Skip the flush; report only blocks already in Keep.
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=True)
    self._my_block_manager().commit_all()
    return self._get_manifest_text(stream_name, strip, normalize,
                                   only_committed=only_committed)
# NOTE(review): subsampled listing — decorators, buffer/stream initialization
# lines, several `else:`/`for` headers and unpacking lines in
# _get_manifest_text()/diff()/apply() are absent. Fragments kept verbatim.
975 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
976 """Get the manifest text for this collection, sub collections and files.
979 Name to use for this stream (directory)
982 If True, remove signing tokens from block locators if present.
983 If False (default), block locators are left unchanged.
986 If True, always export the manifest text in normalized form
987 even if the Collection is not modified. If False (default) and the collection
988 is not modified, return the original manifest text even if it is not
992 If True, only include blocks that were already committed to Keep.
# Rebuild the manifest only when needed; otherwise reuse the cached text.
996 if not self.committed() or self._manifest_text is None or normalize:
999 sorted_keys = sorted(self.keys())
1000 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1001 # Create a stream per file `k`
1002 arvfile = self[filename]
1004 for segment in arvfile.segments():
1005 loc = segment.locator
# Bufferblocks may have been committed since the segment was written;
# translate to the final locator when available.
1006 if arvfile.parent._my_block_manager().is_bufferblock(loc):
1009 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1011 loc = KeepLocator(loc).stripped()
1012 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1013 segment.segment_offset, segment.range_size))
1014 stream[filename] = filestream
1016 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
# Subcollections are emitted as their own (normalized) streams.
1017 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1018 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1022 return self.stripped_manifest()
1024 return self._manifest_text
1027 def diff(self, end_collection, prefix=".", holding_collection=None):
1028 """Generate list of add/modify/delete actions.
1030 When given to `apply`, will change `self` to match `end_collection`
1034 if holding_collection is None:
1035 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
# Items present here but not in end_collection are deletions.
1037 if k not in end_collection:
1038 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1039 for k in end_collection:
1041 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1042 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1043 elif end_collection[k] != self[k]:
1044 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1046 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1048 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1053 def apply(self, changes):
1054 """Apply changes from `diff`.
1056 If a change conflicts with a local change, it will be saved to an
1057 alternate path indicating the conflict.
1061 self.set_committed(False)
1062 for change in changes:
1063 event_type = change[0]
1066 local = self.find(path)
# Conflicting changes are written to a timestamped "~conflict~" path.
1067 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1069 if event_type == ADD:
1071 # No local file at path, safe to copy over new file
1072 self.copy(initial, path)
1073 elif local is not None and local != initial:
1074 # There is already local file and it is different:
1075 # save change to conflict file.
1076 self.copy(initial, conflictpath)
1077 elif event_type == MOD or event_type == TOK:
1079 if local == initial:
1080 # Local matches the "initial" item so it has not
1081 # changed locally and is safe to update.
1082 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1083 # Replace contents of local file with new contents
1084 local.replace_contents(final)
1086 # Overwrite path with new item; this can happen if
1087 # path was a file and is now a collection or vice versa
1088 self.copy(final, path, overwrite=True)
1090 # Local is missing (presumably deleted) or local doesn't
1091 # match the "start" value, so save change to conflict file
1092 self.copy(final, conflictpath)
1093 elif event_type == DEL:
1094 if local == initial:
1095 # Local item matches "initial" value, so it is safe to remove.
1096 self.remove(path, recursive=True)
1097 # else, the file is modified or already removed, in either
1098 # case we don't want to try to remove it.
1100 def portable_data_hash(self):
1101 """Get the portable data hash for this collection's manifest."""
1102 if self._manifest_locator and self.committed():
1103 # If the collection is already saved on the API server, and it's committed
1104 # then return API server's PDH response.
1105 return self._portable_data_hash
1107 stripped = self.portable_manifest_text().encode()
1108 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1111 def subscribe(self, callback):
1112 if self._callback is None:
1113 self._callback = callback
1115 raise errors.ArgumentError("A callback is already set on this collection.")
1118 def unsubscribe(self):
1119 if self._callback is not None:
1120 self._callback = None
1123 def notify(self, event, collection, name, item):
1125 self._callback(event, collection, name, item)
1126 self.root_collection().notify(event, collection, name, item)
1129 def __eq__(self, other):
1132 if not isinstance(other, RichCollectionBase):
1134 if len(self._items) != len(other):
1136 for k in self._items:
1139 if self._items[k] != other[k]:
1143 def __ne__(self, other):
1144 return not self.__eq__(other)
# NOTE(review): method header (def line) and loop body are elided in this
# listing; the visible docstring and loop header suggest this iterates the
# collection's values and flushes each — confirm against the full source.
1148 """Flush bufferblocks to Keep."""
1149 for e in listvalues(self):
1153 class Collection(RichCollectionBase):
1154 """Represents the root of an Arvados Collection.
1156 This class is threadsafe. The root collection object, all subcollections
1157 and files are protected by a single lock (i.e. each access locks the entire
1163 :To read an existing file:
1164 `c.open("myfile", "r")`
1166 :To write a new file:
1167 `c.open("myfile", "w")`
1169 :To determine if a file exists:
1170 `c.find("myfile") is not None`
1173 `c.copy("source", "dest")`
1176 `c.remove("myfile")`
1178 :To save to an existing collection record:
1181 :To save a new collection record:
1184 :To merge remote changes into this object:
1187 Must be associated with an API server Collection record (during
1188 initialization, or using `save_new`) to use `save` or `update`
# NOTE(review): several constructor parameters (parent, apiconfig, api_client,
# keep_client, num_retries, put_threads, block_manager) are elided from the
# signature in this listing — confirm against the full source.
1192 def __init__(self, manifest_locator_or_text=None,
1199 replication_desired=None,
1201 """Collection constructor.
1203 :manifest_locator_or_text:
1204 An Arvados collection UUID, portable data hash, raw manifest
1205 text, or (if creating an empty collection) None.
1208 the parent Collection, may be None.
1211 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1212 Prefer this over supplying your own api_client and keep_client (except in testing).
1213 Will use default config settings if not specified.
1216 The API client object to use for requests. If not specified, create one using `apiconfig`.
1219 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1222 the number of retries for API and Keep requests.
1225 the block manager to use. If not specified, create one.
1227 :replication_desired:
1228 How many copies should Arvados maintain. If None, API server default
1229 configuration applies. If not None, this value will also be used
1230 for determining the number of block copies being written.
# Clients (API/Keep/block manager) are stored but created lazily elsewhere.
1233 super(Collection, self).__init__(parent)
1234 self._api_client = api_client
1235 self._keep_client = keep_client
1236 self._block_manager = block_manager
1237 self.replication_desired = replication_desired
1238 self.put_threads = put_threads
1241 self._config = apiconfig
1243 self._config = config.settings()
1245 self.num_retries = num_retries if num_retries is not None else 0
1246 self._manifest_locator = None
1247 self._manifest_text = None
1248 self._portable_data_hash = None
1249 self._api_response = None
1250 self._past_versions = set()
# Root collection owns the single re-entrant lock shared by all children.
1252 self.lock = threading.RLock()
# Classify the argument: keep locator (PDH), collection UUID, or raw manifest.
1255 if manifest_locator_or_text:
1256 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1257 self._manifest_locator = manifest_locator_or_text
1258 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1259 self._manifest_locator = manifest_locator_or_text
1260 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1261 self._manifest_text = manifest_locator_or_text
1263 raise errors.ArgumentError(
1264 "Argument to CollectionReader is not a manifest or a collection UUID")
# Manifest parse errors are re-raised as ArgumentError for the caller.
1268 except (IOError, errors.SyntaxError) as e:
1269 raise errors.ArgumentError("Error processing manifest text: %s", e)
1271 def root_collection(self):
1274 def get_properties(self):
1275 if self._api_response and self._api_response["properties"]:
1276 return self._api_response["properties"]
1280 def get_trash_at(self):
1281 if self._api_response and self._api_response["trash_at"]:
1282 return ciso8601.parse_datetime(self._api_response["trash_at"])
1286 def stream_name(self):
1293 def known_past_version(self, modified_at_and_portable_data_hash):
1294 return modified_at_and_portable_data_hash in self._past_versions
1298 def update(self, other=None, num_retries=None):
1299 """Merge the latest collection on the API server with the current collection."""
1302 if self._manifest_locator is None:
1303 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1304 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1305 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1306 response.get("portable_data_hash") != self.portable_data_hash()):
1307 # The record on the server is different from our current one, but we've seen it before,
1308 # so ignore it because it's already been merged.
1309 # However, if it's the same as our current record, proceed with the update, because we want to update
1313 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1314 other = CollectionReader(response["manifest_text"])
1315 baseline = CollectionReader(self._manifest_text)
1316 self.apply(baseline.diff(other))
1317 self._manifest_text = self.manifest_text()
1321 if self._api_client is None:
1322 self._api_client = ThreadSafeApiCache(self._config)
1323 if self._keep_client is None:
1324 self._keep_client = self._api_client.keep
1325 return self._api_client
1329 if self._keep_client is None:
1330 if self._api_client is None:
1333 self._keep_client = KeepClient(api_client=self._api_client)
1334 return self._keep_client
1337 def _my_block_manager(self):
1338 if self._block_manager is None:
1339 copies = (self.replication_desired or
1340 self._my_api()._rootDesc.get('defaultCollectionReplication',
1342 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1343 return self._block_manager
1345 def _remember_api_response(self, response):
1346 self._api_response = response
1347 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1349 def _populate_from_api_server(self):
1350 # As in KeepClient itself, we must wait until the last
1351 # possible moment to instantiate an API client, in order to
1352 # avoid tripping up clients that don't have access to an API
1353 # server. If we do build one, make sure our Keep client uses
1354 # it. If instantiation fails, we'll fall back to the except
1355 # clause, just like any other Collection lookup
1356 # failure. Return an exception, or None if successful.
1357 self._remember_api_response(self._my_api().collections().get(
1358 uuid=self._manifest_locator).execute(
1359 num_retries=self.num_retries))
1360 self._manifest_text = self._api_response['manifest_text']
1361 self._portable_data_hash = self._api_response['portable_data_hash']
1362 # If not overriden via kwargs, we should try to load the
1363 # replication_desired from the API server
1364 if self.replication_desired is None:
1365 self.replication_desired = self._api_response.get('replication_desired', None)
1367 def _populate(self):
1368 if self._manifest_text is None:
1369 if self._manifest_locator is None:
1372 self._populate_from_api_server()
1373 self._baseline_manifest = self._manifest_text
1374 self._import_manifest(self._manifest_text)
1376 def _has_collection_uuid(self):
1377 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1379 def __enter__(self):
1382 def __exit__(self, exc_type, exc_value, traceback):
1383 """Support scoped auto-commit in a with: block."""
1384 if exc_type is None:
1385 if self.writable() and self._has_collection_uuid():
1389 def stop_threads(self):
1390 if self._block_manager is not None:
1391 self._block_manager.stop_threads()
1394 def manifest_locator(self):
1395 """Get the manifest locator, if any.
1397 The manifest locator will be set when the collection is loaded from an
1398 API server record or the portable data hash of a manifest.
1400 The manifest locator will be None if the collection is newly created or
1401 was created directly from manifest text. The method `save_new()` will
1402 assign a manifest locator.
1405 return self._manifest_locator
1408 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1409 if new_config is None:
1410 new_config = self._config
1412 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1414 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1416 newcollection._clonefrom(self)
1417 return newcollection
1420 def api_response(self):
1421 """Returns information about this Collection fetched from the API server.
1423 If the Collection exists in Keep but not the API server, currently
1424 returns None. Future versions may provide a synthetic response.
1427 return self._api_response
1429 def find_or_create(self, path, create_type):
1430 """See `RichCollectionBase.find_or_create`"""
1434 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1436 def find(self, path):
1437 """See `RichCollectionBase.find`"""
1441 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1443 def remove(self, path, recursive=False):
1444 """See `RichCollectionBase.remove`"""
1446 raise errors.ArgumentError("Cannot remove '.'")
1448 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
# NOTE(review): the `def save(self, properties=..., ...)` header and several
# body lines (body dict initialization, merge branch, `else:` keywords) are
# elided in this listing — confirm the full signature against the source.
1455 storage_classes=None,
1459 """Save collection to an existing collection record.
1461 Commit pending buffer blocks to Keep, merge with remote record (if
1462 merge=True, the default), and update the collection record. Returns
1463 the current manifest text.
1465 Will raise AssertionError if not associated with a collection record on
1466 the API server. If you want to save a manifest to Keep only, see
1470 Additional properties of collection. This value will replace any existing
1471 properties of collection.
1474 Specify desirable storage classes to be used when writing data to Keep.
1477 A collection is *expiring* when it has a *trash_at* time in the future.
1478 An expiring collection can be accessed as normal,
1479 but is scheduled to be trashed automatically at the *trash_at* time.
1482 Update and merge remote changes before saving. Otherwise, any
1483 remote changes will be ignored and overwritten.
1486 Retry count on API calls (if None, use the collection default)
# Argument validation: reject wrong-typed keyword arguments up front.
1489 if properties and type(properties) is not dict:
1490 raise errors.ArgumentError("properties must be dictionary type.")
1492 if storage_classes and type(storage_classes) is not list:
1493 raise errors.ArgumentError("storage_classes must be list type.")
1495 if trash_at and type(trash_at) is not datetime.datetime:
1496 raise errors.ArgumentError("trash_at must be datetime type.")
1500 body["properties"] = properties
1502 body["storage_classes_desired"] = storage_classes
# trash_at is serialized as an ISO-8601 UTC timestamp string.
1504 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1505 body["trash_at"] = t
1507 if not self.committed():
1508 if not self._has_collection_uuid():
1509 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
# Commit any pending buffer blocks to Keep before updating the record.
1511 self._my_block_manager().commit_all()
1516 text = self.manifest_text(strip=False)
1517 body['manifest_text'] = text
1519 self._remember_api_response(self._my_api().collections().update(
1520 uuid=self._manifest_locator,
1522 ).execute(num_retries=num_retries))
1523 self._manifest_text = self._api_response["manifest_text"]
1524 self._portable_data_hash = self._api_response["portable_data_hash"]
1525 self.set_committed(True)
# Presumably the already-committed path: update record metadata only —
# confirm the branch condition in the full source.
1527 self._remember_api_response(self._my_api().collections().update(
1528 uuid=self._manifest_locator,
1530 ).execute(num_retries=num_retries))
1532 return self._manifest_text
# NOTE(review): several signature parameters (owner_uuid, properties,
# trash_at, num_retries) and some body lines are elided in this listing.
1538 def save_new(self, name=None,
1539 create_collection_record=True,
1542 storage_classes=None,
1544 ensure_unique_name=False,
1546 """Save collection to a new collection record.
1548 Commit pending buffer blocks to Keep and, when create_collection_record
1549 is True (default), create a new collection record. After creating a
1550 new collection record, this Collection object will be associated with
1551 the new record used by `save()`. Returns the current manifest text.
1554 The collection name.
1556 :create_collection_record:
1557 If True, create a collection record on the API server.
1558 If False, only commit blocks to Keep and return the manifest text.
1561 the user, or project uuid that will own this collection.
1562 If None, defaults to the current user.
1565 Additional properties of collection. This value will replace any existing
1566 properties of collection.
1569 Specify desirable storage classes to be used when writing data to Keep.
1572 A collection is *expiring* when it has a *trash_at* time in the future.
1573 An expiring collection can be accessed as normal,
1574 but is scheduled to be trashed automatically at the *trash_at* time.
1576 :ensure_unique_name:
1577 If True, ask the API server to rename the collection
1578 if it conflicts with a collection with the same name and owner. If
1579 False, a name conflict will result in an error.
1582 Retry count on API calls (if None, use the collection default)
# Same argument validation pattern as save().
1585 if properties and type(properties) is not dict:
1586 raise errors.ArgumentError("properties must be dictionary type.")
1588 if storage_classes and type(storage_classes) is not list:
1589 raise errors.ArgumentError("storage_classes must be list type.")
1591 if trash_at and type(trash_at) is not datetime.datetime:
1592 raise errors.ArgumentError("trash_at must be datetime type.")
# Flush pending buffer blocks to Keep before creating the record.
1594 self._my_block_manager().commit_all()
1595 text = self.manifest_text(strip=False)
1597 if create_collection_record:
# Unnamed collections get a default name and server-side deduplication.
1599 name = "New collection"
1600 ensure_unique_name = True
1602 body = {"manifest_text": text,
1604 "replication_desired": self.replication_desired}
1606 body["owner_uuid"] = owner_uuid
1608 body["properties"] = properties
1610 body["storage_classes_desired"] = storage_classes
1612 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1613 body["trash_at"] = t
1615 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1616 text = self._api_response["manifest_text"]
# Bind this object to the newly created record.
1618 self._manifest_locator = self._api_response["uuid"]
1619 self._portable_data_hash = self._api_response["portable_data_hash"]
1621 self._manifest_text = text
1622 self.set_committed(True)
# Pre-compiled patterns for manifest parsing: whitespace-separated tokens,
# Keep block locators (md5+size+hints), and file segment triples.
1626 _token_re = re.compile(r'(\S+)(\s+|$)')
1627 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1628 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1631 def _import_manifest(self, manifest_text):
1632 """Import a manifest into a `Collection`.
1635 The manifest text to import from.
# NOTE(review): `ArgumentError` is unqualified here while the rest of the
# file raises `errors.ArgumentError` — if this branch is ever hit it would
# raise NameError instead; confirm against the full source.
1639 raise ArgumentError("Can only import manifest into an empty collection")
# State machine over manifest tokens: stream name, then block locators,
# then file segments (state transition lines are elided in this listing).
1648 for token_and_separator in self._token_re.finditer(manifest_text):
1649 tok = token_and_separator.group(1)
1650 sep = token_and_separator.group(2)
1652 if state == STREAM_NAME:
1653 # starting a new stream
1654 stream_name = tok.replace('\\040', ' ')
1659 self.find_or_create(stream_name, COLLECTION)
1663 block_locator = self._block_re.match(tok)
1665 blocksize = int(block_locator.group(1))
1666 blocks.append(Range(tok, streamoffset, blocksize, 0))
1667 streamoffset += blocksize
1671 if state == SEGMENTS:
1672 file_segment = self._segment_re.match(tok)
1674 pos = int(file_segment.group(1))
1675 size = int(file_segment.group(2))
# '\040' is the manifest escape for a space character in names.
1676 name = file_segment.group(3).replace('\\040', ' ')
1677 filepath = os.path.join(stream_name, name)
1678 afile = self.find_or_create(filepath, FILE)
1679 if isinstance(afile, ArvadosFile):
1680 afile.add_segment(blocks, pos, size)
1682 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1685 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
# Freshly imported content matches the stored manifest: mark committed.
1691 self.set_committed(True)
1694 def notify(self, event, collection, name, item):
1696 self._callback(event, collection, name, item)
1699 class Subcollection(RichCollectionBase):
1700 """This is a subdirectory within a collection that doesn't have its own API
1703 Subcollection locking falls under the umbrella lock of its root collection.
1707 def __init__(self, parent, name):
1708 super(Subcollection, self).__init__(parent)
# Share the root collection's lock rather than creating a new one.
1709 self.lock = self.root_collection().lock
1710 self._manifest_text = None
1712 self.num_retries = parent.num_retries
1714 def root_collection(self):
1715 return self.parent.root_collection()
# NOTE(review): the def headers for writable/_my_api/_my_keep are elided in
# this listing; the visible bodies all delegate to the root collection.
1718 return self.root_collection().writable()
1721 return self.root_collection()._my_api()
1724 return self.root_collection()._my_keep()
1726 def _my_block_manager(self):
1727 return self.root_collection()._my_block_manager()
1729 def stream_name(self):
# Stream name is built by joining this directory onto the parent's stream.
1730 return os.path.join(self.parent.stream_name(), self.name)
1733 def clone(self, new_parent, new_name):
1734 c = Subcollection(new_parent, new_name)
1740 def _reparent(self, newparent, newname):
1741 self.set_committed(False)
# Detach from the old parent before attaching under the new one.
1743 self.parent.remove(self.name, recursive=True)
1744 self.parent = newparent
# Re-bind to the new root's umbrella lock after the move.
1746 self.lock = self.parent.root_collection().lock
1749 class CollectionReader(Collection):
1750 """A read-only collection object.
1752 Initialize from a collection UUID or portable data hash, or raw
1753 manifest text. See `Collection` constructor for detailed options.
# _in_init lets writable() (below) report True during construction only.
1756 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1757 self._in_init = True
1758 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1759 self._in_init = False
1761 # Forego any locking since it should never change once initialized.
1762 self.lock = NoopLock()
1764 # Backwards compatability with old CollectionReader
1765 # all_streams() and all_files()
1766 self._streams = None
# NOTE(review): the def header for writable() is elided in this listing.
1769 return self._in_init
# Decorator that lazily builds self._streams from the manifest text before
# running the wrapped legacy-API method.
1771 def _populate_streams(orig_func):
1772 @functools.wraps(orig_func)
1773 def populate_streams_wrapper(self, *args, **kwargs):
1774 # Defer populating self._streams until needed since it creates a copy of the manifest.
1775 if self._streams is None:
1776 if self._manifest_text:
1777 self._streams = [sline.split()
1778 for sline in self._manifest_text.split("\n")
1782 return orig_func(self, *args, **kwargs)
1783 return populate_streams_wrapper
1786 def normalize(self):
1787 """Normalize the streams returned by `all_streams`.
1789 This method is kept for backwards compatability and only affects the
1790 behavior of `all_streams()` and `all_files()`
1796 for s in self.all_streams():
1797 for f in s.all_files():
1798 streamname, filename = split(s.name() + "/" + f.name())
1799 if streamname not in streams:
1800 streams[streamname] = {}
1801 if filename not in streams[streamname]:
1802 streams[streamname][filename] = []
1803 for r in f.segments:
1804 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1806 self._streams = [normalize_stream(s, streams[s])
1807 for s in sorted(streams)]
1809 def all_streams(self):
1810 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1811 for s in self._streams]
1814 def all_files(self):
1815 for s in self.all_streams():
1816 for f in s.all_files():