1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
28 from collections import deque
31 from ._internal import streams
32 from .api import ThreadSafeAPIClient
33 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock, ADD, DEL, MOD, TOK, WRITE
34 from .keep import KeepLocator, KeepClient
35 import arvados.config as config
36 import arvados.errors as errors
38 import arvados.events as events
39 from arvados.retry import retry_method
54 if sys.version_info < (3, 8):
55 from typing_extensions import Literal
57 from typing import Literal
59 _logger = logging.getLogger('arvados.collection')
63 """`create_type` value for `Collection.find_or_create`"""
64 COLLECTION = "collection"
65 """`create_type` value for `Collection.find_or_create`"""
67 ChangeList = List[Union[
68 Tuple[Literal[ADD, DEL], str, 'Collection'],
69 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
71 ChangeType = Literal[ADD, DEL, MOD, TOK]
72 CollectionItem = Union[ArvadosFile, 'Collection']
73 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
74 CreateType = Literal[COLLECTION, FILE]
75 Properties = Dict[str, Any]
76 StorageClasses = List[str]
78 class CollectionBase(object):
79 """Abstract base class for Collection classes
81 .. ATTENTION:: Internal
82 This class is meant to be used by other parts of the SDK. User code
83 should instantiate or subclass `Collection` or one of its subclasses
88 """Enter a context block with this collection instance"""
91 def __exit__(self, exc_type, exc_value, traceback):
92 """Exit a context block with this collection instance"""
96 if self._keep_client is None:
97 self._keep_client = KeepClient(api_client=self._api_client,
98 num_retries=self.num_retries)
99 return self._keep_client
def stripped_manifest(self) -> str:
    """Return this collection's manifest with only size hints on locators

    The manifest text is read from `self.manifest_text()`. On every
    non-empty line the first token (the stream name) is kept as is. Each
    later token that matches `arvados.util.keep_locator_pattern` has every
    `+hint` that does not start with a digit removed; all other tokens
    pass through unchanged. Every emitted line ends with a newline, and
    lines with no tokens are left out.
    """
    def _strip_hints(token):
        # Only block locators carry hints; anything else is returned as is.
        if re.match(arvados.util.keep_locator_pattern, token):
            return re.sub(r'\+[^\d][^\+]*', '', token)
        return token

    out_lines = []
    for manifest_line in self.manifest_text().split("\n"):
        tokens = manifest_line.split()
        if not tokens:
            continue
        stream_token, rest = tokens[:1], tokens[1:]
        out_lines.append(' '.join(stream_token + [_strip_hints(t) for t in rest]))
    return ''.join(line + "\n" for line in out_lines)
123 class _WriterFile(_FileLikeObjectBase):
124 def __init__(self, coll_writer, name):
125 super(_WriterFile, self).__init__(name, 'wb')
126 self.dest = coll_writer
129 super(_WriterFile, self).close()
130 self.dest.finish_current_file()
132 @_FileLikeObjectBase._before_close
133 def write(self, data):
134 self.dest.write(data)
136 @_FileLikeObjectBase._before_close
137 def writelines(self, seq):
141 @_FileLikeObjectBase._before_close
143 self.dest.flush_data()
146 class RichCollectionBase(CollectionBase):
147 """Base class for Collection classes
149 .. ATTENTION:: Internal
150 This class is meant to be used by other parts of the SDK. User code
151 should instantiate or subclass `Collection` or one of its subclasses
155 def __init__(self, parent=None):
157 self._committed = False
158 self._has_remote_blocks = False
159 self._callback = None
163 raise NotImplementedError()
166 raise NotImplementedError()
168 def _my_block_manager(self):
169 raise NotImplementedError()
171 def writable(self) -> bool:
172 """Indicate whether this collection object can be modified
174 This method returns `False` if this object is a `CollectionReader`,
177 raise NotImplementedError()
179 def root_collection(self) -> 'Collection':
180 """Get this collection's root collection object
182 If you open a subcollection with `Collection.find`, calling this method
183 on that subcollection returns the source Collection object.
185 raise NotImplementedError()
187 def stream_name(self) -> str:
188 """Get the name of the manifest stream represented by this collection
190 If you open a subcollection with `Collection.find`, calling this method
191 on that subcollection returns the name of the stream you opened.
193 raise NotImplementedError()
def has_remote_blocks(self) -> bool:
    """Indicate whether the collection refers to remote data

    Returns `True` if the collection manifest includes any Keep locators
    with a remote hint (`+R`), else `False`.
    """
    # A set cached flag answers immediately, without visiting any item.
    if self._has_remote_blocks:
        return True
    # Otherwise ask each directly contained item in turn, stopping at the
    # first one that reports remote blocks.
    return any(self[name].has_remote_blocks() for name in self)
210 def set_has_remote_blocks(self, val: bool) -> None:
211 """Cache whether this collection refers to remote blocks
213 .. ATTENTION:: Internal
214 This method is only meant to be used by other Collection methods.
216 Set this collection's cached "has remote blocks" flag to the given
219 self._has_remote_blocks = val
221 self.parent.set_has_remote_blocks(val)
228 create_type: CreateType,
230 """Get the item at the given path, creating it if necessary
232 If `path` refers to a stream in this collection, returns a
233 corresponding `Subcollection` object. If `path` refers to a file in
234 this collection, returns a corresponding
235 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
236 this collection, then this method creates a new object and returns
237 it, creating parent streams as needed. The type of object created is
238 determined by the value of `create_type`.
242 * path: str --- The path to find or create within this collection.
244 * create_type: Literal[COLLECTION, FILE] --- The type of object to
245 create at `path` if one does not exist. Passing `COLLECTION`
246 creates a stream and returns the corresponding
247 `Subcollection`. Passing `FILE` creates a new file and returns the
248 corresponding `arvados.arvfile.ArvadosFile`.
250 pathcomponents = path.split("/", 1)
251 if pathcomponents[0]:
252 item = self._items.get(pathcomponents[0])
253 if len(pathcomponents) == 1:
256 if create_type == COLLECTION:
257 item = Subcollection(self, pathcomponents[0])
259 item = ArvadosFile(self, pathcomponents[0])
260 self._items[pathcomponents[0]] = item
261 self.set_committed(False)
262 self.notify(ADD, self, pathcomponents[0], item)
266 # create new collection
267 item = Subcollection(self, pathcomponents[0])
268 self._items[pathcomponents[0]] = item
269 self.set_committed(False)
270 self.notify(ADD, self, pathcomponents[0], item)
271 if isinstance(item, RichCollectionBase):
272 return item.find_or_create(pathcomponents[1], create_type)
274 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
def find(self, path: str) -> Optional[CollectionItem]:
    """Get the item at the given path

    If `path` refers to a stream in this collection, returns a
    corresponding `Subcollection` object. If `path` refers to a file in
    this collection, returns a corresponding
    `arvados.arvfile.ArvadosFile` object. If nothing exists at `path`,
    returns `None`.

    Raises `arvados.errors.ArgumentError` if `path` is empty. Raises
    `NotADirectoryError` if `path` begins with a slash, or if a component
    of `path` that is followed by a slash refers to something that is not
    a stream.

    Arguments:

    * path: str --- The path to look up within this collection. A stream
      name followed by a trailing slash returns that stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    # `sep` is empty exactly when `path` contains no slash, i.e. when
    # `name` is the final path component.
    name, sep, rest = path.partition("/")
    if not name:
        raise IOError(errno.ENOTDIR, "Not a directory", name)

    item = self._items.get(name)
    if item is None or not sep:
        # Either nothing is stored under this name, or `name` was the
        # whole path and `item` is the answer.
        return item
    if not isinstance(item, RichCollectionBase):
        # More path follows, but this component cannot contain anything.
        raise IOError(errno.ENOTDIR, "Not a directory", name)
    return item.find(rest) if rest else item
def mkdirs(self, path: str) -> 'Subcollection':
    """Create and return a subcollection at `path`

    If `path` exists within this collection, raises `FileExistsError`.
    Otherwise, creates a stream at that path and returns the
    corresponding `Subcollection`.

    Arguments:

    * path: str --- The path of the stream to create within this
      collection.
    """
    # Identity test against None: this only asks "was anything found?"
    # and never invokes the found item's own comparison methods.
    if self.find(path) is not None:
        raise IOError(errno.EEXIST, "Directory or file exists", path)

    return self.find_or_create(path, COLLECTION)
330 encoding: Optional[str]=None
332 """Open a file-like object within the collection
334 This method returns a file-like object that can read and/or write the
335 file located at `path` within the collection. If you attempt to write
336 a `path` that does not exist, the file is created with `find_or_create`.
337 If the file cannot be opened for any other reason, this method raises
338 `OSError` with an appropriate errno.
342 * path: str --- The path of the file to open within this collection
344 * mode: str --- The mode to open this file. Supports all the same
345 values as `builtins.open`.
347 * encoding: str | None --- The text encoding of the file. Only used
348 when the file is opened in text mode. The default is
352 if not re.search(r'^[rwa][bt]?\+?$', mode):
353 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
355 if mode[0] == 'r' and '+' not in mode:
356 fclass = ArvadosFileReader
357 arvfile = self.find(path)
358 elif not self.writable():
359 raise IOError(errno.EROFS, "Collection is read only")
361 fclass = ArvadosFileWriter
362 arvfile = self.find_or_create(path, FILE)
365 raise IOError(errno.ENOENT, "File not found", path)
366 if not isinstance(arvfile, ArvadosFile):
367 raise IOError(errno.EISDIR, "Is a directory", path)
372 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
373 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
375 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
376 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
def modified(self) -> bool:
    """Indicate whether this collection is not in the committed state

    This is the negation of `committed()`. Returns `False` if this
    collection corresponds to a record loaded from the API server, `True`
    otherwise.
    """
    return not self.committed()
389 """Indicate whether this collection has an API server record
391 Returns `True` if this collection corresponds to a record loaded from
392 the API server, `False` otherwise.
394 return self._committed
def set_committed(self, value: bool=True):
    """Update the cached "committed" flag for this collection

    .. ATTENTION:: Internal
       This method is only meant to be used by other Collection methods.

    Marking the collection committed first marks every item it directly
    contains as committed. Marking it uncommitted also marks its parent,
    if it has one, as uncommitted. Nothing happens when the flag already
    has the requested value.
    """
    if value == self._committed:
        return
    if not value:
        # A change here means the parent is out of date as well.
        self._committed = False
        if self.parent is not None:
            self.parent.set_committed(False)
        return
    for child in self._items.values():
        child.set_committed(True)
    self._committed = True
def __iter__(self) -> Iterator[str]:
    """Iterate names of streams and files in this collection

    This method does not recurse. It only iterates the contents of this
    collection's corresponding stream.
    """
    # Delegates to the iterator of the internal name-to-item mapping.
    return iter(self._items)
def __getitem__(self, k: str) -> CollectionItem:
    """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection

    This method does not recurse. If you want to search a path, use
    `RichCollectionBase.find` instead.
    """
    # `k` is used as a single name in the internal item mapping; it is
    # not split on "/".
    return self._items[k]
def __contains__(self, k: str) -> bool:
    """Indicate whether this collection has an item with this name

    This method does not recurse. If you want to check a path, use
    `RichCollectionBase.exists` instead.
    """
    # Membership is tested against the internal name-to-item mapping only.
    return k in self._items
446 """Get the number of items directly contained in this collection
448 This method does not recurse. It only counts the streams and files
449 in this collection's corresponding stream.
451 return len(self._items)
455 def __delitem__(self, p: str) -> None:
456 """Delete an item from this collection's stream
458 This method does not recurse. If you want to remove an item by a
459 path, use `RichCollectionBase.remove` instead.
462 self.set_committed(False)
463 self.notify(DEL, self, p, None)
def keys(self) -> Iterator[str]:
    """Iterate names of streams and files in this collection

    This method does not recurse. It only iterates the contents of this
    collection's corresponding stream.
    """
    # NOTE(review): this returns the result of the mapping's `keys()` call
    # directly, whereas `values` and `items` copy their results into a list.
    return self._items.keys()
def values(self) -> List[CollectionItem]:
    """Return the objects directly inside this collection's stream as a list

    The list holds a `Subcollection` for every stream and an
    `arvados.arvfile.ArvadosFile` for every file found directly within
    this collection's stream. This method does not recurse.
    """
    contained = self._items.values()
    return [*contained]
def items(self) -> List[Tuple[str, CollectionItem]]:
    """Return `(name, object)` tuples for this collection's stream as a list

    The list pairs each name with a `Subcollection` for every stream and
    an `arvados.arvfile.ArvadosFile` for every file found directly within
    this collection's stream. This method does not recurse.
    """
    pairs = self._items.items()
    return [*pairs]
def exists(self, path: str) -> bool:
    """Report whether something is stored at `path` in this collection

    Returns `True` if `path` refers to a stream or file within this
    collection, else `False`.

    Arguments:

    * path: str --- The path to check for existence within this collection
    """
    found = self.find(path)
    return found is not None
def remove(self, path: str, recursive: bool=False) -> None:
    """Delete the file or stream found at `path`

    Arguments:

    * path: str --- The path of the item to remove from the collection

    * recursive: bool --- Controls the method's behavior if `path` refers
      to a nonempty stream. If `False` (the default), this method raises
      `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
      items under the stream.
    """
    if not path:
        raise errors.ArgumentError("Parameter 'path' is empty.")

    parts = path.split("/", 1)
    name = parts[0]
    target = self._items.get(name)
    if target is None:
        raise IOError(errno.ENOENT, "File not found", path)

    if len(parts) > 1:
        # More path remains: hand the remainder to the child item.
        target.remove(parts[1], recursive=recursive)
        return

    nonempty_stream = isinstance(target, RichCollectionBase) and len(target) > 0
    if nonempty_stream and not recursive:
        raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
    del self._items[name]
    self.set_committed(False)
    self.notify(DEL, self, name, target)
537 def _clonefrom(self, source):
538 for k,v in source.items():
539 self._items[k] = v.clone(self, k)
542 raise NotImplementedError()
548 source_obj: CollectionItem,
550 overwrite: bool=False,
551 reparent: bool=False,
553 """Copy or move a file or subcollection object to this collection
557 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
558 to add to this collection
560 * target_name: str --- The path inside this collection where
561 `source_obj` should be added.
563 * overwrite: bool --- Controls the behavior of this method when the
564 collection already contains an object at `target_name`. If `False`
565 (the default), this method will raise `FileExistsError`. If `True`,
566 the object at `target_name` will be replaced with `source_obj`.
568 * reparent: bool --- Controls whether this method copies or moves
569 `source_obj`. If `False` (the default), `source_obj` is copied into
570 this collection. If `True`, `source_obj` is moved into this
573 if target_name in self and not overwrite:
574 raise IOError(errno.EEXIST, "File already exists", target_name)
577 if target_name in self:
578 modified_from = self[target_name]
580 # Actually make the move or copy.
582 source_obj._reparent(self, target_name)
585 item = source_obj.clone(self, target_name)
587 self._items[target_name] = item
588 self.set_committed(False)
589 if not self._has_remote_blocks and source_obj.has_remote_blocks():
590 self.set_has_remote_blocks(True)
593 self.notify(MOD, self, target_name, (modified_from, item))
595 self.notify(ADD, self, target_name, item)
597 def _get_src_target(self, source, target_path, source_collection, create_dest):
598 if source_collection is None:
599 source_collection = self
602 if isinstance(source, str):
603 source_obj = source_collection.find(source)
604 if source_obj is None:
605 raise IOError(errno.ENOENT, "File not found", source)
606 sourcecomponents = source.split("/")
609 sourcecomponents = None
611 # Find parent collection the target path
612 targetcomponents = target_path.split("/")
614 # Determine the name to use.
615 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
618 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
621 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
623 if len(targetcomponents) > 1:
624 target_dir = self.find("/".join(targetcomponents[0:-1]))
628 if target_dir is None:
629 raise IOError(errno.ENOENT, "Target directory not found", target_name)
631 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
632 target_dir = target_dir[target_name]
633 target_name = sourcecomponents[-1]
635 return (source_obj, target_dir, target_name)
641 source: Union[str, CollectionItem],
643 source_collection: Optional['RichCollectionBase']=None,
644 overwrite: bool=False,
646 """Copy a file or subcollection object to this collection
650 * source: str | arvados.arvfile.ArvadosFile |
651 arvados.collection.Subcollection --- The file or subcollection to
652 add to this collection. If `source` is a str, the object will be
653 found by looking up this path from `source_collection` (see
656 * target_path: str --- The path inside this collection where the
657 source object should be added.
659 * source_collection: arvados.collection.Collection | None --- The
660 collection to find the source object from when `source` is a
661 path. Defaults to the current collection (`self`).
663 * overwrite: bool --- Controls the behavior of this method when the
664 collection already contains an object at `target_path`. If `False`
665 (the default), this method will raise `FileExistsError`. If `True`,
666 the object at `target_path` will be replaced with `source_obj`.
668 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
669 target_dir.add(source_obj, target_name, overwrite, False)
675 source: Union[str, CollectionItem],
677 source_collection: Optional['RichCollectionBase']=None,
678 overwrite: bool=False,
680 """Move a file or subcollection object to this collection
684 * source: str | arvados.arvfile.ArvadosFile |
685 arvados.collection.Subcollection --- The file or subcollection to
686 add to this collection. If `source` is a str, the object will be
687 found by looking up this path from `source_collection` (see
690 * target_path: str --- The path inside this collection where the
691 source object should be added.
693 * source_collection: arvados.collection.Collection | None --- The
694 collection to find the source object from when `source` is a
695 path. Defaults to the current collection (`self`).
697 * overwrite: bool --- Controls the behavior of this method when the
698 collection already contains an object at `target_path`. If `False`
699 (the default), this method will raise `FileExistsError`. If `True`,
700 the object at `target_path` will be replaced with `source_obj`.
702 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
703 if not source_obj.writable():
704 raise IOError(errno.EROFS, "Source collection is read only", source)
705 target_dir.add(source_obj, target_name, overwrite, True)
def portable_manifest_text(self, stream_name: str=".") -> str:
    """Return the portable form of this collection's manifest text

    The portable manifest text is normalized, and does not include access
    tokens. This method does not flush outstanding blocks to Keep.

    Arguments:

    * stream_name: str --- The name to use for this collection's stream in
      the generated manifest. Default `'.'`.
    """
    strip_tokens = True
    force_normalized = True
    return self._get_manifest_text(stream_name, strip_tokens, force_normalized)
723 stream_name: str=".",
725 normalize: bool=False,
726 only_committed: bool=False,
728 """Get the manifest text for this collection
732 * stream_name: str --- The name to use for this collection's stream in
733 the generated manifest. Default `'.'`.
735 * strip: bool --- Controls whether or not the returned manifest text
736 includes access tokens. If `False` (the default), the manifest text
737 will include access tokens. If `True`, the manifest text will not
738 include access tokens.
740 * normalize: bool --- Controls whether or not the returned manifest
741 text is normalized. Default `False`.
743 * only_committed: bool --- Controls whether or not this method uploads
744 pending data to Keep before building and returning the manifest text.
745 If `False` (the default), this method will finish uploading all data
746 to Keep, then return the final manifest. If `True`, this method will
747 build and return a manifest that only refers to the data that has
748 finished uploading at the time this method was called.
750 if not only_committed:
751 self._my_block_manager().commit_all()
752 return self._get_manifest_text(stream_name, strip, normalize,
753 only_committed=only_committed)
756 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
757 """Get the manifest text for this collection, sub collections and files.
760 Name to use for this stream (directory)
763 If True, remove signing tokens from block locators if present.
764 If False (default), block locators are left unchanged.
767 If True, always export the manifest text in normalized form
768 even if the Collection is not modified. If False (default) and the collection
769 is not modified, return the original manifest text even if it is not
773 If True, only include blocks that were already committed to Keep.
777 if not self.committed() or self._manifest_text is None or normalize:
780 sorted_keys = sorted(self.keys())
781 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
782 # Create a stream per file `k`
783 arvfile = self[filename]
785 for segment in arvfile.segments():
786 loc = segment.locator
787 if arvfile.parent._my_block_manager().is_bufferblock(loc):
790 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
792 loc = KeepLocator(loc).stripped()
793 filestream.append(streams.LocatorAndRange(
795 KeepLocator(loc).size,
796 segment.segment_offset,
799 stream[filename] = filestream
801 buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
802 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
803 buf.append(self[dirname].manifest_text(
804 stream_name=os.path.join(stream_name, dirname),
805 strip=strip, normalize=True, only_committed=only_committed))
809 return self.stripped_manifest()
811 return self._manifest_text
814 def _copy_remote_blocks(self, remote_blocks={}):
815 """Scan through the entire collection and ask Keep to copy remote blocks.
817 When accessing a remote collection, blocks will have a remote signature
818 (+R instead of +A). Collect these signatures and request Keep to copy the
819 blocks to the local cluster, returning local (+A) signatures.
822 Shared cache of remote to local block mappings. This is used to avoid
823 doing extra work when blocks are shared by more than one file in
824 different subdirectories.
828 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
834 end_collection: 'RichCollectionBase',
836 holding_collection: Optional['Collection']=None,
838 """Build a list of differences between this collection and another
842 * end_collection: arvados.collection.RichCollectionBase --- A
843 collection object with the desired end state. The returned diff
844 list will describe how to go from the current collection object
845 `self` to `end_collection`.
847 * prefix: str --- The name to use for this collection's stream in
848 the diff list. Default `'.'`.
850 * holding_collection: arvados.collection.Collection | None --- A
851 collection object used to hold objects for the returned diff
852 list. By default, a new empty collection is created.
855 if holding_collection is None:
856 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
858 if k not in end_collection:
859 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
860 for k in end_collection:
862 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
863 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
864 elif end_collection[k] != self[k]:
865 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
867 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
869 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
874 def apply(self, changes: ChangeList) -> None:
875 """Apply a list of changes from to this collection
877 This method takes a list of changes generated by
878 `RichCollectionBase.diff` and applies it to this
879 collection. Afterward, the state of this collection object will
880 match the state of `end_collection` passed to `diff`. If a change
881 conflicts with a local change, it will be saved to an alternate path
882 indicating the conflict.
886 * changes: arvados.collection.ChangeList --- The list of differences
887 generated by `RichCollectionBase.diff`.
890 self.set_committed(False)
891 for change in changes:
892 event_type = change[0]
895 local = self.find(path)
896 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
898 if event_type == ADD:
900 # No local file at path, safe to copy over new file
901 self.copy(initial, path)
902 elif local is not None and local != initial:
903 # There is already local file and it is different:
904 # save change to conflict file.
905 self.copy(initial, conflictpath)
906 elif event_type == MOD or event_type == TOK:
909 # Local matches the "initial" item so it has not
910 # changed locally and is safe to update.
911 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
912 # Replace contents of local file with new contents
913 local.replace_contents(final)
915 # Overwrite path with new item; this can happen if
916 # path was a file and is now a collection or vice versa
917 self.copy(final, path, overwrite=True)
918 elif event_type == MOD:
919 # Local doesn't match the "start" value or local
920 # is missing (presumably deleted) so save change
921 # to conflict file. Don't do this for TOK events
922 # which means the file didn't change but only had
924 self.copy(final, conflictpath)
925 elif event_type == DEL:
927 # Local item matches "initial" value, so it is safe to remove.
928 self.remove(path, recursive=True)
929 # else, the file is modified or already removed, in either
930 # case we don't want to try to remove it.
def portable_data_hash(self) -> str:
    """Get the portable data hash for this collection's manifest"""
    if self._manifest_locator and self.committed():
        # The collection is saved on the API server and is committed, so
        # reuse the hash stored on this object.
        return self._portable_data_hash
    # Otherwise compute it locally: MD5 hex digest of the portable
    # manifest text, then "+", then the manifest's length in bytes.
    manifest_bytes = self.portable_manifest_text().encode()
    digest = hashlib.md5(manifest_bytes).hexdigest()
    return '{}+{}'.format(digest, len(manifest_bytes))
def subscribe(self, callback: ChangeCallback) -> None:
    """Register the callback to notify about changes to this collection

    Only one callback can be set at a time. Raises
    `arvados.errors.ArgumentError` if one is already set.

    Arguments:

    * callback: arvados.collection.ChangeCallback --- The callable to
      call each time the collection is changed.
    """
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback
def unsubscribe(self) -> None:
    """Clear the notify callback for this collection, if one is set"""
    if self._callback is None:
        return
    self._callback = None
966 collection: 'RichCollectionBase',
968 item: CollectionItem,
970 """Notify any subscribed callback about a change to this collection
972 .. ATTENTION:: Internal
973 This method is only meant to be used by other Collection methods.
975 If a callback has been registered with `RichCollectionBase.subscribe`,
976 it will be called with information about a change to this collection.
977 Then this notification will be propagated to this collection's root.
981 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
984 * collection: arvados.collection.RichCollectionBase --- The
985 collection that was modified.
987 * name: str --- The name of the file or stream within `collection` that
990 * item: arvados.arvfile.ArvadosFile |
991 arvados.collection.Subcollection --- For ADD events, the new
992 contents at `name` within `collection`; for DEL events, the
993 item that was removed. For MOD and TOK events, a 2-tuple of
994 the previous item and the new item (may be the same object
995 or different, depending on whether the action involved it
996 being modified in place or replaced).
1000 self._callback(event, collection, name, item)
1001 self.root_collection().notify(event, collection, name, item)
1004 def __eq__(self, other: Any) -> bool:
1005 """Indicate whether this collection object is equal to another"""
1008 if not isinstance(other, RichCollectionBase):
1010 if len(self._items) != len(other):
1012 for k in self._items:
1015 if self._items[k] != other[k]:
def __ne__(self, other: Any) -> bool:
    """Indicate whether this collection object is not equal to another"""
    # Always the negation of whatever `__eq__` returns for `other`.
    return not self.__eq__(other)
1024 def flush(self) -> None:
1025 """Upload any pending data to Keep"""
1026 for e in self.values():
1030 class Collection(RichCollectionBase):
1031 """Read and manipulate an Arvados collection
1033 This class provides a high-level interface to create, read, and update
1034 Arvados collections and their contents. Refer to the Arvados Python SDK
1035 cookbook for [an introduction to using the Collection class][cookbook].
1037 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
def __init__(self, manifest_locator_or_text: Optional[str]=None,
             api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
             keep_client: Optional['arvados.keep.KeepClient']=None,
             num_retries: int=10,
             parent: Optional['Collection']=None,
             apiconfig: Optional[Mapping[str, str]]=None,
             block_manager: Optional['arvados.arvfile._BlockManager']=None,
             replication_desired: Optional[int]=None,
             storage_classes_desired: Optional[List[str]]=None,
             put_threads: Optional[int]=None):
    """Initialize a Collection object

    Arguments:

    * manifest_locator_or_text: str | None --- This string can contain a
      collection manifest text, portable data hash, or UUID. When given a
      portable data hash or UUID, this instance will load a collection
      record from the API server. Otherwise, this instance will represent a
      new collection without an API server record. The default value `None`
      instantiates a new collection with an empty manifest.

    * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
      Arvados API client object this instance uses to make requests. If
      none is given, this instance creates its own client using the
      settings from `apiconfig` (see below). If your client instantiates
      many Collection objects, you can help limit memory utilization by
      calling `arvados.api.api` to construct an
      `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
      for every Collection.

    * keep_client: arvados.keep.KeepClient | None --- The Keep client
      object this instance uses to make requests. If none is given, this
      instance creates its own client using its `api_client`.

    * num_retries: int --- The number of times that client requests are
      retried. Default 10.

    * parent: arvados.collection.Collection | None --- The parent Collection
      object of this instance, if any. This argument is primarily used by
      other Collection methods; user client code shouldn't need to use it.

    * apiconfig: Mapping[str, str] | None --- A mapping with entries for
      `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
      `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
      Collection object constructs one from these settings. If no
      mapping is provided, calls `arvados.config.settings` to get these
      parameters from user configuration.

    * block_manager: arvados.arvfile._BlockManager | None --- The
      _BlockManager object used by this instance to coordinate reading
      and writing Keep data blocks. If none is given, this instance
      constructs its own. This argument is primarily used by other
      Collection methods; user client code shouldn't need to use it.

    * replication_desired: int | None --- This controls both the value of
      the `replication_desired` field on API collection records saved by
      this class, as well as the number of Keep services that the object
      writes new data blocks to. If none is given, uses the default value
      configured for the cluster.

    * storage_classes_desired: list[str] | None --- This controls both
      the value of the `storage_classes_desired` field on API collection
      records saved by this class, as well as selecting which specific
      Keep services the object writes new data blocks to. If none is
      given, defaults to an empty list.

    * put_threads: int | None --- The number of threads to run
      simultaneously to upload data blocks to Keep. This value is used when
      building a new `block_manager`. It is unused when a `block_manager`
      is provided.

    Raises `arvados.errors.ArgumentError` if `storage_classes_desired` is
    not a list, if `manifest_locator_or_text` is not recognized as a
    locator, UUID, or manifest, or if the manifest text cannot be parsed.
    """

    if storage_classes_desired and type(storage_classes_desired) is not list:
        raise errors.ArgumentError("storage_classes_desired must be list type.")

    super(Collection, self).__init__(parent)
    self._api_client = api_client
    self._keep_client = keep_client

    # Use the keep client from ThreadSafeAPIClient
    if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
        self._keep_client = self._api_client.keep

    self._block_manager = block_manager
    self.replication_desired = replication_desired
    self._storage_classes_desired = storage_classes_desired
    self.put_threads = put_threads

    if apiconfig:
        self._config = apiconfig
    else:
        self._config = config.settings()

    self.num_retries = num_retries
    self._manifest_locator = None
    self._manifest_text = None
    self._portable_data_hash = None
    self._api_response = None
    self._token_refresh_timestamp = 0

    self.lock = threading.RLock()
    self.events = None

    if manifest_locator_or_text:
        if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
        elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            if not self._has_local_collection_uuid():
                self._has_remote_blocks = True
        elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            # A "+R" hint in the manifest marks blocks signed by a remote cluster.
            if '+R' in self._manifest_text:
                self._has_remote_blocks = True
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader is not a manifest or a collection UUID")

        try:
            self._populate()
        except errors.SyntaxError as e:
            # Interpolate the message here: passing the detail as a second
            # exception argument would leave the "%s" placeholder unformatted.
            raise errors.ArgumentError("Error processing manifest text: %s" % (e,)) from None
def storage_classes_desired(self) -> List[str]:
    """Get this collection's `storage_classes_desired` value

    Returns the stored list, or a new empty list when nothing (or an
    empty value) is stored.
    """
    classes = self._storage_classes_desired
    if not classes:
        return []
    return classes
def root_collection(self) -> 'Collection':
    """Return the root of this collection tree, which is this object itself"""
    return self
def get_properties(self) -> Properties:
    """Get this collection's properties

    This method always returns a dict. If this collection object does not
    have an associated API record, or that record does not have any
    properties set, this method returns an empty dict.
    """
    record = self._api_response
    if not record:
        return {}
    # Use .get() so a record without a "properties" key yields an empty
    # dict, as documented, instead of raising KeyError.
    return record.get("properties") or {}
def get_trash_at(self) -> Optional[datetime.datetime]:
    """Get this collection's `trash_at` field

    This method parses the `trash_at` field of the collection's API
    record and returns a datetime from it. If that field is not set, or
    this collection object does not have an associated API record,
    returns None. It also returns None when the stored value cannot be
    parsed.
    """
    record = self._api_response
    # Use .get() so a record without a "trash_at" key is treated as unset
    # instead of raising KeyError.
    raw_value = record.get("trash_at") if record else None
    if not raw_value:
        return None
    try:
        return ciso8601.parse_datetime(raw_value)
    except ValueError:
        return None
def stream_name(self) -> str:
    """Return the stream name of the collection root, which is always `"."`"""
    return "."
def writable(self) -> bool:
    """Report that this collection can be modified; always returns `True`"""
    return True
def update(
        self,
        other: Optional['Collection']=None,
        num_retries: Optional[int]=None,
) -> None:
    """Merge another collection's contents into this one

    This method compares the manifest of this collection instance with
    another, then updates this instance's manifest with changes from the
    other, renaming files to flag conflicts where necessary.

    When called without any arguments, this method reloads the collection's
    API record, and updates this instance with any changes that have
    appeared server-side. If this instance does not have a corresponding
    API record, this method raises `arvados.errors.ArgumentError`.

    Arguments:

    * other: arvados.collection.Collection | None --- The collection
      whose contents should be merged into this instance. When not
      provided, this method reloads this collection's API record and
      constructs a Collection object from it. If this instance does not
      have a corresponding API record, this method raises
      `arvados.errors.ArgumentError`.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.
    """
    refresh_period = 60*60
    token_age = (time.time() - self._token_refresh_timestamp)
    api_record = None

    def remember_upstream():
        # Only a record fetched by this call is remembered; when the caller
        # supplied `other` there is no new API record to store.
        if api_record is not None:
            self._remember_api_response(api_record)

    if other is None:
        locator = self._manifest_locator
        if locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")

        # Loaded by portable data hash and the tokens are recent enough:
        # nothing to reload.
        if re.match(arvados.util.portable_data_hash_pattern, locator) and token_age < refresh_period:
            return

        api_record = self._my_api().collections().get(uuid=locator).execute(num_retries=num_retries)
        other = CollectionReader(api_record["manifest_text"])

    if not self.committed():
        # Upstream changed and there are uncommitted local changes that
        # must not be lost.
        #
        # _manifest_text holds the text from the last API record received,
        # i.e. the state of the collection before the local changes.
        baseline = Collection(self._manifest_text)

        # Apply the difference between that baseline and `other` to self.
        # If a file was modified in both, `apply` keeps the contents of
        # self and creates a conflict file with the contents of `other`.
        self.apply(baseline.diff(other))

        # The fetched record becomes the new baseline.
        remember_upstream()
        return

    if self.portable_data_hash() == other.portable_data_hash() and token_age < refresh_period:
        # No local changes and identical content. Remember the API record
        # (metadata such as name or properties may have changed) but leave
        # the token refresh timestamp alone.
        remember_upstream()
        return

    # No local changes, but either upstream changed or the tokens are due
    # for a refresh.
    self.apply(self.diff(other))
    remember_upstream()
    self._update_token_timestamp()
    self.set_committed(True)
def _my_api(self):
    """Return this collection's API client, building one on first use

    A newly built client is constructed from `self._config`. If no Keep
    client is set yet, the new API client's `keep` attribute is adopted.
    """
    if self._api_client is not None:
        return self._api_client
    client = ThreadSafeAPIClient(self._config, version='v1')
    self._api_client = client
    if self._keep_client is None:
        self._keep_client = client.keep
    return client
def _my_keep(self):
    """Return this collection's Keep client, building one on first use

    With an API client already set, a `KeepClient` is built from it.
    Otherwise `_my_api()` is called, which is expected to set the Keep
    client as a side effect.
    """
    if self._keep_client is not None:
        return self._keep_client
    if self._api_client is not None:
        self._keep_client = KeepClient(api_client=self._api_client)
    else:
        self._my_api()
    return self._keep_client
def _my_block_manager(self):
    """Return this collection's block manager, building one on first use

    The number of copies is `self.replication_desired` when set, else the
    API discovery document's `defaultCollectionReplication`, else 2.
    """
    if self._block_manager is not None:
        return self._block_manager
    copies = self.replication_desired
    if not copies:
        copies = self._my_api()._rootDesc.get('defaultCollectionReplication', 2)
    self._block_manager = _BlockManager(
        self._my_keep(),
        copies=copies,
        put_threads=self.put_threads,
        num_retries=self.num_retries,
        storage_classes_func=self.storage_classes_desired,
    )
    return self._block_manager
def _remember_api_response(self, response):
    """Store an API record and cache its manifest text and portable data hash

    `response` must provide the keys `manifest_text` and
    `portable_data_hash`.
    """
    self._api_response = response
    self._manifest_text = response['manifest_text']
    self._portable_data_hash = response['portable_data_hash']
def _update_token_timestamp(self):
    """Record the current time as the moment tokens were last refreshed"""
    now = time.time()
    self._token_refresh_timestamp = now
def _populate_from_api_server(self):
    """Fetch this collection's API record and adopt its stored settings

    The record is looked up by `self._manifest_locator` and stored with
    `_remember_api_response`. Any error from the lookup propagates to the
    caller.
    """
    # As in KeepClient itself, the API client is instantiated at the last
    # possible moment (inside _my_api()), to avoid tripping up clients
    # that don't have access to an API server.
    request = self._my_api().collections().get(uuid=self._manifest_locator)
    self._remember_api_response(request.execute(num_retries=self.num_retries))

    # Values given to the constructor win; otherwise take
    # replication_desired and storage_classes_desired from the record.
    record = self._api_response
    if self.replication_desired is None:
        self.replication_desired = record.get('replication_desired', None)
    if self._storage_classes_desired is None:
        self._storage_classes_desired = record.get('storage_classes_desired', None)
def _populate(self):
    """Load this collection's contents from its manifest text

    When no manifest text is set, it is first fetched from the API server
    using the manifest locator. With neither text nor locator, this does
    nothing.
    """
    if self._manifest_text is None:
        if self._manifest_locator is None:
            return
        self._populate_from_api_server()
    manifest = self._manifest_text
    self._baseline_manifest = manifest
    self._import_manifest(manifest)
def _has_collection_uuid(self):
    """Tell whether the manifest locator is a collection UUID

    Returns `False` when no locator is set; otherwise returns the result
    of matching the locator against the collection UUID pattern (a match
    object or `None`).
    """
    locator = self._manifest_locator
    if locator is None:
        return False
    return re.match(arvados.util.collection_uuid_pattern, locator)
def _has_local_collection_uuid(self):
    """Tell whether the manifest locator is a collection UUID from this cluster

    Returns `True` only when the locator is a collection UUID whose prefix
    (the text before the first `-`) equals the `uuidPrefix` in the API
    client's discovery document. Returns `False` otherwise, including when
    no locator is set.
    """
    # _has_collection_uuid is a method and must be called: the bare
    # attribute is always truthy, which would let a missing or non-UUID
    # locator fall through to the prefix comparison.
    if not self._has_collection_uuid():
        return False
    cluster_prefix = self._my_api()._rootDesc['uuidPrefix']
    return cluster_prefix == self._manifest_locator.split('-')[0]
def __enter__(self):
    """Enter a context with this collection instance; returns the instance"""
    return self
def __exit__(self, exc_type, exc_value, traceback):
    """Exit a context with this collection instance

    If no exception was raised inside the context block, and this
    collection is writable and has a corresponding API record, that
    record will be updated to match the state of this instance at the end
    of the block.

    Background threads are stopped in every case, including when saving
    raises. An exception from the context block is never suppressed.
    """
    try:
        if exc_type is None and self.writable() and self._has_collection_uuid():
            self.save()
    finally:
        # Previously a failing save() skipped this call and left the
        # block manager's threads running.
        self.stop_threads()
def stop_threads(self) -> None:
    """Stop background Keep upload/download threads

    Does nothing when no block manager has been created yet.
    """
    manager = self._block_manager
    if manager is None:
        return
    manager.stop_threads()
def manifest_locator(self) -> Optional[str]:
    """Get this collection's manifest locator, if any

    * If this collection instance is associated with an API record with a
      UUID, return that.
    * Otherwise, if this collection instance was loaded from an API record
      by portable data hash, return that.
    * Otherwise, return `None`.
    """
    locator = self._manifest_locator
    return locator
def clone(
        self,
        new_parent: Optional['Collection']=None,
        new_name: Optional[str]=None,
        readonly: bool=False,
        new_config: Optional[Mapping[str, str]]=None,
) -> 'Collection':
    """Create a Collection object with the same contents as this instance

    This method creates a new Collection object with contents that match
    this instance's. The new collection will not be associated with any API
    record.

    Arguments:

    * new_parent: arvados.collection.Collection | None --- This value is
      passed to the new Collection's constructor as the `parent`
      argument.

    * new_name: str | None --- This value is unused.

    * readonly: bool --- If this value is true, this method constructs and
      returns a `CollectionReader`. Otherwise, it returns a mutable
      `Collection`. Default `False`.

    * new_config: Mapping[str, str] | None --- This value is passed to the
      new Collection's constructor as `apiconfig`. If no value is provided,
      defaults to the configuration passed to this instance's constructor.
    """
    if new_config is None:
        new_config = self._config
    if readonly:
        # CollectionReader's constructor requires its first argument
        # positionally; pass None explicitly to start from an empty manifest
        # (omitting it raised TypeError).
        newcollection = CollectionReader(None, parent=new_parent, apiconfig=new_config)
    else:
        newcollection = Collection(parent=new_parent, apiconfig=new_config)

    newcollection._clonefrom(self)
    return newcollection
def api_response(self) -> Optional[Dict[str, Any]]:
    """Get this instance's associated API record

    If this Collection instance has an associated API record, return it.
    Otherwise, return `None`.
    """
    record = self._api_response
    return record
def find_or_create(
        self,
        path: str,
        create_type: CreateType,
) -> CollectionItem:
    """Find the item at `path`, creating it with `create_type` if needed

    The path `"."` refers to this collection itself. A leading `"./"` is
    stripped before the lookup is delegated to the parent class.
    """
    if path == ".":
        return self
    relative_path = path[2:] if path.startswith("./") else path
    return super(Collection, self).find_or_create(relative_path, create_type)
def find(self, path: str) -> CollectionItem:
    """Find the item at `path`

    The path `"."` refers to this collection itself. A leading `"./"` is
    stripped before the lookup is delegated to the parent class.
    """
    if path == ".":
        return self
    relative_path = path[2:] if path.startswith("./") else path
    return super(Collection, self).find(relative_path)
def remove(self, path: str, recursive: bool=False) -> None:
    """Remove the item at `path`

    Raises `arvados.errors.ArgumentError` for the path `"."`. A leading
    `"./"` is stripped before the removal is delegated to the parent
    class, whose return value is passed through.
    """
    if path == ".":
        raise errors.ArgumentError("Cannot remove '.'")
    relative_path = path[2:] if path.startswith("./") else path
    return super(Collection, self).remove(relative_path, recursive)
def save(
        self,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        merge: bool=True,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
) -> str:
    """Save collection to an existing API record

    This method updates the instance's corresponding API record to match
    the instance's state. If this instance does not have a corresponding API
    record yet, raises `AssertionError`. (To create a new API record, use
    `Collection.save_new`.) This method returns the saved collection
    manifest.

    Arguments:

    * properties: dict[str, Any] | None --- If provided, the API record will
      be updated with these properties. Note this will completely replace
      any existing properties.

    * storage_classes: list[str] | None --- If provided, the API record will
      be updated with this value in the `storage_classes_desired` field.
      This value will also be saved on the instance and used for any
      changes that follow.

    * trash_at: datetime.datetime | None --- If provided, the API record
      will be updated with this value in the `trash_at` field. A
      timezone-aware value is converted to UTC before it is sent; a naive
      value is sent as-is and labeled as UTC.

    * merge: bool --- If `True` (the default), this method will first
      reload this collection's API record, and merge any new contents into
      this instance before saving changes. See `Collection.update` for
      details.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value will be passed to directly
      to the underlying API call. If `True`, the Arvados API will
      preserve the versions of this collection both immediately before
      and after the update. If `True` when the API server is not
      configured with collection versioning, this method raises
      `arvados.errors.ArgumentError`.
    """
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")
    if storage_classes:
        self._storage_classes_desired = storage_classes

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    body = {}
    if properties:
        body["properties"] = properties
    if self.storage_classes_desired():
        body["storage_classes_desired"] = self.storage_classes_desired()
    if trash_at:
        # The timestamp is sent with a literal "Z" suffix, so an aware
        # datetime must be converted to UTC first or the instant is wrong.
        if trash_at.utcoffset() is not None:
            trash_at = trash_at.astimezone(datetime.timezone.utc)
        body["trash_at"] = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    if preserve_version:
        body["preserve_version"] = preserve_version

    has_local_changes = not self.committed()
    if has_local_changes:
        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
        elif not self._has_local_collection_uuid():
            raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

        self._my_block_manager().commit_all()

        if merge:
            self.update()

        body['manifest_text'] = self.manifest_text(strip=False)

    # With local changes the body always carries the manifest; without
    # them the record is only updated when some metadata was supplied.
    if body:
        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            body=body
        ).execute(num_retries=num_retries))
        if has_local_changes:
            self.set_committed(True)

    return self._manifest_text
def save_new(
        self,
        name: Optional[str]=None,
        create_collection_record: bool=True,
        owner_uuid: Optional[str]=None,
        properties: Optional[Properties]=None,
        storage_classes: Optional[StorageClasses]=None,
        trash_at: Optional[datetime.datetime]=None,
        ensure_unique_name: bool=False,
        num_retries: Optional[int]=None,
        preserve_version: bool=False,
) -> str:
    """Save collection to a new API record

    This method finishes uploading new data blocks and (optionally)
    creates a new API collection record with the provided data. If a new
    record is created, this instance becomes associated with that record
    for future updates like `save()`. This method returns the saved
    collection manifest.

    Arguments:

    * name: str | None --- The `name` field to use on the new collection
      record. If not specified, the name "New collection" is used and
      `ensure_unique_name` is turned on.

    * create_collection_record: bool --- If `True` (the default), creates a
      collection record on the API server. If `False`, the method finishes
      all data uploads and only returns the resulting collection manifest
      without sending it to the API server.

    * owner_uuid: str | None --- The `owner_uuid` field to use on the
      new collection record.

    * properties: dict[str, Any] | None --- The `properties` field to use on
      the new collection record.

    * storage_classes: list[str] | None --- The
      `storage_classes_desired` field to use on the new collection record.

    * trash_at: datetime.datetime | None --- The `trash_at` field to use
      on the new collection record. A timezone-aware value is converted to
      UTC before it is sent; a naive value is sent as-is and labeled as UTC.

    * ensure_unique_name: bool --- This value is passed directly to the
      Arvados API when creating the collection record. If `True`, the API
      server may modify the submitted `name` to ensure the collection's
      `name`+`owner_uuid` combination is unique. If `False` (the default),
      if a collection already exists with this same `name`+`owner_uuid`
      combination, creating a collection record will raise a validation
      error.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value will be passed to directly
      to the underlying API call. If `True`, the Arvados API will
      preserve the versions of this collection both immediately before
      and after the update. If `True` when the API server is not
      configured with collection versioning, this method raises
      `arvados.errors.ArgumentError`.
    """
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    if self._has_remote_blocks:
        # Copy any remote blocks to the local cluster.
        self._copy_remote_blocks(remote_blocks={})
        self._has_remote_blocks = False

    if storage_classes:
        self._storage_classes_desired = storage_classes

    self._my_block_manager().commit_all()
    text = self.manifest_text(strip=False)

    if create_collection_record:
        if name is None:
            name = "New collection"
            ensure_unique_name = True

        body = {"manifest_text": text,
                "name": name,
                "replication_desired": self.replication_desired}
        if owner_uuid:
            body["owner_uuid"] = owner_uuid
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # The timestamp is sent with a literal "Z" suffix, so an aware
            # datetime must be converted to UTC first or the instant is wrong.
            if trash_at.utcoffset() is not None:
                trash_at = trash_at.astimezone(datetime.timezone.utc)
            body["trash_at"] = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        if preserve_version:
            body["preserve_version"] = preserve_version

        self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
        self._manifest_locator = self._api_response["uuid"]
        self.set_committed(True)

    return text
# One manifest token (a run of non-whitespace) followed by its separator
# (whitespace, or end of text).
_token_re = re.compile(r'(\S+)(\s+|$)')
# A block locator: 32 lowercase hex digits, "+", the block size in digits,
# then any number of "+hint" suffixes. Group 1 is the size.
_block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
# A file segment token "position:size:name"; the groups are those three fields.
_segment_re = re.compile(r'(\d+):(\d+):(\S+)')
def _unescape_manifest_path(self, path):
    """Decode backslash-octal escapes in a manifest path

    Each backslash followed by three octal digits (first digit 0-3) is
    replaced with the character that has that code; other text is kept.
    """
    def decode_escape(match):
        return chr(int(match.group(1), 8))

    return re.sub(r'\\([0-3][0-7][0-7])', decode_escape, path)
def _import_manifest(self, manifest_text):
    """Import a manifest into a `Collection`.

    Arguments:

    * manifest_text: str --- The manifest text to import from.

    Each manifest line is read as a stream name, then block locators, then
    file segments. Raises `arvados.errors.ArgumentError` if this collection
    is not empty, and `arvados.errors.SyntaxError` if a token is malformed
    or a file path conflicts with a directory (or the reverse). On success
    the token timestamp is refreshed and the collection is marked
    committed.
    """
    if len(self) > 0:
        # Use the exception through the `errors` module: the bare name
        # `ArgumentError` is not defined in this module.
        raise errors.ArgumentError("Can only import manifest into an empty collection")

    STREAM_NAME = 0
    BLOCKS = 1
    SEGMENTS = 2

    stream_name = None
    state = STREAM_NAME

    for token_and_separator in self._token_re.finditer(manifest_text):
        tok = token_and_separator.group(1)
        sep = token_and_separator.group(2)

        if state == STREAM_NAME:
            # starting a new stream
            stream_name = self._unescape_manifest_path(tok)
            blocks = []
            streamoffset = 0
            state = BLOCKS
            self.find_or_create(stream_name, COLLECTION)
            continue

        if state == BLOCKS:
            block_locator = self._block_re.match(tok)
            if block_locator:
                blocksize = int(block_locator.group(1))
                blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
                streamoffset += blocksize
            else:
                # First token that is not a block locator: fall through and
                # parse this same token as a file segment.
                state = SEGMENTS

        if state == SEGMENTS:
            file_segment = self._segment_re.match(tok)
            if file_segment:
                pos = int(file_segment.group(1))
                size = int(file_segment.group(2))
                name = self._unescape_manifest_path(file_segment.group(3))
                if name.split('/')[-1] == '.':
                    # placeholder for persisting an empty directory, not a real file
                    if len(name) > 2:
                        self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                else:
                    filepath = os.path.join(stream_name, name)
                    try:
                        afile = self.find_or_create(filepath, FILE)
                    except IOError as e:
                        if e.errno == errno.ENOTDIR:
                            raise errors.SyntaxError("Dir part of %s conflicts with file of the same name." % (filepath,)) from None
                        else:
                            raise e from None
                    if isinstance(afile, ArvadosFile):
                        afile.add_segment(blocks, pos, size)
                    else:
                        raise errors.SyntaxError("File %s conflicts with stream of the same name." % (filepath,))
            else:
                # error!
                raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

        if sep == "\n":
            # End of the manifest line: the next token names a new stream.
            stream_name = None
            state = STREAM_NAME

    self._update_token_timestamp()
    self.set_committed(True)
def notify(
        self,
        event: ChangeType,
        collection: 'RichCollectionBase',
        name: str,
        item: CollectionItem,
) -> None:
    """Forward a change event to this collection's callback, if one is set

    The callback receives `event`, `collection`, `name`, and `item`
    unchanged. Nothing happens when `self._callback` is falsy.
    """
    if not self._callback:
        return
    self._callback(event, collection, name, item)
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        """Create a subcollection called `name` under `parent`

        The lock is shared with the root collection, and `num_retries` is
        copied from `parent`.
        """
        super().__init__(parent)
        root = self.root_collection()
        self.lock = root.lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self) -> 'Collection':
        """Return the root collection, found through the parent"""
        root = self.parent.root_collection()
        return root

    def writable(self) -> bool:
        """Tell whether the root collection is writable"""
        root = self.root_collection()
        return root.writable()

    def _my_api(self):
        """Return the root collection's API client"""
        root = self.root_collection()
        return root._my_api()

    def _my_keep(self):
        """Return the root collection's Keep client"""
        root = self.root_collection()
        return root._my_keep()

    def _my_block_manager(self):
        """Return the root collection's block manager"""
        root = self.root_collection()
        return root._my_block_manager()

    def stream_name(self) -> str:
        """Return the parent's stream name joined with this item's name"""
        parent_stream = self.parent.stream_name()
        return os.path.join(parent_stream, self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Return a new Subcollection under `new_parent` with this one's contents"""
        duplicate = Subcollection(new_parent, new_name)
        duplicate._clonefrom(self)
        return duplicate

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        """Move this subcollection to `newparent` under the name `newname`

        The subcollection is marked uncommitted and flushed, removed from
        its current parent, then attached to the new parent and switched
        to the new root collection's lock.
        """
        self.set_committed(False)
        self.flush()
        old_parent = self.parent
        old_parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = newparent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        r"""Encode empty directories by using an \056-named (".") empty file"""
        if len(self._items) != 0:
            return super()._get_manifest_text(stream_name,
                                              strip, normalize,
                                              only_committed)
        escaped_stream = streams.escape(stream_name)
        return "%s %s 0:0:\\056\n" % (escaped_stream, config.EMPTY_BLOCK_LOCATOR)
1847 class CollectionReader(Collection):
1848 """Read-only `Collection` subclass
1850 This class will never create or update any API collection records. You can
1851 use this class for additional code safety when you only need to read
1852 existing collections.
def __init__(self, manifest_locator_or_text, *args, **kwargs):
    # `writable()` returns `_in_init`, so the instance counts as writable
    # only while the parent constructor below is running.
    self._in_init = True
    super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
    self._in_init = False

    # Forego any locking since it should never change once initialized.
    self.lock = NoopLock()

    # Backwards compatibility with old CollectionReader
    # all_streams() and all_files()
    self._streams = None
def writable(self) -> bool:
    """Tell whether this reader may be modified

    Returns the `_in_init` flag, which is true only while the constructor
    is running.
    """
    initializing = self._in_init
    return initializing