10671: Merge branch 'master' into 10671-pipeline-instance-finish-time
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace: '%s'" %
311                 (newstreamname))
312         self._current_stream_name = '.' if newstreamname=='' else newstreamname
313
314     def current_stream_name(self):
315         return self._current_stream_name
316
317     def finish_current_stream(self):
318         self.finish_current_file()
319         self.flush_data()
320         if not self._current_stream_files:
321             pass
322         elif self._current_stream_name is None:
323             raise errors.AssertionError(
324                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
325                 (self._current_stream_length, len(self._current_stream_files)))
326         else:
327             if not self._current_stream_locators:
328                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
329             self._finished_streams.append([self._current_stream_name,
330                                            self._current_stream_locators,
331                                            self._current_stream_files])
332         self._current_stream_files = []
333         self._current_stream_length = 0
334         self._current_stream_locators = []
335         self._current_stream_name = None
336         self._current_file_pos = 0
337         self._current_file_name = None
338
339     def finish(self):
340         """Store the manifest in Keep and return its locator.
341
342         This is useful for storing manifest fragments (task outputs)
343         temporarily in Keep during a Crunch job.
344
345         In other cases you should make a collection instead, by
346         sending manifest_text() to the API server's "create
347         collection" endpoint.
348         """
349         return self._my_keep().put(self.manifest_text(), copies=self.replication)
350
351     def portable_data_hash(self):
352         stripped = self.stripped_manifest()
353         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
354
355     def manifest_text(self):
356         self.finish_current_stream()
357         manifest = ''
358
359         for stream in self._finished_streams:
360             if not re.search(r'^\.(/.*)?$', stream[0]):
361                 manifest += './'
362             manifest += stream[0].replace(' ', '\\040')
363             manifest += ' ' + ' '.join(stream[1])
364             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
365             manifest += "\n"
366
367         return manifest
368
369     def data_locators(self):
370         ret = []
371         for name, locators, files in self._finished_streams:
372             ret += locators
373         return ret
374
375
376 class ResumableCollectionWriter(CollectionWriter):
377     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
378                    '_current_stream_locators', '_current_stream_name',
379                    '_current_file_name', '_current_file_pos', '_close_file',
380                    '_data_buffer', '_dependencies', '_finished_streams',
381                    '_queued_dirents', '_queued_trees']
382
383     def __init__(self, api_client=None, **kwargs):
384         self._dependencies = {}
385         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
386
387     @classmethod
388     def from_state(cls, state, *init_args, **init_kwargs):
389         # Try to build a new writer from scratch with the given state.
390         # If the state is not suitable to resume (because files have changed,
391         # been deleted, aren't predictable, etc.), raise a
392         # StaleWriterStateError.  Otherwise, return the initialized writer.
393         # The caller is responsible for calling writer.do_queued_work()
394         # appropriately after it's returned.
395         writer = cls(*init_args, **init_kwargs)
396         for attr_name in cls.STATE_PROPS:
397             attr_value = state[attr_name]
398             attr_class = getattr(writer, attr_name).__class__
399             # Coerce the value into the same type as the initial value, if
400             # needed.
401             if attr_class not in (type(None), attr_value.__class__):
402                 attr_value = attr_class(attr_value)
403             setattr(writer, attr_name, attr_value)
404         # Check dependencies before we try to resume anything.
405         if any(KeepLocator(ls).permission_expired()
406                for ls in writer._current_stream_locators):
407             raise errors.StaleWriterStateError(
408                 "locators include expired permission hint")
409         writer.check_dependencies()
410         if state['_current_file'] is not None:
411             path, pos = state['_current_file']
412             try:
413                 writer._queued_file = open(path, 'rb')
414                 writer._queued_file.seek(pos)
415             except IOError as error:
416                 raise errors.StaleWriterStateError(
417                     "failed to reopen active file {}: {}".format(path, error))
418         return writer
419
420     def check_dependencies(self):
421         for path, orig_stat in self._dependencies.items():
422             if not S_ISREG(orig_stat[ST_MODE]):
423                 raise errors.StaleWriterStateError("{} not file".format(path))
424             try:
425                 now_stat = tuple(os.stat(path))
426             except OSError as error:
427                 raise errors.StaleWriterStateError(
428                     "failed to stat {}: {}".format(path, error))
429             if ((not S_ISREG(now_stat[ST_MODE])) or
430                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
431                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
432                 raise errors.StaleWriterStateError("{} changed".format(path))
433
434     def dump_state(self, copy_func=lambda x: x):
435         state = {attr: copy_func(getattr(self, attr))
436                  for attr in self.STATE_PROPS}
437         if self._queued_file is None:
438             state['_current_file'] = None
439         else:
440             state['_current_file'] = (os.path.realpath(self._queued_file.name),
441                                       self._queued_file.tell())
442         return state
443
444     def _queue_file(self, source, filename=None):
445         try:
446             src_path = os.path.realpath(source)
447         except Exception:
448             raise errors.AssertionError("{} not a file path".format(source))
449         try:
450             path_stat = os.stat(src_path)
451         except OSError as stat_error:
452             path_stat = None
453         super(ResumableCollectionWriter, self)._queue_file(source, filename)
454         fd_stat = os.fstat(self._queued_file.fileno())
455         if not S_ISREG(fd_stat.st_mode):
456             # We won't be able to resume from this cache anyway, so don't
457             # worry about further checks.
458             self._dependencies[source] = tuple(fd_stat)
459         elif path_stat is None:
460             raise errors.AssertionError(
461                 "could not stat {}: {}".format(source, stat_error))
462         elif path_stat.st_ino != fd_stat.st_ino:
463             raise errors.AssertionError(
464                 "{} changed between open and stat calls".format(source))
465         else:
466             self._dependencies[src_path] = tuple(fd_stat)
467
468     def write(self, data):
469         if self._queued_file is None:
470             raise errors.AssertionError(
471                 "resumable writer can't accept unsourced data")
472         return super(ResumableCollectionWriter, self).write(data)
473
474
475 ADD = "add"
476 DEL = "del"
477 MOD = "mod"
478 TOK = "tok"
479 FILE = "file"
480 COLLECTION = "collection"
481
482 class RichCollectionBase(CollectionBase):
483     """Base class for Collections and Subcollections.
484
485     Implements the majority of functionality relating to accessing items in the
486     Collection.
487
488     """
489
490     def __init__(self, parent=None):
491         self.parent = parent
492         self._committed = False
493         self._callback = None
494         self._items = {}
495
496     def _my_api(self):
497         raise NotImplementedError()
498
499     def _my_keep(self):
500         raise NotImplementedError()
501
502     def _my_block_manager(self):
503         raise NotImplementedError()
504
505     def writable(self):
506         raise NotImplementedError()
507
508     def root_collection(self):
509         raise NotImplementedError()
510
511     def notify(self, event, collection, name, item):
512         raise NotImplementedError()
513
514     def stream_name(self):
515         raise NotImplementedError()
516
517     @must_be_writable
518     @synchronized
519     def find_or_create(self, path, create_type):
520         """Recursively search the specified file path.
521
522         May return either a `Collection` or `ArvadosFile`.  If not found, will
523         create a new item at the specified path based on `create_type`.  Will
524         create intermediate subcollections needed to contain the final item in
525         the path.
526
527         :create_type:
528           One of `arvados.collection.FILE` or
529           `arvados.collection.COLLECTION`.  If the path is not found, and value
530           of create_type is FILE then create and return a new ArvadosFile for
531           the last path component.  If COLLECTION, then create and return a new
532           Collection for the last path component.
533
534         """
535
536         pathcomponents = path.split("/", 1)
537         if pathcomponents[0]:
538             item = self._items.get(pathcomponents[0])
539             if len(pathcomponents) == 1:
540                 if item is None:
541                     # create new file
542                     if create_type == COLLECTION:
543                         item = Subcollection(self, pathcomponents[0])
544                     else:
545                         item = ArvadosFile(self, pathcomponents[0])
546                     self._items[pathcomponents[0]] = item
547                     self._committed = False
548                     self.notify(ADD, self, pathcomponents[0], item)
549                 return item
550             else:
551                 if item is None:
552                     # create new collection
553                     item = Subcollection(self, pathcomponents[0])
554                     self._items[pathcomponents[0]] = item
555                     self._committed = False
556                     self.notify(ADD, self, pathcomponents[0], item)
557                 if isinstance(item, RichCollectionBase):
558                     return item.find_or_create(pathcomponents[1], create_type)
559                 else:
560                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
561         else:
562             return self
563
564     @synchronized
565     def find(self, path):
566         """Recursively search the specified file path.
567
568         May return either a Collection or ArvadosFile. Return None if not
569         found.
570         If path is invalid (ex: starts with '/'), an IOError exception will be
571         raised.
572
573         """
574         if not path:
575             raise errors.ArgumentError("Parameter 'path' is empty.")
576
577         pathcomponents = path.split("/", 1)
578         if pathcomponents[0] == '':
579             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
580
581         item = self._items.get(pathcomponents[0])
582         if item is None:
583             return None
584         elif len(pathcomponents) == 1:
585             return item
586         else:
587             if isinstance(item, RichCollectionBase):
588                 if pathcomponents[1]:
589                     return item.find(pathcomponents[1])
590                 else:
591                     return item
592             else:
593                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
594
595     @synchronized
596     def mkdirs(self, path):
597         """Recursive subcollection create.
598
599         Like `os.makedirs()`.  Will create intermediate subcollections needed
600         to contain the leaf subcollection path.
601
602         """
603
604         if self.find(path) != None:
605             raise IOError(errno.EEXIST, "Directory or file exists", path)
606
607         return self.find_or_create(path, COLLECTION)
608
609     def open(self, path, mode="r"):
610         """Open a file-like object for access.
611
612         :path:
613           path to a file in the collection
614         :mode:
615           one of "r", "r+", "w", "w+", "a", "a+"
616           :"r":
617             opens for reading
618           :"r+":
619             opens for reading and writing.  Reads/writes share a file pointer.
620           :"w", "w+":
621             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
622           :"a", "a+":
623             opens for reading and writing.  All writes are appended to
624             the end of the file.  Writing does not affect the file pointer for
625             reading.
626         """
627         mode = mode.replace("b", "")
628         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
629             raise errors.ArgumentError("Bad mode '%s'" % mode)
630         create = (mode != "r")
631
632         if create and not self.writable():
633             raise IOError(errno.EROFS, "Collection is read only")
634
635         if create:
636             arvfile = self.find_or_create(path, FILE)
637         else:
638             arvfile = self.find(path)
639
640         if arvfile is None:
641             raise IOError(errno.ENOENT, "File not found", path)
642         if not isinstance(arvfile, ArvadosFile):
643             raise IOError(errno.EISDIR, "Is a directory", path)
644
645         if mode[0] == "w":
646             arvfile.truncate(0)
647
648         name = os.path.basename(path)
649
650         if mode == "r":
651             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
652         else:
653             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
654
655     def modified(self):
656         """Determine if the collection has been modified since last commited."""
657         return not self.committed()
658
659     @synchronized
660     def committed(self):
661         """Determine if the collection has been committed to the API server."""
662
663         if self._committed is False:
664             return False
665         for v in self._items.values():
666             if v.committed() is False:
667                 return False
668         return True
669
670     @synchronized
671     def set_committed(self):
672         """Recursively set committed flag to True."""
673         self._committed = True
674         for k,v in self._items.items():
675             v.set_committed()
676
677     @synchronized
678     def __iter__(self):
679         """Iterate over names of files and collections contained in this collection."""
680         return iter(self._items.keys())
681
682     @synchronized
683     def __getitem__(self, k):
684         """Get a file or collection that is directly contained by this collection.
685
686         If you want to search a path, use `find()` instead.
687
688         """
689         return self._items[k]
690
691     @synchronized
692     def __contains__(self, k):
693         """Test if there is a file or collection a directly contained by this collection."""
694         return k in self._items
695
696     @synchronized
697     def __len__(self):
698         """Get the number of items directly contained in this collection."""
699         return len(self._items)
700
701     @must_be_writable
702     @synchronized
703     def __delitem__(self, p):
704         """Delete an item by name which is directly contained by this collection."""
705         del self._items[p]
706         self._committed = False
707         self.notify(DEL, self, p, None)
708
709     @synchronized
710     def keys(self):
711         """Get a list of names of files and collections directly contained in this collection."""
712         return self._items.keys()
713
714     @synchronized
715     def values(self):
716         """Get a list of files and collection objects directly contained in this collection."""
717         return self._items.values()
718
719     @synchronized
720     def items(self):
721         """Get a list of (name, object) tuples directly contained in this collection."""
722         return self._items.items()
723
724     def exists(self, path):
725         """Test if there is a file or collection at `path`."""
726         return self.find(path) is not None
727
728     @must_be_writable
729     @synchronized
730     def remove(self, path, recursive=False):
731         """Remove the file or subcollection (directory) at `path`.
732
733         :recursive:
734           Specify whether to remove non-empty subcollections (True), or raise an error (False).
735         """
736
737         if not path:
738             raise errors.ArgumentError("Parameter 'path' is empty.")
739
740         pathcomponents = path.split("/", 1)
741         item = self._items.get(pathcomponents[0])
742         if item is None:
743             raise IOError(errno.ENOENT, "File not found", path)
744         if len(pathcomponents) == 1:
745             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
746                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
747             deleteditem = self._items[pathcomponents[0]]
748             del self._items[pathcomponents[0]]
749             self._committed = False
750             self.notify(DEL, self, pathcomponents[0], deleteditem)
751         else:
752             item.remove(pathcomponents[1])
753
754     def _clonefrom(self, source):
755         for k,v in source.items():
756             self._items[k] = v.clone(self, k)
757
758     def clone(self):
759         raise NotImplementedError()
760
761     @must_be_writable
762     @synchronized
763     def add(self, source_obj, target_name, overwrite=False, reparent=False):
764         """Copy or move a file or subcollection to this collection.
765
766         :source_obj:
767           An ArvadosFile, or Subcollection object
768
769         :target_name:
770           Destination item name.  If the target name already exists and is a
771           file, this will raise an error unless you specify `overwrite=True`.
772
773         :overwrite:
774           Whether to overwrite target file if it already exists.
775
776         :reparent:
777           If True, source_obj will be moved from its parent collection to this collection.
778           If False, source_obj will be copied and the parent collection will be
779           unmodified.
780
781         """
782
783         if target_name in self and not overwrite:
784             raise IOError(errno.EEXIST, "File already exists", target_name)
785
786         modified_from = None
787         if target_name in self:
788             modified_from = self[target_name]
789
790         # Actually make the move or copy.
791         if reparent:
792             source_obj._reparent(self, target_name)
793             item = source_obj
794         else:
795             item = source_obj.clone(self, target_name)
796
797         self._items[target_name] = item
798         self._committed = False
799
800         if modified_from:
801             self.notify(MOD, self, target_name, (modified_from, item))
802         else:
803             self.notify(ADD, self, target_name, item)
804
805     def _get_src_target(self, source, target_path, source_collection, create_dest):
806         if source_collection is None:
807             source_collection = self
808
809         # Find the object
810         if isinstance(source, basestring):
811             source_obj = source_collection.find(source)
812             if source_obj is None:
813                 raise IOError(errno.ENOENT, "File not found", source)
814             sourcecomponents = source.split("/")
815         else:
816             source_obj = source
817             sourcecomponents = None
818
819         # Find parent collection the target path
820         targetcomponents = target_path.split("/")
821
822         # Determine the name to use.
823         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
824
825         if not target_name:
826             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
827
828         if create_dest:
829             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
830         else:
831             if len(targetcomponents) > 1:
832                 target_dir = self.find("/".join(targetcomponents[0:-1]))
833             else:
834                 target_dir = self
835
836         if target_dir is None:
837             raise IOError(errno.ENOENT, "Target directory not found", target_name)
838
839         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
840             target_dir = target_dir[target_name]
841             target_name = sourcecomponents[-1]
842
843         return (source_obj, target_dir, target_name)
844
845     @must_be_writable
846     @synchronized
847     def copy(self, source, target_path, source_collection=None, overwrite=False):
848         """Copy a file or subcollection to a new path in this collection.
849
850         :source:
851           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
852
853         :target_path:
854           Destination file or path.  If the target path already exists and is a
855           subcollection, the item will be placed inside the subcollection.  If
856           the target path already exists and is a file, this will raise an error
857           unless you specify `overwrite=True`.
858
859         :source_collection:
860           Collection to copy `source_path` from (default `self`)
861
862         :overwrite:
863           Whether to overwrite target file if it already exists.
864         """
865
866         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
867         target_dir.add(source_obj, target_name, overwrite, False)
868
869     @must_be_writable
870     @synchronized
871     def rename(self, source, target_path, source_collection=None, overwrite=False):
872         """Move a file or subcollection from `source_collection` to a new path in this collection.
873
874         :source:
875           A string with a path to source file or subcollection.
876
877         :target_path:
878           Destination file or path.  If the target path already exists and is a
879           subcollection, the item will be placed inside the subcollection.  If
880           the target path already exists and is a file, this will raise an error
881           unless you specify `overwrite=True`.
882
883         :source_collection:
884           Collection to copy `source_path` from (default `self`)
885
886         :overwrite:
887           Whether to overwrite target file if it already exists.
888         """
889
890         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
891         if not source_obj.writable():
892             raise IOError(errno.EROFS, "Source collection is read only", source)
893         target_dir.add(source_obj, target_name, overwrite, True)
894
895     def portable_manifest_text(self, stream_name="."):
896         """Get the manifest text for this collection, sub collections and files.
897
898         This method does not flush outstanding blocks to Keep.  It will return
899         a normalized manifest with access tokens stripped.
900
901         :stream_name:
902           Name to use for this stream (directory)
903
904         """
905         return self._get_manifest_text(stream_name, True, True)
906
907     @synchronized
908     def manifest_text(self, stream_name=".", strip=False, normalize=False):
909         """Get the manifest text for this collection, sub collections and files.
910
911         This method will flush outstanding blocks to Keep.  By default, it will
912         not normalize an unmodified manifest or strip access tokens.
913
914         :stream_name:
915           Name to use for this stream (directory)
916
917         :strip:
918           If True, remove signing tokens from block locators if present.
919           If False (default), block locators are left unchanged.
920
921         :normalize:
922           If True, always export the manifest text in normalized form
923           even if the Collection is not modified.  If False (default) and the collection
924           is not modified, return the original manifest text even if it is not
925           in normalized form.
926
927         """
928
929         self._my_block_manager().commit_all()
930         return self._get_manifest_text(stream_name, strip, normalize)
931
932     @synchronized
933     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
934         """Get the manifest text for this collection, sub collections and files.
935
936         :stream_name:
937           Name to use for this stream (directory)
938
939         :strip:
940           If True, remove signing tokens from block locators if present.
941           If False (default), block locators are left unchanged.
942
943         :normalize:
944           If True, always export the manifest text in normalized form
945           even if the Collection is not modified.  If False (default) and the collection
946           is not modified, return the original manifest text even if it is not
947           in normalized form.
948
949         :only_committed:
950           If True, only include blocks that were already committed to Keep.
951
952         """
953
954         if not self.committed() or self._manifest_text is None or normalize:
955             stream = {}
956             buf = []
957             sorted_keys = sorted(self.keys())
958             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
959                 # Create a stream per file `k`
960                 arvfile = self[filename]
961                 filestream = []
962                 for segment in arvfile.segments():
963                     loc = segment.locator
964                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
965                         if only_committed:
966                             continue
967                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
968                     if strip:
969                         loc = KeepLocator(loc).stripped()
970                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
971                                          segment.segment_offset, segment.range_size))
972                 stream[filename] = filestream
973             if stream:
974                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
975             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
976                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
977             return "".join(buf)
978         else:
979             if strip:
980                 return self.stripped_manifest()
981             else:
982                 return self._manifest_text
983
984     @synchronized
985     def diff(self, end_collection, prefix=".", holding_collection=None):
986         """Generate list of add/modify/delete actions.
987
988         When given to `apply`, will change `self` to match `end_collection`
989
990         """
991         changes = []
992         if holding_collection is None:
993             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
994         for k in self:
995             if k not in end_collection:
996                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
997         for k in end_collection:
998             if k in self:
999                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1000                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1001                 elif end_collection[k] != self[k]:
1002                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1003                 else:
1004                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1005             else:
1006                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1007         return changes
1008
1009     @must_be_writable
1010     @synchronized
1011     def apply(self, changes):
1012         """Apply changes from `diff`.
1013
1014         If a change conflicts with a local change, it will be saved to an
1015         alternate path indicating the conflict.
1016
1017         """
1018         if changes:
1019             self._committed = False
1020         for change in changes:
1021             event_type = change[0]
1022             path = change[1]
1023             initial = change[2]
1024             local = self.find(path)
1025             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1026                                                                     time.gmtime()))
1027             if event_type == ADD:
1028                 if local is None:
1029                     # No local file at path, safe to copy over new file
1030                     self.copy(initial, path)
1031                 elif local is not None and local != initial:
1032                     # There is already local file and it is different:
1033                     # save change to conflict file.
1034                     self.copy(initial, conflictpath)
1035             elif event_type == MOD or event_type == TOK:
1036                 final = change[3]
1037                 if local == initial:
1038                     # Local matches the "initial" item so it has not
1039                     # changed locally and is safe to update.
1040                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1041                         # Replace contents of local file with new contents
1042                         local.replace_contents(final)
1043                     else:
1044                         # Overwrite path with new item; this can happen if
1045                         # path was a file and is now a collection or vice versa
1046                         self.copy(final, path, overwrite=True)
1047                 else:
1048                     # Local is missing (presumably deleted) or local doesn't
1049                     # match the "start" value, so save change to conflict file
1050                     self.copy(final, conflictpath)
1051             elif event_type == DEL:
1052                 if local == initial:
1053                     # Local item matches "initial" value, so it is safe to remove.
1054                     self.remove(path, recursive=True)
1055                 # else, the file is modified or already removed, in either
1056                 # case we don't want to try to remove it.
1057
1058     def portable_data_hash(self):
1059         """Get the portable data hash for this collection's manifest."""
1060         stripped = self.portable_manifest_text()
1061         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1062
1063     @synchronized
1064     def subscribe(self, callback):
1065         if self._callback is None:
1066             self._callback = callback
1067         else:
1068             raise errors.ArgumentError("A callback is already set on this collection.")
1069
1070     @synchronized
1071     def unsubscribe(self):
1072         if self._callback is not None:
1073             self._callback = None
1074
1075     @synchronized
1076     def notify(self, event, collection, name, item):
1077         if self._callback:
1078             self._callback(event, collection, name, item)
1079         self.root_collection().notify(event, collection, name, item)
1080
1081     @synchronized
1082     def __eq__(self, other):
1083         if other is self:
1084             return True
1085         if not isinstance(other, RichCollectionBase):
1086             return False
1087         if len(self._items) != len(other):
1088             return False
1089         for k in self._items:
1090             if k not in other:
1091                 return False
1092             if self._items[k] != other[k]:
1093                 return False
1094         return True
1095
1096     def __ne__(self, other):
1097         return not self.__eq__(other)
1098
1099     @synchronized
1100     def flush(self):
1101         """Flush bufferblocks to Keep."""
1102         for e in self.values():
1103             e.flush()
1104
1105
1106 class Collection(RichCollectionBase):
1107     """Represents the root of an Arvados Collection.
1108
1109     This class is threadsafe.  The root collection object, all subcollections
1110     and files are protected by a single lock (i.e. each access locks the entire
1111     collection).
1112
1113     Brief summary of
1114     useful methods:
1115
1116     :To read an existing file:
1117       `c.open("myfile", "r")`
1118
1119     :To write a new file:
1120       `c.open("myfile", "w")`
1121
1122     :To determine if a file exists:
1123       `c.find("myfile") is not None`
1124
1125     :To copy a file:
1126       `c.copy("source", "dest")`
1127
1128     :To delete a file:
1129       `c.remove("myfile")`
1130
1131     :To save to an existing collection record:
1132       `c.save()`
1133
1134     :To save a new collection record:
1135     `c.save_new()`
1136
1137     :To merge remote changes into this object:
1138       `c.update()`
1139
1140     Must be associated with an API server Collection record (during
1141     initialization, or using `save_new`) to use `save` or `update`
1142
1143     """
1144
1145     def __init__(self, manifest_locator_or_text=None,
1146                  api_client=None,
1147                  keep_client=None,
1148                  num_retries=None,
1149                  parent=None,
1150                  apiconfig=None,
1151                  block_manager=None,
1152                  replication_desired=None):
1153         """Collection constructor.
1154
1155         :manifest_locator_or_text:
1156           One of Arvados collection UUID, block locator of
1157           a manifest, raw manifest text, or None (to create an empty collection).
1158         :parent:
1159           the parent Collection, may be None.
1160
1161         :apiconfig:
1162           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1163           Prefer this over supplying your own api_client and keep_client (except in testing).
1164           Will use default config settings if not specified.
1165
1166         :api_client:
1167           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1168
1169         :keep_client:
1170           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1171
1172         :num_retries:
1173           the number of retries for API and Keep requests.
1174
1175         :block_manager:
1176           the block manager to use.  If not specified, create one.
1177
1178         :replication_desired:
1179           How many copies should Arvados maintain. If None, API server default
1180           configuration applies. If not None, this value will also be used
1181           for determining the number of block copies being written.
1182
1183         """
1184         super(Collection, self).__init__(parent)
1185         self._api_client = api_client
1186         self._keep_client = keep_client
1187         self._block_manager = block_manager
1188         self.replication_desired = replication_desired
1189
1190         if apiconfig:
1191             self._config = apiconfig
1192         else:
1193             self._config = config.settings()
1194
1195         self.num_retries = num_retries if num_retries is not None else 0
1196         self._manifest_locator = None
1197         self._manifest_text = None
1198         self._api_response = None
1199         self._past_versions = set()
1200
1201         self.lock = threading.RLock()
1202         self.events = None
1203
1204         if manifest_locator_or_text:
1205             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1206                 self._manifest_locator = manifest_locator_or_text
1207             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1208                 self._manifest_locator = manifest_locator_or_text
1209             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1210                 self._manifest_text = manifest_locator_or_text
1211             else:
1212                 raise errors.ArgumentError(
1213                     "Argument to CollectionReader is not a manifest or a collection UUID")
1214
1215             try:
1216                 self._populate()
1217             except (IOError, errors.SyntaxError) as e:
1218                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1219
1220     def root_collection(self):
1221         return self
1222
1223     def stream_name(self):
1224         return "."
1225
1226     def writable(self):
1227         return True
1228
1229     @synchronized
1230     def known_past_version(self, modified_at_and_portable_data_hash):
1231         return modified_at_and_portable_data_hash in self._past_versions
1232
1233     @synchronized
1234     @retry_method
1235     def update(self, other=None, num_retries=None):
1236         """Merge the latest collection on the API server with the current collection."""
1237
1238         if other is None:
1239             if self._manifest_locator is None:
1240                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1241             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1242             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1243                 response.get("portable_data_hash") != self.portable_data_hash()):
1244                 # The record on the server is different from our current one, but we've seen it before,
1245                 # so ignore it because it's already been merged.
1246                 # However, if it's the same as our current record, proceed with the update, because we want to update
1247                 # our tokens.
1248                 return
1249             else:
1250                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1251             other = CollectionReader(response["manifest_text"])
1252         baseline = CollectionReader(self._manifest_text)
1253         self.apply(baseline.diff(other))
1254         self._manifest_text = self.manifest_text()
1255
1256     @synchronized
1257     def _my_api(self):
1258         if self._api_client is None:
1259             self._api_client = ThreadSafeApiCache(self._config)
1260             if self._keep_client is None:
1261                 self._keep_client = self._api_client.keep
1262         return self._api_client
1263
1264     @synchronized
1265     def _my_keep(self):
1266         if self._keep_client is None:
1267             if self._api_client is None:
1268                 self._my_api()
1269             else:
1270                 self._keep_client = KeepClient(api_client=self._api_client)
1271         return self._keep_client
1272
1273     @synchronized
1274     def _my_block_manager(self):
1275         if self._block_manager is None:
1276             copies = (self.replication_desired or
1277                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1278                                                    2))
1279             self._block_manager = _BlockManager(self._my_keep(), copies=copies)
1280         return self._block_manager
1281
1282     def _remember_api_response(self, response):
1283         self._api_response = response
1284         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1285
1286     def _populate_from_api_server(self):
1287         # As in KeepClient itself, we must wait until the last
1288         # possible moment to instantiate an API client, in order to
1289         # avoid tripping up clients that don't have access to an API
1290         # server.  If we do build one, make sure our Keep client uses
1291         # it.  If instantiation fails, we'll fall back to the except
1292         # clause, just like any other Collection lookup
1293         # failure. Return an exception, or None if successful.
1294         try:
1295             self._remember_api_response(self._my_api().collections().get(
1296                 uuid=self._manifest_locator).execute(
1297                     num_retries=self.num_retries))
1298             self._manifest_text = self._api_response['manifest_text']
1299             # If not overriden via kwargs, we should try to load the
1300             # replication_desired from the API server
1301             if self.replication_desired is None:
1302                 self.replication_desired = self._api_response.get('replication_desired', None)
1303             return None
1304         except Exception as e:
1305             return e
1306
1307     def _populate_from_keep(self):
1308         # Retrieve a manifest directly from Keep. This has a chance of
1309         # working if [a] the locator includes a permission signature
1310         # or [b] the Keep services are operating in world-readable
1311         # mode. Return an exception, or None if successful.
1312         try:
1313             self._manifest_text = self._my_keep().get(
1314                 self._manifest_locator, num_retries=self.num_retries)
1315         except Exception as e:
1316             return e
1317
1318     def _populate(self):
1319         if self._manifest_locator is None and self._manifest_text is None:
1320             return
1321         error_via_api = None
1322         error_via_keep = None
1323         should_try_keep = ((self._manifest_text is None) and
1324                            util.keep_locator_pattern.match(
1325                                self._manifest_locator))
1326         if ((self._manifest_text is None) and
1327             util.signed_locator_pattern.match(self._manifest_locator)):
1328             error_via_keep = self._populate_from_keep()
1329         if self._manifest_text is None:
1330             error_via_api = self._populate_from_api_server()
1331             if error_via_api is not None and not should_try_keep:
1332                 raise error_via_api
1333         if ((self._manifest_text is None) and
1334             not error_via_keep and
1335             should_try_keep):
1336             # Looks like a keep locator, and we didn't already try keep above
1337             error_via_keep = self._populate_from_keep()
1338         if self._manifest_text is None:
1339             # Nothing worked!
1340             raise errors.NotFoundError(
1341                 ("Failed to retrieve collection '{}' " +
1342                  "from either API server ({}) or Keep ({})."
1343                  ).format(
1344                     self._manifest_locator,
1345                     error_via_api,
1346                     error_via_keep))
1347         # populate
1348         self._baseline_manifest = self._manifest_text
1349         self._import_manifest(self._manifest_text)
1350
1351
1352     def _has_collection_uuid(self):
1353         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1354
1355     def __enter__(self):
1356         return self
1357
1358     def __exit__(self, exc_type, exc_value, traceback):
1359         """Support scoped auto-commit in a with: block."""
1360         if exc_type is None:
1361             if self.writable() and self._has_collection_uuid():
1362                 self.save()
1363         self.stop_threads()
1364
1365     def stop_threads(self):
1366         if self._block_manager is not None:
1367             self._block_manager.stop_threads()
1368
1369     @synchronized
1370     def manifest_locator(self):
1371         """Get the manifest locator, if any.
1372
1373         The manifest locator will be set when the collection is loaded from an
1374         API server record or the portable data hash of a manifest.
1375
1376         The manifest locator will be None if the collection is newly created or
1377         was created directly from manifest text.  The method `save_new()` will
1378         assign a manifest locator.
1379
1380         """
1381         return self._manifest_locator
1382
1383     @synchronized
1384     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1385         if new_config is None:
1386             new_config = self._config
1387         if readonly:
1388             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1389         else:
1390             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1391
1392         newcollection._clonefrom(self)
1393         return newcollection
1394
1395     @synchronized
1396     def api_response(self):
1397         """Returns information about this Collection fetched from the API server.
1398
1399         If the Collection exists in Keep but not the API server, currently
1400         returns None.  Future versions may provide a synthetic response.
1401
1402         """
1403         return self._api_response
1404
1405     def find_or_create(self, path, create_type):
1406         """See `RichCollectionBase.find_or_create`"""
1407         if path == ".":
1408             return self
1409         else:
1410             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1411
1412     def find(self, path):
1413         """See `RichCollectionBase.find`"""
1414         if path == ".":
1415             return self
1416         else:
1417             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1418
1419     def remove(self, path, recursive=False):
1420         """See `RichCollectionBase.remove`"""
1421         if path == ".":
1422             raise errors.ArgumentError("Cannot remove '.'")
1423         else:
1424             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1425
1426     @must_be_writable
1427     @synchronized
1428     @retry_method
1429     def save(self, merge=True, num_retries=None):
1430         """Save collection to an existing collection record.
1431
1432         Commit pending buffer blocks to Keep, merge with remote record (if
1433         merge=True, the default), and update the collection record.  Returns
1434         the current manifest text.
1435
1436         Will raise AssertionError if not associated with a collection record on
1437         the API server.  If you want to save a manifest to Keep only, see
1438         `save_new()`.
1439
1440         :merge:
1441           Update and merge remote changes before saving.  Otherwise, any
1442           remote changes will be ignored and overwritten.
1443
1444         :num_retries:
1445           Retry count on API calls (if None,  use the collection default)
1446
1447         """
1448         if not self.committed():
1449             if not self._has_collection_uuid():
1450                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1451
1452             self._my_block_manager().commit_all()
1453
1454             if merge:
1455                 self.update()
1456
1457             text = self.manifest_text(strip=False)
1458             self._remember_api_response(self._my_api().collections().update(
1459                 uuid=self._manifest_locator,
1460                 body={'manifest_text': text}
1461                 ).execute(
1462                     num_retries=num_retries))
1463             self._manifest_text = self._api_response["manifest_text"]
1464             self.set_committed()
1465
1466         return self._manifest_text
1467
1468
1469     @must_be_writable
1470     @synchronized
1471     @retry_method
1472     def save_new(self, name=None,
1473                  create_collection_record=True,
1474                  owner_uuid=None,
1475                  ensure_unique_name=False,
1476                  num_retries=None):
1477         """Save collection to a new collection record.
1478
1479         Commit pending buffer blocks to Keep and, when create_collection_record
1480         is True (default), create a new collection record.  After creating a
1481         new collection record, this Collection object will be associated with
1482         the new record used by `save()`.  Returns the current manifest text.
1483
1484         :name:
1485           The collection name.
1486
1487         :create_collection_record:
1488            If True, create a collection record on the API server.
1489            If False, only commit blocks to Keep and return the manifest text.
1490
1491         :owner_uuid:
1492           the user, or project uuid that will own this collection.
1493           If None, defaults to the current user.
1494
1495         :ensure_unique_name:
1496           If True, ask the API server to rename the collection
1497           if it conflicts with a collection with the same name and owner.  If
1498           False, a name conflict will result in an error.
1499
1500         :num_retries:
1501           Retry count on API calls (if None,  use the collection default)
1502
1503         """
1504         self._my_block_manager().commit_all()
1505         text = self.manifest_text(strip=False)
1506
1507         if create_collection_record:
1508             if name is None:
1509                 name = "New collection"
1510                 ensure_unique_name = True
1511
1512             body = {"manifest_text": text,
1513                     "name": name,
1514                     "replication_desired": self.replication_desired}
1515             if owner_uuid:
1516                 body["owner_uuid"] = owner_uuid
1517
1518             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1519             text = self._api_response["manifest_text"]
1520
1521             self._manifest_locator = self._api_response["uuid"]
1522
1523             self._manifest_text = text
1524             self.set_committed()
1525
1526         return text
1527
1528     @synchronized
1529     def _import_manifest(self, manifest_text):
1530         """Import a manifest into a `Collection`.
1531
1532         :manifest_text:
1533           The manifest text to import from.
1534
1535         """
1536         if len(self) > 0:
1537             raise ArgumentError("Can only import manifest into an empty collection")
1538
1539         STREAM_NAME = 0
1540         BLOCKS = 1
1541         SEGMENTS = 2
1542
1543         stream_name = None
1544         state = STREAM_NAME
1545
1546         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1547             tok = token_and_separator.group(1)
1548             sep = token_and_separator.group(2)
1549
1550             if state == STREAM_NAME:
1551                 # starting a new stream
1552                 stream_name = tok.replace('\\040', ' ')
1553                 blocks = []
1554                 segments = []
1555                 streamoffset = 0L
1556                 state = BLOCKS
1557                 self.find_or_create(stream_name, COLLECTION)
1558                 continue
1559
1560             if state == BLOCKS:
1561                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1562                 if block_locator:
1563                     blocksize = long(block_locator.group(1))
1564                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1565                     streamoffset += blocksize
1566                 else:
1567                     state = SEGMENTS
1568
1569             if state == SEGMENTS:
1570                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1571                 if file_segment:
1572                     pos = long(file_segment.group(1))
1573                     size = long(file_segment.group(2))
1574                     name = file_segment.group(3).replace('\\040', ' ')
1575                     filepath = os.path.join(stream_name, name)
1576                     afile = self.find_or_create(filepath, FILE)
1577                     if isinstance(afile, ArvadosFile):
1578                         afile.add_segment(blocks, pos, size)
1579                     else:
1580                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1581                 else:
1582                     # error!
1583                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1584
1585             if sep == "\n":
1586                 stream_name = None
1587                 state = STREAM_NAME
1588
1589         self.set_committed()
1590
1591     @synchronized
1592     def notify(self, event, collection, name, item):
1593         if self._callback:
1594             self._callback(event, collection, name, item)
1595
1596
1597 class Subcollection(RichCollectionBase):
1598     """This is a subdirectory within a collection that doesn't have its own API
1599     server record.
1600
1601     Subcollection locking falls under the umbrella lock of its root collection.
1602
1603     """
1604
1605     def __init__(self, parent, name):
1606         super(Subcollection, self).__init__(parent)
1607         self.lock = self.root_collection().lock
1608         self._manifest_text = None
1609         self.name = name
1610         self.num_retries = parent.num_retries
1611
1612     def root_collection(self):
1613         return self.parent.root_collection()
1614
1615     def writable(self):
1616         return self.root_collection().writable()
1617
1618     def _my_api(self):
1619         return self.root_collection()._my_api()
1620
1621     def _my_keep(self):
1622         return self.root_collection()._my_keep()
1623
1624     def _my_block_manager(self):
1625         return self.root_collection()._my_block_manager()
1626
1627     def stream_name(self):
1628         return os.path.join(self.parent.stream_name(), self.name)
1629
1630     @synchronized
1631     def clone(self, new_parent, new_name):
1632         c = Subcollection(new_parent, new_name)
1633         c._clonefrom(self)
1634         return c
1635
1636     @must_be_writable
1637     @synchronized
1638     def _reparent(self, newparent, newname):
1639         self._committed = False
1640         self.flush()
1641         self.parent.remove(self.name, recursive=True)
1642         self.parent = newparent
1643         self.name = newname
1644         self.lock = self.parent.root_collection().lock
1645
1646
1647 class CollectionReader(Collection):
1648     """A read-only collection object.
1649
1650     Initialize from an api collection record locator, a portable data hash of a
1651     manifest, or raw manifest text.  See `Collection` constructor for detailed
1652     options.
1653
1654     """
1655     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1656         self._in_init = True
1657         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1658         self._in_init = False
1659
1660         # Forego any locking since it should never change once initialized.
1661         self.lock = NoopLock()
1662
1663         # Backwards compatability with old CollectionReader
1664         # all_streams() and all_files()
1665         self._streams = None
1666
1667     def writable(self):
1668         return self._in_init
1669
1670     def _populate_streams(orig_func):
1671         @functools.wraps(orig_func)
1672         def populate_streams_wrapper(self, *args, **kwargs):
1673             # Defer populating self._streams until needed since it creates a copy of the manifest.
1674             if self._streams is None:
1675                 if self._manifest_text:
1676                     self._streams = [sline.split()
1677                                      for sline in self._manifest_text.split("\n")
1678                                      if sline]
1679                 else:
1680                     self._streams = []
1681             return orig_func(self, *args, **kwargs)
1682         return populate_streams_wrapper
1683
1684     @_populate_streams
1685     def normalize(self):
1686         """Normalize the streams returned by `all_streams`.
1687
1688         This method is kept for backwards compatability and only affects the
1689         behavior of `all_streams()` and `all_files()`
1690
1691         """
1692
1693         # Rearrange streams
1694         streams = {}
1695         for s in self.all_streams():
1696             for f in s.all_files():
1697                 streamname, filename = split(s.name() + "/" + f.name())
1698                 if streamname not in streams:
1699                     streams[streamname] = {}
1700                 if filename not in streams[streamname]:
1701                     streams[streamname][filename] = []
1702                 for r in f.segments:
1703                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1704
1705         self._streams = [normalize_stream(s, streams[s])
1706                          for s in sorted(streams)]
1707     @_populate_streams
1708     def all_streams(self):
1709         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1710                 for s in self._streams]
1711
1712     @_populate_streams
1713     def all_files(self):
1714         for s in self.all_streams():
1715             for f in s.all_files():
1716                 yield f