1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
"""
28 from collections import deque
31 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
32 from .keep import KeepLocator, KeepClient
33 from .stream import StreamReader
34 from ._normalize_stream import normalize_stream, escape
35 from ._ranges import Range, LocatorAndRange
36 from .safeapi import ThreadSafeApiCache
37 import arvados.config as config
38 import arvados.errors as errors
40 import arvados.events as events
41 from arvados.retry import retry_method
if sys.version_info < (3, 8):
    from typing_extensions import Literal
else:
    from typing import Literal

_logger = logging.getLogger('arvados.collection')

ADD = "add"
"""Argument value for `Collection` methods to represent an added item"""
DEL = "del"
"""Argument value for `Collection` methods to represent a removed item"""
MOD = "mod"
"""Argument value for `Collection` methods to represent a modified item"""
TOK = "tok"
"""Argument value for `Collection` methods to represent an item with token differences"""
FILE = "file"
"""`create_type` value for `Collection.find_or_create`"""
COLLECTION = "collection"
"""`create_type` value for `Collection.find_or_create`"""

ChangeList = List[Union[
    Tuple[Literal[ADD, DEL], str, 'Collection'],
    Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
]]
ChangeType = Literal[ADD, DEL, MOD, TOK]
CollectionItem = Union[ArvadosFile, 'Collection']
ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
CreateType = Literal[COLLECTION, FILE]
Properties = Dict[str, Any]
StorageClasses = List[str]
class CollectionBase(object):
    """Abstract base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses
       directly.
    """

    def __enter__(self):
        """Enter a context block with this collection instance"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit a context block with this collection instance"""
        pass

    def _my_keep(self):
        # Lazily construct (and cache) a KeepClient bound to this
        # collection's API client and retry policy.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self) -> str:
        """Create a copy of the collection manifest with only size hints

        This method returns a string with the current collection's manifest
        text with all non-portable locator hints like permission hints and
        remote cluster hints removed. The only hints in the returned manifest
        text are size hints.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Keep the stream name (fields[0]) untouched; for every other
                # token that matches a Keep locator, strip any hint that does
                # not begin with a digit (i.e. everything except size hints).
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(arvados.util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
class _WriterFile(_FileLikeObjectBase):
    """Internal write-only file object that forwards data to a collection writer."""

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # The collection-writer object that actually buffers/uploads data.
        self.dest = coll_writer

    def close(self, flush=True):
        # Mark this wrapper closed first, then tell the writer that the
        # current file is complete so it can start the next one.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for s in seq:
            self.write(s)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()
class RichCollectionBase(CollectionBase):
    """Base class for Collection classes

    .. ATTENTION:: Internal
       This class is meant to be used by other parts of the SDK. User code
       should instantiate or subclass `Collection` or one of its subclasses

    NOTE(review): this extract is missing many source lines (gaps in the
    embedded numbering). Surviving code is kept verbatim; `# [gap]` marks a
    point where one or more lines were lost. Hints after each marker are
    inferences — confirm against the full source before relying on them.
    """

    def __init__(self, parent=None):
        # [gap] -- likely `self.parent = parent` and the `self._items`
        # dict initializer were lost here; TODO confirm.
        self._committed = False
        self._has_remote_blocks = False
        self._callback = None

    # [gap] -- the `def` headers for the two abstract methods below
    # (presumably `_my_api` and `_my_keep`) are missing from this extract.
        raise NotImplementedError()

        raise NotImplementedError()

    def _my_block_manager(self):
        # Abstract: subclasses supply the block manager that coordinates
        # reading/writing Keep data blocks.
        raise NotImplementedError()

    def writable(self) -> bool:
        """Indicate whether this collection object can be modified

        This method returns `False` if this object is a `CollectionReader`,
        """
        raise NotImplementedError()

    def root_collection(self) -> 'Collection':
        """Get this collection's root collection object

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the source Collection object.
        """
        raise NotImplementedError()

    def stream_name(self) -> str:
        """Get the name of the manifest stream represented by this collection

        If you open a subcollection with `Collection.find`, calling this method
        on that subcollection returns the name of the stream you opened.
        """
        raise NotImplementedError()

    def has_remote_blocks(self) -> bool:
        """Indicate whether the collection refers to remote data

        Returns `True` if the collection manifest includes any Keep locators
        with a remote hint (`+R`), else `False`.
        """
        if self._has_remote_blocks:
            # [gap] -- likely an early `return True`, then a loop over items
            if self[item].has_remote_blocks():
                # [gap] -- likely `return True`, with `return False` after the loop

    def set_has_remote_blocks(self, val: bool) -> None:
        """Cache whether this collection refers to remote blocks

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "has remote blocks" flag to the given
        """
        self._has_remote_blocks = val
        # [gap] -- presumably a guard that `self.parent` is not None
        self.parent.set_has_remote_blocks(val)

    # [gap] -- `def find_or_create(self, path, ...)` header is missing;
    # only part of the signature survives below.
        create_type: CreateType,
        """Get the item at the given path, creating it if necessary

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, then this method creates a new object and returns
        it, creating parent streams as needed. The type of object created is
        determined by the value of `create_type`.

        * path: str --- The path to find or create within this collection.

        * create_type: Literal[COLLECTION, FILE] --- The type of object to
          create at `path` if one does not exist. Passing `COLLECTION`
          creates a stream and returns the corresponding
          `Subcollection`. Passing `FILE` creates a new file and returns the
          corresponding `arvados.arvfile.ArvadosFile`.
        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # [gap] -- likely a check that `item` does not already exist
                if create_type == COLLECTION:
                    item = Subcollection(self, pathcomponents[0])
                # [gap] -- likely `else:` branch creating a file instead
                    item = ArvadosFile(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
            # [gap] -- likely `else:` branch for a multi-component path
                # create new collection
                item = Subcollection(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                # [gap] -- likely `else:`
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        # [gap] -- likely `else: return self`

    def find(self, path: str) -> CollectionItem:
        """Get the item at the given path

        If `path` refers to a stream in this collection, returns a
        corresponding `Subcollection` object. If `path` refers to a file in
        this collection, returns a corresponding
        `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
        this collection, then this method raises `NotADirectoryError`.

        * path: str --- The path to find or create within this collection.
        """
        # [gap] -- likely an empty-path guard around the raise below
        raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        if pathcomponents[0] == '':
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        # [gap]
        item = self._items.get(pathcomponents[0])
        # [gap] -- the `if` this `elif` paired with is missing
        elif len(pathcomponents) == 1:
        # [gap] -- likely `return item`
        if isinstance(item, RichCollectionBase):
            if pathcomponents[1]:
                return item.find(pathcomponents[1])
        # [gap]
        raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    def mkdirs(self, path: str) -> 'Subcollection':
        """Create and return a subcollection at `path`

        If `path` exists within this collection, raises `FileExistsError`.
        Otherwise, creates a stream at that path and returns the
        corresponding `Subcollection`.
        """
        # NOTE(review): prefer `is not None` over `!= None`.
        if self.find(path) != None:
            raise IOError(errno.EEXIST, "Directory or file exists", path)

        return self.find_or_create(path, COLLECTION)

    # [gap] -- `def open(self, path, mode='r', ...)` header missing; only
    # part of the signature survives below.
        encoding: Optional[str]=None
        """Open a file-like object within the collection

        This method returns a file-like object that can read and/or write the
        file located at `path` within the collection. If you attempt to write
        a `path` that does not exist, the file is created with `find_or_create`.
        If the file cannot be opened for any other reason, this method raises
        `OSError` with an appropriate errno.

        * path: str --- The path of the file to open within this collection

        * mode: str --- The mode to open this file. Supports all the same
          values as `builtins.open`.

        * encoding: str | None --- The text encoding of the file. Only used
          when the file is opened in text mode. The default is
        """
        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        if mode[0] == 'r' and '+' not in mode:
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        # [gap] -- likely `else:` branch for write modes
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)
        # [gap] -- likely `if arvfile is None:`
        raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)
        # [gap]
        # Open the underlying file in binary; text mode is layered on top
        # with TextIOWrapper below.
        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
        # [gap] -- likely a text-mode condition around the wrapping below
        bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
        f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
        # [gap] -- likely `return f`

    def modified(self) -> bool:
        """Indicate whether this collection has an API server record

        Returns `False` if this collection corresponds to a record loaded from
        the API server, `True` otherwise.
        """
        return not self.committed()

    # [gap] -- `def committed(self):` header missing
        """Indicate whether this collection has an API server record

        Returns `True` if this collection corresponds to a record loaded from
        the API server, `False` otherwise.
        """
        return self._committed

    def set_committed(self, value: bool=True):
        """Cache whether this collection has an API server record

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        Set this collection's cached "committed" flag to the given
        value and propagates it as needed.
        """
        if value == self._committed:
            # [gap] -- likely an early `return`
        # [gap] -- likely `if value:` branch header
        for k,v in self._items.items():
            v.set_committed(True)
        self._committed = True
        # [gap] -- likely `else:`
        self._committed = False
        if self.parent is not None:
            self.parent.set_committed(False)

    def __iter__(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        return iter(self._items)

    def __getitem__(self, k: str) -> CollectionItem:
        """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection

        This method does not recurse. If you want to search a path, use
        `RichCollectionBase.find` instead.
        """
        return self._items[k]

    def __contains__(self, k: str) -> bool:
        """Indicate whether this collection has an item with this name

        This method does not recurse. If you want to check a path, use
        `RichCollectionBase.exists` instead.
        """
        return k in self._items

    # [gap] -- `def __len__(self):` header missing
        """Get the number of items directly contained in this collection

        This method does not recurse. It only counts the streams and files
        in this collection's corresponding stream.
        """
        return len(self._items)

    def __delitem__(self, p: str) -> None:
        """Delete an item from this collection's stream

        This method does not recurse. If you want to remove an item by a
        path, use `RichCollectionBase.remove` instead.
        """
        # [gap] -- likely `del self._items[p]`
        self.set_committed(False)
        self.notify(DEL, self, p, None)

    def keys(self) -> Iterator[str]:
        """Iterate names of streams and files in this collection

        This method does not recurse. It only iterates the contents of this
        collection's corresponding stream.
        """
        return self._items.keys()

    def values(self) -> List[CollectionItem]:
        """Get a list of objects in this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream. This method does not recurse.
        """
        return list(self._items.values())

    def items(self) -> List[Tuple[str, CollectionItem]]:
        """Get a list of `(name, object)` tuples from this collection's stream

        The return value includes a `Subcollection` for every stream, and an
        `arvados.arvfile.ArvadosFile` for every file, directly within this
        collection's stream. This method does not recurse.
        """
        return list(self._items.items())

    def exists(self, path: str) -> bool:
        """Indicate whether this collection includes an item at `path`

        This method returns `True` if `path` refers to a stream or file within
        this collection, else `False`.

        * path: str --- The path to check for existence within this collection
        """
        return self.find(path) is not None

    def remove(self, path: str, recursive: bool=False) -> None:
        """Remove the file or stream at `path`

        * path: str --- The path of the item to remove from the collection

        * recursive: bool --- Controls the method's behavior if `path` refers
          to a nonempty stream. If `False` (the default), this method raises
          `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
          items under the stream.
        """
        # [gap] -- likely an empty-path guard around the raise below
        raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        # [gap] -- likely `if item is None:`
        raise IOError(errno.ENOENT, "File not found", path)
        if len(pathcomponents) == 1:
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self.set_committed(False)
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        # [gap] -- likely `else:` recursing into the subcollection
        item.remove(pathcomponents[1], recursive=recursive)

    def _clonefrom(self, source):
        # Shallow-populate this collection by cloning every item of `source`.
        for k,v in source.items():
            self._items[k] = v.clone(self, k)

    # [gap] -- `def clone(...)` header missing (abstract in this class)
        raise NotImplementedError()

    # [gap] -- `def add(self, ...)` header missing; only part of the
    # signature survives below.
        source_obj: CollectionItem,
        overwrite: bool=False,
        reparent: bool=False,
        """Copy or move a file or subcollection object to this collection

        * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
          to add to this collection

        * target_name: str --- The path inside this collection where
          `source_obj` should be added.

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_name`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_name` will be replaced with `source_obj`.

        * reparent: bool --- Controls whether this method copies or moves
          `source_obj`. If `False` (the default), `source_obj` is copied into
          this collection. If `True`, `source_obj` is moved into this
        """
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        # [gap] -- likely initialization of `modified_from`
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        # [gap] -- likely `if reparent:`
        source_obj._reparent(self, target_name)
        # [gap] -- likely `else:`
        item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        if not self._has_remote_blocks and source_obj.has_remote_blocks():
            self.set_has_remote_blocks(True)

        # [gap] -- likely `if modified_from:`
        self.notify(MOD, self, target_name, (modified_from, item))
        # [gap] -- likely `else:`
        self.notify(ADD, self, target_name, item)

    def _get_src_target(self, source, target_path, source_collection, create_dest):
        # Resolve `source` (path or object) and locate/create the target
        # directory; returns (source_obj, target_dir, target_name).
        if source_collection is None:
            source_collection = self

        # [gap]
        if isinstance(source, str):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
        # [gap] -- likely `else:` (source is already an object)
            sourcecomponents = None

        # Find the parent collection of the target path.
        targetcomponents = target_path.split("/")
        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]

        # [gap] -- likely a guard when no target name can be determined
        raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        # [gap] -- likely `if create_dest:`
        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        # [gap] -- likely `else:`
        if len(targetcomponents) > 1:
            target_dir = self.find("/".join(targetcomponents[0:-1]))
        # [gap]

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)

    # [gap] -- `def copy(self, ...)` header missing; only part of the
    # signature survives below.
        source: Union[str, CollectionItem],
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
        """Copy a file or subcollection object to this collection

        * source: str | arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The file or subcollection to
          add to this collection. If `source` is a str, the object will be
          found by looking up this path from `source_collection` (see

        * target_path: str --- The path inside this collection where the
          source object should be added.

        * source_collection: arvados.collection.Collection | None --- The
          collection to find the source object from when `source` is a
          path. Defaults to the current collection (`self`).

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_path`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_path` will be replaced with `source_obj`.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)

    # [gap] -- `def rename(self, ...)` header missing (the move
    # counterpart of `copy`); only part of the signature survives below.
        source: Union[str, CollectionItem],
        source_collection: Optional['RichCollectionBase']=None,
        overwrite: bool=False,
        """Move a file or subcollection object to this collection

        * source: str | arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The file or subcollection to
          add to this collection. If `source` is a str, the object will be
          found by looking up this path from `source_collection` (see

        * target_path: str --- The path inside this collection where the
          source object should be added.

        * source_collection: arvados.collection.Collection | None --- The
          collection to find the source object from when `source` is a
          path. Defaults to the current collection (`self`).

        * overwrite: bool --- Controls the behavior of this method when the
          collection already contains an object at `target_path`. If `False`
          (the default), this method will raise `FileExistsError`. If `True`,
          the object at `target_path` will be replaced with `source_obj`.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only", source)
        target_dir.add(source_obj, target_name, overwrite, True)

    def portable_manifest_text(self, stream_name: str=".") -> str:
        """Get the portable manifest text for this collection

        The portable manifest text is normalized, and does not include access
        tokens. This method does not flush outstanding blocks to Keep.

        * stream_name: str --- The name to use for this collection's stream in
          the generated manifest. Default `'.'`.
        """
        return self._get_manifest_text(stream_name, True, True)

    # [gap] -- `def manifest_text(self, ...)` header missing; only part of
    # the signature survives below (the `strip` parameter line is also lost).
        stream_name: str=".",
        normalize: bool=False,
        only_committed: bool=False,
        """Get the manifest text for this collection

        * stream_name: str --- The name to use for this collection's stream in
          the generated manifest. Default `'.'`.

        * strip: bool --- Controls whether or not the returned manifest text
          includes access tokens. If `False` (the default), the manifest text
          will include access tokens. If `True`, the manifest text will not
          include access tokens.

        * normalize: bool --- Controls whether or not the returned manifest
          text is normalized. Default `False`.

        * only_committed: bool --- Controls whether or not this method uploads
          pending data to Keep before building and returning the manifest text.
          If `False` (the default), this method will finish uploading all data
          to Keep, then return the final manifest. If `True`, this method will
          build and return a manifest that only refers to the data that has
          finished uploading at the time this method was called.
        """
        if not only_committed:
            self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=only_committed)

    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        Name to use for this stream (directory)

        If True, remove signing tokens from block locators if present.
        If False (default), block locators are left unchanged.

        If True, always export the manifest text in normalized form
        even if the Collection is not modified. If False (default) and the collection
        is not modified, return the original manifest text even if it is not

        If True, only include blocks that were already committed to Keep.
        """
        if not self.committed() or self._manifest_text is None or normalize:
            # [gap] -- likely `buf`/`stream` initialization
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                # [gap] -- likely `filestream = []`
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # [gap] -- likely an `only_committed` skip here
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    # [gap] -- likely `if strip:`
                    loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                      segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            # [gap] -- likely `if stream:`
            buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            # [gap] -- likely `return ''.join(buf)`
        # [gap] -- likely `elif strip:`
        return self.stripped_manifest()
        # [gap] -- likely `else:`
        return self._manifest_text

    def _copy_remote_blocks(self, remote_blocks={}):
        """Scan through the entire collection and ask Keep to copy remote blocks.

        When accessing a remote collection, blocks will have a remote signature
        (+R instead of +A). Collect these signatures and request Keep to copy the
        blocks to the local cluster, returning local (+A) signatures.

        Shared cache of remote to local block mappings. This is used to avoid
        doing extra work when blocks are shared by more than one file in
        different subdirectories.
        """
        # NOTE(review): mutable default argument `remote_blocks={}` — verify
        # it is never mutated without being passed explicitly by callers.
        # [gap] -- likely a loop over items around the line below
        remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
        # [gap] -- likely `return remote_blocks`

    # [gap] -- `def diff(self, ...)` header missing; only part of the
    # signature (and presumably the `prefix` parameter) survives below.
        end_collection: 'RichCollectionBase',
        holding_collection: Optional['Collection']=None,
        """Build a list of differences between this collection and another

        * end_collection: arvados.collection.RichCollectionBase --- A
          collection object with the desired end state. The returned diff
          list will describe how to go from the current collection object
          `self` to `end_collection`.

        * prefix: str --- The name to use for this collection's stream in
          the diff list. Default `'.'`.

        * holding_collection: arvados.collection.Collection | None --- A
          collection object used to hold objects for the returned diff
          list. By default, a new empty collection is created.
        """
        # [gap] -- likely `changes = []`
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        # [gap] -- likely `for k in self:`
        if k not in end_collection:
            changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            # [gap] -- likely `if k in self:`
            if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
            elif end_collection[k] != self[k]:
                changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            # [gap] -- likely `else:` (same content, token-only difference)
                changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            # [gap] -- likely `else:` (k not present locally)
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        # [gap] -- likely `return changes`

    def apply(self, changes: ChangeList) -> None:
        """Apply a list of changes to this collection

        This method takes a list of changes generated by
        `RichCollectionBase.diff` and applies it to this
        collection. Afterward, the state of this collection object will
        match the state of `end_collection` passed to `diff`. If a change
        conflicts with a local change, it will be saved to an alternate path
        indicating the conflict.

        * changes: arvados.collection.ChangeList --- The list of differences
          generated by `RichCollectionBase.diff`.
        """
        # [gap] -- likely a guard that `changes` is nonempty
        self.set_committed(False)
        for change in changes:
            event_type = change[0]
            # [gap] -- likely unpacking of `path`, `initial`, `final`
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
            # [gap] -- remainder of the strftime call lost
            if event_type == ADD:
                # [gap] -- likely `if local is None:`
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                # [gap] -- likely `if local == initial:`
                # Local matches the "initial" item so it has not
                # changed locally and is safe to update.
                if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                    # Replace contents of local file with new contents
                    local.replace_contents(final)
                # [gap] -- likely `else:`
                    # Overwrite path with new item; this can happen if
                    # path was a file and is now a collection or vice versa
                    self.copy(final, path, overwrite=True)
                # [gap] -- likely `else:`
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                # [gap] -- likely `if local == initial:`
                # Local item matches "initial" value, so it is safe to remove.
                self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.

    def portable_data_hash(self) -> str:
        """Get the portable data hash for this collection's manifest"""
        if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's committed
            # then return API server's PDH response.
            return self._portable_data_hash
        # [gap] -- likely `else:`
        stripped = self.portable_manifest_text().encode()
        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    def subscribe(self, callback: ChangeCallback) -> None:
        """Set a notify callback for changes to this collection

        * callback: arvados.collection.ChangeCallback --- The callable to
          call each time the collection is changed.
        """
        if self._callback is None:
            self._callback = callback
        # [gap] -- likely `else:`
        raise errors.ArgumentError("A callback is already set on this collection.")

    def unsubscribe(self) -> None:
        """Remove any notify callback set for changes to this collection"""
        if self._callback is not None:
            self._callback = None

    # [gap] -- `def notify(self, event, ...)` header missing; only part of
    # the signature survives below.
        collection: 'RichCollectionBase',
        item: CollectionItem,
        """Notify any subscribed callback about a change to this collection

        .. ATTENTION:: Internal
           This method is only meant to be used by other Collection methods.

        If a callback has been registered with `RichCollectionBase.subscribe`,
        it will be called with information about a change to this collection.
        Then this notification will be propagated to this collection's root.

        * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to

        * collection: arvados.collection.RichCollectionBase --- The
          collection that was modified.

        * name: str --- The name of the file or stream within `collection` that

        * item: arvados.arvfile.ArvadosFile |
          arvados.collection.Subcollection --- The new contents at `name`
        """
        # [gap] -- likely `if self._callback:`
        self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)

    def __eq__(self, other: Any) -> bool:
        """Indicate whether this collection object is equal to another"""
        # [gap] -- likely an identity fast path
        if not isinstance(other, RichCollectionBase):
            # [gap] -- likely `return False`
        if len(self._items) != len(other):
            # [gap] -- likely `return False`
        for k in self._items:
            # [gap] -- likely a membership check on `other`
            if self._items[k] != other[k]:
                # [gap] -- likely `return False`
        # [gap] -- likely `return True`

    def __ne__(self, other: Any) -> bool:
        """Indicate whether this collection object is not equal to another"""
        return not self.__eq__(other)

    def flush(self) -> None:
        """Upload any pending data to Keep"""
        for e in self.values():
            # [gap] -- likely `e.flush()`
