21639: Tell the kernel to map in whole blocks with madvise
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Tools to work with Arvados collections
5
6 This module provides high-level interfaces to create, read, and update
7 Arvados collections. Most users will want to instantiate `Collection`
8 objects, and use methods like `Collection.open` and `Collection.mkdirs` to
9 read and write data in the collection. Refer to the Arvados Python SDK
10 cookbook for [an introduction to using the Collection class][cookbook].
11
12 [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
13 """
14
15 from __future__ import absolute_import
16 from future.utils import listitems, listvalues, viewkeys
17 from builtins import str
18 from past.builtins import basestring
19 from builtins import object
20 import ciso8601
21 import datetime
22 import errno
23 import functools
24 import hashlib
25 import io
26 import logging
27 import os
28 import re
29 import sys
30 import threading
31 import time
32
33 from collections import deque
34 from stat import *
35
36 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
37 from .keep import KeepLocator, KeepClient
38 from .stream import StreamReader
39 from ._normalize_stream import normalize_stream, escape
40 from ._ranges import Range, LocatorAndRange
41 from .safeapi import ThreadSafeApiCache
42 import arvados.config as config
43 import arvados.errors as errors
44 import arvados.util
45 import arvados.events as events
46 from arvados.retry import retry_method
47
48 from typing import (
49     Any,
50     Callable,
51     Dict,
52     IO,
53     Iterator,
54     List,
55     Mapping,
56     Optional,
57     Tuple,
58     Union,
59 )
60
61 if sys.version_info < (3, 8):
62     from typing_extensions import Literal
63 else:
64     from typing import Literal
65
66 _logger = logging.getLogger('arvados.collection')
67
68 ADD = "add"
69 """Argument value for `Collection` methods to represent an added item"""
70 DEL = "del"
71 """Argument value for `Collection` methods to represent a removed item"""
72 MOD = "mod"
73 """Argument value for `Collection` methods to represent a modified item"""
74 TOK = "tok"
75 """Argument value for `Collection` methods to represent an item with token differences"""
76 FILE = "file"
77 """`create_type` value for `Collection.find_or_create`"""
78 COLLECTION = "collection"
79 """`create_type` value for `Collection.find_or_create`"""
80
81 ChangeList = List[Union[
82     Tuple[Literal[ADD, DEL], str, 'Collection'],
83     Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'],
84 ]]
85 ChangeType = Literal[ADD, DEL, MOD, TOK]
86 CollectionItem = Union[ArvadosFile, 'Collection']
87 ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object]
88 CreateType = Literal[COLLECTION, FILE]
89 Properties = Dict[str, Any]
90 StorageClasses = List[str]
91
92 class CollectionBase(object):
93     """Abstract base class for Collection classes
94
95     .. ATTENTION:: Internal
96        This class is meant to be used by other parts of the SDK. User code
97        should instantiate or subclass `Collection` or one of its subclasses
98        directly.
99     """
100
101     def __enter__(self):
102         """Enter a context block with this collection instance"""
103         return self
104
105     def __exit__(self, exc_type, exc_value, traceback):
106         """Exit a context block with this collection instance"""
107         pass
108
109     def _my_keep(self):
110         if self._keep_client is None:
111             self._keep_client = KeepClient(api_client=self._api_client,
112                                            num_retries=self.num_retries)
113         return self._keep_client
114
115     def stripped_manifest(self) -> str:
116         """Create a copy of the collection manifest with only size hints
117
118         This method returns a string with the current collection's manifest
119         text with all non-portable locator hints like permission hints and
120         remote cluster hints removed. The only hints in the returned manifest
121         will be size hints.
122         """
123         raw = self.manifest_text()
124         clean = []
125         for line in raw.split("\n"):
126             fields = line.split()
127             if fields:
128                 clean_fields = fields[:1] + [
129                     (re.sub(r'\+[^\d][^\+]*', '', x)
130                      if re.match(arvados.util.keep_locator_pattern, x)
131                      else x)
132                     for x in fields[1:]]
133                 clean += [' '.join(clean_fields), "\n"]
134         return ''.join(clean)
135
136
137 class _WriterFile(_FileLikeObjectBase):
138     def __init__(self, coll_writer, name):
139         super(_WriterFile, self).__init__(name, 'wb')
140         self.dest = coll_writer
141
142     def close(self):
143         super(_WriterFile, self).close()
144         self.dest.finish_current_file()
145
146     @_FileLikeObjectBase._before_close
147     def write(self, data):
148         self.dest.write(data)
149
150     @_FileLikeObjectBase._before_close
151     def writelines(self, seq):
152         for data in seq:
153             self.write(data)
154
155     @_FileLikeObjectBase._before_close
156     def flush(self):
157         self.dest.flush_data()
158
159
160 class RichCollectionBase(CollectionBase):
161     """Base class for Collection classes
162
163     .. ATTENTION:: Internal
164        This class is meant to be used by other parts of the SDK. User code
165        should instantiate or subclass `Collection` or one of its subclasses
166        directly.
167     """
168
169     def __init__(self, parent=None):
170         self.parent = parent
171         self._committed = False
172         self._has_remote_blocks = False
173         self._callback = None
174         self._items = {}
175
176     def _my_api(self):
177         raise NotImplementedError()
178
179     def _my_keep(self):
180         raise NotImplementedError()
181
182     def _my_block_manager(self):
183         raise NotImplementedError()
184
185     def writable(self) -> bool:
186         """Indicate whether this collection object can be modified
187
188         This method returns `False` if this object is a `CollectionReader`,
189         else `True`.
190         """
191         raise NotImplementedError()
192
193     def root_collection(self) -> 'Collection':
194         """Get this collection's root collection object
195
196         If you open a subcollection with `Collection.find`, calling this method
197         on that subcollection returns the source Collection object.
198         """
199         raise NotImplementedError()
200
201     def stream_name(self) -> str:
202         """Get the name of the manifest stream represented by this collection
203
204         If you open a subcollection with `Collection.find`, calling this method
205         on that subcollection returns the name of the stream you opened.
206         """
207         raise NotImplementedError()
208
209     @synchronized
210     def has_remote_blocks(self) -> bool:
211         """Indiciate whether the collection refers to remote data
212
213         Returns `True` if the collection manifest includes any Keep locators
214         with a remote hint (`+R`), else `False`.
215         """
216         if self._has_remote_blocks:
217             return True
218         for item in self:
219             if self[item].has_remote_blocks():
220                 return True
221         return False
222
223     @synchronized
224     def set_has_remote_blocks(self, val: bool) -> None:
225         """Cache whether this collection refers to remote blocks
226
227         .. ATTENTION:: Internal
228            This method is only meant to be used by other Collection methods.
229
230         Set this collection's cached "has remote blocks" flag to the given
231         value.
232         """
233         self._has_remote_blocks = val
234         if self.parent:
235             self.parent.set_has_remote_blocks(val)
236
237     @must_be_writable
238     @synchronized
239     def find_or_create(
240             self,
241             path: str,
242             create_type: CreateType,
243     ) -> CollectionItem:
244         """Get the item at the given path, creating it if necessary
245
246         If `path` refers to a stream in this collection, returns a
247         corresponding `Subcollection` object. If `path` refers to a file in
248         this collection, returns a corresponding
249         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
250         this collection, then this method creates a new object and returns
251         it, creating parent streams as needed. The type of object created is
252         determined by the value of `create_type`.
253
254         Arguments:
255
256         * path: str --- The path to find or create within this collection.
257
258         * create_type: Literal[COLLECTION, FILE] --- The type of object to
259           create at `path` if one does not exist. Passing `COLLECTION`
260           creates a stream and returns the corresponding
261           `Subcollection`. Passing `FILE` creates a new file and returns the
262           corresponding `arvados.arvfile.ArvadosFile`.
263         """
264         pathcomponents = path.split("/", 1)
265         if pathcomponents[0]:
266             item = self._items.get(pathcomponents[0])
267             if len(pathcomponents) == 1:
268                 if item is None:
269                     # create new file
270                     if create_type == COLLECTION:
271                         item = Subcollection(self, pathcomponents[0])
272                     else:
273                         item = ArvadosFile(self, pathcomponents[0])
274                     self._items[pathcomponents[0]] = item
275                     self.set_committed(False)
276                     self.notify(ADD, self, pathcomponents[0], item)
277                 return item
278             else:
279                 if item is None:
280                     # create new collection
281                     item = Subcollection(self, pathcomponents[0])
282                     self._items[pathcomponents[0]] = item
283                     self.set_committed(False)
284                     self.notify(ADD, self, pathcomponents[0], item)
285                 if isinstance(item, RichCollectionBase):
286                     return item.find_or_create(pathcomponents[1], create_type)
287                 else:
288                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
289         else:
290             return self
291
292     @synchronized
293     def find(self, path: str) -> CollectionItem:
294         """Get the item at the given path
295
296         If `path` refers to a stream in this collection, returns a
297         corresponding `Subcollection` object. If `path` refers to a file in
298         this collection, returns a corresponding
299         `arvados.arvfile.ArvadosFile` object. If `path` does not exist in
300         this collection, then this method raises `NotADirectoryError`.
301
302         Arguments:
303
304         * path: str --- The path to find or create within this collection.
305         """
306         if not path:
307             raise errors.ArgumentError("Parameter 'path' is empty.")
308
309         pathcomponents = path.split("/", 1)
310         if pathcomponents[0] == '':
311             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
312
313         item = self._items.get(pathcomponents[0])
314         if item is None:
315             return None
316         elif len(pathcomponents) == 1:
317             return item
318         else:
319             if isinstance(item, RichCollectionBase):
320                 if pathcomponents[1]:
321                     return item.find(pathcomponents[1])
322                 else:
323                     return item
324             else:
325                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
326
327     @synchronized
328     def mkdirs(self, path: str) -> 'Subcollection':
329         """Create and return a subcollection at `path`
330
331         If `path` exists within this collection, raises `FileExistsError`.
332         Otherwise, creates a stream at that path and returns the
333         corresponding `Subcollection`.
334         """
335         if self.find(path) != None:
336             raise IOError(errno.EEXIST, "Directory or file exists", path)
337
338         return self.find_or_create(path, COLLECTION)
339
340     def open(
341             self,
342             path: str,
343             mode: str="r",
344             encoding: Optional[str]=None,
345     ) -> IO:
346         """Open a file-like object within the collection
347
348         This method returns a file-like object that can read and/or write the
349         file located at `path` within the collection. If you attempt to write
350         a `path` that does not exist, the file is created with `find_or_create`.
351         If the file cannot be opened for any other reason, this method raises
352         `OSError` with an appropriate errno.
353
354         Arguments:
355
356         * path: str --- The path of the file to open within this collection
357
358         * mode: str --- The mode to open this file. Supports all the same
359           values as `builtins.open`.
360
361         * encoding: str | None --- The text encoding of the file. Only used
362           when the file is opened in text mode. The default is
363           platform-dependent.
364         """
365         if not re.search(r'^[rwa][bt]?\+?$', mode):
366             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
367
368         if mode[0] == 'r' and '+' not in mode:
369             fclass = ArvadosFileReader
370             arvfile = self.find(path)
371         elif not self.writable():
372             raise IOError(errno.EROFS, "Collection is read only")
373         else:
374             fclass = ArvadosFileWriter
375             arvfile = self.find_or_create(path, FILE)
376
377         if arvfile is None:
378             raise IOError(errno.ENOENT, "File not found", path)
379         if not isinstance(arvfile, ArvadosFile):
380             raise IOError(errno.EISDIR, "Is a directory", path)
381
382         if mode[0] == 'w':
383             arvfile.truncate(0)
384
385         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
386         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
387         if 'b' not in mode:
388             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
389             f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
390         return f
391
392     def modified(self) -> bool:
393         """Indicate whether this collection has an API server record
394
395         Returns `False` if this collection corresponds to a record loaded from
396         the API server, `True` otherwise.
397         """
398         return not self.committed()
399
400     @synchronized
401     def committed(self):
402         """Indicate whether this collection has an API server record
403
404         Returns `True` if this collection corresponds to a record loaded from
405         the API server, `False` otherwise.
406         """
407         return self._committed
408
409     @synchronized
410     def set_committed(self, value: bool=True):
411         """Cache whether this collection has an API server record
412
413         .. ATTENTION:: Internal
414            This method is only meant to be used by other Collection methods.
415
416         Set this collection's cached "committed" flag to the given
417         value and propagates it as needed.
418         """
419         if value == self._committed:
420             return
421         if value:
422             for k,v in listitems(self._items):
423                 v.set_committed(True)
424             self._committed = True
425         else:
426             self._committed = False
427             if self.parent is not None:
428                 self.parent.set_committed(False)
429
430     @synchronized
431     def __iter__(self) -> Iterator[str]:
432         """Iterate names of streams and files in this collection
433
434         This method does not recurse. It only iterates the contents of this
435         collection's corresponding stream.
436         """
437         return iter(viewkeys(self._items))
438
439     @synchronized
440     def __getitem__(self, k: str) -> CollectionItem:
441         """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection
442
443         This method does not recurse. If you want to search a path, use
444         `RichCollectionBase.find` instead.
445         """
446         return self._items[k]
447
448     @synchronized
449     def __contains__(self, k: str) -> bool:
450         """Indicate whether this collection has an item with this name
451
452         This method does not recurse. It you want to check a path, use
453         `RichCollectionBase.exists` instead.
454         """
455         return k in self._items
456
457     @synchronized
458     def __len__(self):
459         """Get the number of items directly contained in this collection
460
461         This method does not recurse. It only counts the streams and files
462         in this collection's corresponding stream.
463         """
464         return len(self._items)
465
466     @must_be_writable
467     @synchronized
468     def __delitem__(self, p: str) -> None:
469         """Delete an item from this collection's stream
470
471         This method does not recurse. If you want to remove an item by a
472         path, use `RichCollectionBase.remove` instead.
473         """
474         del self._items[p]
475         self.set_committed(False)
476         self.notify(DEL, self, p, None)
477
478     @synchronized
479     def keys(self) -> Iterator[str]:
480         """Iterate names of streams and files in this collection
481
482         This method does not recurse. It only iterates the contents of this
483         collection's corresponding stream.
484         """
485         return self._items.keys()
486
487     @synchronized
488     def values(self) -> List[CollectionItem]:
489         """Get a list of objects in this collection's stream
490
491         The return value includes a `Subcollection` for every stream, and an
492         `arvados.arvfile.ArvadosFile` for every file, directly within this
493         collection's stream.  This method does not recurse.
494         """
495         return listvalues(self._items)
496
497     @synchronized
498     def items(self) -> List[Tuple[str, CollectionItem]]:
499         """Get a list of `(name, object)` tuples from this collection's stream
500
501         The return value includes a `Subcollection` for every stream, and an
502         `arvados.arvfile.ArvadosFile` for every file, directly within this
503         collection's stream.  This method does not recurse.
504         """
505         return listitems(self._items)
506
507     def exists(self, path: str) -> bool:
508         """Indicate whether this collection includes an item at `path`
509
510         This method returns `True` if `path` refers to a stream or file within
511         this collection, else `False`.
512
513         Arguments:
514
515         * path: str --- The path to check for existence within this collection
516         """
517         return self.find(path) is not None
518
519     @must_be_writable
520     @synchronized
521     def remove(self, path: str, recursive: bool=False) -> None:
522         """Remove the file or stream at `path`
523
524         Arguments:
525
526         * path: str --- The path of the item to remove from the collection
527
528         * recursive: bool --- Controls the method's behavior if `path` refers
529           to a nonempty stream. If `False` (the default), this method raises
530           `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all
531           items under the stream.
532         """
533         if not path:
534             raise errors.ArgumentError("Parameter 'path' is empty.")
535
536         pathcomponents = path.split("/", 1)
537         item = self._items.get(pathcomponents[0])
538         if item is None:
539             raise IOError(errno.ENOENT, "File not found", path)
540         if len(pathcomponents) == 1:
541             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
542                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
543             deleteditem = self._items[pathcomponents[0]]
544             del self._items[pathcomponents[0]]
545             self.set_committed(False)
546             self.notify(DEL, self, pathcomponents[0], deleteditem)
547         else:
548             item.remove(pathcomponents[1], recursive=recursive)
549
550     def _clonefrom(self, source):
551         for k,v in listitems(source):
552             self._items[k] = v.clone(self, k)
553
554     def clone(self):
555         raise NotImplementedError()
556
557     @must_be_writable
558     @synchronized
559     def add(
560             self,
561             source_obj: CollectionItem,
562             target_name: str,
563             overwrite: bool=False,
564             reparent: bool=False,
565     ) -> None:
566         """Copy or move a file or subcollection object to this collection
567
568         Arguments:
569
570         * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection
571           to add to this collection
572
573         * target_name: str --- The path inside this collection where
574           `source_obj` should be added.
575
576         * overwrite: bool --- Controls the behavior of this method when the
577           collection already contains an object at `target_name`. If `False`
578           (the default), this method will raise `FileExistsError`. If `True`,
579           the object at `target_name` will be replaced with `source_obj`.
580
581         * reparent: bool --- Controls whether this method copies or moves
582           `source_obj`. If `False` (the default), `source_obj` is copied into
583           this collection. If `True`, `source_obj` is moved into this
584           collection.
585         """
586         if target_name in self and not overwrite:
587             raise IOError(errno.EEXIST, "File already exists", target_name)
588
589         modified_from = None
590         if target_name in self:
591             modified_from = self[target_name]
592
593         # Actually make the move or copy.
594         if reparent:
595             source_obj._reparent(self, target_name)
596             item = source_obj
597         else:
598             item = source_obj.clone(self, target_name)
599
600         self._items[target_name] = item
601         self.set_committed(False)
602         if not self._has_remote_blocks and source_obj.has_remote_blocks():
603             self.set_has_remote_blocks(True)
604
605         if modified_from:
606             self.notify(MOD, self, target_name, (modified_from, item))
607         else:
608             self.notify(ADD, self, target_name, item)
609
610     def _get_src_target(self, source, target_path, source_collection, create_dest):
611         if source_collection is None:
612             source_collection = self
613
614         # Find the object
615         if isinstance(source, basestring):
616             source_obj = source_collection.find(source)
617             if source_obj is None:
618                 raise IOError(errno.ENOENT, "File not found", source)
619             sourcecomponents = source.split("/")
620         else:
621             source_obj = source
622             sourcecomponents = None
623
624         # Find parent collection the target path
625         targetcomponents = target_path.split("/")
626
627         # Determine the name to use.
628         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
629
630         if not target_name:
631             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
632
633         if create_dest:
634             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
635         else:
636             if len(targetcomponents) > 1:
637                 target_dir = self.find("/".join(targetcomponents[0:-1]))
638             else:
639                 target_dir = self
640
641         if target_dir is None:
642             raise IOError(errno.ENOENT, "Target directory not found", target_name)
643
644         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
645             target_dir = target_dir[target_name]
646             target_name = sourcecomponents[-1]
647
648         return (source_obj, target_dir, target_name)
649
650     @must_be_writable
651     @synchronized
652     def copy(
653             self,
654             source: Union[str, CollectionItem],
655             target_path: str,
656             source_collection: Optional['RichCollectionBase']=None,
657             overwrite: bool=False,
658     ) -> None:
659         """Copy a file or subcollection object to this collection
660
661         Arguments:
662
663         * source: str | arvados.arvfile.ArvadosFile |
664           arvados.collection.Subcollection --- The file or subcollection to
665           add to this collection. If `source` is a str, the object will be
666           found by looking up this path from `source_collection` (see
667           below).
668
669         * target_path: str --- The path inside this collection where the
670           source object should be added.
671
672         * source_collection: arvados.collection.Collection | None --- The
673           collection to find the source object from when `source` is a
674           path. Defaults to the current collection (`self`).
675
676         * overwrite: bool --- Controls the behavior of this method when the
677           collection already contains an object at `target_path`. If `False`
678           (the default), this method will raise `FileExistsError`. If `True`,
679           the object at `target_path` will be replaced with `source_obj`.
680         """
681         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
682         target_dir.add(source_obj, target_name, overwrite, False)
683
684     @must_be_writable
685     @synchronized
686     def rename(
687             self,
688             source: Union[str, CollectionItem],
689             target_path: str,
690             source_collection: Optional['RichCollectionBase']=None,
691             overwrite: bool=False,
692     ) -> None:
693         """Move a file or subcollection object to this collection
694
695         Arguments:
696
697         * source: str | arvados.arvfile.ArvadosFile |
698           arvados.collection.Subcollection --- The file or subcollection to
699           add to this collection. If `source` is a str, the object will be
700           found by looking up this path from `source_collection` (see
701           below).
702
703         * target_path: str --- The path inside this collection where the
704           source object should be added.
705
706         * source_collection: arvados.collection.Collection | None --- The
707           collection to find the source object from when `source` is a
708           path. Defaults to the current collection (`self`).
709
710         * overwrite: bool --- Controls the behavior of this method when the
711           collection already contains an object at `target_path`. If `False`
712           (the default), this method will raise `FileExistsError`. If `True`,
713           the object at `target_path` will be replaced with `source_obj`.
714         """
715         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
716         if not source_obj.writable():
717             raise IOError(errno.EROFS, "Source collection is read only", source)
718         target_dir.add(source_obj, target_name, overwrite, True)
719
720     def portable_manifest_text(self, stream_name: str=".") -> str:
721         """Get the portable manifest text for this collection
722
723         The portable manifest text is normalized, and does not include access
724         tokens. This method does not flush outstanding blocks to Keep.
725
726         Arguments:
727
728         * stream_name: str --- The name to use for this collection's stream in
729           the generated manifest. Default `'.'`.
730         """
731         return self._get_manifest_text(stream_name, True, True)
732
733     @synchronized
734     def manifest_text(
735             self,
736             stream_name: str=".",
737             strip: bool=False,
738             normalize: bool=False,
739             only_committed: bool=False,
740     ) -> str:
741         """Get the manifest text for this collection
742
743         Arguments:
744
745         * stream_name: str --- The name to use for this collection's stream in
746           the generated manifest. Default `'.'`.
747
748         * strip: bool --- Controls whether or not the returned manifest text
749           includes access tokens. If `False` (the default), the manifest text
750           will include access tokens. If `True`, the manifest text will not
751           include access tokens.
752
753         * normalize: bool --- Controls whether or not the returned manifest
754           text is normalized. Default `False`.
755
756         * only_committed: bool --- Controls whether or not this method uploads
757           pending data to Keep before building and returning the manifest text.
758           If `False` (the default), this method will finish uploading all data
759           to Keep, then return the final manifest. If `True`, this method will
760           build and return a manifest that only refers to the data that has
761           finished uploading at the time this method was called.
762         """
763         if not only_committed:
764             self._my_block_manager().commit_all()
765         return self._get_manifest_text(stream_name, strip, normalize,
766                                        only_committed=only_committed)
767
768     @synchronized
769     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
770         """Get the manifest text for this collection, sub collections and files.
771
772         :stream_name:
773           Name to use for this stream (directory)
774
775         :strip:
776           If True, remove signing tokens from block locators if present.
777           If False (default), block locators are left unchanged.
778
779         :normalize:
780           If True, always export the manifest text in normalized form
781           even if the Collection is not modified.  If False (default) and the collection
782           is not modified, return the original manifest text even if it is not
783           in normalized form.
784
785         :only_committed:
786           If True, only include blocks that were already committed to Keep.
787
788         """
789
790         if not self.committed() or self._manifest_text is None or normalize:
791             stream = {}
792             buf = []
793             sorted_keys = sorted(self.keys())
794             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
795                 # Create a stream per file `k`
796                 arvfile = self[filename]
797                 filestream = []
798                 for segment in arvfile.segments():
799                     loc = segment.locator
800                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
801                         if only_committed:
802                             continue
803                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
804                     if strip:
805                         loc = KeepLocator(loc).stripped()
806                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
807                                          segment.segment_offset, segment.range_size))
808                 stream[filename] = filestream
809             if stream:
810                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
811             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
812                 buf.append(self[dirname].manifest_text(
813                     stream_name=os.path.join(stream_name, dirname),
814                     strip=strip, normalize=True, only_committed=only_committed))
815             return "".join(buf)
816         else:
817             if strip:
818                 return self.stripped_manifest()
819             else:
820                 return self._manifest_text
821
822     @synchronized
823     def _copy_remote_blocks(self, remote_blocks={}):
824         """Scan through the entire collection and ask Keep to copy remote blocks.
825
826         When accessing a remote collection, blocks will have a remote signature
827         (+R instead of +A). Collect these signatures and request Keep to copy the
828         blocks to the local cluster, returning local (+A) signatures.
829
830         :remote_blocks:
831           Shared cache of remote to local block mappings. This is used to avoid
832           doing extra work when blocks are shared by more than one file in
833           different subdirectories.
834
835         """
836         for item in self:
837             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
838         return remote_blocks
839
840     @synchronized
841     def diff(
842             self,
843             end_collection: 'RichCollectionBase',
844             prefix: str=".",
845             holding_collection: Optional['Collection']=None,
846     ) -> ChangeList:
847         """Build a list of differences between this collection and another
848
849         Arguments:
850
851         * end_collection: arvados.collection.RichCollectionBase --- A
852           collection object with the desired end state. The returned diff
853           list will describe how to go from the current collection object
854           `self` to `end_collection`.
855
856         * prefix: str --- The name to use for this collection's stream in
857           the diff list. Default `'.'`.
858
859         * holding_collection: arvados.collection.Collection | None --- A
860           collection object used to hold objects for the returned diff
861           list. By default, a new empty collection is created.
862         """
863         changes = []
864         if holding_collection is None:
865             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
866         for k in self:
867             if k not in end_collection:
868                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
869         for k in end_collection:
870             if k in self:
871                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
872                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
873                 elif end_collection[k] != self[k]:
874                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
875                 else:
876                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
877             else:
878                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
879         return changes
880
881     @must_be_writable
882     @synchronized
883     def apply(self, changes: ChangeList) -> None:
884         """Apply a list of changes from to this collection
885
886         This method takes a list of changes generated by
887         `RichCollectionBase.diff` and applies it to this
888         collection. Afterward, the state of this collection object will
889         match the state of `end_collection` passed to `diff`. If a change
890         conflicts with a local change, it will be saved to an alternate path
891         indicating the conflict.
892
893         Arguments:
894
895         * changes: arvados.collection.ChangeList --- The list of differences
896           generated by `RichCollectionBase.diff`.
897         """
898         if changes:
899             self.set_committed(False)
900         for change in changes:
901             event_type = change[0]
902             path = change[1]
903             initial = change[2]
904             local = self.find(path)
905             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
906                                                                     time.gmtime()))
907             if event_type == ADD:
908                 if local is None:
909                     # No local file at path, safe to copy over new file
910                     self.copy(initial, path)
911                 elif local is not None and local != initial:
912                     # There is already local file and it is different:
913                     # save change to conflict file.
914                     self.copy(initial, conflictpath)
915             elif event_type == MOD or event_type == TOK:
916                 final = change[3]
917                 if local == initial:
918                     # Local matches the "initial" item so it has not
919                     # changed locally and is safe to update.
920                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
921                         # Replace contents of local file with new contents
922                         local.replace_contents(final)
923                     else:
924                         # Overwrite path with new item; this can happen if
925                         # path was a file and is now a collection or vice versa
926                         self.copy(final, path, overwrite=True)
927                 else:
928                     # Local is missing (presumably deleted) or local doesn't
929                     # match the "start" value, so save change to conflict file
930                     self.copy(final, conflictpath)
931             elif event_type == DEL:
932                 if local == initial:
933                     # Local item matches "initial" value, so it is safe to remove.
934                     self.remove(path, recursive=True)
935                 # else, the file is modified or already removed, in either
936                 # case we don't want to try to remove it.
937
938     def portable_data_hash(self) -> str:
939         """Get the portable data hash for this collection's manifest"""
940         if self._manifest_locator and self.committed():
941             # If the collection is already saved on the API server, and it's committed
942             # then return API server's PDH response.
943             return self._portable_data_hash
944         else:
945             stripped = self.portable_manifest_text().encode()
946             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
947
948     @synchronized
949     def subscribe(self, callback: ChangeCallback) -> None:
950         """Set a notify callback for changes to this collection
951
952         Arguments:
953
954         * callback: arvados.collection.ChangeCallback --- The callable to
955           call each time the collection is changed.
956         """
957         if self._callback is None:
958             self._callback = callback
959         else:
960             raise errors.ArgumentError("A callback is already set on this collection.")
961
962     @synchronized
963     def unsubscribe(self) -> None:
964         """Remove any notify callback set for changes to this collection"""
965         if self._callback is not None:
966             self._callback = None
967
968     @synchronized
969     def notify(
970             self,
971             event: ChangeType,
972             collection: 'RichCollectionBase',
973             name: str,
974             item: CollectionItem,
975     ) -> None:
976         """Notify any subscribed callback about a change to this collection
977
978         .. ATTENTION:: Internal
979            This method is only meant to be used by other Collection methods.
980
981         If a callback has been registered with `RichCollectionBase.subscribe`,
982         it will be called with information about a change to this collection.
983         Then this notification will be propagated to this collection's root.
984
985         Arguments:
986
987         * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to
988           the collection.
989
990         * collection: arvados.collection.RichCollectionBase --- The
991           collection that was modified.
992
993         * name: str --- The name of the file or stream within `collection` that
994           was modified.
995
996         * item: arvados.arvfile.ArvadosFile |
997           arvados.collection.Subcollection --- The new contents at `name`
998           within `collection`.
999         """
1000         if self._callback:
1001             self._callback(event, collection, name, item)
1002         self.root_collection().notify(event, collection, name, item)
1003
1004     @synchronized
1005     def __eq__(self, other: Any) -> bool:
1006         """Indicate whether this collection object is equal to another"""
1007         if other is self:
1008             return True
1009         if not isinstance(other, RichCollectionBase):
1010             return False
1011         if len(self._items) != len(other):
1012             return False
1013         for k in self._items:
1014             if k not in other:
1015                 return False
1016             if self._items[k] != other[k]:
1017                 return False
1018         return True
1019
1020     def __ne__(self, other: Any) -> bool:
1021         """Indicate whether this collection object is not equal to another"""
1022         return not self.__eq__(other)
1023
1024     @synchronized
1025     def flush(self) -> None:
1026         """Upload any pending data to Keep"""
1027         for e in listvalues(self):
1028             e.flush()
1029
1030
1031 class Collection(RichCollectionBase):
1032     """Read and manipulate an Arvados collection
1033
1034     This class provides a high-level interface to create, read, and update
1035     Arvados collections and their contents. Refer to the Arvados Python SDK
1036     cookbook for [an introduction to using the Collection class][cookbook].
1037
1038     [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections
1039     """
1040
1041     def __init__(self, manifest_locator_or_text: Optional[str]=None,
1042                  api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None,
1043                  keep_client: Optional['arvados.keep.KeepClient']=None,
1044                  num_retries: int=10,
1045                  parent: Optional['Collection']=None,
1046                  apiconfig: Optional[Mapping[str, str]]=None,
1047                  block_manager: Optional['arvados.arvfile._BlockManager']=None,
1048                  replication_desired: Optional[int]=None,
1049                  storage_classes_desired: Optional[List[str]]=None,
1050                  put_threads: Optional[int]=None):
1051         """Initialize a Collection object
1052
1053         Arguments:
1054
1055         * manifest_locator_or_text: str | None --- This string can contain a
1056           collection manifest text, portable data hash, or UUID. When given a
1057           portable data hash or UUID, this instance will load a collection
1058           record from the API server. Otherwise, this instance will represent a
1059           new collection without an API server record. The default value `None`
1060           instantiates a new collection with an empty manifest.
1061
1062         * api_client: arvados.api_resources.ArvadosAPIClient | None --- The
1063           Arvados API client object this instance uses to make requests. If
1064           none is given, this instance creates its own client using the
1065           settings from `apiconfig` (see below). If your client instantiates
1066           many Collection objects, you can help limit memory utilization by
1067           calling `arvados.api.api` to construct an
1068           `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client`
1069           for every Collection.
1070
1071         * keep_client: arvados.keep.KeepClient | None --- The Keep client
1072           object this instance uses to make requests. If none is given, this
1073           instance creates its own client using its `api_client`.
1074
1075         * num_retries: int --- The number of times that client requests are
1076           retried. Default 10.
1077
1078         * parent: arvados.collection.Collection | None --- The parent Collection
1079           object of this instance, if any. This argument is primarily used by
1080           other Collection methods; user client code shouldn't need to use it.
1081
1082         * apiconfig: Mapping[str, str] | None --- A mapping with entries for
1083           `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally
1084           `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the
1085           Collection object constructs one from these settings. If no
1086           mapping is provided, calls `arvados.config.settings` to get these
1087           parameters from user configuration.
1088
1089         * block_manager: arvados.arvfile._BlockManager | None --- The
1090           _BlockManager object used by this instance to coordinate reading
1091           and writing Keep data blocks. If none is given, this instance
1092           constructs its own. This argument is primarily used by other
1093           Collection methods; user client code shouldn't need to use it.
1094
1095         * replication_desired: int | None --- This controls both the value of
1096           the `replication_desired` field on API collection records saved by
1097           this class, as well as the number of Keep services that the object
1098           writes new data blocks to. If none is given, uses the default value
1099           configured for the cluster.
1100
1101         * storage_classes_desired: list[str] | None --- This controls both
1102           the value of the `storage_classes_desired` field on API collection
1103           records saved by this class, as well as selecting which specific
1104           Keep services the object writes new data blocks to. If none is
1105           given, defaults to an empty list.
1106
1107         * put_threads: int | None --- The number of threads to run
1108           simultaneously to upload data blocks to Keep. This value is used when
1109           building a new `block_manager`. It is unused when a `block_manager`
1110           is provided.
1111         """
1112
1113         if storage_classes_desired and type(storage_classes_desired) is not list:
1114             raise errors.ArgumentError("storage_classes_desired must be list type.")
1115
1116         super(Collection, self).__init__(parent)
1117         self._api_client = api_client
1118         self._keep_client = keep_client
1119
1120         # Use the keep client from ThreadSafeApiCache
1121         if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1122             self._keep_client = self._api_client.keep
1123
1124         self._block_manager = block_manager
1125         self.replication_desired = replication_desired
1126         self._storage_classes_desired = storage_classes_desired
1127         self.put_threads = put_threads
1128
1129         if apiconfig:
1130             self._config = apiconfig
1131         else:
1132             self._config = config.settings()
1133
1134         self.num_retries = num_retries
1135         self._manifest_locator = None
1136         self._manifest_text = None
1137         self._portable_data_hash = None
1138         self._api_response = None
1139         self._past_versions = set()
1140
1141         self.lock = threading.RLock()
1142         self.events = None
1143
1144         if manifest_locator_or_text:
1145             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1146                 self._manifest_locator = manifest_locator_or_text
1147             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1148                 self._manifest_locator = manifest_locator_or_text
1149                 if not self._has_local_collection_uuid():
1150                     self._has_remote_blocks = True
1151             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1152                 self._manifest_text = manifest_locator_or_text
1153                 if '+R' in self._manifest_text:
1154                     self._has_remote_blocks = True
1155             else:
1156                 raise errors.ArgumentError(
1157                     "Argument to CollectionReader is not a manifest or a collection UUID")
1158
1159             try:
1160                 self._populate()
1161             except errors.SyntaxError as e:
1162                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1163
1164     def storage_classes_desired(self) -> List[str]:
1165         """Get this collection's `storage_classes_desired` value"""
1166         return self._storage_classes_desired or []
1167
1168     def root_collection(self) -> 'Collection':
1169         return self
1170
1171     def get_properties(self) -> Properties:
1172         """Get this collection's properties
1173
1174         This method always returns a dict. If this collection object does not
1175         have an associated API record, or that record does not have any
1176         properties set, this method returns an empty dict.
1177         """
1178         if self._api_response and self._api_response["properties"]:
1179             return self._api_response["properties"]
1180         else:
1181             return {}
1182
1183     def get_trash_at(self) -> Optional[datetime.datetime]:
1184         """Get this collection's `trash_at` field
1185
1186         This method parses the `trash_at` field of the collection's API
1187         record and returns a datetime from it. If that field is not set, or
1188         this collection object does not have an associated API record,
1189         returns None.
1190         """
1191         if self._api_response and self._api_response["trash_at"]:
1192             try:
1193                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1194             except ValueError:
1195                 return None
1196         else:
1197             return None
1198
1199     def stream_name(self) -> str:
1200         return "."
1201
1202     def writable(self) -> bool:
1203         return True
1204
1205     @synchronized
1206     def known_past_version(
1207             self,
1208             modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]]
1209     ) -> bool:
1210         """Indicate whether an API record for this collection has been seen before
1211
1212         As this collection object loads records from the API server, it records
1213         their `modified_at` and `portable_data_hash` fields. This method accepts
1214         a 2-tuple with values for those fields, and returns `True` if the
1215         combination was previously loaded.
1216         """
1217         return modified_at_and_portable_data_hash in self._past_versions
1218
1219     @synchronized
1220     @retry_method
1221     def update(
1222             self,
1223             other: Optional['Collection']=None,
1224             num_retries: Optional[int]=None,
1225     ) -> None:
1226         """Merge another collection's contents into this one
1227
1228         This method compares the manifest of this collection instance with
1229         another, then updates this instance's manifest with changes from the
1230         other, renaming files to flag conflicts where necessary.
1231
1232         When called without any arguments, this method reloads the collection's
1233         API record, and updates this instance with any changes that have
1234         appeared server-side. If this instance does not have a corresponding
1235         API record, this method raises `arvados.errors.ArgumentError`.
1236
1237         Arguments:
1238
1239         * other: arvados.collection.Collection | None --- The collection
1240           whose contents should be merged into this instance. When not
1241           provided, this method reloads this collection's API record and
1242           constructs a Collection object from it.  If this instance does not
1243           have a corresponding API record, this method raises
1244           `arvados.errors.ArgumentError`.
1245
1246         * num_retries: int | None --- The number of times to retry reloading
1247           the collection's API record from the API server. If not specified,
1248           uses the `num_retries` provided when this instance was constructed.
1249         """
1250         if other is None:
1251             if self._manifest_locator is None:
1252                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1253             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1254             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1255                 response.get("portable_data_hash") != self.portable_data_hash()):
1256                 # The record on the server is different from our current one, but we've seen it before,
1257                 # so ignore it because it's already been merged.
1258                 # However, if it's the same as our current record, proceed with the update, because we want to update
1259                 # our tokens.
1260                 return
1261             else:
1262                 self._remember_api_response(response)
1263             other = CollectionReader(response["manifest_text"])
1264         baseline = CollectionReader(self._manifest_text)
1265         self.apply(baseline.diff(other))
1266         self._manifest_text = self.manifest_text()
1267
1268     @synchronized
1269     def _my_api(self):
1270         if self._api_client is None:
1271             self._api_client = ThreadSafeApiCache(self._config, version='v1')
1272             if self._keep_client is None:
1273                 self._keep_client = self._api_client.keep
1274         return self._api_client
1275
1276     @synchronized
1277     def _my_keep(self):
1278         if self._keep_client is None:
1279             if self._api_client is None:
1280                 self._my_api()
1281             else:
1282                 self._keep_client = KeepClient(api_client=self._api_client)
1283         return self._keep_client
1284
1285     @synchronized
1286     def _my_block_manager(self):
1287         if self._block_manager is None:
1288             copies = (self.replication_desired or
1289                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1290                                                    2))
1291             self._block_manager = _BlockManager(self._my_keep(),
1292                                                 copies=copies,
1293                                                 put_threads=self.put_threads,
1294                                                 num_retries=self.num_retries,
1295                                                 storage_classes_func=self.storage_classes_desired)
1296         return self._block_manager
1297
1298     def _remember_api_response(self, response):
1299         self._api_response = response
1300         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1301
1302     def _populate_from_api_server(self):
1303         # As in KeepClient itself, we must wait until the last
1304         # possible moment to instantiate an API client, in order to
1305         # avoid tripping up clients that don't have access to an API
1306         # server.  If we do build one, make sure our Keep client uses
1307         # it.  If instantiation fails, we'll fall back to the except
1308         # clause, just like any other Collection lookup
1309         # failure. Return an exception, or None if successful.
1310         self._remember_api_response(self._my_api().collections().get(
1311             uuid=self._manifest_locator).execute(
1312                 num_retries=self.num_retries))
1313         self._manifest_text = self._api_response['manifest_text']
1314         self._portable_data_hash = self._api_response['portable_data_hash']
1315         # If not overriden via kwargs, we should try to load the
1316         # replication_desired and storage_classes_desired from the API server
1317         if self.replication_desired is None:
1318             self.replication_desired = self._api_response.get('replication_desired', None)
1319         if self._storage_classes_desired is None:
1320             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1321
1322     def _populate(self):
1323         if self._manifest_text is None:
1324             if self._manifest_locator is None:
1325                 return
1326             else:
1327                 self._populate_from_api_server()
1328         self._baseline_manifest = self._manifest_text
1329         self._import_manifest(self._manifest_text)
1330
1331     def _has_collection_uuid(self):
1332         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1333
1334     def _has_local_collection_uuid(self):
1335         return self._has_collection_uuid and \
1336             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1337
1338     def __enter__(self):
1339         return self
1340
1341     def __exit__(self, exc_type, exc_value, traceback):
1342         """Exit a context with this collection instance
1343
1344         If no exception was raised inside the context block, and this
1345         collection is writable and has a corresponding API record, that
1346         record will be updated to match the state of this instance at the end
1347         of the block.
1348         """
1349         if exc_type is None:
1350             if self.writable() and self._has_collection_uuid():
1351                 self.save()
1352         self.stop_threads()
1353
1354     def stop_threads(self) -> None:
1355         """Stop background Keep upload/download threads"""
1356         if self._block_manager is not None:
1357             self._block_manager.stop_threads()
1358
1359     @synchronized
1360     def manifest_locator(self) -> Optional[str]:
1361         """Get this collection's manifest locator, if any
1362
1363         * If this collection instance is associated with an API record with a
1364           UUID, return that.
1365         * Otherwise, if this collection instance was loaded from an API record
1366           by portable data hash, return that.
1367         * Otherwise, return `None`.
1368         """
1369         return self._manifest_locator
1370
1371     @synchronized
1372     def clone(
1373             self,
1374             new_parent: Optional['Collection']=None,
1375             new_name: Optional[str]=None,
1376             readonly: bool=False,
1377             new_config: Optional[Mapping[str, str]]=None,
1378     ) -> 'Collection':
1379         """Create a Collection object with the same contents as this instance
1380
1381         This method creates a new Collection object with contents that match
1382         this instance's. The new collection will not be associated with any API
1383         record.
1384
1385         Arguments:
1386
1387         * new_parent: arvados.collection.Collection | None --- This value is
1388           passed to the new Collection's constructor as the `parent`
1389           argument.
1390
1391         * new_name: str | None --- This value is unused.
1392
1393         * readonly: bool --- If this value is true, this method constructs and
1394           returns a `CollectionReader`. Otherwise, it returns a mutable
1395           `Collection`. Default `False`.
1396
1397         * new_config: Mapping[str, str] | None --- This value is passed to the
1398           new Collection's constructor as `apiconfig`. If no value is provided,
1399           defaults to the configuration passed to this instance's constructor.
1400         """
1401         if new_config is None:
1402             new_config = self._config
1403         if readonly:
1404             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1405         else:
1406             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1407
1408         newcollection._clonefrom(self)
1409         return newcollection
1410
1411     @synchronized
1412     def api_response(self) -> Optional[Dict[str, Any]]:
1413         """Get this instance's associated API record
1414
1415         If this Collection instance has an associated API record, return it.
1416         Otherwise, return `None`.
1417         """
1418         return self._api_response
1419
1420     def find_or_create(
1421             self,
1422             path: str,
1423             create_type: CreateType,
1424     ) -> CollectionItem:
1425         if path == ".":
1426             return self
1427         else:
1428             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1429
1430     def find(self, path: str) -> CollectionItem:
1431         if path == ".":
1432             return self
1433         else:
1434             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1435
1436     def remove(self, path: str, recursive: bool=False) -> None:
1437         if path == ".":
1438             raise errors.ArgumentError("Cannot remove '.'")
1439         else:
1440             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1441
1442     @must_be_writable
1443     @synchronized
1444     @retry_method
1445     def save(
1446             self,
1447             properties: Optional[Properties]=None,
1448             storage_classes: Optional[StorageClasses]=None,
1449             trash_at: Optional[datetime.datetime]=None,
1450             merge: bool=True,
1451             num_retries: Optional[int]=None,
1452             preserve_version: bool=False,
1453     ) -> str:
1454         """Save collection to an existing API record
1455
1456         This method updates the instance's corresponding API record to match
1457         the instance's state. If this instance does not have a corresponding API
1458         record yet, raises `AssertionError`. (To create a new API record, use
1459         `Collection.save_new`.) This method returns the saved collection
1460         manifest.
1461
1462         Arguments:
1463
1464         * properties: dict[str, Any] | None --- If provided, the API record will
1465           be updated with these properties. Note this will completely replace
1466           any existing properties.
1467
1468         * storage_classes: list[str] | None --- If provided, the API record will
1469           be updated with this value in the `storage_classes_desired` field.
1470           This value will also be saved on the instance and used for any
1471           changes that follow.
1472
1473         * trash_at: datetime.datetime | None --- If provided, the API record
1474           will be updated with this value in the `trash_at` field.
1475
1476         * merge: bool --- If `True` (the default), this method will first
1477           reload this collection's API record, and merge any new contents into
1478           this instance before saving changes. See `Collection.update` for
1479           details.
1480
1481         * num_retries: int | None --- The number of times to retry reloading
1482           the collection's API record from the API server. If not specified,
1483           uses the `num_retries` provided when this instance was constructed.
1484
1485         * preserve_version: bool --- This value will be passed to directly
1486           to the underlying API call. If `True`, the Arvados API will
1487           preserve the versions of this collection both immediately before
1488           and after the update. If `True` when the API server is not
1489           configured with collection versioning, this method raises
1490           `arvados.errors.ArgumentError`.
1491         """
1492         if properties and type(properties) is not dict:
1493             raise errors.ArgumentError("properties must be dictionary type.")
1494
1495         if storage_classes and type(storage_classes) is not list:
1496             raise errors.ArgumentError("storage_classes must be list type.")
1497         if storage_classes:
1498             self._storage_classes_desired = storage_classes
1499
1500         if trash_at and type(trash_at) is not datetime.datetime:
1501             raise errors.ArgumentError("trash_at must be datetime type.")
1502
1503         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1504             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1505
1506         body={}
1507         if properties:
1508             body["properties"] = properties
1509         if self.storage_classes_desired():
1510             body["storage_classes_desired"] = self.storage_classes_desired()
1511         if trash_at:
1512             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1513             body["trash_at"] = t
1514         if preserve_version:
1515             body["preserve_version"] = preserve_version
1516
1517         if not self.committed():
1518             if self._has_remote_blocks:
1519                 # Copy any remote blocks to the local cluster.
1520                 self._copy_remote_blocks(remote_blocks={})
1521                 self._has_remote_blocks = False
1522             if not self._has_collection_uuid():
1523                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1524             elif not self._has_local_collection_uuid():
1525                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1526
1527             self._my_block_manager().commit_all()
1528
1529             if merge:
1530                 self.update()
1531
1532             text = self.manifest_text(strip=False)
1533             body['manifest_text'] = text
1534
1535             self._remember_api_response(self._my_api().collections().update(
1536                 uuid=self._manifest_locator,
1537                 body=body
1538                 ).execute(num_retries=num_retries))
1539             self._manifest_text = self._api_response["manifest_text"]
1540             self._portable_data_hash = self._api_response["portable_data_hash"]
1541             self.set_committed(True)
1542         elif body:
1543             self._remember_api_response(self._my_api().collections().update(
1544                 uuid=self._manifest_locator,
1545                 body=body
1546                 ).execute(num_retries=num_retries))
1547
1548         return self._manifest_text
1549
1550
1551     @must_be_writable
1552     @synchronized
1553     @retry_method
1554     def save_new(
1555             self,
1556             name: Optional[str]=None,
1557             create_collection_record: bool=True,
1558             owner_uuid: Optional[str]=None,
1559             properties: Optional[Properties]=None,
1560             storage_classes: Optional[StorageClasses]=None,
1561             trash_at: Optional[datetime.datetime]=None,
1562             ensure_unique_name: bool=False,
1563             num_retries: Optional[int]=None,
1564             preserve_version: bool=False,
1565     ):
1566         """Save collection to a new API record
1567
1568         This method finishes uploading new data blocks and (optionally)
1569         creates a new API collection record with the provided data. If a new
1570         record is created, this instance becomes associated with that record
1571         for future updates like `save()`. This method returns the saved
1572         collection manifest.
1573
1574         Arguments:
1575
1576         * name: str | None --- The `name` field to use on the new collection
1577           record. If not specified, a generic default name is generated.
1578
1579         * create_collection_record: bool --- If `True` (the default), creates a
1580           collection record on the API server. If `False`, the method finishes
1581           all data uploads and only returns the resulting collection manifest
1582           without sending it to the API server.
1583
1584         * owner_uuid: str | None --- The `owner_uuid` field to use on the
1585           new collection record.
1586
1587         * properties: dict[str, Any] | None --- The `properties` field to use on
1588           the new collection record.
1589
1590         * storage_classes: list[str] | None --- The
1591           `storage_classes_desired` field to use on the new collection record.
1592
1593         * trash_at: datetime.datetime | None --- The `trash_at` field to use
1594           on the new collection record.
1595
1596         * ensure_unique_name: bool --- This value is passed directly to the
1597           Arvados API when creating the collection record. If `True`, the API
1598           server may modify the submitted `name` to ensure the collection's
1599           `name`+`owner_uuid` combination is unique. If `False` (the default),
1600           if a collection already exists with this same `name`+`owner_uuid`
1601           combination, creating a collection record will raise a validation
1602           error.
1603
1604         * num_retries: int | None --- The number of times to retry reloading
1605           the collection's API record from the API server. If not specified,
1606           uses the `num_retries` provided when this instance was constructed.
1607
1608         * preserve_version: bool --- This value will be passed to directly
1609           to the underlying API call. If `True`, the Arvados API will
1610           preserve the versions of this collection both immediately before
1611           and after the update. If `True` when the API server is not
1612           configured with collection versioning, this method raises
1613           `arvados.errors.ArgumentError`.
1614         """
1615         if properties and type(properties) is not dict:
1616             raise errors.ArgumentError("properties must be dictionary type.")
1617
1618         if storage_classes and type(storage_classes) is not list:
1619             raise errors.ArgumentError("storage_classes must be list type.")
1620
1621         if trash_at and type(trash_at) is not datetime.datetime:
1622             raise errors.ArgumentError("trash_at must be datetime type.")
1623
1624         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1625             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1626
1627         if self._has_remote_blocks:
1628             # Copy any remote blocks to the local cluster.
1629             self._copy_remote_blocks(remote_blocks={})
1630             self._has_remote_blocks = False
1631
1632         if storage_classes:
1633             self._storage_classes_desired = storage_classes
1634
1635         self._my_block_manager().commit_all()
1636         text = self.manifest_text(strip=False)
1637
1638         if create_collection_record:
1639             if name is None:
1640                 name = "New collection"
1641                 ensure_unique_name = True
1642
1643             body = {"manifest_text": text,
1644                     "name": name,
1645                     "replication_desired": self.replication_desired}
1646             if owner_uuid:
1647                 body["owner_uuid"] = owner_uuid
1648             if properties:
1649                 body["properties"] = properties
1650             if self.storage_classes_desired():
1651                 body["storage_classes_desired"] = self.storage_classes_desired()
1652             if trash_at:
1653                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1654                 body["trash_at"] = t
1655             if preserve_version:
1656                 body["preserve_version"] = preserve_version
1657
1658             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1659             text = self._api_response["manifest_text"]
1660
1661             self._manifest_locator = self._api_response["uuid"]
1662             self._portable_data_hash = self._api_response["portable_data_hash"]
1663
1664             self._manifest_text = text
1665             self.set_committed(True)
1666
1667         return text
1668
1669     _token_re = re.compile(r'(\S+)(\s+|$)')
1670     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1671     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1672
1673     def _unescape_manifest_path(self, path):
1674         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1675
1676     @synchronized
1677     def _import_manifest(self, manifest_text):
1678         """Import a manifest into a `Collection`.
1679
1680         :manifest_text:
1681           The manifest text to import from.
1682
1683         """
1684         if len(self) > 0:
1685             raise ArgumentError("Can only import manifest into an empty collection")
1686
1687         STREAM_NAME = 0
1688         BLOCKS = 1
1689         SEGMENTS = 2
1690
1691         stream_name = None
1692         state = STREAM_NAME
1693
1694         for token_and_separator in self._token_re.finditer(manifest_text):
1695             tok = token_and_separator.group(1)
1696             sep = token_and_separator.group(2)
1697
1698             if state == STREAM_NAME:
1699                 # starting a new stream
1700                 stream_name = self._unescape_manifest_path(tok)
1701                 blocks = []
1702                 segments = []
1703                 streamoffset = 0
1704                 state = BLOCKS
1705                 self.find_or_create(stream_name, COLLECTION)
1706                 continue
1707
1708             if state == BLOCKS:
1709                 block_locator = self._block_re.match(tok)
1710                 if block_locator:
1711                     blocksize = int(block_locator.group(1))
1712                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1713                     streamoffset += blocksize
1714                 else:
1715                     state = SEGMENTS
1716
1717             if state == SEGMENTS:
1718                 file_segment = self._segment_re.match(tok)
1719                 if file_segment:
1720                     pos = int(file_segment.group(1))
1721                     size = int(file_segment.group(2))
1722                     name = self._unescape_manifest_path(file_segment.group(3))
1723                     if name.split('/')[-1] == '.':
1724                         # placeholder for persisting an empty directory, not a real file
1725                         if len(name) > 2:
1726                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1727                     else:
1728                         filepath = os.path.join(stream_name, name)
1729                         try:
1730                             afile = self.find_or_create(filepath, FILE)
1731                         except IOError as e:
1732                             if e.errno == errno.ENOTDIR:
1733                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1734                             else:
1735                                 raise e from None
1736                         if isinstance(afile, ArvadosFile):
1737                             afile.add_segment(blocks, pos, size)
1738                         else:
1739                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1740                 else:
1741                     # error!
1742                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1743
1744             if sep == "\n":
1745                 stream_name = None
1746                 state = STREAM_NAME
1747
1748         self.set_committed(True)
1749
1750     @synchronized
1751     def notify(
1752             self,
1753             event: ChangeType,
1754             collection: 'RichCollectionBase',
1755             name: str,
1756             item: CollectionItem,
1757     ) -> None:
1758         if self._callback:
1759             self._callback(event, collection, name, item)
1760
1761
1762 class Subcollection(RichCollectionBase):
1763     """Read and manipulate a stream/directory within an Arvados collection
1764
1765     This class represents a single stream (like a directory) within an Arvados
1766     `Collection`. It is returned by `Collection.find` and provides the same API.
1767     Operations that work on the API collection record propagate to the parent
1768     `Collection` object.
1769     """
1770
1771     def __init__(self, parent, name):
1772         super(Subcollection, self).__init__(parent)
1773         self.lock = self.root_collection().lock
1774         self._manifest_text = None
1775         self.name = name
1776         self.num_retries = parent.num_retries
1777
1778     def root_collection(self) -> 'Collection':
1779         return self.parent.root_collection()
1780
1781     def writable(self) -> bool:
1782         return self.root_collection().writable()
1783
1784     def _my_api(self):
1785         return self.root_collection()._my_api()
1786
1787     def _my_keep(self):
1788         return self.root_collection()._my_keep()
1789
1790     def _my_block_manager(self):
1791         return self.root_collection()._my_block_manager()
1792
1793     def stream_name(self) -> str:
1794         return os.path.join(self.parent.stream_name(), self.name)
1795
1796     @synchronized
1797     def clone(
1798             self,
1799             new_parent: Optional['Collection']=None,
1800             new_name: Optional[str]=None,
1801     ) -> 'Subcollection':
1802         c = Subcollection(new_parent, new_name)
1803         c._clonefrom(self)
1804         return c
1805
1806     @must_be_writable
1807     @synchronized
1808     def _reparent(self, newparent, newname):
1809         self.set_committed(False)
1810         self.flush()
1811         self.parent.remove(self.name, recursive=True)
1812         self.parent = newparent
1813         self.name = newname
1814         self.lock = self.parent.root_collection().lock
1815
1816     @synchronized
1817     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1818         """Encode empty directories by using an \056-named (".") empty file"""
1819         if len(self._items) == 0:
1820             return "%s %s 0:0:\\056\n" % (
1821                 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1822         return super(Subcollection, self)._get_manifest_text(stream_name,
1823                                                              strip, normalize,
1824                                                              only_committed)
1825
1826
1827 class CollectionReader(Collection):
1828     """Read-only `Collection` subclass
1829
1830     This class will never create or update any API collection records. You can
1831     use this class for additional code safety when you only need to read
1832     existing collections.
1833     """
1834     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1835         self._in_init = True
1836         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1837         self._in_init = False
1838
1839         # Forego any locking since it should never change once initialized.
1840         self.lock = NoopLock()
1841
1842         # Backwards compatability with old CollectionReader
1843         # all_streams() and all_files()
1844         self._streams = None
1845
1846     def writable(self) -> bool:
1847         return self._in_init
1848
1849     def _populate_streams(orig_func):
1850         @functools.wraps(orig_func)
1851         def populate_streams_wrapper(self, *args, **kwargs):
1852             # Defer populating self._streams until needed since it creates a copy of the manifest.
1853             if self._streams is None:
1854                 if self._manifest_text:
1855                     self._streams = [sline.split()
1856                                      for sline in self._manifest_text.split("\n")
1857                                      if sline]
1858                 else:
1859                     self._streams = []
1860             return orig_func(self, *args, **kwargs)
1861         return populate_streams_wrapper
1862
1863     @arvados.util._deprecated('3.0', 'Collection iteration')
1864     @_populate_streams
1865     def normalize(self):
1866         """Normalize the streams returned by `all_streams`"""
1867         streams = {}
1868         for s in self.all_streams():
1869             for f in s.all_files():
1870                 streamname, filename = split(s.name() + "/" + f.name())
1871                 if streamname not in streams:
1872                     streams[streamname] = {}
1873                 if filename not in streams[streamname]:
1874                     streams[streamname][filename] = []
1875                 for r in f.segments:
1876                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1877
1878         self._streams = [normalize_stream(s, streams[s])
1879                          for s in sorted(streams)]
1880
1881     @arvados.util._deprecated('3.0', 'Collection iteration')
1882     @_populate_streams
1883     def all_streams(self):
1884         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1885                 for s in self._streams]
1886
1887     @arvados.util._deprecated('3.0', 'Collection iteration')
1888     @_populate_streams
1889     def all_files(self):
1890         for s in self.all_streams():
1891             for f in s.all_files():
1892                 yield f
1893
1894
1895 class CollectionWriter(CollectionBase):
1896     """Create a new collection from scratch
1897
1898     .. WARNING:: Deprecated
1899        This class is deprecated. Prefer `arvados.collection.Collection`
1900        instead.
1901     """
1902
1903     @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
1904     def __init__(self, api_client=None, num_retries=0, replication=None):
1905         """Instantiate a CollectionWriter.
1906
1907         CollectionWriter lets you build a new Arvados Collection from scratch.
1908         Write files to it.  The CollectionWriter will upload data to Keep as
1909         appropriate, and provide you with the Collection manifest text when
1910         you're finished.
1911
1912         Arguments:
1913         * api_client: The API client to use to look up Collections.  If not
1914           provided, CollectionReader will build one from available Arvados
1915           configuration.
1916         * num_retries: The default number of times to retry failed
1917           service requests.  Default 0.  You may change this value
1918           after instantiation, but note those changes may not
1919           propagate to related objects like the Keep client.
1920         * replication: The number of copies of each block to store.
1921           If this argument is None or not supplied, replication is
1922           the server-provided default if available, otherwise 2.
1923         """
1924         self._api_client = api_client
1925         self.num_retries = num_retries
1926         self.replication = (2 if replication is None else replication)
1927         self._keep_client = None
1928         self._data_buffer = []
1929         self._data_buffer_len = 0
1930         self._current_stream_files = []
1931         self._current_stream_length = 0
1932         self._current_stream_locators = []
1933         self._current_stream_name = '.'
1934         self._current_file_name = None
1935         self._current_file_pos = 0
1936         self._finished_streams = []
1937         self._close_file = None
1938         self._queued_file = None
1939         self._queued_dirents = deque()
1940         self._queued_trees = deque()
1941         self._last_open = None
1942
1943     def __exit__(self, exc_type, exc_value, traceback):
1944         if exc_type is None:
1945             self.finish()
1946
1947     def do_queued_work(self):
1948         # The work queue consists of three pieces:
1949         # * _queued_file: The file object we're currently writing to the
1950         #   Collection.
1951         # * _queued_dirents: Entries under the current directory
1952         #   (_queued_trees[0]) that we want to write or recurse through.
1953         #   This may contain files from subdirectories if
1954         #   max_manifest_depth == 0 for this directory.
1955         # * _queued_trees: Directories that should be written as separate
1956         #   streams to the Collection.
1957         # This function handles the smallest piece of work currently queued
1958         # (current file, then current directory, then next directory) until
1959         # no work remains.  The _work_THING methods each do a unit of work on
1960         # THING.  _queue_THING methods add a THING to the work queue.
1961         while True:
1962             if self._queued_file:
1963                 self._work_file()
1964             elif self._queued_dirents:
1965                 self._work_dirents()
1966             elif self._queued_trees:
1967                 self._work_trees()
1968             else:
1969                 break
1970
1971     def _work_file(self):
1972         while True:
1973             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
1974             if not buf:
1975                 break
1976             self.write(buf)
1977         self.finish_current_file()
1978         if self._close_file:
1979             self._queued_file.close()
1980         self._close_file = None
1981         self._queued_file = None
1982
1983     def _work_dirents(self):
1984         path, stream_name, max_manifest_depth = self._queued_trees[0]
1985         if stream_name != self.current_stream_name():
1986             self.start_new_stream(stream_name)
1987         while self._queued_dirents:
1988             dirent = self._queued_dirents.popleft()
1989             target = os.path.join(path, dirent)
1990             if os.path.isdir(target):
1991                 self._queue_tree(target,
1992                                  os.path.join(stream_name, dirent),
1993                                  max_manifest_depth - 1)
1994             else:
1995                 self._queue_file(target, dirent)
1996                 break
1997         if not self._queued_dirents:
1998             self._queued_trees.popleft()
1999
2000     def _work_trees(self):
2001         path, stream_name, max_manifest_depth = self._queued_trees[0]
2002         d = arvados.util.listdir_recursive(
2003             path, max_depth = (None if max_manifest_depth == 0 else 0))
2004         if d:
2005             self._queue_dirents(stream_name, d)
2006         else:
2007             self._queued_trees.popleft()
2008
2009     def _queue_file(self, source, filename=None):
2010         assert (self._queued_file is None), "tried to queue more than one file"
2011         if not hasattr(source, 'read'):
2012             source = open(source, 'rb')
2013             self._close_file = True
2014         else:
2015             self._close_file = False
2016         if filename is None:
2017             filename = os.path.basename(source.name)
2018         self.start_new_file(filename)
2019         self._queued_file = source
2020
2021     def _queue_dirents(self, stream_name, dirents):
2022         assert (not self._queued_dirents), "tried to queue more than one tree"
2023         self._queued_dirents = deque(sorted(dirents))
2024
2025     def _queue_tree(self, path, stream_name, max_manifest_depth):
2026         self._queued_trees.append((path, stream_name, max_manifest_depth))
2027
2028     def write_file(self, source, filename=None):
2029         self._queue_file(source, filename)
2030         self.do_queued_work()
2031
2032     def write_directory_tree(self,
2033                              path, stream_name='.', max_manifest_depth=-1):
2034         self._queue_tree(path, stream_name, max_manifest_depth)
2035         self.do_queued_work()
2036
2037     def write(self, newdata):
2038         if isinstance(newdata, bytes):
2039             pass
2040         elif isinstance(newdata, str):
2041             newdata = newdata.encode()
2042         elif hasattr(newdata, '__iter__'):
2043             for s in newdata:
2044                 self.write(s)
2045             return
2046         self._data_buffer.append(newdata)
2047         self._data_buffer_len += len(newdata)
2048         self._current_stream_length += len(newdata)
2049         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
2050             self.flush_data()
2051
2052     def open(self, streampath, filename=None):
2053         """open(streampath[, filename]) -> file-like object
2054
2055         Pass in the path of a file to write to the Collection, either as a
2056         single string or as two separate stream name and file name arguments.
2057         This method returns a file-like object you can write to add it to the
2058         Collection.
2059
2060         You may only have one file object from the Collection open at a time,
2061         so be sure to close the object when you're done.  Using the object in
2062         a with statement makes that easy:
2063
2064             with cwriter.open('./doc/page1.txt') as outfile:
2065                 outfile.write(page1_data)
2066             with cwriter.open('./doc/page2.txt') as outfile:
2067                 outfile.write(page2_data)
2068         """
2069         if filename is None:
2070             streampath, filename = split(streampath)
2071         if self._last_open and not self._last_open.closed:
2072             raise errors.AssertionError(
2073                 u"can't open '{}' when '{}' is still open".format(
2074                     filename, self._last_open.name))
2075         if streampath != self.current_stream_name():
2076             self.start_new_stream(streampath)
2077         self.set_current_file_name(filename)
2078         self._last_open = _WriterFile(self, filename)
2079         return self._last_open
2080
2081     def flush_data(self):
2082         data_buffer = b''.join(self._data_buffer)
2083         if data_buffer:
2084             self._current_stream_locators.append(
2085                 self._my_keep().put(
2086                     data_buffer[0:config.KEEP_BLOCK_SIZE],
2087                     copies=self.replication))
2088             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
2089             self._data_buffer_len = len(self._data_buffer[0])
2090
2091     def start_new_file(self, newfilename=None):
2092         self.finish_current_file()
2093         self.set_current_file_name(newfilename)
2094
2095     def set_current_file_name(self, newfilename):
2096         if re.search(r'[\t\n]', newfilename):
2097             raise errors.AssertionError(
2098                 "Manifest filenames cannot contain whitespace: %s" %
2099                 newfilename)
2100         elif re.search(r'\x00', newfilename):
2101             raise errors.AssertionError(
2102                 "Manifest filenames cannot contain NUL characters: %s" %
2103                 newfilename)
2104         self._current_file_name = newfilename
2105
2106     def current_file_name(self):
2107         return self._current_file_name
2108
2109     def finish_current_file(self):
2110         if self._current_file_name is None:
2111             if self._current_file_pos == self._current_stream_length:
2112                 return
2113             raise errors.AssertionError(
2114                 "Cannot finish an unnamed file " +
2115                 "(%d bytes at offset %d in '%s' stream)" %
2116                 (self._current_stream_length - self._current_file_pos,
2117                  self._current_file_pos,
2118                  self._current_stream_name))
2119         self._current_stream_files.append([
2120                 self._current_file_pos,
2121                 self._current_stream_length - self._current_file_pos,
2122                 self._current_file_name])
2123         self._current_file_pos = self._current_stream_length
2124         self._current_file_name = None
2125
2126     def start_new_stream(self, newstreamname='.'):
2127         self.finish_current_stream()
2128         self.set_current_stream_name(newstreamname)
2129
2130     def set_current_stream_name(self, newstreamname):
2131         if re.search(r'[\t\n]', newstreamname):
2132             raise errors.AssertionError(
2133                 "Manifest stream names cannot contain whitespace: '%s'" %
2134                 (newstreamname))
2135         self._current_stream_name = '.' if newstreamname=='' else newstreamname
2136
2137     def current_stream_name(self):
2138         return self._current_stream_name
2139
2140     def finish_current_stream(self):
2141         self.finish_current_file()
2142         self.flush_data()
2143         if not self._current_stream_files:
2144             pass
2145         elif self._current_stream_name is None:
2146             raise errors.AssertionError(
2147                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
2148                 (self._current_stream_length, len(self._current_stream_files)))
2149         else:
2150             if not self._current_stream_locators:
2151                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
2152             self._finished_streams.append([self._current_stream_name,
2153                                            self._current_stream_locators,
2154                                            self._current_stream_files])
2155         self._current_stream_files = []
2156         self._current_stream_length = 0
2157         self._current_stream_locators = []
2158         self._current_stream_name = None
2159         self._current_file_pos = 0
2160         self._current_file_name = None
2161
2162     def finish(self):
2163         """Store the manifest in Keep and return its locator.
2164
2165         This is useful for storing manifest fragments (task outputs)
2166         temporarily in Keep during a Crunch job.
2167
2168         In other cases you should make a collection instead, by
2169         sending manifest_text() to the API server's "create
2170         collection" endpoint.
2171         """
2172         return self._my_keep().put(self.manifest_text().encode(),
2173                                    copies=self.replication)
2174
2175     def portable_data_hash(self):
2176         stripped = self.stripped_manifest().encode()
2177         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
2178
2179     def manifest_text(self):
2180         self.finish_current_stream()
2181         manifest = ''
2182
2183         for stream in self._finished_streams:
2184             if not re.search(r'^\.(/.*)?$', stream[0]):
2185                 manifest += './'
2186             manifest += stream[0].replace(' ', '\\040')
2187             manifest += ' ' + ' '.join(stream[1])
2188             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
2189             manifest += "\n"
2190
2191         return manifest
2192
2193     def data_locators(self):
2194         ret = []
2195         for name, locators, files in self._finished_streams:
2196             ret += locators
2197         return ret
2198
2199     def save_new(self, name=None):
2200         return self._api_client.collections().create(
2201             ensure_unique_name=True,
2202             body={
2203                 'name': name,
2204                 'manifest_text': self.manifest_text(),
2205             }).execute(num_retries=self.num_retries)
2206
2207
2208 class ResumableCollectionWriter(CollectionWriter):
2209     """CollectionWriter that can serialize internal state to disk
2210
2211     .. WARNING:: Deprecated
2212        This class is deprecated. Prefer `arvados.collection.Collection`
2213        instead.
2214     """
2215
2216     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
2217                    '_current_stream_locators', '_current_stream_name',
2218                    '_current_file_name', '_current_file_pos', '_close_file',
2219                    '_data_buffer', '_dependencies', '_finished_streams',
2220                    '_queued_dirents', '_queued_trees']
2221
2222     @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
2223     def __init__(self, api_client=None, **kwargs):
2224         self._dependencies = {}
2225         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
2226
2227     @classmethod
2228     def from_state(cls, state, *init_args, **init_kwargs):
2229         # Try to build a new writer from scratch with the given state.
2230         # If the state is not suitable to resume (because files have changed,
2231         # been deleted, aren't predictable, etc.), raise a
2232         # StaleWriterStateError.  Otherwise, return the initialized writer.
2233         # The caller is responsible for calling writer.do_queued_work()
2234         # appropriately after it's returned.
2235         writer = cls(*init_args, **init_kwargs)
2236         for attr_name in cls.STATE_PROPS:
2237             attr_value = state[attr_name]
2238             attr_class = getattr(writer, attr_name).__class__
2239             # Coerce the value into the same type as the initial value, if
2240             # needed.
2241             if attr_class not in (type(None), attr_value.__class__):
2242                 attr_value = attr_class(attr_value)
2243             setattr(writer, attr_name, attr_value)
2244         # Check dependencies before we try to resume anything.
2245         if any(KeepLocator(ls).permission_expired()
2246                for ls in writer._current_stream_locators):
2247             raise errors.StaleWriterStateError(
2248                 "locators include expired permission hint")
2249         writer.check_dependencies()
2250         if state['_current_file'] is not None:
2251             path, pos = state['_current_file']
2252             try:
2253                 writer._queued_file = open(path, 'rb')
2254                 writer._queued_file.seek(pos)
2255             except IOError as error:
2256                 raise errors.StaleWriterStateError(
2257                     u"failed to reopen active file {}: {}".format(path, error))
2258         return writer
2259
2260     def check_dependencies(self):
2261         for path, orig_stat in listitems(self._dependencies):
2262             if not S_ISREG(orig_stat[ST_MODE]):
2263                 raise errors.StaleWriterStateError(u"{} not file".format(path))
2264             try:
2265                 now_stat = tuple(os.stat(path))
2266             except OSError as error:
2267                 raise errors.StaleWriterStateError(
2268                     u"failed to stat {}: {}".format(path, error))
2269             if ((not S_ISREG(now_stat[ST_MODE])) or
2270                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
2271                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
2272                 raise errors.StaleWriterStateError(u"{} changed".format(path))
2273
2274     def dump_state(self, copy_func=lambda x: x):
2275         state = {attr: copy_func(getattr(self, attr))
2276                  for attr in self.STATE_PROPS}
2277         if self._queued_file is None:
2278             state['_current_file'] = None
2279         else:
2280             state['_current_file'] = (os.path.realpath(self._queued_file.name),
2281                                       self._queued_file.tell())
2282         return state
2283
2284     def _queue_file(self, source, filename=None):
2285         try:
2286             src_path = os.path.realpath(source)
2287         except Exception:
2288             raise errors.AssertionError(u"{} not a file path".format(source))
2289         try:
2290             path_stat = os.stat(src_path)
2291         except OSError as stat_error:
2292             path_stat = None
2293         super(ResumableCollectionWriter, self)._queue_file(source, filename)
2294         fd_stat = os.fstat(self._queued_file.fileno())
2295         if not S_ISREG(fd_stat.st_mode):
2296             # We won't be able to resume from this cache anyway, so don't
2297             # worry about further checks.
2298             self._dependencies[source] = tuple(fd_stat)
2299         elif path_stat is None:
2300             raise errors.AssertionError(
2301                 u"could not stat {}: {}".format(source, stat_error))
2302         elif path_stat.st_ino != fd_stat.st_ino:
2303             raise errors.AssertionError(
2304                 u"{} changed between open and stat calls".format(source))
2305         else:
2306             self._dependencies[src_path] = tuple(fd_stat)
2307
2308     def write(self, data):
2309         if self._queued_file is None:
2310             raise errors.AssertionError(
2311                 "resumable writer can't accept unsourced data")
2312         return super(ResumableCollectionWriter, self).write(data)