1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
21 from collections import deque
24 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
25 from .keep import KeepLocator, KeepClient
26 from .stream import StreamReader
27 from ._normalize_stream import normalize_stream
28 from ._ranges import Range, LocatorAndRange
29 from .safeapi import ThreadSafeApiCache
30 import arvados.config as config
31 import arvados.errors as errors
33 import arvados.events as events
34 from arvados.retry import retry_method
# Module-level logger shared by the collection classes in this module.
_logger = logging.getLogger('arvados.collection')
38 class CollectionBase(object):
39 """Abstract base class for Collection classes."""
44 def __exit__(self, exc_type, exc_value, traceback):
48 if self._keep_client is None:
49 self._keep_client = KeepClient(api_client=self._api_client,
50 num_retries=self.num_retries)
51 return self._keep_client
53 def stripped_manifest(self):
54 """Get the manifest with locator hints stripped.
56 Return the manifest for the current collection with all
57 non-portable hints (i.e., permission signatures and other
58 hints other than size hints) removed from the locators.
60 raw = self.manifest_text()
62 for line in raw.split("\n"):
65 clean_fields = fields[:1] + [
66 (re.sub(r'\+[^\d][^\+]*', '', x)
67 if re.match(arvados.util.keep_locator_pattern, x)
70 clean += [' '.join(clean_fields), "\n"]
74 class _WriterFile(_FileLikeObjectBase):
75 def __init__(self, coll_writer, name):
76 super(_WriterFile, self).__init__(name, 'wb')
77 self.dest = coll_writer
80 super(_WriterFile, self).close()
81 self.dest.finish_current_file()
83 @_FileLikeObjectBase._before_close
84 def write(self, data):
87 @_FileLikeObjectBase._before_close
88 def writelines(self, seq):
92 @_FileLikeObjectBase._before_close
94 self.dest.flush_data()
97 class CollectionWriter(CollectionBase):
98 """Deprecated, use Collection instead."""
100 def __init__(self, api_client=None, num_retries=0, replication=None):
101 """Instantiate a CollectionWriter.
103 CollectionWriter lets you build a new Arvados Collection from scratch.
104 Write files to it. The CollectionWriter will upload data to Keep as
105 appropriate, and provide you with the Collection manifest text when
109 * api_client: The API client to use to look up Collections. If not
110 provided, CollectionReader will build one from available Arvados
112 * num_retries: The default number of times to retry failed
113 service requests. Default 0. You may change this value
114 after instantiation, but note those changes may not
115 propagate to related objects like the Keep client.
116 * replication: The number of copies of each block to store.
117 If this argument is None or not supplied, replication is
118 the server-provided default if available, otherwise 2.
120 self._api_client = api_client
121 self.num_retries = num_retries
122 self.replication = (2 if replication is None else replication)
123 self._keep_client = None
124 self._data_buffer = []
125 self._data_buffer_len = 0
126 self._current_stream_files = []
127 self._current_stream_length = 0
128 self._current_stream_locators = []
129 self._current_stream_name = '.'
130 self._current_file_name = None
131 self._current_file_pos = 0
132 self._finished_streams = []
133 self._close_file = None
134 self._queued_file = None
135 self._queued_dirents = deque()
136 self._queued_trees = deque()
137 self._last_open = None
139 def __exit__(self, exc_type, exc_value, traceback):
143 def do_queued_work(self):
144 # The work queue consists of three pieces:
145 # * _queued_file: The file object we're currently writing to the
147 # * _queued_dirents: Entries under the current directory
148 # (_queued_trees[0]) that we want to write or recurse through.
149 # This may contain files from subdirectories if
150 # max_manifest_depth == 0 for this directory.
151 # * _queued_trees: Directories that should be written as separate
152 # streams to the Collection.
153 # This function handles the smallest piece of work currently queued
154 # (current file, then current directory, then next directory) until
155 # no work remains. The _work_THING methods each do a unit of work on
156 # THING. _queue_THING methods add a THING to the work queue.
158 if self._queued_file:
160 elif self._queued_dirents:
162 elif self._queued_trees:
167 def _work_file(self):
169 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
173 self.finish_current_file()
175 self._queued_file.close()
176 self._close_file = None
177 self._queued_file = None
179 def _work_dirents(self):
180 path, stream_name, max_manifest_depth = self._queued_trees[0]
181 if stream_name != self.current_stream_name():
182 self.start_new_stream(stream_name)
183 while self._queued_dirents:
184 dirent = self._queued_dirents.popleft()
185 target = os.path.join(path, dirent)
186 if os.path.isdir(target):
187 self._queue_tree(target,
188 os.path.join(stream_name, dirent),
189 max_manifest_depth - 1)
191 self._queue_file(target, dirent)
193 if not self._queued_dirents:
194 self._queued_trees.popleft()
196 def _work_trees(self):
197 path, stream_name, max_manifest_depth = self._queued_trees[0]
198 d = arvados.util.listdir_recursive(
199 path, max_depth = (None if max_manifest_depth == 0 else 0))
201 self._queue_dirents(stream_name, d)
203 self._queued_trees.popleft()
205 def _queue_file(self, source, filename=None):
206 assert (self._queued_file is None), "tried to queue more than one file"
207 if not hasattr(source, 'read'):
208 source = open(source, 'rb')
209 self._close_file = True
211 self._close_file = False
213 filename = os.path.basename(source.name)
214 self.start_new_file(filename)
215 self._queued_file = source
217 def _queue_dirents(self, stream_name, dirents):
218 assert (not self._queued_dirents), "tried to queue more than one tree"
219 self._queued_dirents = deque(sorted(dirents))
221 def _queue_tree(self, path, stream_name, max_manifest_depth):
222 self._queued_trees.append((path, stream_name, max_manifest_depth))
224 def write_file(self, source, filename=None):
225 self._queue_file(source, filename)
226 self.do_queued_work()
228 def write_directory_tree(self,
229 path, stream_name='.', max_manifest_depth=-1):
230 self._queue_tree(path, stream_name, max_manifest_depth)
231 self.do_queued_work()
233 def write(self, newdata):
234 if isinstance(newdata, bytes):
236 elif isinstance(newdata, str):
237 newdata = newdata.encode()
238 elif hasattr(newdata, '__iter__'):
242 self._data_buffer.append(newdata)
243 self._data_buffer_len += len(newdata)
244 self._current_stream_length += len(newdata)
245 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
248 def open(self, streampath, filename=None):
249 """open(streampath[, filename]) -> file-like object
251 Pass in the path of a file to write to the Collection, either as a
252 single string or as two separate stream name and file name arguments.
253 This method returns a file-like object you can write to add it to the
256 You may only have one file object from the Collection open at a time,
257 so be sure to close the object when you're done. Using the object in
258 a with statement makes that easy::
260 with cwriter.open('./doc/page1.txt') as outfile:
261 outfile.write(page1_data)
262 with cwriter.open('./doc/page2.txt') as outfile:
263 outfile.write(page2_data)
266 streampath, filename = split(streampath)
267 if self._last_open and not self._last_open.closed:
268 raise errors.AssertionError(
269 "can't open '{}' when '{}' is still open".format(
270 filename, self._last_open.name))
271 if streampath != self.current_stream_name():
272 self.start_new_stream(streampath)
273 self.set_current_file_name(filename)
274 self._last_open = _WriterFile(self, filename)
275 return self._last_open
277 def flush_data(self):
278 data_buffer = b''.join(self._data_buffer)
280 self._current_stream_locators.append(
282 data_buffer[0:config.KEEP_BLOCK_SIZE],
283 copies=self.replication))
284 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
285 self._data_buffer_len = len(self._data_buffer[0])
287 def start_new_file(self, newfilename=None):
288 self.finish_current_file()
289 self.set_current_file_name(newfilename)
291 def set_current_file_name(self, newfilename):
292 if re.search(r'[\t\n]', newfilename):
293 raise errors.AssertionError(
294 "Manifest filenames cannot contain whitespace: %s" %
296 elif re.search(r'\x00', newfilename):
297 raise errors.AssertionError(
298 "Manifest filenames cannot contain NUL characters: %s" %
300 self._current_file_name = newfilename
302 def current_file_name(self):
303 return self._current_file_name
305 def finish_current_file(self):
306 if self._current_file_name is None:
307 if self._current_file_pos == self._current_stream_length:
309 raise errors.AssertionError(
310 "Cannot finish an unnamed file " +
311 "(%d bytes at offset %d in '%s' stream)" %
312 (self._current_stream_length - self._current_file_pos,
313 self._current_file_pos,
314 self._current_stream_name))
315 self._current_stream_files.append([
316 self._current_file_pos,
317 self._current_stream_length - self._current_file_pos,
318 self._current_file_name])
319 self._current_file_pos = self._current_stream_length
320 self._current_file_name = None
322 def start_new_stream(self, newstreamname='.'):
323 self.finish_current_stream()
324 self.set_current_stream_name(newstreamname)
326 def set_current_stream_name(self, newstreamname):
327 if re.search(r'[\t\n]', newstreamname):
328 raise errors.AssertionError(
329 "Manifest stream names cannot contain whitespace: '%s'" %
331 self._current_stream_name = '.' if newstreamname=='' else newstreamname
333 def current_stream_name(self):
334 return self._current_stream_name
336 def finish_current_stream(self):
337 self.finish_current_file()
339 if not self._current_stream_files:
341 elif self._current_stream_name is None:
342 raise errors.AssertionError(
343 "Cannot finish an unnamed stream (%d bytes in %d files)" %
344 (self._current_stream_length, len(self._current_stream_files)))
346 if not self._current_stream_locators:
347 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
348 self._finished_streams.append([self._current_stream_name,
349 self._current_stream_locators,
350 self._current_stream_files])
351 self._current_stream_files = []
352 self._current_stream_length = 0
353 self._current_stream_locators = []
354 self._current_stream_name = None
355 self._current_file_pos = 0
356 self._current_file_name = None
359 """Store the manifest in Keep and return its locator.
361 This is useful for storing manifest fragments (task outputs)
362 temporarily in Keep during a Crunch job.
364 In other cases you should make a collection instead, by
365 sending manifest_text() to the API server's "create
366 collection" endpoint.
368 return self._my_keep().put(self.manifest_text().encode(),
369 copies=self.replication)
371 def portable_data_hash(self):
372 stripped = self.stripped_manifest().encode()
373 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
375 def manifest_text(self):
376 self.finish_current_stream()
379 for stream in self._finished_streams:
380 if not re.search(r'^\.(/.*)?$', stream[0]):
382 manifest += stream[0].replace(' ', '\\040')
383 manifest += ' ' + ' '.join(stream[1])
384 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
389 def data_locators(self):
391 for name, locators, files in self._finished_streams:
395 def save_new(self, name=None):
396 return self._api_client.collections().create(
397 ensure_unique_name=True,
400 'manifest_text': self.manifest_text(),
401 }).execute(num_retries=self.num_retries)
404 class ResumableCollectionWriter(CollectionWriter):
405 """Deprecated, use Collection instead."""
407 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
408 '_current_stream_locators', '_current_stream_name',
409 '_current_file_name', '_current_file_pos', '_close_file',
410 '_data_buffer', '_dependencies', '_finished_streams',
411 '_queued_dirents', '_queued_trees']
413 def __init__(self, api_client=None, **kwargs):
414 self._dependencies = {}
415 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
418 def from_state(cls, state, *init_args, **init_kwargs):
419 # Try to build a new writer from scratch with the given state.
420 # If the state is not suitable to resume (because files have changed,
421 # been deleted, aren't predictable, etc.), raise a
422 # StaleWriterStateError. Otherwise, return the initialized writer.
423 # The caller is responsible for calling writer.do_queued_work()
424 # appropriately after it's returned.
425 writer = cls(*init_args, **init_kwargs)
426 for attr_name in cls.STATE_PROPS:
427 attr_value = state[attr_name]
428 attr_class = getattr(writer, attr_name).__class__
429 # Coerce the value into the same type as the initial value, if
431 if attr_class not in (type(None), attr_value.__class__):
432 attr_value = attr_class(attr_value)
433 setattr(writer, attr_name, attr_value)
434 # Check dependencies before we try to resume anything.
435 if any(KeepLocator(ls).permission_expired()
436 for ls in writer._current_stream_locators):
437 raise errors.StaleWriterStateError(
438 "locators include expired permission hint")
439 writer.check_dependencies()
440 if state['_current_file'] is not None:
441 path, pos = state['_current_file']
443 writer._queued_file = open(path, 'rb')
444 writer._queued_file.seek(pos)
445 except IOError as error:
446 raise errors.StaleWriterStateError(
447 "failed to reopen active file {}: {}".format(path, error))
450 def check_dependencies(self):
451 for path, orig_stat in listitems(self._dependencies):
452 if not S_ISREG(orig_stat[ST_MODE]):
453 raise errors.StaleWriterStateError("{} not file".format(path))
455 now_stat = tuple(os.stat(path))
456 except OSError as error:
457 raise errors.StaleWriterStateError(
458 "failed to stat {}: {}".format(path, error))
459 if ((not S_ISREG(now_stat[ST_MODE])) or
460 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
461 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
462 raise errors.StaleWriterStateError("{} changed".format(path))
464 def dump_state(self, copy_func=lambda x: x):
465 state = {attr: copy_func(getattr(self, attr))
466 for attr in self.STATE_PROPS}
467 if self._queued_file is None:
468 state['_current_file'] = None
470 state['_current_file'] = (os.path.realpath(self._queued_file.name),
471 self._queued_file.tell())
474 def _queue_file(self, source, filename=None):
476 src_path = os.path.realpath(source)
478 raise errors.AssertionError("{} not a file path".format(source))
480 path_stat = os.stat(src_path)
481 except OSError as stat_error:
483 super(ResumableCollectionWriter, self)._queue_file(source, filename)
484 fd_stat = os.fstat(self._queued_file.fileno())
485 if not S_ISREG(fd_stat.st_mode):
486 # We won't be able to resume from this cache anyway, so don't
487 # worry about further checks.
488 self._dependencies[source] = tuple(fd_stat)
489 elif path_stat is None:
490 raise errors.AssertionError(
491 "could not stat {}: {}".format(source, stat_error))
492 elif path_stat.st_ino != fd_stat.st_ino:
493 raise errors.AssertionError(
494 "{} changed between open and stat calls".format(source))
496 self._dependencies[src_path] = tuple(fd_stat)
498 def write(self, data):
499 if self._queued_file is None:
500 raise errors.AssertionError(
501 "resumable writer can't accept unsourced data")
502 return super(ResumableCollectionWriter, self).write(data)
# Sentinel passed to find_or_create() to request a subcollection (directory).
COLLECTION = "collection"
512 class RichCollectionBase(CollectionBase):
513 """Base class for Collections and Subcollections.
515 Implements the majority of functionality relating to accessing items in the
520 def __init__(self, parent=None):
522 self._committed = False
523 self._has_remote_blocks = False
524 self._callback = None
528 raise NotImplementedError()
531 raise NotImplementedError()
533 def _my_block_manager(self):
534 raise NotImplementedError()
537 raise NotImplementedError()
539 def root_collection(self):
540 raise NotImplementedError()
542 def notify(self, event, collection, name, item):
543 raise NotImplementedError()
545 def stream_name(self):
546 raise NotImplementedError()
549 def has_remote_blocks(self):
550 """Recursively check for a +R segment locator signature."""
552 if self._has_remote_blocks:
555 if self[item].has_remote_blocks():
560 def set_has_remote_blocks(self, val):
561 self._has_remote_blocks = val
563 self.parent.set_has_remote_blocks(val)
567 def find_or_create(self, path, create_type):
568 """Recursively search the specified file path.
570 May return either a `Collection` or `ArvadosFile`. If not found, will
571 create a new item at the specified path based on `create_type`. Will
572 create intermediate subcollections needed to contain the final item in
576 One of `arvados.collection.FILE` or
577 `arvados.collection.COLLECTION`. If the path is not found, and value
578 of create_type is FILE then create and return a new ArvadosFile for
579 the last path component. If COLLECTION, then create and return a new
580 Collection for the last path component.
584 pathcomponents = path.split("/", 1)
585 if pathcomponents[0]:
586 item = self._items.get(pathcomponents[0])
587 if len(pathcomponents) == 1:
590 if create_type == COLLECTION:
591 item = Subcollection(self, pathcomponents[0])
593 item = ArvadosFile(self, pathcomponents[0])
594 self._items[pathcomponents[0]] = item
595 self.set_committed(False)
596 self.notify(ADD, self, pathcomponents[0], item)
600 # create new collection
601 item = Subcollection(self, pathcomponents[0])
602 self._items[pathcomponents[0]] = item
603 self.set_committed(False)
604 self.notify(ADD, self, pathcomponents[0], item)
605 if isinstance(item, RichCollectionBase):
606 return item.find_or_create(pathcomponents[1], create_type)
608 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
613 def find(self, path):
614 """Recursively search the specified file path.
616 May return either a Collection or ArvadosFile. Return None if not
618 If path is invalid (ex: starts with '/'), an IOError exception will be
623 raise errors.ArgumentError("Parameter 'path' is empty.")
625 pathcomponents = path.split("/", 1)
626 if pathcomponents[0] == '':
627 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
629 item = self._items.get(pathcomponents[0])
632 elif len(pathcomponents) == 1:
635 if isinstance(item, RichCollectionBase):
636 if pathcomponents[1]:
637 return item.find(pathcomponents[1])
641 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
644 def mkdirs(self, path):
645 """Recursive subcollection create.
647 Like `os.makedirs()`. Will create intermediate subcollections needed
648 to contain the leaf subcollection path.
652 if self.find(path) != None:
653 raise IOError(errno.EEXIST, "Directory or file exists", path)
655 return self.find_or_create(path, COLLECTION)
657 def open(self, path, mode="r"):
658 """Open a file-like object for access.
661 path to a file in the collection
663 a string consisting of "r", "w", or "a", optionally followed
664 by "b" or "t", optionally followed by "+".
666 binary mode: write() accepts bytes, read() returns bytes.
668 text mode (default): write() accepts strings, read() returns strings.
672 opens for reading and writing. Reads/writes share a file pointer.
674 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
676 opens for reading and writing. All writes are appended to
677 the end of the file. Writing does not affect the file pointer for
681 if not re.search(r'^[rwa][bt]?\+?$', mode):
682 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
684 if mode[0] == 'r' and '+' not in mode:
685 fclass = ArvadosFileReader
686 arvfile = self.find(path)
687 elif not self.writable():
688 raise IOError(errno.EROFS, "Collection is read only")
690 fclass = ArvadosFileWriter
691 arvfile = self.find_or_create(path, FILE)
694 raise IOError(errno.ENOENT, "File not found", path)
695 if not isinstance(arvfile, ArvadosFile):
696 raise IOError(errno.EISDIR, "Is a directory", path)
701 return fclass(arvfile, mode=mode, num_retries=self.num_retries)
704 """Determine if the collection has been modified since last commited."""
705 return not self.committed()
709 """Determine if the collection has been committed to the API server."""
710 return self._committed
713 def set_committed(self, value=True):
714 """Recursively set committed flag.
716 If value is True, set committed to be True for this and all children.
718 If value is False, set committed to be False for this and all parents.
720 if value == self._committed:
723 for k,v in listitems(self._items):
724 v.set_committed(True)
725 self._committed = True
727 self._committed = False
728 if self.parent is not None:
729 self.parent.set_committed(False)
733 """Iterate over names of files and collections contained in this collection."""
734 return iter(viewkeys(self._items))
737 def __getitem__(self, k):
738 """Get a file or collection that is directly contained by this collection.
740 If you want to search a path, use `find()` instead.
743 return self._items[k]
746 def __contains__(self, k):
747 """Test if there is a file or collection a directly contained by this collection."""
748 return k in self._items
752 """Get the number of items directly contained in this collection."""
753 return len(self._items)
757 def __delitem__(self, p):
758 """Delete an item by name which is directly contained by this collection."""
760 self.set_committed(False)
761 self.notify(DEL, self, p, None)
765 """Get a list of names of files and collections directly contained in this collection."""
766 return self._items.keys()
770 """Get a list of files and collection objects directly contained in this collection."""
771 return listvalues(self._items)
775 """Get a list of (name, object) tuples directly contained in this collection."""
776 return listitems(self._items)
778 def exists(self, path):
779 """Test if there is a file or collection at `path`."""
780 return self.find(path) is not None
784 def remove(self, path, recursive=False):
785 """Remove the file or subcollection (directory) at `path`.
788 Specify whether to remove non-empty subcollections (True), or raise an error (False).
792 raise errors.ArgumentError("Parameter 'path' is empty.")
794 pathcomponents = path.split("/", 1)
795 item = self._items.get(pathcomponents[0])
797 raise IOError(errno.ENOENT, "File not found", path)
798 if len(pathcomponents) == 1:
799 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
800 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
801 deleteditem = self._items[pathcomponents[0]]
802 del self._items[pathcomponents[0]]
803 self.set_committed(False)
804 self.notify(DEL, self, pathcomponents[0], deleteditem)
806 item.remove(pathcomponents[1])
808 def _clonefrom(self, source):
809 for k,v in listitems(source):
810 self._items[k] = v.clone(self, k)
813 raise NotImplementedError()
817 def add(self, source_obj, target_name, overwrite=False, reparent=False):
818 """Copy or move a file or subcollection to this collection.
821 An ArvadosFile, or Subcollection object
824 Destination item name. If the target name already exists and is a
825 file, this will raise an error unless you specify `overwrite=True`.
828 Whether to overwrite target file if it already exists.
831 If True, source_obj will be moved from its parent collection to this collection.
832 If False, source_obj will be copied and the parent collection will be
837 if target_name in self and not overwrite:
838 raise IOError(errno.EEXIST, "File already exists", target_name)
841 if target_name in self:
842 modified_from = self[target_name]
844 # Actually make the move or copy.
846 source_obj._reparent(self, target_name)
849 item = source_obj.clone(self, target_name)
851 self._items[target_name] = item
852 self.set_committed(False)
853 if not self._has_remote_blocks and source_obj.has_remote_blocks():
854 self.set_has_remote_blocks(True)
857 self.notify(MOD, self, target_name, (modified_from, item))
859 self.notify(ADD, self, target_name, item)
861 def _get_src_target(self, source, target_path, source_collection, create_dest):
862 if source_collection is None:
863 source_collection = self
866 if isinstance(source, basestring):
867 source_obj = source_collection.find(source)
868 if source_obj is None:
869 raise IOError(errno.ENOENT, "File not found", source)
870 sourcecomponents = source.split("/")
873 sourcecomponents = None
875 # Find parent collection the target path
876 targetcomponents = target_path.split("/")
878 # Determine the name to use.
879 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
882 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
885 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
887 if len(targetcomponents) > 1:
888 target_dir = self.find("/".join(targetcomponents[0:-1]))
892 if target_dir is None:
893 raise IOError(errno.ENOENT, "Target directory not found", target_name)
895 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
896 target_dir = target_dir[target_name]
897 target_name = sourcecomponents[-1]
899 return (source_obj, target_dir, target_name)
903 def copy(self, source, target_path, source_collection=None, overwrite=False):
904 """Copy a file or subcollection to a new path in this collection.
907 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
910 Destination file or path. If the target path already exists and is a
911 subcollection, the item will be placed inside the subcollection. If
912 the target path already exists and is a file, this will raise an error
913 unless you specify `overwrite=True`.
916 Collection to copy `source_path` from (default `self`)
919 Whether to overwrite target file if it already exists.
922 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
923 target_dir.add(source_obj, target_name, overwrite, False)
927 def rename(self, source, target_path, source_collection=None, overwrite=False):
928 """Move a file or subcollection from `source_collection` to a new path in this collection.
931 A string with a path to source file or subcollection.
934 Destination file or path. If the target path already exists and is a
935 subcollection, the item will be placed inside the subcollection. If
936 the target path already exists and is a file, this will raise an error
937 unless you specify `overwrite=True`.
940 Collection to copy `source_path` from (default `self`)
943 Whether to overwrite target file if it already exists.
946 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
947 if not source_obj.writable():
948 raise IOError(errno.EROFS, "Source collection is read only", source)
949 target_dir.add(source_obj, target_name, overwrite, True)
951 def portable_manifest_text(self, stream_name="."):
952 """Get the manifest text for this collection, sub collections and files.
954 This method does not flush outstanding blocks to Keep. It will return
955 a normalized manifest with access tokens stripped.
958 Name to use for this stream (directory)
961 return self._get_manifest_text(stream_name, True, True)
964 def manifest_text(self, stream_name=".", strip=False, normalize=False,
965 only_committed=False):
966 """Get the manifest text for this collection, sub collections and files.
968 This method will flush outstanding blocks to Keep. By default, it will
969 not normalize an unmodified manifest or strip access tokens.
972 Name to use for this stream (directory)
975 If True, remove signing tokens from block locators if present.
976 If False (default), block locators are left unchanged.
979 If True, always export the manifest text in normalized form
980 even if the Collection is not modified. If False (default) and the collection
981 is not modified, return the original manifest text even if it is not
985 If True, don't commit pending blocks.
989 if not only_committed:
990 self._my_block_manager().commit_all()
991 return self._get_manifest_text(stream_name, strip, normalize,
992 only_committed=only_committed)
995 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
996 """Get the manifest text for this collection, sub collections and files.
999 Name to use for this stream (directory)
1002 If True, remove signing tokens from block locators if present.
1003 If False (default), block locators are left unchanged.
1006 If True, always export the manifest text in normalized form
1007 even if the Collection is not modified. If False (default) and the collection
1008 is not modified, return the original manifest text even if it is not
1012 If True, only include blocks that were already committed to Keep.
1016 if not self.committed() or self._manifest_text is None or normalize:
1019 sorted_keys = sorted(self.keys())
1020 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1021 # Create a stream per file `k`
1022 arvfile = self[filename]
1024 for segment in arvfile.segments():
1025 loc = segment.locator
1026 if arvfile.parent._my_block_manager().is_bufferblock(loc):
1029 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1031 loc = KeepLocator(loc).stripped()
1032 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1033 segment.segment_offset, segment.range_size))
1034 stream[filename] = filestream
1036 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1037 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1038 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1042 return self.stripped_manifest()
1044 return self._manifest_text
1047 def _copy_remote_blocks(self, remote_blocks={}):
1048 """Scan through the entire collection and ask Keep to copy remote blocks.
1050 When accessing a remote collection, blocks will have a remote signature
1051 (+R instead of +A). Collect these signatures and request Keep to copy the
1052 blocks to the local cluster, returning local (+A) signatures.
1055 Shared cache of remote to local block mappings. This is used to avoid
1056 doing extra work when blocks are shared by more than one file in
1057 different subdirectories.
1061 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1062 return remote_blocks
    # NOTE(review): several enclosing headers (`for k in self:`, `if k in self:`,
    # the TOK `elif` and the ADD `else`) appear elided in this chunk; the code
    # below is reproduced as-is, so the indentation gaps are expected.
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.
        When given to `apply`, will change `self` to match `end_collection`
        # All clones in the emitted tuples are parented to `holding_collection`
        # (created on demand when the caller does not supply one).
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
            # Present locally but absent from the target: schedule a delete.
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
                # Both sides are subdirectories: recurse into them.
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                # Same path, different content: record a modification.
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                    # NOTE(review): the condition guarding this TOK entry is elided here.
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                    # Only in the target: schedule an add.
                    changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
    # NOTE(review): a few lines (`if changes:`, `path = change[1]`,
    # `initial = change[2]`, `final = change[3]`, the strftime continuation,
    # `if local is None:` and two `else:` headers) appear elided in this chunk;
    # code reproduced as-is.
    def apply(self, changes):
        """Apply changes from `diff`.
        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.
            # Any incoming change dirties the collection.
            self.set_committed(False)
        for change in changes:
            event_type = change[0]
            local = self.find(path)
            # Conflict copies get a timestamped "~conflict~" suffix next to the
            # original path.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
            if event_type == ADD:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                # Local is missing (presumably deleted) or local doesn't
                # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
    # NOTE(review): an `else:` header appears to have been elided before the
    # final two lines; code reproduced as-is.
    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's committed
            # then return API server's PDH response.
            return self._portable_data_hash
            # Otherwise compute it locally: md5 of the stripped (portable)
            # manifest text, plus "+<manifest byte length>".
            stripped = self.portable_manifest_text().encode()
            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
    # NOTE(review): the `else:` before the raise appears elided; reproduced as-is.
    def subscribe(self, callback):
        # Register the single modification callback; only one may be active at
        # a time (a second registration raises ArgumentError).
        if self._callback is None:
            self._callback = callback
            raise errors.ArgumentError("A callback is already set on this collection.")
