Merge branch 'master' into 7162-support-service-types
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace")
311         self._current_stream_name = '.' if newstreamname=='' else newstreamname
312
313     def current_stream_name(self):
314         return self._current_stream_name
315
316     def finish_current_stream(self):
317         self.finish_current_file()
318         self.flush_data()
319         if not self._current_stream_files:
320             pass
321         elif self._current_stream_name is None:
322             raise errors.AssertionError(
323                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
324                 (self._current_stream_length, len(self._current_stream_files)))
325         else:
326             if not self._current_stream_locators:
327                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
328             self._finished_streams.append([self._current_stream_name,
329                                            self._current_stream_locators,
330                                            self._current_stream_files])
331         self._current_stream_files = []
332         self._current_stream_length = 0
333         self._current_stream_locators = []
334         self._current_stream_name = None
335         self._current_file_pos = 0
336         self._current_file_name = None
337
338     def finish(self):
339         """Store the manifest in Keep and return its locator.
340
341         This is useful for storing manifest fragments (task outputs)
342         temporarily in Keep during a Crunch job.
343
344         In other cases you should make a collection instead, by
345         sending manifest_text() to the API server's "create
346         collection" endpoint.
347         """
348         return self._my_keep().put(self.manifest_text(), copies=self.replication)
349
350     def portable_data_hash(self):
351         stripped = self.stripped_manifest()
352         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
353
354     def manifest_text(self):
355         self.finish_current_stream()
356         manifest = ''
357
358         for stream in self._finished_streams:
359             if not re.search(r'^\.(/.*)?$', stream[0]):
360                 manifest += './'
361             manifest += stream[0].replace(' ', '\\040')
362             manifest += ' ' + ' '.join(stream[1])
363             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
364             manifest += "\n"
365
366         return manifest
367
368     def data_locators(self):
369         ret = []
370         for name, locators, files in self._finished_streams:
371             ret += locators
372         return ret
373
374
375 class ResumableCollectionWriter(CollectionWriter):
376     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
377                    '_current_stream_locators', '_current_stream_name',
378                    '_current_file_name', '_current_file_pos', '_close_file',
379                    '_data_buffer', '_dependencies', '_finished_streams',
380                    '_queued_dirents', '_queued_trees']
381
382     def __init__(self, api_client=None, **kwargs):
383         self._dependencies = {}
384         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
385
386     @classmethod
387     def from_state(cls, state, *init_args, **init_kwargs):
388         # Try to build a new writer from scratch with the given state.
389         # If the state is not suitable to resume (because files have changed,
390         # been deleted, aren't predictable, etc.), raise a
391         # StaleWriterStateError.  Otherwise, return the initialized writer.
392         # The caller is responsible for calling writer.do_queued_work()
393         # appropriately after it's returned.
394         writer = cls(*init_args, **init_kwargs)
395         for attr_name in cls.STATE_PROPS:
396             attr_value = state[attr_name]
397             attr_class = getattr(writer, attr_name).__class__
398             # Coerce the value into the same type as the initial value, if
399             # needed.
400             if attr_class not in (type(None), attr_value.__class__):
401                 attr_value = attr_class(attr_value)
402             setattr(writer, attr_name, attr_value)
403         # Check dependencies before we try to resume anything.
404         if any(KeepLocator(ls).permission_expired()
405                for ls in writer._current_stream_locators):
406             raise errors.StaleWriterStateError(
407                 "locators include expired permission hint")
408         writer.check_dependencies()
409         if state['_current_file'] is not None:
410             path, pos = state['_current_file']
411             try:
412                 writer._queued_file = open(path, 'rb')
413                 writer._queued_file.seek(pos)
414             except IOError as error:
415                 raise errors.StaleWriterStateError(
416                     "failed to reopen active file {}: {}".format(path, error))
417         return writer
418
419     def check_dependencies(self):
420         for path, orig_stat in self._dependencies.items():
421             if not S_ISREG(orig_stat[ST_MODE]):
422                 raise errors.StaleWriterStateError("{} not file".format(path))
423             try:
424                 now_stat = tuple(os.stat(path))
425             except OSError as error:
426                 raise errors.StaleWriterStateError(
427                     "failed to stat {}: {}".format(path, error))
428             if ((not S_ISREG(now_stat[ST_MODE])) or
429                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
430                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
431                 raise errors.StaleWriterStateError("{} changed".format(path))
432
433     def dump_state(self, copy_func=lambda x: x):
434         state = {attr: copy_func(getattr(self, attr))
435                  for attr in self.STATE_PROPS}
436         if self._queued_file is None:
437             state['_current_file'] = None
438         else:
439             state['_current_file'] = (os.path.realpath(self._queued_file.name),
440                                       self._queued_file.tell())
441         return state
442
443     def _queue_file(self, source, filename=None):
444         try:
445             src_path = os.path.realpath(source)
446         except Exception:
447             raise errors.AssertionError("{} not a file path".format(source))
448         try:
449             path_stat = os.stat(src_path)
450         except OSError as stat_error:
451             path_stat = None
452         super(ResumableCollectionWriter, self)._queue_file(source, filename)
453         fd_stat = os.fstat(self._queued_file.fileno())
454         if not S_ISREG(fd_stat.st_mode):
455             # We won't be able to resume from this cache anyway, so don't
456             # worry about further checks.
457             self._dependencies[source] = tuple(fd_stat)
458         elif path_stat is None:
459             raise errors.AssertionError(
460                 "could not stat {}: {}".format(source, stat_error))
461         elif path_stat.st_ino != fd_stat.st_ino:
462             raise errors.AssertionError(
463                 "{} changed between open and stat calls".format(source))
464         else:
465             self._dependencies[src_path] = tuple(fd_stat)
466
467     def write(self, data):
468         if self._queued_file is None:
469             raise errors.AssertionError(
470                 "resumable writer can't accept unsourced data")
471         return super(ResumableCollectionWriter, self).write(data)
472
473
474 ADD = "add"
475 DEL = "del"
476 MOD = "mod"
477 FILE = "file"
478 COLLECTION = "collection"
479
480 class RichCollectionBase(CollectionBase):
481     """Base class for Collections and Subcollections.
482
483     Implements the majority of functionality relating to accessing items in the
484     Collection.
485
486     """
487
488     def __init__(self, parent=None):
489         self.parent = parent
490         self._committed = False
491         self._callback = None
492         self._items = {}
493
494     def _my_api(self):
495         raise NotImplementedError()
496
497     def _my_keep(self):
498         raise NotImplementedError()
499
500     def _my_block_manager(self):
501         raise NotImplementedError()
502
503     def writable(self):
504         raise NotImplementedError()
505
506     def root_collection(self):
507         raise NotImplementedError()
508
509     def notify(self, event, collection, name, item):
510         raise NotImplementedError()
511
512     def stream_name(self):
513         raise NotImplementedError()
514
515     @must_be_writable
516     @synchronized
517     def find_or_create(self, path, create_type):
518         """Recursively search the specified file path.
519
520         May return either a `Collection` or `ArvadosFile`.  If not found, will
521         create a new item at the specified path based on `create_type`.  Will
522         create intermediate subcollections needed to contain the final item in
523         the path.
524
525         :create_type:
526           One of `arvados.collection.FILE` or
527           `arvados.collection.COLLECTION`.  If the path is not found, and value
528           of create_type is FILE then create and return a new ArvadosFile for
529           the last path component.  If COLLECTION, then create and return a new
530           Collection for the last path component.
531
532         """
533
534         pathcomponents = path.split("/", 1)
535         if pathcomponents[0]:
536             item = self._items.get(pathcomponents[0])
537             if len(pathcomponents) == 1:
538                 if item is None:
539                     # create new file
540                     if create_type == COLLECTION:
541                         item = Subcollection(self, pathcomponents[0])
542                     else:
543                         item = ArvadosFile(self, pathcomponents[0])
544                     self._items[pathcomponents[0]] = item
545                     self._committed = False
546                     self.notify(ADD, self, pathcomponents[0], item)
547                 return item
548             else:
549                 if item is None:
550                     # create new collection
551                     item = Subcollection(self, pathcomponents[0])
552                     self._items[pathcomponents[0]] = item
553                     self._committed = False
554                     self.notify(ADD, self, pathcomponents[0], item)
555                 if isinstance(item, RichCollectionBase):
556                     return item.find_or_create(pathcomponents[1], create_type)
557                 else:
558                     raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
559         else:
560             return self
561
562     @synchronized
563     def find(self, path):
564         """Recursively search the specified file path.
565
566         May return either a Collection or ArvadosFile.  Return None if not
567         found.
568
569         """
570         if not path:
571             raise errors.ArgumentError("Parameter 'path' is empty.")
572
573         pathcomponents = path.split("/", 1)
574         item = self._items.get(pathcomponents[0])
575         if len(pathcomponents) == 1:
576             return item
577         else:
578             if isinstance(item, RichCollectionBase):
579                 if pathcomponents[1]:
580                     return item.find(pathcomponents[1])
581                 else:
582                     return item
583             else:
584                 raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
585
586     @synchronized
587     def mkdirs(self, path):
588         """Recursive subcollection create.
589
590         Like `os.makedirs()`.  Will create intermediate subcollections needed
591         to contain the leaf subcollection path.
592
593         """
594
595         if self.find(path) != None:
596             raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
597
598         return self.find_or_create(path, COLLECTION)
599
600     def open(self, path, mode="r"):
601         """Open a file-like object for access.
602
603         :path:
604           path to a file in the collection
605         :mode:
606           one of "r", "r+", "w", "w+", "a", "a+"
607           :"r":
608             opens for reading
609           :"r+":
610             opens for reading and writing.  Reads/writes share a file pointer.
611           :"w", "w+":
612             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
613           :"a", "a+":
614             opens for reading and writing.  All writes are appended to
615             the end of the file.  Writing does not affect the file pointer for
616             reading.
617         """
618         mode = mode.replace("b", "")
619         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
620             raise errors.ArgumentError("Bad mode '%s'" % mode)
621         create = (mode != "r")
622
623         if create and not self.writable():
624             raise IOError(errno.EROFS, "Collection is read only")
625
626         if create:
627             arvfile = self.find_or_create(path, FILE)
628         else:
629             arvfile = self.find(path)
630
631         if arvfile is None:
632             raise IOError(errno.ENOENT, "File not found")
633         if not isinstance(arvfile, ArvadosFile):
634             raise IOError(errno.EISDIR, "Is a directory: %s" % path)
635
636         if mode[0] == "w":
637             arvfile.truncate(0)
638
639         name = os.path.basename(path)
640
641         if mode == "r":
642             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
643         else:
644             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
645
646     def modified(self):
647         """Determine if the collection has been modified since last commited."""
648         return not self.committed()
649
650     @synchronized
651     def committed(self):
652         """Determine if the collection has been committed to the API server."""
653
654         if self._committed is False:
655             return False
656         for v in self._items.values():
657             if v.committed() is False:
658                 return False
659         return True
660
661     @synchronized
662     def set_committed(self):
663         """Recursively set committed flag to True."""
664         self._committed = True
665         for k,v in self._items.items():
666             v.set_committed()
667
668     @synchronized
669     def __iter__(self):
670         """Iterate over names of files and collections contained in this collection."""
671         return iter(self._items.keys())
672
673     @synchronized
674     def __getitem__(self, k):
675         """Get a file or collection that is directly contained by this collection.
676
677         If you want to search a path, use `find()` instead.
678
679         """
680         return self._items[k]
681
682     @synchronized
683     def __contains__(self, k):
684         """Test if there is a file or collection a directly contained by this collection."""
685         return k in self._items
686
687     @synchronized
688     def __len__(self):
689         """Get the number of items directly contained in this collection."""
690         return len(self._items)
691
692     @must_be_writable
693     @synchronized
694     def __delitem__(self, p):
695         """Delete an item by name which is directly contained by this collection."""
696         del self._items[p]
697         self._committed = False
698         self.notify(DEL, self, p, None)
699
700     @synchronized
701     def keys(self):
702         """Get a list of names of files and collections directly contained in this collection."""
703         return self._items.keys()
704
705     @synchronized
706     def values(self):
707         """Get a list of files and collection objects directly contained in this collection."""
708         return self._items.values()
709
710     @synchronized
711     def items(self):
712         """Get a list of (name, object) tuples directly contained in this collection."""
713         return self._items.items()
714
715     def exists(self, path):
716         """Test if there is a file or collection at `path`."""
717         return self.find(path) is not None
718
719     @must_be_writable
720     @synchronized
721     def remove(self, path, recursive=False):
722         """Remove the file or subcollection (directory) at `path`.
723
724         :recursive:
725           Specify whether to remove non-empty subcollections (True), or raise an error (False).
726         """
727
728         if not path:
729             raise errors.ArgumentError("Parameter 'path' is empty.")
730
731         pathcomponents = path.split("/", 1)
732         item = self._items.get(pathcomponents[0])
733         if item is None:
734             raise IOError(errno.ENOENT, "File not found")
735         if len(pathcomponents) == 1:
736             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
737                 raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
738             deleteditem = self._items[pathcomponents[0]]
739             del self._items[pathcomponents[0]]
740             self._committed = False
741             self.notify(DEL, self, pathcomponents[0], deleteditem)
742         else:
743             item.remove(pathcomponents[1])
744
745     def _clonefrom(self, source):
746         for k,v in source.items():
747             self._items[k] = v.clone(self, k)
748
749     def clone(self):
750         raise NotImplementedError()
751
752     @must_be_writable
753     @synchronized
754     def add(self, source_obj, target_name, overwrite=False, reparent=False):
755         """Copy or move a file or subcollection to this collection.
756
757         :source_obj:
758           An ArvadosFile, or Subcollection object
759
760         :target_name:
761           Destination item name.  If the target name already exists and is a
762           file, this will raise an error unless you specify `overwrite=True`.
763
764         :overwrite:
765           Whether to overwrite target file if it already exists.
766
767         :reparent:
768           If True, source_obj will be moved from its parent collection to this collection.
769           If False, source_obj will be copied and the parent collection will be
770           unmodified.
771
772         """
773
774         if target_name in self and not overwrite:
775             raise IOError(errno.EEXIST, "File already exists")
776
777         modified_from = None
778         if target_name in self:
779             modified_from = self[target_name]
780
781         # Actually make the move or copy.
782         if reparent:
783             source_obj._reparent(self, target_name)
784             item = source_obj
785         else:
786             item = source_obj.clone(self, target_name)
787
788         self._items[target_name] = item
789         self._committed = False
790
791         if modified_from:
792             self.notify(MOD, self, target_name, (modified_from, item))
793         else:
794             self.notify(ADD, self, target_name, item)
795
796     def _get_src_target(self, source, target_path, source_collection, create_dest):
797         if source_collection is None:
798             source_collection = self
799
800         # Find the object
801         if isinstance(source, basestring):
802             source_obj = source_collection.find(source)
803             if source_obj is None:
804                 raise IOError(errno.ENOENT, "File not found")
805             sourcecomponents = source.split("/")
806         else:
807             source_obj = source
808             sourcecomponents = None
809
810         # Find parent collection the target path
811         targetcomponents = target_path.split("/")
812
813         # Determine the name to use.
814         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
815
816         if not target_name:
817             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
818
819         if create_dest:
820             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
821         else:
822             if len(targetcomponents) > 1:
823                 target_dir = self.find("/".join(targetcomponents[0:-1]))
824             else:
825                 target_dir = self
826
827         if target_dir is None:
828             raise IOError(errno.ENOENT, "Target directory not found.")
829
830         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
831             target_dir = target_dir[target_name]
832             target_name = sourcecomponents[-1]
833
834         return (source_obj, target_dir, target_name)
835
836     @must_be_writable
837     @synchronized
838     def copy(self, source, target_path, source_collection=None, overwrite=False):
839         """Copy a file or subcollection to a new path in this collection.
840
841         :source:
842           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
843
844         :target_path:
845           Destination file or path.  If the target path already exists and is a
846           subcollection, the item will be placed inside the subcollection.  If
847           the target path already exists and is a file, this will raise an error
848           unless you specify `overwrite=True`.
849
850         :source_collection:
851           Collection to copy `source_path` from (default `self`)
852
853         :overwrite:
854           Whether to overwrite target file if it already exists.
855         """
856
857         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
858         target_dir.add(source_obj, target_name, overwrite, False)
859
860     @must_be_writable
861     @synchronized
862     def rename(self, source, target_path, source_collection=None, overwrite=False):
863         """Move a file or subcollection from `source_collection` to a new path in this collection.
864
865         :source:
866           A string with a path to source file or subcollection.
867
868         :target_path:
869           Destination file or path.  If the target path already exists and is a
870           subcollection, the item will be placed inside the subcollection.  If
871           the target path already exists and is a file, this will raise an error
872           unless you specify `overwrite=True`.
873
874         :source_collection:
875           Collection to copy `source_path` from (default `self`)
876
877         :overwrite:
878           Whether to overwrite target file if it already exists.
879         """
880
881         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
882         if not source_obj.writable():
883             raise IOError(errno.EROFS, "Source collection is read only.")
884         target_dir.add(source_obj, target_name, overwrite, True)
885
886     def portable_manifest_text(self, stream_name="."):
887         """Get the manifest text for this collection, sub collections and files.
888
889         This method does not flush outstanding blocks to Keep.  It will return
890         a normalized manifest with access tokens stripped.
891
892         :stream_name:
893           Name to use for this stream (directory)
894
895         """
896         return self._get_manifest_text(stream_name, True, True)
897
898     @synchronized
899     def manifest_text(self, stream_name=".", strip=False, normalize=False):
900         """Get the manifest text for this collection, sub collections and files.
901
902         This method will flush outstanding blocks to Keep.  By default, it will
903         not normalize an unmodified manifest or strip access tokens.
904
905         :stream_name:
906           Name to use for this stream (directory)
907
908         :strip:
909           If True, remove signing tokens from block locators if present.
910           If False (default), block locators are left unchanged.
911
912         :normalize:
913           If True, always export the manifest text in normalized form
914           even if the Collection is not modified.  If False (default) and the collection
915           is not modified, return the original manifest text even if it is not
916           in normalized form.
917
918         """
919
920         self._my_block_manager().commit_all()
921         return self._get_manifest_text(stream_name, strip, normalize)
922
923     @synchronized
924     def _get_manifest_text(self, stream_name, strip, normalize):
925         """Get the manifest text for this collection, sub collections and files.
926
927         :stream_name:
928           Name to use for this stream (directory)
929
930         :strip:
931           If True, remove signing tokens from block locators if present.
932           If False (default), block locators are left unchanged.
933
934         :normalize:
935           If True, always export the manifest text in normalized form
936           even if the Collection is not modified.  If False (default) and the collection
937           is not modified, return the original manifest text even if it is not
938           in normalized form.
939
940         """
941
942         if not self.committed() or self._manifest_text is None or normalize:
943             stream = {}
944             buf = []
945             sorted_keys = sorted(self.keys())
946             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
947                 # Create a stream per file `k`
948                 arvfile = self[filename]
949                 filestream = []
950                 for segment in arvfile.segments():
951                     loc = segment.locator
952                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
953                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
954                     if strip:
955                         loc = KeepLocator(loc).stripped()
956                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
957                                          segment.segment_offset, segment.range_size))
958                 stream[filename] = filestream
959             if stream:
960                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
961             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
962                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
963             return "".join(buf)
964         else:
965             if strip:
966                 return self.stripped_manifest()
967             else:
968                 return self._manifest_text
969
970     @synchronized
971     def diff(self, end_collection, prefix=".", holding_collection=None):
972         """Generate list of add/modify/delete actions.
973
974         When given to `apply`, will change `self` to match `end_collection`
975
976         """
977         changes = []
978         if holding_collection is None:
979             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
980         for k in self:
981             if k not in end_collection:
982                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
983         for k in end_collection:
984             if k in self:
985                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
986                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
987                 elif end_collection[k] != self[k]:
988                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
989             else:
990                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
991         return changes
992
993     @must_be_writable
994     @synchronized
995     def apply(self, changes):
996         """Apply changes from `diff`.
997
998         If a change conflicts with a local change, it will be saved to an
999         alternate path indicating the conflict.
1000
1001         """
1002         if changes:
1003             self._committed = False
1004         for change in changes:
1005             event_type = change[0]
1006             path = change[1]
1007             initial = change[2]
1008             local = self.find(path)
1009             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1010                                                                     time.gmtime()))
1011             if event_type == ADD:
1012                 if local is None:
1013                     # No local file at path, safe to copy over new file
1014                     self.copy(initial, path)
1015                 elif local is not None and local != initial:
1016                     # There is already local file and it is different:
1017                     # save change to conflict file.
1018                     self.copy(initial, conflictpath)
1019             elif event_type == MOD:
1020                 final = change[3]
1021                 if local == initial:
1022                     # Local matches the "initial" item so it has not
1023                     # changed locally and is safe to update.
1024                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1025                         # Replace contents of local file with new contents
1026                         local.replace_contents(final)
1027                     else:
1028                         # Overwrite path with new item; this can happen if
1029                         # path was a file and is now a collection or vice versa
1030                         self.copy(final, path, overwrite=True)
1031                 else:
1032                     # Local is missing (presumably deleted) or local doesn't
1033                     # match the "start" value, so save change to conflict file
1034                     self.copy(final, conflictpath)
1035             elif event_type == DEL:
1036                 if local == initial:
1037                     # Local item matches "initial" value, so it is safe to remove.
1038                     self.remove(path, recursive=True)
1039                 # else, the file is modified or already removed, in either
1040                 # case we don't want to try to remove it.
1041
1042     def portable_data_hash(self):
1043         """Get the portable data hash for this collection's manifest."""
1044         stripped = self.portable_manifest_text()
1045         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1046
1047     @synchronized
1048     def subscribe(self, callback):
1049         if self._callback is None:
1050             self._callback = callback
1051         else:
1052             raise errors.ArgumentError("A callback is already set on this collection.")
1053
1054     @synchronized
1055     def unsubscribe(self):
1056         if self._callback is not None:
1057             self._callback = None
1058
1059     @synchronized
1060     def notify(self, event, collection, name, item):
1061         if self._callback:
1062             self._callback(event, collection, name, item)
1063         self.root_collection().notify(event, collection, name, item)
1064
1065     @synchronized
1066     def __eq__(self, other):
1067         if other is self:
1068             return True
1069         if not isinstance(other, RichCollectionBase):
1070             return False
1071         if len(self._items) != len(other):
1072             return False
1073         for k in self._items:
1074             if k not in other:
1075                 return False
1076             if self._items[k] != other[k]:
1077                 return False
1078         return True
1079
1080     def __ne__(self, other):
1081         return not self.__eq__(other)
1082
1083     @synchronized
1084     def flush(self):
1085         """Flush bufferblocks to Keep."""
1086         for e in self.values():
1087             e.flush()
1088
1089
1090 class Collection(RichCollectionBase):
1091     """Represents the root of an Arvados Collection.
1092
1093     This class is threadsafe.  The root collection object, all subcollections
1094     and files are protected by a single lock (i.e. each access locks the entire
1095     collection).
1096
1097     Brief summary of
1098     useful methods:
1099
1100     :To read an existing file:
1101       `c.open("myfile", "r")`
1102
1103     :To write a new file:
1104       `c.open("myfile", "w")`
1105
1106     :To determine if a file exists:
1107       `c.find("myfile") is not None`
1108
1109     :To copy a file:
1110       `c.copy("source", "dest")`
1111
1112     :To delete a file:
1113       `c.remove("myfile")`
1114
1115     :To save to an existing collection record:
1116       `c.save()`
1117
1118     :To save a new collection record:
1119     `c.save_new()`
1120
1121     :To merge remote changes into this object:
1122       `c.update()`
1123
1124     Must be associated with an API server Collection record (during
1125     initialization, or using `save_new`) to use `save` or `update`
1126
1127     """
1128
1129     def __init__(self, manifest_locator_or_text=None,
1130                  api_client=None,
1131                  keep_client=None,
1132                  num_retries=None,
1133                  parent=None,
1134                  apiconfig=None,
1135                  block_manager=None):
1136         """Collection constructor.
1137
1138         :manifest_locator_or_text:
1139           One of Arvados collection UUID, block locator of
1140           a manifest, raw manifest text, or None (to create an empty collection).
1141         :parent:
1142           the parent Collection, may be None.
1143         :apiconfig:
1144           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1145           Prefer this over supplying your own api_client and keep_client (except in testing).
1146           Will use default config settings if not specified.
1147         :api_client:
1148           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1149         :keep_client:
1150           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1151         :num_retries:
1152           the number of retries for API and Keep requests.
1153         :block_manager:
1154           the block manager to use.  If not specified, create one.
1155
1156         """
1157         super(Collection, self).__init__(parent)
1158         self._api_client = api_client
1159         self._keep_client = keep_client
1160         self._block_manager = block_manager
1161
1162         if apiconfig:
1163             self._config = apiconfig
1164         else:
1165             self._config = config.settings()
1166
1167         self.num_retries = num_retries if num_retries is not None else 0
1168         self._manifest_locator = None
1169         self._manifest_text = None
1170         self._api_response = None
1171         self._past_versions = set()
1172
1173         self.lock = threading.RLock()
1174         self.events = None
1175
1176         if manifest_locator_or_text:
1177             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1178                 self._manifest_locator = manifest_locator_or_text
1179             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1180                 self._manifest_locator = manifest_locator_or_text
1181             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1182                 self._manifest_text = manifest_locator_or_text
1183             else:
1184                 raise errors.ArgumentError(
1185                     "Argument to CollectionReader is not a manifest or a collection UUID")
1186
1187             try:
1188                 self._populate()
1189             except (IOError, errors.SyntaxError) as e:
1190                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1191
1192     def root_collection(self):
1193         return self
1194
1195     def stream_name(self):
1196         return "."
1197
1198     def writable(self):
1199         return True
1200
1201     @synchronized
1202     def known_past_version(self, modified_at_and_portable_data_hash):
1203         return modified_at_and_portable_data_hash in self._past_versions
1204
1205     @synchronized
1206     @retry_method
1207     def update(self, other=None, num_retries=None):
1208         """Merge the latest collection on the API server with the current collection."""
1209
1210         if other is None:
1211             if self._manifest_locator is None:
1212                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1213             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1214             if self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))):
1215                 # We've merged this record this before.  Don't do anything.
1216                 return
1217             else:
1218                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1219             other = CollectionReader(response["manifest_text"])
1220         baseline = CollectionReader(self._manifest_text)
1221         self.apply(baseline.diff(other))
1222         self._manifest_text = self.manifest_text()
1223
1224     @synchronized
1225     def _my_api(self):
1226         if self._api_client is None:
1227             self._api_client = ThreadSafeApiCache(self._config)
1228             self._keep_client = self._api_client.keep
1229         return self._api_client
1230
1231     @synchronized
1232     def _my_keep(self):
1233         if self._keep_client is None:
1234             if self._api_client is None:
1235                 self._my_api()
1236             else:
1237                 self._keep_client = KeepClient(api_client=self._api_client)
1238         return self._keep_client
1239
1240     @synchronized
1241     def _my_block_manager(self):
1242         if self._block_manager is None:
1243             self._block_manager = _BlockManager(self._my_keep())
1244         return self._block_manager
1245
1246     def _remember_api_response(self, response):
1247         self._api_response = response
1248         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1249
1250     def _populate_from_api_server(self):
1251         # As in KeepClient itself, we must wait until the last
1252         # possible moment to instantiate an API client, in order to
1253         # avoid tripping up clients that don't have access to an API
1254         # server.  If we do build one, make sure our Keep client uses
1255         # it.  If instantiation fails, we'll fall back to the except
1256         # clause, just like any other Collection lookup
1257         # failure. Return an exception, or None if successful.
1258         try:
1259             self._remember_api_response(self._my_api().collections().get(
1260                 uuid=self._manifest_locator).execute(
1261                     num_retries=self.num_retries))
1262             self._manifest_text = self._api_response['manifest_text']
1263             return None
1264         except Exception as e:
1265             return e
1266
1267     def _populate_from_keep(self):
1268         # Retrieve a manifest directly from Keep. This has a chance of
1269         # working if [a] the locator includes a permission signature
1270         # or [b] the Keep services are operating in world-readable
1271         # mode. Return an exception, or None if successful.
1272         try:
1273             self._manifest_text = self._my_keep().get(
1274                 self._manifest_locator, num_retries=self.num_retries)
1275         except Exception as e:
1276             return e
1277
1278     def _populate(self):
1279         if self._manifest_locator is None and self._manifest_text is None:
1280             return
1281         error_via_api = None
1282         error_via_keep = None
1283         should_try_keep = ((self._manifest_text is None) and
1284                            util.keep_locator_pattern.match(
1285                                self._manifest_locator))
1286         if ((self._manifest_text is None) and
1287             util.signed_locator_pattern.match(self._manifest_locator)):
1288             error_via_keep = self._populate_from_keep()
1289         if self._manifest_text is None:
1290             error_via_api = self._populate_from_api_server()
1291             if error_via_api is not None and not should_try_keep:
1292                 raise error_via_api
1293         if ((self._manifest_text is None) and
1294             not error_via_keep and
1295             should_try_keep):
1296             # Looks like a keep locator, and we didn't already try keep above
1297             error_via_keep = self._populate_from_keep()
1298         if self._manifest_text is None:
1299             # Nothing worked!
1300             raise errors.NotFoundError(
1301                 ("Failed to retrieve collection '{}' " +
1302                  "from either API server ({}) or Keep ({})."
1303                  ).format(
1304                     self._manifest_locator,
1305                     error_via_api,
1306                     error_via_keep))
1307         # populate
1308         self._baseline_manifest = self._manifest_text
1309         self._import_manifest(self._manifest_text)
1310
1311
1312     def _has_collection_uuid(self):
1313         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1314
1315     def __enter__(self):
1316         return self
1317
1318     def __exit__(self, exc_type, exc_value, traceback):
1319         """Support scoped auto-commit in a with: block."""
1320         if exc_type is None:
1321             if self.writable() and self._has_collection_uuid():
1322                 self.save()
1323         self.stop_threads()
1324
1325     def stop_threads(self):
1326         if self._block_manager is not None:
1327             self._block_manager.stop_threads()
1328
1329     @synchronized
1330     def manifest_locator(self):
1331         """Get the manifest locator, if any.
1332
1333         The manifest locator will be set when the collection is loaded from an
1334         API server record or the portable data hash of a manifest.
1335
1336         The manifest locator will be None if the collection is newly created or
1337         was created directly from manifest text.  The method `save_new()` will
1338         assign a manifest locator.
1339
1340         """
1341         return self._manifest_locator
1342
1343     @synchronized
1344     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1345         if new_config is None:
1346             new_config = self._config
1347         if readonly:
1348             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1349         else:
1350             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1351
1352         newcollection._clonefrom(self)
1353         return newcollection
1354
1355     @synchronized
1356     def api_response(self):
1357         """Returns information about this Collection fetched from the API server.
1358
1359         If the Collection exists in Keep but not the API server, currently
1360         returns None.  Future versions may provide a synthetic response.
1361
1362         """
1363         return self._api_response
1364
1365     def find_or_create(self, path, create_type):
1366         """See `RichCollectionBase.find_or_create`"""
1367         if path == ".":
1368             return self
1369         else:
1370             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1371
1372     def find(self, path):
1373         """See `RichCollectionBase.find`"""
1374         if path == ".":
1375             return self
1376         else:
1377             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1378
1379     def remove(self, path, recursive=False):
1380         """See `RichCollectionBase.remove`"""
1381         if path == ".":
1382             raise errors.ArgumentError("Cannot remove '.'")
1383         else:
1384             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1385
1386     @must_be_writable
1387     @synchronized
1388     @retry_method
1389     def save(self, merge=True, num_retries=None):
1390         """Save collection to an existing collection record.
1391
1392         Commit pending buffer blocks to Keep, merge with remote record (if
1393         merge=True, the default), and update the collection record.  Returns
1394         the current manifest text.
1395
1396         Will raise AssertionError if not associated with a collection record on
1397         the API server.  If you want to save a manifest to Keep only, see
1398         `save_new()`.
1399
1400         :merge:
1401           Update and merge remote changes before saving.  Otherwise, any
1402           remote changes will be ignored and overwritten.
1403
1404         :num_retries:
1405           Retry count on API calls (if None,  use the collection default)
1406
1407         """
1408         if not self.committed():
1409             if not self._has_collection_uuid():
1410                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1411
1412             self._my_block_manager().commit_all()
1413
1414             if merge:
1415                 self.update()
1416
1417             text = self.manifest_text(strip=False)
1418             self._remember_api_response(self._my_api().collections().update(
1419                 uuid=self._manifest_locator,
1420                 body={'manifest_text': text}
1421                 ).execute(
1422                     num_retries=num_retries))
1423             self._manifest_text = self._api_response["manifest_text"]
1424             self.set_committed()
1425
1426         return self._manifest_text
1427
1428
1429     @must_be_writable
1430     @synchronized
1431     @retry_method
1432     def save_new(self, name=None,
1433                  create_collection_record=True,
1434                  owner_uuid=None,
1435                  ensure_unique_name=False,
1436                  num_retries=None):
1437         """Save collection to a new collection record.
1438
1439         Commit pending buffer blocks to Keep and, when create_collection_record
1440         is True (default), create a new collection record.  After creating a
1441         new collection record, this Collection object will be associated with
1442         the new record used by `save()`.  Returns the current manifest text.
1443
1444         :name:
1445           The collection name.
1446
1447         :create_collection_record:
1448            If True, create a collection record on the API server.
1449            If False, only commit blocks to Keep and return the manifest text.
1450
1451         :owner_uuid:
1452           the user, or project uuid that will own this collection.
1453           If None, defaults to the current user.
1454
1455         :ensure_unique_name:
1456           If True, ask the API server to rename the collection
1457           if it conflicts with a collection with the same name and owner.  If
1458           False, a name conflict will result in an error.
1459
1460         :num_retries:
1461           Retry count on API calls (if None,  use the collection default)
1462
1463         """
1464         self._my_block_manager().commit_all()
1465         text = self.manifest_text(strip=False)
1466
1467         if create_collection_record:
1468             if name is None:
1469                 name = "New collection"
1470                 ensure_unique_name = True
1471
1472             body = {"manifest_text": text,
1473                     "name": name}
1474             if owner_uuid:
1475                 body["owner_uuid"] = owner_uuid
1476
1477             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1478             text = self._api_response["manifest_text"]
1479
1480             self._manifest_locator = self._api_response["uuid"]
1481
1482             self._manifest_text = text
1483             self.set_committed()
1484
1485         return text
1486
1487     @synchronized
1488     def _import_manifest(self, manifest_text):
1489         """Import a manifest into a `Collection`.
1490
1491         :manifest_text:
1492           The manifest text to import from.
1493
1494         """
1495         if len(self) > 0:
1496             raise ArgumentError("Can only import manifest into an empty collection")
1497
1498         STREAM_NAME = 0
1499         BLOCKS = 1
1500         SEGMENTS = 2
1501
1502         stream_name = None
1503         state = STREAM_NAME
1504
1505         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1506             tok = token_and_separator.group(1)
1507             sep = token_and_separator.group(2)
1508
1509             if state == STREAM_NAME:
1510                 # starting a new stream
1511                 stream_name = tok.replace('\\040', ' ')
1512                 blocks = []
1513                 segments = []
1514                 streamoffset = 0L
1515                 state = BLOCKS
1516                 self.find_or_create(stream_name, COLLECTION)
1517                 continue
1518
1519             if state == BLOCKS:
1520                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1521                 if block_locator:
1522                     blocksize = long(block_locator.group(1))
1523                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1524                     streamoffset += blocksize
1525                 else:
1526                     state = SEGMENTS
1527
1528             if state == SEGMENTS:
1529                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1530                 if file_segment:
1531                     pos = long(file_segment.group(1))
1532                     size = long(file_segment.group(2))
1533                     name = file_segment.group(3).replace('\\040', ' ')
1534                     filepath = os.path.join(stream_name, name)
1535                     afile = self.find_or_create(filepath, FILE)
1536                     if isinstance(afile, ArvadosFile):
1537                         afile.add_segment(blocks, pos, size)
1538                     else:
1539                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1540                 else:
1541                     # error!
1542                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1543
1544             if sep == "\n":
1545                 stream_name = None
1546                 state = STREAM_NAME
1547
1548         self.set_committed()
1549
1550     @synchronized
1551     def notify(self, event, collection, name, item):
1552         if self._callback:
1553             self._callback(event, collection, name, item)
1554
1555
1556 class Subcollection(RichCollectionBase):
1557     """This is a subdirectory within a collection that doesn't have its own API
1558     server record.
1559
1560     Subcollection locking falls under the umbrella lock of its root collection.
1561
1562     """
1563
1564     def __init__(self, parent, name):
1565         super(Subcollection, self).__init__(parent)
1566         self.lock = self.root_collection().lock
1567         self._manifest_text = None
1568         self.name = name
1569         self.num_retries = parent.num_retries
1570
1571     def root_collection(self):
1572         return self.parent.root_collection()
1573
1574     def writable(self):
1575         return self.root_collection().writable()
1576
1577     def _my_api(self):
1578         return self.root_collection()._my_api()
1579
1580     def _my_keep(self):
1581         return self.root_collection()._my_keep()
1582
1583     def _my_block_manager(self):
1584         return self.root_collection()._my_block_manager()
1585
1586     def stream_name(self):
1587         return os.path.join(self.parent.stream_name(), self.name)
1588
1589     @synchronized
1590     def clone(self, new_parent, new_name):
1591         c = Subcollection(new_parent, new_name)
1592         c._clonefrom(self)
1593         return c
1594
1595     @must_be_writable
1596     @synchronized
1597     def _reparent(self, newparent, newname):
1598         self._committed = False
1599         self.flush()
1600         self.parent.remove(self.name, recursive=True)
1601         self.parent = newparent
1602         self.name = newname
1603         self.lock = self.parent.root_collection().lock
1604
1605
1606 class CollectionReader(Collection):
1607     """A read-only collection object.
1608
1609     Initialize from an api collection record locator, a portable data hash of a
1610     manifest, or raw manifest text.  See `Collection` constructor for detailed
1611     options.
1612
1613     """
1614     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1615         self._in_init = True
1616         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1617         self._in_init = False
1618
1619         # Forego any locking since it should never change once initialized.
1620         self.lock = NoopLock()
1621
1622         # Backwards compatability with old CollectionReader
1623         # all_streams() and all_files()
1624         self._streams = None
1625
1626     def writable(self):
1627         return self._in_init
1628
1629     def _populate_streams(orig_func):
1630         @functools.wraps(orig_func)
1631         def populate_streams_wrapper(self, *args, **kwargs):
1632             # Defer populating self._streams until needed since it creates a copy of the manifest.
1633             if self._streams is None:
1634                 if self._manifest_text:
1635                     self._streams = [sline.split()
1636                                      for sline in self._manifest_text.split("\n")
1637                                      if sline]
1638                 else:
1639                     self._streams = []
1640             return orig_func(self, *args, **kwargs)
1641         return populate_streams_wrapper
1642
1643     @_populate_streams
1644     def normalize(self):
1645         """Normalize the streams returned by `all_streams`.
1646
1647         This method is kept for backwards compatability and only affects the
1648         behavior of `all_streams()` and `all_files()`
1649
1650         """
1651
1652         # Rearrange streams
1653         streams = {}
1654         for s in self.all_streams():
1655             for f in s.all_files():
1656                 streamname, filename = split(s.name() + "/" + f.name())
1657                 if streamname not in streams:
1658                     streams[streamname] = {}
1659                 if filename not in streams[streamname]:
1660                     streams[streamname][filename] = []
1661                 for r in f.segments:
1662                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1663
1664         self._streams = [normalize_stream(s, streams[s])
1665                          for s in sorted(streams)]
1666     @_populate_streams
1667     def all_streams(self):
1668         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1669                 for s in self._streams]
1670
1671     @_populate_streams
1672     def all_files(self):
1673         for s in self.all_streams():
1674             for f in s.all_files():
1675                 yield f