8784: Fix service discovery race in tests (send SIGHUP to keep-web).
[arvados.git] / sdk / python / arvados / collection.py
1 from __future__ import absolute_import
2 from future.utils import listitems, listvalues, viewkeys
3 from builtins import str
4 from past.builtins import basestring
5 from builtins import object
6 import functools
7 import logging
8 import os
9 import re
10 import errno
11 import hashlib
12 import time
13 import threading
14
15 from collections import deque
16 from stat import *
17
18 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
19 from .keep import KeepLocator, KeepClient
20 from .stream import StreamReader
21 from ._normalize_stream import normalize_stream
22 from ._ranges import Range, LocatorAndRange
23 from .safeapi import ThreadSafeApiCache
24 import arvados.config as config
25 import arvados.errors as errors
26 import arvados.util
27 import arvados.events as events
28 from arvados.retry import retry_method
29
30 _logger = logging.getLogger('arvados.collection')
31
32 class CollectionBase(object):
33     def __enter__(self):
34         return self
35
36     def __exit__(self, exc_type, exc_value, traceback):
37         pass
38
39     def _my_keep(self):
40         if self._keep_client is None:
41             self._keep_client = KeepClient(api_client=self._api_client,
42                                            num_retries=self.num_retries)
43         return self._keep_client
44
45     def stripped_manifest(self):
46         """Get the manifest with locator hints stripped.
47
48         Return the manifest for the current collection with all
49         non-portable hints (i.e., permission signatures and other
50         hints other than size hints) removed from the locators.
51         """
52         raw = self.manifest_text()
53         clean = []
54         for line in raw.split("\n"):
55             fields = line.split()
56             if fields:
57                 clean_fields = fields[:1] + [
58                     (re.sub(r'\+[^\d][^\+]*', '', x)
59                      if re.match(arvados.util.keep_locator_pattern, x)
60                      else x)
61                     for x in fields[1:]]
62                 clean += [' '.join(clean_fields), "\n"]
63         return ''.join(clean)
64
65
66 class _WriterFile(_FileLikeObjectBase):
67     def __init__(self, coll_writer, name):
68         super(_WriterFile, self).__init__(name, 'wb')
69         self.dest = coll_writer
70
71     def close(self):
72         super(_WriterFile, self).close()
73         self.dest.finish_current_file()
74
75     @_FileLikeObjectBase._before_close
76     def write(self, data):
77         self.dest.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def writelines(self, seq):
81         for data in seq:
82             self.write(data)
83
84     @_FileLikeObjectBase._before_close
85     def flush(self):
86         self.dest.flush_data()
87
88
89 class CollectionWriter(CollectionBase):
90     def __init__(self, api_client=None, num_retries=0, replication=None):
91         """Instantiate a CollectionWriter.
92
93         CollectionWriter lets you build a new Arvados Collection from scratch.
94         Write files to it.  The CollectionWriter will upload data to Keep as
95         appropriate, and provide you with the Collection manifest text when
96         you're finished.
97
98         Arguments:
99         * api_client: The API client to use to look up Collections.  If not
100           provided, CollectionReader will build one from available Arvados
101           configuration.
102         * num_retries: The default number of times to retry failed
103           service requests.  Default 0.  You may change this value
104           after instantiation, but note those changes may not
105           propagate to related objects like the Keep client.
106         * replication: The number of copies of each block to store.
107           If this argument is None or not supplied, replication is
108           the server-provided default if available, otherwise 2.
109         """
110         self._api_client = api_client
111         self.num_retries = num_retries
112         self.replication = (2 if replication is None else replication)
113         self._keep_client = None
114         self._data_buffer = []
115         self._data_buffer_len = 0
116         self._current_stream_files = []
117         self._current_stream_length = 0
118         self._current_stream_locators = []
119         self._current_stream_name = '.'
120         self._current_file_name = None
121         self._current_file_pos = 0
122         self._finished_streams = []
123         self._close_file = None
124         self._queued_file = None
125         self._queued_dirents = deque()
126         self._queued_trees = deque()
127         self._last_open = None
128
129     def __exit__(self, exc_type, exc_value, traceback):
130         if exc_type is None:
131             self.finish()
132
133     def do_queued_work(self):
134         # The work queue consists of three pieces:
135         # * _queued_file: The file object we're currently writing to the
136         #   Collection.
137         # * _queued_dirents: Entries under the current directory
138         #   (_queued_trees[0]) that we want to write or recurse through.
139         #   This may contain files from subdirectories if
140         #   max_manifest_depth == 0 for this directory.
141         # * _queued_trees: Directories that should be written as separate
142         #   streams to the Collection.
143         # This function handles the smallest piece of work currently queued
144         # (current file, then current directory, then next directory) until
145         # no work remains.  The _work_THING methods each do a unit of work on
146         # THING.  _queue_THING methods add a THING to the work queue.
147         while True:
148             if self._queued_file:
149                 self._work_file()
150             elif self._queued_dirents:
151                 self._work_dirents()
152             elif self._queued_trees:
153                 self._work_trees()
154             else:
155                 break
156
157     def _work_file(self):
158         while True:
159             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
160             if not buf:
161                 break
162             self.write(buf)
163         self.finish_current_file()
164         if self._close_file:
165             self._queued_file.close()
166         self._close_file = None
167         self._queued_file = None
168
169     def _work_dirents(self):
170         path, stream_name, max_manifest_depth = self._queued_trees[0]
171         if stream_name != self.current_stream_name():
172             self.start_new_stream(stream_name)
173         while self._queued_dirents:
174             dirent = self._queued_dirents.popleft()
175             target = os.path.join(path, dirent)
176             if os.path.isdir(target):
177                 self._queue_tree(target,
178                                  os.path.join(stream_name, dirent),
179                                  max_manifest_depth - 1)
180             else:
181                 self._queue_file(target, dirent)
182                 break
183         if not self._queued_dirents:
184             self._queued_trees.popleft()
185
186     def _work_trees(self):
187         path, stream_name, max_manifest_depth = self._queued_trees[0]
188         d = arvados.util.listdir_recursive(
189             path, max_depth = (None if max_manifest_depth == 0 else 0))
190         if d:
191             self._queue_dirents(stream_name, d)
192         else:
193             self._queued_trees.popleft()
194
195     def _queue_file(self, source, filename=None):
196         assert (self._queued_file is None), "tried to queue more than one file"
197         if not hasattr(source, 'read'):
198             source = open(source, 'rb')
199             self._close_file = True
200         else:
201             self._close_file = False
202         if filename is None:
203             filename = os.path.basename(source.name)
204         self.start_new_file(filename)
205         self._queued_file = source
206
207     def _queue_dirents(self, stream_name, dirents):
208         assert (not self._queued_dirents), "tried to queue more than one tree"
209         self._queued_dirents = deque(sorted(dirents))
210
211     def _queue_tree(self, path, stream_name, max_manifest_depth):
212         self._queued_trees.append((path, stream_name, max_manifest_depth))
213
214     def write_file(self, source, filename=None):
215         self._queue_file(source, filename)
216         self.do_queued_work()
217
218     def write_directory_tree(self,
219                              path, stream_name='.', max_manifest_depth=-1):
220         self._queue_tree(path, stream_name, max_manifest_depth)
221         self.do_queued_work()
222
223     def write(self, newdata):
224         if isinstance(newdata, bytes):
225             pass
226         elif isinstance(newdata, str):
227             newdata = newdata.encode()
228         elif hasattr(newdata, '__iter__'):
229             for s in newdata:
230                 self.write(s)
231             return
232         self._data_buffer.append(newdata)
233         self._data_buffer_len += len(newdata)
234         self._current_stream_length += len(newdata)
235         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
236             self.flush_data()
237
238     def open(self, streampath, filename=None):
239         """open(streampath[, filename]) -> file-like object
240
241         Pass in the path of a file to write to the Collection, either as a
242         single string or as two separate stream name and file name arguments.
243         This method returns a file-like object you can write to add it to the
244         Collection.
245
246         You may only have one file object from the Collection open at a time,
247         so be sure to close the object when you're done.  Using the object in
248         a with statement makes that easy::
249
250           with cwriter.open('./doc/page1.txt') as outfile:
251               outfile.write(page1_data)
252           with cwriter.open('./doc/page2.txt') as outfile:
253               outfile.write(page2_data)
254         """
255         if filename is None:
256             streampath, filename = split(streampath)
257         if self._last_open and not self._last_open.closed:
258             raise errors.AssertionError(
259                 "can't open '{}' when '{}' is still open".format(
260                     filename, self._last_open.name))
261         if streampath != self.current_stream_name():
262             self.start_new_stream(streampath)
263         self.set_current_file_name(filename)
264         self._last_open = _WriterFile(self, filename)
265         return self._last_open
266
267     def flush_data(self):
268         data_buffer = b''.join(self._data_buffer)
269         if data_buffer:
270             self._current_stream_locators.append(
271                 self._my_keep().put(
272                     data_buffer[0:config.KEEP_BLOCK_SIZE],
273                     copies=self.replication))
274             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
275             self._data_buffer_len = len(self._data_buffer[0])
276
277     def start_new_file(self, newfilename=None):
278         self.finish_current_file()
279         self.set_current_file_name(newfilename)
280
281     def set_current_file_name(self, newfilename):
282         if re.search(r'[\t\n]', newfilename):
283             raise errors.AssertionError(
284                 "Manifest filenames cannot contain whitespace: %s" %
285                 newfilename)
286         elif re.search(r'\x00', newfilename):
287             raise errors.AssertionError(
288                 "Manifest filenames cannot contain NUL characters: %s" %
289                 newfilename)
290         self._current_file_name = newfilename
291
292     def current_file_name(self):
293         return self._current_file_name
294
295     def finish_current_file(self):
296         if self._current_file_name is None:
297             if self._current_file_pos == self._current_stream_length:
298                 return
299             raise errors.AssertionError(
300                 "Cannot finish an unnamed file " +
301                 "(%d bytes at offset %d in '%s' stream)" %
302                 (self._current_stream_length - self._current_file_pos,
303                  self._current_file_pos,
304                  self._current_stream_name))
305         self._current_stream_files.append([
306                 self._current_file_pos,
307                 self._current_stream_length - self._current_file_pos,
308                 self._current_file_name])
309         self._current_file_pos = self._current_stream_length
310         self._current_file_name = None
311
312     def start_new_stream(self, newstreamname='.'):
313         self.finish_current_stream()
314         self.set_current_stream_name(newstreamname)
315
316     def set_current_stream_name(self, newstreamname):
317         if re.search(r'[\t\n]', newstreamname):
318             raise errors.AssertionError(
319                 "Manifest stream names cannot contain whitespace: '%s'" %
320                 (newstreamname))
321         self._current_stream_name = '.' if newstreamname=='' else newstreamname
322
323     def current_stream_name(self):
324         return self._current_stream_name
325
326     def finish_current_stream(self):
327         self.finish_current_file()
328         self.flush_data()
329         if not self._current_stream_files:
330             pass
331         elif self._current_stream_name is None:
332             raise errors.AssertionError(
333                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
334                 (self._current_stream_length, len(self._current_stream_files)))
335         else:
336             if not self._current_stream_locators:
337                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
338             self._finished_streams.append([self._current_stream_name,
339                                            self._current_stream_locators,
340                                            self._current_stream_files])
341         self._current_stream_files = []
342         self._current_stream_length = 0
343         self._current_stream_locators = []
344         self._current_stream_name = None
345         self._current_file_pos = 0
346         self._current_file_name = None
347
348     def finish(self):
349         """Store the manifest in Keep and return its locator.
350
351         This is useful for storing manifest fragments (task outputs)
352         temporarily in Keep during a Crunch job.
353
354         In other cases you should make a collection instead, by
355         sending manifest_text() to the API server's "create
356         collection" endpoint.
357         """
358         return self._my_keep().put(self.manifest_text().encode(),
359                                    copies=self.replication)
360
361     def portable_data_hash(self):
362         stripped = self.stripped_manifest().encode()
363         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
364
365     def manifest_text(self):
366         self.finish_current_stream()
367         manifest = ''
368
369         for stream in self._finished_streams:
370             if not re.search(r'^\.(/.*)?$', stream[0]):
371                 manifest += './'
372             manifest += stream[0].replace(' ', '\\040')
373             manifest += ' ' + ' '.join(stream[1])
374             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
375             manifest += "\n"
376
377         return manifest
378
379     def data_locators(self):
380         ret = []
381         for name, locators, files in self._finished_streams:
382             ret += locators
383         return ret
384
385
386 class ResumableCollectionWriter(CollectionWriter):
387     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
388                    '_current_stream_locators', '_current_stream_name',
389                    '_current_file_name', '_current_file_pos', '_close_file',
390                    '_data_buffer', '_dependencies', '_finished_streams',
391                    '_queued_dirents', '_queued_trees']
392
393     def __init__(self, api_client=None, **kwargs):
394         self._dependencies = {}
395         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
396
397     @classmethod
398     def from_state(cls, state, *init_args, **init_kwargs):
399         # Try to build a new writer from scratch with the given state.
400         # If the state is not suitable to resume (because files have changed,
401         # been deleted, aren't predictable, etc.), raise a
402         # StaleWriterStateError.  Otherwise, return the initialized writer.
403         # The caller is responsible for calling writer.do_queued_work()
404         # appropriately after it's returned.
405         writer = cls(*init_args, **init_kwargs)
406         for attr_name in cls.STATE_PROPS:
407             attr_value = state[attr_name]
408             attr_class = getattr(writer, attr_name).__class__
409             # Coerce the value into the same type as the initial value, if
410             # needed.
411             if attr_class not in (type(None), attr_value.__class__):
412                 attr_value = attr_class(attr_value)
413             setattr(writer, attr_name, attr_value)
414         # Check dependencies before we try to resume anything.
415         if any(KeepLocator(ls).permission_expired()
416                for ls in writer._current_stream_locators):
417             raise errors.StaleWriterStateError(
418                 "locators include expired permission hint")
419         writer.check_dependencies()
420         if state['_current_file'] is not None:
421             path, pos = state['_current_file']
422             try:
423                 writer._queued_file = open(path, 'rb')
424                 writer._queued_file.seek(pos)
425             except IOError as error:
426                 raise errors.StaleWriterStateError(
427                     "failed to reopen active file {}: {}".format(path, error))
428         return writer
429
430     def check_dependencies(self):
431         for path, orig_stat in listitems(self._dependencies):
432             if not S_ISREG(orig_stat[ST_MODE]):
433                 raise errors.StaleWriterStateError("{} not file".format(path))
434             try:
435                 now_stat = tuple(os.stat(path))
436             except OSError as error:
437                 raise errors.StaleWriterStateError(
438                     "failed to stat {}: {}".format(path, error))
439             if ((not S_ISREG(now_stat[ST_MODE])) or
440                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
441                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
442                 raise errors.StaleWriterStateError("{} changed".format(path))
443
444     def dump_state(self, copy_func=lambda x: x):
445         state = {attr: copy_func(getattr(self, attr))
446                  for attr in self.STATE_PROPS}
447         if self._queued_file is None:
448             state['_current_file'] = None
449         else:
450             state['_current_file'] = (os.path.realpath(self._queued_file.name),
451                                       self._queued_file.tell())
452         return state
453
454     def _queue_file(self, source, filename=None):
455         try:
456             src_path = os.path.realpath(source)
457         except Exception:
458             raise errors.AssertionError("{} not a file path".format(source))
459         try:
460             path_stat = os.stat(src_path)
461         except OSError as stat_error:
462             path_stat = None
463         super(ResumableCollectionWriter, self)._queue_file(source, filename)
464         fd_stat = os.fstat(self._queued_file.fileno())
465         if not S_ISREG(fd_stat.st_mode):
466             # We won't be able to resume from this cache anyway, so don't
467             # worry about further checks.
468             self._dependencies[source] = tuple(fd_stat)
469         elif path_stat is None:
470             raise errors.AssertionError(
471                 "could not stat {}: {}".format(source, stat_error))
472         elif path_stat.st_ino != fd_stat.st_ino:
473             raise errors.AssertionError(
474                 "{} changed between open and stat calls".format(source))
475         else:
476             self._dependencies[src_path] = tuple(fd_stat)
477
478     def write(self, data):
479         if self._queued_file is None:
480             raise errors.AssertionError(
481                 "resumable writer can't accept unsourced data")
482         return super(ResumableCollectionWriter, self).write(data)
483
484
485 ADD = "add"
486 DEL = "del"
487 MOD = "mod"
488 TOK = "tok"
489 FILE = "file"
490 COLLECTION = "collection"
491
492 class RichCollectionBase(CollectionBase):
493     """Base class for Collections and Subcollections.
494
495     Implements the majority of functionality relating to accessing items in the
496     Collection.
497
498     """
499
500     def __init__(self, parent=None):
501         self.parent = parent
502         self._committed = False
503         self._callback = None
504         self._items = {}
505
506     def _my_api(self):
507         raise NotImplementedError()
508
509     def _my_keep(self):
510         raise NotImplementedError()
511
512     def _my_block_manager(self):
513         raise NotImplementedError()
514
515     def writable(self):
516         raise NotImplementedError()
517
518     def root_collection(self):
519         raise NotImplementedError()
520
521     def notify(self, event, collection, name, item):
522         raise NotImplementedError()
523
524     def stream_name(self):
525         raise NotImplementedError()
526
527     @must_be_writable
528     @synchronized
529     def find_or_create(self, path, create_type):
530         """Recursively search the specified file path.
531
532         May return either a `Collection` or `ArvadosFile`.  If not found, will
533         create a new item at the specified path based on `create_type`.  Will
534         create intermediate subcollections needed to contain the final item in
535         the path.
536
537         :create_type:
538           One of `arvados.collection.FILE` or
539           `arvados.collection.COLLECTION`.  If the path is not found, and value
540           of create_type is FILE then create and return a new ArvadosFile for
541           the last path component.  If COLLECTION, then create and return a new
542           Collection for the last path component.
543
544         """
545
546         pathcomponents = path.split("/", 1)
547         if pathcomponents[0]:
548             item = self._items.get(pathcomponents[0])
549             if len(pathcomponents) == 1:
550                 if item is None:
551                     # create new file
552                     if create_type == COLLECTION:
553                         item = Subcollection(self, pathcomponents[0])
554                     else:
555                         item = ArvadosFile(self, pathcomponents[0])
556                     self._items[pathcomponents[0]] = item
557                     self.set_committed(False)
558                     self.notify(ADD, self, pathcomponents[0], item)
559                 return item
560             else:
561                 if item is None:
562                     # create new collection
563                     item = Subcollection(self, pathcomponents[0])
564                     self._items[pathcomponents[0]] = item
565                     self.set_committed(False)
566                     self.notify(ADD, self, pathcomponents[0], item)
567                 if isinstance(item, RichCollectionBase):
568                     return item.find_or_create(pathcomponents[1], create_type)
569                 else:
570                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
571         else:
572             return self
573
574     @synchronized
575     def find(self, path):
576         """Recursively search the specified file path.
577
578         May return either a Collection or ArvadosFile. Return None if not
579         found.
580         If path is invalid (ex: starts with '/'), an IOError exception will be
581         raised.
582
583         """
584         if not path:
585             raise errors.ArgumentError("Parameter 'path' is empty.")
586
587         pathcomponents = path.split("/", 1)
588         if pathcomponents[0] == '':
589             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
590
591         item = self._items.get(pathcomponents[0])
592         if item is None:
593             return None
594         elif len(pathcomponents) == 1:
595             return item
596         else:
597             if isinstance(item, RichCollectionBase):
598                 if pathcomponents[1]:
599                     return item.find(pathcomponents[1])
600                 else:
601                     return item
602             else:
603                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
604
605     @synchronized
606     def mkdirs(self, path):
607         """Recursive subcollection create.
608
609         Like `os.makedirs()`.  Will create intermediate subcollections needed
610         to contain the leaf subcollection path.
611
612         """
613
614         if self.find(path) != None:
615             raise IOError(errno.EEXIST, "Directory or file exists", path)
616
617         return self.find_or_create(path, COLLECTION)
618
619     def open(self, path, mode="r"):
620         """Open a file-like object for access.
621
622         :path:
623           path to a file in the collection
624         :mode:
625           a string consisting of "r", "w", or "a", optionally followed
626           by "b" or "t", optionally followed by "+".
627           :"b":
628             binary mode: write() accepts bytes, read() returns bytes.
629           :"t":
630             text mode (default): write() accepts strings, read() returns strings.
631           :"r":
632             opens for reading
633           :"r+":
634             opens for reading and writing.  Reads/writes share a file pointer.
635           :"w", "w+":
636             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
637           :"a", "a+":
638             opens for reading and writing.  All writes are appended to
639             the end of the file.  Writing does not affect the file pointer for
640             reading.
641         """
642
643         if not re.search(r'^[rwa][bt]?\+?$', mode):
644             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
645
646         if mode[0] == 'r' and '+' not in mode:
647             fclass = ArvadosFileReader
648             arvfile = self.find(path)
649         elif not self.writable():
650             raise IOError(errno.EROFS, "Collection is read only")
651         else:
652             fclass = ArvadosFileWriter
653             arvfile = self.find_or_create(path, FILE)
654
655         if arvfile is None:
656             raise IOError(errno.ENOENT, "File not found", path)
657         if not isinstance(arvfile, ArvadosFile):
658             raise IOError(errno.EISDIR, "Is a directory", path)
659
660         if mode[0] == 'w':
661             arvfile.truncate(0)
662
663         return fclass(arvfile, mode=mode, num_retries=self.num_retries)
664
665     def modified(self):
666         """Determine if the collection has been modified since last commited."""
667         return not self.committed()
668
669     @synchronized
670     def committed(self):
671         """Determine if the collection has been committed to the API server."""
672         return self._committed
673
674     @synchronized
675     def set_committed(self, value=True):
676         """Recursively set committed flag.
677
678         If value is True, set committed to be True for this and all children.
679
680         If value is False, set committed to be False for this and all parents.
681         """
682         if value == self._committed:
683             return
684         if value:
685             for k,v in listitems(self._items):
686                 v.set_committed(True)
687             self._committed = True
688         else:
689             self._committed = False
690             if self.parent is not None:
691                 self.parent.set_committed(False)
692
693     @synchronized
694     def __iter__(self):
695         """Iterate over names of files and collections contained in this collection."""
696         return iter(viewkeys(self._items))
697
698     @synchronized
699     def __getitem__(self, k):
700         """Get a file or collection that is directly contained by this collection.
701
702         If you want to search a path, use `find()` instead.
703
704         """
705         return self._items[k]
706
707     @synchronized
708     def __contains__(self, k):
709         """Test if there is a file or collection a directly contained by this collection."""
710         return k in self._items
711
712     @synchronized
713     def __len__(self):
714         """Get the number of items directly contained in this collection."""
715         return len(self._items)
716
717     @must_be_writable
718     @synchronized
719     def __delitem__(self, p):
720         """Delete an item by name which is directly contained by this collection."""
721         del self._items[p]
722         self.set_committed(False)
723         self.notify(DEL, self, p, None)
724
725     @synchronized
726     def keys(self):
727         """Get a list of names of files and collections directly contained in this collection."""
728         return self._items.keys()
729
730     @synchronized
731     def values(self):
732         """Get a list of files and collection objects directly contained in this collection."""
733         return listvalues(self._items)
734
735     @synchronized
736     def items(self):
737         """Get a list of (name, object) tuples directly contained in this collection."""
738         return listitems(self._items)
739
740     def exists(self, path):
741         """Test if there is a file or collection at `path`."""
742         return self.find(path) is not None
743
744     @must_be_writable
745     @synchronized
746     def remove(self, path, recursive=False):
747         """Remove the file or subcollection (directory) at `path`.
748
749         :recursive:
750           Specify whether to remove non-empty subcollections (True), or raise an error (False).
751         """
752
753         if not path:
754             raise errors.ArgumentError("Parameter 'path' is empty.")
755
756         pathcomponents = path.split("/", 1)
757         item = self._items.get(pathcomponents[0])
758         if item is None:
759             raise IOError(errno.ENOENT, "File not found", path)
760         if len(pathcomponents) == 1:
761             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
762                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
763             deleteditem = self._items[pathcomponents[0]]
764             del self._items[pathcomponents[0]]
765             self.set_committed(False)
766             self.notify(DEL, self, pathcomponents[0], deleteditem)
767         else:
768             item.remove(pathcomponents[1])
769
770     def _clonefrom(self, source):
771         for k,v in listitems(source):
772             self._items[k] = v.clone(self, k)
773
774     def clone(self):
775         raise NotImplementedError()
776
777     @must_be_writable
778     @synchronized
779     def add(self, source_obj, target_name, overwrite=False, reparent=False):
780         """Copy or move a file or subcollection to this collection.
781
782         :source_obj:
783           An ArvadosFile, or Subcollection object
784
785         :target_name:
786           Destination item name.  If the target name already exists and is a
787           file, this will raise an error unless you specify `overwrite=True`.
788
789         :overwrite:
790           Whether to overwrite target file if it already exists.
791
792         :reparent:
793           If True, source_obj will be moved from its parent collection to this collection.
794           If False, source_obj will be copied and the parent collection will be
795           unmodified.
796
797         """
798
799         if target_name in self and not overwrite:
800             raise IOError(errno.EEXIST, "File already exists", target_name)
801
802         modified_from = None
803         if target_name in self:
804             modified_from = self[target_name]
805
806         # Actually make the move or copy.
807         if reparent:
808             source_obj._reparent(self, target_name)
809             item = source_obj
810         else:
811             item = source_obj.clone(self, target_name)
812
813         self._items[target_name] = item
814         self.set_committed(False)
815
816         if modified_from:
817             self.notify(MOD, self, target_name, (modified_from, item))
818         else:
819             self.notify(ADD, self, target_name, item)
820
821     def _get_src_target(self, source, target_path, source_collection, create_dest):
822         if source_collection is None:
823             source_collection = self
824
825         # Find the object
826         if isinstance(source, basestring):
827             source_obj = source_collection.find(source)
828             if source_obj is None:
829                 raise IOError(errno.ENOENT, "File not found", source)
830             sourcecomponents = source.split("/")
831         else:
832             source_obj = source
833             sourcecomponents = None
834
835         # Find parent collection the target path
836         targetcomponents = target_path.split("/")
837
838         # Determine the name to use.
839         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
840
841         if not target_name:
842             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
843
844         if create_dest:
845             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
846         else:
847             if len(targetcomponents) > 1:
848                 target_dir = self.find("/".join(targetcomponents[0:-1]))
849             else:
850                 target_dir = self
851
852         if target_dir is None:
853             raise IOError(errno.ENOENT, "Target directory not found", target_name)
854
855         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
856             target_dir = target_dir[target_name]
857             target_name = sourcecomponents[-1]
858
859         return (source_obj, target_dir, target_name)
860
861     @must_be_writable
862     @synchronized
863     def copy(self, source, target_path, source_collection=None, overwrite=False):
864         """Copy a file or subcollection to a new path in this collection.
865
866         :source:
867           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
868
869         :target_path:
870           Destination file or path.  If the target path already exists and is a
871           subcollection, the item will be placed inside the subcollection.  If
872           the target path already exists and is a file, this will raise an error
873           unless you specify `overwrite=True`.
874
875         :source_collection:
876           Collection to copy `source_path` from (default `self`)
877
878         :overwrite:
879           Whether to overwrite target file if it already exists.
880         """
881
882         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
883         target_dir.add(source_obj, target_name, overwrite, False)
884
885     @must_be_writable
886     @synchronized
887     def rename(self, source, target_path, source_collection=None, overwrite=False):
888         """Move a file or subcollection from `source_collection` to a new path in this collection.
889
890         :source:
891           A string with a path to source file or subcollection.
892
893         :target_path:
894           Destination file or path.  If the target path already exists and is a
895           subcollection, the item will be placed inside the subcollection.  If
896           the target path already exists and is a file, this will raise an error
897           unless you specify `overwrite=True`.
898
899         :source_collection:
900           Collection to copy `source_path` from (default `self`)
901
902         :overwrite:
903           Whether to overwrite target file if it already exists.
904         """
905
906         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
907         if not source_obj.writable():
908             raise IOError(errno.EROFS, "Source collection is read only", source)
909         target_dir.add(source_obj, target_name, overwrite, True)
910
911     def portable_manifest_text(self, stream_name="."):
912         """Get the manifest text for this collection, sub collections and files.
913
914         This method does not flush outstanding blocks to Keep.  It will return
915         a normalized manifest with access tokens stripped.
916
917         :stream_name:
918           Name to use for this stream (directory)
919
920         """
921         return self._get_manifest_text(stream_name, True, True)
922
923     @synchronized
924     def manifest_text(self, stream_name=".", strip=False, normalize=False,
925                       only_committed=False):
926         """Get the manifest text for this collection, sub collections and files.
927
928         This method will flush outstanding blocks to Keep.  By default, it will
929         not normalize an unmodified manifest or strip access tokens.
930
931         :stream_name:
932           Name to use for this stream (directory)
933
934         :strip:
935           If True, remove signing tokens from block locators if present.
936           If False (default), block locators are left unchanged.
937
938         :normalize:
939           If True, always export the manifest text in normalized form
940           even if the Collection is not modified.  If False (default) and the collection
941           is not modified, return the original manifest text even if it is not
942           in normalized form.
943
944         :only_committed:
945           If True, don't commit pending blocks.
946
947         """
948
949         if not only_committed:
950             self._my_block_manager().commit_all()
951         return self._get_manifest_text(stream_name, strip, normalize,
952                                        only_committed=only_committed)
953
954     @synchronized
955     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
956         """Get the manifest text for this collection, sub collections and files.
957
958         :stream_name:
959           Name to use for this stream (directory)
960
961         :strip:
962           If True, remove signing tokens from block locators if present.
963           If False (default), block locators are left unchanged.
964
965         :normalize:
966           If True, always export the manifest text in normalized form
967           even if the Collection is not modified.  If False (default) and the collection
968           is not modified, return the original manifest text even if it is not
969           in normalized form.
970
971         :only_committed:
972           If True, only include blocks that were already committed to Keep.
973
974         """
975
976         if not self.committed() or self._manifest_text is None or normalize:
977             stream = {}
978             buf = []
979             sorted_keys = sorted(self.keys())
980             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
981                 # Create a stream per file `k`
982                 arvfile = self[filename]
983                 filestream = []
984                 for segment in arvfile.segments():
985                     loc = segment.locator
986                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
987                         if only_committed:
988                             continue
989                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
990                     if strip:
991                         loc = KeepLocator(loc).stripped()
992                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
993                                          segment.segment_offset, segment.range_size))
994                 stream[filename] = filestream
995             if stream:
996                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
997             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
998                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
999             return "".join(buf)
1000         else:
1001             if strip:
1002                 return self.stripped_manifest()
1003             else:
1004                 return self._manifest_text
1005
1006     @synchronized
1007     def diff(self, end_collection, prefix=".", holding_collection=None):
1008         """Generate list of add/modify/delete actions.
1009
1010         When given to `apply`, will change `self` to match `end_collection`
1011
1012         """
1013         changes = []
1014         if holding_collection is None:
1015             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1016         for k in self:
1017             if k not in end_collection:
1018                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1019         for k in end_collection:
1020             if k in self:
1021                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1022                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1023                 elif end_collection[k] != self[k]:
1024                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1025                 else:
1026                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1027             else:
1028                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1029         return changes
1030
1031     @must_be_writable
1032     @synchronized
1033     def apply(self, changes):
1034         """Apply changes from `diff`.
1035
1036         If a change conflicts with a local change, it will be saved to an
1037         alternate path indicating the conflict.
1038
1039         """
1040         if changes:
1041             self.set_committed(False)
1042         for change in changes:
1043             event_type = change[0]
1044             path = change[1]
1045             initial = change[2]
1046             local = self.find(path)
1047             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1048                                                                     time.gmtime()))
1049             if event_type == ADD:
1050                 if local is None:
1051                     # No local file at path, safe to copy over new file
1052                     self.copy(initial, path)
1053                 elif local is not None and local != initial:
1054                     # There is already local file and it is different:
1055                     # save change to conflict file.
1056                     self.copy(initial, conflictpath)
1057             elif event_type == MOD or event_type == TOK:
1058                 final = change[3]
1059                 if local == initial:
1060                     # Local matches the "initial" item so it has not
1061                     # changed locally and is safe to update.
1062                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1063                         # Replace contents of local file with new contents
1064                         local.replace_contents(final)
1065                     else:
1066                         # Overwrite path with new item; this can happen if
1067                         # path was a file and is now a collection or vice versa
1068                         self.copy(final, path, overwrite=True)
1069                 else:
1070                     # Local is missing (presumably deleted) or local doesn't
1071                     # match the "start" value, so save change to conflict file
1072                     self.copy(final, conflictpath)
1073             elif event_type == DEL:
1074                 if local == initial:
1075                     # Local item matches "initial" value, so it is safe to remove.
1076                     self.remove(path, recursive=True)
1077                 # else, the file is modified or already removed, in either
1078                 # case we don't want to try to remove it.
1079
1080     def portable_data_hash(self):
1081         """Get the portable data hash for this collection's manifest."""
1082         if self._manifest_locator and self.committed():
1083             # If the collection is already saved on the API server, and it's committed
1084             # then return API server's PDH response.
1085             return self._portable_data_hash
1086         else:
1087             stripped = self.portable_manifest_text().encode()
1088             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1089
1090     @synchronized
1091     def subscribe(self, callback):
1092         if self._callback is None:
1093             self._callback = callback
1094         else:
1095             raise errors.ArgumentError("A callback is already set on this collection.")
1096
1097     @synchronized
1098     def unsubscribe(self):
1099         if self._callback is not None:
1100             self._callback = None
1101
1102     @synchronized
1103     def notify(self, event, collection, name, item):
1104         if self._callback:
1105             self._callback(event, collection, name, item)
1106         self.root_collection().notify(event, collection, name, item)
1107
1108     @synchronized
1109     def __eq__(self, other):
1110         if other is self:
1111             return True
1112         if not isinstance(other, RichCollectionBase):
1113             return False
1114         if len(self._items) != len(other):
1115             return False
1116         for k in self._items:
1117             if k not in other:
1118                 return False
1119             if self._items[k] != other[k]:
1120                 return False
1121         return True
1122
1123     def __ne__(self, other):
1124         return not self.__eq__(other)
1125
1126     @synchronized
1127     def flush(self):
1128         """Flush bufferblocks to Keep."""
1129         for e in listvalues(self):
1130             e.flush()
1131
1132
1133 class Collection(RichCollectionBase):
1134     """Represents the root of an Arvados Collection.
1135
1136     This class is threadsafe.  The root collection object, all subcollections
1137     and files are protected by a single lock (i.e. each access locks the entire
1138     collection).
1139
1140     Brief summary of
1141     useful methods:
1142
1143     :To read an existing file:
1144       `c.open("myfile", "r")`
1145
1146     :To write a new file:
1147       `c.open("myfile", "w")`
1148
1149     :To determine if a file exists:
1150       `c.find("myfile") is not None`
1151
1152     :To copy a file:
1153       `c.copy("source", "dest")`
1154
1155     :To delete a file:
1156       `c.remove("myfile")`
1157
1158     :To save to an existing collection record:
1159       `c.save()`
1160
1161     :To save a new collection record:
1162     `c.save_new()`
1163
1164     :To merge remote changes into this object:
1165       `c.update()`
1166
1167     Must be associated with an API server Collection record (during
1168     initialization, or using `save_new`) to use `save` or `update`
1169
1170     """
1171
1172     def __init__(self, manifest_locator_or_text=None,
1173                  api_client=None,
1174                  keep_client=None,
1175                  num_retries=None,
1176                  parent=None,
1177                  apiconfig=None,
1178                  block_manager=None,
1179                  replication_desired=None,
1180                  put_threads=None):
1181         """Collection constructor.
1182
1183         :manifest_locator_or_text:
1184           One of Arvados collection UUID, block locator of
1185           a manifest, raw manifest text, or None (to create an empty collection).
1186         :parent:
1187           the parent Collection, may be None.
1188
1189         :apiconfig:
1190           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1191           Prefer this over supplying your own api_client and keep_client (except in testing).
1192           Will use default config settings if not specified.
1193
1194         :api_client:
1195           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1196
1197         :keep_client:
1198           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1199
1200         :num_retries:
1201           the number of retries for API and Keep requests.
1202
1203         :block_manager:
1204           the block manager to use.  If not specified, create one.
1205
1206         :replication_desired:
1207           How many copies should Arvados maintain. If None, API server default
1208           configuration applies. If not None, this value will also be used
1209           for determining the number of block copies being written.
1210
1211         """
1212         super(Collection, self).__init__(parent)
1213         self._api_client = api_client
1214         self._keep_client = keep_client
1215         self._block_manager = block_manager
1216         self.replication_desired = replication_desired
1217         self.put_threads = put_threads
1218
1219         if apiconfig:
1220             self._config = apiconfig
1221         else:
1222             self._config = config.settings()
1223
1224         self.num_retries = num_retries if num_retries is not None else 0
1225         self._manifest_locator = None
1226         self._manifest_text = None
1227         self._portable_data_hash = None
1228         self._api_response = None
1229         self._past_versions = set()
1230
1231         self.lock = threading.RLock()
1232         self.events = None
1233
1234         if manifest_locator_or_text:
1235             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1236                 self._manifest_locator = manifest_locator_or_text
1237             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1238                 self._manifest_locator = manifest_locator_or_text
1239             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1240                 self._manifest_text = manifest_locator_or_text
1241             else:
1242                 raise errors.ArgumentError(
1243                     "Argument to CollectionReader is not a manifest or a collection UUID")
1244
1245             try:
1246                 self._populate()
1247             except (IOError, errors.SyntaxError) as e:
1248                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1249
1250     def root_collection(self):
1251         return self
1252
1253     def stream_name(self):
1254         return "."
1255
1256     def writable(self):
1257         return True
1258
1259     @synchronized
1260     def known_past_version(self, modified_at_and_portable_data_hash):
1261         return modified_at_and_portable_data_hash in self._past_versions
1262
1263     @synchronized
1264     @retry_method
1265     def update(self, other=None, num_retries=None):
1266         """Merge the latest collection on the API server with the current collection."""
1267
1268         if other is None:
1269             if self._manifest_locator is None:
1270                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1271             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1272             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1273                 response.get("portable_data_hash") != self.portable_data_hash()):
1274                 # The record on the server is different from our current one, but we've seen it before,
1275                 # so ignore it because it's already been merged.
1276                 # However, if it's the same as our current record, proceed with the update, because we want to update
1277                 # our tokens.
1278                 return
1279             else:
1280                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1281             other = CollectionReader(response["manifest_text"])
1282         baseline = CollectionReader(self._manifest_text)
1283         self.apply(baseline.diff(other))
1284         self._manifest_text = self.manifest_text()
1285
1286     @synchronized
1287     def _my_api(self):
1288         if self._api_client is None:
1289             self._api_client = ThreadSafeApiCache(self._config)
1290             if self._keep_client is None:
1291                 self._keep_client = self._api_client.keep
1292         return self._api_client
1293
1294     @synchronized
1295     def _my_keep(self):
1296         if self._keep_client is None:
1297             if self._api_client is None:
1298                 self._my_api()
1299             else:
1300                 self._keep_client = KeepClient(api_client=self._api_client)
1301         return self._keep_client
1302
1303     @synchronized
1304     def _my_block_manager(self):
1305         if self._block_manager is None:
1306             copies = (self.replication_desired or
1307                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1308                                                    2))
1309             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1310         return self._block_manager
1311
1312     def _remember_api_response(self, response):
1313         self._api_response = response
1314         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1315
1316     def _populate_from_api_server(self):
1317         # As in KeepClient itself, we must wait until the last
1318         # possible moment to instantiate an API client, in order to
1319         # avoid tripping up clients that don't have access to an API
1320         # server.  If we do build one, make sure our Keep client uses
1321         # it.  If instantiation fails, we'll fall back to the except
1322         # clause, just like any other Collection lookup
1323         # failure. Return an exception, or None if successful.
1324         try:
1325             self._remember_api_response(self._my_api().collections().get(
1326                 uuid=self._manifest_locator).execute(
1327                     num_retries=self.num_retries))
1328             self._manifest_text = self._api_response['manifest_text']
1329             self._portable_data_hash = self._api_response['portable_data_hash']
1330             # If not overriden via kwargs, we should try to load the
1331             # replication_desired from the API server
1332             if self.replication_desired is None:
1333                 self.replication_desired = self._api_response.get('replication_desired', None)
1334             return None
1335         except Exception as e:
1336             return e
1337
1338     def _populate_from_keep(self):
1339         # Retrieve a manifest directly from Keep. This has a chance of
1340         # working if [a] the locator includes a permission signature
1341         # or [b] the Keep services are operating in world-readable
1342         # mode. Return an exception, or None if successful.
1343         try:
1344             self._manifest_text = self._my_keep().get(
1345                 self._manifest_locator, num_retries=self.num_retries).decode()
1346         except Exception as e:
1347             return e
1348
1349     def _populate(self):
1350         if self._manifest_locator is None and self._manifest_text is None:
1351             return
1352         error_via_api = None
1353         error_via_keep = None
1354         should_try_keep = ((self._manifest_text is None) and
1355                            arvados.util.keep_locator_pattern.match(
1356                                self._manifest_locator))
1357         if ((self._manifest_text is None) and
1358             arvados.util.signed_locator_pattern.match(self._manifest_locator)):
1359             error_via_keep = self._populate_from_keep()
1360         if self._manifest_text is None:
1361             error_via_api = self._populate_from_api_server()
1362             if error_via_api is not None and not should_try_keep:
1363                 raise error_via_api
1364         if ((self._manifest_text is None) and
1365             not error_via_keep and
1366             should_try_keep):
1367             # Looks like a keep locator, and we didn't already try keep above
1368             error_via_keep = self._populate_from_keep()
1369         if self._manifest_text is None:
1370             # Nothing worked!
1371             raise errors.NotFoundError(
1372                 ("Failed to retrieve collection '{}' " +
1373                  "from either API server ({}) or Keep ({})."
1374                  ).format(
1375                     self._manifest_locator,
1376                     error_via_api,
1377                     error_via_keep))
1378         # populate
1379         self._baseline_manifest = self._manifest_text
1380         self._import_manifest(self._manifest_text)
1381
1382
1383     def _has_collection_uuid(self):
1384         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1385
1386     def __enter__(self):
1387         return self
1388
1389     def __exit__(self, exc_type, exc_value, traceback):
1390         """Support scoped auto-commit in a with: block."""
1391         if exc_type is None:
1392             if self.writable() and self._has_collection_uuid():
1393                 self.save()
1394         self.stop_threads()
1395
1396     def stop_threads(self):
1397         if self._block_manager is not None:
1398             self._block_manager.stop_threads()
1399
1400     @synchronized
1401     def manifest_locator(self):
1402         """Get the manifest locator, if any.
1403
1404         The manifest locator will be set when the collection is loaded from an
1405         API server record or the portable data hash of a manifest.
1406
1407         The manifest locator will be None if the collection is newly created or
1408         was created directly from manifest text.  The method `save_new()` will
1409         assign a manifest locator.
1410
1411         """
1412         return self._manifest_locator
1413
1414     @synchronized
1415     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1416         if new_config is None:
1417             new_config = self._config
1418         if readonly:
1419             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1420         else:
1421             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1422
1423         newcollection._clonefrom(self)
1424         return newcollection
1425
1426     @synchronized
1427     def api_response(self):
1428         """Returns information about this Collection fetched from the API server.
1429
1430         If the Collection exists in Keep but not the API server, currently
1431         returns None.  Future versions may provide a synthetic response.
1432
1433         """
1434         return self._api_response
1435
1436     def find_or_create(self, path, create_type):
1437         """See `RichCollectionBase.find_or_create`"""
1438         if path == ".":
1439             return self
1440         else:
1441             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1442
1443     def find(self, path):
1444         """See `RichCollectionBase.find`"""
1445         if path == ".":
1446             return self
1447         else:
1448             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1449
1450     def remove(self, path, recursive=False):
1451         """See `RichCollectionBase.remove`"""
1452         if path == ".":
1453             raise errors.ArgumentError("Cannot remove '.'")
1454         else:
1455             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1456
1457     @must_be_writable
1458     @synchronized
1459     @retry_method
1460     def save(self, merge=True, num_retries=None):
1461         """Save collection to an existing collection record.
1462
1463         Commit pending buffer blocks to Keep, merge with remote record (if
1464         merge=True, the default), and update the collection record.  Returns
1465         the current manifest text.
1466
1467         Will raise AssertionError if not associated with a collection record on
1468         the API server.  If you want to save a manifest to Keep only, see
1469         `save_new()`.
1470
1471         :merge:
1472           Update and merge remote changes before saving.  Otherwise, any
1473           remote changes will be ignored and overwritten.
1474
1475         :num_retries:
1476           Retry count on API calls (if None,  use the collection default)
1477
1478         """
1479         if not self.committed():
1480             if not self._has_collection_uuid():
1481                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1482
1483             self._my_block_manager().commit_all()
1484
1485             if merge:
1486                 self.update()
1487
1488             text = self.manifest_text(strip=False)
1489             self._remember_api_response(self._my_api().collections().update(
1490                 uuid=self._manifest_locator,
1491                 body={'manifest_text': text}
1492                 ).execute(
1493                     num_retries=num_retries))
1494             self._manifest_text = self._api_response["manifest_text"]
1495             self._portable_data_hash = self._api_response["portable_data_hash"]
1496             self.set_committed(True)
1497
1498         return self._manifest_text
1499
1500
1501     @must_be_writable
1502     @synchronized
1503     @retry_method
1504     def save_new(self, name=None,
1505                  create_collection_record=True,
1506                  owner_uuid=None,
1507                  ensure_unique_name=False,
1508                  num_retries=None):
1509         """Save collection to a new collection record.
1510
1511         Commit pending buffer blocks to Keep and, when create_collection_record
1512         is True (default), create a new collection record.  After creating a
1513         new collection record, this Collection object will be associated with
1514         the new record used by `save()`.  Returns the current manifest text.
1515
1516         :name:
1517           The collection name.
1518
1519         :create_collection_record:
1520            If True, create a collection record on the API server.
1521            If False, only commit blocks to Keep and return the manifest text.
1522
1523         :owner_uuid:
1524           the user, or project uuid that will own this collection.
1525           If None, defaults to the current user.
1526
1527         :ensure_unique_name:
1528           If True, ask the API server to rename the collection
1529           if it conflicts with a collection with the same name and owner.  If
1530           False, a name conflict will result in an error.
1531
1532         :num_retries:
1533           Retry count on API calls (if None,  use the collection default)
1534
1535         """
1536         self._my_block_manager().commit_all()
1537         text = self.manifest_text(strip=False)
1538
1539         if create_collection_record:
1540             if name is None:
1541                 name = "New collection"
1542                 ensure_unique_name = True
1543
1544             body = {"manifest_text": text,
1545                     "name": name,
1546                     "replication_desired": self.replication_desired}
1547             if owner_uuid:
1548                 body["owner_uuid"] = owner_uuid
1549
1550             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1551             text = self._api_response["manifest_text"]
1552
1553             self._manifest_locator = self._api_response["uuid"]
1554             self._portable_data_hash = self._api_response["portable_data_hash"]
1555
1556             self._manifest_text = text
1557             self.set_committed(True)
1558
1559         return text
1560
1561     @synchronized
1562     def _import_manifest(self, manifest_text):
1563         """Import a manifest into a `Collection`.
1564
1565         :manifest_text:
1566           The manifest text to import from.
1567
1568         """
1569         if len(self) > 0:
1570             raise ArgumentError("Can only import manifest into an empty collection")
1571
1572         STREAM_NAME = 0
1573         BLOCKS = 1
1574         SEGMENTS = 2
1575
1576         stream_name = None
1577         state = STREAM_NAME
1578
1579         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1580             tok = token_and_separator.group(1)
1581             sep = token_and_separator.group(2)
1582
1583             if state == STREAM_NAME:
1584                 # starting a new stream
1585                 stream_name = tok.replace('\\040', ' ')
1586                 blocks = []
1587                 segments = []
1588                 streamoffset = 0
1589                 state = BLOCKS
1590                 self.find_or_create(stream_name, COLLECTION)
1591                 continue
1592
1593             if state == BLOCKS:
1594                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1595                 if block_locator:
1596                     blocksize = int(block_locator.group(1))
1597                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1598                     streamoffset += blocksize
1599                 else:
1600                     state = SEGMENTS
1601
1602             if state == SEGMENTS:
1603                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1604                 if file_segment:
1605                     pos = int(file_segment.group(1))
1606                     size = int(file_segment.group(2))
1607                     name = file_segment.group(3).replace('\\040', ' ')
1608                     filepath = os.path.join(stream_name, name)
1609                     afile = self.find_or_create(filepath, FILE)
1610                     if isinstance(afile, ArvadosFile):
1611                         afile.add_segment(blocks, pos, size)
1612                     else:
1613                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1614                 else:
1615                     # error!
1616                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1617
1618             if sep == "\n":
1619                 stream_name = None
1620                 state = STREAM_NAME
1621
1622         self.set_committed(True)
1623
1624     @synchronized
1625     def notify(self, event, collection, name, item):
1626         if self._callback:
1627             self._callback(event, collection, name, item)
1628
1629
1630 class Subcollection(RichCollectionBase):
1631     """This is a subdirectory within a collection that doesn't have its own API
1632     server record.
1633
1634     Subcollection locking falls under the umbrella lock of its root collection.
1635
1636     """
1637
1638     def __init__(self, parent, name):
1639         super(Subcollection, self).__init__(parent)
1640         self.lock = self.root_collection().lock
1641         self._manifest_text = None
1642         self.name = name
1643         self.num_retries = parent.num_retries
1644
1645     def root_collection(self):
1646         return self.parent.root_collection()
1647
1648     def writable(self):
1649         return self.root_collection().writable()
1650
1651     def _my_api(self):
1652         return self.root_collection()._my_api()
1653
1654     def _my_keep(self):
1655         return self.root_collection()._my_keep()
1656
1657     def _my_block_manager(self):
1658         return self.root_collection()._my_block_manager()
1659
1660     def stream_name(self):
1661         return os.path.join(self.parent.stream_name(), self.name)
1662
1663     @synchronized
1664     def clone(self, new_parent, new_name):
1665         c = Subcollection(new_parent, new_name)
1666         c._clonefrom(self)
1667         return c
1668
1669     @must_be_writable
1670     @synchronized
1671     def _reparent(self, newparent, newname):
1672         self.set_committed(False)
1673         self.flush()
1674         self.parent.remove(self.name, recursive=True)
1675         self.parent = newparent
1676         self.name = newname
1677         self.lock = self.parent.root_collection().lock
1678
1679
1680 class CollectionReader(Collection):
1681     """A read-only collection object.
1682
1683     Initialize from an api collection record locator, a portable data hash of a
1684     manifest, or raw manifest text.  See `Collection` constructor for detailed
1685     options.
1686
1687     """
1688     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1689         self._in_init = True
1690         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1691         self._in_init = False
1692
1693         # Forego any locking since it should never change once initialized.
1694         self.lock = NoopLock()
1695
1696         # Backwards compatability with old CollectionReader
1697         # all_streams() and all_files()
1698         self._streams = None
1699
1700     def writable(self):
1701         return self._in_init
1702
1703     def _populate_streams(orig_func):
1704         @functools.wraps(orig_func)
1705         def populate_streams_wrapper(self, *args, **kwargs):
1706             # Defer populating self._streams until needed since it creates a copy of the manifest.
1707             if self._streams is None:
1708                 if self._manifest_text:
1709                     self._streams = [sline.split()
1710                                      for sline in self._manifest_text.split("\n")
1711                                      if sline]
1712                 else:
1713                     self._streams = []
1714             return orig_func(self, *args, **kwargs)
1715         return populate_streams_wrapper
1716
1717     @_populate_streams
1718     def normalize(self):
1719         """Normalize the streams returned by `all_streams`.
1720
1721         This method is kept for backwards compatability and only affects the
1722         behavior of `all_streams()` and `all_files()`
1723
1724         """
1725
1726         # Rearrange streams
1727         streams = {}
1728         for s in self.all_streams():
1729             for f in s.all_files():
1730                 streamname, filename = split(s.name() + "/" + f.name())
1731                 if streamname not in streams:
1732                     streams[streamname] = {}
1733                 if filename not in streams[streamname]:
1734                     streams[streamname][filename] = []
1735                 for r in f.segments:
1736                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1737
1738         self._streams = [normalize_stream(s, streams[s])
1739                          for s in sorted(streams)]
1740     @_populate_streams
1741     def all_streams(self):
1742         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1743                 for s in self._streams]
1744
1745     @_populate_streams
1746     def all_files(self):
1747         for s in self.all_streams():
1748             for f in s.all_files():
1749                 yield f