1a427814cf4d5bc13ffbeca75f7c22c87134962c
[arvados.git] / sdk / python / arvados / collection.py
1 from __future__ import absolute_import
2 import functools
3 import logging
4 import os
5 import re
6 import errno
7 import hashlib
8 import time
9 import threading
10
11 from collections import deque
12 from stat import *
13
14 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
15 from .keep import KeepLocator, KeepClient
16 from .stream import StreamReader
17 from ._normalize_stream import normalize_stream
18 from ._ranges import Range, LocatorAndRange
19 from .safeapi import ThreadSafeApiCache
20 import arvados.config as config
21 import arvados.errors as errors
22 import arvados.util
23 import arvados.events as events
24 from arvados.retry import retry_method
25
26 _logger = logging.getLogger('arvados.collection')
27
28 class CollectionBase(object):
29     def __enter__(self):
30         return self
31
32     def __exit__(self, exc_type, exc_value, traceback):
33         pass
34
35     def _my_keep(self):
36         if self._keep_client is None:
37             self._keep_client = KeepClient(api_client=self._api_client,
38                                            num_retries=self.num_retries)
39         return self._keep_client
40
41     def stripped_manifest(self):
42         """Get the manifest with locator hints stripped.
43
44         Return the manifest for the current collection with all
45         non-portable hints (i.e., permission signatures and other
46         hints other than size hints) removed from the locators.
47         """
48         raw = self.manifest_text()
49         clean = []
50         for line in raw.split("\n"):
51             fields = line.split()
52             if fields:
53                 clean_fields = fields[:1] + [
54                     (re.sub(r'\+[^\d][^\+]*', '', x)
55                      if re.match(arvados.util.keep_locator_pattern, x)
56                      else x)
57                     for x in fields[1:]]
58                 clean += [' '.join(clean_fields), "\n"]
59         return ''.join(clean)
60
61
62 class _WriterFile(_FileLikeObjectBase):
63     def __init__(self, coll_writer, name):
64         super(_WriterFile, self).__init__(name, 'wb')
65         self.dest = coll_writer
66
67     def close(self):
68         super(_WriterFile, self).close()
69         self.dest.finish_current_file()
70
71     @_FileLikeObjectBase._before_close
72     def write(self, data):
73         self.dest.write(data)
74
75     @_FileLikeObjectBase._before_close
76     def writelines(self, seq):
77         for data in seq:
78             self.write(data)
79
80     @_FileLikeObjectBase._before_close
81     def flush(self):
82         self.dest.flush_data()
83
84
85 class CollectionWriter(CollectionBase):
86     def __init__(self, api_client=None, num_retries=0, replication=None):
87         """Instantiate a CollectionWriter.
88
89         CollectionWriter lets you build a new Arvados Collection from scratch.
90         Write files to it.  The CollectionWriter will upload data to Keep as
91         appropriate, and provide you with the Collection manifest text when
92         you're finished.
93
94         Arguments:
95         * api_client: The API client to use to look up Collections.  If not
96           provided, CollectionReader will build one from available Arvados
97           configuration.
98         * num_retries: The default number of times to retry failed
99           service requests.  Default 0.  You may change this value
100           after instantiation, but note those changes may not
101           propagate to related objects like the Keep client.
102         * replication: The number of copies of each block to store.
103           If this argument is None or not supplied, replication is
104           the server-provided default if available, otherwise 2.
105         """
106         self._api_client = api_client
107         self.num_retries = num_retries
108         self.replication = (2 if replication is None else replication)
109         self._keep_client = None
110         self._data_buffer = []
111         self._data_buffer_len = 0
112         self._current_stream_files = []
113         self._current_stream_length = 0
114         self._current_stream_locators = []
115         self._current_stream_name = '.'
116         self._current_file_name = None
117         self._current_file_pos = 0
118         self._finished_streams = []
119         self._close_file = None
120         self._queued_file = None
121         self._queued_dirents = deque()
122         self._queued_trees = deque()
123         self._last_open = None
124
125     def __exit__(self, exc_type, exc_value, traceback):
126         if exc_type is None:
127             self.finish()
128
129     def do_queued_work(self):
130         # The work queue consists of three pieces:
131         # * _queued_file: The file object we're currently writing to the
132         #   Collection.
133         # * _queued_dirents: Entries under the current directory
134         #   (_queued_trees[0]) that we want to write or recurse through.
135         #   This may contain files from subdirectories if
136         #   max_manifest_depth == 0 for this directory.
137         # * _queued_trees: Directories that should be written as separate
138         #   streams to the Collection.
139         # This function handles the smallest piece of work currently queued
140         # (current file, then current directory, then next directory) until
141         # no work remains.  The _work_THING methods each do a unit of work on
142         # THING.  _queue_THING methods add a THING to the work queue.
143         while True:
144             if self._queued_file:
145                 self._work_file()
146             elif self._queued_dirents:
147                 self._work_dirents()
148             elif self._queued_trees:
149                 self._work_trees()
150             else:
151                 break
152
153     def _work_file(self):
154         while True:
155             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
156             if not buf:
157                 break
158             self.write(buf)
159         self.finish_current_file()
160         if self._close_file:
161             self._queued_file.close()
162         self._close_file = None
163         self._queued_file = None
164
165     def _work_dirents(self):
166         path, stream_name, max_manifest_depth = self._queued_trees[0]
167         if stream_name != self.current_stream_name():
168             self.start_new_stream(stream_name)
169         while self._queued_dirents:
170             dirent = self._queued_dirents.popleft()
171             target = os.path.join(path, dirent)
172             if os.path.isdir(target):
173                 self._queue_tree(target,
174                                  os.path.join(stream_name, dirent),
175                                  max_manifest_depth - 1)
176             else:
177                 self._queue_file(target, dirent)
178                 break
179         if not self._queued_dirents:
180             self._queued_trees.popleft()
181
182     def _work_trees(self):
183         path, stream_name, max_manifest_depth = self._queued_trees[0]
184         d = arvados.util.listdir_recursive(
185             path, max_depth = (None if max_manifest_depth == 0 else 0))
186         if d:
187             self._queue_dirents(stream_name, d)
188         else:
189             self._queued_trees.popleft()
190
191     def _queue_file(self, source, filename=None):
192         assert (self._queued_file is None), "tried to queue more than one file"
193         if not hasattr(source, 'read'):
194             source = open(source, 'rb')
195             self._close_file = True
196         else:
197             self._close_file = False
198         if filename is None:
199             filename = os.path.basename(source.name)
200         self.start_new_file(filename)
201         self._queued_file = source
202
203     def _queue_dirents(self, stream_name, dirents):
204         assert (not self._queued_dirents), "tried to queue more than one tree"
205         self._queued_dirents = deque(sorted(dirents))
206
207     def _queue_tree(self, path, stream_name, max_manifest_depth):
208         self._queued_trees.append((path, stream_name, max_manifest_depth))
209
210     def write_file(self, source, filename=None):
211         self._queue_file(source, filename)
212         self.do_queued_work()
213
214     def write_directory_tree(self,
215                              path, stream_name='.', max_manifest_depth=-1):
216         self._queue_tree(path, stream_name, max_manifest_depth)
217         self.do_queued_work()
218
219     def write(self, newdata):
220         if hasattr(newdata, '__iter__'):
221             for s in newdata:
222                 self.write(s)
223             return
224         self._data_buffer.append(newdata)
225         self._data_buffer_len += len(newdata)
226         self._current_stream_length += len(newdata)
227         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
228             self.flush_data()
229
230     def open(self, streampath, filename=None):
231         """open(streampath[, filename]) -> file-like object
232
233         Pass in the path of a file to write to the Collection, either as a
234         single string or as two separate stream name and file name arguments.
235         This method returns a file-like object you can write to add it to the
236         Collection.
237
238         You may only have one file object from the Collection open at a time,
239         so be sure to close the object when you're done.  Using the object in
240         a with statement makes that easy::
241
242           with cwriter.open('./doc/page1.txt') as outfile:
243               outfile.write(page1_data)
244           with cwriter.open('./doc/page2.txt') as outfile:
245               outfile.write(page2_data)
246         """
247         if filename is None:
248             streampath, filename = split(streampath)
249         if self._last_open and not self._last_open.closed:
250             raise errors.AssertionError(
251                 "can't open '{}' when '{}' is still open".format(
252                     filename, self._last_open.name))
253         if streampath != self.current_stream_name():
254             self.start_new_stream(streampath)
255         self.set_current_file_name(filename)
256         self._last_open = _WriterFile(self, filename)
257         return self._last_open
258
259     def flush_data(self):
260         data_buffer = ''.join(self._data_buffer)
261         if data_buffer:
262             self._current_stream_locators.append(
263                 self._my_keep().put(
264                     data_buffer[0:config.KEEP_BLOCK_SIZE],
265                     copies=self.replication))
266             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
267             self._data_buffer_len = len(self._data_buffer[0])
268
269     def start_new_file(self, newfilename=None):
270         self.finish_current_file()
271         self.set_current_file_name(newfilename)
272
273     def set_current_file_name(self, newfilename):
274         if re.search(r'[\t\n]', newfilename):
275             raise errors.AssertionError(
276                 "Manifest filenames cannot contain whitespace: %s" %
277                 newfilename)
278         elif re.search(r'\x00', newfilename):
279             raise errors.AssertionError(
280                 "Manifest filenames cannot contain NUL characters: %s" %
281                 newfilename)
282         self._current_file_name = newfilename
283
284     def current_file_name(self):
285         return self._current_file_name
286
287     def finish_current_file(self):
288         if self._current_file_name is None:
289             if self._current_file_pos == self._current_stream_length:
290                 return
291             raise errors.AssertionError(
292                 "Cannot finish an unnamed file " +
293                 "(%d bytes at offset %d in '%s' stream)" %
294                 (self._current_stream_length - self._current_file_pos,
295                  self._current_file_pos,
296                  self._current_stream_name))
297         self._current_stream_files.append([
298                 self._current_file_pos,
299                 self._current_stream_length - self._current_file_pos,
300                 self._current_file_name])
301         self._current_file_pos = self._current_stream_length
302         self._current_file_name = None
303
304     def start_new_stream(self, newstreamname='.'):
305         self.finish_current_stream()
306         self.set_current_stream_name(newstreamname)
307
308     def set_current_stream_name(self, newstreamname):
309         if re.search(r'[\t\n]', newstreamname):
310             raise errors.AssertionError(
311                 "Manifest stream names cannot contain whitespace: '%s'" %
312                 (newstreamname))
313         self._current_stream_name = '.' if newstreamname=='' else newstreamname
314
315     def current_stream_name(self):
316         return self._current_stream_name
317
318     def finish_current_stream(self):
319         self.finish_current_file()
320         self.flush_data()
321         if not self._current_stream_files:
322             pass
323         elif self._current_stream_name is None:
324             raise errors.AssertionError(
325                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
326                 (self._current_stream_length, len(self._current_stream_files)))
327         else:
328             if not self._current_stream_locators:
329                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
330             self._finished_streams.append([self._current_stream_name,
331                                            self._current_stream_locators,
332                                            self._current_stream_files])
333         self._current_stream_files = []
334         self._current_stream_length = 0
335         self._current_stream_locators = []
336         self._current_stream_name = None
337         self._current_file_pos = 0
338         self._current_file_name = None
339
340     def finish(self):
341         """Store the manifest in Keep and return its locator.
342
343         This is useful for storing manifest fragments (task outputs)
344         temporarily in Keep during a Crunch job.
345
346         In other cases you should make a collection instead, by
347         sending manifest_text() to the API server's "create
348         collection" endpoint.
349         """
350         return self._my_keep().put(self.manifest_text(), copies=self.replication)
351
352     def portable_data_hash(self):
353         stripped = self.stripped_manifest()
354         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
355
356     def manifest_text(self):
357         self.finish_current_stream()
358         manifest = ''
359
360         for stream in self._finished_streams:
361             if not re.search(r'^\.(/.*)?$', stream[0]):
362                 manifest += './'
363             manifest += stream[0].replace(' ', '\\040')
364             manifest += ' ' + ' '.join(stream[1])
365             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
366             manifest += "\n"
367
368         return manifest
369
370     def data_locators(self):
371         ret = []
372         for name, locators, files in self._finished_streams:
373             ret += locators
374         return ret
375
376
377 class ResumableCollectionWriter(CollectionWriter):
378     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
379                    '_current_stream_locators', '_current_stream_name',
380                    '_current_file_name', '_current_file_pos', '_close_file',
381                    '_data_buffer', '_dependencies', '_finished_streams',
382                    '_queued_dirents', '_queued_trees']
383
384     def __init__(self, api_client=None, **kwargs):
385         self._dependencies = {}
386         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
387
388     @classmethod
389     def from_state(cls, state, *init_args, **init_kwargs):
390         # Try to build a new writer from scratch with the given state.
391         # If the state is not suitable to resume (because files have changed,
392         # been deleted, aren't predictable, etc.), raise a
393         # StaleWriterStateError.  Otherwise, return the initialized writer.
394         # The caller is responsible for calling writer.do_queued_work()
395         # appropriately after it's returned.
396         writer = cls(*init_args, **init_kwargs)
397         for attr_name in cls.STATE_PROPS:
398             attr_value = state[attr_name]
399             attr_class = getattr(writer, attr_name).__class__
400             # Coerce the value into the same type as the initial value, if
401             # needed.
402             if attr_class not in (type(None), attr_value.__class__):
403                 attr_value = attr_class(attr_value)
404             setattr(writer, attr_name, attr_value)
405         # Check dependencies before we try to resume anything.
406         if any(KeepLocator(ls).permission_expired()
407                for ls in writer._current_stream_locators):
408             raise errors.StaleWriterStateError(
409                 "locators include expired permission hint")
410         writer.check_dependencies()
411         if state['_current_file'] is not None:
412             path, pos = state['_current_file']
413             try:
414                 writer._queued_file = open(path, 'rb')
415                 writer._queued_file.seek(pos)
416             except IOError as error:
417                 raise errors.StaleWriterStateError(
418                     "failed to reopen active file {}: {}".format(path, error))
419         return writer
420
421     def check_dependencies(self):
422         for path, orig_stat in self._dependencies.items():
423             if not S_ISREG(orig_stat[ST_MODE]):
424                 raise errors.StaleWriterStateError("{} not file".format(path))
425             try:
426                 now_stat = tuple(os.stat(path))
427             except OSError as error:
428                 raise errors.StaleWriterStateError(
429                     "failed to stat {}: {}".format(path, error))
430             if ((not S_ISREG(now_stat[ST_MODE])) or
431                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
432                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
433                 raise errors.StaleWriterStateError("{} changed".format(path))
434
435     def dump_state(self, copy_func=lambda x: x):
436         state = {attr: copy_func(getattr(self, attr))
437                  for attr in self.STATE_PROPS}
438         if self._queued_file is None:
439             state['_current_file'] = None
440         else:
441             state['_current_file'] = (os.path.realpath(self._queued_file.name),
442                                       self._queued_file.tell())
443         return state
444
445     def _queue_file(self, source, filename=None):
446         try:
447             src_path = os.path.realpath(source)
448         except Exception:
449             raise errors.AssertionError("{} not a file path".format(source))
450         try:
451             path_stat = os.stat(src_path)
452         except OSError as stat_error:
453             path_stat = None
454         super(ResumableCollectionWriter, self)._queue_file(source, filename)
455         fd_stat = os.fstat(self._queued_file.fileno())
456         if not S_ISREG(fd_stat.st_mode):
457             # We won't be able to resume from this cache anyway, so don't
458             # worry about further checks.
459             self._dependencies[source] = tuple(fd_stat)
460         elif path_stat is None:
461             raise errors.AssertionError(
462                 "could not stat {}: {}".format(source, stat_error))
463         elif path_stat.st_ino != fd_stat.st_ino:
464             raise errors.AssertionError(
465                 "{} changed between open and stat calls".format(source))
466         else:
467             self._dependencies[src_path] = tuple(fd_stat)
468
469     def write(self, data):
470         if self._queued_file is None:
471             raise errors.AssertionError(
472                 "resumable writer can't accept unsourced data")
473         return super(ResumableCollectionWriter, self).write(data)
474
475
476 ADD = "add"
477 DEL = "del"
478 MOD = "mod"
479 TOK = "tok"
480 FILE = "file"
481 COLLECTION = "collection"
482
483 class RichCollectionBase(CollectionBase):
484     """Base class for Collections and Subcollections.
485
486     Implements the majority of functionality relating to accessing items in the
487     Collection.
488
489     """
490
491     def __init__(self, parent=None):
492         self.parent = parent
493         self._committed = False
494         self._callback = None
495         self._items = {}
496
497     def _my_api(self):
498         raise NotImplementedError()
499
500     def _my_keep(self):
501         raise NotImplementedError()
502
503     def _my_block_manager(self):
504         raise NotImplementedError()
505
506     def writable(self):
507         raise NotImplementedError()
508
509     def root_collection(self):
510         raise NotImplementedError()
511
512     def notify(self, event, collection, name, item):
513         raise NotImplementedError()
514
515     def stream_name(self):
516         raise NotImplementedError()
517
518     @must_be_writable
519     @synchronized
520     def find_or_create(self, path, create_type):
521         """Recursively search the specified file path.
522
523         May return either a `Collection` or `ArvadosFile`.  If not found, will
524         create a new item at the specified path based on `create_type`.  Will
525         create intermediate subcollections needed to contain the final item in
526         the path.
527
528         :create_type:
529           One of `arvados.collection.FILE` or
530           `arvados.collection.COLLECTION`.  If the path is not found, and value
531           of create_type is FILE then create and return a new ArvadosFile for
532           the last path component.  If COLLECTION, then create and return a new
533           Collection for the last path component.
534
535         """
536
537         pathcomponents = path.split("/", 1)
538         if pathcomponents[0]:
539             item = self._items.get(pathcomponents[0])
540             if len(pathcomponents) == 1:
541                 if item is None:
542                     # create new file
543                     if create_type == COLLECTION:
544                         item = Subcollection(self, pathcomponents[0])
545                     else:
546                         item = ArvadosFile(self, pathcomponents[0])
547                     self._items[pathcomponents[0]] = item
548                     self.set_committed(False)
549                     self.notify(ADD, self, pathcomponents[0], item)
550                 return item
551             else:
552                 if item is None:
553                     # create new collection
554                     item = Subcollection(self, pathcomponents[0])
555                     self._items[pathcomponents[0]] = item
556                     self.set_committed(False)
557                     self.notify(ADD, self, pathcomponents[0], item)
558                 if isinstance(item, RichCollectionBase):
559                     return item.find_or_create(pathcomponents[1], create_type)
560                 else:
561                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
562         else:
563             return self
564
565     @synchronized
566     def find(self, path):
567         """Recursively search the specified file path.
568
569         May return either a Collection or ArvadosFile. Return None if not
570         found.
571         If path is invalid (ex: starts with '/'), an IOError exception will be
572         raised.
573
574         """
575         if not path:
576             raise errors.ArgumentError("Parameter 'path' is empty.")
577
578         pathcomponents = path.split("/", 1)
579         if pathcomponents[0] == '':
580             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
581
582         item = self._items.get(pathcomponents[0])
583         if item is None:
584             return None
585         elif len(pathcomponents) == 1:
586             return item
587         else:
588             if isinstance(item, RichCollectionBase):
589                 if pathcomponents[1]:
590                     return item.find(pathcomponents[1])
591                 else:
592                     return item
593             else:
594                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
595
596     @synchronized
597     def mkdirs(self, path):
598         """Recursive subcollection create.
599
600         Like `os.makedirs()`.  Will create intermediate subcollections needed
601         to contain the leaf subcollection path.
602
603         """
604
605         if self.find(path) != None:
606             raise IOError(errno.EEXIST, "Directory or file exists", path)
607
608         return self.find_or_create(path, COLLECTION)
609
610     def open(self, path, mode="r"):
611         """Open a file-like object for access.
612
613         :path:
614           path to a file in the collection
615         :mode:
616           one of "r", "r+", "w", "w+", "a", "a+"
617           :"r":
618             opens for reading
619           :"r+":
620             opens for reading and writing.  Reads/writes share a file pointer.
621           :"w", "w+":
622             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
623           :"a", "a+":
624             opens for reading and writing.  All writes are appended to
625             the end of the file.  Writing does not affect the file pointer for
626             reading.
627         """
628         mode = mode.replace("b", "")
629         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
630             raise errors.ArgumentError("Bad mode '%s'" % mode)
631         create = (mode != "r")
632
633         if create and not self.writable():
634             raise IOError(errno.EROFS, "Collection is read only")
635
636         if create:
637             arvfile = self.find_or_create(path, FILE)
638         else:
639             arvfile = self.find(path)
640
641         if arvfile is None:
642             raise IOError(errno.ENOENT, "File not found", path)
643         if not isinstance(arvfile, ArvadosFile):
644             raise IOError(errno.EISDIR, "Is a directory", path)
645
646         if mode[0] == "w":
647             arvfile.truncate(0)
648
649         name = os.path.basename(path)
650
651         if mode == "r":
652             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
653         else:
654             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
655
656     def modified(self):
657         """Determine if the collection has been modified since last commited."""
658         return not self.committed()
659
660     @synchronized
661     def committed(self):
662         """Determine if the collection has been committed to the API server."""
663         return self._committed
664
665     @synchronized
666     def set_committed(self, value=True):
667         """Recursively set committed flag.
668
669         If value is True, set committed to be True for this and all children.
670
671         If value is False, set committed to be False for this and all parents.
672         """
673         if value == self._committed:
674             return
675         if value:
676             for k,v in self._items.items():
677                 v.set_committed(True)
678             self._committed = True
679         else:
680             self._committed = False
681             if self.parent is not None:
682                 self.parent.set_committed(False)
683
684     @synchronized
685     def __iter__(self):
686         """Iterate over names of files and collections contained in this collection."""
687         return iter(self._items.keys())
688
689     @synchronized
690     def __getitem__(self, k):
691         """Get a file or collection that is directly contained by this collection.
692
693         If you want to search a path, use `find()` instead.
694
695         """
696         return self._items[k]
697
698     @synchronized
699     def __contains__(self, k):
700         """Test if there is a file or collection a directly contained by this collection."""
701         return k in self._items
702
703     @synchronized
704     def __len__(self):
705         """Get the number of items directly contained in this collection."""
706         return len(self._items)
707
708     @must_be_writable
709     @synchronized
710     def __delitem__(self, p):
711         """Delete an item by name which is directly contained by this collection."""
712         del self._items[p]
713         self.set_committed(False)
714         self.notify(DEL, self, p, None)
715
716     @synchronized
717     def keys(self):
718         """Get a list of names of files and collections directly contained in this collection."""
719         return self._items.keys()
720
721     @synchronized
722     def values(self):
723         """Get a list of files and collection objects directly contained in this collection."""
724         return self._items.values()
725
726     @synchronized
727     def items(self):
728         """Get a list of (name, object) tuples directly contained in this collection."""
729         return self._items.items()
730
731     def exists(self, path):
732         """Test if there is a file or collection at `path`."""
733         return self.find(path) is not None
734
735     @must_be_writable
736     @synchronized
737     def remove(self, path, recursive=False):
738         """Remove the file or subcollection (directory) at `path`.
739
740         :recursive:
741           Specify whether to remove non-empty subcollections (True), or raise an error (False).
742         """
743
744         if not path:
745             raise errors.ArgumentError("Parameter 'path' is empty.")
746
747         pathcomponents = path.split("/", 1)
748         item = self._items.get(pathcomponents[0])
749         if item is None:
750             raise IOError(errno.ENOENT, "File not found", path)
751         if len(pathcomponents) == 1:
752             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
753                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
754             deleteditem = self._items[pathcomponents[0]]
755             del self._items[pathcomponents[0]]
756             self.set_committed(False)
757             self.notify(DEL, self, pathcomponents[0], deleteditem)
758         else:
759             item.remove(pathcomponents[1])
760
761     def _clonefrom(self, source):
762         for k,v in source.items():
763             self._items[k] = v.clone(self, k)
764
765     def clone(self):
766         raise NotImplementedError()
767
768     @must_be_writable
769     @synchronized
770     def add(self, source_obj, target_name, overwrite=False, reparent=False):
771         """Copy or move a file or subcollection to this collection.
772
773         :source_obj:
774           An ArvadosFile, or Subcollection object
775
776         :target_name:
777           Destination item name.  If the target name already exists and is a
778           file, this will raise an error unless you specify `overwrite=True`.
779
780         :overwrite:
781           Whether to overwrite target file if it already exists.
782
783         :reparent:
784           If True, source_obj will be moved from its parent collection to this collection.
785           If False, source_obj will be copied and the parent collection will be
786           unmodified.
787
788         """
789
790         if target_name in self and not overwrite:
791             raise IOError(errno.EEXIST, "File already exists", target_name)
792
793         modified_from = None
794         if target_name in self:
795             modified_from = self[target_name]
796
797         # Actually make the move or copy.
798         if reparent:
799             source_obj._reparent(self, target_name)
800             item = source_obj
801         else:
802             item = source_obj.clone(self, target_name)
803
804         self._items[target_name] = item
805         self.set_committed(False)
806
807         if modified_from:
808             self.notify(MOD, self, target_name, (modified_from, item))
809         else:
810             self.notify(ADD, self, target_name, item)
811
812     def _get_src_target(self, source, target_path, source_collection, create_dest):
813         if source_collection is None:
814             source_collection = self
815
816         # Find the object
817         if isinstance(source, basestring):
818             source_obj = source_collection.find(source)
819             if source_obj is None:
820                 raise IOError(errno.ENOENT, "File not found", source)
821             sourcecomponents = source.split("/")
822         else:
823             source_obj = source
824             sourcecomponents = None
825
826         # Find parent collection the target path
827         targetcomponents = target_path.split("/")
828
829         # Determine the name to use.
830         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
831
832         if not target_name:
833             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
834
835         if create_dest:
836             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
837         else:
838             if len(targetcomponents) > 1:
839                 target_dir = self.find("/".join(targetcomponents[0:-1]))
840             else:
841                 target_dir = self
842
843         if target_dir is None:
844             raise IOError(errno.ENOENT, "Target directory not found", target_name)
845
846         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
847             target_dir = target_dir[target_name]
848             target_name = sourcecomponents[-1]
849
850         return (source_obj, target_dir, target_name)
851
852     @must_be_writable
853     @synchronized
854     def copy(self, source, target_path, source_collection=None, overwrite=False):
855         """Copy a file or subcollection to a new path in this collection.
856
857         :source:
858           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
859
860         :target_path:
861           Destination file or path.  If the target path already exists and is a
862           subcollection, the item will be placed inside the subcollection.  If
863           the target path already exists and is a file, this will raise an error
864           unless you specify `overwrite=True`.
865
866         :source_collection:
867           Collection to copy `source_path` from (default `self`)
868
869         :overwrite:
870           Whether to overwrite target file if it already exists.
871         """
872
873         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
874         target_dir.add(source_obj, target_name, overwrite, False)
875
876     @must_be_writable
877     @synchronized
878     def rename(self, source, target_path, source_collection=None, overwrite=False):
879         """Move a file or subcollection from `source_collection` to a new path in this collection.
880
881         :source:
882           A string with a path to source file or subcollection.
883
884         :target_path:
885           Destination file or path.  If the target path already exists and is a
886           subcollection, the item will be placed inside the subcollection.  If
887           the target path already exists and is a file, this will raise an error
888           unless you specify `overwrite=True`.
889
890         :source_collection:
891           Collection to copy `source_path` from (default `self`)
892
893         :overwrite:
894           Whether to overwrite target file if it already exists.
895         """
896
897         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
898         if not source_obj.writable():
899             raise IOError(errno.EROFS, "Source collection is read only", source)
900         target_dir.add(source_obj, target_name, overwrite, True)
901
902     def portable_manifest_text(self, stream_name="."):
903         """Get the manifest text for this collection, sub collections and files.
904
905         This method does not flush outstanding blocks to Keep.  It will return
906         a normalized manifest with access tokens stripped.
907
908         :stream_name:
909           Name to use for this stream (directory)
910
911         """
912         return self._get_manifest_text(stream_name, True, True)
913
914     @synchronized
915     def manifest_text(self, stream_name=".", strip=False, normalize=False,
916                       only_committed=False):
917         """Get the manifest text for this collection, sub collections and files.
918
919         This method will flush outstanding blocks to Keep.  By default, it will
920         not normalize an unmodified manifest or strip access tokens.
921
922         :stream_name:
923           Name to use for this stream (directory)
924
925         :strip:
926           If True, remove signing tokens from block locators if present.
927           If False (default), block locators are left unchanged.
928
929         :normalize:
930           If True, always export the manifest text in normalized form
931           even if the Collection is not modified.  If False (default) and the collection
932           is not modified, return the original manifest text even if it is not
933           in normalized form.
934
935         :only_committed:
936           If True, don't commit pending blocks.
937
938         """
939
940         if not only_committed:
941             self._my_block_manager().commit_all()
942         return self._get_manifest_text(stream_name, strip, normalize,
943                                        only_committed=only_committed)
944
945     @synchronized
946     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
947         """Get the manifest text for this collection, sub collections and files.
948
949         :stream_name:
950           Name to use for this stream (directory)
951
952         :strip:
953           If True, remove signing tokens from block locators if present.
954           If False (default), block locators are left unchanged.
955
956         :normalize:
957           If True, always export the manifest text in normalized form
958           even if the Collection is not modified.  If False (default) and the collection
959           is not modified, return the original manifest text even if it is not
960           in normalized form.
961
962         :only_committed:
963           If True, only include blocks that were already committed to Keep.
964
965         """
966
967         if not self.committed() or self._manifest_text is None or normalize:
968             stream = {}
969             buf = []
970             sorted_keys = sorted(self.keys())
971             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
972                 # Create a stream per file `k`
973                 arvfile = self[filename]
974                 filestream = []
975                 for segment in arvfile.segments():
976                     loc = segment.locator
977                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
978                         if only_committed:
979                             continue
980                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
981                     if strip:
982                         loc = KeepLocator(loc).stripped()
983                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
984                                          segment.segment_offset, segment.range_size))
985                 stream[filename] = filestream
986             if stream:
987                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
988             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
989                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
990             return "".join(buf)
991         else:
992             if strip:
993                 return self.stripped_manifest()
994             else:
995                 return self._manifest_text
996
997     @synchronized
998     def diff(self, end_collection, prefix=".", holding_collection=None):
999         """Generate list of add/modify/delete actions.
1000
1001         When given to `apply`, will change `self` to match `end_collection`
1002
1003         """
1004         changes = []
1005         if holding_collection is None:
1006             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1007         for k in self:
1008             if k not in end_collection:
1009                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1010         for k in end_collection:
1011             if k in self:
1012                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1013                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1014                 elif end_collection[k] != self[k]:
1015                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1016                 else:
1017                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1018             else:
1019                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1020         return changes
1021
1022     @must_be_writable
1023     @synchronized
1024     def apply(self, changes):
1025         """Apply changes from `diff`.
1026
1027         If a change conflicts with a local change, it will be saved to an
1028         alternate path indicating the conflict.
1029
1030         """
1031         if changes:
1032             self.set_committed(False)
1033         for change in changes:
1034             event_type = change[0]
1035             path = change[1]
1036             initial = change[2]
1037             local = self.find(path)
1038             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1039                                                                     time.gmtime()))
1040             if event_type == ADD:
1041                 if local is None:
1042                     # No local file at path, safe to copy over new file
1043                     self.copy(initial, path)
1044                 elif local is not None and local != initial:
1045                     # There is already local file and it is different:
1046                     # save change to conflict file.
1047                     self.copy(initial, conflictpath)
1048             elif event_type == MOD or event_type == TOK:
1049                 final = change[3]
1050                 if local == initial:
1051                     # Local matches the "initial" item so it has not
1052                     # changed locally and is safe to update.
1053                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1054                         # Replace contents of local file with new contents
1055                         local.replace_contents(final)
1056                     else:
1057                         # Overwrite path with new item; this can happen if
1058                         # path was a file and is now a collection or vice versa
1059                         self.copy(final, path, overwrite=True)
1060                 else:
1061                     # Local is missing (presumably deleted) or local doesn't
1062                     # match the "start" value, so save change to conflict file
1063                     self.copy(final, conflictpath)
1064             elif event_type == DEL:
1065                 if local == initial:
1066                     # Local item matches "initial" value, so it is safe to remove.
1067                     self.remove(path, recursive=True)
1068                 # else, the file is modified or already removed, in either
1069                 # case we don't want to try to remove it.
1070
1071     def portable_data_hash(self):
1072         """Get the portable data hash for this collection's manifest."""
1073         if self._manifest_locator and self.committed():
1074             # If the collection is already saved on the API server, and it's committed
1075             # then return API server's PDH response.
1076             return self._portable_data_hash
1077         else:
1078             stripped = self.portable_manifest_text()
1079             return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1080
1081     @synchronized
1082     def subscribe(self, callback):
1083         if self._callback is None:
1084             self._callback = callback
1085         else:
1086             raise errors.ArgumentError("A callback is already set on this collection.")
1087
1088     @synchronized
1089     def unsubscribe(self):
1090         if self._callback is not None:
1091             self._callback = None
1092
1093     @synchronized
1094     def notify(self, event, collection, name, item):
1095         if self._callback:
1096             self._callback(event, collection, name, item)
1097         self.root_collection().notify(event, collection, name, item)
1098
1099     @synchronized
1100     def __eq__(self, other):
1101         if other is self:
1102             return True
1103         if not isinstance(other, RichCollectionBase):
1104             return False
1105         if len(self._items) != len(other):
1106             return False
1107         for k in self._items:
1108             if k not in other:
1109                 return False
1110             if self._items[k] != other[k]:
1111                 return False
1112         return True
1113
1114     def __ne__(self, other):
1115         return not self.__eq__(other)
1116
1117     @synchronized
1118     def flush(self):
1119         """Flush bufferblocks to Keep."""
1120         for e in self.values():
1121             e.flush()
1122
1123
1124 class Collection(RichCollectionBase):
1125     """Represents the root of an Arvados Collection.
1126
1127     This class is threadsafe.  The root collection object, all subcollections
1128     and files are protected by a single lock (i.e. each access locks the entire
1129     collection).
1130
1131     Brief summary of
1132     useful methods:
1133
1134     :To read an existing file:
1135       `c.open("myfile", "r")`
1136
1137     :To write a new file:
1138       `c.open("myfile", "w")`
1139
1140     :To determine if a file exists:
1141       `c.find("myfile") is not None`
1142
1143     :To copy a file:
1144       `c.copy("source", "dest")`
1145
1146     :To delete a file:
1147       `c.remove("myfile")`
1148
1149     :To save to an existing collection record:
1150       `c.save()`
1151
1152     :To save a new collection record:
1153     `c.save_new()`
1154
1155     :To merge remote changes into this object:
1156       `c.update()`
1157
1158     Must be associated with an API server Collection record (during
1159     initialization, or using `save_new`) to use `save` or `update`
1160
1161     """
1162
1163     def __init__(self, manifest_locator_or_text=None,
1164                  api_client=None,
1165                  keep_client=None,
1166                  num_retries=None,
1167                  parent=None,
1168                  apiconfig=None,
1169                  block_manager=None,
1170                  replication_desired=None,
1171                  put_threads=None):
1172         """Collection constructor.
1173
1174         :manifest_locator_or_text:
1175           One of Arvados collection UUID, block locator of
1176           a manifest, raw manifest text, or None (to create an empty collection).
1177         :parent:
1178           the parent Collection, may be None.
1179
1180         :apiconfig:
1181           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1182           Prefer this over supplying your own api_client and keep_client (except in testing).
1183           Will use default config settings if not specified.
1184
1185         :api_client:
1186           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1187
1188         :keep_client:
1189           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1190
1191         :num_retries:
1192           the number of retries for API and Keep requests.
1193
1194         :block_manager:
1195           the block manager to use.  If not specified, create one.
1196
1197         :replication_desired:
1198           How many copies should Arvados maintain. If None, API server default
1199           configuration applies. If not None, this value will also be used
1200           for determining the number of block copies being written.
1201
1202         """
1203         super(Collection, self).__init__(parent)
1204         self._api_client = api_client
1205         self._keep_client = keep_client
1206         self._block_manager = block_manager
1207         self.replication_desired = replication_desired
1208         self.put_threads = put_threads
1209
1210         if apiconfig:
1211             self._config = apiconfig
1212         else:
1213             self._config = config.settings()
1214
1215         self.num_retries = num_retries if num_retries is not None else 0
1216         self._manifest_locator = None
1217         self._manifest_text = None
1218         self._portable_data_hash = None
1219         self._api_response = None
1220         self._past_versions = set()
1221
1222         self.lock = threading.RLock()
1223         self.events = None
1224
1225         if manifest_locator_or_text:
1226             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1227                 self._manifest_locator = manifest_locator_or_text
1228             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1229                 self._manifest_locator = manifest_locator_or_text
1230             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1231                 self._manifest_text = manifest_locator_or_text
1232             else:
1233                 raise errors.ArgumentError(
1234                     "Argument to CollectionReader is not a manifest or a collection UUID")
1235
1236             try:
1237                 self._populate()
1238             except (IOError, errors.SyntaxError) as e:
1239                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1240
1241     def root_collection(self):
1242         return self
1243
1244     def stream_name(self):
1245         return "."
1246
1247     def writable(self):
1248         return True
1249
1250     @synchronized
1251     def known_past_version(self, modified_at_and_portable_data_hash):
1252         return modified_at_and_portable_data_hash in self._past_versions
1253
1254     @synchronized
1255     @retry_method
1256     def update(self, other=None, num_retries=None):
1257         """Merge the latest collection on the API server with the current collection."""
1258
1259         if other is None:
1260             if self._manifest_locator is None:
1261                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1262             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1263             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1264                 response.get("portable_data_hash") != self.portable_data_hash()):
1265                 # The record on the server is different from our current one, but we've seen it before,
1266                 # so ignore it because it's already been merged.
1267                 # However, if it's the same as our current record, proceed with the update, because we want to update
1268                 # our tokens.
1269                 return
1270             else:
1271                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1272             other = CollectionReader(response["manifest_text"])
1273         baseline = CollectionReader(self._manifest_text)
1274         self.apply(baseline.diff(other))
1275         self._manifest_text = self.manifest_text()
1276
1277     @synchronized
1278     def _my_api(self):
1279         if self._api_client is None:
1280             self._api_client = ThreadSafeApiCache(self._config)
1281             if self._keep_client is None:
1282                 self._keep_client = self._api_client.keep
1283         return self._api_client
1284
1285     @synchronized
1286     def _my_keep(self):
1287         if self._keep_client is None:
1288             if self._api_client is None:
1289                 self._my_api()
1290             else:
1291                 self._keep_client = KeepClient(api_client=self._api_client)
1292         return self._keep_client
1293
1294     @synchronized
1295     def _my_block_manager(self):
1296         if self._block_manager is None:
1297             copies = (self.replication_desired or
1298                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1299                                                    2))
1300             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1301         return self._block_manager
1302
1303     def _remember_api_response(self, response):
1304         self._api_response = response
1305         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1306
1307     def _populate_from_api_server(self):
1308         # As in KeepClient itself, we must wait until the last
1309         # possible moment to instantiate an API client, in order to
1310         # avoid tripping up clients that don't have access to an API
1311         # server.  If we do build one, make sure our Keep client uses
1312         # it.  If instantiation fails, we'll fall back to the except
1313         # clause, just like any other Collection lookup
1314         # failure. Return an exception, or None if successful.
1315         try:
1316             self._remember_api_response(self._my_api().collections().get(
1317                 uuid=self._manifest_locator).execute(
1318                     num_retries=self.num_retries))
1319             self._manifest_text = self._api_response['manifest_text']
1320             self._portable_data_hash = self._api_response['portable_data_hash']
1321             # If not overriden via kwargs, we should try to load the
1322             # replication_desired from the API server
1323             if self.replication_desired is None:
1324                 self.replication_desired = self._api_response.get('replication_desired', None)
1325             return None
1326         except Exception as e:
1327             return e
1328
1329     def _populate_from_keep(self):
1330         # Retrieve a manifest directly from Keep. This has a chance of
1331         # working if [a] the locator includes a permission signature
1332         # or [b] the Keep services are operating in world-readable
1333         # mode. Return an exception, or None if successful.
1334         try:
1335             self._manifest_text = self._my_keep().get(
1336                 self._manifest_locator, num_retries=self.num_retries)
1337         except Exception as e:
1338             return e
1339
1340     def _populate(self):
1341         if self._manifest_locator is None and self._manifest_text is None:
1342             return
1343         error_via_api = None
1344         error_via_keep = None
1345         should_try_keep = ((self._manifest_text is None) and
1346                            arvados.util.keep_locator_pattern.match(
1347                                self._manifest_locator))
1348         if ((self._manifest_text is None) and
1349             arvados.util.signed_locator_pattern.match(self._manifest_locator)):
1350             error_via_keep = self._populate_from_keep()
1351         if self._manifest_text is None:
1352             error_via_api = self._populate_from_api_server()
1353             if error_via_api is not None and not should_try_keep:
1354                 raise error_via_api
1355         if ((self._manifest_text is None) and
1356             not error_via_keep and
1357             should_try_keep):
1358             # Looks like a keep locator, and we didn't already try keep above
1359             error_via_keep = self._populate_from_keep()
1360         if self._manifest_text is None:
1361             # Nothing worked!
1362             raise errors.NotFoundError(
1363                 ("Failed to retrieve collection '{}' " +
1364                  "from either API server ({}) or Keep ({})."
1365                  ).format(
1366                     self._manifest_locator,
1367                     error_via_api,
1368                     error_via_keep))
1369         # populate
1370         self._baseline_manifest = self._manifest_text
1371         self._import_manifest(self._manifest_text)
1372
1373
1374     def _has_collection_uuid(self):
1375         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1376
1377     def __enter__(self):
1378         return self
1379
1380     def __exit__(self, exc_type, exc_value, traceback):
1381         """Support scoped auto-commit in a with: block."""
1382         if exc_type is None:
1383             if self.writable() and self._has_collection_uuid():
1384                 self.save()
1385         self.stop_threads()
1386
1387     def stop_threads(self):
1388         if self._block_manager is not None:
1389             self._block_manager.stop_threads()
1390
1391     @synchronized
1392     def manifest_locator(self):
1393         """Get the manifest locator, if any.
1394
1395         The manifest locator will be set when the collection is loaded from an
1396         API server record or the portable data hash of a manifest.
1397
1398         The manifest locator will be None if the collection is newly created or
1399         was created directly from manifest text.  The method `save_new()` will
1400         assign a manifest locator.
1401
1402         """
1403         return self._manifest_locator
1404
1405     @synchronized
1406     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1407         if new_config is None:
1408             new_config = self._config
1409         if readonly:
1410             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1411         else:
1412             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1413
1414         newcollection._clonefrom(self)
1415         return newcollection
1416
1417     @synchronized
1418     def api_response(self):
1419         """Returns information about this Collection fetched from the API server.
1420
1421         If the Collection exists in Keep but not the API server, currently
1422         returns None.  Future versions may provide a synthetic response.
1423
1424         """
1425         return self._api_response
1426
1427     def find_or_create(self, path, create_type):
1428         """See `RichCollectionBase.find_or_create`"""
1429         if path == ".":
1430             return self
1431         else:
1432             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1433
1434     def find(self, path):
1435         """See `RichCollectionBase.find`"""
1436         if path == ".":
1437             return self
1438         else:
1439             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1440
1441     def remove(self, path, recursive=False):
1442         """See `RichCollectionBase.remove`"""
1443         if path == ".":
1444             raise errors.ArgumentError("Cannot remove '.'")
1445         else:
1446             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1447
1448     @must_be_writable
1449     @synchronized
1450     @retry_method
1451     def save(self, merge=True, num_retries=None):
1452         """Save collection to an existing collection record.
1453
1454         Commit pending buffer blocks to Keep, merge with remote record (if
1455         merge=True, the default), and update the collection record.  Returns
1456         the current manifest text.
1457
1458         Will raise AssertionError if not associated with a collection record on
1459         the API server.  If you want to save a manifest to Keep only, see
1460         `save_new()`.
1461
1462         :merge:
1463           Update and merge remote changes before saving.  Otherwise, any
1464           remote changes will be ignored and overwritten.
1465
1466         :num_retries:
1467           Retry count on API calls (if None,  use the collection default)
1468
1469         """
1470         if not self.committed():
1471             if not self._has_collection_uuid():
1472                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1473
1474             self._my_block_manager().commit_all()
1475
1476             if merge:
1477                 self.update()
1478
1479             text = self.manifest_text(strip=False)
1480             self._remember_api_response(self._my_api().collections().update(
1481                 uuid=self._manifest_locator,
1482                 body={'manifest_text': text}
1483                 ).execute(
1484                     num_retries=num_retries))
1485             self._manifest_text = self._api_response["manifest_text"]
1486             self._portable_data_hash = self._api_response["portable_data_hash"]
1487             self.set_committed(True)
1488
1489         return self._manifest_text
1490
1491
1492     @must_be_writable
1493     @synchronized
1494     @retry_method
1495     def save_new(self, name=None,
1496                  create_collection_record=True,
1497                  owner_uuid=None,
1498                  ensure_unique_name=False,
1499                  num_retries=None):
1500         """Save collection to a new collection record.
1501
1502         Commit pending buffer blocks to Keep and, when create_collection_record
1503         is True (default), create a new collection record.  After creating a
1504         new collection record, this Collection object will be associated with
1505         the new record used by `save()`.  Returns the current manifest text.
1506
1507         :name:
1508           The collection name.
1509
1510         :create_collection_record:
1511            If True, create a collection record on the API server.
1512            If False, only commit blocks to Keep and return the manifest text.
1513
1514         :owner_uuid:
1515           the user, or project uuid that will own this collection.
1516           If None, defaults to the current user.
1517
1518         :ensure_unique_name:
1519           If True, ask the API server to rename the collection
1520           if it conflicts with a collection with the same name and owner.  If
1521           False, a name conflict will result in an error.
1522
1523         :num_retries:
1524           Retry count on API calls (if None,  use the collection default)
1525
1526         """
1527         self._my_block_manager().commit_all()
1528         text = self.manifest_text(strip=False)
1529
1530         if create_collection_record:
1531             if name is None:
1532                 name = "New collection"
1533                 ensure_unique_name = True
1534
1535             body = {"manifest_text": text,
1536                     "name": name,
1537                     "replication_desired": self.replication_desired}
1538             if owner_uuid:
1539                 body["owner_uuid"] = owner_uuid
1540
1541             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1542             text = self._api_response["manifest_text"]
1543
1544             self._manifest_locator = self._api_response["uuid"]
1545             self._portable_data_hash = self._api_response["portable_data_hash"]
1546
1547             self._manifest_text = text
1548             self.set_committed(True)
1549
1550         return text
1551
1552     @synchronized
1553     def _import_manifest(self, manifest_text):
1554         """Import a manifest into a `Collection`.
1555
1556         :manifest_text:
1557           The manifest text to import from.
1558
1559         """
1560         if len(self) > 0:
1561             raise ArgumentError("Can only import manifest into an empty collection")
1562
1563         STREAM_NAME = 0
1564         BLOCKS = 1
1565         SEGMENTS = 2
1566
1567         stream_name = None
1568         state = STREAM_NAME
1569
1570         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1571             tok = token_and_separator.group(1)
1572             sep = token_and_separator.group(2)
1573
1574             if state == STREAM_NAME:
1575                 # starting a new stream
1576                 stream_name = tok.replace('\\040', ' ')
1577                 blocks = []
1578                 segments = []
1579                 streamoffset = 0
1580                 state = BLOCKS
1581                 self.find_or_create(stream_name, COLLECTION)
1582                 continue
1583
1584             if state == BLOCKS:
1585                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1586                 if block_locator:
1587                     blocksize = long(block_locator.group(1))
1588                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1589                     streamoffset += blocksize
1590                 else:
1591                     state = SEGMENTS
1592
1593             if state == SEGMENTS:
1594                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1595                 if file_segment:
1596                     pos = long(file_segment.group(1))
1597                     size = long(file_segment.group(2))
1598                     name = file_segment.group(3).replace('\\040', ' ')
1599                     filepath = os.path.join(stream_name, name)
1600                     afile = self.find_or_create(filepath, FILE)
1601                     if isinstance(afile, ArvadosFile):
1602                         afile.add_segment(blocks, pos, size)
1603                     else:
1604                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1605                 else:
1606                     # error!
1607                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1608
1609             if sep == "\n":
1610                 stream_name = None
1611                 state = STREAM_NAME
1612
1613         self.set_committed(True)
1614
1615     @synchronized
1616     def notify(self, event, collection, name, item):
1617         if self._callback:
1618             self._callback(event, collection, name, item)
1619
1620
1621 class Subcollection(RichCollectionBase):
1622     """This is a subdirectory within a collection that doesn't have its own API
1623     server record.
1624
1625     Subcollection locking falls under the umbrella lock of its root collection.
1626
1627     """
1628
1629     def __init__(self, parent, name):
1630         super(Subcollection, self).__init__(parent)
1631         self.lock = self.root_collection().lock
1632         self._manifest_text = None
1633         self.name = name
1634         self.num_retries = parent.num_retries
1635
1636     def root_collection(self):
1637         return self.parent.root_collection()
1638
1639     def writable(self):
1640         return self.root_collection().writable()
1641
1642     def _my_api(self):
1643         return self.root_collection()._my_api()
1644
1645     def _my_keep(self):
1646         return self.root_collection()._my_keep()
1647
1648     def _my_block_manager(self):
1649         return self.root_collection()._my_block_manager()
1650
1651     def stream_name(self):
1652         return os.path.join(self.parent.stream_name(), self.name)
1653
1654     @synchronized
1655     def clone(self, new_parent, new_name):
1656         c = Subcollection(new_parent, new_name)
1657         c._clonefrom(self)
1658         return c
1659
1660     @must_be_writable
1661     @synchronized
1662     def _reparent(self, newparent, newname):
1663         self.set_committed(False)
1664         self.flush()
1665         self.parent.remove(self.name, recursive=True)
1666         self.parent = newparent
1667         self.name = newname
1668         self.lock = self.parent.root_collection().lock
1669
1670
1671 class CollectionReader(Collection):
1672     """A read-only collection object.
1673
1674     Initialize from an api collection record locator, a portable data hash of a
1675     manifest, or raw manifest text.  See `Collection` constructor for detailed
1676     options.
1677
1678     """
1679     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1680         self._in_init = True
1681         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1682         self._in_init = False
1683
1684         # Forego any locking since it should never change once initialized.
1685         self.lock = NoopLock()
1686
1687         # Backwards compatability with old CollectionReader
1688         # all_streams() and all_files()
1689         self._streams = None
1690
1691     def writable(self):
1692         return self._in_init
1693
1694     def _populate_streams(orig_func):
1695         @functools.wraps(orig_func)
1696         def populate_streams_wrapper(self, *args, **kwargs):
1697             # Defer populating self._streams until needed since it creates a copy of the manifest.
1698             if self._streams is None:
1699                 if self._manifest_text:
1700                     self._streams = [sline.split()
1701                                      for sline in self._manifest_text.split("\n")
1702                                      if sline]
1703                 else:
1704                     self._streams = []
1705             return orig_func(self, *args, **kwargs)
1706         return populate_streams_wrapper
1707
1708     @_populate_streams
1709     def normalize(self):
1710         """Normalize the streams returned by `all_streams`.
1711
1712         This method is kept for backwards compatability and only affects the
1713         behavior of `all_streams()` and `all_files()`
1714
1715         """
1716
1717         # Rearrange streams
1718         streams = {}
1719         for s in self.all_streams():
1720             for f in s.all_files():
1721                 streamname, filename = split(s.name() + "/" + f.name())
1722                 if streamname not in streams:
1723                     streams[streamname] = {}
1724                 if filename not in streams[streamname]:
1725                     streams[streamname][filename] = []
1726                 for r in f.segments:
1727                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1728
1729         self._streams = [normalize_stream(s, streams[s])
1730                          for s in sorted(streams)]
1731     @_populate_streams
1732     def all_streams(self):
1733         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1734                 for s in self._streams]
1735
1736     @_populate_streams
1737     def all_files(self):
1738         for s in self.all_streams():
1739             for f in s.all_files():
1740                 yield f