21935: Move arvados.diskcache under arvados._internal
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
5
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
11
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
13 """
14
15 import ciso8601
16 import datetime
17 import errno
18 import functools
19 import hashlib
20 import io
21 import logging
22 import os
23 import re
24 import sys
25 import threading
26 import time
27
28 from collections import deque
29 from stat import *
30
31 from .api import ThreadSafeAPIClient
32 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
33 from .keep import KeepLocator, KeepClient
34 from ._normalize_stream import normalize_stream, escape
35 from ._ranges import Range, LocatorAndRange
36 import arvados.config as config
37 import arvados.errors as errors
38 import arvados.util
39 import arvados.events as events
40 from arvados.retry import retry_method
41
42 from typing import (
43     Any,
44     Callable,
45     Dict,
46     IO,
47     Iterator,
48     List,
49     Mapping,
50     Optional,
51     Tuple,
52     Union,
53 )
54
55 if sys.version_info < (3, 8):
56     from typing_extensions import Literal
57 else:
58     from typing import Literal
59
60 _logger = logging.getLogger('arvados.collection')
61
62 ADD = "add"
63 """Argument value for `Collection` methods to represent an added item"""
64 DEL = "del"
65 """Argument value for `Collection` methods to represent a removed item"""
66 MOD = "mod"
67 """Argument value for `Collection` methods to represent a modified item"""
68 TOK = "tok"
69 """Argument value for `Collection` methods to represent an item with token differences"""
70 FILE = "file"
71 """`create_type` value for `Collection.find_or_create`"""
72 COLLECTION = "collection"
73 """`create_type` value for `Collection.find_or_create`"""
74
75 ChangeList = List[Union[
76     Tuple[Literal[ADD, DEL], str, 'Collection'],
77     Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
78 ]]
79 ChangeType = Literal[ADD, DEL, MOD, TOK]
80 CollectionItem = Union[ArvadosFile, 'Collection']
81 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
82 CreateType = Literal[COLLECTION, FILE]
83 Properties = Dict[str, Any]
84 StorageClasses = List[str]
85
86 class CollectionBase(object):
87     """Abstract base class for Collection classes
88
89     .. ATTENTION:: Internal
90        This class is meant to be used by other parts of the SDK. User code
91        should instantiate or subclass `Collection` or one of its subclasses
92        directly.
93     """
94
95     def __enter__(self):
96         """Enter a context block with this collection instance"""
97         return self
98
99     def __exit__(self, exc_type, exc_value, traceback):
100         """Exit a context block with this collection instance"""
101         pass
102
103     def _my_keep(self):
104         if self._keep_client is None:
105             self._keep_client = KeepClient(api_client=self._api_client,
106                                            num_retries=self.num_retries)
107         return self._keep_client
108
109     def stripped_manifest(self) -> str:
110         """Create a copy of the collection manifest with only size hints
111
112         This method returns a string with the current collection's manifest
113         text with all non-portable locator hints like permission hints and
114         remote cluster hints removed. The only hints in the returned manifest
115         will be size hints.
116         """
117         raw = self.manifest_text()
118         clean = []
119         for line in raw.split("\n"):
120             fields = line.split()
121             if fields:
122                 clean_fields = fields[:1] + [
123                     (re.sub(r'\+[^\d][^\+]*', '', x)
124                      if re.match(arvados.util.keep_locator_pattern, x)
125                      else x)
126                     for x in fields[1:]]
127                 clean += [' '.join(clean_fields), "\n"]
128         return ''.join(clean)
129
130
131 class _WriterFile(_FileLikeObjectBase):
132     def __init__(self, coll_writer, name):
133         super(_WriterFile, self).__init__(name, 'wb')
134         self.dest = coll_writer
135
136     def close(self):
137         super(_WriterFile, self).close()
138         self.dest.finish_current_file()
139
140     @_FileLikeObjectBase._before_close
141     def write(self, data):
142         self.dest.write(data)
143
144     @_FileLikeObjectBase._before_close
145     def writelines(self, seq):
146         for data in seq:
147             self.write(data)
148
149     @_FileLikeObjectBase._before_close
150     def flush(self):
151         self.dest.flush_data()
152
153
154 class RichCollectionBase(CollectionBase):
155     """Base class for Collection classes
156
157     .. ATTENTION:: Internal
158        This class is meant to be used by other parts of the SDK. User code
159        should instantiate or subclass `Collection` or one of its subclasses
160        directly.
161     """
162
163     def __init__(self, parent=None):
164         self.parent = parent
165         self._committed = False
166         self._has_remote_blocks = False
167         self._callback = None
168         self._items = {}
169
170     def _my_api(self):
171         raise NotImplementedError()
172
173     def _my_keep(self):
174         raise NotImplementedError()
175
176     def _my_block_manager(self):
177         raise NotImplementedError()
178
179     def writable(self) -> bool:
180         """Indicate whether this collection object can be modified
181
182         This method returns `False` if this object is a `CollectionReader`,
183         else `True`.
184         """
185         raise NotImplementedError()
186
187     def root_collection(self) -> 'Collection':
188         """Get this collection's root collection object
189
190         If you open a subcollection with `Collection.find`, calling this method
191         on that subcollection returns the source Collection object.
192         """
193         raise NotImplementedError()
194
195     def stream_name(self) -> str:
196         """Get the name of the manifest stream represented by this collection
197
198         If you open a subcollection with `Collection.find`, calling this method
199         on that subcollection returns the name of the stream you opened.
200         """
201         raise NotImplementedError()
202
203     @synchronized
204     def has_remote_blocks(self) -> bool:
205         """Indiciate whether the collection refers to remote data
206
207         Returns `True` if the collection manifest includes any Keep locators
208         with a remote hint (`+R`), else `False`.
209         """
210         if self._has_remote_blocks:
211             return True
212         for item in self:
213             if self[item].has_remote_blocks():
214                 return True
215         return False
216
217     @synchronized
218     def set_has_remote_blocks(self, val: bool) -> None:
219         """Cache whether this collection refers to remote blocks
220
221         .. ATTENTION:: Internal
222            This method is only meant to be used by other Collection methods.
223
224         Set this collection's cached "has remote blocks" flag to the given
225         value.
226         """
227         self._has_remote_blocks = val
228         if self.parent:
229             self.parent.set_has_remote_blocks(val)
230
231     @must_be_writable
232     @synchronized
233     def find_or_create(
234             self,
235             path: str,
236             create_type: CreateType,
237     ) -> CollectionItem:
238         """Get the item at the given path, creating it if necessary
239
240         If `path` refers to a stream in this collection, returns a
241         corresponding `Subcollection` object. If `path` refers to a file in
242         this collection, returns a corresponding
243         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
244         this collection, then this method creates a new object and returns
245         it, creating parent streams as needed. The type of object created is
246         determined by the value of `create_type`.
247
248         Arguments:
249
250         * path: str --- The path to find or create within this collection.
251
252         * create_type: Literal[COLLECTION, FILE] --- The type of object to
253           create at `path` if one does not exist. Passing `COLLECTION`
254           creates a stream and returns the corresponding
255           `Subcollection`. Passing `FILE` creates a new file and returns the
256           corresponding `arvados.arvfile.ArvadosFile`.
257         """
258         pathcomponents = path.split("/", 1)
259         if pathcomponents[0]:
260             item = self._items.get(pathcomponents[0])
261             if len(pathcomponents) == 1:
262                 if item is None:
263                     # create new file
264                     if create_type == COLLECTION:
265                         item = Subcollection(self, pathcomponents[0])
266                     else:
267                         item = ArvadosFile(self, pathcomponents[0])
268                     self._items[pathcomponents[0]] = item
269                     self.set_committed(False)
270                     self.notify(ADD, self, pathcomponents[0], item)
271                 return item
272             else:
273                 if item is None:
274                     # create new collection
275                     item = Subcollection(self, pathcomponents[0])
276                     self._items[pathcomponents[0]] = item
277                     self.set_committed(False)
278                     self.notify(ADD, self, pathcomponents[0], item)
279                 if isinstance(item, RichCollectionBase):
280                     return item.find_or_create(pathcomponents[1], create_type)
281                 else:
282                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
283         else:
284             return self
285
286     @synchronized
287     def find(self, path: str) -> CollectionItem:
288         """Get the item at the given path
289
290         If `path` refers to a stream in this collection, returns a
291         corresponding `Subcollection` object. If `path` refers to a file in
292         this collection, returns a corresponding
293         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
294         this collection, then this method raises `NotADirectoryError`.
295
296         Arguments:
297
298         * path: str --- The path to find or create within this collection.
299         """
300         if not path:
301             raise errors.ArgumentError("Parameter 'path' is empty.")
302
303         pathcomponents = path.split("/", 1)
304         if pathcomponents[0] == '':
305             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
306
307         item = self._items.get(pathcomponents[0])
308         if item is None:
309             return None
310         elif len(pathcomponents) == 1:
311             return item
312         else:
313             if isinstance(item, RichCollectionBase):
314                 if pathcomponents[1]:
315                     return item.find(pathcomponents[1])
316                 else:
317                     return item
318             else:
319                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
320
321     @synchronized
322     def mkdirs(self, path: str) -> 'Subcollection':
323         """Create and return a subcollection at `path`
324
325         If `path` exists within this collection, raises `FileExistsError`.
326         Otherwise, creates a stream at that path and returns the
327         corresponding `Subcollection`.
328         """
329         if self.find(path) != None:
330             raise IOError(errno.EEXIST, "Directory or file exists", path)
331
332         return self.find_or_create(path, COLLECTION)
333
334     def open(
335             self,
336             path: str,
337             mode: str="r",
338             encoding: Optional[str]=None
339     ) -> IO:
340         """Open a file-like object within the collection
341
342         This method returns a file-like object that can read and/or write the
343         file located at `path` within the collection. If you attempt to write
344         a `path` that does not exist, the file is created with `find_or_create`.
345         If the file cannot be opened for any other reason, this method raises
346         `OSError` with an appropriate errno.
347
348         Arguments:
349
350         * path: str --- The path of the file to open within this collection
351
352         * mode: str --- The mode to open this file. Supports all the same
353           values as `builtins.open`.
354
355         * encoding: str | None --- The text encoding of the file. Only used
356           when the file is opened in text mode. The default is
357           platform-dependent.
358
359         """
360         if not re.search(r'^[rwa][bt]?\+?$', mode):
361             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
362
363         if mode[0] == 'r' and '+' not in mode:
364             fclass = ArvadosFileReader
365             arvfile = self.find(path)
366         elif not self.writable():
367             raise IOError(errno.EROFS, "Collection is read only")
368         else:
369             fclass = ArvadosFileWriter
370             arvfile = self.find_or_create(path, FILE)
371
372         if arvfile is None:
373             raise IOError(errno.ENOENT, "File not found", path)
374         if not isinstance(arvfile, ArvadosFile):
375             raise IOError(errno.EISDIR, "Is a directory", path)
376
377         if mode[0] == 'w':
378             arvfile.truncate(0)
379
380         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
381         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
382         if 'b' not in mode:
383             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
384             f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
385         return f
386
387     def modified(self) -> bool:
388         """Indicate whether this collection has an API server record
389
390         Returns `False` if this collection corresponds to a record loaded from
391         the API server, `True` otherwise.
392         """
393         return not self.committed()
394
395     @synchronized
396     def committed(self):
397         """Indicate whether this collection has an API server record
398
399         Returns `True` if this collection corresponds to a record loaded from
400         the API server, `False` otherwise.
401         """
402         return self._committed
403
404     @synchronized
405     def set_committed(self, value: bool=True):
406         """Cache whether this collection has an API server record
407
408         .. ATTENTION:: Internal
409            This method is only meant to be used by other Collection methods.
410
411         Set this collection's cached "committed" flag to the given
412         value and propagates it as needed.
413         """
414         if value == self._committed:
415             return
416         if value:
417             for k,v in self._items.items():
418                 v.set_committed(True)
419             self._committed = True
420         else:
421             self._committed = False
422             if self.parent is not None:
423                 self.parent.set_committed(False)
424
425     @synchronized
426     def __iter__(self) -> Iterator[str]:
427         """Iterate names of streams and files in this collection
428
429         This method does not recurse. It only iterates the contents of this
430         collection's corresponding stream.
431         """
432         return iter(self._items)
433
434     @synchronized
435     def __getitem__(self, k: str) -> CollectionItem:
436         """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
437
438         This method does not recurse. If you want to search a path, use
439         `RichCollectionBase.find` instead.
440         """
441         return self._items[k]
442
443     @synchronized
444     def __contains__(self, k: str) -> bool:
445         """Indicate whether this collection has an item with this name
446
447         This method does not recurse. It you want to check a path, use
448         `RichCollectionBase.exists` instead.
449         """
450         return k in self._items
451
452     @synchronized
453     def __len__(self):
454         """Get the number of items directly contained in this collection
455
456         This method does not recurse. It only counts the streams and files
457         in this collection's corresponding stream.
458         """
459         return len(self._items)
460
461     @must_be_writable
462     @synchronized
463     def __delitem__(self, p: str) -> None:
464         """Delete an item from this collection's stream
465
466         This method does not recurse. If you want to remove an item by a
467         path, use `RichCollectionBase.remove` instead.
468         """
469         del self._items[p]
470         self.set_committed(False)
471         self.notify(DEL, self, p, None)
472
473     @synchronized
474     def keys(self) -> Iterator[str]:
475         """Iterate names of streams and files in this collection
476
477         This method does not recurse. It only iterates the contents of this
478         collection's corresponding stream.
479         """
480         return self._items.keys()
481
482     @synchronized
483     def values(self) -> List[CollectionItem]:
484         """Get a list of objects in this collection's stream
485
486         The return value includes a `Subcollection` for every stream, and an
487         `arvados.arvfile.ArvadosFile` for every file, directly within this
488         collection's stream.  This method does not recurse.
489         """
490         return list(self._items.values())
491
492     @synchronized
493     def items(self) -> List[Tuple[str, CollectionItem]]:
494         """Get a list of `(name, object)` tuples from this collection's stream
495
496         The return value includes a `Subcollection` for every stream, and an
497         `arvados.arvfile.ArvadosFile` for every file, directly within this
498         collection's stream.  This method does not recurse.
499         """
500         return list(self._items.items())
501
502     def exists(self, path: str) -> bool:
503         """Indicate whether this collection includes an item at `path`
504
505         This method returns `True` if `path` refers to a stream or file within
506         this collection, else `False`.
507
508         Arguments:
509
510         * path: str --- The path to check for existence within this collection
511         """
512         return self.find(path) is not None
513
514     @must_be_writable
515     @synchronized
516     def remove(self, path: str, recursive: bool=False) -> None:
517         """Remove the file or stream at `path`
518
519         Arguments:
520
521         * path: str --- The path of the item to remove from the collection
522
523         * recursive: bool --- Controls the method's behavior if `path` refers
524           to a nonempty stream. If `False` (the default), this method raises
525           `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
526           items under the stream.
527         """
528         if not path:
529             raise errors.ArgumentError("Parameter 'path' is empty.")
530
531         pathcomponents = path.split("/", 1)
532         item = self._items.get(pathcomponents[0])
533         if item is None:
534             raise IOError(errno.ENOENT, "File not found", path)
535         if len(pathcomponents) == 1:
536             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
537                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
538             deleteditem = self._items[pathcomponents[0]]
539             del self._items[pathcomponents[0]]
540             self.set_committed(False)
541             self.notify(DEL, self, pathcomponents[0], deleteditem)
542         else:
543             item.remove(pathcomponents[1], recursive=recursive)
544
545     def _clonefrom(self, source):
546         for k,v in source.items():
547             self._items[k] = v.clone(self, k)
548
549     def clone(self):
550         raise NotImplementedError()
551
552     @must_be_writable
553     @synchronized
554     def add(
555             self,
556             source_obj: CollectionItem,
557             target_name: str,
558             overwrite: bool=False,
559             reparent: bool=False,
560     ) -> None:
561         """Copy or move a file or subcollection object to this collection
562
563         Arguments:
564
565         * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
566           to add to this collection
567
568         * target_name: str --- The path inside this collection where
569           `source_obj` should be added.
570
571         * overwrite: bool --- Controls the behavior of this method when the
572           collection already contains an object at `target_name`. If `False`
573           (the default), this method will raise `FileExistsError`. If `True`,
574           the object at `target_name` will be replaced with `source_obj`.
575
576         * reparent: bool --- Controls whether this method copies or moves
577           `source_obj`. If `False` (the default), `source_obj` is copied into
578           this collection. If `True`, `source_obj` is moved into this
579           collection.
580         """
581         if target_name in self and not overwrite:
582             raise IOError(errno.EEXIST, "File already exists", target_name)
583
584         modified_from = None
585         if target_name in self:
586             modified_from = self[target_name]
587
588         # Actually make the move or copy.
589         if reparent:
590             source_obj._reparent(self, target_name)
591             item = source_obj
592         else:
593             item = source_obj.clone(self, target_name)
594
595         self._items[target_name] = item
596         self.set_committed(False)
597         if not self._has_remote_blocks and source_obj.has_remote_blocks():
598             self.set_has_remote_blocks(True)
599
600         if modified_from:
601             self.notify(MOD, self, target_name, (modified_from, item))
602         else:
603             self.notify(ADD, self, target_name, item)
604
605     def _get_src_target(self, source, target_path, source_collection, create_dest):
606         if source_collection is None:
607             source_collection = self
608
609         # Find the object
610         if isinstance(source, str):
611             source_obj = source_collection.find(source)
612             if source_obj is None:
613                 raise IOError(errno.ENOENT, "File not found", source)
614             sourcecomponents = source.split("/")
615         else:
616             source_obj = source
617             sourcecomponents = None
618
619         # Find parent collection the target path
620         targetcomponents = target_path.split("/")
621
622         # Determine the name to use.
623         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
624
625         if not target_name:
626             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
627
628         if create_dest:
629             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
630         else:
631             if len(targetcomponents) > 1:
632                 target_dir = self.find("/".join(targetcomponents[0:-1]))
633             else:
634                 target_dir = self
635
636         if target_dir is None:
637             raise IOError(errno.ENOENT, "Target directory not found", target_name)
638
639         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
640             target_dir = target_dir[target_name]
641             target_name = sourcecomponents[-1]
642
643         return (source_obj, target_dir, target_name)
644
645     @must_be_writable
646     @synchronized
647     def copy(
648             self,
649             source: Union[str, CollectionItem],
650             target_path: str,
651             source_collection: Optional['RichCollectionBase']=None,
652             overwrite: bool=False,
653     ) -> None:
654         """Copy a file or subcollection object to this collection
655
656         Arguments:
657
658         * source: str | arvados.arvfile.ArvadosFile |
659           arvados.collection.Subcollection --- The file or subcollection to
660           add to this collection. If `source` is a str, the object will be
661           found by looking up this path from `source_collection` (see
662           below).
663
664         * target_path: str --- The path inside this collection where the
665           source object should be added.
666
667         * source_collection: arvados.collection.Collection | None --- The
668           collection to find the source object from when `source` is a
669           path. Defaults to the current collection (`self`).
670
671         * overwrite: bool --- Controls the behavior of this method when the
672           collection already contains an object at `target_path`. If `False`
673           (the default), this method will raise `FileExistsError`. If `True`,
674           the object at `target_path` will be replaced with `source_obj`.
675         """
676         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
677         target_dir.add(source_obj, target_name, overwrite, False)
678
679     @must_be_writable
680     @synchronized
681     def rename(
682             self,
683             source: Union[str, CollectionItem],
684             target_path: str,
685             source_collection: Optional['RichCollectionBase']=None,
686             overwrite: bool=False,
687     ) -> None:
688         """Move a file or subcollection object to this collection
689
690         Arguments:
691
692         * source: str | arvados.arvfile.ArvadosFile |
693           arvados.collection.Subcollection --- The file or subcollection to
694           add to this collection. If `source` is a str, the object will be
695           found by looking up this path from `source_collection` (see
696           below).
697
698         * target_path: str --- The path inside this collection where the
699           source object should be added.
700
701         * source_collection: arvados.collection.Collection | None --- The
702           collection to find the source object from when `source` is a
703           path. Defaults to the current collection (`self`).
704
705         * overwrite: bool --- Controls the behavior of this method when the
706           collection already contains an object at `target_path`. If `False`
707           (the default), this method will raise `FileExistsError`. If `True`,
708           the object at `target_path` will be replaced with `source_obj`.
709         """
710         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
711         if not source_obj.writable():
712             raise IOError(errno.EROFS, "Source collection is read only", source)
713         target_dir.add(source_obj, target_name, overwrite, True)
714
715     def portable_manifest_text(self, stream_name: str=".") -> str:
716         """Get the portable manifest text for this collection
717
718         The portable manifest text is normalized, and does not include access
719         tokens. This method does not flush outstanding blocks to Keep.
720
721         Arguments:
722
723         * stream_name: str --- The name to use for this collection's stream in
724           the generated manifest. Default `'.'`.
725         """
726         return self._get_manifest_text(stream_name, True, True)
727
728     @synchronized
729     def manifest_text(
730             self,
731             stream_name: str=".",
732             strip: bool=False,
733             normalize: bool=False,
734             only_committed: bool=False,
735     ) -> str:
736         """Get the manifest text for this collection
737
738         Arguments:
739
740         * stream_name: str --- The name to use for this collection's stream in
741           the generated manifest. Default `'.'`.
742
743         * strip: bool --- Controls whether or not the returned manifest text
744           includes access tokens. If `False` (the default), the manifest text
745           will include access tokens. If `True`, the manifest text will not
746           include access tokens.
747
748         * normalize: bool --- Controls whether or not the returned manifest
749           text is normalized. Default `False`.
750
751         * only_committed: bool --- Controls whether or not this method uploads
752           pending data to Keep before building and returning the manifest text.
753           If `False` (the default), this method will finish uploading all data
754           to Keep, then return the final manifest. If `True`, this method will
755           build and return a manifest that only refers to the data that has
756           finished uploading at the time this method was called.
757         """
758         if not only_committed:
759             self._my_block_manager().commit_all()
760         return self._get_manifest_text(stream_name, strip, normalize,
761                                        only_committed=only_committed)
762
763     @synchronized
764     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
765         """Get the manifest text for this collection, sub collections and files.
766
767         :stream_name:
768           Name to use for this stream (directory)
769
770         :strip:
771           If True, remove signing tokens from block locators if present.
772           If False (default), block locators are left unchanged.
773
774         :normalize:
775           If True, always export the manifest text in normalized form
776           even if the Collection is not modified.  If False (default) and the collection
777           is not modified, return the original manifest text even if it is not
778           in normalized form.
779
780         :only_committed:
781           If True, only include blocks that were already committed to Keep.
782
783         """
784
785         if not self.committed() or self._manifest_text is None or normalize:
786             stream = {}
787             buf = []
788             sorted_keys = sorted(self.keys())
789             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
790                 # Create a stream per file `k`
791                 arvfile = self[filename]
792                 filestream = []
793                 for segment in arvfile.segments():
794                     loc = segment.locator
795                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
796                         if only_committed:
797                             continue
798                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
799                     if strip:
800                         loc = KeepLocator(loc).stripped()
801                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
802                                          segment.segment_offset, segment.range_size))
803                 stream[filename] = filestream
804             if stream:
805                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
806             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
807                 buf.append(self[dirname].manifest_text(
808                     stream_name=os.path.join(stream_name, dirname),
809                     strip=strip, normalize=True, only_committed=only_committed))
810             return "".join(buf)
811         else:
812             if strip:
813                 return self.stripped_manifest()
814             else:
815                 return self._manifest_text
816
817     @synchronized
818     def _copy_remote_blocks(self, remote_blocks={}):
819         """Scan through the entire collection and ask Keep to copy remote blocks.
820
821         When accessing a remote collection, blocks will have a remote signature
822         (+R instead of +A). Collect these signatures and request Keep to copy the
823         blocks to the local cluster, returning local (+A) signatures.
824
825         :remote_blocks:
826           Shared cache of remote to local block mappings. This is used to avoid
827           doing extra work when blocks are shared by more than one file in
828           different subdirectories.
829
830         """
831         for item in self:
832             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
833         return remote_blocks
834
835     @synchronized
836     def diff(
837             self,
838             end_collection: 'RichCollectionBase',
839             prefix: str=".",
840             holding_collection: Optional['Collection']=None,
841     ) -> ChangeList:
842         """Build a list of differences between this collection and another
843
844         Arguments:
845
846         * end_collection: arvados.collection.RichCollectionBase --- A
847           collection object with the desired end state. The returned diff
848           list will describe how to go from the current collection object
849           `self` to `end_collection`.
850
851         * prefix: str --- The name to use for this collection's stream in
852           the diff list. Default `'.'`.
853
854         * holding_collection: arvados.collection.Collection | None --- A
855           collection object used to hold objects for the returned diff
856           list. By default, a new empty collection is created.
857         """
858         changes = []
859         if holding_collection is None:
860             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
861         for k in self:
862             if k not in end_collection:
863                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
864         for k in end_collection:
865             if k in self:
866                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
867                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
868                 elif end_collection[k] != self[k]:
869                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
870                 else:
871                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
872             else:
873                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
874         return changes
875
876     @must_be_writable
877     @synchronized
878     def apply(self, changes: ChangeList) -> None:
879         """Apply a list of changes from to this collection
880
881         This method takes a list of changes generated by
882         `RichCollectionBase.diff` and applies it to this
883         collection. Afterward, the state of this collection object will
884         match the state of `end_collection` passed to `diff`. If a change
885         conflicts with a local change, it will be saved to an alternate path
886         indicating the conflict.
887
888         Arguments:
889
890         * changes: arvados.collection.ChangeList --- The list of differences
891           generated by `RichCollectionBase.diff`.
892         """
893         if changes:
894             self.set_committed(False)
895         for change in changes:
896             event_type = change[0]
897             path = change[1]
898             initial = change[2]
899             local = self.find(path)
900             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
901                                                                     time.gmtime()))
902             if event_type == ADD:
903                 if local is None:
904                     # No local file at path, safe to copy over new file
905                     self.copy(initial, path)
906                 elif local is not None and local != initial:
907                     # There is already local file and it is different:
908                     # save change to conflict file.
909                     self.copy(initial, conflictpath)
910             elif event_type == MOD or event_type == TOK:
911                 final = change[3]
912                 if local == initial:
913                     # Local matches the "initial" item so it has not
914                     # changed locally and is safe to update.
915                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
916                         # Replace contents of local file with new contents
917                         local.replace_contents(final)
918                     else:
919                         # Overwrite path with new item; this can happen if
920                         # path was a file and is now a collection or vice versa
921                         self.copy(final, path, overwrite=True)
922                 else:
923                     # Local is missing (presumably deleted) or local doesn't
924                     # match the "start" value, so save change to conflict file
925                     self.copy(final, conflictpath)
926             elif event_type == DEL:
927                 if local == initial:
928                     # Local item matches "initial" value, so it is safe to remove.
929                     self.remove(path, recursive=True)
930                 # else, the file is modified or already removed, in either
931                 # case we don't want to try to remove it.
932
933     def portable_data_hash(self) -> str:
934         """Get the portable data hash for this collection's manifest"""
935         if self._manifest_locator and self.committed():
936             # If the collection is already saved on the API server, and it's committed
937             # then return API server's PDH response.
938             return self._portable_data_hash
939         else:
940             stripped = self.portable_manifest_text().encode()
941             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
942
943     @synchronized
944     def subscribe(self, callback: ChangeCallback) -> None:
945         """Set a notify callback for changes to this collection
946
947         Arguments:
948
949         * callback: arvados.collection.ChangeCallback --- The callable to
950           call each time the collection is changed.
951         """
952         if self._callback is None:
953             self._callback = callback
954         else:
955             raise errors.ArgumentError("A callback is already set on this collection.")
956
957     @synchronized
958     def unsubscribe(self) -> None:
959         """Remove any notify callback set for changes to this collection"""
960         if self._callback is not None:
961             self._callback = None
962
963     @synchronized
964     def notify(
965             self,
966             event: ChangeType,
967             collection: 'RichCollectionBase',
968             name: str,
969             item: CollectionItem,
970     ) -> None:
971         """Notify any subscribed callback about a change to this collection
972
973         .. ATTENTION:: Internal
974            This method is only meant to be used by other Collection methods.
975
976         If a callback has been registered with `RichCollectionBase.subscribe`,
977         it will be called with information about a change to this collection.
978         Then this notification will be propagated to this collection's root.
979
980         Arguments:
981
982         * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
983           the collection.
984
985         * collection: arvados.collection.RichCollectionBase --- The
986           collection that was modified.
987
988         * name: str --- The name of the file or stream within `collection` that
989           was modified.
990
991         * item: arvados.arvfile.ArvadosFile |
992           arvados.collection.Subcollection --- The new contents at `name`
993           within `collection`.
994         """
995         if self._callback:
996             self._callback(event, collection, name, item)
997         self.root_collection().notify(event, collection, name, item)
998
999     @synchronized
1000     def __eq__(self, other: Any) -> bool:
1001         """Indicate whether this collection object is equal to another"""
1002         if other is self:
1003             return True
1004         if not isinstance(other, RichCollectionBase):
1005             return False
1006         if len(self._items) != len(other):
1007             return False
1008         for k in self._items:
1009             if k not in other:
1010                 return False
1011             if self._items[k] != other[k]:
1012                 return False
1013         return True
1014
1015     def __ne__(self, other: Any) -> bool:
1016         """Indicate whether this collection object is not equal to another"""
1017         return not self.__eq__(other)
1018
1019     @synchronized
1020     def flush(self) -> None:
1021         """Upload any pending data to Keep"""
1022         for e in self.values():
1023             e.flush()
1024
1025
1026 class Collection(RichCollectionBase):
1027     """Read and manipulate an Arvados collection
1028
1029     This class provides a high-level interface to create, read, and update
1030     Arvados collections and their contents. Refer to the Arvados Python SDK
1031     cookbook for [an introduction to using the Collection class][cookbook].
1032
1033     [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1034     """
1035
1036     def __init__(self, manifest_locator_or_text: Optional[str]=None,
1037                  api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1038                  keep_client: Optional['arvados.keep.KeepClient']=None,
1039                  num_retries: int=10,
1040                  parent: Optional['Collection']=None,
1041                  apiconfig: Optional[Mapping[str, str]]=None,
1042                  block_manager: Optional['arvados.arvfile._BlockManager']=None,
1043                  replication_desired: Optional[int]=None,
1044                  storage_classes_desired: Optional[List[str]]=None,
1045                  put_threads: Optional[int]=None):
1046         """Initialize a Collection object
1047
1048         Arguments:
1049
1050         * manifest_locator_or_text: str | None --- This string can contain a
1051           collection manifest text, portable data hash, or UUID. When given a
1052           portable data hash or UUID, this instance will load a collection
1053           record from the API server. Otherwise, this instance will represent a
1054           new collection without an API server record. The default value `None`
1055           instantiates a new collection with an empty manifest.
1056
1057         * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1058           Arvados API client object this instance uses to make requests. If
1059           none is given, this instance creates its own client using the
1060           settings from `apiconfig` (see below). If your client instantiates
1061           many Collection objects, you can help limit memory utilization by
1062           calling `arvados.api.api` to construct an
1063           `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
1064           for every Collection.
1065
1066         * keep_client: arvados.keep.KeepClient | None --- The Keep client
1067           object this instance uses to make requests. If none is given, this
1068           instance creates its own client using its `api_client`.
1069
1070         * num_retries: int --- The number of times that client requests are
1071           retried. Default 10.
1072
1073         * parent: arvados.collection.Collection | None --- The parent Collection
1074           object of this instance, if any. This argument is primarily used by
1075           other Collection methods; user client code shouldn't need to use it.
1076
1077         * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1078           `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1079           `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1080           Collection object constructs one from these settings. If no
1081           mapping is provided, calls `arvados.config.settings` to get these
1082           parameters from user configuration.
1083
1084         * block_manager: arvados.arvfile._BlockManager | None --- The
1085           _BlockManager object used by this instance to coordinate reading
1086           and writing Keep data blocks. If none is given, this instance
1087           constructs its own. This argument is primarily used by other
1088           Collection methods; user client code shouldn't need to use it.
1089
1090         * replication_desired: int | None --- This controls both the value of
1091           the `replication_desired` field on API collection records saved by
1092           this class, as well as the number of Keep services that the object
1093           writes new data blocks to. If none is given, uses the default value
1094           configured for the cluster.
1095
1096         * storage_classes_desired: list[str] | None --- This controls both
1097           the value of the `storage_classes_desired` field on API collection
1098           records saved by this class, as well as selecting which specific
1099           Keep services the object writes new data blocks to. If none is
1100           given, defaults to an empty list.
1101
1102         * put_threads: int | None --- The number of threads to run
1103           simultaneously to upload data blocks to Keep. This value is used when
1104           building a new `block_manager`. It is unused when a `block_manager`
1105           is provided.
1106         """
1107
1108         if storage_classes_desired and type(storage_classes_desired) is not list:
1109             raise errors.ArgumentError("storage_classes_desired must be list type.")
1110
1111         super(Collection, self).__init__(parent)
1112         self._api_client = api_client
1113         self._keep_client = keep_client
1114
1115         # Use the keep client from ThreadSafeAPIClient
1116         if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
1117             self._keep_client = self._api_client.keep
1118
1119         self._block_manager = block_manager
1120         self.replication_desired = replication_desired
1121         self._storage_classes_desired = storage_classes_desired
1122         self.put_threads = put_threads
1123
1124         if apiconfig:
1125             self._config = apiconfig
1126         else:
1127             self._config = config.settings()
1128
1129         self.num_retries = num_retries
1130         self._manifest_locator = None
1131         self._manifest_text = None
1132         self._portable_data_hash = None
1133         self._api_response = None
1134         self._past_versions = set()
1135
1136         self.lock = threading.RLock()
1137         self.events = None
1138
1139         if manifest_locator_or_text:
1140             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1141                 self._manifest_locator = manifest_locator_or_text
1142             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1143                 self._manifest_locator = manifest_locator_or_text
1144                 if not self._has_local_collection_uuid():
1145                     self._has_remote_blocks = True
1146             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1147                 self._manifest_text = manifest_locator_or_text
1148                 if '+R' in self._manifest_text:
1149                     self._has_remote_blocks = True
1150             else:
1151                 raise errors.ArgumentError(
1152                     "Argument to CollectionReader is not a manifest or a collection UUID")
1153
1154             try:
1155                 self._populate()
1156             except errors.SyntaxError as e:
1157                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1158
1159     def storage_classes_desired(self) -> List[str]:
1160         """Get this collection's `storage_classes_desired` value"""
1161         return self._storage_classes_desired or []
1162
1163     def root_collection(self) -> 'Collection':
1164         return self
1165
1166     def get_properties(self) -> Properties:
1167         """Get this collection's properties
1168
1169         This method always returns a dict. If this collection object does not
1170         have an associated API record, or that record does not have any
1171         properties set, this method returns an empty dict.
1172         """
1173         if self._api_response and self._api_response["properties"]:
1174             return self._api_response["properties"]
1175         else:
1176             return {}
1177
1178     def get_trash_at(self) -> Optional[datetime.datetime]:
1179         """Get this collection's `trash_at` field
1180
1181         This method parses the `trash_at` field of the collection's API
1182         record and returns a datetime from it. If that field is not set, or
1183         this collection object does not have an associated API record,
1184         returns None.
1185         """
1186         if self._api_response and self._api_response["trash_at"]:
1187             try:
1188                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1189             except ValueError:
1190                 return None
1191         else:
1192             return None
1193
1194     def stream_name(self) -> str:
1195         return "."
1196
1197     def writable(self) -> bool:
1198         return True
1199
1200     @synchronized
1201     def known_past_version(
1202             self,
1203             modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1204     ) -> bool:
1205         """Indicate whether an API record for this collection has been seen before
1206
1207         As this collection object loads records from the API server, it records
1208         their `modified_at` and `portable_data_hash` fields. This method accepts
1209         a 2-tuple with values for those fields, and returns `True` if the
1210         combination was previously loaded.
1211         """
1212         return modified_at_and_portable_data_hash in self._past_versions
1213
1214     @synchronized
1215     @retry_method
1216     def update(
1217             self,
1218             other: Optional['Collection']=None,
1219             num_retries: Optional[int]=None,
1220     ) -> None:
1221         """Merge another collection's contents into this one
1222
1223         This method compares the manifest of this collection instance with
1224         another, then updates this instance's manifest with changes from the
1225         other, renaming files to flag conflicts where necessary.
1226
1227         When called without any arguments, this method reloads the collection's
1228         API record, and updates this instance with any changes that have
1229         appeared server-side. If this instance does not have a corresponding
1230         API record, this method raises `arvados.errors.ArgumentError`.
1231
1232         Arguments:
1233
1234         * other: arvados.collection.Collection | None --- The collection
1235           whose contents should be merged into this instance. When not
1236           provided, this method reloads this collection's API record and
1237           constructs a Collection object from it.  If this instance does not
1238           have a corresponding API record, this method raises
1239           `arvados.errors.ArgumentError`.
1240
1241         * num_retries: int | None --- The number of times to retry reloading
1242           the collection's API record from the API server. If not specified,
1243           uses the `num_retries` provided when this instance was constructed.
1244         """
1245         if other is None:
1246             if self._manifest_locator is None:
1247                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1248             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1249             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1250                 response.get("portable_data_hash") != self.portable_data_hash()):
1251                 # The record on the server is different from our current one, but we've seen it before,
1252                 # so ignore it because it's already been merged.
1253                 # However, if it's the same as our current record, proceed with the update, because we want to update
1254                 # our tokens.
1255                 return
1256             else:
1257                 self._remember_api_response(response)
1258             other = CollectionReader(response["manifest_text"])
1259         baseline = CollectionReader(self._manifest_text)
1260         self.apply(baseline.diff(other))
1261         self._manifest_text = self.manifest_text()
1262
1263     @synchronized
1264     def _my_api(self):
1265         if self._api_client is None:
1266             self._api_client = ThreadSafeAPIClient(self._config, version='v1')
1267             if self._keep_client is None:
1268                 self._keep_client = self._api_client.keep
1269         return self._api_client
1270
1271     @synchronized
1272     def _my_keep(self):
1273         if self._keep_client is None:
1274             if self._api_client is None:
1275                 self._my_api()
1276             else:
1277                 self._keep_client = KeepClient(api_client=self._api_client)
1278         return self._keep_client
1279
1280     @synchronized
1281     def _my_block_manager(self):
1282         if self._block_manager is None:
1283             copies = (self.replication_desired or
1284                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1285                                                    2))
1286             self._block_manager = _BlockManager(self._my_keep(),
1287                                                 copies=copies,
1288                                                 put_threads=self.put_threads,
1289                                                 num_retries=self.num_retries,
1290                                                 storage_classes_func=self.storage_classes_desired)
1291         return self._block_manager
1292
1293     def _remember_api_response(self, response):
1294         self._api_response = response
1295         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1296
1297     def _populate_from_api_server(self):
1298         # As in KeepClient itself, we must wait until the last
1299         # possible moment to instantiate an API client, in order to
1300         # avoid tripping up clients that don't have access to an API
1301         # server.  If we do build one, make sure our Keep client uses
1302         # it.  If instantiation fails, we'll fall back to the except
1303         # clause, just like any other Collection lookup
1304         # failure. Return an exception, or None if successful.
1305         self._remember_api_response(self._my_api().collections().get(
1306             uuid=self._manifest_locator).execute(
1307                 num_retries=self.num_retries))
1308         self._manifest_text = self._api_response['manifest_text']
1309         self._portable_data_hash = self._api_response['portable_data_hash']
1310         # If not overriden via kwargs, we should try to load the
1311         # replication_desired and storage_classes_desired from the API server
1312         if self.replication_desired is None:
1313             self.replication_desired = self._api_response.get('replication_desired', None)
1314         if self._storage_classes_desired is None:
1315             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1316
1317     def _populate(self):
1318         if self._manifest_text is None:
1319             if self._manifest_locator is None:
1320                 return
1321             else:
1322                 self._populate_from_api_server()
1323         self._baseline_manifest = self._manifest_text
1324         self._import_manifest(self._manifest_text)
1325
1326     def _has_collection_uuid(self):
1327         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1328
1329     def _has_local_collection_uuid(self):
1330         return self._has_collection_uuid and \
1331             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1332
1333     def __enter__(self):
1334         return self
1335
1336     def __exit__(self, exc_type, exc_value, traceback):
1337         """Exit a context with this collection instance
1338
1339         If no exception was raised inside the context block, and this
1340         collection is writable and has a corresponding API record, that
1341         record will be updated to match the state of this instance at the end
1342         of the block.
1343         """
1344         if exc_type is None:
1345             if self.writable() and self._has_collection_uuid():
1346                 self.save()
1347         self.stop_threads()
1348
1349     def stop_threads(self) -> None:
1350         """Stop background Keep upload/download threads"""
1351         if self._block_manager is not None:
1352             self._block_manager.stop_threads()
1353
1354     @synchronized
1355     def manifest_locator(self) -> Optional[str]:
1356         """Get this collection's manifest locator, if any
1357
1358         * If this collection instance is associated with an API record with a
1359           UUID, return that.
1360         * Otherwise, if this collection instance was loaded from an API record
1361           by portable data hash, return that.
1362         * Otherwise, return `None`.
1363         """
1364         return self._manifest_locator
1365
1366     @synchronized
1367     def clone(
1368             self,
1369             new_parent: Optional['Collection']=None,
1370             new_name: Optional[str]=None,
1371             readonly: bool=False,
1372             new_config: Optional[Mapping[str, str]]=None,
1373     ) -> 'Collection':
1374         """Create a Collection object with the same contents as this instance
1375
1376         This method creates a new Collection object with contents that match
1377         this instance's. The new collection will not be associated with any API
1378         record.
1379
1380         Arguments:
1381
1382         * new_parent: arvados.collection.Collection | None --- This value is
1383           passed to the new Collection's constructor as the `parent`
1384           argument.
1385
1386         * new_name: str | None --- This value is unused.
1387
1388         * readonly: bool --- If this value is true, this method constructs and
1389           returns a `CollectionReader`. Otherwise, it returns a mutable
1390           `Collection`. Default `False`.
1391
1392         * new_config: Mapping[str, str] | None --- This value is passed to the
1393           new Collection's constructor as `apiconfig`. If no value is provided,
1394           defaults to the configuration passed to this instance's constructor.
1395         """
1396         if new_config is None:
1397             new_config = self._config
1398         if readonly:
1399             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1400         else:
1401             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1402
1403         newcollection._clonefrom(self)
1404         return newcollection
1405
1406     @synchronized
1407     def api_response(self) -> Optional[Dict[str, Any]]:
1408         """Get this instance's associated API record
1409
1410         If this Collection instance has an associated API record, return it.
1411         Otherwise, return `None`.
1412         """
1413         return self._api_response
1414
1415     def find_or_create(
1416             self,
1417             path: str,
1418             create_type: CreateType,
1419     ) -> CollectionItem:
1420         if path == ".":
1421             return self
1422         else:
1423             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1424
1425     def find(self, path: str) -> CollectionItem:
1426         if path == ".":
1427             return self
1428         else:
1429             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1430
1431     def remove(self, path: str, recursive: bool=False) -> None:
1432         if path == ".":
1433             raise errors.ArgumentError("Cannot remove '.'")
1434         else:
1435             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1436
1437     @must_be_writable
1438     @synchronized
1439     @retry_method
1440     def save(
1441             self,
1442             properties: Optional[Properties]=None,
1443             storage_classes: Optional[StorageClasses]=None,
1444             trash_at: Optional[datetime.datetime]=None,
1445             merge: bool=True,
1446             num_retries: Optional[int]=None,
1447             preserve_version: bool=False,
1448     ) -> str:
1449         """Save collection to an existing API record
1450
1451         This method updates the instance's corresponding API record to match
1452         the instance's state. If this instance does not have a corresponding API
1453         record yet, raises `AssertionError`. (To create a new API record, use
1454         `Collection.save_new`.) This method returns the saved collection
1455         manifest.
1456
1457         Arguments:
1458
1459         * properties: dict[str, Any] | None --- If provided, the API record will
1460           be updated with these properties. Note this will completely replace
1461           any existing properties.
1462
1463         * storage_classes: list[str] | None --- If provided, the API record will
1464           be updated with this value in the `storage_classes_desired` field.
1465           This value will also be saved on the instance and used for any
1466           changes that follow.
1467
1468         * trash_at: datetime.datetime | None --- If provided, the API record
1469           will be updated with this value in the `trash_at` field.
1470
1471         * merge: bool --- If `True` (the default), this method will first
1472           reload this collection's API record, and merge any new contents into
1473           this instance before saving changes. See `Collection.update` for
1474           details.
1475
1476         * num_retries: int | None --- The number of times to retry reloading
1477           the collection's API record from the API server. If not specified,
1478           uses the `num_retries` provided when this instance was constructed.
1479
1480         * preserve_version: bool --- This value will be passed to directly
1481           to the underlying API call. If `True`, the Arvados API will
1482           preserve the versions of this collection both immediately before
1483           and after the update. If `True` when the API server is not
1484           configured with collection versioning, this method raises
1485           `arvados.errors.ArgumentError`.
1486         """
1487         if properties and type(properties) is not dict:
1488             raise errors.ArgumentError("properties must be dictionary type.")
1489
1490         if storage_classes and type(storage_classes) is not list:
1491             raise errors.ArgumentError("storage_classes must be list type.")
1492         if storage_classes:
1493             self._storage_classes_desired = storage_classes
1494
1495         if trash_at and type(trash_at) is not datetime.datetime:
1496             raise errors.ArgumentError("trash_at must be datetime type.")
1497
1498         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1499             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1500
1501         body={}
1502         if properties:
1503             body["properties"] = properties
1504         if self.storage_classes_desired():
1505             body["storage_classes_desired"] = self.storage_classes_desired()
1506         if trash_at:
1507             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1508             body["trash_at"] = t
1509         if preserve_version:
1510             body["preserve_version"] = preserve_version
1511
1512         if not self.committed():
1513             if self._has_remote_blocks:
1514                 # Copy any remote blocks to the local cluster.
1515                 self._copy_remote_blocks(remote_blocks={})
1516                 self._has_remote_blocks = False
1517             if not self._has_collection_uuid():
1518                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1519             elif not self._has_local_collection_uuid():
1520                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1521
1522             self._my_block_manager().commit_all()
1523
1524             if merge:
1525                 self.update()
1526
1527             text = self.manifest_text(strip=False)
1528             body['manifest_text'] = text
1529
1530             self._remember_api_response(self._my_api().collections().update(
1531                 uuid=self._manifest_locator,
1532                 body=body
1533                 ).execute(num_retries=num_retries))
1534             self._manifest_text = self._api_response["manifest_text"]
1535             self._portable_data_hash = self._api_response["portable_data_hash"]
1536             self.set_committed(True)
1537         elif body:
1538             self._remember_api_response(self._my_api().collections().update(
1539                 uuid=self._manifest_locator,
1540                 body=body
1541                 ).execute(num_retries=num_retries))
1542
1543         return self._manifest_text
1544
1545
1546     @must_be_writable
1547     @synchronized
1548     @retry_method
1549     def save_new(
1550             self,
1551             name: Optional[str]=None,
1552             create_collection_record: bool=True,
1553             owner_uuid: Optional[str]=None,
1554             properties: Optional[Properties]=None,
1555             storage_classes: Optional[StorageClasses]=None,
1556             trash_at: Optional[datetime.datetime]=None,
1557             ensure_unique_name: bool=False,
1558             num_retries: Optional[int]=None,
1559             preserve_version: bool=False,
1560     ):
1561         """Save collection to a new API record
1562
1563         This method finishes uploading new data blocks and (optionally)
1564         creates a new API collection record with the provided data. If a new
1565         record is created, this instance becomes associated with that record
1566         for future updates like `save()`. This method returns the saved
1567         collection manifest.
1568
1569         Arguments:
1570
1571         * name: str | None --- The `name` field to use on the new collection
1572           record. If not specified, a generic default name is generated.
1573
1574         * create_collection_record: bool --- If `True` (the default), creates a
1575           collection record on the API server. If `False`, the method finishes
1576           all data uploads and only returns the resulting collection manifest
1577           without sending it to the API server.
1578
1579         * owner_uuid: str | None --- The `owner_uuid` field to use on the
1580           new collection record.
1581
1582         * properties: dict[str, Any] | None --- The `properties` field to use on
1583           the new collection record.
1584
1585         * storage_classes: list[str] | None --- The
1586           `storage_classes_desired` field to use on the new collection record.
1587
1588         * trash_at: datetime.datetime | None --- The `trash_at` field to use
1589           on the new collection record.
1590
1591         * ensure_unique_name: bool --- This value is passed directly to the
1592           Arvados API when creating the collection record. If `True`, the API
1593           server may modify the submitted `name` to ensure the collection's
1594           `name`+`owner_uuid` combination is unique. If `False` (the default),
1595           if a collection already exists with this same `name`+`owner_uuid`
1596           combination, creating a collection record will raise a validation
1597           error.
1598
1599         * num_retries: int | None --- The number of times to retry reloading
1600           the collection's API record from the API server. If not specified,
1601           uses the `num_retries` provided when this instance was constructed.
1602
1603         * preserve_version: bool --- This value will be passed to directly
1604           to the underlying API call. If `True`, the Arvados API will
1605           preserve the versions of this collection both immediately before
1606           and after the update. If `True` when the API server is not
1607           configured with collection versioning, this method raises
1608           `arvados.errors.ArgumentError`.
1609         """
1610         if properties and type(properties) is not dict:
1611             raise errors.ArgumentError("properties must be dictionary type.")
1612
1613         if storage_classes and type(storage_classes) is not list:
1614             raise errors.ArgumentError("storage_classes must be list type.")
1615
1616         if trash_at and type(trash_at) is not datetime.datetime:
1617             raise errors.ArgumentError("trash_at must be datetime type.")
1618
1619         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1620             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1621
1622         if self._has_remote_blocks:
1623             # Copy any remote blocks to the local cluster.
1624             self._copy_remote_blocks(remote_blocks={})
1625             self._has_remote_blocks = False
1626
1627         if storage_classes:
1628             self._storage_classes_desired = storage_classes
1629
1630         self._my_block_manager().commit_all()
1631         text = self.manifest_text(strip=False)
1632
1633         if create_collection_record:
1634             if name is None:
1635                 name = "New collection"
1636                 ensure_unique_name = True
1637
1638             body = {"manifest_text": text,
1639                     "name": name,
1640                     "replication_desired": self.replication_desired}
1641             if owner_uuid:
1642                 body["owner_uuid"] = owner_uuid
1643             if properties:
1644                 body["properties"] = properties
1645             if self.storage_classes_desired():
1646                 body["storage_classes_desired"] = self.storage_classes_desired()
1647             if trash_at:
1648                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1649                 body["trash_at"] = t
1650             if preserve_version:
1651                 body["preserve_version"] = preserve_version
1652
1653             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1654             text = self._api_response["manifest_text"]
1655
1656             self._manifest_locator = self._api_response["uuid"]
1657             self._portable_data_hash = self._api_response["portable_data_hash"]
1658
1659             self._manifest_text = text
1660             self.set_committed(True)
1661
1662         return text
1663
1664     _token_re = re.compile(r'(\S+)(\s+|$)')
1665     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1666     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1667
1668     def _unescape_manifest_path(self, path):
1669         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1670
1671     @synchronized
1672     def _import_manifest(self, manifest_text):
1673         """Import a manifest into a `Collection`.
1674
1675         :manifest_text:
1676           The manifest text to import from.
1677
1678         """
1679         if len(self) > 0:
1680             raise ArgumentError("Can only import manifest into an empty collection")
1681
1682         STREAM_NAME = 0
1683         BLOCKS = 1
1684         SEGMENTS = 2
1685
1686         stream_name = None
1687         state = STREAM_NAME
1688
1689         for token_and_separator in self._token_re.finditer(manifest_text):
1690             tok = token_and_separator.group(1)
1691             sep = token_and_separator.group(2)
1692
1693             if state == STREAM_NAME:
1694                 # starting a new stream
1695                 stream_name = self._unescape_manifest_path(tok)
1696                 blocks = []
1697                 segments = []
1698                 streamoffset = 0
1699                 state = BLOCKS
1700                 self.find_or_create(stream_name, COLLECTION)
1701                 continue
1702
1703             if state == BLOCKS:
1704                 block_locator = self._block_re.match(tok)
1705                 if block_locator:
1706                     blocksize = int(block_locator.group(1))
1707                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1708                     streamoffset += blocksize
1709                 else:
1710                     state = SEGMENTS
1711
1712             if state == SEGMENTS:
1713                 file_segment = self._segment_re.match(tok)
1714                 if file_segment:
1715                     pos = int(file_segment.group(1))
1716                     size = int(file_segment.group(2))
1717                     name = self._unescape_manifest_path(file_segment.group(3))
1718                     if name.split('/')[-1] == '.':
1719                         # placeholder for persisting an empty directory, not a real file
1720                         if len(name) > 2:
1721                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1722                     else:
1723                         filepath = os.path.join(stream_name, name)
1724                         try:
1725                             afile = self.find_or_create(filepath, FILE)
1726                         except IOError as e:
1727                             if e.errno == errno.ENOTDIR:
1728                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1729                             else:
1730                                 raise e from None
1731                         if isinstance(afile, ArvadosFile):
1732                             afile.add_segment(blocks, pos, size)
1733                         else:
1734                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1735                 else:
1736                     # error!
1737                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1738
1739             if sep == "\n":
1740                 stream_name = None
1741                 state = STREAM_NAME
1742
1743         self.set_committed(True)
1744
1745     @synchronized
1746     def notify(
1747             self,
1748             event: ChangeType,
1749             collection: 'RichCollectionBase',
1750             name: str,
1751             item: CollectionItem,
1752     ) -> None:
1753         if self._callback:
1754             self._callback(event, collection, name, item)
1755
1756
1757 class Subcollection(RichCollectionBase):
1758     """Read and manipulate a stream/directory within an Arvados collection
1759
1760     This class represents a single stream (like a directory) within an Arvados
1761     `Collection`. It is returned by `Collection.find` and provides the same API.
1762     Operations that work on the API collection record propagate to the parent
1763     `Collection` object.
1764     """
1765
1766     def __init__(self, parent, name):
1767         super(Subcollection, self).__init__(parent)
1768         self.lock = self.root_collection().lock
1769         self._manifest_text = None
1770         self.name = name
1771         self.num_retries = parent.num_retries
1772
1773     def root_collection(self) -> 'Collection':
1774         return self.parent.root_collection()
1775
1776     def writable(self) -> bool:
1777         return self.root_collection().writable()
1778
1779     def _my_api(self):
1780         return self.root_collection()._my_api()
1781
1782     def _my_keep(self):
1783         return self.root_collection()._my_keep()
1784
1785     def _my_block_manager(self):
1786         return self.root_collection()._my_block_manager()
1787
1788     def stream_name(self) -> str:
1789         return os.path.join(self.parent.stream_name(), self.name)
1790
1791     @synchronized
1792     def clone(
1793             self,
1794             new_parent: Optional['Collection']=None,
1795             new_name: Optional[str]=None,
1796     ) -> 'Subcollection':
1797         c = Subcollection(new_parent, new_name)
1798         c._clonefrom(self)
1799         return c
1800
1801     @must_be_writable
1802     @synchronized
1803     def _reparent(self, newparent, newname):
1804         self.set_committed(False)
1805         self.flush()
1806         self.parent.remove(self.name, recursive=True)
1807         self.parent = newparent
1808         self.name = newname
1809         self.lock = self.parent.root_collection().lock
1810
1811     @synchronized
1812     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1813         """Encode empty directories by using an \056-named (".") empty file"""
1814         if len(self._items) == 0:
1815             return "%s %s 0:0:\\056\n" % (
1816                 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1817         return super(Subcollection, self)._get_manifest_text(stream_name,
1818                                                              strip, normalize,
1819                                                              only_committed)
1820
1821
1822 class CollectionReader(Collection):
1823     """Read-only `Collection` subclass
1824
1825     This class will never create or update any API collection records. You can
1826     use this class for additional code safety when you only need to read
1827     existing collections.
1828     """
1829     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1830         self._in_init = True
1831         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1832         self._in_init = False
1833
1834         # Forego any locking since it should never change once initialized.
1835         self.lock = NoopLock()
1836
1837         # Backwards compatability with old CollectionReader
1838         # all_streams() and all_files()
1839         self._streams = None
1840
1841     def writable(self) -> bool:
1842         return self._in_init