1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
28 from collections import deque
31 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
32 from .keep import KeepLocator, KeepClient
33 from .stream import StreamReader
34 from ._normalize_stream import normalize_stream, escape
35 from ._ranges import Range, LocatorAndRange
36 from .safeapi import ThreadSafeApiCache
37 import arvados.config as config
38 import arvados.errors as errors
40 import arvados.events as events
41 from arvados.retry import retry_method
# `typing.Literal` was added in Python 3.8; fall back to the
# typing_extensions backport on older interpreters.
if sys.version_info < (3, 8):
    from typing_extensions import Literal
else:
    from typing import Literal

_logger = logging.getLogger('arvados.collection')
ADD = "add"
"""Argument value for `Collection` methods to represent an added item"""

DEL = "del"
"""Argument value for `Collection` methods to represent a removed item"""

MOD = "mod"
"""Argument value for `Collection` methods to represent a modified item"""

TOK = "tok"
"""Argument value for `Collection` methods to represent an item with token differences"""

FILE = "file"
"""`create_type` value for `Collection.find_or_create`"""

COLLECTION = "collection"
"""`create_type` value for `Collection.find_or_create`"""

# Type aliases describing the change tuples produced by
# `RichCollectionBase.diff` and consumed by `RichCollectionBase.apply`.
ChangeList = List[Union[
    Tuple[Literal[ADD, DEL], str, 'Collection'],
    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
]]
ChangeType = Literal[ADD, DEL, MOD, TOK]
CollectionItem = Union[ArvadosFile, 'Collection']
ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
CreateType = Literal[COLLECTION, FILE]
Properties = Dict[str, Any]
StorageClasses = List[str]
class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """
    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context block with this collection instance"""
        pass

    def _my_keep(self):
        # Lazily build a KeepClient that shares this collection's API
        # client and retry policy; reuse it for all subsequent calls.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self) -> str:
        """Create a copy of the collection manifest with only size hints

        This method returns a string with the current collection's manifest
        text with all non-portable locator hints like permission hints and
        remote cluster hints removed. The only hints in the returned manifest
        are size hints.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Keep the stream name as-is; strip every non-size hint
                # (anything after '+' that does not start with a digit)
                # from fields that look like Keep locators.
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(arvados.util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
class _WriterFile(_FileLikeObjectBase):
    """File-like object that forwards writes to a CollectionWriter.

    .. ATTENTION:: Internal
       Helper used by the legacy CollectionWriter API.
    """
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # The CollectionWriter that receives all written data.
        self.dest = coll_writer

    def close(self, flush=True):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        # Forward each chunk through write() so the closed-state check
        # and destination bookkeeping apply uniformly.
        for s in seq:
            self.write(s)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()
class RichCollectionBase(CollectionBase):
    """Base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """
    def __init__(self, parent=None):
        # The enclosing collection, or None for a root collection.
        self.parent = parent
        # True once this object matches a record saved on the API server.
        self._committed = False
        # Cached flag: True if any locator carries a remote (+R) hint.
        self._has_remote_blocks = False
        # Change-notification callback installed via subscribe(), if any.
        self._callback = None
        # Maps item name -> ArvadosFile or Subcollection in this stream.
        self._items = {}

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()
180 def writable(self) -> bool:
181 """Indicate whether this collection object can be modified
183 This method returns `False` if this object is a `CollectionReader`,
186 raise NotImplementedError()
188 def root_collection(self) -> 'Collection':
189 """Get this collection's root collection object
191 If you open a subcollection with `Collection.find`, calling this method
192 on that subcollection returns the source Collection object.
194 raise NotImplementedError()
196 def stream_name(self) -> str:
197 """Get the name of the manifest stream represented by this collection
199 If you open a subcollection with `Collection.find`, calling this method
200 on that subcollection returns the name of the stream you opened.
202 raise NotImplementedError()
205 def has_remote_blocks(self) -> bool:
206 """Indiciate whether the collection refers to remote data
208 Returns `True` if the collection manifest includes any Keep locators
209 with a remote hint (`+R`), else `False`.
211 if self._has_remote_blocks:
214 if self[item].has_remote_blocks():
219 def set_has_remote_blocks(self, val: bool) -> None:
220 """Cache whether this collection refers to remote blocks
222 .. ATTENTION:: Internal
223 This method is only meant to be used by other Collection methods.
225 Set this collection's cached "has remote blocks" flag to the given
228 self._has_remote_blocks = val
230 self.parent.set_has_remote_blocks(val)
237 create_type: CreateType,
239 """Get the item at the given path, creating it if necessary
241 If `path` refers to a stream in this collection, returns a
242 corresponding `Subcollection` object. If `path` refers to a file in
243 this collection, returns a corresponding
244 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
245 this collection, then this method creates a new object and returns
246 it, creating parent streams as needed. The type of object created is
247 determined by the value of `create_type`.
251 * path: str --- The path to find or create within this collection.
253 * create_type: Literal[COLLECTION, FILE] --- The type of object to
254 create at `path` if one does not exist. Passing `COLLECTION`
255 creates a stream and returns the corresponding
256 `Subcollection`. Passing `FILE` creates a new file and returns the
257 corresponding `arvados.arvfile.ArvadosFile`.
259 pathcomponents = path.split("/", 1)
260 if pathcomponents[0]:
261 item = self._items.get(pathcomponents[0])
262 if len(pathcomponents) == 1:
265 if create_type == COLLECTION:
266 item = Subcollection(self, pathcomponents[0])
268 item = ArvadosFile(self, pathcomponents[0])
269 self._items[pathcomponents[0]] = item
270 self.set_committed(False)
271 self.notify(ADD, self, pathcomponents[0], item)
275 # create new collection
276 item = Subcollection(self, pathcomponents[0])
277 self._items[pathcomponents[0]] = item
278 self.set_committed(False)
279 self.notify(ADD, self, pathcomponents[0], item)
280 if isinstance(item, RichCollectionBase):
281 return item.find_or_create(pathcomponents[1], create_type)
283 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
288 def find(self, path: str) -> CollectionItem:
289 """Get the item at the given path
291 If `path` refers to a stream in this collection, returns a
292 corresponding `Subcollection` object. If `path` refers to a file in
293 this collection, returns a corresponding
294 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
295 this collection, then this method raises `NotADirectoryError`.
299 * path: str --- The path to find or create within this collection.
302 raise errors.ArgumentError("Parameter 'path' is empty.")
304 pathcomponents = path.split("/", 1)
305 if pathcomponents[0] == '':
306 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
308 item = self._items.get(pathcomponents[0])
311 elif len(pathcomponents) == 1:
314 if isinstance(item, RichCollectionBase):
315 if pathcomponents[1]:
316 return item.find(pathcomponents[1])
320 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
323 def mkdirs(self, path: str) -> 'Subcollection':
324 """Create and return a subcollection at `path`
326 If `path` exists within this collection, raises `FileExistsError`.
327 Otherwise, creates a stream at that path and returns the
328 corresponding `Subcollection`.
330 if self.find(path) != None:
331 raise IOError(errno.EEXIST, "Directory or file exists", path)
333 return self.find_or_create(path, COLLECTION)
339 encoding: Optional[str]=None,
341 """Open a file-like object within the collection
343 This method returns a file-like object that can read and/or write the
344 file located at `path` within the collection. If you attempt to write
345 a `path` that does not exist, the file is created with `find_or_create`.
346 If the file cannot be opened for any other reason, this method raises
347 `OSError` with an appropriate errno.
351 * path: str --- The path of the file to open within this collection
353 * mode: str --- The mode to open this file. Supports all the same
354 values as `builtins.open`.
356 * encoding: str | None --- The text encoding of the file. Only used
357 when the file is opened in text mode. The default is
360 if not re.search(r'^[rwa][bt]?\+?$', mode):
361 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
363 if mode[0] == 'r' and '+' not in mode:
364 fclass = ArvadosFileReader
365 arvfile = self.find(path)
366 elif not self.writable():
367 raise IOError(errno.EROFS, "Collection is read only")
369 fclass = ArvadosFileWriter
370 arvfile = self.find_or_create(path, FILE)
373 raise IOError(errno.ENOENT, "File not found", path)
374 if not isinstance(arvfile, ArvadosFile):
375 raise IOError(errno.EISDIR, "Is a directory", path)
380 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
381 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
383 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
384 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
387 def modified(self) -> bool:
388 """Indicate whether this collection has an API server record
390 Returns `False` if this collection corresponds to a record loaded from
391 the API server, `True` otherwise.
393 return not self.committed()
397 """Indicate whether this collection has an API server record
399 Returns `True` if this collection corresponds to a record loaded from
400 the API server, `False` otherwise.
402 return self._committed
405 def set_committed(self, value: bool=True):
406 """Cache whether this collection has an API server record
408 .. ATTENTION:: Internal
409 This method is only meant to be used by other Collection methods.
411 Set this collection's cached "committed" flag to the given
412 value and propagates it as needed.
414 if value == self._committed:
417 for k,v in self._items.items():
418 v.set_committed(True)
419 self._committed = True
421 self._committed = False
422 if self.parent is not None:
423 self.parent.set_committed(False)
426 def __iter__(self) -> Iterator[str]:
427 """Iterate names of streams and files in this collection
429 This method does not recurse. It only iterates the contents of this
430 collection's corresponding stream.
432 return iter(self._items)
435 def __getitem__(self, k: str) -> CollectionItem:
436 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
438 This method does not recurse. If you want to search a path, use
439 `RichCollectionBase.find` instead.
441 return self._items[k]
444 def __contains__(self, k: str) -> bool:
445 """Indicate whether this collection has an item with this name
447 This method does not recurse. It you want to check a path, use
448 `RichCollectionBase.exists` instead.
450 return k in self._items
454 """Get the number of items directly contained in this collection
456 This method does not recurse. It only counts the streams and files
457 in this collection's corresponding stream.
459 return len(self._items)
463 def __delitem__(self, p: str) -> None:
464 """Delete an item from this collection's stream
466 This method does not recurse. If you want to remove an item by a
467 path, use `RichCollectionBase.remove` instead.
470 self.set_committed(False)
471 self.notify(DEL, self, p, None)
474 def keys(self) -> Iterator[str]:
475 """Iterate names of streams and files in this collection
477 This method does not recurse. It only iterates the contents of this
478 collection's corresponding stream.
480 return self._items.keys()
483 def values(self) -> List[CollectionItem]:
484 """Get a list of objects in this collection's stream
486 The return value includes a `Subcollection` for every stream, and an
487 `arvados.arvfile.ArvadosFile` for every file, directly within this
488 collection's stream. This method does not recurse.
490 return list(self._items.values())
493 def items(self) -> List[Tuple[str, CollectionItem]]:
494 """Get a list of `(name, object)` tuples from this collection's stream
496 The return value includes a `Subcollection` for every stream, and an
497 `arvados.arvfile.ArvadosFile` for every file, directly within this
498 collection's stream. This method does not recurse.
500 return list(self._items.items())
502 def exists(self, path: str) -> bool:
503 """Indicate whether this collection includes an item at `path`
505 This method returns `True` if `path` refers to a stream or file within
506 this collection, else `False`.
510 * path: str --- The path to check for existence within this collection
512 return self.find(path) is not None
516 def remove(self, path: str, recursive: bool=False) -> None:
517 """Remove the file or stream at `path`
521 * path: str --- The path of the item to remove from the collection
523 * recursive: bool --- Controls the method's behavior if `path` refers
524 to a nonempty stream. If `False` (the default), this method raises
525 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
526 items under the stream.
529 raise errors.ArgumentError("Parameter 'path' is empty.")
531 pathcomponents = path.split("/", 1)
532 item = self._items.get(pathcomponents[0])
534 raise IOError(errno.ENOENT, "File not found", path)
535 if len(pathcomponents) == 1:
536 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
537 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
538 deleteditem = self._items[pathcomponents[0]]
539 del self._items[pathcomponents[0]]
540 self.set_committed(False)
541 self.notify(DEL, self, pathcomponents[0], deleteditem)
543 item.remove(pathcomponents[1], recursive=recursive)
545 def _clonefrom(self, source):
546 for k,v in source.items():
547 self._items[k] = v.clone(self, k)
550 raise NotImplementedError()
556 source_obj: CollectionItem,
558 overwrite: bool=False,
559 reparent: bool=False,
561 """Copy or move a file or subcollection object to this collection
565 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
566 to add to this collection
568 * target_name: str --- The path inside this collection where
569 `source_obj` should be added.
571 * overwrite: bool --- Controls the behavior of this method when the
572 collection already contains an object at `target_name`. If `False`
573 (the default), this method will raise `FileExistsError`. If `True`,
574 the object at `target_name` will be replaced with `source_obj`.
576 * reparent: bool --- Controls whether this method copies or moves
577 `source_obj`. If `False` (the default), `source_obj` is copied into
578 this collection. If `True`, `source_obj` is moved into this
581 if target_name in self and not overwrite:
582 raise IOError(errno.EEXIST, "File already exists", target_name)
585 if target_name in self:
586 modified_from = self[target_name]
588 # Actually make the move or copy.
590 source_obj._reparent(self, target_name)
593 item = source_obj.clone(self, target_name)
595 self._items[target_name] = item
596 self.set_committed(False)
597 if not self._has_remote_blocks and source_obj.has_remote_blocks():
598 self.set_has_remote_blocks(True)
601 self.notify(MOD, self, target_name, (modified_from, item))
603 self.notify(ADD, self, target_name, item)
605 def _get_src_target(self, source, target_path, source_collection, create_dest):
606 if source_collection is None:
607 source_collection = self
610 if isinstance(source, str):
611 source_obj = source_collection.find(source)
612 if source_obj is None:
613 raise IOError(errno.ENOENT, "File not found", source)
614 sourcecomponents = source.split("/")
617 sourcecomponents = None
619 # Find parent collection the target path
620 targetcomponents = target_path.split("/")
622 # Determine the name to use.
623 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
626 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
629 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
631 if len(targetcomponents) > 1:
632 target_dir = self.find("/".join(targetcomponents[0:-1]))
636 if target_dir is None:
637 raise IOError(errno.ENOENT, "Target directory not found", target_name)
639 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
640 target_dir = target_dir[target_name]
641 target_name = sourcecomponents[-1]
643 return (source_obj, target_dir, target_name)
649 source: Union[str, CollectionItem],
651 source_collection: Optional['RichCollectionBase']=None,
652 overwrite: bool=False,
654 """Copy a file or subcollection object to this collection
658 * source: str | arvados.arvfile.ArvadosFile |
659 arvados.collection.Subcollection --- The file or subcollection to
660 add to this collection. If `source` is a str, the object will be
661 found by looking up this path from `source_collection` (see
664 * target_path: str --- The path inside this collection where the
665 source object should be added.
667 * source_collection: arvados.collection.Collection | None --- The
668 collection to find the source object from when `source` is a
669 path. Defaults to the current collection (`self`).
671 * overwrite: bool --- Controls the behavior of this method when the
672 collection already contains an object at `target_path`. If `False`
673 (the default), this method will raise `FileExistsError`. If `True`,
674 the object at `target_path` will be replaced with `source_obj`.
676 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
677 target_dir.add(source_obj, target_name, overwrite, False)
683 source: Union[str, CollectionItem],
685 source_collection: Optional['RichCollectionBase']=None,
686 overwrite: bool=False,
688 """Move a file or subcollection object to this collection
692 * source: str | arvados.arvfile.ArvadosFile |
693 arvados.collection.Subcollection --- The file or subcollection to
694 add to this collection. If `source` is a str, the object will be
695 found by looking up this path from `source_collection` (see
698 * target_path: str --- The path inside this collection where the
699 source object should be added.
701 * source_collection: arvados.collection.Collection | None --- The
702 collection to find the source object from when `source` is a
703 path. Defaults to the current collection (`self`).
705 * overwrite: bool --- Controls the behavior of this method when the
706 collection already contains an object at `target_path`. If `False`
707 (the default), this method will raise `FileExistsError`. If `True`,
708 the object at `target_path` will be replaced with `source_obj`.
710 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
711 if not source_obj.writable():
712 raise IOError(errno.EROFS, "Source collection is read only", source)
713 target_dir.add(source_obj, target_name, overwrite, True)
715 def portable_manifest_text(self, stream_name: str=".") -> str:
716 """Get the portable manifest text for this collection
718 The portable manifest text is normalized, and does not include access
719 tokens. This method does not flush outstanding blocks to Keep.
723 * stream_name: str --- The name to use for this collection's stream in
724 the generated manifest. Default `'.'`.
726 return self._get_manifest_text(stream_name, True, True)
731 stream_name: str=".",
733 normalize: bool=False,
734 only_committed: bool=False,
736 """Get the manifest text for this collection
740 * stream_name: str --- The name to use for this collection's stream in
741 the generated manifest. Default `'.'`.
743 * strip: bool --- Controls whether or not the returned manifest text
744 includes access tokens. If `False` (the default), the manifest text
745 will include access tokens. If `True`, the manifest text will not
746 include access tokens.
748 * normalize: bool --- Controls whether or not the returned manifest
749 text is normalized. Default `False`.
751 * only_committed: bool --- Controls whether or not this method uploads
752 pending data to Keep before building and returning the manifest text.
753 If `False` (the default), this method will finish uploading all data
754 to Keep, then return the final manifest. If `True`, this method will
755 build and return a manifest that only refers to the data that has
756 finished uploading at the time this method was called.
758 if not only_committed:
759 self._my_block_manager().commit_all()
760 return self._get_manifest_text(stream_name, strip, normalize,
761 only_committed=only_committed)
764 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
765 """Get the manifest text for this collection, sub collections and files.
768 Name to use for this stream (directory)
771 If True, remove signing tokens from block locators if present.
772 If False (default), block locators are left unchanged.
775 If True, always export the manifest text in normalized form
776 even if the Collection is not modified. If False (default) and the collection
777 is not modified, return the original manifest text even if it is not
781 If True, only include blocks that were already committed to Keep.
785 if not self.committed() or self._manifest_text is None or normalize:
788 sorted_keys = sorted(self.keys())
789 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
790 # Create a stream per file `k`
791 arvfile = self[filename]
793 for segment in arvfile.segments():
794 loc = segment.locator
795 if arvfile.parent._my_block_manager().is_bufferblock(loc):
798 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
800 loc = KeepLocator(loc).stripped()
801 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
802 segment.segment_offset, segment.range_size))
803 stream[filename] = filestream
805 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
806 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
807 buf.append(self[dirname].manifest_text(
808 stream_name=os.path.join(stream_name, dirname),
809 strip=strip, normalize=True, only_committed=only_committed))
813 return self.stripped_manifest()
815 return self._manifest_text
818 def _copy_remote_blocks(self, remote_blocks={}):
819 """Scan through the entire collection and ask Keep to copy remote blocks.
821 When accessing a remote collection, blocks will have a remote signature
822 (+R instead of +A). Collect these signatures and request Keep to copy the
823 blocks to the local cluster, returning local (+A) signatures.
826 Shared cache of remote to local block mappings. This is used to avoid
827 doing extra work when blocks are shared by more than one file in
828 different subdirectories.
832 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
838 end_collection: 'RichCollectionBase',
840 holding_collection: Optional['Collection']=None,
842 """Build a list of differences between this collection and another
846 * end_collection: arvados.collection.RichCollectionBase --- A
847 collection object with the desired end state. The returned diff
848 list will describe how to go from the current collection object
849 `self` to `end_collection`.
851 * prefix: str --- The name to use for this collection's stream in
852 the diff list. Default `'.'`.
854 * holding_collection: arvados.collection.Collection | None --- A
855 collection object used to hold objects for the returned diff
856 list. By default, a new empty collection is created.
859 if holding_collection is None:
860 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
862 if k not in end_collection:
863 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
864 for k in end_collection:
866 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
867 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
868 elif end_collection[k] != self[k]:
869 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
871 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
873 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
878 def apply(self, changes: ChangeList) -> None:
879 """Apply a list of changes from to this collection
881 This method takes a list of changes generated by
882 `RichCollectionBase.diff` and applies it to this
883 collection. Afterward, the state of this collection object will
884 match the state of `end_collection` passed to `diff`. If a change
885 conflicts with a local change, it will be saved to an alternate path
886 indicating the conflict.
890 * changes: arvados.collection.ChangeList --- The list of differences
891 generated by `RichCollectionBase.diff`.
894 self.set_committed(False)
895 for change in changes:
896 event_type = change[0]
899 local = self.find(path)
900 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
902 if event_type == ADD:
904 # No local file at path, safe to copy over new file
905 self.copy(initial, path)
906 elif local is not None and local != initial:
907 # There is already local file and it is different:
908 # save change to conflict file.
909 self.copy(initial, conflictpath)
910 elif event_type == MOD or event_type == TOK:
913 # Local matches the "initial" item so it has not
914 # changed locally and is safe to update.
915 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
916 # Replace contents of local file with new contents
917 local.replace_contents(final)
919 # Overwrite path with new item; this can happen if
920 # path was a file and is now a collection or vice versa
921 self.copy(final, path, overwrite=True)
923 # Local is missing (presumably deleted) or local doesn't
924 # match the "start" value, so save change to conflict file
925 self.copy(final, conflictpath)
926 elif event_type == DEL:
928 # Local item matches "initial" value, so it is safe to remove.
929 self.remove(path, recursive=True)
930 # else, the file is modified or already removed, in either
931 # case we don't want to try to remove it.
933 def portable_data_hash(self) -> str:
934 """Get the portable data hash for this collection's manifest"""
935 if self._manifest_locator and self.committed():
936 # If the collection is already saved on the API server, and it's committed
937 # then return API server's PDH response.
938 return self._portable_data_hash
940 stripped = self.portable_manifest_text().encode()
941 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
944 def subscribe(self, callback: ChangeCallback) -> None:
945 """Set a notify callback for changes to this collection
949 * callback: arvados.collection.ChangeCallback --- The callable to
950 call each time the collection is changed.
952 if self._callback is None:
953 self._callback = callback
955 raise errors.ArgumentError("A callback is already set on this collection.")
958 def unsubscribe(self) -> None:
959 """Remove any notify callback set for changes to this collection"""
960 if self._callback is not None:
961 self._callback = None
967 collection: 'RichCollectionBase',
969 item: CollectionItem,
971 """Notify any subscribed callback about a change to this collection
973 .. ATTENTION:: Internal
974 This method is only meant to be used by other Collection methods.
976 If a callback has been registered with `RichCollectionBase.subscribe`,
977 it will be called with information about a change to this collection.
978 Then this notification will be propagated to this collection's root.
982 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
985 * collection: arvados.collection.RichCollectionBase --- The
986 collection that was modified.
988 * name: str --- The name of the file or stream within `collection` that
991 * item: arvados.arvfile.ArvadosFile |
992 arvados.collection.Subcollection --- The new contents at `name`
996 self._callback(event, collection, name, item)
997 self.root_collection().notify(event, collection, name, item)
1000 def __eq__(self, other: Any) -> bool:
1001 """Indicate whether this collection object is equal to another"""
1004 if not isinstance(other, RichCollectionBase):
1006 if len(self._items) != len(other):
1008 for k in self._items:
1011 if self._items[k] != other[k]:
1015 def __ne__(self, other: Any) -> bool:
1016 """Indicate whether this collection object is not equal to another"""
1017 return not self.__eq__(other)
1020 def flush(self) -> None:
1021 """Upload any pending data to Keep"""
1022 for e in self.values():
1026 class Collection(RichCollectionBase):
1027 """Read and manipulate an Arvados collection
1029 This class provides a high-level interface to create, read, and update
1030 Arvados collections and their contents. Refer to the Arvados Python SDK
1031 cookbook for [an introduction to using the Collection class][cookbook].
1033 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
def __init__(self, manifest_locator_or_text: Optional[str]=None,
             api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
             keep_client: Optional['arvados.keep.KeepClient']=None,
             num_retries: int=10,
             parent: Optional['Collection']=None,
             apiconfig: Optional[Mapping[str, str]]=None,
             block_manager: Optional['arvados.arvfile._BlockManager']=None,
             replication_desired: Optional[int]=None,
             storage_classes_desired: Optional[List[str]]=None,
             put_threads: Optional[int]=None):
    """Initialize a Collection object

    Arguments:

    * manifest_locator_or_text: str | None --- This string can contain a
      collection manifest text, portable data hash, or UUID. When given a
      portable data hash or UUID, this instance will load a collection
      record from the API server. Otherwise, this instance will represent a
      new collection without an API server record. The default value `None`
      instantiates a new collection with an empty manifest.

    * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
      Arvados API client object this instance uses to make requests. If
      none is given, this instance creates its own client using the
      settings from `apiconfig` (see below). If your client instantiates
      many Collection objects, you can help limit memory utilization by
      calling `arvados.api.api` to construct an
      `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
      for every Collection.

    * keep_client: arvados.keep.KeepClient | None --- The Keep client
      object this instance uses to make requests. If none is given, this
      instance creates its own client using its `api_client`.

    * num_retries: int --- The number of times that client requests are
      retried. Default 10.

    * parent: arvados.collection.Collection | None --- The parent Collection
      object of this instance, if any. This argument is primarily used by
      other Collection methods; user client code shouldn't need to use it.

    * apiconfig: Mapping[str, str] | None --- A mapping with entries for
      `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
      `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
      Collection object constructs one from these settings. If no
      mapping is provided, calls `arvados.config.settings` to get these
      parameters from user configuration.

    * block_manager: arvados.arvfile._BlockManager | None --- The
      _BlockManager object used by this instance to coordinate reading
      and writing Keep data blocks. If none is given, this instance
      constructs its own. This argument is primarily used by other
      Collection methods; user client code shouldn't need to use it.

    * replication_desired: int | None --- This controls both the value of
      the `replication_desired` field on API collection records saved by
      this class, as well as the number of Keep services that the object
      writes new data blocks to. If none is given, uses the default value
      configured for the cluster.

    * storage_classes_desired: list[str] | None --- This controls both
      the value of the `storage_classes_desired` field on API collection
      records saved by this class, as well as selecting which specific
      Keep services the object writes new data blocks to. If none is
      given, defaults to an empty list.

    * put_threads: int | None --- The number of threads to run
      simultaneously to upload data blocks to Keep. This value is used when
      building a new `block_manager`. It is unused when a `block_manager`
      is provided.
    """
    if storage_classes_desired and type(storage_classes_desired) is not list:
        raise errors.ArgumentError("storage_classes_desired must be list type.")

    super(Collection, self).__init__(parent)
    self._api_client = api_client
    self._keep_client = keep_client

    # Use the keep client from ThreadSafeApiCache
    if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
        self._keep_client = self._api_client.keep

    self._block_manager = block_manager
    self.replication_desired = replication_desired
    self._storage_classes_desired = storage_classes_desired
    self.put_threads = put_threads

    # NOTE(review): the if/else that chooses between the caller-supplied
    # apiconfig and config.settings() is not visible in this excerpt; the
    # two assignments below appear to belong to that branch — confirm.
    self._config = apiconfig
    self._config = config.settings()

    self.num_retries = num_retries
    self._manifest_locator = None
    self._manifest_text = None
    self._portable_data_hash = None
    self._api_response = None
    self._past_versions = set()

    self.lock = threading.RLock()

    # Classify the argument: Keep locator (portable data hash), collection
    # UUID, or raw manifest text.
    if manifest_locator_or_text:
        if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
        elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            if not self._has_local_collection_uuid():
                self._has_remote_blocks = True
        elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            # A "+R" hint in a locator marks a block on a remote cluster.
            if '+R' in self._manifest_text:
                self._has_remote_blocks = True
        # NOTE(review): the `else:` introducing this raise, and the
        # try/self._populate() preceding the except below, are not visible
        # in this excerpt.
        raise errors.ArgumentError(
            "Argument to CollectionReader is not a manifest or a collection UUID")
    except errors.SyntaxError as e:
        raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
def storage_classes_desired(self) -> List[str]:
    """Get this collection's `storage_classes_desired` value

    Returns the list supplied at construction (or loaded from the API
    record); an unset/empty value is reported as an empty list.
    """
    desired = self._storage_classes_desired
    return desired if desired else []
def root_collection(self) -> 'Collection':
    # NOTE(review): body not visible in this excerpt (a top-level
    # Collection presumably returns itself — confirm against full source).

def get_properties(self) -> Properties:
    """Get this collection's properties

    This method always returns a dict. If this collection object does not
    have an associated API record, or that record does not have any
    properties set, this method returns an empty dict.
    """
    if self._api_response and self._api_response["properties"]:
        return self._api_response["properties"]
    # NOTE(review): the fallback `return {}` is not visible in this excerpt.

def get_trash_at(self) -> Optional[datetime.datetime]:
    """Get this collection's `trash_at` field

    This method parses the `trash_at` field of the collection's API
    record and returns a datetime from it. If that field is not set, or
    this collection object does not have an associated API record,
    returns `None`.
    """
    if self._api_response and self._api_response["trash_at"]:
        # ciso8601 parses the server's ISO 8601 timestamp string.
        return ciso8601.parse_datetime(self._api_response["trash_at"])

def stream_name(self) -> str:
    # NOTE(review): body not visible in this excerpt.

def writable(self) -> bool:
    # NOTE(review): body not visible in this excerpt.

def known_past_version(
        # NOTE(review): the `self,` parameter line and the `) -> bool:`
        # closing line of this signature are not visible in this excerpt.
        modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
    """Indicate whether an API record for this collection has been seen before

    As this collection object loads records from the API server, it records
    their `modified_at` and `portable_data_hash` fields. This method accepts
    a 2-tuple with values for those fields, and returns `True` if the
    combination was previously loaded.
    """
    return modified_at_and_portable_data_hash in self._past_versions
# NOTE(review): the decorators and `def update(self,` header of this method
# are not visible in this excerpt.
other: Optional['Collection']=None,
num_retries: Optional[int]=None,
    """Merge another collection's contents into this one

    This method compares the manifest of this collection instance with
    another, then updates this instance's manifest with changes from the
    other, renaming files to flag conflicts where necessary.

    When called without any arguments, this method reloads the collection's
    API record, and updates this instance with any changes that have
    appeared server-side. If this instance does not have a corresponding
    API record, this method raises `arvados.errors.ArgumentError`.

    Arguments:

    * other: arvados.collection.Collection | None --- The collection
      whose contents should be merged into this instance. When not
      provided, this method reloads this collection's API record and
      constructs a Collection object from it. If this instance does not
      have a corresponding API record, this method raises
      `arvados.errors.ArgumentError`.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.
    """
    # NOTE(review): an `if other is None:` guard around the reload path
    # below is not visible in this excerpt.
    if self._manifest_locator is None:
        raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
    response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
    if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
            response.get("portable_data_hash") != self.portable_data_hash()):
        # The record on the server is different from our current one, but we've seen it before,
        # so ignore it because it's already been merged.
        # However, if it's the same as our current record, proceed with the update, because we want to update
        # our tokens.
        # NOTE(review): the early return of this ignore path is not visible
        # in this excerpt.
    self._remember_api_response(response)
    other = CollectionReader(response["manifest_text"])
    # Diff our baseline (last known) manifest against the other collection
    # and apply the resulting changes to this instance.
    baseline = CollectionReader(self._manifest_text)
    self.apply(baseline.diff(other))
    self._manifest_text = self.manifest_text()
# NOTE(review): the `def _my_api(self):` header (and any decorator) is not
# visible in this excerpt; the statements below lazily build and cache the
# API client, reusing its Keep client when possible.
if self._api_client is None:
    self._api_client = ThreadSafeApiCache(self._config, version='v1')
    if self._keep_client is None:
        self._keep_client = self._api_client.keep
return self._api_client

# NOTE(review): the `def _my_keep(self):` header is not visible; lazily
# builds a KeepClient when one was not supplied.
if self._keep_client is None:
    if self._api_client is None:
        # NOTE(review): the full source presumably ensures an API client
        # exists here before building the KeepClient — confirm.
    self._keep_client = KeepClient(api_client=self._api_client)
return self._keep_client

def _my_block_manager(self):
    # Lazily construct a _BlockManager; block copies default to the
    # desired replication, falling back to the cluster default.
    if self._block_manager is None:
        copies = (self.replication_desired or
                  self._my_api()._rootDesc.get('defaultCollectionReplication',
        # NOTE(review): the literal fallback value, closing parentheses,
        # and the `copies=` argument to _BlockManager are not visible in
        # this excerpt.
        self._block_manager = _BlockManager(self._my_keep(),
                                            put_threads=self.put_threads,
                                            num_retries=self.num_retries,
                                            storage_classes_func=self.storage_classes_desired)
    return self._block_manager
1293 def _remember_api_response(self, response):
1294 self._api_response = response
1295 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
def _populate_from_api_server(self):
    """Load this collection's record from the API server.

    As in KeepClient itself, we must wait until the last possible moment
    to instantiate an API client, in order to avoid tripping up clients
    that don't have access to an API server. If we do build one, make
    sure our Keep client uses it. If instantiation fails, we'll fall
    back to the except clause, just like any other Collection lookup
    failure. Return an exception, or None if successful.
    """
    record = self._my_api().collections().get(
        uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
    self._remember_api_response(record)
    self._manifest_text = self._api_response['manifest_text']
    self._portable_data_hash = self._api_response['portable_data_hash']
    # Fall back to the server-side values only when the caller did not
    # override them via constructor kwargs.
    if self.replication_desired is None:
        self.replication_desired = self._api_response.get('replication_desired', None)
    if self._storage_classes_desired is None:
        self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
def _populate(self):
    """Load the manifest (from the API server if needed) into memory."""
    # Nothing to fetch when manifest text was supplied directly.
    if self._manifest_text is None:
        if self._manifest_locator is None:
            # NOTE(review): the early return for a brand-new collection
            # (no locator, no text) is not visible in this excerpt.
        self._populate_from_api_server()
    # Keep a pristine copy for later diffing, then build the in-memory tree.
    self._baseline_manifest = self._manifest_text
    self._import_manifest(self._manifest_text)
def _has_collection_uuid(self):
    """Report whether the manifest locator looks like a collection UUID.

    Returns a falsy value when no locator is set or it does not match
    the collection UUID pattern.
    """
    locator = self._manifest_locator
    if locator is None:
        return False
    return re.match(arvados.util.collection_uuid_pattern, locator)
def _has_local_collection_uuid(self):
    """Report whether the locator is a collection UUID on the local cluster.

    True only when the locator is a collection UUID whose cluster prefix
    matches the API server's `uuidPrefix`.
    """
    # Bug fix: `_has_collection_uuid` must be *called*. The bare bound
    # method object is always truthy, which defeated the guard and let a
    # None locator reach `.split()` below.
    return self._has_collection_uuid() and \
        self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
def __enter__(self):
    # NOTE(review): body not visible in this excerpt (context-manager
    # entry; presumably returns self — confirm against full source).

def __exit__(self, exc_type, exc_value, traceback):
    """Exit a context with this collection instance

    If no exception was raised inside the context block, and this
    collection is writable and has a corresponding API record, that
    record will be updated to match the state of this instance at the end
    of the block.
    """
    if exc_type is None:
        if self.writable() and self._has_collection_uuid():
            # NOTE(review): the save() call and any stop_threads() cleanup
            # that follow are not visible in this excerpt.
def stop_threads(self) -> None:
    """Stop background Keep upload/download threads"""
    manager = self._block_manager
    if manager is not None:
        manager.stop_threads()
def manifest_locator(self) -> Optional[str]:
    """Get this collection's manifest locator, if any

    * If this collection instance is associated with an API record with a
      UUID, return that.
    * Otherwise, if this collection instance was loaded from an API record
      by portable data hash, return that.
    * Otherwise, return `None`.
    """
    return self._manifest_locator
# NOTE(review): the `def clone(self,` header line of this method is not
# visible in this excerpt.
new_parent: Optional['Collection']=None,
new_name: Optional[str]=None,
readonly: bool=False,
new_config: Optional[Mapping[str, str]]=None,
    """Create a Collection object with the same contents as this instance

    This method creates a new Collection object with contents that match
    this instance's. The new collection will not be associated with any API
    record.

    Arguments:

    * new_parent: arvados.collection.Collection | None --- This value is
      passed to the new Collection's constructor as the `parent`
      argument.

    * new_name: str | None --- This value is unused.

    * readonly: bool --- If this value is true, this method constructs and
      returns a `CollectionReader`. Otherwise, it returns a mutable
      `Collection`. Default `False`.

    * new_config: Mapping[str, str] | None --- This value is passed to the
      new Collection's constructor as `apiconfig`. If no value is provided,
      defaults to the configuration passed to this instance's constructor.
    """
    if new_config is None:
        new_config = self._config
    # NOTE(review): the `if readonly:` / `else:` branch lines selecting
    # between the two constructors below are not visible in this excerpt.
    newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
    newcollection = Collection(parent=new_parent, apiconfig=new_config)

    newcollection._clonefrom(self)
    return newcollection
def api_response(self) -> Optional[Dict[str, Any]]:
    """Get this instance's associated API record

    If this Collection instance has an associated API record, return it.
    Otherwise, return `None`.
    """
    return self._api_response
# NOTE(review): the `def find_or_create(self, path: str,` header lines of
# this method are not visible in this excerpt.
create_type: CreateType,
) -> CollectionItem:
    # Strip a leading "./" so paths are always relative to the root.
    return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

def find(self, path: str) -> CollectionItem:
    # Strip a leading "./" before delegating the lookup.
    return super(Collection, self).find(path[2:] if path.startswith("./") else path)

def remove(self, path: str, recursive: bool=False) -> None:
    # NOTE(review): the guard (presumably `if path == ".":`) preceding this
    # raise is not visible in this excerpt.
    raise errors.ArgumentError("Cannot remove '.'")
    return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
# NOTE(review): the decorators and `def save(self,` header (including the
# `merge: bool=True,` parameter documented below) are not visible in this
# excerpt.
properties: Optional[Properties]=None,
storage_classes: Optional[StorageClasses]=None,
trash_at: Optional[datetime.datetime]=None,
num_retries: Optional[int]=None,
preserve_version: bool=False,
    """Save collection to an existing API record

    This method updates the instance's corresponding API record to match
    the instance's state. If this instance does not have a corresponding API
    record yet, raises `AssertionError`. (To create a new API record, use
    `Collection.save_new`.) This method returns the saved collection
    manifest.

    Arguments:

    * properties: dict[str, Any] | None --- If provided, the API record will
      be updated with these properties. Note this will completely replace
      any existing properties.

    * storage_classes: list[str] | None --- If provided, the API record will
      be updated with this value in the `storage_classes_desired` field.
      This value will also be saved on the instance and used for any
      changes that follow.

    * trash_at: datetime.datetime | None --- If provided, the API record
      will be updated with this value in the `trash_at` field.

    * merge: bool --- If `True` (the default), this method will first
      reload this collection's API record, and merge any new contents into
      this instance before saving changes. See `Collection.update` for
      details.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value will be passed to directly
      to the underlying API call. If `True`, the Arvados API will
      preserve the versions of this collection both immediately before
      and after the update. If `True` when the API server is not
      configured with collection versioning, this method raises
      `arvados.errors.ArgumentError`.
    """
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")
    # NOTE(review): an `if storage_classes:` guard appears to precede this
    # assignment in the full source — confirm.
    self._storage_classes_desired = storage_classes

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    # NOTE(review): the `body = {}` initialization and the `if properties:`
    # guard are not visible in this excerpt.
    body["properties"] = properties
    if self.storage_classes_desired():
        body["storage_classes_desired"] = self.storage_classes_desired()
    # NOTE(review): an `if trash_at:` guard appears to precede the
    # strftime below — confirm. Timestamp is serialized as UTC ISO 8601.
    t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    body["trash_at"] = t
    if preserve_version:
        body["preserve_version"] = preserve_version

    if not self.committed():
        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
        elif not self._has_local_collection_uuid():
            raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

        # NOTE(review): the `merge`-triggered self.update() call is not
        # visible in this excerpt.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)
        body['manifest_text'] = text

        self._remember_api_response(self._my_api().collections().update(
            uuid=self._manifest_locator,
            # NOTE(review): the `body=body` argument line is not visible.
        ).execute(num_retries=num_retries))
        self._manifest_text = self._api_response["manifest_text"]
        self._portable_data_hash = self._api_response["portable_data_hash"]
        self.set_committed(True)
    # NOTE(review): the branch wrapping this second update call (for the
    # already-committed case) is not visible in this excerpt.
    self._remember_api_response(self._my_api().collections().update(
        uuid=self._manifest_locator,
    ).execute(num_retries=num_retries))

    return self._manifest_text
# NOTE(review): the decorators and `def save_new(self,` header are not
# visible in this excerpt.
name: Optional[str]=None,
create_collection_record: bool=True,
owner_uuid: Optional[str]=None,
properties: Optional[Properties]=None,
storage_classes: Optional[StorageClasses]=None,
trash_at: Optional[datetime.datetime]=None,
ensure_unique_name: bool=False,
num_retries: Optional[int]=None,
preserve_version: bool=False,
    """Save collection to a new API record

    This method finishes uploading new data blocks and (optionally)
    creates a new API collection record with the provided data. If a new
    record is created, this instance becomes associated with that record
    for future updates like `save()`. This method returns the saved
    collection manifest.

    Arguments:

    * name: str | None --- The `name` field to use on the new collection
      record. If not specified, a generic default name is generated.

    * create_collection_record: bool --- If `True` (the default), creates a
      collection record on the API server. If `False`, the method finishes
      all data uploads and only returns the resulting collection manifest
      without sending it to the API server.

    * owner_uuid: str | None --- The `owner_uuid` field to use on the
      new collection record.

    * properties: dict[str, Any] | None --- The `properties` field to use on
      the new collection record.

    * storage_classes: list[str] | None --- The
      `storage_classes_desired` field to use on the new collection record.

    * trash_at: datetime.datetime | None --- The `trash_at` field to use
      on the new collection record.

    * ensure_unique_name: bool --- This value is passed directly to the
      Arvados API when creating the collection record. If `True`, the API
      server may modify the submitted `name` to ensure the collection's
      `name`+`owner_uuid` combination is unique. If `False` (the default),
      if a collection already exists with this same `name`+`owner_uuid`
      combination, creating a collection record will raise a validation
      error.

    * num_retries: int | None --- The number of times to retry reloading
      the collection's API record from the API server. If not specified,
      uses the `num_retries` provided when this instance was constructed.

    * preserve_version: bool --- This value will be passed to directly
      to the underlying API call. If `True`, the Arvados API will
      preserve the versions of this collection both immediately before
      and after the update. If `True` when the API server is not
      configured with collection versioning, this method raises
      `arvados.errors.ArgumentError`.
    """
    if properties and type(properties) is not dict:
        raise errors.ArgumentError("properties must be dictionary type.")

    if storage_classes and type(storage_classes) is not list:
        raise errors.ArgumentError("storage_classes must be list type.")

    if trash_at and type(trash_at) is not datetime.datetime:
        raise errors.ArgumentError("trash_at must be datetime type.")

    if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
        raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

    if self._has_remote_blocks:
        # Copy any remote blocks to the local cluster.
        self._copy_remote_blocks(remote_blocks={})
        self._has_remote_blocks = False

    # NOTE(review): an `if storage_classes:` guard appears to precede this
    # assignment — confirm.
    self._storage_classes_desired = storage_classes

    # Finish uploading all pending data blocks before serializing.
    self._my_block_manager().commit_all()
    text = self.manifest_text(strip=False)

    if create_collection_record:
        # NOTE(review): the `if name is None:` guard for the two default
        # assignments below is not visible in this excerpt.
        name = "New collection"
        ensure_unique_name = True

        body = {"manifest_text": text,
                # NOTE(review): the `"name": name,` entry is not visible.
                "replication_desired": self.replication_desired}
        # NOTE(review): `if owner_uuid:` / `if properties:` guards appear
        # to precede the next two assignments — confirm.
        body["owner_uuid"] = owner_uuid
        body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        # NOTE(review): an `if trash_at:` guard appears to precede the
        # strftime below — confirm.
        t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
        text = self._api_response["manifest_text"]

        self._manifest_locator = self._api_response["uuid"]
        self._portable_data_hash = self._api_response["portable_data_hash"]

        self._manifest_text = text
        self.set_committed(True)
    # NOTE(review): the trailing `return text` is not visible in this
    # excerpt.
# Tokenizer patterns used by _import_manifest:
# one whitespace-delimited token, plus its trailing separator (or EOL).
_token_re = re.compile(r'(\S+)(\s+|$)')
# Keep block locator: 32 hex digits, "+<size>", optional "+hint" suffixes.
_block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
# Manifest file segment: "<position>:<size>:<name>".
_segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1668 def _unescape_manifest_path(self, path):
1669 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
def _import_manifest(self, manifest_text):
    """Import a manifest into a `Collection`.

    :manifest_text:
      The manifest text to import from.
    """
    # NOTE(review): the non-empty-collection guard wrapping this raise,
    # and the STREAM_NAME/BLOCKS/SEGMENTS state-machine initialization,
    # are not visible in this excerpt.
    raise ArgumentError("Can only import manifest into an empty collection")

    # Walk the manifest token by token; each token's trailing separator
    # tells us whether the current stream line has ended.
    for token_and_separator in self._token_re.finditer(manifest_text):
        tok = token_and_separator.group(1)
        sep = token_and_separator.group(2)

        if state == STREAM_NAME:
            # starting a new stream
            stream_name = self._unescape_manifest_path(tok)
            # NOTE(review): per-stream bookkeeping resets (blocks list,
            # stream offset, state transition) are not visible here.
            self.find_or_create(stream_name, COLLECTION)

        # NOTE(review): the BLOCKS state branch header and the
        # `if block_locator:` guard are not visible in this excerpt.
        block_locator = self._block_re.match(tok)
        blocksize = int(block_locator.group(1))
        # Record each block with its running offset within the stream.
        blocks.append(Range(tok, streamoffset, blocksize, 0))
        streamoffset += blocksize

        if state == SEGMENTS:
            file_segment = self._segment_re.match(tok)
            # NOTE(review): an `if file_segment:` guard appears to precede
            # the group() accesses below — confirm.
            pos = int(file_segment.group(1))
            size = int(file_segment.group(2))
            name = self._unescape_manifest_path(file_segment.group(3))
            if name.split('/')[-1] == '.':
                # placeholder for persisting an empty directory, not a real file
                # (strip the trailing "/." before creating the directory)
                self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
            filepath = os.path.join(stream_name, name)
            # NOTE(review): a `try:` wrapping the lookup below is not
            # visible in this excerpt.
            afile = self.find_or_create(filepath, FILE)
            except IOError as e:
                if e.errno == errno.ENOTDIR:
                    raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
            if isinstance(afile, ArvadosFile):
                afile.add_segment(blocks, pos, size)
            # NOTE(review): the `else:` lines introducing the two raises
            # below are not visible in this excerpt.
            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
        raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

    # Freshly imported contents match the manifest we imported from.
    self.set_committed(True)
# NOTE(review): this fragment belongs to an event-callback helper whose
# class and method headers are not visible in this excerpt; it forwards a
# change event to the wrapped callback.
collection: 'RichCollectionBase',
item: CollectionItem,
    self._callback(event, collection, name, item)
class Subcollection(RichCollectionBase):
    """Read and manipulate a stream/directory within an Arvados collection

    This class represents a single stream (like a directory) within an Arvados
    `Collection`. It is returned by `Collection.find` and provides the same API.
    Operations that work on the API collection record propagate to the parent
    `Collection` object.
    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so mutations anywhere in the
        # tree are serialized together.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        # NOTE(review): the `self.name = name` assignment is not visible in
        # this excerpt — confirm against the full source.
        self.num_retries = parent.num_retries
def root_collection(self) -> 'Collection':
    """Return the top-level Collection this subcollection belongs to."""
    root = self.parent.root_collection()
    return root
def writable(self) -> bool:
    # A subcollection is writable exactly when its root collection is.
    return self.root_collection().writable()

# NOTE(review): the `def _my_api(self):` header (and any decorator) is not
# visible in this excerpt; delegates to the root collection.
    return self.root_collection()._my_api()

# NOTE(review): the `def _my_keep(self):` header is not visible; delegates
# to the root collection.
    return self.root_collection()._my_keep()

def _my_block_manager(self):
    # Delegate block management to the root collection.
    return self.root_collection()._my_block_manager()
def stream_name(self) -> str:
    """Return this subcollection's full stream path within the manifest."""
    parent_stream = self.parent.stream_name()
    return os.path.join(parent_stream, self.name)
# NOTE(review): the decorators and `def clone(self,` header of this method
# are not visible in this excerpt.
new_parent: Optional['Collection']=None,
new_name: Optional[str]=None,
) -> 'Subcollection':
    c = Subcollection(new_parent, new_name)
    # NOTE(review): the _clonefrom(self) call and `return c` are not
    # visible in this excerpt.

def _reparent(self, newparent, newname):
    # Detach from the current parent and re-attach under the new one.
    self.set_committed(False)
    # NOTE(review): an intervening line (presumably flushing state) is not
    # visible before the remove below.
    self.parent.remove(self.name, recursive=True)
    self.parent = newparent
    # NOTE(review): the `self.name = newname` assignment is not visible in
    # this excerpt.
    self.lock = self.parent.root_collection().lock

def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
    """Encode empty directories by using an \056-named (".") empty file"""
    if len(self._items) == 0:
        return "%s %s 0:0:\\056\n" % (
            escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
    # NOTE(review): the remaining arguments of this super() call are not
    # visible in this excerpt.
    return super(Subcollection, self)._get_manifest_text(stream_name,
class CollectionReader(Collection):
    """Read-only `Collection` subclass

    This class will never create or update any API collection records. You can
    use this class for additional code safety when you only need to read
    existing collections.
    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # Track the init phase: writable() reports True only while
        # __init__ runs so the base class can populate the tree, after
        # which the reader is frozen.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with old CollectionReader
        # all_streams() and all_files()
        self._streams = None

    def writable(self) -> bool:
        # Only writable during construction (see __init__).
        return self._in_init
def _populate_streams(orig_func):
    """Decorator: lazily build the legacy `_streams` list before calling."""
    @functools.wraps(orig_func)
    def populate_streams_wrapper(self, *args, **kwargs):
        # Defer populating self._streams until needed since it creates a copy of the manifest.
        if self._streams is None:
            if self._manifest_text:
                self._streams = [sline.split()
                                 for sline in self._manifest_text.split("\n")
                # NOTE(review): the closing filter of this comprehension
                # and the else branch initializing an empty list are not
                # visible in this excerpt.
        return orig_func(self, *args, **kwargs)
    return populate_streams_wrapper

@arvados.util._deprecated('3.0', 'Collection iteration')
# NOTE(review): a @_populate_streams decorator line is not visible here.
def normalize(self):
    """Normalize the streams returned by `all_streams`"""
    # NOTE(review): the `streams = {}` initialization is not visible in
    # this excerpt.
    # Group every file's segments by (stream, file) name.
    for s in self.all_streams():
        for f in s.all_files():
            streamname, filename = split(s.name() + "/" + f.name())
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for r in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

    self._streams = [normalize_stream(s, streams[s])
                     for s in sorted(streams)]

@arvados.util._deprecated('3.0', 'Collection iteration')
# NOTE(review): a @_populate_streams decorator line is not visible here.
def all_streams(self):
    return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
            for s in self._streams]

@arvados.util._deprecated('3.0', 'Collection iteration')
def all_files(self):
    for s in self.all_streams():
        for f in s.all_files():
            # NOTE(review): the `yield f` statement is not visible in this
            # excerpt.
class CollectionWriter(CollectionBase):
    """Create a new collection from scratch

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """
    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you are done.

        Arguments:

        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.

        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.

        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        # Data buffered in memory but not yet flushed to a Keep block.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream/file currently being written.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state; see do_queued_work for the queueing protocol.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
def __exit__(self, exc_type, exc_value, traceback):
    # On clean exit, finalize the collection.
    if exc_type is None:
        # NOTE(review): the finalizing call (presumably self.finish()) is
        # not visible in this excerpt.

def do_queued_work(self):
    # The work queue consists of three pieces:
    # * _queued_file: The file object we're currently writing to the
    #   Collection.
    # * _queued_dirents: Entries under the current directory
    #   (_queued_trees[0]) that we want to write or recurse through.
    #   This may contain files from subdirectories if
    #   max_manifest_depth == 0 for this directory.
    # * _queued_trees: Directories that should be written as separate
    #   streams to the Collection.
    # This function handles the smallest piece of work currently queued
    # (current file, then current directory, then next directory) until
    # no work remains. The _work_THING methods each do a unit of work on
    # THING. _queue_THING methods add a THING to the work queue.
    # NOTE(review): the enclosing loop and the terminating else/break are
    # not visible in this excerpt.
    if self._queued_file:
        # NOTE(review): self._work_file() call not visible here.
    elif self._queued_dirents:
        self._work_dirents()
    elif self._queued_trees:
        # NOTE(review): self._work_trees() call not visible here.

def _work_file(self):
    # NOTE(review): the read loop header (and the not-buf break plus the
    # self.write(buf) call) are not visible in this excerpt.
    buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
    self.finish_current_file()
    if self._close_file:
        # Only close file objects we opened ourselves (see _queue_file).
        self._queued_file.close()
    self._close_file = None
    self._queued_file = None
def _work_dirents(self):
    path, stream_name, max_manifest_depth = self._queued_trees[0]
    if stream_name != self.current_stream_name():
        self.start_new_stream(stream_name)
    while self._queued_dirents:
        dirent = self._queued_dirents.popleft()
        target = os.path.join(path, dirent)
        if os.path.isdir(target):
            # Subdirectories are queued as their own trees with one less
            # level of manifest depth.
            self._queue_tree(target,
                             os.path.join(stream_name, dirent),
                             max_manifest_depth - 1)
        # NOTE(review): the `else:` introducing the file branch (and a
        # `break` after queueing the file) are not visible in this excerpt.
            self._queue_file(target, dirent)
    if not self._queued_dirents:
        self._queued_trees.popleft()

def _work_trees(self):
    path, stream_name, max_manifest_depth = self._queued_trees[0]
    d = arvados.util.listdir_recursive(
        path, max_depth = (None if max_manifest_depth == 0 else 0))
    # NOTE(review): the `if d:` / `else:` branch lines around the next two
    # statements are not visible in this excerpt.
    self._queue_dirents(stream_name, d)
    self._queued_trees.popleft()
def _queue_file(self, source, filename=None):
    assert (self._queued_file is None), "tried to queue more than one file"
    # Accept either a path or an open file-like object; remember whether
    # we opened it so _work_file knows whether to close it.
    if not hasattr(source, 'read'):
        source = open(source, 'rb')
        self._close_file = True
    # NOTE(review): the `else:` introducing the caller-owned branch is not
    # visible in this excerpt.
        self._close_file = False
    if filename is None:
        filename = os.path.basename(source.name)
    self.start_new_file(filename)
    self._queued_file = source
2016 def _queue_dirents(self, stream_name, dirents):
2017 assert (not self._queued_dirents), "tried to queue more than one tree"
2018 self._queued_dirents = deque(sorted(dirents))
2020 def _queue_tree(self, path, stream_name, max_manifest_depth):
2021 self._queued_trees.append((path, stream_name, max_manifest_depth))
2023 def write_file(self, source, filename=None):
2024 self._queue_file(source, filename)
2025 self.do_queued_work()
2027 def write_directory_tree(self,
2028 path, stream_name='.', max_manifest_depth=-1):
2029 self._queue_tree(path, stream_name, max_manifest_depth)
2030 self.do_queued_work()
    def write(self, newdata):
        # Buffer newdata (bytes, str, or an iterable of chunks) onto the
        # current stream, flushing whole Keep blocks as they fill up.
        if isinstance(newdata, bytes):
        elif isinstance(newdata, str):
            # Text is stored encoded (UTF-8 by default).
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush complete blocks to Keep as they accumulate.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2047 def open(self, streampath, filename=None):
2048 """open(streampath[, filename]) -> file-like object
2050 Pass in the path of a file to write to the Collection, either as a
2051 single string or as two separate stream name and file name arguments.
2052 This method returns a file-like object you can write to add it to the
2055 You may only have one file object from the Collection open at a time,
2056 so be sure to close the object when you're done. Using the object in
2057 a with statement makes that easy:
2059 with cwriter.open('./doc/page1.txt') as outfile:
2060 outfile.write(page1_data)
2061 with cwriter.open('./doc/page2.txt') as outfile:
2062 outfile.write(page2_data)
2064 if filename is None:
2065 streampath, filename = split(streampath)
2066 if self._last_open and not self._last_open.closed:
2067 raise errors.AssertionError(
2068 u"can't open '{}' when '{}' is still open".format(
2069 filename, self._last_open.name))
2070 if streampath != self.current_stream_name():
2071 self.start_new_stream(streampath)
2072 self.set_current_file_name(filename)
2073 self._last_open = _WriterFile(self, filename)
2074 return self._last_open
    def flush_data(self):
        # Send the first KEEP_BLOCK_SIZE bytes of buffered data to Keep as
        # one block, keeping any remainder buffered for the next flush.
        data_buffer = b''.join(self._data_buffer)
        self._current_stream_locators.append(
            self._my_keep().put(
                data_buffer[0:config.KEEP_BLOCK_SIZE],
                copies=self.replication))
        # Whatever did not fit in the block stays in the buffer.
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])
2086 def start_new_file(self, newfilename=None):
2087 self.finish_current_file()
2088 self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        # Manifest fields are whitespace-delimited, so file names may not
        # contain tabs or newlines; NUL is likewise forbidden.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        self._current_file_name = newfilename
2101 def current_file_name(self):
2102 return self._current_file_name
    def finish_current_file(self):
        # Record the file in progress as a [pos, length, name] entry in the
        # current stream's file list, then advance the file position marker.
        if self._current_file_name is None:
            # No name set: only acceptable if no bytes were written since
            # the last file boundary.
            if self._current_file_pos == self._current_stream_length:
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
2121 def start_new_stream(self, newstreamname='.'):
2122 self.finish_current_stream()
2123 self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        # Manifest streams are whitespace-delimited, so stream names may
        # not contain tabs or newlines.
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
        # An empty name means the root stream, spelled '.' in manifests.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname
2132 def current_stream_name(self):
2133 return self._current_stream_name
    def finish_current_stream(self):
        # Close out the stream in progress: finish the open file, record
        # [name, locators, files] in _finished_streams, then reset all the
        # per-stream state for the next stream.
        self.finish_current_file()
        if not self._current_stream_files:
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
            if not self._current_stream_locators:
                # Every manifest stream needs at least one locator; use the
                # well-known empty block when nothing was written.
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        # Reset per-stream counters and buffers.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        # Upload the manifest text itself as a Keep block, honoring the
        # writer's replication setting.
        return self._my_keep().put(self.manifest_text().encode(),
                                   copies=self.replication)
2170 def portable_data_hash(self):
2171 stripped = self.stripped_manifest().encode()
2172 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
    def manifest_text(self):
        # Render every finished stream as Arvados manifest text.
        self.finish_current_stream()
        for stream in self._finished_streams:
            # Stream names in a manifest must be '.' or start with './'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # Spaces in names are escaped as \040 per the manifest format.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
    def data_locators(self):
        # Collect the Keep block locators from every finished stream.
        for name, locators, files in self._finished_streams:
    def save_new(self, name=None):
        # Create a new collection record on the API server from the
        # accumulated manifest.
        # NOTE(review): ensure_unique_name presumably asks the server to
        # resolve name collisions instead of failing — confirm against the
        # Arvados API documentation.
        return self._api_client.collections().create(
            ensure_unique_name=True,
                'manifest_text': self.manifest_text(),
            }).execute(num_retries=self.num_retries)
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize internal state to disk

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """
    # The instance attributes that fully describe writer progress;
    # dump_state() serializes exactly these so from_state() can rebuild
    # an equivalent writer later.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']
2217 @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
2218 def __init__(self, api_client=None, **kwargs):
2219 self._dependencies = {}
2220 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # the classes differ (e.g. deque state restored from a list).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            # Reopen the file that was mid-write and seek back to where
            # the previous writer left off.
            path, pos = state['_current_file']
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
    def check_dependencies(self):
        # Verify that every file recorded in _dependencies still looks the
        # same (regular file, same mtime and size); otherwise resuming from
        # the saved state could write stale or wrong data.
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))
    def dump_state(self, copy_func=lambda x: x):
        # Capture a serializable snapshot of writer state.  copy_func lets
        # the caller deep-copy values if an independent snapshot is needed.
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
            # Record the in-progress file by real path and offset so
            # from_state() can reopen it and seek back.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
    def _queue_file(self, source, filename=None):
        # Queue a file while recording dependency info (stat tuple keyed by
        # path) so a resumed writer can detect whether the file changed.
            src_path = os.path.realpath(source)
            raise errors.AssertionError(u"{} not a file path".format(source))
            path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            # The path was replaced between open() and stat(); resuming
            # from it later would not be trustworthy.
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
            self._dependencies[src_path] = tuple(fd_stat)
2303 def write(self, data):
2304 if self._queued_file is None:
2305 raise errors.AssertionError(
2306 "resumable writer can't accept unsourced data")
2307 return super(ResumableCollectionWriter, self).write(data)