1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
15 from __future__ import absolute_import
16 from future.utils import listitems, listvalues, viewkeys
17 from builtins import str
18 from past.builtins import basestring
19 from builtins import object
33 from collections import deque
36 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
37 from .keep import KeepLocator, KeepClient
38 from .stream import StreamReader
39 from ._normalize_stream import normalize_stream, escape
40 from ._ranges import Range, LocatorAndRange
41 from .safeapi import ThreadSafeApiCache
42 import arvados.config as config
43 import arvados.errors as errors
45 import arvados.events as events
46 from arvados.retry import retry_method
61 if sys.version_info < (3, 8):
62 from typing_extensions import Literal
64 from typing import Literal
66 _logger = logging.getLogger('arvados.collection')
69 """Argument value for `Collection` methods to represent an added item"""
71 """Argument value for `Collection` methods to represent a removed item"""
73 """Argument value for `Collection` methods to represent a modified item"""
75 """Argument value for `Collection` methods to represent an item with token differences"""
77 """`create_type` value for `Collection.find_or_create`"""
78 COLLECTION = "collection"
79 """`create_type` value for `Collection.find_or_create`"""
81 ChangeList = List[Union[
82 Tuple[Literal[ADD, DEL], str, 'Collection'],
83 Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
85 ChangeType = Literal[ADD, DEL, MOD, TOK]
86 CollectionItem = Union[ArvadosFile, 'Collection']
87 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
88 CreateType = Literal[COLLECTION, FILE]
89 Properties = Dict[str, Any]
90 StorageClasses = List[str]
92 class CollectionBase(object):
93 """Abstract base class for Collection classes
95 .. ATTENTION:: Internal
96 This class is meant to be used by other parts of the SDK. User code
97 should instantiate or subclass `Collection` or one of its subclasses
102 """Enter a context block with this collection instance"""
105 def __exit__(self, exc_type, exc_value, traceback):
106 """Exit a context block with this collection instance"""
110 if self._keep_client is None:
111 self._keep_client = KeepClient(api_client=self._api_client,
112 num_retries=self.num_retries)
113 return self._keep_client
115 def stripped_manifest(self) -> str:
116 """Create a copy of the collection manifest with only size hints
118 This method returns a string with the current collection's manifest
119 text with all non-portable locator hints like permission hints and
120 remote cluster hints removed. The only hints in the returned manifest
123 raw = self.manifest_text()
125 for line in raw.split("\n"):
126 fields = line.split()
128 clean_fields = fields[:1] + [
129 (re.sub(r'\+[^\d][^\+]*', '', x)
130 if re.match(arvados.util.keep_locator_pattern, x)
133 clean += [' '.join(clean_fields), "\n"]
134 return ''.join(clean)
137 class _WriterFile(_FileLikeObjectBase):
138 def __init__(self, coll_writer, name):
139 super(_WriterFile, self).__init__(name, 'wb')
140 self.dest = coll_writer
143 super(_WriterFile, self).close()
144 self.dest.finish_current_file()
    @_FileLikeObjectBase._before_close
    def write(self, data):
        # Delegate to the owning collection writer (`self.dest`, set in
        # __init__). NOTE(review): `_before_close` presumably rejects use
        # after close — confirm in arvfile._FileLikeObjectBase.
        self.dest.write(data)
150 @_FileLikeObjectBase._before_close
151 def writelines(self, seq):
155 @_FileLikeObjectBase._before_close
157 self.dest.flush_data()
160 class RichCollectionBase(CollectionBase):
161 """Base class for Collection classes
163 .. ATTENTION:: Internal
164 This class is meant to be used by other parts of the SDK. User code
165 should instantiate or subclass `Collection` or one of its subclasses
169 def __init__(self, parent=None):
171 self._committed = False
172 self._has_remote_blocks = False
173 self._callback = None
177 raise NotImplementedError()
180 raise NotImplementedError()
    def _my_block_manager(self):
        """Return the `_BlockManager` used by this collection (abstract).

        .. ATTENTION:: Internal
           Subclasses must override this; the base class supplies no
           block manager of its own.
        """
        raise NotImplementedError()
185 def writable(self) -> bool:
186 """Indicate whether this collection object can be modified
188 This method returns `False` if this object is a `CollectionReader`,
191 raise NotImplementedError()
193 def root_collection(self) -> 'Collection':
194 """Get this collection's root collection object
196 If you open a subcollection with `Collection.find`, calling this method
197 on that subcollection returns the source Collection object.
199 raise NotImplementedError()
201 def stream_name(self) -> str:
202 """Get the name of the manifest stream represented by this collection
204 If you open a subcollection with `Collection.find`, calling this method
205 on that subcollection returns the name of the stream you opened.
207 raise NotImplementedError()
210 def has_remote_blocks(self) -> bool:
211         """Indicate whether the collection refers to remote data
213 Returns `True` if the collection manifest includes any Keep locators
214 with a remote hint (`+R`), else `False`.
216 if self._has_remote_blocks:
219 if self[item].has_remote_blocks():
224 def set_has_remote_blocks(self, val: bool) -> None:
225 """Cache whether this collection refers to remote blocks
227 .. ATTENTION:: Internal
228 This method is only meant to be used by other Collection methods.
230 Set this collection's cached "has remote blocks" flag to the given
233 self._has_remote_blocks = val
235 self.parent.set_has_remote_blocks(val)
242 create_type: CreateType,
244 """Get the item at the given path, creating it if necessary
246 If `path` refers to a stream in this collection, returns a
247 corresponding `Subcollection` object. If `path` refers to a file in
248 this collection, returns a corresponding
249 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
250 this collection, then this method creates a new object and returns
251 it, creating parent streams as needed. The type of object created is
252 determined by the value of `create_type`.
256 * path: str --- The path to find or create within this collection.
258 * create_type: Literal[COLLECTION, FILE] --- The type of object to
259 create at `path` if one does not exist. Passing `COLLECTION`
260 creates a stream and returns the corresponding
261 `Subcollection`. Passing `FILE` creates a new file and returns the
262 corresponding `arvados.arvfile.ArvadosFile`.
264 pathcomponents = path.split("/", 1)
265 if pathcomponents[0]:
266 item = self._items.get(pathcomponents[0])
267 if len(pathcomponents) == 1:
270 if create_type == COLLECTION:
271 item = Subcollection(self, pathcomponents[0])
273 item = ArvadosFile(self, pathcomponents[0])
274 self._items[pathcomponents[0]] = item
275 self.set_committed(False)
276 self.notify(ADD, self, pathcomponents[0], item)
280 # create new collection
281 item = Subcollection(self, pathcomponents[0])
282 self._items[pathcomponents[0]] = item
283 self.set_committed(False)
284 self.notify(ADD, self, pathcomponents[0], item)
285 if isinstance(item, RichCollectionBase):
286 return item.find_or_create(pathcomponents[1], create_type)
288 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
293 def find(self, path: str) -> CollectionItem:
294 """Get the item at the given path
296 If `path` refers to a stream in this collection, returns a
297 corresponding `Subcollection` object. If `path` refers to a file in
298 this collection, returns a corresponding
299 `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
300 this collection, then this method raises `NotADirectoryError`.
304 * path: str --- The path to find or create within this collection.
307 raise errors.ArgumentError("Parameter 'path' is empty.")
309 pathcomponents = path.split("/", 1)
310 if pathcomponents[0] == '':
311 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
313 item = self._items.get(pathcomponents[0])
316 elif len(pathcomponents) == 1:
319 if isinstance(item, RichCollectionBase):
320 if pathcomponents[1]:
321 return item.find(pathcomponents[1])
325 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
328 def mkdirs(self, path: str) -> 'Subcollection':
329 """Create and return a subcollection at `path`
331 If `path` exists within this collection, raises `FileExistsError`.
332 Otherwise, creates a stream at that path and returns the
333 corresponding `Subcollection`.
335 if self.find(path) != None:
336 raise IOError(errno.EEXIST, "Directory or file exists", path)
338 return self.find_or_create(path, COLLECTION)
344 encoding: Optional[str]=None,
346 """Open a file-like object within the collection
348 This method returns a file-like object that can read and/or write the
349 file located at `path` within the collection. If you attempt to write
350 a `path` that does not exist, the file is created with `find_or_create`.
351 If the file cannot be opened for any other reason, this method raises
352 `OSError` with an appropriate errno.
356 * path: str --- The path of the file to open within this collection
358 * mode: str --- The mode to open this file. Supports all the same
359 values as `builtins.open`.
361 * encoding: str | None --- The text encoding of the file. Only used
362 when the file is opened in text mode. The default is
365 if not re.search(r'^[rwa][bt]?\+?$', mode):
366 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
368 if mode[0] == 'r' and '+' not in mode:
369 fclass = ArvadosFileReader
370 arvfile = self.find(path)
371 elif not self.writable():
372 raise IOError(errno.EROFS, "Collection is read only")
374 fclass = ArvadosFileWriter
375 arvfile = self.find_or_create(path, FILE)
378 raise IOError(errno.ENOENT, "File not found", path)
379 if not isinstance(arvfile, ArvadosFile):
380 raise IOError(errno.EISDIR, "Is a directory", path)
385 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
386 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
388 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
389 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
392 def modified(self) -> bool:
393 """Indicate whether this collection has an API server record
395 Returns `False` if this collection corresponds to a record loaded from
396 the API server, `True` otherwise.
398 return not self.committed()
402 """Indicate whether this collection has an API server record
404 Returns `True` if this collection corresponds to a record loaded from
405 the API server, `False` otherwise.
407 return self._committed
410 def set_committed(self, value: bool=True):
411 """Cache whether this collection has an API server record
413 .. ATTENTION:: Internal
414 This method is only meant to be used by other Collection methods.
416 Set this collection's cached "committed" flag to the given
417 value and propagates it as needed.
419 if value == self._committed:
422 for k,v in listitems(self._items):
423 v.set_committed(True)
424 self._committed = True
426 self._committed = False
427 if self.parent is not None:
428 self.parent.set_committed(False)
431 def __iter__(self) -> Iterator[str]:
432 """Iterate names of streams and files in this collection
434 This method does not recurse. It only iterates the contents of this
435 collection's corresponding stream.
437 return iter(viewkeys(self._items))
440 def __getitem__(self, k: str) -> CollectionItem:
441 """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
443 This method does not recurse. If you want to search a path, use
444 `RichCollectionBase.find` instead.
446 return self._items[k]
449 def __contains__(self, k: str) -> bool:
450 """Indicate whether this collection has an item with this name
452         This method does not recurse. If you want to check a path, use
453 `RichCollectionBase.exists` instead.
455 return k in self._items
459 """Get the number of items directly contained in this collection
461 This method does not recurse. It only counts the streams and files
462 in this collection's corresponding stream.
464 return len(self._items)
468 def __delitem__(self, p: str) -> None:
469 """Delete an item from this collection's stream
471 This method does not recurse. If you want to remove an item by a
472 path, use `RichCollectionBase.remove` instead.
475 self.set_committed(False)
476 self.notify(DEL, self, p, None)
479 def keys(self) -> Iterator[str]:
480 """Iterate names of streams and files in this collection
482 This method does not recurse. It only iterates the contents of this
483 collection's corresponding stream.
485 return self._items.keys()
488 def values(self) -> List[CollectionItem]:
489 """Get a list of objects in this collection's stream
491 The return value includes a `Subcollection` for every stream, and an
492 `arvados.arvfile.ArvadosFile` for every file, directly within this
493 collection's stream. This method does not recurse.
495 return listvalues(self._items)
498 def items(self) -> List[Tuple[str, CollectionItem]]:
499 """Get a list of `(name, object)` tuples from this collection's stream
501 The return value includes a `Subcollection` for every stream, and an
502 `arvados.arvfile.ArvadosFile` for every file, directly within this
503 collection's stream. This method does not recurse.
505 return listitems(self._items)
507 def exists(self, path: str) -> bool:
508 """Indicate whether this collection includes an item at `path`
510 This method returns `True` if `path` refers to a stream or file within
511 this collection, else `False`.
515 * path: str --- The path to check for existence within this collection
517 return self.find(path) is not None
521 def remove(self, path: str, recursive: bool=False) -> None:
522 """Remove the file or stream at `path`
526 * path: str --- The path of the item to remove from the collection
528 * recursive: bool --- Controls the method's behavior if `path` refers
529 to a nonempty stream. If `False` (the default), this method raises
530 `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
531 items under the stream.
534 raise errors.ArgumentError("Parameter 'path' is empty.")
536 pathcomponents = path.split("/", 1)
537 item = self._items.get(pathcomponents[0])
539 raise IOError(errno.ENOENT, "File not found", path)
540 if len(pathcomponents) == 1:
541 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
542 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
543 deleteditem = self._items[pathcomponents[0]]
544 del self._items[pathcomponents[0]]
545 self.set_committed(False)
546 self.notify(DEL, self, pathcomponents[0], deleteditem)
548 item.remove(pathcomponents[1], recursive=recursive)
550 def _clonefrom(self, source):
551 for k,v in listitems(source):
552 self._items[k] = v.clone(self, k)
555 raise NotImplementedError()
561 source_obj: CollectionItem,
563 overwrite: bool=False,
564 reparent: bool=False,
566 """Copy or move a file or subcollection object to this collection
570 * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
571 to add to this collection
573 * target_name: str --- The path inside this collection where
574 `source_obj` should be added.
576 * overwrite: bool --- Controls the behavior of this method when the
577 collection already contains an object at `target_name`. If `False`
578 (the default), this method will raise `FileExistsError`. If `True`,
579 the object at `target_name` will be replaced with `source_obj`.
581 * reparent: bool --- Controls whether this method copies or moves
582 `source_obj`. If `False` (the default), `source_obj` is copied into
583 this collection. If `True`, `source_obj` is moved into this
586 if target_name in self and not overwrite:
587 raise IOError(errno.EEXIST, "File already exists", target_name)
590 if target_name in self:
591 modified_from = self[target_name]
593 # Actually make the move or copy.
595 source_obj._reparent(self, target_name)
598 item = source_obj.clone(self, target_name)
600 self._items[target_name] = item
601 self.set_committed(False)
602 if not self._has_remote_blocks and source_obj.has_remote_blocks():
603 self.set_has_remote_blocks(True)
606 self.notify(MOD, self, target_name, (modified_from, item))
608 self.notify(ADD, self, target_name, item)
610 def _get_src_target(self, source, target_path, source_collection, create_dest):
611 if source_collection is None:
612 source_collection = self
615 if isinstance(source, basestring):
616 source_obj = source_collection.find(source)
617 if source_obj is None:
618 raise IOError(errno.ENOENT, "File not found", source)
619 sourcecomponents = source.split("/")
622 sourcecomponents = None
624         # Find the parent collection of the target path
625 targetcomponents = target_path.split("/")
627 # Determine the name to use.
628 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
631 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
634 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
636 if len(targetcomponents) > 1:
637 target_dir = self.find("/".join(targetcomponents[0:-1]))
641 if target_dir is None:
642 raise IOError(errno.ENOENT, "Target directory not found", target_name)
644 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
645 target_dir = target_dir[target_name]
646 target_name = sourcecomponents[-1]
648 return (source_obj, target_dir, target_name)
654 source: Union[str, CollectionItem],
656 source_collection: Optional['RichCollectionBase']=None,
657 overwrite: bool=False,
659 """Copy a file or subcollection object to this collection
663 * source: str | arvados.arvfile.ArvadosFile |
664 arvados.collection.Subcollection --- The file or subcollection to
665 add to this collection. If `source` is a str, the object will be
666 found by looking up this path from `source_collection` (see
669 * target_path: str --- The path inside this collection where the
670 source object should be added.
672 * source_collection: arvados.collection.Collection | None --- The
673 collection to find the source object from when `source` is a
674 path. Defaults to the current collection (`self`).
676 * overwrite: bool --- Controls the behavior of this method when the
677 collection already contains an object at `target_path`. If `False`
678 (the default), this method will raise `FileExistsError`. If `True`,
679 the object at `target_path` will be replaced with `source_obj`.
681 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
682 target_dir.add(source_obj, target_name, overwrite, False)
688 source: Union[str, CollectionItem],
690 source_collection: Optional['RichCollectionBase']=None,
691 overwrite: bool=False,
693 """Move a file or subcollection object to this collection
697 * source: str | arvados.arvfile.ArvadosFile |
698 arvados.collection.Subcollection --- The file or subcollection to
699 add to this collection. If `source` is a str, the object will be
700 found by looking up this path from `source_collection` (see
703 * target_path: str --- The path inside this collection where the
704 source object should be added.
706 * source_collection: arvados.collection.Collection | None --- The
707 collection to find the source object from when `source` is a
708 path. Defaults to the current collection (`self`).
710 * overwrite: bool --- Controls the behavior of this method when the
711 collection already contains an object at `target_path`. If `False`
712 (the default), this method will raise `FileExistsError`. If `True`,
713 the object at `target_path` will be replaced with `source_obj`.
715 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
716 if not source_obj.writable():
717 raise IOError(errno.EROFS, "Source collection is read only", source)
718 target_dir.add(source_obj, target_name, overwrite, True)
720 def portable_manifest_text(self, stream_name: str=".") -> str:
721 """Get the portable manifest text for this collection
723 The portable manifest text is normalized, and does not include access
724 tokens. This method does not flush outstanding blocks to Keep.
728 * stream_name: str --- The name to use for this collection's stream in
729 the generated manifest. Default `'.'`.
731 return self._get_manifest_text(stream_name, True, True)
736 stream_name: str=".",
738 normalize: bool=False,
739 only_committed: bool=False,
741 """Get the manifest text for this collection
745 * stream_name: str --- The name to use for this collection's stream in
746 the generated manifest. Default `'.'`.
748 * strip: bool --- Controls whether or not the returned manifest text
749 includes access tokens. If `False` (the default), the manifest text
750 will include access tokens. If `True`, the manifest text will not
751 include access tokens.
753 * normalize: bool --- Controls whether or not the returned manifest
754 text is normalized. Default `False`.
756 * only_committed: bool --- Controls whether or not this method uploads
757 pending data to Keep before building and returning the manifest text.
758 If `False` (the default), this method will finish uploading all data
759 to Keep, then return the final manifest. If `True`, this method will
760 build and return a manifest that only refers to the data that has
761 finished uploading at the time this method was called.
763 if not only_committed:
764 self._my_block_manager().commit_all()
765 return self._get_manifest_text(stream_name, strip, normalize,
766 only_committed=only_committed)
769 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
770 """Get the manifest text for this collection, sub collections and files.
773 Name to use for this stream (directory)
776 If True, remove signing tokens from block locators if present.
777 If False (default), block locators are left unchanged.
780 If True, always export the manifest text in normalized form
781 even if the Collection is not modified. If False (default) and the collection
782 is not modified, return the original manifest text even if it is not
786 If True, only include blocks that were already committed to Keep.
790 if not self.committed() or self._manifest_text is None or normalize:
793 sorted_keys = sorted(self.keys())
794 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
795 # Create a stream per file `k`
796 arvfile = self[filename]
798 for segment in arvfile.segments():
799 loc = segment.locator
800 if arvfile.parent._my_block_manager().is_bufferblock(loc):
803 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
805 loc = KeepLocator(loc).stripped()
806 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
807 segment.segment_offset, segment.range_size))
808 stream[filename] = filestream
810 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
811 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
812 buf.append(self[dirname].manifest_text(
813 stream_name=os.path.join(stream_name, dirname),
814 strip=strip, normalize=True, only_committed=only_committed))
818 return self.stripped_manifest()
820 return self._manifest_text
823 def _copy_remote_blocks(self, remote_blocks={}):
824 """Scan through the entire collection and ask Keep to copy remote blocks.
826 When accessing a remote collection, blocks will have a remote signature
827 (+R instead of +A). Collect these signatures and request Keep to copy the
828 blocks to the local cluster, returning local (+A) signatures.
831 Shared cache of remote to local block mappings. This is used to avoid
832 doing extra work when blocks are shared by more than one file in
833 different subdirectories.
837 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
843 end_collection: 'RichCollectionBase',
845 holding_collection: Optional['Collection']=None,
847 """Build a list of differences between this collection and another
851 * end_collection: arvados.collection.RichCollectionBase --- A
852 collection object with the desired end state. The returned diff
853 list will describe how to go from the current collection object
854 `self` to `end_collection`.
856 * prefix: str --- The name to use for this collection's stream in
857 the diff list. Default `'.'`.
859 * holding_collection: arvados.collection.Collection | None --- A
860 collection object used to hold objects for the returned diff
861 list. By default, a new empty collection is created.
864 if holding_collection is None:
865 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
867 if k not in end_collection:
868 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
869 for k in end_collection:
871 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
872 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
873 elif end_collection[k] != self[k]:
874 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
876 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
878 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
883 def apply(self, changes: ChangeList) -> None:
884         """Apply a list of changes to this collection
886 This method takes a list of changes generated by
887 `RichCollectionBase.diff` and applies it to this
888 collection. Afterward, the state of this collection object will
889 match the state of `end_collection` passed to `diff`. If a change
890 conflicts with a local change, it will be saved to an alternate path
891 indicating the conflict.
895 * changes: arvados.collection.ChangeList --- The list of differences
896 generated by `RichCollectionBase.diff`.
899 self.set_committed(False)
900 for change in changes:
901 event_type = change[0]
904 local = self.find(path)
905 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
907 if event_type == ADD:
909 # No local file at path, safe to copy over new file
910 self.copy(initial, path)
911 elif local is not None and local != initial:
912 # There is already local file and it is different:
913 # save change to conflict file.
914 self.copy(initial, conflictpath)
915 elif event_type == MOD or event_type == TOK:
918 # Local matches the "initial" item so it has not
919 # changed locally and is safe to update.
920 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
921 # Replace contents of local file with new contents
922 local.replace_contents(final)
924 # Overwrite path with new item; this can happen if
925 # path was a file and is now a collection or vice versa
926 self.copy(final, path, overwrite=True)
928 # Local is missing (presumably deleted) or local doesn't
929 # match the "start" value, so save change to conflict file
930 self.copy(final, conflictpath)
931 elif event_type == DEL:
933 # Local item matches "initial" value, so it is safe to remove.
934 self.remove(path, recursive=True)
935 # else, the file is modified or already removed, in either
936 # case we don't want to try to remove it.
938 def portable_data_hash(self) -> str:
939 """Get the portable data hash for this collection's manifest"""
940 if self._manifest_locator and self.committed():
941 # If the collection is already saved on the API server, and it's committed
942 # then return API server's PDH response.
943 return self._portable_data_hash
945 stripped = self.portable_manifest_text().encode()
946 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
949 def subscribe(self, callback: ChangeCallback) -> None:
950 """Set a notify callback for changes to this collection
954 * callback: arvados.collection.ChangeCallback --- The callable to
955 call each time the collection is changed.
957 if self._callback is None:
958 self._callback = callback
960 raise errors.ArgumentError("A callback is already set on this collection.")
963 def unsubscribe(self) -> None:
964 """Remove any notify callback set for changes to this collection"""
965 if self._callback is not None:
966 self._callback = None
972 collection: 'RichCollectionBase',
974 item: CollectionItem,
976 """Notify any subscribed callback about a change to this collection
978 .. ATTENTION:: Internal
979 This method is only meant to be used by other Collection methods.
981 If a callback has been registered with `RichCollectionBase.subscribe`,
982 it will be called with information about a change to this collection.
983 Then this notification will be propagated to this collection's root.
987 * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
990 * collection: arvados.collection.RichCollectionBase --- The
991 collection that was modified.
993 * name: str --- The name of the file or stream within `collection` that
996 * item: arvados.arvfile.ArvadosFile |
997 arvados.collection.Subcollection --- The new contents at `name`
1001 self._callback(event, collection, name, item)
1002 self.root_collection().notify(event, collection, name, item)
1005 def __eq__(self, other: Any) -> bool:
1006 """Indicate whether this collection object is equal to another"""
1009 if not isinstance(other, RichCollectionBase):
1011 if len(self._items) != len(other):
1013 for k in self._items:
1016 if self._items[k] != other[k]:
1020 def __ne__(self, other: Any) -> bool:
1021 """Indicate whether this collection object is not equal to another"""
1022 return not self.__eq__(other)
1025 def flush(self) -> None:
1026 """Upload any pending data to Keep"""
1027 for e in listvalues(self):
1031 class Collection(RichCollectionBase):
1032 """Read and manipulate an Arvados collection
1034 This class provides a high-level interface to create, read, and update
1035 Arvados collections and their contents. Refer to the Arvados Python SDK
1036 cookbook for [an introduction to using the Collection class][cookbook].
1038 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1041 def __init__(self, manifest_locator_or_text: Optional[str]=None,
1042 api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1043 keep_client: Optional['arvados.keep.KeepClient']=None,
1044 num_retries: int=10,
1045 parent: Optional['Collection']=None,
1046 apiconfig: Optional[Mapping[str, str]]=None,
1047 block_manager: Optional['arvados.arvfile._BlockManager']=None,
1048 replication_desired: Optional[int]=None,
1049 storage_classes_desired: Optional[List[str]]=None,
1050 put_threads: Optional[int]=None):
1051 """Initialize a Collection object
1055 * manifest_locator_or_text: str | None --- This string can contain a
1056 collection manifest text, portable data hash, or UUID. When given a
1057 portable data hash or UUID, this instance will load a collection
1058 record from the API server. Otherwise, this instance will represent a
1059 new collection without an API server record. The default value `None`
1060 instantiates a new collection with an empty manifest.
1062 * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1063 Arvados API client object this instance uses to make requests. If
1064 none is given, this instance creates its own client using the
1065 settings from `apiconfig` (see below). If your client instantiates
1066 many Collection objects, you can help limit memory utilization by
1067 calling `arvados.api.api` to construct an
1068 `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1069 for every Collection.
1071 * keep_client: arvados.keep.KeepClient | None --- The Keep client
1072 object this instance uses to make requests. If none is given, this
1073 instance creates its own client using its `api_client`.
1075 * num_retries: int --- The number of times that client requests are
1076 retried. Default 10.
1078 * parent: arvados.collection.Collection | None --- The parent Collection
1079 object of this instance, if any. This argument is primarily used by
1080 other Collection methods; user client code shouldn't need to use it.
1082 * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1083 `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1084 `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1085 Collection object constructs one from these settings. If no
1086 mapping is provided, calls `arvados.config.settings` to get these
1087 parameters from user configuration.
1089 * block_manager: arvados.arvfile._BlockManager | None --- The
1090 _BlockManager object used by this instance to coordinate reading
1091 and writing Keep data blocks. If none is given, this instance
1092 constructs its own. This argument is primarily used by other
1093 Collection methods; user client code shouldn't need to use it.
1095 * replication_desired: int | None --- This controls both the value of
1096 the `replication_desired` field on API collection records saved by
1097 this class, as well as the number of Keep services that the object
1098 writes new data blocks to. If none is given, uses the default value
1099 configured for the cluster.
1101 * storage_classes_desired: list[str] | None --- This controls both
1102 the value of the `storage_classes_desired` field on API collection
1103 records saved by this class, as well as selecting which specific
1104 Keep services the object writes new data blocks to. If none is
1105 given, defaults to an empty list.
1107 * put_threads: int | None --- The number of threads to run
1108 simultaneously to upload data blocks to Keep. This value is used when
1109 building a new `block_manager`. It is unused when a `block_manager`
1113 if storage_classes_desired and type(storage_classes_desired) is not list:
1114 raise errors.ArgumentError("storage_classes_desired must be list type.")
1116 super(Collection, self).__init__(parent)
1117 self._api_client = api_client
1118 self._keep_client = keep_client
1120 # Use the keep client from ThreadSafeApiCache
1121 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1122 self._keep_client = self._api_client.keep
1124 self._block_manager = block_manager
1125 self.replication_desired = replication_desired
1126 self._storage_classes_desired = storage_classes_desired
1127 self.put_threads = put_threads
1130 self._config = apiconfig
1132 self._config = config.settings()
1134 self.num_retries = num_retries
1135 self._manifest_locator = None
1136 self._manifest_text = None
1137 self._portable_data_hash = None
1138 self._api_response = None
1139 self._past_versions = set()
1141 self.lock = threading.RLock()
1144 if manifest_locator_or_text:
1145 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1146 self._manifest_locator = manifest_locator_or_text
1147 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1148 self._manifest_locator = manifest_locator_or_text
1149 if not self._has_local_collection_uuid():
1150 self._has_remote_blocks = True
1151 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1152 self._manifest_text = manifest_locator_or_text
1153 if '+R' in self._manifest_text:
1154 self._has_remote_blocks = True
1156 raise errors.ArgumentError(
1157 "Argument to CollectionReader is not a manifest or a collection UUID")
1161 except errors.SyntaxError as e:
1162 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1164 def storage_classes_desired(self) -> List[str]:
1165 """Get this collection's `storage_classes_desired` value"""
1166 return self._storage_classes_desired or []
1168 def root_collection(self) -> 'Collection':
1171 def get_properties(self) -> Properties:
1172 """Get this collection's properties
1174 This method always returns a dict. If this collection object does not
1175 have an associated API record, or that record does not have any
1176 properties set, this method returns an empty dict.
1178 if self._api_response and self._api_response["properties"]:
1179 return self._api_response["properties"]
1183 def get_trash_at(self) -> Optional[datetime.datetime]:
1184 """Get this collection's `trash_at` field
1186 This method parses the `trash_at` field of the collection's API
1187 record and returns a datetime from it. If that field is not set, or
1188 this collection object does not have an associated API record,
1191 if self._api_response and self._api_response["trash_at"]:
1193 return ciso8601.parse_datetime(self._api_response["trash_at"])
1199 def stream_name(self) -> str:
1202 def writable(self) -> bool:
1206 def known_past_version(
1208 modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1210 """Indicate whether an API record for this collection has been seen before
1212 As this collection object loads records from the API server, it records
1213 their `modified_at` and `portable_data_hash` fields. This method accepts
1214 a 2-tuple with values for those fields, and returns `True` if the
1215 combination was previously loaded.
1217 return modified_at_and_portable_data_hash in self._past_versions
1223 other: Optional['Collection']=None,
1224 num_retries: Optional[int]=None,
1226 """Merge another collection's contents into this one
1228 This method compares the manifest of this collection instance with
1229 another, then updates this instance's manifest with changes from the
1230 other, renaming files to flag conflicts where necessary.
1232 When called without any arguments, this method reloads the collection's
1233 API record, and updates this instance with any changes that have
1234 appeared server-side. If this instance does not have a corresponding
1235 API record, this method raises `arvados.errors.ArgumentError`.
1239 * other: arvados.collection.Collection | None --- The collection
1240 whose contents should be merged into this instance. When not
1241 provided, this method reloads this collection's API record and
1242 constructs a Collection object from it. If this instance does not
1243 have a corresponding API record, this method raises
1244 `arvados.errors.ArgumentError`.
1246 * num_retries: int | None --- The number of times to retry reloading
1247 the collection's API record from the API server. If not specified,
1248 uses the `num_retries` provided when this instance was constructed.
1251 if self._manifest_locator is None:
1252 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1253 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1254 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1255 response.get("portable_data_hash") != self.portable_data_hash()):
1256 # The record on the server is different from our current one, but we've seen it before,
1257 # so ignore it because it's already been merged.
1258 # However, if it's the same as our current record, proceed with the update, because we want to update
1262 self._remember_api_response(response)
1263 other = CollectionReader(response["manifest_text"])
1264 baseline = CollectionReader(self._manifest_text)
1265 self.apply(baseline.diff(other))
1266 self._manifest_text = self.manifest_text()
1270 if self._api_client is None:
1271 self._api_client = ThreadSafeApiCache(self._config, version='v1')
1272 if self._keep_client is None:
1273 self._keep_client = self._api_client.keep
1274 return self._api_client
1278 if self._keep_client is None:
1279 if self._api_client is None:
1282 self._keep_client = KeepClient(api_client=self._api_client)
1283 return self._keep_client
1286 def _my_block_manager(self):
1287 if self._block_manager is None:
1288 copies = (self.replication_desired or
1289 self._my_api()._rootDesc.get('defaultCollectionReplication',
1291 self._block_manager = _BlockManager(self._my_keep(),
1293 put_threads=self.put_threads,
1294 num_retries=self.num_retries,
1295 storage_classes_func=self.storage_classes_desired)
1296 return self._block_manager
1298 def _remember_api_response(self, response):
1299 self._api_response = response
1300 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1302 def _populate_from_api_server(self):
1303 # As in KeepClient itself, we must wait until the last
1304 # possible moment to instantiate an API client, in order to
1305 # avoid tripping up clients that don't have access to an API
1306 # server. If we do build one, make sure our Keep client uses
1307 # it. If instantiation fails, we'll fall back to the except
1308 # clause, just like any other Collection lookup
1309 # failure. Return an exception, or None if successful.
1310 self._remember_api_response(self._my_api().collections().get(
1311 uuid=self._manifest_locator).execute(
1312 num_retries=self.num_retries))
1313 self._manifest_text = self._api_response['manifest_text']
1314 self._portable_data_hash = self._api_response['portable_data_hash']
1315 # If not overriden via kwargs, we should try to load the
1316 # replication_desired and storage_classes_desired from the API server
1317 if self.replication_desired is None:
1318 self.replication_desired = self._api_response.get('replication_desired', None)
1319 if self._storage_classes_desired is None:
1320 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1322 def _populate(self):
1323 if self._manifest_text is None:
1324 if self._manifest_locator is None:
1327 self._populate_from_api_server()
1328 self._baseline_manifest = self._manifest_text
1329 self._import_manifest(self._manifest_text)
1331 def _has_collection_uuid(self):
1332 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1334 def _has_local_collection_uuid(self):
1335 return self._has_collection_uuid and \
1336 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
    def __enter__(self):
        """Enter a context block with this collection instance"""
1341 def __exit__(self, exc_type, exc_value, traceback):
1342 """Exit a context with this collection instance
1344 If no exception was raised inside the context block, and this
1345 collection is writable and has a corresponding API record, that
1346 record will be updated to match the state of this instance at the end
1349 if exc_type is None:
1350 if self.writable() and self._has_collection_uuid():
1354 def stop_threads(self) -> None:
1355 """Stop background Keep upload/download threads"""
1356 if self._block_manager is not None:
1357 self._block_manager.stop_threads()
1360 def manifest_locator(self) -> Optional[str]:
1361 """Get this collection's manifest locator, if any
1363 * If this collection instance is associated with an API record with a
1365 * Otherwise, if this collection instance was loaded from an API record
1366 by portable data hash, return that.
1367 * Otherwise, return `None`.
1369 return self._manifest_locator
1374 new_parent: Optional['Collection']=None,
1375 new_name: Optional[str]=None,
1376 readonly: bool=False,
1377 new_config: Optional[Mapping[str, str]]=None,
1379 """Create a Collection object with the same contents as this instance
1381 This method creates a new Collection object with contents that match
1382 this instance's. The new collection will not be associated with any API
1387 * new_parent: arvados.collection.Collection | None --- This value is
1388 passed to the new Collection's constructor as the `parent`
1391 * new_name: str | None --- This value is unused.
1393 * readonly: bool --- If this value is true, this method constructs and
1394 returns a `CollectionReader`. Otherwise, it returns a mutable
1395 `Collection`. Default `False`.
1397 * new_config: Mapping[str, str] | None --- This value is passed to the
1398 new Collection's constructor as `apiconfig`. If no value is provided,
1399 defaults to the configuration passed to this instance's constructor.
1401 if new_config is None:
1402 new_config = self._config
1404 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1406 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1408 newcollection._clonefrom(self)
1409 return newcollection
1412 def api_response(self) -> Optional[Dict[str, Any]]:
1413 """Get this instance's associated API record
1415 If this Collection instance has an associated API record, return it.
1416 Otherwise, return `None`.
1418 return self._api_response
1423 create_type: CreateType,
1424 ) -> CollectionItem:
1428 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1430 def find(self, path: str) -> CollectionItem:
1434 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1436 def remove(self, path: str, recursive: bool=False) -> None:
1438 raise errors.ArgumentError("Cannot remove '.'")
1440 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1447 properties: Optional[Properties]=None,
1448 storage_classes: Optional[StorageClasses]=None,
1449 trash_at: Optional[datetime.datetime]=None,
1451 num_retries: Optional[int]=None,
1452 preserve_version: bool=False,
1454 """Save collection to an existing API record
1456 This method updates the instance's corresponding API record to match
1457 the instance's state. If this instance does not have a corresponding API
1458 record yet, raises `AssertionError`. (To create a new API record, use
1459 `Collection.save_new`.) This method returns the saved collection
1464 * properties: dict[str, Any] | None --- If provided, the API record will
1465 be updated with these properties. Note this will completely replace
1466 any existing properties.
1468 * storage_classes: list[str] | None --- If provided, the API record will
1469 be updated with this value in the `storage_classes_desired` field.
1470 This value will also be saved on the instance and used for any
1471 changes that follow.
1473 * trash_at: datetime.datetime | None --- If provided, the API record
1474 will be updated with this value in the `trash_at` field.
1476 * merge: bool --- If `True` (the default), this method will first
1477 reload this collection's API record, and merge any new contents into
1478 this instance before saving changes. See `Collection.update` for
1481 * num_retries: int | None --- The number of times to retry reloading
1482 the collection's API record from the API server. If not specified,
1483 uses the `num_retries` provided when this instance was constructed.
1485 * preserve_version: bool --- This value will be passed to directly
1486 to the underlying API call. If `True`, the Arvados API will
1487 preserve the versions of this collection both immediately before
1488 and after the update. If `True` when the API server is not
1489 configured with collection versioning, this method raises
1490 `arvados.errors.ArgumentError`.
1492 if properties and type(properties) is not dict:
1493 raise errors.ArgumentError("properties must be dictionary type.")
1495 if storage_classes and type(storage_classes) is not list:
1496 raise errors.ArgumentError("storage_classes must be list type.")
1498 self._storage_classes_desired = storage_classes
1500 if trash_at and type(trash_at) is not datetime.datetime:
1501 raise errors.ArgumentError("trash_at must be datetime type.")
1503 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1504 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1508 body["properties"] = properties
1509 if self.storage_classes_desired():
1510 body["storage_classes_desired"] = self.storage_classes_desired()
1512 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1513 body["trash_at"] = t
1514 if preserve_version:
1515 body["preserve_version"] = preserve_version
1517 if not self.committed():
1518 if self._has_remote_blocks:
1519 # Copy any remote blocks to the local cluster.
1520 self._copy_remote_blocks(remote_blocks={})
1521 self._has_remote_blocks = False
1522 if not self._has_collection_uuid():
1523 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1524 elif not self._has_local_collection_uuid():
1525 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1527 self._my_block_manager().commit_all()
1532 text = self.manifest_text(strip=False)
1533 body['manifest_text'] = text
1535 self._remember_api_response(self._my_api().collections().update(
1536 uuid=self._manifest_locator,
1538 ).execute(num_retries=num_retries))
1539 self._manifest_text = self._api_response["manifest_text"]
1540 self._portable_data_hash = self._api_response["portable_data_hash"]
1541 self.set_committed(True)
1543 self._remember_api_response(self._my_api().collections().update(
1544 uuid=self._manifest_locator,
1546 ).execute(num_retries=num_retries))
1548 return self._manifest_text
1556 name: Optional[str]=None,
1557 create_collection_record: bool=True,
1558 owner_uuid: Optional[str]=None,
1559 properties: Optional[Properties]=None,
1560 storage_classes: Optional[StorageClasses]=None,
1561 trash_at: Optional[datetime.datetime]=None,
1562 ensure_unique_name: bool=False,
1563 num_retries: Optional[int]=None,
1564 preserve_version: bool=False,
1566 """Save collection to a new API record
1568 This method finishes uploading new data blocks and (optionally)
1569 creates a new API collection record with the provided data. If a new
1570 record is created, this instance becomes associated with that record
1571 for future updates like `save()`. This method returns the saved
1572 collection manifest.
1576 * name: str | None --- The `name` field to use on the new collection
1577 record. If not specified, a generic default name is generated.
1579 * create_collection_record: bool --- If `True` (the default), creates a
1580 collection record on the API server. If `False`, the method finishes
1581 all data uploads and only returns the resulting collection manifest
1582 without sending it to the API server.
1584 * owner_uuid: str | None --- The `owner_uuid` field to use on the
1585 new collection record.
1587 * properties: dict[str, Any] | None --- The `properties` field to use on
1588 the new collection record.
1590 * storage_classes: list[str] | None --- The
1591 `storage_classes_desired` field to use on the new collection record.
1593 * trash_at: datetime.datetime | None --- The `trash_at` field to use
1594 on the new collection record.
1596 * ensure_unique_name: bool --- This value is passed directly to the
1597 Arvados API when creating the collection record. If `True`, the API
1598 server may modify the submitted `name` to ensure the collection's
1599 `name`+`owner_uuid` combination is unique. If `False` (the default),
1600 if a collection already exists with this same `name`+`owner_uuid`
1601 combination, creating a collection record will raise a validation
1604 * num_retries: int | None --- The number of times to retry reloading
1605 the collection's API record from the API server. If not specified,
1606 uses the `num_retries` provided when this instance was constructed.
1608 * preserve_version: bool --- This value will be passed to directly
1609 to the underlying API call. If `True`, the Arvados API will
1610 preserve the versions of this collection both immediately before
1611 and after the update. If `True` when the API server is not
1612 configured with collection versioning, this method raises
1613 `arvados.errors.ArgumentError`.
1615 if properties and type(properties) is not dict:
1616 raise errors.ArgumentError("properties must be dictionary type.")
1618 if storage_classes and type(storage_classes) is not list:
1619 raise errors.ArgumentError("storage_classes must be list type.")
1621 if trash_at and type(trash_at) is not datetime.datetime:
1622 raise errors.ArgumentError("trash_at must be datetime type.")
1624 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1625 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1627 if self._has_remote_blocks:
1628 # Copy any remote blocks to the local cluster.
1629 self._copy_remote_blocks(remote_blocks={})
1630 self._has_remote_blocks = False
1633 self._storage_classes_desired = storage_classes
1635 self._my_block_manager().commit_all()
1636 text = self.manifest_text(strip=False)
1638 if create_collection_record:
1640 name = "New collection"
1641 ensure_unique_name = True
1643 body = {"manifest_text": text,
1645 "replication_desired": self.replication_desired}
1647 body["owner_uuid"] = owner_uuid
1649 body["properties"] = properties
1650 if self.storage_classes_desired():
1651 body["storage_classes_desired"] = self.storage_classes_desired()
1653 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1654 body["trash_at"] = t
1655 if preserve_version:
1656 body["preserve_version"] = preserve_version
1658 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1659 text = self._api_response["manifest_text"]
1661 self._manifest_locator = self._api_response["uuid"]
1662 self._portable_data_hash = self._api_response["portable_data_hash"]
1664 self._manifest_text = text
1665 self.set_committed(True)
    # Pre-compiled patterns used by _import_manifest when parsing manifest
    # text: a whitespace-delimited token plus its trailing separator,
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # a Keep block locator (32 hex digits, "+" size, optional "+hint"s),
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # and a file segment token of the form "position:size:filename".
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1673 def _unescape_manifest_path(self, path):
1674 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1677 def _import_manifest(self, manifest_text):
1678 """Import a manifest into a `Collection`.
1681 The manifest text to import from.
1685 raise ArgumentError("Can only import manifest into an empty collection")
1694 for token_and_separator in self._token_re.finditer(manifest_text):
1695 tok = token_and_separator.group(1)
1696 sep = token_and_separator.group(2)
1698 if state == STREAM_NAME:
1699 # starting a new stream
1700 stream_name = self._unescape_manifest_path(tok)
1705 self.find_or_create(stream_name, COLLECTION)
1709 block_locator = self._block_re.match(tok)
1711 blocksize = int(block_locator.group(1))
1712 blocks.append(Range(tok, streamoffset, blocksize, 0))
1713 streamoffset += blocksize
1717 if state == SEGMENTS:
1718 file_segment = self._segment_re.match(tok)
1720 pos = int(file_segment.group(1))
1721 size = int(file_segment.group(2))
1722 name = self._unescape_manifest_path(file_segment.group(3))
1723 if name.split('/')[-1] == '.':
1724 # placeholder for persisting an empty directory, not a real file
1726 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1728 filepath = os.path.join(stream_name, name)
1730 afile = self.find_or_create(filepath, FILE)
1731 except IOError as e:
1732 if e.errno == errno.ENOTDIR:
1733 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1736 if isinstance(afile, ArvadosFile):
1737 afile.add_segment(blocks, pos, size)
1739 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1742 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1748 self.set_committed(True)
1754 collection: 'RichCollectionBase',
1756 item: CollectionItem,
1759 self._callback(event, collection, name, item)
1762 class Subcollection(RichCollectionBase):
1763 """Read and manipulate a stream/directory within an Arvados collection
1765 This class represents a single stream (like a directory) within an Arvados
1766 `Collection`. It is returned by `Collection.find` and provides the same API.
1767 Operations that work on the API collection record propagate to the parent
1768 `Collection` object.
1771 def __init__(self, parent, name):
1772 super(Subcollection, self).__init__(parent)
1773 self.lock = self.root_collection().lock
1774 self._manifest_text = None
1776 self.num_retries = parent.num_retries
1778 def root_collection(self) -> 'Collection':
1779 return self.parent.root_collection()
1781 def writable(self) -> bool:
1782 return self.root_collection().writable()
1785 return self.root_collection()._my_api()
1788 return self.root_collection()._my_keep()
1790 def _my_block_manager(self):
1791 return self.root_collection()._my_block_manager()
1793 def stream_name(self) -> str:
1794 return os.path.join(self.parent.stream_name(), self.name)
1799 new_parent: Optional['Collection']=None,
1800 new_name: Optional[str]=None,
1801 ) -> 'Subcollection':
1802 c = Subcollection(new_parent, new_name)
1808 def _reparent(self, newparent, newname):
1809 self.set_committed(False)
1811 self.parent.remove(self.name, recursive=True)
1812 self.parent = newparent
1814 self.lock = self.parent.root_collection().lock
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode empty directories by using an \056-named (".") empty file"""
        # Manifest format has no way to express an empty stream, so emit a
        # zero-length file named "\056" (an escaped "."), backed by the
        # well-known empty block locator, as a placeholder. _import_manifest
        # recognizes this placeholder and recreates the empty directory.
        if len(self._items) == 0:
            return "%s %s 0:0:\\056\n" % (
                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        # Non-empty: defer to the base class implementation (the call's
        # remaining arguments continue on lines past this excerpt).
        return super(Subcollection, self)._get_manifest_text(stream_name,
1827 class CollectionReader(Collection):
1828 """Read-only `Collection` subclass
1830 This class will never create or update any API collection records. You can
1831 use this class for additional code safety when you only need to read
1832 existing collections.
1834 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1835 self._in_init = True
1836 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1837 self._in_init = False
1839 # Forego any locking since it should never change once initialized.
1840 self.lock = NoopLock()
1842 # Backwards compatability with old CollectionReader
1843 # all_streams() and all_files()
1844 self._streams = None
1846 def writable(self) -> bool:
1847 return self._in_init
1849 def _populate_streams(orig_func):
1850 @functools.wraps(orig_func)
1851 def populate_streams_wrapper(self, *args, **kwargs):
1852 # Defer populating self._streams until needed since it creates a copy of the manifest.
1853 if self._streams is None:
1854 if self._manifest_text:
1855 self._streams = [sline.split()
1856 for sline in self._manifest_text.split("\n")
1860 return orig_func(self, *args, **kwargs)
1861 return populate_streams_wrapper
1863 @arvados.util._deprecated('3.0', 'Collection iteration')
1865 def normalize(self):
1866 """Normalize the streams returned by `all_streams`"""
1868 for s in self.all_streams():
1869 for f in s.all_files():
1870 streamname, filename = split(s.name() + "/" + f.name())
1871 if streamname not in streams:
1872 streams[streamname] = {}
1873 if filename not in streams[streamname]:
1874 streams[streamname][filename] = []
1875 for r in f.segments:
1876 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1878 self._streams = [normalize_stream(s, streams[s])
1879 for s in sorted(streams)]
1881 @arvados.util._deprecated('3.0', 'Collection iteration')
1883 def all_streams(self):
1884 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1885 for s in self._streams]
1887 @arvados.util._deprecated('3.0', 'Collection iteration')
1889 def all_files(self):
1890 for s in self.all_streams():
1891 for f in s.all_files():
1895 class CollectionWriter(CollectionBase):
1896 """Create a new collection from scratch
1898 .. WARNING:: Deprecated
1899 This class is deprecated. Prefer `arvados.collection.Collection`
1903 @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1904 def __init__(self, api_client=None, num_retries=0, replication=None):
1905 """Instantiate a CollectionWriter.
1907 CollectionWriter lets you build a new Arvados Collection from scratch.
1908 Write files to it. The CollectionWriter will upload data to Keep as
1909 appropriate, and provide you with the Collection manifest text when
1913 * api_client: The API client to use to look up Collections. If not
1914 provided, CollectionReader will build one from available Arvados
1916 * num_retries: The default number of times to retry failed
1917 service requests. Default 0. You may change this value
1918 after instantiation, but note those changes may not
1919 propagate to related objects like the Keep client.
1920 * replication: The number of copies of each block to store.
1921 If this argument is None or not supplied, replication is
1922 the server-provided default if available, otherwise 2.
1924 self._api_client = api_client
1925 self.num_retries = num_retries
1926 self.replication = (2 if replication is None else replication)
1927 self._keep_client = None
1928 self._data_buffer = []
1929 self._data_buffer_len = 0
1930 self._current_stream_files = []
1931 self._current_stream_length = 0
1932 self._current_stream_locators = []
1933 self._current_stream_name = '.'
1934 self._current_file_name = None
1935 self._current_file_pos = 0
1936 self._finished_streams = []
1937 self._close_file = None
1938 self._queued_file = None
1939 self._queued_dirents = deque()
1940 self._queued_trees = deque()
1941 self._last_open = None
1943 def __exit__(self, exc_type, exc_value, traceback):
1944 if exc_type is None:
1947 def do_queued_work(self):
1948 # The work queue consists of three pieces:
1949 # * _queued_file: The file object we're currently writing to the
1951 # * _queued_dirents: Entries under the current directory
1952 # (_queued_trees[0]) that we want to write or recurse through.
1953 # This may contain files from subdirectories if
1954 # max_manifest_depth == 0 for this directory.
1955 # * _queued_trees: Directories that should be written as separate
1956 # streams to the Collection.
1957 # This function handles the smallest piece of work currently queued
1958 # (current file, then current directory, then next directory) until
1959 # no work remains. The _work_THING methods each do a unit of work on
1960 # THING. _queue_THING methods add a THING to the work queue.
1962 if self._queued_file:
1964 elif self._queued_dirents:
1965 self._work_dirents()
1966 elif self._queued_trees:
1971 def _work_file(self):
1973 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1977 self.finish_current_file()
1978 if self._close_file:
1979 self._queued_file.close()
1980 self._close_file = None
1981 self._queued_file = None
1983 def _work_dirents(self):
1984 path, stream_name, max_manifest_depth = self._queued_trees[0]
1985 if stream_name != self.current_stream_name():
1986 self.start_new_stream(stream_name)
1987 while self._queued_dirents:
1988 dirent = self._queued_dirents.popleft()
1989 target = os.path.join(path, dirent)
1990 if os.path.isdir(target):
1991 self._queue_tree(target,
1992 os.path.join(stream_name, dirent),
1993 max_manifest_depth - 1)
1995 self._queue_file(target, dirent)
1997 if not self._queued_dirents:
1998 self._queued_trees.popleft()
2000 def _work_trees(self):
2001 path, stream_name, max_manifest_depth = self._queued_trees[0]
2002 d = arvados.util.listdir_recursive(
2003 path, max_depth = (None if max_manifest_depth == 0 else 0))
2005 self._queue_dirents(stream_name, d)
2007 self._queued_trees.popleft()
2009 def _queue_file(self, source, filename=None):
2010 assert (self._queued_file is None), "tried to queue more than one file"
2011 if not hasattr(source, 'read'):
2012 source = open(source, 'rb')
2013 self._close_file = True
2015 self._close_file = False
2016 if filename is None:
2017 filename = os.path.basename(source.name)
2018 self.start_new_file(filename)
2019 self._queued_file = source
2021 def _queue_dirents(self, stream_name, dirents):
2022 assert (not self._queued_dirents), "tried to queue more than one tree"
2023 self._queued_dirents = deque(sorted(dirents))
2025 def _queue_tree(self, path, stream_name, max_manifest_depth):
2026 self._queued_trees.append((path, stream_name, max_manifest_depth))
2028 def write_file(self, source, filename=None):
2029 self._queue_file(source, filename)
2030 self.do_queued_work()
2032 def write_directory_tree(self,
2033 path, stream_name='.', max_manifest_depth=-1):
2034 self._queue_tree(path, stream_name, max_manifest_depth)
2035 self.do_queued_work()
2037 def write(self, newdata):
2038 if isinstance(newdata, bytes):
2040 elif isinstance(newdata, str):
2041 newdata = newdata.encode()
2042 elif hasattr(newdata, '__iter__'):
2046 self._data_buffer.append(newdata)
2047 self._data_buffer_len += len(newdata)
2048 self._current_stream_length += len(newdata)
2049 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2052 def open(self, streampath, filename=None):
2053 """open(streampath[, filename]) -> file-like object
2055 Pass in the path of a file to write to the Collection, either as a
2056 single string or as two separate stream name and file name arguments.
2057 This method returns a file-like object you can write to add it to the
2060 You may only have one file object from the Collection open at a time,
2061 so be sure to close the object when you're done. Using the object in
2062 a with statement makes that easy:
2064 with cwriter.open('./doc/page1.txt') as outfile:
2065 outfile.write(page1_data)
2066 with cwriter.open('./doc/page2.txt') as outfile:
2067 outfile.write(page2_data)
2069 if filename is None:
2070 streampath, filename = split(streampath)
2071 if self._last_open and not self._last_open.closed:
2072 raise errors.AssertionError(
2073 u"can't open '{}' when '{}' is still open".format(
2074 filename, self._last_open.name))
2075 if streampath != self.current_stream_name():
2076 self.start_new_stream(streampath)
2077 self.set_current_file_name(filename)
2078 self._last_open = _WriterFile(self, filename)
2079 return self._last_open
2081 def flush_data(self):
2082 data_buffer = b''.join(self._data_buffer)
2084 self._current_stream_locators.append(
2085 self._my_keep().put(
2086 data_buffer[0:config.KEEP_BLOCK_SIZE],
2087 copies=self.replication))
2088 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2089 self._data_buffer_len = len(self._data_buffer[0])
2091 def start_new_file(self, newfilename=None):
2092 self.finish_current_file()
2093 self.set_current_file_name(newfilename)
2095 def set_current_file_name(self, newfilename):
2096 if re.search(r'[\t\n]', newfilename):
2097 raise errors.AssertionError(
2098 "Manifest filenames cannot contain whitespace: %s" %
2100 elif re.search(r'\x00', newfilename):
2101 raise errors.AssertionError(
2102 "Manifest filenames cannot contain NUL characters: %s" %
2104 self._current_file_name = newfilename
2106 def current_file_name(self):
2107 return self._current_file_name
2109 def finish_current_file(self):
2110 if self._current_file_name is None:
2111 if self._current_file_pos == self._current_stream_length:
2113 raise errors.AssertionError(
2114 "Cannot finish an unnamed file " +
2115 "(%d bytes at offset %d in '%s' stream)" %
2116 (self._current_stream_length - self._current_file_pos,
2117 self._current_file_pos,
2118 self._current_stream_name))
2119 self._current_stream_files.append([
2120 self._current_file_pos,
2121 self._current_stream_length - self._current_file_pos,
2122 self._current_file_name])
2123 self._current_file_pos = self._current_stream_length
2124 self._current_file_name = None
2126 def start_new_stream(self, newstreamname='.'):
2127 self.finish_current_stream()
2128 self.set_current_stream_name(newstreamname)
2130 def set_current_stream_name(self, newstreamname):
2131 if re.search(r'[\t\n]', newstreamname):
2132 raise errors.AssertionError(
2133 "Manifest stream names cannot contain whitespace: '%s'" %
2135 self._current_stream_name = '.' if newstreamname=='' else newstreamname
2137 def current_stream_name(self):
2138 return self._current_stream_name
2140 def finish_current_stream(self):
2141 self.finish_current_file()
2143 if not self._current_stream_files:
2145 elif self._current_stream_name is None:
2146 raise errors.AssertionError(
2147 "Cannot finish an unnamed stream (%d bytes in %d files)" %
2148 (self._current_stream_length, len(self._current_stream_files)))
2150 if not self._current_stream_locators:
2151 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2152 self._finished_streams.append([self._current_stream_name,
2153 self._current_stream_locators,
2154 self._current_stream_files])
2155 self._current_stream_files = []
2156 self._current_stream_length = 0
2157 self._current_stream_locators = []
2158 self._current_stream_name = None
2159 self._current_file_pos = 0
2160 self._current_file_name = None
2163 """Store the manifest in Keep and return its locator.
2165 This is useful for storing manifest fragments (task outputs)
2166 temporarily in Keep during a Crunch job.
2168 In other cases you should make a collection instead, by
2169 sending manifest_text() to the API server's "create
2170 collection" endpoint.
2172 return self._my_keep().put(self.manifest_text().encode(),
2173 copies=self.replication)
2175 def portable_data_hash(self):
2176 stripped = self.stripped_manifest().encode()
2177 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2179 def manifest_text(self):
2180 self.finish_current_stream()
2183 for stream in self._finished_streams:
2184 if not re.search(r'^\.(/.*)?$', stream[0]):
2186 manifest += stream[0].replace(' ', '\\040')
2187 manifest += ' ' + ' '.join(stream[1])
2188 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2193 def data_locators(self):
2195 for name, locators, files in self._finished_streams:
2199 def save_new(self, name=None):
2200 return self._api_client.collections().create(
2201 ensure_unique_name=True,
2204 'manifest_text': self.manifest_text(),
2205 }).execute(num_retries=self.num_retries)
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize internal state to disk

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """

    # Attributes that fully describe the writer's progress; they are
    # round-tripped through dump_state()/from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        # Raise StaleWriterStateError if any recorded source file has
        # changed or disappeared since the state was dumped.
        for path, orig_stat in list(self._dependencies.items()):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        # Snapshot STATE_PROPS (each passed through copy_func) plus the
        # position of the file currently being written, if any.
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        # Capture the stat error explicitly: in Python 3 the `except ...
        # as name` binding is unbound after the except block, so reading
        # it later (in the `path_stat is None` branch below) would raise
        # NameError instead of the intended AssertionError.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            stat_error = error
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Unsourced data cannot be re-generated after a crash, so a
        # resumable writer only accepts data read from a queued file.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)