11308: Make imports epydoc-parseable.
[arvados.git] / sdk / python / arvados / collection.py
1 from __future__ import absolute_import
2 from builtins import str
3 from past.builtins import basestring
4 from builtins import object
5 import functools
6 import logging
7 import os
8 import re
9 import errno
10 import hashlib
11 import time
12 import threading
13
14 from collections import deque
15 from stat import *
16
17 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
18 from .keep import KeepLocator, KeepClient
19 from .stream import StreamReader
20 from ._normalize_stream import normalize_stream
21 from ._ranges import Range, LocatorAndRange
22 from .safeapi import ThreadSafeApiCache
23 import arvados.config as config
24 import arvados.errors as errors
25 import arvados.util
26 import arvados.events as events
27 from arvados.retry import retry_method
28
29 _logger = logging.getLogger('arvados.collection')
30
31 class CollectionBase(object):
32     def __enter__(self):
33         return self
34
35     def __exit__(self, exc_type, exc_value, traceback):
36         pass
37
38     def _my_keep(self):
39         if self._keep_client is None:
40             self._keep_client = KeepClient(api_client=self._api_client,
41                                            num_retries=self.num_retries)
42         return self._keep_client
43
44     def stripped_manifest(self):
45         """Get the manifest with locator hints stripped.
46
47         Return the manifest for the current collection with all
48         non-portable hints (i.e., permission signatures and other
49         hints other than size hints) removed from the locators.
50         """
51         raw = self.manifest_text()
52         clean = []
53         for line in raw.split("\n"):
54             fields = line.split()
55             if fields:
56                 clean_fields = fields[:1] + [
57                     (re.sub(r'\+[^\d][^\+]*', '', x)
58                      if re.match(arvados.util.keep_locator_pattern, x)
59                      else x)
60                     for x in fields[1:]]
61                 clean += [' '.join(clean_fields), "\n"]
62         return ''.join(clean)
63
64
65 class _WriterFile(_FileLikeObjectBase):
66     def __init__(self, coll_writer, name):
67         super(_WriterFile, self).__init__(name, 'wb')
68         self.dest = coll_writer
69
70     def close(self):
71         super(_WriterFile, self).close()
72         self.dest.finish_current_file()
73
74     @_FileLikeObjectBase._before_close
75     def write(self, data):
76         self.dest.write(data)
77
78     @_FileLikeObjectBase._before_close
79     def writelines(self, seq):
80         for data in seq:
81             self.write(data)
82
83     @_FileLikeObjectBase._before_close
84     def flush(self):
85         self.dest.flush_data()
86
87
88 class CollectionWriter(CollectionBase):
89     def __init__(self, api_client=None, num_retries=0, replication=None):
90         """Instantiate a CollectionWriter.
91
92         CollectionWriter lets you build a new Arvados Collection from scratch.
93         Write files to it.  The CollectionWriter will upload data to Keep as
94         appropriate, and provide you with the Collection manifest text when
95         you're finished.
96
97         Arguments:
98         * api_client: The API client to use to look up Collections.  If not
99           provided, CollectionReader will build one from available Arvados
100           configuration.
101         * num_retries: The default number of times to retry failed
102           service requests.  Default 0.  You may change this value
103           after instantiation, but note those changes may not
104           propagate to related objects like the Keep client.
105         * replication: The number of copies of each block to store.
106           If this argument is None or not supplied, replication is
107           the server-provided default if available, otherwise 2.
108         """
109         self._api_client = api_client
110         self.num_retries = num_retries
111         self.replication = (2 if replication is None else replication)
112         self._keep_client = None
113         self._data_buffer = []
114         self._data_buffer_len = 0
115         self._current_stream_files = []
116         self._current_stream_length = 0
117         self._current_stream_locators = []
118         self._current_stream_name = '.'
119         self._current_file_name = None
120         self._current_file_pos = 0
121         self._finished_streams = []
122         self._close_file = None
123         self._queued_file = None
124         self._queued_dirents = deque()
125         self._queued_trees = deque()
126         self._last_open = None
127
128     def __exit__(self, exc_type, exc_value, traceback):
129         if exc_type is None:
130             self.finish()
131
132     def do_queued_work(self):
133         # The work queue consists of three pieces:
134         # * _queued_file: The file object we're currently writing to the
135         #   Collection.
136         # * _queued_dirents: Entries under the current directory
137         #   (_queued_trees[0]) that we want to write or recurse through.
138         #   This may contain files from subdirectories if
139         #   max_manifest_depth == 0 for this directory.
140         # * _queued_trees: Directories that should be written as separate
141         #   streams to the Collection.
142         # This function handles the smallest piece of work currently queued
143         # (current file, then current directory, then next directory) until
144         # no work remains.  The _work_THING methods each do a unit of work on
145         # THING.  _queue_THING methods add a THING to the work queue.
146         while True:
147             if self._queued_file:
148                 self._work_file()
149             elif self._queued_dirents:
150                 self._work_dirents()
151             elif self._queued_trees:
152                 self._work_trees()
153             else:
154                 break
155
156     def _work_file(self):
157         while True:
158             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
159             if not buf:
160                 break
161             self.write(buf)
162         self.finish_current_file()
163         if self._close_file:
164             self._queued_file.close()
165         self._close_file = None
166         self._queued_file = None
167
168     def _work_dirents(self):
169         path, stream_name, max_manifest_depth = self._queued_trees[0]
170         if stream_name != self.current_stream_name():
171             self.start_new_stream(stream_name)
172         while self._queued_dirents:
173             dirent = self._queued_dirents.popleft()
174             target = os.path.join(path, dirent)
175             if os.path.isdir(target):
176                 self._queue_tree(target,
177                                  os.path.join(stream_name, dirent),
178                                  max_manifest_depth - 1)
179             else:
180                 self._queue_file(target, dirent)
181                 break
182         if not self._queued_dirents:
183             self._queued_trees.popleft()
184
185     def _work_trees(self):
186         path, stream_name, max_manifest_depth = self._queued_trees[0]
187         d = arvados.util.listdir_recursive(
188             path, max_depth = (None if max_manifest_depth == 0 else 0))
189         if d:
190             self._queue_dirents(stream_name, d)
191         else:
192             self._queued_trees.popleft()
193
194     def _queue_file(self, source, filename=None):
195         assert (self._queued_file is None), "tried to queue more than one file"
196         if not hasattr(source, 'read'):
197             source = open(source, 'rb')
198             self._close_file = True
199         else:
200             self._close_file = False
201         if filename is None:
202             filename = os.path.basename(source.name)
203         self.start_new_file(filename)
204         self._queued_file = source
205
206     def _queue_dirents(self, stream_name, dirents):
207         assert (not self._queued_dirents), "tried to queue more than one tree"
208         self._queued_dirents = deque(sorted(dirents))
209
210     def _queue_tree(self, path, stream_name, max_manifest_depth):
211         self._queued_trees.append((path, stream_name, max_manifest_depth))
212
213     def write_file(self, source, filename=None):
214         self._queue_file(source, filename)
215         self.do_queued_work()
216
217     def write_directory_tree(self,
218                              path, stream_name='.', max_manifest_depth=-1):
219         self._queue_tree(path, stream_name, max_manifest_depth)
220         self.do_queued_work()
221
222     def write(self, newdata):
223         if isinstance(newdata, bytes):
224             pass
225         elif isinstance(newdata, str):
226             newdata = newdata.encode()
227         elif hasattr(newdata, '__iter__'):
228             for s in newdata:
229                 self.write(s)
230             return
231         self._data_buffer.append(newdata)
232         self._data_buffer_len += len(newdata)
233         self._current_stream_length += len(newdata)
234         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
235             self.flush_data()
236
237     def open(self, streampath, filename=None):
238         """open(streampath[, filename]) -> file-like object
239
240         Pass in the path of a file to write to the Collection, either as a
241         single string or as two separate stream name and file name arguments.
242         This method returns a file-like object you can write to add it to the
243         Collection.
244
245         You may only have one file object from the Collection open at a time,
246         so be sure to close the object when you're done.  Using the object in
247         a with statement makes that easy::
248
249           with cwriter.open('./doc/page1.txt') as outfile:
250               outfile.write(page1_data)
251           with cwriter.open('./doc/page2.txt') as outfile:
252               outfile.write(page2_data)
253         """
254         if filename is None:
255             streampath, filename = split(streampath)
256         if self._last_open and not self._last_open.closed:
257             raise errors.AssertionError(
258                 "can't open '{}' when '{}' is still open".format(
259                     filename, self._last_open.name))
260         if streampath != self.current_stream_name():
261             self.start_new_stream(streampath)
262         self.set_current_file_name(filename)
263         self._last_open = _WriterFile(self, filename)
264         return self._last_open
265
266     def flush_data(self):
267         data_buffer = b''.join(self._data_buffer)
268         if data_buffer:
269             self._current_stream_locators.append(
270                 self._my_keep().put(
271                     data_buffer[0:config.KEEP_BLOCK_SIZE],
272                     copies=self.replication))
273             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
274             self._data_buffer_len = len(self._data_buffer[0])
275
276     def start_new_file(self, newfilename=None):
277         self.finish_current_file()
278         self.set_current_file_name(newfilename)
279
280     def set_current_file_name(self, newfilename):
281         if re.search(r'[\t\n]', newfilename):
282             raise errors.AssertionError(
283                 "Manifest filenames cannot contain whitespace: %s" %
284                 newfilename)
285         elif re.search(r'\x00', newfilename):
286             raise errors.AssertionError(
287                 "Manifest filenames cannot contain NUL characters: %s" %
288                 newfilename)
289         self._current_file_name = newfilename
290
291     def current_file_name(self):
292         return self._current_file_name
293
294     def finish_current_file(self):
295         if self._current_file_name is None:
296             if self._current_file_pos == self._current_stream_length:
297                 return
298             raise errors.AssertionError(
299                 "Cannot finish an unnamed file " +
300                 "(%d bytes at offset %d in '%s' stream)" %
301                 (self._current_stream_length - self._current_file_pos,
302                  self._current_file_pos,
303                  self._current_stream_name))
304         self._current_stream_files.append([
305                 self._current_file_pos,
306                 self._current_stream_length - self._current_file_pos,
307                 self._current_file_name])
308         self._current_file_pos = self._current_stream_length
309         self._current_file_name = None
310
311     def start_new_stream(self, newstreamname='.'):
312         self.finish_current_stream()
313         self.set_current_stream_name(newstreamname)
314
315     def set_current_stream_name(self, newstreamname):
316         if re.search(r'[\t\n]', newstreamname):
317             raise errors.AssertionError(
318                 "Manifest stream names cannot contain whitespace: '%s'" %
319                 (newstreamname))
320         self._current_stream_name = '.' if newstreamname=='' else newstreamname
321
322     def current_stream_name(self):
323         return self._current_stream_name
324
325     def finish_current_stream(self):
326         self.finish_current_file()
327         self.flush_data()
328         if not self._current_stream_files:
329             pass
330         elif self._current_stream_name is None:
331             raise errors.AssertionError(
332                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
333                 (self._current_stream_length, len(self._current_stream_files)))
334         else:
335             if not self._current_stream_locators:
336                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
337             self._finished_streams.append([self._current_stream_name,
338                                            self._current_stream_locators,
339                                            self._current_stream_files])
340         self._current_stream_files = []
341         self._current_stream_length = 0
342         self._current_stream_locators = []
343         self._current_stream_name = None
344         self._current_file_pos = 0
345         self._current_file_name = None
346
347     def finish(self):
348         """Store the manifest in Keep and return its locator.
349
350         This is useful for storing manifest fragments (task outputs)
351         temporarily in Keep during a Crunch job.
352
353         In other cases you should make a collection instead, by
354         sending manifest_text() to the API server's "create
355         collection" endpoint.
356         """
357         return self._my_keep().put(self.manifest_text().encode(),
358                                    copies=self.replication)
359
360     def portable_data_hash(self):
361         stripped = self.stripped_manifest().encode()
362         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
363
364     def manifest_text(self):
365         self.finish_current_stream()
366         manifest = ''
367
368         for stream in self._finished_streams:
369             if not re.search(r'^\.(/.*)?$', stream[0]):
370                 manifest += './'
371             manifest += stream[0].replace(' ', '\\040')
372             manifest += ' ' + ' '.join(stream[1])
373             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
374             manifest += "\n"
375
376         return manifest
377
378     def data_locators(self):
379         ret = []
380         for name, locators, files in self._finished_streams:
381             ret += locators
382         return ret
383
384
385 class ResumableCollectionWriter(CollectionWriter):
386     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
387                    '_current_stream_locators', '_current_stream_name',
388                    '_current_file_name', '_current_file_pos', '_close_file',
389                    '_data_buffer', '_dependencies', '_finished_streams',
390                    '_queued_dirents', '_queued_trees']
391
392     def __init__(self, api_client=None, **kwargs):
393         self._dependencies = {}
394         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
395
396     @classmethod
397     def from_state(cls, state, *init_args, **init_kwargs):
398         # Try to build a new writer from scratch with the given state.
399         # If the state is not suitable to resume (because files have changed,
400         # been deleted, aren't predictable, etc.), raise a
401         # StaleWriterStateError.  Otherwise, return the initialized writer.
402         # The caller is responsible for calling writer.do_queued_work()
403         # appropriately after it's returned.
404         writer = cls(*init_args, **init_kwargs)
405         for attr_name in cls.STATE_PROPS:
406             attr_value = state[attr_name]
407             attr_class = getattr(writer, attr_name).__class__
408             # Coerce the value into the same type as the initial value, if
409             # needed.
410             if attr_class not in (type(None), attr_value.__class__):
411                 attr_value = attr_class(attr_value)
412             setattr(writer, attr_name, attr_value)
413         # Check dependencies before we try to resume anything.
414         if any(KeepLocator(ls).permission_expired()
415                for ls in writer._current_stream_locators):
416             raise errors.StaleWriterStateError(
417                 "locators include expired permission hint")
418         writer.check_dependencies()
419         if state['_current_file'] is not None:
420             path, pos = state['_current_file']
421             try:
422                 writer._queued_file = open(path, 'rb')
423                 writer._queued_file.seek(pos)
424             except IOError as error:
425                 raise errors.StaleWriterStateError(
426                     "failed to reopen active file {}: {}".format(path, error))
427         return writer
428
429     def check_dependencies(self):
430         for path, orig_stat in list(self._dependencies.items()):
431             if not S_ISREG(orig_stat[ST_MODE]):
432                 raise errors.StaleWriterStateError("{} not file".format(path))
433             try:
434                 now_stat = tuple(os.stat(path))
435             except OSError as error:
436                 raise errors.StaleWriterStateError(
437                     "failed to stat {}: {}".format(path, error))
438             if ((not S_ISREG(now_stat[ST_MODE])) or
439                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
440                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
441                 raise errors.StaleWriterStateError("{} changed".format(path))
442
443     def dump_state(self, copy_func=lambda x: x):
444         state = {attr: copy_func(getattr(self, attr))
445                  for attr in self.STATE_PROPS}
446         if self._queued_file is None:
447             state['_current_file'] = None
448         else:
449             state['_current_file'] = (os.path.realpath(self._queued_file.name),
450                                       self._queued_file.tell())
451         return state
452
453     def _queue_file(self, source, filename=None):
454         try:
455             src_path = os.path.realpath(source)
456         except Exception:
457             raise errors.AssertionError("{} not a file path".format(source))
458         try:
459             path_stat = os.stat(src_path)
460         except OSError as stat_error:
461             path_stat = None
462         super(ResumableCollectionWriter, self)._queue_file(source, filename)
463         fd_stat = os.fstat(self._queued_file.fileno())
464         if not S_ISREG(fd_stat.st_mode):
465             # We won't be able to resume from this cache anyway, so don't
466             # worry about further checks.
467             self._dependencies[source] = tuple(fd_stat)
468         elif path_stat is None:
469             raise errors.AssertionError(
470                 "could not stat {}: {}".format(source, stat_error))
471         elif path_stat.st_ino != fd_stat.st_ino:
472             raise errors.AssertionError(
473                 "{} changed between open and stat calls".format(source))
474         else:
475             self._dependencies[src_path] = tuple(fd_stat)
476
477     def write(self, data):
478         if self._queued_file is None:
479             raise errors.AssertionError(
480                 "resumable writer can't accept unsourced data")
481         return super(ResumableCollectionWriter, self).write(data)
482
483
484 ADD = "add"
485 DEL = "del"
486 MOD = "mod"
487 TOK = "tok"
488 FILE = "file"
489 COLLECTION = "collection"
490
491 class RichCollectionBase(CollectionBase):
492     """Base class for Collections and Subcollections.
493
494     Implements the majority of functionality relating to accessing items in the
495     Collection.
496
497     """
498
499     def __init__(self, parent=None):
500         self.parent = parent
501         self._committed = False
502         self._callback = None
503         self._items = {}
504
505     def _my_api(self):
506         raise NotImplementedError()
507
508     def _my_keep(self):
509         raise NotImplementedError()
510
511     def _my_block_manager(self):
512         raise NotImplementedError()
513
514     def writable(self):
515         raise NotImplementedError()
516
517     def root_collection(self):
518         raise NotImplementedError()
519
520     def notify(self, event, collection, name, item):
521         raise NotImplementedError()
522
523     def stream_name(self):
524         raise NotImplementedError()
525
526     @must_be_writable
527     @synchronized
528     def find_or_create(self, path, create_type):
529         """Recursively search the specified file path.
530
531         May return either a `Collection` or `ArvadosFile`.  If not found, will
532         create a new item at the specified path based on `create_type`.  Will
533         create intermediate subcollections needed to contain the final item in
534         the path.
535
536         :create_type:
537           One of `arvados.collection.FILE` or
538           `arvados.collection.COLLECTION`.  If the path is not found, and value
539           of create_type is FILE then create and return a new ArvadosFile for
540           the last path component.  If COLLECTION, then create and return a new
541           Collection for the last path component.
542
543         """
544
545         pathcomponents = path.split("/", 1)
546         if pathcomponents[0]:
547             item = self._items.get(pathcomponents[0])
548             if len(pathcomponents) == 1:
549                 if item is None:
550                     # create new file
551                     if create_type == COLLECTION:
552                         item = Subcollection(self, pathcomponents[0])
553                     else:
554                         item = ArvadosFile(self, pathcomponents[0])
555                     self._items[pathcomponents[0]] = item
556                     self.set_committed(False)
557                     self.notify(ADD, self, pathcomponents[0], item)
558                 return item
559             else:
560                 if item is None:
561                     # create new collection
562                     item = Subcollection(self, pathcomponents[0])
563                     self._items[pathcomponents[0]] = item
564                     self.set_committed(False)
565                     self.notify(ADD, self, pathcomponents[0], item)
566                 if isinstance(item, RichCollectionBase):
567                     return item.find_or_create(pathcomponents[1], create_type)
568                 else:
569                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
570         else:
571             return self
572
573     @synchronized
574     def find(self, path):
575         """Recursively search the specified file path.
576
577         May return either a Collection or ArvadosFile. Return None if not
578         found.
579         If path is invalid (ex: starts with '/'), an IOError exception will be
580         raised.
581
582         """
583         if not path:
584             raise errors.ArgumentError("Parameter 'path' is empty.")
585
586         pathcomponents = path.split("/", 1)
587         if pathcomponents[0] == '':
588             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
589
590         item = self._items.get(pathcomponents[0])
591         if item is None:
592             return None
593         elif len(pathcomponents) == 1:
594             return item
595         else:
596             if isinstance(item, RichCollectionBase):
597                 if pathcomponents[1]:
598                     return item.find(pathcomponents[1])
599                 else:
600                     return item
601             else:
602                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
603
604     @synchronized
605     def mkdirs(self, path):
606         """Recursive subcollection create.
607
608         Like `os.makedirs()`.  Will create intermediate subcollections needed
609         to contain the leaf subcollection path.
610
611         """
612
613         if self.find(path) != None:
614             raise IOError(errno.EEXIST, "Directory or file exists", path)
615
616         return self.find_or_create(path, COLLECTION)
617
618     def open(self, path, mode="r"):
619         """Open a file-like object for access.
620
621         :path:
622           path to a file in the collection
623         :mode:
624           a string consisting of "r", "w", or "a", optionally followed
625           by "b" or "t", optionally followed by "+".
626           :"b":
627             binary mode: write() accepts bytes, read() returns bytes.
628           :"t":
629             text mode (default): write() accepts strings, read() returns strings.
630           :"r":
631             opens for reading
632           :"r+":
633             opens for reading and writing.  Reads/writes share a file pointer.
634           :"w", "w+":
635             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
636           :"a", "a+":
637             opens for reading and writing.  All writes are appended to
638             the end of the file.  Writing does not affect the file pointer for
639             reading.
640         """
641
642         if not re.search(r'^[rwa][bt]?\+?$', mode):
643             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
644
645         if mode[0] == 'r' and '+' not in mode:
646             fclass = ArvadosFileReader
647             arvfile = self.find(path)
648         elif not self.writable():
649             raise IOError(errno.EROFS, "Collection is read only")
650         else:
651             fclass = ArvadosFileWriter
652             arvfile = self.find_or_create(path, FILE)
653
654         if arvfile is None:
655             raise IOError(errno.ENOENT, "File not found", path)
656         if not isinstance(arvfile, ArvadosFile):
657             raise IOError(errno.EISDIR, "Is a directory", path)
658
659         if mode[0] == 'w':
660             arvfile.truncate(0)
661
662         return fclass(arvfile, mode=mode, num_retries=self.num_retries)
663
664     def modified(self):
665         """Determine if the collection has been modified since last commited."""
666         return not self.committed()
667
668     @synchronized
669     def committed(self):
670         """Determine if the collection has been committed to the API server."""
671         return self._committed
672
673     @synchronized
674     def set_committed(self, value=True):
675         """Recursively set committed flag.
676
677         If value is True, set committed to be True for this and all children.
678
679         If value is False, set committed to be False for this and all parents.
680         """
681         if value == self._committed:
682             return
683         if value:
684             for k,v in list(self._items.items()):
685                 v.set_committed(True)
686             self._committed = True
687         else:
688             self._committed = False
689             if self.parent is not None:
690                 self.parent.set_committed(False)
691
692     @synchronized
693     def __iter__(self):
694         """Iterate over names of files and collections contained in this collection."""
695         return iter(list(self._items.keys()))
696
697     @synchronized
698     def __getitem__(self, k):
699         """Get a file or collection that is directly contained by this collection.
700
701         If you want to search a path, use `find()` instead.
702
703         """
704         return self._items[k]
705
706     @synchronized
707     def __contains__(self, k):
708         """Test if there is a file or collection a directly contained by this collection."""
709         return k in self._items
710
711     @synchronized
712     def __len__(self):
713         """Get the number of items directly contained in this collection."""
714         return len(self._items)
715
716     @must_be_writable
717     @synchronized
718     def __delitem__(self, p):
719         """Delete an item by name which is directly contained by this collection."""
720         del self._items[p]
721         self.set_committed(False)
722         self.notify(DEL, self, p, None)
723
724     @synchronized
725     def keys(self):
726         """Get a list of names of files and collections directly contained in this collection."""
727         return list(self._items.keys())
728
729     @synchronized
730     def values(self):
731         """Get a list of files and collection objects directly contained in this collection."""
732         return list(self._items.values())
733
734     @synchronized
735     def items(self):
736         """Get a list of (name, object) tuples directly contained in this collection."""
737         return list(self._items.items())
738
739     def exists(self, path):
740         """Test if there is a file or collection at `path`."""
741         return self.find(path) is not None
742
743     @must_be_writable
744     @synchronized
745     def remove(self, path, recursive=False):
746         """Remove the file or subcollection (directory) at `path`.
747
748         :recursive:
749           Specify whether to remove non-empty subcollections (True), or raise an error (False).
750         """
751
752         if not path:
753             raise errors.ArgumentError("Parameter 'path' is empty.")
754
755         pathcomponents = path.split("/", 1)
756         item = self._items.get(pathcomponents[0])
757         if item is None:
758             raise IOError(errno.ENOENT, "File not found", path)
759         if len(pathcomponents) == 1:
760             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
761                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
762             deleteditem = self._items[pathcomponents[0]]
763             del self._items[pathcomponents[0]]
764             self.set_committed(False)
765             self.notify(DEL, self, pathcomponents[0], deleteditem)
766         else:
767             item.remove(pathcomponents[1])
768
769     def _clonefrom(self, source):
770         for k,v in list(source.items()):
771             self._items[k] = v.clone(self, k)
772
773     def clone(self):
774         raise NotImplementedError()
775
776     @must_be_writable
777     @synchronized
778     def add(self, source_obj, target_name, overwrite=False, reparent=False):
779         """Copy or move a file or subcollection to this collection.
780
781         :source_obj:
782           An ArvadosFile, or Subcollection object
783
784         :target_name:
785           Destination item name.  If the target name already exists and is a
786           file, this will raise an error unless you specify `overwrite=True`.
787
788         :overwrite:
789           Whether to overwrite target file if it already exists.
790
791         :reparent:
792           If True, source_obj will be moved from its parent collection to this collection.
793           If False, source_obj will be copied and the parent collection will be
794           unmodified.
795
796         """
797
798         if target_name in self and not overwrite:
799             raise IOError(errno.EEXIST, "File already exists", target_name)
800
801         modified_from = None
802         if target_name in self:
803             modified_from = self[target_name]
804
805         # Actually make the move or copy.
806         if reparent:
807             source_obj._reparent(self, target_name)
808             item = source_obj
809         else:
810             item = source_obj.clone(self, target_name)
811
812         self._items[target_name] = item
813         self.set_committed(False)
814
815         if modified_from:
816             self.notify(MOD, self, target_name, (modified_from, item))
817         else:
818             self.notify(ADD, self, target_name, item)
819
820     def _get_src_target(self, source, target_path, source_collection, create_dest):
821         if source_collection is None:
822             source_collection = self
823
824         # Find the object
825         if isinstance(source, basestring):
826             source_obj = source_collection.find(source)
827             if source_obj is None:
828                 raise IOError(errno.ENOENT, "File not found", source)
829             sourcecomponents = source.split("/")
830         else:
831             source_obj = source
832             sourcecomponents = None
833
834         # Find parent collection the target path
835         targetcomponents = target_path.split("/")
836
837         # Determine the name to use.
838         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
839
840         if not target_name:
841             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
842
843         if create_dest:
844             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
845         else:
846             if len(targetcomponents) > 1:
847                 target_dir = self.find("/".join(targetcomponents[0:-1]))
848             else:
849                 target_dir = self
850
851         if target_dir is None:
852             raise IOError(errno.ENOENT, "Target directory not found", target_name)
853
854         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
855             target_dir = target_dir[target_name]
856             target_name = sourcecomponents[-1]
857
858         return (source_obj, target_dir, target_name)
859
860     @must_be_writable
861     @synchronized
862     def copy(self, source, target_path, source_collection=None, overwrite=False):
863         """Copy a file or subcollection to a new path in this collection.
864
865         :source:
866           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
867
868         :target_path:
869           Destination file or path.  If the target path already exists and is a
870           subcollection, the item will be placed inside the subcollection.  If
871           the target path already exists and is a file, this will raise an error
872           unless you specify `overwrite=True`.
873
874         :source_collection:
875           Collection to copy `source_path` from (default `self`)
876
877         :overwrite:
878           Whether to overwrite target file if it already exists.
879         """
880
881         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
882         target_dir.add(source_obj, target_name, overwrite, False)
883
884     @must_be_writable
885     @synchronized
886     def rename(self, source, target_path, source_collection=None, overwrite=False):
887         """Move a file or subcollection from `source_collection` to a new path in this collection.
888
889         :source:
890           A string with a path to source file or subcollection.
891
892         :target_path:
893           Destination file or path.  If the target path already exists and is a
894           subcollection, the item will be placed inside the subcollection.  If
895           the target path already exists and is a file, this will raise an error
896           unless you specify `overwrite=True`.
897
898         :source_collection:
899           Collection to copy `source_path` from (default `self`)
900
901         :overwrite:
902           Whether to overwrite target file if it already exists.
903         """
904
905         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
906         if not source_obj.writable():
907             raise IOError(errno.EROFS, "Source collection is read only", source)
908         target_dir.add(source_obj, target_name, overwrite, True)
909
910     def portable_manifest_text(self, stream_name="."):
911         """Get the manifest text for this collection, sub collections and files.
912
913         This method does not flush outstanding blocks to Keep.  It will return
914         a normalized manifest with access tokens stripped.
915
916         :stream_name:
917           Name to use for this stream (directory)
918
919         """
920         return self._get_manifest_text(stream_name, True, True)
921
922     @synchronized
923     def manifest_text(self, stream_name=".", strip=False, normalize=False,
924                       only_committed=False):
925         """Get the manifest text for this collection, sub collections and files.
926
927         This method will flush outstanding blocks to Keep.  By default, it will
928         not normalize an unmodified manifest or strip access tokens.
929
930         :stream_name:
931           Name to use for this stream (directory)
932
933         :strip:
934           If True, remove signing tokens from block locators if present.
935           If False (default), block locators are left unchanged.
936
937         :normalize:
938           If True, always export the manifest text in normalized form
939           even if the Collection is not modified.  If False (default) and the collection
940           is not modified, return the original manifest text even if it is not
941           in normalized form.
942
943         :only_committed:
944           If True, don't commit pending blocks.
945
946         """
947
948         if not only_committed:
949             self._my_block_manager().commit_all()
950         return self._get_manifest_text(stream_name, strip, normalize,
951                                        only_committed=only_committed)
952
953     @synchronized
954     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
955         """Get the manifest text for this collection, sub collections and files.
956
957         :stream_name:
958           Name to use for this stream (directory)
959
960         :strip:
961           If True, remove signing tokens from block locators if present.
962           If False (default), block locators are left unchanged.
963
964         :normalize:
965           If True, always export the manifest text in normalized form
966           even if the Collection is not modified.  If False (default) and the collection
967           is not modified, return the original manifest text even if it is not
968           in normalized form.
969
970         :only_committed:
971           If True, only include blocks that were already committed to Keep.
972
973         """
974
975         if not self.committed() or self._manifest_text is None or normalize:
976             stream = {}
977             buf = []
978             sorted_keys = sorted(self.keys())
979             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
980                 # Create a stream per file `k`
981                 arvfile = self[filename]
982                 filestream = []
983                 for segment in arvfile.segments():
984                     loc = segment.locator
985                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
986                         if only_committed:
987                             continue
988                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
989                     if strip:
990                         loc = KeepLocator(loc).stripped()
991                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
992                                          segment.segment_offset, segment.range_size))
993                 stream[filename] = filestream
994             if stream:
995                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
996             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
997                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
998             return "".join(buf)
999         else:
1000             if strip:
1001                 return self.stripped_manifest()
1002             else:
1003                 return self._manifest_text
1004
1005     @synchronized
1006     def diff(self, end_collection, prefix=".", holding_collection=None):
1007         """Generate list of add/modify/delete actions.
1008
1009         When given to `apply`, will change `self` to match `end_collection`
1010
1011         """
1012         changes = []
1013         if holding_collection is None:
1014             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1015         for k in self:
1016             if k not in end_collection:
1017                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1018         for k in end_collection:
1019             if k in self:
1020                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1021                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1022                 elif end_collection[k] != self[k]:
1023                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1024                 else:
1025                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1026             else:
1027                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1028         return changes
1029
1030     @must_be_writable
1031     @synchronized
1032     def apply(self, changes):
1033         """Apply changes from `diff`.
1034
1035         If a change conflicts with a local change, it will be saved to an
1036         alternate path indicating the conflict.
1037
1038         """
1039         if changes:
1040             self.set_committed(False)
1041         for change in changes:
1042             event_type = change[0]
1043             path = change[1]
1044             initial = change[2]
1045             local = self.find(path)
1046             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1047                                                                     time.gmtime()))
1048             if event_type == ADD:
1049                 if local is None:
1050                     # No local file at path, safe to copy over new file
1051                     self.copy(initial, path)
1052                 elif local is not None and local != initial:
1053                     # There is already local file and it is different:
1054                     # save change to conflict file.
1055                     self.copy(initial, conflictpath)
1056             elif event_type == MOD or event_type == TOK:
1057                 final = change[3]
1058                 if local == initial:
1059                     # Local matches the "initial" item so it has not
1060                     # changed locally and is safe to update.
1061                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1062                         # Replace contents of local file with new contents
1063                         local.replace_contents(final)
1064                     else:
1065                         # Overwrite path with new item; this can happen if
1066                         # path was a file and is now a collection or vice versa
1067                         self.copy(final, path, overwrite=True)
1068                 else:
1069                     # Local is missing (presumably deleted) or local doesn't
1070                     # match the "start" value, so save change to conflict file
1071                     self.copy(final, conflictpath)
1072             elif event_type == DEL:
1073                 if local == initial:
1074                     # Local item matches "initial" value, so it is safe to remove.
1075                     self.remove(path, recursive=True)
1076                 # else, the file is modified or already removed, in either
1077                 # case we don't want to try to remove it.
1078
1079     def portable_data_hash(self):
1080         """Get the portable data hash for this collection's manifest."""
1081         if self._manifest_locator and self.committed():
1082             # If the collection is already saved on the API server, and it's committed
1083             # then return API server's PDH response.
1084             return self._portable_data_hash
1085         else:
1086             stripped = self.portable_manifest_text().encode()
1087             return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1088
1089     @synchronized
1090     def subscribe(self, callback):
1091         if self._callback is None:
1092             self._callback = callback
1093         else:
1094             raise errors.ArgumentError("A callback is already set on this collection.")
1095
1096     @synchronized
1097     def unsubscribe(self):
1098         if self._callback is not None:
1099             self._callback = None
1100
1101     @synchronized
1102     def notify(self, event, collection, name, item):
1103         if self._callback:
1104             self._callback(event, collection, name, item)
1105         self.root_collection().notify(event, collection, name, item)
1106
1107     @synchronized
1108     def __eq__(self, other):
1109         if other is self:
1110             return True
1111         if not isinstance(other, RichCollectionBase):
1112             return False
1113         if len(self._items) != len(other):
1114             return False
1115         for k in self._items:
1116             if k not in other:
1117                 return False
1118             if self._items[k] != other[k]:
1119                 return False
1120         return True
1121
1122     def __ne__(self, other):
1123         return not self.__eq__(other)
1124
1125     @synchronized
1126     def flush(self):
1127         """Flush bufferblocks to Keep."""
1128         for e in list(self.values()):
1129             e.flush()
1130
1131
1132 class Collection(RichCollectionBase):
1133     """Represents the root of an Arvados Collection.
1134
1135     This class is threadsafe.  The root collection object, all subcollections
1136     and files are protected by a single lock (i.e. each access locks the entire
1137     collection).
1138
1139     Brief summary of
1140     useful methods:
1141
1142     :To read an existing file:
1143       `c.open("myfile", "r")`
1144
1145     :To write a new file:
1146       `c.open("myfile", "w")`
1147
1148     :To determine if a file exists:
1149       `c.find("myfile") is not None`
1150
1151     :To copy a file:
1152       `c.copy("source", "dest")`
1153
1154     :To delete a file:
1155       `c.remove("myfile")`
1156
1157     :To save to an existing collection record:
1158       `c.save()`
1159
1160     :To save a new collection record:
1161     `c.save_new()`
1162
1163     :To merge remote changes into this object:
1164       `c.update()`
1165
1166     Must be associated with an API server Collection record (during
1167     initialization, or using `save_new`) to use `save` or `update`
1168
1169     """
1170
1171     def __init__(self, manifest_locator_or_text=None,
1172                  api_client=None,
1173                  keep_client=None,
1174                  num_retries=None,
1175                  parent=None,
1176                  apiconfig=None,
1177                  block_manager=None,
1178                  replication_desired=None,
1179                  put_threads=None):
1180         """Collection constructor.
1181
1182         :manifest_locator_or_text:
1183           One of Arvados collection UUID, block locator of
1184           a manifest, raw manifest text, or None (to create an empty collection).
1185         :parent:
1186           the parent Collection, may be None.
1187
1188         :apiconfig:
1189           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1190           Prefer this over supplying your own api_client and keep_client (except in testing).
1191           Will use default config settings if not specified.
1192
1193         :api_client:
1194           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1195
1196         :keep_client:
1197           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1198
1199         :num_retries:
1200           the number of retries for API and Keep requests.
1201
1202         :block_manager:
1203           the block manager to use.  If not specified, create one.
1204
1205         :replication_desired:
1206           How many copies should Arvados maintain. If None, API server default
1207           configuration applies. If not None, this value will also be used
1208           for determining the number of block copies being written.
1209
1210         """
1211         super(Collection, self).__init__(parent)
1212         self._api_client = api_client
1213         self._keep_client = keep_client
1214         self._block_manager = block_manager
1215         self.replication_desired = replication_desired
1216         self.put_threads = put_threads
1217
1218         if apiconfig:
1219             self._config = apiconfig
1220         else:
1221             self._config = config.settings()
1222
1223         self.num_retries = num_retries if num_retries is not None else 0
1224         self._manifest_locator = None
1225         self._manifest_text = None
1226         self._portable_data_hash = None
1227         self._api_response = None
1228         self._past_versions = set()
1229
1230         self.lock = threading.RLock()
1231         self.events = None
1232
1233         if manifest_locator_or_text:
1234             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1235                 self._manifest_locator = manifest_locator_or_text
1236             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1237                 self._manifest_locator = manifest_locator_or_text
1238             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1239                 self._manifest_text = manifest_locator_or_text
1240             else:
1241                 raise errors.ArgumentError(
1242                     "Argument to CollectionReader is not a manifest or a collection UUID")
1243
1244             try:
1245                 self._populate()
1246             except (IOError, errors.SyntaxError) as e:
1247                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1248
1249     def root_collection(self):
1250         return self
1251
1252     def stream_name(self):
1253         return "."
1254
1255     def writable(self):
1256         return True
1257
1258     @synchronized
1259     def known_past_version(self, modified_at_and_portable_data_hash):
1260         return modified_at_and_portable_data_hash in self._past_versions
1261
1262     @synchronized
1263     @retry_method
1264     def update(self, other=None, num_retries=None):
1265         """Merge the latest collection on the API server with the current collection."""
1266
1267         if other is None:
1268             if self._manifest_locator is None:
1269                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1270             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1271             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1272                 response.get("portable_data_hash") != self.portable_data_hash()):
1273                 # The record on the server is different from our current one, but we've seen it before,
1274                 # so ignore it because it's already been merged.
1275                 # However, if it's the same as our current record, proceed with the update, because we want to update
1276                 # our tokens.
1277                 return
1278             else:
1279                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1280             other = CollectionReader(response["manifest_text"])
1281         baseline = CollectionReader(self._manifest_text)
1282         self.apply(baseline.diff(other))
1283         self._manifest_text = self.manifest_text()
1284
1285     @synchronized
1286     def _my_api(self):
1287         if self._api_client is None:
1288             self._api_client = ThreadSafeApiCache(self._config)
1289             if self._keep_client is None:
1290                 self._keep_client = self._api_client.keep
1291         return self._api_client
1292
1293     @synchronized
1294     def _my_keep(self):
1295         if self._keep_client is None:
1296             if self._api_client is None:
1297                 self._my_api()
1298             else:
1299                 self._keep_client = KeepClient(api_client=self._api_client)
1300         return self._keep_client
1301
1302     @synchronized
1303     def _my_block_manager(self):
1304         if self._block_manager is None:
1305             copies = (self.replication_desired or
1306                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1307                                                    2))
1308             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1309         return self._block_manager
1310
1311     def _remember_api_response(self, response):
1312         self._api_response = response
1313         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1314
1315     def _populate_from_api_server(self):
1316         # As in KeepClient itself, we must wait until the last
1317         # possible moment to instantiate an API client, in order to
1318         # avoid tripping up clients that don't have access to an API
1319         # server.  If we do build one, make sure our Keep client uses
1320         # it.  If instantiation fails, we'll fall back to the except
1321         # clause, just like any other Collection lookup
1322         # failure. Return an exception, or None if successful.
1323         try:
1324             self._remember_api_response(self._my_api().collections().get(
1325                 uuid=self._manifest_locator).execute(
1326                     num_retries=self.num_retries))
1327             self._manifest_text = self._api_response['manifest_text']
1328             self._portable_data_hash = self._api_response['portable_data_hash']
1329             # If not overriden via kwargs, we should try to load the
1330             # replication_desired from the API server
1331             if self.replication_desired is None:
1332                 self.replication_desired = self._api_response.get('replication_desired', None)
1333             return None
1334         except Exception as e:
1335             return e
1336
1337     def _populate_from_keep(self):
1338         # Retrieve a manifest directly from Keep. This has a chance of
1339         # working if [a] the locator includes a permission signature
1340         # or [b] the Keep services are operating in world-readable
1341         # mode. Return an exception, or None if successful.
1342         try:
1343             self._manifest_text = self._my_keep().get(
1344                 self._manifest_locator, num_retries=self.num_retries).decode()
1345         except Exception as e:
1346             return e
1347
1348     def _populate(self):
1349         if self._manifest_locator is None and self._manifest_text is None:
1350             return
1351         error_via_api = None
1352         error_via_keep = None
1353         should_try_keep = ((self._manifest_text is None) and
1354                            arvados.util.keep_locator_pattern.match(
1355                                self._manifest_locator))
1356         if ((self._manifest_text is None) and
1357             arvados.util.signed_locator_pattern.match(self._manifest_locator)):
1358             error_via_keep = self._populate_from_keep()
1359         if self._manifest_text is None:
1360             error_via_api = self._populate_from_api_server()
1361             if error_via_api is not None and not should_try_keep:
1362                 raise error_via_api
1363         if ((self._manifest_text is None) and
1364             not error_via_keep and
1365             should_try_keep):
1366             # Looks like a keep locator, and we didn't already try keep above
1367             error_via_keep = self._populate_from_keep()
1368         if self._manifest_text is None:
1369             # Nothing worked!
1370             raise errors.NotFoundError(
1371                 ("Failed to retrieve collection '{}' " +
1372                  "from either API server ({}) or Keep ({})."
1373                  ).format(
1374                     self._manifest_locator,
1375                     error_via_api,
1376                     error_via_keep))
1377         # populate
1378         self._baseline_manifest = self._manifest_text
1379         self._import_manifest(self._manifest_text)
1380
1381
1382     def _has_collection_uuid(self):
1383         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1384
1385     def __enter__(self):
1386         return self
1387
1388     def __exit__(self, exc_type, exc_value, traceback):
1389         """Support scoped auto-commit in a with: block."""
1390         if exc_type is None:
1391             if self.writable() and self._has_collection_uuid():
1392                 self.save()
1393         self.stop_threads()
1394
1395     def stop_threads(self):
1396         if self._block_manager is not None:
1397             self._block_manager.stop_threads()
1398
1399     @synchronized
1400     def manifest_locator(self):
1401         """Get the manifest locator, if any.
1402
1403         The manifest locator will be set when the collection is loaded from an
1404         API server record or the portable data hash of a manifest.
1405
1406         The manifest locator will be None if the collection is newly created or
1407         was created directly from manifest text.  The method `save_new()` will
1408         assign a manifest locator.
1409
1410         """
1411         return self._manifest_locator
1412
1413     @synchronized
1414     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1415         if new_config is None:
1416             new_config = self._config
1417         if readonly:
1418             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1419         else:
1420             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1421
1422         newcollection._clonefrom(self)
1423         return newcollection
1424
1425     @synchronized
1426     def api_response(self):
1427         """Returns information about this Collection fetched from the API server.
1428
1429         If the Collection exists in Keep but not the API server, currently
1430         returns None.  Future versions may provide a synthetic response.
1431
1432         """
1433         return self._api_response
1434
1435     def find_or_create(self, path, create_type):
1436         """See `RichCollectionBase.find_or_create`"""
1437         if path == ".":
1438             return self
1439         else:
1440             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1441
1442     def find(self, path):
1443         """See `RichCollectionBase.find`"""
1444         if path == ".":
1445             return self
1446         else:
1447             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1448
1449     def remove(self, path, recursive=False):
1450         """See `RichCollectionBase.remove`"""
1451         if path == ".":
1452             raise errors.ArgumentError("Cannot remove '.'")
1453         else:
1454             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1455
1456     @must_be_writable
1457     @synchronized
1458     @retry_method
1459     def save(self, merge=True, num_retries=None):
1460         """Save collection to an existing collection record.
1461
1462         Commit pending buffer blocks to Keep, merge with remote record (if
1463         merge=True, the default), and update the collection record.  Returns
1464         the current manifest text.
1465
1466         Will raise AssertionError if not associated with a collection record on
1467         the API server.  If you want to save a manifest to Keep only, see
1468         `save_new()`.
1469
1470         :merge:
1471           Update and merge remote changes before saving.  Otherwise, any
1472           remote changes will be ignored and overwritten.
1473
1474         :num_retries:
1475           Retry count on API calls (if None,  use the collection default)
1476
1477         """
1478         if not self.committed():
1479             if not self._has_collection_uuid():
1480                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1481
1482             self._my_block_manager().commit_all()
1483
1484             if merge:
1485                 self.update()
1486
1487             text = self.manifest_text(strip=False)
1488             self._remember_api_response(self._my_api().collections().update(
1489                 uuid=self._manifest_locator,
1490                 body={'manifest_text': text}
1491                 ).execute(
1492                     num_retries=num_retries))
1493             self._manifest_text = self._api_response["manifest_text"]
1494             self._portable_data_hash = self._api_response["portable_data_hash"]
1495             self.set_committed(True)
1496
1497         return self._manifest_text
1498
1499
1500     @must_be_writable
1501     @synchronized
1502     @retry_method
1503     def save_new(self, name=None,
1504                  create_collection_record=True,
1505                  owner_uuid=None,
1506                  ensure_unique_name=False,
1507                  num_retries=None):
1508         """Save collection to a new collection record.
1509
1510         Commit pending buffer blocks to Keep and, when create_collection_record
1511         is True (default), create a new collection record.  After creating a
1512         new collection record, this Collection object will be associated with
1513         the new record used by `save()`.  Returns the current manifest text.
1514
1515         :name:
1516           The collection name.
1517
1518         :create_collection_record:
1519            If True, create a collection record on the API server.
1520            If False, only commit blocks to Keep and return the manifest text.
1521
1522         :owner_uuid:
1523           the user, or project uuid that will own this collection.
1524           If None, defaults to the current user.
1525
1526         :ensure_unique_name:
1527           If True, ask the API server to rename the collection
1528           if it conflicts with a collection with the same name and owner.  If
1529           False, a name conflict will result in an error.
1530
1531         :num_retries:
1532           Retry count on API calls (if None,  use the collection default)
1533
1534         """
1535         self._my_block_manager().commit_all()
1536         text = self.manifest_text(strip=False)
1537
1538         if create_collection_record:
1539             if name is None:
1540                 name = "New collection"
1541                 ensure_unique_name = True
1542
1543             body = {"manifest_text": text,
1544                     "name": name,
1545                     "replication_desired": self.replication_desired}
1546             if owner_uuid:
1547                 body["owner_uuid"] = owner_uuid
1548
1549             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1550             text = self._api_response["manifest_text"]
1551
1552             self._manifest_locator = self._api_response["uuid"]
1553             self._portable_data_hash = self._api_response["portable_data_hash"]
1554
1555             self._manifest_text = text
1556             self.set_committed(True)
1557
1558         return text
1559
1560     @synchronized
1561     def _import_manifest(self, manifest_text):
1562         """Import a manifest into a `Collection`.
1563
1564         :manifest_text:
1565           The manifest text to import from.
1566
1567         """
1568         if len(self) > 0:
1569             raise ArgumentError("Can only import manifest into an empty collection")
1570
1571         STREAM_NAME = 0
1572         BLOCKS = 1
1573         SEGMENTS = 2
1574
1575         stream_name = None
1576         state = STREAM_NAME
1577
1578         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1579             tok = token_and_separator.group(1)
1580             sep = token_and_separator.group(2)
1581
1582             if state == STREAM_NAME:
1583                 # starting a new stream
1584                 stream_name = tok.replace('\\040', ' ')
1585                 blocks = []
1586                 segments = []
1587                 streamoffset = 0
1588                 state = BLOCKS
1589                 self.find_or_create(stream_name, COLLECTION)
1590                 continue
1591
1592             if state == BLOCKS:
1593                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1594                 if block_locator:
1595                     blocksize = int(block_locator.group(1))
1596                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1597                     streamoffset += blocksize
1598                 else:
1599                     state = SEGMENTS
1600
1601             if state == SEGMENTS:
1602                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1603                 if file_segment:
1604                     pos = int(file_segment.group(1))
1605                     size = int(file_segment.group(2))
1606                     name = file_segment.group(3).replace('\\040', ' ')
1607                     filepath = os.path.join(stream_name, name)
1608                     afile = self.find_or_create(filepath, FILE)
1609                     if isinstance(afile, ArvadosFile):
1610                         afile.add_segment(blocks, pos, size)
1611                     else:
1612                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1613                 else:
1614                     # error!
1615                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1616
1617             if sep == "\n":
1618                 stream_name = None
1619                 state = STREAM_NAME
1620
1621         self.set_committed(True)
1622
1623     @synchronized
1624     def notify(self, event, collection, name, item):
1625         if self._callback:
1626             self._callback(event, collection, name, item)
1627
1628
1629 class Subcollection(RichCollectionBase):
1630     """This is a subdirectory within a collection that doesn't have its own API
1631     server record.
1632
1633     Subcollection locking falls under the umbrella lock of its root collection.
1634
1635     """
1636
1637     def __init__(self, parent, name):
1638         super(Subcollection, self).__init__(parent)
1639         self.lock = self.root_collection().lock
1640         self._manifest_text = None
1641         self.name = name
1642         self.num_retries = parent.num_retries
1643
1644     def root_collection(self):
1645         return self.parent.root_collection()
1646
1647     def writable(self):
1648         return self.root_collection().writable()
1649
1650     def _my_api(self):
1651         return self.root_collection()._my_api()
1652
1653     def _my_keep(self):
1654         return self.root_collection()._my_keep()
1655
1656     def _my_block_manager(self):
1657         return self.root_collection()._my_block_manager()
1658
1659     def stream_name(self):
1660         return os.path.join(self.parent.stream_name(), self.name)
1661
1662     @synchronized
1663     def clone(self, new_parent, new_name):
1664         c = Subcollection(new_parent, new_name)
1665         c._clonefrom(self)
1666         return c
1667
1668     @must_be_writable
1669     @synchronized
1670     def _reparent(self, newparent, newname):
1671         self.set_committed(False)
1672         self.flush()
1673         self.parent.remove(self.name, recursive=True)
1674         self.parent = newparent
1675         self.name = newname
1676         self.lock = self.parent.root_collection().lock
1677
1678
1679 class CollectionReader(Collection):
1680     """A read-only collection object.
1681
1682     Initialize from an api collection record locator, a portable data hash of a
1683     manifest, or raw manifest text.  See `Collection` constructor for detailed
1684     options.
1685
1686     """
1687     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1688         self._in_init = True
1689         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1690         self._in_init = False
1691
1692         # Forego any locking since it should never change once initialized.
1693         self.lock = NoopLock()
1694
1695         # Backwards compatability with old CollectionReader
1696         # all_streams() and all_files()
1697         self._streams = None
1698
1699     def writable(self):
1700         return self._in_init
1701
1702     def _populate_streams(orig_func):
1703         @functools.wraps(orig_func)
1704         def populate_streams_wrapper(self, *args, **kwargs):
1705             # Defer populating self._streams until needed since it creates a copy of the manifest.
1706             if self._streams is None:
1707                 if self._manifest_text:
1708                     self._streams = [sline.split()
1709                                      for sline in self._manifest_text.split("\n")
1710                                      if sline]
1711                 else:
1712                     self._streams = []
1713             return orig_func(self, *args, **kwargs)
1714         return populate_streams_wrapper
1715
1716     @_populate_streams
1717     def normalize(self):
1718         """Normalize the streams returned by `all_streams`.
1719
1720         This method is kept for backwards compatability and only affects the
1721         behavior of `all_streams()` and `all_files()`
1722
1723         """
1724
1725         # Rearrange streams
1726         streams = {}
1727         for s in self.all_streams():
1728             for f in s.all_files():
1729                 streamname, filename = split(s.name() + "/" + f.name())
1730                 if streamname not in streams:
1731                     streams[streamname] = {}
1732                 if filename not in streams[streamname]:
1733                     streams[streamname][filename] = []
1734                 for r in f.segments:
1735                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1736
1737         self._streams = [normalize_stream(s, streams[s])
1738                          for s in sorted(streams)]
1739     @_populate_streams
1740     def all_streams(self):
1741         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1742                 for s in self._streams]
1743
1744     @_populate_streams
1745     def all_files(self):
1746         for s in self.all_streams():
1747             for f in s.all_files():
1748                 yield f