21935: Mark internal arvados.keep classes as such
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
5
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
11
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
13 """
14
15 import ciso8601
16 import datetime
17 import errno
18 import functools
19 import hashlib
20 import io
21 import logging
22 import os
23 import re
24 import sys
25 import threading
26 import time
27
28 from collections import deque
29 from stat import *
30
31 from ._internal import streams
32 from .api import ThreadSafeAPIClient
33 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
34 from .keep import KeepLocator, KeepClient
35 import arvados.config as config
36 import arvados.errors as errors
37 import arvados.util
38 import arvados.events as events
39 from arvados.retry import retry_method
40
41 from typing import (
42     Any,
43     Callable,
44     Dict,
45     IO,
46     Iterator,
47     List,
48     Mapping,
49     Optional,
50     Tuple,
51     Union,
52 )
53
54 if sys.version_info < (3, 8):
55     from typing_extensions import Literal
56 else:
57     from typing import Literal
58
59 _logger = logging.getLogger('arvados.collection')
60
61 ADD = "add"
62 """Argument value for `Collection` methods to represent an added item"""
63 DEL = "del"
64 """Argument value for `Collection` methods to represent a removed item"""
65 MOD = "mod"
66 """Argument value for `Collection` methods to represent a modified item"""
67 TOK = "tok"
68 """Argument value for `Collection` methods to represent an item with token differences"""
69 FILE = "file"
70 """`create_type` value for `Collection.find_or_create`"""
71 COLLECTION = "collection"
72 """`create_type` value for `Collection.find_or_create`"""
73
74 ChangeList = List[Union[
75     Tuple[Literal[ADD, DEL], str, 'Collection'],
76     Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
77 ]]
78 ChangeType = Literal[ADD, DEL, MOD, TOK]
79 CollectionItem = Union[ArvadosFile, 'Collection']
80 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
81 CreateType = Literal[COLLECTION, FILE]
82 Properties = Dict[str, Any]
83 StorageClasses = List[str]
84
85 class CollectionBase(object):
86     """Abstract base class for Collection classes
87
88     .. ATTENTION:: Internal
89        This class is meant to be used by other parts of the SDK. User code
90        should instantiate or subclass `Collection` or one of its subclasses
91        directly.
92     """
93
94     def __enter__(self):
95         """Enter a context block with this collection instance"""
96         return self
97
98     def __exit__(self, exc_type, exc_value, traceback):
99         """Exit a context block with this collection instance"""
100         pass
101
102     def _my_keep(self):
103         if self._keep_client is None:
104             self._keep_client = KeepClient(api_client=self._api_client,
105                                            num_retries=self.num_retries)
106         return self._keep_client
107
108     def stripped_manifest(self) -> str:
109         """Create a copy of the collection manifest with only size hints
110
111         This method returns a string with the current collection's manifest
112         text with all non-portable locator hints like permission hints and
113         remote cluster hints removed. The only hints in the returned manifest
114         will be size hints.
115         """
116         raw = self.manifest_text()
117         clean = []
118         for line in raw.split("\n"):
119             fields = line.split()
120             if fields:
121                 clean_fields = fields[:1] + [
122                     (re.sub(r'\+[^\d][^\+]*', '', x)
123                      if re.match(arvados.util.keep_locator_pattern, x)
124                      else x)
125                     for x in fields[1:]]
126                 clean += [' '.join(clean_fields), "\n"]
127         return ''.join(clean)
128
129
130 class _WriterFile(_FileLikeObjectBase):
131     def __init__(self, coll_writer, name):
132         super(_WriterFile, self).__init__(name, 'wb')
133         self.dest = coll_writer
134
135     def close(self):
136         super(_WriterFile, self).close()
137         self.dest.finish_current_file()
138
139     @_FileLikeObjectBase._before_close
140     def write(self, data):
141         self.dest.write(data)
142
143     @_FileLikeObjectBase._before_close
144     def writelines(self, seq):
145         for data in seq:
146             self.write(data)
147
148     @_FileLikeObjectBase._before_close
149     def flush(self):
150         self.dest.flush_data()
151
152
153 class RichCollectionBase(CollectionBase):
154     """Base class for Collection classes
155
156     .. ATTENTION:: Internal
157        This class is meant to be used by other parts of the SDK. User code
158        should instantiate or subclass `Collection` or one of its subclasses
159        directly.
160     """
161
162     def __init__(self, parent=None):
163         self.parent = parent
164         self._committed = False
165         self._has_remote_blocks = False
166         self._callback = None
167         self._items = {}
168
169     def _my_api(self):
170         raise NotImplementedError()
171
172     def _my_keep(self):
173         raise NotImplementedError()
174
175     def _my_block_manager(self):
176         raise NotImplementedError()
177
178     def writable(self) -> bool:
179         """Indicate whether this collection object can be modified
180
181         This method returns `False` if this object is a `CollectionReader`,
182         else `True`.
183         """
184         raise NotImplementedError()
185
186     def root_collection(self) -> 'Collection':
187         """Get this collection's root collection object
188
189         If you open a subcollection with `Collection.find`, calling this method
190         on that subcollection returns the source Collection object.
191         """
192         raise NotImplementedError()
193
194     def stream_name(self) -> str:
195         """Get the name of the manifest stream represented by this collection
196
197         If you open a subcollection with `Collection.find`, calling this method
198         on that subcollection returns the name of the stream you opened.
199         """
200         raise NotImplementedError()
201
202     @synchronized
203     def has_remote_blocks(self) -> bool:
204         """Indiciate whether the collection refers to remote data
205
206         Returns `True` if the collection manifest includes any Keep locators
207         with a remote hint (`+R`), else `False`.
208         """
209         if self._has_remote_blocks:
210             return True
211         for item in self:
212             if self[item].has_remote_blocks():
213                 return True
214         return False
215
216     @synchronized
217     def set_has_remote_blocks(self, val: bool) -> None:
218         """Cache whether this collection refers to remote blocks
219
220         .. ATTENTION:: Internal
221            This method is only meant to be used by other Collection methods.
222
223         Set this collection's cached "has remote blocks" flag to the given
224         value.
225         """
226         self._has_remote_blocks = val
227         if self.parent:
228             self.parent.set_has_remote_blocks(val)
229
230     @must_be_writable
231     @synchronized
232     def find_or_create(
233             self,
234             path: str,
235             create_type: CreateType,
236     ) -> CollectionItem:
237         """Get the item at the given path, creating it if necessary
238
239         If `path` refers to a stream in this collection, returns a
240         corresponding `Subcollection` object. If `path` refers to a file in
241         this collection, returns a corresponding
242         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
243         this collection, then this method creates a new object and returns
244         it, creating parent streams as needed. The type of object created is
245         determined by the value of `create_type`.
246
247         Arguments:
248
249         * path: str --- The path to find or create within this collection.
250
251         * create_type: Literal[COLLECTION, FILE] --- The type of object to
252           create at `path` if one does not exist. Passing `COLLECTION`
253           creates a stream and returns the corresponding
254           `Subcollection`. Passing `FILE` creates a new file and returns the
255           corresponding `arvados.arvfile.ArvadosFile`.
256         """
257         pathcomponents = path.split("/", 1)
258         if pathcomponents[0]:
259             item = self._items.get(pathcomponents[0])
260             if len(pathcomponents) == 1:
261                 if item is None:
262                     # create new file
263                     if create_type == COLLECTION:
264                         item = Subcollection(self, pathcomponents[0])
265                     else:
266                         item = ArvadosFile(self, pathcomponents[0])
267                     self._items[pathcomponents[0]] = item
268                     self.set_committed(False)
269                     self.notify(ADD, self, pathcomponents[0], item)
270                 return item
271             else:
272                 if item is None:
273                     # create new collection
274                     item = Subcollection(self, pathcomponents[0])
275                     self._items[pathcomponents[0]] = item
276                     self.set_committed(False)
277                     self.notify(ADD, self, pathcomponents[0], item)
278                 if isinstance(item, RichCollectionBase):
279                     return item.find_or_create(pathcomponents[1], create_type)
280                 else:
281                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
282         else:
283             return self
284
285     @synchronized
286     def find(self, path: str) -> CollectionItem:
287         """Get the item at the given path
288
289         If `path` refers to a stream in this collection, returns a
290         corresponding `Subcollection` object. If `path` refers to a file in
291         this collection, returns a corresponding
292         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
293         this collection, then this method raises `NotADirectoryError`.
294
295         Arguments:
296
297         * path: str --- The path to find or create within this collection.
298         """
299         if not path:
300             raise errors.ArgumentError("Parameter 'path' is empty.")
301
302         pathcomponents = path.split("/", 1)
303         if pathcomponents[0] == '':
304             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
305
306         item = self._items.get(pathcomponents[0])
307         if item is None:
308             return None
309         elif len(pathcomponents) == 1:
310             return item
311         else:
312             if isinstance(item, RichCollectionBase):
313                 if pathcomponents[1]:
314                     return item.find(pathcomponents[1])
315                 else:
316                     return item
317             else:
318                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
319
320     @synchronized
321     def mkdirs(self, path: str) -> 'Subcollection':
322         """Create and return a subcollection at `path`
323
324         If `path` exists within this collection, raises `FileExistsError`.
325         Otherwise, creates a stream at that path and returns the
326         corresponding `Subcollection`.
327         """
328         if self.find(path) != None:
329             raise IOError(errno.EEXIST, "Directory or file exists", path)
330
331         return self.find_or_create(path, COLLECTION)
332
333     def open(
334             self,
335             path: str,
336             mode: str="r",
337             encoding: Optional[str]=None
338     ) -> IO:
339         """Open a file-like object within the collection
340
341         This method returns a file-like object that can read and/or write the
342         file located at `path` within the collection. If you attempt to write
343         a `path` that does not exist, the file is created with `find_or_create`.
344         If the file cannot be opened for any other reason, this method raises
345         `OSError` with an appropriate errno.
346
347         Arguments:
348
349         * path: str --- The path of the file to open within this collection
350
351         * mode: str --- The mode to open this file. Supports all the same
352           values as `builtins.open`.
353
354         * encoding: str | None --- The text encoding of the file. Only used
355           when the file is opened in text mode. The default is
356           platform-dependent.
357
358         """
359         if not re.search(r'^[rwa][bt]?\+?$', mode):
360             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
361
362         if mode[0] == 'r' and '+' not in mode:
363             fclass = ArvadosFileReader
364             arvfile = self.find(path)
365         elif not self.writable():
366             raise IOError(errno.EROFS, "Collection is read only")
367         else:
368             fclass = ArvadosFileWriter
369             arvfile = self.find_or_create(path, FILE)
370
371         if arvfile is None:
372             raise IOError(errno.ENOENT, "File not found", path)
373         if not isinstance(arvfile, ArvadosFile):
374             raise IOError(errno.EISDIR, "Is a directory", path)
375
376         if mode[0] == 'w':
377             arvfile.truncate(0)
378
379         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
380         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
381         if 'b' not in mode:
382             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
383             f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
384         return f
385
386     def modified(self) -> bool:
387         """Indicate whether this collection has an API server record
388
389         Returns `False` if this collection corresponds to a record loaded from
390         the API server, `True` otherwise.
391         """
392         return not self.committed()
393
394     @synchronized
395     def committed(self):
396         """Indicate whether this collection has an API server record
397
398         Returns `True` if this collection corresponds to a record loaded from
399         the API server, `False` otherwise.
400         """
401         return self._committed
402
403     @synchronized
404     def set_committed(self, value: bool=True):
405         """Cache whether this collection has an API server record
406
407         .. ATTENTION:: Internal
408            This method is only meant to be used by other Collection methods.
409
410         Set this collection's cached "committed" flag to the given
411         value and propagates it as needed.
412         """
413         if value == self._committed:
414             return
415         if value:
416             for k,v in self._items.items():
417                 v.set_committed(True)
418             self._committed = True
419         else:
420             self._committed = False
421             if self.parent is not None:
422                 self.parent.set_committed(False)
423
424     @synchronized
425     def __iter__(self) -> Iterator[str]:
426         """Iterate names of streams and files in this collection
427
428         This method does not recurse. It only iterates the contents of this
429         collection's corresponding stream.
430         """
431         return iter(self._items)
432
433     @synchronized
434     def __getitem__(self, k: str) -> CollectionItem:
435         """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
436
437         This method does not recurse. If you want to search a path, use
438         `RichCollectionBase.find` instead.
439         """
440         return self._items[k]
441
442     @synchronized
443     def __contains__(self, k: str) -> bool:
444         """Indicate whether this collection has an item with this name
445
446         This method does not recurse. It you want to check a path, use
447         `RichCollectionBase.exists` instead.
448         """
449         return k in self._items
450
451     @synchronized
452     def __len__(self):
453         """Get the number of items directly contained in this collection
454
455         This method does not recurse. It only counts the streams and files
456         in this collection's corresponding stream.
457         """
458         return len(self._items)
459
460     @must_be_writable
461     @synchronized
462     def __delitem__(self, p: str) -> None:
463         """Delete an item from this collection's stream
464
465         This method does not recurse. If you want to remove an item by a
466         path, use `RichCollectionBase.remove` instead.
467         """
468         del self._items[p]
469         self.set_committed(False)
470         self.notify(DEL, self, p, None)
471
472     @synchronized
473     def keys(self) -> Iterator[str]:
474         """Iterate names of streams and files in this collection
475
476         This method does not recurse. It only iterates the contents of this
477         collection's corresponding stream.
478         """
479         return self._items.keys()
480
481     @synchronized
482     def values(self) -> List[CollectionItem]:
483         """Get a list of objects in this collection's stream
484
485         The return value includes a `Subcollection` for every stream, and an
486         `arvados.arvfile.ArvadosFile` for every file, directly within this
487         collection's stream.  This method does not recurse.
488         """
489         return list(self._items.values())
490
491     @synchronized
492     def items(self) -> List[Tuple[str, CollectionItem]]:
493         """Get a list of `(name, object)` tuples from this collection's stream
494
495         The return value includes a `Subcollection` for every stream, and an
496         `arvados.arvfile.ArvadosFile` for every file, directly within this
497         collection's stream.  This method does not recurse.
498         """
499         return list(self._items.items())
500
501     def exists(self, path: str) -> bool:
502         """Indicate whether this collection includes an item at `path`
503
504         This method returns `True` if `path` refers to a stream or file within
505         this collection, else `False`.
506
507         Arguments:
508
509         * path: str --- The path to check for existence within this collection
510         """
511         return self.find(path) is not None
512
513     @must_be_writable
514     @synchronized
515     def remove(self, path: str, recursive: bool=False) -> None:
516         """Remove the file or stream at `path`
517
518         Arguments:
519
520         * path: str --- The path of the item to remove from the collection
521
522         * recursive: bool --- Controls the method's behavior if `path` refers
523           to a nonempty stream. If `False` (the default), this method raises
524           `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
525           items under the stream.
526         """
527         if not path:
528             raise errors.ArgumentError("Parameter 'path' is empty.")
529
530         pathcomponents = path.split("/", 1)
531         item = self._items.get(pathcomponents[0])
532         if item is None:
533             raise IOError(errno.ENOENT, "File not found", path)
534         if len(pathcomponents) == 1:
535             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
536                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
537             deleteditem = self._items[pathcomponents[0]]
538             del self._items[pathcomponents[0]]
539             self.set_committed(False)
540             self.notify(DEL, self, pathcomponents[0], deleteditem)
541         else:
542             item.remove(pathcomponents[1], recursive=recursive)
543
544     def _clonefrom(self, source):
545         for k,v in source.items():
546             self._items[k] = v.clone(self, k)
547
548     def clone(self):
549         raise NotImplementedError()
550
551     @must_be_writable
552     @synchronized
553     def add(
554             self,
555             source_obj: CollectionItem,
556             target_name: str,
557             overwrite: bool=False,
558             reparent: bool=False,
559     ) -> None:
560         """Copy or move a file or subcollection object to this collection
561
562         Arguments:
563
564         * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
565           to add to this collection
566
567         * target_name: str --- The path inside this collection where
568           `source_obj` should be added.
569
570         * overwrite: bool --- Controls the behavior of this method when the
571           collection already contains an object at `target_name`. If `False`
572           (the default), this method will raise `FileExistsError`. If `True`,
573           the object at `target_name` will be replaced with `source_obj`.
574
575         * reparent: bool --- Controls whether this method copies or moves
576           `source_obj`. If `False` (the default), `source_obj` is copied into
577           this collection. If `True`, `source_obj` is moved into this
578           collection.
579         """
580         if target_name in self and not overwrite:
581             raise IOError(errno.EEXIST, "File already exists", target_name)
582
583         modified_from = None
584         if target_name in self:
585             modified_from = self[target_name]
586
587         # Actually make the move or copy.
588         if reparent:
589             source_obj._reparent(self, target_name)
590             item = source_obj
591         else:
592             item = source_obj.clone(self, target_name)
593
594         self._items[target_name] = item
595         self.set_committed(False)
596         if not self._has_remote_blocks and source_obj.has_remote_blocks():
597             self.set_has_remote_blocks(True)
598
599         if modified_from:
600             self.notify(MOD, self, target_name, (modified_from, item))
601         else:
602             self.notify(ADD, self, target_name, item)
603
604     def _get_src_target(self, source, target_path, source_collection, create_dest):
605         if source_collection is None:
606             source_collection = self
607
608         # Find the object
609         if isinstance(source, str):
610             source_obj = source_collection.find(source)
611             if source_obj is None:
612                 raise IOError(errno.ENOENT, "File not found", source)
613             sourcecomponents = source.split("/")
614         else:
615             source_obj = source
616             sourcecomponents = None
617
618         # Find parent collection the target path
619         targetcomponents = target_path.split("/")
620
621         # Determine the name to use.
622         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
623
624         if not target_name:
625             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
626
627         if create_dest:
628             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
629         else:
630             if len(targetcomponents) > 1:
631                 target_dir = self.find("/".join(targetcomponents[0:-1]))
632             else:
633                 target_dir = self
634
635         if target_dir is None:
636             raise IOError(errno.ENOENT, "Target directory not found", target_name)
637
638         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
639             target_dir = target_dir[target_name]
640             target_name = sourcecomponents[-1]
641
642         return (source_obj, target_dir, target_name)
643
644     @must_be_writable
645     @synchronized
646     def copy(
647             self,
648             source: Union[str, CollectionItem],
649             target_path: str,
650             source_collection: Optional['RichCollectionBase']=None,
651             overwrite: bool=False,
652     ) -> None:
653         """Copy a file or subcollection object to this collection
654
655         Arguments:
656
657         * source: str | arvados.arvfile.ArvadosFile |
658           arvados.collection.Subcollection --- The file or subcollection to
659           add to this collection. If `source` is a str, the object will be
660           found by looking up this path from `source_collection` (see
661           below).
662
663         * target_path: str --- The path inside this collection where the
664           source object should be added.
665
666         * source_collection: arvados.collection.Collection | None --- The
667           collection to find the source object from when `source` is a
668           path. Defaults to the current collection (`self`).
669
670         * overwrite: bool --- Controls the behavior of this method when the
671           collection already contains an object at `target_path`. If `False`
672           (the default), this method will raise `FileExistsError`. If `True`,
673           the object at `target_path` will be replaced with `source_obj`.
674         """
675         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
676         target_dir.add(source_obj, target_name, overwrite, False)
677
678     @must_be_writable
679     @synchronized
680     def rename(
681             self,
682             source: Union[str, CollectionItem],
683             target_path: str,
684             source_collection: Optional['RichCollectionBase']=None,
685             overwrite: bool=False,
686     ) -> None:
687         """Move a file or subcollection object to this collection
688
689         Arguments:
690
691         * source: str | arvados.arvfile.ArvadosFile |
692           arvados.collection.Subcollection --- The file or subcollection to
693           add to this collection. If `source` is a str, the object will be
694           found by looking up this path from `source_collection` (see
695           below).
696
697         * target_path: str --- The path inside this collection where the
698           source object should be added.
699
700         * source_collection: arvados.collection.Collection | None --- The
701           collection to find the source object from when `source` is a
702           path. Defaults to the current collection (`self`).
703
704         * overwrite: bool --- Controls the behavior of this method when the
705           collection already contains an object at `target_path`. If `False`
706           (the default), this method will raise `FileExistsError`. If `True`,
707           the object at `target_path` will be replaced with `source_obj`.
708         """
709         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
710         if not source_obj.writable():
711             raise IOError(errno.EROFS, "Source collection is read only", source)
712         target_dir.add(source_obj, target_name, overwrite, True)
713
714     def portable_manifest_text(self, stream_name: str=".") -> str:
715         """Get the portable manifest text for this collection
716
717         The portable manifest text is normalized, and does not include access
718         tokens. This method does not flush outstanding blocks to Keep.
719
720         Arguments:
721
722         * stream_name: str --- The name to use for this collection's stream in
723           the generated manifest. Default `'.'`.
724         """
725         return self._get_manifest_text(stream_name, True, True)
726
727     @synchronized
728     def manifest_text(
729             self,
730             stream_name: str=".",
731             strip: bool=False,
732             normalize: bool=False,
733             only_committed: bool=False,
734     ) -> str:
735         """Get the manifest text for this collection
736
737         Arguments:
738
739         * stream_name: str --- The name to use for this collection's stream in
740           the generated manifest. Default `'.'`.
741
742         * strip: bool --- Controls whether or not the returned manifest text
743           includes access tokens. If `False` (the default), the manifest text
744           will include access tokens. If `True`, the manifest text will not
745           include access tokens.
746
747         * normalize: bool --- Controls whether or not the returned manifest
748           text is normalized. Default `False`.
749
750         * only_committed: bool --- Controls whether or not this method uploads
751           pending data to Keep before building and returning the manifest text.
752           If `False` (the default), this method will finish uploading all data
753           to Keep, then return the final manifest. If `True`, this method will
754           build and return a manifest that only refers to the data that has
755           finished uploading at the time this method was called.
756         """
757         if not only_committed:
758             self._my_block_manager().commit_all()
759         return self._get_manifest_text(stream_name, strip, normalize,
760                                        only_committed=only_committed)
761
762     @synchronized
763     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
764         """Get the manifest text for this collection, sub collections and files.
765
766         :stream_name:
767           Name to use for this stream (directory)
768
769         :strip:
770           If True, remove signing tokens from block locators if present.
771           If False (default), block locators are left unchanged.
772
773         :normalize:
774           If True, always export the manifest text in normalized form
775           even if the Collection is not modified.  If False (default) and the collection
776           is not modified, return the original manifest text even if it is not
777           in normalized form.
778
779         :only_committed:
780           If True, only include blocks that were already committed to Keep.
781
782         """
783
784         if not self.committed() or self._manifest_text is None or normalize:
785             stream = {}
786             buf = []
787             sorted_keys = sorted(self.keys())
788             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
789                 # Create a stream per file `k`
790                 arvfile = self[filename]
791                 filestream = []
792                 for segment in arvfile.segments():
793                     loc = segment.locator
794                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
795                         if only_committed:
796                             continue
797                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
798                     if strip:
799                         loc = KeepLocator(loc).stripped()
800                     filestream.append(streams.LocatorAndRange(
801                         loc,
802                         KeepLocator(loc).size,
803                         segment.segment_offset,
804                         segment.range_size,
805                     ))
806                 stream[filename] = filestream
807             if stream:
808                 buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
809             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
810                 buf.append(self[dirname].manifest_text(
811                     stream_name=os.path.join(stream_name, dirname),
812                     strip=strip, normalize=True, only_committed=only_committed))
813             return "".join(buf)
814         else:
815             if strip:
816                 return self.stripped_manifest()
817             else:
818                 return self._manifest_text
819
820     @synchronized
821     def _copy_remote_blocks(self, remote_blocks={}):
822         """Scan through the entire collection and ask Keep to copy remote blocks.
823
824         When accessing a remote collection, blocks will have a remote signature
825         (+R instead of +A). Collect these signatures and request Keep to copy the
826         blocks to the local cluster, returning local (+A) signatures.
827
828         :remote_blocks:
829           Shared cache of remote to local block mappings. This is used to avoid
830           doing extra work when blocks are shared by more than one file in
831           different subdirectories.
832
833         """
834         for item in self:
835             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
836         return remote_blocks
837
838     @synchronized
839     def diff(
840             self,
841             end_collection: 'RichCollectionBase',
842             prefix: str=".",
843             holding_collection: Optional['Collection']=None,
844     ) -> ChangeList:
845         """Build a list of differences between this collection and another
846
847         Arguments:
848
849         * end_collection: arvados.collection.RichCollectionBase --- A
850           collection object with the desired end state. The returned diff
851           list will describe how to go from the current collection object
852           `self` to `end_collection`.
853
854         * prefix: str --- The name to use for this collection's stream in
855           the diff list. Default `'.'`.
856
857         * holding_collection: arvados.collection.Collection | None --- A
858           collection object used to hold objects for the returned diff
859           list. By default, a new empty collection is created.
860         """
861         changes = []
862         if holding_collection is None:
863             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
864         for k in self:
865             if k not in end_collection:
866                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
867         for k in end_collection:
868             if k in self:
869                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
870                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
871                 elif end_collection[k] != self[k]:
872                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
873                 else:
874                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
875             else:
876                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
877         return changes
878
879     @must_be_writable
880     @synchronized
881     def apply(self, changes: ChangeList) -> None:
882         """Apply a list of changes from to this collection
883
884         This method takes a list of changes generated by
885         `RichCollectionBase.diff` and applies it to this
886         collection. Afterward, the state of this collection object will
887         match the state of `end_collection` passed to `diff`. If a change
888         conflicts with a local change, it will be saved to an alternate path
889         indicating the conflict.
890
891         Arguments:
892
893         * changes: arvados.collection.ChangeList --- The list of differences
894           generated by `RichCollectionBase.diff`.
895         """
896         if changes:
897             self.set_committed(False)
898         for change in changes:
899             event_type = change[0]
900             path = change[1]
901             initial = change[2]
902             local = self.find(path)
903             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
904                                                                     time.gmtime()))
905             if event_type == ADD:
906                 if local is None:
907                     # No local file at path, safe to copy over new file
908                     self.copy(initial, path)
909                 elif local is not None and local != initial:
910                     # There is already local file and it is different:
911                     # save change to conflict file.
912                     self.copy(initial, conflictpath)
913             elif event_type == MOD or event_type == TOK:
914                 final = change[3]
915                 if local == initial:
916                     # Local matches the "initial" item so it has not
917                     # changed locally and is safe to update.
918                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
919                         # Replace contents of local file with new contents
920                         local.replace_contents(final)
921                     else:
922                         # Overwrite path with new item; this can happen if
923                         # path was a file and is now a collection or vice versa
924                         self.copy(final, path, overwrite=True)
925                 else:
926                     # Local is missing (presumably deleted) or local doesn't
927                     # match the "start" value, so save change to conflict file
928                     self.copy(final, conflictpath)
929             elif event_type == DEL:
930                 if local == initial:
931                     # Local item matches "initial" value, so it is safe to remove.
932                     self.remove(path, recursive=True)
933                 # else, the file is modified or already removed, in either
934                 # case we don't want to try to remove it.
935
936     def portable_data_hash(self) -> str:
937         """Get the portable data hash for this collection's manifest"""
938         if self._manifest_locator and self.committed():
939             # If the collection is already saved on the API server, and it's committed
940             # then return API server's PDH response.
941             return self._portable_data_hash
942         else:
943             stripped = self.portable_manifest_text().encode()
944             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
945
946     @synchronized
947     def subscribe(self, callback: ChangeCallback) -> None:
948         """Set a notify callback for changes to this collection
949
950         Arguments:
951
952         * callback: arvados.collection.ChangeCallback --- The callable to
953           call each time the collection is changed.
954         """
955         if self._callback is None:
956             self._callback = callback
957         else:
958             raise errors.ArgumentError("A callback is already set on this collection.")
959
960     @synchronized
961     def unsubscribe(self) -> None:
962         """Remove any notify callback set for changes to this collection"""
963         if self._callback is not None:
964             self._callback = None
965
966     @synchronized
967     def notify(
968             self,
969             event: ChangeType,
970             collection: 'RichCollectionBase',
971             name: str,
972             item: CollectionItem,
973     ) -> None:
974         """Notify any subscribed callback about a change to this collection
975
976         .. ATTENTION:: Internal
977            This method is only meant to be used by other Collection methods.
978
979         If a callback has been registered with `RichCollectionBase.subscribe`,
980         it will be called with information about a change to this collection.
981         Then this notification will be propagated to this collection's root.
982
983         Arguments:
984
985         * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
986           the collection.
987
988         * collection: arvados.collection.RichCollectionBase --- The
989           collection that was modified.
990
991         * name: str --- The name of the file or stream within `collection` that
992           was modified.
993
994         * item: arvados.arvfile.ArvadosFile |
995           arvados.collection.Subcollection --- The new contents at `name`
996           within `collection`.
997         """
998         if self._callback:
999             self._callback(event, collection, name, item)
1000         self.root_collection().notify(event, collection, name, item)
1001
1002     @synchronized
1003     def __eq__(self, other: Any) -> bool:
1004         """Indicate whether this collection object is equal to another"""
1005         if other is self:
1006             return True
1007         if not isinstance(other, RichCollectionBase):
1008             return False
1009         if len(self._items) != len(other):
1010             return False
1011         for k in self._items:
1012             if k not in other:
1013                 return False
1014             if self._items[k] != other[k]:
1015                 return False
1016         return True
1017
1018     def __ne__(self, other: Any) -> bool:
1019         """Indicate whether this collection object is not equal to another"""
1020         return not self.__eq__(other)
1021
1022     @synchronized
1023     def flush(self) -> None:
1024         """Upload any pending data to Keep"""
1025         for e in self.values():
1026             e.flush()
1027
1028
1029 class Collection(RichCollectionBase):
1030     """Read and manipulate an Arvados collection
1031
1032     This class provides a high-level interface to create, read, and update
1033     Arvados collections and their contents. Refer to the Arvados Python SDK
1034     cookbook for [an introduction to using the Collection class][cookbook].
1035
1036     [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1037     """
1038
1039     def __init__(self, manifest_locator_or_text: Optional[str]=None,
1040                  api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1041                  keep_client: Optional['arvados.keep.KeepClient']=None,
1042                  num_retries: int=10,
1043                  parent: Optional['Collection']=None,
1044                  apiconfig: Optional[Mapping[str, str]]=None,
1045                  block_manager: Optional['arvados.arvfile._BlockManager']=None,
1046                  replication_desired: Optional[int]=None,
1047                  storage_classes_desired: Optional[List[str]]=None,
1048                  put_threads: Optional[int]=None):
1049         """Initialize a Collection object
1050
1051         Arguments:
1052
1053         * manifest_locator_or_text: str | None --- This string can contain a
1054           collection manifest text, portable data hash, or UUID. When given a
1055           portable data hash or UUID, this instance will load a collection
1056           record from the API server. Otherwise, this instance will represent a
1057           new collection without an API server record. The default value `None`
1058           instantiates a new collection with an empty manifest.
1059
1060         * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1061           Arvados API client object this instance uses to make requests. If
1062           none is given, this instance creates its own client using the
1063           settings from `apiconfig` (see below). If your client instantiates
1064           many Collection objects, you can help limit memory utilization by
1065           calling `arvados.api.api` to construct an
1066           `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
1067           for every Collection.
1068
1069         * keep_client: arvados.keep.KeepClient | None --- The Keep client
1070           object this instance uses to make requests. If none is given, this
1071           instance creates its own client using its `api_client`.
1072
1073         * num_retries: int --- The number of times that client requests are
1074           retried. Default 10.
1075
1076         * parent: arvados.collection.Collection | None --- The parent Collection
1077           object of this instance, if any. This argument is primarily used by
1078           other Collection methods; user client code shouldn't need to use it.
1079
1080         * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1081           `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1082           `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1083           Collection object constructs one from these settings. If no
1084           mapping is provided, calls `arvados.config.settings` to get these
1085           parameters from user configuration.
1086
1087         * block_manager: arvados.arvfile._BlockManager | None --- The
1088           _BlockManager object used by this instance to coordinate reading
1089           and writing Keep data blocks. If none is given, this instance
1090           constructs its own. This argument is primarily used by other
1091           Collection methods; user client code shouldn't need to use it.
1092
1093         * replication_desired: int | None --- This controls both the value of
1094           the `replication_desired` field on API collection records saved by
1095           this class, as well as the number of Keep services that the object
1096           writes new data blocks to. If none is given, uses the default value
1097           configured for the cluster.
1098
1099         * storage_classes_desired: list[str] | None --- This controls both
1100           the value of the `storage_classes_desired` field on API collection
1101           records saved by this class, as well as selecting which specific
1102           Keep services the object writes new data blocks to. If none is
1103           given, defaults to an empty list.
1104
1105         * put_threads: int | None --- The number of threads to run
1106           simultaneously to upload data blocks to Keep. This value is used when
1107           building a new `block_manager`. It is unused when a `block_manager`
1108           is provided.
1109         """
1110
1111         if storage_classes_desired and type(storage_classes_desired) is not list:
1112             raise errors.ArgumentError("storage_classes_desired must be list type.")
1113
1114         super(Collection, self).__init__(parent)
1115         self._api_client = api_client
1116         self._keep_client = keep_client
1117
1118         # Use the keep client from ThreadSafeAPIClient
1119         if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
1120             self._keep_client = self._api_client.keep
1121
1122         self._block_manager = block_manager
1123         self.replication_desired = replication_desired
1124         self._storage_classes_desired = storage_classes_desired
1125         self.put_threads = put_threads
1126
1127         if apiconfig:
1128             self._config = apiconfig
1129         else:
1130             self._config = config.settings()
1131
1132         self.num_retries = num_retries
1133         self._manifest_locator = None
1134         self._manifest_text = None
1135         self._portable_data_hash = None
1136         self._api_response = None
1137         self._past_versions = set()
1138
1139         self.lock = threading.RLock()
1140         self.events = None
1141
1142         if manifest_locator_or_text:
1143             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1144                 self._manifest_locator = manifest_locator_or_text
1145             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1146                 self._manifest_locator = manifest_locator_or_text
1147                 if not self._has_local_collection_uuid():
1148                     self._has_remote_blocks = True
1149             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1150                 self._manifest_text = manifest_locator_or_text
1151                 if '+R' in self._manifest_text:
1152                     self._has_remote_blocks = True
1153             else:
1154                 raise errors.ArgumentError(
1155                     "Argument to CollectionReader is not a manifest or a collection UUID")
1156
1157             try:
1158                 self._populate()
1159             except errors.SyntaxError as e:
1160                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1161
1162     def storage_classes_desired(self) -> List[str]:
1163         """Get this collection's `storage_classes_desired` value"""
1164         return self._storage_classes_desired or []
1165
1166     def root_collection(self) -> 'Collection':
1167         return self
1168
1169     def get_properties(self) -> Properties:
1170         """Get this collection's properties
1171
1172         This method always returns a dict. If this collection object does not
1173         have an associated API record, or that record does not have any
1174         properties set, this method returns an empty dict.
1175         """
1176         if self._api_response and self._api_response["properties"]:
1177             return self._api_response["properties"]
1178         else:
1179             return {}
1180
1181     def get_trash_at(self) -> Optional[datetime.datetime]:
1182         """Get this collection's `trash_at` field
1183
1184         This method parses the `trash_at` field of the collection's API
1185         record and returns a datetime from it. If that field is not set, or
1186         this collection object does not have an associated API record,
1187         returns None.
1188         """
1189         if self._api_response and self._api_response["trash_at"]:
1190             try:
1191                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1192             except ValueError:
1193                 return None
1194         else:
1195             return None
1196
1197     def stream_name(self) -> str:
1198         return "."
1199
1200     def writable(self) -> bool:
1201         return True
1202
1203     @synchronized
1204     def known_past_version(
1205             self,
1206             modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1207     ) -> bool:
1208         """Indicate whether an API record for this collection has been seen before
1209
1210         As this collection object loads records from the API server, it records
1211         their `modified_at` and `portable_data_hash` fields. This method accepts
1212         a 2-tuple with values for those fields, and returns `True` if the
1213         combination was previously loaded.
1214         """
1215         return modified_at_and_portable_data_hash in self._past_versions
1216
1217     @synchronized
1218     @retry_method
1219     def update(
1220             self,
1221             other: Optional['Collection']=None,
1222             num_retries: Optional[int]=None,
1223     ) -> None:
1224         """Merge another collection's contents into this one
1225
1226         This method compares the manifest of this collection instance with
1227         another, then updates this instance's manifest with changes from the
1228         other, renaming files to flag conflicts where necessary.
1229
1230         When called without any arguments, this method reloads the collection's
1231         API record, and updates this instance with any changes that have
1232         appeared server-side. If this instance does not have a corresponding
1233         API record, this method raises `arvados.errors.ArgumentError`.
1234
1235         Arguments:
1236
1237         * other: arvados.collection.Collection | None --- The collection
1238           whose contents should be merged into this instance. When not
1239           provided, this method reloads this collection's API record and
1240           constructs a Collection object from it.  If this instance does not
1241           have a corresponding API record, this method raises
1242           `arvados.errors.ArgumentError`.
1243
1244         * num_retries: int | None --- The number of times to retry reloading
1245           the collection's API record from the API server. If not specified,
1246           uses the `num_retries` provided when this instance was constructed.
1247         """
1248         if other is None:
1249             if self._manifest_locator is None:
1250                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1251             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1252             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1253                 response.get("portable_data_hash") != self.portable_data_hash()):
1254                 # The record on the server is different from our current one, but we've seen it before,
1255                 # so ignore it because it's already been merged.
1256                 # However, if it's the same as our current record, proceed with the update, because we want to update
1257                 # our tokens.
1258                 return
1259             else:
1260                 self._remember_api_response(response)
1261             other = CollectionReader(response["manifest_text"])
1262         baseline = CollectionReader(self._manifest_text)
1263         self.apply(baseline.diff(other))
1264         self._manifest_text = self.manifest_text()
1265
1266     @synchronized
1267     def _my_api(self):
1268         if self._api_client is None:
1269             self._api_client = ThreadSafeAPIClient(self._config, version='v1')
1270             if self._keep_client is None:
1271                 self._keep_client = self._api_client.keep
1272         return self._api_client
1273
1274     @synchronized
1275     def _my_keep(self):
1276         if self._keep_client is None:
1277             if self._api_client is None:
1278                 self._my_api()
1279             else:
1280                 self._keep_client = KeepClient(api_client=self._api_client)
1281         return self._keep_client
1282
1283     @synchronized
1284     def _my_block_manager(self):
1285         if self._block_manager is None:
1286             copies = (self.replication_desired or
1287                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1288                                                    2))
1289             self._block_manager = _BlockManager(self._my_keep(),
1290                                                 copies=copies,
1291                                                 put_threads=self.put_threads,
1292                                                 num_retries=self.num_retries,
1293                                                 storage_classes_func=self.storage_classes_desired)
1294         return self._block_manager
1295
1296     def _remember_api_response(self, response):
1297         self._api_response = response
1298         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1299
1300     def _populate_from_api_server(self):
1301         # As in KeepClient itself, we must wait until the last
1302         # possible moment to instantiate an API client, in order to
1303         # avoid tripping up clients that don't have access to an API
1304         # server.  If we do build one, make sure our Keep client uses
1305         # it.  If instantiation fails, we'll fall back to the except
1306         # clause, just like any other Collection lookup
1307         # failure. Return an exception, or None if successful.
1308         self._remember_api_response(self._my_api().collections().get(
1309             uuid=self._manifest_locator).execute(
1310                 num_retries=self.num_retries))
1311         self._manifest_text = self._api_response['manifest_text']
1312         self._portable_data_hash = self._api_response['portable_data_hash']
1313         # If not overriden via kwargs, we should try to load the
1314         # replication_desired and storage_classes_desired from the API server
1315         if self.replication_desired is None:
1316             self.replication_desired = self._api_response.get('replication_desired', None)
1317         if self._storage_classes_desired is None:
1318             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1319
1320     def _populate(self):
1321         if self._manifest_text is None:
1322             if self._manifest_locator is None:
1323                 return
1324             else:
1325                 self._populate_from_api_server()
1326         self._baseline_manifest = self._manifest_text
1327         self._import_manifest(self._manifest_text)
1328
1329     def _has_collection_uuid(self):
1330         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1331
1332     def _has_local_collection_uuid(self):
1333         return self._has_collection_uuid and \
1334             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1335
1336     def __enter__(self):
1337         return self
1338
1339     def __exit__(self, exc_type, exc_value, traceback):
1340         """Exit a context with this collection instance
1341
1342         If no exception was raised inside the context block, and this
1343         collection is writable and has a corresponding API record, that
1344         record will be updated to match the state of this instance at the end
1345         of the block.
1346         """
1347         if exc_type is None:
1348             if self.writable() and self._has_collection_uuid():
1349                 self.save()
1350         self.stop_threads()
1351
1352     def stop_threads(self) -> None:
1353         """Stop background Keep upload/download threads"""
1354         if self._block_manager is not None:
1355             self._block_manager.stop_threads()
1356
1357     @synchronized
1358     def manifest_locator(self) -> Optional[str]:
1359         """Get this collection's manifest locator, if any
1360
1361         * If this collection instance is associated with an API record with a
1362           UUID, return that.
1363         * Otherwise, if this collection instance was loaded from an API record
1364           by portable data hash, return that.
1365         * Otherwise, return `None`.
1366         """
1367         return self._manifest_locator
1368
1369     @synchronized
1370     def clone(
1371             self,
1372             new_parent: Optional['Collection']=None,
1373             new_name: Optional[str]=None,
1374             readonly: bool=False,
1375             new_config: Optional[Mapping[str, str]]=None,
1376     ) -> 'Collection':
1377         """Create a Collection object with the same contents as this instance
1378
1379         This method creates a new Collection object with contents that match
1380         this instance's. The new collection will not be associated with any API
1381         record.
1382
1383         Arguments:
1384
1385         * new_parent: arvados.collection.Collection | None --- This value is
1386           passed to the new Collection's constructor as the `parent`
1387           argument.
1388
1389         * new_name: str | None --- This value is unused.
1390
1391         * readonly: bool --- If this value is true, this method constructs and
1392           returns a `CollectionReader`. Otherwise, it returns a mutable
1393           `Collection`. Default `False`.
1394
1395         * new_config: Mapping[str, str] | None --- This value is passed to the
1396           new Collection's constructor as `apiconfig`. If no value is provided,
1397           defaults to the configuration passed to this instance's constructor.
1398         """
1399         if new_config is None:
1400             new_config = self._config
1401         if readonly:
1402             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1403         else:
1404             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1405
1406         newcollection._clonefrom(self)
1407         return newcollection
1408
1409     @synchronized
1410     def api_response(self) -> Optional[Dict[str, Any]]:
1411         """Get this instance's associated API record
1412
1413         If this Collection instance has an associated API record, return it.
1414         Otherwise, return `None`.
1415         """
1416         return self._api_response
1417
1418     def find_or_create(
1419             self,
1420             path: str,
1421             create_type: CreateType,
1422     ) -> CollectionItem:
1423         if path == ".":
1424             return self
1425         else:
1426             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1427
1428     def find(self, path: str) -> CollectionItem:
1429         if path == ".":
1430             return self
1431         else:
1432             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1433
1434     def remove(self, path: str, recursive: bool=False) -> None:
1435         if path == ".":
1436             raise errors.ArgumentError("Cannot remove '.'")
1437         else:
1438             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1439
1440     @must_be_writable
1441     @synchronized
1442     @retry_method
1443     def save(
1444             self,
1445             properties: Optional[Properties]=None,
1446             storage_classes: Optional[StorageClasses]=None,
1447             trash_at: Optional[datetime.datetime]=None,
1448             merge: bool=True,
1449             num_retries: Optional[int]=None,
1450             preserve_version: bool=False,
1451     ) -> str:
1452         """Save collection to an existing API record
1453
1454         This method updates the instance's corresponding API record to match
1455         the instance's state. If this instance does not have a corresponding API
1456         record yet, raises `AssertionError`. (To create a new API record, use
1457         `Collection.save_new`.) This method returns the saved collection
1458         manifest.
1459
1460         Arguments:
1461
1462         * properties: dict[str, Any] | None --- If provided, the API record will
1463           be updated with these properties. Note this will completely replace
1464           any existing properties.
1465
1466         * storage_classes: list[str] | None --- If provided, the API record will
1467           be updated with this value in the `storage_classes_desired` field.
1468           This value will also be saved on the instance and used for any
1469           changes that follow.
1470
1471         * trash_at: datetime.datetime | None --- If provided, the API record
1472           will be updated with this value in the `trash_at` field.
1473
1474         * merge: bool --- If `True` (the default), this method will first
1475           reload this collection's API record, and merge any new contents into
1476           this instance before saving changes. See `Collection.update` for
1477           details.
1478
1479         * num_retries: int | None --- The number of times to retry reloading
1480           the collection's API record from the API server. If not specified,
1481           uses the `num_retries` provided when this instance was constructed.
1482
1483         * preserve_version: bool --- This value will be passed to directly
1484           to the underlying API call. If `True`, the Arvados API will
1485           preserve the versions of this collection both immediately before
1486           and after the update. If `True` when the API server is not
1487           configured with collection versioning, this method raises
1488           `arvados.errors.ArgumentError`.
1489         """
1490         if properties and type(properties) is not dict:
1491             raise errors.ArgumentError("properties must be dictionary type.")
1492
1493         if storage_classes and type(storage_classes) is not list:
1494             raise errors.ArgumentError("storage_classes must be list type.")
1495         if storage_classes:
1496             self._storage_classes_desired = storage_classes
1497
1498         if trash_at and type(trash_at) is not datetime.datetime:
1499             raise errors.ArgumentError("trash_at must be datetime type.")
1500
1501         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1502             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1503
1504         body={}
1505         if properties:
1506             body["properties"] = properties
1507         if self.storage_classes_desired():
1508             body["storage_classes_desired"] = self.storage_classes_desired()
1509         if trash_at:
1510             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1511             body["trash_at"] = t
1512         if preserve_version:
1513             body["preserve_version"] = preserve_version
1514
1515         if not self.committed():
1516             if self._has_remote_blocks:
1517                 # Copy any remote blocks to the local cluster.
1518                 self._copy_remote_blocks(remote_blocks={})
1519                 self._has_remote_blocks = False
1520             if not self._has_collection_uuid():
1521                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1522             elif not self._has_local_collection_uuid():
1523                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1524
1525             self._my_block_manager().commit_all()
1526
1527             if merge:
1528                 self.update()
1529
1530             text = self.manifest_text(strip=False)
1531             body['manifest_text'] = text
1532
1533             self._remember_api_response(self._my_api().collections().update(
1534                 uuid=self._manifest_locator,
1535                 body=body
1536                 ).execute(num_retries=num_retries))
1537             self._manifest_text = self._api_response["manifest_text"]
1538             self._portable_data_hash = self._api_response["portable_data_hash"]
1539             self.set_committed(True)
1540         elif body:
1541             self._remember_api_response(self._my_api().collections().update(
1542                 uuid=self._manifest_locator,
1543                 body=body
1544                 ).execute(num_retries=num_retries))
1545
1546         return self._manifest_text
1547
1548
1549     @must_be_writable
1550     @synchronized
1551     @retry_method
1552     def save_new(
1553             self,
1554             name: Optional[str]=None,
1555             create_collection_record: bool=True,
1556             owner_uuid: Optional[str]=None,
1557             properties: Optional[Properties]=None,
1558             storage_classes: Optional[StorageClasses]=None,
1559             trash_at: Optional[datetime.datetime]=None,
1560             ensure_unique_name: bool=False,
1561             num_retries: Optional[int]=None,
1562             preserve_version: bool=False,
1563     ):
1564         """Save collection to a new API record
1565
1566         This method finishes uploading new data blocks and (optionally)
1567         creates a new API collection record with the provided data. If a new
1568         record is created, this instance becomes associated with that record
1569         for future updates like `save()`. This method returns the saved
1570         collection manifest.
1571
1572         Arguments:
1573
1574         * name: str | None --- The `name` field to use on the new collection
1575           record. If not specified, a generic default name is generated.
1576
1577         * create_collection_record: bool --- If `True` (the default), creates a
1578           collection record on the API server. If `False`, the method finishes
1579           all data uploads and only returns the resulting collection manifest
1580           without sending it to the API server.
1581
1582         * owner_uuid: str | None --- The `owner_uuid` field to use on the
1583           new collection record.
1584
1585         * properties: dict[str, Any] | None --- The `properties` field to use on
1586           the new collection record.
1587
1588         * storage_classes: list[str] | None --- The
1589           `storage_classes_desired` field to use on the new collection record.
1590
1591         * trash_at: datetime.datetime | None --- The `trash_at` field to use
1592           on the new collection record.
1593
1594         * ensure_unique_name: bool --- This value is passed directly to the
1595           Arvados API when creating the collection record. If `True`, the API
1596           server may modify the submitted `name` to ensure the collection's
1597           `name`+`owner_uuid` combination is unique. If `False` (the default),
1598           if a collection already exists with this same `name`+`owner_uuid`
1599           combination, creating a collection record will raise a validation
1600           error.
1601
1602         * num_retries: int | None --- The number of times to retry reloading
1603           the collection's API record from the API server. If not specified,
1604           uses the `num_retries` provided when this instance was constructed.
1605
1606         * preserve_version: bool --- This value will be passed to directly
1607           to the underlying API call. If `True`, the Arvados API will
1608           preserve the versions of this collection both immediately before
1609           and after the update. If `True` when the API server is not
1610           configured with collection versioning, this method raises
1611           `arvados.errors.ArgumentError`.
1612         """
1613         if properties and type(properties) is not dict:
1614             raise errors.ArgumentError("properties must be dictionary type.")
1615
1616         if storage_classes and type(storage_classes) is not list:
1617             raise errors.ArgumentError("storage_classes must be list type.")
1618
1619         if trash_at and type(trash_at) is not datetime.datetime:
1620             raise errors.ArgumentError("trash_at must be datetime type.")
1621
1622         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1623             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1624
1625         if self._has_remote_blocks:
1626             # Copy any remote blocks to the local cluster.
1627             self._copy_remote_blocks(remote_blocks={})
1628             self._has_remote_blocks = False
1629
1630         if storage_classes:
1631             self._storage_classes_desired = storage_classes
1632
1633         self._my_block_manager().commit_all()
1634         text = self.manifest_text(strip=False)
1635
1636         if create_collection_record:
1637             if name is None:
1638                 name = "New collection"
1639                 ensure_unique_name = True
1640
1641             body = {"manifest_text": text,
1642                     "name": name,
1643                     "replication_desired": self.replication_desired}
1644             if owner_uuid:
1645                 body["owner_uuid"] = owner_uuid
1646             if properties:
1647                 body["properties"] = properties
1648             if self.storage_classes_desired():
1649                 body["storage_classes_desired"] = self.storage_classes_desired()
1650             if trash_at:
1651                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1652                 body["trash_at"] = t
1653             if preserve_version:
1654                 body["preserve_version"] = preserve_version
1655
1656             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1657             text = self._api_response["manifest_text"]
1658
1659             self._manifest_locator = self._api_response["uuid"]
1660             self._portable_data_hash = self._api_response["portable_data_hash"]
1661
1662             self._manifest_text = text
1663             self.set_committed(True)
1664
1665         return text
1666
1667     _token_re = re.compile(r'(\S+)(\s+|$)')
1668     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1669     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1670
1671     def _unescape_manifest_path(self, path):
1672         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1673
1674     @synchronized
1675     def _import_manifest(self, manifest_text):
1676         """Import a manifest into a `Collection`.
1677
1678         :manifest_text:
1679           The manifest text to import from.
1680
1681         """
1682         if len(self) > 0:
1683             raise ArgumentError("Can only import manifest into an empty collection")
1684
1685         STREAM_NAME = 0
1686         BLOCKS = 1
1687         SEGMENTS = 2
1688
1689         stream_name = None
1690         state = STREAM_NAME
1691
1692         for token_and_separator in self._token_re.finditer(manifest_text):
1693             tok = token_and_separator.group(1)
1694             sep = token_and_separator.group(2)
1695
1696             if state == STREAM_NAME:
1697                 # starting a new stream
1698                 stream_name = self._unescape_manifest_path(tok)
1699                 blocks = []
1700                 segments = []
1701                 streamoffset = 0
1702                 state = BLOCKS
1703                 self.find_or_create(stream_name, COLLECTION)
1704                 continue
1705
1706             if state == BLOCKS:
1707                 block_locator = self._block_re.match(tok)
1708                 if block_locator:
1709                     blocksize = int(block_locator.group(1))
1710                     blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
1711                     streamoffset += blocksize
1712                 else:
1713                     state = SEGMENTS
1714
1715             if state == SEGMENTS:
1716                 file_segment = self._segment_re.match(tok)
1717                 if file_segment:
1718                     pos = int(file_segment.group(1))
1719                     size = int(file_segment.group(2))
1720                     name = self._unescape_manifest_path(file_segment.group(3))
1721                     if name.split('/')[-1] == '.':
1722                         # placeholder for persisting an empty directory, not a real file
1723                         if len(name) > 2:
1724                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1725                     else:
1726                         filepath = os.path.join(stream_name, name)
1727                         try:
1728                             afile = self.find_or_create(filepath, FILE)
1729                         except IOError as e:
1730                             if e.errno == errno.ENOTDIR:
1731                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1732                             else:
1733                                 raise e from None
1734                         if isinstance(afile, ArvadosFile):
1735                             afile.add_segment(blocks, pos, size)
1736                         else:
1737                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1738                 else:
1739                     # error!
1740                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1741
1742             if sep == "\n":
1743                 stream_name = None
1744                 state = STREAM_NAME
1745
1746         self.set_committed(True)
1747
1748     @synchronized
1749     def notify(
1750             self,
1751             event: ChangeType,
1752             collection: 'RichCollectionBase',
1753             name: str,
1754             item: CollectionItem,
1755     ) -> None:
1756         if self._callback:
1757             self._callback(event, collection, name, item)
1758
1759
1760 class Subcollection(RichCollectionBase):
1761     """Read and manipulate a stream/directory within an Arvados collection
1762
1763     This class represents a single stream (like a directory) within an Arvados
1764     `Collection`. It is returned by `Collection.find` and provides the same API.
1765     Operations that work on the API collection record propagate to the parent
1766     `Collection` object.
1767     """
1768
1769     def __init__(self, parent, name):
1770         super(Subcollection, self).__init__(parent)
1771         self.lock = self.root_collection().lock
1772         self._manifest_text = None
1773         self.name = name
1774         self.num_retries = parent.num_retries
1775
1776     def root_collection(self) -> 'Collection':
1777         return self.parent.root_collection()
1778
1779     def writable(self) -> bool:
1780         return self.root_collection().writable()
1781
1782     def _my_api(self):
1783         return self.root_collection()._my_api()
1784
1785     def _my_keep(self):
1786         return self.root_collection()._my_keep()
1787
1788     def _my_block_manager(self):
1789         return self.root_collection()._my_block_manager()
1790
1791     def stream_name(self) -> str:
1792         return os.path.join(self.parent.stream_name(), self.name)
1793
1794     @synchronized
1795     def clone(
1796             self,
1797             new_parent: Optional['Collection']=None,
1798             new_name: Optional[str]=None,
1799     ) -> 'Subcollection':
1800         c = Subcollection(new_parent, new_name)
1801         c._clonefrom(self)
1802         return c
1803
1804     @must_be_writable
1805     @synchronized
1806     def _reparent(self, newparent, newname):
1807         self.set_committed(False)
1808         self.flush()
1809         self.parent.remove(self.name, recursive=True)
1810         self.parent = newparent
1811         self.name = newname
1812         self.lock = self.parent.root_collection().lock
1813
1814     @synchronized
1815     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1816         """Encode empty directories by using an \056-named (".") empty file"""
1817         if len(self._items) == 0:
1818             return "%s %s 0:0:\\056\n" % (
1819                 streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1820         return super(Subcollection, self)._get_manifest_text(stream_name,
1821                                                              strip, normalize,
1822                                                              only_committed)
1823
1824
1825 class CollectionReader(Collection):
1826     """Read-only `Collection` subclass
1827
1828     This class will never create or update any API collection records. You can
1829     use this class for additional code safety when you only need to read
1830     existing collections.
1831     """
1832     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1833         self._in_init = True
1834         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1835         self._in_init = False
1836
1837         # Forego any locking since it should never change once initialized.
1838         self.lock = NoopLock()
1839
1840         # Backwards compatability with old CollectionReader
1841         # all_streams() and all_files()
1842         self._streams = None
1843
1844     def writable(self) -> bool:
1845         return self._in_init