1156 def unsubscribe(self):
1157 if self._callback is not None:
1158 self._callback = None
    # NOTE(review): the guard (presumably `if self._callback:`) above the first
    # call appears elided in this chunk; code reproduced as-is.
    def notify(self, event, collection, name, item):
            # Deliver the event to this node's own callback ...
            self._callback(event, collection, name, item)
        # ... then propagate it up to the root collection.
        self.root_collection().notify(event, collection, name, item)
    # Equality: other must be a RichCollectionBase with the same item count and
    # itemwise-equal contents.
    # NOTE(review): the `return True`/`return False` statements of each branch
    # appear elided in this chunk; code reproduced as-is.
    def __eq__(self, other):
        if not isinstance(other, RichCollectionBase):
        if len(self._items) != len(other):
        for k in self._items:
            if self._items[k] != other[k]:
1181 def __ne__(self, other):
1182 return not self.__eq__(other)
1186 """Flush bufferblocks to Keep."""
1187 for e in listvalues(self):
1191 class Collection(RichCollectionBase):
1192 """Represents the root of an Arvados Collection.
1194 This class is threadsafe. The root collection object, all subcollections
1195 and files are protected by a single lock (i.e. each access locks the entire
1201 :To read an existing file:
1202 `c.open("myfile", "r")`
1204 :To write a new file:
1205 `c.open("myfile", "w")`
1207 :To determine if a file exists:
1208 `c.find("myfile") is not None`
1211 `c.copy("source", "dest")`
1214 `c.remove("myfile")`
1216 :To save to an existing collection record:
1219 :To save a new collection record:
1222 :To merge remote changes into this object:
1225 Must be associated with an API server Collection record (during
1226 initialization, or using `save_new`) to use `save` or `update`
1230 def __init__(self, manifest_locator_or_text=None,
1237 replication_desired=None,
1239 """Collection constructor.
1241 :manifest_locator_or_text:
1242 An Arvados collection UUID, portable data hash, raw manifest
1243 text, or (if creating an empty collection) None.
1246 the parent Collection, may be None.
1249 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1250 Prefer this over supplying your own api_client and keep_client (except in testing).
1251 Will use default config settings if not specified.
1254 The API client object to use for requests. If not specified, create one using `apiconfig`.
1257 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1260 the number of retries for API and Keep requests.
1263 the block manager to use. If not specified, create one.
1265 :replication_desired:
1266 How many copies should Arvados maintain. If None, API server default
1267 configuration applies. If not None, this value will also be used
1268 for determining the number of block copies being written.
1271 super(Collection, self).__init__(parent)
1272 self._api_client = api_client
1273 self._keep_client = keep_client
1274 self._block_manager = block_manager
1275 self.replication_desired = replication_desired
1276 self.put_threads = put_threads
1279 self._config = apiconfig
1281 self._config = config.settings()
1283 self.num_retries = num_retries if num_retries is not None else 0
1284 self._manifest_locator = None
1285 self._manifest_text = None
1286 self._portable_data_hash = None
1287 self._api_response = None
1288 self._past_versions = set()
1290 self.lock = threading.RLock()
1293 if manifest_locator_or_text:
1294 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1295 self._manifest_locator = manifest_locator_or_text
1296 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1297 self._manifest_locator = manifest_locator_or_text
1298 if not self._has_local_collection_uuid():
1299 self._has_remote_blocks = True
1300 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1301 self._manifest_text = manifest_locator_or_text
1302 if '+R' in self._manifest_text:
1303 self._has_remote_blocks = True
1305 raise errors.ArgumentError(
1306 "Argument to CollectionReader is not a manifest or a collection UUID")
1310 except (IOError, errors.SyntaxError) as e:
1311 raise errors.ArgumentError("Error processing manifest text: %s", e)
1313 def root_collection(self):
1316 def get_properties(self):
1317 if self._api_response and self._api_response["properties"]:
1318 return self._api_response["properties"]
1322 def get_trash_at(self):
1323 if self._api_response and self._api_response["trash_at"]:
1324 return ciso8601.parse_datetime(self._api_response["trash_at"])
1328 def stream_name(self):
1335 def known_past_version(self, modified_at_and_portable_data_hash):
1336 return modified_at_and_portable_data_hash in self._past_versions
1340 def update(self, other=None, num_retries=None):
1341 """Merge the latest collection on the API server with the current collection."""
1344 if self._manifest_locator is None:
1345 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1346 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1347 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1348 response.get("portable_data_hash") != self.portable_data_hash()):
1349 # The record on the server is different from our current one, but we've seen it before,
1350 # so ignore it because it's already been merged.
1351 # However, if it's the same as our current record, proceed with the update, because we want to update
1355 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1356 other = CollectionReader(response["manifest_text"])
1357 baseline = CollectionReader(self._manifest_text)
1358 self.apply(baseline.diff(other))
1359 self._manifest_text = self.manifest_text()
1363 if self._api_client is None:
1364 self._api_client = ThreadSafeApiCache(self._config)
1365 if self._keep_client is None:
1366 self._keep_client = self._api_client.keep
1367 return self._api_client
1371 if self._keep_client is None:
1372 if self._api_client is None:
1375 self._keep_client = KeepClient(api_client=self._api_client)
1376 return self._keep_client
1379 def _my_block_manager(self):
1380 if self._block_manager is None:
1381 copies = (self.replication_desired or
1382 self._my_api()._rootDesc.get('defaultCollectionReplication',
1384 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1385 return self._block_manager
1387 def _remember_api_response(self, response):
1388 self._api_response = response
1389 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1391 def _populate_from_api_server(self):
1392 # As in KeepClient itself, we must wait until the last
1393 # possible moment to instantiate an API client, in order to
1394 # avoid tripping up clients that don't have access to an API
1395 # server. If we do build one, make sure our Keep client uses
1396 # it. If instantiation fails, we'll fall back to the except
1397 # clause, just like any other Collection lookup
1398 # failure. Return an exception, or None if successful.
1399 self._remember_api_response(self._my_api().collections().get(
1400 uuid=self._manifest_locator).execute(
1401 num_retries=self.num_retries))
1402 self._manifest_text = self._api_response['manifest_text']
1403 self._portable_data_hash = self._api_response['portable_data_hash']
1404 # If not overriden via kwargs, we should try to load the
1405 # replication_desired from the API server
1406 if self.replication_desired is None:
1407 self.replication_desired = self._api_response.get('replication_desired', None)
1409 def _populate(self):
1410 if self._manifest_text is None:
1411 if self._manifest_locator is None:
1414 self._populate_from_api_server()
1415 self._baseline_manifest = self._manifest_text
1416 self._import_manifest(self._manifest_text)
1418 def _has_collection_uuid(self):
1419 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1421 def _has_local_collection_uuid(self):
1422 return self._has_collection_uuid and \
1423 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1425 def __enter__(self):
1428 def __exit__(self, exc_type, exc_value, traceback):
1429 """Support scoped auto-commit in a with: block."""
1430 if exc_type is None:
1431 if self.writable() and self._has_collection_uuid():
1435 def stop_threads(self):
1436 if self._block_manager is not None:
1437 self._block_manager.stop_threads()
1440 def manifest_locator(self):
1441 """Get the manifest locator, if any.
1443 The manifest locator will be set when the collection is loaded from an
1444 API server record or the portable data hash of a manifest.
1446 The manifest locator will be None if the collection is newly created or
1447 was created directly from manifest text. The method `save_new()` will
1448 assign a manifest locator.
1451 return self._manifest_locator
1454 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1455 if new_config is None:
1456 new_config = self._config
1458 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1460 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1462 newcollection._clonefrom(self)
1463 return newcollection
1466 def api_response(self):
1467 """Returns information about this Collection fetched from the API server.
1469 If the Collection exists in Keep but not the API server, currently
1470 returns None. Future versions may provide a synthetic response.
1473 return self._api_response
1475 def find_or_create(self, path, create_type):
1476 """See `RichCollectionBase.find_or_create`"""
1480 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1482 def find(self, path):
1483 """See `RichCollectionBase.find`"""
1487 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1489 def remove(self, path, recursive=False):
1490 """See `RichCollectionBase.remove`"""
1492 raise errors.ArgumentError("Cannot remove '.'")
1494 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1501 storage_classes=None,
1505 """Save collection to an existing collection record.
1507 Commit pending buffer blocks to Keep, merge with remote record (if
1508 merge=True, the default), and update the collection record. Returns
1509 the current manifest text.
1511 Will raise AssertionError if not associated with a collection record on
1512 the API server. If you want to save a manifest to Keep only, see
1516 Additional properties of collection. This value will replace any existing
1517 properties of collection.
1520 Specify desirable storage classes to be used when writing data to Keep.
1523 A collection is *expiring* when it has a *trash_at* time in the future.
1524 An expiring collection can be accessed as normal,
1525 but is scheduled to be trashed automatically at the *trash_at* time.
1528 Update and merge remote changes before saving. Otherwise, any
1529 remote changes will be ignored and overwritten.
1532 Retry count on API calls (if None, use the collection default)
1535 if properties and type(properties) is not dict:
1536 raise errors.ArgumentError("properties must be dictionary type.")
1538 if storage_classes and type(storage_classes) is not list:
1539 raise errors.ArgumentError("storage_classes must be list type.")
1541 if trash_at and type(trash_at) is not datetime.datetime:
1542 raise errors.ArgumentError("trash_at must be datetime type.")
1546 body["properties"] = properties
1548 body["storage_classes_desired"] = storage_classes
1550 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1551 body["trash_at"] = t
1553 if not self.committed():
1554 if self._has_remote_blocks:
1555 # Copy any remote blocks to the local cluster.
1556 self._copy_remote_blocks(remote_blocks={})
1557 self._has_remote_blocks = False
1558 if not self._has_collection_uuid():
1559 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1560 elif not self._has_local_collection_uuid():
1561 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1563 self._my_block_manager().commit_all()
1568 text = self.manifest_text(strip=False)
1569 body['manifest_text'] = text
1571 self._remember_api_response(self._my_api().collections().update(
1572 uuid=self._manifest_locator,
1574 ).execute(num_retries=num_retries))
1575 self._manifest_text = self._api_response["manifest_text"]
1576 self._portable_data_hash = self._api_response["portable_data_hash"]
1577 self.set_committed(True)
1579 self._remember_api_response(self._my_api().collections().update(
1580 uuid=self._manifest_locator,
1582 ).execute(num_retries=num_retries))
1584 return self._manifest_text
1590 def save_new(self, name=None,
1591 create_collection_record=True,
1594 storage_classes=None,
1596 ensure_unique_name=False,
1598 """Save collection to a new collection record.
1600 Commit pending buffer blocks to Keep and, when create_collection_record
1601 is True (default), create a new collection record. After creating a
1602 new collection record, this Collection object will be associated with
1603 the new record used by `save()`. Returns the current manifest text.
1606 The collection name.
1608 :create_collection_record:
1609 If True, create a collection record on the API server.
1610 If False, only commit blocks to Keep and return the manifest text.
1613 the user, or project uuid that will own this collection.
1614 If None, defaults to the current user.
1617 Additional properties of collection. This value will replace any existing
1618 properties of collection.
1621 Specify desirable storage classes to be used when writing data to Keep.
1624 A collection is *expiring* when it has a *trash_at* time in the future.
1625 An expiring collection can be accessed as normal,
1626 but is scheduled to be trashed automatically at the *trash_at* time.
1628 :ensure_unique_name:
1629 If True, ask the API server to rename the collection
1630 if it conflicts with a collection with the same name and owner. If
1631 False, a name conflict will result in an error.
1634 Retry count on API calls (if None, use the collection default)
1637 if properties and type(properties) is not dict:
1638 raise errors.ArgumentError("properties must be dictionary type.")
1640 if storage_classes and type(storage_classes) is not list:
1641 raise errors.ArgumentError("storage_classes must be list type.")
1643 if trash_at and type(trash_at) is not datetime.datetime:
1644 raise errors.ArgumentError("trash_at must be datetime type.")
1646 if self._has_remote_blocks:
1647 # Copy any remote blocks to the local cluster.
1648 self._copy_remote_blocks(remote_blocks={})
1649 self._has_remote_blocks = False
1651 self._my_block_manager().commit_all()
1652 text = self.manifest_text(strip=False)
1654 if create_collection_record:
1656 name = "New collection"
1657 ensure_unique_name = True
1659 body = {"manifest_text": text,
1661 "replication_desired": self.replication_desired}
1663 body["owner_uuid"] = owner_uuid
1665 body["properties"] = properties
1667 body["storage_classes_desired"] = storage_classes
1669 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1670 body["trash_at"] = t
1672 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1673 text = self._api_response["manifest_text"]
1675 self._manifest_locator = self._api_response["uuid"]
1676 self._portable_data_hash = self._api_response["portable_data_hash"]
1678 self._manifest_text = text
1679 self.set_committed(True)
1683 _token_re = re.compile(r'(\S+)(\s+|$)')
1684 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1685 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1687 def _unescape_manifest_path(self, path):
1688 return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1691 def _import_manifest(self, manifest_text):
1692 """Import a manifest into a `Collection`.
1695 The manifest text to import from.
1699 raise ArgumentError("Can only import manifest into an empty collection")
1708 for token_and_separator in self._token_re.finditer(manifest_text):
1709 tok = token_and_separator.group(1)
1710 sep = token_and_separator.group(2)
1712 if state == STREAM_NAME:
1713 # starting a new stream
1714 stream_name = self._unescape_manifest_path(tok)
1719 self.find_or_create(stream_name, COLLECTION)
1723 block_locator = self._block_re.match(tok)
1725 blocksize = int(block_locator.group(1))
1726 blocks.append(Range(tok, streamoffset, blocksize, 0))
1727 streamoffset += blocksize
1731 if state == SEGMENTS:
1732 file_segment = self._segment_re.match(tok)
1734 pos = int(file_segment.group(1))
1735 size = int(file_segment.group(2))
1736 name = self._unescape_manifest_path(file_segment.group(3))
1737 if name.split('/')[-1] == '.':
1738 # placeholder for persisting an empty directory, not a real file
1740 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1742 filepath = os.path.join(stream_name, name)
1743 afile = self.find_or_create(filepath, FILE)
1744 if isinstance(afile, ArvadosFile):
1745 afile.add_segment(blocks, pos, size)
1747 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1750 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1756 self.set_committed(True)
1759 def notify(self, event, collection, name, item):
1761 self._callback(event, collection, name, item)
# NOTE(review): this chunk is elided -- the class docstring is unclosed and the
# `def` headers for writable/_my_api/_my_keep and parts of clone/_reparent are
# missing; code is reproduced as-is.
class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    Subcollection locking falls under the umbrella lock of its root collection.
    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's single lock (umbrella locking).
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.num_retries = parent.num_retries
    # Delegation methods: everything below forwards to the root collection.
    def root_collection(self):
        return self.parent.root_collection()
        return self.root_collection().writable()
        return self.root_collection()._my_api()
        return self.root_collection()._my_keep()
    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()
    def stream_name(self):
        # Stream name is the parent's stream path plus this directory's name.
        return os.path.join(self.parent.stream_name(), self.name)
    def clone(self, new_parent, new_name):
        c = Subcollection(new_parent, new_name)
    def _reparent(self, newparent, newname):
        # Detach from the current parent, then re-attach under the new one and
        # adopt the new root's lock.
        self.set_committed(False)
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.lock = self.parent.root_collection().lock
1814 class CollectionReader(Collection):
1815 """A read-only collection object.
1817 Initialize from a collection UUID or portable data hash, or raw
1818 manifest text. See `Collection` constructor for detailed options.
1821 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1822 self._in_init = True
1823 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1824 self._in_init = False
1826 # Forego any locking since it should never change once initialized.
1827 self.lock = NoopLock()
1829 # Backwards compatability with old CollectionReader
1830 # all_streams() and all_files()
1831 self._streams = None
1834 return self._in_init
1836 def _populate_streams(orig_func):
1837 @functools.wraps(orig_func)
1838 def populate_streams_wrapper(self, *args, **kwargs):
1839 # Defer populating self._streams until needed since it creates a copy of the manifest.
1840 if self._streams is None:
1841 if self._manifest_text:
1842 self._streams = [sline.split()
1843 for sline in self._manifest_text.split("\n")
1847 return orig_func(self, *args, **kwargs)
1848 return populate_streams_wrapper
1851 def normalize(self):
1852 """Normalize the streams returned by `all_streams`.
1854 This method is kept for backwards compatability and only affects the
1855 behavior of `all_streams()` and `all_files()`
1861 for s in self.all_streams():
1862 for f in s.all_files():
1863 streamname, filename = split(s.name() + "/" + f.name())
1864 if streamname not in streams:
1865 streams[streamname] = {}
1866 if filename not in streams[streamname]:
1867 streams[streamname][filename] = []
1868 for r in f.segments:
1869 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1871 self._streams = [normalize_stream(s, streams[s])
1872 for s in sorted(streams)]
1874 def all_streams(self):
1875 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1876 for s in self._streams]
1879 def all_files(self):
1880 for s in self.all_streams():
1881 for f in s.all_files():