1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
28 from collections import deque
31 from ._internal import streams
32 from .api import ThreadSafeAPIClient
33 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
34 from .keep import KeepLocator, KeepClient
35 import arvados.config as config
36 import arvados.errors as errors
38 import arvados.events as events
39 from arvados.retry import retry_method
54 if sys.version_info < (3, 8):
55 from typing_extensions import Literal
57 from typing import Literal
59 _logger = logging.getLogger('arvados.collection')
62 """Argument value for `Collection` methods to represent an added item"""
64 """Argument value for `Collection` methods to represent a removed item"""
66 """Argument value for `Collection` methods to represent a modified item"""
68 """Argument value for `Collection` methods to represent an item with token differences"""
70 """`create_type` value for `Collection.find_or_create`"""
71 COLLECTION = "collection"
72 """`create_type` value for `Collection.find_or_create`"""
74 ChangeList = List[Union[
75 Tuple[Literal[ADD, DEL], str, 'Collection'],
76 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
78 ChangeType = Literal[ADD, DEL, MOD, TOK]
79 CollectionItem = Union[ArvadosFile, 'Collection']
80 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
81 CreateType = Literal[COLLECTION, FILE]
82 Properties = Dict[str, Any]
83 StorageClasses = List[str]
class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context block with this collection instance"""
        pass

    def _my_keep(self):
        # Build the Keep client on first use and cache it on the instance.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self) -> str:
        """Create a copy of the collection manifest with only size hints

        This method returns a string with the current collection's manifest
        text with all non-portable locator hints like permission hints and
        remote cluster hints removed. Every hint that does not start with a
        digit is dropped from each locator, so only size hints remain.
        """
        def strip_hints(token):
            # Only Keep locators are rewritten; file tokens pass through.
            if re.match(arvados.util.keep_locator_pattern, token):
                return re.sub(r'\+[^\d][^\+]*', '', token)
            return token

        clean = []
        for line in self.manifest_text().split("\n"):
            fields = line.split()
            if not fields:
                # Blank lines (including the one after the final newline)
                # produce no output.
                continue
            # The first field is the stream name and is never rewritten.
            clean_fields = fields[:1] + [strip_hints(x) for x in fields[1:]]
            clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
class _WriterFile(_FileLikeObjectBase):
    """File-like wrapper that forwards writes to a collection writer

    Every operation is delegated to the `coll_writer` object given at
    construction time, which is stored as `self.dest`.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        # Close the file-like object first, then let the writer finalize
        # the file it was accumulating.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()
153 class RichCollectionBase(CollectionBase):
154 """Base class for Collection classes
156 .. ATTENTION:: Internal
157 This class is meant to be used by other parts of the SDK. User code
158 should instantiate or subclass `Collection` or one of its subclasses
def __init__(self, parent=None):
    """Initialize the state shared by all collection objects

    Arguments:

    * parent: RichCollectionBase | None --- The collection that contains
      this one, or `None` if this collection has no parent.
    """
    self.parent = parent
    self._committed = False
    self._has_remote_blocks = False
    self._callback = None
    # Maps each item name in this stream to its file or subcollection object.
    self._items = {}
def _my_api(self):
    # Abstract: subclasses return the API client used by this collection.
    raise NotImplementedError()

def _my_keep(self):
    # Abstract: subclasses return the Keep client used by this collection.
    raise NotImplementedError()

def _my_block_manager(self):
    # Abstract: subclasses return the block manager used by this collection.
    raise NotImplementedError()
def writable(self) -> bool:
    """Indicate whether this collection object can be modified

    This method returns `False` if this object is a `CollectionReader`,
    else `True`. The base class implementation is abstract and always
    raises `NotImplementedError`.
    """
    raise NotImplementedError()
def root_collection(self) -> 'Collection':
    """Get this collection's root collection object

    If you open a subcollection with `Collection.find`, calling this method
    on that subcollection returns the source Collection object. The base
    class implementation is abstract and always raises
    `NotImplementedError`.
    """
    raise NotImplementedError()
def stream_name(self) -> str:
    """Get the name of the manifest stream represented by this collection

    If you open a subcollection with `Collection.find`, calling this method
    on that subcollection returns the name of the stream you opened. The
    base class implementation is abstract and always raises
    `NotImplementedError`.
    """
    raise NotImplementedError()
def has_remote_blocks(self) -> bool:
    """Indicate whether the collection refers to remote data

    Returns `True` if the collection manifest includes any Keep locators
    with a remote hint (`+R`), else `False`.
    """
    # The cached flag lets us skip the recursive scan of this stream.
    if self._has_remote_blocks:
        return True
    return any(self[name].has_remote_blocks() for name in self)
def set_has_remote_blocks(self, val: bool) -> None:
    """Cache whether this collection refers to remote blocks

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Set this collection's cached "has remote blocks" flag to the given
    value, then set the same value on the parent collection, if any.

    Arguments:

    * val: bool --- The flag value to store.
    """
    self._has_remote_blocks = val
    # Compare against None explicitly: collections define `__len__`, so a
    # plain truthiness test would skip a parent that has no items yet.
    if self.parent is not None:
        self.parent.set_has_remote_blocks(val)
def find_or_create(
        self,
        path: str,
        create_type: CreateType,
) -> CollectionItem:
    """Get the item at the given path, creating it if necessary

    If `path` refers to a stream in this collection, returns a
    corresponding `Subcollection` object. If `path` refers to a file in
    this collection, returns a corresponding
    `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
    this collection, then this method creates a new object and returns
    it, creating parent streams as needed. The type of object created is
    determined by the value of `create_type`. If a non-final component of
    `path` is a file, raises `NotADirectoryError`.

    Arguments:

    * path: str --- The path to find or create within this collection.

    * create_type: Literal[COLLECTION, FILE] --- The type of object to
      create at `path` if one does not exist. Passing `COLLECTION`
      creates a stream and returns the corresponding
      `Subcollection`. Passing `FILE` creates a new file and returns the
      corresponding `arvados.arvfile.ArvadosFile`.
    """
    name, slash, rest = path.partition("/")
    if not name:
        # An empty first component (e.g. the empty path) means this
        # collection itself.
        return self

    item = self._items.get(name)
    if item is None:
        # Intermediate components are always streams; only the final
        # component honors `create_type`.
        if slash or create_type == COLLECTION:
            item = Subcollection(self, name)
        else:
            item = ArvadosFile(self, name)
        self._items[name] = item
        self.set_committed(False)
        self.notify(ADD, self, name, item)

    if not slash:
        return item
    if isinstance(item, RichCollectionBase):
        return item.find_or_create(rest, create_type)
    raise IOError(errno.ENOTDIR, "Not a directory", name)
def find(self, path: str) -> Optional[CollectionItem]:
    """Get the item at the given path

    If `path` refers to a stream in this collection, returns a
    corresponding `Subcollection` object. If `path` refers to a file in
    this collection, returns a corresponding
    `arvados.arvfile.ArvadosFile` object. If nothing exists at `path`,
    returns `None`.

    Raises `arvados.errors.ArgumentError` if `path` is empty, and
    `NotADirectoryError` if `path` starts with a slash or if one of its
    non-final components is a file.

    Arguments:

    * path: str --- The path to look up within this collection.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    name, slash, rest = path.partition("/")
    if name == '':
        raise IOError(errno.ENOTDIR, "Not a directory", name)

    item = self._items.get(name)
    if item is None or not slash:
        return item
    if not isinstance(item, RichCollectionBase):
        raise IOError(errno.ENOTDIR, "Not a directory", name)
    # A trailing slash ("dir/") resolves to the stream itself.
    return item.find(rest) if rest else item
def mkdirs(self, path: str) -> 'Subcollection':
    """Create and return a subcollection at `path`

    If `path` exists within this collection, raises `FileExistsError`.
    Otherwise, creates a stream at that path and returns the
    corresponding `Subcollection`.

    Arguments:

    * path: str --- The path of the stream to create.
    """
    # Identity test: `!= None` would dispatch to the item's own `__ne__`.
    if self.find(path) is not None:
        raise IOError(errno.EEXIST, "Directory or file exists", path)

    return self.find_or_create(path, COLLECTION)
337 encoding: Optional[str]=None
339 """Open a file-like object within the collection
341 This method returns a file-like object that can read and/or write the
342 file located at `path` within the collection. If you attempt to write
343 a `path` that does not exist, the file is created with `find_or_create`.
344 If the file cannot be opened for any other reason, this method raises
345 `OSError` with an appropriate errno.
349 * path: str --- The path of the file to open within this collection
351 * mode: str --- The mode to open this file. Supports all the same
352 values as `builtins.open`.
354 * encoding: str | None --- The text encoding of the file. Only used
355 when the file is opened in text mode. The default is
359 if not re.search(r'^[rwa][bt]?\+?$', mode):
360 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
362 if mode[0] == 'r' and '+' not in mode:
363 fclass = ArvadosFileReader
364 arvfile = self.find(path)
365 elif not self.writable():
366 raise IOError(errno.EROFS, "Collection is read only")
368 fclass = ArvadosFileWriter
369 arvfile = self.find_or_create(path, FILE)
372 raise IOError(errno.ENOENT, "File not found", path)
373 if not isinstance(arvfile, ArvadosFile):
374 raise IOError(errno.EISDIR, "Is a directory", path)
379 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
380 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
382 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
383 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
def modified(self) -> bool:
    """Indicate whether this collection differs from its API server record

    Returns `False` if this collection corresponds to a record loaded from
    the API server, `True` otherwise. This is the negation of
    `committed()`.
    """
    return not self.committed()
def committed(self) -> bool:
    """Indicate whether this collection has an API server record

    Returns `True` if this collection corresponds to a record loaded from
    the API server, `False` otherwise.
    """
    return self._committed
def set_committed(self, value: bool=True):
    """Cache whether this collection has an API server record

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Set this collection's cached "committed" flag to the given
    value and propagates it as needed: `True` is pushed down to every
    item in this collection, `False` is pushed up to the parent.

    Arguments:

    * value: bool --- The flag value to store. Default `True`.
    """
    if value == self._committed:
        return
    if value:
        for item in self._items.values():
            item.set_committed(True)
        self._committed = True
    else:
        self._committed = False
        if self.parent is not None:
            self.parent.set_committed(False)
def __iter__(self) -> Iterator[str]:
    """Iterate names of streams and files in this collection

    This method does not recurse. It only iterates the contents of this
    collection's corresponding stream.
    """
    return iter(self._items)

def __getitem__(self, k: str) -> CollectionItem:
    """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection

    This method does not recurse. If you want to search a path, use
    `RichCollectionBase.find` instead.
    """
    return self._items[k]

def __contains__(self, k: str) -> bool:
    """Indicate whether this collection has an item with this name

    This method does not recurse. If you want to check a path, use
    `RichCollectionBase.exists` instead.
    """
    return k in self._items

def __len__(self):
    """Get the number of items directly contained in this collection

    This method does not recurse. It only counts the streams and files
    in this collection's corresponding stream.
    """
    return len(self._items)
def __delitem__(self, p: str) -> None:
    """Delete an item from this collection's stream

    This method does not recurse. If you want to remove an item by a
    path, use `RichCollectionBase.remove` instead.
    """
    del self._items[p]
    self.set_committed(False)
    self.notify(DEL, self, p, None)
def keys(self) -> Iterator[str]:
    """Iterate names of streams and files in this collection

    This method does not recurse. It only iterates the contents of this
    collection's corresponding stream.
    """
    return self._items.keys()

def values(self) -> List[CollectionItem]:
    """Get a list of objects in this collection's stream

    The return value includes a `Subcollection` for every stream, and an
    `arvados.arvfile.ArvadosFile` for every file, directly within this
    collection's stream. This method does not recurse.
    """
    return list(self._items.values())

def items(self) -> List[Tuple[str, CollectionItem]]:
    """Get a list of `(name, object)` tuples from this collection's stream

    The return value includes a `Subcollection` for every stream, and an
    `arvados.arvfile.ArvadosFile` for every file, directly within this
    collection's stream. This method does not recurse.
    """
    return list(self._items.items())
def exists(self, path: str) -> bool:
    """Indicate whether this collection includes an item at `path`

    This method returns `True` if `path` refers to a stream or file within
    this collection, else `False`.

    Arguments:

    * path: str --- The path to check for existence within this collection
    """
    return self.find(path) is not None
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the file or stream at `path`

    Raises `arvados.errors.ArgumentError` if `path` is empty,
    `FileNotFoundError` if its first component does not exist in this
    collection, and `NotADirectoryError` if a non-final component is a
    file.

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    name, slash, rest = path.partition("/")
    item = self._items.get(name)
    if item is None:
        raise IOError(errno.ENOENT, "File not found", path)

    if slash:
        # Only streams can contain further path components; report a file
        # in the middle of the path the same way `find` does.
        if not isinstance(item, RichCollectionBase):
            raise IOError(errno.ENOTDIR, "Not a directory", name)
        item.remove(rest, recursive=recursive)
        return

    if isinstance(item, RichCollectionBase) and len(item) > 0 and not recursive:
        raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
    del self._items[name]
    self.set_committed(False)
    self.notify(DEL, self, name, item)
def _clonefrom(self, source):
    # Populate this collection with clones of every item in `source`,
    # each re-parented to this collection under the same name.
    for name, item in source.items():
        self._items[name] = item.clone(self, name)
549 raise NotImplementedError()
def add(
        self,
        source_obj: CollectionItem,
        target_name: str,
        overwrite: bool=False,
        reparent: bool=False,
) -> None:
    """Copy or move a file or subcollection object to this collection

    Arguments:

    * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
      to add to this collection

    * target_name: str --- The path inside this collection where
      `source_obj` should be added.

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_name`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_name` will be replaced with `source_obj`.

    * reparent: bool --- Controls whether this method copies or moves
      `source_obj`. If `False` (the default), `source_obj` is copied into
      this collection. If `True`, `source_obj` is moved into this
      collection.
    """
    if target_name in self and not overwrite:
        raise IOError(errno.EEXIST, "File already exists", target_name)

    # Remember what is being replaced so subscribers get a MOD event.
    modified_from = None
    if target_name in self:
        modified_from = self[target_name]

    # Actually make the move or copy.
    if reparent:
        source_obj._reparent(self, target_name)
        item = source_obj
    else:
        item = source_obj.clone(self, target_name)

    self._items[target_name] = item
    self.set_committed(False)
    if not self._has_remote_blocks and source_obj.has_remote_blocks():
        self.set_has_remote_blocks(True)

    # Test against None, not truthiness: an empty subcollection is falsy
    # but replacing it is still a modification.
    if modified_from is not None:
        self.notify(MOD, self, target_name, (modified_from, item))
    else:
        self.notify(ADD, self, target_name, item)
def _get_src_target(self, source, target_path, source_collection, create_dest):
    """Resolve the arguments of `copy`/`rename` to concrete objects

    Returns a `(source_obj, target_dir, target_name)` tuple: the object to
    add, the collection to add it to, and the name to add it under.

    Arguments:

    * source: str | object --- A path to look up in `source_collection`,
      or the source object itself.

    * target_path: str --- The destination path inside this collection.

    * source_collection: collection | None --- Where to look up `source`
      when it is a path. `None` means this collection.

    * create_dest: bool --- If true, the destination's parent stream is
      created when missing; otherwise it must already exist.
    """
    if source_collection is None:
        source_collection = self

    # Find the object
    if isinstance(source, str):
        source_obj = source_collection.find(source)
        if source_obj is None:
            raise IOError(errno.ENOENT, "File not found", source)
        sourcecomponents = source.split("/")
    else:
        source_obj = source
        sourcecomponents = None

    # Find parent collection the target path
    targetcomponents = target_path.split("/")

    # Determine the name to use. Fall back to the source's own name only
    # when the source was given as a path; an object source has no name to
    # borrow, so the ArgumentError below reports that case.
    target_name = targetcomponents[-1]
    if not target_name and sourcecomponents:
        target_name = sourcecomponents[-1]

    if not target_name:
        raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

    if create_dest:
        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
    elif len(targetcomponents) > 1:
        target_dir = self.find("/".join(targetcomponents[0:-1]))
    else:
        target_dir = self

    if target_dir is None:
        raise IOError(errno.ENOENT, "Target directory not found", target_name)

    # Targeting an existing stream with a path source means "put it inside
    # that stream under its original name".
    if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
        target_dir = target_dir[target_name]
        target_name = sourcecomponents[-1]

    return (source_obj, target_dir, target_name)
def copy(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Copy a file or subcollection object to this collection

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      add to this collection. If `source` is a str, the object will be
      found by looking up this path from `source_collection` (see
      below).

    * target_path: str --- The path inside this collection where the
      source object should be added.

    * source_collection: arvados.collection.Collection | None --- The
      collection to find the source object from when `source` is a
      path. Defaults to the current collection (`self`).

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_path`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_path` will be replaced with `source_obj`.
    """
    # create_dest=True: missing parent streams of the target are created.
    source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
    # reparent=False: the source object is cloned, not moved.
    target_dir.add(source_obj, target_name, overwrite, False)
def rename(
        self,
        source: Union[str, CollectionItem],
        target_path: str,
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
) -> None:
    """Move a file or subcollection object to this collection

    Raises `OSError` with errno `EROFS` if the source object is not
    writable.

    Arguments:

    * source: str | arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The file or subcollection to
      add to this collection. If `source` is a str, the object will be
      found by looking up this path from `source_collection` (see
      below).

    * target_path: str --- The path inside this collection where the
      source object should be added.

    * source_collection: arvados.collection.Collection | None --- The
      collection to find the source object from when `source` is a
      path. Defaults to the current collection (`self`).

    * overwrite: bool --- Controls the behavior of this method when the
      collection already contains an object at `target_path`. If `False`
      (the default), this method will raise `FileExistsError`. If `True`,
      the object at `target_path` will be replaced with `source_obj`.
    """
    # create_dest=False: the target's parent stream must already exist.
    source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
    if not source_obj.writable():
        raise IOError(errno.EROFS, "Source collection is read only", source)
    # reparent=True: the source object itself is moved, not cloned.
    target_dir.add(source_obj, target_name, overwrite, True)
def portable_manifest_text(self, stream_name: str=".") -> str:
    """Get the portable manifest text for this collection

    The portable manifest text is normalized, and does not include access
    tokens. This method does not flush outstanding blocks to Keep.

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.
    """
    # Positional arguments are strip=True, normalize=True.
    return self._get_manifest_text(stream_name, True, True)
def manifest_text(
        self,
        stream_name: str=".",
        strip: bool=False,
        normalize: bool=False,
        only_committed: bool=False,
) -> str:
    """Get the manifest text for this collection

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.

    * strip: bool --- Controls whether or not the returned manifest text
      includes access tokens. If `False` (the default), the manifest text
      will include access tokens. If `True`, the manifest text will not
      include access tokens.

    * normalize: bool --- Controls whether or not the returned manifest
      text is normalized. Default `False`.

    * only_committed: bool --- Controls whether or not this method uploads
      pending data to Keep before building and returning the manifest text.
      If `False` (the default), this method will finish uploading all data
      to Keep, then return the final manifest. If `True`, this method will
      build and return a manifest that only refers to the data that has
      finished uploading at the time this method was called.
    """
    if not only_committed:
        self._my_block_manager().commit_all()
    return self._get_manifest_text(stream_name, strip, normalize,
                                   only_committed=only_committed)
763 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
764 """Get the manifest text for this collection, sub collections and files.
767 Name to use for this stream (directory)
770 If True, remove signing tokens from block locators if present.
771 If False (default), block locators are left unchanged.
774 If True, always export the manifest text in normalized form
775 even if the Collection is not modified. If False (default) and the collection
776 is not modified, return the original manifest text even if it is not
780 If True, only include blocks that were already committed to Keep.
784 if not self.committed() or self._manifest_text is None or normalize:
787 sorted_keys = sorted(self.keys())
788 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
789 # Create a stream per file `k`
790 arvfile = self[filename]
792 for segment in arvfile.segments():
793 loc = segment.locator
794 if arvfile.parent._my_block_manager().is_bufferblock(loc):
797 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
799 loc = KeepLocator(loc).stripped()
800 filestream.append(streams.LocatorAndRange(
802 KeepLocator(loc).size,
803 segment.segment_offset,
806 stream[filename] = filestream
808 buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
809 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
810 buf.append(self[dirname].manifest_text(
811 stream_name=os.path.join(stream_name, dirname),
812 strip=strip, normalize=True, only_committed=only_committed))
816 return self.stripped_manifest()
818 return self._manifest_text
def _copy_remote_blocks(self, remote_blocks=None):
    """Scan through the entire collection and ask Keep to copy remote blocks.

    When accessing a remote collection, blocks will have a remote signature
    (+R instead of +A). Collect these signatures and request Keep to copy the
    blocks to the local cluster, returning local (+A) signatures.

    :remote_blocks:
      Shared cache of remote to local block mappings. This is used to avoid
      doing extra work when blocks are shared by more than one file in
      different subdirectories. A fresh mapping is created when none is
      given.

    Returns the (possibly updated) mapping.
    """
    # A `{}` default would be created once and shared by every call, so
    # mappings would leak between unrelated collections.
    if remote_blocks is None:
        remote_blocks = {}
    for item in self:
        remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
    return remote_blocks
def diff(
        self,
        end_collection: 'RichCollectionBase',
        prefix: str=".",
        holding_collection: Optional['Collection']=None,
) -> ChangeList:
    """Build a list of differences between this collection and another

    Arguments:

    * end_collection: arvados.collection.RichCollectionBase --- A
      collection object with the desired end state. The returned diff
      list will describe how to go from the current collection object
      `self` to `end_collection`.

    * prefix: str --- The name to use for this collection's stream in
      the diff list. Default `'.'`.

    * holding_collection: arvados.collection.Collection | None --- A
      collection object used to hold objects for the returned diff
      list. By default, a new empty collection is created.
    """
    changes = []
    if holding_collection is None:
        holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())

    def held(item):
        # Every object in the diff is a clone parented to the holding
        # collection.
        return item.clone(holding_collection, "")

    # Items that exist here but not in the end state are deletions.
    for k in self:
        if k not in end_collection:
            changes.append((DEL, os.path.join(prefix, k), held(self[k])))

    for k in end_collection:
        path = os.path.join(prefix, k)
        if k not in self:
            changes.append((ADD, path, held(end_collection[k])))
        elif isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
            # Two streams: recurse instead of reporting the stream itself.
            changes.extend(self[k].diff(end_collection[k], path, holding_collection))
        elif end_collection[k] != self[k]:
            changes.append((MOD, path, held(self[k]), held(end_collection[k])))
        else:
            # Items compare equal; report a TOK change.
            changes.append((TOK, path, held(self[k]), held(end_collection[k])))
    return changes
881 def apply(self, changes: ChangeList) -> None:
"""Apply a list of changes to this collection
884 This method takes a list of changes generated by
885 `RichCollectionBase.diff` and applies it to this
886 collection. Afterward, the state of this collection object will
887 match the state of `end_collection` passed to `diff`. If a change
888 conflicts with a local change, it will be saved to an alternate path
889 indicating the conflict.
893 * changes: arvados.collection.ChangeList --- The list of differences
894 generated by `RichCollectionBase.diff`.
897 self.set_committed(False)
898 for change in changes:
899 event_type = change[0]
902 local = self.find(path)
903 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
905 if event_type == ADD:
907 # No local file at path, safe to copy over new file
908 self.copy(initial, path)
909 elif local is not None and local != initial:
910 # There is already local file and it is different:
911 # save change to conflict file.
912 self.copy(initial, conflictpath)
913 elif event_type == MOD or event_type == TOK:
916 # Local matches the "initial" item so it has not
917 # changed locally and is safe to update.
918 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
919 # Replace contents of local file with new contents
920 local.replace_contents(final)
922 # Overwrite path with new item; this can happen if
923 # path was a file and is now a collection or vice versa
924 self.copy(final, path, overwrite=True)
926 # Local is missing (presumably deleted) or local doesn't
927 # match the "start" value, so save change to conflict file
928 self.copy(final, conflictpath)
929 elif event_type == DEL:
931 # Local item matches "initial" value, so it is safe to remove.
932 self.remove(path, recursive=True)
933 # else, the file is modified or already removed, in either
934 # case we don't want to try to remove it.
def portable_data_hash(self) -> str:
    """Get the portable data hash for this collection's manifest

    The result has the form `<md5 hex digest>+<size in bytes>`, computed
    over the encoded portable manifest text, unless a value from the API
    server is available (see below).
    """
    if self._manifest_locator and self.committed():
        # If the collection is already saved on the API server, and it's committed
        # then return API server's PDH response.
        return self._portable_data_hash
    stripped = self.portable_manifest_text().encode()
    return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
def subscribe(self, callback: ChangeCallback) -> None:
    """Set a notify callback for changes to this collection

    Only one callback can be set at a time. Raises
    `arvados.errors.ArgumentError` if one is already set.

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable to
      call each time the collection is changed.
    """
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback
def unsubscribe(self) -> None:
    """Remove any notify callback set for changes to this collection

    Does nothing if no callback is set.
    """
    if self._callback is not None:
        self._callback = None
def notify(
        self,
        event: ChangeType,
        collection: 'RichCollectionBase',
        name: str,
        item: CollectionItem,
) -> None:
    """Notify any subscribed callback about a change to this collection

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    If a callback has been registered with `RichCollectionBase.subscribe`,
    it will be called with information about a change to this collection.
    Then this notification will be propagated to this collection's root.

    Arguments:

    * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
      the collection.

    * collection: arvados.collection.RichCollectionBase --- The
      collection that was modified.

    * name: str --- The name of the file or stream within `collection` that
      was modified.

    * item: arvados.arvfile.ArvadosFile |
      arvados.collection.Subcollection --- The new contents at `name`
      within `collection`.
    """
    if self._callback is not None:
        self._callback(event, collection, name, item)
    self.root_collection().notify(event, collection, name, item)
def __eq__(self, other: Any) -> bool:
    """Indicate whether this collection object is equal to another

    Two collections are equal when they hold the same item names and the
    items under each name compare equal.
    """
    if other is self:
        return True
    if not isinstance(other, RichCollectionBase):
        return False
    if len(self._items) != len(other):
        return False
    for k in self._items:
        if k not in other:
            return False
        if self._items[k] != other[k]:
            return False
    return True

def __ne__(self, other: Any) -> bool:
    """Indicate whether this collection object is not equal to another"""
    return not self.__eq__(other)
def flush(self) -> None:
    """Upload any pending data to Keep

    Calls `flush()` on every item directly within this collection.
    """
    for e in self.values():
        e.flush()
1029 class Collection(RichCollectionBase):
1030 """Read and manipulate an Arvados collection
1032 This class provides a high-level interface to create, read, and update
1033 Arvados collections and their contents. Refer to the Arvados Python SDK
1034 cookbook for [an introduction to using the Collection class][cookbook].
1036 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1039 def __init__(self, manifest_locator_or_text: Optional[str]=None,
1040 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1041 keep_client: Optional['arvados.keep.KeepClient']=None,
1042 num_retries: int=10,
1043 parent: Optional['Collection']=None,
1044 apiconfig: Optional[Mapping[str, str]]=None,
1045 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1046 replication_desired: Optional[int]=None,
1047 storage_classes_desired: Optional[List[str]]=None,
1048 put_threads: Optional[int]=None):
1049 """Initialize a Collection object
1053 * manifest_locator_or_text: str | None --- This string can contain a
1054 collection manifest text, portable data hash, or UUID. When given a
1055 portable data hash or UUID, this instance will load a collection
1056 record from the API server. Otherwise, this instance will represent a
1057 new collection without an API server record. The default value `None`
1058 instantiates a new collection with an empty manifest.
1060 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1061 Arvados API client object this instance uses to make requests. If
1062 none is given, this instance creates its own client using the
1063 settings from `apiconfig` (see below). If your client instantiates
1064 many Collection objects, you can help limit memory utilization by
1065 calling `arvados.api.api` to construct an
1066 `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
1067 for every Collection.
1069 * keep_client: arvados.keep.KeepClient | None --- The Keep client
1070 object this instance uses to make requests. If none is given, this
1071 instance creates its own client using its `api_client`.
1073 * num_retries: int --- The number of times that client requests are
1074 retried. Default 10.
1076 * parent: arvados.collection.Collection | None --- The parent Collection
1077 object of this instance, if any. This argument is primarily used by
1078 other Collection methods; user client code shouldn't need to use it.
1080 * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1081 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1082 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1083 Collection object constructs one from these settings. If no
1084 mapping is provided, calls `arvados.config.settings` to get these
1085 parameters from user configuration.
1087 * block_manager: arvados.arvfile._BlockManager | None --- The
1088 _BlockManager object used by this instance to coordinate reading
1089 and writing Keep data blocks. If none is given, this instance
1090 constructs its own. This argument is primarily used by other
1091 Collection methods; user client code shouldn't need to use it.
1093 * replication_desired: int | None --- This controls both the value of
1094 the `replication_desired` field on API collection records saved by
1095 this class, as well as the number of Keep services that the object
1096 writes new data blocks to. If none is given, uses the default value
1097 configured for the cluster.
1099 * storage_classes_desired: list[str] | None --- This controls both
1100 the value of the `storage_classes_desired` field on API collection
1101 records saved by this class, as well as selecting which specific
1102 Keep services the object writes new data blocks to. If none is
1103 given, defaults to an empty list.
1105 * put_threads: int | None --- The number of threads to run
1106 simultaneously to upload data blocks to Keep. This value is used when
1107 building a new `block_manager`. It is unused when a `block_manager`
1111 if storage_classes_desired and type(storage_classes_desired) is not list:
1112 raise errors.ArgumentError("storage_classes_desired must be list type.")
1114 super(Collection, self).__init__(parent)
1115 self._api_client = api_client
1116 self._keep_client = keep_client
1118 # Use the keep client from ThreadSafeAPIClient
1119 if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
1120 self._keep_client = self._api_client.keep
1122 self._block_manager = block_manager
1123 self.replication_desired = replication_desired
1124 self._storage_classes_desired = storage_classes_desired
1125 self.put_threads = put_threads
1128 self._config = apiconfig
1130 self._config = config.settings()
1132 self.num_retries = num_retries
1133 self._manifest_locator = None
1134 self._manifest_text = None
1135 self._portable_data_hash = None
1136 self._api_response = None
1137 self._past_versions = set()
1139 self.lock = threading.RLock()
1142 if manifest_locator_or_text:
1143 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1144 self._manifest_locator = manifest_locator_or_text
1145 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1146 self._manifest_locator = manifest_locator_or_text
1147 if not self._has_local_collection_uuid():
1148 self._has_remote_blocks = True
1149 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1150 self._manifest_text = manifest_locator_or_text
1151 if '+R' in self._manifest_text:
1152 self._has_remote_blocks = True
1154 raise errors.ArgumentError(
1155 "Argument to CollectionReader is not a manifest or a collection UUID")
1159 except errors.SyntaxError as e:
1160 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1162 def storage_classes_desired(self) -> List[str]:
1163 """Get this collection's `storage_classes_desired` value"""
1164 return self._storage_classes_desired or []
1166 def root_collection(self) -> 'Collection':
1169 def get_properties(self) -> Properties:
1170 """Get this collection's properties
1172 This method always returns a dict. If this collection object does not
1173 have an associated API record, or that record does not have any
1174 properties set, this method returns an empty dict.
1176 if self._api_response and self._api_response["properties"]:
1177 return self._api_response["properties"]
1181 def get_trash_at(self) -> Optional[datetime.datetime]:
1182 """Get this collection's `trash_at` field
1184 This method parses the `trash_at` field of the collection's API
1185 record and returns a datetime from it. If that field is not set, or
1186 this collection object does not have an associated API record,
1189 if self._api_response and self._api_response["trash_at"]:
1191 return ciso8601.parse_datetime(self._api_response["trash_at"])
1197 def stream_name(self) -> str:
1200 def writable(self) -> bool:
1204 def known_past_version(
1206 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1208 """Indicate whether an API record for this collection has been seen before
1210 As this collection object loads records from the API server, it records
1211 their `modified_at` and `portable_data_hash` fields. This method accepts
1212 a 2-tuple with values for those fields, and returns `True` if the
1213 combination was previously loaded.
1215 return modified_at_and_portable_data_hash in self._past_versions
1221 other: Optional['Collection']=None,
1222 num_retries: Optional[int]=None,
1224 """Merge another collection's contents into this one
1226 This method compares the manifest of this collection instance with
1227 another, then updates this instance's manifest with changes from the
1228 other, renaming files to flag conflicts where necessary.
1230 When called without any arguments, this method reloads the collection's
1231 API record, and updates this instance with any changes that have
1232 appeared server-side. If this instance does not have a corresponding
1233 API record, this method raises `arvados.errors.ArgumentError`.
1237 * other: arvados.collection.Collection | None --- The collection
1238 whose contents should be merged into this instance. When not
1239 provided, this method reloads this collection's API record and
1240 constructs a Collection object from it. If this instance does not
1241 have a corresponding API record, this method raises
1242 `arvados.errors.ArgumentError`.
1244 * num_retries: int | None --- The number of times to retry reloading
1245 the collection's API record from the API server. If not specified,
1246 uses the `num_retries` provided when this instance was constructed.
1249 if self._manifest_locator is None:
1250 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1251 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1252 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1253 response.get("portable_data_hash") != self.portable_data_hash()):
1254 # The record on the server is different from our current one, but we've seen it before,
1255 # so ignore it because it's already been merged.
1256 # However, if it's the same as our current record, proceed with the update, because we want to update
1260 self._remember_api_response(response)
1261 other = CollectionReader(response["manifest_text"])
1262 baseline = CollectionReader(self._manifest_text)
1263 self.apply(baseline.diff(other))
1264 self._manifest_text = self.manifest_text()
1268 if self._api_client is None:
1269 self._api_client = ThreadSafeAPIClient(self._config, version='v1')
1270 if self._keep_client is None:
1271 self._keep_client = self._api_client.keep
1272 return self._api_client
1276 if self._keep_client is None:
1277 if self._api_client is None:
1280 self._keep_client = KeepClient(api_client=self._api_client)
1281 return self._keep_client
1284 def _my_block_manager(self):
1285 if self._block_manager is None:
1286 copies = (self.replication_desired or
1287 self._my_api()._rootDesc.get('defaultCollectionReplication',
1289 self._block_manager = _BlockManager(self._my_keep(),
1291 put_threads=self.put_threads,
1292 num_retries=self.num_retries,
1293 storage_classes_func=self.storage_classes_desired)
1294 return self._block_manager
1296 def _remember_api_response(self, response):
1297 self._api_response = response
1298 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1300 def _populate_from_api_server(self):
1301 # As in KeepClient itself, we must wait until the last
1302 # possible moment to instantiate an API client, in order to
1303 # avoid tripping up clients that don't have access to an API
1304 # server. If we do build one, make sure our Keep client uses
1305 # it. If instantiation fails, we'll fall back to the except
1306 # clause, just like any other Collection lookup
1307 # failure. Return an exception, or None if successful.
1308 self._remember_api_response(self._my_api().collections().get(
1309 uuid=self._manifest_locator).execute(
1310 num_retries=self.num_retries))
1311 self._manifest_text = self._api_response['manifest_text']
1312 self._portable_data_hash = self._api_response['portable_data_hash']
1313 # If not overriden via kwargs, we should try to load the
1314 # replication_desired and storage_classes_desired from the API server
1315 if self.replication_desired is None:
1316 self.replication_desired = self._api_response.get('replication_desired', None)
1317 if self._storage_classes_desired is None:
1318 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1320 def _populate(self):
1321 if self._manifest_text is None:
1322 if self._manifest_locator is None:
1325 self._populate_from_api_server()
1326 self._baseline_manifest = self._manifest_text
1327 self._import_manifest(self._manifest_text)
1329 def _has_collection_uuid(self):
1330 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1332 def _has_local_collection_uuid(self):
1333 return self._has_collection_uuid and \
1334 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1336 def __enter__(self):
1339 def __exit__(self, exc_type, exc_value, traceback):
1340 """Exit a context with this collection instance
1342 If no exception was raised inside the context block, and this
1343 collection is writable and has a corresponding API record, that
1344 record will be updated to match the state of this instance at the end
1347 if exc_type is None:
1348 if self.writable() and self._has_collection_uuid():
1352 def stop_threads(self) -> None:
1353 """Stop background Keep upload/download threads"""
1354 if self._block_manager is not None:
1355 self._block_manager.stop_threads()
1358 def manifest_locator(self) -> Optional[str]:
1359 """Get this collection's manifest locator, if any
1361 * If this collection instance is associated with an API record with a
1363 * Otherwise, if this collection instance was loaded from an API record
1364 by portable data hash, return that.
1365 * Otherwise, return `None`.
1367 return self._manifest_locator
1372 new_parent: Optional['Collection']=None,
1373 new_name: Optional[str]=None,
1374 readonly: bool=False,
1375 new_config: Optional[Mapping[str, str]]=None,
1377 """Create a Collection object with the same contents as this instance
1379 This method creates a new Collection object with contents that match
1380 this instance's. The new collection will not be associated with any API
1385 * new_parent: arvados.collection.Collection | None --- This value is
1386 passed to the new Collection's constructor as the `parent`
1389 * new_name: str | None --- This value is unused.
1391 * readonly: bool --- If this value is true, this method constructs and
1392 returns a `CollectionReader`. Otherwise, it returns a mutable
1393 `Collection`. Default `False`.
1395 * new_config: Mapping[str, str] | None --- This value is passed to the
1396 new Collection's constructor as `apiconfig`. If no value is provided,
1397 defaults to the configuration passed to this instance's constructor.
1399 if new_config is None:
1400 new_config = self._config
1402 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1404 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1406 newcollection._clonefrom(self)
1407 return newcollection
1410 def api_response(self) -> Optional[Dict[str, Any]]:
1411 """Get this instance's associated API record
1413 If this Collection instance has an associated API record, return it.
1414 Otherwise, return `None`.
1416 return self._api_response
1421 create_type: CreateType,
1422 ) -> CollectionItem:
1426 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1428 def find(self, path: str) -> CollectionItem:
1432 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1434 def remove(self, path: str, recursive: bool=False) -> None:
1436 raise errors.ArgumentError("Cannot remove '.'")
1438 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1445 properties: Optional[Properties]=None,
1446 storage_classes: Optional[StorageClasses]=None,
1447 trash_at: Optional[datetime.datetime]=None,
1449 num_retries: Optional[int]=None,
1450 preserve_version: bool=False,
1452 """Save collection to an existing API record
1454 This method updates the instance's corresponding API record to match
1455 the instance's state. If this instance does not have a corresponding API
1456 record yet, raises `AssertionError`. (To create a new API record, use
1457 `Collection.save_new`.) This method returns the saved collection
1462 * properties: dict[str, Any] | None --- If provided, the API record will
1463 be updated with these properties. Note this will completely replace
1464 any existing properties.
1466 * storage_classes: list[str] | None --- If provided, the API record will
1467 be updated with this value in the `storage_classes_desired` field.
1468 This value will also be saved on the instance and used for any
1469 changes that follow.
1471 * trash_at: datetime.datetime | None --- If provided, the API record
1472 will be updated with this value in the `trash_at` field.
1474 * merge: bool --- If `True` (the default), this method will first
1475 reload this collection's API record, and merge any new contents into
1476 this instance before saving changes. See `Collection.update` for
1479 * num_retries: int | None --- The number of times to retry reloading
1480 the collection's API record from the API server. If not specified,
1481 uses the `num_retries` provided when this instance was constructed.
* preserve_version: bool --- This value will be passed directly
1484 to the underlying API call. If `True`, the Arvados API will
1485 preserve the versions of this collection both immediately before
1486 and after the update. If `True` when the API server is not
1487 configured with collection versioning, this method raises
1488 `arvados.errors.ArgumentError`.
1490 if properties and type(properties) is not dict:
1491 raise errors.ArgumentError("properties must be dictionary type.")
1493 if storage_classes and type(storage_classes) is not list:
1494 raise errors.ArgumentError("storage_classes must be list type.")
1496 self._storage_classes_desired = storage_classes
1498 if trash_at and type(trash_at) is not datetime.datetime:
1499 raise errors.ArgumentError("trash_at must be datetime type.")
1501 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1502 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1506 body["properties"] = properties
1507 if self.storage_classes_desired():
1508 body["storage_classes_desired"] = self.storage_classes_desired()
1510 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1511 body["trash_at"] = t
1512 if preserve_version:
1513 body["preserve_version"] = preserve_version
1515 if not self.committed():
1516 if self._has_remote_blocks:
1517 # Copy any remote blocks to the local cluster.
1518 self._copy_remote_blocks(remote_blocks={})
1519 self._has_remote_blocks = False
1520 if not self._has_collection_uuid():
1521 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1522 elif not self._has_local_collection_uuid():
1523 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1525 self._my_block_manager().commit_all()
1530 text = self.manifest_text(strip=False)
1531 body['manifest_text'] = text
1533 self._remember_api_response(self._my_api().collections().update(
1534 uuid=self._manifest_locator,
1536 ).execute(num_retries=num_retries))
1537 self._manifest_text = self._api_response["manifest_text"]
1538 self._portable_data_hash = self._api_response["portable_data_hash"]
1539 self.set_committed(True)
1541 self._remember_api_response(self._my_api().collections().update(
1542 uuid=self._manifest_locator,
1544 ).execute(num_retries=num_retries))
1546 return self._manifest_text
1554 name: Optional[str]=None,
1555 create_collection_record: bool=True,
1556 owner_uuid: Optional[str]=None,
1557 properties: Optional[Properties]=None,
1558 storage_classes: Optional[StorageClasses]=None,
1559 trash_at: Optional[datetime.datetime]=None,
1560 ensure_unique_name: bool=False,
1561 num_retries: Optional[int]=None,
1562 preserve_version: bool=False,
1564 """Save collection to a new API record
1566 This method finishes uploading new data blocks and (optionally)
1567 creates a new API collection record with the provided data. If a new
1568 record is created, this instance becomes associated with that record
1569 for future updates like `save()`. This method returns the saved
1570 collection manifest.
1574 * name: str | None --- The `name` field to use on the new collection
1575 record. If not specified, a generic default name is generated.
1577 * create_collection_record: bool --- If `True` (the default), creates a
1578 collection record on the API server. If `False`, the method finishes
1579 all data uploads and only returns the resulting collection manifest
1580 without sending it to the API server.
1582 * owner_uuid: str | None --- The `owner_uuid` field to use on the
1583 new collection record.
1585 * properties: dict[str, Any] | None --- The `properties` field to use on
1586 the new collection record.
1588 * storage_classes: list[str] | None --- The
1589 `storage_classes_desired` field to use on the new collection record.
1591 * trash_at: datetime.datetime | None --- The `trash_at` field to use
1592 on the new collection record.
1594 * ensure_unique_name: bool --- This value is passed directly to the
1595 Arvados API when creating the collection record. If `True`, the API
1596 server may modify the submitted `name` to ensure the collection's
1597 `name`+`owner_uuid` combination is unique. If `False` (the default),
1598 if a collection already exists with this same `name`+`owner_uuid`
1599 combination, creating a collection record will raise a validation
1602 * num_retries: int | None --- The number of times to retry reloading
1603 the collection's API record from the API server. If not specified,
1604 uses the `num_retries` provided when this instance was constructed.
* preserve_version: bool --- This value will be passed directly
1607 to the underlying API call. If `True`, the Arvados API will
1608 preserve the versions of this collection both immediately before
1609 and after the update. If `True` when the API server is not
1610 configured with collection versioning, this method raises
1611 `arvados.errors.ArgumentError`.
1613 if properties and type(properties) is not dict:
1614 raise errors.ArgumentError("properties must be dictionary type.")
1616 if storage_classes and type(storage_classes) is not list:
1617 raise errors.ArgumentError("storage_classes must be list type.")
1619 if trash_at and type(trash_at) is not datetime.datetime:
1620 raise errors.ArgumentError("trash_at must be datetime type.")
1622 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1623 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1625 if self._has_remote_blocks:
1626 # Copy any remote blocks to the local cluster.
1627 self._copy_remote_blocks(remote_blocks={})
1628 self._has_remote_blocks = False
1631 self._storage_classes_desired = storage_classes
1633 self._my_block_manager().commit_all()
1634 text = self.manifest_text(strip=False)
1636 if create_collection_record:
1638 name = "New collection"
1639 ensure_unique_name = True
1641 body = {"manifest_text": text,
1643 "replication_desired": self.replication_desired}
1645 body["owner_uuid"] = owner_uuid
1647 body["properties"] = properties
1648 if self.storage_classes_desired():
1649 body["storage_classes_desired"] = self.storage_classes_desired()
1651 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1652 body["trash_at"] = t
1653 if preserve_version:
1654 body["preserve_version"] = preserve_version
1656 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1657 text = self._api_response["manifest_text"]
1659 self._manifest_locator = self._api_response["uuid"]
1660 self._portable_data_hash = self._api_response["portable_data_hash"]
1662 self._manifest_text = text
1663 self.set_committed(True)
# One whitespace-delimited manifest token: group 1 is the token itself,
# group 2 is the whitespace that follows it (empty at end of text).
_token_re = re.compile(r'(\S+)(\s+|$)')
# A block locator token: 32 lowercase hex digits, "+", a decimal number
# (group 1, read as the block size by _import_manifest), then any number
# of "+"-prefixed suffixes.
_block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
# A file segment token of the form "pos:size:name"; _import_manifest
# reads the three groups as position, size, and file name.
_segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1671 def _unescape_manifest_path(self, path):
1672 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1675 def _import_manifest(self, manifest_text):
1676 """Import a manifest into a `Collection`.
1679 The manifest text to import from.
1683 raise ArgumentError("Can only import manifest into an empty collection")
1692 for token_and_separator in self._token_re.finditer(manifest_text):
1693 tok = token_and_separator.group(1)
1694 sep = token_and_separator.group(2)
1696 if state == STREAM_NAME:
1697 # starting a new stream
1698 stream_name = self._unescape_manifest_path(tok)
1703 self.find_or_create(stream_name, COLLECTION)
1707 block_locator = self._block_re.match(tok)
1709 blocksize = int(block_locator.group(1))
1710 blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
1711 streamoffset += blocksize
1715 if state == SEGMENTS:
1716 file_segment = self._segment_re.match(tok)
1718 pos = int(file_segment.group(1))
1719 size = int(file_segment.group(2))
1720 name = self._unescape_manifest_path(file_segment.group(3))
1721 if name.split('/')[-1] == '.':
1722 # placeholder for persisting an empty directory, not a real file
1724 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1726 filepath = os.path.join(stream_name, name)
1728 afile = self.find_or_create(filepath, FILE)
1729 except IOError as e:
1730 if e.errno == errno.ENOTDIR:
1731 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1734 if isinstance(afile, ArvadosFile):
1735 afile.add_segment(blocks, pos, size)
1737 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1740 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1746 self.set_committed(True)
1752 collection: 'RichCollectionBase',
1754 item: CollectionItem,
1757 self._callback(event, collection, name, item)
1760 class Subcollection(RichCollectionBase):
1761 """Read and manipulate a stream/directory within an Arvados collection
1763 This class represents a single stream (like a directory) within an Arvados
1764 `Collection`. It is returned by `Collection.find` and provides the same API.
1765 Operations that work on the API collection record propagate to the parent
1766 `Collection` object.
1769 def __init__(self, parent, name):
1770 super(Subcollection, self).__init__(parent)
1771 self.lock = self.root_collection().lock
1772 self._manifest_text = None
1774 self.num_retries = parent.num_retries
1776 def root_collection(self) -> 'Collection':
1777 return self.parent.root_collection()
1779 def writable(self) -> bool:
1780 return self.root_collection().writable()
1783 return self.root_collection()._my_api()
1786 return self.root_collection()._my_keep()
1788 def _my_block_manager(self):
1789 return self.root_collection()._my_block_manager()
1791 def stream_name(self) -> str:
1792 return os.path.join(self.parent.stream_name(), self.name)
1797 new_parent: Optional['Collection']=None,
1798 new_name: Optional[str]=None,
1799 ) -> 'Subcollection':
1800 c = Subcollection(new_parent, new_name)
1806 def _reparent(self, newparent, newname):
1807 self.set_committed(False)
1809 self.parent.remove(self.name, recursive=True)
1810 self.parent = newparent
1812 self.lock = self.parent.root_collection().lock
1815 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1816 """Encode empty directories by using an \056-named (".") empty file"""
1817 if len(self._items) == 0:
1818 return "%s %s 0:0:\\056\n" % (
1819 streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1820 return super(Subcollection, self)._get_manifest_text(stream_name,
1825 class CollectionReader(Collection):
1826 """Read-only `Collection` subclass
1828 This class will never create or update any API collection records. You can
1829 use this class for additional code safety when you only need to read
1830 existing collections.
1832 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1833 self._in_init = True
1834 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1835 self._in_init = False
1837 # Forego any locking since it should never change once initialized.
1838 self.lock = NoopLock()
# Backwards compatibility with old CollectionReader
1841 # all_streams() and all_files()
1842 self._streams = None
1844 def writable(self) -> bool:
1845 return self._in_init