1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
28 from collections import deque
31 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
32 from .keep import KeepLocator, KeepClient
33 from ._normalize_stream import normalize_stream, escape
34 from ._ranges import Range, LocatorAndRange
35 from .safeapi import ThreadSafeApiCache
36 import arvados.config as config
37 import arvados.errors as errors
39 import arvados.events as events
40 from arvados.retry import retry_method
55 if sys.version_info < (3, 8):
56 from typing_extensions import Literal
58 from typing import Literal
60 _logger = logging.getLogger('arvados.collection')
63 """Argument value for `Collection` methods to represent an added item"""
65 """Argument value for `Collection` methods to represent a removed item"""
67 """Argument value for `Collection` methods to represent a modified item"""
69 """Argument value for `Collection` methods to represent an item with token differences"""
71 """`create_type` value for `Collection.find_or_create`"""
72 COLLECTION = "collection"
73 """`create_type` value for `Collection.find_or_create`"""
75 ChangeList = List[Union[
76 Tuple[Literal[ADD, DEL], str, 'Collection'],
77 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
# The kind of change reported to subscribed callbacks and in diff lists:
# one of the ADD, DEL, MOD, or TOK constants defined above.
ChangeType = Literal[ADD, DEL, MOD, TOK]
# An item stored directly in a collection: a file object or a
# (sub)collection object.
CollectionItem = Union[ArvadosFile, 'Collection']
# Signature of a callback registered with `RichCollectionBase.subscribe`.
# It is called as callback(event, collection, name, item), matching the call
# made in `RichCollectionBase.notify`.
ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
# Value accepted as `create_type` by `RichCollectionBase.find_or_create`:
# COLLECTION creates a stream, FILE creates a file.
CreateType = Literal[COLLECTION, FILE]
# Mapping of property names to arbitrary values (presumably a collection
# record's `properties` field; TODO confirm against its users).
Properties = Dict[str, Any]
# A list of Keep storage class names.
StorageClasses = List[str]
86 class CollectionBase(object):
87 """Abstract base class for Collection classes
89 .. ATTENTION:: Internal
90 This class is meant to be used by other parts of the SDK. User code
91 should instantiate or subclass `Collection` or one of its subclasses
96 """Enter a context block with this collection instance"""
99 def __exit__(self, exc_type, exc_value, traceback):
100 """Exit a context block with this collection instance"""
104 if self._keep_client is None:
105 self._keep_client = KeepClient(api_client=self._api_client,
106 num_retries=self.num_retries)
107 return self._keep_client
109 def stripped_manifest(self) -> str:
110 """Create a copy of the collection manifest with only size hints
112 This method returns a string with the current collection's manifest
113 text with all non-portable locator hints like permission hints and
114 remote cluster hints removed. The only hints in the returned manifest
117 raw = self.manifest_text()
119 for line in raw.split("\n"):
120 fields = line.split()
122 clean_fields = fields[:1] + [
123 (re.sub(r'\+[^\d][^\+]*', '', x)
124 if re.match(arvados.util.keep_locator_pattern, x)
127 clean += [' '.join(clean_fields), "\n"]
128 return ''.join(clean)
131 class _WriterFile(_FileLikeObjectBase):
def __init__(self, coll_writer, name):
    """Set up a write-only file object that forwards data to a collection writer

    Arguments:

    * coll_writer --- The writer object that receives everything written
      to this file object.

    * name: str --- The file name, passed to `_FileLikeObjectBase` together
      with mode `'wb'`.
    """
    base_init = super(_WriterFile, self).__init__
    base_init(name, 'wb')
    self.dest = coll_writer
137 super(_WriterFile, self).close()
138 self.dest.finish_current_file()
@_FileLikeObjectBase._before_close
def write(self, data):
    """Forward `data` to the collection writer given at construction"""
    destination = self.dest
    destination.write(data)
144 @_FileLikeObjectBase._before_close
145 def writelines(self, seq):
149 @_FileLikeObjectBase._before_close
151 self.dest.flush_data()
154 class RichCollectionBase(CollectionBase):
155 """Base class for Collection classes
157 .. ATTENTION:: Internal
158 This class is meant to be used by other parts of the SDK. User code
159 should instantiate or subclass `Collection` or one of its subclasses
163 def __init__(self, parent=None):
165 self._committed = False
166 self._has_remote_blocks = False
167 self._callback = None
171 raise NotImplementedError()
174 raise NotImplementedError()
176 def _my_block_manager(self):
177 raise NotImplementedError()
179 def writable(self) -> bool:
180 """Indicate whether this collection object can be modified
182 This method returns `False` if this object is a `CollectionReader`,
185 raise NotImplementedError()
187 def root_collection(self) -> 'Collection':
188 """Get this collection's root collection object
190 If you open a subcollection with `Collection.find`, calling this method
191 on that subcollection returns the source Collection object.
193 raise NotImplementedError()
195 def stream_name(self) -> str:
196 """Get the name of the manifest stream represented by this collection
198 If you open a subcollection with `Collection.find`, calling this method
199 on that subcollection returns the name of the stream you opened.
201 raise NotImplementedError()
204 def has_remote_blocks(self) -> bool:
205 """Indiciate whether the collection refers to remote data
207 Returns `True` if the collection manifest includes any Keep locators
208 with a remote hint (`+R`), else `False`.
210 if self._has_remote_blocks:
213 if self[item].has_remote_blocks():
218 def set_has_remote_blocks(self, val: bool) -> None:
219 """Cache whether this collection refers to remote blocks
221 .. ATTENTION:: Internal
222 This method is only meant to be used by other Collection methods.
224 Set this collection's cached "has remote blocks" flag to the given
227 self._has_remote_blocks = val
229 self.parent.set_has_remote_blocks(val)
236 create_type: CreateType,
238 """Get the item at the given path, creating it if necessary
240 If `path` refers to a stream in this collection, returns a
241 corresponding `Subcollection` object. If `path` refers to a file in
242 this collection, returns a corresponding
243 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
244 this collection, then this method creates a new object and returns
245 it, creating parent streams as needed. The type of object created is
246 determined by the value of `create_type`.
250 * path: str --- The path to find or create within this collection.
252 * create_type: Literal[COLLECTION, FILE] --- The type of object to
253 create at `path` if one does not exist. Passing `COLLECTION`
254 creates a stream and returns the corresponding
255 `Subcollection`. Passing `FILE` creates a new file and returns the
256 corresponding `arvados.arvfile.ArvadosFile`.
258 pathcomponents = path.split("/", 1)
259 if pathcomponents[0]:
260 item = self._items.get(pathcomponents[0])
261 if len(pathcomponents) == 1:
264 if create_type == COLLECTION:
265 item = Subcollection(self, pathcomponents[0])
267 item = ArvadosFile(self, pathcomponents[0])
268 self._items[pathcomponents[0]] = item
269 self.set_committed(False)
270 self.notify(ADD, self, pathcomponents[0], item)
274 # create new collection
275 item = Subcollection(self, pathcomponents[0])
276 self._items[pathcomponents[0]] = item
277 self.set_committed(False)
278 self.notify(ADD, self, pathcomponents[0], item)
279 if isinstance(item, RichCollectionBase):
280 return item.find_or_create(pathcomponents[1], create_type)
282 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
287 def find(self, path: str) -> CollectionItem:
288 """Get the item at the given path
290 If `path` refers to a stream in this collection, returns a
291 corresponding `Subcollection` object. If `path` refers to a file in
292 this collection, returns a corresponding
293 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
294 this collection, then this method raises `NotADirectoryError`.
298 * path: str --- The path to find or create within this collection.
301 raise errors.ArgumentError("Parameter 'path' is empty.")
303 pathcomponents = path.split("/", 1)
304 if pathcomponents[0] == '':
305 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
307 item = self._items.get(pathcomponents[0])
310 elif len(pathcomponents) == 1:
313 if isinstance(item, RichCollectionBase):
314 if pathcomponents[1]:
315 return item.find(pathcomponents[1])
319 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
322 def mkdirs(self, path: str) -> 'Subcollection':
323 """Create and return a subcollection at `path`
325 If `path` exists within this collection, raises `FileExistsError`.
326 Otherwise, creates a stream at that path and returns the
327 corresponding `Subcollection`.
329 if self.find(path) != None:
330 raise IOError(errno.EEXIST, "Directory or file exists", path)
332 return self.find_or_create(path, COLLECTION)
338 encoding: Optional[str]=None,
340 """Open a file-like object within the collection
342 This method returns a file-like object that can read and/or write the
343 file located at `path` within the collection. If you attempt to write
344 a `path` that does not exist, the file is created with `find_or_create`.
345 If the file cannot be opened for any other reason, this method raises
346 `OSError` with an appropriate errno.
350 * path: str --- The path of the file to open within this collection
352 * mode: str --- The mode to open this file. Supports all the same
353 values as `builtins.open`.
355 * encoding: str | None --- The text encoding of the file. Only used
356 when the file is opened in text mode. The default is
359 if not re.search(r'^[rwa][bt]?\+?$', mode):
360 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
362 if mode[0] == 'r' and '+' not in mode:
363 fclass = ArvadosFileReader
364 arvfile = self.find(path)
365 elif not self.writable():
366 raise IOError(errno.EROFS, "Collection is read only")
368 fclass = ArvadosFileWriter
369 arvfile = self.find_or_create(path, FILE)
372 raise IOError(errno.ENOENT, "File not found", path)
373 if not isinstance(arvfile, ArvadosFile):
374 raise IOError(errno.EISDIR, "Is a directory", path)
379 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
380 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
382 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
383 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
386 def modified(self) -> bool:
387 """Indicate whether this collection has an API server record
389 Returns `False` if this collection corresponds to a record loaded from
390 the API server, `True` otherwise.
392 return not self.committed()
396 """Indicate whether this collection has an API server record
398 Returns `True` if this collection corresponds to a record loaded from
399 the API server, `False` otherwise.
401 return self._committed
404 def set_committed(self, value: bool=True):
405 """Cache whether this collection has an API server record
407 .. ATTENTION:: Internal
408 This method is only meant to be used by other Collection methods.
410 Set this collection's cached "committed" flag to the given
411 value and propagates it as needed.
413 if value == self._committed:
416 for k,v in self._items.items():
417 v.set_committed(True)
418 self._committed = True
420 self._committed = False
421 if self.parent is not None:
422 self.parent.set_committed(False)
425 def __iter__(self) -> Iterator[str]:
426 """Iterate names of streams and files in this collection
428 This method does not recurse. It only iterates the contents of this
429 collection's corresponding stream.
431 return iter(self._items)
434 def __getitem__(self, k: str) -> CollectionItem:
435 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
437 This method does not recurse. If you want to search a path, use
438 `RichCollectionBase.find` instead.
440 return self._items[k]
443 def __contains__(self, k: str) -> bool:
444 """Indicate whether this collection has an item with this name
446 This method does not recurse. It you want to check a path, use
447 `RichCollectionBase.exists` instead.
449 return k in self._items
453 """Get the number of items directly contained in this collection
455 This method does not recurse. It only counts the streams and files
456 in this collection's corresponding stream.
458 return len(self._items)
462 def __delitem__(self, p: str) -> None:
463 """Delete an item from this collection's stream
465 This method does not recurse. If you want to remove an item by a
466 path, use `RichCollectionBase.remove` instead.
469 self.set_committed(False)
470 self.notify(DEL, self, p, None)
473 def keys(self) -> Iterator[str]:
474 """Iterate names of streams and files in this collection
476 This method does not recurse. It only iterates the contents of this
477 collection's corresponding stream.
479 return self._items.keys()
482 def values(self) -> List[CollectionItem]:
483 """Get a list of objects in this collection's stream
485 The return value includes a `Subcollection` for every stream, and an
486 `arvados.arvfile.ArvadosFile` for every file, directly within this
487 collection's stream. This method does not recurse.
489 return list(self._items.values())
492 def items(self) -> List[Tuple[str, CollectionItem]]:
493 """Get a list of `(name, object)` tuples from this collection's stream
495 The return value includes a `Subcollection` for every stream, and an
496 `arvados.arvfile.ArvadosFile` for every file, directly within this
497 collection's stream. This method does not recurse.
499 return list(self._items.items())
501 def exists(self, path: str) -> bool:
502 """Indicate whether this collection includes an item at `path`
504 This method returns `True` if `path` refers to a stream or file within
505 this collection, else `False`.
509 * path: str --- The path to check for existence within this collection
511 return self.find(path) is not None
515 def remove(self, path: str, recursive: bool=False) -> None:
516 """Remove the file or stream at `path`
520 * path: str --- The path of the item to remove from the collection
522 * recursive: bool --- Controls the method's behavior if `path` refers
523 to a nonempty stream. If `False` (the default), this method raises
524 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
525 items under the stream.
528 raise errors.ArgumentError("Parameter 'path' is empty.")
530 pathcomponents = path.split("/", 1)
531 item = self._items.get(pathcomponents[0])
533 raise IOError(errno.ENOENT, "File not found", path)
534 if len(pathcomponents) == 1:
535 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
536 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
537 deleteditem = self._items[pathcomponents[0]]
538 del self._items[pathcomponents[0]]
539 self.set_committed(False)
540 self.notify(DEL, self, pathcomponents[0], deleteditem)
542 item.remove(pathcomponents[1], recursive=recursive)
544 def _clonefrom(self, source):
545 for k,v in source.items():
546 self._items[k] = v.clone(self, k)
549 raise NotImplementedError()
555 source_obj: CollectionItem,
557 overwrite: bool=False,
558 reparent: bool=False,
560 """Copy or move a file or subcollection object to this collection
564 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
565 to add to this collection
567 * target_name: str --- The path inside this collection where
568 `source_obj` should be added.
570 * overwrite: bool --- Controls the behavior of this method when the
571 collection already contains an object at `target_name`. If `False`
572 (the default), this method will raise `FileExistsError`. If `True`,
573 the object at `target_name` will be replaced with `source_obj`.
575 * reparent: bool --- Controls whether this method copies or moves
576 `source_obj`. If `False` (the default), `source_obj` is copied into
577 this collection. If `True`, `source_obj` is moved into this
580 if target_name in self and not overwrite:
581 raise IOError(errno.EEXIST, "File already exists", target_name)
584 if target_name in self:
585 modified_from = self[target_name]
587 # Actually make the move or copy.
589 source_obj._reparent(self, target_name)
592 item = source_obj.clone(self, target_name)
594 self._items[target_name] = item
595 self.set_committed(False)
596 if not self._has_remote_blocks and source_obj.has_remote_blocks():
597 self.set_has_remote_blocks(True)
600 self.notify(MOD, self, target_name, (modified_from, item))
602 self.notify(ADD, self, target_name, item)
604 def _get_src_target(self, source, target_path, source_collection, create_dest):
605 if source_collection is None:
606 source_collection = self
609 if isinstance(source, str):
610 source_obj = source_collection.find(source)
611 if source_obj is None:
612 raise IOError(errno.ENOENT, "File not found", source)
613 sourcecomponents = source.split("/")
616 sourcecomponents = None
618 # Find parent collection the target path
619 targetcomponents = target_path.split("/")
621 # Determine the name to use.
622 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
625 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
628 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
630 if len(targetcomponents) > 1:
631 target_dir = self.find("/".join(targetcomponents[0:-1]))
635 if target_dir is None:
636 raise IOError(errno.ENOENT, "Target directory not found", target_name)
638 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
639 target_dir = target_dir[target_name]
640 target_name = sourcecomponents[-1]
642 return (source_obj, target_dir, target_name)
648 source: Union[str, CollectionItem],
650 source_collection: Optional['RichCollectionBase']=None,
651 overwrite: bool=False,
653 """Copy a file or subcollection object to this collection
657 * source: str | arvados.arvfile.ArvadosFile |
658 arvados.collection.Subcollection --- The file or subcollection to
659 add to this collection. If `source` is a str, the object will be
660 found by looking up this path from `source_collection` (see
663 * target_path: str --- The path inside this collection where the
664 source object should be added.
666 * source_collection: arvados.collection.Collection | None --- The
667 collection to find the source object from when `source` is a
668 path. Defaults to the current collection (`self`).
670 * overwrite: bool --- Controls the behavior of this method when the
671 collection already contains an object at `target_path`. If `False`
672 (the default), this method will raise `FileExistsError`. If `True`,
673 the object at `target_path` will be replaced with `source_obj`.
675 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
676 target_dir.add(source_obj, target_name, overwrite, False)
682 source: Union[str, CollectionItem],
684 source_collection: Optional['RichCollectionBase']=None,
685 overwrite: bool=False,
687 """Move a file or subcollection object to this collection
691 * source: str | arvados.arvfile.ArvadosFile |
692 arvados.collection.Subcollection --- The file or subcollection to
693 add to this collection. If `source` is a str, the object will be
694 found by looking up this path from `source_collection` (see
697 * target_path: str --- The path inside this collection where the
698 source object should be added.
700 * source_collection: arvados.collection.Collection | None --- The
701 collection to find the source object from when `source` is a
702 path. Defaults to the current collection (`self`).
704 * overwrite: bool --- Controls the behavior of this method when the
705 collection already contains an object at `target_path`. If `False`
706 (the default), this method will raise `FileExistsError`. If `True`,
707 the object at `target_path` will be replaced with `source_obj`.
709 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
710 if not source_obj.writable():
711 raise IOError(errno.EROFS, "Source collection is read only", source)
712 target_dir.add(source_obj, target_name, overwrite, True)
714 def portable_manifest_text(self, stream_name: str=".") -> str:
715 """Get the portable manifest text for this collection
717 The portable manifest text is normalized, and does not include access
718 tokens. This method does not flush outstanding blocks to Keep.
722 * stream_name: str --- The name to use for this collection's stream in
723 the generated manifest. Default `'.'`.
725 return self._get_manifest_text(stream_name, True, True)
730 stream_name: str=".",
732 normalize: bool=False,
733 only_committed: bool=False,
735 """Get the manifest text for this collection
739 * stream_name: str --- The name to use for this collection's stream in
740 the generated manifest. Default `'.'`.
742 * strip: bool --- Controls whether or not the returned manifest text
743 includes access tokens. If `False` (the default), the manifest text
744 will include access tokens. If `True`, the manifest text will not
745 include access tokens.
747 * normalize: bool --- Controls whether or not the returned manifest
748 text is normalized. Default `False`.
750 * only_committed: bool --- Controls whether or not this method uploads
751 pending data to Keep before building and returning the manifest text.
752 If `False` (the default), this method will finish uploading all data
753 to Keep, then return the final manifest. If `True`, this method will
754 build and return a manifest that only refers to the data that has
755 finished uploading at the time this method was called.
757 if not only_committed:
758 self._my_block_manager().commit_all()
759 return self._get_manifest_text(stream_name, strip, normalize,
760 only_committed=only_committed)
763 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
764 """Get the manifest text for this collection, sub collections and files.
767 Name to use for this stream (directory)
770 If True, remove signing tokens from block locators if present.
771 If False (default), block locators are left unchanged.
774 If True, always export the manifest text in normalized form
775 even if the Collection is not modified. If False (default) and the collection
776 is not modified, return the original manifest text even if it is not
780 If True, only include blocks that were already committed to Keep.
784 if not self.committed() or self._manifest_text is None or normalize:
787 sorted_keys = sorted(self.keys())
788 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
789 # Create a stream per file `k`
790 arvfile = self[filename]
792 for segment in arvfile.segments():
793 loc = segment.locator
794 if arvfile.parent._my_block_manager().is_bufferblock(loc):
797 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
799 loc = KeepLocator(loc).stripped()
800 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
801 segment.segment_offset, segment.range_size))
802 stream[filename] = filestream
804 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
805 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
806 buf.append(self[dirname].manifest_text(
807 stream_name=os.path.join(stream_name, dirname),
808 strip=strip, normalize=True, only_committed=only_committed))
812 return self.stripped_manifest()
814 return self._manifest_text
817 def _copy_remote_blocks(self, remote_blocks={}):
818 """Scan through the entire collection and ask Keep to copy remote blocks.
820 When accessing a remote collection, blocks will have a remote signature
821 (+R instead of +A). Collect these signatures and request Keep to copy the
822 blocks to the local cluster, returning local (+A) signatures.
825 Shared cache of remote to local block mappings. This is used to avoid
826 doing extra work when blocks are shared by more than one file in
827 different subdirectories.
831 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
837 end_collection: 'RichCollectionBase',
839 holding_collection: Optional['Collection']=None,
841 """Build a list of differences between this collection and another
845 * end_collection: arvados.collection.RichCollectionBase --- A
846 collection object with the desired end state. The returned diff
847 list will describe how to go from the current collection object
848 `self` to `end_collection`.
850 * prefix: str --- The name to use for this collection's stream in
851 the diff list. Default `'.'`.
853 * holding_collection: arvados.collection.Collection | None --- A
854 collection object used to hold objects for the returned diff
855 list. By default, a new empty collection is created.
858 if holding_collection is None:
859 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
861 if k not in end_collection:
862 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
863 for k in end_collection:
865 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
866 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
867 elif end_collection[k] != self[k]:
868 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
870 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
872 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
877 def apply(self, changes: ChangeList) -> None:
878 """Apply a list of changes from to this collection
880 This method takes a list of changes generated by
881 `RichCollectionBase.diff` and applies it to this
882 collection. Afterward, the state of this collection object will
883 match the state of `end_collection` passed to `diff`. If a change
884 conflicts with a local change, it will be saved to an alternate path
885 indicating the conflict.
889 * changes: arvados.collection.ChangeList --- The list of differences
890 generated by `RichCollectionBase.diff`.
893 self.set_committed(False)
894 for change in changes:
895 event_type = change[0]
898 local = self.find(path)
899 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
901 if event_type == ADD:
903 # No local file at path, safe to copy over new file
904 self.copy(initial, path)
905 elif local is not None and local != initial:
906 # There is already local file and it is different:
907 # save change to conflict file.
908 self.copy(initial, conflictpath)
909 elif event_type == MOD or event_type == TOK:
912 # Local matches the "initial" item so it has not
913 # changed locally and is safe to update.
914 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
915 # Replace contents of local file with new contents
916 local.replace_contents(final)
918 # Overwrite path with new item; this can happen if
919 # path was a file and is now a collection or vice versa
920 self.copy(final, path, overwrite=True)
922 # Local is missing (presumably deleted) or local doesn't
923 # match the "start" value, so save change to conflict file
924 self.copy(final, conflictpath)
925 elif event_type == DEL:
927 # Local item matches "initial" value, so it is safe to remove.
928 self.remove(path, recursive=True)
929 # else, the file is modified or already removed, in either
930 # case we don't want to try to remove it.
932 def portable_data_hash(self) -> str:
933 """Get the portable data hash for this collection's manifest"""
934 if self._manifest_locator and self.committed():
935 # If the collection is already saved on the API server, and it's committed
936 # then return API server's PDH response.
937 return self._portable_data_hash
939 stripped = self.portable_manifest_text().encode()
940 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
943 def subscribe(self, callback: ChangeCallback) -> None:
944 """Set a notify callback for changes to this collection
948 * callback: arvados.collection.ChangeCallback --- The callable to
949 call each time the collection is changed.
951 if self._callback is None:
952 self._callback = callback
954 raise errors.ArgumentError("A callback is already set on this collection.")
957 def unsubscribe(self) -> None:
958 """Remove any notify callback set for changes to this collection"""
959 if self._callback is not None:
960 self._callback = None
966 collection: 'RichCollectionBase',
968 item: CollectionItem,
970 """Notify any subscribed callback about a change to this collection
972 .. ATTENTION:: Internal
973 This method is only meant to be used by other Collection methods.
975 If a callback has been registered with `RichCollectionBase.subscribe`,
976 it will be called with information about a change to this collection.
977 Then this notification will be propagated to this collection's root.
981 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
984 * collection: arvados.collection.RichCollectionBase --- The
985 collection that was modified.
987 * name: str --- The name of the file or stream within `collection` that
990 * item: arvados.arvfile.ArvadosFile |
991 arvados.collection.Subcollection --- The new contents at `name`
995 self._callback(event, collection, name, item)
996 self.root_collection().notify(event, collection, name, item)
999 def __eq__(self, other: Any) -> bool:
1000 """Indicate whether this collection object is equal to another"""
1003 if not isinstance(other, RichCollectionBase):
1005 if len(self._items) != len(other):
1007 for k in self._items:
1010 if self._items[k] != other[k]:
1014 def __ne__(self, other: Any) -> bool:
1015 """Indicate whether this collection object is not equal to another"""
1016 return not self.__eq__(other)
1019 def flush(self) -> None:
1020 """Upload any pending data to Keep"""
1021 for e in self.values():
class Collection(RichCollectionBase):
    """Read and manipulate an Arvados collection

    This class provides a high-level interface to create, read, and update
    Arvados collections and their contents. Refer to the Arvados Python SDK
    cookbook for [an introduction to using the Collection class][cookbook].

    [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
    """

    def __init__(self, manifest_locator_or_text: Optional[str]=None,
                 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
                 keep_client: Optional['arvados.keep.KeepClient']=None,
                 num_retries: int=10,
                 parent: Optional['Collection']=None,
                 apiconfig: Optional[Mapping[str, str]]=None,
                 block_manager: Optional['arvados.arvfile._BlockManager']=None,
                 replication_desired: Optional[int]=None,
                 storage_classes_desired: Optional[List[str]]=None,
                 put_threads: Optional[int]=None):
        """Initialize a Collection object

        Arguments:

        * manifest_locator_or_text: str | None --- This string can contain a
          collection manifest text, portable data hash, or UUID. When given a
          portable data hash or UUID, this instance will load a collection
          record from the API server. Otherwise, this instance will represent a
          new collection without an API server record. The default value `None`
          instantiates a new collection with an empty manifest.

        * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
          Arvados API client object this instance uses to make requests. If
          none is given, this instance creates its own client using the
          settings from `apiconfig` (see below). If your client instantiates
          many Collection objects, you can help limit memory utilization by
          calling `arvados.api.api` to construct an
          `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
          for every Collection.

        * keep_client: arvados.keep.KeepClient | None --- The Keep client
          object this instance uses to make requests. If none is given, this
          instance creates its own client using its `api_client`.

        * num_retries: int --- The number of times that client requests are
          retried. Default 10.

        * parent: arvados.collection.Collection | None --- The parent Collection
          object of this instance, if any. This argument is primarily used by
          other Collection methods; user client code shouldn't need to use it.

        * apiconfig: Mapping[str, str] | None --- A mapping with entries for
          `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
          `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
          Collection object constructs one from these settings. If no
          mapping is provided, calls `arvados.config.settings` to get these
          parameters from user configuration.

        * block_manager: arvados.arvfile._BlockManager | None --- The
          _BlockManager object used by this instance to coordinate reading
          and writing Keep data blocks. If none is given, this instance
          constructs its own. This argument is primarily used by other
          Collection methods; user client code shouldn't need to use it.

        * replication_desired: int | None --- This controls both the value of
          the `replication_desired` field on API collection records saved by
          this class, as well as the number of Keep services that the object
          writes new data blocks to. If none is given, uses the default value
          configured for the cluster.

        * storage_classes_desired: list[str] | None --- This controls both
          the value of the `storage_classes_desired` field on API collection
          records saved by this class, as well as selecting which specific
          Keep services the object writes new data blocks to. If none is
          given, defaults to an empty list.

        * put_threads: int | None --- The number of threads to run
          simultaneously to upload data blocks to Keep. This value is used when
          building a new `block_manager`. It is unused when a `block_manager`
          is provided.

        Raises `arvados.errors.ArgumentError` if `storage_classes_desired` is
        not a list, if `manifest_locator_or_text` is not recognized as a
        portable data hash, collection UUID, or manifest, or if the manifest
        text cannot be parsed.
        """

        if storage_classes_desired and type(storage_classes_desired) is not list:
            raise errors.ArgumentError("storage_classes_desired must be list type.")

        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client

        # Use the keep client from ThreadSafeApiCache
        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
            self._keep_client = self._api_client.keep

        self._block_manager = block_manager
        self.replication_desired = replication_desired
        self._storage_classes_desired = storage_classes_desired
        self.put_threads = put_threads

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._portable_data_hash = None
        self._api_response = None
        # (modified_at, portable_data_hash) pairs of every API record loaded.
        self._past_versions = set()

        self.lock = threading.RLock()
        self.events = None

        if manifest_locator_or_text:
            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
                if not self._has_local_collection_uuid():
                    self._has_remote_blocks = True
            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                if '+R' in self._manifest_text:
                    self._has_remote_blocks = True
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader is not a manifest or a collection UUID")

            try:
                self._populate()
            except errors.SyntaxError as e:
                # Format the message here. Passing the detail as a second
                # constructor argument would leave "%s" unformatted in the text.
                raise errors.ArgumentError("Error processing manifest text: %s" % str(e)) from None

    def storage_classes_desired(self) -> List[str]:
        """Get this collection's `storage_classes_desired` value"""
        return self._storage_classes_desired or []

    def root_collection(self) -> 'Collection':
        return self

    def get_properties(self) -> Properties:
        """Get this collection's properties

        This method always returns a dict. If this collection object does not
        have an associated API record, or that record does not have any
        properties set, this method returns an empty dict.
        """
        # .get() so a record without a `properties` key yields {}, not KeyError.
        if self._api_response and self._api_response.get("properties"):
            return self._api_response["properties"]
        else:
            return {}

    def get_trash_at(self) -> Optional[datetime.datetime]:
        """Get this collection's `trash_at` field

        This method parses the `trash_at` field of the collection's API
        record and returns a datetime from it. If that field is not set or
        cannot be parsed, or this collection object does not have an
        associated API record, returns None.
        """
        if self._api_response and self._api_response.get("trash_at"):
            try:
                return ciso8601.parse_datetime(self._api_response["trash_at"])
            except ValueError:
                return None
        else:
            return None

    def stream_name(self) -> str:
        return "."

    def writable(self) -> bool:
        return True

    @synchronized
    def known_past_version(
            self,
            modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
    ) -> bool:
        """Indicate whether an API record for this collection has been seen before

        As this collection object loads records from the API server, it records
        their `modified_at` and `portable_data_hash` fields. This method accepts
        a 2-tuple with values for those fields, and returns `True` if the
        combination was previously loaded.
        """
        return modified_at_and_portable_data_hash in self._past_versions

    @synchronized
    @retry_method
    def update(
            self,
            other: Optional['Collection']=None,
            num_retries: Optional[int]=None,
    ) -> None:
        """Merge another collection's contents into this one

        This method compares the manifest of this collection instance with
        another, then updates this instance's manifest with changes from the
        other, renaming files to flag conflicts where necessary.

        When called without any arguments, this method reloads the collection's
        API record, and updates this instance with any changes that have
        appeared server-side. If this instance does not have a corresponding
        API record, this method raises `arvados.errors.ArgumentError`.

        Arguments:

        * other: arvados.collection.Collection | None --- The collection
          whose contents should be merged into this instance. When not
          provided, this method reloads this collection's API record and
          constructs a Collection object from it. If this instance does not
          have a corresponding API record, this method raises
          `arvados.errors.ArgumentError`.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.
        """
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
            other = CollectionReader(response["manifest_text"])
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()

    @synchronized
    def _my_api(self):
        # Build the API client lazily, and reuse its Keep client if none was given.
        if self._api_client is None:
            self._api_client = ThreadSafeApiCache(self._config, version='v1')
            if self._keep_client is None:
                self._keep_client = self._api_client.keep
        return self._api_client

    @synchronized
    def _my_keep(self):
        if self._keep_client is None:
            if self._api_client is None:
                # _my_api() also sets self._keep_client as a side effect.
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client

    @synchronized
    def _my_block_manager(self):
        if self._block_manager is None:
            copies = (self.replication_desired or
                      self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                   2))
            self._block_manager = _BlockManager(self._my_keep(),
                                                copies=copies,
                                                put_threads=self.put_threads,
                                                num_retries=self.num_retries,
                                                storage_classes_func=self.storage_classes_desired)
        return self._block_manager

    def _remember_api_response(self, response):
        self._api_response = response
        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))

    def _populate_from_api_server(self):
        # As in KeepClient itself, we wait until the last possible moment to
        # instantiate an API client (via _my_api()), in order to avoid
        # tripping up clients that don't have access to an API server.
        # Errors from client construction or the request are not caught here;
        # they propagate to the caller.
        self._remember_api_response(self._my_api().collections().get(
            uuid=self._manifest_locator).execute(
                num_retries=self.num_retries))
        self._manifest_text = self._api_response['manifest_text']
        self._portable_data_hash = self._api_response['portable_data_hash']
        # If not overriden via kwargs, we should try to load the
        # replication_desired and storage_classes_desired from the API server
        if self.replication_desired is None:
            self.replication_desired = self._api_response.get('replication_desired', None)
        if self._storage_classes_desired is None:
            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)

    def _populate(self):
        if self._manifest_text is None:
            if self._manifest_locator is None:
                return
            else:
                self._populate_from_api_server()
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)

    def _has_collection_uuid(self):
        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)

    def _has_local_collection_uuid(self):
        # _has_collection_uuid must be *called*. The bound method object is
        # always truthy, which would skip the check and fail on a None locator.
        return self._has_collection_uuid() and \
            self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context with this collection instance

        If no exception was raised inside the context block, and this
        collection is writable and has a corresponding API record, that
        record will be updated to match the state of this instance at the end
        of the context. Background threads are stopped even if that save fails.
        """
        try:
            if exc_type is None:
                if self.writable() and self._has_collection_uuid():
                    self.save()
        finally:
            self.stop_threads()

    def stop_threads(self) -> None:
        """Stop background Keep upload/download threads"""
        if self._block_manager is not None:
            self._block_manager.stop_threads()

    @synchronized
    def manifest_locator(self) -> Optional[str]:
        """Get this collection's manifest locator, if any

        * If this collection instance is associated with an API record with a
          UUID, return that.
        * Otherwise, if this collection instance was loaded from an API record
          by portable data hash, return that.
        * Otherwise, return `None`.
        """
        return self._manifest_locator

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
            readonly: bool=False,
            new_config: Optional[Mapping[str, str]]=None,
    ) -> 'Collection':
        """Create a Collection object with the same contents as this instance

        This method creates a new Collection object with contents that match
        this instance's. The new collection will not be associated with any API
        record.

        Arguments:

        * new_parent: arvados.collection.Collection | None --- This value is
          passed to the new Collection's constructor as the `parent`
          argument.

        * new_name: str | None --- This value is unused.

        * readonly: bool --- If this value is true, this method constructs and
          returns a `CollectionReader`. Otherwise, it returns a mutable
          `Collection`. Default `False`.

        * new_config: Mapping[str, str] | None --- This value is passed to the
          new Collection's constructor as `apiconfig`. If no value is provided,
          defaults to the configuration passed to this instance's constructor.
        """
        if new_config is None:
            new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
            newcollection = Collection(parent=new_parent, apiconfig=new_config)

        newcollection._clonefrom(self)
        return newcollection

    @synchronized
    def api_response(self) -> Optional[Dict[str, Any]]:
        """Get this instance's associated API record

        If this Collection instance has an associated API record, return it.
        Otherwise, return `None`.
        """
        return self._api_response

    def find_or_create(
            self,
            path: str,
            create_type: CreateType,
    ) -> CollectionItem:
        if path == ".":
            return self
        else:
            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    def find(self, path: str) -> CollectionItem:
        if path == ".":
            return self
        else:
            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    def remove(self, path: str, recursive: bool=False) -> None:
        if path == ".":
            raise errors.ArgumentError("Cannot remove '.'")
        else:
            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

    @staticmethod
    def _format_trash_at(trash_at: datetime.datetime) -> str:
        """Format a `trash_at` datetime as the UTC timestamp string sent to the API

        Naive datetimes are formatted as-is, which matches the previous
        behavior and treats them as UTC. Timezone-aware datetimes are first
        converted to UTC, so the trailing `Z` is accurate.
        """
        if trash_at.utcoffset() is not None:
            trash_at = trash_at.astimezone(datetime.timezone.utc)
        return trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    @must_be_writable
    @synchronized
    @retry_method
    def save(
            self,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            merge: bool=True,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to an existing API record

        This method updates the instance's corresponding API record to match
        the instance's state. If this instance does not have a corresponding API
        record yet, raises `AssertionError`. (To create a new API record, use
        `Collection.save_new`.) This method returns the saved collection
        manifest.

        Arguments:

        * properties: dict[str, Any] | None --- If provided, the API record will
          be updated with these properties. Note this will completely replace
          any existing properties.

        * storage_classes: list[str] | None --- If provided, the API record will
          be updated with this value in the `storage_classes_desired` field.
          This value will also be saved on the instance and used for any
          changes that follow.

        * trash_at: datetime.datetime | None --- If provided, the API record
          will be updated with this value in the `trash_at` field. Naive
          datetimes are treated as UTC; aware datetimes are converted to UTC.

        * merge: bool --- If `True` (the default), this method will first
          reload this collection's API record, and merge any new contents into
          this instance before saving changes. See `Collection.update` for
          details.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.

        Also raises `arvados.errors.ArgumentError` when `properties`,
        `storage_classes`, or `trash_at` has the wrong type.
        """
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        body = {}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            body["trash_at"] = self._format_trash_at(trash_at)
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # Contents are already committed; only push the metadata changes.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))

        return self._manifest_text

    @must_be_writable
    @synchronized
    @retry_method
    def save_new(
            self,
            name: Optional[str]=None,
            create_collection_record: bool=True,
            owner_uuid: Optional[str]=None,
            properties: Optional[Properties]=None,
            storage_classes: Optional[StorageClasses]=None,
            trash_at: Optional[datetime.datetime]=None,
            ensure_unique_name: bool=False,
            num_retries: Optional[int]=None,
            preserve_version: bool=False,
    ) -> str:
        """Save collection to a new API record

        This method finishes uploading new data blocks and (optionally)
        creates a new API collection record with the provided data. If a new
        record is created, this instance becomes associated with that record
        for future updates like `save()`. This method returns the saved
        collection manifest.

        Arguments:

        * name: str | None --- The `name` field to use on the new collection
          record. If not specified, a generic default name is generated.

        * create_collection_record: bool --- If `True` (the default), creates a
          collection record on the API server. If `False`, the method finishes
          all data uploads and only returns the resulting collection manifest
          without sending it to the API server.

        * owner_uuid: str | None --- The `owner_uuid` field to use on the
          new collection record.

        * properties: dict[str, Any] | None --- The `properties` field to use on
          the new collection record.

        * storage_classes: list[str] | None --- The
          `storage_classes_desired` field to use on the new collection record.

        * trash_at: datetime.datetime | None --- The `trash_at` field to use
          on the new collection record. Naive datetimes are treated as UTC;
          aware datetimes are converted to UTC.

        * ensure_unique_name: bool --- This value is passed directly to the
          Arvados API when creating the collection record. If `True`, the API
          server may modify the submitted `name` to ensure the collection's
          `name`+`owner_uuid` combination is unique. If `False` (the default),
          if a collection already exists with this same `name`+`owner_uuid`
          combination, creating a collection record will raise a validation
          error.

        * num_retries: int | None --- The number of times to retry reloading
          the collection's API record from the API server. If not specified,
          uses the `num_retries` provided when this instance was constructed.

        * preserve_version: bool --- This value will be passed to directly
          to the underlying API call. If `True`, the Arvados API will
          preserve the versions of this collection both immediately before
          and after the update. If `True` when the API server is not
          configured with collection versioning, this method raises
          `arvados.errors.ArgumentError`.

        Also raises `arvados.errors.ArgumentError` when `properties`,
        `storage_classes`, or `trash_at` has the wrong type.
        """
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

        if storage_classes:
            self._storage_classes_desired = storage_classes

        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid
            if properties:
                body["properties"] = properties
            if self.storage_classes_desired():
                body["storage_classes_desired"] = self.storage_classes_desired()
            if trash_at:
                body["trash_at"] = self._format_trash_at(trash_at)
            if preserve_version:
                body["preserve_version"] = preserve_version

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)

        return text

    # Manifest tokenizer: each token plus the whitespace (or end of text) after it.
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # Block locator: 32 hex digits, "+size", then optional "+hint" suffixes.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # File segment: "position:size:filename".
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')

    def _unescape_manifest_path(self, path):
        # Decode backslash-octal escapes (e.g. "\040" for a space) in manifest paths.
        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)

    @synchronized
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        Arguments:

        * manifest_text: str --- The manifest text to import from.

        Raises `arvados.errors.ArgumentError` if this collection is not empty,
        and `arvados.errors.SyntaxError` if the manifest is malformed or names
        conflict between files and directories.
        """
        if len(self) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")

        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in self._token_re.finditer(manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = self._unescape_manifest_path(tok)
                blocks = []
                streamoffset = 0
                state = BLOCKS
                self.find_or_create(stream_name, COLLECTION)
                continue

            if state == BLOCKS:
                block_locator = self._block_re.match(tok)
                if block_locator:
                    blocksize = int(block_locator.group(1))
                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize
                else:
                    # First non-locator token: fall through to segment parsing.
                    state = SEGMENTS

            if state == SEGMENTS:
                file_segment = self._segment_re.match(tok)
                if file_segment:
                    pos = int(file_segment.group(1))
                    size = int(file_segment.group(2))
                    name = self._unescape_manifest_path(file_segment.group(3))
                    if name.split('/')[-1] == '.':
                        # placeholder for persisting an empty directory, not a real file
                        if len(name) > 2:
                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                    else:
                        filepath = os.path.join(stream_name, name)
                        try:
                            afile = self.find_or_create(filepath, FILE)
                        except IOError as e:
                            if e.errno == errno.ENOTDIR:
                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name." % filepath) from None
                            else:
                                raise e from None
                        if isinstance(afile, ArvadosFile):
                            afile.add_segment(blocks, pos, size)
                        else:
                            raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
                else:
                    # error!
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

            if sep == "\n":
                # End of this manifest line: the next token starts a new stream.
                stream_name = None
                state = STREAM_NAME

        self.set_committed(True)

    @synchronized
    def notify(
            self,
            event: ChangeType,
            collection: 'RichCollectionBase',
            name: str,
            item: CollectionItem,
    ) -> None:
        if self._callback:
            self._callback(event, collection, name, item)
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    A Subcollection stands for one stream (a directory) inside an Arvados
    `Collection`. `Collection.find` hands these out, and they expose the same
    API. Anything that touches the API collection record is forwarded to the
    root `Collection` object.
    """

    def __init__(self, parent, name):
        super().__init__(parent)
        # Every node in the tree shares the root collection's lock.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self) -> 'Collection':
        """Return the root collection, found by walking up through `parent`."""
        return self.parent.root_collection()

    def writable(self) -> bool:
        """A subcollection is writable exactly when its root collection is."""
        return self.root_collection().writable()

    # API, Keep, and block-manager access is always delegated to the root.
    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self) -> str:
        """Return this directory's path: the parent's stream name joined with our name."""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(
            self,
            new_parent: Optional['Collection']=None,
            new_name: Optional[str]=None,
    ) -> 'Subcollection':
        """Return a new Subcollection under `new_parent`, copied from this one.

        NOTE(review): the constructor reads `new_parent.num_retries`, so
        calling this with the default `new_parent=None` appears to fail.
        """
        duplicate = Subcollection(new_parent, new_name)
        duplicate._clonefrom(self)
        return duplicate

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        # Detach from the current parent (after flushing pending data), then
        # attach under the new parent and adopt its root lock.
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode empty directories by using a \\056-named (".") empty file

        Non-empty subcollections defer to the base class implementation.
        """
        if not self._items:
            return "%s %s 0:0:\\056\n" % (escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super()._get_manifest_text(stream_name, strip, normalize, only_committed)
1821 class CollectionReader(Collection):
1822 """Read-only `Collection` subclass
1824 This class will never create or update any API collection records. You can
1825 use this class for additional code safety when you only need to read
1826 existing collections.
1828 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1829 self._in_init = True
1830 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1831 self._in_init = False
1833 # Forego any locking since it should never change once initialized.
1834 self.lock = NoopLock()
1836 # Backwards compatability with old CollectionReader
1837 # all_streams() and all_files()
1838 self._streams = None
1840 def writable(self) -> bool:
1841 return self._in_init