1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
28 from collections import deque
31 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
32 from .keep import KeepLocator, KeepClient
33 from ._normalize_stream import normalize_stream, escape
34 from ._ranges import Range, LocatorAndRange
35 from .safeapi import ThreadSafeApiCache
36 import arvados.config as config
37 import arvados.errors as errors
39 import arvados.events as events
40 from arvados.retry import retry_method
55 if sys.version_info < (3, 8):
56 from typing_extensions import Literal
58 from typing import Literal
60 _logger = logging.getLogger('arvados.collection')
63 """Argument value for `Collection` methods to represent an added item"""
65 """Argument value for `Collection` methods to represent a removed item"""
67 """Argument value for `Collection` methods to represent a modified item"""
69 """Argument value for `Collection` methods to represent an item with token differences"""
71 """`create_type` value for `Collection.find_or_create`"""
72 COLLECTION = "collection"
73 """`create_type` value for `Collection.find_or_create`"""
75 ChangeList = List[Union[
76 Tuple[Literal[ADD, DEL], str, 'Collection'],
77 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
79 ChangeType = Literal[ADD, DEL, MOD, TOK]
80 CollectionItem = Union[ArvadosFile, 'Collection']
81 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
82 CreateType = Literal[COLLECTION, FILE]
83 Properties = Dict[str, Any]
84 StorageClasses = List[str]
86 class CollectionBase(object):
87 """Abstract base class for Collection classes
89 .. ATTENTION:: Internal
90 This class is meant to be used by other parts of the SDK. User code
91 should instantiate or subclass `Collection` or one of its subclasses
96 """Enter a context block with this collection instance"""
99 def __exit__(self, exc_type, exc_value, traceback):
100 """Exit a context block with this collection instance"""
104 if self._keep_client is None:
105 self._keep_client = KeepClient(api_client=self._api_client,
106 num_retries=self.num_retries)
107 return self._keep_client
109 def stripped_manifest(self) -> str:
110 """Create a copy of the collection manifest with only size hints
112 This method returns a string with the current collection's manifest
113 text with all non-portable locator hints like permission hints and
114 remote cluster hints removed. The only hints in the returned manifest
117 raw = self.manifest_text()
119 for line in raw.split("\n"):
120 fields = line.split()
122 clean_fields = fields[:1] + [
123 (re.sub(r'\+[^\d][^\+]*', '', x)
124 if re.match(arvados.util.keep_locator_pattern, x)
127 clean += [' '.join(clean_fields), "\n"]
128 return ''.join(clean)
131 class _WriterFile(_FileLikeObjectBase):
132 def __init__(self, coll_writer, name):
133 super(_WriterFile, self).__init__(name, 'wb')
134 self.dest = coll_writer
137 super(_WriterFile, self).close()
138 self.dest.finish_current_file()
140 @_FileLikeObjectBase._before_close
141 def write(self, data):
142 self.dest.write(data)
144 @_FileLikeObjectBase._before_close
145 def writelines(self, seq):
149 @_FileLikeObjectBase._before_close
151 self.dest.flush_data()
154 class RichCollectionBase(CollectionBase):
155 """Base class for Collection classes
157 .. ATTENTION:: Internal
158 This class is meant to be used by other parts of the SDK. User code
159 should instantiate or subclass `Collection` or one of its subclasses
163 def __init__(self, parent=None):
165 self._committed = False
166 self._has_remote_blocks = False
167 self._callback = None
171 raise NotImplementedError()
174 raise NotImplementedError()
def _my_block_manager(self):
    """Get the block manager this collection uses for Keep data blocks

    This is an abstract hook: the block manager is the object that
    coordinates reading and writing Keep data blocks for the collection.
    Concrete collection classes are expected to override this method;
    the base implementation never returns.

    Raises:

    * NotImplementedError --- Always, in this base implementation.
    """
    # Deliberately abstract: there is no sensible default block manager.
    raise NotImplementedError()
179 def writable(self) -> bool:
180 """Indicate whether this collection object can be modified
182 This method returns `False` if this object is a `CollectionReader`,
185 raise NotImplementedError()
187 def root_collection(self) -> 'Collection':
188 """Get this collection's root collection object
190 If you open a subcollection with `Collection.find`, calling this method
191 on that subcollection returns the source Collection object.
193 raise NotImplementedError()
195 def stream_name(self) -> str:
196 """Get the name of the manifest stream represented by this collection
198 If you open a subcollection with `Collection.find`, calling this method
199 on that subcollection returns the name of the stream you opened.
201 raise NotImplementedError()
204 def has_remote_blocks(self) -> bool:
205 """Indiciate whether the collection refers to remote data
207 Returns `True` if the collection manifest includes any Keep locators
208 with a remote hint (`+R`), else `False`.
210 if self._has_remote_blocks:
213 if self[item].has_remote_blocks():
218 def set_has_remote_blocks(self, val: bool) -> None:
219 """Cache whether this collection refers to remote blocks
221 .. ATTENTION:: Internal
222 This method is only meant to be used by other Collection methods.
224 Set this collection's cached "has remote blocks" flag to the given
227 self._has_remote_blocks = val
229 self.parent.set_has_remote_blocks(val)
236 create_type: CreateType,
238 """Get the item at the given path, creating it if necessary
240 If `path` refers to a stream in this collection, returns a
241 corresponding `Subcollection` object. If `path` refers to a file in
242 this collection, returns a corresponding
243 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
244 this collection, then this method creates a new object and returns
245 it, creating parent streams as needed. The type of object created is
246 determined by the value of `create_type`.
250 * path: str --- The path to find or create within this collection.
252 * create_type: Literal[COLLECTION, FILE] --- The type of object to
253 create at `path` if one does not exist. Passing `COLLECTION`
254 creates a stream and returns the corresponding
255 `Subcollection`. Passing `FILE` creates a new file and returns the
256 corresponding `arvados.arvfile.ArvadosFile`.
258 pathcomponents = path.split("/", 1)
259 if pathcomponents[0]:
260 item = self._items.get(pathcomponents[0])
261 if len(pathcomponents) == 1:
264 if create_type == COLLECTION:
265 item = Subcollection(self, pathcomponents[0])
267 item = ArvadosFile(self, pathcomponents[0])
268 self._items[pathcomponents[0]] = item
269 self.set_committed(False)
270 self.notify(ADD, self, pathcomponents[0], item)
274 # create new collection
275 item = Subcollection(self, pathcomponents[0])
276 self._items[pathcomponents[0]] = item
277 self.set_committed(False)
278 self.notify(ADD, self, pathcomponents[0], item)
279 if isinstance(item, RichCollectionBase):
280 return item.find_or_create(pathcomponents[1], create_type)
282 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
287 def find(self, path: str) -> CollectionItem:
288 """Get the item at the given path
290 If `path` refers to a stream in this collection, returns a
291 corresponding `Subcollection` object. If `path` refers to a file in
292 this collection, returns a corresponding
293 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
294 this collection, then this method raises `NotADirectoryError`.
298 * path: str --- The path to find or create within this collection.
301 raise errors.ArgumentError("Parameter 'path' is empty.")
303 pathcomponents = path.split("/", 1)
304 if pathcomponents[0] == '':
305 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
307 item = self._items.get(pathcomponents[0])
310 elif len(pathcomponents) == 1:
313 if isinstance(item, RichCollectionBase):
314 if pathcomponents[1]:
315 return item.find(pathcomponents[1])
319 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
322 def mkdirs(self, path: str) -> 'Subcollection':
323 """Create and return a subcollection at `path`
325 If `path` exists within this collection, raises `FileExistsError`.
326 Otherwise, creates a stream at that path and returns the
327 corresponding `Subcollection`.
329 if self.find(path) != None:
330 raise IOError(errno.EEXIST, "Directory or file exists", path)
332 return self.find_or_create(path, COLLECTION)
338 encoding: Optional[str]=None
340 """Open a file-like object within the collection
342 This method returns a file-like object that can read and/or write the
343 file located at `path` within the collection. If you attempt to write
344 a `path` that does not exist, the file is created with `find_or_create`.
345 If the file cannot be opened for any other reason, this method raises
346 `OSError` with an appropriate errno.
350 * path: str --- The path of the file to open within this collection
352 * mode: str --- The mode to open this file. Supports all the same
353 values as `builtins.open`.
355 * encoding: str | None --- The text encoding of the file. Only used
356 when the file is opened in text mode. The default is
360 if not re.search(r'^[rwa][bt]?\+?$', mode):
361 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
363 if mode[0] == 'r' and '+' not in mode:
364 fclass = ArvadosFileReader
365 arvfile = self.find(path)
366 elif not self.writable():
367 raise IOError(errno.EROFS, "Collection is read only")
369 fclass = ArvadosFileWriter
370 arvfile = self.find_or_create(path, FILE)
373 raise IOError(errno.ENOENT, "File not found", path)
374 if not isinstance(arvfile, ArvadosFile):
375 raise IOError(errno.EISDIR, "Is a directory", path)
380 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
381 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
383 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
384 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
387 def modified(self) -> bool:
388 """Indicate whether this collection has an API server record
390 Returns `False` if this collection corresponds to a record loaded from
391 the API server, `True` otherwise.
393 return not self.committed()
397 """Indicate whether this collection has an API server record
399 Returns `True` if this collection corresponds to a record loaded from
400 the API server, `False` otherwise.
402 return self._committed
405 def set_committed(self, value: bool=True):
406 """Cache whether this collection has an API server record
408 .. ATTENTION:: Internal
409 This method is only meant to be used by other Collection methods.
411 Set this collection's cached "committed" flag to the given
412 value and propagates it as needed.
414 if value == self._committed:
417 for k,v in self._items.items():
418 v.set_committed(True)
419 self._committed = True
421 self._committed = False
422 if self.parent is not None:
423 self.parent.set_committed(False)
426 def __iter__(self) -> Iterator[str]:
427 """Iterate names of streams and files in this collection
429 This method does not recurse. It only iterates the contents of this
430 collection's corresponding stream.
432 return iter(self._items)
435 def __getitem__(self, k: str) -> CollectionItem:
436 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
438 This method does not recurse. If you want to search a path, use
439 `RichCollectionBase.find` instead.
441 return self._items[k]
444 def __contains__(self, k: str) -> bool:
445 """Indicate whether this collection has an item with this name
447 This method does not recurse. It you want to check a path, use
448 `RichCollectionBase.exists` instead.
450 return k in self._items
454 """Get the number of items directly contained in this collection
456 This method does not recurse. It only counts the streams and files
457 in this collection's corresponding stream.
459 return len(self._items)
463 def __delitem__(self, p: str) -> None:
464 """Delete an item from this collection's stream
466 This method does not recurse. If you want to remove an item by a
467 path, use `RichCollectionBase.remove` instead.
470 self.set_committed(False)
471 self.notify(DEL, self, p, None)
474 def keys(self) -> Iterator[str]:
475 """Iterate names of streams and files in this collection
477 This method does not recurse. It only iterates the contents of this
478 collection's corresponding stream.
480 return self._items.keys()
483 def values(self) -> List[CollectionItem]:
484 """Get a list of objects in this collection's stream
486 The return value includes a `Subcollection` for every stream, and an
487 `arvados.arvfile.ArvadosFile` for every file, directly within this
488 collection's stream. This method does not recurse.
490 return list(self._items.values())
493 def items(self) -> List[Tuple[str, CollectionItem]]:
494 """Get a list of `(name, object)` tuples from this collection's stream
496 The return value includes a `Subcollection` for every stream, and an
497 `arvados.arvfile.ArvadosFile` for every file, directly within this
498 collection's stream. This method does not recurse.
500 return list(self._items.items())
502 def exists(self, path: str) -> bool:
503 """Indicate whether this collection includes an item at `path`
505 This method returns `True` if `path` refers to a stream or file within
506 this collection, else `False`.
510 * path: str --- The path to check for existence within this collection
512 return self.find(path) is not None
516 def remove(self, path: str, recursive: bool=False) -> None:
517 """Remove the file or stream at `path`
521 * path: str --- The path of the item to remove from the collection
523 * recursive: bool --- Controls the method's behavior if `path` refers
524 to a nonempty stream. If `False` (the default), this method raises
525 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
526 items under the stream.
529 raise errors.ArgumentError("Parameter 'path' is empty.")
531 pathcomponents = path.split("/", 1)
532 item = self._items.get(pathcomponents[0])
534 raise IOError(errno.ENOENT, "File not found", path)
535 if len(pathcomponents) == 1:
536 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
537 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
538 deleteditem = self._items[pathcomponents[0]]
539 del self._items[pathcomponents[0]]
540 self.set_committed(False)
541 self.notify(DEL, self, pathcomponents[0], deleteditem)
543 item.remove(pathcomponents[1], recursive=recursive)
545 def _clonefrom(self, source):
546 for k,v in source.items():
547 self._items[k] = v.clone(self, k)
550 raise NotImplementedError()
556 source_obj: CollectionItem,
558 overwrite: bool=False,
559 reparent: bool=False,
561 """Copy or move a file or subcollection object to this collection
565 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
566 to add to this collection
568 * target_name: str --- The path inside this collection where
569 `source_obj` should be added.
571 * overwrite: bool --- Controls the behavior of this method when the
572 collection already contains an object at `target_name`. If `False`
573 (the default), this method will raise `FileExistsError`. If `True`,
574 the object at `target_name` will be replaced with `source_obj`.
576 * reparent: bool --- Controls whether this method copies or moves
577 `source_obj`. If `False` (the default), `source_obj` is copied into
578 this collection. If `True`, `source_obj` is moved into this
581 if target_name in self and not overwrite:
582 raise IOError(errno.EEXIST, "File already exists", target_name)
585 if target_name in self:
586 modified_from = self[target_name]
588 # Actually make the move or copy.
590 source_obj._reparent(self, target_name)
593 item = source_obj.clone(self, target_name)
595 self._items[target_name] = item
596 self.set_committed(False)
597 if not self._has_remote_blocks and source_obj.has_remote_blocks():
598 self.set_has_remote_blocks(True)
601 self.notify(MOD, self, target_name, (modified_from, item))
603 self.notify(ADD, self, target_name, item)
605 def _get_src_target(self, source, target_path, source_collection, create_dest):
606 if source_collection is None:
607 source_collection = self
610 if isinstance(source, str):
611 source_obj = source_collection.find(source)
612 if source_obj is None:
613 raise IOError(errno.ENOENT, "File not found", source)
614 sourcecomponents = source.split("/")
617 sourcecomponents = None
619 # Find parent collection the target path
620 targetcomponents = target_path.split("/")
622 # Determine the name to use.
623 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
626 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
629 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
631 if len(targetcomponents) > 1:
632 target_dir = self.find("/".join(targetcomponents[0:-1]))
636 if target_dir is None:
637 raise IOError(errno.ENOENT, "Target directory not found", target_name)
639 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
640 target_dir = target_dir[target_name]
641 target_name = sourcecomponents[-1]
643 return (source_obj, target_dir, target_name)
649 source: Union[str, CollectionItem],
651 source_collection: Optional['RichCollectionBase']=None,
652 overwrite: bool=False,
654 """Copy a file or subcollection object to this collection
658 * source: str | arvados.arvfile.ArvadosFile |
659 arvados.collection.Subcollection --- The file or subcollection to
660 add to this collection. If `source` is a str, the object will be
661 found by looking up this path from `source_collection` (see
664 * target_path: str --- The path inside this collection where the
665 source object should be added.
667 * source_collection: arvados.collection.Collection | None --- The
668 collection to find the source object from when `source` is a
669 path. Defaults to the current collection (`self`).
671 * overwrite: bool --- Controls the behavior of this method when the
672 collection already contains an object at `target_path`. If `False`
673 (the default), this method will raise `FileExistsError`. If `True`,
674 the object at `target_path` will be replaced with `source_obj`.
676 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
677 target_dir.add(source_obj, target_name, overwrite, False)
683 source: Union[str, CollectionItem],
685 source_collection: Optional['RichCollectionBase']=None,
686 overwrite: bool=False,
688 """Move a file or subcollection object to this collection
692 * source: str | arvados.arvfile.ArvadosFile |
693 arvados.collection.Subcollection --- The file or subcollection to
694 add to this collection. If `source` is a str, the object will be
695 found by looking up this path from `source_collection` (see
698 * target_path: str --- The path inside this collection where the
699 source object should be added.
701 * source_collection: arvados.collection.Collection | None --- The
702 collection to find the source object from when `source` is a
703 path. Defaults to the current collection (`self`).
705 * overwrite: bool --- Controls the behavior of this method when the
706 collection already contains an object at `target_path`. If `False`
707 (the default), this method will raise `FileExistsError`. If `True`,
708 the object at `target_path` will be replaced with `source_obj`.
710 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
711 if not source_obj.writable():
712 raise IOError(errno.EROFS, "Source collection is read only", source)
713 target_dir.add(source_obj, target_name, overwrite, True)
715 def portable_manifest_text(self, stream_name: str=".") -> str:
716 """Get the portable manifest text for this collection
718 The portable manifest text is normalized, and does not include access
719 tokens. This method does not flush outstanding blocks to Keep.
723 * stream_name: str --- The name to use for this collection's stream in
724 the generated manifest. Default `'.'`.
726 return self._get_manifest_text(stream_name, True, True)
731 stream_name: str=".",
733 normalize: bool=False,
734 only_committed: bool=False,
736 """Get the manifest text for this collection
740 * stream_name: str --- The name to use for this collection's stream in
741 the generated manifest. Default `'.'`.
743 * strip: bool --- Controls whether or not the returned manifest text
744 includes access tokens. If `False` (the default), the manifest text
745 will include access tokens. If `True`, the manifest text will not
746 include access tokens.
748 * normalize: bool --- Controls whether or not the returned manifest
749 text is normalized. Default `False`.
751 * only_committed: bool --- Controls whether or not this method uploads
752 pending data to Keep before building and returning the manifest text.
753 If `False` (the default), this method will finish uploading all data
754 to Keep, then return the final manifest. If `True`, this method will
755 build and return a manifest that only refers to the data that has
756 finished uploading at the time this method was called.
758 if not only_committed:
759 self._my_block_manager().commit_all()
760 return self._get_manifest_text(stream_name, strip, normalize,
761 only_committed=only_committed)
764 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
765 """Get the manifest text for this collection, sub collections and files.
768 Name to use for this stream (directory)
771 If True, remove signing tokens from block locators if present.
772 If False (default), block locators are left unchanged.
775 If True, always export the manifest text in normalized form
776 even if the Collection is not modified. If False (default) and the collection
777 is not modified, return the original manifest text even if it is not
781 If True, only include blocks that were already committed to Keep.
785 if not self.committed() or self._manifest_text is None or normalize:
788 sorted_keys = sorted(self.keys())
789 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
790 # Create a stream per file `k`
791 arvfile = self[filename]
793 for segment in arvfile.segments():
794 loc = segment.locator
795 if arvfile.parent._my_block_manager().is_bufferblock(loc):
798 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
800 loc = KeepLocator(loc).stripped()
801 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
802 segment.segment_offset, segment.range_size))
803 stream[filename] = filestream
805 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
806 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
807 buf.append(self[dirname].manifest_text(
808 stream_name=os.path.join(stream_name, dirname),
809 strip=strip, normalize=True, only_committed=only_committed))
813 return self.stripped_manifest()
815 return self._manifest_text
818 def _copy_remote_blocks(self, remote_blocks={}):
819 """Scan through the entire collection and ask Keep to copy remote blocks.
821 When accessing a remote collection, blocks will have a remote signature
822 (+R instead of +A). Collect these signatures and request Keep to copy the
823 blocks to the local cluster, returning local (+A) signatures.
826 Shared cache of remote to local block mappings. This is used to avoid
827 doing extra work when blocks are shared by more than one file in
828 different subdirectories.
832 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
838 end_collection: 'RichCollectionBase',
840 holding_collection: Optional['Collection']=None,
842 """Build a list of differences between this collection and another
846 * end_collection: arvados.collection.RichCollectionBase --- A
847 collection object with the desired end state. The returned diff
848 list will describe how to go from the current collection object
849 `self` to `end_collection`.
851 * prefix: str --- The name to use for this collection's stream in
852 the diff list. Default `'.'`.
854 * holding_collection: arvados.collection.Collection | None --- A
855 collection object used to hold objects for the returned diff
856 list. By default, a new empty collection is created.
859 if holding_collection is None:
860 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
862 if k not in end_collection:
863 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
864 for k in end_collection:
866 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
867 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
868 elif end_collection[k] != self[k]:
869 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
871 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
873 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
878 def apply(self, changes: ChangeList) -> None:
879 """Apply a list of changes from to this collection
881 This method takes a list of changes generated by
882 `RichCollectionBase.diff` and applies it to this
883 collection. Afterward, the state of this collection object will
884 match the state of `end_collection` passed to `diff`. If a change
885 conflicts with a local change, it will be saved to an alternate path
886 indicating the conflict.
890 * changes: arvados.collection.ChangeList --- The list of differences
891 generated by `RichCollectionBase.diff`.
894 self.set_committed(False)
895 for change in changes:
896 event_type = change[0]
899 local = self.find(path)
900 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
902 if event_type == ADD:
904 # No local file at path, safe to copy over new file
905 self.copy(initial, path)
906 elif local is not None and local != initial:
907 # There is already local file and it is different:
908 # save change to conflict file.
909 self.copy(initial, conflictpath)
910 elif event_type == MOD or event_type == TOK:
913 # Local matches the "initial" item so it has not
914 # changed locally and is safe to update.
915 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
916 # Replace contents of local file with new contents
917 local.replace_contents(final)
919 # Overwrite path with new item; this can happen if
920 # path was a file and is now a collection or vice versa
921 self.copy(final, path, overwrite=True)
923 # Local is missing (presumably deleted) or local doesn't
924 # match the "start" value, so save change to conflict file
925 self.copy(final, conflictpath)
926 elif event_type == DEL:
928 # Local item matches "initial" value, so it is safe to remove.
929 self.remove(path, recursive=True)
930 # else, the file is modified or already removed, in either
931 # case we don't want to try to remove it.
933 def portable_data_hash(self) -> str:
934 """Get the portable data hash for this collection's manifest"""
935 if self._manifest_locator and self.committed():
936 # If the collection is already saved on the API server, and it's committed
937 # then return API server's PDH response.
938 return self._portable_data_hash
940 stripped = self.portable_manifest_text().encode()
941 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
944 def subscribe(self, callback: ChangeCallback) -> None:
945 """Set a notify callback for changes to this collection
949 * callback: arvados.collection.ChangeCallback --- The callable to
950 call each time the collection is changed.
952 if self._callback is None:
953 self._callback = callback
955 raise errors.ArgumentError("A callback is already set on this collection.")
958 def unsubscribe(self) -> None:
959 """Remove any notify callback set for changes to this collection"""
960 if self._callback is not None:
961 self._callback = None
967 collection: 'RichCollectionBase',
969 item: CollectionItem,
971 """Notify any subscribed callback about a change to this collection
973 .. ATTENTION:: Internal
974 This method is only meant to be used by other Collection methods.
976 If a callback has been registered with `RichCollectionBase.subscribe`,
977 it will be called with information about a change to this collection.
978 Then this notification will be propagated to this collection's root.
982 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
985 * collection: arvados.collection.RichCollectionBase --- The
986 collection that was modified.
988 * name: str --- The name of the file or stream within `collection` that
991 * item: arvados.arvfile.ArvadosFile |
992 arvados.collection.Subcollection --- The new contents at `name`
996 self._callback(event, collection, name, item)
997 self.root_collection().notify(event, collection, name, item)
1000 def __eq__(self, other: Any) -> bool:
1001 """Indicate whether this collection object is equal to another"""
1004 if not isinstance(other, RichCollectionBase):
1006 if len(self._items) != len(other):
1008 for k in self._items:
1011 if self._items[k] != other[k]:
def __ne__(self, other: Any) -> bool:
    """Indicate whether this collection object is not equal to another

    This is the logical inverse of `__eq__`. If `__eq__` returns the
    `NotImplemented` sentinel, it is passed through unchanged so Python can
    fall back to the reflected comparison on `other`. Negating the sentinel
    directly (`not NotImplemented`) is deprecated since Python 3.9 and
    silently produces `False`.

    Arguments:

    * other: Any --- The object to compare against this collection.
    """
    equal = self.__eq__(other)
    if equal is NotImplemented:
        # Let the interpreter try other.__ne__/__eq__ instead of guessing.
        return NotImplemented
    return not equal
1020 def flush(self) -> None:
1021 """Upload any pending data to Keep"""
1022 for e in self.values():
1026 class Collection(RichCollectionBase):
1027 """Read and manipulate an Arvados collection
1029 This class provides a high-level interface to create, read, and update
1030 Arvados collections and their contents. Refer to the Arvados Python SDK
1031 cookbook for [an introduction to using the Collection class][cookbook].
1033 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1036 def __init__(self, manifest_locator_or_text: Optional[str]=None,
1037 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1038 keep_client: Optional['arvados.keep.KeepClient']=None,
1039 num_retries: int=10,
1040 parent: Optional['Collection']=None,
1041 apiconfig: Optional[Mapping[str, str]]=None,
1042 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1043 replication_desired: Optional[int]=None,
1044 storage_classes_desired: Optional[List[str]]=None,
1045 put_threads: Optional[int]=None):
1046 """Initialize a Collection object
1050 * manifest_locator_or_text: str | None --- This string can contain a
1051 collection manifest text, portable data hash, or UUID. When given a
1052 portable data hash or UUID, this instance will load a collection
1053 record from the API server. Otherwise, this instance will represent a
1054 new collection without an API server record. The default value `None`
1055 instantiates a new collection with an empty manifest.
1057 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1058 Arvados API client object this instance uses to make requests. If
1059 none is given, this instance creates its own client using the
1060 settings from `apiconfig` (see below). If your client instantiates
1061 many Collection objects, you can help limit memory utilization by
1062 calling `arvados.api.api` to construct an
1063 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1064 for every Collection.
1066 * keep_client: arvados.keep.KeepClient | None --- The Keep client
1067 object this instance uses to make requests. If none is given, this
1068 instance creates its own client using its `api_client`.
1070 * num_retries: int --- The number of times that client requests are
1071 retried. Default 10.
1073 * parent: arvados.collection.Collection | None --- The parent Collection
1074 object of this instance, if any. This argument is primarily used by
1075 other Collection methods; user client code shouldn't need to use it.
1077 * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1078 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1079 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1080 Collection object constructs one from these settings. If no
1081 mapping is provided, calls `arvados.config.settings` to get these
1082 parameters from user configuration.
1084 * block_manager: arvados.arvfile._BlockManager | None --- The
1085 _BlockManager object used by this instance to coordinate reading
1086 and writing Keep data blocks. If none is given, this instance
1087 constructs its own. This argument is primarily used by other
1088 Collection methods; user client code shouldn't need to use it.
1090 * replication_desired: int | None --- This controls both the value of
1091 the `replication_desired` field on API collection records saved by
1092 this class, as well as the number of Keep services that the object
1093 writes new data blocks to. If none is given, uses the default value
1094 configured for the cluster.
1096 * storage_classes_desired: list[str] | None --- This controls both
1097 the value of the `storage_classes_desired` field on API collection
1098 records saved by this class, as well as selecting which specific
1099 Keep services the object writes new data blocks to. If none is
1100 given, defaults to an empty list.
1102 * put_threads: int | None --- The number of threads to run
1103 simultaneously to upload data blocks to Keep. This value is used when
1104 building a new `block_manager`. It is unused when a `block_manager`
1108 if storage_classes_desired and type(storage_classes_desired) is not list:
1109 raise errors.ArgumentError("storage_classes_desired must be list type.")
1111 super(Collection, self).__init__(parent)
1112 self._api_client = api_client
1113 self._keep_client = keep_client
1115 # Use the keep client from ThreadSafeApiCache
1116 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1117 self._keep_client = self._api_client.keep
1119 self._block_manager = block_manager
1120 self.replication_desired = replication_desired
1121 self._storage_classes_desired = storage_classes_desired
1122 self.put_threads = put_threads
1125 self._config = apiconfig
1127 self._config = config.settings()
1129 self.num_retries = num_retries
1130 self._manifest_locator = None
1131 self._manifest_text = None
1132 self._portable_data_hash = None
1133 self._api_response = None
1134 self._past_versions = set()
1136 self.lock = threading.RLock()
1139 if manifest_locator_or_text:
1140 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1141 self._manifest_locator = manifest_locator_or_text
1142 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1143 self._manifest_locator = manifest_locator_or_text
1144 if not self._has_local_collection_uuid():
1145 self._has_remote_blocks = True
1146 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1147 self._manifest_text = manifest_locator_or_text
1148 if '+R' in self._manifest_text:
1149 self._has_remote_blocks = True
1151 raise errors.ArgumentError(
1152 "Argument to CollectionReader is not a manifest or a collection UUID")
1156 except errors.SyntaxError as e:
1157 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1159 def storage_classes_desired(self) -> List[str]:
1160 """Get this collection's `storage_classes_desired` value"""
1161 return self._storage_classes_desired or []
1163 def root_collection(self) -> 'Collection':
1166 def get_properties(self) -> Properties:
1167 """Get this collection's properties
1169 This method always returns a dict. If this collection object does not
1170 have an associated API record, or that record does not have any
1171 properties set, this method returns an empty dict.
1173 if self._api_response and self._api_response["properties"]:
1174 return self._api_response["properties"]
1178 def get_trash_at(self) -> Optional[datetime.datetime]:
1179 """Get this collection's `trash_at` field
1181 This method parses the `trash_at` field of the collection's API
1182 record and returns a datetime from it. If that field is not set, or
1183 this collection object does not have an associated API record,
1186 if self._api_response and self._api_response["trash_at"]:
1188 return ciso8601.parse_datetime(self._api_response["trash_at"])
1194 def stream_name(self) -> str:
1197 def writable(self) -> bool:
1201 def known_past_version(
1203 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1205 """Indicate whether an API record for this collection has been seen before
1207 As this collection object loads records from the API server, it records
1208 their `modified_at` and `portable_data_hash` fields. This method accepts
1209 a 2-tuple with values for those fields, and returns `True` if the
1210 combination was previously loaded.
1212 return modified_at_and_portable_data_hash in self._past_versions
1218 other: Optional['Collection']=None,
1219 num_retries: Optional[int]=None,
1221 """Merge another collection's contents into this one
1223 This method compares the manifest of this collection instance with
1224 another, then updates this instance's manifest with changes from the
1225 other, renaming files to flag conflicts where necessary.
1227 When called without any arguments, this method reloads the collection's
1228 API record, and updates this instance with any changes that have
1229 appeared server-side. If this instance does not have a corresponding
1230 API record, this method raises `arvados.errors.ArgumentError`.
1234 * other: arvados.collection.Collection | None --- The collection
1235 whose contents should be merged into this instance. When not
1236 provided, this method reloads this collection's API record and
1237 constructs a Collection object from it. If this instance does not
1238 have a corresponding API record, this method raises
1239 `arvados.errors.ArgumentError`.
1241 * num_retries: int | None --- The number of times to retry reloading
1242 the collection's API record from the API server. If not specified,
1243 uses the `num_retries` provided when this instance was constructed.
1246 if self._manifest_locator is None:
1247 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1248 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1249 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1250 response.get("portable_data_hash") != self.portable_data_hash()):
1251 # The record on the server is different from our current one, but we've seen it before,
1252 # so ignore it because it's already been merged.
1253 # However, if it's the same as our current record, proceed with the update, because we want to update
1257 self._remember_api_response(response)
1258 other = CollectionReader(response["manifest_text"])
1259 baseline = CollectionReader(self._manifest_text)
1260 self.apply(baseline.diff(other))
1261 self._manifest_text = self.manifest_text()
1265 if self._api_client is None:
1266 self._api_client = ThreadSafeApiCache(self._config, version='v1')
1267 if self._keep_client is None:
1268 self._keep_client = self._api_client.keep
1269 return self._api_client
1273 if self._keep_client is None:
1274 if self._api_client is None:
1277 self._keep_client = KeepClient(api_client=self._api_client)
1278 return self._keep_client
1281 def _my_block_manager(self):
1282 if self._block_manager is None:
1283 copies = (self.replication_desired or
1284 self._my_api()._rootDesc.get('defaultCollectionReplication',
1286 self._block_manager = _BlockManager(self._my_keep(),
1288 put_threads=self.put_threads,
1289 num_retries=self.num_retries,
1290 storage_classes_func=self.storage_classes_desired)
1291 return self._block_manager
1293 def _remember_api_response(self, response):
1294 self._api_response = response
1295 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1297 def _populate_from_api_server(self):
1298 # As in KeepClient itself, we must wait until the last
1299 # possible moment to instantiate an API client, in order to
1300 # avoid tripping up clients that don't have access to an API
1301 # server. If we do build one, make sure our Keep client uses
1302 # it. If instantiation fails, we'll fall back to the except
1303 # clause, just like any other Collection lookup
1304 # failure. Return an exception, or None if successful.
1305 self._remember_api_response(self._my_api().collections().get(
1306 uuid=self._manifest_locator).execute(
1307 num_retries=self.num_retries))
1308 self._manifest_text = self._api_response['manifest_text']
1309 self._portable_data_hash = self._api_response['portable_data_hash']
1310 # If not overriden via kwargs, we should try to load the
1311 # replication_desired and storage_classes_desired from the API server
1312 if self.replication_desired is None:
1313 self.replication_desired = self._api_response.get('replication_desired', None)
1314 if self._storage_classes_desired is None:
1315 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1317 def _populate(self):
1318 if self._manifest_text is None:
1319 if self._manifest_locator is None:
1322 self._populate_from_api_server()
1323 self._baseline_manifest = self._manifest_text
1324 self._import_manifest(self._manifest_text)
1326 def _has_collection_uuid(self):
1327 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1329 def _has_local_collection_uuid(self):
1330 return self._has_collection_uuid and \
1331 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1333 def __enter__(self):
1336 def __exit__(self, exc_type, exc_value, traceback):
1337 """Exit a context with this collection instance
1339 If no exception was raised inside the context block, and this
1340 collection is writable and has a corresponding API record, that
1341 record will be updated to match the state of this instance at the end
1344 if exc_type is None:
1345 if self.writable() and self._has_collection_uuid():
1349 def stop_threads(self) -> None:
1350 """Stop background Keep upload/download threads"""
1351 if self._block_manager is not None:
1352 self._block_manager.stop_threads()
1355 def manifest_locator(self) -> Optional[str]:
1356 """Get this collection's manifest locator, if any
1358 * If this collection instance is associated with an API record with a
1360 * Otherwise, if this collection instance was loaded from an API record
1361 by portable data hash, return that.
1362 * Otherwise, return `None`.
1364 return self._manifest_locator
1369 new_parent: Optional['Collection']=None,
1370 new_name: Optional[str]=None,
1371 readonly: bool=False,
1372 new_config: Optional[Mapping[str, str]]=None,
1374 """Create a Collection object with the same contents as this instance
1376 This method creates a new Collection object with contents that match
1377 this instance's. The new collection will not be associated with any API
1382 * new_parent: arvados.collection.Collection | None --- This value is
1383 passed to the new Collection's constructor as the `parent`
1386 * new_name: str | None --- This value is unused.
1388 * readonly: bool --- If this value is true, this method constructs and
1389 returns a `CollectionReader`. Otherwise, it returns a mutable
1390 `Collection`. Default `False`.
1392 * new_config: Mapping[str, str] | None --- This value is passed to the
1393 new Collection's constructor as `apiconfig`. If no value is provided,
1394 defaults to the configuration passed to this instance's constructor.
1396 if new_config is None:
1397 new_config = self._config
1399 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1401 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1403 newcollection._clonefrom(self)
1404 return newcollection
1407 def api_response(self) -> Optional[Dict[str, Any]]:
1408 """Get this instance's associated API record
1410 If this Collection instance has an associated API record, return it.
1411 Otherwise, return `None`.
1413 return self._api_response
1418 create_type: CreateType,
1419 ) -> CollectionItem:
1423 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1425 def find(self, path: str) -> CollectionItem:
1429 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1431 def remove(self, path: str, recursive: bool=False) -> None:
1433 raise errors.ArgumentError("Cannot remove '.'")
1435 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1442 properties: Optional[Properties]=None,
1443 storage_classes: Optional[StorageClasses]=None,
1444 trash_at: Optional[datetime.datetime]=None,
1446 num_retries: Optional[int]=None,
1447 preserve_version: bool=False,
1449 """Save collection to an existing API record
1451 This method updates the instance's corresponding API record to match
1452 the instance's state. If this instance does not have a corresponding API
1453 record yet, raises `AssertionError`. (To create a new API record, use
1454 `Collection.save_new`.) This method returns the saved collection
1459 * properties: dict[str, Any] | None --- If provided, the API record will
1460 be updated with these properties. Note this will completely replace
1461 any existing properties.
1463 * storage_classes: list[str] | None --- If provided, the API record will
1464 be updated with this value in the `storage_classes_desired` field.
1465 This value will also be saved on the instance and used for any
1466 changes that follow.
1468 * trash_at: datetime.datetime | None --- If provided, the API record
1469 will be updated with this value in the `trash_at` field.
1471 * merge: bool --- If `True` (the default), this method will first
1472 reload this collection's API record, and merge any new contents into
1473 this instance before saving changes. See `Collection.update` for
1476 * num_retries: int | None --- The number of times to retry reloading
1477 the collection's API record from the API server. If not specified,
1478 uses the `num_retries` provided when this instance was constructed.
1480 * preserve_version: bool --- This value will be passed to directly
1481 to the underlying API call. If `True`, the Arvados API will
1482 preserve the versions of this collection both immediately before
1483 and after the update. If `True` when the API server is not
1484 configured with collection versioning, this method raises
1485 `arvados.errors.ArgumentError`.
1487 if properties and type(properties) is not dict:
1488 raise errors.ArgumentError("properties must be dictionary type.")
1490 if storage_classes and type(storage_classes) is not list:
1491 raise errors.ArgumentError("storage_classes must be list type.")
1493 self._storage_classes_desired = storage_classes
1495 if trash_at and type(trash_at) is not datetime.datetime:
1496 raise errors.ArgumentError("trash_at must be datetime type.")
1498 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1499 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1503 body["properties"] = properties
1504 if self.storage_classes_desired():
1505 body["storage_classes_desired"] = self.storage_classes_desired()
1507 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1508 body["trash_at"] = t
1509 if preserve_version:
1510 body["preserve_version"] = preserve_version
1512 if not self.committed():
1513 if self._has_remote_blocks:
1514 # Copy any remote blocks to the local cluster.
1515 self._copy_remote_blocks(remote_blocks={})
1516 self._has_remote_blocks = False
1517 if not self._has_collection_uuid():
1518 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1519 elif not self._has_local_collection_uuid():
1520 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1522 self._my_block_manager().commit_all()
1527 text = self.manifest_text(strip=False)
1528 body['manifest_text'] = text
1530 self._remember_api_response(self._my_api().collections().update(
1531 uuid=self._manifest_locator,
1533 ).execute(num_retries=num_retries))
1534 self._manifest_text = self._api_response["manifest_text"]
1535 self._portable_data_hash = self._api_response["portable_data_hash"]
1536 self.set_committed(True)
1538 self._remember_api_response(self._my_api().collections().update(
1539 uuid=self._manifest_locator,
1541 ).execute(num_retries=num_retries))
1543 return self._manifest_text
1551 name: Optional[str]=None,
1552 create_collection_record: bool=True,
1553 owner_uuid: Optional[str]=None,
1554 properties: Optional[Properties]=None,
1555 storage_classes: Optional[StorageClasses]=None,
1556 trash_at: Optional[datetime.datetime]=None,
1557 ensure_unique_name: bool=False,
1558 num_retries: Optional[int]=None,
1559 preserve_version: bool=False,
1561 """Save collection to a new API record
1563 This method finishes uploading new data blocks and (optionally)
1564 creates a new API collection record with the provided data. If a new
1565 record is created, this instance becomes associated with that record
1566 for future updates like `save()`. This method returns the saved
1567 collection manifest.
1571 * name: str | None --- The `name` field to use on the new collection
1572 record. If not specified, a generic default name is generated.
1574 * create_collection_record: bool --- If `True` (the default), creates a
1575 collection record on the API server. If `False`, the method finishes
1576 all data uploads and only returns the resulting collection manifest
1577 without sending it to the API server.
1579 * owner_uuid: str | None --- The `owner_uuid` field to use on the
1580 new collection record.
1582 * properties: dict[str, Any] | None --- The `properties` field to use on
1583 the new collection record.
1585 * storage_classes: list[str] | None --- The
1586 `storage_classes_desired` field to use on the new collection record.
1588 * trash_at: datetime.datetime | None --- The `trash_at` field to use
1589 on the new collection record.
1591 * ensure_unique_name: bool --- This value is passed directly to the
1592 Arvados API when creating the collection record. If `True`, the API
1593 server may modify the submitted `name` to ensure the collection's
1594 `name`+`owner_uuid` combination is unique. If `False` (the default),
1595 if a collection already exists with this same `name`+`owner_uuid`
1596 combination, creating a collection record will raise a validation
1599 * num_retries: int | None --- The number of times to retry reloading
1600 the collection's API record from the API server. If not specified,
1601 uses the `num_retries` provided when this instance was constructed.
1603 * preserve_version: bool --- This value will be passed to directly
1604 to the underlying API call. If `True`, the Arvados API will
1605 preserve the versions of this collection both immediately before
1606 and after the update. If `True` when the API server is not
1607 configured with collection versioning, this method raises
1608 `arvados.errors.ArgumentError`.
1610 if properties and type(properties) is not dict:
1611 raise errors.ArgumentError("properties must be dictionary type.")
1613 if storage_classes and type(storage_classes) is not list:
1614 raise errors.ArgumentError("storage_classes must be list type.")
1616 if trash_at and type(trash_at) is not datetime.datetime:
1617 raise errors.ArgumentError("trash_at must be datetime type.")
1619 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1620 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1622 if self._has_remote_blocks:
1623 # Copy any remote blocks to the local cluster.
1624 self._copy_remote_blocks(remote_blocks={})
1625 self._has_remote_blocks = False
1628 self._storage_classes_desired = storage_classes
1630 self._my_block_manager().commit_all()
1631 text = self.manifest_text(strip=False)
1633 if create_collection_record:
1635 name = "New collection"
1636 ensure_unique_name = True
1638 body = {"manifest_text": text,
1640 "replication_desired": self.replication_desired}
1642 body["owner_uuid"] = owner_uuid
1644 body["properties"] = properties
1645 if self.storage_classes_desired():
1646 body["storage_classes_desired"] = self.storage_classes_desired()
1648 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1649 body["trash_at"] = t
1650 if preserve_version:
1651 body["preserve_version"] = preserve_version
1653 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1654 text = self._api_response["manifest_text"]
1656 self._manifest_locator = self._api_response["uuid"]
1657 self._portable_data_hash = self._api_response["portable_data_hash"]
1659 self._manifest_text = text
1660 self.set_committed(True)
1664 _token_re = re.compile(r'(\S+)(\s+|$)')
1665 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1666 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1668 def _unescape_manifest_path(self, path):
1669 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1672 def _import_manifest(self, manifest_text):
1673 """Import a manifest into a `Collection`.
1676 The manifest text to import from.
1680 raise ArgumentError("Can only import manifest into an empty collection")
1689 for token_and_separator in self._token_re.finditer(manifest_text):
1690 tok = token_and_separator.group(1)
1691 sep = token_and_separator.group(2)
1693 if state == STREAM_NAME:
1694 # starting a new stream
1695 stream_name = self._unescape_manifest_path(tok)
1700 self.find_or_create(stream_name, COLLECTION)
1704 block_locator = self._block_re.match(tok)
1706 blocksize = int(block_locator.group(1))
1707 blocks.append(Range(tok, streamoffset, blocksize, 0))
1708 streamoffset += blocksize
1712 if state == SEGMENTS:
1713 file_segment = self._segment_re.match(tok)
1715 pos = int(file_segment.group(1))
1716 size = int(file_segment.group(2))
1717 name = self._unescape_manifest_path(file_segment.group(3))
1718 if name.split('/')[-1] == '.':
1719 # placeholder for persisting an empty directory, not a real file
1721 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1723 filepath = os.path.join(stream_name, name)
1725 afile = self.find_or_create(filepath, FILE)
1726 except IOError as e:
1727 if e.errno == errno.ENOTDIR:
1728 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1731 if isinstance(afile, ArvadosFile):
1732 afile.add_segment(blocks, pos, size)
1734 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1737 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1743 self.set_committed(True)
1749 collection: 'RichCollectionBase',
1751 item: CollectionItem,
1754 self._callback(event, collection, name, item)
1757 class Subcollection(RichCollectionBase):
1758 """Read and manipulate a stream/directory within an Arvados collection
1760 This class represents a single stream (like a directory) within an Arvados
1761 `Collection`. It is returned by `Collection.find` and provides the same API.
1762 Operations that work on the API collection record propagate to the parent
1763 `Collection` object.
1766 def __init__(self, parent, name):
1767 super(Subcollection, self).__init__(parent)
1768 self.lock = self.root_collection().lock
1769 self._manifest_text = None
1771 self.num_retries = parent.num_retries
1773 def root_collection(self) -> 'Collection':
1774 return self.parent.root_collection()
1776 def writable(self) -> bool:
1777 return self.root_collection().writable()
1780 return self.root_collection()._my_api()
1783 return self.root_collection()._my_keep()
1785 def _my_block_manager(self):
1786 return self.root_collection()._my_block_manager()
1788 def stream_name(self) -> str:
1789 return os.path.join(self.parent.stream_name(), self.name)
1794 new_parent: Optional['Collection']=None,
1795 new_name: Optional[str]=None,
1796 ) -> 'Subcollection':
1797 c = Subcollection(new_parent, new_name)
1803 def _reparent(self, newparent, newname):
1804 self.set_committed(False)
1806 self.parent.remove(self.name, recursive=True)
1807 self.parent = newparent
1809 self.lock = self.parent.root_collection().lock
1812 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1813 """Encode empty directories by using an \056-named (".") empty file"""
1814 if len(self._items) == 0:
1815 return "%s %s 0:0:\\056\n" % (
1816 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1817 return super(Subcollection, self)._get_manifest_text(stream_name,
1822 class CollectionReader(Collection):
1823 """Read-only `Collection` subclass
1825 This class will never create or update any API collection records. You can
1826 use this class for additional code safety when you only need to read
1827 existing collections.
1829 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1830 self._in_init = True
1831 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1832 self._in_init = False
1834 # Forego any locking since it should never change once initialized.
1835 self.lock = NoopLock()
1837 # Backwards compatability with old CollectionReader
1838 # all_streams() and all_files()
1839 self._streams = None
1841 def writable(self) -> bool:
1842 return self._in_init