14539: Persists empty directories by adding a '\056' empty file.
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import ciso8601
11 import datetime
12 import errno
13 import functools
14 import hashlib
15 import io
16 import logging
17 import os
18 import re
19 import sys
20 import threading
21 import time
22
23 from collections import deque
24 from stat import *
25
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
34 import arvados.util
35 import arvados.events as events
36 from arvados.retry import retry_method
37
38 _logger = logging.getLogger('arvados.collection')
39
40
41 if sys.version_info >= (3, 0):
42     TextIOWrapper = io.TextIOWrapper
43 else:
44     class TextIOWrapper(io.TextIOWrapper):
45         """To maintain backward compatibility, cast str to unicode in
46         write('foo').
47
48         """
49         def write(self, data):
50             if isinstance(data, basestring):
51                 data = unicode(data)
52             return super(TextIOWrapper, self).write(data)
53
54
55 class CollectionBase(object):
56     """Abstract base class for Collection classes."""
57
58     def __enter__(self):
59         return self
60
61     def __exit__(self, exc_type, exc_value, traceback):
62         pass
63
64     def _my_keep(self):
65         if self._keep_client is None:
66             self._keep_client = KeepClient(api_client=self._api_client,
67                                            num_retries=self.num_retries)
68         return self._keep_client
69
70     def stripped_manifest(self):
71         """Get the manifest with locator hints stripped.
72
73         Return the manifest for the current collection with all
74         non-portable hints (i.e., permission signatures and other
75         hints other than size hints) removed from the locators.
76         """
77         raw = self.manifest_text()
78         clean = []
79         for line in raw.split("\n"):
80             fields = line.split()
81             if fields:
82                 clean_fields = fields[:1] + [
83                     (re.sub(r'\+[^\d][^\+]*', '', x)
84                      if re.match(arvados.util.keep_locator_pattern, x)
85                      else x)
86                     for x in fields[1:]]
87                 clean += [' '.join(clean_fields), "\n"]
88         return ''.join(clean)
89
90
91 class _WriterFile(_FileLikeObjectBase):
92     def __init__(self, coll_writer, name):
93         super(_WriterFile, self).__init__(name, 'wb')
94         self.dest = coll_writer
95
96     def close(self):
97         super(_WriterFile, self).close()
98         self.dest.finish_current_file()
99
100     @_FileLikeObjectBase._before_close
101     def write(self, data):
102         self.dest.write(data)
103
104     @_FileLikeObjectBase._before_close
105     def writelines(self, seq):
106         for data in seq:
107             self.write(data)
108
109     @_FileLikeObjectBase._before_close
110     def flush(self):
111         self.dest.flush_data()
112
113
114 class CollectionWriter(CollectionBase):
115     """Deprecated, use Collection instead."""
116
117     def __init__(self, api_client=None, num_retries=0, replication=None):
118         """Instantiate a CollectionWriter.
119
120         CollectionWriter lets you build a new Arvados Collection from scratch.
121         Write files to it.  The CollectionWriter will upload data to Keep as
122         appropriate, and provide you with the Collection manifest text when
123         you're finished.
124
125         Arguments:
126         * api_client: The API client to use to look up Collections.  If not
127           provided, CollectionReader will build one from available Arvados
128           configuration.
129         * num_retries: The default number of times to retry failed
130           service requests.  Default 0.  You may change this value
131           after instantiation, but note those changes may not
132           propagate to related objects like the Keep client.
133         * replication: The number of copies of each block to store.
134           If this argument is None or not supplied, replication is
135           the server-provided default if available, otherwise 2.
136         """
137         self._api_client = api_client
138         self.num_retries = num_retries
139         self.replication = (2 if replication is None else replication)
140         self._keep_client = None
141         self._data_buffer = []
142         self._data_buffer_len = 0
143         self._current_stream_files = []
144         self._current_stream_length = 0
145         self._current_stream_locators = []
146         self._current_stream_name = '.'
147         self._current_file_name = None
148         self._current_file_pos = 0
149         self._finished_streams = []
150         self._close_file = None
151         self._queued_file = None
152         self._queued_dirents = deque()
153         self._queued_trees = deque()
154         self._last_open = None
155
156     def __exit__(self, exc_type, exc_value, traceback):
157         if exc_type is None:
158             self.finish()
159
160     def do_queued_work(self):
161         # The work queue consists of three pieces:
162         # * _queued_file: The file object we're currently writing to the
163         #   Collection.
164         # * _queued_dirents: Entries under the current directory
165         #   (_queued_trees[0]) that we want to write or recurse through.
166         #   This may contain files from subdirectories if
167         #   max_manifest_depth == 0 for this directory.
168         # * _queued_trees: Directories that should be written as separate
169         #   streams to the Collection.
170         # This function handles the smallest piece of work currently queued
171         # (current file, then current directory, then next directory) until
172         # no work remains.  The _work_THING methods each do a unit of work on
173         # THING.  _queue_THING methods add a THING to the work queue.
174         while True:
175             if self._queued_file:
176                 self._work_file()
177             elif self._queued_dirents:
178                 self._work_dirents()
179             elif self._queued_trees:
180                 self._work_trees()
181             else:
182                 break
183
184     def _work_file(self):
185         while True:
186             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
187             if not buf:
188                 break
189             self.write(buf)
190         self.finish_current_file()
191         if self._close_file:
192             self._queued_file.close()
193         self._close_file = None
194         self._queued_file = None
195
196     def _work_dirents(self):
197         path, stream_name, max_manifest_depth = self._queued_trees[0]
198         if stream_name != self.current_stream_name():
199             self.start_new_stream(stream_name)
200         while self._queued_dirents:
201             dirent = self._queued_dirents.popleft()
202             target = os.path.join(path, dirent)
203             if os.path.isdir(target):
204                 self._queue_tree(target,
205                                  os.path.join(stream_name, dirent),
206                                  max_manifest_depth - 1)
207             else:
208                 self._queue_file(target, dirent)
209                 break
210         if not self._queued_dirents:
211             self._queued_trees.popleft()
212
213     def _work_trees(self):
214         path, stream_name, max_manifest_depth = self._queued_trees[0]
215         d = arvados.util.listdir_recursive(
216             path, max_depth = (None if max_manifest_depth == 0 else 0))
217         if d:
218             self._queue_dirents(stream_name, d)
219         else:
220             self._queued_trees.popleft()
221
222     def _queue_file(self, source, filename=None):
223         assert (self._queued_file is None), "tried to queue more than one file"
224         if not hasattr(source, 'read'):
225             source = open(source, 'rb')
226             self._close_file = True
227         else:
228             self._close_file = False
229         if filename is None:
230             filename = os.path.basename(source.name)
231         self.start_new_file(filename)
232         self._queued_file = source
233
234     def _queue_dirents(self, stream_name, dirents):
235         assert (not self._queued_dirents), "tried to queue more than one tree"
236         self._queued_dirents = deque(sorted(dirents))
237
238     def _queue_tree(self, path, stream_name, max_manifest_depth):
239         self._queued_trees.append((path, stream_name, max_manifest_depth))
240
241     def write_file(self, source, filename=None):
242         self._queue_file(source, filename)
243         self.do_queued_work()
244
245     def write_directory_tree(self,
246                              path, stream_name='.', max_manifest_depth=-1):
247         self._queue_tree(path, stream_name, max_manifest_depth)
248         self.do_queued_work()
249
250     def write(self, newdata):
251         if isinstance(newdata, bytes):
252             pass
253         elif isinstance(newdata, str):
254             newdata = newdata.encode()
255         elif hasattr(newdata, '__iter__'):
256             for s in newdata:
257                 self.write(s)
258             return
259         self._data_buffer.append(newdata)
260         self._data_buffer_len += len(newdata)
261         self._current_stream_length += len(newdata)
262         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
263             self.flush_data()
264
265     def open(self, streampath, filename=None):
266         """open(streampath[, filename]) -> file-like object
267
268         Pass in the path of a file to write to the Collection, either as a
269         single string or as two separate stream name and file name arguments.
270         This method returns a file-like object you can write to add it to the
271         Collection.
272
273         You may only have one file object from the Collection open at a time,
274         so be sure to close the object when you're done.  Using the object in
275         a with statement makes that easy::
276
277           with cwriter.open('./doc/page1.txt') as outfile:
278               outfile.write(page1_data)
279           with cwriter.open('./doc/page2.txt') as outfile:
280               outfile.write(page2_data)
281         """
282         if filename is None:
283             streampath, filename = split(streampath)
284         if self._last_open and not self._last_open.closed:
285             raise errors.AssertionError(
286                 "can't open '{}' when '{}' is still open".format(
287                     filename, self._last_open.name))
288         if streampath != self.current_stream_name():
289             self.start_new_stream(streampath)
290         self.set_current_file_name(filename)
291         self._last_open = _WriterFile(self, filename)
292         return self._last_open
293
294     def flush_data(self):
295         data_buffer = b''.join(self._data_buffer)
296         if data_buffer:
297             self._current_stream_locators.append(
298                 self._my_keep().put(
299                     data_buffer[0:config.KEEP_BLOCK_SIZE],
300                     copies=self.replication))
301             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
302             self._data_buffer_len = len(self._data_buffer[0])
303
304     def start_new_file(self, newfilename=None):
305         self.finish_current_file()
306         self.set_current_file_name(newfilename)
307
308     def set_current_file_name(self, newfilename):
309         if re.search(r'[\t\n]', newfilename):
310             raise errors.AssertionError(
311                 "Manifest filenames cannot contain whitespace: %s" %
312                 newfilename)
313         elif re.search(r'\x00', newfilename):
314             raise errors.AssertionError(
315                 "Manifest filenames cannot contain NUL characters: %s" %
316                 newfilename)
317         self._current_file_name = newfilename
318
319     def current_file_name(self):
320         return self._current_file_name
321
322     def finish_current_file(self):
323         if self._current_file_name is None:
324             if self._current_file_pos == self._current_stream_length:
325                 return
326             raise errors.AssertionError(
327                 "Cannot finish an unnamed file " +
328                 "(%d bytes at offset %d in '%s' stream)" %
329                 (self._current_stream_length - self._current_file_pos,
330                  self._current_file_pos,
331                  self._current_stream_name))
332         self._current_stream_files.append([
333                 self._current_file_pos,
334                 self._current_stream_length - self._current_file_pos,
335                 self._current_file_name])
336         self._current_file_pos = self._current_stream_length
337         self._current_file_name = None
338
339     def start_new_stream(self, newstreamname='.'):
340         self.finish_current_stream()
341         self.set_current_stream_name(newstreamname)
342
343     def set_current_stream_name(self, newstreamname):
344         if re.search(r'[\t\n]', newstreamname):
345             raise errors.AssertionError(
346                 "Manifest stream names cannot contain whitespace: '%s'" %
347                 (newstreamname))
348         self._current_stream_name = '.' if newstreamname=='' else newstreamname
349
350     def current_stream_name(self):
351         return self._current_stream_name
352
353     def finish_current_stream(self):
354         self.finish_current_file()
355         self.flush_data()
356         if not self._current_stream_files:
357             pass
358         elif self._current_stream_name is None:
359             raise errors.AssertionError(
360                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
361                 (self._current_stream_length, len(self._current_stream_files)))
362         else:
363             if not self._current_stream_locators:
364                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
365             self._finished_streams.append([self._current_stream_name,
366                                            self._current_stream_locators,
367                                            self._current_stream_files])
368         self._current_stream_files = []
369         self._current_stream_length = 0
370         self._current_stream_locators = []
371         self._current_stream_name = None
372         self._current_file_pos = 0
373         self._current_file_name = None
374
375     def finish(self):
376         """Store the manifest in Keep and return its locator.
377
378         This is useful for storing manifest fragments (task outputs)
379         temporarily in Keep during a Crunch job.
380
381         In other cases you should make a collection instead, by
382         sending manifest_text() to the API server's "create
383         collection" endpoint.
384         """
385         return self._my_keep().put(self.manifest_text().encode(),
386                                    copies=self.replication)
387
388     def portable_data_hash(self):
389         stripped = self.stripped_manifest().encode()
390         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
391
392     def manifest_text(self):
393         self.finish_current_stream()
394         manifest = ''
395
396         for stream in self._finished_streams:
397             if not re.search(r'^\.(/.*)?$', stream[0]):
398                 manifest += './'
399             manifest += stream[0].replace(' ', '\\040')
400             manifest += ' ' + ' '.join(stream[1])
401             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
402             manifest += "\n"
403
404         return manifest
405
406     def data_locators(self):
407         ret = []
408         for name, locators, files in self._finished_streams:
409             ret += locators
410         return ret
411
412     def save_new(self, name=None):
413         return self._api_client.collections().create(
414             ensure_unique_name=True,
415             body={
416                 'name': name,
417                 'manifest_text': self.manifest_text(),
418             }).execute(num_retries=self.num_retries)
419
420
421 class ResumableCollectionWriter(CollectionWriter):
422     """Deprecated, use Collection instead."""
423
424     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
425                    '_current_stream_locators', '_current_stream_name',
426                    '_current_file_name', '_current_file_pos', '_close_file',
427                    '_data_buffer', '_dependencies', '_finished_streams',
428                    '_queued_dirents', '_queued_trees']
429
430     def __init__(self, api_client=None, **kwargs):
431         self._dependencies = {}
432         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
433
434     @classmethod
435     def from_state(cls, state, *init_args, **init_kwargs):
436         # Try to build a new writer from scratch with the given state.
437         # If the state is not suitable to resume (because files have changed,
438         # been deleted, aren't predictable, etc.), raise a
439         # StaleWriterStateError.  Otherwise, return the initialized writer.
440         # The caller is responsible for calling writer.do_queued_work()
441         # appropriately after it's returned.
442         writer = cls(*init_args, **init_kwargs)
443         for attr_name in cls.STATE_PROPS:
444             attr_value = state[attr_name]
445             attr_class = getattr(writer, attr_name).__class__
446             # Coerce the value into the same type as the initial value, if
447             # needed.
448             if attr_class not in (type(None), attr_value.__class__):
449                 attr_value = attr_class(attr_value)
450             setattr(writer, attr_name, attr_value)
451         # Check dependencies before we try to resume anything.
452         if any(KeepLocator(ls).permission_expired()
453                for ls in writer._current_stream_locators):
454             raise errors.StaleWriterStateError(
455                 "locators include expired permission hint")
456         writer.check_dependencies()
457         if state['_current_file'] is not None:
458             path, pos = state['_current_file']
459             try:
460                 writer._queued_file = open(path, 'rb')
461                 writer._queued_file.seek(pos)
462             except IOError as error:
463                 raise errors.StaleWriterStateError(
464                     "failed to reopen active file {}: {}".format(path, error))
465         return writer
466
467     def check_dependencies(self):
468         for path, orig_stat in listitems(self._dependencies):
469             if not S_ISREG(orig_stat[ST_MODE]):
470                 raise errors.StaleWriterStateError("{} not file".format(path))
471             try:
472                 now_stat = tuple(os.stat(path))
473             except OSError as error:
474                 raise errors.StaleWriterStateError(
475                     "failed to stat {}: {}".format(path, error))
476             if ((not S_ISREG(now_stat[ST_MODE])) or
477                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
478                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
479                 raise errors.StaleWriterStateError("{} changed".format(path))
480
481     def dump_state(self, copy_func=lambda x: x):
482         state = {attr: copy_func(getattr(self, attr))
483                  for attr in self.STATE_PROPS}
484         if self._queued_file is None:
485             state['_current_file'] = None
486         else:
487             state['_current_file'] = (os.path.realpath(self._queued_file.name),
488                                       self._queued_file.tell())
489         return state
490
491     def _queue_file(self, source, filename=None):
492         try:
493             src_path = os.path.realpath(source)
494         except Exception:
495             raise errors.AssertionError("{} not a file path".format(source))
496         try:
497             path_stat = os.stat(src_path)
498         except OSError as stat_error:
499             path_stat = None
500         super(ResumableCollectionWriter, self)._queue_file(source, filename)
501         fd_stat = os.fstat(self._queued_file.fileno())
502         if not S_ISREG(fd_stat.st_mode):
503             # We won't be able to resume from this cache anyway, so don't
504             # worry about further checks.
505             self._dependencies[source] = tuple(fd_stat)
506         elif path_stat is None:
507             raise errors.AssertionError(
508                 "could not stat {}: {}".format(source, stat_error))
509         elif path_stat.st_ino != fd_stat.st_ino:
510             raise errors.AssertionError(
511                 "{} changed between open and stat calls".format(source))
512         else:
513             self._dependencies[src_path] = tuple(fd_stat)
514
515     def write(self, data):
516         if self._queued_file is None:
517             raise errors.AssertionError(
518                 "resumable writer can't accept unsourced data")
519         return super(ResumableCollectionWriter, self).write(data)
520
521
522 ADD = "add"
523 DEL = "del"
524 MOD = "mod"
525 TOK = "tok"
526 FILE = "file"
527 COLLECTION = "collection"
528
529 class RichCollectionBase(CollectionBase):
530     """Base class for Collections and Subcollections.
531
532     Implements the majority of functionality relating to accessing items in the
533     Collection.
534
535     """
536
537     def __init__(self, parent=None):
538         self.parent = parent
539         self._committed = False
540         self._has_remote_blocks = False
541         self._callback = None
542         self._items = {}
543
544     def _my_api(self):
545         raise NotImplementedError()
546
547     def _my_keep(self):
548         raise NotImplementedError()
549
550     def _my_block_manager(self):
551         raise NotImplementedError()
552
553     def writable(self):
554         raise NotImplementedError()
555
556     def root_collection(self):
557         raise NotImplementedError()
558
559     def notify(self, event, collection, name, item):
560         raise NotImplementedError()
561
562     def stream_name(self):
563         raise NotImplementedError()
564
565     @synchronized
566     def has_remote_blocks(self):
567         """Recursively check for a +R segment locator signature."""
568
569         if self._has_remote_blocks:
570             return True
571         for item in self:
572             if self[item].has_remote_blocks():
573                 return True
574         return False
575
576     @synchronized
577     def set_has_remote_blocks(self, val):
578         self._has_remote_blocks = val
579         if self.parent:
580             self.parent.set_has_remote_blocks(val)
581
582     @must_be_writable
583     @synchronized
584     def find_or_create(self, path, create_type):
585         """Recursively search the specified file path.
586
587         May return either a `Collection` or `ArvadosFile`.  If not found, will
588         create a new item at the specified path based on `create_type`.  Will
589         create intermediate subcollections needed to contain the final item in
590         the path.
591
592         :create_type:
593           One of `arvados.collection.FILE` or
594           `arvados.collection.COLLECTION`.  If the path is not found, and value
595           of create_type is FILE then create and return a new ArvadosFile for
596           the last path component.  If COLLECTION, then create and return a new
597           Collection for the last path component.
598
599         """
600
601         pathcomponents = path.split("/", 1)
602         if pathcomponents[0]:
603             # Don't allow naming files/dirs \\056
604             if pathcomponents[0] == "\\056":
605                 raise IOError(errno.EINVAL, "Invalid name", pathcomponents[0])
606             item = self._items.get(pathcomponents[0])
607             if len(pathcomponents) == 1:
608                 if item is None:
609                     # create new file
610                     if create_type == COLLECTION:
611                         item = Subcollection(self, pathcomponents[0])
612                     else:
613                         item = ArvadosFile(self, pathcomponents[0])
614                     self._items[pathcomponents[0]] = item
615                     self.set_committed(False)
616                     self.notify(ADD, self, pathcomponents[0], item)
617                 return item
618             else:
619                 if item is None:
620                     # create new collection
621                     item = Subcollection(self, pathcomponents[0])
622                     self._items[pathcomponents[0]] = item
623                     self.set_committed(False)
624                     self.notify(ADD, self, pathcomponents[0], item)
625                 if isinstance(item, RichCollectionBase):
626                     return item.find_or_create(pathcomponents[1], create_type)
627                 else:
628                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
629         else:
630             return self
631
632     @synchronized
633     def find(self, path):
634         """Recursively search the specified file path.
635
636         May return either a Collection or ArvadosFile. Return None if not
637         found.
638         If path is invalid (ex: starts with '/'), an IOError exception will be
639         raised.
640
641         """
642         if not path:
643             raise errors.ArgumentError("Parameter 'path' is empty.")
644
645         pathcomponents = path.split("/", 1)
646         if pathcomponents[0] == '':
647             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
648
649         item = self._items.get(pathcomponents[0])
650         if item is None:
651             return None
652         elif len(pathcomponents) == 1:
653             return item
654         else:
655             if isinstance(item, RichCollectionBase):
656                 if pathcomponents[1]:
657                     return item.find(pathcomponents[1])
658                 else:
659                     return item
660             else:
661                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
662
663     @synchronized
664     def mkdirs(self, path):
665         """Recursive subcollection create.
666
667         Like `os.makedirs()`.  Will create intermediate subcollections needed
668         to contain the leaf subcollection path.
669
670         """
671
672         if self.find(path) != None:
673             raise IOError(errno.EEXIST, "Directory or file exists", path)
674
675         return self.find_or_create(path, COLLECTION)
676
677     def open(self, path, mode="r", encoding=None):
678         """Open a file-like object for access.
679
680         :path:
681           path to a file in the collection
682         :mode:
683           a string consisting of "r", "w", or "a", optionally followed
684           by "b" or "t", optionally followed by "+".
685           :"b":
686             binary mode: write() accepts bytes, read() returns bytes.
687           :"t":
688             text mode (default): write() accepts strings, read() returns strings.
689           :"r":
690             opens for reading
691           :"r+":
692             opens for reading and writing.  Reads/writes share a file pointer.
693           :"w", "w+":
694             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
695           :"a", "a+":
696             opens for reading and writing.  All writes are appended to
697             the end of the file.  Writing does not affect the file pointer for
698             reading.
699
700         """
701
702         if not re.search(r'^[rwa][bt]?\+?$', mode):
703             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
704
705         if mode[0] == 'r' and '+' not in mode:
706             fclass = ArvadosFileReader
707             arvfile = self.find(path)
708         elif not self.writable():
709             raise IOError(errno.EROFS, "Collection is read only")
710         else:
711             fclass = ArvadosFileWriter
712             arvfile = self.find_or_create(path, FILE)
713
714         if arvfile is None:
715             raise IOError(errno.ENOENT, "File not found", path)
716         if not isinstance(arvfile, ArvadosFile):
717             raise IOError(errno.EISDIR, "Is a directory", path)
718
719         if mode[0] == 'w':
720             arvfile.truncate(0)
721
722         binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
723         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
724         if 'b' not in mode:
725             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
726             f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
727         return f
728
729     def modified(self):
730         """Determine if the collection has been modified since last commited."""
731         return not self.committed()
732
733     @synchronized
734     def committed(self):
735         """Determine if the collection has been committed to the API server."""
736         return self._committed
737
738     @synchronized
739     def set_committed(self, value=True):
740         """Recursively set committed flag.
741
742         If value is True, set committed to be True for this and all children.
743
744         If value is False, set committed to be False for this and all parents.
745         """
746         if value == self._committed:
747             return
748         if value:
749             for k,v in listitems(self._items):
750                 v.set_committed(True)
751             self._committed = True
752         else:
753             self._committed = False
754             if self.parent is not None:
755                 self.parent.set_committed(False)
756
757     @synchronized
758     def __iter__(self):
759         """Iterate over names of files and collections contained in this collection."""
760         return iter(viewkeys(self._items))
761
762     @synchronized
763     def __getitem__(self, k):
764         """Get a file or collection that is directly contained by this collection.
765
766         If you want to search a path, use `find()` instead.
767
768         """
769         return self._items[k]
770
771     @synchronized
772     def __contains__(self, k):
773         """Test if there is a file or collection a directly contained by this collection."""
774         return k in self._items
775
776     @synchronized
777     def __len__(self):
778         """Get the number of items directly contained in this collection."""
779         return len(self._items)
780
781     @must_be_writable
782     @synchronized
783     def __delitem__(self, p):
784         """Delete an item by name which is directly contained by this collection."""
785         del self._items[p]
786         self.set_committed(False)
787         self.notify(DEL, self, p, None)
788
789     @synchronized
790     def keys(self):
791         """Get a list of names of files and collections directly contained in this collection."""
792         return self._items.keys()
793
794     @synchronized
795     def values(self):
796         """Get a list of files and collection objects directly contained in this collection."""
797         return listvalues(self._items)
798
799     @synchronized
800     def items(self):
801         """Get a list of (name, object) tuples directly contained in this collection."""
802         return listitems(self._items)
803
804     def exists(self, path):
805         """Test if there is a file or collection at `path`."""
806         return self.find(path) is not None
807
808     @must_be_writable
809     @synchronized
810     def remove(self, path, recursive=False):
811         """Remove the file or subcollection (directory) at `path`.
812
813         :recursive:
814           Specify whether to remove non-empty subcollections (True), or raise an error (False).
815         """
816
817         if not path:
818             raise errors.ArgumentError("Parameter 'path' is empty.")
819
820         pathcomponents = path.split("/", 1)
821         item = self._items.get(pathcomponents[0])
822         if item is None:
823             raise IOError(errno.ENOENT, "File not found", path)
824         if len(pathcomponents) == 1:
825             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
826                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
827             deleteditem = self._items[pathcomponents[0]]
828             del self._items[pathcomponents[0]]
829             self.set_committed(False)
830             self.notify(DEL, self, pathcomponents[0], deleteditem)
831         else:
832             item.remove(pathcomponents[1])
833
834     def _clonefrom(self, source):
835         for k,v in listitems(source):
836             self._items[k] = v.clone(self, k)
837
838     def clone(self):
839         raise NotImplementedError()
840
841     @must_be_writable
842     @synchronized
843     def add(self, source_obj, target_name, overwrite=False, reparent=False):
844         """Copy or move a file or subcollection to this collection.
845
846         :source_obj:
847           An ArvadosFile, or Subcollection object
848
849         :target_name:
850           Destination item name.  If the target name already exists and is a
851           file, this will raise an error unless you specify `overwrite=True`.
852
853         :overwrite:
854           Whether to overwrite target file if it already exists.
855
856         :reparent:
857           If True, source_obj will be moved from its parent collection to this collection.
858           If False, source_obj will be copied and the parent collection will be
859           unmodified.
860
861         """
862
863         if target_name in self and not overwrite:
864             raise IOError(errno.EEXIST, "File already exists", target_name)
865
866         modified_from = None
867         if target_name in self:
868             modified_from = self[target_name]
869
870         # Actually make the move or copy.
871         if reparent:
872             source_obj._reparent(self, target_name)
873             item = source_obj
874         else:
875             item = source_obj.clone(self, target_name)
876
877         self._items[target_name] = item
878         self.set_committed(False)
879         if not self._has_remote_blocks and source_obj.has_remote_blocks():
880             self.set_has_remote_blocks(True)
881
882         if modified_from:
883             self.notify(MOD, self, target_name, (modified_from, item))
884         else:
885             self.notify(ADD, self, target_name, item)
886
887     def _get_src_target(self, source, target_path, source_collection, create_dest):
888         if source_collection is None:
889             source_collection = self
890
891         # Find the object
892         if isinstance(source, basestring):
893             source_obj = source_collection.find(source)
894             if source_obj is None:
895                 raise IOError(errno.ENOENT, "File not found", source)
896             sourcecomponents = source.split("/")
897         else:
898             source_obj = source
899             sourcecomponents = None
900
901         # Find parent collection the target path
902         targetcomponents = target_path.split("/")
903
904         # Determine the name to use.
905         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
906
907         if not target_name:
908             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
909
910         if create_dest:
911             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
912         else:
913             if len(targetcomponents) > 1:
914                 target_dir = self.find("/".join(targetcomponents[0:-1]))
915             else:
916                 target_dir = self
917
918         if target_dir is None:
919             raise IOError(errno.ENOENT, "Target directory not found", target_name)
920
921         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
922             target_dir = target_dir[target_name]
923             target_name = sourcecomponents[-1]
924
925         return (source_obj, target_dir, target_name)
926
927     @must_be_writable
928     @synchronized
929     def copy(self, source, target_path, source_collection=None, overwrite=False):
930         """Copy a file or subcollection to a new path in this collection.
931
932         :source:
933           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
934
935         :target_path:
936           Destination file or path.  If the target path already exists and is a
937           subcollection, the item will be placed inside the subcollection.  If
938           the target path already exists and is a file, this will raise an error
939           unless you specify `overwrite=True`.
940
941         :source_collection:
942           Collection to copy `source_path` from (default `self`)
943
944         :overwrite:
945           Whether to overwrite target file if it already exists.
946         """
947
948         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
949         target_dir.add(source_obj, target_name, overwrite, False)
950
951     @must_be_writable
952     @synchronized
953     def rename(self, source, target_path, source_collection=None, overwrite=False):
954         """Move a file or subcollection from `source_collection` to a new path in this collection.
955
956         :source:
957           A string with a path to source file or subcollection.
958
959         :target_path:
960           Destination file or path.  If the target path already exists and is a
961           subcollection, the item will be placed inside the subcollection.  If
962           the target path already exists and is a file, this will raise an error
963           unless you specify `overwrite=True`.
964
965         :source_collection:
966           Collection to copy `source_path` from (default `self`)
967
968         :overwrite:
969           Whether to overwrite target file if it already exists.
970         """
971
972         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
973         if not source_obj.writable():
974             raise IOError(errno.EROFS, "Source collection is read only", source)
975         target_dir.add(source_obj, target_name, overwrite, True)
976
977     def portable_manifest_text(self, stream_name="."):
978         """Get the manifest text for this collection, sub collections and files.
979
980         This method does not flush outstanding blocks to Keep.  It will return
981         a normalized manifest with access tokens stripped.
982
983         :stream_name:
984           Name to use for this stream (directory)
985
986         """
987         return self._get_manifest_text(stream_name, True, True)
988
989     @synchronized
990     def manifest_text(self, stream_name=".", strip=False, normalize=False,
991                       only_committed=False):
992         """Get the manifest text for this collection, sub collections and files.
993
994         This method will flush outstanding blocks to Keep.  By default, it will
995         not normalize an unmodified manifest or strip access tokens.
996
997         :stream_name:
998           Name to use for this stream (directory)
999
1000         :strip:
1001           If True, remove signing tokens from block locators if present.
1002           If False (default), block locators are left unchanged.
1003
1004         :normalize:
1005           If True, always export the manifest text in normalized form
1006           even if the Collection is not modified.  If False (default) and the collection
1007           is not modified, return the original manifest text even if it is not
1008           in normalized form.
1009
1010         :only_committed:
1011           If True, don't commit pending blocks.
1012
1013         """
1014
1015         if not only_committed:
1016             self._my_block_manager().commit_all()
1017         return self._get_manifest_text(stream_name, strip, normalize,
1018                                        only_committed=only_committed)
1019
1020     @synchronized
1021     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1022         """Get the manifest text for this collection, sub collections and files.
1023
1024         :stream_name:
1025           Name to use for this stream (directory)
1026
1027         :strip:
1028           If True, remove signing tokens from block locators if present.
1029           If False (default), block locators are left unchanged.
1030
1031         :normalize:
1032           If True, always export the manifest text in normalized form
1033           even if the Collection is not modified.  If False (default) and the collection
1034           is not modified, return the original manifest text even if it is not
1035           in normalized form.
1036
1037         :only_committed:
1038           If True, only include blocks that were already committed to Keep.
1039
1040         """
1041
1042         if not self.committed() or self._manifest_text is None or normalize:
1043             stream = {}
1044             buf = []
1045             sorted_keys = sorted(self.keys())
1046             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1047                 # Create a stream per file `k`
1048                 arvfile = self[filename]
1049                 filestream = []
1050                 for segment in arvfile.segments():
1051                     loc = segment.locator
1052                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
1053                         if only_committed:
1054                             continue
1055                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1056                     if strip:
1057                         loc = KeepLocator(loc).stripped()
1058                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1059                                          segment.segment_offset, segment.range_size))
1060                 stream[filename] = filestream
1061             if stream:
1062                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1063             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1064                 buf.append(self[dirname].manifest_text(
1065                     stream_name=os.path.join(stream_name, dirname),
1066                     strip=strip, normalize=True, only_committed=only_committed))
1067             return "".join(buf)
1068         else:
1069             if strip:
1070                 return self.stripped_manifest()
1071             else:
1072                 return self._manifest_text
1073
1074     @synchronized
1075     def _copy_remote_blocks(self, remote_blocks={}):
1076         """Scan through the entire collection and ask Keep to copy remote blocks.
1077
1078         When accessing a remote collection, blocks will have a remote signature
1079         (+R instead of +A). Collect these signatures and request Keep to copy the
1080         blocks to the local cluster, returning local (+A) signatures.
1081
1082         :remote_blocks:
1083           Shared cache of remote to local block mappings. This is used to avoid
1084           doing extra work when blocks are shared by more than one file in
1085           different subdirectories.
1086
1087         """
1088         for item in self:
1089             remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1090         return remote_blocks
1091
1092     @synchronized
1093     def diff(self, end_collection, prefix=".", holding_collection=None):
1094         """Generate list of add/modify/delete actions.
1095
1096         When given to `apply`, will change `self` to match `end_collection`
1097
1098         """
1099         changes = []
1100         if holding_collection is None:
1101             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1102         for k in self:
1103             if k not in end_collection:
1104                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1105         for k in end_collection:
1106             if k in self:
1107                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1108                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1109                 elif end_collection[k] != self[k]:
1110                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1111                 else:
1112                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1113             else:
1114                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1115         return changes
1116
1117     @must_be_writable
1118     @synchronized
1119     def apply(self, changes):
1120         """Apply changes from `diff`.
1121
1122         If a change conflicts with a local change, it will be saved to an
1123         alternate path indicating the conflict.
1124
1125         """
1126         if changes:
1127             self.set_committed(False)
1128         for change in changes:
1129             event_type = change[0]
1130             path = change[1]
1131             initial = change[2]
1132             local = self.find(path)
1133             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1134                                                                     time.gmtime()))
1135             if event_type == ADD:
1136                 if local is None:
1137                     # No local file at path, safe to copy over new file
1138                     self.copy(initial, path)
1139                 elif local is not None and local != initial:
1140                     # There is already local file and it is different:
1141                     # save change to conflict file.
1142                     self.copy(initial, conflictpath)
1143             elif event_type == MOD or event_type == TOK:
1144                 final = change[3]
1145                 if local == initial:
1146                     # Local matches the "initial" item so it has not
1147                     # changed locally and is safe to update.
1148                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1149                         # Replace contents of local file with new contents
1150                         local.replace_contents(final)
1151                     else:
1152                         # Overwrite path with new item; this can happen if
1153                         # path was a file and is now a collection or vice versa
1154                         self.copy(final, path, overwrite=True)
1155                 else:
1156                     # Local is missing (presumably deleted) or local doesn't
1157                     # match the "start" value, so save change to conflict file
1158                     self.copy(final, conflictpath)
1159             elif event_type == DEL:
1160                 if local == initial:
1161                     # Local item matches "initial" value, so it is safe to remove.
1162                     self.remove(path, recursive=True)
1163                 # else, the file is modified or already removed, in either
1164                 # case we don't want to try to remove it.
1165
1166     def portable_data_hash(self):
1167         """Get the portable data hash for this collection's manifest."""
1168         if self._manifest_locator and self.committed():
1169             # If the collection is already saved on the API server, and it's committed
1170             # then return API server's PDH response.
1171             return self._portable_data_hash
1172         else:
1173             stripped = self.portable_manifest_text().encode()
1174             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1175
1176     @synchronized
1177     def subscribe(self, callback):
1178         if self._callback is None:
1179             self._callback = callback
1180         else:
1181             raise errors.ArgumentError("A callback is already set on this collection.")
1182
1183     @synchronized
1184     def unsubscribe(self):
1185         if self._callback is not None:
1186             self._callback = None
1187
1188     @synchronized
1189     def notify(self, event, collection, name, item):
1190         if self._callback:
1191             self._callback(event, collection, name, item)
1192         self.root_collection().notify(event, collection, name, item)
1193
1194     @synchronized
1195     def __eq__(self, other):
1196         if other is self:
1197             return True
1198         if not isinstance(other, RichCollectionBase):
1199             return False
1200         if len(self._items) != len(other):
1201             return False
1202         for k in self._items:
1203             if k not in other:
1204                 return False
1205             if self._items[k] != other[k]:
1206                 return False
1207         return True
1208
1209     def __ne__(self, other):
1210         return not self.__eq__(other)
1211
1212     @synchronized
1213     def flush(self):
1214         """Flush bufferblocks to Keep."""
1215         for e in listvalues(self):
1216             e.flush()
1217
1218
1219 class Collection(RichCollectionBase):
1220     """Represents the root of an Arvados Collection.
1221
1222     This class is threadsafe.  The root collection object, all subcollections
1223     and files are protected by a single lock (i.e. each access locks the entire
1224     collection).
1225
1226     Brief summary of
1227     useful methods:
1228
1229     :To read an existing file:
1230       `c.open("myfile", "r")`
1231
1232     :To write a new file:
1233       `c.open("myfile", "w")`
1234
1235     :To determine if a file exists:
1236       `c.find("myfile") is not None`
1237
1238     :To copy a file:
1239       `c.copy("source", "dest")`
1240
1241     :To delete a file:
1242       `c.remove("myfile")`
1243
1244     :To save to an existing collection record:
1245       `c.save()`
1246
1247     :To save a new collection record:
1248     `c.save_new()`
1249
1250     :To merge remote changes into this object:
1251       `c.update()`
1252
1253     Must be associated with an API server Collection record (during
1254     initialization, or using `save_new`) to use `save` or `update`
1255
1256     """
1257
1258     def __init__(self, manifest_locator_or_text=None,
1259                  api_client=None,
1260                  keep_client=None,
1261                  num_retries=None,
1262                  parent=None,
1263                  apiconfig=None,
1264                  block_manager=None,
1265                  replication_desired=None,
1266                  put_threads=None):
1267         """Collection constructor.
1268
1269         :manifest_locator_or_text:
1270           An Arvados collection UUID, portable data hash, raw manifest
1271           text, or (if creating an empty collection) None.
1272
1273         :parent:
1274           the parent Collection, may be None.
1275
1276         :apiconfig:
1277           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1278           Prefer this over supplying your own api_client and keep_client (except in testing).
1279           Will use default config settings if not specified.
1280
1281         :api_client:
1282           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1283
1284         :keep_client:
1285           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1286
1287         :num_retries:
1288           the number of retries for API and Keep requests.
1289
1290         :block_manager:
1291           the block manager to use.  If not specified, create one.
1292
1293         :replication_desired:
1294           How many copies should Arvados maintain. If None, API server default
1295           configuration applies. If not None, this value will also be used
1296           for determining the number of block copies being written.
1297
1298         """
1299         super(Collection, self).__init__(parent)
1300         self._api_client = api_client
1301         self._keep_client = keep_client
1302         self._block_manager = block_manager
1303         self.replication_desired = replication_desired
1304         self.put_threads = put_threads
1305
1306         if apiconfig:
1307             self._config = apiconfig
1308         else:
1309             self._config = config.settings()
1310
1311         self.num_retries = num_retries if num_retries is not None else 0
1312         self._manifest_locator = None
1313         self._manifest_text = None
1314         self._portable_data_hash = None
1315         self._api_response = None
1316         self._past_versions = set()
1317
1318         self.lock = threading.RLock()
1319         self.events = None
1320
1321         if manifest_locator_or_text:
1322             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1323                 self._manifest_locator = manifest_locator_or_text
1324             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1325                 self._manifest_locator = manifest_locator_or_text
1326                 if not self._has_local_collection_uuid():
1327                     self._has_remote_blocks = True
1328             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1329                 self._manifest_text = manifest_locator_or_text
1330                 if '+R' in self._manifest_text:
1331                     self._has_remote_blocks = True
1332             else:
1333                 raise errors.ArgumentError(
1334                     "Argument to CollectionReader is not a manifest or a collection UUID")
1335
1336             try:
1337                 self._populate()
1338             except (IOError, errors.SyntaxError) as e:
1339                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1340
1341     def root_collection(self):
1342         return self
1343
1344     def get_properties(self):
1345         if self._api_response and self._api_response["properties"]:
1346             return self._api_response["properties"]
1347         else:
1348             return {}
1349
1350     def get_trash_at(self):
1351         if self._api_response and self._api_response["trash_at"]:
1352             return ciso8601.parse_datetime(self._api_response["trash_at"])
1353         else:
1354             return None
1355
1356     def stream_name(self):
1357         return "."
1358
1359     def writable(self):
1360         return True
1361
1362     @synchronized
1363     def known_past_version(self, modified_at_and_portable_data_hash):
1364         return modified_at_and_portable_data_hash in self._past_versions
1365
1366     @synchronized
1367     @retry_method
1368     def update(self, other=None, num_retries=None):
1369         """Merge the latest collection on the API server with the current collection."""
1370
1371         if other is None:
1372             if self._manifest_locator is None:
1373                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1374             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1375             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1376                 response.get("portable_data_hash") != self.portable_data_hash()):
1377                 # The record on the server is different from our current one, but we've seen it before,
1378                 # so ignore it because it's already been merged.
1379                 # However, if it's the same as our current record, proceed with the update, because we want to update
1380                 # our tokens.
1381                 return
1382             else:
1383                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1384             other = CollectionReader(response["manifest_text"])
1385         baseline = CollectionReader(self._manifest_text)
1386         self.apply(baseline.diff(other))
1387         self._manifest_text = self.manifest_text()
1388
1389     @synchronized
1390     def _my_api(self):
1391         if self._api_client is None:
1392             self._api_client = ThreadSafeApiCache(self._config)
1393             if self._keep_client is None:
1394                 self._keep_client = self._api_client.keep
1395         return self._api_client
1396
1397     @synchronized
1398     def _my_keep(self):
1399         if self._keep_client is None:
1400             if self._api_client is None:
1401                 self._my_api()
1402             else:
1403                 self._keep_client = KeepClient(api_client=self._api_client)
1404         return self._keep_client
1405
1406     @synchronized
1407     def _my_block_manager(self):
1408         if self._block_manager is None:
1409             copies = (self.replication_desired or
1410                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1411                                                    2))
1412             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1413         return self._block_manager
1414
1415     def _remember_api_response(self, response):
1416         self._api_response = response
1417         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1418
1419     def _populate_from_api_server(self):
1420         # As in KeepClient itself, we must wait until the last
1421         # possible moment to instantiate an API client, in order to
1422         # avoid tripping up clients that don't have access to an API
1423         # server.  If we do build one, make sure our Keep client uses
1424         # it.  If instantiation fails, we'll fall back to the except
1425         # clause, just like any other Collection lookup
1426         # failure. Return an exception, or None if successful.
1427         self._remember_api_response(self._my_api().collections().get(
1428             uuid=self._manifest_locator).execute(
1429                 num_retries=self.num_retries))
1430         self._manifest_text = self._api_response['manifest_text']
1431         self._portable_data_hash = self._api_response['portable_data_hash']
1432         # If not overriden via kwargs, we should try to load the
1433         # replication_desired from the API server
1434         if self.replication_desired is None:
1435             self.replication_desired = self._api_response.get('replication_desired', None)
1436
1437     def _populate(self):
1438         if self._manifest_text is None:
1439             if self._manifest_locator is None:
1440                 return
1441             else:
1442                 self._populate_from_api_server()
1443         self._baseline_manifest = self._manifest_text
1444         self._import_manifest(self._manifest_text)
1445
1446     def _has_collection_uuid(self):
1447         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1448
1449     def _has_local_collection_uuid(self):
1450         return self._has_collection_uuid and \
1451             self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1452
1453     def __enter__(self):
1454         return self
1455
1456     def __exit__(self, exc_type, exc_value, traceback):
1457         """Support scoped auto-commit in a with: block."""
1458         if exc_type is None:
1459             if self.writable() and self._has_collection_uuid():
1460                 self.save()
1461         self.stop_threads()
1462
1463     def stop_threads(self):
1464         if self._block_manager is not None:
1465             self._block_manager.stop_threads()
1466
1467     @synchronized
1468     def manifest_locator(self):
1469         """Get the manifest locator, if any.
1470
1471         The manifest locator will be set when the collection is loaded from an
1472         API server record or the portable data hash of a manifest.
1473
1474         The manifest locator will be None if the collection is newly created or
1475         was created directly from manifest text.  The method `save_new()` will
1476         assign a manifest locator.
1477
1478         """
1479         return self._manifest_locator
1480
1481     @synchronized
1482     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1483         if new_config is None:
1484             new_config = self._config
1485         if readonly:
1486             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1487         else:
1488             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1489
1490         newcollection._clonefrom(self)
1491         return newcollection
1492
1493     @synchronized
1494     def api_response(self):
1495         """Returns information about this Collection fetched from the API server.
1496
1497         If the Collection exists in Keep but not the API server, currently
1498         returns None.  Future versions may provide a synthetic response.
1499
1500         """
1501         return self._api_response
1502
1503     def find_or_create(self, path, create_type):
1504         """See `RichCollectionBase.find_or_create`"""
1505         if path == ".":
1506             return self
1507         else:
1508             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1509
1510     def find(self, path):
1511         """See `RichCollectionBase.find`"""
1512         if path == ".":
1513             return self
1514         else:
1515             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1516
1517     def remove(self, path, recursive=False):
1518         """See `RichCollectionBase.remove`"""
1519         if path == ".":
1520             raise errors.ArgumentError("Cannot remove '.'")
1521         else:
1522             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1523
1524     @must_be_writable
1525     @synchronized
1526     @retry_method
1527     def save(self,
1528              properties=None,
1529              storage_classes=None,
1530              trash_at=None,
1531              merge=True,
1532              num_retries=None):
1533         """Save collection to an existing collection record.
1534
1535         Commit pending buffer blocks to Keep, merge with remote record (if
1536         merge=True, the default), and update the collection record. Returns
1537         the current manifest text.
1538
1539         Will raise AssertionError if not associated with a collection record on
1540         the API server.  If you want to save a manifest to Keep only, see
1541         `save_new()`.
1542
1543         :properties:
1544           Additional properties of collection. This value will replace any existing
1545           properties of collection.
1546
1547         :storage_classes:
1548           Specify desirable storage classes to be used when writing data to Keep.
1549
1550         :trash_at:
1551           A collection is *expiring* when it has a *trash_at* time in the future.
1552           An expiring collection can be accessed as normal,
1553           but is scheduled to be trashed automatically at the *trash_at* time.
1554
1555         :merge:
1556           Update and merge remote changes before saving.  Otherwise, any
1557           remote changes will be ignored and overwritten.
1558
1559         :num_retries:
1560           Retry count on API calls (if None,  use the collection default)
1561
1562         """
1563         if properties and type(properties) is not dict:
1564             raise errors.ArgumentError("properties must be dictionary type.")
1565
1566         if storage_classes and type(storage_classes) is not list:
1567             raise errors.ArgumentError("storage_classes must be list type.")
1568
1569         if trash_at and type(trash_at) is not datetime.datetime:
1570             raise errors.ArgumentError("trash_at must be datetime type.")
1571
1572         body={}
1573         if properties:
1574             body["properties"] = properties
1575         if storage_classes:
1576             body["storage_classes_desired"] = storage_classes
1577         if trash_at:
1578             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1579             body["trash_at"] = t
1580
1581         if not self.committed():
1582             if self._has_remote_blocks:
1583                 # Copy any remote blocks to the local cluster.
1584                 self._copy_remote_blocks(remote_blocks={})
1585                 self._has_remote_blocks = False
1586             if not self._has_collection_uuid():
1587                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1588             elif not self._has_local_collection_uuid():
1589                 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1590
1591             self._my_block_manager().commit_all()
1592
1593             if merge:
1594                 self.update()
1595
1596             text = self.manifest_text(strip=False)
1597             body['manifest_text'] = text
1598
1599             self._remember_api_response(self._my_api().collections().update(
1600                 uuid=self._manifest_locator,
1601                 body=body
1602                 ).execute(num_retries=num_retries))
1603             self._manifest_text = self._api_response["manifest_text"]
1604             self._portable_data_hash = self._api_response["portable_data_hash"]
1605             self.set_committed(True)
1606         elif body:
1607             self._remember_api_response(self._my_api().collections().update(
1608                 uuid=self._manifest_locator,
1609                 body=body
1610                 ).execute(num_retries=num_retries))
1611
1612         return self._manifest_text
1613
1614
1615     @must_be_writable
1616     @synchronized
1617     @retry_method
1618     def save_new(self, name=None,
1619                  create_collection_record=True,
1620                  owner_uuid=None,
1621                  properties=None,
1622                  storage_classes=None,
1623                  trash_at=None,
1624                  ensure_unique_name=False,
1625                  num_retries=None):
1626         """Save collection to a new collection record.
1627
1628         Commit pending buffer blocks to Keep and, when create_collection_record
1629         is True (default), create a new collection record.  After creating a
1630         new collection record, this Collection object will be associated with
1631         the new record used by `save()`. Returns the current manifest text.
1632
1633         :name:
1634           The collection name.
1635
1636         :create_collection_record:
1637            If True, create a collection record on the API server.
1638            If False, only commit blocks to Keep and return the manifest text.
1639
1640         :owner_uuid:
1641           the user, or project uuid that will own this collection.
1642           If None, defaults to the current user.
1643
1644         :properties:
1645           Additional properties of collection. This value will replace any existing
1646           properties of collection.
1647
1648         :storage_classes:
1649           Specify desirable storage classes to be used when writing data to Keep.
1650
1651         :trash_at:
1652           A collection is *expiring* when it has a *trash_at* time in the future.
1653           An expiring collection can be accessed as normal,
1654           but is scheduled to be trashed automatically at the *trash_at* time.
1655
1656         :ensure_unique_name:
1657           If True, ask the API server to rename the collection
1658           if it conflicts with a collection with the same name and owner.  If
1659           False, a name conflict will result in an error.
1660
1661         :num_retries:
1662           Retry count on API calls (if None,  use the collection default)
1663
1664         """
1665         if properties and type(properties) is not dict:
1666             raise errors.ArgumentError("properties must be dictionary type.")
1667
1668         if storage_classes and type(storage_classes) is not list:
1669             raise errors.ArgumentError("storage_classes must be list type.")
1670
1671         if trash_at and type(trash_at) is not datetime.datetime:
1672             raise errors.ArgumentError("trash_at must be datetime type.")
1673
1674         if self._has_remote_blocks:
1675             # Copy any remote blocks to the local cluster.
1676             self._copy_remote_blocks(remote_blocks={})
1677             self._has_remote_blocks = False
1678
1679         self._my_block_manager().commit_all()
1680         text = self.manifest_text(strip=False)
1681
1682         if create_collection_record:
1683             if name is None:
1684                 name = "New collection"
1685                 ensure_unique_name = True
1686
1687             body = {"manifest_text": text,
1688                     "name": name,
1689                     "replication_desired": self.replication_desired}
1690             if owner_uuid:
1691                 body["owner_uuid"] = owner_uuid
1692             if properties:
1693                 body["properties"] = properties
1694             if storage_classes:
1695                 body["storage_classes_desired"] = storage_classes
1696             if trash_at:
1697                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1698                 body["trash_at"] = t
1699
1700             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1701             text = self._api_response["manifest_text"]
1702
1703             self._manifest_locator = self._api_response["uuid"]
1704             self._portable_data_hash = self._api_response["portable_data_hash"]
1705
1706             self._manifest_text = text
1707             self.set_committed(True)
1708
1709         return text
1710
1711     _token_re = re.compile(r'(\S+)(\s+|$)')
1712     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1713     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1714
1715     def _unescape_manifest_path(self, path):
1716         return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1717
1718     @synchronized
1719     def _import_manifest(self, manifest_text):
1720         """Import a manifest into a `Collection`.
1721
1722         :manifest_text:
1723           The manifest text to import from.
1724
1725         """
1726         if len(self) > 0:
1727             raise ArgumentError("Can only import manifest into an empty collection")
1728
1729         STREAM_NAME = 0
1730         BLOCKS = 1
1731         SEGMENTS = 2
1732
1733         stream_name = None
1734         state = STREAM_NAME
1735
1736         for token_and_separator in self._token_re.finditer(manifest_text):
1737             tok = token_and_separator.group(1)
1738             sep = token_and_separator.group(2)
1739
1740             if state == STREAM_NAME:
1741                 # starting a new stream
1742                 stream_name = self._unescape_manifest_path(tok)
1743                 blocks = []
1744                 segments = []
1745                 streamoffset = 0
1746                 state = BLOCKS
1747                 self.find_or_create(stream_name, COLLECTION)
1748                 continue
1749
1750             if state == BLOCKS:
1751                 block_locator = self._block_re.match(tok)
1752                 if block_locator:
1753                     blocksize = int(block_locator.group(1))
1754                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1755                     streamoffset += blocksize
1756                 else:
1757                     state = SEGMENTS
1758
1759             if state == SEGMENTS:
1760                 file_segment = self._segment_re.match(tok)
1761                 if file_segment:
1762                     pos = int(file_segment.group(1))
1763                     size = int(file_segment.group(2))
1764                     name = self._unescape_manifest_path(file_segment.group(3))
1765                     if name.split('/')[-1] == '.':
1766                         # placeholder for persisting an empty directory, not a real file
1767                         if len(name) > 2:
1768                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1769                     else:
1770                         filepath = os.path.join(stream_name, name)
1771                         afile = self.find_or_create(filepath, FILE)
1772                         if isinstance(afile, ArvadosFile):
1773                             afile.add_segment(blocks, pos, size)
1774                         else:
1775                             raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1776                 else:
1777                     # error!
1778                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1779
1780             if sep == "\n":
1781                 stream_name = None
1782                 state = STREAM_NAME
1783
1784         self.set_committed(True)
1785
1786     @synchronized
1787     def notify(self, event, collection, name, item):
1788         if self._callback:
1789             self._callback(event, collection, name, item)
1790
1791
1792 class Subcollection(RichCollectionBase):
1793     """This is a subdirectory within a collection that doesn't have its own API
1794     server record.
1795
1796     Subcollection locking falls under the umbrella lock of its root collection.
1797
1798     """
1799
1800     def __init__(self, parent, name):
1801         super(Subcollection, self).__init__(parent)
1802         self.lock = self.root_collection().lock
1803         self._manifest_text = None
1804         self.name = name
1805         self.num_retries = parent.num_retries
1806
1807     def root_collection(self):
1808         return self.parent.root_collection()
1809
1810     def writable(self):
1811         return self.root_collection().writable()
1812
1813     def _my_api(self):
1814         return self.root_collection()._my_api()
1815
1816     def _my_keep(self):
1817         return self.root_collection()._my_keep()
1818
1819     def _my_block_manager(self):
1820         return self.root_collection()._my_block_manager()
1821
1822     def stream_name(self):
1823         return os.path.join(self.parent.stream_name(), self.name)
1824
1825     @synchronized
1826     def clone(self, new_parent, new_name):
1827         c = Subcollection(new_parent, new_name)
1828         c._clonefrom(self)
1829         return c
1830
1831     @must_be_writable
1832     @synchronized
1833     def _reparent(self, newparent, newname):
1834         self.set_committed(False)
1835         self.flush()
1836         self.parent.remove(self.name, recursive=True)
1837         self.parent = newparent
1838         self.name = newname
1839         self.lock = self.parent.root_collection().lock
1840
1841     @synchronized
1842     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1843         """Encode empty directories by using an \056-named (".") empty file"""
1844         if len(self._items) == 0:
1845             return "%s %s 0:0:\\056\n" % (stream_name, config.EMPTY_BLOCK_LOCATOR)
1846         return super(Subcollection, self)._get_manifest_text(stream_name,
1847                                                              strip, normalize,
1848                                                              only_committed)
1849
1850
1851 class CollectionReader(Collection):
1852     """A read-only collection object.
1853
1854     Initialize from a collection UUID or portable data hash, or raw
1855     manifest text.  See `Collection` constructor for detailed options.
1856
1857     """
1858     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1859         self._in_init = True
1860         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1861         self._in_init = False
1862
1863         # Forego any locking since it should never change once initialized.
1864         self.lock = NoopLock()
1865
1866         # Backwards compatability with old CollectionReader
1867         # all_streams() and all_files()
1868         self._streams = None
1869
1870     def writable(self):
1871         return self._in_init
1872
1873     def _populate_streams(orig_func):
1874         @functools.wraps(orig_func)
1875         def populate_streams_wrapper(self, *args, **kwargs):
1876             # Defer populating self._streams until needed since it creates a copy of the manifest.
1877             if self._streams is None:
1878                 if self._manifest_text:
1879                     self._streams = [sline.split()
1880                                      for sline in self._manifest_text.split("\n")
1881                                      if sline]
1882                 else:
1883                     self._streams = []
1884             return orig_func(self, *args, **kwargs)
1885         return populate_streams_wrapper
1886
1887     @_populate_streams
1888     def normalize(self):
1889         """Normalize the streams returned by `all_streams`.
1890
1891         This method is kept for backwards compatability and only affects the
1892         behavior of `all_streams()` and `all_files()`
1893
1894         """
1895
1896         # Rearrange streams
1897         streams = {}
1898         for s in self.all_streams():
1899             for f in s.all_files():
1900                 streamname, filename = split(s.name() + "/" + f.name())
1901                 if streamname not in streams:
1902                     streams[streamname] = {}
1903                 if filename not in streams[streamname]:
1904                     streams[streamname][filename] = []
1905                 for r in f.segments:
1906                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1907
1908         self._streams = [normalize_stream(s, streams[s])
1909                          for s in sorted(streams)]
1910     @_populate_streams
1911     def all_streams(self):
1912         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1913                 for s in self._streams]
1914
1915     @_populate_streams
1916     def all_files(self):
1917         for s in self.all_streams():
1918             for f in s.all_files():
1919                 yield f