1027 class Collection(RichCollectionBase):
1028 """Read and manipulate an Arvados collection
1030 This class provides a high-level interface to create, read, and update
1031 Arvados collections and their contents. Refer to the Arvados Python SDK
1032 cookbook for [an introduction to using the Collection class][cookbook].
1034 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
# Constructor: accepts a manifest text, portable data hash, or UUID (or
# nothing, for a new empty collection); the docstring below documents each
# argument in detail.
1037 def __init__(self, manifest_locator_or_text: Optional[str]=None,
1038 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1039 keep_client: Optional['arvados.keep.KeepClient']=None,
1040 num_retries: int=10,
1041 parent: Optional['Collection']=None,
1042 apiconfig: Optional[Mapping[str, str]]=None,
1043 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1044 replication_desired: Optional[int]=None,
1045 storage_classes_desired: Optional[List[str]]=None,
1046 put_threads: Optional[int]=None):
1047 """Initialize a Collection object
1051 * manifest_locator_or_text: str | None --- This string can contain a
1052 collection manifest text, portable data hash, or UUID. When given a
1053 portable data hash or UUID, this instance will load a collection
1054 record from the API server. Otherwise, this instance will represent a
1055 new collection without an API server record. The default value `None`
1056 instantiates a new collection with an empty manifest.
1058 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1059 Arvados API client object this instance uses to make requests. If
1060 none is given, this instance creates its own client using the
1061 settings from `apiconfig` (see below). If your client instantiates
1062 many Collection objects, you can help limit memory utilization by
1063 calling `arvados.api.api` to construct an
1064 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1065 for every Collection.
1067 * keep_client: arvados.keep.KeepClient | None --- The Keep client
1068 object this instance uses to make requests. If none is given, this
1069 instance creates its own client using its `api_client`.
1071 * num_retries: int --- The number of times that client requests are
1072 retried. Default 10.
1074 * parent: arvados.collection.Collection | None --- The parent Collection
1075 object of this instance, if any. This argument is primarily used by
1076 other Collection methods; user client code shouldn't need to use it.
1078 * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1079 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1080 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1081 Collection object constructs one from these settings. If no
1082 mapping is provided, calls `arvados.config.settings` to get these
1083 parameters from user configuration.
1085 * block_manager: arvados.arvfile._BlockManager | None --- The
1086 _BlockManager object used by this instance to coordinate reading
1087 and writing Keep data blocks. If none is given, this instance
1088 constructs its own. This argument is primarily used by other
1089 Collection methods; user client code shouldn't need to use it.
1091 * replication_desired: int | None --- This controls both the value of
1092 the `replication_desired` field on API collection records saved by
1093 this class, as well as the number of Keep services that the object
1094 writes new data blocks to. If none is given, uses the default value
1095 configured for the cluster.
1097 * storage_classes_desired: list[str] | None --- This controls both
1098 the value of the `storage_classes_desired` field on API collection
1099 records saved by this class, as well as selecting which specific
1100 Keep services the object writes new data blocks to. If none is
1101 given, defaults to an empty list.
1103 * put_threads: int | None --- The number of threads to run
1104 simultaneously to upload data blocks to Keep. This value is used when
1105 building a new `block_manager`. It is unused when a `block_manager`
1109 if storage_classes_desired and type(storage_classes_desired) is not list:
1110 raise errors.ArgumentError("storage_classes_desired must be list type.")
1112 super(Collection, self).__init__(parent)
1113 self._api_client = api_client
1114 self._keep_client = keep_client
1116 # Use the keep client from ThreadSafeApiCache
1117 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1118 self._keep_client = self._api_client.keep
1120 self._block_manager = block_manager
1121 self.replication_desired = replication_desired
1122 self._storage_classes_desired = storage_classes_desired
1123 self.put_threads = put_threads
# The if/else around the two _config assignments (original lines 1125/1127)
# is elided: presumably `apiconfig` is used when provided, otherwise
# config.settings() -- TODO confirm.
1126 self._config = apiconfig
1128 self._config = config.settings()
1130 self.num_retries = num_retries
1131 self._manifest_locator = None
1132 self._manifest_text = None
1133 self._portable_data_hash = None
1134 self._api_response = None
1135 self._past_versions = set()
1137 self.lock = threading.RLock()
# Classify the locator argument: keep locator (PDH), collection UUID, or
# literal manifest text; anything else is rejected below.
1140 if manifest_locator_or_text:
1141 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1142 self._manifest_locator = manifest_locator_or_text
1143 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1144 self._manifest_locator = manifest_locator_or_text
1145 if not self._has_local_collection_uuid():
1146 self._has_remote_blocks = True
1147 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1148 self._manifest_text = manifest_locator_or_text
# A "+R" hint in a manifest marks blocks that live on a remote cluster.
1149 if '+R' in self._manifest_text:
1150 self._has_remote_blocks = True
1152 raise errors.ArgumentError(
1153 "Argument to CollectionReader is not a manifest or a collection UUID")
# The try statement wrapping the population step (around original line
# 1155-1156, elided) funnels manifest parse failures into ArgumentError.
1157 except errors.SyntaxError as e:
1158 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1160 def storage_classes_desired(self) -> List[str]:
1161 """Get this collection's `storage_classes_desired` value"""
1162 return self._storage_classes_desired or []
# Body of root_collection is elided (presumably returns self, since this is
# the top-level collection -- TODO confirm).
1164 def root_collection(self) -> 'Collection':
1167 def get_properties(self) -> Properties:
1168 """Get this collection's properties
1170 This method always returns a dict. If this collection object does not
1171 have an associated API record, or that record does not have any
1172 properties set, this method returns an empty dict.
1174 if self._api_response and self._api_response["properties"]:
1175 return self._api_response["properties"]
# The fall-through branch returning an empty dict (original ~1176-1177) is
# elided from this listing.
1179 def get_trash_at(self) -> Optional[datetime.datetime]:
1180 """Get this collection's `trash_at` field
1182 This method parses the `trash_at` field of the collection's API
1183 record and returns a datetime from it. If that field is not set, or
1184 this collection object does not have an associated API record,
1187 if self._api_response and self._api_response["trash_at"]:
# Parses the API timestamp with ciso8601; error-handling lines around this
# return (original 1188, 1190+) are elided.
1189 return ciso8601.parse_datetime(self._api_response["trash_at"])
# Bodies of stream_name and writable are elided in this listing.
1195 def stream_name(self) -> str:
1198 def writable(self) -> bool:
1202 def known_past_version(
1204 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1206 """Indicate whether an API record for this collection has been seen before
1208 As this collection object loads records from the API server, it records
1209 their `modified_at` and `portable_data_hash` fields. This method accepts
1210 a 2-tuple with values for those fields, and returns `True` if the
1211 combination was previously loaded.
1213 return modified_at_and_portable_data_hash in self._past_versions
# Signature of `update` (the `def update(self,` line, original ~1217-1218)
# is elided from this listing; these are its remaining parameters.
1219 other: Optional['Collection']=None,
1220 num_retries: Optional[int]=None,
1222 """Merge another collection's contents into this one
1224 This method compares the manifest of this collection instance with
1225 another, then updates this instance's manifest with changes from the
1226 other, renaming files to flag conflicts where necessary.
1228 When called without any arguments, this method reloads the collection's
1229 API record, and updates this instance with any changes that have
1230 appeared server-side. If this instance does not have a corresponding
1231 API record, this method raises `arvados.errors.ArgumentError`.
1235 * other: arvados.collection.Collection | None --- The collection
1236 whose contents should be merged into this instance. When not
1237 provided, this method reloads this collection's API record and
1238 constructs a Collection object from it. If this instance does not
1239 have a corresponding API record, this method raises
1240 `arvados.errors.ArgumentError`.
1242 * num_retries: int | None --- The number of times to retry reloading
1243 the collection's API record from the API server. If not specified,
1244 uses the `num_retries` provided when this instance was constructed.
# The `if other is None:` guard (original ~1246) is elided; this branch
# reloads the record from the API server.
1247 if self._manifest_locator is None:
1248 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1249 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1250 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1251 response.get("portable_data_hash") != self.portable_data_hash()):
1252 # The record on the server is different from our current one, but we've seen it before,
1253 # so ignore it because it's already been merged.
1254 # However, if it's the same as our current record, proceed with the update, because we want to update
# The early return and the else branch around here (original 1255-1257) are
# elided from this listing.
1258 self._remember_api_response(response)
1259 other = CollectionReader(response["manifest_text"])
# Three-way merge: diff the baseline manifest against `other`, then apply
# that diff to this instance.
1260 baseline = CollectionReader(self._manifest_text)
1261 self.apply(baseline.diff(other))
1262 self._manifest_text = self.manifest_text()
# Body of _my_api (its `def` line, original ~1264-1265, is elided): lazily
# constructs a ThreadSafeApiCache on first use and shares its Keep client.
1266 if self._api_client is None:
1267 self._api_client = ThreadSafeApiCache(self._config, version='v1')
1268 if self._keep_client is None:
1269 self._keep_client = self._api_client.keep
1270 return self._api_client
# Body of _my_keep (its `def` line, original ~1272-1273, is elided): lazily
# builds a KeepClient from the API client; lines 1276-1277 are elided
# (presumably they call self._my_api() first -- TODO confirm).
1274 if self._keep_client is None:
1275 if self._api_client is None:
1278 self._keep_client = KeepClient(api_client=self._api_client)
1279 return self._keep_client
# Lazily construct the _BlockManager used for Keep block I/O.
1282 def _my_block_manager(self):
1283 if self._block_manager is None:
# Desired copy count: explicit replication_desired, else the cluster's
# default (the fallback literal on original line 1286 is elided).
1284 copies = (self.replication_desired or
1285 self._my_api()._rootDesc.get('defaultCollectionReplication',
# The `copies=` keyword argument line (original 1288) is elided here.
1287 self._block_manager = _BlockManager(self._my_keep(),
1289 put_threads=self.put_threads,
1290 num_retries=self.num_retries,
1291 storage_classes_func=self.storage_classes_desired)
1292 return self._block_manager
1294 def _remember_api_response(self, response):
1295 self._api_response = response
1296 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
def _populate_from_api_server(self):
    """Load this collection's manifest and metadata from its API record.

    As in KeepClient itself, we must wait until the last possible moment
    to instantiate an API client, in order to avoid tripping up clients
    that don't have access to an API server. If we do build one, our Keep
    client uses it. Any lookup failure propagates to the caller, just
    like any other Collection lookup failure.
    """
    record = self._my_api().collections().get(
        uuid=self._manifest_locator,
    ).execute(num_retries=self.num_retries)
    self._remember_api_response(record)
    self._manifest_text = record['manifest_text']
    self._portable_data_hash = record['portable_data_hash']
    # Fall back to the server-side values unless they were already
    # overridden via constructor arguments.
    if self.replication_desired is None:
        self.replication_desired = record.get('replication_desired', None)
    if self._storage_classes_desired is None:
        self._storage_classes_desired = record.get('storage_classes_desired', None)
# Ensure a manifest is loaded (from the API server if necessary), then parse
# it into this collection's item tree.
1318 def _populate(self):
1319 if self._manifest_text is None:
# The branch body for a missing locator (original 1321-1322) is elided;
# presumably a new empty collection returns early here -- TODO confirm.
1320 if self._manifest_locator is None:
1323 self._populate_from_api_server()
# Keep the pristine manifest so later diffs/merges have a baseline.
1324 self._baseline_manifest = self._manifest_text
1325 self._import_manifest(self._manifest_text)
1327 def _has_collection_uuid(self):
1328 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1330 def _has_local_collection_uuid(self):
1331 return self._has_collection_uuid and \
1332 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
# Context-manager entry; body (original ~1335, presumably `return self`)
# is elided -- TODO confirm.
1334 def __enter__(self):
1337 def __exit__(self, exc_type, exc_value, traceback):
1338 """Exit a context with this collection instance
1340 If no exception was raised inside the context block, and this
1341 collection is writable and has a corresponding API record, that
1342 record will be updated to match the state of this instance at the end
1345 if exc_type is None:
# The save call and any thread cleanup after this guard (original 1347+)
# are elided from this listing.
1346 if self.writable() and self._has_collection_uuid():
1350 def stop_threads(self) -> None:
1351 """Stop background Keep upload/download threads"""
1352 if self._block_manager is not None:
1353 self._block_manager.stop_threads()
1356 def manifest_locator(self) -> Optional[str]:
1357 """Get this collection's manifest locator, if any
# One docstring bullet (original line 1360) is elided from this listing.
1359 * If this collection instance is associated with an API record with a
1361 * Otherwise, if this collection instance was loaded from an API record
1362 by portable data hash, return that.
1363 * Otherwise, return `None`.
1365 return self._manifest_locator
# Signature of `clone` (the `def clone(self,` line, original ~1368-1369)
# is elided from this listing; these are its remaining parameters.
1370 new_parent: Optional['Collection']=None,
1371 new_name: Optional[str]=None,
1372 readonly: bool=False,
1373 new_config: Optional[Mapping[str, str]]=None,
1375 """Create a Collection object with the same contents as this instance
1377 This method creates a new Collection object with contents that match
1378 this instance's. The new collection will not be associated with any API
1383 * new_parent: arvados.collection.Collection | None --- This value is
1384 passed to the new Collection's constructor as the `parent`
1387 * new_name: str | None --- This value is unused.
1389 * readonly: bool --- If this value is true, this method constructs and
1390 returns a `CollectionReader`. Otherwise, it returns a mutable
1391 `Collection`. Default `False`.
1393 * new_config: Mapping[str, str] | None --- This value is passed to the
1394 new Collection's constructor as `apiconfig`. If no value is provided,
1395 defaults to the configuration passed to this instance's constructor.
1397 if new_config is None:
1398 new_config = self._config
# The `if readonly:` / `else:` lines (original 1399/1401) selecting between
# the two constructors below are elided from this listing.
1400 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1402 newcollection = Collection(parent=new_parent, apiconfig=new_config)
# Deep-copy this instance's contents into the new object.
1404 newcollection._clonefrom(self)
1405 return newcollection
1408 def api_response(self) -> Optional[Dict[str, Any]]:
1409 """Get this instance's associated API record
1411 If this Collection instance has an associated API record, return it.
1412 Otherwise, return `None`.
1414 return self._api_response
# Tail of find_or_create's signature (its `def` line, original ~1417-1418,
# is elided). All three path methods below normalize a leading "./" before
# delegating to the RichCollectionBase implementation.
1419 create_type: CreateType,
1420 ) -> CollectionItem:
1424 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1426 def find(self, path: str) -> CollectionItem:
1430 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1432 def remove(self, path: str, recursive: bool=False) -> None:
# The guard condition for this raise (original ~1433, presumably
# `if path == ".":`) is elided -- TODO confirm.
1434 raise errors.ArgumentError("Cannot remove '.'")
1436 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
# Signature of `save` (its `def` line and the `merge` parameter, original
# ~1437-1446, are partly elided); these are its remaining parameters.
1443 properties: Optional[Properties]=None,
1444 storage_classes: Optional[StorageClasses]=None,
1445 trash_at: Optional[datetime.datetime]=None,
1447 num_retries: Optional[int]=None,
1448 preserve_version: bool=False,
1450 """Save collection to an existing API record
1452 This method updates the instance's corresponding API record to match
1453 the instance's state. If this instance does not have a corresponding API
1454 record yet, raises `AssertionError`. (To create a new API record, use
1455 `Collection.save_new`.) This method returns the saved collection
1460 * properties: dict[str, Any] | None --- If provided, the API record will
1461 be updated with these properties. Note this will completely replace
1462 any existing properties.
1464 * storage_classes: list[str] | None --- If provided, the API record will
1465 be updated with this value in the `storage_classes_desired` field.
1466 This value will also be saved on the instance and used for any
1467 changes that follow.
1469 * trash_at: datetime.datetime | None --- If provided, the API record
1470 will be updated with this value in the `trash_at` field.
1472 * merge: bool --- If `True` (the default), this method will first
1473 reload this collection's API record, and merge any new contents into
1474 this instance before saving changes. See `Collection.update` for
1477 * num_retries: int | None --- The number of times to retry reloading
1478 the collection's API record from the API server. If not specified,
1479 uses the `num_retries` provided when this instance was constructed.
1481 * preserve_version: bool --- This value will be passed to directly
1482 to the underlying API call. If `True`, the Arvados API will
1483 preserve the versions of this collection both immediately before
1484 and after the update. If `True` when the API server is not
1485 configured with collection versioning, this method raises
1486 `arvados.errors.ArgumentError`.
# Argument validation: reject wrong-typed properties/storage_classes/
# trash_at, and preserve_version on clusters without versioning.
1488 if properties and type(properties) is not dict:
1489 raise errors.ArgumentError("properties must be dictionary type.")
1491 if storage_classes and type(storage_classes) is not list:
1492 raise errors.ArgumentError("storage_classes must be list type.")
1494 self._storage_classes_desired = storage_classes
1496 if trash_at and type(trash_at) is not datetime.datetime:
1497 raise errors.ArgumentError("trash_at must be datetime type.")
1499 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1500 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
# Initialization of the request `body` dict and the guards around the
# assignments below (original ~1502-1507) are partly elided.
1504 body["properties"] = properties
1505 if self.storage_classes_desired():
1506 body["storage_classes_desired"] = self.storage_classes_desired()
1508 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1509 body["trash_at"] = t
1510 if preserve_version:
1511 body["preserve_version"] = preserve_version
# Uncommitted changes: push blocks to Keep, then update the record with the
# new manifest text.
1513 if not self.committed():
1514 if self._has_remote_blocks:
1515 # Copy any remote blocks to the local cluster.
1516 self._copy_remote_blocks(remote_blocks={})
1517 self._has_remote_blocks = False
1518 if not self._has_collection_uuid():
1519 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1520 elif not self._has_local_collection_uuid():
1521 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1523 self._my_block_manager().commit_all()
# The merge step invoking self.update() (original ~1525-1527) is elided.
1528 text = self.manifest_text(strip=False)
1529 body['manifest_text'] = text
1531 self._remember_api_response(self._my_api().collections().update(
1532 uuid=self._manifest_locator,
1534 ).execute(num_retries=num_retries))
1535 self._manifest_text = self._api_response["manifest_text"]
1536 self._portable_data_hash = self._api_response["portable_data_hash"]
1537 self.set_committed(True)
# Else branch (original ~1538, elided): already committed, but metadata in
# `body` may still need updating -- TODO confirm.
1539 self._remember_api_response(self._my_api().collections().update(
1540 uuid=self._manifest_locator,
1542 ).execute(num_retries=num_retries))
1544 return self._manifest_text
# Signature of `save_new` (its `def` line, original ~1550-1551, is elided);
# these are its remaining parameters.
1552 name: Optional[str]=None,
1553 create_collection_record: bool=True,
1554 owner_uuid: Optional[str]=None,
1555 properties: Optional[Properties]=None,
1556 storage_classes: Optional[StorageClasses]=None,
1557 trash_at: Optional[datetime.datetime]=None,
1558 ensure_unique_name: bool=False,
1559 num_retries: Optional[int]=None,
1560 preserve_version: bool=False,
1562 """Save collection to a new API record
1564 This method finishes uploading new data blocks and (optionally)
1565 creates a new API collection record with the provided data. If a new
1566 record is created, this instance becomes associated with that record
1567 for future updates like `save()`. This method returns the saved
1568 collection manifest.
1572 * name: str | None --- The `name` field to use on the new collection
1573 record. If not specified, a generic default name is generated.
1575 * create_collection_record: bool --- If `True` (the default), creates a
1576 collection record on the API server. If `False`, the method finishes
1577 all data uploads and only returns the resulting collection manifest
1578 without sending it to the API server.
1580 * owner_uuid: str | None --- The `owner_uuid` field to use on the
1581 new collection record.
1583 * properties: dict[str, Any] | None --- The `properties` field to use on
1584 the new collection record.
1586 * storage_classes: list[str] | None --- The
1587 `storage_classes_desired` field to use on the new collection record.
1589 * trash_at: datetime.datetime | None --- The `trash_at` field to use
1590 on the new collection record.
1592 * ensure_unique_name: bool --- This value is passed directly to the
1593 Arvados API when creating the collection record. If `True`, the API
1594 server may modify the submitted `name` to ensure the collection's
1595 `name`+`owner_uuid` combination is unique. If `False` (the default),
1596 if a collection already exists with this same `name`+`owner_uuid`
1597 combination, creating a collection record will raise a validation
1600 * num_retries: int | None --- The number of times to retry reloading
1601 the collection's API record from the API server. If not specified,
1602 uses the `num_retries` provided when this instance was constructed.
1604 * preserve_version: bool --- This value will be passed to directly
1605 to the underlying API call. If `True`, the Arvados API will
1606 preserve the versions of this collection both immediately before
1607 and after the update. If `True` when the API server is not
1608 configured with collection versioning, this method raises
1609 `arvados.errors.ArgumentError`.
# Argument validation, mirroring `save` above.
1611 if properties and type(properties) is not dict:
1612 raise errors.ArgumentError("properties must be dictionary type.")
1614 if storage_classes and type(storage_classes) is not list:
1615 raise errors.ArgumentError("storage_classes must be list type.")
1617 if trash_at and type(trash_at) is not datetime.datetime:
1618 raise errors.ArgumentError("trash_at must be datetime type.")
1620 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1621 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1623 if self._has_remote_blocks:
1624 # Copy any remote blocks to the local cluster.
1625 self._copy_remote_blocks(remote_blocks={})
1626 self._has_remote_blocks = False
# The guard around this assignment (original ~1627-1628) is elided.
1629 self._storage_classes_desired = storage_classes
# Finish all pending Keep uploads before snapshotting the manifest.
1631 self._my_block_manager().commit_all()
1632 text = self.manifest_text(strip=False)
1634 if create_collection_record:
# The `if name is None:` guard (original ~1635) for this default is elided.
1636 name = "New collection"
1637 ensure_unique_name = True
# Build the create-request body; guard lines for the optional fields
# (original 1640, 1642, 1644, 1648) are elided from this listing.
1639 body = {"manifest_text": text,
1641 "replication_desired": self.replication_desired}
1643 body["owner_uuid"] = owner_uuid
1645 body["properties"] = properties
1646 if self.storage_classes_desired():
1647 body["storage_classes_desired"] = self.storage_classes_desired()
1649 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1650 body["trash_at"] = t
1651 if preserve_version:
1652 body["preserve_version"] = preserve_version
1654 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1655 text = self._api_response["manifest_text"]
1657 self._manifest_locator = self._api_response["uuid"]
1658 self._portable_data_hash = self._api_response["portable_data_hash"]
1660 self._manifest_text = text
1661 self.set_committed(True)
# Pre-compiled tokenizer patterns shared by _import_manifest below.
# Splits the manifest into whitespace-delimited tokens, capturing the
# separator so end-of-line can be detected.
1665 _token_re = re.compile(r'(\S+)(\s+|$)')
# Keep block locator: 32 hex digits, "+size", then optional "+hint" parts.
1666 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
# File segment token: "position:size:filename".
1667 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1669 def _unescape_manifest_path(self, path):
1670 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
# Tokenizing state machine that parses manifest text into the collection's
# item tree. Several state transitions and guard lines are elided from this
# listing (gaps in the original line numbers).
1673 def _import_manifest(self, manifest_text):
1674 """Import a manifest into a `Collection`.
1677 The manifest text to import from.
# The emptiness check guarding this raise (original ~1678-1680) is elided.
1681 raise ArgumentError("Can only import manifest into an empty collection")
# State constants and loop setup (original ~1682-1689, e.g. STREAM_NAME /
# BLOCKS / SEGMENTS and per-stream accumulators) are elided -- TODO confirm
# against the full source.
1690 for token_and_separator in self._token_re.finditer(manifest_text):
1691 tok = token_and_separator.group(1)
1692 sep = token_and_separator.group(2)
1694 if state == STREAM_NAME:
1695 # starting a new stream
1696 stream_name = self._unescape_manifest_path(tok)
# Per-stream reset lines (original 1697-1700, e.g. blocks/streamoffset)
# are elided.
1701 self.find_or_create(stream_name, COLLECTION)
# BLOCKS state (transition lines 1702-1704 elided): accumulate block
# locators with their running stream offsets.
1705 block_locator = self._block_re.match(tok)
1707 blocksize = int(block_locator.group(1))
1708 blocks.append(Range(tok, streamoffset, blocksize, 0))
1709 streamoffset += blocksize
1713 if state == SEGMENTS:
1714 file_segment = self._segment_re.match(tok)
1716 pos = int(file_segment.group(1))
1717 size = int(file_segment.group(2))
1718 name = self._unescape_manifest_path(file_segment.group(3))
1719 if name.split('/')[-1] == '.':
1720 # placeholder for persisting an empty directory, not a real file
# name[:-2] strips the trailing "/." to get the directory path.
1722 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1724 filepath = os.path.join(stream_name, name)
# The `try:` opening this block (original ~1725) is elided.
1726 afile = self.find_or_create(filepath, FILE)
1727 except IOError as e:
1728 if e.errno == errno.ENOTDIR:
1729 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1732 if isinstance(afile, ArvadosFile):
1733 afile.add_segment(blocks, pos, size)
# The `else:` for the non-ArvadosFile case (original ~1734) is elided.
1735 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1738 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
# End-of-line handling resetting state to STREAM_NAME (original ~1739-1743)
# is elided. The freshly imported collection is marked clean.
1744 self.set_committed(True)
1750 collection: 'RichCollectionBase',
1752 item: CollectionItem,
1755 self._callback(event, collection, name, item)
1758 class Subcollection(RichCollectionBase):
1759 """Read and manipulate a stream/directory within an Arvados collection
1761 This class represents a single stream (like a directory) within an Arvados
1762 `Collection`. It is returned by `Collection.find` and provides the same API.
1763 Operations that work on the API collection record propagate to the parent
1764 `Collection` object.
# Subcollections share their root collection's lock and retry policy.
1767 def __init__(self, parent, name):
1768 super(Subcollection, self).__init__(parent)
1769 self.lock = self.root_collection().lock
1770 self._manifest_text = None
# Original line 1771 is elided here (presumably `self.name = name`, since
# other methods read self.name -- TODO confirm).
1772 self.num_retries = parent.num_retries
1774 def root_collection(self) -> 'Collection':
1775 return self.parent.root_collection()
1777 def writable(self) -> bool:
1778 return self.root_collection().writable()
# Delegation helpers: API client, Keep client, and block manager all come
# from the root collection. The `def` lines for _my_api (original ~1780)
# and _my_keep (original ~1783) are elided from this listing.
1781 return self.root_collection()._my_api()
1784 return self.root_collection()._my_keep()
1786 def _my_block_manager(self):
1787 return self.root_collection()._my_block_manager()
1789 def stream_name(self) -> str:
1790 return os.path.join(self.parent.stream_name(), self.name)
# Tail of `clone`'s signature (its `def` line, original ~1793-1794, is
# elided); clones this subcollection under a new parent/name.
1795 new_parent: Optional['Collection']=None,
1796 new_name: Optional[str]=None,
1797 ) -> 'Subcollection':
1798 c = Subcollection(new_parent, new_name)
# The copy step and return (original ~1799-1803, presumably _clonefrom)
# are elided -- TODO confirm.
# Move this subcollection under a different parent, detaching it first.
1804 def _reparent(self, newparent, newname):
1805 self.set_committed(False)
# One line (original 1806) is elided here.
1807 self.parent.remove(self.name, recursive=True)
1808 self.parent = newparent
# Original line 1809 (presumably `self.name = newname`) is elided.
1810 self.lock = self.parent.root_collection().lock
1813 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1814 """Encode empty directories by using an \056-named (".") empty file"""
# An empty subcollection is serialized as a zero-length "\056" (".") file
# on the empty-block locator so the directory survives a round trip.
1815 if len(self._items) == 0:
1816 return "%s %s 0:0:\\056\n" % (
1817 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
# The remaining arguments of this super call (original 1819-1820) are
# elided from this listing.
1818 return super(Subcollection, self)._get_manifest_text(stream_name,
1823 class CollectionReader(Collection):
1824 """Read-only `Collection` subclass
1826 This class will never create or update any API collection records. You can
1827 use this class for additional code safety when you only need to read
1828 existing collections.
# Note: unlike Collection, manifest_locator_or_text is required here.
1830 def __init__(self, manifest_locator_or_text, *args, **kwargs):
# _in_init makes writable() return True only while the parent constructor
# populates this instance; afterwards the reader is immutable.
1831 self._in_init = True
1832 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1833 self._in_init = False
1835 # Forego any locking since it should never change once initialized.
1836 self.lock = NoopLock()
1838 # Backwards compatability with old CollectionReader
1839 # all_streams() and all_files()
1840 self._streams = None
1842 def writable(self) -> bool:
1843 return self._in_init
# Decorator for the deprecated stream-based accessors below: lazily builds
# the legacy _streams structure before calling the wrapped method.
1845 def _populate_streams(orig_func):
1846 @functools.wraps(orig_func)
1847 def populate_streams_wrapper(self, *args, **kwargs):
1848 # Defer populating self._streams until needed since it creates a copy of the manifest.
1849 if self._streams is None:
1850 if self._manifest_text:
1851 self._streams = [sline.split()
1852 for sline in self._manifest_text.split("\n")
# The filtering clause of this list comprehension and the else branch for an
# empty manifest (original 1853-1855) are elided from this listing.
1856 return orig_func(self, *args, **kwargs)
1857 return populate_streams_wrapper
1859 @arvados.util._deprecated('3.0', 'Collection iteration')
# A @_populate_streams decorator line (original 1860) is elided here --
# TODO confirm, matching all_streams/all_files below.
1861 def normalize(self):
1862 """Normalize the streams returned by `all_streams`"""
# Initialization of the `streams` accumulator dict (original ~1863) is
# elided. Groups every file's locator ranges by (stream, file).
1864 for s in self.all_streams():
1865 for f in s.all_files():
1866 streamname, filename = split(s.name() + "/" + f.name())
1867 if streamname not in streams:
1868 streams[streamname] = {}
1869 if filename not in streams[streamname]:
1870 streams[streamname][filename] = []
1871 for r in f.segments:
1872 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
# Rebuild the legacy stream list in normalized, sorted form.
1874 self._streams = [normalize_stream(s, streams[s])
1875 for s in sorted(streams)]
# Deprecated legacy accessors; the @_populate_streams decorator lines
# (original 1878 and 1884) are elided from this listing.
1877 @arvados.util._deprecated('3.0', 'Collection iteration')
1879 def all_streams(self):
1880 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1881 for s in self._streams]
1883 @arvados.util._deprecated('3.0', 'Collection iteration')
1885 def all_files(self):
# Generator body is cut here (the `yield f` line, original ~1888, is
# elided); iterates every file of every stream.
1886 for s in self.all_streams():
1887 for f in s.all_files():
1891 class CollectionWriter(CollectionBase):
1892 """Create a new collection from scratch
1894 .. WARNING:: Deprecated
1895 This class is deprecated. Prefer `arvados.collection.Collection`
1899 @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1900 def __init__(self, api_client=None, num_retries=0, replication=None):
1901 """Instantiate a CollectionWriter.
1903 CollectionWriter lets you build a new Arvados Collection from scratch.
1904 Write files to it. The CollectionWriter will upload data to Keep as
1905 appropriate, and provide you with the Collection manifest text when
1909 * api_client: The API client to use to look up Collections. If not
1910 provided, CollectionReader will build one from available Arvados
1912 * num_retries: The default number of times to retry failed
1913 service requests. Default 0. You may change this value
1914 after instantiation, but note those changes may not
1915 propagate to related objects like the Keep client.
1916 * replication: The number of copies of each block to store.
1917 If this argument is None or not supplied, replication is
1918 the server-provided default if available, otherwise 2.
1920 self._api_client = api_client
1921 self.num_retries = num_retries
1922 self.replication = (2 if replication is None else replication)
1923 self._keep_client = None
# Buffered data awaiting upload as Keep blocks.
1924 self._data_buffer = []
1925 self._data_buffer_len = 0
# State of the stream/file currently being written.
1926 self._current_stream_files = []
1927 self._current_stream_length = 0
1928 self._current_stream_locators = []
1929 self._current_stream_name = '.'
1930 self._current_file_name = None
1931 self._current_file_pos = 0
1932 self._finished_streams = []
# Work queue used by do_queued_work (see comments there).
1933 self._close_file = None
1934 self._queued_file = None
1935 self._queued_dirents = deque()
1936 self._queued_trees = deque()
1937 self._last_open = None
# Context-manager exit; the success-path body (original ~1941, presumably
# a finish() call) is elided -- TODO confirm.
1939 def __exit__(self, exc_type, exc_value, traceback):
1940 if exc_type is None:
1943 def do_queued_work(self):
1944 # The work queue consists of three pieces:
1945 # * _queued_file: The file object we're currently writing to the
1947 # * _queued_dirents: Entries under the current directory
1948 # (_queued_trees[0]) that we want to write or recurse through.
1949 # This may contain files from subdirectories if
1950 # max_manifest_depth == 0 for this directory.
1951 # * _queued_trees: Directories that should be written as separate
1952 # streams to the Collection.
1953 # This function handles the smallest piece of work currently queued
1954 # (current file, then current directory, then next directory) until
1955 # no work remains. The _work_THING methods each do a unit of work on
1956 # THING. _queue_THING methods add a THING to the work queue.
# The surrounding loop and the _work_file / _work_trees dispatch lines
# (original 1957, 1959, 1963) are elided from this listing.
1958 if self._queued_file:
1960 elif self._queued_dirents:
1961 self._work_dirents()
1962 elif self._queued_trees:
# Drain the currently queued file into the write buffer, then close it if
# this writer opened it.
1967 def _work_file(self):
# The read loop around this (original 1968, 1970-1972: `while True:`,
# break-on-EOF and self.write(buf)) is elided from this listing.
1969 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1973 self.finish_current_file()
1974 if self._close_file:
1975 self._queued_file.close()
1976 self._close_file = None
1977 self._queued_file = None
# Process queued directory entries for the tree at the head of
# _queued_trees: subdirectories are re-queued as trees, files as files.
1979 def _work_dirents(self):
1980 path, stream_name, max_manifest_depth = self._queued_trees[0]
1981 if stream_name != self.current_stream_name():
1982 self.start_new_stream(stream_name)
1983 while self._queued_dirents:
1984 dirent = self._queued_dirents.popleft()
1985 target = os.path.join(path, dirent)
1986 if os.path.isdir(target):
1987 self._queue_tree(target,
1988 os.path.join(stream_name, dirent),
1989 max_manifest_depth - 1)
# The `else:` for the non-directory case (original 1990) and an elided line
# at 1992 are missing from this listing.
1991 self._queue_file(target, dirent)
1993 if not self._queued_dirents:
1994 self._queued_trees.popleft()
# Expand the tree at the head of _queued_trees into directory entries.
1996 def _work_trees(self):
1997 path, stream_name, max_manifest_depth = self._queued_trees[0]
1998 d = arvados.util.listdir_recursive(
1999 path, max_depth = (None if max_manifest_depth == 0 else 0))
# The if/else around the two branches below (original 2000/2002) is elided:
# presumably non-empty listings are queued as dirents, empty ones dropped.
2001 self._queue_dirents(stream_name, d)
2003 self._queued_trees.popleft()
# Queue a single file (path or file-like object) as the current write
# target; remembers whether we opened it so _work_file can close it.
2005 def _queue_file(self, source, filename=None):
2006 assert (self._queued_file is None), "tried to queue more than one file"
2007 if not hasattr(source, 'read'):
2008 source = open(source, 'rb')
2009 self._close_file = True
# The `else:` line (original 2010) pairing with this branch is elided.
2011 self._close_file = False
2012 if filename is None:
2013 filename = os.path.basename(source.name)
2014 self.start_new_file(filename)
2015 self._queued_file = source
2017 def _queue_dirents(self, stream_name, dirents):
2018 assert (not self._queued_dirents), "tried to queue more than one tree"
2019 self._queued_dirents = deque(sorted(dirents))
2021 def _queue_tree(self, path, stream_name, max_manifest_depth):
2022 self._queued_trees.append((path, stream_name, max_manifest_depth))
2024 def write_file(self, source, filename=None):
2025 self._queue_file(source, filename)
2026 self.do_queued_work()
2028 def write_directory_tree(self,
2029 path, stream_name='.', max_manifest_depth=-1):
2030 self._queue_tree(path, stream_name, max_manifest_depth)
2031 self.do_queued_work()
2033 def write(self, newdata):
2034 if isinstance(newdata, bytes):
2036 elif isinstance(newdata, str):
2037 newdata = newdata.encode()
2038 elif hasattr(newdata, '__iter__'):
2042 self._data_buffer.append(newdata)
2043 self._data_buffer_len += len(newdata)
2044 self._current_stream_length += len(newdata)
2045 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2048 def open(self, streampath, filename=None):
2049 """open(streampath[, filename]) -> file-like object
2051 Pass in the path of a file to write to the Collection, either as a
2052 single string or as two separate stream name and file name arguments.
2053 This method returns a file-like object you can write to add it to the
2056 You may only have one file object from the Collection open at a time,
2057 so be sure to close the object when you're done. Using the object in
2058 a with statement makes that easy:
2060 with cwriter.open('./doc/page1.txt') as outfile:
2061 outfile.write(page1_data)
2062 with cwriter.open('./doc/page2.txt') as outfile:
2063 outfile.write(page2_data)
2065 if filename is None:
2066 streampath, filename = split(streampath)
2067 if self._last_open and not self._last_open.closed:
2068 raise errors.AssertionError(
2069 u"can't open '{}' when '{}' is still open".format(
2070 filename, self._last_open.name))
2071 if streampath != self.current_stream_name():
2072 self.start_new_stream(streampath)
2073 self.set_current_file_name(filename)
2074 self._last_open = _WriterFile(self, filename)
2075 return self._last_open
2077 def flush_data(self):
2078 data_buffer = b''.join(self._data_buffer)
2080 self._current_stream_locators.append(
2081 self._my_keep().put(
2082 data_buffer[0:config.KEEP_BLOCK_SIZE],
2083 copies=self.replication))
2084 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2085 self._data_buffer_len = len(self._data_buffer[0])
2087 def start_new_file(self, newfilename=None):
2088 self.finish_current_file()
2089 self.set_current_file_name(newfilename)
2091 def set_current_file_name(self, newfilename):
2092 if re.search(r'[\t\n]', newfilename):
2093 raise errors.AssertionError(
2094 "Manifest filenames cannot contain whitespace: %s" %
2096 elif re.search(r'\x00', newfilename):
2097 raise errors.AssertionError(
2098 "Manifest filenames cannot contain NUL characters: %s" %
2100 self._current_file_name = newfilename
2102 def current_file_name(self):
2103 return self._current_file_name
2105 def finish_current_file(self):
2106 if self._current_file_name is None:
2107 if self._current_file_pos == self._current_stream_length:
2109 raise errors.AssertionError(
2110 "Cannot finish an unnamed file " +
2111 "(%d bytes at offset %d in '%s' stream)" %
2112 (self._current_stream_length - self._current_file_pos,
2113 self._current_file_pos,
2114 self._current_stream_name))
2115 self._current_stream_files.append([
2116 self._current_file_pos,
2117 self._current_stream_length - self._current_file_pos,
2118 self._current_file_name])
2119 self._current_file_pos = self._current_stream_length
2120 self._current_file_name = None
2122 def start_new_stream(self, newstreamname='.'):
2123 self.finish_current_stream()
2124 self.set_current_stream_name(newstreamname)
2126 def set_current_stream_name(self, newstreamname):
2127 if re.search(r'[\t\n]', newstreamname):
2128 raise errors.AssertionError(
2129 "Manifest stream names cannot contain whitespace: '%s'" %
2131 self._current_stream_name = '.' if newstreamname=='' else newstreamname
2133 def current_stream_name(self):
2134 return self._current_stream_name
2136 def finish_current_stream(self):
2137 self.finish_current_file()
2139 if not self._current_stream_files:
2141 elif self._current_stream_name is None:
2142 raise errors.AssertionError(
2143 "Cannot finish an unnamed stream (%d bytes in %d files)" %
2144 (self._current_stream_length, len(self._current_stream_files)))
2146 if not self._current_stream_locators:
2147 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2148 self._finished_streams.append([self._current_stream_name,
2149 self._current_stream_locators,
2150 self._current_stream_files])
2151 self._current_stream_files = []
2152 self._current_stream_length = 0
2153 self._current_stream_locators = []
2154 self._current_stream_name = None
2155 self._current_file_pos = 0
2156 self._current_file_name = None
2159 """Store the manifest in Keep and return its locator.
2161 This is useful for storing manifest fragments (task outputs)
2162 temporarily in Keep during a Crunch job.
2164 In other cases you should make a collection instead, by
2165 sending manifest_text() to the API server's "create
2166 collection" endpoint.
2168 return self._my_keep().put(self.manifest_text().encode(),
2169 copies=self.replication)
2171 def portable_data_hash(self):
2172 stripped = self.stripped_manifest().encode()
2173 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2175 def manifest_text(self):
2176 self.finish_current_stream()
2179 for stream in self._finished_streams:
2180 if not re.search(r'^\.(/.*)?$', stream[0]):
2182 manifest += stream[0].replace(' ', '\\040')
2183 manifest += ' ' + ' '.join(stream[1])
2184 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2189 def data_locators(self):
2191 for name, locators, files in self._finished_streams:
2195 def save_new(self, name=None):
2196 return self._api_client.collections().create(
2197 ensure_unique_name=True,
2200 'manifest_text': self.manifest_text(),
2201 }).execute(num_retries=self.num_retries)
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize internal state to disk

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """
    # Attributes that, together, fully describe in-progress writer state
    # for dump_state/from_state round trips.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed (e.g. deques serialized as lists).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any queued source file changed.

        Each dependency's current stat is compared against the stat
        recorded when the file was queued (type, mtime, and size).
        """
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a serializable snapshot of this writer's state.

        `copy_func` is applied to every attribute value; pass e.g.
        `copy.deepcopy` to detach the snapshot from live state.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        """Queue a file, recording its stat so resumption can detect changes."""
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        # Bind the stat error to a name that outlives the except block:
        # Python 3 deletes the `except ... as name` target when the block
        # exits, so referring to it later would raise NameError instead of
        # the intended AssertionError below.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            stat_error = error
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write data, refusing when no source file is queued (unresumable)."""
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)