20937: Parallel collection copy
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import ciso8601
11 import datetime
12 import errno
13 import functools
14 import hashlib
15 import io
16 import logging
17 import os
18 import re
19 import sys
20 import threading
21 import time
22
23 from collections import deque
24 from stat import *
25
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream, escape
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
34 import arvados.util
35 import arvados.events as events
36 from arvados.retry import retry_method
37
38 _logger = logging.getLogger('arvados.collection')
39
class CollectionBase(object):
    """Abstract base class for Collection classes."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        """Return this collection's KeepClient, creating it on first use."""
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        out = []
        for line in self.manifest_text().split("\n"):
            fields = line.split()
            if not fields:
                continue
            stripped = [fields[0]]
            for tok in fields[1:]:
                if re.match(arvados.util.keep_locator_pattern, tok):
                    # Remove every +hint that does not start with a digit;
                    # numeric size hints (e.g. +1234) are kept.
                    tok = re.sub(r'\+[^\d][^\+]*', '', tok)
                stripped.append(tok)
            out.append(' '.join(stripped))
            out.append("\n")
        return ''.join(out)
74
75
class _WriterFile(_FileLikeObjectBase):
    """Write-only file-like object returned by CollectionWriter.open().

    All writes are forwarded to the owning CollectionWriter; closing the
    object finishes the current file in the writer's manifest.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        """Close this file and finish it in the owning CollectionWriter."""
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        """Append `data` to the current file."""
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        """Append each item of iterable `seq` to the current file."""
        for data in seq:
            self.write(data)

    @_FileLikeObjectBase._before_close
    def flush(self):
        """Flush buffered data to Keep via the owning CollectionWriter."""
        self.dest.flush_data()
97
98
class CollectionWriter(CollectionBase):
    """Deprecated, use Collection instead."""

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        # Data written but not yet uploaded to Keep, as a list of byte strings.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the manifest stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        # Completed [name, locators, files] stream triples.
        self._finished_streams = []
        # Work-queue state; see do_queued_work() for how these interact.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        # Only finalize (upload the manifest) on a clean exit.
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        """Write the queued file to the Collection, then dequeue it."""
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        """Process queued directory entries: queue one file or recurse."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                # Stop so the queued file is written before continuing.
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        """List the next queued directory tree and queue its entries."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = arvados.util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        """Queue one file (path or file object) to be written next."""
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            # We opened it, so we are responsible for closing it.
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        """Queue directory entries to be processed by _work_dirents()."""
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        """Queue a directory tree to be written as a separate stream."""
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (a path or file object) to the Collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a whole directory tree to the Collection."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data (bytes, str, or an iterable of those) to the current file."""
        if isinstance(newdata, bytes):
            pass
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Upload whole Keep blocks as soon as enough data is buffered.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                u"can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        """Upload one Keep block's worth of buffered data to Keep."""
        data_buffer = b''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            # Keep any data beyond the block boundary buffered.
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and start a new (possibly unnamed) one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Name the file being written; reject manifest-unsafe names."""
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written, or None."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's span in the current stream's file list."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data written since the last file finished; nothing to do.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # A file entry is [offset in stream, length, name].
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start a new one."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Name the stream being written; reject manifest-unsafe names."""
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
                (newstreamname))
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush data and move the current stream to the finished list."""
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            # Empty stream: record nothing.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                # Ensure the stream references at least one (empty) block.
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        return self._my_keep().put(self.manifest_text().encode(),
                                   copies=self.replication)

    def portable_data_hash(self):
        """Return the portable data hash (md5 + size) of the stripped manifest."""
        stripped = self.stripped_manifest().encode()
        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    def manifest_text(self):
        """Finish the current stream and return the full manifest text."""
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                # Prefix stream names so each begins with "." or "./".
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        """Return the block locators of all finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

    def save_new(self, name=None):
        """Create a new collection record on the API server and return it."""
        return self._api_client.collections().create(
            ensure_unique_name=True,
            body={
                'name': name,
                'manifest_text': self.manifest_text(),
            }).execute(num_retries=self.num_retries)
