Merge branch '21705-go-deps-update-all'
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
5
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
11
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
13 """
14
15 import ciso8601
16 import datetime
17 import errno
18 import functools
19 import hashlib
20 import io
21 import logging
22 import os
23 import re
24 import sys
25 import threading
26 import time
27
28 from collections import deque
29 from stat import *
30
31 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
32 from .keep import KeepLocator, KeepClient
33 from .stream import StreamReader
34 from ._normalize_stream import normalize_stream, escape
35 from ._ranges import Range, LocatorAndRange
36 from .safeapi import ThreadSafeApiCache
37 import arvados.config as config
38 import arvados.errors as errors
39 import arvados.util
40 import arvados.events as events
41 from arvados.retry import retry_method
42
43 from typing import (
44     Any,
45     Callable,
46     Dict,
47     IO,
48     Iterator,
49     List,
50     Mapping,
51     Optional,
52     Tuple,
53     Union,
54 )
55
56 if sys.version_info < (3, 8):
57     from typing_extensions import Literal
58 else:
59     from typing import Literal
60
61 _logger = logging.getLogger('arvados.collection')
62
63 ADD = "add"
64 """Argument value for `Collection` methods to represent an added item"""
65 DEL = "del"
66 """Argument value for `Collection` methods to represent a removed item"""
67 MOD = "mod"
68 """Argument value for `Collection` methods to represent a modified item"""
69 TOK = "tok"
70 """Argument value for `Collection` methods to represent an item with token differences"""
71 FILE = "file"
72 """`create_type` value for `Collection.find_or_create`"""
73 COLLECTION = "collection"
74 """`create_type` value for `Collection.find_or_create`"""
75
76 ChangeList = List[Union[
77     Tuple[Literal[ADD, DEL], str, 'Collection'],
78     Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
79 ]]
80 ChangeType = Literal[ADD, DEL, MOD, TOK]
81 CollectionItem = Union[ArvadosFile, 'Collection']
82 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
83 CreateType = Literal[COLLECTION, FILE]
84 Properties = Dict[str, Any]
85 StorageClasses = List[str]
86
87 class CollectionBase(object):
88     """Abstract base class for Collection classes
89
90     .. ATTENTION:: Internal
91        This class is meant to be used by other parts of the SDK. User code
92        should instantiate or subclass `Collection` or one of its subclasses
93        directly.
94     """
95
96     def __enter__(self):
97         """Enter a context block with this collection instance"""
98         return self
99
100     def __exit__(self, exc_type, exc_value, traceback):
101         """Exit a context block with this collection instance"""
102         pass
103
104     def _my_keep(self):
105         if self._keep_client is None:
106             self._keep_client = KeepClient(api_client=self._api_client,
107                                            num_retries=self.num_retries)
108         return self._keep_client
109
110     def stripped_manifest(self) -> str:
111         """Create a copy of the collection manifest with only size hints
112
113         This method returns a string with the current collection's manifest
114         text with all non-portable locator hints like permission hints and
115         remote cluster hints removed. The only hints in the returned manifest
116         will be size hints.
117         """
118         raw = self.manifest_text()
119         clean = []
120         for line in raw.split("\n"):
121             fields = line.split()
122             if fields:
123                 clean_fields = fields[:1] + [
124                     (re.sub(r'\+[^\d][^\+]*', '', x)
125                      if re.match(arvados.util.keep_locator_pattern, x)
126                      else x)
127                     for x in fields[1:]]
128                 clean += [' '.join(clean_fields), "\n"]
129         return ''.join(clean)
130
131
132 class _WriterFile(_FileLikeObjectBase):
133     def __init__(self, coll_writer, name):
134         super(_WriterFile, self).__init__(name, 'wb')
135         self.dest = coll_writer
136
137     def close(self):
138         super(_WriterFile, self).close()
139         self.dest.finish_current_file()
140
141     @_FileLikeObjectBase._before_close
142     def write(self, data):
143         self.dest.write(data)
144
145     @_FileLikeObjectBase._before_close
146     def writelines(self, seq):
147         for data in seq:
148             self.write(data)
149
150     @_FileLikeObjectBase._before_close
151     def flush(self):
152         self.dest.flush_data()
153
154
155 class RichCollectionBase(CollectionBase):
156     """Base class for Collection classes
157
158     .. ATTENTION:: Internal
159        This class is meant to be used by other parts of the SDK. User code
160        should instantiate or subclass `Collection` or one of its subclasses
161        directly.
162     """
163
164     def __init__(self, parent=None):
165         self.parent = parent
166         self._committed = False
167         self._has_remote_blocks = False
168         self._callback = None
169         self._items = {}
170
171     def _my_api(self):
172         raise NotImplementedError()
173
174     def _my_keep(self):
175         raise NotImplementedError()
176
177     def _my_block_manager(self):
178         raise NotImplementedError()
179
180     def writable(self) -> bool:
181         """Indicate whether this collection object can be modified
182
183         This method returns `False` if this object is a `CollectionReader`,
184         else `True`.
185         """
186         raise NotImplementedError()
187
188     def root_collection(self) -> 'Collection':
189         """Get this collection's root collection object
190
191         If you open a subcollection with `Collection.find`, calling this method
192         on that subcollection returns the source Collection object.
193         """
194         raise NotImplementedError()
195
196     def stream_name(self) -> str:
197         """Get the name of the manifest stream represented by this collection
198
199         If you open a subcollection with `Collection.find`, calling this method
200         on that subcollection returns the name of the stream you opened.
201         """
202         raise NotImplementedError()
203
204     @synchronized
205     def has_remote_blocks(self) -> bool:
206         """Indiciate whether the collection refers to remote data
207
208         Returns `True` if the collection manifest includes any Keep locators
209         with a remote hint (`+R`), else `False`.
210         """
211         if self._has_remote_blocks:
212             return True
213         for item in self:
214             if self[item].has_remote_blocks():
215                 return True
216         return False
217
218     @synchronized
219     def set_has_remote_blocks(self, val: bool) -> None:
220         """Cache whether this collection refers to remote blocks
221
222         .. ATTENTION:: Internal
223            This method is only meant to be used by other Collection methods.
224
225         Set this collection's cached "has remote blocks" flag to the given
226         value.
227         """
228         self._has_remote_blocks = val
229         if self.parent:
230             self.parent.set_has_remote_blocks(val)
231
232     @must_be_writable
233     @synchronized
234     def find_or_create(
235             self,
236             path: str,
237             create_type: CreateType,
238     ) -> CollectionItem:
239         """Get the item at the given path, creating it if necessary
240
241         If `path` refers to a stream in this collection, returns a
242         corresponding `Subcollection` object. If `path` refers to a file in
243         this collection, returns a corresponding
244         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
245         this collection, then this method creates a new object and returns
246         it, creating parent streams as needed. The type of object created is
247         determined by the value of `create_type`.
248
249         Arguments:
250
251         * path: str --- The path to find or create within this collection.
252
253         * create_type: Literal[COLLECTION, FILE] --- The type of object to
254           create at `path` if one does not exist. Passing `COLLECTION`
255           creates a stream and returns the corresponding
256           `Subcollection`. Passing `FILE` creates a new file and returns the
257           corresponding `arvados.arvfile.ArvadosFile`.
258         """
259         pathcomponents = path.split("/", 1)
260         if pathcomponents[0]:
261             item = self._items.get(pathcomponents[0])
262             if len(pathcomponents) == 1:
263                 if item is None:
264                     # create new file
265                     if create_type == COLLECTION:
266                         item = Subcollection(self, pathcomponents[0])
267                     else:
268                         item = ArvadosFile(self, pathcomponents[0])
269                     self._items[pathcomponents[0]] = item
270                     self.set_committed(False)
271                     self.notify(ADD, self, pathcomponents[0], item)
272                 return item
273             else:
274                 if item is None:
275                     # create new collection
276                     item = Subcollection(self, pathcomponents[0])
277                     self._items[pathcomponents[0]] = item
278                     self.set_committed(False)
279                     self.notify(ADD, self, pathcomponents[0], item)
280                 if isinstance(item, RichCollectionBase):
281                     return item.find_or_create(pathcomponents[1], create_type)
282                 else:
283                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
284         else:
285             return self
286
287     @synchronized
288     def find(self, path: str) -> CollectionItem:
289         """Get the item at the given path
290
291         If `path` refers to a stream in this collection, returns a
292         corresponding `Subcollection` object. If `path` refers to a file in
293         this collection, returns a corresponding
294         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
295         this collection, then this method raises `NotADirectoryError`.
296
297         Arguments:
298
299         * path: str --- The path to find or create within this collection.
300         """
301         if not path:
302             raise errors.ArgumentError("Parameter 'path' is empty.")
303
304         pathcomponents = path.split("/", 1)
305         if pathcomponents[0] == '':
306             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
307
308         item = self._items.get(pathcomponents[0])
309         if item is None:
310             return None
311         elif len(pathcomponents) == 1:
312             return item
313         else:
314             if isinstance(item, RichCollectionBase):
315                 if pathcomponents[1]:
316                     return item.find(pathcomponents[1])
317                 else:
318                     return item
319             else:
320                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
321
322     @synchronized
323     def mkdirs(self, path: str) -> 'Subcollection':
324         """Create and return a subcollection at `path`
325
326         If `path` exists within this collection, raises `FileExistsError`.
327         Otherwise, creates a stream at that path and returns the
328         corresponding `Subcollection`.
329         """
330         if self.find(path) != None:
331             raise IOError(errno.EEXIST, "Directory or file exists", path)
332
333         return self.find_or_create(path, COLLECTION)
334
335     def open(
336             self,
337             path: str,
338             mode: str="r",
339             encoding: Optional[str]=None
340     ) -> IO:
341         """Open a file-like object within the collection
342
343         This method returns a file-like object that can read and/or write the
344         file located at `path` within the collection. If you attempt to write
345         a `path` that does not exist, the file is created with `find_or_create`.
346         If the file cannot be opened for any other reason, this method raises
347         `OSError` with an appropriate errno.
348
349         Arguments:
350
351         * path: str --- The path of the file to open within this collection
352
353         * mode: str --- The mode to open this file. Supports all the same
354           values as `builtins.open`.
355
356         * encoding: str | None --- The text encoding of the file. Only used
357           when the file is opened in text mode. The default is
358           platform-dependent.
359
360         """
361         if not re.search(r'^[rwa][bt]?\+?$', mode):
362             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
363
364         if mode[0] == 'r' and '+' not in mode:
365             fclass = ArvadosFileReader
366             arvfile = self.find(path)
367         elif not self.writable():
368             raise IOError(errno.EROFS, "Collection is read only")
369         else:
370             fclass = ArvadosFileWriter
371             arvfile = self.find_or_create(path, FILE)
372
373         if arvfile is None:
374             raise IOError(errno.ENOENT, "File not found", path)
375         if not isinstance(arvfile, ArvadosFile):
376             raise IOError(errno.EISDIR, "Is a directory", path)
377
378         if mode[0] == 'w':
379             arvfile.truncate(0)
380
381         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
382         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
383         if 'b' not in mode:
384             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
385             f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
386         return f
387
388     def modified(self) -> bool:
389         """Indicate whether this collection has an API server record
390
391         Returns `False` if this collection corresponds to a record loaded from
392         the API server, `True` otherwise.
393         """
394         return not self.committed()
395
396     @synchronized
397     def committed(self):
398         """Indicate whether this collection has an API server record
399
400         Returns `True` if this collection corresponds to a record loaded from
401         the API server, `False` otherwise.
402         """
403         return self._committed
404
405     @synchronized
406     def set_committed(self, value: bool=True):
407         """Cache whether this collection has an API server record
408
409         .. ATTENTION:: Internal
410            This method is only meant to be used by other Collection methods.
411
412         Set this collection's cached "committed" flag to the given
413         value and propagates it as needed.
414         """
415         if value == self._committed:
416             return
417         if value:
418             for k,v in self._items.items():
419                 v.set_committed(True)
420             self._committed = True
421         else:
422             self._committed = False
423             if self.parent is not None:
424                 self.parent.set_committed(False)
425
426     @synchronized
427     def __iter__(self) -> Iterator[str]:
428         """Iterate names of streams and files in this collection
429
430         This method does not recurse. It only iterates the contents of this
431         collection's corresponding stream.
432         """
433         return iter(self._items)
434
435     @synchronized
436     def __getitem__(self, k: str) -> CollectionItem:
437         """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
438
439         This method does not recurse. If you want to search a path, use
440         `RichCollectionBase.find` instead.
441         """
442         return self._items[k]
443
444     @synchronized
445     def __contains__(self, k: str) -> bool:
446         """Indicate whether this collection has an item with this name
447
448         This method does not recurse. It you want to check a path, use
449         `RichCollectionBase.exists` instead.
450         """
451         return k in self._items
452
453     @synchronized
454     def __len__(self):
455         """Get the number of items directly contained in this collection
456
457         This method does not recurse. It only counts the streams and files
458         in this collection's corresponding stream.
459         """
460         return len(self._items)
461
462     @must_be_writable
463     @synchronized
464     def __delitem__(self, p: str) -> None:
465         """Delete an item from this collection's stream
466
467         This method does not recurse. If you want to remove an item by a
468         path, use `RichCollectionBase.remove` instead.
469         """
470         del self._items[p]
471         self.set_committed(False)
472         self.notify(DEL, self, p, None)
473
474     @synchronized
475     def keys(self) -> Iterator[str]:
476         """Iterate names of streams and files in this collection
477
478         This method does not recurse. It only iterates the contents of this
479         collection's corresponding stream.
480         """
481         return self._items.keys()
482
483     @synchronized
484     def values(self) -> List[CollectionItem]:
485         """Get a list of objects in this collection's stream
486
487         The return value includes a `Subcollection` for every stream, and an
488         `arvados.arvfile.ArvadosFile` for every file, directly within this
489         collection's stream.  This method does not recurse.
490         """
491         return list(self._items.values())
492
493     @synchronized
494     def items(self) -> List[Tuple[str, CollectionItem]]:
495         """Get a list of `(name, object)` tuples from this collection's stream
496
497         The return value includes a `Subcollection` for every stream, and an
498         `arvados.arvfile.ArvadosFile` for every file, directly within this
499         collection's stream.  This method does not recurse.
500         """
501         return list(self._items.items())
502
503     def exists(self, path: str) -> bool:
504         """Indicate whether this collection includes an item at `path`
505
506         This method returns `True` if `path` refers to a stream or file within
507         this collection, else `False`.
508
509         Arguments:
510
511         * path: str --- The path to check for existence within this collection
512         """
513         return self.find(path) is not None
514
515     @must_be_writable
516     @synchronized
517     def remove(self, path: str, recursive: bool=False) -> None:
518         """Remove the file or stream at `path`
519
520         Arguments:
521
522         * path: str --- The path of the item to remove from the collection
523
524         * recursive: bool --- Controls the method's behavior if `path` refers
525           to a nonempty stream. If `False` (the default), this method raises
526           `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
527           items under the stream.
528         """
529         if not path:
530             raise errors.ArgumentError("Parameter 'path' is empty.")
531
532         pathcomponents = path.split("/", 1)
533         item = self._items.get(pathcomponents[0])
534         if item is None:
535             raise IOError(errno.ENOENT, "File not found", path)
536         if len(pathcomponents) == 1:
537             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
538                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
539             deleteditem = self._items[pathcomponents[0]]
540             del self._items[pathcomponents[0]]
541             self.set_committed(False)
542             self.notify(DEL, self, pathcomponents[0], deleteditem)
543         else:
544             item.remove(pathcomponents[1], recursive=recursive)
545
546     def _clonefrom(self, source):
547         for k,v in source.items():
548             self._items[k] = v.clone(self, k)
549
550     def clone(self):
551         raise NotImplementedError()
552
553     @must_be_writable
554     @synchronized
555     def add(
556             self,
557             source_obj: CollectionItem,
558             target_name: str,
559             overwrite: bool=False,
560             reparent: bool=False,
561     ) -> None:
562         """Copy or move a file or subcollection object to this collection
563
564         Arguments:
565
566         * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
567           to add to this collection
568
569         * target_name: str --- The path inside this collection where
570           `source_obj` should be added.
571
572         * overwrite: bool --- Controls the behavior of this method when the
573           collection already contains an object at `target_name`. If `False`
574           (the default), this method will raise `FileExistsError`. If `True`,
575           the object at `target_name` will be replaced with `source_obj`.
576
577         * reparent: bool --- Controls whether this method copies or moves
578           `source_obj`. If `False` (the default), `source_obj` is copied into
579           this collection. If `True`, `source_obj` is moved into this
580           collection.
581         """
582         if target_name in self and not overwrite:
583             raise IOError(errno.EEXIST, "File already exists", target_name)
584
585         modified_from = None
586         if target_name in self:
587             modified_from = self[target_name]
588
589         # Actually make the move or copy.
590         if reparent:
591             source_obj._reparent(self, target_name)
592             item = source_obj
593         else:
594             item = source_obj.clone(self, target_name)
595
596         self._items[target_name] = item
597         self.set_committed(False)
598         if not self._has_remote_blocks and source_obj.has_remote_blocks():
599             self.set_has_remote_blocks(True)
600
601         if modified_from:
602             self.notify(MOD, self, target_name, (modified_from, item))
603         else:
604             self.notify(ADD, self, target_name, item)
605
606     def _get_src_target(self, source, target_path, source_collection, create_dest):
607         if source_collection is None:
608             source_collection = self
609
610         # Find the object
611         if isinstance(source, str):
612             source_obj = source_collection.find(source)
613             if source_obj is None:
614                 raise IOError(errno.ENOENT, "File not found", source)
615             sourcecomponents = source.split("/")
616         else:
617             source_obj = source
618             sourcecomponents = None
619
620         # Find parent collection the target path
621         targetcomponents = target_path.split("/")
622
623         # Determine the name to use.
624         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
625
626         if not target_name:
627             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
628
629         if create_dest:
630             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
631         else:
632             if len(targetcomponents) > 1:
633                 target_dir = self.find("/".join(targetcomponents[0:-1]))
634             else:
635                 target_dir = self
636
637         if target_dir is None:
638             raise IOError(errno.ENOENT, "Target directory not found", target_name)
639
640         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
641             target_dir = target_dir[target_name]
642             target_name = sourcecomponents[-1]
643
644         return (source_obj, target_dir, target_name)
645
646     @must_be_writable
647     @synchronized
648     def copy(
649             self,
650             source: Union[str, CollectionItem],
651             target_path: str,
652             source_collection: Optional['RichCollectionBase']=None,
653             overwrite: bool=False,
654     ) -> None:
655         """Copy a file or subcollection object to this collection
656
657         Arguments:
658
659         * source: str | arvados.arvfile.ArvadosFile |
660           arvados.collection.Subcollection --- The file or subcollection to
661           add to this collection. If `source` is a str, the object will be
662           found by looking up this path from `source_collection` (see
663           below).
664
665         * target_path: str --- The path inside this collection where the
666           source object should be added.
667
668         * source_collection: arvados.collection.Collection | None --- The
669           collection to find the source object from when `source` is a
670           path. Defaults to the current collection (`self`).
671
672         * overwrite: bool --- Controls the behavior of this method when the
673           collection already contains an object at `target_path`. If `False`
674           (the default), this method will raise `FileExistsError`. If `True`,
675           the object at `target_path` will be replaced with `source_obj`.
676         """
677         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
678         target_dir.add(source_obj, target_name, overwrite, False)
679
680     @must_be_writable
681     @synchronized
682     def rename(
683             self,
684             source: Union[str, CollectionItem],
685             target_path: str,
686             source_collection: Optional['RichCollectionBase']=None,
687             overwrite: bool=False,
688     ) -> None:
689         """Move a file or subcollection object to this collection
690
691         Arguments:
692
693         * source: str | arvados.arvfile.ArvadosFile |
694           arvados.collection.Subcollection --- The file or subcollection to
695           add to this collection. If `source` is a str, the object will be
696           found by looking up this path from `source_collection` (see
697           below).
698
699         * target_path: str --- The path inside this collection where the
700           source object should be added.
701
702         * source_collection: arvados.collection.Collection | None --- The
703           collection to find the source object from when `source` is a
704           path. Defaults to the current collection (`self`).
705
706         * overwrite: bool --- Controls the behavior of this method when the
707           collection already contains an object at `target_path`. If `False`
708           (the default), this method will raise `FileExistsError`. If `True`,
709           the object at `target_path` will be replaced with `source_obj`.
710         """
711         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
712         if not source_obj.writable():
713             raise IOError(errno.EROFS, "Source collection is read only", source)
714         target_dir.add(source_obj, target_name, overwrite, True)
715
716     def portable_manifest_text(self, stream_name: str=".") -> str:
717         """Get the portable manifest text for this collection
718
719         The portable manifest text is normalized, and does not include access
720         tokens. This method does not flush outstanding blocks to Keep.
721
722         Arguments:
723
724         * stream_name: str --- The name to use for this collection's stream in
725           the generated manifest. Default `'.'`.
726         """
727         return self._get_manifest_text(stream_name, True, True)
728
729     @synchronized
730     def manifest_text(
731             self,
732             stream_name: str=".",
733             strip: bool=False,
734             normalize: bool=False,
735             only_committed: bool=False,
736     ) -> str:
737         """Get the manifest text for this collection
738
739         Arguments:
740
741         * stream_name: str --- The name to use for this collection's stream in
742           the generated manifest. Default `'.'`.
743
744         * strip: bool --- Controls whether or not the returned manifest text
745           includes access tokens. If `False` (the default), the manifest text
746           will include access tokens. If `True`, the manifest text will not
747           include access tokens.
748
749         * normalize: bool --- Controls whether or not the returned manifest
750           text is normalized. Default `False`.
751
752         * only_committed: bool --- Controls whether or not this method uploads
753           pending data to Keep before building and returning the manifest text.
754           If `False` (the default), this method will finish uploading all data
755           to Keep, then return the final manifest. If `True`, this method will
756           build and return a manifest that only refers to the data that has
757           finished uploading at the time this method was called.
758         """
759         if not only_committed:
760             self._my_block_manager().commit_all()
761         return self._get_manifest_text(stream_name, strip, normalize,
762                                        only_committed=only_committed)
763
764     @synchronized
765     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
766         """Get the manifest text for this collection, sub collections and files.
767
768         :stream_name:
769           Name to use for this stream (directory)
770
771         :strip:
772           If True, remove signing tokens from block locators if present.
773           If False (default), block locators are left unchanged.
774
775         :normalize:
776           If True, always export the manifest text in normalized form
777           even if the Collection is not modified.  If False (default) and the collection
778           is not modified, return the original manifest text even if it is not
779           in normalized form.
780
781         :only_committed:
782           If True, only include blocks that were already committed to Keep.
783
784         """
785
786         if not self.committed() or self._manifest_text is None or normalize:
787             stream = {}
788             buf = []
789             sorted_keys = sorted(self.keys())
790             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
791                 # Create a stream per file `k`
792                 arvfile = self[filename]
793                 filestream = []
794                 for segment in arvfile.segments():
795                     loc = segment.locator
796                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
797                         if only_committed:
798                             continue
799                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
800                     if strip:
801                         loc = KeepLocator(loc).stripped()
802                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
803                                          segment.segment_offset, segment.range_size))
804                 stream[filename] = filestream
805             if stream:
806                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
807             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
808                 buf.append(self[dirname].manifest_text(
809                     stream_name=os.path.join(stream_name, dirname),
810                     strip=strip, normalize=True, only_committed=only_committed))
811             return "".join(buf)
812         else:
813             if strip:
814                 return self.stripped_manifest()
815             else:
816                 return self._manifest_text
817
818     @synchronized
819     def _copy_remote_blocks(self, remote_blocks={}):
820         """Scan through the entire collection and ask Keep to copy remote blocks.
821
822         When accessing a remote collection, blocks will have a remote signature
823         (+R instead of +A). Collect these signatures and request Keep to copy the
824         blocks to the local cluster, returning local (+A) signatures.
825
826         :remote_blocks:
827           Shared cache of remote to local block mappings. This is used to avoid
828           doing extra work when blocks are shared by more than one file in
829           different subdirectories.
830
831         """
832         for item in self:
833             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
834         return remote_blocks
835
836     @synchronized
837     def diff(
838             self,
839             end_collection: 'RichCollectionBase',
840             prefix: str=".",
841             holding_collection: Optional['Collection']=None,
842     ) -> ChangeList:
843         """Build a list of differences between this collection and another
844
845         Arguments:
846
847         * end_collection: arvados.collection.RichCollectionBase --- A
848           collection object with the desired end state. The returned diff
849           list will describe how to go from the current collection object
850           `self` to `end_collection`.
851
852         * prefix: str --- The name to use for this collection's stream in
853           the diff list. Default `'.'`.
854
855         * holding_collection: arvados.collection.Collection | None --- A
856           collection object used to hold objects for the returned diff
857           list. By default, a new empty collection is created.
858         """
859         changes = []
860         if holding_collection is None:
861             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
862         for k in self:
863             if k not in end_collection:
864                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
865         for k in end_collection:
866             if k in self:
867                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
868                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
869                 elif end_collection[k] != self[k]:
870                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
871                 else:
872                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
873             else:
874                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
875         return changes
876
877     @must_be_writable
878     @synchronized
879     def apply(self, changes: ChangeList) -> None:
880         """Apply a list of changes from to this collection
881
882         This method takes a list of changes generated by
883         `RichCollectionBase.diff` and applies it to this
884         collection. Afterward, the state of this collection object will
885         match the state of `end_collection` passed to `diff`. If a change
886         conflicts with a local change, it will be saved to an alternate path
887         indicating the conflict.
888
889         Arguments:
890
891         * changes: arvados.collection.ChangeList --- The list of differences
892           generated by `RichCollectionBase.diff`.
893         """
894         if changes:
895             self.set_committed(False)
896         for change in changes:
897             event_type = change[0]
898             path = change[1]
899             initial = change[2]
900             local = self.find(path)
901             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
902                                                                     time.gmtime()))
903             if event_type == ADD:
904                 if local is None:
905                     # No local file at path, safe to copy over new file
906                     self.copy(initial, path)
907                 elif local is not None and local != initial:
908                     # There is already local file and it is different:
909                     # save change to conflict file.
910                     self.copy(initial, conflictpath)
911             elif event_type == MOD or event_type == TOK:
912                 final = change[3]
913                 if local == initial:
914                     # Local matches the "initial" item so it has not
915                     # changed locally and is safe to update.
916                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
917                         # Replace contents of local file with new contents
918                         local.replace_contents(final)
919                     else:
920                         # Overwrite path with new item; this can happen if
921                         # path was a file and is now a collection or vice versa
922                         self.copy(final, path, overwrite=True)
923                 else:
924                     # Local is missing (presumably deleted) or local doesn't
925                     # match the "start" value, so save change to conflict file
926                     self.copy(final, conflictpath)
927             elif event_type == DEL:
928                 if local == initial:
929                     # Local item matches "initial" value, so it is safe to remove.
930                     self.remove(path, recursive=True)
931                 # else, the file is modified or already removed, in either
932                 # case we don't want to try to remove it.
933
934     def portable_data_hash(self) -> str:
935         """Get the portable data hash for this collection's manifest"""
936         if self._manifest_locator and self.committed():
937             # If the collection is already saved on the API server, and it's committed
938             # then return API server's PDH response.
939             return self._portable_data_hash
940         else:
941             stripped = self.portable_manifest_text().encode()
942             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
943
944     @synchronized
945     def subscribe(self, callback: ChangeCallback) -> None:
946         """Set a notify callback for changes to this collection
947
948         Arguments:
949
950         * callback: arvados.collection.ChangeCallback --- The callable to
951           call each time the collection is changed.
952         """
953         if self._callback is None:
954             self._callback = callback
955         else:
956             raise errors.ArgumentError("A callback is already set on this collection.")
957
958     @synchronized
959     def unsubscribe(self) -> None:
960         """Remove any notify callback set for changes to this collection"""
961         if self._callback is not None:
962             self._callback = None
963
964     @synchronized
965     def notify(
966             self,
967             event: ChangeType,
968             collection: 'RichCollectionBase',
969             name: str,
970             item: CollectionItem,
971     ) -> None:
972         """Notify any subscribed callback about a change to this collection
973
974         .. ATTENTION:: Internal
975            This method is only meant to be used by other Collection methods.
976
977         If a callback has been registered with `RichCollectionBase.subscribe`,
978         it will be called with information about a change to this collection.
979         Then this notification will be propagated to this collection's root.
980
981         Arguments:
982
983         * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
984           the collection.
985
986         * collection: arvados.collection.RichCollectionBase --- The
987           collection that was modified.
988
989         * name: str --- The name of the file or stream within `collection` that
990           was modified.
991
992         * item: arvados.arvfile.ArvadosFile |
993           arvados.collection.Subcollection --- The new contents at `name`
994           within `collection`.
995         """
996         if self._callback:
997             self._callback(event, collection, name, item)
998         self.root_collection().notify(event, collection, name, item)
999
1000     @synchronized
1001     def __eq__(self, other: Any) -> bool:
1002         """Indicate whether this collection object is equal to another"""
1003         if other is self:
1004             return True
1005         if not isinstance(other, RichCollectionBase):
1006             return False
1007         if len(self._items) != len(other):
1008             return False
1009         for k in self._items:
1010             if k not in other:
1011                 return False
1012             if self._items[k] != other[k]:
1013                 return False
1014         return True
1015
1016     def __ne__(self, other: Any) -> bool:
1017         """Indicate whether this collection object is not equal to another"""
1018         return not self.__eq__(other)
1019
1020     @synchronized
1021     def flush(self) -> None:
1022         """Upload any pending data to Keep"""
1023         for e in self.values():
1024             e.flush()
1025
1026
1027 class Collection(RichCollectionBase):
1028     """Read and manipulate an Arvados collection
1029
1030     This class provides a high-level interface to create, read, and update
1031     Arvados collections and their contents. Refer to the Arvados Python SDK
1032     cookbook for [an introduction to using the Collection class][cookbook].
1033
1034     [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1035     """
1036
1037     def __init__(self, manifest_locator_or_text: Optional[str]=None,
1038                  api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1039                  keep_client: Optional['arvados.keep.KeepClient']=None,
1040                  num_retries: int=10,
1041                  parent: Optional['Collection']=None,
1042                  apiconfig: Optional[Mapping[str, str]]=None,
1043                  block_manager: Optional['arvados.arvfile._BlockManager']=None,
1044                  replication_desired: Optional[int]=None,
1045                  storage_classes_desired: Optional[List[str]]=None,
1046                  put_threads: Optional[int]=None):
1047         """Initialize a Collection object
1048
1049         Arguments:
1050
1051         * manifest_locator_or_text: str | None --- This string can contain a
1052           collection manifest text, portable data hash, or UUID. When given a
1053           portable data hash or UUID, this instance will load a collection
1054           record from the API server. Otherwise, this instance will represent a
1055           new collection without an API server record. The default value `None`
1056           instantiates a new collection with an empty manifest.
1057
1058         * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1059           Arvados API client object this instance uses to make requests. If
1060           none is given, this instance creates its own client using the
1061           settings from `apiconfig` (see below). If your client instantiates
1062           many Collection objects, you can help limit memory utilization by
1063           calling `arvados.api.api` to construct an
1064           `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1065           for every Collection.
1066
1067         * keep_client: arvados.keep.KeepClient | None --- The Keep client
1068           object this instance uses to make requests. If none is given, this
1069           instance creates its own client using its `api_client`.
1070
1071         * num_retries: int --- The number of times that client requests are
1072           retried. Default 10.
1073
1074         * parent: arvados.collection.Collection | None --- The parent Collection
1075           object of this instance, if any. This argument is primarily used by
1076           other Collection methods; user client code shouldn't need to use it.
1077
1078         * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1079           `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1080           `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1081           Collection object constructs one from these settings. If no
1082           mapping is provided, calls `arvados.config.settings` to get these
1083           parameters from user configuration.
1084
1085         * block_manager: arvados.arvfile._BlockManager | None --- The
1086           _BlockManager object used by this instance to coordinate reading
1087           and writing Keep data blocks. If none is given, this instance
1088           constructs its own. This argument is primarily used by other
1089           Collection methods; user client code shouldn't need to use it.
1090
1091         * replication_desired: int | None --- This controls both the value of
1092           the `replication_desired` field on API collection records saved by
1093           this class, as well as the number of Keep services that the object
1094           writes new data blocks to. If none is given, uses the default value
1095           configured for the cluster.
1096
1097         * storage_classes_desired: list[str] | None --- This controls both
1098           the value of the `storage_classes_desired` field on API collection
1099           records saved by this class, as well as selecting which specific
1100           Keep services the object writes new data blocks to. If none is
1101           given, defaults to an empty list.
1102
1103         * put_threads: int | None --- The number of threads to run
1104           simultaneously to upload data blocks to Keep. This value is used when
1105           building a new `block_manager`. It is unused when a `block_manager`
1106           is provided.
1107         """
1108
1109         if storage_classes_desired and type(storage_classes_desired) is not list:
1110             raise errors.ArgumentError("storage_classes_desired must be list type.")
1111
1112         super(Collection, self).__init__(parent)
1113         self._api_client = api_client
1114         self._keep_client = keep_client
1115
1116         # Use the keep client from ThreadSafeApiCache
1117         if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1118             self._keep_client = self._api_client.keep
1119
1120         self._block_manager = block_manager
1121         self.replication_desired = replication_desired
1122         self._storage_classes_desired = storage_classes_desired
1123         self.put_threads = put_threads
1124
1125         if apiconfig:
1126             self._config = apiconfig
1127         else:
1128             self._config = config.settings()
1129
1130         self.num_retries = num_retries
1131         self._manifest_locator = None
1132         self._manifest_text = None
1133         self._portable_data_hash = None
1134         self._api_response = None
1135         self._past_versions = set()
1136
1137         self.lock = threading.RLock()
1138         self.events = None
1139
1140         if manifest_locator_or_text:
1141             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1142                 self._manifest_locator = manifest_locator_or_text
1143             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1144                 self._manifest_locator = manifest_locator_or_text
1145                 if not self._has_local_collection_uuid():
1146                     self._has_remote_blocks = True
1147             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1148                 self._manifest_text = manifest_locator_or_text
1149                 if '+R' in self._manifest_text:
1150                     self._has_remote_blocks = True
1151             else:
1152                 raise errors.ArgumentError(
1153                     "Argument to CollectionReader is not a manifest or a collection UUID")
1154
1155             try:
1156                 self._populate()
1157             except errors.SyntaxError as e:
1158                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1159
1160     def storage_classes_desired(self) -> List[str]:
1161         """Get this collection's `storage_classes_desired` value"""
1162         return self._storage_classes_desired or []
1163
1164     def root_collection(self) -> 'Collection':
1165         return self
1166
1167     def get_properties(self) -> Properties:
1168         """Get this collection's properties
1169
1170         This method always returns a dict. If this collection object does not
1171         have an associated API record, or that record does not have any
1172         properties set, this method returns an empty dict.
1173         """
1174         if self._api_response and self._api_response["properties"]:
1175             return self._api_response["properties"]
1176         else:
1177             return {}
1178
1179     def get_trash_at(self) -> Optional[datetime.datetime]:
1180         """Get this collection's `trash_at` field
1181
1182         This method parses the `trash_at` field of the collection's API
1183         record and returns a datetime from it. If that field is not set, or
1184         this collection object does not have an associated API record,
1185         returns None.
1186         """
1187         if self._api_response and self._api_response["trash_at"]:
1188             try:
1189                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1190             except ValueError:
1191                 return None
1192         else:
1193             return None
1194
1195     def stream_name(self) -> str:
1196         return "."
1197
1198     def writable(self) -> bool:
1199         return True
1200
1201     @synchronized
1202     def known_past_version(
1203             self,
1204             modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1205     ) -> bool:
1206         """Indicate whether an API record for this collection has been seen before
1207
1208         As this collection object loads records from the API server, it records
1209         their `modified_at` and `portable_data_hash` fields. This method accepts
1210         a 2-tuple with values for those fields, and returns `True` if the
1211         combination was previously loaded.
1212         """
1213         return modified_at_and_portable_data_hash in self._past_versions
1214
1215     @synchronized
1216     @retry_method
1217     def update(
1218             self,
1219             other: Optional['Collection']=None,
1220             num_retries: Optional[int]=None,
1221     ) -> None:
1222         """Merge another collection's contents into this one
1223
1224         This method compares the manifest of this collection instance with
1225         another, then updates this instance's manifest with changes from the
1226         other, renaming files to flag conflicts where necessary.
1227
1228         When called without any arguments, this method reloads the collection's
1229         API record, and updates this instance with any changes that have
1230         appeared server-side. If this instance does not have a corresponding
1231         API record, this method raises `arvados.errors.ArgumentError`.
1232
1233         Arguments:
1234
1235         * other: arvados.collection.Collection | None --- The collection
1236           whose contents should be merged into this instance. When not
1237           provided, this method reloads this collection's API record and
1238           constructs a Collection object from it.  If this instance does not
1239           have a corresponding API record, this method raises
1240           `arvados.errors.ArgumentError`.
1241
1242         * num_retries: int | None --- The number of times to retry reloading
1243           the collection's API record from the API server. If not specified,
1244           uses the `num_retries` provided when this instance was constructed.
1245         """
1246         if other is None:
1247             if self._manifest_locator is None:
1248                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1249             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1250             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1251                 response.get("portable_data_hash") != self.portable_data_hash()):
1252                 # The record on the server is different from our current one, but we've seen it before,
1253                 # so ignore it because it's already been merged.
1254                 # However, if it's the same as our current record, proceed with the update, because we want to update
1255                 # our tokens.
1256                 return
1257             else:
1258                 self._remember_api_response(response)
1259             other = CollectionReader(response["manifest_text"])
1260         baseline = CollectionReader(self._manifest_text)
1261         self.apply(baseline.diff(other))
1262         self._manifest_text = self.manifest_text()
1263
1264     @synchronized
1265     def _my_api(self):
1266         if self._api_client is None:
1267             self._api_client = ThreadSafeApiCache(self._config, version='v1')
1268             if self._keep_client is None:
1269                 self._keep_client = self._api_client.keep
1270         return self._api_client
1271
1272     @synchronized
1273     def _my_keep(self):
1274         if self._keep_client is None:
1275             if self._api_client is None:
1276                 self._my_api()
1277             else:
1278                 self._keep_client = KeepClient(api_client=self._api_client)
1279         return self._keep_client
1280
1281     @synchronized
1282     def _my_block_manager(self):
1283         if self._block_manager is None:
1284             copies = (self.replication_desired or
1285                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1286                                                    2))
1287             self._block_manager = _BlockManager(self._my_keep(),
1288                                                 copies=copies,
1289                                                 put_threads=self.put_threads,
1290                                                 num_retries=self.num_retries,
1291                                                 storage_classes_func=self.storage_classes_desired)
1292         return self._block_manager
1293
1294     def _remember_api_response(self, response):
1295         self._api_response = response
1296         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1297
1298     def _populate_from_api_server(self):
1299         # As in KeepClient itself, we must wait until the last
1300         # possible moment to instantiate an API client, in order to
1301         # avoid tripping up clients that don't have access to an API
1302         # server.  If we do build one, make sure our Keep client uses
1303         # it.  If instantiation fails, we'll fall back to the except
1304         # clause, just like any other Collection lookup
1305         # failure. Return an exception, or None if successful.
1306         self._remember_api_response(self._my_api().collections().get(
1307             uuid=self._manifest_locator).execute(
1308                 num_retries=self.num_retries))
1309         self._manifest_text = self._api_response['manifest_text']
1310         self._portable_data_hash = self._api_response['portable_data_hash']
1311         # If not overriden via kwargs, we should try to load the
1312         # replication_desired and storage_classes_desired from the API server
1313         if self.replication_desired is None:
1314             self.replication_desired = self._api_response.get('replication_desired', None)
1315         if self._storage_classes_desired is None:
1316             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1317
1318     def _populate(self):
1319         if self._manifest_text is None:
1320             if self._manifest_locator is None:
1321                 return
1322             else:
1323                 self._populate_from_api_server()
1324         self._baseline_manifest = self._manifest_text
1325         self._import_manifest(self._manifest_text)
1326
1327     def _has_collection_uuid(self):
1328         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1329
1330     def _has_local_collection_uuid(self):
1331         return self._has_collection_uuid and \
1332             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1333
1334     def __enter__(self):
1335         return self
1336
1337     def __exit__(self, exc_type, exc_value, traceback):
1338         """Exit a context with this collection instance
1339
1340         If no exception was raised inside the context block, and this
1341         collection is writable and has a corresponding API record, that
1342         record will be updated to match the state of this instance at the end
1343         of the block.
1344         """
1345         if exc_type is None:
1346             if self.writable() and self._has_collection_uuid():
1347                 self.save()
1348         self.stop_threads()
1349
1350     def stop_threads(self) -> None:
1351         """Stop background Keep upload/download threads"""
1352         if self._block_manager is not None:
1353             self._block_manager.stop_threads()
1354
1355     @synchronized
1356     def manifest_locator(self) -> Optional[str]:
1357         """Get this collection's manifest locator, if any
1358
1359         * If this collection instance is associated with an API record with a
1360           UUID, return that.
1361         * Otherwise, if this collection instance was loaded from an API record
1362           by portable data hash, return that.
1363         * Otherwise, return `None`.
1364         """
1365         return self._manifest_locator
1366
1367     @synchronized
1368     def clone(
1369             self,
1370             new_parent: Optional['Collection']=None,
1371             new_name: Optional[str]=None,
1372             readonly: bool=False,
1373             new_config: Optional[Mapping[str, str]]=None,
1374     ) -> 'Collection':
1375         """Create a Collection object with the same contents as this instance
1376
1377         This method creates a new Collection object with contents that match
1378         this instance's. The new collection will not be associated with any API
1379         record.
1380
1381         Arguments:
1382
1383         * new_parent: arvados.collection.Collection | None --- This value is
1384           passed to the new Collection's constructor as the `parent`
1385           argument.
1386
1387         * new_name: str | None --- This value is unused.
1388
1389         * readonly: bool --- If this value is true, this method constructs and
1390           returns a `CollectionReader`. Otherwise, it returns a mutable
1391           `Collection`. Default `False`.
1392
1393         * new_config: Mapping[str, str] | None --- This value is passed to the
1394           new Collection's constructor as `apiconfig`. If no value is provided,
1395           defaults to the configuration passed to this instance's constructor.
1396         """
1397         if new_config is None:
1398             new_config = self._config
1399         if readonly:
1400             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1401         else:
1402             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1403
1404         newcollection._clonefrom(self)
1405         return newcollection
1406
1407     @synchronized
1408     def api_response(self) -> Optional[Dict[str, Any]]:
1409         """Get this instance's associated API record
1410
1411         If this Collection instance has an associated API record, return it.
1412         Otherwise, return `None`.
1413         """
1414         return self._api_response
1415
1416     def find_or_create(
1417             self,
1418             path: str,
1419             create_type: CreateType,
1420     ) -> CollectionItem:
1421         if path == ".":
1422             return self
1423         else:
1424             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1425
1426     def find(self, path: str) -> CollectionItem:
1427         if path == ".":
1428             return self
1429         else:
1430             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1431
1432     def remove(self, path: str, recursive: bool=False) -> None:
1433         if path == ".":
1434             raise errors.ArgumentError("Cannot remove '.'")
1435         else:
1436             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1437
1438     @must_be_writable
1439     @synchronized
1440     @retry_method
1441     def save(
1442             self,
1443             properties: Optional[Properties]=None,
1444             storage_classes: Optional[StorageClasses]=None,
1445             trash_at: Optional[datetime.datetime]=None,
1446             merge: bool=True,
1447             num_retries: Optional[int]=None,
1448             preserve_version: bool=False,
1449     ) -> str:
1450         """Save collection to an existing API record
1451
1452         This method updates the instance's corresponding API record to match
1453         the instance's state. If this instance does not have a corresponding API
1454         record yet, raises `AssertionError`. (To create a new API record, use
1455         `Collection.save_new`.) This method returns the saved collection
1456         manifest.
1457
1458         Arguments:
1459
1460         * properties: dict[str, Any] | None --- If provided, the API record will
1461           be updated with these properties. Note this will completely replace
1462           any existing properties.
1463
1464         * storage_classes: list[str] | None --- If provided, the API record will
1465           be updated with this value in the `storage_classes_desired` field.
1466           This value will also be saved on the instance and used for any
1467           changes that follow.
1468
1469         * trash_at: datetime.datetime | None --- If provided, the API record
1470           will be updated with this value in the `trash_at` field.
1471
1472         * merge: bool --- If `True` (the default), this method will first
1473           reload this collection's API record, and merge any new contents into
1474           this instance before saving changes. See `Collection.update` for
1475           details.
1476
1477         * num_retries: int | None --- The number of times to retry reloading
1478           the collection's API record from the API server. If not specified,
1479           uses the `num_retries` provided when this instance was constructed.
1480
1481         * preserve_version: bool --- This value will be passed to directly
1482           to the underlying API call. If `True`, the Arvados API will
1483           preserve the versions of this collection both immediately before
1484           and after the update. If `True` when the API server is not
1485           configured with collection versioning, this method raises
1486           `arvados.errors.ArgumentError`.
1487         """
1488         if properties and type(properties) is not dict:
1489             raise errors.ArgumentError("properties must be dictionary type.")
1490
1491         if storage_classes and type(storage_classes) is not list:
1492             raise errors.ArgumentError("storage_classes must be list type.")
1493         if storage_classes:
1494             self._storage_classes_desired = storage_classes
1495
1496         if trash_at and type(trash_at) is not datetime.datetime:
1497             raise errors.ArgumentError("trash_at must be datetime type.")
1498
1499         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1500             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1501
1502         body={}
1503         if properties:
1504             body["properties"] = properties
1505         if self.storage_classes_desired():
1506             body["storage_classes_desired"] = self.storage_classes_desired()
1507         if trash_at:
1508             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1509             body["trash_at"] = t
1510         if preserve_version:
1511             body["preserve_version"] = preserve_version
1512
1513         if not self.committed():
1514             if self._has_remote_blocks:
1515                 # Copy any remote blocks to the local cluster.
1516                 self._copy_remote_blocks(remote_blocks={})
1517                 self._has_remote_blocks = False
1518             if not self._has_collection_uuid():
1519                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1520             elif not self._has_local_collection_uuid():
1521                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1522
1523             self._my_block_manager().commit_all()
1524
1525             if merge:
1526                 self.update()
1527
1528             text = self.manifest_text(strip=False)
1529             body['manifest_text'] = text
1530
1531             self._remember_api_response(self._my_api().collections().update(
1532                 uuid=self._manifest_locator,
1533                 body=body
1534                 ).execute(num_retries=num_retries))
1535             self._manifest_text = self._api_response["manifest_text"]
1536             self._portable_data_hash = self._api_response["portable_data_hash"]
1537             self.set_committed(True)
1538         elif body:
1539             self._remember_api_response(self._my_api().collections().update(
1540                 uuid=self._manifest_locator,
1541                 body=body
1542                 ).execute(num_retries=num_retries))
1543
1544         return self._manifest_text
1545
1546
1547     @must_be_writable
1548     @synchronized
1549     @retry_method
1550     def save_new(
1551             self,
1552             name: Optional[str]=None,
1553             create_collection_record: bool=True,
1554             owner_uuid: Optional[str]=None,
1555             properties: Optional[Properties]=None,
1556             storage_classes: Optional[StorageClasses]=None,
1557             trash_at: Optional[datetime.datetime]=None,
1558             ensure_unique_name: bool=False,
1559             num_retries: Optional[int]=None,
1560             preserve_version: bool=False,
1561     ):
1562         """Save collection to a new API record
1563
1564         This method finishes uploading new data blocks and (optionally)
1565         creates a new API collection record with the provided data. If a new
1566         record is created, this instance becomes associated with that record
1567         for future updates like `save()`. This method returns the saved
1568         collection manifest.
1569
1570         Arguments:
1571
1572         * name: str | None --- The `name` field to use on the new collection
1573           record. If not specified, a generic default name is generated.
1574
1575         * create_collection_record: bool --- If `True` (the default), creates a
1576           collection record on the API server. If `False`, the method finishes
1577           all data uploads and only returns the resulting collection manifest
1578           without sending it to the API server.
1579
1580         * owner_uuid: str | None --- The `owner_uuid` field to use on the
1581           new collection record.
1582
1583         * properties: dict[str, Any] | None --- The `properties` field to use on
1584           the new collection record.
1585
1586         * storage_classes: list[str] | None --- The
1587           `storage_classes_desired` field to use on the new collection record.
1588
1589         * trash_at: datetime.datetime | None --- The `trash_at` field to use
1590           on the new collection record.
1591
1592         * ensure_unique_name: bool --- This value is passed directly to the
1593           Arvados API when creating the collection record. If `True`, the API
1594           server may modify the submitted `name` to ensure the collection's
1595           `name`+`owner_uuid` combination is unique. If `False` (the default),
1596           if a collection already exists with this same `name`+`owner_uuid`
1597           combination, creating a collection record will raise a validation
1598           error.
1599
1600         * num_retries: int | None --- The number of times to retry reloading
1601           the collection's API record from the API server. If not specified,
1602           uses the `num_retries` provided when this instance was constructed.
1603
1604         * preserve_version: bool --- This value will be passed to directly
1605           to the underlying API call. If `True`, the Arvados API will
1606           preserve the versions of this collection both immediately before
1607           and after the update. If `True` when the API server is not
1608           configured with collection versioning, this method raises
1609           `arvados.errors.ArgumentError`.
1610         """
1611         if properties and type(properties) is not dict:
1612             raise errors.ArgumentError("properties must be dictionary type.")
1613
1614         if storage_classes and type(storage_classes) is not list:
1615             raise errors.ArgumentError("storage_classes must be list type.")
1616
1617         if trash_at and type(trash_at) is not datetime.datetime:
1618             raise errors.ArgumentError("trash_at must be datetime type.")
1619
1620         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1621             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1622
1623         if self._has_remote_blocks:
1624             # Copy any remote blocks to the local cluster.
1625             self._copy_remote_blocks(remote_blocks={})
1626             self._has_remote_blocks = False
1627
1628         if storage_classes:
1629             self._storage_classes_desired = storage_classes
1630
1631         self._my_block_manager().commit_all()
1632         text = self.manifest_text(strip=False)
1633
1634         if create_collection_record:
1635             if name is None:
1636                 name = "New collection"
1637                 ensure_unique_name = True
1638
1639             body = {"manifest_text": text,
1640                     "name": name,
1641                     "replication_desired": self.replication_desired}
1642             if owner_uuid:
1643                 body["owner_uuid"] = owner_uuid
1644             if properties:
1645                 body["properties"] = properties
1646             if self.storage_classes_desired():
1647                 body["storage_classes_desired"] = self.storage_classes_desired()
1648             if trash_at:
1649                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1650                 body["trash_at"] = t
1651             if preserve_version:
1652                 body["preserve_version"] = preserve_version
1653
1654             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1655             text = self._api_response["manifest_text"]
1656
1657             self._manifest_locator = self._api_response["uuid"]
1658             self._portable_data_hash = self._api_response["portable_data_hash"]
1659
1660             self._manifest_text = text
1661             self.set_committed(True)
1662
1663         return text
1664
1665     _token_re = re.compile(r'(\S+)(\s+|$)')
1666     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1667     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1668
1669     def _unescape_manifest_path(self, path):
1670         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1671
1672     @synchronized
1673     def _import_manifest(self, manifest_text):
1674         """Import a manifest into a `Collection`.
1675
1676         :manifest_text:
1677           The manifest text to import from.
1678
1679         """
1680         if len(self) > 0:
1681             raise ArgumentError("Can only import manifest into an empty collection")
1682
1683         STREAM_NAME = 0
1684         BLOCKS = 1
1685         SEGMENTS = 2
1686
1687         stream_name = None
1688         state = STREAM_NAME
1689
1690         for token_and_separator in self._token_re.finditer(manifest_text):
1691             tok = token_and_separator.group(1)
1692             sep = token_and_separator.group(2)
1693
1694             if state == STREAM_NAME:
1695                 # starting a new stream
1696                 stream_name = self._unescape_manifest_path(tok)
1697                 blocks = []
1698                 segments = []
1699                 streamoffset = 0
1700                 state = BLOCKS
1701                 self.find_or_create(stream_name, COLLECTION)
1702                 continue
1703
1704             if state == BLOCKS:
1705                 block_locator = self._block_re.match(tok)
1706                 if block_locator:
1707                     blocksize = int(block_locator.group(1))
1708                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1709                     streamoffset += blocksize
1710                 else:
1711                     state = SEGMENTS
1712
1713             if state == SEGMENTS:
1714                 file_segment = self._segment_re.match(tok)
1715                 if file_segment:
1716                     pos = int(file_segment.group(1))
1717                     size = int(file_segment.group(2))
1718                     name = self._unescape_manifest_path(file_segment.group(3))
1719                     if name.split('/')[-1] == '.':
1720                         # placeholder for persisting an empty directory, not a real file
1721                         if len(name) > 2:
1722                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1723                     else:
1724                         filepath = os.path.join(stream_name, name)
1725                         try:
1726                             afile = self.find_or_create(filepath, FILE)
1727                         except IOError as e:
1728                             if e.errno == errno.ENOTDIR:
1729                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1730                             else:
1731                                 raise e from None
1732                         if isinstance(afile, ArvadosFile):
1733                             afile.add_segment(blocks, pos, size)
1734                         else:
1735                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1736                 else:
1737                     # error!
1738                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1739
1740             if sep == "\n":
1741                 stream_name = None
1742                 state = STREAM_NAME
1743
1744         self.set_committed(True)
1745
1746     @synchronized
1747     def notify(
1748             self,
1749             event: ChangeType,
1750             collection: 'RichCollectionBase',
1751             name: str,
1752             item: CollectionItem,
1753     ) -> None:
1754         if self._callback:
1755             self._callback(event, collection, name, item)
1756
1757
1758 class Subcollection(RichCollectionBase):
1759     """Read and manipulate a stream/directory within an Arvados collection
1760
1761     This class represents a single stream (like a directory) within an Arvados
1762     `Collection`. It is returned by `Collection.find` and provides the same API.
1763     Operations that work on the API collection record propagate to the parent
1764     `Collection` object.
1765     """
1766
1767     def __init__(self, parent, name):
1768         super(Subcollection, self).__init__(parent)
1769         self.lock = self.root_collection().lock
1770         self._manifest_text = None
1771         self.name = name
1772         self.num_retries = parent.num_retries
1773
1774     def root_collection(self) -> 'Collection':
1775         return self.parent.root_collection()
1776
1777     def writable(self) -> bool:
1778         return self.root_collection().writable()
1779
1780     def _my_api(self):
1781         return self.root_collection()._my_api()
1782
1783     def _my_keep(self):
1784         return self.root_collection()._my_keep()
1785
1786     def _my_block_manager(self):
1787         return self.root_collection()._my_block_manager()
1788
1789     def stream_name(self) -> str:
1790         return os.path.join(self.parent.stream_name(), self.name)
1791
1792     @synchronized
1793     def clone(
1794             self,
1795             new_parent: Optional['Collection']=None,
1796             new_name: Optional[str]=None,
1797     ) -> 'Subcollection':
1798         c = Subcollection(new_parent, new_name)
1799         c._clonefrom(self)
1800         return c
1801
1802     @must_be_writable
1803     @synchronized
1804     def _reparent(self, newparent, newname):
1805         self.set_committed(False)
1806         self.flush()
1807         self.parent.remove(self.name, recursive=True)
1808         self.parent = newparent
1809         self.name = newname
1810         self.lock = self.parent.root_collection().lock
1811
1812     @synchronized
1813     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1814         """Encode empty directories by using an \056-named (".") empty file"""
1815         if len(self._items) == 0:
1816             return "%s %s 0:0:\\056\n" % (
1817                 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1818         return super(Subcollection, self)._get_manifest_text(stream_name,
1819                                                              strip, normalize,
1820                                                              only_committed)
1821
1822
1823 class CollectionReader(Collection):
1824     """Read-only `Collection` subclass
1825
1826     This class will never create or update any API collection records. You can
1827     use this class for additional code safety when you only need to read
1828     existing collections.
1829     """
1830     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1831         self._in_init = True
1832         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1833         self._in_init = False
1834
1835         # Forego any locking since it should never change once initialized.
1836         self.lock = NoopLock()
1837
1838         # Backwards compatability with old CollectionReader
1839         # all_streams() and all_files()
1840         self._streams = None
1841
1842     def writable(self) -> bool:
1843         return self._in_init
1844
1845     def _populate_streams(orig_func):
1846         @functools.wraps(orig_func)
1847         def populate_streams_wrapper(self, *args, **kwargs):
1848             # Defer populating self._streams until needed since it creates a copy of the manifest.
1849             if self._streams is None:
1850                 if self._manifest_text:
1851                     self._streams = [sline.split()
1852                                      for sline in self._manifest_text.split("\n")
1853                                      if sline]
1854                 else:
1855                     self._streams = []
1856             return orig_func(self, *args, **kwargs)
1857         return populate_streams_wrapper
1858
1859     @arvados.util._deprecated('3.0', 'Collection iteration')
1860     @_populate_streams
1861     def normalize(self):
1862         """Normalize the streams returned by `all_streams`"""
1863         streams = {}
1864         for s in self.all_streams():
1865             for f in s.all_files():
1866                 streamname, filename = split(s.name() + "/" + f.name())
1867                 if streamname not in streams:
1868                     streams[streamname] = {}
1869                 if filename not in streams[streamname]:
1870                     streams[streamname][filename] = []
1871                 for r in f.segments:
1872                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1873
1874         self._streams = [normalize_stream(s, streams[s])
1875                          for s in sorted(streams)]
1876
1877     @arvados.util._deprecated('3.0', 'Collection iteration')
1878     @_populate_streams
1879     def all_streams(self):
1880         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1881                 for s in self._streams]
1882
1883     @arvados.util._deprecated('3.0', 'Collection iteration')
1884     @_populate_streams
1885     def all_files(self):
1886         for s in self.all_streams():
1887             for f in s.all_files():
1888                 yield f
1889
1890
1891 class CollectionWriter(CollectionBase):
1892     """Create a new collection from scratch
1893
1894     .. WARNING:: Deprecated
1895        This class is deprecated. Prefer `arvados.collection.Collection`
1896        instead.
1897     """
1898
1899     @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1900     def __init__(self, api_client=None, num_retries=0, replication=None):
1901         """Instantiate a CollectionWriter.
1902
1903         CollectionWriter lets you build a new Arvados Collection from scratch.
1904         Write files to it.  The CollectionWriter will upload data to Keep as
1905         appropriate, and provide you with the Collection manifest text when
1906         you're finished.
1907
1908         Arguments:
1909         * api_client: The API client to use to look up Collections.  If not
1910           provided, CollectionReader will build one from available Arvados
1911           configuration.
1912         * num_retries: The default number of times to retry failed
1913           service requests.  Default 0.  You may change this value
1914           after instantiation, but note those changes may not
1915           propagate to related objects like the Keep client.
1916         * replication: The number of copies of each block to store.
1917           If this argument is None or not supplied, replication is
1918           the server-provided default if available, otherwise 2.
1919         """
1920         self._api_client = api_client
1921         self.num_retries = num_retries
1922         self.replication = (2 if replication is None else replication)
1923         self._keep_client = None
1924         self._data_buffer = []
1925         self._data_buffer_len = 0
1926         self._current_stream_files = []
1927         self._current_stream_length = 0
1928         self._current_stream_locators = []
1929         self._current_stream_name = '.'
1930         self._current_file_name = None
1931         self._current_file_pos = 0
1932         self._finished_streams = []
1933         self._close_file = None
1934         self._queued_file = None
1935         self._queued_dirents = deque()
1936         self._queued_trees = deque()
1937         self._last_open = None
1938
1939     def __exit__(self, exc_type, exc_value, traceback):
1940         if exc_type is None:
1941             self.finish()
1942
1943     def do_queued_work(self):
1944         # The work queue consists of three pieces:
1945         # * _queued_file: The file object we're currently writing to the
1946         #   Collection.
1947         # * _queued_dirents: Entries under the current directory
1948         #   (_queued_trees[0]) that we want to write or recurse through.
1949         #   This may contain files from subdirectories if
1950         #   max_manifest_depth == 0 for this directory.
1951         # * _queued_trees: Directories that should be written as separate
1952         #   streams to the Collection.
1953         # This function handles the smallest piece of work currently queued
1954         # (current file, then current directory, then next directory) until
1955         # no work remains.  The _work_THING methods each do a unit of work on
1956         # THING.  _queue_THING methods add a THING to the work queue.
1957         while True:
1958             if self._queued_file:
1959                 self._work_file()
1960             elif self._queued_dirents:
1961                 self._work_dirents()
1962             elif self._queued_trees:
1963                 self._work_trees()
1964             else:
1965                 break
1966
1967     def _work_file(self):
1968         while True:
1969             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1970             if not buf:
1971                 break
1972             self.write(buf)
1973         self.finish_current_file()
1974         if self._close_file:
1975             self._queued_file.close()
1976         self._close_file = None
1977         self._queued_file = None
1978
1979     def _work_dirents(self):
1980         path, stream_name, max_manifest_depth = self._queued_trees[0]
1981         if stream_name != self.current_stream_name():
1982             self.start_new_stream(stream_name)
1983         while self._queued_dirents:
1984             dirent = self._queued_dirents.popleft()
1985             target = os.path.join(path, dirent)
1986             if os.path.isdir(target):
1987                 self._queue_tree(target,
1988                                  os.path.join(stream_name, dirent),
1989                                  max_manifest_depth - 1)
1990             else:
1991                 self._queue_file(target, dirent)
1992                 break
1993         if not self._queued_dirents:
1994             self._queued_trees.popleft()
1995
1996     def _work_trees(self):
1997         path, stream_name, max_manifest_depth = self._queued_trees[0]
1998         d = arvados.util.listdir_recursive(
1999             path, max_depth = (None if max_manifest_depth == 0 else 0))
2000         if d:
2001             self._queue_dirents(stream_name, d)
2002         else:
2003             self._queued_trees.popleft()
2004
2005     def _queue_file(self, source, filename=None):
2006         assert (self._queued_file is None), "tried to queue more than one file"
2007         if not hasattr(source, 'read'):
2008             source = open(source, 'rb')
2009             self._close_file = True
2010         else:
2011             self._close_file = False
2012         if filename is None:
2013             filename = os.path.basename(source.name)
2014         self.start_new_file(filename)
2015         self._queued_file = source
2016
2017     def _queue_dirents(self, stream_name, dirents):
2018         assert (not self._queued_dirents), "tried to queue more than one tree"
2019         self._queued_dirents = deque(sorted(dirents))
2020
2021     def _queue_tree(self, path, stream_name, max_manifest_depth):
2022         self._queued_trees.append((path, stream_name, max_manifest_depth))
2023
2024     def write_file(self, source, filename=None):
2025         self._queue_file(source, filename)
2026         self.do_queued_work()
2027
2028     def write_directory_tree(self,
2029                              path, stream_name='.', max_manifest_depth=-1):
2030         self._queue_tree(path, stream_name, max_manifest_depth)
2031         self.do_queued_work()
2032
2033     def write(self, newdata):
2034         if isinstance(newdata, bytes):
2035             pass
2036         elif isinstance(newdata, str):
2037             newdata = newdata.encode()
2038         elif hasattr(newdata, '__iter__'):
2039             for s in newdata:
2040                 self.write(s)
2041             return
2042         self._data_buffer.append(newdata)
2043         self._data_buffer_len += len(newdata)
2044         self._current_stream_length += len(newdata)
2045         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2046             self.flush_data()
2047
2048     def open(self, streampath, filename=None):
2049         """open(streampath[, filename]) -> file-like object
2050
2051         Pass in the path of a file to write to the Collection, either as a
2052         single string or as two separate stream name and file name arguments.
2053         This method returns a file-like object you can write to add it to the
2054         Collection.
2055
2056         You may only have one file object from the Collection open at a time,
2057         so be sure to close the object when you're done.  Using the object in
2058         a with statement makes that easy:
2059
2060             with cwriter.open('./doc/page1.txt') as outfile:
2061                 outfile.write(page1_data)
2062             with cwriter.open('./doc/page2.txt') as outfile:
2063                 outfile.write(page2_data)
2064         """
2065         if filename is None:
2066             streampath, filename = split(streampath)
2067         if self._last_open and not self._last_open.closed:
2068             raise errors.AssertionError(
2069                 u"can't open '{}' when '{}' is still open".format(
2070                     filename, self._last_open.name))
2071         if streampath != self.current_stream_name():
2072             self.start_new_stream(streampath)
2073         self.set_current_file_name(filename)
2074         self._last_open = _WriterFile(self, filename)
2075         return self._last_open
2076
2077     def flush_data(self):
2078         data_buffer = b''.join(self._data_buffer)
2079         if data_buffer:
2080             self._current_stream_locators.append(
2081                 self._my_keep().put(
2082                     data_buffer[0:config.KEEP_BLOCK_SIZE],
2083                     copies=self.replication))
2084             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2085             self._data_buffer_len = len(self._data_buffer[0])
2086
2087     def start_new_file(self, newfilename=None):
2088         self.finish_current_file()
2089         self.set_current_file_name(newfilename)
2090
2091     def set_current_file_name(self, newfilename):
2092         if re.search(r'[\t\n]', newfilename):
2093             raise errors.AssertionError(
2094                 "Manifest filenames cannot contain whitespace: %s" %
2095                 newfilename)
2096         elif re.search(r'\x00', newfilename):
2097             raise errors.AssertionError(
2098                 "Manifest filenames cannot contain NUL characters: %s" %
2099                 newfilename)
2100         self._current_file_name = newfilename
2101
2102     def current_file_name(self):
2103         return self._current_file_name
2104
2105     def finish_current_file(self):
2106         if self._current_file_name is None:
2107             if self._current_file_pos == self._current_stream_length:
2108                 return
2109             raise errors.AssertionError(
2110                 "Cannot finish an unnamed file " +
2111                 "(%d bytes at offset %d in '%s' stream)" %
2112                 (self._current_stream_length - self._current_file_pos,
2113                  self._current_file_pos,
2114                  self._current_stream_name))
2115         self._current_stream_files.append([
2116                 self._current_file_pos,
2117                 self._current_stream_length - self._current_file_pos,
2118                 self._current_file_name])
2119         self._current_file_pos = self._current_stream_length
2120         self._current_file_name = None
2121
2122     def start_new_stream(self, newstreamname='.'):
2123         self.finish_current_stream()
2124         self.set_current_stream_name(newstreamname)
2125
2126     def set_current_stream_name(self, newstreamname):
2127         if re.search(r'[\t\n]', newstreamname):
2128             raise errors.AssertionError(
2129                 "Manifest stream names cannot contain whitespace: '%s'" %
2130                 (newstreamname))
2131         self._current_stream_name = '.' if newstreamname=='' else newstreamname
2132
2133     def current_stream_name(self):
2134         return self._current_stream_name
2135
2136     def finish_current_stream(self):
2137         self.finish_current_file()
2138         self.flush_data()
2139         if not self._current_stream_files:
2140             pass
2141         elif self._current_stream_name is None:
2142             raise errors.AssertionError(
2143                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
2144                 (self._current_stream_length, len(self._current_stream_files)))
2145         else:
2146             if not self._current_stream_locators:
2147                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2148             self._finished_streams.append([self._current_stream_name,
2149                                            self._current_stream_locators,
2150                                            self._current_stream_files])
2151         self._current_stream_files = []
2152         self._current_stream_length = 0
2153         self._current_stream_locators = []
2154         self._current_stream_name = None
2155         self._current_file_pos = 0
2156         self._current_file_name = None
2157
2158     def finish(self):
2159         """Store the manifest in Keep and return its locator.
2160
2161         This is useful for storing manifest fragments (task outputs)
2162         temporarily in Keep during a Crunch job.
2163
2164         In other cases you should make a collection instead, by
2165         sending manifest_text() to the API server's "create
2166         collection" endpoint.
2167         """
2168         return self._my_keep().put(self.manifest_text().encode(),
2169                                    copies=self.replication)
2170
2171     def portable_data_hash(self):
2172         stripped = self.stripped_manifest().encode()
2173         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2174
2175     def manifest_text(self):
2176         self.finish_current_stream()
2177         manifest = ''
2178
2179         for stream in self._finished_streams:
2180             if not re.search(r'^\.(/.*)?$', stream[0]):
2181                 manifest += './'
2182             manifest += stream[0].replace(' ', '\\040')
2183             manifest += ' ' + ' '.join(stream[1])
2184             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2185             manifest += "\n"
2186
2187         return manifest
2188
2189     def data_locators(self):
2190         ret = []
2191         for name, locators, files in self._finished_streams:
2192             ret += locators
2193         return ret
2194
2195     def save_new(self, name=None):
2196         return self._api_client.collections().create(
2197             ensure_unique_name=True,
2198             body={
2199                 'name': name,
2200                 'manifest_text': self.manifest_text(),
2201             }).execute(num_retries=self.num_retries)
2202
2203
2204 class ResumableCollectionWriter(CollectionWriter):
2205     """CollectionWriter that can serialize internal state to disk
2206
2207     .. WARNING:: Deprecated
2208        This class is deprecated. Prefer `arvados.collection.Collection`
2209        instead.
2210     """
2211
2212     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
2213                    '_current_stream_locators', '_current_stream_name',
2214                    '_current_file_name', '_current_file_pos', '_close_file',
2215                    '_data_buffer', '_dependencies', '_finished_streams',
2216                    '_queued_dirents', '_queued_trees']
2217
2218     @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
2219     def __init__(self, api_client=None, **kwargs):
2220         self._dependencies = {}
2221         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
2222
2223     @classmethod
2224     def from_state(cls, state, *init_args, **init_kwargs):
2225         # Try to build a new writer from scratch with the given state.
2226         # If the state is not suitable to resume (because files have changed,
2227         # been deleted, aren't predictable, etc.), raise a
2228         # StaleWriterStateError.  Otherwise, return the initialized writer.
2229         # The caller is responsible for calling writer.do_queued_work()
2230         # appropriately after it's returned.
2231         writer = cls(*init_args, **init_kwargs)
2232         for attr_name in cls.STATE_PROPS:
2233             attr_value = state[attr_name]
2234             attr_class = getattr(writer, attr_name).__class__
2235             # Coerce the value into the same type as the initial value, if
2236             # needed.
2237             if attr_class not in (type(None), attr_value.__class__):
2238                 attr_value = attr_class(attr_value)
2239             setattr(writer, attr_name, attr_value)
2240         # Check dependencies before we try to resume anything.
2241         if any(KeepLocator(ls).permission_expired()
2242                for ls in writer._current_stream_locators):
2243             raise errors.StaleWriterStateError(
2244                 "locators include expired permission hint")
2245         writer.check_dependencies()
2246         if state['_current_file'] is not None:
2247             path, pos = state['_current_file']
2248             try:
2249                 writer._queued_file = open(path, 'rb')
2250                 writer._queued_file.seek(pos)
2251             except IOError as error:
2252                 raise errors.StaleWriterStateError(
2253                     u"failed to reopen active file {}: {}".format(path, error))
2254         return writer
2255
2256     def check_dependencies(self):
2257         for path, orig_stat in self._dependencies.items():
2258             if not S_ISREG(orig_stat[ST_MODE]):
2259                 raise errors.StaleWriterStateError(u"{} not file".format(path))
2260             try:
2261                 now_stat = tuple(os.stat(path))
2262             except OSError as error:
2263                 raise errors.StaleWriterStateError(
2264                     u"failed to stat {}: {}".format(path, error))
2265             if ((not S_ISREG(now_stat[ST_MODE])) or
2266                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
2267                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
2268                 raise errors.StaleWriterStateError(u"{} changed".format(path))
2269
2270     def dump_state(self, copy_func=lambda x: x):
2271         state = {attr: copy_func(getattr(self, attr))
2272                  for attr in self.STATE_PROPS}
2273         if self._queued_file is None:
2274             state['_current_file'] = None
2275         else:
2276             state['_current_file'] = (os.path.realpath(self._queued_file.name),
2277                                       self._queued_file.tell())
2278         return state
2279
2280     def _queue_file(self, source, filename=None):
2281         try:
2282             src_path = os.path.realpath(source)
2283         except Exception:
2284             raise errors.AssertionError(u"{} not a file path".format(source))
2285         try:
2286             path_stat = os.stat(src_path)
2287         except OSError as stat_error:
2288             path_stat = None
2289         super(ResumableCollectionWriter, self)._queue_file(source, filename)
2290         fd_stat = os.fstat(self._queued_file.fileno())
2291         if not S_ISREG(fd_stat.st_mode):
2292             # We won't be able to resume from this cache anyway, so don't
2293             # worry about further checks.
2294             self._dependencies[source] = tuple(fd_stat)
2295         elif path_stat is None:
2296             raise errors.AssertionError(
2297                 u"could not stat {}: {}".format(source, stat_error))
2298         elif path_stat.st_ino != fd_stat.st_ino:
2299             raise errors.AssertionError(
2300                 u"{} changed between open and stat calls".format(source))
2301         else:
2302             self._dependencies[src_path] = tuple(fd_stat)
2303
2304     def write(self, data):
2305         if self._queued_file is None:
2306             raise errors.AssertionError(
2307                 "resumable writer can't accept unsourced data")
2308         return super(ResumableCollectionWriter, self).write(data)