15397: Remove deprecated arvados.Keep class.
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
5
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
11
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
13 """
14
15 import ciso8601
16 import datetime
17 import errno
18 import functools
19 import hashlib
20 import io
21 import logging
22 import os
23 import re
24 import sys
25 import threading
26 import time
27
28 from collections import deque
29 from stat import *
30
31 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
32 from .keep import KeepLocator, KeepClient
33 from ._normalize_stream import normalize_stream, escape
34 from ._ranges import Range, LocatorAndRange
35 from .safeapi import ThreadSafeApiCache
36 import arvados.config as config
37 import arvados.errors as errors
38 import arvados.util
39 import arvados.events as events
40 from arvados.retry import retry_method
41
42 from typing import (
43     Any,
44     Callable,
45     Dict,
46     IO,
47     Iterator,
48     List,
49     Mapping,
50     Optional,
51     Tuple,
52     Union,
53 )
54
55 if sys.version_info < (3, 8):
56     from typing_extensions import Literal
57 else:
58     from typing import Literal
59
60 _logger = logging.getLogger('arvados.collection')
61
62 ADD = "add"
63 """Argument value for `Collection` methods to represent an added item"""
64 DEL = "del"
65 """Argument value for `Collection` methods to represent a removed item"""
66 MOD = "mod"
67 """Argument value for `Collection` methods to represent a modified item"""
68 TOK = "tok"
69 """Argument value for `Collection` methods to represent an item with token differences"""
70 FILE = "file"
71 """`create_type` value for `Collection.find_or_create`"""
72 COLLECTION = "collection"
73 """`create_type` value for `Collection.find_or_create`"""
74
75 ChangeList = List[Union[
76     Tuple[Literal[ADD, DEL], str, 'Collection'],
77     Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
78 ]]
79 ChangeType = Literal[ADD, DEL, MOD, TOK]
80 CollectionItem = Union[ArvadosFile, 'Collection']
81 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
82 CreateType = Literal[COLLECTION, FILE]
83 Properties = Dict[str, Any]
84 StorageClasses = List[str]
85
86 class CollectionBase(object):
87     """Abstract base class for Collection classes
88
89     .. ATTENTION:: Internal
90        This class is meant to be used by other parts of the SDK. User code
91        should instantiate or subclass `Collection` or one of its subclasses
92        directly.
93     """
94
95     def __enter__(self):
96         """Enter a context block with this collection instance"""
97         return self
98
99     def __exit__(self, exc_type, exc_value, traceback):
100         """Exit a context block with this collection instance"""
101         pass
102
103     def _my_keep(self):
104         if self._keep_client is None:
105             self._keep_client = KeepClient(api_client=self._api_client,
106                                            num_retries=self.num_retries)
107         return self._keep_client
108
109     def stripped_manifest(self) -> str:
110         """Create a copy of the collection manifest with only size hints
111
112         This method returns a string with the current collection's manifest
113         text with all non-portable locator hints like permission hints and
114         remote cluster hints removed. The only hints in the returned manifest
115         will be size hints.
116         """
117         raw = self.manifest_text()
118         clean = []
119         for line in raw.split("\n"):
120             fields = line.split()
121             if fields:
122                 clean_fields = fields[:1] + [
123                     (re.sub(r'\+[^\d][^\+]*', '', x)
124                      if re.match(arvados.util.keep_locator_pattern, x)
125                      else x)
126                     for x in fields[1:]]
127                 clean += [' '.join(clean_fields), "\n"]
128         return ''.join(clean)
129
130
131 class _WriterFile(_FileLikeObjectBase):
132     def __init__(self, coll_writer, name):
133         super(_WriterFile, self).__init__(name, 'wb')
134         self.dest = coll_writer
135
136     def close(self):
137         super(_WriterFile, self).close()
138         self.dest.finish_current_file()
139
140     @_FileLikeObjectBase._before_close
141     def write(self, data):
142         self.dest.write(data)
143
144     @_FileLikeObjectBase._before_close
145     def writelines(self, seq):
146         for data in seq:
147             self.write(data)
148
149     @_FileLikeObjectBase._before_close
150     def flush(self):
151         self.dest.flush_data()
152
153
154 class RichCollectionBase(CollectionBase):
155     """Base class for Collection classes
156
157     .. ATTENTION:: Internal
158        This class is meant to be used by other parts of the SDK. User code
159        should instantiate or subclass `Collection` or one of its subclasses
160        directly.
161     """
162
163     def __init__(self, parent=None):
164         self.parent = parent
165         self._committed = False
166         self._has_remote_blocks = False
167         self._callback = None
168         self._items = {}
169
170     def _my_api(self):
171         raise NotImplementedError()
172
173     def _my_keep(self):
174         raise NotImplementedError()
175
176     def _my_block_manager(self):
177         raise NotImplementedError()
178
179     def writable(self) -> bool:
180         """Indicate whether this collection object can be modified
181
182         This method returns `False` if this object is a `CollectionReader`,
183         else `True`.
184         """
185         raise NotImplementedError()
186
187     def root_collection(self) -> 'Collection':
188         """Get this collection's root collection object
189
190         If you open a subcollection with `Collection.find`, calling this method
191         on that subcollection returns the source Collection object.
192         """
193         raise NotImplementedError()
194
195     def stream_name(self) -> str:
196         """Get the name of the manifest stream represented by this collection
197
198         If you open a subcollection with `Collection.find`, calling this method
199         on that subcollection returns the name of the stream you opened.
200         """
201         raise NotImplementedError()
202
203     @synchronized
204     def has_remote_blocks(self) -> bool:
205         """Indiciate whether the collection refers to remote data
206
207         Returns `True` if the collection manifest includes any Keep locators
208         with a remote hint (`+R`), else `False`.
209         """
210         if self._has_remote_blocks:
211             return True
212         for item in self:
213             if self[item].has_remote_blocks():
214                 return True
215         return False
216
217     @synchronized
218     def set_has_remote_blocks(self, val: bool) -> None:
219         """Cache whether this collection refers to remote blocks
220
221         .. ATTENTION:: Internal
222            This method is only meant to be used by other Collection methods.
223
224         Set this collection's cached "has remote blocks" flag to the given
225         value.
226         """
227         self._has_remote_blocks = val
228         if self.parent:
229             self.parent.set_has_remote_blocks(val)
230
231     @must_be_writable
232     @synchronized
233     def find_or_create(
234             self,
235             path: str,
236             create_type: CreateType,
237     ) -> CollectionItem:
238         """Get the item at the given path, creating it if necessary
239
240         If `path` refers to a stream in this collection, returns a
241         corresponding `Subcollection` object. If `path` refers to a file in
242         this collection, returns a corresponding
243         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
244         this collection, then this method creates a new object and returns
245         it, creating parent streams as needed. The type of object created is
246         determined by the value of `create_type`.
247
248         Arguments:
249
250         * path: str --- The path to find or create within this collection.
251
252         * create_type: Literal[COLLECTION, FILE] --- The type of object to
253           create at `path` if one does not exist. Passing `COLLECTION`
254           creates a stream and returns the corresponding
255           `Subcollection`. Passing `FILE` creates a new file and returns the
256           corresponding `arvados.arvfile.ArvadosFile`.
257         """
258         pathcomponents = path.split("/", 1)
259         if pathcomponents[0]:
260             item = self._items.get(pathcomponents[0])
261             if len(pathcomponents) == 1:
262                 if item is None:
263                     # create new file
264                     if create_type == COLLECTION:
265                         item = Subcollection(self, pathcomponents[0])
266                     else:
267                         item = ArvadosFile(self, pathcomponents[0])
268                     self._items[pathcomponents[0]] = item
269                     self.set_committed(False)
270                     self.notify(ADD, self, pathcomponents[0], item)
271                 return item
272             else:
273                 if item is None:
274                     # create new collection
275                     item = Subcollection(self, pathcomponents[0])
276                     self._items[pathcomponents[0]] = item
277                     self.set_committed(False)
278                     self.notify(ADD, self, pathcomponents[0], item)
279                 if isinstance(item, RichCollectionBase):
280                     return item.find_or_create(pathcomponents[1], create_type)
281                 else:
282                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
283         else:
284             return self
285
286     @synchronized
287     def find(self, path: str) -> CollectionItem:
288         """Get the item at the given path
289
290         If `path` refers to a stream in this collection, returns a
291         corresponding `Subcollection` object. If `path` refers to a file in
292         this collection, returns a corresponding
293         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
294         this collection, then this method raises `NotADirectoryError`.
295
296         Arguments:
297
298         * path: str --- The path to find or create within this collection.
299         """
300         if not path:
301             raise errors.ArgumentError("Parameter 'path' is empty.")
302
303         pathcomponents = path.split("/", 1)
304         if pathcomponents[0] == '':
305             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
306
307         item = self._items.get(pathcomponents[0])
308         if item is None:
309             return None
310         elif len(pathcomponents) == 1:
311             return item
312         else:
313             if isinstance(item, RichCollectionBase):
314                 if pathcomponents[1]:
315                     return item.find(pathcomponents[1])
316                 else:
317                     return item
318             else:
319                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
320
321     @synchronized
322     def mkdirs(self, path: str) -> 'Subcollection':
323         """Create and return a subcollection at `path`
324
325         If `path` exists within this collection, raises `FileExistsError`.
326         Otherwise, creates a stream at that path and returns the
327         corresponding `Subcollection`.
328         """
329         if self.find(path) != None:
330             raise IOError(errno.EEXIST, "Directory or file exists", path)
331
332         return self.find_or_create(path, COLLECTION)
333
334     def open(
335             self,
336             path: str,
337             mode: str="r",
338             encoding: Optional[str]=None,
339     ) -> IO:
340         """Open a file-like object within the collection
341
342         This method returns a file-like object that can read and/or write the
343         file located at `path` within the collection. If you attempt to write
344         a `path` that does not exist, the file is created with `find_or_create`.
345         If the file cannot be opened for any other reason, this method raises
346         `OSError` with an appropriate errno.
347
348         Arguments:
349
350         * path: str --- The path of the file to open within this collection
351
352         * mode: str --- The mode to open this file. Supports all the same
353           values as `builtins.open`.
354
355         * encoding: str | None --- The text encoding of the file. Only used
356           when the file is opened in text mode. The default is
357           platform-dependent.
358         """
359         if not re.search(r'^[rwa][bt]?\+?$', mode):
360             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
361
362         if mode[0] == 'r' and '+' not in mode:
363             fclass = ArvadosFileReader
364             arvfile = self.find(path)
365         elif not self.writable():
366             raise IOError(errno.EROFS, "Collection is read only")
367         else:
368             fclass = ArvadosFileWriter
369             arvfile = self.find_or_create(path, FILE)
370
371         if arvfile is None:
372             raise IOError(errno.ENOENT, "File not found", path)
373         if not isinstance(arvfile, ArvadosFile):
374             raise IOError(errno.EISDIR, "Is a directory", path)
375
376         if mode[0] == 'w':
377             arvfile.truncate(0)
378
379         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
380         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
381         if 'b' not in mode:
382             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
383             f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
384         return f
385
386     def modified(self) -> bool:
387         """Indicate whether this collection has an API server record
388
389         Returns `False` if this collection corresponds to a record loaded from
390         the API server, `True` otherwise.
391         """
392         return not self.committed()
393
394     @synchronized
395     def committed(self):
396         """Indicate whether this collection has an API server record
397
398         Returns `True` if this collection corresponds to a record loaded from
399         the API server, `False` otherwise.
400         """
401         return self._committed
402
403     @synchronized
404     def set_committed(self, value: bool=True):
405         """Cache whether this collection has an API server record
406
407         .. ATTENTION:: Internal
408            This method is only meant to be used by other Collection methods.
409
410         Set this collection's cached "committed" flag to the given
411         value and propagates it as needed.
412         """
413         if value == self._committed:
414             return
415         if value:
416             for k,v in self._items.items():
417                 v.set_committed(True)
418             self._committed = True
419         else:
420             self._committed = False
421             if self.parent is not None:
422                 self.parent.set_committed(False)
423
424     @synchronized
425     def __iter__(self) -> Iterator[str]:
426         """Iterate names of streams and files in this collection
427
428         This method does not recurse. It only iterates the contents of this
429         collection's corresponding stream.
430         """
431         return iter(self._items)
432
433     @synchronized
434     def __getitem__(self, k: str) -> CollectionItem:
435         """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
436
437         This method does not recurse. If you want to search a path, use
438         `RichCollectionBase.find` instead.
439         """
440         return self._items[k]
441
442     @synchronized
443     def __contains__(self, k: str) -> bool:
444         """Indicate whether this collection has an item with this name
445
446         This method does not recurse. It you want to check a path, use
447         `RichCollectionBase.exists` instead.
448         """
449         return k in self._items
450
451     @synchronized
452     def __len__(self):
453         """Get the number of items directly contained in this collection
454
455         This method does not recurse. It only counts the streams and files
456         in this collection's corresponding stream.
457         """
458         return len(self._items)
459
460     @must_be_writable
461     @synchronized
462     def __delitem__(self, p: str) -> None:
463         """Delete an item from this collection's stream
464
465         This method does not recurse. If you want to remove an item by a
466         path, use `RichCollectionBase.remove` instead.
467         """
468         del self._items[p]
469         self.set_committed(False)
470         self.notify(DEL, self, p, None)
471
472     @synchronized
473     def keys(self) -> Iterator[str]:
474         """Iterate names of streams and files in this collection
475
476         This method does not recurse. It only iterates the contents of this
477         collection's corresponding stream.
478         """
479         return self._items.keys()
480
481     @synchronized
482     def values(self) -> List[CollectionItem]:
483         """Get a list of objects in this collection's stream
484
485         The return value includes a `Subcollection` for every stream, and an
486         `arvados.arvfile.ArvadosFile` for every file, directly within this
487         collection's stream.  This method does not recurse.
488         """
489         return list(self._items.values())
490
491     @synchronized
492     def items(self) -> List[Tuple[str, CollectionItem]]:
493         """Get a list of `(name, object)` tuples from this collection's stream
494
495         The return value includes a `Subcollection` for every stream, and an
496         `arvados.arvfile.ArvadosFile` for every file, directly within this
497         collection's stream.  This method does not recurse.
498         """
499         return list(self._items.items())
500
501     def exists(self, path: str) -> bool:
502         """Indicate whether this collection includes an item at `path`
503
504         This method returns `True` if `path` refers to a stream or file within
505         this collection, else `False`.
506
507         Arguments:
508
509         * path: str --- The path to check for existence within this collection
510         """
511         return self.find(path) is not None
512
513     @must_be_writable
514     @synchronized
515     def remove(self, path: str, recursive: bool=False) -> None:
516         """Remove the file or stream at `path`
517
518         Arguments:
519
520         * path: str --- The path of the item to remove from the collection
521
522         * recursive: bool --- Controls the method's behavior if `path` refers
523           to a nonempty stream. If `False` (the default), this method raises
524           `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
525           items under the stream.
526         """
527         if not path:
528             raise errors.ArgumentError("Parameter 'path' is empty.")
529
530         pathcomponents = path.split("/", 1)
531         item = self._items.get(pathcomponents[0])
532         if item is None:
533             raise IOError(errno.ENOENT, "File not found", path)
534         if len(pathcomponents) == 1:
535             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
536                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
537             deleteditem = self._items[pathcomponents[0]]
538             del self._items[pathcomponents[0]]
539             self.set_committed(False)
540             self.notify(DEL, self, pathcomponents[0], deleteditem)
541         else:
542             item.remove(pathcomponents[1], recursive=recursive)
543
544     def _clonefrom(self, source):
545         for k,v in source.items():
546             self._items[k] = v.clone(self, k)
547
548     def clone(self):
549         raise NotImplementedError()
550
551     @must_be_writable
552     @synchronized
553     def add(
554             self,
555             source_obj: CollectionItem,
556             target_name: str,
557             overwrite: bool=False,
558             reparent: bool=False,
559     ) -> None:
560         """Copy or move a file or subcollection object to this collection
561
562         Arguments:
563
564         * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
565           to add to this collection
566
567         * target_name: str --- The path inside this collection where
568           `source_obj` should be added.
569
570         * overwrite: bool --- Controls the behavior of this method when the
571           collection already contains an object at `target_name`. If `False`
572           (the default), this method will raise `FileExistsError`. If `True`,
573           the object at `target_name` will be replaced with `source_obj`.
574
575         * reparent: bool --- Controls whether this method copies or moves
576           `source_obj`. If `False` (the default), `source_obj` is copied into
577           this collection. If `True`, `source_obj` is moved into this
578           collection.
579         """
580         if target_name in self and not overwrite:
581             raise IOError(errno.EEXIST, "File already exists", target_name)
582
583         modified_from = None
584         if target_name in self:
585             modified_from = self[target_name]
586
587         # Actually make the move or copy.
588         if reparent:
589             source_obj._reparent(self, target_name)
590             item = source_obj
591         else:
592             item = source_obj.clone(self, target_name)
593
594         self._items[target_name] = item
595         self.set_committed(False)
596         if not self._has_remote_blocks and source_obj.has_remote_blocks():
597             self.set_has_remote_blocks(True)
598
599         if modified_from:
600             self.notify(MOD, self, target_name, (modified_from, item))
601         else:
602             self.notify(ADD, self, target_name, item)
603
604     def _get_src_target(self, source, target_path, source_collection, create_dest):
605         if source_collection is None:
606             source_collection = self
607
608         # Find the object
609         if isinstance(source, str):
610             source_obj = source_collection.find(source)
611             if source_obj is None:
612                 raise IOError(errno.ENOENT, "File not found", source)
613             sourcecomponents = source.split("/")
614         else:
615             source_obj = source
616             sourcecomponents = None
617
618         # Find parent collection the target path
619         targetcomponents = target_path.split("/")
620
621         # Determine the name to use.
622         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
623
624         if not target_name:
625             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
626
627         if create_dest:
628             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
629         else:
630             if len(targetcomponents) > 1:
631                 target_dir = self.find("/".join(targetcomponents[0:-1]))
632             else:
633                 target_dir = self
634
635         if target_dir is None:
636             raise IOError(errno.ENOENT, "Target directory not found", target_name)
637
638         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
639             target_dir = target_dir[target_name]
640             target_name = sourcecomponents[-1]
641
642         return (source_obj, target_dir, target_name)
643
644     @must_be_writable
645     @synchronized
646     def copy(
647             self,
648             source: Union[str, CollectionItem],
649             target_path: str,
650             source_collection: Optional['RichCollectionBase']=None,
651             overwrite: bool=False,
652     ) -> None:
653         """Copy a file or subcollection object to this collection
654
655         Arguments:
656
657         * source: str | arvados.arvfile.ArvadosFile |
658           arvados.collection.Subcollection --- The file or subcollection to
659           add to this collection. If `source` is a str, the object will be
660           found by looking up this path from `source_collection` (see
661           below).
662
663         * target_path: str --- The path inside this collection where the
664           source object should be added.
665
666         * source_collection: arvados.collection.Collection | None --- The
667           collection to find the source object from when `source` is a
668           path. Defaults to the current collection (`self`).
669
670         * overwrite: bool --- Controls the behavior of this method when the
671           collection already contains an object at `target_path`. If `False`
672           (the default), this method will raise `FileExistsError`. If `True`,
673           the object at `target_path` will be replaced with `source_obj`.
674         """
675         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
676         target_dir.add(source_obj, target_name, overwrite, False)
677
678     @must_be_writable
679     @synchronized
680     def rename(
681             self,
682             source: Union[str, CollectionItem],
683             target_path: str,
684             source_collection: Optional['RichCollectionBase']=None,
685             overwrite: bool=False,
686     ) -> None:
687         """Move a file or subcollection object to this collection
688
689         Arguments:
690
691         * source: str | arvados.arvfile.ArvadosFile |
692           arvados.collection.Subcollection --- The file or subcollection to
693           add to this collection. If `source` is a str, the object will be
694           found by looking up this path from `source_collection` (see
695           below).
696
697         * target_path: str --- The path inside this collection where the
698           source object should be added.
699
700         * source_collection: arvados.collection.Collection | None --- The
701           collection to find the source object from when `source` is a
702           path. Defaults to the current collection (`self`).
703
704         * overwrite: bool --- Controls the behavior of this method when the
705           collection already contains an object at `target_path`. If `False`
706           (the default), this method will raise `FileExistsError`. If `True`,
707           the object at `target_path` will be replaced with `source_obj`.
708         """
709         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
710         if not source_obj.writable():
711             raise IOError(errno.EROFS, "Source collection is read only", source)
712         target_dir.add(source_obj, target_name, overwrite, True)
713
714     def portable_manifest_text(self, stream_name: str=".") -> str:
715         """Get the portable manifest text for this collection
716
717         The portable manifest text is normalized, and does not include access
718         tokens. This method does not flush outstanding blocks to Keep.
719
720         Arguments:
721
722         * stream_name: str --- The name to use for this collection's stream in
723           the generated manifest. Default `'.'`.
724         """
725         return self._get_manifest_text(stream_name, True, True)
726
727     @synchronized
728     def manifest_text(
729             self,
730             stream_name: str=".",
731             strip: bool=False,
732             normalize: bool=False,
733             only_committed: bool=False,
734     ) -> str:
735         """Get the manifest text for this collection
736
737         Arguments:
738
739         * stream_name: str --- The name to use for this collection's stream in
740           the generated manifest. Default `'.'`.
741
742         * strip: bool --- Controls whether or not the returned manifest text
743           includes access tokens. If `False` (the default), the manifest text
744           will include access tokens. If `True`, the manifest text will not
745           include access tokens.
746
747         * normalize: bool --- Controls whether or not the returned manifest
748           text is normalized. Default `False`.
749
750         * only_committed: bool --- Controls whether or not this method uploads
751           pending data to Keep before building and returning the manifest text.
752           If `False` (the default), this method will finish uploading all data
753           to Keep, then return the final manifest. If `True`, this method will
754           build and return a manifest that only refers to the data that has
755           finished uploading at the time this method was called.
756         """
757         if not only_committed:
758             self._my_block_manager().commit_all()
759         return self._get_manifest_text(stream_name, strip, normalize,
760                                        only_committed=only_committed)
761
762     @synchronized
763     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
764         """Get the manifest text for this collection, sub collections and files.
765
766         :stream_name:
767           Name to use for this stream (directory)
768
769         :strip:
770           If True, remove signing tokens from block locators if present.
771           If False (default), block locators are left unchanged.
772
773         :normalize:
774           If True, always export the manifest text in normalized form
775           even if the Collection is not modified.  If False (default) and the collection
776           is not modified, return the original manifest text even if it is not
777           in normalized form.
778
779         :only_committed:
780           If True, only include blocks that were already committed to Keep.
781
782         """
783
784         if not self.committed() or self._manifest_text is None or normalize:
785             stream = {}
786             buf = []
787             sorted_keys = sorted(self.keys())
788             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
789                 # Create a stream per file `k`
790                 arvfile = self[filename]
791                 filestream = []
792                 for segment in arvfile.segments():
793                     loc = segment.locator
794                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
795                         if only_committed:
796                             continue
797                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
798                     if strip:
799                         loc = KeepLocator(loc).stripped()
800                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
801                                          segment.segment_offset, segment.range_size))
802                 stream[filename] = filestream
803             if stream:
804                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
805             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
806                 buf.append(self[dirname].manifest_text(
807                     stream_name=os.path.join(stream_name, dirname),
808                     strip=strip, normalize=True, only_committed=only_committed))
809             return "".join(buf)
810         else:
811             if strip:
812                 return self.stripped_manifest()
813             else:
814                 return self._manifest_text
815
816     @synchronized
817     def _copy_remote_blocks(self, remote_blocks={}):
818         """Scan through the entire collection and ask Keep to copy remote blocks.
819
820         When accessing a remote collection, blocks will have a remote signature
821         (+R instead of +A). Collect these signatures and request Keep to copy the
822         blocks to the local cluster, returning local (+A) signatures.
823
824         :remote_blocks:
825           Shared cache of remote to local block mappings. This is used to avoid
826           doing extra work when blocks are shared by more than one file in
827           different subdirectories.
828
829         """
830         for item in self:
831             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
832         return remote_blocks
833
834     @synchronized
835     def diff(
836             self,
837             end_collection: 'RichCollectionBase',
838             prefix: str=".",
839             holding_collection: Optional['Collection']=None,
840     ) -> ChangeList:
841         """Build a list of differences between this collection and another
842
843         Arguments:
844
845         * end_collection: arvados.collection.RichCollectionBase --- A
846           collection object with the desired end state. The returned diff
847           list will describe how to go from the current collection object
848           `self` to `end_collection`.
849
850         * prefix: str --- The name to use for this collection's stream in
851           the diff list. Default `'.'`.
852
853         * holding_collection: arvados.collection.Collection | None --- A
854           collection object used to hold objects for the returned diff
855           list. By default, a new empty collection is created.
856         """
857         changes = []
858         if holding_collection is None:
859             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
860         for k in self:
861             if k not in end_collection:
862                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
863         for k in end_collection:
864             if k in self:
865                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
866                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
867                 elif end_collection[k] != self[k]:
868                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
869                 else:
870                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
871             else:
872                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
873         return changes
874
875     @must_be_writable
876     @synchronized
877     def apply(self, changes: ChangeList) -> None:
878         """Apply a list of changes from to this collection
879
880         This method takes a list of changes generated by
881         `RichCollectionBase.diff` and applies it to this
882         collection. Afterward, the state of this collection object will
883         match the state of `end_collection` passed to `diff`. If a change
884         conflicts with a local change, it will be saved to an alternate path
885         indicating the conflict.
886
887         Arguments:
888
889         * changes: arvados.collection.ChangeList --- The list of differences
890           generated by `RichCollectionBase.diff`.
891         """
892         if changes:
893             self.set_committed(False)
894         for change in changes:
895             event_type = change[0]
896             path = change[1]
897             initial = change[2]
898             local = self.find(path)
899             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
900                                                                     time.gmtime()))
901             if event_type == ADD:
902                 if local is None:
903                     # No local file at path, safe to copy over new file
904                     self.copy(initial, path)
905                 elif local is not None and local != initial:
906                     # There is already local file and it is different:
907                     # save change to conflict file.
908                     self.copy(initial, conflictpath)
909             elif event_type == MOD or event_type == TOK:
910                 final = change[3]
911                 if local == initial:
912                     # Local matches the "initial" item so it has not
913                     # changed locally and is safe to update.
914                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
915                         # Replace contents of local file with new contents
916                         local.replace_contents(final)
917                     else:
918                         # Overwrite path with new item; this can happen if
919                         # path was a file and is now a collection or vice versa
920                         self.copy(final, path, overwrite=True)
921                 else:
922                     # Local is missing (presumably deleted) or local doesn't
923                     # match the "start" value, so save change to conflict file
924                     self.copy(final, conflictpath)
925             elif event_type == DEL:
926                 if local == initial:
927                     # Local item matches "initial" value, so it is safe to remove.
928                     self.remove(path, recursive=True)
929                 # else, the file is modified or already removed, in either
930                 # case we don't want to try to remove it.
931
932     def portable_data_hash(self) -> str:
933         """Get the portable data hash for this collection's manifest"""
934         if self._manifest_locator and self.committed():
935             # If the collection is already saved on the API server, and it's committed
936             # then return API server's PDH response.
937             return self._portable_data_hash
938         else:
939             stripped = self.portable_manifest_text().encode()
940             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
941
942     @synchronized
943     def subscribe(self, callback: ChangeCallback) -> None:
944         """Set a notify callback for changes to this collection
945
946         Arguments:
947
948         * callback: arvados.collection.ChangeCallback --- The callable to
949           call each time the collection is changed.
950         """
951         if self._callback is None:
952             self._callback = callback
953         else:
954             raise errors.ArgumentError("A callback is already set on this collection.")
955
956     @synchronized
957     def unsubscribe(self) -> None:
958         """Remove any notify callback set for changes to this collection"""
959         if self._callback is not None:
960             self._callback = None
961
962     @synchronized
963     def notify(
964             self,
965             event: ChangeType,
966             collection: 'RichCollectionBase',
967             name: str,
968             item: CollectionItem,
969     ) -> None:
970         """Notify any subscribed callback about a change to this collection
971
972         .. ATTENTION:: Internal
973            This method is only meant to be used by other Collection methods.
974
975         If a callback has been registered with `RichCollectionBase.subscribe`,
976         it will be called with information about a change to this collection.
977         Then this notification will be propagated to this collection's root.
978
979         Arguments:
980
981         * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
982           the collection.
983
984         * collection: arvados.collection.RichCollectionBase --- The
985           collection that was modified.
986
987         * name: str --- The name of the file or stream within `collection` that
988           was modified.
989
990         * item: arvados.arvfile.ArvadosFile |
991           arvados.collection.Subcollection --- The new contents at `name`
992           within `collection`.
993         """
994         if self._callback:
995             self._callback(event, collection, name, item)
996         self.root_collection().notify(event, collection, name, item)
997
998     @synchronized
999     def __eq__(self, other: Any) -> bool:
1000         """Indicate whether this collection object is equal to another"""
1001         if other is self:
1002             return True
1003         if not isinstance(other, RichCollectionBase):
1004             return False
1005         if len(self._items) != len(other):
1006             return False
1007         for k in self._items:
1008             if k not in other:
1009                 return False
1010             if self._items[k] != other[k]:
1011                 return False
1012         return True
1013
1014     def __ne__(self, other: Any) -> bool:
1015         """Indicate whether this collection object is not equal to another"""
1016         return not self.__eq__(other)
1017
1018     @synchronized
1019     def flush(self) -> None:
1020         """Upload any pending data to Keep"""
1021         for e in self.values():
1022             e.flush()
1023
1024
1025 class Collection(RichCollectionBase):
1026     """Read and manipulate an Arvados collection
1027
1028     This class provides a high-level interface to create, read, and update
1029     Arvados collections and their contents. Refer to the Arvados Python SDK
1030     cookbook for [an introduction to using the Collection class][cookbook].
1031
1032     [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1033     """
1034
1035     def __init__(self, manifest_locator_or_text: Optional[str]=None,
1036                  api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1037                  keep_client: Optional['arvados.keep.KeepClient']=None,
1038                  num_retries: int=10,
1039                  parent: Optional['Collection']=None,
1040                  apiconfig: Optional[Mapping[str, str]]=None,
1041                  block_manager: Optional['arvados.arvfile._BlockManager']=None,
1042                  replication_desired: Optional[int]=None,
1043                  storage_classes_desired: Optional[List[str]]=None,
1044                  put_threads: Optional[int]=None):
1045         """Initialize a Collection object
1046
1047         Arguments:
1048
1049         * manifest_locator_or_text: str | None --- This string can contain a
1050           collection manifest text, portable data hash, or UUID. When given a
1051           portable data hash or UUID, this instance will load a collection
1052           record from the API server. Otherwise, this instance will represent a
1053           new collection without an API server record. The default value `None`
1054           instantiates a new collection with an empty manifest.
1055
1056         * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1057           Arvados API client object this instance uses to make requests. If
1058           none is given, this instance creates its own client using the
1059           settings from `apiconfig` (see below). If your client instantiates
1060           many Collection objects, you can help limit memory utilization by
1061           calling `arvados.api.api` to construct an
1062           `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1063           for every Collection.
1064
1065         * keep_client: arvados.keep.KeepClient | None --- The Keep client
1066           object this instance uses to make requests. If none is given, this
1067           instance creates its own client using its `api_client`.
1068
1069         * num_retries: int --- The number of times that client requests are
1070           retried. Default 10.
1071
1072         * parent: arvados.collection.Collection | None --- The parent Collection
1073           object of this instance, if any. This argument is primarily used by
1074           other Collection methods; user client code shouldn't need to use it.
1075
1076         * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1077           `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1078           `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1079           Collection object constructs one from these settings. If no
1080           mapping is provided, calls `arvados.config.settings` to get these
1081           parameters from user configuration.
1082
1083         * block_manager: arvados.arvfile._BlockManager | None --- The
1084           _BlockManager object used by this instance to coordinate reading
1085           and writing Keep data blocks. If none is given, this instance
1086           constructs its own. This argument is primarily used by other
1087           Collection methods; user client code shouldn't need to use it.
1088
1089         * replication_desired: int | None --- This controls both the value of
1090           the `replication_desired` field on API collection records saved by
1091           this class, as well as the number of Keep services that the object
1092           writes new data blocks to. If none is given, uses the default value
1093           configured for the cluster.
1094
1095         * storage_classes_desired: list[str] | None --- This controls both
1096           the value of the `storage_classes_desired` field on API collection
1097           records saved by this class, as well as selecting which specific
1098           Keep services the object writes new data blocks to. If none is
1099           given, defaults to an empty list.
1100
1101         * put_threads: int | None --- The number of threads to run
1102           simultaneously to upload data blocks to Keep. This value is used when
1103           building a new `block_manager`. It is unused when a `block_manager`
1104           is provided.
1105         """
1106
1107         if storage_classes_desired and type(storage_classes_desired) is not list:
1108             raise errors.ArgumentError("storage_classes_desired must be list type.")
1109
1110         super(Collection, self).__init__(parent)
1111         self._api_client = api_client
1112         self._keep_client = keep_client
1113
1114         # Use the keep client from ThreadSafeApiCache
1115         if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1116             self._keep_client = self._api_client.keep
1117
1118         self._block_manager = block_manager
1119         self.replication_desired = replication_desired
1120         self._storage_classes_desired = storage_classes_desired
1121         self.put_threads = put_threads
1122
1123         if apiconfig:
1124             self._config = apiconfig
1125         else:
1126             self._config = config.settings()
1127
1128         self.num_retries = num_retries
1129         self._manifest_locator = None
1130         self._manifest_text = None
1131         self._portable_data_hash = None
1132         self._api_response = None
1133         self._past_versions = set()
1134
1135         self.lock = threading.RLock()
1136         self.events = None
1137
1138         if manifest_locator_or_text:
1139             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1140                 self._manifest_locator = manifest_locator_or_text
1141             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1142                 self._manifest_locator = manifest_locator_or_text
1143                 if not self._has_local_collection_uuid():
1144                     self._has_remote_blocks = True
1145             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1146                 self._manifest_text = manifest_locator_or_text
1147                 if '+R' in self._manifest_text:
1148                     self._has_remote_blocks = True
1149             else:
1150                 raise errors.ArgumentError(
1151                     "Argument to CollectionReader is not a manifest or a collection UUID")
1152
1153             try:
1154                 self._populate()
1155             except errors.SyntaxError as e:
1156                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1157
1158     def storage_classes_desired(self) -> List[str]:
1159         """Get this collection's `storage_classes_desired` value"""
1160         return self._storage_classes_desired or []
1161
1162     def root_collection(self) -> 'Collection':
1163         return self
1164
1165     def get_properties(self) -> Properties:
1166         """Get this collection's properties
1167
1168         This method always returns a dict. If this collection object does not
1169         have an associated API record, or that record does not have any
1170         properties set, this method returns an empty dict.
1171         """
1172         if self._api_response and self._api_response["properties"]:
1173             return self._api_response["properties"]
1174         else:
1175             return {}
1176
1177     def get_trash_at(self) -> Optional[datetime.datetime]:
1178         """Get this collection's `trash_at` field
1179
1180         This method parses the `trash_at` field of the collection's API
1181         record and returns a datetime from it. If that field is not set, or
1182         this collection object does not have an associated API record,
1183         returns None.
1184         """
1185         if self._api_response and self._api_response["trash_at"]:
1186             try:
1187                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1188             except ValueError:
1189                 return None
1190         else:
1191             return None
1192
1193     def stream_name(self) -> str:
1194         return "."
1195
1196     def writable(self) -> bool:
1197         return True
1198
1199     @synchronized
1200     def known_past_version(
1201             self,
1202             modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1203     ) -> bool:
1204         """Indicate whether an API record for this collection has been seen before
1205
1206         As this collection object loads records from the API server, it records
1207         their `modified_at` and `portable_data_hash` fields. This method accepts
1208         a 2-tuple with values for those fields, and returns `True` if the
1209         combination was previously loaded.
1210         """
1211         return modified_at_and_portable_data_hash in self._past_versions
1212
1213     @synchronized
1214     @retry_method
1215     def update(
1216             self,
1217             other: Optional['Collection']=None,
1218             num_retries: Optional[int]=None,
1219     ) -> None:
1220         """Merge another collection's contents into this one
1221
1222         This method compares the manifest of this collection instance with
1223         another, then updates this instance's manifest with changes from the
1224         other, renaming files to flag conflicts where necessary.
1225
1226         When called without any arguments, this method reloads the collection's
1227         API record, and updates this instance with any changes that have
1228         appeared server-side. If this instance does not have a corresponding
1229         API record, this method raises `arvados.errors.ArgumentError`.
1230
1231         Arguments:
1232
1233         * other: arvados.collection.Collection | None --- The collection
1234           whose contents should be merged into this instance. When not
1235           provided, this method reloads this collection's API record and
1236           constructs a Collection object from it.  If this instance does not
1237           have a corresponding API record, this method raises
1238           `arvados.errors.ArgumentError`.
1239
1240         * num_retries: int | None --- The number of times to retry reloading
1241           the collection's API record from the API server. If not specified,
1242           uses the `num_retries` provided when this instance was constructed.
1243         """
1244         if other is None:
1245             if self._manifest_locator is None:
1246                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1247             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1248             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1249                 response.get("portable_data_hash") != self.portable_data_hash()):
1250                 # The record on the server is different from our current one, but we've seen it before,
1251                 # so ignore it because it's already been merged.
1252                 # However, if it's the same as our current record, proceed with the update, because we want to update
1253                 # our tokens.
1254                 return
1255             else:
1256                 self._remember_api_response(response)
1257             other = CollectionReader(response["manifest_text"])
1258         baseline = CollectionReader(self._manifest_text)
1259         self.apply(baseline.diff(other))
1260         self._manifest_text = self.manifest_text()
1261
1262     @synchronized
1263     def _my_api(self):
1264         if self._api_client is None:
1265             self._api_client = ThreadSafeApiCache(self._config, version='v1')
1266             if self._keep_client is None:
1267                 self._keep_client = self._api_client.keep
1268         return self._api_client
1269
1270     @synchronized
1271     def _my_keep(self):
1272         if self._keep_client is None:
1273             if self._api_client is None:
1274                 self._my_api()
1275             else:
1276                 self._keep_client = KeepClient(api_client=self._api_client)
1277         return self._keep_client
1278
1279     @synchronized
1280     def _my_block_manager(self):
1281         if self._block_manager is None:
1282             copies = (self.replication_desired or
1283                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1284                                                    2))
1285             self._block_manager = _BlockManager(self._my_keep(),
1286                                                 copies=copies,
1287                                                 put_threads=self.put_threads,
1288                                                 num_retries=self.num_retries,
1289                                                 storage_classes_func=self.storage_classes_desired)
1290         return self._block_manager
1291
1292     def _remember_api_response(self, response):
1293         self._api_response = response
1294         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1295
1296     def _populate_from_api_server(self):
1297         # As in KeepClient itself, we must wait until the last
1298         # possible moment to instantiate an API client, in order to
1299         # avoid tripping up clients that don't have access to an API
1300         # server.  If we do build one, make sure our Keep client uses
1301         # it.  If instantiation fails, we'll fall back to the except
1302         # clause, just like any other Collection lookup
1303         # failure. Return an exception, or None if successful.
1304         self._remember_api_response(self._my_api().collections().get(
1305             uuid=self._manifest_locator).execute(
1306                 num_retries=self.num_retries))
1307         self._manifest_text = self._api_response['manifest_text']
1308         self._portable_data_hash = self._api_response['portable_data_hash']
1309         # If not overriden via kwargs, we should try to load the
1310         # replication_desired and storage_classes_desired from the API server
1311         if self.replication_desired is None:
1312             self.replication_desired = self._api_response.get('replication_desired', None)
1313         if self._storage_classes_desired is None:
1314             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1315
1316     def _populate(self):
1317         if self._manifest_text is None:
1318             if self._manifest_locator is None:
1319                 return
1320             else:
1321                 self._populate_from_api_server()
1322         self._baseline_manifest = self._manifest_text
1323         self._import_manifest(self._manifest_text)
1324
1325     def _has_collection_uuid(self):
1326         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1327
1328     def _has_local_collection_uuid(self):
1329         return self._has_collection_uuid and \
1330             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1331
1332     def __enter__(self):
1333         return self
1334
1335     def __exit__(self, exc_type, exc_value, traceback):
1336         """Exit a context with this collection instance
1337
1338         If no exception was raised inside the context block, and this
1339         collection is writable and has a corresponding API record, that
1340         record will be updated to match the state of this instance at the end
1341         of the block.
1342         """
1343         if exc_type is None:
1344             if self.writable() and self._has_collection_uuid():
1345                 self.save()
1346         self.stop_threads()
1347
1348     def stop_threads(self) -> None:
1349         """Stop background Keep upload/download threads"""
1350         if self._block_manager is not None:
1351             self._block_manager.stop_threads()
1352
1353     @synchronized
1354     def manifest_locator(self) -> Optional[str]:
1355         """Get this collection's manifest locator, if any
1356
1357         * If this collection instance is associated with an API record with a
1358           UUID, return that.
1359         * Otherwise, if this collection instance was loaded from an API record
1360           by portable data hash, return that.
1361         * Otherwise, return `None`.
1362         """
1363         return self._manifest_locator
1364
1365     @synchronized
1366     def clone(
1367             self,
1368             new_parent: Optional['Collection']=None,
1369             new_name: Optional[str]=None,
1370             readonly: bool=False,
1371             new_config: Optional[Mapping[str, str]]=None,
1372     ) -> 'Collection':
1373         """Create a Collection object with the same contents as this instance
1374
1375         This method creates a new Collection object with contents that match
1376         this instance's. The new collection will not be associated with any API
1377         record.
1378
1379         Arguments:
1380
1381         * new_parent: arvados.collection.Collection | None --- This value is
1382           passed to the new Collection's constructor as the `parent`
1383           argument.
1384
1385         * new_name: str | None --- This value is unused.
1386
1387         * readonly: bool --- If this value is true, this method constructs and
1388           returns a `CollectionReader`. Otherwise, it returns a mutable
1389           `Collection`. Default `False`.
1390
1391         * new_config: Mapping[str, str] | None --- This value is passed to the
1392           new Collection's constructor as `apiconfig`. If no value is provided,
1393           defaults to the configuration passed to this instance's constructor.
1394         """
1395         if new_config is None:
1396             new_config = self._config
1397         if readonly:
1398             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1399         else:
1400             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1401
1402         newcollection._clonefrom(self)
1403         return newcollection
1404
1405     @synchronized
1406     def api_response(self) -> Optional[Dict[str, Any]]:
1407         """Get this instance's associated API record
1408
1409         If this Collection instance has an associated API record, return it.
1410         Otherwise, return `None`.
1411         """
1412         return self._api_response
1413
1414     def find_or_create(
1415             self,
1416             path: str,
1417             create_type: CreateType,
1418     ) -> CollectionItem:
1419         if path == ".":
1420             return self
1421         else:
1422             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1423
1424     def find(self, path: str) -> CollectionItem:
1425         if path == ".":
1426             return self
1427         else:
1428             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1429
1430     def remove(self, path: str, recursive: bool=False) -> None:
1431         if path == ".":
1432             raise errors.ArgumentError("Cannot remove '.'")
1433         else:
1434             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1435
1436     @must_be_writable
1437     @synchronized
1438     @retry_method
1439     def save(
1440             self,
1441             properties: Optional[Properties]=None,
1442             storage_classes: Optional[StorageClasses]=None,
1443             trash_at: Optional[datetime.datetime]=None,
1444             merge: bool=True,
1445             num_retries: Optional[int]=None,
1446             preserve_version: bool=False,
1447     ) -> str:
1448         """Save collection to an existing API record
1449
1450         This method updates the instance's corresponding API record to match
1451         the instance's state. If this instance does not have a corresponding API
1452         record yet, raises `AssertionError`. (To create a new API record, use
1453         `Collection.save_new`.) This method returns the saved collection
1454         manifest.
1455
1456         Arguments:
1457
1458         * properties: dict[str, Any] | None --- If provided, the API record will
1459           be updated with these properties. Note this will completely replace
1460           any existing properties.
1461
1462         * storage_classes: list[str] | None --- If provided, the API record will
1463           be updated with this value in the `storage_classes_desired` field.
1464           This value will also be saved on the instance and used for any
1465           changes that follow.
1466
1467         * trash_at: datetime.datetime | None --- If provided, the API record
1468           will be updated with this value in the `trash_at` field.
1469
1470         * merge: bool --- If `True` (the default), this method will first
1471           reload this collection's API record, and merge any new contents into
1472           this instance before saving changes. See `Collection.update` for
1473           details.
1474
1475         * num_retries: int | None --- The number of times to retry reloading
1476           the collection's API record from the API server. If not specified,
1477           uses the `num_retries` provided when this instance was constructed.
1478
1479         * preserve_version: bool --- This value will be passed to directly
1480           to the underlying API call. If `True`, the Arvados API will
1481           preserve the versions of this collection both immediately before
1482           and after the update. If `True` when the API server is not
1483           configured with collection versioning, this method raises
1484           `arvados.errors.ArgumentError`.
1485         """
1486         if properties and type(properties) is not dict:
1487             raise errors.ArgumentError("properties must be dictionary type.")
1488
1489         if storage_classes and type(storage_classes) is not list:
1490             raise errors.ArgumentError("storage_classes must be list type.")
1491         if storage_classes:
1492             self._storage_classes_desired = storage_classes
1493
1494         if trash_at and type(trash_at) is not datetime.datetime:
1495             raise errors.ArgumentError("trash_at must be datetime type.")
1496
1497         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1498             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1499
1500         body={}
1501         if properties:
1502             body["properties"] = properties
1503         if self.storage_classes_desired():
1504             body["storage_classes_desired"] = self.storage_classes_desired()
1505         if trash_at:
1506             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1507             body["trash_at"] = t
1508         if preserve_version:
1509             body["preserve_version"] = preserve_version
1510
1511         if not self.committed():
1512             if self._has_remote_blocks:
1513                 # Copy any remote blocks to the local cluster.
1514                 self._copy_remote_blocks(remote_blocks={})
1515                 self._has_remote_blocks = False
1516             if not self._has_collection_uuid():
1517                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1518             elif not self._has_local_collection_uuid():
1519                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1520
1521             self._my_block_manager().commit_all()
1522
1523             if merge:
1524                 self.update()
1525
1526             text = self.manifest_text(strip=False)
1527             body['manifest_text'] = text
1528
1529             self._remember_api_response(self._my_api().collections().update(
1530                 uuid=self._manifest_locator,
1531                 body=body
1532                 ).execute(num_retries=num_retries))
1533             self._manifest_text = self._api_response["manifest_text"]
1534             self._portable_data_hash = self._api_response["portable_data_hash"]
1535             self.set_committed(True)
1536         elif body:
1537             self._remember_api_response(self._my_api().collections().update(
1538                 uuid=self._manifest_locator,
1539                 body=body
1540                 ).execute(num_retries=num_retries))
1541
1542         return self._manifest_text
1543
1544
1545     @must_be_writable
1546     @synchronized
1547     @retry_method
1548     def save_new(
1549             self,
1550             name: Optional[str]=None,
1551             create_collection_record: bool=True,
1552             owner_uuid: Optional[str]=None,
1553             properties: Optional[Properties]=None,
1554             storage_classes: Optional[StorageClasses]=None,
1555             trash_at: Optional[datetime.datetime]=None,
1556             ensure_unique_name: bool=False,
1557             num_retries: Optional[int]=None,
1558             preserve_version: bool=False,
1559     ):
1560         """Save collection to a new API record
1561
1562         This method finishes uploading new data blocks and (optionally)
1563         creates a new API collection record with the provided data. If a new
1564         record is created, this instance becomes associated with that record
1565         for future updates like `save()`. This method returns the saved
1566         collection manifest.
1567
1568         Arguments:
1569
1570         * name: str | None --- The `name` field to use on the new collection
1571           record. If not specified, a generic default name is generated.
1572
1573         * create_collection_record: bool --- If `True` (the default), creates a
1574           collection record on the API server. If `False`, the method finishes
1575           all data uploads and only returns the resulting collection manifest
1576           without sending it to the API server.
1577
1578         * owner_uuid: str | None --- The `owner_uuid` field to use on the
1579           new collection record.
1580
1581         * properties: dict[str, Any] | None --- The `properties` field to use on
1582           the new collection record.
1583
1584         * storage_classes: list[str] | None --- The
1585           `storage_classes_desired` field to use on the new collection record.
1586
1587         * trash_at: datetime.datetime | None --- The `trash_at` field to use
1588           on the new collection record.
1589
1590         * ensure_unique_name: bool --- This value is passed directly to the
1591           Arvados API when creating the collection record. If `True`, the API
1592           server may modify the submitted `name` to ensure the collection's
1593           `name`+`owner_uuid` combination is unique. If `False` (the default),
1594           if a collection already exists with this same `name`+`owner_uuid`
1595           combination, creating a collection record will raise a validation
1596           error.
1597
1598         * num_retries: int | None --- The number of times to retry reloading
1599           the collection's API record from the API server. If not specified,
1600           uses the `num_retries` provided when this instance was constructed.
1601
1602         * preserve_version: bool --- This value will be passed to directly
1603           to the underlying API call. If `True`, the Arvados API will
1604           preserve the versions of this collection both immediately before
1605           and after the update. If `True` when the API server is not
1606           configured with collection versioning, this method raises
1607           `arvados.errors.ArgumentError`.
1608         """
1609         if properties and type(properties) is not dict:
1610             raise errors.ArgumentError("properties must be dictionary type.")
1611
1612         if storage_classes and type(storage_classes) is not list:
1613             raise errors.ArgumentError("storage_classes must be list type.")
1614
1615         if trash_at and type(trash_at) is not datetime.datetime:
1616             raise errors.ArgumentError("trash_at must be datetime type.")
1617
1618         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1619             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1620
1621         if self._has_remote_blocks:
1622             # Copy any remote blocks to the local cluster.
1623             self._copy_remote_blocks(remote_blocks={})
1624             self._has_remote_blocks = False
1625
1626         if storage_classes:
1627             self._storage_classes_desired = storage_classes
1628
1629         self._my_block_manager().commit_all()
1630         text = self.manifest_text(strip=False)
1631
1632         if create_collection_record:
1633             if name is None:
1634                 name = "New collection"
1635                 ensure_unique_name = True
1636
1637             body = {"manifest_text": text,
1638                     "name": name,
1639                     "replication_desired": self.replication_desired}
1640             if owner_uuid:
1641                 body["owner_uuid"] = owner_uuid
1642             if properties:
1643                 body["properties"] = properties
1644             if self.storage_classes_desired():
1645                 body["storage_classes_desired"] = self.storage_classes_desired()
1646             if trash_at:
1647                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1648                 body["trash_at"] = t
1649             if preserve_version:
1650                 body["preserve_version"] = preserve_version
1651
1652             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1653             text = self._api_response["manifest_text"]
1654
1655             self._manifest_locator = self._api_response["uuid"]
1656             self._portable_data_hash = self._api_response["portable_data_hash"]
1657
1658             self._manifest_text = text
1659             self.set_committed(True)
1660
1661         return text
1662
1663     _token_re = re.compile(r'(\S+)(\s+|$)')
1664     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1665     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1666
1667     def _unescape_manifest_path(self, path):
1668         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1669
1670     @synchronized
1671     def _import_manifest(self, manifest_text):
1672         """Import a manifest into a `Collection`.
1673
1674         :manifest_text:
1675           The manifest text to import from.
1676
1677         """
1678         if len(self) > 0:
1679             raise ArgumentError("Can only import manifest into an empty collection")
1680
1681         STREAM_NAME = 0
1682         BLOCKS = 1
1683         SEGMENTS = 2
1684
1685         stream_name = None
1686         state = STREAM_NAME
1687
1688         for token_and_separator in self._token_re.finditer(manifest_text):
1689             tok = token_and_separator.group(1)
1690             sep = token_and_separator.group(2)
1691
1692             if state == STREAM_NAME:
1693                 # starting a new stream
1694                 stream_name = self._unescape_manifest_path(tok)
1695                 blocks = []
1696                 segments = []
1697                 streamoffset = 0
1698                 state = BLOCKS
1699                 self.find_or_create(stream_name, COLLECTION)
1700                 continue
1701
1702             if state == BLOCKS:
1703                 block_locator = self._block_re.match(tok)
1704                 if block_locator:
1705                     blocksize = int(block_locator.group(1))
1706                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1707                     streamoffset += blocksize
1708                 else:
1709                     state = SEGMENTS
1710
1711             if state == SEGMENTS:
1712                 file_segment = self._segment_re.match(tok)
1713                 if file_segment:
1714                     pos = int(file_segment.group(1))
1715                     size = int(file_segment.group(2))
1716                     name = self._unescape_manifest_path(file_segment.group(3))
1717                     if name.split('/')[-1] == '.':
1718                         # placeholder for persisting an empty directory, not a real file
1719                         if len(name) > 2:
1720                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1721                     else:
1722                         filepath = os.path.join(stream_name, name)
1723                         try:
1724                             afile = self.find_or_create(filepath, FILE)
1725                         except IOError as e:
1726                             if e.errno == errno.ENOTDIR:
1727                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1728                             else:
1729                                 raise e from None
1730                         if isinstance(afile, ArvadosFile):
1731                             afile.add_segment(blocks, pos, size)
1732                         else:
1733                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1734                 else:
1735                     # error!
1736                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1737
1738             if sep == "\n":
1739                 stream_name = None
1740                 state = STREAM_NAME
1741
1742         self.set_committed(True)
1743
1744     @synchronized
1745     def notify(
1746             self,
1747             event: ChangeType,
1748             collection: 'RichCollectionBase',
1749             name: str,
1750             item: CollectionItem,
1751     ) -> None:
1752         if self._callback:
1753             self._callback(event, collection, name, item)
1754
1755
1756 class Subcollection(RichCollectionBase):
1757     """Read and manipulate a stream/directory within an Arvados collection
1758
1759     This class represents a single stream (like a directory) within an Arvados
1760     `Collection`. It is returned by `Collection.find` and provides the same API.
1761     Operations that work on the API collection record propagate to the parent
1762     `Collection` object.
1763     """
1764
1765     def __init__(self, parent, name):
1766         super(Subcollection, self).__init__(parent)
1767         self.lock = self.root_collection().lock
1768         self._manifest_text = None
1769         self.name = name
1770         self.num_retries = parent.num_retries
1771
1772     def root_collection(self) -> 'Collection':
1773         return self.parent.root_collection()
1774
1775     def writable(self) -> bool:
1776         return self.root_collection().writable()
1777
1778     def _my_api(self):
1779         return self.root_collection()._my_api()
1780
1781     def _my_keep(self):
1782         return self.root_collection()._my_keep()
1783
1784     def _my_block_manager(self):
1785         return self.root_collection()._my_block_manager()
1786
1787     def stream_name(self) -> str:
1788         return os.path.join(self.parent.stream_name(), self.name)
1789
1790     @synchronized
1791     def clone(
1792             self,
1793             new_parent: Optional['Collection']=None,
1794             new_name: Optional[str]=None,
1795     ) -> 'Subcollection':
1796         c = Subcollection(new_parent, new_name)
1797         c._clonefrom(self)
1798         return c
1799
1800     @must_be_writable
1801     @synchronized
1802     def _reparent(self, newparent, newname):
1803         self.set_committed(False)
1804         self.flush()
1805         self.parent.remove(self.name, recursive=True)
1806         self.parent = newparent
1807         self.name = newname
1808         self.lock = self.parent.root_collection().lock
1809
1810     @synchronized
1811     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1812         """Encode empty directories by using an \056-named (".") empty file"""
1813         if len(self._items) == 0:
1814             return "%s %s 0:0:\\056\n" % (
1815                 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1816         return super(Subcollection, self)._get_manifest_text(stream_name,
1817                                                              strip, normalize,
1818                                                              only_committed)
1819
1820
1821 class CollectionReader(Collection):
1822     """Read-only `Collection` subclass
1823
1824     This class will never create or update any API collection records. You can
1825     use this class for additional code safety when you only need to read
1826     existing collections.
1827     """
1828     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1829         self._in_init = True
1830         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1831         self._in_init = False
1832
1833         # Forego any locking since it should never change once initialized.
1834         self.lock = NoopLock()
1835
1836         # Backwards compatability with old CollectionReader
1837         # all_streams() and all_files()
1838         self._streams = None
1839
1840     def writable(self) -> bool:
1841         return self._in_init