405
406
class ResumableCollectionWriter(CollectionWriter):
    """Deprecated, use Collection instead.

    A CollectionWriter whose progress can be captured with dump_state() and
    later restored with from_state(), so an interrupted upload can resume.
    """

    # Attributes saved by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, **kwargs):
        # Maps queued source paths to their os.stat() results, so a resumed
        # writer can detect files that changed in the meantime.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Build a writer from a state dict produced by dump_state().

        Raise StaleWriterStateError if the state is not suitable to resume
        (because files have changed, been deleted, aren't predictable, etc.).
        The caller is responsible for calling writer.do_queued_work()
        appropriately after it's returned.
        """
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file changed."""
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict capturing this writer's current state.

        copy_func may be supplied (e.g. copy.deepcopy) to copy each value
        before it is stored in the returned dict.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        """Queue a file, recording its stat info for later resume checks."""
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        # Save any stat failure in a plain local instead of relying on the
        # except-clause target: Python 3 deletes `except ... as name`
        # bindings when the handler exits, so referencing the target later
        # raised UnboundLocalError instead of the intended AssertionError.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            stat_error = error
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write data to the current file; requires a queued source file."""
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
507
508
# Change-event types passed to RichCollectionBase.notify() callbacks:
ADD = "add"  # item added to a collection
DEL = "del"  # item removed from a collection
MOD = "mod"  # item modified
TOK = "tok"  # emitted by code outside this view; exact semantics not shown here
# create_type values accepted by RichCollectionBase.find_or_create():
FILE = "file"              # create an ArvadosFile leaf
COLLECTION = "collection"  # create a Subcollection
515
class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    Collection.

    """

    def __init__(self, parent=None):
        # Enclosing collection, or None for a root collection.
        self.parent = parent
        # True when this collection's state matches what was last committed;
        # cleared (recursively, upward) by any mutation.
        self._committed = False
        # True if a block locator with a +R (remote) hint has been seen in
        # this subtree; see has_remote_blocks()/set_has_remote_blocks().
        self._has_remote_blocks = False
        # Callback slot; set elsewhere — presumably used by notify() in
        # subclasses (not visible in this view).
        self._callback = None
        # Maps item name -> ArvadosFile or Subcollection directly contained here.
        self._items = {}
530
    def _my_api(self):
        # Abstract: return the API client for this tree (root provides it).
        raise NotImplementedError()

    def _my_keep(self):
        # Abstract: return the Keep client for this tree (root provides it).
        raise NotImplementedError()

    def _my_block_manager(self):
        # Abstract: return the _BlockManager for this tree (root provides it).
        raise NotImplementedError()

    def writable(self):
        # Abstract: True if this collection may be modified.
        raise NotImplementedError()

    def root_collection(self):
        # Abstract: return the top-level Collection object of this tree.
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        # Abstract: deliver an ADD/DEL/MOD/TOK change event.
        raise NotImplementedError()

    def stream_name(self):
        # Abstract: return this (sub)collection's path within the manifest.
        raise NotImplementedError()
551
552
553     @synchronized
554     def has_remote_blocks(self):
555         """Recursively check for a +R segment locator signature."""
556
557         if self._has_remote_blocks:
558             return True
559         for item in self:
560             if self[item].has_remote_blocks():
561                 return True
562         return False
563
    @synchronized
    def set_has_remote_blocks(self, val):
        """Set the remote-blocks flag on this collection and all ancestors."""
        self._has_remote_blocks = val
        # Propagate upward so the root reflects the state of the whole tree.
        if self.parent:
            self.parent.set_has_remote_blocks(val)
569
    @must_be_writable
    @synchronized
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`.  If not found, will
        create a new item at the specified path based on `create_type`.  Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`.  If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component.  If COLLECTION, then create and return a new
          Collection for the last path component.

        """

        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # Final path component: create the leaf here if missing.
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    # Recurse into the subcollection with the rest of the path.
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        else:
            # Empty first path component refers to this collection itself.
            return self
616
617     @synchronized
618     def find(self, path):
619         """Recursively search the specified file path.
620
621         May return either a Collection or ArvadosFile. Return None if not
622         found.
623         If path is invalid (ex: starts with '/'), an IOError exception will be
624         raised.
625
626         """
627         if not path:
628             raise errors.ArgumentError("Parameter 'path' is empty.")
629
630         pathcomponents = path.split("/", 1)
631         if pathcomponents[0] == '':
632             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
633
634         item = self._items.get(pathcomponents[0])
635         if item is None:
636             return None
637         elif len(pathcomponents) == 1:
638             return item
639         else:
640             if isinstance(item, RichCollectionBase):
641                 if pathcomponents[1]:
642                     return item.find(pathcomponents[1])
643                 else:
644                     return item
645             else:
646                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
647
648     @synchronized
649     def mkdirs(self, path):
650         """Recursive subcollection create.
651
652         Like `os.makedirs()`.  Will create intermediate subcollections needed
653         to contain the leaf subcollection path.
654
655         """
656
657         if self.find(path) != None:
658             raise IOError(errno.EEXIST, "Directory or file exists", path)
659
660         return self.find_or_create(path, COLLECTION)
661
    def open(self, path, mode="r", encoding=None):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          a string consisting of "r", "w", or "a", optionally followed
          by "b" or "t", optionally followed by "+".
          :"b":
            binary mode: write() accepts bytes, read() returns bytes.
          :"t":
            text mode (default): write() accepts strings, read() returns strings.
          :"r":
            opens for reading
          :"r+":
            opens for reading and writing.  Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
          :"a", "a+":
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer for
            reading.

        """

        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        if mode[0] == 'r' and '+' not in mode:
            # Plain read: don't create the file if it doesn't exist.
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        else:
            # Any writing mode creates the file on demand.
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)

        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        if mode[0] == 'w':
            arvfile.truncate(0)

        # The underlying ArvadosFile readers/writers are binary; force 'b'
        # here and layer a TextIOWrapper on top for text modes below.
        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
        if 'b' not in mode:
            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
        return f
