9945: Merge branch 'master' into 9945-make-python-package-dependency-free
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import ciso8601
11 import datetime
12 import errno
13 import functools
14 import hashlib
15 import io
16 import logging
17 import os
18 import re
19 import sys
20 import threading
21 import time
22
23 from collections import deque
24 from stat import *
25
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
34 import arvados.util
35 import arvados.events as events
36 from arvados.retry import retry_method
37
38 _logger = logging.getLogger('arvados.collection')
39
40
41 if sys.version_info >= (3, 0):
42     TextIOWrapper = io.TextIOWrapper
43 else:
44     class TextIOWrapper(io.TextIOWrapper):
45         """To maintain backward compatibility, cast str to unicode in
46         write('foo').
47
48         """
49         def write(self, data):
50             if isinstance(data, basestring):
51                 data = unicode(data)
52             return super(TextIOWrapper, self).write(data)
53
54
55 class CollectionBase(object):
56     """Abstract base class for Collection classes."""
57
58     def __enter__(self):
59         return self
60
61     def __exit__(self, exc_type, exc_value, traceback):
62         pass
63
64     def _my_keep(self):
65         if self._keep_client is None:
66             self._keep_client = KeepClient(api_client=self._api_client,
67                                            num_retries=self.num_retries)
68         return self._keep_client
69
70     def stripped_manifest(self):
71         """Get the manifest with locator hints stripped.
72
73         Return the manifest for the current collection with all
74         non-portable hints (i.e., permission signatures and other
75         hints other than size hints) removed from the locators.
76         """
77         raw = self.manifest_text()
78         clean = []
79         for line in raw.split("\n"):
80             fields = line.split()
81             if fields:
82                 clean_fields = fields[:1] + [
83                     (re.sub(r'\+[^\d][^\+]*', '', x)
84                      if re.match(arvados.util.keep_locator_pattern, x)
85                      else x)
86                     for x in fields[1:]]
87                 clean += [' '.join(clean_fields), "\n"]
88         return ''.join(clean)
89
90
91 class _WriterFile(_FileLikeObjectBase):
92     def __init__(self, coll_writer, name):
93         super(_WriterFile, self).__init__(name, 'wb')
94         self.dest = coll_writer
95
96     def close(self):
97         super(_WriterFile, self).close()
98         self.dest.finish_current_file()
99
100     @_FileLikeObjectBase._before_close
101     def write(self, data):
102         self.dest.write(data)
103
104     @_FileLikeObjectBase._before_close
105     def writelines(self, seq):
106         for data in seq:
107             self.write(data)
108
109     @_FileLikeObjectBase._before_close
110     def flush(self):
111         self.dest.flush_data()
112
113
114 class CollectionWriter(CollectionBase):
115     """Deprecated, use Collection instead."""
116
117     def __init__(self, api_client=None, num_retries=0, replication=None):
118         """Instantiate a CollectionWriter.
119
120         CollectionWriter lets you build a new Arvados Collection from scratch.
121         Write files to it.  The CollectionWriter will upload data to Keep as
122         appropriate, and provide you with the Collection manifest text when
123         you're finished.
124
125         Arguments:
126         * api_client: The API client to use to look up Collections.  If not
127           provided, CollectionReader will build one from available Arvados
128           configuration.
129         * num_retries: The default number of times to retry failed
130           service requests.  Default 0.  You may change this value
131           after instantiation, but note those changes may not
132           propagate to related objects like the Keep client.
133         * replication: The number of copies of each block to store.
134           If this argument is None or not supplied, replication is
135           the server-provided default if available, otherwise 2.
136         """
137         self._api_client = api_client
138         self.num_retries = num_retries
139         self.replication = (2 if replication is None else replication)
140         self._keep_client = None
141         self._data_buffer = []
142         self._data_buffer_len = 0
143         self._current_stream_files = []
144         self._current_stream_length = 0
145         self._current_stream_locators = []
146         self._current_stream_name = '.'
147         self._current_file_name = None
148         self._current_file_pos = 0
149         self._finished_streams = []
150         self._close_file = None
151         self._queued_file = None
152         self._queued_dirents = deque()
153         self._queued_trees = deque()
154         self._last_open = None
155
156     def __exit__(self, exc_type, exc_value, traceback):
157         if exc_type is None:
158             self.finish()
159
160     def do_queued_work(self):
161         # The work queue consists of three pieces:
162         # * _queued_file: The file object we're currently writing to the
163         #   Collection.
164         # * _queued_dirents: Entries under the current directory
165         #   (_queued_trees[0]) that we want to write or recurse through.
166         #   This may contain files from subdirectories if
167         #   max_manifest_depth == 0 for this directory.
168         # * _queued_trees: Directories that should be written as separate
169         #   streams to the Collection.
170         # This function handles the smallest piece of work currently queued
171         # (current file, then current directory, then next directory) until
172         # no work remains.  The _work_THING methods each do a unit of work on
173         # THING.  _queue_THING methods add a THING to the work queue.
174         while True:
175             if self._queued_file:
176                 self._work_file()
177             elif self._queued_dirents:
178                 self._work_dirents()
179             elif self._queued_trees:
180                 self._work_trees()
181             else:
182                 break
183
184     def _work_file(self):
185         while True:
186             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
187             if not buf:
188                 break
189             self.write(buf)
190         self.finish_current_file()
191         if self._close_file:
192             self._queued_file.close()
193         self._close_file = None
194         self._queued_file = None
195
196     def _work_dirents(self):
197         path, stream_name, max_manifest_depth = self._queued_trees[0]
198         if stream_name != self.current_stream_name():
199             self.start_new_stream(stream_name)
200         while self._queued_dirents:
201             dirent = self._queued_dirents.popleft()
202             target = os.path.join(path, dirent)
203             if os.path.isdir(target):
204                 self._queue_tree(target,
205                                  os.path.join(stream_name, dirent),
206                                  max_manifest_depth - 1)
207             else:
208                 self._queue_file(target, dirent)
209                 break
210         if not self._queued_dirents:
211             self._queued_trees.popleft()
212
213     def _work_trees(self):
214         path, stream_name, max_manifest_depth = self._queued_trees[0]
215         d = arvados.util.listdir_recursive(
216             path, max_depth = (None if max_manifest_depth == 0 else 0))
217         if d:
218             self._queue_dirents(stream_name, d)
219         else:
220             self._queued_trees.popleft()
221
222     def _queue_file(self, source, filename=None):
223         assert (self._queued_file is None), "tried to queue more than one file"
224         if not hasattr(source, 'read'):
225             source = open(source, 'rb')
226             self._close_file = True
227         else:
228             self._close_file = False
229         if filename is None:
230             filename = os.path.basename(source.name)
231         self.start_new_file(filename)
232         self._queued_file = source
233
234     def _queue_dirents(self, stream_name, dirents):
235         assert (not self._queued_dirents), "tried to queue more than one tree"
236         self._queued_dirents = deque(sorted(dirents))
237
238     def _queue_tree(self, path, stream_name, max_manifest_depth):
239         self._queued_trees.append((path, stream_name, max_manifest_depth))
240
241     def write_file(self, source, filename=None):
242         self._queue_file(source, filename)
243         self.do_queued_work()
244
245     def write_directory_tree(self,
246                              path, stream_name='.', max_manifest_depth=-1):
247         self._queue_tree(path, stream_name, max_manifest_depth)
248         self.do_queued_work()
249
250     def write(self, newdata):
251         if isinstance(newdata, bytes):
252             pass
253         elif isinstance(newdata, str):
254             newdata = newdata.encode()
255         elif hasattr(newdata, '__iter__'):
256             for s in newdata:
257                 self.write(s)
258             return
259         self._data_buffer.append(newdata)
260         self._data_buffer_len += len(newdata)
261         self._current_stream_length += len(newdata)
262         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
263             self.flush_data()
264
265     def open(self, streampath, filename=None):
266         """open(streampath[, filename]) -> file-like object
267
268         Pass in the path of a file to write to the Collection, either as a
269         single string or as two separate stream name and file name arguments.
270         This method returns a file-like object you can write to add it to the
271         Collection.
272
273         You may only have one file object from the Collection open at a time,
274         so be sure to close the object when you're done.  Using the object in
275         a with statement makes that easy::
276
277           with cwriter.open('./doc/page1.txt') as outfile:
278               outfile.write(page1_data)
279           with cwriter.open('./doc/page2.txt') as outfile:
280               outfile.write(page2_data)
281         """
282         if filename is None:
283             streampath, filename = split(streampath)
284         if self._last_open and not self._last_open.closed:
285             raise errors.AssertionError(
286                 "can't open '{}' when '{}' is still open".format(
287                     filename, self._last_open.name))
288         if streampath != self.current_stream_name():
289             self.start_new_stream(streampath)
290         self.set_current_file_name(filename)
291         self._last_open = _WriterFile(self, filename)
292         return self._last_open
293
294     def flush_data(self):
295         data_buffer = b''.join(self._data_buffer)
296         if data_buffer:
297             self._current_stream_locators.append(
298                 self._my_keep().put(
299                     data_buffer[0:config.KEEP_BLOCK_SIZE],
300                     copies=self.replication))
301             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
302             self._data_buffer_len = len(self._data_buffer[0])
303
304     def start_new_file(self, newfilename=None):
305         self.finish_current_file()
306         self.set_current_file_name(newfilename)
307
308     def set_current_file_name(self, newfilename):
309         if re.search(r'[\t\n]', newfilename):
310             raise errors.AssertionError(
311                 "Manifest filenames cannot contain whitespace: %s" %
312                 newfilename)
313         elif re.search(r'\x00', newfilename):
314             raise errors.AssertionError(
315                 "Manifest filenames cannot contain NUL characters: %s" %
316                 newfilename)
317         self._current_file_name = newfilename
318
319     def current_file_name(self):
320         return self._current_file_name
321
322     def finish_current_file(self):
323         if self._current_file_name is None:
324             if self._current_file_pos == self._current_stream_length:
325                 return
326             raise errors.AssertionError(
327                 "Cannot finish an unnamed file " +
328                 "(%d bytes at offset %d in '%s' stream)" %
329                 (self._current_stream_length - self._current_file_pos,
330                  self._current_file_pos,
331                  self._current_stream_name))
332         self._current_stream_files.append([
333                 self._current_file_pos,
334                 self._current_stream_length - self._current_file_pos,
335                 self._current_file_name])
336         self._current_file_pos = self._current_stream_length
337         self._current_file_name = None
338
339     def start_new_stream(self, newstreamname='.'):
340         self.finish_current_stream()
341         self.set_current_stream_name(newstreamname)
342
343     def set_current_stream_name(self, newstreamname):
344         if re.search(r'[\t\n]', newstreamname):
345             raise errors.AssertionError(
346                 "Manifest stream names cannot contain whitespace: '%s'" %
347                 (newstreamname))
348         self._current_stream_name = '.' if newstreamname=='' else newstreamname
349
350     def current_stream_name(self):
351         return self._current_stream_name
352
353     def finish_current_stream(self):
354         self.finish_current_file()
355         self.flush_data()
356         if not self._current_stream_files:
357             pass
358         elif self._current_stream_name is None:
359             raise errors.AssertionError(
360                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
361                 (self._current_stream_length, len(self._current_stream_files)))
362         else:
363             if not self._current_stream_locators:
364                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
365             self._finished_streams.append([self._current_stream_name,
366                                            self._current_stream_locators,
367                                            self._current_stream_files])
368         self._current_stream_files = []
369         self._current_stream_length = 0
370         self._current_stream_locators = []
371         self._current_stream_name = None
372         self._current_file_pos = 0
373         self._current_file_name = None
374
375     def finish(self):
376         """Store the manifest in Keep and return its locator.
377
378         This is useful for storing manifest fragments (task outputs)
379         temporarily in Keep during a Crunch job.
380
381         In other cases you should make a collection instead, by
382         sending manifest_text() to the API server's "create
383         collection" endpoint.
384         """
385         return self._my_keep().put(self.manifest_text().encode(),
386                                    copies=self.replication)
387
388     def portable_data_hash(self):
389         stripped = self.stripped_manifest().encode()
390         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
391
392     def manifest_text(self):
393         self.finish_current_stream()
394         manifest = ''
395
396         for stream in self._finished_streams:
397             if not re.search(r'^\.(/.*)?$', stream[0]):
398                 manifest += './'
399             manifest += stream[0].replace(' ', '\\040')
400             manifest += ' ' + ' '.join(stream[1])
401             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
402             manifest += "\n"
403
404         return manifest
405
406     def data_locators(self):
407         ret = []
408         for name, locators, files in self._finished_streams:
409             ret += locators
410         return ret
411
412     def save_new(self, name=None):
413         return self._api_client.collections().create(
414             ensure_unique_name=True,
415             body={
416                 'name': name,
417                 'manifest_text': self.manifest_text(),
418             }).execute(num_retries=self.num_retries)
419
420
421 class ResumableCollectionWriter(CollectionWriter):
422     """Deprecated, use Collection instead."""
423
424     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
425                    '_current_stream_locators', '_current_stream_name',
426                    '_current_file_name', '_current_file_pos', '_close_file',
427                    '_data_buffer', '_dependencies', '_finished_streams',
428                    '_queued_dirents', '_queued_trees']
429
430     def __init__(self, api_client=None, **kwargs):
431         self._dependencies = {}
432         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
433
434     @classmethod
435     def from_state(cls, state, *init_args, **init_kwargs):
436         # Try to build a new writer from scratch with the given state.
437         # If the state is not suitable to resume (because files have changed,
438         # been deleted, aren't predictable, etc.), raise a
439         # StaleWriterStateError.  Otherwise, return the initialized writer.
440         # The caller is responsible for calling writer.do_queued_work()
441         # appropriately after it's returned.
442         writer = cls(*init_args, **init_kwargs)
443         for attr_name in cls.STATE_PROPS:
444             attr_value = state[attr_name]
445             attr_class = getattr(writer, attr_name).__class__
446             # Coerce the value into the same type as the initial value, if
447             # needed.
448             if attr_class not in (type(None), attr_value.__class__):
449                 attr_value = attr_class(attr_value)
450             setattr(writer, attr_name, attr_value)
451         # Check dependencies before we try to resume anything.
452         if any(KeepLocator(ls).permission_expired()
453                for ls in writer._current_stream_locators):
454             raise errors.StaleWriterStateError(
455                 "locators include expired permission hint")
456         writer.check_dependencies()
457         if state['_current_file'] is not None:
458             path, pos = state['_current_file']
459             try:
460                 writer._queued_file = open(path, 'rb')
461                 writer._queued_file.seek(pos)
462             except IOError as error:
463                 raise errors.StaleWriterStateError(
464                     "failed to reopen active file {}: {}".format(path, error))
465         return writer
466
467     def check_dependencies(self):
468         for path, orig_stat in listitems(self._dependencies):
469             if not S_ISREG(orig_stat[ST_MODE]):
470                 raise errors.StaleWriterStateError("{} not file".format(path))
471             try:
472                 now_stat = tuple(os.stat(path))
473             except OSError as error:
474                 raise errors.StaleWriterStateError(
475                     "failed to stat {}: {}".format(path, error))
476             if ((not S_ISREG(now_stat[ST_MODE])) or
477                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
478                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
479                 raise errors.StaleWriterStateError("{} changed".format(path))
480
481     def dump_state(self, copy_func=lambda x: x):
482         state = {attr: copy_func(getattr(self, attr))
483                  for attr in self.STATE_PROPS}
484         if self._queued_file is None:
485             state['_current_file'] = None
486         else:
487             state['_current_file'] = (os.path.realpath(self._queued_file.name),
488                                       self._queued_file.tell())
489         return state
490
491     def _queue_file(self, source, filename=None):
492         try:
493             src_path = os.path.realpath(source)
494         except Exception:
495             raise errors.AssertionError("{} not a file path".format(source))
496         try:
497             path_stat = os.stat(src_path)
498         except OSError as stat_error:
499             path_stat = None
500         super(ResumableCollectionWriter, self)._queue_file(source, filename)
501         fd_stat = os.fstat(self._queued_file.fileno())
502         if not S_ISREG(fd_stat.st_mode):
503             # We won't be able to resume from this cache anyway, so don't
504             # worry about further checks.
505             self._dependencies[source] = tuple(fd_stat)
506         elif path_stat is None:
507             raise errors.AssertionError(
508                 "could not stat {}: {}".format(source, stat_error))
509         elif path_stat.st_ino != fd_stat.st_ino:
510             raise errors.AssertionError(
511                 "{} changed between open and stat calls".format(source))
512         else:
513             self._dependencies[src_path] = tuple(fd_stat)
514
515     def write(self, data):
516         if self._queued_file is None:
517             raise errors.AssertionError(
518                 "resumable writer can't accept unsourced data")
519         return super(ResumableCollectionWriter, self).write(data)
520
521
522 ADD = "add"
523 DEL = "del"
524 MOD = "mod"
525 TOK = "tok"
526 FILE = "file"
527 COLLECTION = "collection"
528
529 class RichCollectionBase(CollectionBase):
530     """Base class for Collections and Subcollections.
531
532     Implements the majority of functionality relating to accessing items in the
533     Collection.
534
535     """
536
537     def __init__(self, parent=None):
538         self.parent = parent
539         self._committed = False
540         self._has_remote_blocks = False
541         self._callback = None
542         self._items = {}
543
544     def _my_api(self):
545         raise NotImplementedError()
546
547     def _my_keep(self):
548         raise NotImplementedError()
549
550     def _my_block_manager(self):
551         raise NotImplementedError()
552
553     def writable(self):
554         raise NotImplementedError()
555
556     def root_collection(self):
557         raise NotImplementedError()
558
559     def notify(self, event, collection, name, item):
560         raise NotImplementedError()
561
562     def stream_name(self):
563         raise NotImplementedError()
564
565     @synchronized
566     def has_remote_blocks(self):
567         """Recursively check for a +R segment locator signature."""
568
569         if self._has_remote_blocks:
570             return True
571         for item in self:
572             if self[item].has_remote_blocks():
573                 return True
574         return False
575
576     @synchronized
577     def set_has_remote_blocks(self, val):
578         self._has_remote_blocks = val
579         if self.parent:
580             self.parent.set_has_remote_blocks(val)
581
582     @must_be_writable
583     @synchronized
584     def find_or_create(self, path, create_type):
585         """Recursively search the specified file path.
586
587         May return either a `Collection` or `ArvadosFile`.  If not found, will
588         create a new item at the specified path based on `create_type`.  Will
589         create intermediate subcollections needed to contain the final item in
590         the path.
591
592         :create_type:
593           One of `arvados.collection.FILE` or
594           `arvados.collection.COLLECTION`.  If the path is not found, and value
595           of create_type is FILE then create and return a new ArvadosFile for
596           the last path component.  If COLLECTION, then create and return a new
597           Collection for the last path component.
598
599         """
600
601         pathcomponents = path.split("/", 1)
602         if pathcomponents[0]:
603             item = self._items.get(pathcomponents[0])
604             if len(pathcomponents) == 1:
605                 if item is None:
606                     # create new file
607                     if create_type == COLLECTION:
608                         item = Subcollection(self, pathcomponents[0])
609                     else:
610                         item = ArvadosFile(self, pathcomponents[0])
611                     self._items[pathcomponents[0]] = item
612                     self.set_committed(False)
613                     self.notify(ADD, self, pathcomponents[0], item)
614                 return item
615             else:
616                 if item is None:
617                     # create new collection
618                     item = Subcollection(self, pathcomponents[0])
619                     self._items[pathcomponents[0]] = item
620                     self.set_committed(False)
621                     self.notify(ADD, self, pathcomponents[0], item)
622                 if isinstance(item, RichCollectionBase):
623                     return item.find_or_create(pathcomponents[1], create_type)
624                 else:
625                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
626         else:
627             return self
628
629     @synchronized
630     def find(self, path):
631         """Recursively search the specified file path.
632
633         May return either a Collection or ArvadosFile. Return None if not
634         found.
635         If path is invalid (ex: starts with '/'), an IOError exception will be
636         raised.
637
638         """
639         if not path:
640             raise errors.ArgumentError("Parameter 'path' is empty.")
641
642         pathcomponents = path.split("/", 1)
643         if pathcomponents[0] == '':
644             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
645
646         item = self._items.get(pathcomponents[0])
647         if item is None:
648             return None
649         elif len(pathcomponents) == 1:
650             return item
651         else:
652             if isinstance(item, RichCollectionBase):
653                 if pathcomponents[1]:
654                     return item.find(pathcomponents[1])
655                 else:
656                     return item
657             else:
658                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
659
660     @synchronized
661     def mkdirs(self, path):
662         """Recursive subcollection create.
663
664         Like `os.makedirs()`.  Will create intermediate subcollections needed
665         to contain the leaf subcollection path.
666
667         """
668
669         if self.find(path) != None:
670             raise IOError(errno.EEXIST, "Directory or file exists", path)
671
672         return self.find_or_create(path, COLLECTION)
673
674     def open(self, path, mode="r", encoding=None):
675         """Open a file-like object for access.
676
677         :path:
678           path to a file in the collection
679         :mode:
680           a string consisting of "r", "w", or "a", optionally followed
681           by "b" or "t", optionally followed by "+".
682           :"b":
683             binary mode: write() accepts bytes, read() returns bytes.
684           :"t":
685             text mode (default): write() accepts strings, read() returns strings.
686           :"r":
687             opens for reading
688           :"r+":
689             opens for reading and writing.  Reads/writes share a file pointer.
690           :"w", "w+":
691             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
692           :"a", "a+":
693             opens for reading and writing.  All writes are appended to
694             the end of the file.  Writing does not affect the file pointer for
695             reading.
696
697         """
698
699         if not re.search(r'^[rwa][bt]?\+?$', mode):
700             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
701
702         if mode[0] == 'r' and '+' not in mode:
703             fclass = ArvadosFileReader
704             arvfile = self.find(path)
705         elif not self.writable():
706             raise IOError(errno.EROFS, "Collection is read only")
707         else:
708             fclass = ArvadosFileWriter
709             arvfile = self.find_or_create(path, FILE)
710
711         if arvfile is None:
712             raise IOError(errno.ENOENT, "File not found", path)
713         if not isinstance(arvfile, ArvadosFile):
714             raise IOError(errno.EISDIR, "Is a directory", path)
715
716         if mode[0] == 'w':
717             arvfile.truncate(0)
718
719         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
720         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
721         if 'b' not in mode:
722             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
723             f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
724         return f
725
726     def modified(self):
727         """Determine if the collection has been modified since last commited."""
728         return not self.committed()
729
730     @synchronized
731     def committed(self):
732         """Determine if the collection has been committed to the API server."""
733         return self._committed
734
735     @synchronized
736     def set_committed(self, value=True):
737         """Recursively set committed flag.
738
739         If value is True, set committed to be True for this and all children.
740
741         If value is False, set committed to be False for this and all parents.
742         """
743         if value == self._committed:
744             return
745         if value:
746             for k,v in listitems(self._items):
747                 v.set_committed(True)
748             self._committed = True
749         else:
750             self._committed = False
751             if self.parent is not None:
752                 self.parent.set_committed(False)
753
754     @synchronized
755     def __iter__(self):
756         """Iterate over names of files and collections contained in this collection."""
757         return iter(viewkeys(self._items))
758
759     @synchronized
760     def __getitem__(self, k):
761         """Get a file or collection that is directly contained by this collection.
762
763         If you want to search a path, use `find()` instead.
764
765         """
766         return self._items[k]
767
768     @synchronized
769     def __contains__(self, k):
770         """Test if there is a file or collection a directly contained by this collection."""
771         return k in self._items
772
773     @synchronized
774     def __len__(self):
775         """Get the number of items directly contained in this collection."""
776         return len(self._items)
777
778     @must_be_writable
779     @synchronized
780     def __delitem__(self, p):
781         """Delete an item by name which is directly contained by this collection."""
782         del self._items[p]
783         self.set_committed(False)
784         self.notify(DEL, self, p, None)
785
786     @synchronized
787     def keys(self):
788         """Get a list of names of files and collections directly contained in this collection."""
789         return self._items.keys()
790
791     @synchronized
792     def values(self):
793         """Get a list of files and collection objects directly contained in this collection."""
794         return listvalues(self._items)
795
796     @synchronized
797     def items(self):
798         """Get a list of (name, object) tuples directly contained in this collection."""
799         return listitems(self._items)
800
801     def exists(self, path):
802         """Test if there is a file or collection at `path`."""
803         return self.find(path) is not None
804
805     @must_be_writable
806     @synchronized
807     def remove(self, path, recursive=False):
808         """Remove the file or subcollection (directory) at `path`.
809
810         :recursive:
811           Specify whether to remove non-empty subcollections (True), or raise an error (False).
812         """
813
814         if not path:
815             raise errors.ArgumentError("Parameter 'path' is empty.")
816
817         pathcomponents = path.split("/", 1)
818         item = self._items.get(pathcomponents[0])
819         if item is None:
820             raise IOError(errno.ENOENT, "File not found", path)
821         if len(pathcomponents) == 1:
822             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
823                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
824             deleteditem = self._items[pathcomponents[0]]
825             del self._items[pathcomponents[0]]
826             self.set_committed(False)
827             self.notify(DEL, self, pathcomponents[0], deleteditem)
828         else:
829             item.remove(pathcomponents[1])
830
831     def _clonefrom(self, source):
832         for k,v in listitems(source):
833             self._items[k] = v.clone(self, k)
834
835     def clone(self):
836         raise NotImplementedError()
837
838     @must_be_writable
839     @synchronized
840     def add(self, source_obj, target_name, overwrite=False, reparent=False):
841         """Copy or move a file or subcollection to this collection.
842
843         :source_obj:
844           An ArvadosFile, or Subcollection object
845
846         :target_name:
847           Destination item name.  If the target name already exists and is a
848           file, this will raise an error unless you specify `overwrite=True`.
849
850         :overwrite:
851           Whether to overwrite target file if it already exists.
852
853         :reparent:
854           If True, source_obj will be moved from its parent collection to this collection.
855           If False, source_obj will be copied and the parent collection will be
856           unmodified.
857
858         """
859
860         if target_name in self and not overwrite:
861             raise IOError(errno.EEXIST, "File already exists", target_name)
862
863         modified_from = None
864         if target_name in self:
865             modified_from = self[target_name]
866
867         # Actually make the move or copy.
868         if reparent:
869             source_obj._reparent(self, target_name)
870             item = source_obj
871         else:
872             item = source_obj.clone(self, target_name)
873
874         self._items[target_name] = item
875         self.set_committed(False)
876         if not self._has_remote_blocks and source_obj.has_remote_blocks():
877             self.set_has_remote_blocks(True)
878
879         if modified_from:
880             self.notify(MOD, self, target_name, (modified_from, item))
881         else:
882             self.notify(ADD, self, target_name, item)
883
884     def _get_src_target(self, source, target_path, source_collection, create_dest):
885         if source_collection is None:
886             source_collection = self
887
888         # Find the object
889         if isinstance(source, basestring):
890             source_obj = source_collection.find(source)
891             if source_obj is None:
892                 raise IOError(errno.ENOENT, "File not found", source)
893             sourcecomponents = source.split("/")
894         else:
895             source_obj = source
896             sourcecomponents = None
897
898         # Find parent collection the target path
899         targetcomponents = target_path.split("/")
900
901         # Determine the name to use.
902         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
903
904         if not target_name:
905             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
906
907         if create_dest:
908             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
909         else:
910             if len(targetcomponents) > 1:
911                 target_dir = self.find("/".join(targetcomponents[0:-1]))
912             else:
913                 target_dir = self
914
915         if target_dir is None:
916             raise IOError(errno.ENOENT, "Target directory not found", target_name)
917
918         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
919             target_dir = target_dir[target_name]
920             target_name = sourcecomponents[-1]
921
922         return (source_obj, target_dir, target_name)
923
924     @must_be_writable
925     @synchronized
926     def copy(self, source, target_path, source_collection=None, overwrite=False):
927         """Copy a file or subcollection to a new path in this collection.
928
929         :source:
930           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
931
932         :target_path:
933           Destination file or path.  If the target path already exists and is a
934           subcollection, the item will be placed inside the subcollection.  If
935           the target path already exists and is a file, this will raise an error
936           unless you specify `overwrite=True`.
937
938         :source_collection:
939           Collection to copy `source_path` from (default `self`)
940
941         :overwrite:
942           Whether to overwrite target file if it already exists.
943         """
944
945         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
946         target_dir.add(source_obj, target_name, overwrite, False)
947
948     @must_be_writable
949     @synchronized
950     def rename(self, source, target_path, source_collection=None, overwrite=False):
951         """Move a file or subcollection from `source_collection` to a new path in this collection.
952
953         :source:
954           A string with a path to source file or subcollection.
955
956         :target_path:
957           Destination file or path.  If the target path already exists and is a
958           subcollection, the item will be placed inside the subcollection.  If
959           the target path already exists and is a file, this will raise an error
960           unless you specify `overwrite=True`.
961
962         :source_collection:
963           Collection to copy `source_path` from (default `self`)
964
965         :overwrite:
966           Whether to overwrite target file if it already exists.
967         """
968
969         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
970         if not source_obj.writable():
971             raise IOError(errno.EROFS, "Source collection is read only", source)
972         target_dir.add(source_obj, target_name, overwrite, True)
973
974     def portable_manifest_text(self, stream_name="."):
975         """Get the manifest text for this collection, sub collections and files.
976
977         This method does not flush outstanding blocks to Keep.  It will return
978         a normalized manifest with access tokens stripped.
979
980         :stream_name:
981           Name to use for this stream (directory)
982
983         """
984         return self._get_manifest_text(stream_name, True, True)
985
986     @synchronized
987     def manifest_text(self, stream_name=".", strip=False, normalize=False,
988                       only_committed=False):
989         """Get the manifest text for this collection, sub collections and files.
990
991         This method will flush outstanding blocks to Keep.  By default, it will
992         not normalize an unmodified manifest or strip access tokens.
993
994         :stream_name:
995           Name to use for this stream (directory)
996
997         :strip:
998           If True, remove signing tokens from block locators if present.
999           If False (default), block locators are left unchanged.
1000
1001         :normalize:
1002           If True, always export the manifest text in normalized form
1003           even if the Collection is not modified.  If False (default) and the collection
1004           is not modified, return the original manifest text even if it is not
1005           in normalized form.
1006
1007         :only_committed:
1008           If True, don't commit pending blocks.
1009
1010         """
1011
1012         if not only_committed:
1013             self._my_block_manager().commit_all()
1014         return self._get_manifest_text(stream_name, strip, normalize,
1015                                        only_committed=only_committed)
1016
1017     @synchronized
1018     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1019         """Get the manifest text for this collection, sub collections and files.
1020
1021         :stream_name:
1022           Name to use for this stream (directory)
1023
1024         :strip:
1025           If True, remove signing tokens from block locators if present.
1026           If False (default), block locators are left unchanged.
1027
1028         :normalize:
1029           If True, always export the manifest text in normalized form
1030           even if the Collection is not modified.  If False (default) and the collection
1031           is not modified, return the original manifest text even if it is not
1032           in normalized form.
1033
1034         :only_committed:
1035           If True, only include blocks that were already committed to Keep.
1036
1037         """
1038
1039         if not self.committed() or self._manifest_text is None or normalize:
1040             stream = {}
1041             buf = []
1042             sorted_keys = sorted(self.keys())
1043             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1044                 # Create a stream per file `k`
1045                 arvfile = self[filename]
1046                 filestream = []
1047                 for segment in arvfile.segments():
1048                     loc = segment.locator
1049                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
1050                         if only_committed:
1051                             continue
1052                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1053                     if strip:
1054                         loc = KeepLocator(loc).stripped()
1055                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1056                                          segment.segment_offset, segment.range_size))
1057                 stream[filename] = filestream
1058             if stream:
1059                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1060             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1061                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1062             return "".join(buf)
1063         else:
1064             if strip:
1065                 return self.stripped_manifest()
1066             else:
1067                 return self._manifest_text
1068
1069     @synchronized
1070     def _copy_remote_blocks(self, remote_blocks={}):
1071         """Scan through the entire collection and ask Keep to copy remote blocks.
1072
1073         When accessing a remote collection, blocks will have a remote signature
1074         (+R instead of +A). Collect these signatures and request Keep to copy the
1075         blocks to the local cluster, returning local (+A) signatures.
1076
1077         :remote_blocks:
1078           Shared cache of remote to local block mappings. This is used to avoid
1079           doing extra work when blocks are shared by more than one file in
1080           different subdirectories.
1081
1082         """
1083         for item in self:
1084             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1085         return remote_blocks
1086
1087     @synchronized
1088     def diff(self, end_collection, prefix=".", holding_collection=None):
1089         """Generate list of add/modify/delete actions.
1090
1091         When given to `apply`, will change `self` to match `end_collection`
1092
1093         """
1094         changes = []
1095         if holding_collection is None:
1096             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1097         for k in self:
1098             if k not in end_collection:
1099                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1100         for k in end_collection:
1101             if k in self:
1102                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1103                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1104                 elif end_collection[k] != self[k]:
1105                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1106                 else:
1107                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1108             else:
1109                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1110         return changes
1111
1112     @must_be_writable
1113     @synchronized
1114     def apply(self, changes):
1115         """Apply changes from `diff`.
1116
1117         If a change conflicts with a local change, it will be saved to an
1118         alternate path indicating the conflict.
1119
1120         """
1121         if changes:
1122             self.set_committed(False)
1123         for change in changes:
1124             event_type = change[0]
1125             path = change[1]
1126             initial = change[2]
1127             local = self.find(path)
1128             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1129                                                                     time.gmtime()))
1130             if event_type == ADD:
1131                 if local is None:
1132                     # No local file at path, safe to copy over new file
1133                     self.copy(initial, path)
1134                 elif local is not None and local != initial:
1135                     # There is already local file and it is different:
1136                     # save change to conflict file.
1137                     self.copy(initial, conflictpath)
1138             elif event_type == MOD or event_type == TOK:
1139                 final = change[3]
1140                 if local == initial:
1141                     # Local matches the "initial" item so it has not
1142                     # changed locally and is safe to update.
1143                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1144                         # Replace contents of local file with new contents
1145                         local.replace_contents(final)
1146                     else:
1147                         # Overwrite path with new item; this can happen if
1148                         # path was a file and is now a collection or vice versa
1149                         self.copy(final, path, overwrite=True)
1150                 else:
1151                     # Local is missing (presumably deleted) or local doesn't
1152                     # match the "start" value, so save change to conflict file
1153                     self.copy(final, conflictpath)
1154             elif event_type == DEL:
1155                 if local == initial:
1156                     # Local item matches "initial" value, so it is safe to remove.
1157                     self.remove(path, recursive=True)
1158                 # else, the file is modified or already removed, in either
1159                 # case we don't want to try to remove it.
1160
1161     def portable_data_hash(self):
1162         """Get the portable data hash for this collection's manifest."""
1163         if self._manifest_locator and self.committed():
1164             # If the collection is already saved on the API server, and it's committed
1165             # then return API server's PDH response.
1166             return self._portable_data_hash
1167         else:
1168             stripped = self.portable_manifest_text().encode()
1169             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1170
1171     @synchronized
1172     def subscribe(self, callback):
1173         if self._callback is None:
1174             self._callback = callback
1175         else:
1176             raise errors.ArgumentError("A callback is already set on this collection.")
1177
1178     @synchronized
1179     def unsubscribe(self):
1180         if self._callback is not None:
1181             self._callback = None
1182
1183     @synchronized
1184     def notify(self, event, collection, name, item):
1185         if self._callback:
1186             self._callback(event, collection, name, item)
1187         self.root_collection().notify(event, collection, name, item)
1188
1189     @synchronized
1190     def __eq__(self, other):
1191         if other is self:
1192             return True
1193         if not isinstance(other, RichCollectionBase):
1194             return False
1195         if len(self._items) != len(other):
1196             return False
1197         for k in self._items:
1198             if k not in other:
1199                 return False
1200             if self._items[k] != other[k]:
1201                 return False
1202         return True
1203
1204     def __ne__(self, other):
1205         return not self.__eq__(other)
1206
1207     @synchronized
1208     def flush(self):
1209         """Flush bufferblocks to Keep."""
1210         for e in listvalues(self):
1211             e.flush()
1212
1213
1214 class Collection(RichCollectionBase):
1215     """Represents the root of an Arvados Collection.
1216
1217     This class is threadsafe.  The root collection object, all subcollections
1218     and files are protected by a single lock (i.e. each access locks the entire
1219     collection).
1220
1221     Brief summary of
1222     useful methods:
1223
1224     :To read an existing file:
1225       `c.open("myfile", "r")`
1226
1227     :To write a new file:
1228       `c.open("myfile", "w")`
1229
1230     :To determine if a file exists:
1231       `c.find("myfile") is not None`
1232
1233     :To copy a file:
1234       `c.copy("source", "dest")`
1235
1236     :To delete a file:
1237       `c.remove("myfile")`
1238
1239     :To save to an existing collection record:
1240       `c.save()`
1241
1242     :To save a new collection record:
1243     `c.save_new()`
1244
1245     :To merge remote changes into this object:
1246       `c.update()`
1247
1248     Must be associated with an API server Collection record (during
1249     initialization, or using `save_new`) to use `save` or `update`
1250
1251     """
1252
1253     def __init__(self, manifest_locator_or_text=None,
1254                  api_client=None,
1255                  keep_client=None,
1256                  num_retries=None,
1257                  parent=None,
1258                  apiconfig=None,
1259                  block_manager=None,
1260                  replication_desired=None,
1261                  put_threads=None):
1262         """Collection constructor.
1263
1264         :manifest_locator_or_text:
1265           An Arvados collection UUID, portable data hash, raw manifest
1266           text, or (if creating an empty collection) None.
1267
1268         :parent:
1269           the parent Collection, may be None.
1270
1271         :apiconfig:
1272           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1273           Prefer this over supplying your own api_client and keep_client (except in testing).
1274           Will use default config settings if not specified.
1275
1276         :api_client:
1277           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1278
1279         :keep_client:
1280           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1281
1282         :num_retries:
1283           the number of retries for API and Keep requests.
1284
1285         :block_manager:
1286           the block manager to use.  If not specified, create one.
1287
1288         :replication_desired:
1289           How many copies should Arvados maintain. If None, API server default
1290           configuration applies. If not None, this value will also be used
1291           for determining the number of block copies being written.
1292
1293         """
1294         super(Collection, self).__init__(parent)
1295         self._api_client = api_client
1296         self._keep_client = keep_client
1297         self._block_manager = block_manager
1298         self.replication_desired = replication_desired
1299         self.put_threads = put_threads
1300
1301         if apiconfig:
1302             self._config = apiconfig
1303         else:
1304             self._config = config.settings()
1305
1306         self.num_retries = num_retries if num_retries is not None else 0
1307         self._manifest_locator = None
1308         self._manifest_text = None
1309         self._portable_data_hash = None
1310         self._api_response = None
1311         self._past_versions = set()
1312
1313         self.lock = threading.RLock()
1314         self.events = None
1315
1316         if manifest_locator_or_text:
1317             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1318                 self._manifest_locator = manifest_locator_or_text
1319             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1320                 self._manifest_locator = manifest_locator_or_text
1321                 if not self._has_local_collection_uuid():
1322                     self._has_remote_blocks = True
1323             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1324                 self._manifest_text = manifest_locator_or_text
1325                 if '+R' in self._manifest_text:
1326                     self._has_remote_blocks = True
1327             else:
1328                 raise errors.ArgumentError(
1329                     "Argument to CollectionReader is not a manifest or a collection UUID")
1330
1331             try:
1332                 self._populate()
1333             except (IOError, errors.SyntaxError) as e:
1334                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1335
1336     def root_collection(self):
1337         return self
1338
1339     def get_properties(self):
1340         if self._api_response and self._api_response["properties"]:
1341             return self._api_response["properties"]
1342         else:
1343             return {}
1344
1345     def get_trash_at(self):
1346         if self._api_response and self._api_response["trash_at"]:
1347             return ciso8601.parse_datetime(self._api_response["trash_at"])
1348         else:
1349             return None
1350
1351     def stream_name(self):
1352         return "."
1353
1354     def writable(self):
1355         return True
1356
1357     @synchronized
1358     def known_past_version(self, modified_at_and_portable_data_hash):
1359         return modified_at_and_portable_data_hash in self._past_versions
1360
1361     @synchronized
1362     @retry_method
1363     def update(self, other=None, num_retries=None):
1364         """Merge the latest collection on the API server with the current collection."""
1365
1366         if other is None:
1367             if self._manifest_locator is None:
1368                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1369             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1370             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1371                 response.get("portable_data_hash") != self.portable_data_hash()):
1372                 # The record on the server is different from our current one, but we've seen it before,
1373                 # so ignore it because it's already been merged.
1374                 # However, if it's the same as our current record, proceed with the update, because we want to update
1375                 # our tokens.
1376                 return
1377             else:
1378                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1379             other = CollectionReader(response["manifest_text"])
1380         baseline = CollectionReader(self._manifest_text)
1381         self.apply(baseline.diff(other))
1382         self._manifest_text = self.manifest_text()
1383
1384     @synchronized
1385     def _my_api(self):
1386         if self._api_client is None:
1387             self._api_client = ThreadSafeApiCache(self._config)
1388             if self._keep_client is None:
1389                 self._keep_client = self._api_client.keep
1390         return self._api_client
1391
1392     @synchronized
1393     def _my_keep(self):
1394         if self._keep_client is None:
1395             if self._api_client is None:
1396                 self._my_api()
1397             else:
1398                 self._keep_client = KeepClient(api_client=self._api_client)
1399         return self._keep_client
1400
1401     @synchronized
1402     def _my_block_manager(self):
1403         if self._block_manager is None:
1404             copies = (self.replication_desired or
1405                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1406                                                    2))
1407             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1408         return self._block_manager
1409
1410     def _remember_api_response(self, response):
1411         self._api_response = response
1412         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1413
1414     def _populate_from_api_server(self):
1415         # As in KeepClient itself, we must wait until the last
1416         # possible moment to instantiate an API client, in order to
1417         # avoid tripping up clients that don't have access to an API
1418         # server.  If we do build one, make sure our Keep client uses
1419         # it.  If instantiation fails, we'll fall back to the except
1420         # clause, just like any other Collection lookup
1421         # failure. Return an exception, or None if successful.
1422         self._remember_api_response(self._my_api().collections().get(
1423             uuid=self._manifest_locator).execute(
1424                 num_retries=self.num_retries))
1425         self._manifest_text = self._api_response['manifest_text']
1426         self._portable_data_hash = self._api_response['portable_data_hash']
1427         # If not overriden via kwargs, we should try to load the
1428         # replication_desired from the API server
1429         if self.replication_desired is None:
1430             self.replication_desired = self._api_response.get('replication_desired', None)
1431
1432     def _populate(self):
1433         if self._manifest_text is None:
1434             if self._manifest_locator is None:
1435                 return
1436             else:
1437                 self._populate_from_api_server()
1438         self._baseline_manifest = self._manifest_text
1439         self._import_manifest(self._manifest_text)
1440
1441     def _has_collection_uuid(self):
1442         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1443
1444     def _has_local_collection_uuid(self):
1445         return self._has_collection_uuid and \
1446             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1447
1448     def __enter__(self):
1449         return self
1450
1451     def __exit__(self, exc_type, exc_value, traceback):
1452         """Support scoped auto-commit in a with: block."""
1453         if exc_type is None:
1454             if self.writable() and self._has_collection_uuid():
1455                 self.save()
1456         self.stop_threads()
1457
1458     def stop_threads(self):
1459         if self._block_manager is not None:
1460             self._block_manager.stop_threads()
1461
1462     @synchronized
1463     def manifest_locator(self):
1464         """Get the manifest locator, if any.
1465
1466         The manifest locator will be set when the collection is loaded from an
1467         API server record or the portable data hash of a manifest.
1468
1469         The manifest locator will be None if the collection is newly created or
1470         was created directly from manifest text.  The method `save_new()` will
1471         assign a manifest locator.
1472
1473         """
1474         return self._manifest_locator
1475
1476     @synchronized
1477     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1478         if new_config is None:
1479             new_config = self._config
1480         if readonly:
1481             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1482         else:
1483             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1484
1485         newcollection._clonefrom(self)
1486         return newcollection
1487
1488     @synchronized
1489     def api_response(self):
1490         """Returns information about this Collection fetched from the API server.
1491
1492         If the Collection exists in Keep but not the API server, currently
1493         returns None.  Future versions may provide a synthetic response.
1494
1495         """
1496         return self._api_response
1497
1498     def find_or_create(self, path, create_type):
1499         """See `RichCollectionBase.find_or_create`"""
1500         if path == ".":
1501             return self
1502         else:
1503             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1504
1505     def find(self, path):
1506         """See `RichCollectionBase.find`"""
1507         if path == ".":
1508             return self
1509         else:
1510             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1511
1512     def remove(self, path, recursive=False):
1513         """See `RichCollectionBase.remove`"""
1514         if path == ".":
1515             raise errors.ArgumentError("Cannot remove '.'")
1516         else:
1517             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1518
1519     @must_be_writable
1520     @synchronized
1521     @retry_method
1522     def save(self,
1523              properties=None,
1524              storage_classes=None,
1525              trash_at=None,
1526              merge=True,
1527              num_retries=None):
1528         """Save collection to an existing collection record.
1529
1530         Commit pending buffer blocks to Keep, merge with remote record (if
1531         merge=True, the default), and update the collection record. Returns
1532         the current manifest text.
1533
1534         Will raise AssertionError if not associated with a collection record on
1535         the API server.  If you want to save a manifest to Keep only, see
1536         `save_new()`.
1537
1538         :properties:
1539           Additional properties of collection. This value will replace any existing
1540           properties of collection.
1541
1542         :storage_classes:
1543           Specify desirable storage classes to be used when writing data to Keep.
1544
1545         :trash_at:
1546           A collection is *expiring* when it has a *trash_at* time in the future.
1547           An expiring collection can be accessed as normal,
1548           but is scheduled to be trashed automatically at the *trash_at* time.
1549
1550         :merge:
1551           Update and merge remote changes before saving.  Otherwise, any
1552           remote changes will be ignored and overwritten.
1553
1554         :num_retries:
1555           Retry count on API calls (if None,  use the collection default)
1556
1557         """
1558         if properties and type(properties) is not dict:
1559             raise errors.ArgumentError("properties must be dictionary type.")
1560
1561         if storage_classes and type(storage_classes) is not list:
1562             raise errors.ArgumentError("storage_classes must be list type.")
1563
1564         if trash_at and type(trash_at) is not datetime.datetime:
1565             raise errors.ArgumentError("trash_at must be datetime type.")
1566
1567         body={}
1568         if properties:
1569             body["properties"] = properties
1570         if storage_classes:
1571             body["storage_classes_desired"] = storage_classes
1572         if trash_at:
1573             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1574             body["trash_at"] = t
1575
1576         if not self.committed():
1577             if self._has_remote_blocks:
1578                 # Copy any remote blocks to the local cluster.
1579                 self._copy_remote_blocks(remote_blocks={})
1580                 self._has_remote_blocks = False
1581             if not self._has_collection_uuid():
1582                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1583             elif not self._has_local_collection_uuid():
1584                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1585
1586             self._my_block_manager().commit_all()
1587
1588             if merge:
1589                 self.update()
1590
1591             text = self.manifest_text(strip=False)
1592             body['manifest_text'] = text
1593
1594             self._remember_api_response(self._my_api().collections().update(
1595                 uuid=self._manifest_locator,
1596                 body=body
1597                 ).execute(num_retries=num_retries))
1598             self._manifest_text = self._api_response["manifest_text"]
1599             self._portable_data_hash = self._api_response["portable_data_hash"]
1600             self.set_committed(True)
1601         elif body:
1602             self._remember_api_response(self._my_api().collections().update(
1603                 uuid=self._manifest_locator,
1604                 body=body
1605                 ).execute(num_retries=num_retries))
1606
1607         return self._manifest_text
1608
1609
1610     @must_be_writable
1611     @synchronized
1612     @retry_method
1613     def save_new(self, name=None,
1614                  create_collection_record=True,
1615                  owner_uuid=None,
1616                  properties=None,
1617                  storage_classes=None,
1618                  trash_at=None,
1619                  ensure_unique_name=False,
1620                  num_retries=None):
1621         """Save collection to a new collection record.
1622
1623         Commit pending buffer blocks to Keep and, when create_collection_record
1624         is True (default), create a new collection record.  After creating a
1625         new collection record, this Collection object will be associated with
1626         the new record used by `save()`. Returns the current manifest text.
1627
1628         :name:
1629           The collection name.
1630
1631         :create_collection_record:
1632            If True, create a collection record on the API server.
1633            If False, only commit blocks to Keep and return the manifest text.
1634
1635         :owner_uuid:
1636           the user, or project uuid that will own this collection.
1637           If None, defaults to the current user.
1638
1639         :properties:
1640           Additional properties of collection. This value will replace any existing
1641           properties of collection.
1642
1643         :storage_classes:
1644           Specify desirable storage classes to be used when writing data to Keep.
1645
1646         :trash_at:
1647           A collection is *expiring* when it has a *trash_at* time in the future.
1648           An expiring collection can be accessed as normal,
1649           but is scheduled to be trashed automatically at the *trash_at* time.
1650
1651         :ensure_unique_name:
1652           If True, ask the API server to rename the collection
1653           if it conflicts with a collection with the same name and owner.  If
1654           False, a name conflict will result in an error.
1655
1656         :num_retries:
1657           Retry count on API calls (if None,  use the collection default)
1658
1659         """
1660         if properties and type(properties) is not dict:
1661             raise errors.ArgumentError("properties must be dictionary type.")
1662
1663         if storage_classes and type(storage_classes) is not list:
1664             raise errors.ArgumentError("storage_classes must be list type.")
1665
1666         if trash_at and type(trash_at) is not datetime.datetime:
1667             raise errors.ArgumentError("trash_at must be datetime type.")
1668
1669         if self._has_remote_blocks:
1670             # Copy any remote blocks to the local cluster.
1671             self._copy_remote_blocks(remote_blocks={})
1672             self._has_remote_blocks = False
1673
1674         self._my_block_manager().commit_all()
1675         text = self.manifest_text(strip=False)
1676
1677         if create_collection_record:
1678             if name is None:
1679                 name = "New collection"
1680                 ensure_unique_name = True
1681
1682             body = {"manifest_text": text,
1683                     "name": name,
1684                     "replication_desired": self.replication_desired}
1685             if owner_uuid:
1686                 body["owner_uuid"] = owner_uuid
1687             if properties:
1688                 body["properties"] = properties
1689             if storage_classes:
1690                 body["storage_classes_desired"] = storage_classes
1691             if trash_at:
1692                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1693                 body["trash_at"] = t
1694
1695             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1696             text = self._api_response["manifest_text"]
1697
1698             self._manifest_locator = self._api_response["uuid"]
1699             self._portable_data_hash = self._api_response["portable_data_hash"]
1700
1701             self._manifest_text = text
1702             self.set_committed(True)
1703
1704         return text
1705
1706     _token_re = re.compile(r'(\S+)(\s+|$)')
1707     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1708     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1709
1710     def _unescape_manifest_path(self, path):
1711         return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1712
1713     @synchronized
1714     def _import_manifest(self, manifest_text):
1715         """Import a manifest into a `Collection`.
1716
1717         :manifest_text:
1718           The manifest text to import from.
1719
1720         """
1721         if len(self) > 0:
1722             raise ArgumentError("Can only import manifest into an empty collection")
1723
1724         STREAM_NAME = 0
1725         BLOCKS = 1
1726         SEGMENTS = 2
1727
1728         stream_name = None
1729         state = STREAM_NAME
1730
1731         for token_and_separator in self._token_re.finditer(manifest_text):
1732             tok = token_and_separator.group(1)
1733             sep = token_and_separator.group(2)
1734
1735             if state == STREAM_NAME:
1736                 # starting a new stream
1737                 stream_name = self._unescape_manifest_path(tok)
1738                 blocks = []
1739                 segments = []
1740                 streamoffset = 0
1741                 state = BLOCKS
1742                 self.find_or_create(stream_name, COLLECTION)
1743                 continue
1744
1745             if state == BLOCKS:
1746                 block_locator = self._block_re.match(tok)
1747                 if block_locator:
1748                     blocksize = int(block_locator.group(1))
1749                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1750                     streamoffset += blocksize
1751                 else:
1752                     state = SEGMENTS
1753
1754             if state == SEGMENTS:
1755                 file_segment = self._segment_re.match(tok)
1756                 if file_segment:
1757                     pos = int(file_segment.group(1))
1758                     size = int(file_segment.group(2))
1759                     name = self._unescape_manifest_path(file_segment.group(3))
1760                     if name.split('/')[-1] == '.':
1761                         # placeholder for persisting an empty directory, not a real file
1762                         if len(name) > 2:
1763                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1764                     else:
1765                         filepath = os.path.join(stream_name, name)
1766                         afile = self.find_or_create(filepath, FILE)
1767                         if isinstance(afile, ArvadosFile):
1768                             afile.add_segment(blocks, pos, size)
1769                         else:
1770                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1771                 else:
1772                     # error!
1773                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1774
1775             if sep == "\n":
1776                 stream_name = None
1777                 state = STREAM_NAME
1778
1779         self.set_committed(True)
1780
1781     @synchronized
1782     def notify(self, event, collection, name, item):
1783         if self._callback:
1784             self._callback(event, collection, name, item)
1785
1786
1787 class Subcollection(RichCollectionBase):
1788     """This is a subdirectory within a collection that doesn't have its own API
1789     server record.
1790
1791     Subcollection locking falls under the umbrella lock of its root collection.
1792
1793     """
1794
1795     def __init__(self, parent, name):
1796         super(Subcollection, self).__init__(parent)
1797         self.lock = self.root_collection().lock
1798         self._manifest_text = None
1799         self.name = name
1800         self.num_retries = parent.num_retries
1801
1802     def root_collection(self):
1803         return self.parent.root_collection()
1804
1805     def writable(self):
1806         return self.root_collection().writable()
1807
1808     def _my_api(self):
1809         return self.root_collection()._my_api()
1810
1811     def _my_keep(self):
1812         return self.root_collection()._my_keep()
1813
1814     def _my_block_manager(self):
1815         return self.root_collection()._my_block_manager()
1816
1817     def stream_name(self):
1818         return os.path.join(self.parent.stream_name(), self.name)
1819
1820     @synchronized
1821     def clone(self, new_parent, new_name):
1822         c = Subcollection(new_parent, new_name)
1823         c._clonefrom(self)
1824         return c
1825
1826     @must_be_writable
1827     @synchronized
1828     def _reparent(self, newparent, newname):
1829         self.set_committed(False)
1830         self.flush()
1831         self.parent.remove(self.name, recursive=True)
1832         self.parent = newparent
1833         self.name = newname
1834         self.lock = self.parent.root_collection().lock
1835
1836
1837 class CollectionReader(Collection):
1838     """A read-only collection object.
1839
1840     Initialize from a collection UUID or portable data hash, or raw
1841     manifest text.  See `Collection` constructor for detailed options.
1842
1843     """
1844     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1845         self._in_init = True
1846         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1847         self._in_init = False
1848
1849         # Forego any locking since it should never change once initialized.
1850         self.lock = NoopLock()
1851
1852         # Backwards compatability with old CollectionReader
1853         # all_streams() and all_files()
1854         self._streams = None
1855
1856     def writable(self):
1857         return self._in_init
1858
1859     def _populate_streams(orig_func):
1860         @functools.wraps(orig_func)
1861         def populate_streams_wrapper(self, *args, **kwargs):
1862             # Defer populating self._streams until needed since it creates a copy of the manifest.
1863             if self._streams is None:
1864                 if self._manifest_text:
1865                     self._streams = [sline.split()
1866                                      for sline in self._manifest_text.split("\n")
1867                                      if sline]
1868                 else:
1869                     self._streams = []
1870             return orig_func(self, *args, **kwargs)
1871         return populate_streams_wrapper
1872
1873     @_populate_streams
1874     def normalize(self):
1875         """Normalize the streams returned by `all_streams`.
1876
1877         This method is kept for backwards compatability and only affects the
1878         behavior of `all_streams()` and `all_files()`
1879
1880         """
1881
1882         # Rearrange streams
1883         streams = {}
1884         for s in self.all_streams():
1885             for f in s.all_files():
1886                 streamname, filename = split(s.name() + "/" + f.name())
1887                 if streamname not in streams:
1888                     streams[streamname] = {}
1889                 if filename not in streams[streamname]:
1890                     streams[streamname][filename] = []
1891                 for r in f.segments:
1892                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1893
1894         self._streams = [normalize_stream(s, streams[s])
1895                          for s in sorted(streams)]
1896     @_populate_streams
1897     def all_streams(self):
1898         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1899                 for s in self._streams]
1900
1901     @_populate_streams
1902     def all_files(self):
1903         for s in self.all_streams():
1904             for f in s.all_files():
1905                 yield f