]> git.arvados.org - arvados.git/blob - sdk/python/arvados/collection.py
23009: Don't skip initialization of checkedList when not empty
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
5
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
11
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
13 """
14
15 import ciso8601
16 import datetime
17 import errno
18 import functools
19 import hashlib
20 import io
21 import logging
22 import os
23 import re
24 import sys
25 import threading
26 import time
27
28 from collections import deque
29 from stat import *
30
31 from ._internal import streams
32 from .api import ThreadSafeAPIClient
33 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock, ADD, DEL, MOD, TOK, WRITE
34 from .keep import KeepLocator, KeepClient
35 import arvados.config as config
36 import arvados.errors as errors
37 import arvados.util
38 import arvados.events as events
39 from arvados.retry import retry_method
40
41 from typing import (
42     Any,
43     Callable,
44     Dict,
45     IO,
46     Iterator,
47     List,
48     Mapping,
49     Optional,
50     Tuple,
51     Union,
52 )
53
54 if sys.version_info < (3, 8):
55     from typing_extensions import Literal
56 else:
57     from typing import Literal
58
59 _logger = logging.getLogger('arvados.collection')
60
61
62 FILE = "file"
63 """`create_type` value for `Collection.find_or_create`"""
64 COLLECTION = "collection"
65 """`create_type` value for `Collection.find_or_create`"""
66
67 ChangeList = List[Union[
68     Tuple[Literal[ADD, DEL], str, 'Collection'],
69     Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
70 ]]
71 ChangeType = Literal[ADD, DEL, MOD, TOK]
72 CollectionItem = Union[ArvadosFile, 'Collection']
73 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
74 CreateType = Literal[COLLECTION, FILE]
75 Properties = Dict[str, Any]
76 StorageClasses = List[str]
77
78 class CollectionBase(object):
79     """Abstract base class for Collection classes
80
81     .. ATTENTION:: Internal
82        This class is meant to be used by other parts of the SDK. User code
83        should instantiate or subclass `Collection` or one of its subclasses
84        directly.
85     """
86
87     def __enter__(self):
88         """Enter a context block with this collection instance"""
89         return self
90
91     def __exit__(self, exc_type, exc_value, traceback):
92         """Exit a context block with this collection instance"""
93         pass
94
95     def _my_keep(self):
96         if self._keep_client is None:
97             self._keep_client = KeepClient(api_client=self._api_client,
98                                            num_retries=self.num_retries)
99         return self._keep_client
100
101     def stripped_manifest(self) -> str:
102         """Create a copy of the collection manifest with only size hints
103
104         This method returns a string with the current collection's manifest
105         text with all non-portable locator hints like permission hints and
106         remote cluster hints removed. The only hints in the returned manifest
107         will be size hints.
108         """
109         raw = self.manifest_text()
110         clean = []
111         for line in raw.split("\n"):
112             fields = line.split()
113             if fields:
114                 clean_fields = fields[:1] + [
115                     (re.sub(r'\+[^\d][^\+]*', '', x)
116                      if re.match(arvados.util.keep_locator_pattern, x)
117                      else x)
118                     for x in fields[1:]]
119                 clean += [' '.join(clean_fields), "\n"]
120         return ''.join(clean)
121
122
123 class _WriterFile(_FileLikeObjectBase):
124     def __init__(self, coll_writer, name):
125         super(_WriterFile, self).__init__(name, 'wb')
126         self.dest = coll_writer
127
128     def close(self):
129         super(_WriterFile, self).close()
130         self.dest.finish_current_file()
131
132     @_FileLikeObjectBase._before_close
133     def write(self, data):
134         self.dest.write(data)
135
136     @_FileLikeObjectBase._before_close
137     def writelines(self, seq):
138         for data in seq:
139             self.write(data)
140
141     @_FileLikeObjectBase._before_close
142     def flush(self):
143         self.dest.flush_data()
144
145
146 class RichCollectionBase(CollectionBase):
147     """Base class for Collection classes
148
149     .. ATTENTION:: Internal
150        This class is meant to be used by other parts of the SDK. User code
151        should instantiate or subclass `Collection` or one of its subclasses
152        directly.
153     """
154
155     def __init__(self, parent=None):
156         self.parent = parent
157         self._committed = False
158         self._has_remote_blocks = False
159         self._callback = None
160         self._items = {}
161
162     def _my_api(self):
163         raise NotImplementedError()
164
165     def _my_keep(self):
166         raise NotImplementedError()
167
168     def _my_block_manager(self):
169         raise NotImplementedError()
170
171     def writable(self) -> bool:
172         """Indicate whether this collection object can be modified
173
174         This method returns `False` if this object is a `CollectionReader`,
175         else `True`.
176         """
177         raise NotImplementedError()
178
179     def root_collection(self) -> 'Collection':
180         """Get this collection's root collection object
181
182         If you open a subcollection with `Collection.find`, calling this method
183         on that subcollection returns the source Collection object.
184         """
185         raise NotImplementedError()
186
187     def stream_name(self) -> str:
188         """Get the name of the manifest stream represented by this collection
189
190         If you open a subcollection with `Collection.find`, calling this method
191         on that subcollection returns the name of the stream you opened.
192         """
193         raise NotImplementedError()
194
195     @synchronized
196     def has_remote_blocks(self) -> bool:
197         """Indiciate whether the collection refers to remote data
198
199         Returns `True` if the collection manifest includes any Keep locators
200         with a remote hint (`+R`), else `False`.
201         """
202         if self._has_remote_blocks:
203             return True
204         for item in self:
205             if self[item].has_remote_blocks():
206                 return True
207         return False
208
209     @synchronized
210     def set_has_remote_blocks(self, val: bool) -> None:
211         """Cache whether this collection refers to remote blocks
212
213         .. ATTENTION:: Internal
214            This method is only meant to be used by other Collection methods.
215
216         Set this collection's cached "has remote blocks" flag to the given
217         value.
218         """
219         self._has_remote_blocks = val
220         if self.parent:
221             self.parent.set_has_remote_blocks(val)
222
223     @must_be_writable
224     @synchronized
225     def find_or_create(
226             self,
227             path: str,
228             create_type: CreateType,
229     ) -> CollectionItem:
230         """Get the item at the given path, creating it if necessary
231
232         If `path` refers to a stream in this collection, returns a
233         corresponding `Subcollection` object. If `path` refers to a file in
234         this collection, returns a corresponding
235         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
236         this collection, then this method creates a new object and returns
237         it, creating parent streams as needed. The type of object created is
238         determined by the value of `create_type`.
239
240         Arguments:
241
242         * path: str --- The path to find or create within this collection.
243
244         * create_type: Literal[COLLECTION, FILE] --- The type of object to
245           create at `path` if one does not exist. Passing `COLLECTION`
246           creates a stream and returns the corresponding
247           `Subcollection`. Passing `FILE` creates a new file and returns the
248           corresponding `arvados.arvfile.ArvadosFile`.
249         """
250         pathcomponents = path.split("/", 1)
251         if pathcomponents[0]:
252             item = self._items.get(pathcomponents[0])
253             if len(pathcomponents) == 1:
254                 if item is None:
255                     # create new file
256                     if create_type == COLLECTION:
257                         item = Subcollection(self, pathcomponents[0])
258                     else:
259                         item = ArvadosFile(self, pathcomponents[0])
260                     self._items[pathcomponents[0]] = item
261                     self.set_committed(False)
262                     self.notify(ADD, self, pathcomponents[0], item)
263                 return item
264             else:
265                 if item is None:
266                     # create new collection
267                     item = Subcollection(self, pathcomponents[0])
268                     self._items[pathcomponents[0]] = item
269                     self.set_committed(False)
270                     self.notify(ADD, self, pathcomponents[0], item)
271                 if isinstance(item, RichCollectionBase):
272                     return item.find_or_create(pathcomponents[1], create_type)
273                 else:
274                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
275         else:
276             return self
277
278     @synchronized
279     def find(self, path: str) -> CollectionItem:
280         """Get the item at the given path
281
282         If `path` refers to a stream in this collection, returns a
283         corresponding `Subcollection` object. If `path` refers to a file in
284         this collection, returns a corresponding
285         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
286         this collection, then this method raises `NotADirectoryError`.
287
288         Arguments:
289
290         * path: str --- The path to find or create within this collection.
291         """
292         if not path:
293             raise errors.ArgumentError("Parameter 'path' is empty.")
294
295         pathcomponents = path.split("/", 1)
296         if pathcomponents[0] == '':
297             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
298
299         item = self._items.get(pathcomponents[0])
300         if item is None:
301             return None
302         elif len(pathcomponents) == 1:
303             return item
304         else:
305             if isinstance(item, RichCollectionBase):
306                 if pathcomponents[1]:
307                     return item.find(pathcomponents[1])
308                 else:
309                     return item
310             else:
311                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
312
313     @synchronized
314     def mkdirs(self, path: str) -> 'Subcollection':
315         """Create and return a subcollection at `path`
316
317         If `path` exists within this collection, raises `FileExistsError`.
318         Otherwise, creates a stream at that path and returns the
319         corresponding `Subcollection`.
320         """
321         if self.find(path) != None:
322             raise IOError(errno.EEXIST, "Directory or file exists", path)
323
324         return self.find_or_create(path, COLLECTION)
325
326     def open(
327             self,
328             path: str,
329             mode: str="r",
330             encoding: Optional[str]=None
331     ) -> IO:
332         """Open a file-like object within the collection
333
334         This method returns a file-like object that can read and/or write the
335         file located at `path` within the collection. If you attempt to write
336         a `path` that does not exist, the file is created with `find_or_create`.
337         If the file cannot be opened for any other reason, this method raises
338         `OSError` with an appropriate errno.
339
340         Arguments:
341
342         * path: str --- The path of the file to open within this collection
343
344         * mode: str --- The mode to open this file. Supports all the same
345           values as `builtins.open`.
346
347         * encoding: str | None --- The text encoding of the file. Only used
348           when the file is opened in text mode. The default is
349           platform-dependent.
350
351         """
352         if not re.search(r'^[rwa][bt]?\+?$', mode):
353             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
354
355         if mode[0] == 'r' and '+' not in mode:
356             fclass = ArvadosFileReader
357             arvfile = self.find(path)
358         elif not self.writable():
359             raise IOError(errno.EROFS, "Collection is read only")
360         else:
361             fclass = ArvadosFileWriter
362             arvfile = self.find_or_create(path, FILE)
363
364         if arvfile is None:
365             raise IOError(errno.ENOENT, "File not found", path)
366         if not isinstance(arvfile, ArvadosFile):
367             raise IOError(errno.EISDIR, "Is a directory", path)
368
369         if mode[0] == 'w':
370             arvfile.truncate(0)
371
372         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
373         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
374         if 'b' not in mode:
375             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
376             f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
377         return f
378
379     def modified(self) -> bool:
380         """Indicate whether this collection has an API server record
381
382         Returns `False` if this collection corresponds to a record loaded from
383         the API server, `True` otherwise.
384         """
385         return not self.committed()
386
387     @synchronized
388     def committed(self):
389         """Indicate whether this collection has an API server record
390
391         Returns `True` if this collection corresponds to a record loaded from
392         the API server, `False` otherwise.
393         """
394         return self._committed
395
396     @synchronized
397     def set_committed(self, value: bool=True):
398         """Cache whether this collection has an API server record
399
400         .. ATTENTION:: Internal
401            This method is only meant to be used by other Collection methods.
402
403         Set this collection's cached "committed" flag to the given
404         value and propagates it as needed.
405         """
406         if value == self._committed:
407             return
408         if value:
409             for k,v in self._items.items():
410                 v.set_committed(True)
411             self._committed = True
412         else:
413             self._committed = False
414             if self.parent is not None:
415                 self.parent.set_committed(False)
416
417     @synchronized
418     def __iter__(self) -> Iterator[str]:
419         """Iterate names of streams and files in this collection
420
421         This method does not recurse. It only iterates the contents of this
422         collection's corresponding stream.
423         """
424         return iter(self._items)
425
426     @synchronized
427     def __getitem__(self, k: str) -> CollectionItem:
428         """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
429
430         This method does not recurse. If you want to search a path, use
431         `RichCollectionBase.find` instead.
432         """
433         return self._items[k]
434
435     @synchronized
436     def __contains__(self, k: str) -> bool:
437         """Indicate whether this collection has an item with this name
438
439         This method does not recurse. It you want to check a path, use
440         `RichCollectionBase.exists` instead.
441         """
442         return k in self._items
443
444     @synchronized
445     def __len__(self):
446         """Get the number of items directly contained in this collection
447
448         This method does not recurse. It only counts the streams and files
449         in this collection's corresponding stream.
450         """
451         return len(self._items)
452
453     @must_be_writable
454     @synchronized
455     def __delitem__(self, p: str) -> None:
456         """Delete an item from this collection's stream
457
458         This method does not recurse. If you want to remove an item by a
459         path, use `RichCollectionBase.remove` instead.
460         """
461         del self._items[p]
462         self.set_committed(False)
463         self.notify(DEL, self, p, None)
464
465     @synchronized
466     def keys(self) -> Iterator[str]:
467         """Iterate names of streams and files in this collection
468
469         This method does not recurse. It only iterates the contents of this
470         collection's corresponding stream.
471         """
472         return self._items.keys()
473
474     @synchronized
475     def values(self) -> List[CollectionItem]:
476         """Get a list of objects in this collection's stream
477
478         The return value includes a `Subcollection` for every stream, and an
479         `arvados.arvfile.ArvadosFile` for every file, directly within this
480         collection's stream.  This method does not recurse.
481         """
482         return list(self._items.values())
483
484     @synchronized
485     def items(self) -> List[Tuple[str, CollectionItem]]:
486         """Get a list of `(name, object)` tuples from this collection's stream
487
488         The return value includes a `Subcollection` for every stream, and an
489         `arvados.arvfile.ArvadosFile` for every file, directly within this
490         collection's stream.  This method does not recurse.
491         """
492         return list(self._items.items())
493
494     def exists(self, path: str) -> bool:
495         """Indicate whether this collection includes an item at `path`
496
497         This method returns `True` if `path` refers to a stream or file within
498         this collection, else `False`.
499
500         Arguments:
501
502         * path: str --- The path to check for existence within this collection
503         """
504         return self.find(path) is not None
505
506     @must_be_writable
507     @synchronized
508     def remove(self, path: str, recursive: bool=False) -> None:
509         """Remove the file or stream at `path`
510
511         Arguments:
512
513         * path: str --- The path of the item to remove from the collection
514
515         * recursive: bool --- Controls the method's behavior if `path` refers
516           to a nonempty stream. If `False` (the default), this method raises
517           `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
518           items under the stream.
519         """
520         if not path:
521             raise errors.ArgumentError("Parameter 'path' is empty.")
522
523         pathcomponents = path.split("/", 1)
524         item = self._items.get(pathcomponents[0])
525         if item is None:
526             raise IOError(errno.ENOENT, "File not found", path)
527         if len(pathcomponents) == 1:
528             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
529                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
530             deleteditem = self._items[pathcomponents[0]]
531             del self._items[pathcomponents[0]]
532             self.set_committed(False)
533             self.notify(DEL, self, pathcomponents[0], deleteditem)
534         else:
535             item.remove(pathcomponents[1], recursive=recursive)
536
537     def _clonefrom(self, source):
538         for k,v in source.items():
539             self._items[k] = v.clone(self, k)
540
541     def clone(self):
542         raise NotImplementedError()
543
544     @must_be_writable
545     @synchronized
546     def add(
547             self,
548             source_obj: CollectionItem,
549             target_name: str,
550             overwrite: bool=False,
551             reparent: bool=False,
552     ) -> None:
553         """Copy or move a file or subcollection object to this collection
554
555         Arguments:
556
557         * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
558           to add to this collection
559
560         * target_name: str --- The path inside this collection where
561           `source_obj` should be added.
562
563         * overwrite: bool --- Controls the behavior of this method when the
564           collection already contains an object at `target_name`. If `False`
565           (the default), this method will raise `FileExistsError`. If `True`,
566           the object at `target_name` will be replaced with `source_obj`.
567
568         * reparent: bool --- Controls whether this method copies or moves
569           `source_obj`. If `False` (the default), `source_obj` is copied into
570           this collection. If `True`, `source_obj` is moved into this
571           collection.
572         """
573         if target_name in self and not overwrite:
574             raise IOError(errno.EEXIST, "File already exists", target_name)
575
576         modified_from = None
577         if target_name in self:
578             modified_from = self[target_name]
579
580         # Actually make the move or copy.
581         if reparent:
582             source_obj._reparent(self, target_name)
583             item = source_obj
584         else:
585             item = source_obj.clone(self, target_name)
586
587         self._items[target_name] = item
588         self.set_committed(False)
589         if not self._has_remote_blocks and source_obj.has_remote_blocks():
590             self.set_has_remote_blocks(True)
591
592         if modified_from:
593             self.notify(MOD, self, target_name, (modified_from, item))
594         else:
595             self.notify(ADD, self, target_name, item)
596
597     def _get_src_target(self, source, target_path, source_collection, create_dest):
598         if source_collection is None:
599             source_collection = self
600
601         # Find the object
602         if isinstance(source, str):
603             source_obj = source_collection.find(source)
604             if source_obj is None:
605                 raise IOError(errno.ENOENT, "File not found", source)
606             sourcecomponents = source.split("/")
607         else:
608             source_obj = source
609             sourcecomponents = None
610
611         # Find parent collection the target path
612         targetcomponents = target_path.split("/")
613
614         # Determine the name to use.
615         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
616
617         if not target_name:
618             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
619
620         if create_dest:
621             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
622         else:
623             if len(targetcomponents) > 1:
624                 target_dir = self.find("/".join(targetcomponents[0:-1]))
625             else:
626                 target_dir = self
627
628         if target_dir is None:
629             raise IOError(errno.ENOENT, "Target directory not found", target_name)
630
631         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
632             target_dir = target_dir[target_name]
633             target_name = sourcecomponents[-1]
634
635         return (source_obj, target_dir, target_name)
636
637     @must_be_writable
638     @synchronized
639     def copy(
640             self,
641             source: Union[str, CollectionItem],
642             target_path: str,
643             source_collection: Optional['RichCollectionBase']=None,
644             overwrite: bool=False,
645     ) -> None:
646         """Copy a file or subcollection object to this collection
647
648         Arguments:
649
650         * source: str | arvados.arvfile.ArvadosFile |
651           arvados.collection.Subcollection --- The file or subcollection to
652           add to this collection. If `source` is a str, the object will be
653           found by looking up this path from `source_collection` (see
654           below).
655
656         * target_path: str --- The path inside this collection where the
657           source object should be added.
658
659         * source_collection: arvados.collection.Collection | None --- The
660           collection to find the source object from when `source` is a
661           path. Defaults to the current collection (`self`).
662
663         * overwrite: bool --- Controls the behavior of this method when the
664           collection already contains an object at `target_path`. If `False`
665           (the default), this method will raise `FileExistsError`. If `True`,
666           the object at `target_path` will be replaced with `source_obj`.
667         """
668         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
669         target_dir.add(source_obj, target_name, overwrite, False)
670
671     @must_be_writable
672     @synchronized
673     def rename(
674             self,
675             source: Union[str, CollectionItem],
676             target_path: str,
677             source_collection: Optional['RichCollectionBase']=None,
678             overwrite: bool=False,
679     ) -> None:
680         """Move a file or subcollection object to this collection
681
682         Arguments:
683
684         * source: str | arvados.arvfile.ArvadosFile |
685           arvados.collection.Subcollection --- The file or subcollection to
686           add to this collection. If `source` is a str, the object will be
687           found by looking up this path from `source_collection` (see
688           below).
689
690         * target_path: str --- The path inside this collection where the
691           source object should be added.
692
693         * source_collection: arvados.collection.Collection | None --- The
694           collection to find the source object from when `source` is a
695           path. Defaults to the current collection (`self`).
696
697         * overwrite: bool --- Controls the behavior of this method when the
698           collection already contains an object at `target_path`. If `False`
699           (the default), this method will raise `FileExistsError`. If `True`,
700           the object at `target_path` will be replaced with `source_obj`.
701         """
702         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
703         if not source_obj.writable():
704             raise IOError(errno.EROFS, "Source collection is read only", source)
705         target_dir.add(source_obj, target_name, overwrite, True)
706
707     def portable_manifest_text(self, stream_name: str=".") -> str:
708         """Get the portable manifest text for this collection
709
710         The portable manifest text is normalized, and does not include access
711         tokens. This method does not flush outstanding blocks to Keep.
712
713         Arguments:
714
715         * stream_name: str --- The name to use for this collection's stream in
716           the generated manifest. Default `'.'`.
717         """
718         return self._get_manifest_text(stream_name, True, True)
719
720     @synchronized
721     def manifest_text(
722             self,
723             stream_name: str=".",
724             strip: bool=False,
725             normalize: bool=False,
726             only_committed: bool=False,
727     ) -> str:
728         """Get the manifest text for this collection
729
730         Arguments:
731
732         * stream_name: str --- The name to use for this collection's stream in
733           the generated manifest. Default `'.'`.
734
735         * strip: bool --- Controls whether or not the returned manifest text
736           includes access tokens. If `False` (the default), the manifest text
737           will include access tokens. If `True`, the manifest text will not
738           include access tokens.
739
740         * normalize: bool --- Controls whether or not the returned manifest
741           text is normalized. Default `False`.
742
743         * only_committed: bool --- Controls whether or not this method uploads
744           pending data to Keep before building and returning the manifest text.
745           If `False` (the default), this method will finish uploading all data
746           to Keep, then return the final manifest. If `True`, this method will
747           build and return a manifest that only refers to the data that has
748           finished uploading at the time this method was called.
749         """
750         if not only_committed:
751             self._my_block_manager().commit_all()
752         return self._get_manifest_text(stream_name, strip, normalize,
753                                        only_committed=only_committed)
754
755     @synchronized
756     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
757         """Get the manifest text for this collection, sub collections and files.
758
759         :stream_name:
760           Name to use for this stream (directory)
761
762         :strip:
763           If True, remove signing tokens from block locators if present.
764           If False (default), block locators are left unchanged.
765
766         :normalize:
767           If True, always export the manifest text in normalized form
768           even if the Collection is not modified.  If False (default) and the collection
769           is not modified, return the original manifest text even if it is not
770           in normalized form.
771
772         :only_committed:
773           If True, only include blocks that were already committed to Keep.
774
775         """
776
777         if not self.committed() or self._manifest_text is None or normalize:
778             stream = {}
779             buf = []
780             sorted_keys = sorted(self.keys())
781             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
782                 # Create a stream per file `k`
783                 arvfile = self[filename]
784                 filestream = []
785                 for segment in arvfile.segments():
786                     loc = segment.locator
787                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
788                         if only_committed:
789                             continue
790                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
791                     if strip:
792                         loc = KeepLocator(loc).stripped()
793                     filestream.append(streams.LocatorAndRange(
794                         loc,
795                         KeepLocator(loc).size,
796                         segment.segment_offset,
797                         segment.range_size,
798                     ))
799                 stream[filename] = filestream
800             if stream:
801                 buf.append(" ".join(streams.normalize_stream(stream_name, stream)) + "\n")
802             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
803                 buf.append(self[dirname].manifest_text(
804                     stream_name=os.path.join(stream_name, dirname),
805                     strip=strip, normalize=True, only_committed=only_committed))
806             return "".join(buf)
807         else:
808             if strip:
809                 return self.stripped_manifest()
810             else:
811                 return self._manifest_text
812
813     @synchronized
814     def _copy_remote_blocks(self, remote_blocks={}):
815         """Scan through the entire collection and ask Keep to copy remote blocks.
816
817         When accessing a remote collection, blocks will have a remote signature
818         (+R instead of +A). Collect these signatures and request Keep to copy the
819         blocks to the local cluster, returning local (+A) signatures.
820
821         :remote_blocks:
822           Shared cache of remote to local block mappings. This is used to avoid
823           doing extra work when blocks are shared by more than one file in
824           different subdirectories.
825
826         """
827         for item in self:
828             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
829         return remote_blocks
830
831     @synchronized
832     def diff(
833             self,
834             end_collection: 'RichCollectionBase',
835             prefix: str=".",
836             holding_collection: Optional['Collection']=None,
837     ) -> ChangeList:
838         """Build a list of differences between this collection and another
839
840         Arguments:
841
842         * end_collection: arvados.collection.RichCollectionBase --- A
843           collection object with the desired end state. The returned diff
844           list will describe how to go from the current collection object
845           `self` to `end_collection`.
846
847         * prefix: str --- The name to use for this collection's stream in
848           the diff list. Default `'.'`.
849
850         * holding_collection: arvados.collection.Collection | None --- A
851           collection object used to hold objects for the returned diff
852           list. By default, a new empty collection is created.
853         """
854         changes = []
855         if holding_collection is None:
856             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
857         for k in self:
858             if k not in end_collection:
859                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
860         for k in end_collection:
861             if k in self:
862                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
863                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
864                 elif end_collection[k] != self[k]:
865                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
866                 else:
867                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
868             else:
869                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
870         return changes
871
872     @must_be_writable
873     @synchronized
874     def apply(self, changes: ChangeList) -> None:
875         """Apply a list of changes from to this collection
876
877         This method takes a list of changes generated by
878         `RichCollectionBase.diff` and applies it to this
879         collection. Afterward, the state of this collection object will
880         match the state of `end_collection` passed to `diff`. If a change
881         conflicts with a local change, it will be saved to an alternate path
882         indicating the conflict.
883
884         Arguments:
885
886         * changes: arvados.collection.ChangeList --- The list of differences
887           generated by `RichCollectionBase.diff`.
888         """
889         if changes:
890             self.set_committed(False)
891         for change in changes:
892             event_type = change[0]
893             path = change[1]
894             initial = change[2]
895             local = self.find(path)
896             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
897                                                                     time.gmtime()))
898             if event_type == ADD:
899                 if local is None:
900                     # No local file at path, safe to copy over new file
901                     self.copy(initial, path)
902                 elif local is not None and local != initial:
903                     # There is already local file and it is different:
904                     # save change to conflict file.
905                     self.copy(initial, conflictpath)
906             elif event_type == MOD or event_type == TOK:
907                 final = change[3]
908                 if local == initial:
909                     # Local matches the "initial" item so it has not
910                     # changed locally and is safe to update.
911                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
912                         # Replace contents of local file with new contents
913                         local.replace_contents(final)
914                     else:
915                         # Overwrite path with new item; this can happen if
916                         # path was a file and is now a collection or vice versa
917                         self.copy(final, path, overwrite=True)
918                 elif event_type == MOD:
919                     # Local doesn't match the "start" value or local
920                     # is missing (presumably deleted) so save change
921                     # to conflict file.  Don't do this for TOK events
922                     # which means the file didn't change but only had
923                     # tokens updated.
924                     self.copy(final, conflictpath)
925             elif event_type == DEL:
926                 if local == initial:
927                     # Local item matches "initial" value, so it is safe to remove.
928                     self.remove(path, recursive=True)
929                 # else, the file is modified or already removed, in either
930                 # case we don't want to try to remove it.
931
932     def portable_data_hash(self) -> str:
933         """Get the portable data hash for this collection's manifest"""
934         if self._manifest_locator and self.committed():
935             # If the collection is already saved on the API server, and it's committed
936             # then return API server's PDH response.
937             return self._portable_data_hash
938         else:
939             stripped = self.portable_manifest_text().encode()
940             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
941
942     @synchronized
943     def subscribe(self, callback: ChangeCallback) -> None:
944         """Set a notify callback for changes to this collection
945
946         Arguments:
947
948         * callback: arvados.collection.ChangeCallback --- The callable to
949           call each time the collection is changed.
950         """
951         if self._callback is None:
952             self._callback = callback
953         else:
954             raise errors.ArgumentError("A callback is already set on this collection.")
955
956     @synchronized
957     def unsubscribe(self) -> None:
958         """Remove any notify callback set for changes to this collection"""
959         if self._callback is not None:
960             self._callback = None
961
962     @synchronized
963     def notify(
964             self,
965             event: ChangeType,
966             collection: 'RichCollectionBase',
967             name: str,
968             item: CollectionItem,
969     ) -> None:
970         """Notify any subscribed callback about a change to this collection
971
972         .. ATTENTION:: Internal
973            This method is only meant to be used by other Collection methods.
974
975         If a callback has been registered with `RichCollectionBase.subscribe`,
976         it will be called with information about a change to this collection.
977         Then this notification will be propagated to this collection's root.
978
979         Arguments:
980
981         * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
982           the collection.
983
984         * collection: arvados.collection.RichCollectionBase --- The
985           collection that was modified.
986
987         * name: str --- The name of the file or stream within `collection` that
988           was modified.
989
990         * item: arvados.arvfile.ArvadosFile |
991           arvados.collection.Subcollection --- For ADD events, the new
992           contents at `name` within `collection`; for DEL events, the
993           item that was removed.  For MOD and TOK events, a 2-tuple of
994           the previous item and the new item (may be the same object
995           or different, depending on whether the action involved it
996           being modified in place or replaced).
997
998         """
999         if self._callback:
1000             self._callback(event, collection, name, item)
1001         self.root_collection().notify(event, collection, name, item)
1002
1003     @synchronized
1004     def __eq__(self, other: Any) -> bool:
1005         """Indicate whether this collection object is equal to another"""
1006         if other is self:
1007             return True
1008         if not isinstance(other, RichCollectionBase):
1009             return False
1010         if len(self._items) != len(other):
1011             return False
1012         for k in self._items:
1013             if k not in other:
1014                 return False
1015             if self._items[k] != other[k]:
1016                 return False
1017         return True
1018
1019     def __ne__(self, other: Any) -> bool:
1020         """Indicate whether this collection object is not equal to another"""
1021         return not self.__eq__(other)
1022
1023     @synchronized
1024     def flush(self) -> None:
1025         """Upload any pending data to Keep"""
1026         for e in self.values():
1027             e.flush()
1028
1029
1030 class Collection(RichCollectionBase):
1031     """Read and manipulate an Arvados collection
1032
1033     This class provides a high-level interface to create, read, and update
1034     Arvados collections and their contents. Refer to the Arvados Python SDK
1035     cookbook for [an introduction to using the Collection class][cookbook].
1036
1037     [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1038     """
1039
1040     def __init__(self, manifest_locator_or_text: Optional[str]=None,
1041                  api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1042                  keep_client: Optional['arvados.keep.KeepClient']=None,
1043                  num_retries: int=10,
1044                  parent: Optional['Collection']=None,
1045                  apiconfig: Optional[Mapping[str, str]]=None,
1046                  block_manager: Optional['arvados.arvfile._BlockManager']=None,
1047                  replication_desired: Optional[int]=None,
1048                  storage_classes_desired: Optional[List[str]]=None,
1049                  put_threads: Optional[int]=None):
1050         """Initialize a Collection object
1051
1052         Arguments:
1053
1054         * manifest_locator_or_text: str | None --- This string can contain a
1055           collection manifest text, portable data hash, or UUID. When given a
1056           portable data hash or UUID, this instance will load a collection
1057           record from the API server. Otherwise, this instance will represent a
1058           new collection without an API server record. The default value `None`
1059           instantiates a new collection with an empty manifest.
1060
1061         * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1062           Arvados API client object this instance uses to make requests. If
1063           none is given, this instance creates its own client using the
1064           settings from `apiconfig` (see below). If your client instantiates
1065           many Collection objects, you can help limit memory utilization by
1066           calling `arvados.api.api` to construct an
1067           `arvados.api.ThreadSafeAPIClient`, and use that as the `api_client`
1068           for every Collection.
1069
1070         * keep_client: arvados.keep.KeepClient | None --- The Keep client
1071           object this instance uses to make requests. If none is given, this
1072           instance creates its own client using its `api_client`.
1073
1074         * num_retries: int --- The number of times that client requests are
1075           retried. Default 10.
1076
1077         * parent: arvados.collection.Collection | None --- The parent Collection
1078           object of this instance, if any. This argument is primarily used by
1079           other Collection methods; user client code shouldn't need to use it.
1080
1081         * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1082           `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1083           `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1084           Collection object constructs one from these settings. If no
1085           mapping is provided, calls `arvados.config.settings` to get these
1086           parameters from user configuration.
1087
1088         * block_manager: arvados.arvfile._BlockManager | None --- The
1089           _BlockManager object used by this instance to coordinate reading
1090           and writing Keep data blocks. If none is given, this instance
1091           constructs its own. This argument is primarily used by other
1092           Collection methods; user client code shouldn't need to use it.
1093
1094         * replication_desired: int | None --- This controls both the value of
1095           the `replication_desired` field on API collection records saved by
1096           this class, as well as the number of Keep services that the object
1097           writes new data blocks to. If none is given, uses the default value
1098           configured for the cluster.
1099
1100         * storage_classes_desired: list[str] | None --- This controls both
1101           the value of the `storage_classes_desired` field on API collection
1102           records saved by this class, as well as selecting which specific
1103           Keep services the object writes new data blocks to. If none is
1104           given, defaults to an empty list.
1105
1106         * put_threads: int | None --- The number of threads to run
1107           simultaneously to upload data blocks to Keep. This value is used when
1108           building a new `block_manager`. It is unused when a `block_manager`
1109           is provided.
1110         """
1111
1112         if storage_classes_desired and type(storage_classes_desired) is not list:
1113             raise errors.ArgumentError("storage_classes_desired must be list type.")
1114
1115         super(Collection, self).__init__(parent)
1116         self._api_client = api_client
1117         self._keep_client = keep_client
1118
1119         # Use the keep client from ThreadSafeAPIClient
1120         if self._keep_client is None and isinstance(self._api_client, ThreadSafeAPIClient):
1121             self._keep_client = self._api_client.keep
1122
1123         self._block_manager = block_manager
1124         self.replication_desired = replication_desired
1125         self._storage_classes_desired = storage_classes_desired
1126         self.put_threads = put_threads
1127
1128         if apiconfig:
1129             self._config = apiconfig
1130         else:
1131             self._config = config.settings()
1132
1133         self.num_retries = num_retries
1134         self._manifest_locator = None
1135         self._manifest_text = None
1136         self._portable_data_hash = None
1137         self._api_response = None
1138         self._token_refresh_timestamp = 0
1139
1140         self.lock = threading.RLock()
1141         self.events = None
1142
1143         if manifest_locator_or_text:
1144             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1145                 self._manifest_locator = manifest_locator_or_text
1146             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1147                 self._manifest_locator = manifest_locator_or_text
1148                 if not self._has_local_collection_uuid():
1149                     self._has_remote_blocks = True
1150             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1151                 self._manifest_text = manifest_locator_or_text
1152                 if '+R' in self._manifest_text:
1153                     self._has_remote_blocks = True
1154             else:
1155                 raise errors.ArgumentError(
1156                     "Argument to CollectionReader is not a manifest or a collection UUID")
1157
1158             try:
1159                 self._populate()
1160             except errors.SyntaxError as e:
1161                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1162
1163     def storage_classes_desired(self) -> List[str]:
1164         """Get this collection's `storage_classes_desired` value"""
1165         return self._storage_classes_desired or []
1166
1167     def root_collection(self) -> 'Collection':
1168         return self
1169
1170     def get_properties(self) -> Properties:
1171         """Get this collection's properties
1172
1173         This method always returns a dict. If this collection object does not
1174         have an associated API record, or that record does not have any
1175         properties set, this method returns an empty dict.
1176         """
1177         if self._api_response and self._api_response["properties"]:
1178             return self._api_response["properties"]
1179         else:
1180             return {}
1181
1182     def get_trash_at(self) -> Optional[datetime.datetime]:
1183         """Get this collection's `trash_at` field
1184
1185         This method parses the `trash_at` field of the collection's API
1186         record and returns a datetime from it. If that field is not set, or
1187         this collection object does not have an associated API record,
1188         returns None.
1189         """
1190         if self._api_response and self._api_response["trash_at"]:
1191             try:
1192                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1193             except ValueError:
1194                 return None
1195         else:
1196             return None
1197
1198     def stream_name(self) -> str:
1199         return "."
1200
1201     def writable(self) -> bool:
1202         return True
1203
1204     @synchronized
1205     @retry_method
1206     def update(
1207             self,
1208             other: Optional['Collection']=None,
1209             num_retries: Optional[int]=None,
1210     ) -> None:
1211         """Merge another collection's contents into this one
1212
1213         This method compares the manifest of this collection instance with
1214         another, then updates this instance's manifest with changes from the
1215         other, renaming files to flag conflicts where necessary.
1216
1217         When called without any arguments, this method reloads the collection's
1218         API record, and updates this instance with any changes that have
1219         appeared server-side. If this instance does not have a corresponding
1220         API record, this method raises `arvados.errors.ArgumentError`.
1221
1222         Arguments:
1223
1224         * other: arvados.collection.Collection | None --- The collection
1225           whose contents should be merged into this instance. When not
1226           provided, this method reloads this collection's API record and
1227           constructs a Collection object from it.  If this instance does not
1228           have a corresponding API record, this method raises
1229           `arvados.errors.ArgumentError`.
1230
1231         * num_retries: int | None --- The number of times to retry reloading
1232           the collection's API record from the API server. If not specified,
1233           uses the `num_retries` provided when this instance was constructed.
1234         """
1235
1236         token_refresh_period = 60*60
1237         time_since_last_token_refresh = (time.time() - self._token_refresh_timestamp)
1238         upstream_response = None
1239
1240         if other is None:
1241             if self._manifest_locator is None:
1242                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1243
1244             if re.match(arvados.util.portable_data_hash_pattern, self._manifest_locator) and time_since_last_token_refresh < token_refresh_period:
1245                 return
1246
1247             upstream_response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1248             other = CollectionReader(upstream_response["manifest_text"])
1249
1250         if self.committed():
1251             # 1st case, no local changes, content is the same
1252             if self.portable_data_hash() == other.portable_data_hash() and time_since_last_token_refresh < token_refresh_period:
1253                 # No difference in content.  Remember the API record
1254                 # (metadata such as name or properties may have changed)
1255                 # but don't update the token refresh timestamp.
1256                 if upstream_response is not None:
1257                     self._remember_api_response(upstream_response)
1258                 return
1259
1260             # 2nd case, no local changes, but either upstream changed
1261             # or we want to refresh tokens.
1262
1263             self.apply(self.diff(other))
1264             if upstream_response is not None:
1265                 self._remember_api_response(upstream_response)
1266             self._update_token_timestamp()
1267             self.set_committed(True)
1268             return
1269
1270         # 3rd case, upstream changed, but we also have uncommitted
1271         # changes that we want to incorporate so they don't get lost.
1272
1273         # _manifest_text stores the text from last time we received a
1274         # record from the API server.  This is the state of the
1275         # collection before our uncommitted changes.
1276         baseline = Collection(self._manifest_text)
1277
1278         # Get the set of changes between our baseline and the other
1279         # collection and apply them to self.
1280         #
1281         # If a file was modified in both 'self' and 'other', the
1282         # 'apply' method keeps the contents of 'self' and creates a
1283         # conflict file with the contents of 'other'.
1284         self.apply(baseline.diff(other))
1285
1286         # Remember the new baseline, changes to a file
1287         if upstream_response is not None:
1288             self._remember_api_response(upstream_response)
1289
1290
1291     @synchronized
1292     def _my_api(self):
1293         if self._api_client is None:
1294             self._api_client = ThreadSafeAPIClient(self._config, version='v1')
1295             if self._keep_client is None:
1296                 self._keep_client = self._api_client.keep
1297         return self._api_client
1298
1299     @synchronized
1300     def _my_keep(self):
1301         if self._keep_client is None:
1302             if self._api_client is None:
1303                 self._my_api()
1304             else:
1305                 self._keep_client = KeepClient(api_client=self._api_client)
1306         return self._keep_client
1307
1308     @synchronized
1309     def _my_block_manager(self):
1310         if self._block_manager is None:
1311             copies = (self.replication_desired or
1312                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1313                                                    2))
1314             self._block_manager = _BlockManager(self._my_keep(),
1315                                                 copies=copies,
1316                                                 put_threads=self.put_threads,
1317                                                 num_retries=self.num_retries,
1318                                                 storage_classes_func=self.storage_classes_desired)
1319         return self._block_manager
1320
1321     def _remember_api_response(self, response):
1322         self._api_response = response
1323         self._manifest_text = self._api_response['manifest_text']
1324         self._portable_data_hash = self._api_response['portable_data_hash']
1325
1326     def _update_token_timestamp(self):
1327         self._token_refresh_timestamp = time.time()
1328
1329     def _populate_from_api_server(self):
1330         # As in KeepClient itself, we must wait until the last
1331         # possible moment to instantiate an API client, in order to
1332         # avoid tripping up clients that don't have access to an API
1333         # server.  If we do build one, make sure our Keep client uses
1334         # it.  If instantiation fails, we'll fall back to the except
1335         # clause, just like any other Collection lookup
1336         # failure. Return an exception, or None if successful.
1337         self._remember_api_response(self._my_api().collections().get(
1338             uuid=self._manifest_locator).execute(
1339                 num_retries=self.num_retries))
1340
1341         # If not overriden via kwargs, we should try to load the
1342         # replication_desired and storage_classes_desired from the API server
1343         if self.replication_desired is None:
1344             self.replication_desired = self._api_response.get('replication_desired', None)
1345         if self._storage_classes_desired is None:
1346             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1347
1348     def _populate(self):
1349         if self._manifest_text is None:
1350             if self._manifest_locator is None:
1351                 return
1352             else:
1353                 self._populate_from_api_server()
1354         self._baseline_manifest = self._manifest_text
1355         self._import_manifest(self._manifest_text)
1356
1357     def _has_collection_uuid(self):
1358         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1359
1360     def _has_local_collection_uuid(self):
1361         return self._has_collection_uuid and \
1362             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1363
1364     def __enter__(self):
1365         return self
1366
1367     def __exit__(self, exc_type, exc_value, traceback):
1368         """Exit a context with this collection instance
1369
1370         If no exception was raised inside the context block, and this
1371         collection is writable and has a corresponding API record, that
1372         record will be updated to match the state of this instance at the end
1373         of the block.
1374         """
1375         if exc_type is None:
1376             if self.writable() and self._has_collection_uuid():
1377                 self.save()
1378         self.stop_threads()
1379
1380     def stop_threads(self) -> None:
1381         """Stop background Keep upload/download threads"""
1382         if self._block_manager is not None:
1383             self._block_manager.stop_threads()
1384
1385     @synchronized
1386     def manifest_locator(self) -> Optional[str]:
1387         """Get this collection's manifest locator, if any
1388
1389         * If this collection instance is associated with an API record with a
1390           UUID, return that.
1391         * Otherwise, if this collection instance was loaded from an API record
1392           by portable data hash, return that.
1393         * Otherwise, return `None`.
1394         """
1395         return self._manifest_locator
1396
1397     @synchronized
1398     def clone(
1399             self,
1400             new_parent: Optional['Collection']=None,
1401             new_name: Optional[str]=None,
1402             readonly: bool=False,
1403             new_config: Optional[Mapping[str, str]]=None,
1404     ) -> 'Collection':
1405         """Create a Collection object with the same contents as this instance
1406
1407         This method creates a new Collection object with contents that match
1408         this instance's. The new collection will not be associated with any API
1409         record.
1410
1411         Arguments:
1412
1413         * new_parent: arvados.collection.Collection | None --- This value is
1414           passed to the new Collection's constructor as the `parent`
1415           argument.
1416
1417         * new_name: str | None --- This value is unused.
1418
1419         * readonly: bool --- If this value is true, this method constructs and
1420           returns a `CollectionReader`. Otherwise, it returns a mutable
1421           `Collection`. Default `False`.
1422
1423         * new_config: Mapping[str, str] | None --- This value is passed to the
1424           new Collection's constructor as `apiconfig`. If no value is provided,
1425           defaults to the configuration passed to this instance's constructor.
1426         """
1427         if new_config is None:
1428             new_config = self._config
1429         if readonly:
1430             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1431         else:
1432             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1433
1434         newcollection._clonefrom(self)
1435         return newcollection
1436
1437     @synchronized
1438     def api_response(self) -> Optional[Dict[str, Any]]:
1439         """Get this instance's associated API record
1440
1441         If this Collection instance has an associated API record, return it.
1442         Otherwise, return `None`.
1443         """
1444         return self._api_response
1445
1446     def find_or_create(
1447             self,
1448             path: str,
1449             create_type: CreateType,
1450     ) -> CollectionItem:
1451         if path == ".":
1452             return self
1453         else:
1454             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1455
1456     def find(self, path: str) -> CollectionItem:
1457         if path == ".":
1458             return self
1459         else:
1460             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1461
1462     def remove(self, path: str, recursive: bool=False) -> None:
1463         if path == ".":
1464             raise errors.ArgumentError("Cannot remove '.'")
1465         else:
1466             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1467
1468     @must_be_writable
1469     @synchronized
1470     @retry_method
1471     def save(
1472             self,
1473             properties: Optional[Properties]=None,
1474             storage_classes: Optional[StorageClasses]=None,
1475             trash_at: Optional[datetime.datetime]=None,
1476             merge: bool=True,
1477             num_retries: Optional[int]=None,
1478             preserve_version: bool=False,
1479     ) -> str:
1480         """Save collection to an existing API record
1481
1482         This method updates the instance's corresponding API record to match
1483         the instance's state. If this instance does not have a corresponding API
1484         record yet, raises `AssertionError`. (To create a new API record, use
1485         `Collection.save_new`.) This method returns the saved collection
1486         manifest.
1487
1488         Arguments:
1489
1490         * properties: dict[str, Any] | None --- If provided, the API record will
1491           be updated with these properties. Note this will completely replace
1492           any existing properties.
1493
1494         * storage_classes: list[str] | None --- If provided, the API record will
1495           be updated with this value in the `storage_classes_desired` field.
1496           This value will also be saved on the instance and used for any
1497           changes that follow.
1498
1499         * trash_at: datetime.datetime | None --- If provided, the API record
1500           will be updated with this value in the `trash_at` field.
1501
1502         * merge: bool --- If `True` (the default), this method will first
1503           reload this collection's API record, and merge any new contents into
1504           this instance before saving changes. See `Collection.update` for
1505           details.
1506
1507         * num_retries: int | None --- The number of times to retry reloading
1508           the collection's API record from the API server. If not specified,
1509           uses the `num_retries` provided when this instance was constructed.
1510
1511         * preserve_version: bool --- This value will be passed to directly
1512           to the underlying API call. If `True`, the Arvados API will
1513           preserve the versions of this collection both immediately before
1514           and after the update. If `True` when the API server is not
1515           configured with collection versioning, this method raises
1516           `arvados.errors.ArgumentError`.
1517         """
1518         if properties and type(properties) is not dict:
1519             raise errors.ArgumentError("properties must be dictionary type.")
1520
1521         if storage_classes and type(storage_classes) is not list:
1522             raise errors.ArgumentError("storage_classes must be list type.")
1523         if storage_classes:
1524             self._storage_classes_desired = storage_classes
1525
1526         if trash_at and type(trash_at) is not datetime.datetime:
1527             raise errors.ArgumentError("trash_at must be datetime type.")
1528
1529         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1530             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1531
1532         body={}
1533         if properties:
1534             body["properties"] = properties
1535         if self.storage_classes_desired():
1536             body["storage_classes_desired"] = self.storage_classes_desired()
1537         if trash_at:
1538             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1539             body["trash_at"] = t
1540         if preserve_version:
1541             body["preserve_version"] = preserve_version
1542
1543         if not self.committed():
1544             if self._has_remote_blocks:
1545                 # Copy any remote blocks to the local cluster.
1546                 self._copy_remote_blocks(remote_blocks={})
1547                 self._has_remote_blocks = False
1548             if not self._has_collection_uuid():
1549                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1550             elif not self._has_local_collection_uuid():
1551                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1552
1553             self._my_block_manager().commit_all()
1554
1555             if merge:
1556                 self.update()
1557
1558             text = self.manifest_text(strip=False)
1559             body['manifest_text'] = text
1560
1561             self._remember_api_response(self._my_api().collections().update(
1562                 uuid=self._manifest_locator,
1563                 body=body
1564                 ).execute(num_retries=num_retries))
1565             self.set_committed(True)
1566         elif body:
1567             self._remember_api_response(self._my_api().collections().update(
1568                 uuid=self._manifest_locator,
1569                 body=body
1570                 ).execute(num_retries=num_retries))
1571
1572         return self._manifest_text
1573
1574
1575     @must_be_writable
1576     @synchronized
1577     @retry_method
1578     def save_new(
1579             self,
1580             name: Optional[str]=None,
1581             create_collection_record: bool=True,
1582             owner_uuid: Optional[str]=None,
1583             properties: Optional[Properties]=None,
1584             storage_classes: Optional[StorageClasses]=None,
1585             trash_at: Optional[datetime.datetime]=None,
1586             ensure_unique_name: bool=False,
1587             num_retries: Optional[int]=None,
1588             preserve_version: bool=False,
1589     ):
1590         """Save collection to a new API record
1591
1592         This method finishes uploading new data blocks and (optionally)
1593         creates a new API collection record with the provided data. If a new
1594         record is created, this instance becomes associated with that record
1595         for future updates like `save()`. This method returns the saved
1596         collection manifest.
1597
1598         Arguments:
1599
1600         * name: str | None --- The `name` field to use on the new collection
1601           record. If not specified, a generic default name is generated.
1602
1603         * create_collection_record: bool --- If `True` (the default), creates a
1604           collection record on the API server. If `False`, the method finishes
1605           all data uploads and only returns the resulting collection manifest
1606           without sending it to the API server.
1607
1608         * owner_uuid: str | None --- The `owner_uuid` field to use on the
1609           new collection record.
1610
1611         * properties: dict[str, Any] | None --- The `properties` field to use on
1612           the new collection record.
1613
1614         * storage_classes: list[str] | None --- The
1615           `storage_classes_desired` field to use on the new collection record.
1616
1617         * trash_at: datetime.datetime | None --- The `trash_at` field to use
1618           on the new collection record.
1619
1620         * ensure_unique_name: bool --- This value is passed directly to the
1621           Arvados API when creating the collection record. If `True`, the API
1622           server may modify the submitted `name` to ensure the collection's
1623           `name`+`owner_uuid` combination is unique. If `False` (the default),
1624           if a collection already exists with this same `name`+`owner_uuid`
1625           combination, creating a collection record will raise a validation
1626           error.
1627
1628         * num_retries: int | None --- The number of times to retry reloading
1629           the collection's API record from the API server. If not specified,
1630           uses the `num_retries` provided when this instance was constructed.
1631
1632         * preserve_version: bool --- This value will be passed to directly
1633           to the underlying API call. If `True`, the Arvados API will
1634           preserve the versions of this collection both immediately before
1635           and after the update. If `True` when the API server is not
1636           configured with collection versioning, this method raises
1637           `arvados.errors.ArgumentError`.
1638         """
1639         if properties and type(properties) is not dict:
1640             raise errors.ArgumentError("properties must be dictionary type.")
1641
1642         if storage_classes and type(storage_classes) is not list:
1643             raise errors.ArgumentError("storage_classes must be list type.")
1644
1645         if trash_at and type(trash_at) is not datetime.datetime:
1646             raise errors.ArgumentError("trash_at must be datetime type.")
1647
1648         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1649             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1650
1651         if self._has_remote_blocks:
1652             # Copy any remote blocks to the local cluster.
1653             self._copy_remote_blocks(remote_blocks={})
1654             self._has_remote_blocks = False
1655
1656         if storage_classes:
1657             self._storage_classes_desired = storage_classes
1658
1659         self._my_block_manager().commit_all()
1660         text = self.manifest_text(strip=False)
1661
1662         if create_collection_record:
1663             if name is None:
1664                 name = "New collection"
1665                 ensure_unique_name = True
1666
1667             body = {"manifest_text": text,
1668                     "name": name,
1669                     "replication_desired": self.replication_desired}
1670             if owner_uuid:
1671                 body["owner_uuid"] = owner_uuid
1672             if properties:
1673                 body["properties"] = properties
1674             if self.storage_classes_desired():
1675                 body["storage_classes_desired"] = self.storage_classes_desired()
1676             if trash_at:
1677                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1678                 body["trash_at"] = t
1679             if preserve_version:
1680                 body["preserve_version"] = preserve_version
1681
1682             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1683             self._manifest_locator = self._api_response["uuid"]
1684             self.set_committed(True)
1685
1686         return text
1687
1688     _token_re = re.compile(r'(\S+)(\s+|$)')
1689     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1690     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1691
1692     def _unescape_manifest_path(self, path):
1693         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1694
1695     @synchronized
1696     def _import_manifest(self, manifest_text):
1697         """Import a manifest into a `Collection`.
1698
1699         :manifest_text:
1700           The manifest text to import from.
1701
1702         """
1703         if len(self) > 0:
1704             raise ArgumentError("Can only import manifest into an empty collection")
1705
1706         STREAM_NAME = 0
1707         BLOCKS = 1
1708         SEGMENTS = 2
1709
1710         stream_name = None
1711         state = STREAM_NAME
1712
1713         for token_and_separator in self._token_re.finditer(manifest_text):
1714             tok = token_and_separator.group(1)
1715             sep = token_and_separator.group(2)
1716
1717             if state == STREAM_NAME:
1718                 # starting a new stream
1719                 stream_name = self._unescape_manifest_path(tok)
1720                 blocks = []
1721                 segments = []
1722                 streamoffset = 0
1723                 state = BLOCKS
1724                 self.find_or_create(stream_name, COLLECTION)
1725                 continue
1726
1727             if state == BLOCKS:
1728                 block_locator = self._block_re.match(tok)
1729                 if block_locator:
1730                     blocksize = int(block_locator.group(1))
1731                     blocks.append(streams.Range(tok, streamoffset, blocksize, 0))
1732                     streamoffset += blocksize
1733                 else:
1734                     state = SEGMENTS
1735
1736             if state == SEGMENTS:
1737                 file_segment = self._segment_re.match(tok)
1738                 if file_segment:
1739                     pos = int(file_segment.group(1))
1740                     size = int(file_segment.group(2))
1741                     name = self._unescape_manifest_path(file_segment.group(3))
1742                     if name.split('/')[-1] == '.':
1743                         # placeholder for persisting an empty directory, not a real file
1744                         if len(name) > 2:
1745                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1746                     else:
1747                         filepath = os.path.join(stream_name, name)
1748                         try:
1749                             afile = self.find_or_create(filepath, FILE)
1750                         except IOError as e:
1751                             if e.errno == errno.ENOTDIR:
1752                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1753                             else:
1754                                 raise e from None
1755                         if isinstance(afile, ArvadosFile):
1756                             afile.add_segment(blocks, pos, size)
1757                         else:
1758                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1759                 else:
1760                     # error!
1761                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1762
1763             if sep == "\n":
1764                 stream_name = None
1765                 state = STREAM_NAME
1766
1767         self._update_token_timestamp()
1768         self.set_committed(True)
1769
1770     @synchronized
1771     def notify(
1772             self,
1773             event: ChangeType,
1774             collection: 'RichCollectionBase',
1775             name: str,
1776             item: CollectionItem,
1777     ) -> None:
1778         if self._callback:
1779             self._callback(event, collection, name, item)
1780
1781
1782 class Subcollection(RichCollectionBase):
1783     """Read and manipulate a stream/directory within an Arvados collection
1784
1785     This class represents a single stream (like a directory) within an Arvados
1786     `Collection`. It is returned by `Collection.find` and provides the same API.
1787     Operations that work on the API collection record propagate to the parent
1788     `Collection` object.
1789     """
1790
1791     def __init__(self, parent, name):
1792         super(Subcollection, self).__init__(parent)
1793         self.lock = self.root_collection().lock
1794         self._manifest_text = None
1795         self.name = name
1796         self.num_retries = parent.num_retries
1797
1798     def root_collection(self) -> 'Collection':
1799         return self.parent.root_collection()
1800
1801     def writable(self) -> bool:
1802         return self.root_collection().writable()
1803
1804     def _my_api(self):
1805         return self.root_collection()._my_api()
1806
1807     def _my_keep(self):
1808         return self.root_collection()._my_keep()
1809
1810     def _my_block_manager(self):
1811         return self.root_collection()._my_block_manager()
1812
1813     def stream_name(self) -> str:
1814         return os.path.join(self.parent.stream_name(), self.name)
1815
1816     @synchronized
1817     def clone(
1818             self,
1819             new_parent: Optional['Collection']=None,
1820             new_name: Optional[str]=None,
1821     ) -> 'Subcollection':
1822         c = Subcollection(new_parent, new_name)
1823         c._clonefrom(self)
1824         return c
1825
1826     @must_be_writable
1827     @synchronized
1828     def _reparent(self, newparent, newname):
1829         self.set_committed(False)
1830         self.flush()
1831         self.parent.remove(self.name, recursive=True)
1832         self.parent = newparent
1833         self.name = newname
1834         self.lock = self.parent.root_collection().lock
1835
1836     @synchronized
1837     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1838         """Encode empty directories by using an \056-named (".") empty file"""
1839         if len(self._items) == 0:
1840             return "%s %s 0:0:\\056\n" % (
1841                 streams.escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1842         return super(Subcollection, self)._get_manifest_text(stream_name,
1843                                                              strip, normalize,
1844                                                              only_committed)
1845
1846
1847 class CollectionReader(Collection):
1848     """Read-only `Collection` subclass
1849
1850     This class will never create or update any API collection records. You can
1851     use this class for additional code safety when you only need to read
1852     existing collections.
1853     """
1854     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1855         self._in_init = True
1856         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1857         self._in_init = False
1858
1859         # Forego any locking since it should never change once initialized.
1860         self.lock = NoopLock()
1861
1862         # Backwards compatability with old CollectionReader
1863         # all_streams() and all_files()
1864         self._streams = None
1865
1866     def writable(self) -> bool:
1867         return self._in_init