713
    def modified(self):
        """Determine if the collection has been modified since last committed."""
        return not self.committed()

    @synchronized
    def committed(self):
        """Determine if the collection has been committed to the API server."""
        return self._committed
722
723     @synchronized
724     def set_committed(self, value=True):
725         """Recursively set committed flag.
726
727         If value is True, set committed to be True for this and all children.
728
729         If value is False, set committed to be False for this and all parents.
730         """
731         if value == self._committed:
732             return
733         if value:
734             for k,v in listitems(self._items):
735                 v.set_committed(True)
736             self._committed = True
737         else:
738             self._committed = False
739             if self.parent is not None:
740                 self.parent.set_committed(False)
741
    @synchronized
    def __iter__(self):
        """Iterate over names of files and collections contained in this collection."""
        return iter(viewkeys(self._items))

    @synchronized
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.

        If you want to search a path, use `find()` instead.

        """
        return self._items[k]

    @synchronized
    def __contains__(self, k):
        """Test if there is a file or collection directly contained by this collection."""
        return k in self._items

    @synchronized
    def __len__(self):
        """Get the number of items directly contained in this collection."""
        return len(self._items)

    @must_be_writable
    @synchronized
    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        del self._items[p]
        # Deleting is a modification: clear committed state and notify.
        self.set_committed(False)
        self.notify(DEL, self, p, None)
773
    @synchronized
    def keys(self):
        """Get the names of files and collections directly contained in this collection."""
        return self._items.keys()

    @synchronized
    def values(self):
        """Get a list of files and collection objects directly contained in this collection."""
        return listvalues(self._items)

    @synchronized
    def items(self):
        """Get a list of (name, object) tuples directly contained in this collection."""
        return listitems(self._items)
788
789     def exists(self, path):
790         """Test if there is a file or collection at `path`."""
791         return self.find(path) is not None
792
793     @must_be_writable
794     @synchronized
795     def remove(self, path, recursive=False):
796         """Remove the file or subcollection (directory) at `path`.
797
798         :recursive:
799           Specify whether to remove non-empty subcollections (True), or raise an error (False).
800         """
801
802         if not path:
803             raise errors.ArgumentError("Parameter 'path' is empty.")
804
805         pathcomponents = path.split("/", 1)
806         item = self._items.get(pathcomponents[0])
807         if item is None:
808             raise IOError(errno.ENOENT, "File not found", path)
809         if len(pathcomponents) == 1:
810             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
811                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
812             deleteditem = self._items[pathcomponents[0]]
813             del self._items[pathcomponents[0]]
814             self.set_committed(False)
815             self.notify(DEL, self, pathcomponents[0], deleteditem)
816         else:
817             item.remove(pathcomponents[1], recursive=recursive)
818
819     def _clonefrom(self, source):
820         for k,v in listitems(source):
821             self._items[k] = v.clone(self, k)
822
823     def clone(self):
824         raise NotImplementedError()
825
    @must_be_writable
    @synchronized
    def add(self, source_obj, target_name, overwrite=False, reparent=False):
        """Copy or move a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name.  If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
          unmodified.

        """

        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        # Remember the entry being replaced (if any) so subscribers receive a
        # MOD event carrying both the old and the new object.
        modified_from = None
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        if reparent:
            # Move: detach source_obj from its current parent and attach here.
            source_obj._reparent(self, target_name)
            item = source_obj
        else:
            # Copy: clone source_obj with this collection as the new parent.
            item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        # Propagate the remote-blocks flag: remote (+R) signatures must be
        # localized before commit (see _copy_remote_blocks).
        if not self._has_remote_blocks and source_obj.has_remote_blocks():
            self.set_has_remote_blocks(True)

        if modified_from:
            self.notify(MOD, self, target_name, (modified_from, item))
        else:
            self.notify(ADD, self, target_name, item)
871
    def _get_src_target(self, source, target_path, source_collection, create_dest):
        """Resolve source and destination for copy()/rename().

        Returns a (source_obj, target_dir, target_name) tuple.

        :create_dest:
          If True, missing intermediate directories in `target_path` are
          created; if False, the target directory must already exist.
        """
        if source_collection is None:
            source_collection = self

        # Find the object
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find the parent collection of the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use: the last target component, or (when the
        # target path ends with "/") the source path's own basename.
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")

        if create_dest:
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        else:
            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))
            else:
                target_dir = self

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        # If the target name already exists and is a directory (and the source
        # was given as a path), place the item inside it under the source's
        # own basename.
        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)
911
912     @must_be_writable
913     @synchronized
914     def copy(self, source, target_path, source_collection=None, overwrite=False):
915         """Copy a file or subcollection to a new path in this collection.
916
917         :source:
918           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
919
920         :target_path:
921           Destination file or path.  If the target path already exists and is a
922           subcollection, the item will be placed inside the subcollection.  If
923           the target path already exists and is a file, this will raise an error
924           unless you specify `overwrite=True`.
925
926         :source_collection:
927           Collection to copy `source_path` from (default `self`)
928
929         :overwrite:
930           Whether to overwrite target file if it already exists.
931         """
932
933         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
934         target_dir.add(source_obj, target_name, overwrite, False)
935
936     @must_be_writable
937     @synchronized
938     def rename(self, source, target_path, source_collection=None, overwrite=False):
939         """Move a file or subcollection from `source_collection` to a new path in this collection.
940
941         :source:
942           A string with a path to source file or subcollection.
943
944         :target_path:
945           Destination file or path.  If the target path already exists and is a
946           subcollection, the item will be placed inside the subcollection.  If
947           the target path already exists and is a file, this will raise an error
948           unless you specify `overwrite=True`.
949
950         :source_collection:
951           Collection to copy `source_path` from (default `self`)
952
953         :overwrite:
954           Whether to overwrite target file if it already exists.
955         """
956
957         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
958         if not source_obj.writable():
959             raise IOError(errno.EROFS, "Source collection is read only", source)
960         target_dir.add(source_obj, target_name, overwrite, True)
961
962     def portable_manifest_text(self, stream_name="."):
963         """Get the manifest text for this collection, sub collections and files.
964
965         This method does not flush outstanding blocks to Keep.  It will return
966         a normalized manifest with access tokens stripped.
967
968         :stream_name:
969           Name to use for this stream (directory)
970
971         """
972         return self._get_manifest_text(stream_name, True, True)
973
974     @synchronized
975     def manifest_text(self, stream_name=".", strip=False, normalize=False,
976                       only_committed=False):
977         """Get the manifest text for this collection, sub collections and files.
978
979         This method will flush outstanding blocks to Keep.  By default, it will
980         not normalize an unmodified manifest or strip access tokens.
981
982         :stream_name:
983           Name to use for this stream (directory)
984
985         :strip:
986           If True, remove signing tokens from block locators if present.
987           If False (default), block locators are left unchanged.
988
989         :normalize:
990           If True, always export the manifest text in normalized form
991           even if the Collection is not modified.  If False (default) and the collection
992           is not modified, return the original manifest text even if it is not
993           in normalized form.
994
995         :only_committed:
996           If True, don't commit pending blocks.
997
998         """
999
1000         if not only_committed:
1001             self._my_block_manager().commit_all()
1002         return self._get_manifest_text(stream_name, strip, normalize,
1003                                        only_committed=only_committed)
1004
    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """

        if not self.committed() or self._manifest_text is None or normalize:
            # Modified (or normalization requested): rebuild the manifest
            # from the in-memory state.
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            # All plain files at this level form a single stream.
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # Segment still points at an in-memory bufferblock:
                        # skip it entirely (only_committed) or translate to
                        # the block's current locator.
                        if only_committed:
                            continue
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                         segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            # Each subdirectory is emitted as its own (normalized) stream.
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(
                    stream_name=os.path.join(stream_name, dirname),
                    strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            # Unmodified and cached: reuse the stored manifest text.
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
1058
1059     @synchronized
1060     def _copy_remote_blocks(self, remote_blocks={}):
1061         """Scan through the entire collection and ask Keep to copy remote blocks.
1062
1063         When accessing a remote collection, blocks will have a remote signature
1064         (+R instead of +A). Collect these signatures and request Keep to copy the
1065         blocks to the local cluster, returning local (+A) signatures.
1066
1067         :remote_blocks:
1068           Shared cache of remote to local block mappings. This is used to avoid
1069           doing extra work when blocks are shared by more than one file in
1070           different subdirectories.
1071
1072         """
1073         for item in self:
1074             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1075         return remote_blocks
1076
    @synchronized
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`

        Each action is a tuple: (DEL, path, item), (ADD, path, item), or
        (MOD or TOK, path, old_item, new_item).  Items are cloned into
        `holding_collection` so they stay valid while `self` is mutated.
        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        # Anything present here but absent from end_collection is a deletion.
        for k in self:
            if k not in end_collection:
               changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    # Both sides are directories: recurse.
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                else:
                    # Content is equal: TOK records both copies anyway,
                    # presumably so apply() can refresh block signatures
                    # (see the token note in update()).
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes
1101
    @must_be_writable
    @synchronized
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.

        Each change tuple is (event_type, path, initial[, final]) as
        produced by `diff()`.
        """
        if changes:
            self.set_committed(False)
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            # `local` is whatever currently exists at `path`; comparing it to
            # `initial` tells us whether the item changed locally since diff().
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
1150
1151     def portable_data_hash(self):
1152         """Get the portable data hash for this collection's manifest."""
1153         if self._manifest_locator and self.committed():
1154             # If the collection is already saved on the API server, and it's committed
1155             # then return API server's PDH response.
1156             return self._portable_data_hash
1157         else:
1158             stripped = self.portable_manifest_text().encode()
1159             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1160
1161     @synchronized
1162     def subscribe(self, callback):
1163         if self._callback is None:
1164             self._callback = callback
1165         else:
1166             raise errors.ArgumentError("A callback is already set on this collection.")
1167
1168     @synchronized
1169     def unsubscribe(self):
1170         if self._callback is not None:
1171             self._callback = None
1172
1173     @synchronized
1174     def notify(self, event, collection, name, item):
1175         if self._callback:
1176             self._callback(event, collection, name, item)
1177         self.root_collection().notify(event, collection, name, item)
1178
1179     @synchronized
1180     def __eq__(self, other):
1181         if other is self:
1182             return True
1183         if not isinstance(other, RichCollectionBase):
1184             return False
1185         if len(self._items) != len(other):
1186             return False
1187         for k in self._items:
1188             if k not in other:
1189                 return False
1190             if self._items[k] != other[k]:
1191                 return False
1192         return True
1193
1194     def __ne__(self, other):
1195         return not self.__eq__(other)
1196
1197     @synchronized
1198     def flush(self):
1199         """Flush bufferblocks to Keep."""
1200         for e in listvalues(self):
1201             e.flush()
1202
1203
1204 class Collection(RichCollectionBase):
1205     """Represents the root of an Arvados Collection.
1206
1207     This class is threadsafe.  The root collection object, all subcollections
1208     and files are protected by a single lock (i.e. each access locks the entire
1209     collection).
1210
1211     Brief summary of
1212     useful methods:
1213
1214     :To read an existing file:
1215       `c.open("myfile", "r")`
1216
1217     :To write a new file:
1218       `c.open("myfile", "w")`
1219
1220     :To determine if a file exists:
1221       `c.find("myfile") is not None`
1222
1223     :To copy a file:
1224       `c.copy("source", "dest")`
1225
1226     :To delete a file:
1227       `c.remove("myfile")`
1228
1229     :To save to an existing collection record:
1230       `c.save()`
1231
1232     :To save a new collection record:
1233     `c.save_new()`
1234
1235     :To merge remote changes into this object:
1236       `c.update()`
1237
1238     Must be associated with an API server Collection record (during
1239     initialization, or using `save_new`) to use `save` or `update`
1240
1241     """
1242
    def __init__(self, manifest_locator_or_text=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=10,
                 parent=None,
                 apiconfig=None,
                 block_manager=None,
                 replication_desired=None,
                 storage_classes_desired=None,
                 put_threads=None):
        """Collection constructor.

        :manifest_locator_or_text:
          An Arvados collection UUID, portable data hash, raw manifest
          text, or (if creating an empty collection) None.

        :parent:
          the parent Collection, may be None.

        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.

        :api_client:
          The API client object to use for requests.  If not specified, create one using `apiconfig`.

        :keep_client:
          the Keep client to use for requests.  If not specified, create one using `apiconfig`.

        :num_retries:
          the number of retries for API and Keep requests.

        :block_manager:
          the block manager to use.  If not specified, create one.

        :replication_desired:
          How many copies should Arvados maintain. If None, API server default
          configuration applies. If not None, this value will also be used
          for determining the number of block copies being written.

        :storage_classes_desired:
          A list of storage class names where to upload the data. If None,
          the keep client is expected to store the data into the cluster's
          default storage class(es).

        :put_threads:
          Forwarded to the block manager; presumably the number of
          concurrent Keep put workers (TODO: confirm in _BlockManager).

        """

        if storage_classes_desired and type(storage_classes_desired) is not list:
            raise errors.ArgumentError("storage_classes_desired must be list type.")

        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client

        # Use the keep client from ThreadSafeApiCache
        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
            self._keep_client = self._api_client.keep

        self._block_manager = block_manager
        self.replication_desired = replication_desired
        self._storage_classes_desired = storage_classes_desired
        self.put_threads = put_threads

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._portable_data_hash = None
        self._api_response = None
        self._past_versions = set()

        # Single re-entrant lock protecting the whole collection tree.
        self.lock = threading.RLock()
        self.events = None

        # Classify the argument: Keep locator (PDH), collection UUID, or raw
        # manifest text.
        if manifest_locator_or_text:
            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
                # A UUID from another cluster implies the blocks carry remote
                # (+R) signatures that must be localized before commit.
                if not self._has_local_collection_uuid():
                    self._has_remote_blocks = True
            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                if '+R' in self._manifest_text:
                    self._has_remote_blocks = True
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader is not a manifest or a collection UUID")

            try:
                self._populate()
            except errors.SyntaxError as e:
                # NOTE(review): the "%s" is never interpolated here — str(e)
                # is passed as a separate constructor argument, not formatted
                # into the message.  Probably intended: "... %s" % e.
                # `from None` suppresses the chained SyntaxError context.
                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1341
1342     def storage_classes_desired(self):
1343         return self._storage_classes_desired or []
1344
1345     def root_collection(self):
1346         return self
1347
1348     def get_properties(self):
1349         if self._api_response and self._api_response["properties"]:
1350             return self._api_response["properties"]
1351         else:
1352             return {}
1353
1354     def get_trash_at(self):
1355         if self._api_response and self._api_response["trash_at"]:
1356             try:
1357                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1358             except ValueError:
1359                 return None
1360         else:
1361             return None
1362
1363     def stream_name(self):
1364         return "."
1365
1366     def writable(self):
1367         return True
1368
1369     @synchronized
1370     def known_past_version(self, modified_at_and_portable_data_hash):
1371         return modified_at_and_portable_data_hash in self._past_versions
1372
    @synchronized
    @retry_method
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection.

        :other:
          Collection to merge from.  If None, fetch this collection's
          current record from the API server.

        :num_retries:
          Number of API retries (handled by the retry_method decorator).
        """

        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._remember_api_response(response)
            other = CollectionReader(response["manifest_text"])
        # Three-way merge: diff from our last-synced manifest to `other`,
        # then apply those changes on top of any local modifications
        # (conflicts are preserved under "~conflict~" paths by apply()).
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()
1395
1396     @synchronized
1397     def _my_api(self):
1398         if self._api_client is None:
1399             self._api_client = ThreadSafeApiCache(self._config, version='v1')
1400             if self._keep_client is None:
1401                 self._keep_client = self._api_client.keep
1402         return self._api_client
1403
1404     @synchronized
1405     def _my_keep(self):
1406         if self._keep_client is None:
1407             if self._api_client is None:
1408                 self._my_api()
1409             else:
1410                 self._keep_client = KeepClient(api_client=self._api_client)
1411         return self._keep_client
1412
1413     @synchronized
1414     def _my_block_manager(self):
1415         if self._block_manager is None:
1416             copies = (self.replication_desired or
1417                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1418                                                    2))
1419             self._block_manager = _BlockManager(self._my_keep(),
1420                                                 copies=copies,
1421                                                 put_threads=self.put_threads,
1422                                                 num_retries=self.num_retries,
1423                                                 storage_classes_func=self.storage_classes_desired)
1424         return self._block_manager
1425
1426     def _remember_api_response(self, response):
1427         self._api_response = response
1428         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1429
    def _populate_from_api_server(self):
        """Fetch this collection's record from the API server and cache it."""
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        self._remember_api_response(self._my_api().collections().get(
            uuid=self._manifest_locator).execute(
                num_retries=self.num_retries))
        self._manifest_text = self._api_response['manifest_text']
        self._portable_data_hash = self._api_response['portable_data_hash']
        # If not overridden via kwargs, try to load replication_desired and
        # storage_classes_desired from the API server record.
        if self.replication_desired is None:
            self.replication_desired = self._api_response.get('replication_desired', None)
        if self._storage_classes_desired is None:
            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1449
1450     def _populate(self):
1451         if self._manifest_text is None:
1452             if self._manifest_locator is None:
1453                 return
1454             else:
1455                 self._populate_from_api_server()
1456         self._baseline_manifest = self._manifest_text
1457         self._import_manifest(self._manifest_text)
1458
1459     def _has_collection_uuid(self):
1460         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1461
1462     def _has_local_collection_uuid(self):
1463         return self._has_collection_uuid and \
1464             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1465
1466     def __enter__(self):
1467         return self
1468
1469     def __exit__(self, exc_type, exc_value, traceback):
1470         """Support scoped auto-commit in a with: block."""
1471         if exc_type is None:
1472             if self.writable() and self._has_collection_uuid():
1473                 self.save()
1474         self.stop_threads()
1475
1476     def stop_threads(self):
1477         if self._block_manager is not None:
1478             self._block_manager.stop_threads()
1479
1480     @synchronized
1481     def manifest_locator(self):
1482         """Get the manifest locator, if any.
1483
1484         The manifest locator will be set when the collection is loaded from an
1485         API server record or the portable data hash of a manifest.
1486
1487         The manifest locator will be None if the collection is newly created or
1488         was created directly from manifest text.  The method `save_new()` will
1489         assign a manifest locator.
1490
1491         """
1492         return self._manifest_locator
1493
1494     @synchronized
1495     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1496         if new_config is None:
1497             new_config = self._config
1498         if readonly:
1499             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1500         else:
1501             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1502
1503         newcollection._clonefrom(self)
1504         return newcollection
1505
1506     @synchronized
1507     def api_response(self):
1508         """Returns information about this Collection fetched from the API server.
1509
1510         If the Collection exists in Keep but not the API server, currently
1511         returns None.  Future versions may provide a synthetic response.
1512
1513         """
1514         return self._api_response
1515
1516     def find_or_create(self, path, create_type):
1517         """See `RichCollectionBase.find_or_create`"""
1518         if path == ".":
1519             return self
1520         else:
1521             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1522
1523     def find(self, path):
1524         """See `RichCollectionBase.find`"""
1525         if path == ".":
1526             return self
1527         else:
1528             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1529
1530     def remove(self, path, recursive=False):
1531         """See `RichCollectionBase.remove`"""
1532         if path == ".":
1533             raise errors.ArgumentError("Cannot remove '.'")
1534         else:
1535             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1536
    @must_be_writable
    @synchronized
    @retry_method
    def save(self,
             properties=None,
             storage_classes=None,
             trash_at=None,
             merge=True,
             num_retries=None,
             preserve_version=False):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True, the default), and update the collection record. Returns
        the current manifest text.

        Will raise AssertionError if not associated with a collection record on
        the API server.  If you want to save a manifest to Keep only, see
        `save_new()`.

        :properties:
          Additional properties of collection. This value will replace any existing
          properties of collection.

        :storage_classes:
          Specify desirable storage classes to be used when writing data to Keep.

        :trash_at:
          A collection is *expiring* when it has a *trash_at* time in the future.
          An expiring collection can be accessed as normal,
          but is scheduled to be trashed automatically at the *trash_at* time.

        :merge:
          Update and merge remote changes before saving.  Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None,  use the collection default)

        :preserve_version:
          If True, indicate that the collection content being saved right now
          should be preserved in a version snapshot if the collection record is
          updated in the future. Requires that the API server has
          Collections.CollectionVersioning enabled, if not, setting this will
          raise an exception.

        """
        # Validate argument types up front so bad calls fail fast,
        # before touching Keep or the API server.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Assemble the metadata-only fields of the update request body.
        body={}
        if properties:
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            # NOTE(review): the timestamp gets a literal "Z" suffix with
            # no timezone conversion -- assumes trash_at is already
            # expressed in UTC; confirm with callers.
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            # save() only updates an existing local record; anything else
            # must go through save_new().
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            # Flush buffered data blocks to Keep so the manifest refers
            # only to committed blocks.
            self._my_block_manager().commit_all()

            if merge:
                # Pull in and merge remote changes so we don't clobber
                # them with our update below.
                self.update()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))
            # Adopt the server's canonical copy of the saved manifest.
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
        elif body:
            # No content changes, but collection metadata (properties,
            # storage classes, trash_at, preserve_version) still needs
            # to be sent to the API server.
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body=body
                ).execute(num_retries=num_retries))

        return self._manifest_text
1641
1642
    @must_be_writable
    @synchronized
    @retry_method
    def save_new(self, name=None,
                 create_collection_record=True,
                 owner_uuid=None,
                 properties=None,
                 storage_classes=None,
                 trash_at=None,
                 ensure_unique_name=False,
                 num_retries=None,
                 preserve_version=False):
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep and, when create_collection_record
        is True (default), create a new collection record.  After creating a
        new collection record, this Collection object will be associated with
        the new record used by `save()`. Returns the current manifest text.

        :name:
          The collection name.

        :create_collection_record:
           If True, create a collection record on the API server.
           If False, only commit blocks to Keep and return the manifest text.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :properties:
          Additional properties of collection. This value will replace any existing
          properties of collection.

        :storage_classes:
          Specify desirable storage classes to be used when writing data to Keep.

        :trash_at:
          A collection is *expiring* when it has a *trash_at* time in the future.
          An expiring collection can be accessed as normal,
          but is scheduled to be trashed automatically at the *trash_at* time.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner.  If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None,  use the collection default)

        :preserve_version:
          If True, indicate that the collection content being saved right now
          should be preserved in a version snapshot if the collection record is
          updated in the future. Requires that the API server has
          Collections.CollectionVersioning enabled, if not, setting this will
          raise an exception.

        """
        # Validate argument types before committing anything to Keep.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

        if storage_classes:
            self._storage_classes_desired = storage_classes

        # Flush buffered data blocks to Keep before serializing the
        # manifest.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                # Fall back to a default name, and force uniqueness so
                # the default never collides with an existing collection.
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid
            if properties:
                body["properties"] = properties
            if self.storage_classes_desired():
                body["storage_classes_desired"] = self.storage_classes_desired()
            if trash_at:
                # NOTE(review): formatted with a literal "Z" suffix and no
                # timezone conversion -- assumes trash_at is already UTC.
                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                body["trash_at"] = t
            if preserve_version:
                body["preserve_version"] = preserve_version

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            # Adopt the server's canonical manifest and identifiers so
            # subsequent save() calls update this new record.
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)

        return text
1754
    # Parsing patterns for `_import_manifest`, compiled once at class level:
    # one whitespace-delimited manifest token plus its trailing separator
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # a Keep block locator: 32-hex-digit md5, +size, then optional +hints
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # a file segment: position:size:filename
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1758
1759     def _unescape_manifest_path(self, path):
1760         return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1761
1762     @synchronized
1763     def _import_manifest(self, manifest_text):
1764         """Import a manifest into a `Collection`.
1765
1766         :manifest_text:
1767           The manifest text to import from.
1768
1769         """
1770         if len(self) > 0:
1771             raise ArgumentError("Can only import manifest into an empty collection")
1772
1773         STREAM_NAME = 0
1774         BLOCKS = 1
1775         SEGMENTS = 2
1776
1777         stream_name = None
1778         state = STREAM_NAME
1779
1780         for token_and_separator in self._token_re.finditer(manifest_text):
1781             tok = token_and_separator.group(1)
1782             sep = token_and_separator.group(2)
1783
1784             if state == STREAM_NAME:
1785                 # starting a new stream
1786                 stream_name = self._unescape_manifest_path(tok)
1787                 blocks = []
1788                 segments = []
1789                 streamoffset = 0
1790                 state = BLOCKS
1791                 self.find_or_create(stream_name, COLLECTION)
1792                 continue
1793
1794             if state == BLOCKS:
1795                 block_locator = self._block_re.match(tok)
1796                 if block_locator:
1797                     blocksize = int(block_locator.group(1))
1798                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1799                     streamoffset += blocksize
1800                 else:
1801                     state = SEGMENTS
1802
1803             if state == SEGMENTS:
1804                 file_segment = self._segment_re.match(tok)
1805                 if file_segment:
1806                     pos = int(file_segment.group(1))
1807                     size = int(file_segment.group(2))
1808                     name = self._unescape_manifest_path(file_segment.group(3))
1809                     if name.split('/')[-1] == '.':
1810                         # placeholder for persisting an empty directory, not a real file
1811                         if len(name) > 2:
1812                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1813                     else:
1814                         filepath = os.path.join(stream_name, name)
1815                         try:
1816                             afile = self.find_or_create(filepath, FILE)
1817                         except IOError as e:
1818                             if e.errno == errno.ENOTDIR:
1819                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1820                             else:
1821                                 raise e from None
1822                         if isinstance(afile, ArvadosFile):
1823                             afile.add_segment(blocks, pos, size)
1824                         else:
1825                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1826                 else:
1827                     # error!
1828                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1829
1830             if sep == "\n":
1831                 stream_name = None
1832                 state = STREAM_NAME
1833
1834         self.set_committed(True)
1835
1836     @synchronized
1837     def notify(self, event, collection, name, item):
1838         if self._callback:
1839             self._callback(event, collection, name, item)
1840
1841
class Subcollection(RichCollectionBase):
    """A subdirectory within a collection, with no API server record of its own.

    Locking for a subcollection falls under the umbrella lock of its
    root collection.

    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock instead of creating our own.
        self.lock = self.root_collection().lock
        self.name = name
        self.num_retries = parent.num_retries
        self._manifest_text = None

    def root_collection(self):
        """Delegate upward until the root collection is reached."""
        return self.parent.root_collection()

    def writable(self):
        return self.root_collection().writable()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self):
        """Full stream path of this subdirectory within the collection."""
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(self, new_parent, new_name):
        """Return a deep copy of this subcollection under `new_parent`."""
        copy = Subcollection(new_parent, new_name)
        copy._clonefrom(self)
        return copy

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        # Detach from the old parent first, then attach under the new
        # one and adopt its root collection's lock.
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode empty directories by using an \\056-named (".") empty file."""
        if not self._items:
            return "%s %s 0:0:\\056\n" % (
                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super(Subcollection, self)._get_manifest_text(
            stream_name, strip, normalize, only_committed)
1900
1901
class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from a collection UUID or portable data hash, or raw
    manifest text.  See `Collection` constructor for detailed options.

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # Writability is only permitted while the base constructor runs,
        # so the initial manifest can be imported (see writable()).
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Lazily-built stream list for backwards compatibility with the
        # old CollectionReader all_streams() and all_files().
        self._streams = None

    def writable(self):
        return self._in_init

    def _populate_streams(orig_func):
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it
            # creates a copy of the manifest.
            if self._streams is None:
                manifest = self._manifest_text
                if manifest:
                    self._streams = [
                        line.split() for line in manifest.split("\n") if line]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper

    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatability and only affects
        the behavior of `all_streams()` and `all_files()`.

        """
        # Regroup every file segment by (stream name, file name).
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                file_segments = streams.setdefault(streamname, {}).setdefault(filename, [])
                for r in f.segments:
                    file_segments.extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f