Merge branch 'master' into 10797-ruby-2.3
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace: '%s'" %
311                 (newstreamname))
312         self._current_stream_name = '.' if newstreamname=='' else newstreamname
313
314     def current_stream_name(self):
315         return self._current_stream_name
316
317     def finish_current_stream(self):
318         self.finish_current_file()
319         self.flush_data()
320         if not self._current_stream_files:
321             pass
322         elif self._current_stream_name is None:
323             raise errors.AssertionError(
324                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
325                 (self._current_stream_length, len(self._current_stream_files)))
326         else:
327             if not self._current_stream_locators:
328                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
329             self._finished_streams.append([self._current_stream_name,
330                                            self._current_stream_locators,
331                                            self._current_stream_files])
332         self._current_stream_files = []
333         self._current_stream_length = 0
334         self._current_stream_locators = []
335         self._current_stream_name = None
336         self._current_file_pos = 0
337         self._current_file_name = None
338
339     def finish(self):
340         """Store the manifest in Keep and return its locator.
341
342         This is useful for storing manifest fragments (task outputs)
343         temporarily in Keep during a Crunch job.
344
345         In other cases you should make a collection instead, by
346         sending manifest_text() to the API server's "create
347         collection" endpoint.
348         """
349         return self._my_keep().put(self.manifest_text(), copies=self.replication)
350
351     def portable_data_hash(self):
352         stripped = self.stripped_manifest()
353         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
354
355     def manifest_text(self):
356         self.finish_current_stream()
357         manifest = ''
358
359         for stream in self._finished_streams:
360             if not re.search(r'^\.(/.*)?$', stream[0]):
361                 manifest += './'
362             manifest += stream[0].replace(' ', '\\040')
363             manifest += ' ' + ' '.join(stream[1])
364             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
365             manifest += "\n"
366
367         return manifest
368
369     def data_locators(self):
370         ret = []
371         for name, locators, files in self._finished_streams:
372             ret += locators
373         return ret
374
375
376 class ResumableCollectionWriter(CollectionWriter):
377     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
378                    '_current_stream_locators', '_current_stream_name',
379                    '_current_file_name', '_current_file_pos', '_close_file',
380                    '_data_buffer', '_dependencies', '_finished_streams',
381                    '_queued_dirents', '_queued_trees']
382
383     def __init__(self, api_client=None, **kwargs):
384         self._dependencies = {}
385         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
386
387     @classmethod
388     def from_state(cls, state, *init_args, **init_kwargs):
389         # Try to build a new writer from scratch with the given state.
390         # If the state is not suitable to resume (because files have changed,
391         # been deleted, aren't predictable, etc.), raise a
392         # StaleWriterStateError.  Otherwise, return the initialized writer.
393         # The caller is responsible for calling writer.do_queued_work()
394         # appropriately after it's returned.
395         writer = cls(*init_args, **init_kwargs)
396         for attr_name in cls.STATE_PROPS:
397             attr_value = state[attr_name]
398             attr_class = getattr(writer, attr_name).__class__
399             # Coerce the value into the same type as the initial value, if
400             # needed.
401             if attr_class not in (type(None), attr_value.__class__):
402                 attr_value = attr_class(attr_value)
403             setattr(writer, attr_name, attr_value)
404         # Check dependencies before we try to resume anything.
405         if any(KeepLocator(ls).permission_expired()
406                for ls in writer._current_stream_locators):
407             raise errors.StaleWriterStateError(
408                 "locators include expired permission hint")
409         writer.check_dependencies()
410         if state['_current_file'] is not None:
411             path, pos = state['_current_file']
412             try:
413                 writer._queued_file = open(path, 'rb')
414                 writer._queued_file.seek(pos)
415             except IOError as error:
416                 raise errors.StaleWriterStateError(
417                     "failed to reopen active file {}: {}".format(path, error))
418         return writer
419
420     def check_dependencies(self):
421         for path, orig_stat in self._dependencies.items():
422             if not S_ISREG(orig_stat[ST_MODE]):
423                 raise errors.StaleWriterStateError("{} not file".format(path))
424             try:
425                 now_stat = tuple(os.stat(path))
426             except OSError as error:
427                 raise errors.StaleWriterStateError(
428                     "failed to stat {}: {}".format(path, error))
429             if ((not S_ISREG(now_stat[ST_MODE])) or
430                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
431                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
432                 raise errors.StaleWriterStateError("{} changed".format(path))
433
434     def dump_state(self, copy_func=lambda x: x):
435         state = {attr: copy_func(getattr(self, attr))
436                  for attr in self.STATE_PROPS}
437         if self._queued_file is None:
438             state['_current_file'] = None
439         else:
440             state['_current_file'] = (os.path.realpath(self._queued_file.name),
441                                       self._queued_file.tell())
442         return state
443
444     def _queue_file(self, source, filename=None):
445         try:
446             src_path = os.path.realpath(source)
447         except Exception:
448             raise errors.AssertionError("{} not a file path".format(source))
449         try:
450             path_stat = os.stat(src_path)
451         except OSError as stat_error:
452             path_stat = None
453         super(ResumableCollectionWriter, self)._queue_file(source, filename)
454         fd_stat = os.fstat(self._queued_file.fileno())
455         if not S_ISREG(fd_stat.st_mode):
456             # We won't be able to resume from this cache anyway, so don't
457             # worry about further checks.
458             self._dependencies[source] = tuple(fd_stat)
459         elif path_stat is None:
460             raise errors.AssertionError(
461                 "could not stat {}: {}".format(source, stat_error))
462         elif path_stat.st_ino != fd_stat.st_ino:
463             raise errors.AssertionError(
464                 "{} changed between open and stat calls".format(source))
465         else:
466             self._dependencies[src_path] = tuple(fd_stat)
467
468     def write(self, data):
469         if self._queued_file is None:
470             raise errors.AssertionError(
471                 "resumable writer can't accept unsourced data")
472         return super(ResumableCollectionWriter, self).write(data)
473
474
475 ADD = "add"
476 DEL = "del"
477 MOD = "mod"
478 TOK = "tok"
479 FILE = "file"
480 COLLECTION = "collection"
481
482 class RichCollectionBase(CollectionBase):
483     """Base class for Collections and Subcollections.
484
485     Implements the majority of functionality relating to accessing items in the
486     Collection.
487
488     """
489
490     def __init__(self, parent=None):
491         self.parent = parent
492         self._committed = False
493         self._callback = None
494         self._items = {}
495
496     def _my_api(self):
497         raise NotImplementedError()
498
499     def _my_keep(self):
500         raise NotImplementedError()
501
502     def _my_block_manager(self):
503         raise NotImplementedError()
504
505     def writable(self):
506         raise NotImplementedError()
507
508     def root_collection(self):
509         raise NotImplementedError()
510
511     def notify(self, event, collection, name, item):
512         raise NotImplementedError()
513
514     def stream_name(self):
515         raise NotImplementedError()
516
517     @must_be_writable
518     @synchronized
519     def find_or_create(self, path, create_type):
520         """Recursively search the specified file path.
521
522         May return either a `Collection` or `ArvadosFile`.  If not found, will
523         create a new item at the specified path based on `create_type`.  Will
524         create intermediate subcollections needed to contain the final item in
525         the path.
526
527         :create_type:
528           One of `arvados.collection.FILE` or
529           `arvados.collection.COLLECTION`.  If the path is not found, and value
530           of create_type is FILE then create and return a new ArvadosFile for
531           the last path component.  If COLLECTION, then create and return a new
532           Collection for the last path component.
533
534         """
535
536         pathcomponents = path.split("/", 1)
537         if pathcomponents[0]:
538             item = self._items.get(pathcomponents[0])
539             if len(pathcomponents) == 1:
540                 if item is None:
541                     # create new file
542                     if create_type == COLLECTION:
543                         item = Subcollection(self, pathcomponents[0])
544                     else:
545                         item = ArvadosFile(self, pathcomponents[0])
546                     self._items[pathcomponents[0]] = item
547                     self._committed = False
548                     self.notify(ADD, self, pathcomponents[0], item)
549                 return item
550             else:
551                 if item is None:
552                     # create new collection
553                     item = Subcollection(self, pathcomponents[0])
554                     self._items[pathcomponents[0]] = item
555                     self._committed = False
556                     self.notify(ADD, self, pathcomponents[0], item)
557                 if isinstance(item, RichCollectionBase):
558                     return item.find_or_create(pathcomponents[1], create_type)
559                 else:
560                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
561         else:
562             return self
563
564     @synchronized
565     def find(self, path):
566         """Recursively search the specified file path.
567
568         May return either a Collection or ArvadosFile. Return None if not
569         found.
570         If path is invalid (ex: starts with '/'), an IOError exception will be
571         raised.
572
573         """
574         if not path:
575             raise errors.ArgumentError("Parameter 'path' is empty.")
576
577         pathcomponents = path.split("/", 1)
578         if pathcomponents[0] == '':
579             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
580
581         item = self._items.get(pathcomponents[0])
582         if item is None:
583             return None
584         elif len(pathcomponents) == 1:
585             return item
586         else:
587             if isinstance(item, RichCollectionBase):
588                 if pathcomponents[1]:
589                     return item.find(pathcomponents[1])
590                 else:
591                     return item
592             else:
593                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
594
595     @synchronized
596     def mkdirs(self, path):
597         """Recursive subcollection create.
598
599         Like `os.makedirs()`.  Will create intermediate subcollections needed
600         to contain the leaf subcollection path.
601
602         """
603
604         if self.find(path) != None:
605             raise IOError(errno.EEXIST, "Directory or file exists", path)
606
607         return self.find_or_create(path, COLLECTION)
608
609     def open(self, path, mode="r"):
610         """Open a file-like object for access.
611
612         :path:
613           path to a file in the collection
614         :mode:
615           one of "r", "r+", "w", "w+", "a", "a+"
616           :"r":
617             opens for reading
618           :"r+":
619             opens for reading and writing.  Reads/writes share a file pointer.
620           :"w", "w+":
621             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
622           :"a", "a+":
623             opens for reading and writing.  All writes are appended to
624             the end of the file.  Writing does not affect the file pointer for
625             reading.
626         """
627         mode = mode.replace("b", "")
628         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
629             raise errors.ArgumentError("Bad mode '%s'" % mode)
630         create = (mode != "r")
631
632         if create and not self.writable():
633             raise IOError(errno.EROFS, "Collection is read only")
634
635         if create:
636             arvfile = self.find_or_create(path, FILE)
637         else:
638             arvfile = self.find(path)
639
640         if arvfile is None:
641             raise IOError(errno.ENOENT, "File not found", path)
642         if not isinstance(arvfile, ArvadosFile):
643             raise IOError(errno.EISDIR, "Is a directory", path)
644
645         if mode[0] == "w":
646             arvfile.truncate(0)
647
648         name = os.path.basename(path)
649
650         if mode == "r":
651             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
652         else:
653             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
654
655     def modified(self):
656         """Determine if the collection has been modified since last commited."""
657         return not self.committed()
658
659     @synchronized
660     def committed(self):
661         """Determine if the collection has been committed to the API server."""
662
663         if self._committed is False:
664             return False
665         for v in self._items.values():
666             if v.committed() is False:
667                 return False
668         return True
669
670     @synchronized
671     def set_committed(self):
672         """Recursively set committed flag to True."""
673         self._committed = True
674         for k,v in self._items.items():
675             v.set_committed()
676
677     @synchronized
678     def __iter__(self):
679         """Iterate over names of files and collections contained in this collection."""
680         return iter(self._items.keys())
681
682     @synchronized
683     def __getitem__(self, k):
684         """Get a file or collection that is directly contained by this collection.
685
686         If you want to search a path, use `find()` instead.
687
688         """
689         return self._items[k]
690
691     @synchronized
692     def __contains__(self, k):
693         """Test if there is a file or collection a directly contained by this collection."""
694         return k in self._items
695
696     @synchronized
697     def __len__(self):
698         """Get the number of items directly contained in this collection."""
699         return len(self._items)
700
701     @must_be_writable
702     @synchronized
703     def __delitem__(self, p):
704         """Delete an item by name which is directly contained by this collection."""
705         del self._items[p]
706         self._committed = False
707         self.notify(DEL, self, p, None)
708
709     @synchronized
710     def keys(self):
711         """Get a list of names of files and collections directly contained in this collection."""
712         return self._items.keys()
713
714     @synchronized
715     def values(self):
716         """Get a list of files and collection objects directly contained in this collection."""
717         return self._items.values()
718
719     @synchronized
720     def items(self):
721         """Get a list of (name, object) tuples directly contained in this collection."""
722         return self._items.items()
723
724     def exists(self, path):
725         """Test if there is a file or collection at `path`."""
726         return self.find(path) is not None
727
728     @must_be_writable
729     @synchronized
730     def remove(self, path, recursive=False):
731         """Remove the file or subcollection (directory) at `path`.
732
733         :recursive:
734           Specify whether to remove non-empty subcollections (True), or raise an error (False).
735         """
736
737         if not path:
738             raise errors.ArgumentError("Parameter 'path' is empty.")
739
740         pathcomponents = path.split("/", 1)
741         item = self._items.get(pathcomponents[0])
742         if item is None:
743             raise IOError(errno.ENOENT, "File not found", path)
744         if len(pathcomponents) == 1:
745             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
746                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
747             deleteditem = self._items[pathcomponents[0]]
748             del self._items[pathcomponents[0]]
749             self._committed = False
750             self.notify(DEL, self, pathcomponents[0], deleteditem)
751         else:
752             item.remove(pathcomponents[1])
753
754     def _clonefrom(self, source):
755         for k,v in source.items():
756             self._items[k] = v.clone(self, k)
757
758     def clone(self):
759         raise NotImplementedError()
760
761     @must_be_writable
762     @synchronized
763     def add(self, source_obj, target_name, overwrite=False, reparent=False):
764         """Copy or move a file or subcollection to this collection.
765
766         :source_obj:
767           An ArvadosFile, or Subcollection object
768
769         :target_name:
770           Destination item name.  If the target name already exists and is a
771           file, this will raise an error unless you specify `overwrite=True`.
772
773         :overwrite:
774           Whether to overwrite target file if it already exists.
775
776         :reparent:
777           If True, source_obj will be moved from its parent collection to this collection.
778           If False, source_obj will be copied and the parent collection will be
779           unmodified.
780
781         """
782
783         if target_name in self and not overwrite:
784             raise IOError(errno.EEXIST, "File already exists", target_name)
785
786         modified_from = None
787         if target_name in self:
788             modified_from = self[target_name]
789
790         # Actually make the move or copy.
791         if reparent:
792             source_obj._reparent(self, target_name)
793             item = source_obj
794         else:
795             item = source_obj.clone(self, target_name)
796
797         self._items[target_name] = item
798         self._committed = False
799
800         if modified_from:
801             self.notify(MOD, self, target_name, (modified_from, item))
802         else:
803             self.notify(ADD, self, target_name, item)
804
805     def _get_src_target(self, source, target_path, source_collection, create_dest):
806         if source_collection is None:
807             source_collection = self
808
809         # Find the object
810         if isinstance(source, basestring):
811             source_obj = source_collection.find(source)
812             if source_obj is None:
813                 raise IOError(errno.ENOENT, "File not found", source)
814             sourcecomponents = source.split("/")
815         else:
816             source_obj = source
817             sourcecomponents = None
818
819         # Find parent collection the target path
820         targetcomponents = target_path.split("/")
821
822         # Determine the name to use.
823         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
824
825         if not target_name:
826             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
827
828         if create_dest:
829             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
830         else:
831             if len(targetcomponents) > 1:
832                 target_dir = self.find("/".join(targetcomponents[0:-1]))
833             else:
834                 target_dir = self
835
836         if target_dir is None:
837             raise IOError(errno.ENOENT, "Target directory not found", target_name)
838
839         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
840             target_dir = target_dir[target_name]
841             target_name = sourcecomponents[-1]
842
843         return (source_obj, target_dir, target_name)
844
845     @must_be_writable
846     @synchronized
847     def copy(self, source, target_path, source_collection=None, overwrite=False):
848         """Copy a file or subcollection to a new path in this collection.
849
850         :source:
851           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
852
853         :target_path:
854           Destination file or path.  If the target path already exists and is a
855           subcollection, the item will be placed inside the subcollection.  If
856           the target path already exists and is a file, this will raise an error
857           unless you specify `overwrite=True`.
858
859         :source_collection:
860           Collection to copy `source_path` from (default `self`)
861
862         :overwrite:
863           Whether to overwrite target file if it already exists.
864         """
865
866         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
867         target_dir.add(source_obj, target_name, overwrite, False)
868
869     @must_be_writable
870     @synchronized
871     def rename(self, source, target_path, source_collection=None, overwrite=False):
872         """Move a file or subcollection from `source_collection` to a new path in this collection.
873
874         :source:
875           A string with a path to source file or subcollection.
876
877         :target_path:
878           Destination file or path.  If the target path already exists and is a
879           subcollection, the item will be placed inside the subcollection.  If
880           the target path already exists and is a file, this will raise an error
881           unless you specify `overwrite=True`.
882
883         :source_collection:
884           Collection to copy `source_path` from (default `self`)
885
886         :overwrite:
887           Whether to overwrite target file if it already exists.
888         """
889
890         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
891         if not source_obj.writable():
892             raise IOError(errno.EROFS, "Source collection is read only", source)
893         target_dir.add(source_obj, target_name, overwrite, True)
894
895     def portable_manifest_text(self, stream_name="."):
896         """Get the manifest text for this collection, sub collections and files.
897
898         This method does not flush outstanding blocks to Keep.  It will return
899         a normalized manifest with access tokens stripped.
900
901         :stream_name:
902           Name to use for this stream (directory)
903
904         """
905         return self._get_manifest_text(stream_name, True, True)
906
907     @synchronized
908     def manifest_text(self, stream_name=".", strip=False, normalize=False,
909                       only_committed=False):
910         """Get the manifest text for this collection, sub collections and files.
911
912         This method will flush outstanding blocks to Keep.  By default, it will
913         not normalize an unmodified manifest or strip access tokens.
914
915         :stream_name:
916           Name to use for this stream (directory)
917
918         :strip:
919           If True, remove signing tokens from block locators if present.
920           If False (default), block locators are left unchanged.
921
922         :normalize:
923           If True, always export the manifest text in normalized form
924           even if the Collection is not modified.  If False (default) and the collection
925           is not modified, return the original manifest text even if it is not
926           in normalized form.
927
928         :only_committed:
929           If True, don't commit pending blocks.
930
931         """
932
933         if not only_committed:
934             self._my_block_manager().commit_all()
935         return self._get_manifest_text(stream_name, strip, normalize,
936                                        only_committed=only_committed)
937
938     @synchronized
939     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
940         """Get the manifest text for this collection, sub collections and files.
941
942         :stream_name:
943           Name to use for this stream (directory)
944
945         :strip:
946           If True, remove signing tokens from block locators if present.
947           If False (default), block locators are left unchanged.
948
949         :normalize:
950           If True, always export the manifest text in normalized form
951           even if the Collection is not modified.  If False (default) and the collection
952           is not modified, return the original manifest text even if it is not
953           in normalized form.
954
955         :only_committed:
956           If True, only include blocks that were already committed to Keep.
957
958         """
959
960         if not self.committed() or self._manifest_text is None or normalize:
961             stream = {}
962             buf = []
963             sorted_keys = sorted(self.keys())
964             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
965                 # Create a stream per file `k`
966                 arvfile = self[filename]
967                 filestream = []
968                 for segment in arvfile.segments():
969                     loc = segment.locator
970                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
971                         if only_committed:
972                             continue
973                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
974                     if strip:
975                         loc = KeepLocator(loc).stripped()
976                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
977                                          segment.segment_offset, segment.range_size))
978                 stream[filename] = filestream
979             if stream:
980                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
981             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
982                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
983             return "".join(buf)
984         else:
985             if strip:
986                 return self.stripped_manifest()
987             else:
988                 return self._manifest_text
989
990     @synchronized
991     def diff(self, end_collection, prefix=".", holding_collection=None):
992         """Generate list of add/modify/delete actions.
993
994         When given to `apply`, will change `self` to match `end_collection`
995
996         """
997         changes = []
998         if holding_collection is None:
999             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1000         for k in self:
1001             if k not in end_collection:
1002                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1003         for k in end_collection:
1004             if k in self:
1005                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1006                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1007                 elif end_collection[k] != self[k]:
1008                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1009                 else:
1010                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1011             else:
1012                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1013         return changes
1014
1015     @must_be_writable
1016     @synchronized
1017     def apply(self, changes):
1018         """Apply changes from `diff`.
1019
1020         If a change conflicts with a local change, it will be saved to an
1021         alternate path indicating the conflict.
1022
1023         """
1024         if changes:
1025             self._committed = False
1026         for change in changes:
1027             event_type = change[0]
1028             path = change[1]
1029             initial = change[2]
1030             local = self.find(path)
1031             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1032                                                                     time.gmtime()))
1033             if event_type == ADD:
1034                 if local is None:
1035                     # No local file at path, safe to copy over new file
1036                     self.copy(initial, path)
1037                 elif local is not None and local != initial:
1038                     # There is already local file and it is different:
1039                     # save change to conflict file.
1040                     self.copy(initial, conflictpath)
1041             elif event_type == MOD or event_type == TOK:
1042                 final = change[3]
1043                 if local == initial:
1044                     # Local matches the "initial" item so it has not
1045                     # changed locally and is safe to update.
1046                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1047                         # Replace contents of local file with new contents
1048                         local.replace_contents(final)
1049                     else:
1050                         # Overwrite path with new item; this can happen if
1051                         # path was a file and is now a collection or vice versa
1052                         self.copy(final, path, overwrite=True)
1053                 else:
1054                     # Local is missing (presumably deleted) or local doesn't
1055                     # match the "start" value, so save change to conflict file
1056                     self.copy(final, conflictpath)
1057             elif event_type == DEL:
1058                 if local == initial:
1059                     # Local item matches "initial" value, so it is safe to remove.
1060                     self.remove(path, recursive=True)
1061                 # else, the file is modified or already removed, in either
1062                 # case we don't want to try to remove it.
1063
1064     def portable_data_hash(self):
1065         """Get the portable data hash for this collection's manifest."""
1066         stripped = self.portable_manifest_text()
1067         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1068
1069     @synchronized
1070     def subscribe(self, callback):
1071         if self._callback is None:
1072             self._callback = callback
1073         else:
1074             raise errors.ArgumentError("A callback is already set on this collection.")
1075
1076     @synchronized
1077     def unsubscribe(self):
1078         if self._callback is not None:
1079             self._callback = None
1080
1081     @synchronized
1082     def notify(self, event, collection, name, item):
1083         if self._callback:
1084             self._callback(event, collection, name, item)
1085         self.root_collection().notify(event, collection, name, item)
1086
1087     @synchronized
1088     def __eq__(self, other):
1089         if other is self:
1090             return True
1091         if not isinstance(other, RichCollectionBase):
1092             return False
1093         if len(self._items) != len(other):
1094             return False
1095         for k in self._items:
1096             if k not in other:
1097                 return False
1098             if self._items[k] != other[k]:
1099                 return False
1100         return True
1101
1102     def __ne__(self, other):
1103         return not self.__eq__(other)
1104
1105     @synchronized
1106     def flush(self):
1107         """Flush bufferblocks to Keep."""
1108         for e in self.values():
1109             e.flush()
1110
1111
1112 class Collection(RichCollectionBase):
1113     """Represents the root of an Arvados Collection.
1114
1115     This class is threadsafe.  The root collection object, all subcollections
1116     and files are protected by a single lock (i.e. each access locks the entire
1117     collection).
1118
1119     Brief summary of
1120     useful methods:
1121
1122     :To read an existing file:
1123       `c.open("myfile", "r")`
1124
1125     :To write a new file:
1126       `c.open("myfile", "w")`
1127
1128     :To determine if a file exists:
1129       `c.find("myfile") is not None`
1130
1131     :To copy a file:
1132       `c.copy("source", "dest")`
1133
1134     :To delete a file:
1135       `c.remove("myfile")`
1136
1137     :To save to an existing collection record:
1138       `c.save()`
1139
1140     :To save a new collection record:
1141     `c.save_new()`
1142
1143     :To merge remote changes into this object:
1144       `c.update()`
1145
1146     Must be associated with an API server Collection record (during
1147     initialization, or using `save_new`) to use `save` or `update`
1148
1149     """
1150
1151     def __init__(self, manifest_locator_or_text=None,
1152                  api_client=None,
1153                  keep_client=None,
1154                  num_retries=None,
1155                  parent=None,
1156                  apiconfig=None,
1157                  block_manager=None,
1158                  replication_desired=None,
1159                  put_threads=None):
1160         """Collection constructor.
1161
1162         :manifest_locator_or_text:
1163           One of Arvados collection UUID, block locator of
1164           a manifest, raw manifest text, or None (to create an empty collection).
1165         :parent:
1166           the parent Collection, may be None.
1167
1168         :apiconfig:
1169           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1170           Prefer this over supplying your own api_client and keep_client (except in testing).
1171           Will use default config settings if not specified.
1172
1173         :api_client:
1174           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1175
1176         :keep_client:
1177           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1178
1179         :num_retries:
1180           the number of retries for API and Keep requests.
1181
1182         :block_manager:
1183           the block manager to use.  If not specified, create one.
1184
1185         :replication_desired:
1186           How many copies should Arvados maintain. If None, API server default
1187           configuration applies. If not None, this value will also be used
1188           for determining the number of block copies being written.
1189
1190         """
1191         super(Collection, self).__init__(parent)
1192         self._api_client = api_client
1193         self._keep_client = keep_client
1194         self._block_manager = block_manager
1195         self.replication_desired = replication_desired
1196         self.put_threads = put_threads
1197
1198         if apiconfig:
1199             self._config = apiconfig
1200         else:
1201             self._config = config.settings()
1202
1203         self.num_retries = num_retries if num_retries is not None else 0
1204         self._manifest_locator = None
1205         self._manifest_text = None
1206         self._api_response = None
1207         self._past_versions = set()
1208
1209         self.lock = threading.RLock()
1210         self.events = None
1211
1212         if manifest_locator_or_text:
1213             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1214                 self._manifest_locator = manifest_locator_or_text
1215             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1216                 self._manifest_locator = manifest_locator_or_text
1217             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1218                 self._manifest_text = manifest_locator_or_text
1219             else:
1220                 raise errors.ArgumentError(
1221                     "Argument to CollectionReader is not a manifest or a collection UUID")
1222
1223             try:
1224                 self._populate()
1225             except (IOError, errors.SyntaxError) as e:
1226                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1227
1228     def root_collection(self):
1229         return self
1230
1231     def stream_name(self):
1232         return "."
1233
1234     def writable(self):
1235         return True
1236
1237     @synchronized
1238     def known_past_version(self, modified_at_and_portable_data_hash):
1239         return modified_at_and_portable_data_hash in self._past_versions
1240
1241     @synchronized
1242     @retry_method
1243     def update(self, other=None, num_retries=None):
1244         """Merge the latest collection on the API server with the current collection."""
1245
1246         if other is None:
1247             if self._manifest_locator is None:
1248                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1249             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1250             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1251                 response.get("portable_data_hash") != self.portable_data_hash()):
1252                 # The record on the server is different from our current one, but we've seen it before,
1253                 # so ignore it because it's already been merged.
1254                 # However, if it's the same as our current record, proceed with the update, because we want to update
1255                 # our tokens.
1256                 return
1257             else:
1258                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1259             other = CollectionReader(response["manifest_text"])
1260         baseline = CollectionReader(self._manifest_text)
1261         self.apply(baseline.diff(other))
1262         self._manifest_text = self.manifest_text()
1263
1264     @synchronized
1265     def _my_api(self):
1266         if self._api_client is None:
1267             self._api_client = ThreadSafeApiCache(self._config)
1268             if self._keep_client is None:
1269                 self._keep_client = self._api_client.keep
1270         return self._api_client
1271
1272     @synchronized
1273     def _my_keep(self):
1274         if self._keep_client is None:
1275             if self._api_client is None:
1276                 self._my_api()
1277             else:
1278                 self._keep_client = KeepClient(api_client=self._api_client)
1279         return self._keep_client
1280
1281     @synchronized
1282     def _my_block_manager(self):
1283         if self._block_manager is None:
1284             copies = (self.replication_desired or
1285                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1286                                                    2))
1287             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1288         return self._block_manager
1289
1290     def _remember_api_response(self, response):
1291         self._api_response = response
1292         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1293
1294     def _populate_from_api_server(self):
1295         # As in KeepClient itself, we must wait until the last
1296         # possible moment to instantiate an API client, in order to
1297         # avoid tripping up clients that don't have access to an API
1298         # server.  If we do build one, make sure our Keep client uses
1299         # it.  If instantiation fails, we'll fall back to the except
1300         # clause, just like any other Collection lookup
1301         # failure. Return an exception, or None if successful.
1302         try:
1303             self._remember_api_response(self._my_api().collections().get(
1304                 uuid=self._manifest_locator).execute(
1305                     num_retries=self.num_retries))
1306             self._manifest_text = self._api_response['manifest_text']
1307             # If not overriden via kwargs, we should try to load the
1308             # replication_desired from the API server
1309             if self.replication_desired is None:
1310                 self.replication_desired = self._api_response.get('replication_desired', None)
1311             return None
1312         except Exception as e:
1313             return e
1314
1315     def _populate_from_keep(self):
1316         # Retrieve a manifest directly from Keep. This has a chance of
1317         # working if [a] the locator includes a permission signature
1318         # or [b] the Keep services are operating in world-readable
1319         # mode. Return an exception, or None if successful.
1320         try:
1321             self._manifest_text = self._my_keep().get(
1322                 self._manifest_locator, num_retries=self.num_retries)
1323         except Exception as e:
1324             return e
1325
1326     def _populate(self):
1327         if self._manifest_locator is None and self._manifest_text is None:
1328             return
1329         error_via_api = None
1330         error_via_keep = None
1331         should_try_keep = ((self._manifest_text is None) and
1332                            util.keep_locator_pattern.match(
1333                                self._manifest_locator))
1334         if ((self._manifest_text is None) and
1335             util.signed_locator_pattern.match(self._manifest_locator)):
1336             error_via_keep = self._populate_from_keep()
1337         if self._manifest_text is None:
1338             error_via_api = self._populate_from_api_server()
1339             if error_via_api is not None and not should_try_keep:
1340                 raise error_via_api
1341         if ((self._manifest_text is None) and
1342             not error_via_keep and
1343             should_try_keep):
1344             # Looks like a keep locator, and we didn't already try keep above
1345             error_via_keep = self._populate_from_keep()
1346         if self._manifest_text is None:
1347             # Nothing worked!
1348             raise errors.NotFoundError(
1349                 ("Failed to retrieve collection '{}' " +
1350                  "from either API server ({}) or Keep ({})."
1351                  ).format(
1352                     self._manifest_locator,
1353                     error_via_api,
1354                     error_via_keep))
1355         # populate
1356         self._baseline_manifest = self._manifest_text
1357         self._import_manifest(self._manifest_text)
1358
1359
1360     def _has_collection_uuid(self):
1361         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1362
1363     def __enter__(self):
1364         return self
1365
1366     def __exit__(self, exc_type, exc_value, traceback):
1367         """Support scoped auto-commit in a with: block."""
1368         if exc_type is None:
1369             if self.writable() and self._has_collection_uuid():
1370                 self.save()
1371         self.stop_threads()
1372
1373     def stop_threads(self):
1374         if self._block_manager is not None:
1375             self._block_manager.stop_threads()
1376
1377     @synchronized
1378     def manifest_locator(self):
1379         """Get the manifest locator, if any.
1380
1381         The manifest locator will be set when the collection is loaded from an
1382         API server record or the portable data hash of a manifest.
1383
1384         The manifest locator will be None if the collection is newly created or
1385         was created directly from manifest text.  The method `save_new()` will
1386         assign a manifest locator.
1387
1388         """
1389         return self._manifest_locator
1390
1391     @synchronized
1392     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1393         if new_config is None:
1394             new_config = self._config
1395         if readonly:
1396             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1397         else:
1398             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1399
1400         newcollection._clonefrom(self)
1401         return newcollection
1402
1403     @synchronized
1404     def api_response(self):
1405         """Returns information about this Collection fetched from the API server.
1406
1407         If the Collection exists in Keep but not the API server, currently
1408         returns None.  Future versions may provide a synthetic response.
1409
1410         """
1411         return self._api_response
1412
1413     def find_or_create(self, path, create_type):
1414         """See `RichCollectionBase.find_or_create`"""
1415         if path == ".":
1416             return self
1417         else:
1418             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1419
1420     def find(self, path):
1421         """See `RichCollectionBase.find`"""
1422         if path == ".":
1423             return self
1424         else:
1425             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1426
1427     def remove(self, path, recursive=False):
1428         """See `RichCollectionBase.remove`"""
1429         if path == ".":
1430             raise errors.ArgumentError("Cannot remove '.'")
1431         else:
1432             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1433
1434     @must_be_writable
1435     @synchronized
1436     @retry_method
1437     def save(self, merge=True, num_retries=None):
1438         """Save collection to an existing collection record.
1439
1440         Commit pending buffer blocks to Keep, merge with remote record (if
1441         merge=True, the default), and update the collection record.  Returns
1442         the current manifest text.
1443
1444         Will raise AssertionError if not associated with a collection record on
1445         the API server.  If you want to save a manifest to Keep only, see
1446         `save_new()`.
1447
1448         :merge:
1449           Update and merge remote changes before saving.  Otherwise, any
1450           remote changes will be ignored and overwritten.
1451
1452         :num_retries:
1453           Retry count on API calls (if None,  use the collection default)
1454
1455         """
1456         if not self.committed():
1457             if not self._has_collection_uuid():
1458                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1459
1460             self._my_block_manager().commit_all()
1461
1462             if merge:
1463                 self.update()
1464
1465             text = self.manifest_text(strip=False)
1466             self._remember_api_response(self._my_api().collections().update(
1467                 uuid=self._manifest_locator,
1468                 body={'manifest_text': text}
1469                 ).execute(
1470                     num_retries=num_retries))
1471             self._manifest_text = self._api_response["manifest_text"]
1472             self.set_committed()
1473
1474         return self._manifest_text
1475
1476
1477     @must_be_writable
1478     @synchronized
1479     @retry_method
1480     def save_new(self, name=None,
1481                  create_collection_record=True,
1482                  owner_uuid=None,
1483                  ensure_unique_name=False,
1484                  num_retries=None):
1485         """Save collection to a new collection record.
1486
1487         Commit pending buffer blocks to Keep and, when create_collection_record
1488         is True (default), create a new collection record.  After creating a
1489         new collection record, this Collection object will be associated with
1490         the new record used by `save()`.  Returns the current manifest text.
1491
1492         :name:
1493           The collection name.
1494
1495         :create_collection_record:
1496            If True, create a collection record on the API server.
1497            If False, only commit blocks to Keep and return the manifest text.
1498
1499         :owner_uuid:
1500           the user, or project uuid that will own this collection.
1501           If None, defaults to the current user.
1502
1503         :ensure_unique_name:
1504           If True, ask the API server to rename the collection
1505           if it conflicts with a collection with the same name and owner.  If
1506           False, a name conflict will result in an error.
1507
1508         :num_retries:
1509           Retry count on API calls (if None,  use the collection default)
1510
1511         """
1512         self._my_block_manager().commit_all()
1513         text = self.manifest_text(strip=False)
1514
1515         if create_collection_record:
1516             if name is None:
1517                 name = "New collection"
1518                 ensure_unique_name = True
1519
1520             body = {"manifest_text": text,
1521                     "name": name,
1522                     "replication_desired": self.replication_desired}
1523             if owner_uuid:
1524                 body["owner_uuid"] = owner_uuid
1525
1526             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1527             text = self._api_response["manifest_text"]
1528
1529             self._manifest_locator = self._api_response["uuid"]
1530
1531             self._manifest_text = text
1532             self.set_committed()
1533
1534         return text
1535
1536     @synchronized
1537     def _import_manifest(self, manifest_text):
1538         """Import a manifest into a `Collection`.
1539
1540         :manifest_text:
1541           The manifest text to import from.
1542
1543         """
1544         if len(self) > 0:
1545             raise ArgumentError("Can only import manifest into an empty collection")
1546
1547         STREAM_NAME = 0
1548         BLOCKS = 1
1549         SEGMENTS = 2
1550
1551         stream_name = None
1552         state = STREAM_NAME
1553
1554         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1555             tok = token_and_separator.group(1)
1556             sep = token_and_separator.group(2)
1557
1558             if state == STREAM_NAME:
1559                 # starting a new stream
1560                 stream_name = tok.replace('\\040', ' ')
1561                 blocks = []
1562                 segments = []
1563                 streamoffset = 0L
1564                 state = BLOCKS
1565                 self.find_or_create(stream_name, COLLECTION)
1566                 continue
1567
1568             if state == BLOCKS:
1569                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1570                 if block_locator:
1571                     blocksize = long(block_locator.group(1))
1572                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1573                     streamoffset += blocksize
1574                 else:
1575                     state = SEGMENTS
1576
1577             if state == SEGMENTS:
1578                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1579                 if file_segment:
1580                     pos = long(file_segment.group(1))
1581                     size = long(file_segment.group(2))
1582                     name = file_segment.group(3).replace('\\040', ' ')
1583                     filepath = os.path.join(stream_name, name)
1584                     afile = self.find_or_create(filepath, FILE)
1585                     if isinstance(afile, ArvadosFile):
1586                         afile.add_segment(blocks, pos, size)
1587                     else:
1588                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1589                 else:
1590                     # error!
1591                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1592
1593             if sep == "\n":
1594                 stream_name = None
1595                 state = STREAM_NAME
1596
1597         self.set_committed()
1598
1599     @synchronized
1600     def notify(self, event, collection, name, item):
1601         if self._callback:
1602             self._callback(event, collection, name, item)
1603
1604
1605 class Subcollection(RichCollectionBase):
1606     """This is a subdirectory within a collection that doesn't have its own API
1607     server record.
1608
1609     Subcollection locking falls under the umbrella lock of its root collection.
1610
1611     """
1612
1613     def __init__(self, parent, name):
1614         super(Subcollection, self).__init__(parent)
1615         self.lock = self.root_collection().lock
1616         self._manifest_text = None
1617         self.name = name
1618         self.num_retries = parent.num_retries
1619
1620     def root_collection(self):
1621         return self.parent.root_collection()
1622
1623     def writable(self):
1624         return self.root_collection().writable()
1625
1626     def _my_api(self):
1627         return self.root_collection()._my_api()
1628
1629     def _my_keep(self):
1630         return self.root_collection()._my_keep()
1631
1632     def _my_block_manager(self):
1633         return self.root_collection()._my_block_manager()
1634
1635     def stream_name(self):
1636         return os.path.join(self.parent.stream_name(), self.name)
1637
1638     @synchronized
1639     def clone(self, new_parent, new_name):
1640         c = Subcollection(new_parent, new_name)
1641         c._clonefrom(self)
1642         return c
1643
1644     @must_be_writable
1645     @synchronized
1646     def _reparent(self, newparent, newname):
1647         self._committed = False
1648         self.flush()
1649         self.parent.remove(self.name, recursive=True)
1650         self.parent = newparent
1651         self.name = newname
1652         self.lock = self.parent.root_collection().lock
1653
1654
1655 class CollectionReader(Collection):
1656     """A read-only collection object.
1657
1658     Initialize from an api collection record locator, a portable data hash of a
1659     manifest, or raw manifest text.  See `Collection` constructor for detailed
1660     options.
1661
1662     """
1663     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1664         self._in_init = True
1665         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1666         self._in_init = False
1667
1668         # Forego any locking since it should never change once initialized.
1669         self.lock = NoopLock()
1670
1671         # Backwards compatability with old CollectionReader
1672         # all_streams() and all_files()
1673         self._streams = None
1674
1675     def writable(self):
1676         return self._in_init
1677
1678     def _populate_streams(orig_func):
1679         @functools.wraps(orig_func)
1680         def populate_streams_wrapper(self, *args, **kwargs):
1681             # Defer populating self._streams until needed since it creates a copy of the manifest.
1682             if self._streams is None:
1683                 if self._manifest_text:
1684                     self._streams = [sline.split()
1685                                      for sline in self._manifest_text.split("\n")
1686                                      if sline]
1687                 else:
1688                     self._streams = []
1689             return orig_func(self, *args, **kwargs)
1690         return populate_streams_wrapper
1691
1692     @_populate_streams
1693     def normalize(self):
1694         """Normalize the streams returned by `all_streams`.
1695
1696         This method is kept for backwards compatability and only affects the
1697         behavior of `all_streams()` and `all_files()`
1698
1699         """
1700
1701         # Rearrange streams
1702         streams = {}
1703         for s in self.all_streams():
1704             for f in s.all_files():
1705                 streamname, filename = split(s.name() + "/" + f.name())
1706                 if streamname not in streams:
1707                     streams[streamname] = {}
1708                 if filename not in streams[streamname]:
1709                     streams[streamname][filename] = []
1710                 for r in f.segments:
1711                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1712
1713         self._streams = [normalize_stream(s, streams[s])
1714                          for s in sorted(streams)]
1715     @_populate_streams
1716     def all_streams(self):
1717         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1718                 for s in self._streams]
1719
1720     @_populate_streams
1721     def all_files(self):
1722         for s in self.all_streams():
1723             for f in s.all_files():
1724                 yield f