20257: Refactor http-to-keep downloader
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import ciso8601
11 import datetime
12 import errno
13 import functools
14 import hashlib
15 import io
16 import logging
17 import os
18 import re
19 import sys
20 import threading
21 import time
22
23 from collections import deque
24 from stat import *
25
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream, escape
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
34 import arvados.util
35 import arvados.events as events
36 from arvados.retry import retry_method
37
38 _logger = logging.getLogger('arvados.collection')
39
40
41 if sys.version_info >= (3, 0):
42     TextIOWrapper = io.TextIOWrapper
43 else:
44     class TextIOWrapper(io.TextIOWrapper):
45         """To maintain backward compatibility, cast str to unicode in
46         write('foo').
47
48         """
49         def write(self, data):
50             if isinstance(data, basestring):
51                 data = unicode(data)
52             return super(TextIOWrapper, self).write(data)
53
54
55 class CollectionBase(object):
56     """Abstract base class for Collection classes."""
57
58     def __enter__(self):
59         return self
60
61     def __exit__(self, exc_type, exc_value, traceback):
62         pass
63
64     def _my_keep(self):
65         if self._keep_client is None:
66             self._keep_client = KeepClient(api_client=self._api_client,
67                                            num_retries=self.num_retries)
68         return self._keep_client
69
70     def stripped_manifest(self):
71         """Get the manifest with locator hints stripped.
72
73         Return the manifest for the current collection with all
74         non-portable hints (i.e., permission signatures and other
75         hints other than size hints) removed from the locators.
76         """
77         raw = self.manifest_text()
78         clean = []
79         for line in raw.split("\n"):
80             fields = line.split()
81             if fields:
82                 clean_fields = fields[:1] + [
83                     (re.sub(r'\+[^\d][^\+]*', '', x)
84                      if re.match(arvados.util.keep_locator_pattern, x)
85                      else x)
86                     for x in fields[1:]]
87                 clean += [' '.join(clean_fields), "\n"]
88         return ''.join(clean)
89
90
91 class _WriterFile(_FileLikeObjectBase):
92     def __init__(self, coll_writer, name):
93         super(_WriterFile, self).__init__(name, 'wb')
94         self.dest = coll_writer
95
96     def close(self):
97         super(_WriterFile, self).close()
98         self.dest.finish_current_file()
99
100     @_FileLikeObjectBase._before_close
101     def write(self, data):
102         self.dest.write(data)
103
104     @_FileLikeObjectBase._before_close
105     def writelines(self, seq):
106         for data in seq:
107             self.write(data)
108
109     @_FileLikeObjectBase._before_close
110     def flush(self):
111         self.dest.flush_data()
112
113
114 class CollectionWriter(CollectionBase):
115     """Deprecated, use Collection instead."""
116
117     def __init__(self, api_client=None, num_retries=0, replication=None):
118         """Instantiate a CollectionWriter.
119
120         CollectionWriter lets you build a new Arvados Collection from scratch.
121         Write files to it.  The CollectionWriter will upload data to Keep as
122         appropriate, and provide you with the Collection manifest text when
123         you're finished.
124
125         Arguments:
126         * api_client: The API client to use to look up Collections.  If not
127           provided, CollectionReader will build one from available Arvados
128           configuration.
129         * num_retries: The default number of times to retry failed
130           service requests.  Default 0.  You may change this value
131           after instantiation, but note those changes may not
132           propagate to related objects like the Keep client.
133         * replication: The number of copies of each block to store.
134           If this argument is None or not supplied, replication is
135           the server-provided default if available, otherwise 2.
136         """
137         self._api_client = api_client
138         self.num_retries = num_retries
139         self.replication = (2 if replication is None else replication)
140         self._keep_client = None
141         self._data_buffer = []
142         self._data_buffer_len = 0
143         self._current_stream_files = []
144         self._current_stream_length = 0
145         self._current_stream_locators = []
146         self._current_stream_name = '.'
147         self._current_file_name = None
148         self._current_file_pos = 0
149         self._finished_streams = []
150         self._close_file = None
151         self._queued_file = None
152         self._queued_dirents = deque()
153         self._queued_trees = deque()
154         self._last_open = None
155
156     def __exit__(self, exc_type, exc_value, traceback):
157         if exc_type is None:
158             self.finish()
159
160     def do_queued_work(self):
161         # The work queue consists of three pieces:
162         # * _queued_file: The file object we're currently writing to the
163         #   Collection.
164         # * _queued_dirents: Entries under the current directory
165         #   (_queued_trees[0]) that we want to write or recurse through.
166         #   This may contain files from subdirectories if
167         #   max_manifest_depth == 0 for this directory.
168         # * _queued_trees: Directories that should be written as separate
169         #   streams to the Collection.
170         # This function handles the smallest piece of work currently queued
171         # (current file, then current directory, then next directory) until
172         # no work remains.  The _work_THING methods each do a unit of work on
173         # THING.  _queue_THING methods add a THING to the work queue.
174         while True:
175             if self._queued_file:
176                 self._work_file()
177             elif self._queued_dirents:
178                 self._work_dirents()
179             elif self._queued_trees:
180                 self._work_trees()
181             else:
182                 break
183
184     def _work_file(self):
185         while True:
186             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
187             if not buf:
188                 break
189             self.write(buf)
190         self.finish_current_file()
191         if self._close_file:
192             self._queued_file.close()
193         self._close_file = None
194         self._queued_file = None
195
196     def _work_dirents(self):
197         path, stream_name, max_manifest_depth = self._queued_trees[0]
198         if stream_name != self.current_stream_name():
199             self.start_new_stream(stream_name)
200         while self._queued_dirents:
201             dirent = self._queued_dirents.popleft()
202             target = os.path.join(path, dirent)
203             if os.path.isdir(target):
204                 self._queue_tree(target,
205                                  os.path.join(stream_name, dirent),
206                                  max_manifest_depth - 1)
207             else:
208                 self._queue_file(target, dirent)
209                 break
210         if not self._queued_dirents:
211             self._queued_trees.popleft()
212
213     def _work_trees(self):
214         path, stream_name, max_manifest_depth = self._queued_trees[0]
215         d = arvados.util.listdir_recursive(
216             path, max_depth = (None if max_manifest_depth == 0 else 0))
217         if d:
218             self._queue_dirents(stream_name, d)
219         else:
220             self._queued_trees.popleft()
221
222     def _queue_file(self, source, filename=None):
223         assert (self._queued_file is None), "tried to queue more than one file"
224         if not hasattr(source, 'read'):
225             source = open(source, 'rb')
226             self._close_file = True
227         else:
228             self._close_file = False
229         if filename is None:
230             filename = os.path.basename(source.name)
231         self.start_new_file(filename)
232         self._queued_file = source
233
234     def _queue_dirents(self, stream_name, dirents):
235         assert (not self._queued_dirents), "tried to queue more than one tree"
236         self._queued_dirents = deque(sorted(dirents))
237
238     def _queue_tree(self, path, stream_name, max_manifest_depth):
239         self._queued_trees.append((path, stream_name, max_manifest_depth))
240
241     def write_file(self, source, filename=None):
242         self._queue_file(source, filename)
243         self.do_queued_work()
244
245     def write_directory_tree(self,
246                              path, stream_name='.', max_manifest_depth=-1):
247         self._queue_tree(path, stream_name, max_manifest_depth)
248         self.do_queued_work()
249
250     def write(self, newdata):
251         if isinstance(newdata, bytes):
252             pass
253         elif isinstance(newdata, str):
254             newdata = newdata.encode()
255         elif hasattr(newdata, '__iter__'):
256             for s in newdata:
257                 self.write(s)
258             return
259         self._data_buffer.append(newdata)
260         self._data_buffer_len += len(newdata)
261         self._current_stream_length += len(newdata)
262         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
263             self.flush_data()
264
265     def open(self, streampath, filename=None):
266         """open(streampath[, filename]) -> file-like object
267
268         Pass in the path of a file to write to the Collection, either as a
269         single string or as two separate stream name and file name arguments.
270         This method returns a file-like object you can write to add it to the
271         Collection.
272
273         You may only have one file object from the Collection open at a time,
274         so be sure to close the object when you're done.  Using the object in
275         a with statement makes that easy::
276
277           with cwriter.open('./doc/page1.txt') as outfile:
278               outfile.write(page1_data)
279           with cwriter.open('./doc/page2.txt') as outfile:
280               outfile.write(page2_data)
281         """
282         if filename is None:
283             streampath, filename = split(streampath)
284         if self._last_open and not self._last_open.closed:
285             raise errors.AssertionError(
286                 u"can't open '{}' when '{}' is still open".format(
287                     filename, self._last_open.name))
288         if streampath != self.current_stream_name():
289             self.start_new_stream(streampath)
290         self.set_current_file_name(filename)
291         self._last_open = _WriterFile(self, filename)
292         return self._last_open
293
294     def flush_data(self):
295         data_buffer = b''.join(self._data_buffer)
296         if data_buffer:
297             self._current_stream_locators.append(
298                 self._my_keep().put(
299                     data_buffer[0:config.KEEP_BLOCK_SIZE],
300                     copies=self.replication))
301             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
302             self._data_buffer_len = len(self._data_buffer[0])
303
304     def start_new_file(self, newfilename=None):
305         self.finish_current_file()
306         self.set_current_file_name(newfilename)
307
308     def set_current_file_name(self, newfilename):
309         if re.search(r'[\t\n]', newfilename):
310             raise errors.AssertionError(
311                 "Manifest filenames cannot contain whitespace: %s" %
312                 newfilename)
313         elif re.search(r'\x00', newfilename):
314             raise errors.AssertionError(
315                 "Manifest filenames cannot contain NUL characters: %s" %
316                 newfilename)
317         self._current_file_name = newfilename
318
319     def current_file_name(self):
320         return self._current_file_name
321
322     def finish_current_file(self):
323         if self._current_file_name is None:
324             if self._current_file_pos == self._current_stream_length:
325                 return
326             raise errors.AssertionError(
327                 "Cannot finish an unnamed file " +
328                 "(%d bytes at offset %d in '%s' stream)" %
329                 (self._current_stream_length - self._current_file_pos,
330                  self._current_file_pos,
331                  self._current_stream_name))
332         self._current_stream_files.append([
333                 self._current_file_pos,
334                 self._current_stream_length - self._current_file_pos,
335                 self._current_file_name])
336         self._current_file_pos = self._current_stream_length
337         self._current_file_name = None
338
339     def start_new_stream(self, newstreamname='.'):
340         self.finish_current_stream()
341         self.set_current_stream_name(newstreamname)
342
343     def set_current_stream_name(self, newstreamname):
344         if re.search(r'[\t\n]', newstreamname):
345             raise errors.AssertionError(
346                 "Manifest stream names cannot contain whitespace: '%s'" %
347                 (newstreamname))
348         self._current_stream_name = '.' if newstreamname=='' else newstreamname
349
350     def current_stream_name(self):
351         return self._current_stream_name
352
353     def finish_current_stream(self):
354         self.finish_current_file()
355         self.flush_data()
356         if not self._current_stream_files:
357             pass
358         elif self._current_stream_name is None:
359             raise errors.AssertionError(
360                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
361                 (self._current_stream_length, len(self._current_stream_files)))
362         else:
363             if not self._current_stream_locators:
364                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
365             self._finished_streams.append([self._current_stream_name,
366                                            self._current_stream_locators,
367                                            self._current_stream_files])
368         self._current_stream_files = []
369         self._current_stream_length = 0
370         self._current_stream_locators = []
371         self._current_stream_name = None
372         self._current_file_pos = 0
373         self._current_file_name = None
374
375     def finish(self):
376         """Store the manifest in Keep and return its locator.
377
378         This is useful for storing manifest fragments (task outputs)
379         temporarily in Keep during a Crunch job.
380
381         In other cases you should make a collection instead, by
382         sending manifest_text() to the API server's "create
383         collection" endpoint.
384         """
385         return self._my_keep().put(self.manifest_text().encode(),
386                                    copies=self.replication)
387
388     def portable_data_hash(self):
389         stripped = self.stripped_manifest().encode()
390         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
391
392     def manifest_text(self):
393         self.finish_current_stream()
394         manifest = ''
395
396         for stream in self._finished_streams:
397             if not re.search(r'^\.(/.*)?$', stream[0]):
398                 manifest += './'
399             manifest += stream[0].replace(' ', '\\040')
400             manifest += ' ' + ' '.join(stream[1])
401             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
402             manifest += "\n"
403
404         return manifest
405
406     def data_locators(self):
407         ret = []
408         for name, locators, files in self._finished_streams:
409             ret += locators
410         return ret
411
412     def save_new(self, name=None):
413         return self._api_client.collections().create(
414             ensure_unique_name=True,
415             body={
416                 'name': name,
417                 'manifest_text': self.manifest_text(),
418             }).execute(num_retries=self.num_retries)
419
420
421 class ResumableCollectionWriter(CollectionWriter):
422     """Deprecated, use Collection instead."""
423
424     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
425                    '_current_stream_locators', '_current_stream_name',
426                    '_current_file_name', '_current_file_pos', '_close_file',
427                    '_data_buffer', '_dependencies', '_finished_streams',
428                    '_queued_dirents', '_queued_trees']
429
430     def __init__(self, api_client=None, **kwargs):
431         self._dependencies = {}
432         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
433
434     @classmethod
435     def from_state(cls, state, *init_args, **init_kwargs):
436         # Try to build a new writer from scratch with the given state.
437         # If the state is not suitable to resume (because files have changed,
438         # been deleted, aren't predictable, etc.), raise a
439         # StaleWriterStateError.  Otherwise, return the initialized writer.
440         # The caller is responsible for calling writer.do_queued_work()
441         # appropriately after it's returned.
442         writer = cls(*init_args, **init_kwargs)
443         for attr_name in cls.STATE_PROPS:
444             attr_value = state[attr_name]
445             attr_class = getattr(writer, attr_name).__class__
446             # Coerce the value into the same type as the initial value, if
447             # needed.
448             if attr_class not in (type(None), attr_value.__class__):
449                 attr_value = attr_class(attr_value)
450             setattr(writer, attr_name, attr_value)
451         # Check dependencies before we try to resume anything.
452         if any(KeepLocator(ls).permission_expired()
453                for ls in writer._current_stream_locators):
454             raise errors.StaleWriterStateError(
455                 "locators include expired permission hint")
456         writer.check_dependencies()
457         if state['_current_file'] is not None:
458             path, pos = state['_current_file']
459             try:
460                 writer._queued_file = open(path, 'rb')
461                 writer._queued_file.seek(pos)
462             except IOError as error:
463                 raise errors.StaleWriterStateError(
464                     u"failed to reopen active file {}: {}".format(path, error))
465         return writer
466
467     def check_dependencies(self):
468         for path, orig_stat in listitems(self._dependencies):
469             if not S_ISREG(orig_stat[ST_MODE]):
470                 raise errors.StaleWriterStateError(u"{} not file".format(path))
471             try:
472                 now_stat = tuple(os.stat(path))
473             except OSError as error:
474                 raise errors.StaleWriterStateError(
475                     u"failed to stat {}: {}".format(path, error))
476             if ((not S_ISREG(now_stat[ST_MODE])) or
477                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
478                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
479                 raise errors.StaleWriterStateError(u"{} changed".format(path))
480
481     def dump_state(self, copy_func=lambda x: x):
482         state = {attr: copy_func(getattr(self, attr))
483                  for attr in self.STATE_PROPS}
484         if self._queued_file is None:
485             state['_current_file'] = None
486         else:
487             state['_current_file'] = (os.path.realpath(self._queued_file.name),
488                                       self._queued_file.tell())
489         return state
490
491     def _queue_file(self, source, filename=None):
492         try:
493             src_path = os.path.realpath(source)
494         except Exception:
495             raise errors.AssertionError(u"{} not a file path".format(source))
496         try:
497             path_stat = os.stat(src_path)
498         except OSError as stat_error:
499             path_stat = None
500         super(ResumableCollectionWriter, self)._queue_file(source, filename)
501         fd_stat = os.fstat(self._queued_file.fileno())
502         if not S_ISREG(fd_stat.st_mode):
503             # We won't be able to resume from this cache anyway, so don't
504             # worry about further checks.
505             self._dependencies[source] = tuple(fd_stat)
506         elif path_stat is None:
507             raise errors.AssertionError(
508                 u"could not stat {}: {}".format(source, stat_error))
509         elif path_stat.st_ino != fd_stat.st_ino:
510             raise errors.AssertionError(
511                 u"{} changed between open and stat calls".format(source))
512         else:
513             self._dependencies[src_path] = tuple(fd_stat)
514
515     def write(self, data):
516         if self._queued_file is None:
517             raise errors.AssertionError(
518                 "resumable writer can't accept unsourced data")
519         return super(ResumableCollectionWriter, self).write(data)
520
521
522 ADD = "add"
523 DEL = "del"
524 MOD = "mod"
525 TOK = "tok"
526 FILE = "file"
527 COLLECTION = "collection"
528
529 class RichCollectionBase(CollectionBase):
530     """Base class for Collections and Subcollections.
531
532     Implements the majority of functionality relating to accessing items in the
533     Collection.
534
535     """
536
537     def __init__(self, parent=None):
538         self.parent = parent
539         self._committed = False
540         self._has_remote_blocks = False
541         self._callback = None
542         self._items = {}
543
544     def _my_api(self):
545         raise NotImplementedError()
546
547     def _my_keep(self):
548         raise NotImplementedError()
549
550     def _my_block_manager(self):
551         raise NotImplementedError()
552
553     def writable(self):
554         raise NotImplementedError()
555
556     def root_collection(self):
557         raise NotImplementedError()
558
559     def notify(self, event, collection, name, item):
560         raise NotImplementedError()
561
562     def stream_name(self):
563         raise NotImplementedError()
564
565
566     @synchronized
567     def has_remote_blocks(self):
568         """Recursively check for a +R segment locator signature."""
569
570         if self._has_remote_blocks:
571             return True
572         for item in self:
573             if self[item].has_remote_blocks():
574                 return True
575         return False
576
577     @synchronized
578     def set_has_remote_blocks(self, val):
579         self._has_remote_blocks = val
580         if self.parent:
581             self.parent.set_has_remote_blocks(val)
582
583     @must_be_writable
584     @synchronized
585     def find_or_create(self, path, create_type):
586         """Recursively search the specified file path.
587
588         May return either a `Collection` or `ArvadosFile`.  If not found, will
589         create a new item at the specified path based on `create_type`.  Will
590         create intermediate subcollections needed to contain the final item in
591         the path.
592
593         :create_type:
594           One of `arvados.collection.FILE` or
595           `arvados.collection.COLLECTION`.  If the path is not found, and value
596           of create_type is FILE then create and return a new ArvadosFile for
597           the last path component.  If COLLECTION, then create and return a new
598           Collection for the last path component.
599
600         """
601
602         pathcomponents = path.split("/", 1)
603         if pathcomponents[0]:
604             item = self._items.get(pathcomponents[0])
605             if len(pathcomponents) == 1:
606                 if item is None:
607                     # create new file
608                     if create_type == COLLECTION:
609                         item = Subcollection(self, pathcomponents[0])
610                     else:
611                         item = ArvadosFile(self, pathcomponents[0])
612                     self._items[pathcomponents[0]] = item
613                     self.set_committed(False)
614                     self.notify(ADD, self, pathcomponents[0], item)
615                 return item
616             else:
617                 if item is None:
618                     # create new collection
619                     item = Subcollection(self, pathcomponents[0])
620                     self._items[pathcomponents[0]] = item
621                     self.set_committed(False)
622                     self.notify(ADD, self, pathcomponents[0], item)
623                 if isinstance(item, RichCollectionBase):
624                     return item.find_or_create(pathcomponents[1], create_type)
625                 else:
626                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
627         else:
628             return self
629
630     @synchronized
631     def find(self, path):
632         """Recursively search the specified file path.
633
634         May return either a Collection or ArvadosFile. Return None if not
635         found.
636         If path is invalid (ex: starts with '/'), an IOError exception will be
637         raised.
638
639         """
640         if not path:
641             raise errors.ArgumentError("Parameter 'path' is empty.")
642
643         pathcomponents = path.split("/", 1)
644         if pathcomponents[0] == '':
645             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
646
647         item = self._items.get(pathcomponents[0])
648         if item is None:
649             return None
650         elif len(pathcomponents) == 1:
651             return item
652         else:
653             if isinstance(item, RichCollectionBase):
654                 if pathcomponents[1]:
655                     return item.find(pathcomponents[1])
656                 else:
657                     return item
658             else:
659                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
660
661     @synchronized
662     def mkdirs(self, path):
663         """Recursive subcollection create.
664
665         Like `os.makedirs()`.  Will create intermediate subcollections needed
666         to contain the leaf subcollection path.
667
668         """
669
670         if self.find(path) != None:
671             raise IOError(errno.EEXIST, "Directory or file exists", path)
672
673         return self.find_or_create(path, COLLECTION)
674
675     def open(self, path, mode="r", encoding=None):
676         """Open a file-like object for access.
677
678         :path:
679           path to a file in the collection
680         :mode:
681           a string consisting of "r", "w", or "a", optionally followed
682           by "b" or "t", optionally followed by "+".
683           :"b":
684             binary mode: write() accepts bytes, read() returns bytes.
685           :"t":
686             text mode (default): write() accepts strings, read() returns strings.
687           :"r":
688             opens for reading
689           :"r+":
690             opens for reading and writing.  Reads/writes share a file pointer.
691           :"w", "w+":
692             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
693           :"a", "a+":
694             opens for reading and writing.  All writes are appended to
695             the end of the file.  Writing does not affect the file pointer for
696             reading.
697
698         """
699
700         if not re.search(r'^[rwa][bt]?\+?$', mode):
701             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
702
703         if mode[0] == 'r' and '+' not in mode:
704             fclass = ArvadosFileReader
705             arvfile = self.find(path)
706         elif not self.writable():
707             raise IOError(errno.EROFS, "Collection is read only")
708         else:
709             fclass = ArvadosFileWriter
710             arvfile = self.find_or_create(path, FILE)
711
712         if arvfile is None:
713             raise IOError(errno.ENOENT, "File not found", path)
714         if not isinstance(arvfile, ArvadosFile):
715             raise IOError(errno.EISDIR, "Is a directory", path)
716
717         if mode[0] == 'w':
718             arvfile.truncate(0)
719
720         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
721         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
722         if 'b' not in mode:
723             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
724             f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
725         return f
726
727     def modified(self):
728         """Determine if the collection has been modified since last commited."""
729         return not self.committed()
730
731     @synchronized
732     def committed(self):
733         """Determine if the collection has been committed to the API server."""
734         return self._committed
735
736     @synchronized
737     def set_committed(self, value=True):
738         """Recursively set committed flag.
739
740         If value is True, set committed to be True for this and all children.
741
742         If value is False, set committed to be False for this and all parents.
743         """
744         if value == self._committed:
745             return
746         if value:
747             for k,v in listitems(self._items):
748                 v.set_committed(True)
749             self._committed = True
750         else:
751             self._committed = False
752             if self.parent is not None:
753                 self.parent.set_committed(False)
754
755     @synchronized
756     def __iter__(self):
757         """Iterate over names of files and collections contained in this collection."""
758         return iter(viewkeys(self._items))
759
760     @synchronized
761     def __getitem__(self, k):
762         """Get a file or collection that is directly contained by this collection.
763
764         If you want to search a path, use `find()` instead.
765
766         """
767         return self._items[k]
768
769     @synchronized
770     def __contains__(self, k):
771         """Test if there is a file or collection a directly contained by this collection."""
772         return k in self._items
773
774     @synchronized
775     def __len__(self):
776         """Get the number of items directly contained in this collection."""
777         return len(self._items)
778
779     @must_be_writable
780     @synchronized
781     def __delitem__(self, p):
782         """Delete an item by name which is directly contained by this collection."""
783         del self._items[p]
784         self.set_committed(False)
785         self.notify(DEL, self, p, None)
786
787     @synchronized
788     def keys(self):
789         """Get a list of names of files and collections directly contained in this collection."""
790         return self._items.keys()
791
792     @synchronized
793     def values(self):
794         """Get a list of files and collection objects directly contained in this collection."""
795         return listvalues(self._items)
796
797     @synchronized
798     def items(self):
799         """Get a list of (name, object) tuples directly contained in this collection."""
800         return listitems(self._items)
801
802     def exists(self, path):
803         """Test if there is a file or collection at `path`."""
804         return self.find(path) is not None
805
806     @must_be_writable
807     @synchronized
808     def remove(self, path, recursive=False):
809         """Remove the file or subcollection (directory) at `path`.
810
811         :recursive:
812           Specify whether to remove non-empty subcollections (True), or raise an error (False).
813         """
814
815         if not path:
816             raise errors.ArgumentError("Parameter 'path' is empty.")
817
818         pathcomponents = path.split("/", 1)
819         item = self._items.get(pathcomponents[0])
820         if item is None:
821             raise IOError(errno.ENOENT, "File not found", path)
822         if len(pathcomponents) == 1:
823             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
824                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
825             deleteditem = self._items[pathcomponents[0]]
826             del self._items[pathcomponents[0]]
827             self.set_committed(False)
828             self.notify(DEL, self, pathcomponents[0], deleteditem)
829         else:
830             item.remove(pathcomponents[1], recursive=recursive)
831
832     def _clonefrom(self, source):
833         for k,v in listitems(source):
834             self._items[k] = v.clone(self, k)
835
836     def clone(self):
837         raise NotImplementedError()
838
839     @must_be_writable
840     @synchronized
841     def add(self, source_obj, target_name, overwrite=False, reparent=False):
842         """Copy or move a file or subcollection to this collection.
843
844         :source_obj:
845           An ArvadosFile, or Subcollection object
846
847         :target_name:
848           Destination item name.  If the target name already exists and is a
849           file, this will raise an error unless you specify `overwrite=True`.
850
851         :overwrite:
852           Whether to overwrite target file if it already exists.
853
854         :reparent:
855           If True, source_obj will be moved from its parent collection to this collection.
856           If False, source_obj will be copied and the parent collection will be
857           unmodified.
858
859         """
860
861         if target_name in self and not overwrite:
862             raise IOError(errno.EEXIST, "File already exists", target_name)
863
864         modified_from = None
865         if target_name in self:
866             modified_from = self[target_name]
867
868         # Actually make the move or copy.
869         if reparent:
870             source_obj._reparent(self, target_name)
871             item = source_obj
872         else:
873             item = source_obj.clone(self, target_name)
874
875         self._items[target_name] = item
876         self.set_committed(False)
877         if not self._has_remote_blocks and source_obj.has_remote_blocks():
878             self.set_has_remote_blocks(True)
879
880         if modified_from:
881             self.notify(MOD, self, target_name, (modified_from, item))
882         else:
883             self.notify(ADD, self, target_name, item)
884
885     def _get_src_target(self, source, target_path, source_collection, create_dest):
886         if source_collection is None:
887             source_collection = self
888
889         # Find the object
890         if isinstance(source, basestring):
891             source_obj = source_collection.find(source)
892             if source_obj is None:
893                 raise IOError(errno.ENOENT, "File not found", source)
894             sourcecomponents = source.split("/")
895         else:
896             source_obj = source
897             sourcecomponents = None
898
899         # Find parent collection the target path
900         targetcomponents = target_path.split("/")
901
902         # Determine the name to use.
903         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
904
905         if not target_name:
906             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
907
908         if create_dest:
909             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
910         else:
911             if len(targetcomponents) > 1:
912                 target_dir = self.find("/".join(targetcomponents[0:-1]))
913             else:
914                 target_dir = self
915
916         if target_dir is None:
917             raise IOError(errno.ENOENT, "Target directory not found", target_name)
918
919         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
920             target_dir = target_dir[target_name]
921             target_name = sourcecomponents[-1]
922
923         return (source_obj, target_dir, target_name)
924
925     @must_be_writable
926     @synchronized
927     def copy(self, source, target_path, source_collection=None, overwrite=False):
928         """Copy a file or subcollection to a new path in this collection.
929
930         :source:
931           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
932
933         :target_path:
934           Destination file or path.  If the target path already exists and is a
935           subcollection, the item will be placed inside the subcollection.  If
936           the target path already exists and is a file, this will raise an error
937           unless you specify `overwrite=True`.
938
939         :source_collection:
940           Collection to copy `source_path` from (default `self`)
941
942         :overwrite:
943           Whether to overwrite target file if it already exists.
944         """
945
946         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
947         target_dir.add(source_obj, target_name, overwrite, False)
948
949     @must_be_writable
950     @synchronized
951     def rename(self, source, target_path, source_collection=None, overwrite=False):
952         """Move a file or subcollection from `source_collection` to a new path in this collection.
953
954         :source:
955           A string with a path to source file or subcollection.
956
957         :target_path:
958           Destination file or path.  If the target path already exists and is a
959           subcollection, the item will be placed inside the subcollection.  If
960           the target path already exists and is a file, this will raise an error
961           unless you specify `overwrite=True`.
962
963         :source_collection:
964           Collection to copy `source_path` from (default `self`)
965
966         :overwrite:
967           Whether to overwrite target file if it already exists.
968         """
969
970         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
971         if not source_obj.writable():
972             raise IOError(errno.EROFS, "Source collection is read only", source)
973         target_dir.add(source_obj, target_name, overwrite, True)
974
975     def portable_manifest_text(self, stream_name="."):
976         """Get the manifest text for this collection, sub collections and files.
977
978         This method does not flush outstanding blocks to Keep.  It will return
979         a normalized manifest with access tokens stripped.
980
981         :stream_name:
982           Name to use for this stream (directory)
983
984         """
985         return self._get_manifest_text(stream_name, True, True)
986
987     @synchronized
988     def manifest_text(self, stream_name=".", strip=False, normalize=False,
989                       only_committed=False):
990         """Get the manifest text for this collection, sub collections and files.
991
992         This method will flush outstanding blocks to Keep.  By default, it will
993         not normalize an unmodified manifest or strip access tokens.
994
995         :stream_name:
996           Name to use for this stream (directory)
997
998         :strip:
999           If True, remove signing tokens from block locators if present.
1000           If False (default), block locators are left unchanged.
1001
1002         :normalize:
1003           If True, always export the manifest text in normalized form
1004           even if the Collection is not modified.  If False (default) and the collection
1005           is not modified, return the original manifest text even if it is not
1006           in normalized form.
1007
1008         :only_committed:
1009           If True, don't commit pending blocks.
1010
1011         """
1012
1013         if not only_committed:
1014             self._my_block_manager().commit_all()
1015         return self._get_manifest_text(stream_name, strip, normalize,
1016                                        only_committed=only_committed)
1017
1018     @synchronized
1019     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1020         """Get the manifest text for this collection, sub collections and files.
1021
1022         :stream_name:
1023           Name to use for this stream (directory)
1024
1025         :strip:
1026           If True, remove signing tokens from block locators if present.
1027           If False (default), block locators are left unchanged.
1028
1029         :normalize:
1030           If True, always export the manifest text in normalized form
1031           even if the Collection is not modified.  If False (default) and the collection
1032           is not modified, return the original manifest text even if it is not
1033           in normalized form.
1034
1035         :only_committed:
1036           If True, only include blocks that were already committed to Keep.
1037
1038         """
1039
1040         if not self.committed() or self._manifest_text is None or normalize:
1041             stream = {}
1042             buf = []
1043             sorted_keys = sorted(self.keys())
1044             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1045                 # Create a stream per file `k`
1046                 arvfile = self[filename]
1047                 filestream = []
1048                 for segment in arvfile.segments():
1049                     loc = segment.locator
1050                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
1051                         if only_committed:
1052                             continue
1053                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1054                     if strip:
1055                         loc = KeepLocator(loc).stripped()
1056                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1057                                          segment.segment_offset, segment.range_size))
1058                 stream[filename] = filestream
1059             if stream:
1060                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1061             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1062                 buf.append(self[dirname].manifest_text(
1063                     stream_name=os.path.join(stream_name, dirname),
1064                     strip=strip, normalize=True, only_committed=only_committed))
1065             return "".join(buf)
1066         else:
1067             if strip:
1068                 return self.stripped_manifest()
1069             else:
1070                 return self._manifest_text
1071
1072     @synchronized
1073     def _copy_remote_blocks(self, remote_blocks={}):
1074         """Scan through the entire collection and ask Keep to copy remote blocks.
1075
1076         When accessing a remote collection, blocks will have a remote signature
1077         (+R instead of +A). Collect these signatures and request Keep to copy the
1078         blocks to the local cluster, returning local (+A) signatures.
1079
1080         :remote_blocks:
1081           Shared cache of remote to local block mappings. This is used to avoid
1082           doing extra work when blocks are shared by more than one file in
1083           different subdirectories.
1084
1085         """
1086         for item in self:
1087             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1088         return remote_blocks
1089
1090     @synchronized
1091     def diff(self, end_collection, prefix=".", holding_collection=None):
1092         """Generate list of add/modify/delete actions.
1093
1094         When given to `apply`, will change `self` to match `end_collection`
1095
1096         """
1097         changes = []
1098         if holding_collection is None:
1099             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1100         for k in self:
1101             if k not in end_collection:
1102                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1103         for k in end_collection:
1104             if k in self:
1105                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1106                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1107                 elif end_collection[k] != self[k]:
1108                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1109                 else:
1110                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1111             else:
1112                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1113         return changes
1114
1115     @must_be_writable
1116     @synchronized
1117     def apply(self, changes):
1118         """Apply changes from `diff`.
1119
1120         If a change conflicts with a local change, it will be saved to an
1121         alternate path indicating the conflict.
1122
1123         """
1124         if changes:
1125             self.set_committed(False)
1126         for change in changes:
1127             event_type = change[0]
1128             path = change[1]
1129             initial = change[2]
1130             local = self.find(path)
1131             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1132                                                                     time.gmtime()))
1133             if event_type == ADD:
1134                 if local is None:
1135                     # No local file at path, safe to copy over new file
1136                     self.copy(initial, path)
1137                 elif local is not None and local != initial:
1138                     # There is already local file and it is different:
1139                     # save change to conflict file.
1140                     self.copy(initial, conflictpath)
1141             elif event_type == MOD or event_type == TOK:
1142                 final = change[3]
1143                 if local == initial:
1144                     # Local matches the "initial" item so it has not
1145                     # changed locally and is safe to update.
1146                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1147                         # Replace contents of local file with new contents
1148                         local.replace_contents(final)
1149                     else:
1150                         # Overwrite path with new item; this can happen if
1151                         # path was a file and is now a collection or vice versa
1152                         self.copy(final, path, overwrite=True)
1153                 else:
1154                     # Local is missing (presumably deleted) or local doesn't
1155                     # match the "start" value, so save change to conflict file
1156                     self.copy(final, conflictpath)
1157             elif event_type == DEL:
1158                 if local == initial:
1159                     # Local item matches "initial" value, so it is safe to remove.
1160                     self.remove(path, recursive=True)
1161                 # else, the file is modified or already removed, in either
1162                 # case we don't want to try to remove it.
1163
1164     def portable_data_hash(self):
1165         """Get the portable data hash for this collection's manifest."""
1166         if self._manifest_locator and self.committed():
1167             # If the collection is already saved on the API server, and it's committed
1168             # then return API server's PDH response.
1169             return self._portable_data_hash
1170         else:
1171             stripped = self.portable_manifest_text().encode()
1172             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1173
1174     @synchronized
1175     def subscribe(self, callback):
1176         if self._callback is None:
1177             self._callback = callback
1178         else:
1179             raise errors.ArgumentError("A callback is already set on this collection.")
1180
1181     @synchronized
1182     def unsubscribe(self):
1183         if self._callback is not None:
1184             self._callback = None
1185
1186     @synchronized
1187     def notify(self, event, collection, name, item):
1188         if self._callback:
1189             self._callback(event, collection, name, item)
1190         self.root_collection().notify(event, collection, name, item)
1191
1192     @synchronized
1193     def __eq__(self, other):
1194         if other is self:
1195             return True
1196         if not isinstance(other, RichCollectionBase):
1197             return False
1198         if len(self._items) != len(other):
1199             return False
1200         for k in self._items:
1201             if k not in other:
1202                 return False
1203             if self._items[k] != other[k]:
1204                 return False
1205         return True
1206
1207     def __ne__(self, other):
1208         return not self.__eq__(other)
1209
1210     @synchronized
1211     def flush(self):
1212         """Flush bufferblocks to Keep."""
1213         for e in listvalues(self):
1214             e.flush()
1215
1216
1217 class Collection(RichCollectionBase):
1218     """Represents the root of an Arvados Collection.
1219
1220     This class is threadsafe.  The root collection object, all subcollections
1221     and files are protected by a single lock (i.e. each access locks the entire
1222     collection).
1223
1224     Brief summary of
1225     useful methods:
1226
1227     :To read an existing file:
1228       `c.open("myfile", "r")`
1229
1230     :To write a new file:
1231       `c.open("myfile", "w")`
1232
1233     :To determine if a file exists:
1234       `c.find("myfile") is not None`
1235
1236     :To copy a file:
1237       `c.copy("source", "dest")`
1238
1239     :To delete a file:
1240       `c.remove("myfile")`
1241
1242     :To save to an existing collection record:
1243       `c.save()`
1244
1245     :To save a new collection record:
1246     `c.save_new()`
1247
1248     :To merge remote changes into this object:
1249       `c.update()`
1250
1251     Must be associated with an API server Collection record (during
1252     initialization, or using `save_new`) to use `save` or `update`
1253
1254     """
1255
1256     def __init__(self, manifest_locator_or_text=None,
1257                  api_client=None,
1258                  keep_client=None,
1259                  num_retries=None,
1260                  parent=None,
1261                  apiconfig=None,
1262                  block_manager=None,
1263                  replication_desired=None,
1264                  storage_classes_desired=None,
1265                  put_threads=None,
1266                  get_threads=None):
1267         """Collection constructor.
1268
1269         :manifest_locator_or_text:
1270           An Arvados collection UUID, portable data hash, raw manifest
1271           text, or (if creating an empty collection) None.
1272
1273         :parent:
1274           the parent Collection, may be None.
1275
1276         :apiconfig:
1277           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1278           Prefer this over supplying your own api_client and keep_client (except in testing).
1279           Will use default config settings if not specified.
1280
1281         :api_client:
1282           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1283
1284         :keep_client:
1285           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1286
1287         :num_retries:
1288           the number of retries for API and Keep requests.
1289
1290         :block_manager:
1291           the block manager to use.  If not specified, create one.
1292
1293         :replication_desired:
1294           How many copies should Arvados maintain. If None, API server default
1295           configuration applies. If not None, this value will also be used
1296           for determining the number of block copies being written.
1297
1298         :storage_classes_desired:
1299           A list of storage class names where to upload the data. If None,
1300           the keep client is expected to store the data into the cluster's
1301           default storage class(es).
1302
1303         """
1304
1305         if storage_classes_desired and type(storage_classes_desired) is not list:
1306             raise errors.ArgumentError("storage_classes_desired must be list type.")
1307
1308         super(Collection, self).__init__(parent)
1309         self._api_client = api_client
1310         self._keep_client = keep_client
1311
1312         # Use the keep client from ThreadSafeApiCache
1313         if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1314             self._keep_client = self._api_client.keep
1315
1316         self._block_manager = block_manager
1317         self.replication_desired = replication_desired
1318         self._storage_classes_desired = storage_classes_desired
1319         self.put_threads = put_threads
1320         self.get_threads = get_threads
1321
1322         if apiconfig:
1323             self._config = apiconfig
1324         else:
1325             self._config = config.settings()
1326
1327         self.num_retries = num_retries if num_retries is not None else 0
1328         self._manifest_locator = None
1329         self._manifest_text = None
1330         self._portable_data_hash = None
1331         self._api_response = None
1332         self._past_versions = set()
1333
1334         self.lock = threading.RLock()
1335         self.events = None
1336
1337         if manifest_locator_or_text:
1338             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1339                 self._manifest_locator = manifest_locator_or_text
1340             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1341                 self._manifest_locator = manifest_locator_or_text
1342                 if not self._has_local_collection_uuid():
1343                     self._has_remote_blocks = True
1344             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1345                 self._manifest_text = manifest_locator_or_text
1346                 if '+R' in self._manifest_text:
1347                     self._has_remote_blocks = True
1348             else:
1349                 raise errors.ArgumentError(
1350                     "Argument to CollectionReader is not a manifest or a collection UUID")
1351
1352             try:
1353                 self._populate()
1354             except errors.SyntaxError as e:
1355                 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1356
1357     def storage_classes_desired(self):
1358         return self._storage_classes_desired or []
1359
1360     def root_collection(self):
1361         return self
1362
1363     def get_properties(self):
1364         if self._api_response and self._api_response["properties"]:
1365             return self._api_response["properties"]
1366         else:
1367             return {}
1368
1369     def get_trash_at(self):
1370         if self._api_response and self._api_response["trash_at"]:
1371             try:
1372                 return ciso8601.parse_datetime(self._api_response["trash_at"])
1373             except ValueError:
1374                 return None
1375         else:
1376             return None
1377
1378     def stream_name(self):
1379         return "."
1380
1381     def writable(self):
1382         return True
1383
1384     @synchronized
1385     def known_past_version(self, modified_at_and_portable_data_hash):
1386         return modified_at_and_portable_data_hash in self._past_versions
1387
1388     @synchronized
1389     @retry_method
1390     def update(self, other=None, num_retries=None):
1391         """Merge the latest collection on the API server with the current collection."""
1392
1393         if other is None:
1394             if self._manifest_locator is None:
1395                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1396             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1397             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1398                 response.get("portable_data_hash") != self.portable_data_hash()):
1399                 # The record on the server is different from our current one, but we've seen it before,
1400                 # so ignore it because it's already been merged.
1401                 # However, if it's the same as our current record, proceed with the update, because we want to update
1402                 # our tokens.
1403                 return
1404             else:
1405                 self._remember_api_response(response)
1406             other = CollectionReader(response["manifest_text"])
1407         baseline = CollectionReader(self._manifest_text)
1408         self.apply(baseline.diff(other))
1409         self._manifest_text = self.manifest_text()
1410
1411     @synchronized
1412     def _my_api(self):
1413         if self._api_client is None:
1414             self._api_client = ThreadSafeApiCache(self._config, version='v1')
1415             if self._keep_client is None:
1416                 self._keep_client = self._api_client.keep
1417         return self._api_client
1418
1419     @synchronized
1420     def _my_keep(self):
1421         if self._keep_client is None:
1422             if self._api_client is None:
1423                 self._my_api()
1424             else:
1425                 self._keep_client = KeepClient(api_client=self._api_client)
1426         return self._keep_client
1427
1428     @synchronized
1429     def _my_block_manager(self):
1430         if self._block_manager is None:
1431             copies = (self.replication_desired or
1432                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1433                                                    2))
1434             self._block_manager = _BlockManager(self._my_keep(),
1435                                                 copies=copies,
1436                                                 put_threads=self.put_threads,
1437                                                 num_retries=self.num_retries,
1438                                                 storage_classes_func=self.storage_classes_desired,
1439                                                 get_threads=self.get_threads,)
1440         return self._block_manager
1441
1442     def _remember_api_response(self, response):
1443         self._api_response = response
1444         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1445
1446     def _populate_from_api_server(self):
1447         # As in KeepClient itself, we must wait until the last
1448         # possible moment to instantiate an API client, in order to
1449         # avoid tripping up clients that don't have access to an API
1450         # server.  If we do build one, make sure our Keep client uses
1451         # it.  If instantiation fails, we'll fall back to the except
1452         # clause, just like any other Collection lookup
1453         # failure. Return an exception, or None if successful.
1454         self._remember_api_response(self._my_api().collections().get(
1455             uuid=self._manifest_locator).execute(
1456                 num_retries=self.num_retries))
1457         self._manifest_text = self._api_response['manifest_text']
1458         self._portable_data_hash = self._api_response['portable_data_hash']
1459         # If not overriden via kwargs, we should try to load the
1460         # replication_desired and storage_classes_desired from the API server
1461         if self.replication_desired is None:
1462             self.replication_desired = self._api_response.get('replication_desired', None)
1463         if self._storage_classes_desired is None:
1464             self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1465
1466     def _populate(self):
1467         if self._manifest_text is None:
1468             if self._manifest_locator is None:
1469                 return
1470             else:
1471                 self._populate_from_api_server()
1472         self._baseline_manifest = self._manifest_text
1473         self._import_manifest(self._manifest_text)
1474
1475     def _has_collection_uuid(self):
1476         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1477
1478     def _has_local_collection_uuid(self):
1479         return self._has_collection_uuid and \
1480             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1481
1482     def __enter__(self):
1483         return self
1484
1485     def __exit__(self, exc_type, exc_value, traceback):
1486         """Support scoped auto-commit in a with: block."""
1487         if exc_type is None:
1488             if self.writable() and self._has_collection_uuid():
1489                 self.save()
1490         self.stop_threads()
1491
1492     def stop_threads(self):
1493         if self._block_manager is not None:
1494             self._block_manager.stop_threads()
1495
1496     @synchronized
1497     def manifest_locator(self):
1498         """Get the manifest locator, if any.
1499
1500         The manifest locator will be set when the collection is loaded from an
1501         API server record or the portable data hash of a manifest.
1502
1503         The manifest locator will be None if the collection is newly created or
1504         was created directly from manifest text.  The method `save_new()` will
1505         assign a manifest locator.
1506
1507         """
1508         return self._manifest_locator
1509
1510     @synchronized
1511     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1512         if new_config is None:
1513             new_config = self._config
1514         if readonly:
1515             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1516         else:
1517             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1518
1519         newcollection._clonefrom(self)
1520         return newcollection
1521
1522     @synchronized
1523     def api_response(self):
1524         """Returns information about this Collection fetched from the API server.
1525
1526         If the Collection exists in Keep but not the API server, currently
1527         returns None.  Future versions may provide a synthetic response.
1528
1529         """
1530         return self._api_response
1531
1532     def find_or_create(self, path, create_type):
1533         """See `RichCollectionBase.find_or_create`"""
1534         if path == ".":
1535             return self
1536         else:
1537             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1538
1539     def find(self, path):
1540         """See `RichCollectionBase.find`"""
1541         if path == ".":
1542             return self
1543         else:
1544             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1545
1546     def remove(self, path, recursive=False):
1547         """See `RichCollectionBase.remove`"""
1548         if path == ".":
1549             raise errors.ArgumentError("Cannot remove '.'")
1550         else:
1551             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1552
1553     @must_be_writable
1554     @synchronized
1555     @retry_method
1556     def save(self,
1557              properties=None,
1558              storage_classes=None,
1559              trash_at=None,
1560              merge=True,
1561              num_retries=None,
1562              preserve_version=False):
1563         """Save collection to an existing collection record.
1564
1565         Commit pending buffer blocks to Keep, merge with remote record (if
1566         merge=True, the default), and update the collection record. Returns
1567         the current manifest text.
1568
1569         Will raise AssertionError if not associated with a collection record on
1570         the API server.  If you want to save a manifest to Keep only, see
1571         `save_new()`.
1572
1573         :properties:
1574           Additional properties of collection. This value will replace any existing
1575           properties of collection.
1576
1577         :storage_classes:
1578           Specify desirable storage classes to be used when writing data to Keep.
1579
1580         :trash_at:
1581           A collection is *expiring* when it has a *trash_at* time in the future.
1582           An expiring collection can be accessed as normal,
1583           but is scheduled to be trashed automatically at the *trash_at* time.
1584
1585         :merge:
1586           Update and merge remote changes before saving.  Otherwise, any
1587           remote changes will be ignored and overwritten.
1588
1589         :num_retries:
1590           Retry count on API calls (if None,  use the collection default)
1591
1592         :preserve_version:
1593           If True, indicate that the collection content being saved right now
1594           should be preserved in a version snapshot if the collection record is
1595           updated in the future. Requires that the API server has
1596           Collections.CollectionVersioning enabled, if not, setting this will
1597           raise an exception.
1598
1599         """
1600         if properties and type(properties) is not dict:
1601             raise errors.ArgumentError("properties must be dictionary type.")
1602
1603         if storage_classes and type(storage_classes) is not list:
1604             raise errors.ArgumentError("storage_classes must be list type.")
1605         if storage_classes:
1606             self._storage_classes_desired = storage_classes
1607
1608         if trash_at and type(trash_at) is not datetime.datetime:
1609             raise errors.ArgumentError("trash_at must be datetime type.")
1610
1611         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1612             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1613
1614         body={}
1615         if properties:
1616             body["properties"] = properties
1617         if self.storage_classes_desired():
1618             body["storage_classes_desired"] = self.storage_classes_desired()
1619         if trash_at:
1620             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1621             body["trash_at"] = t
1622         if preserve_version:
1623             body["preserve_version"] = preserve_version
1624
1625         if not self.committed():
1626             if self._has_remote_blocks:
1627                 # Copy any remote blocks to the local cluster.
1628                 self._copy_remote_blocks(remote_blocks={})
1629                 self._has_remote_blocks = False
1630             if not self._has_collection_uuid():
1631                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1632             elif not self._has_local_collection_uuid():
1633                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1634
1635             self._my_block_manager().commit_all()
1636
1637             if merge:
1638                 self.update()
1639
1640             text = self.manifest_text(strip=False)
1641             body['manifest_text'] = text
1642
1643             self._remember_api_response(self._my_api().collections().update(
1644                 uuid=self._manifest_locator,
1645                 body=body
1646                 ).execute(num_retries=num_retries))
1647             self._manifest_text = self._api_response["manifest_text"]
1648             self._portable_data_hash = self._api_response["portable_data_hash"]
1649             self.set_committed(True)
1650         elif body:
1651             self._remember_api_response(self._my_api().collections().update(
1652                 uuid=self._manifest_locator,
1653                 body=body
1654                 ).execute(num_retries=num_retries))
1655
1656         return self._manifest_text
1657
1658
1659     @must_be_writable
1660     @synchronized
1661     @retry_method
1662     def save_new(self, name=None,
1663                  create_collection_record=True,
1664                  owner_uuid=None,
1665                  properties=None,
1666                  storage_classes=None,
1667                  trash_at=None,
1668                  ensure_unique_name=False,
1669                  num_retries=None,
1670                  preserve_version=False):
1671         """Save collection to a new collection record.
1672
1673         Commit pending buffer blocks to Keep and, when create_collection_record
1674         is True (default), create a new collection record.  After creating a
1675         new collection record, this Collection object will be associated with
1676         the new record used by `save()`. Returns the current manifest text.
1677
1678         :name:
1679           The collection name.
1680
1681         :create_collection_record:
1682            If True, create a collection record on the API server.
1683            If False, only commit blocks to Keep and return the manifest text.
1684
1685         :owner_uuid:
1686           the user, or project uuid that will own this collection.
1687           If None, defaults to the current user.
1688
1689         :properties:
1690           Additional properties of collection. This value will replace any existing
1691           properties of collection.
1692
1693         :storage_classes:
1694           Specify desirable storage classes to be used when writing data to Keep.
1695
1696         :trash_at:
1697           A collection is *expiring* when it has a *trash_at* time in the future.
1698           An expiring collection can be accessed as normal,
1699           but is scheduled to be trashed automatically at the *trash_at* time.
1700
1701         :ensure_unique_name:
1702           If True, ask the API server to rename the collection
1703           if it conflicts with a collection with the same name and owner.  If
1704           False, a name conflict will result in an error.
1705
1706         :num_retries:
1707           Retry count on API calls (if None,  use the collection default)
1708
1709         :preserve_version:
1710           If True, indicate that the collection content being saved right now
1711           should be preserved in a version snapshot if the collection record is
1712           updated in the future. Requires that the API server has
1713           Collections.CollectionVersioning enabled, if not, setting this will
1714           raise an exception.
1715
1716         """
1717         if properties and type(properties) is not dict:
1718             raise errors.ArgumentError("properties must be dictionary type.")
1719
1720         if storage_classes and type(storage_classes) is not list:
1721             raise errors.ArgumentError("storage_classes must be list type.")
1722
1723         if trash_at and type(trash_at) is not datetime.datetime:
1724             raise errors.ArgumentError("trash_at must be datetime type.")
1725
1726         if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1727             raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1728
1729         if self._has_remote_blocks:
1730             # Copy any remote blocks to the local cluster.
1731             self._copy_remote_blocks(remote_blocks={})
1732             self._has_remote_blocks = False
1733
1734         if storage_classes:
1735             self._storage_classes_desired = storage_classes
1736
1737         self._my_block_manager().commit_all()
1738         text = self.manifest_text(strip=False)
1739
1740         if create_collection_record:
1741             if name is None:
1742                 name = "New collection"
1743                 ensure_unique_name = True
1744
1745             body = {"manifest_text": text,
1746                     "name": name,
1747                     "replication_desired": self.replication_desired}
1748             if owner_uuid:
1749                 body["owner_uuid"] = owner_uuid
1750             if properties:
1751                 body["properties"] = properties
1752             if self.storage_classes_desired():
1753                 body["storage_classes_desired"] = self.storage_classes_desired()
1754             if trash_at:
1755                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1756                 body["trash_at"] = t
1757             if preserve_version:
1758                 body["preserve_version"] = preserve_version
1759
1760             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1761             text = self._api_response["manifest_text"]
1762
1763             self._manifest_locator = self._api_response["uuid"]
1764             self._portable_data_hash = self._api_response["portable_data_hash"]
1765
1766             self._manifest_text = text
1767             self.set_committed(True)
1768
1769         return text
1770
1771     _token_re = re.compile(r'(\S+)(\s+|$)')
1772     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1773     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1774
1775     def _unescape_manifest_path(self, path):
1776         return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1777
1778     @synchronized
1779     def _import_manifest(self, manifest_text):
1780         """Import a manifest into a `Collection`.
1781
1782         :manifest_text:
1783           The manifest text to import from.
1784
1785         """
1786         if len(self) > 0:
1787             raise ArgumentError("Can only import manifest into an empty collection")
1788
1789         STREAM_NAME = 0
1790         BLOCKS = 1
1791         SEGMENTS = 2
1792
1793         stream_name = None
1794         state = STREAM_NAME
1795
1796         for token_and_separator in self._token_re.finditer(manifest_text):
1797             tok = token_and_separator.group(1)
1798             sep = token_and_separator.group(2)
1799
1800             if state == STREAM_NAME:
1801                 # starting a new stream
1802                 stream_name = self._unescape_manifest_path(tok)
1803                 blocks = []
1804                 segments = []
1805                 streamoffset = 0
1806                 state = BLOCKS
1807                 self.find_or_create(stream_name, COLLECTION)
1808                 continue
1809
1810             if state == BLOCKS:
1811                 block_locator = self._block_re.match(tok)
1812                 if block_locator:
1813                     blocksize = int(block_locator.group(1))
1814                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1815                     streamoffset += blocksize
1816                 else:
1817                     state = SEGMENTS
1818
1819             if state == SEGMENTS:
1820                 file_segment = self._segment_re.match(tok)
1821                 if file_segment:
1822                     pos = int(file_segment.group(1))
1823                     size = int(file_segment.group(2))
1824                     name = self._unescape_manifest_path(file_segment.group(3))
1825                     if name.split('/')[-1] == '.':
1826                         # placeholder for persisting an empty directory, not a real file
1827                         if len(name) > 2:
1828                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1829                     else:
1830                         filepath = os.path.join(stream_name, name)
1831                         try:
1832                             afile = self.find_or_create(filepath, FILE)
1833                         except IOError as e:
1834                             if e.errno == errno.ENOTDIR:
1835                                 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1836                             else:
1837                                 raise e from None
1838                         if isinstance(afile, ArvadosFile):
1839                             afile.add_segment(blocks, pos, size)
1840                         else:
1841                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1842                 else:
1843                     # error!
1844                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1845
1846             if sep == "\n":
1847                 stream_name = None
1848                 state = STREAM_NAME
1849
1850         self.set_committed(True)
1851
1852     @synchronized
1853     def notify(self, event, collection, name, item):
1854         if self._callback:
1855             self._callback(event, collection, name, item)
1856
1857
1858 class Subcollection(RichCollectionBase):
1859     """This is a subdirectory within a collection that doesn't have its own API
1860     server record.
1861
1862     Subcollection locking falls under the umbrella lock of its root collection.
1863
1864     """
1865
1866     def __init__(self, parent, name):
1867         super(Subcollection, self).__init__(parent)
1868         self.lock = self.root_collection().lock
1869         self._manifest_text = None
1870         self.name = name
1871         self.num_retries = parent.num_retries
1872
1873     def root_collection(self):
1874         return self.parent.root_collection()
1875
1876     def writable(self):
1877         return self.root_collection().writable()
1878
1879     def _my_api(self):
1880         return self.root_collection()._my_api()
1881
1882     def _my_keep(self):
1883         return self.root_collection()._my_keep()
1884
1885     def _my_block_manager(self):
1886         return self.root_collection()._my_block_manager()
1887
1888     def stream_name(self):
1889         return os.path.join(self.parent.stream_name(), self.name)
1890
1891     @synchronized
1892     def clone(self, new_parent, new_name):
1893         c = Subcollection(new_parent, new_name)
1894         c._clonefrom(self)
1895         return c
1896
1897     @must_be_writable
1898     @synchronized
1899     def _reparent(self, newparent, newname):
1900         self.set_committed(False)
1901         self.flush()
1902         self.parent.remove(self.name, recursive=True)
1903         self.parent = newparent
1904         self.name = newname
1905         self.lock = self.parent.root_collection().lock
1906
1907     @synchronized
1908     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1909         """Encode empty directories by using an \056-named (".") empty file"""
1910         if len(self._items) == 0:
1911             return "%s %s 0:0:\\056\n" % (
1912                 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
1913         return super(Subcollection, self)._get_manifest_text(stream_name,
1914                                                              strip, normalize,
1915                                                              only_committed)
1916
1917
1918 class CollectionReader(Collection):
1919     """A read-only collection object.
1920
1921     Initialize from a collection UUID or portable data hash, or raw
1922     manifest text.  See `Collection` constructor for detailed options.
1923
1924     """
1925     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1926         self._in_init = True
1927         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1928         self._in_init = False
1929
1930         # Forego any locking since it should never change once initialized.
1931         self.lock = NoopLock()
1932
1933         # Backwards compatability with old CollectionReader
1934         # all_streams() and all_files()
1935         self._streams = None
1936
1937     def writable(self):
1938         return self._in_init
1939
1940     def _populate_streams(orig_func):
1941         @functools.wraps(orig_func)
1942         def populate_streams_wrapper(self, *args, **kwargs):
1943             # Defer populating self._streams until needed since it creates a copy of the manifest.
1944             if self._streams is None:
1945                 if self._manifest_text:
1946                     self._streams = [sline.split()
1947                                      for sline in self._manifest_text.split("\n")
1948                                      if sline]
1949                 else:
1950                     self._streams = []
1951             return orig_func(self, *args, **kwargs)
1952         return populate_streams_wrapper
1953
1954     @_populate_streams
1955     def normalize(self):
1956         """Normalize the streams returned by `all_streams`.
1957
1958         This method is kept for backwards compatability and only affects the
1959         behavior of `all_streams()` and `all_files()`
1960
1961         """
1962
1963         # Rearrange streams
1964         streams = {}
1965         for s in self.all_streams():
1966             for f in s.all_files():
1967                 streamname, filename = split(s.name() + "/" + f.name())
1968                 if streamname not in streams:
1969                     streams[streamname] = {}
1970                 if filename not in streams[streamname]:
1971                     streams[streamname][filename] = []
1972                 for r in f.segments:
1973                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1974
1975         self._streams = [normalize_stream(s, streams[s])
1976                          for s in sorted(streams)]
1977     @_populate_streams
1978     def all_streams(self):
1979         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1980                 for s in self._streams]
1981
1982     @_populate_streams
1983     def all_files(self):
1984         for s in self.all_streams():
1985             for f in s.all_files():
1986                 yield f