15397: Merge branch 'main'
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
5
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
11
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
13 """
14
15 import ciso8601
16 import datetime
17 import errno
18 import functools
19 import hashlib
20 import io
21 import logging
22 import os
23 import re
24 import sys
25 import threading
26 import time
27
28 from collections import deque
29 from stat import *
30
31 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
32 from .keep import KeepLocator, KeepClient
33 from .stream import StreamReader
34 from ._normalize_stream import normalize_stream, escape
35 from ._ranges import Range, LocatorAndRange
36 from .safeapi import ThreadSafeApiCache
37 import arvados.config as config
38 import arvados.errors as errors
39 import arvados.util
40 import arvados.events as events
41 from arvados.retry import retry_method
42
43 from typing import (
44     Any,
45     Callable,
46     Dict,
47     IO,
48     Iterator,
49     List,
50     Mapping,
51     Optional,
52     Tuple,
53     Union,
54 )
55
56 if sys.version_info < (3, 8):
57     from typing_extensions import Literal
58 else:
59     from typing import Literal
60
61 _logger = logging.getLogger('arvados.collection')
62
63 ADD = "add"
64 """Argument value for `Collection` methods to represent an added item"""
65 DEL = "del"
66 """Argument value for `Collection` methods to represent a removed item"""
67 MOD = "mod"
68 """Argument value for `Collection` methods to represent a modified item"""
69 TOK = "tok"
70 """Argument value for `Collection` methods to represent an item with token differences"""
71 FILE = "file"
72 """`create_type` value for `Collection.find_or_create`"""
73 COLLECTION = "collection"
74 """`create_type` value for `Collection.find_or_create`"""
75
76 ChangeList = List[Union[
77     Tuple[Literal[ADD, DEL], str, 'Collection'],
78     Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
79 ]]
80 ChangeType = Literal[ADD, DEL, MOD, TOK]
81 CollectionItem = Union[ArvadosFile, 'Collection']
82 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
83 CreateType = Literal[COLLECTION, FILE]
84 Properties = Dict[str, Any]
85 StorageClasses = List[str]
86
87 class CollectionBase(object):
88     """Abstract base class for Collection classes
89
90     .. ATTENTION:: Internal
91        This class is meant to be used by other parts of the SDK. User code
92        should instantiate or subclass `Collection` or one of its subclasses
93        directly.
94     """
95
96     def __enter__(self):
97         """Enter a context block with this collection instance"""
98         return self
99
100     def __exit__(self, exc_type, exc_value, traceback):
101         """Exit a context block with this collection instance"""
102         pass
103
104     def _my_keep(self):
105         if self._keep_client is None:
106             self._keep_client = KeepClient(api_client=self._api_client,
107                                            num_retries=self.num_retries)
108         return self._keep_client
109
110     def stripped_manifest(self) -> str:
111         """Create a copy of the collection manifest with only size hints
112
113         This method returns a string with the current collection's manifest
114         text with all non-portable locator hints like permission hints and
115         remote cluster hints removed. The only hints in the returned manifest
116         will be size hints.
117         """
118         raw = self.manifest_text()
119         clean = []
120         for line in raw.split("\n"):
121             fields = line.split()
122             if fields:
123                 clean_fields = fields[:1] + [
124                     (re.sub(r'\+[^\d][^\+]*', '', x)
125                      if re.match(arvados.util.keep_locator_pattern, x)
126                      else x)
127                     for x in fields[1:]]
128                 clean += [' '.join(clean_fields), "\n"]
129         return ''.join(clean)
130
131
132 class _WriterFile(_FileLikeObjectBase):
133     def __init__(self, coll_writer, name):
134         super(_WriterFile, self).__init__(name, 'wb')
135         self.dest = coll_writer
136
137     def close(self):
138         super(_WriterFile, self).close()
139         self.dest.finish_current_file()
140
141     @_FileLikeObjectBase._before_close
142     def write(self, data):
143         self.dest.write(data)
144
145     @_FileLikeObjectBase._before_close
146     def writelines(self, seq):
147         for data in seq:
148             self.write(data)
149
150     @_FileLikeObjectBase._before_close
151     def flush(self):
152         self.dest.flush_data()
153
154
155 class RichCollectionBase(CollectionBase):
156     """Base class for Collection classes
157
158     .. ATTENTION:: Internal
159        This class is meant to be used by other parts of the SDK. User code
160        should instantiate or subclass `Collection` or one of its subclasses
161        directly.
162     """
163
164     def __init__(self, parent=None):
165         self.parent = parent
166         self._committed = False
167         self._has_remote_blocks = False
168         self._callback = None
169         self._items = {}
170
171     def _my_api(self):
172         raise NotImplementedError()
173
174     def _my_keep(self):
175         raise NotImplementedError()
176
177     def _my_block_manager(self):
178         raise NotImplementedError()
179
180     def writable(self) -> bool:
181         """Indicate whether this collection object can be modified
182
183         This method returns `False` if this object is a `CollectionReader`,
184         else `True`.
185         """
186         raise NotImplementedError()
187
188     def root_collection(self) -> 'Collection':
189         """Get this collection's root collection object
190
191         If you open a subcollection with `Collection.find`, calling this method
192         on that subcollection returns the source Collection object.
193         """
194         raise NotImplementedError()
195
196     def stream_name(self) -> str:
197         """Get the name of the manifest stream represented by this collection
198
199         If you open a subcollection with `Collection.find`, calling this method
200         on that subcollection returns the name of the stream you opened.
201         """
202         raise NotImplementedError()
203
204     @synchronized
205     def has_remote_blocks(self) -> bool:
206         """Indiciate whether the collection refers to remote data
207
208         Returns `True` if the collection manifest includes any Keep locators
209         with a remote hint (`+R`), else `False`.
210         """
211         if self._has_remote_blocks:
212             return True
213         for item in self:
214             if self[item].has_remote_blocks():
215                 return True
216         return False
217
218     @synchronized
219     def set_has_remote_blocks(self, val: bool) -> None:
220         """Cache whether this collection refers to remote blocks
221
222         .. ATTENTION:: Internal
223            This method is only meant to be used by other Collection methods.
224
225         Set this collection's cached "has remote blocks" flag to the given
226         value.
227         """
228         self._has_remote_blocks = val
229         if self.parent:
230             self.parent.set_has_remote_blocks(val)
231
232     @must_be_writable
233     @synchronized
234     def find_or_create(
235             self,
236             path: str,
237             create_type: CreateType,
238     ) -> CollectionItem:
239         """Get the item at the given path, creating it if necessary
240
241         If `path` refers to a stream in this collection, returns a
242         corresponding `Subcollection` object. If `path` refers to a file in
243         this collection, returns a corresponding
244         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
245         this collection, then this method creates a new object and returns
246         it, creating parent streams as needed. The type of object created is
247         determined by the value of `create_type`.
248
249         Arguments:
250
251         * path: str --- The path to find or create within this collection.
252
253         * create_type: Literal[COLLECTION, FILE] --- The type of object to
254           create at `path` if one does not exist. Passing `COLLECTION`
255           creates a stream and returns the corresponding
256           `Subcollection`. Passing `FILE` creates a new file and returns the
257           corresponding `arvados.arvfile.ArvadosFile`.
258         """
259         pathcomponents = path.split("/", 1)
260         if pathcomponents[0]:
261             item = self._items.get(pathcomponents[0])
262             if len(pathcomponents) == 1:
263                 if item is None:
264                     # create new file
265                     if create_type == COLLECTION:
266                         item = Subcollection(self, pathcomponents[0])
267                     else:
268                         item = ArvadosFile(self, pathcomponents[0])
269                     self._items[pathcomponents[0]] = item
270                     self.set_committed(False)
271                     self.notify(ADD, self, pathcomponents[0], item)
272                 return item
273             else:
274                 if item is None:
275                     # create new collection
276                     item = Subcollection(self, pathcomponents[0])
277                     self._items[pathcomponents[0]] = item
278                     self.set_committed(False)
279                     self.notify(ADD, self, pathcomponents[0], item)
280                 if isinstance(item, RichCollectionBase):
281                     return item.find_or_create(pathcomponents[1], create_type)
282                 else:
283                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
284         else:
285             return self
286
287     @synchronized
288     def find(self, path: str) -> CollectionItem:
289         """Get the item at the given path
290
291         If `path` refers to a stream in this collection, returns a
292         corresponding `Subcollection` object. If `path` refers to a file in
293         this collection, returns a corresponding
294         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
295         this collection, then this method raises `NotADirectoryError`.
296
297         Arguments:
298
299         * path: str --- The path to find or create within this collection.
300         """
301         if not path:
302             raise errors.ArgumentError("Parameter 'path' is empty.")
303
304         pathcomponents = path.split("/", 1)
305         if pathcomponents[0] == '':
306             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
307
308         item = self._items.get(pathcomponents[0])
309         if item is None:
310             return None
311         elif len(pathcomponents) == 1:
312             return item
313         else:
314             if isinstance(item, RichCollectionBase):
315                 if pathcomponents[1]:
316                     return item.find(pathcomponents[1])
317                 else:
318                     return item
319             else:
320                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
321
322     @synchronized
323     def mkdirs(self, path: str) -> 'Subcollection':
324         """Create and return a subcollection at `path`
325
326         If `path` exists within this collection, raises `FileExistsError`.
327         Otherwise, creates a stream at that path and returns the
328         corresponding `Subcollection`.
329         """
330         if self.find(path) != None:
331             raise IOError(errno.EEXIST, "Directory or file exists", path)
332
333         return self.find_or_create(path, COLLECTION)
334
335     def open(
336             self,
337             path: str,
338             mode: str="r",
339             encoding: Optional[str]=None,
340     ) -> IO:
341         """Open a file-like object within the collection
342
343         This method returns a file-like object that can read and/or write the
344         file located at `path` within the collection. If you attempt to write
345         a `path` that does not exist, the file is created with `find_or_create`.
346         If the file cannot be opened for any other reason, this method raises
347         `OSError` with an appropriate errno.
348
349         Arguments:
350
351         * path: str --- The path of the file to open within this collection
352
353         * mode: str --- The mode to open this file. Supports all the same
354           values as `builtins.open`.
355
356         * encoding: str | None --- The text encoding of the file. Only used
357           when the file is opened in text mode. The default is
358           platform-dependent.
359         """
360         if not re.search(r'^[rwa][bt]?\+?$', mode):
361             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
362
363         if mode[0] == 'r' and '+' not in mode:
364             fclass = ArvadosFileReader
365             arvfile = self.find(path)
366         elif not self.writable():
367             raise IOError(errno.EROFS, "Collection is read only")
368         else:
369             fclass = ArvadosFileWriter
370             arvfile = self.find_or_create(path, FILE)
371
372         if arvfile is None:
373             raise IOError(errno.ENOENT, "File not found", path)
374         if not isinstance(arvfile, ArvadosFile):
375             raise IOError(errno.EISDIR, "Is a directory", path)
376
377         if mode[0] == 'w':
378             arvfile.truncate(0)
379
380         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
381         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
382         if 'b' not in mode:
383             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
384             f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
385         return f
386
387     def modified(self) -> bool:
388         """Indicate whether this collection has an API server record
389
390         Returns `False` if this collection corresponds to a record loaded from
391         the API server, `True` otherwise.
392         """
393         return not self.committed()
394
395     @synchronized
396     def committed(self):
397         """Indicate whether this collection has an API server record
398
399         Returns `True` if this collection corresponds to a record loaded from
400         the API server, `False` otherwise.
401         """
402         return self._committed
403
404     @synchronized
405     def set_committed(self, value: bool=True):
406         """Cache whether this collection has an API server record
407
408         .. ATTENTION:: Internal
409            This method is only meant to be used by other Collection methods.
410
411         Set this collection's cached "committed" flag to the given
412         value and propagates it as needed.
413         """
414         if value == self._committed:
415             return
416         if value:
417             for k,v in self._items.items():
418                 v.set_committed(True)
419             self._committed = True
420         else:
421             self._committed = False
422             if self.parent is not None:
423                 self.parent.set_committed(False)
424
425     @synchronized
426     def __iter__(self) -> Iterator[str]:
427         """Iterate names of streams and files in this collection
428
429         This method does not recurse. It only iterates the contents of this
430         collection's corresponding stream.
431         """
432         return iter(self._items)
433
434     @synchronized
435     def __getitem__(self, k: str) -> CollectionItem:
436         """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
437
438         This method does not recurse. If you want to search a path, use
439         `RichCollectionBase.find` instead.
440         """
441         return self._items[k]
442
443     @synchronized
444     def __contains__(self, k: str) -> bool:
445         """Indicate whether this collection has an item with this name
446
447         This method does not recurse. It you want to check a path, use
448         `RichCollectionBase.exists` instead.
449         """
450         return k in self._items
451
452     @synchronized
453     def __len__(self):
454         """Get the number of items directly contained in this collection
455
456         This method does not recurse. It only counts the streams and files
457         in this collection's corresponding stream.
458         """
459         return len(self._items)
460
461     @must_be_writable
462     @synchronized
463     def __delitem__(self, p: str) -> None:
464         """Delete an item from this collection's stream
465
466         This method does not recurse. If you want to remove an item by a
467         path, use `RichCollectionBase.remove` instead.
468         """
469         del self._items[p]
470         self.set_committed(False)
471         self.notify(DEL, self, p, None)
472
473     @synchronized
474     def keys(self) -> Iterator[str]:
475         """Iterate names of streams and files in this collection
476
477         This method does not recurse. It only iterates the contents of this
478         collection's corresponding stream.
479         """
480         return self._items.keys()
481
482     @synchronized
483     def values(self) -> List[CollectionItem]:
484         """Get a list of objects in this collection's stream
485
486         The return value includes a `Subcollection` for every stream, and an
487         `arvados.arvfile.ArvadosFile` for every file, directly within this
488         collection's stream.  This method does not recurse.
489         """
490         return list(self._items.values())
491
492     @synchronized
493     def items(self) -> List[Tuple[str, CollectionItem]]:
494         """Get a list of `(name, object)` tuples from this collection's stream
495
496         The return value includes a `Subcollection` for every stream, and an
497         `arvados.arvfile.ArvadosFile` for every file, directly within this
498         collection's stream.  This method does not recurse.
499         """
500         return list(self._items.items())
501
502     def exists(self, path: str) -> bool:
503         """Indicate whether this collection includes an item at `path`
504
505         This method returns `True` if `path` refers to a stream or file within
506         this collection, else `False`.
507
508         Arguments:
509
510         * path: str --- The path to check for existence within this collection
511         """
512         return self.find(path) is not None
513
514     @must_be_writable
515     @synchronized
516     def remove(self, path: str, recursive: bool=False) -> None:
517         """Remove the file or stream at `path`
518
519         Arguments:
520
521         * path: str --- The path of the item to remove from the collection
522
523         * recursive: bool --- Controls the method's behavior if `path` refers
524           to a nonempty stream. If `False` (the default), this method raises
525           `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
526           items under the stream.
527         """
528         if not path:
529             raise errors.ArgumentError("Parameter 'path' is empty.")
530
531         pathcomponents = path.split("/", 1)
532         item = self._items.get(pathcomponents[0])
533         if item is None:
534             raise IOError(errno.ENOENT, "File not found", path)
535         if len(pathcomponents) == 1:
536             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
537                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
538             deleteditem = self._items[pathcomponents[0]]
539             del self._items[pathcomponents[0]]
540             self.set_committed(False)
541             self.notify(DEL, self, pathcomponents[0], deleteditem)
542         else:
543             item.remove(pathcomponents[1], recursive=recursive)
544
545     def _clonefrom(self, source):
546         for k,v in source.items():
547             self._items[k] = v.clone(self, k)
548
549     def clone(self):
550         raise NotImplementedError()
551
552     @must_be_writable
553     @synchronized
554     def add(
555             self,
556             source_obj: CollectionItem,
557             target_name: str,
558             overwrite: bool=False,
559             reparent: bool=False,
560     ) -> None:
561         """Copy or move a file or subcollection object to this collection
562
563         Arguments:
564
565         * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
566           to add to this collection
567
568         * target_name: str --- The path inside this collection where
569           `source_obj` should be added.
570
571         * overwrite: bool --- Controls the behavior of this method when the
572           collection already contains an object at `target_name`. If `False`
573           (the default), this method will raise `FileExistsError`. If `True`,
574           the object at `target_name` will be replaced with `source_obj`.
575
576         * reparent: bool --- Controls whether this method copies or moves
577           `source_obj`. If `False` (the default), `source_obj` is copied into
578           this collection. If `True`, `source_obj` is moved into this
579           collection.
580         """
581         if target_name in self and not overwrite:
582             raise IOError(errno.EEXIST, "File already exists", target_name)
583
584         modified_from = None
585         if target_name in self:
586             modified_from = self[target_name]
587
588         # Actually make the move or copy.
589         if reparent:
590             source_obj._reparent(self, target_name)
591             item = source_obj
592         else:
593             item = source_obj.clone(self, target_name)
594
595         self._items[target_name] = item
596         self.set_committed(False)
597         if not self._has_remote_blocks and source_obj.has_remote_blocks():
598             self.set_has_remote_blocks(True)
599
600         if modified_from:
601             self.notify(MOD, self, target_name, (modified_from, item))
602         else:
603             self.notify(ADD, self, target_name, item)
604
605     def _get_src_target(self, source, target_path, source_collection, create_dest):
606         if source_collection is None:
607             source_collection = self
608
609         # Find the object
610         if isinstance(source, str):
611             source_obj = source_collection.find(source)
612             if source_obj is None:
613                 raise IOError(errno.ENOENT, "File not found", source)
614             sourcecomponents = source.split("/")
615         else:
616             source_obj = source
617             sourcecomponents = None
618
619         # Find parent collection the target path
620         targetcomponents = target_path.split("/")
621
622         # Determine the name to use.
623         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
624
625         if not target_name:
626             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
627
628         if create_dest:
629             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
630         else:
631             if len(targetcomponents) > 1:
632                 target_dir = self.find("/".join(targetcomponents[0:-1]))
633             else:
634                 target_dir = self
635
636         if target_dir is None:
637             raise IOError(errno.ENOENT, "Target directory not found", target_name)
638
639         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
640             target_dir = target_dir[target_name]
641             target_name = sourcecomponents[-1]
642
643         return (source_obj, target_dir, target_name)
644
645     @must_be_writable
646     @synchronized
647     def copy(
648             self,
649             source: Union[str, CollectionItem],
650             target_path: str,
651             source_collection: Optional['RichCollectionBase']=None,
652             overwrite: bool=False,
653     ) -> None:
654         """Copy a file or subcollection object to this collection
655
656         Arguments:
657
658         * source: str | arvados.arvfile.ArvadosFile |
659           arvados.collection.Subcollection --- The file or subcollection to
660           add to this collection. If `source` is a str, the object will be
661           found by looking up this path from `source_collection` (see
662           below).
663
664         * target_path: str --- The path inside this collection where the
665           source object should be added.
666
667         * source_collection: arvados.collection.Collection | None --- The
668           collection to find the source object from when `source` is a
669           path. Defaults to the current collection (`self`).
670
671         * overwrite: bool --- Controls the behavior of this method when the
672           collection already contains an object at `target_path`. If `False`
673           (the default), this method will raise `FileExistsError`. If `True`,
674           the object at `target_path` will be replaced with `source_obj`.
675         """
676         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
677         target_dir.add(source_obj, target_name, overwrite, False)
678
679     @must_be_writable
680     @synchronized
681     def rename(
682             self,
683             source: Union[str, CollectionItem],
684             target_path: str,
685             source_collection: Optional['RichCollectionBase']=None,
686             overwrite: bool=False,
687     ) -> None:
688         """Move a file or subcollection object to this collection
689
690         Arguments:
691
692         * source: str | arvados.arvfile.ArvadosFile |
693           arvados.collection.Subcollection --- The file or subcollection to
694           add to this collection. If `source` is a str, the object will be
695           found by looking up this path from `source_collection` (see
696           below).
697
698         * target_path: str --- The path inside this collection where the
699           source object should be added.
700
701         * source_collection: arvados.collection.Collection | None --- The
702           collection to find the source object from when `source` is a
703           path. Defaults to the current collection (`self`).
704
705         * overwrite: bool --- Controls the behavior of this method when the
706           collection already contains an object at `target_path`. If `False`
707           (the default), this method will raise `FileExistsError`. If `True`,
708           the object at `target_path` will be replaced with `source_obj`.
709         """
710         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
711         if not source_obj.writable():
712             raise IOError(errno.EROFS, "Source collection is read only", source)
713         target_dir.add(source_obj, target_name, overwrite, True)
714
715     def portable_manifest_text(self, stream_name: str=".") -> str:
716         """Get the portable manifest text for this collection
717
718         The portable manifest text is normalized, and does not include access
719         tokens. This method does not flush outstanding blocks to Keep.
720
721         Arguments:
722
723         * stream_name: str --- The name to use for this collection's stream in
724           the generated manifest. Default `'.'`.
725         """
726         return self._get_manifest_text(stream_name, True, True)
727
728     @synchronized
729     def manifest_text(
730             self,
731             stream_name: str=".",
732             strip: bool=False,
733             normalize: bool=False,
734             only_committed: bool=False,
735     ) -> str:
736         """Get the manifest text for this collection
737
738         Arguments:
739
740         * stream_name: str --- The name to use for this collection's stream in
741           the generated manifest. Default `'.'`.
742
743         * strip: bool --- Controls whether or not the returned manifest text
744           includes access tokens. If `False` (the default), the manifest text
745           will include access tokens. If `True`, the manifest text will not
746           include access tokens.
747
748         * normalize: bool --- Controls whether or not the returned manifest
749           text is normalized. Default `False`.
750
751         * only_committed: bool --- Controls whether or not this method uploads
752           pending data to Keep before building and returning the manifest text.
753           If `False` (the default), this method will finish uploading all data
754           to Keep, then return the final manifest. If `True`, this method will
755           build and return a manifest that only refers to the data that has
756           finished uploading at the time this method was called.
757         """
758         if not only_committed:
759             self._my_block_manager().commit_all()
760         return self._get_manifest_text(stream_name, strip, normalize,
761                                        only_committed=only_committed)
762
763     @synchronized
764     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
765         """Get the manifest text for this collection, sub collections and files.
766
767         :stream_name:
768           Name to use for this stream (directory)
769
770         :strip:
771           If True, remove signing tokens from block locators if present.
772           If False (default), block locators are left unchanged.
773
774         :normalize:
775           If True, always export the manifest text in normalized form
776           even if the Collection is not modified.  If False (default) and the collection
777           is not modified, return the original manifest text even if it is not
778           in normalized form.
779
780         :only_committed:
781           If True, only include blocks that were already committed to Keep.
782
783         """
784
785         if not self.committed() or self._manifest_text is None or normalize:
786             stream = {}
787             buf = []
788             sorted_keys = sorted(self.keys())
789             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
790                 # Create a stream per file `k`
791                 arvfile = self[filename]
792                 filestream = []
793                 for segment in arvfile.segments():
794                     loc = segment.locator
795                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
796                         if only_committed:
797                             continue
798                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
799                     if strip:
800                         loc = KeepLocator(loc).stripped()
801                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
802                                          segment.segment_offset, segment.range_size))
803                 stream[filename] = filestream
804             if stream:
805                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
806             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
807                 buf.append(self[dirname].manifest_text(
808                     stream_name=os.path.join(stream_name, dirname),
809                     strip=strip, normalize=True, only_committed=only_committed))
810             return "".join(buf)
811         else:
812             if strip:
813                 return self.stripped_manifest()
814             else:
815                 return self._manifest_text
816
817     @synchronized
818     def _copy_remote_blocks(self, remote_blocks={}):
819         """Scan through the entire collection and ask Keep to copy remote blocks.
820
821         When accessing a remote collection, blocks will have a remote signature
822         (+R instead of +A). Collect these signatures and request Keep to copy the
823         blocks to the local cluster, returning local (+A) signatures.
824
825         :remote_blocks:
826           Shared cache of remote to local block mappings. This is used to avoid
827           doing extra work when blocks are shared by more than one file in
828           different subdirectories.
829
830         """
831         for item in self:
832             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
833         return remote_blocks
834
835     @synchronized
836     def diff(
837             self,
838             end_collection: 'RichCollectionBase',
839             prefix: str=".",
840             holding_collection: Optional['Collection']=None,
841     ) -> ChangeList:
842         """Build a list of differences between this collection and another
843
844         Arguments:
845
846         * end_collection: arvados.collection.RichCollectionBase --- A
847           collection object with the desired end state. The returned diff
848           list will describe how to go from the current collection object
849           `self` to `end_collection`.
850
851         * prefix: str --- The name to use for this collection's stream in
852           the diff list. Default `'.'`.
853
854         * holding_collection: arvados.collection.Collection | None --- A
855           collection object used to hold objects for the returned diff
856           list. By default, a new empty collection is created.
857         """
858         changes = []
859         if holding_collection is None:
860             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
861         for k in self:
862             if k not in end_collection:
863                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
864         for k in end_collection:
865             if k in self:
866                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
867                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
868                 elif end_collection[k] != self[k]:
869                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
870                 else:
871                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
872             else:
873                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
874         return changes
875
876     @must_be_writable
877     @synchronized
878     def apply(self, changes: ChangeList) -> None:
879         """Apply a list of changes from to this collection
880
881         This method takes a list of changes generated by
882         `RichCollectionBase.diff` and applies it to this
883         collection. Afterward, the state of this collection object will
884         match the state of `end_collection` passed to `diff`. If a change
885         conflicts with a local change, it will be saved to an alternate path
886         indicating the conflict.
887
888         Arguments:
889
890         * changes: arvados.collection.ChangeList --- The list of differences
891           generated by `RichCollectionBase.diff`.
892         """
893         if changes:
894             self.set_committed(False)
895         for change in changes:
896             event_type = change[0]
897             path = change[1]
898             initial = change[2]
899             local = self.find(path)
900             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
901                                                                     time.gmtime()))
902             if event_type == ADD:
903                 if local is None:
904                     # No local file at path, safe to copy over new file
905                     self.copy(initial, path)
906                 elif local is not None and local != initial:
907                     # There is already local file and it is different:
908                     # save change to conflict file.
909                     self.copy(initial, conflictpath)
910             elif event_type == MOD or event_type == TOK:
911                 final = change[3]
912                 if local == initial:
913                     # Local matches the "initial" item so it has not
914                     # changed locally and is safe to update.
915                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
916                         # Replace contents of local file with new contents
917                         local.replace_contents(final)
918                     else:
919                         # Overwrite path with new item; this can happen if
920                         # path was a file and is now a collection or vice versa
921                         self.copy(final, path, overwrite=True)
922                 else:
923                     # Local is missing (presumably deleted) or local doesn't
924                     # match the "start" value, so save change to conflict file
925                     self.copy(final, conflictpath)
926             elif event_type == DEL:
927                 if local == initial:
928                     # Local item matches "initial" value, so it is safe to remove.
929                     self.remove(path, recursive=True)
930                 # else, the file is modified or already removed, in either
931                 # case we don't want to try to remove it.
932
933     def portable_data_hash(self) -> str:
934         """Get the portable data hash for this collection's manifest"""
935         if self._manifest_locator and self.committed():
936             # If the collection is already saved on the API server, and it's committed
937             # then return API server's PDH response.
938             return self._portable_data_hash
939         else:
940             stripped = self.portable_manifest_text().encode()
941             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
942
943     @synchronized
944     def subscribe(self, callback: ChangeCallback) -> None:
945         """Set a notify callback for changes to this collection
946
947         Arguments:
948
949         * callback: arvados.collection.ChangeCallback --- The callable to
950           call each time the collection is changed.
951         """
952         if self._callback is None:
953             self._callback = callback
954         else:
955             raise errors.ArgumentError("A callback is already set on this collection.")
956
957     @synchronized
958     def unsubscribe(self) -> None:
959         """Remove any notify callback set for changes to this collection"""
960         if self._callback is not None:
961             self._callback = None
962
963     @synchronized
964     def notify(
965             self,
966             event: ChangeType,
967             collection: 'RichCollectionBase',
968             name: str,
969             item: CollectionItem,
970     ) -> None:
971         """Notify any subscribed callback about a change to this collection
972
973         .. ATTENTION:: Internal
974            This method is only meant to be used by other Collection methods.
975
976         If a callback has been registered with `RichCollectionBase.subscribe`,
977         it will be called with information about a change to this collection.
978         Then this notification will be propagated to this collection's root.
979
980         Arguments:
981
982         * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
983           the collection.
984
985         * collection: arvados.collection.RichCollectionBase --- The
986           collection that was modified.
987
988         * name: str --- The name of the file or stream within `collection` that
989           was modified.
990
991         * item: arvados.arvfile.ArvadosFile |
992           arvados.collection.Subcollection --- The new contents at `name`
993           within `collection`.
994         """
995         if self._callback:
996             self._callback(event, collection, name, item)
997         self.root_collection().notify(event, collection, name, item)
998
999     @synchronized
1000     def __eq__(self, other: Any) -> bool:
1001         """Indicate whether this collection object is equal to another"""
1002         if other is self:
1003             return True
1004         if not isinstance(other, RichCollectionBase):
1005             return False
1006         if len(self._items) != len(other):
1007             return False
1008         for k in self._items:
1009             if k not in other:
1010                 return False
1011             if self._items[k] != other[k]:
1012                 return False
1013         return True
1014
1015     def __ne__(self, other: Any) -> bool:
1016         """Indicate whether this collection object is not equal to another"""
1017         return not self.__eq__(other)
1018
1019     @synchronized
1020     def flush(self) -> None:
1021         """Upload any pending data to Keep"""
1022         for e in self.values():
1023             e.flush()
1024
1025
1026 class Collection(RichCollectionBase):
1027     """Read and manipulate an Arvados collection
1028
1029     This class provides a high-level interface to create, read, and update
1030     Arvados collections and their contents. Refer to the Arvados Python SDK
1031     cookbook for [an introduction to using the Collection class][cookbook].
1032
1033     [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1034     """
1035
1036     def __init__(self, manifest_locator_or_text: Optional[str]=None,
1037                  api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1038                  keep_client: Optional['arvados.keep.KeepClient']=None,
1039                  num_retries: int=10,
1040                  parent: Optional['Collection']=None,
1041                  apiconfig: Optional[Mapping[str, str]]=None,
1042                  block_manager: Optional['arvados.arvfile._BlockManager']=None,
1043                  replication_desired: Optional[int]=None,
1044                  storage_classes_desired: Optional[List[str]]=None,
1045                  put_threads: Optional[int]=None):
1046         """Initialize a Collection object
1047
1048         Arguments:
1049
1050         * manifest_locator_or_text: str | None --- This string can contain a
1051           collection manifest text, portable data hash, or UUID. When given a
1052           portable data hash or UUID, this instance will load a collection
1053           record from the API server. Otherwise, this instance will represent a
1054           new collection without an API server record. The default value `None`
1055           instantiates a new collection with an empty manifest.
1056
1057         * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1058           Arvados API client object this instance uses to make requests. If
1059           none is given, this instance creates its own client using the
1060           settings from `apiconfig` (see below). If your client instantiates
1061           many Collection objects, you can help limit memory utilization by
1062           calling `arvados.api.api` to construct an
1063           `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1064           for every Collection.
1065
1066         * keep_client: arvados.keep.KeepClient | None --- The Keep client
1067           object this instance uses to make requests. If none is given, this
1068           instance creates its own client using its `api_client`.
1069
1070         * num_retries: int --- The number of times that client requests are
1071           retried. Default 10.
1072
1073         * parent: arvados.collection.Collection | None --- The parent Collection
1074           object of this instance, if any. This argument is primarily used by
1075           other Collection methods; user client code shouldn't need to use it.
1076
1077         * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1078           `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1079           `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1080           Collection object constructs one from these settings. If no
1081           mapping is provided, calls `arvados.config.settings` to get these
1082           parameters from user configuration.
1083
1084         * block_manager: arvados.arvfile._BlockManager | None --- The
1085           _BlockManager object used by this instance to coordinate reading
1086           and writing Keep data blocks. If none is given, this instance
1087           constructs its own. This argument is primarily used by other
1088           Collection methods; user client code shouldn't need to use it.
1089
1090         * replication_desired: int | None --- This controls both the value of
1091           the `replication_desired` field on API collection records saved by
1092           this class, as well as the number of Keep services that the object
1093           writes new data blocks to. If none is given, uses the default value
1094           configured for the cluster.
1095
1096         * storage_classes_desired: list[str] | None --- This controls both
1097           the value of the `storage_classes_desired` field on API collection
1098           records saved by this class, as well as selecting which specific
1099           Keep services the object writes new data blocks to. If none is
1100           given, defaults to an empty list.
1101
1102         * put_threads: int | None --- The number of threads to run
1103           simultaneously to upload data blocks to Keep. This value is used when
1104           building a new `block_manager`. It is unused when a `block_manager`
1105           is provided.
1106         """
1107
1108         if storage_classes_desired and type(storage_classes_desired) is not list:
1109             raise errors.ArgumentError("storage_classes_desired must be list type.")
1110
1111         super(Collection, self).__init__(parent)
1112         self._api_client = api_client
1113         self._keep_client = keep_client
1114
1115         # Use the keep client from ThreadSafeApiCache
1116         if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1117             self._keep_client = self._api_client.keep
1118
1119         self._block_manager = block_manager
1120         self.replication_desired = replication_desired
1121         self._storage_classes_desired = storage_classes_desired
1122         self.put_threads = put_threads
1123
1124         if apiconfig:
1125             self._config = apiconfig
1126         else:
1127             self._config = config.settings()
1128
1129         self.num_retries = num_retries
1130         self._manifest_locator = None
1131         self._manifest_text = None
1132         self._portable_data_hash = None
1133         self._api_response = None
1134         self._past_versions = set()
1135
1136         self.lock = threading.RLock()
1137         self.events = None
1138
1139         if manifest_locator_or_text:
1140             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1141                 self._manifest_locator = manifest_locator_or_text
1142             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1143                 self._manifest_locator = manifest_locator_or_text
1144                 if not self._has_local_collection_uuid():
1145                     self._has_remote_blocks = True
1146             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1147                 self._manifest_text = manifest_locator_or_text
1148                 if '+R' in self._manifest_text:
1149                     self._has_remote_blocks = True
1150             else:
1151                 raise errors.ArgumentError(
1152                     "Argument to CollectionReader is not a manifest or a collection UUID")
1153
1154             try:
1155                 self._populate()
1156             except errors.SyntaxError as e:
1157                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1158
1159     def storage_classes_desired(self) -> List[str]:
1160         """Get this collection's `storage_classes_desired` value"""
1161         return self._storage_classes_desired or []
1162
1163     def root_collection(self) -> 'Collection':
1164         return self
1165
1166     def get_properties(self) -> Properties:
1167         """Get this collection's properties
1168
1169         This method always returns a dict. If this collection object does not
1170         have an associated API record, or that record does not have any
1171         properties set, this method returns an empty dict.
1172         """
1173         if self._api_response and self._api_response["properties"]:
1174             return self._api_response["properties"]
1175         else:
1176             return {}
1177
1178     def get_trash_at(self) -> Optional[datetime.datetime]:
1179         """Get this collection's `trash_at` field
1180
1181         This method parses the `trash_at` field of the collection's API
1182         record and returns a datetime from it. If that field is not set, or
1183         this collection object does not have an associated API record,
1184         returns None.
1185         """
1186         if self._api_response and self._api_response["trash_at"]:
1187             try:
1188                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1189             except ValueError:
1190                 return None
1191         else:
1192             return None
1193
1194     def stream_name(self) -> str:
1195         return "."
1196
1197     def writable(self) -> bool:
1198         return True
1199
1200     @synchronized
1201     def known_past_version(
1202             self,
1203             modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1204     ) -> bool:
1205         """Indicate whether an API record for this collection has been seen before
1206
1207         As this collection object loads records from the API server, it records
1208         their `modified_at` and `portable_data_hash` fields. This method accepts
1209         a 2-tuple with values for those fields, and returns `True` if the
1210         combination was previously loaded.
1211         """
1212         return modified_at_and_portable_data_hash in self._past_versions
1213
1214     @synchronized
1215     @retry_method
1216     def update(
1217             self,
1218             other: Optional['Collection']=None,
1219             num_retries: Optional[int]=None,
1220     ) -> None:
1221         """Merge another collection's contents into this one
1222
1223         This method compares the manifest of this collection instance with
1224         another, then updates this instance's manifest with changes from the
1225         other, renaming files to flag conflicts where necessary.
1226
1227         When called without any arguments, this method reloads the collection's
1228         API record, and updates this instance with any changes that have
1229         appeared server-side. If this instance does not have a corresponding
1230         API record, this method raises `arvados.errors.ArgumentError`.
1231
1232         Arguments:
1233
1234         * other: arvados.collection.Collection | None --- The collection
1235           whose contents should be merged into this instance. When not
1236           provided, this method reloads this collection's API record and
1237           constructs a Collection object from it.  If this instance does not
1238           have a corresponding API record, this method raises
1239           `arvados.errors.ArgumentError`.
1240
1241         * num_retries: int | None --- The number of times to retry reloading
1242           the collection's API record from the API server. If not specified,
1243           uses the `num_retries` provided when this instance was constructed.
1244         """
1245         if other is None:
1246             if self._manifest_locator is None:
1247                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1248             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1249             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1250                 response.get("portable_data_hash") != self.portable_data_hash()):
1251                 # The record on the server is different from our current one, but we've seen it before,
1252                 # so ignore it because it's already been merged.
1253                 # However, if it's the same as our current record, proceed with the update, because we want to update
1254                 # our tokens.
1255                 return
1256             else:
1257                 self._remember_api_response(response)
1258             other = CollectionReader(response["manifest_text"])
1259         baseline = CollectionReader(self._manifest_text)
1260         self.apply(baseline.diff(other))
1261         self._manifest_text = self.manifest_text()
1262
1263     @synchronized
1264     def _my_api(self):
1265         if self._api_client is None:
1266             self._api_client = ThreadSafeApiCache(self._config, version='v1')
1267             if self._keep_client is None:
1268                 self._keep_client = self._api_client.keep
1269         return self._api_client
1270
1271     @synchronized
1272     def _my_keep(self):
1273         if self._keep_client is None:
1274             if self._api_client is None:
1275                 self._my_api()
1276             else:
1277                 self._keep_client = KeepClient(api_client=self._api_client)
1278         return self._keep_client
1279
1280     @synchronized
1281     def _my_block_manager(self):
1282         if self._block_manager is None:
1283             copies = (self.replication_desired or
1284                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1285                                                    2))
1286             self._block_manager = _BlockManager(self._my_keep(),
1287                                                 copies=copies,
1288                                                 put_threads=self.put_threads,
1289                                                 num_retries=self.num_retries,
1290                                                 storage_classes_func=self.storage_classes_desired)
1291         return self._block_manager
1292
1293     def _remember_api_response(self, response):
1294         self._api_response = response
1295         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1296
1297     def _populate_from_api_server(self):
1298         # As in KeepClient itself, we must wait until the last
1299         # possible moment to instantiate an API client, in order to
1300         # avoid tripping up clients that don't have access to an API
1301         # server.  If we do build one, make sure our Keep client uses
1302         # it.  If instantiation fails, we'll fall back to the except
1303         # clause, just like any other Collection lookup
1304         # failure. Return an exception, or None if successful.
1305         self._remember_api_response(self._my_api().collections().get(
1306             uuid=self._manifest_locator).execute(
1307                 num_retries=self.num_retries))
1308         self._manifest_text = self._api_response['manifest_text']
1309         self._portable_data_hash = self._api_response['portable_data_hash']
1310         # If not overriden via kwargs, we should try to load the
1311         # replication_desired and storage_classes_desired from the API server
1312         if self.replication_desired is None:
1313             self.replication_desired = self._api_response.get('replication_desired', None)
1314         if self._storage_classes_desired is None:
1315             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1316
1317     def _populate(self):
1318         if self._manifest_text is None:
1319             if self._manifest_locator is None:
1320                 return
1321             else:
1322                 self._populate_from_api_server()
1323         self._baseline_manifest = self._manifest_text
1324         self._import_manifest(self._manifest_text)
1325
1326     def _has_collection_uuid(self):
1327         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1328
1329     def _has_local_collection_uuid(self):
1330         return self._has_collection_uuid and \
1331             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1332
1333     def __enter__(self):
1334         return self
1335
1336     def __exit__(self, exc_type, exc_value, traceback):
1337         """Exit a context with this collection instance
1338
1339         If no exception was raised inside the context block, and this
1340         collection is writable and has a corresponding API record, that
1341         record will be updated to match the state of this instance at the end
1342         of the block.
1343         """
1344         if exc_type is None:
1345             if self.writable() and self._has_collection_uuid():
1346                 self.save()
1347         self.stop_threads()
1348
1349     def stop_threads(self) -> None:
1350         """Stop background Keep upload/download threads"""
1351         if self._block_manager is not None:
1352             self._block_manager.stop_threads()
1353
1354     @synchronized
1355     def manifest_locator(self) -> Optional[str]:
1356         """Get this collection's manifest locator, if any
1357
1358         * If this collection instance is associated with an API record with a
1359           UUID, return that.
1360         * Otherwise, if this collection instance was loaded from an API record
1361           by portable data hash, return that.
1362         * Otherwise, return `None`.
1363         """
1364         return self._manifest_locator
1365
1366     @synchronized
1367     def clone(
1368             self,
1369             new_parent: Optional['Collection']=None,
1370             new_name: Optional[str]=None,
1371             readonly: bool=False,
1372             new_config: Optional[Mapping[str, str]]=None,
1373     ) -> 'Collection':
1374         """Create a Collection object with the same contents as this instance
1375
1376         This method creates a new Collection object with contents that match
1377         this instance's. The new collection will not be associated with any API
1378         record.
1379
1380         Arguments:
1381
1382         * new_parent: arvados.collection.Collection | None --- This value is
1383           passed to the new Collection's constructor as the `parent`
1384           argument.
1385
1386         * new_name: str | None --- This value is unused.
1387
1388         * readonly: bool --- If this value is true, this method constructs and
1389           returns a `CollectionReader`. Otherwise, it returns a mutable
1390           `Collection`. Default `False`.
1391
1392         * new_config: Mapping[str, str] | None --- This value is passed to the
1393           new Collection's constructor as `apiconfig`. If no value is provided,
1394           defaults to the configuration passed to this instance's constructor.
1395         """
1396         if new_config is None:
1397             new_config = self._config
1398         if readonly:
1399             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1400         else:
1401             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1402
1403         newcollection._clonefrom(self)
1404         return newcollection
1405
1406     @synchronized
1407     def api_response(self) -> Optional[Dict[str, Any]]:
1408         """Get this instance's associated API record
1409
1410         If this Collection instance has an associated API record, return it.
1411         Otherwise, return `None`.
1412         """
1413         return self._api_response
1414
1415     def find_or_create(
1416             self,
1417             path: str,
1418             create_type: CreateType,
1419     ) -> CollectionItem:
1420         if path == ".":
1421             return self
1422         else:
1423             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1424
1425     def find(self, path: str) -> CollectionItem:
1426         if path == ".":
1427             return self
1428         else:
1429             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1430
1431     def remove(self, path: str, recursive: bool=False) -> None:
1432         if path == ".":
1433             raise errors.ArgumentError("Cannot remove '.'")
1434         else:
1435             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1436
1437     @must_be_writable
1438     @synchronized
1439     @retry_method
1440     def save(
1441             self,
1442             properties: Optional[Properties]=None,
1443             storage_classes: Optional[StorageClasses]=None,
1444             trash_at: Optional[datetime.datetime]=None,
1445             merge: bool=True,
1446             num_retries: Optional[int]=None,
1447             preserve_version: bool=False,
1448     ) -> str:
1449         """Save collection to an existing API record
1450
1451         This method updates the instance's corresponding API record to match
1452         the instance's state. If this instance does not have a corresponding API
1453         record yet, raises `AssertionError`. (To create a new API record, use
1454         `Collection.save_new`.) This method returns the saved collection
1455         manifest.
1456
1457         Arguments:
1458
1459         * properties: dict[str, Any] | None --- If provided, the API record will
1460           be updated with these properties. Note this will completely replace
1461           any existing properties.
1462
1463         * storage_classes: list[str] | None --- If provided, the API record will
1464           be updated with this value in the `storage_classes_desired` field.
1465           This value will also be saved on the instance and used for any
1466           changes that follow.
1467
1468         * trash_at: datetime.datetime | None --- If provided, the API record
1469           will be updated with this value in the `trash_at` field.
1470
1471         * merge: bool --- If `True` (the default), this method will first
1472           reload this collection's API record, and merge any new contents into
1473           this instance before saving changes. See `Collection.update` for
1474           details.
1475
1476         * num_retries: int | None --- The number of times to retry reloading
1477           the collection's API record from the API server. If not specified,
1478           uses the `num_retries` provided when this instance was constructed.
1479
1480         * preserve_version: bool --- This value will be passed to directly
1481           to the underlying API call. If `True`, the Arvados API will
1482           preserve the versions of this collection both immediately before
1483           and after the update. If `True` when the API server is not
1484           configured with collection versioning, this method raises
1485           `arvados.errors.ArgumentError`.
1486         """
1487         if properties and type(properties) is not dict:
1488             raise errors.ArgumentError("properties must be dictionary type.")
1489
1490         if storage_classes and type(storage_classes) is not list:
1491             raise errors.ArgumentError("storage_classes must be list type.")
1492         if storage_classes:
1493             self._storage_classes_desired = storage_classes
1494
1495         if trash_at and type(trash_at) is not datetime.datetime:
1496             raise errors.ArgumentError("trash_at must be datetime type.")
1497
1498         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1499             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1500
1501         body={}
1502         if properties:
1503             body["properties"] = properties
1504         if self.storage_classes_desired():
1505             body["storage_classes_desired"] = self.storage_classes_desired()
1506         if trash_at:
1507             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1508             body["trash_at"] = t
1509         if preserve_version:
1510             body["preserve_version"] = preserve_version
1511
1512         if not self.committed():
1513             if self._has_remote_blocks:
1514                 # Copy any remote blocks to the local cluster.
1515                 self._copy_remote_blocks(remote_blocks={})
1516                 self._has_remote_blocks = False
1517             if not self._has_collection_uuid():
1518                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1519             elif not self._has_local_collection_uuid():
1520                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1521
1522             self._my_block_manager().commit_all()
1523
1524             if merge:
1525                 self.update()
1526
1527             text = self.manifest_text(strip=False)
1528             body['manifest_text'] = text
1529
1530             self._remember_api_response(self._my_api().collections().update(
1531                 uuid=self._manifest_locator,
1532                 body=body
1533                 ).execute(num_retries=num_retries))
1534             self._manifest_text = self._api_response["manifest_text"]
1535             self._portable_data_hash = self._api_response["portable_data_hash"]
1536             self.set_committed(True)
1537         elif body:
1538             self._remember_api_response(self._my_api().collections().update(
1539                 uuid=self._manifest_locator,
1540                 body=body
1541                 ).execute(num_retries=num_retries))
1542
1543         return self._manifest_text
1544
1545
1546     @must_be_writable
1547     @synchronized
1548     @retry_method
1549     def save_new(
1550             self,
1551             name: Optional[str]=None,
1552             create_collection_record: bool=True,
1553             owner_uuid: Optional[str]=None,
1554             properties: Optional[Properties]=None,
1555             storage_classes: Optional[StorageClasses]=None,
1556             trash_at: Optional[datetime.datetime]=None,
1557             ensure_unique_name: bool=False,
1558             num_retries: Optional[int]=None,
1559             preserve_version: bool=False,
1560     ):
1561         """Save collection to a new API record
1562
1563         This method finishes uploading new data blocks and (optionally)
1564         creates a new API collection record with the provided data. If a new
1565         record is created, this instance becomes associated with that record
1566         for future updates like `save()`. This method returns the saved
1567         collection manifest.
1568
1569         Arguments:
1570
1571         * name: str | None --- The `name` field to use on the new collection
1572           record. If not specified, a generic default name is generated.
1573
1574         * create_collection_record: bool --- If `True` (the default), creates a
1575           collection record on the API server. If `False`, the method finishes
1576           all data uploads and only returns the resulting collection manifest
1577           without sending it to the API server.
1578
1579         * owner_uuid: str | None --- The `owner_uuid` field to use on the
1580           new collection record.
1581
1582         * properties: dict[str, Any] | None --- The `properties` field to use on
1583           the new collection record.
1584
1585         * storage_classes: list[str] | None --- The
1586           `storage_classes_desired` field to use on the new collection record.
1587
1588         * trash_at: datetime.datetime | None --- The `trash_at` field to use
1589           on the new collection record.
1590
1591         * ensure_unique_name: bool --- This value is passed directly to the
1592           Arvados API when creating the collection record. If `True`, the API
1593           server may modify the submitted `name` to ensure the collection's
1594           `name`+`owner_uuid` combination is unique. If `False` (the default),
1595           if a collection already exists with this same `name`+`owner_uuid`
1596           combination, creating a collection record will raise a validation
1597           error.
1598
1599         * num_retries: int | None --- The number of times to retry reloading
1600           the collection's API record from the API server. If not specified,
1601           uses the `num_retries` provided when this instance was constructed.
1602
1603         * preserve_version: bool --- This value will be passed to directly
1604           to the underlying API call. If `True`, the Arvados API will
1605           preserve the versions of this collection both immediately before
1606           and after the update. If `True` when the API server is not
1607           configured with collection versioning, this method raises
1608           `arvados.errors.ArgumentError`.
1609         """
1610         if properties and type(properties) is not dict:
1611             raise errors.ArgumentError("properties must be dictionary type.")
1612
1613         if storage_classes and type(storage_classes) is not list:
1614             raise errors.ArgumentError("storage_classes must be list type.")
1615
1616         if trash_at and type(trash_at) is not datetime.datetime:
1617             raise errors.ArgumentError("trash_at must be datetime type.")
1618
1619         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1620             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1621
1622         if self._has_remote_blocks:
1623             # Copy any remote blocks to the local cluster.
1624             self._copy_remote_blocks(remote_blocks={})
1625             self._has_remote_blocks = False
1626
1627         if storage_classes:
1628             self._storage_classes_desired = storage_classes
1629
1630         self._my_block_manager().commit_all()
1631         text = self.manifest_text(strip=False)
1632
1633         if create_collection_record:
1634             if name is None:
1635                 name = "New collection"
1636                 ensure_unique_name = True
1637
1638             body = {"manifest_text": text,
1639                     "name": name,
1640                     "replication_desired": self.replication_desired}
1641             if owner_uuid:
1642                 body["owner_uuid"] = owner_uuid
1643             if properties:
1644                 body["properties"] = properties
1645             if self.storage_classes_desired():
1646                 body["storage_classes_desired"] = self.storage_classes_desired()
1647             if trash_at:
1648                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1649                 body["trash_at"] = t
1650             if preserve_version:
1651                 body["preserve_version"] = preserve_version
1652
1653             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1654             text = self._api_response["manifest_text"]
1655
1656             self._manifest_locator = self._api_response["uuid"]
1657             self._portable_data_hash = self._api_response["portable_data_hash"]
1658
1659             self._manifest_text = text
1660             self.set_committed(True)
1661
1662         return text
1663
1664     _token_re = re.compile(r'(\S+)(\s+|$)')
1665     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1666     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1667
1668     def _unescape_manifest_path(self, path):
1669         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1670
1671     @synchronized
1672     def _import_manifest(self, manifest_text):
1673         """Import a manifest into a `Collection`.
1674
1675         :manifest_text:
1676           The manifest text to import from.
1677
1678         """
1679         if len(self) > 0:
1680             raise ArgumentError("Can only import manifest into an empty collection")
1681
1682         STREAM_NAME = 0
1683         BLOCKS = 1
1684         SEGMENTS = 2
1685
1686         stream_name = None
1687         state = STREAM_NAME
1688
1689         for token_and_separator in self._token_re.finditer(manifest_text):
1690             tok = token_and_separator.group(1)
1691             sep = token_and_separator.group(2)
1692
1693             if state == STREAM_NAME:
1694                 # starting a new stream
1695                 stream_name = self._unescape_manifest_path(tok)
1696                 blocks = []
1697                 segments = []
1698                 streamoffset = 0
1699                 state = BLOCKS
1700                 self.find_or_create(stream_name, COLLECTION)
1701                 continue
1702
1703             if state == BLOCKS:
1704                 block_locator = self._block_re.match(tok)
1705                 if block_locator:
1706                     blocksize = int(block_locator.group(1))
1707                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1708                     streamoffset += blocksize
1709                 else:
1710                     state = SEGMENTS
1711
1712             if state == SEGMENTS:
1713                 file_segment = self._segment_re.match(tok)
1714                 if file_segment:
1715                     pos = int(file_segment.group(1))
1716                     size = int(file_segment.group(2))
1717                     name = self._unescape_manifest_path(file_segment.group(3))
1718                     if name.split('/')[-1] == '.':
1719                         # placeholder for persisting an empty directory, not a real file
1720                         if len(name) > 2:
1721                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1722                     else:
1723                         filepath = os.path.join(stream_name, name)
1724                         try:
1725                             afile = self.find_or_create(filepath, FILE)
1726                         except IOError as e:
1727                             if e.errno == errno.ENOTDIR:
1728                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1729                             else:
1730                                 raise e from None
1731                         if isinstance(afile, ArvadosFile):
1732                             afile.add_segment(blocks, pos, size)
1733                         else:
1734                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1735                 else:
1736                     # error!
1737                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1738
1739             if sep == "\n":
1740                 stream_name = None
1741                 state = STREAM_NAME
1742
1743         self.set_committed(True)
1744
1745     @synchronized
1746     def notify(
1747             self,
1748             event: ChangeType,
1749             collection: 'RichCollectionBase',
1750             name: str,
1751             item: CollectionItem,
1752     ) -> None:
1753         if self._callback:
1754             self._callback(event, collection, name, item)
1755
1756
1757 class Subcollection(RichCollectionBase):
1758     """Read and manipulate a stream/directory within an Arvados collection
1759
1760     This class represents a single stream (like a directory) within an Arvados
1761     `Collection`. It is returned by `Collection.find` and provides the same API.
1762     Operations that work on the API collection record propagate to the parent
1763     `Collection` object.
1764     """
1765
1766     def __init__(self, parent, name):
1767         super(Subcollection, self).__init__(parent)
1768         self.lock = self.root_collection().lock
1769         self._manifest_text = None
1770         self.name = name
1771         self.num_retries = parent.num_retries
1772
1773     def root_collection(self) -> 'Collection':
1774         return self.parent.root_collection()
1775
1776     def writable(self) -> bool:
1777         return self.root_collection().writable()
1778
1779     def _my_api(self):
1780         return self.root_collection()._my_api()
1781
1782     def _my_keep(self):
1783         return self.root_collection()._my_keep()
1784
1785     def _my_block_manager(self):
1786         return self.root_collection()._my_block_manager()
1787
1788     def stream_name(self) -> str:
1789         return os.path.join(self.parent.stream_name(), self.name)
1790
1791     @synchronized
1792     def clone(
1793             self,
1794             new_parent: Optional['Collection']=None,
1795             new_name: Optional[str]=None,
1796     ) -> 'Subcollection':
1797         c = Subcollection(new_parent, new_name)
1798         c._clonefrom(self)
1799         return c
1800
1801     @must_be_writable
1802     @synchronized
1803     def _reparent(self, newparent, newname):
1804         self.set_committed(False)
1805         self.flush()
1806         self.parent.remove(self.name, recursive=True)
1807         self.parent = newparent
1808         self.name = newname
1809         self.lock = self.parent.root_collection().lock
1810
1811     @synchronized
1812     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1813         """Encode empty directories by using an \056-named (".") empty file"""
1814         if len(self._items) == 0:
1815             return "%s %s 0:0:\\056\n" % (
1816                 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1817         return super(Subcollection, self)._get_manifest_text(stream_name,
1818                                                              strip, normalize,
1819                                                              only_committed)
1820
1821
1822 class CollectionReader(Collection):
1823     """Read-only `Collection` subclass
1824
1825     This class will never create or update any API collection records. You can
1826     use this class for additional code safety when you only need to read
1827     existing collections.
1828     """
1829     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1830         self._in_init = True
1831         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1832         self._in_init = False
1833
1834         # Forego any locking since it should never change once initialized.
1835         self.lock = NoopLock()
1836
1837         # Backwards compatability with old CollectionReader
1838         # all_streams() and all_files()
1839         self._streams = None
1840
1841     def writable(self) -> bool:
1842         return self._in_init
1843
1844     def _populate_streams(orig_func):
1845         @functools.wraps(orig_func)
1846         def populate_streams_wrapper(self, *args, **kwargs):
1847             # Defer populating self._streams until needed since it creates a copy of the manifest.
1848             if self._streams is None:
1849                 if self._manifest_text:
1850                     self._streams = [sline.split()
1851                                      for sline in self._manifest_text.split("\n")
1852                                      if sline]
1853                 else:
1854                     self._streams = []
1855             return orig_func(self, *args, **kwargs)
1856         return populate_streams_wrapper
1857
1858     @arvados.util._deprecated('3.0', 'Collection iteration')
1859     @_populate_streams
1860     def normalize(self):
1861         """Normalize the streams returned by `all_streams`"""
1862         streams = {}
1863         for s in self.all_streams():
1864             for f in s.all_files():
1865                 streamname, filename = split(s.name() + "/" + f.name())
1866                 if streamname not in streams:
1867                     streams[streamname] = {}
1868                 if filename not in streams[streamname]:
1869                     streams[streamname][filename] = []
1870                 for r in f.segments:
1871                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1872
1873         self._streams = [normalize_stream(s, streams[s])
1874                          for s in sorted(streams)]
1875
1876     @arvados.util._deprecated('3.0', 'Collection iteration')
1877     @_populate_streams
1878     def all_streams(self):
1879         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1880                 for s in self._streams]
1881
1882     @arvados.util._deprecated('3.0', 'Collection iteration')
1883     @_populate_streams
1884     def all_files(self):
1885         for s in self.all_streams():
1886             for f in s.all_files():
1887                 yield f
1888
1889
1890 class CollectionWriter(CollectionBase):
1891     """Create a new collection from scratch
1892
1893     .. WARNING:: Deprecated
1894        This class is deprecated. Prefer `arvados.collection.Collection`
1895        instead.
1896     """
1897
1898     @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1899     def __init__(self, api_client=None, num_retries=0, replication=None):
1900         """Instantiate a CollectionWriter.
1901
1902         CollectionWriter lets you build a new Arvados Collection from scratch.
1903         Write files to it.  The CollectionWriter will upload data to Keep as
1904         appropriate, and provide you with the Collection manifest text when
1905         you're finished.
1906
1907         Arguments:
1908         * api_client: The API client to use to look up Collections.  If not
1909           provided, CollectionReader will build one from available Arvados
1910           configuration.
1911         * num_retries: The default number of times to retry failed
1912           service requests.  Default 0.  You may change this value
1913           after instantiation, but note those changes may not
1914           propagate to related objects like the Keep client.
1915         * replication: The number of copies of each block to store.
1916           If this argument is None or not supplied, replication is
1917           the server-provided default if available, otherwise 2.
1918         """
1919         self._api_client = api_client
1920         self.num_retries = num_retries
1921         self.replication = (2 if replication is None else replication)
1922         self._keep_client = None
1923         self._data_buffer = []
1924         self._data_buffer_len = 0
1925         self._current_stream_files = []
1926         self._current_stream_length = 0
1927         self._current_stream_locators = []
1928         self._current_stream_name = '.'
1929         self._current_file_name = None
1930         self._current_file_pos = 0
1931         self._finished_streams = []
1932         self._close_file = None
1933         self._queued_file = None
1934         self._queued_dirents = deque()
1935         self._queued_trees = deque()
1936         self._last_open = None
1937
1938     def __exit__(self, exc_type, exc_value, traceback):
1939         if exc_type is None:
1940             self.finish()
1941
1942     def do_queued_work(self):
1943         # The work queue consists of three pieces:
1944         # * _queued_file: The file object we're currently writing to the
1945         #   Collection.
1946         # * _queued_dirents: Entries under the current directory
1947         #   (_queued_trees[0]) that we want to write or recurse through.
1948         #   This may contain files from subdirectories if
1949         #   max_manifest_depth == 0 for this directory.
1950         # * _queued_trees: Directories that should be written as separate
1951         #   streams to the Collection.
1952         # This function handles the smallest piece of work currently queued
1953         # (current file, then current directory, then next directory) until
1954         # no work remains.  The _work_THING methods each do a unit of work on
1955         # THING.  _queue_THING methods add a THING to the work queue.
1956         while True:
1957             if self._queued_file:
1958                 self._work_file()
1959             elif self._queued_dirents:
1960                 self._work_dirents()
1961             elif self._queued_trees:
1962                 self._work_trees()
1963             else:
1964                 break
1965
1966     def _work_file(self):
1967         while True:
1968             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1969             if not buf:
1970                 break
1971             self.write(buf)
1972         self.finish_current_file()
1973         if self._close_file:
1974             self._queued_file.close()
1975         self._close_file = None
1976         self._queued_file = None
1977
1978     def _work_dirents(self):
1979         path, stream_name, max_manifest_depth = self._queued_trees[0]
1980         if stream_name != self.current_stream_name():
1981             self.start_new_stream(stream_name)
1982         while self._queued_dirents:
1983             dirent = self._queued_dirents.popleft()
1984             target = os.path.join(path, dirent)
1985             if os.path.isdir(target):
1986                 self._queue_tree(target,
1987                                  os.path.join(stream_name, dirent),
1988                                  max_manifest_depth - 1)
1989             else:
1990                 self._queue_file(target, dirent)
1991                 break
1992         if not self._queued_dirents:
1993             self._queued_trees.popleft()
1994
1995     def _work_trees(self):
1996         path, stream_name, max_manifest_depth = self._queued_trees[0]
1997         d = arvados.util.listdir_recursive(
1998             path, max_depth = (None if max_manifest_depth == 0 else 0))
1999         if d:
2000             self._queue_dirents(stream_name, d)
2001         else:
2002             self._queued_trees.popleft()
2003
2004     def _queue_file(self, source, filename=None):
2005         assert (self._queued_file is None), "tried to queue more than one file"
2006         if not hasattr(source, 'read'):
2007             source = open(source, 'rb')
2008             self._close_file = True
2009         else:
2010             self._close_file = False
2011         if filename is None:
2012             filename = os.path.basename(source.name)
2013         self.start_new_file(filename)
2014         self._queued_file = source
2015
2016     def _queue_dirents(self, stream_name, dirents):
2017         assert (not self._queued_dirents), "tried to queue more than one tree"
2018         self._queued_dirents = deque(sorted(dirents))
2019
2020     def _queue_tree(self, path, stream_name, max_manifest_depth):
2021         self._queued_trees.append((path, stream_name, max_manifest_depth))
2022
2023     def write_file(self, source, filename=None):
2024         self._queue_file(source, filename)
2025         self.do_queued_work()
2026
2027     def write_directory_tree(self,
2028                              path, stream_name='.', max_manifest_depth=-1):
2029         self._queue_tree(path, stream_name, max_manifest_depth)
2030         self.do_queued_work()
2031
2032     def write(self, newdata):
2033         if isinstance(newdata, bytes):
2034             pass
2035         elif isinstance(newdata, str):
2036             newdata = newdata.encode()
2037         elif hasattr(newdata, '__iter__'):
2038             for s in newdata:
2039                 self.write(s)
2040             return
2041         self._data_buffer.append(newdata)
2042         self._data_buffer_len += len(newdata)
2043         self._current_stream_length += len(newdata)
2044         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2045             self.flush_data()
2046
2047     def open(self, streampath, filename=None):
2048         """open(streampath[, filename]) -> file-like object
2049
2050         Pass in the path of a file to write to the Collection, either as a
2051         single string or as two separate stream name and file name arguments.
2052         This method returns a file-like object you can write to add it to the
2053         Collection.
2054
2055         You may only have one file object from the Collection open at a time,
2056         so be sure to close the object when you're done.  Using the object in
2057         a with statement makes that easy:
2058
2059             with cwriter.open('./doc/page1.txt') as outfile:
2060                 outfile.write(page1_data)
2061             with cwriter.open('./doc/page2.txt') as outfile:
2062                 outfile.write(page2_data)
2063         """
2064         if filename is None:
2065             streampath, filename = split(streampath)
2066         if self._last_open and not self._last_open.closed:
2067             raise errors.AssertionError(
2068                 u"can't open '{}' when '{}' is still open".format(
2069                     filename, self._last_open.name))
2070         if streampath != self.current_stream_name():
2071             self.start_new_stream(streampath)
2072         self.set_current_file_name(filename)
2073         self._last_open = _WriterFile(self, filename)
2074         return self._last_open
2075
2076     def flush_data(self):
2077         data_buffer = b''.join(self._data_buffer)
2078         if data_buffer:
2079             self._current_stream_locators.append(
2080                 self._my_keep().put(
2081                     data_buffer[0:config.KEEP_BLOCK_SIZE],
2082                     copies=self.replication))
2083             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2084             self._data_buffer_len = len(self._data_buffer[0])
2085
2086     def start_new_file(self, newfilename=None):
2087         self.finish_current_file()
2088         self.set_current_file_name(newfilename)
2089
2090     def set_current_file_name(self, newfilename):
2091         if re.search(r'[\t\n]', newfilename):
2092             raise errors.AssertionError(
2093                 "Manifest filenames cannot contain whitespace: %s" %
2094                 newfilename)
2095         elif re.search(r'\x00', newfilename):
2096             raise errors.AssertionError(
2097                 "Manifest filenames cannot contain NUL characters: %s" %
2098                 newfilename)
2099         self._current_file_name = newfilename
2100
2101     def current_file_name(self):
2102         return self._current_file_name
2103
2104     def finish_current_file(self):
2105         if self._current_file_name is None:
2106             if self._current_file_pos == self._current_stream_length:
2107                 return
2108             raise errors.AssertionError(
2109                 "Cannot finish an unnamed file " +
2110                 "(%d bytes at offset %d in '%s' stream)" %
2111                 (self._current_stream_length - self._current_file_pos,
2112                  self._current_file_pos,
2113                  self._current_stream_name))
2114         self._current_stream_files.append([
2115                 self._current_file_pos,
2116                 self._current_stream_length - self._current_file_pos,
2117                 self._current_file_name])
2118         self._current_file_pos = self._current_stream_length
2119         self._current_file_name = None
2120
2121     def start_new_stream(self, newstreamname='.'):
2122         self.finish_current_stream()
2123         self.set_current_stream_name(newstreamname)
2124
2125     def set_current_stream_name(self, newstreamname):
2126         if re.search(r'[\t\n]', newstreamname):
2127             raise errors.AssertionError(
2128                 "Manifest stream names cannot contain whitespace: '%s'" %
2129                 (newstreamname))
2130         self._current_stream_name = '.' if newstreamname=='' else newstreamname
2131
2132     def current_stream_name(self):
2133         return self._current_stream_name
2134
2135     def finish_current_stream(self):
2136         self.finish_current_file()
2137         self.flush_data()
2138         if not self._current_stream_files:
2139             pass
2140         elif self._current_stream_name is None:
2141             raise errors.AssertionError(
2142                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
2143                 (self._current_stream_length, len(self._current_stream_files)))
2144         else:
2145             if not self._current_stream_locators:
2146                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2147             self._finished_streams.append([self._current_stream_name,
2148                                            self._current_stream_locators,
2149                                            self._current_stream_files])
2150         self._current_stream_files = []
2151         self._current_stream_length = 0
2152         self._current_stream_locators = []
2153         self._current_stream_name = None
2154         self._current_file_pos = 0
2155         self._current_file_name = None
2156
2157     def finish(self):
2158         """Store the manifest in Keep and return its locator.
2159
2160         This is useful for storing manifest fragments (task outputs)
2161         temporarily in Keep during a Crunch job.
2162
2163         In other cases you should make a collection instead, by
2164         sending manifest_text() to the API server's "create
2165         collection" endpoint.
2166         """
2167         return self._my_keep().put(self.manifest_text().encode(),
2168                                    copies=self.replication)
2169
2170     def portable_data_hash(self):
2171         stripped = self.stripped_manifest().encode()
2172         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2173
2174     def manifest_text(self):
2175         self.finish_current_stream()
2176         manifest = ''
2177
2178         for stream in self._finished_streams:
2179             if not re.search(r'^\.(/.*)?$', stream[0]):
2180                 manifest += './'
2181             manifest += stream[0].replace(' ', '\\040')
2182             manifest += ' ' + ' '.join(stream[1])
2183             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2184             manifest += "\n"
2185
2186         return manifest
2187
2188     def data_locators(self):
2189         ret = []
2190         for name, locators, files in self._finished_streams:
2191             ret += locators
2192         return ret
2193
2194     def save_new(self, name=None):
2195         return self._api_client.collections().create(
2196             ensure_unique_name=True,
2197             body={
2198                 'name': name,
2199                 'manifest_text': self.manifest_text(),
2200             }).execute(num_retries=self.num_retries)
2201
2202
2203 class ResumableCollectionWriter(CollectionWriter):
2204     """CollectionWriter that can serialize internal state to disk
2205
2206     .. WARNING:: Deprecated
2207        This class is deprecated. Prefer `arvados.collection.Collection`
2208        instead.
2209     """
2210
2211     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
2212                    '_current_stream_locators', '_current_stream_name',
2213                    '_current_file_name', '_current_file_pos', '_close_file',
2214                    '_data_buffer', '_dependencies', '_finished_streams',
2215                    '_queued_dirents', '_queued_trees']
2216
2217     @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
2218     def __init__(self, api_client=None, **kwargs):
2219         self._dependencies = {}
2220         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
2221
2222     @classmethod
2223     def from_state(cls, state, *init_args, **init_kwargs):
2224         # Try to build a new writer from scratch with the given state.
2225         # If the state is not suitable to resume (because files have changed,
2226         # been deleted, aren't predictable, etc.), raise a
2227         # StaleWriterStateError.  Otherwise, return the initialized writer.
2228         # The caller is responsible for calling writer.do_queued_work()
2229         # appropriately after it's returned.
2230         writer = cls(*init_args, **init_kwargs)
2231         for attr_name in cls.STATE_PROPS:
2232             attr_value = state[attr_name]
2233             attr_class = getattr(writer, attr_name).__class__
2234             # Coerce the value into the same type as the initial value, if
2235             # needed.
2236             if attr_class not in (type(None), attr_value.__class__):
2237                 attr_value = attr_class(attr_value)
2238             setattr(writer, attr_name, attr_value)
2239         # Check dependencies before we try to resume anything.
2240         if any(KeepLocator(ls).permission_expired()
2241                for ls in writer._current_stream_locators):
2242             raise errors.StaleWriterStateError(
2243                 "locators include expired permission hint")
2244         writer.check_dependencies()
2245         if state['_current_file'] is not None:
2246             path, pos = state['_current_file']
2247             try:
2248                 writer._queued_file = open(path, 'rb')
2249                 writer._queued_file.seek(pos)
2250             except IOError as error:
2251                 raise errors.StaleWriterStateError(
2252                     u"failed to reopen active file {}: {}".format(path, error))
2253         return writer
2254
2255     def check_dependencies(self):
2256         for path, orig_stat in self._dependencies.items():
2257             if not S_ISREG(orig_stat[ST_MODE]):
2258                 raise errors.StaleWriterStateError(u"{} not file".format(path))
2259             try:
2260                 now_stat = tuple(os.stat(path))
2261             except OSError as error:
2262                 raise errors.StaleWriterStateError(
2263                     u"failed to stat {}: {}".format(path, error))
2264             if ((not S_ISREG(now_stat[ST_MODE])) or
2265                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
2266                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
2267                 raise errors.StaleWriterStateError(u"{} changed".format(path))
2268
2269     def dump_state(self, copy_func=lambda x: x):
2270         state = {attr: copy_func(getattr(self, attr))
2271                  for attr in self.STATE_PROPS}
2272         if self._queued_file is None:
2273             state['_current_file'] = None
2274         else:
2275             state['_current_file'] = (os.path.realpath(self._queued_file.name),
2276                                       self._queued_file.tell())
2277         return state
2278
2279     def _queue_file(self, source, filename=None):
2280         try:
2281             src_path = os.path.realpath(source)
2282         except Exception:
2283             raise errors.AssertionError(u"{} not a file path".format(source))
2284         try:
2285             path_stat = os.stat(src_path)
2286         except OSError as stat_error:
2287             path_stat = None
2288         super(ResumableCollectionWriter, self)._queue_file(source, filename)
2289         fd_stat = os.fstat(self._queued_file.fileno())
2290         if not S_ISREG(fd_stat.st_mode):
2291             # We won't be able to resume from this cache anyway, so don't
2292             # worry about further checks.
2293             self._dependencies[source] = tuple(fd_stat)
2294         elif path_stat is None:
2295             raise errors.AssertionError(
2296                 u"could not stat {}: {}".format(source, stat_error))
2297         elif path_stat.st_ino != fd_stat.st_ino:
2298             raise errors.AssertionError(
2299                 u"{} changed between open and stat calls".format(source))
2300         else:
2301             self._dependencies[src_path] = tuple(fd_stat)
2302
2303     def write(self, data):
2304         if self._queued_file is None:
2305             raise errors.AssertionError(
2306                 "resumable writer can't accept unsourced data")
2307         return super(ResumableCollectionWriter, self).write(data)