10813: Bug fixed on BlockManager's _get_manifest_text() when asked for only include...
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace: '%s'" %
311                 (newstreamname))
312         self._current_stream_name = '.' if newstreamname=='' else newstreamname
313
314     def current_stream_name(self):
315         return self._current_stream_name
316
317     def finish_current_stream(self):
318         self.finish_current_file()
319         self.flush_data()
320         if not self._current_stream_files:
321             pass
322         elif self._current_stream_name is None:
323             raise errors.AssertionError(
324                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
325                 (self._current_stream_length, len(self._current_stream_files)))
326         else:
327             if not self._current_stream_locators:
328                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
329             self._finished_streams.append([self._current_stream_name,
330                                            self._current_stream_locators,
331                                            self._current_stream_files])
332         self._current_stream_files = []
333         self._current_stream_length = 0
334         self._current_stream_locators = []
335         self._current_stream_name = None
336         self._current_file_pos = 0
337         self._current_file_name = None
338
339     def finish(self):
340         """Store the manifest in Keep and return its locator.
341
342         This is useful for storing manifest fragments (task outputs)
343         temporarily in Keep during a Crunch job.
344
345         In other cases you should make a collection instead, by
346         sending manifest_text() to the API server's "create
347         collection" endpoint.
348         """
349         return self._my_keep().put(self.manifest_text(), copies=self.replication)
350
351     def portable_data_hash(self):
352         stripped = self.stripped_manifest()
353         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
354
355     def manifest_text(self):
356         self.finish_current_stream()
357         manifest = ''
358
359         for stream in self._finished_streams:
360             if not re.search(r'^\.(/.*)?$', stream[0]):
361                 manifest += './'
362             manifest += stream[0].replace(' ', '\\040')
363             manifest += ' ' + ' '.join(stream[1])
364             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
365             manifest += "\n"
366
367         return manifest
368
369     def data_locators(self):
370         ret = []
371         for name, locators, files in self._finished_streams:
372             ret += locators
373         return ret
374
375
376 class ResumableCollectionWriter(CollectionWriter):
377     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
378                    '_current_stream_locators', '_current_stream_name',
379                    '_current_file_name', '_current_file_pos', '_close_file',
380                    '_data_buffer', '_dependencies', '_finished_streams',
381                    '_queued_dirents', '_queued_trees']
382
383     def __init__(self, api_client=None, **kwargs):
384         self._dependencies = {}
385         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
386
387     @classmethod
388     def from_state(cls, state, *init_args, **init_kwargs):
389         # Try to build a new writer from scratch with the given state.
390         # If the state is not suitable to resume (because files have changed,
391         # been deleted, aren't predictable, etc.), raise a
392         # StaleWriterStateError.  Otherwise, return the initialized writer.
393         # The caller is responsible for calling writer.do_queued_work()
394         # appropriately after it's returned.
395         writer = cls(*init_args, **init_kwargs)
396         for attr_name in cls.STATE_PROPS:
397             attr_value = state[attr_name]
398             attr_class = getattr(writer, attr_name).__class__
399             # Coerce the value into the same type as the initial value, if
400             # needed.
401             if attr_class not in (type(None), attr_value.__class__):
402                 attr_value = attr_class(attr_value)
403             setattr(writer, attr_name, attr_value)
404         # Check dependencies before we try to resume anything.
405         if any(KeepLocator(ls).permission_expired()
406                for ls in writer._current_stream_locators):
407             raise errors.StaleWriterStateError(
408                 "locators include expired permission hint")
409         writer.check_dependencies()
410         if state['_current_file'] is not None:
411             path, pos = state['_current_file']
412             try:
413                 writer._queued_file = open(path, 'rb')
414                 writer._queued_file.seek(pos)
415             except IOError as error:
416                 raise errors.StaleWriterStateError(
417                     "failed to reopen active file {}: {}".format(path, error))
418         return writer
419
420     def check_dependencies(self):
421         for path, orig_stat in self._dependencies.items():
422             if not S_ISREG(orig_stat[ST_MODE]):
423                 raise errors.StaleWriterStateError("{} not file".format(path))
424             try:
425                 now_stat = tuple(os.stat(path))
426             except OSError as error:
427                 raise errors.StaleWriterStateError(
428                     "failed to stat {}: {}".format(path, error))
429             if ((not S_ISREG(now_stat[ST_MODE])) or
430                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
431                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
432                 raise errors.StaleWriterStateError("{} changed".format(path))
433
434     def dump_state(self, copy_func=lambda x: x):
435         state = {attr: copy_func(getattr(self, attr))
436                  for attr in self.STATE_PROPS}
437         if self._queued_file is None:
438             state['_current_file'] = None
439         else:
440             state['_current_file'] = (os.path.realpath(self._queued_file.name),
441                                       self._queued_file.tell())
442         return state
443
444     def _queue_file(self, source, filename=None):
445         try:
446             src_path = os.path.realpath(source)
447         except Exception:
448             raise errors.AssertionError("{} not a file path".format(source))
449         try:
450             path_stat = os.stat(src_path)
451         except OSError as stat_error:
452             path_stat = None
453         super(ResumableCollectionWriter, self)._queue_file(source, filename)
454         fd_stat = os.fstat(self._queued_file.fileno())
455         if not S_ISREG(fd_stat.st_mode):
456             # We won't be able to resume from this cache anyway, so don't
457             # worry about further checks.
458             self._dependencies[source] = tuple(fd_stat)
459         elif path_stat is None:
460             raise errors.AssertionError(
461                 "could not stat {}: {}".format(source, stat_error))
462         elif path_stat.st_ino != fd_stat.st_ino:
463             raise errors.AssertionError(
464                 "{} changed between open and stat calls".format(source))
465         else:
466             self._dependencies[src_path] = tuple(fd_stat)
467
468     def write(self, data):
469         if self._queued_file is None:
470             raise errors.AssertionError(
471                 "resumable writer can't accept unsourced data")
472         return super(ResumableCollectionWriter, self).write(data)
473
474
475 ADD = "add"
476 DEL = "del"
477 MOD = "mod"
478 TOK = "tok"
479 FILE = "file"
480 COLLECTION = "collection"
481
482 class RichCollectionBase(CollectionBase):
483     """Base class for Collections and Subcollections.
484
485     Implements the majority of functionality relating to accessing items in the
486     Collection.
487
488     """
489
490     def __init__(self, parent=None):
491         self.parent = parent
492         self._committed = False
493         self._callback = None
494         self._items = {}
495
496     def _my_api(self):
497         raise NotImplementedError()
498
499     def _my_keep(self):
500         raise NotImplementedError()
501
502     def _my_block_manager(self):
503         raise NotImplementedError()
504
505     def writable(self):
506         raise NotImplementedError()
507
508     def root_collection(self):
509         raise NotImplementedError()
510
511     def notify(self, event, collection, name, item):
512         raise NotImplementedError()
513
514     def stream_name(self):
515         raise NotImplementedError()
516
517     @must_be_writable
518     @synchronized
519     def find_or_create(self, path, create_type):
520         """Recursively search the specified file path.
521
522         May return either a `Collection` or `ArvadosFile`.  If not found, will
523         create a new item at the specified path based on `create_type`.  Will
524         create intermediate subcollections needed to contain the final item in
525         the path.
526
527         :create_type:
528           One of `arvados.collection.FILE` or
529           `arvados.collection.COLLECTION`.  If the path is not found, and value
530           of create_type is FILE then create and return a new ArvadosFile for
531           the last path component.  If COLLECTION, then create and return a new
532           Collection for the last path component.
533
534         """
535
536         pathcomponents = path.split("/", 1)
537         if pathcomponents[0]:
538             item = self._items.get(pathcomponents[0])
539             if len(pathcomponents) == 1:
540                 if item is None:
541                     # create new file
542                     if create_type == COLLECTION:
543                         item = Subcollection(self, pathcomponents[0])
544                     else:
545                         item = ArvadosFile(self, pathcomponents[0])
546                     self._items[pathcomponents[0]] = item
547                     self._committed = False
548                     self.notify(ADD, self, pathcomponents[0], item)
549                 return item
550             else:
551                 if item is None:
552                     # create new collection
553                     item = Subcollection(self, pathcomponents[0])
554                     self._items[pathcomponents[0]] = item
555                     self._committed = False
556                     self.notify(ADD, self, pathcomponents[0], item)
557                 if isinstance(item, RichCollectionBase):
558                     return item.find_or_create(pathcomponents[1], create_type)
559                 else:
560                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
561         else:
562             return self
563
564     @synchronized
565     def find(self, path):
566         """Recursively search the specified file path.
567
568         May return either a Collection or ArvadosFile. Return None if not
569         found.
570         If path is invalid (ex: starts with '/'), an IOError exception will be
571         raised.
572
573         """
574         if not path:
575             raise errors.ArgumentError("Parameter 'path' is empty.")
576
577         pathcomponents = path.split("/", 1)
578         if pathcomponents[0] == '':
579             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
580
581         item = self._items.get(pathcomponents[0])
582         if item is None:
583             return None
584         elif len(pathcomponents) == 1:
585             return item
586         else:
587             if isinstance(item, RichCollectionBase):
588                 if pathcomponents[1]:
589                     return item.find(pathcomponents[1])
590                 else:
591                     return item
592             else:
593                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
594
595     @synchronized
596     def mkdirs(self, path):
597         """Recursive subcollection create.
598
599         Like `os.makedirs()`.  Will create intermediate subcollections needed
600         to contain the leaf subcollection path.
601
602         """
603
604         if self.find(path) != None:
605             raise IOError(errno.EEXIST, "Directory or file exists", path)
606
607         return self.find_or_create(path, COLLECTION)
608
609     def open(self, path, mode="r"):
610         """Open a file-like object for access.
611
612         :path:
613           path to a file in the collection
614         :mode:
615           one of "r", "r+", "w", "w+", "a", "a+"
616           :"r":
617             opens for reading
618           :"r+":
619             opens for reading and writing.  Reads/writes share a file pointer.
620           :"w", "w+":
621             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
622           :"a", "a+":
623             opens for reading and writing.  All writes are appended to
624             the end of the file.  Writing does not affect the file pointer for
625             reading.
626         """
627         mode = mode.replace("b", "")
628         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
629             raise errors.ArgumentError("Bad mode '%s'" % mode)
630         create = (mode != "r")
631
632         if create and not self.writable():
633             raise IOError(errno.EROFS, "Collection is read only")
634
635         if create:
636             arvfile = self.find_or_create(path, FILE)
637         else:
638             arvfile = self.find(path)
639
640         if arvfile is None:
641             raise IOError(errno.ENOENT, "File not found", path)
642         if not isinstance(arvfile, ArvadosFile):
643             raise IOError(errno.EISDIR, "Is a directory", path)
644
645         if mode[0] == "w":
646             arvfile.truncate(0)
647
648         name = os.path.basename(path)
649
650         if mode == "r":
651             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
652         else:
653             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
654
655     def modified(self):
656         """Determine if the collection has been modified since last commited."""
657         return not self.committed()
658
659     @synchronized
660     def committed(self):
661         """Determine if the collection has been committed to the API server."""
662
663         if self._committed is False:
664             return False
665         for v in self._items.values():
666             if v.committed() is False:
667                 return False
668         return True
669
670     @synchronized
671     def set_committed(self):
672         """Recursively set committed flag to True."""
673         self._committed = True
674         for k,v in self._items.items():
675             v.set_committed()
676
677     @synchronized
678     def __iter__(self):
679         """Iterate over names of files and collections contained in this collection."""
680         return iter(self._items.keys())
681
682     @synchronized
683     def __getitem__(self, k):
684         """Get a file or collection that is directly contained by this collection.
685
686         If you want to search a path, use `find()` instead.
687
688         """
689         return self._items[k]
690
691     @synchronized
692     def __contains__(self, k):
693         """Test if there is a file or collection a directly contained by this collection."""
694         return k in self._items
695
696     @synchronized
697     def __len__(self):
698         """Get the number of items directly contained in this collection."""
699         return len(self._items)
700
701     @must_be_writable
702     @synchronized
703     def __delitem__(self, p):
704         """Delete an item by name which is directly contained by this collection."""
705         del self._items[p]
706         self._committed = False
707         self.notify(DEL, self, p, None)
708
709     @synchronized
710     def keys(self):
711         """Get a list of names of files and collections directly contained in this collection."""
712         return self._items.keys()
713
714     @synchronized
715     def values(self):
716         """Get a list of files and collection objects directly contained in this collection."""
717         return self._items.values()
718
719     @synchronized
720     def items(self):
721         """Get a list of (name, object) tuples directly contained in this collection."""
722         return self._items.items()
723
724     def exists(self, path):
725         """Test if there is a file or collection at `path`."""
726         return self.find(path) is not None
727
728     @must_be_writable
729     @synchronized
730     def remove(self, path, recursive=False):
731         """Remove the file or subcollection (directory) at `path`.
732
733         :recursive:
734           Specify whether to remove non-empty subcollections (True), or raise an error (False).
735         """
736
737         if not path:
738             raise errors.ArgumentError("Parameter 'path' is empty.")
739
740         pathcomponents = path.split("/", 1)
741         item = self._items.get(pathcomponents[0])
742         if item is None:
743             raise IOError(errno.ENOENT, "File not found", path)
744         if len(pathcomponents) == 1:
745             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
746                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
747             deleteditem = self._items[pathcomponents[0]]
748             del self._items[pathcomponents[0]]
749             self._committed = False
750             self.notify(DEL, self, pathcomponents[0], deleteditem)
751         else:
752             item.remove(pathcomponents[1])
753
754     def _clonefrom(self, source):
755         for k,v in source.items():
756             self._items[k] = v.clone(self, k)
757
758     def clone(self):
759         raise NotImplementedError()
760
761     @must_be_writable
762     @synchronized
763     def add(self, source_obj, target_name, overwrite=False, reparent=False):
764         """Copy or move a file or subcollection to this collection.
765
766         :source_obj:
767           An ArvadosFile, or Subcollection object
768
769         :target_name:
770           Destination item name.  If the target name already exists and is a
771           file, this will raise an error unless you specify `overwrite=True`.
772
773         :overwrite:
774           Whether to overwrite target file if it already exists.
775
776         :reparent:
777           If True, source_obj will be moved from its parent collection to this collection.
778           If False, source_obj will be copied and the parent collection will be
779           unmodified.
780
781         """
782
783         if target_name in self and not overwrite:
784             raise IOError(errno.EEXIST, "File already exists", target_name)
785
786         modified_from = None
787         if target_name in self:
788             modified_from = self[target_name]
789
790         # Actually make the move or copy.
791         if reparent:
792             source_obj._reparent(self, target_name)
793             item = source_obj
794         else:
795             item = source_obj.clone(self, target_name)
796
797         self._items[target_name] = item
798         self._committed = False
799
800         if modified_from:
801             self.notify(MOD, self, target_name, (modified_from, item))
802         else:
803             self.notify(ADD, self, target_name, item)
804
805     def _get_src_target(self, source, target_path, source_collection, create_dest):
806         if source_collection is None:
807             source_collection = self
808
809         # Find the object
810         if isinstance(source, basestring):
811             source_obj = source_collection.find(source)
812             if source_obj is None:
813                 raise IOError(errno.ENOENT, "File not found", source)
814             sourcecomponents = source.split("/")
815         else:
816             source_obj = source
817             sourcecomponents = None
818
819         # Find parent collection the target path
820         targetcomponents = target_path.split("/")
821
822         # Determine the name to use.
823         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
824
825         if not target_name:
826             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
827
828         if create_dest:
829             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
830         else:
831             if len(targetcomponents) > 1:
832                 target_dir = self.find("/".join(targetcomponents[0:-1]))
833             else:
834                 target_dir = self
835
836         if target_dir is None:
837             raise IOError(errno.ENOENT, "Target directory not found", target_name)
838
839         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
840             target_dir = target_dir[target_name]
841             target_name = sourcecomponents[-1]
842
843         return (source_obj, target_dir, target_name)
844
845     @must_be_writable
846     @synchronized
847     def copy(self, source, target_path, source_collection=None, overwrite=False):
848         """Copy a file or subcollection to a new path in this collection.
849
850         :source:
851           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
852
853         :target_path:
854           Destination file or path.  If the target path already exists and is a
855           subcollection, the item will be placed inside the subcollection.  If
856           the target path already exists and is a file, this will raise an error
857           unless you specify `overwrite=True`.
858
859         :source_collection:
860           Collection to copy `source_path` from (default `self`)
861
862         :overwrite:
863           Whether to overwrite target file if it already exists.
864         """
865
866         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
867         target_dir.add(source_obj, target_name, overwrite, False)
868
869     @must_be_writable
870     @synchronized
871     def rename(self, source, target_path, source_collection=None, overwrite=False):
872         """Move a file or subcollection from `source_collection` to a new path in this collection.
873
874         :source:
875           A string with a path to source file or subcollection.
876
877         :target_path:
878           Destination file or path.  If the target path already exists and is a
879           subcollection, the item will be placed inside the subcollection.  If
880           the target path already exists and is a file, this will raise an error
881           unless you specify `overwrite=True`.
882
883         :source_collection:
884           Collection to copy `source_path` from (default `self`)
885
886         :overwrite:
887           Whether to overwrite target file if it already exists.
888         """
889
890         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
891         if not source_obj.writable():
892             raise IOError(errno.EROFS, "Source collection is read only", source)
893         target_dir.add(source_obj, target_name, overwrite, True)
894
895     def portable_manifest_text(self, stream_name="."):
896         """Get the manifest text for this collection, sub collections and files.
897
898         This method does not flush outstanding blocks to Keep.  It will return
899         a normalized manifest with access tokens stripped.
900
901         :stream_name:
902           Name to use for this stream (directory)
903
904         """
905         return self._get_manifest_text(stream_name, True, True)
906
907     @synchronized
908     def manifest_text(self, stream_name=".", strip=False, normalize=False):
909         """Get the manifest text for this collection, sub collections and files.
910
911         This method will flush outstanding blocks to Keep.  By default, it will
912         not normalize an unmodified manifest or strip access tokens.
913
914         :stream_name:
915           Name to use for this stream (directory)
916
917         :strip:
918           If True, remove signing tokens from block locators if present.
919           If False (default), block locators are left unchanged.
920
921         :normalize:
922           If True, always export the manifest text in normalized form
923           even if the Collection is not modified.  If False (default) and the collection
924           is not modified, return the original manifest text even if it is not
925           in normalized form.
926
927         """
928
929         self._my_block_manager().commit_all()
930         return self._get_manifest_text(stream_name, strip, normalize)
931
932     @synchronized
933     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
934         """Get the manifest text for this collection, sub collections and files.
935
936         :stream_name:
937           Name to use for this stream (directory)
938
939         :strip:
940           If True, remove signing tokens from block locators if present.
941           If False (default), block locators are left unchanged.
942
943         :normalize:
944           If True, always export the manifest text in normalized form
945           even if the Collection is not modified.  If False (default) and the collection
946           is not modified, return the original manifest text even if it is not
947           in normalized form.
948
949         :only_committed:
950           If True, only include blocks that were already committed to Keep.
951
952         """
953
954         if not self.committed() or self._manifest_text is None or normalize:
955             stream = {}
956             buf = []
957             sorted_keys = sorted(self.keys())
958             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
959                 # Create a stream per file `k`
960                 arvfile = self[filename]
961                 filestream = []
962                 for segment in arvfile.segments():
963                     loc = segment.locator
964                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
965                         if only_committed:
966                             continue
967                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
968                     if strip:
969                         loc = KeepLocator(loc).stripped()
970                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
971                                          segment.segment_offset, segment.range_size))
972                 stream[filename] = filestream
973             if stream:
974                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
975             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
976                 if only_committed:
977                     buf.append(self[dirname]._get_manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
978                 else:
979                     buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
980             return "".join(buf)
981         else:
982             if strip:
983                 return self.stripped_manifest()
984             else:
985                 return self._manifest_text
986
987     @synchronized
988     def diff(self, end_collection, prefix=".", holding_collection=None):
989         """Generate list of add/modify/delete actions.
990
991         When given to `apply`, will change `self` to match `end_collection`
992
993         """
994         changes = []
995         if holding_collection is None:
996             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
997         for k in self:
998             if k not in end_collection:
999                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1000         for k in end_collection:
1001             if k in self:
1002                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1003                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1004                 elif end_collection[k] != self[k]:
1005                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1006                 else:
1007                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1008             else:
1009                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1010         return changes
1011
1012     @must_be_writable
1013     @synchronized
1014     def apply(self, changes):
1015         """Apply changes from `diff`.
1016
1017         If a change conflicts with a local change, it will be saved to an
1018         alternate path indicating the conflict.
1019
1020         """
1021         if changes:
1022             self._committed = False
1023         for change in changes:
1024             event_type = change[0]
1025             path = change[1]
1026             initial = change[2]
1027             local = self.find(path)
1028             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1029                                                                     time.gmtime()))
1030             if event_type == ADD:
1031                 if local is None:
1032                     # No local file at path, safe to copy over new file
1033                     self.copy(initial, path)
1034                 elif local is not None and local != initial:
1035                     # There is already local file and it is different:
1036                     # save change to conflict file.
1037                     self.copy(initial, conflictpath)
1038             elif event_type == MOD or event_type == TOK:
1039                 final = change[3]
1040                 if local == initial:
1041                     # Local matches the "initial" item so it has not
1042                     # changed locally and is safe to update.
1043                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1044                         # Replace contents of local file with new contents
1045                         local.replace_contents(final)
1046                     else:
1047                         # Overwrite path with new item; this can happen if
1048                         # path was a file and is now a collection or vice versa
1049                         self.copy(final, path, overwrite=True)
1050                 else:
1051                     # Local is missing (presumably deleted) or local doesn't
1052                     # match the "start" value, so save change to conflict file
1053                     self.copy(final, conflictpath)
1054             elif event_type == DEL:
1055                 if local == initial:
1056                     # Local item matches "initial" value, so it is safe to remove.
1057                     self.remove(path, recursive=True)
1058                 # else, the file is modified or already removed, in either
1059                 # case we don't want to try to remove it.
1060
1061     def portable_data_hash(self):
1062         """Get the portable data hash for this collection's manifest."""
1063         stripped = self.portable_manifest_text()
1064         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1065
1066     @synchronized
1067     def subscribe(self, callback):
1068         if self._callback is None:
1069             self._callback = callback
1070         else:
1071             raise errors.ArgumentError("A callback is already set on this collection.")
1072
1073     @synchronized
1074     def unsubscribe(self):
1075         if self._callback is not None:
1076             self._callback = None
1077
1078     @synchronized
1079     def notify(self, event, collection, name, item):
1080         if self._callback:
1081             self._callback(event, collection, name, item)
1082         self.root_collection().notify(event, collection, name, item)
1083
1084     @synchronized
1085     def __eq__(self, other):
1086         if other is self:
1087             return True
1088         if not isinstance(other, RichCollectionBase):
1089             return False
1090         if len(self._items) != len(other):
1091             return False
1092         for k in self._items:
1093             if k not in other:
1094                 return False
1095             if self._items[k] != other[k]:
1096                 return False
1097         return True
1098
1099     def __ne__(self, other):
1100         return not self.__eq__(other)
1101
1102     @synchronized
1103     def flush(self):
1104         """Flush bufferblocks to Keep."""
1105         for e in self.values():
1106             e.flush()
1107
1108
1109 class Collection(RichCollectionBase):
1110     """Represents the root of an Arvados Collection.
1111
1112     This class is threadsafe.  The root collection object, all subcollections
1113     and files are protected by a single lock (i.e. each access locks the entire
1114     collection).
1115
1116     Brief summary of
1117     useful methods:
1118
1119     :To read an existing file:
1120       `c.open("myfile", "r")`
1121
1122     :To write a new file:
1123       `c.open("myfile", "w")`
1124
1125     :To determine if a file exists:
1126       `c.find("myfile") is not None`
1127
1128     :To copy a file:
1129       `c.copy("source", "dest")`
1130
1131     :To delete a file:
1132       `c.remove("myfile")`
1133
1134     :To save to an existing collection record:
1135       `c.save()`
1136
1137     :To save a new collection record:
1138     `c.save_new()`
1139
1140     :To merge remote changes into this object:
1141       `c.update()`
1142
1143     Must be associated with an API server Collection record (during
1144     initialization, or using `save_new`) to use `save` or `update`
1145
1146     """
1147
1148     def __init__(self, manifest_locator_or_text=None,
1149                  api_client=None,
1150                  keep_client=None,
1151                  num_retries=None,
1152                  parent=None,
1153                  apiconfig=None,
1154                  block_manager=None,
1155                  replication_desired=None,
1156                  put_threads=None):
1157         """Collection constructor.
1158
1159         :manifest_locator_or_text:
1160           One of Arvados collection UUID, block locator of
1161           a manifest, raw manifest text, or None (to create an empty collection).
1162         :parent:
1163           the parent Collection, may be None.
1164
1165         :apiconfig:
1166           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1167           Prefer this over supplying your own api_client and keep_client (except in testing).
1168           Will use default config settings if not specified.
1169
1170         :api_client:
1171           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1172
1173         :keep_client:
1174           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1175
1176         :num_retries:
1177           the number of retries for API and Keep requests.
1178
1179         :block_manager:
1180           the block manager to use.  If not specified, create one.
1181
1182         :replication_desired:
1183           How many copies should Arvados maintain. If None, API server default
1184           configuration applies. If not None, this value will also be used
1185           for determining the number of block copies being written.
1186
1187         """
1188         super(Collection, self).__init__(parent)
1189         self._api_client = api_client
1190         self._keep_client = keep_client
1191         self._block_manager = block_manager
1192         self.replication_desired = replication_desired
1193         self.put_threads = put_threads
1194
1195         if apiconfig:
1196             self._config = apiconfig
1197         else:
1198             self._config = config.settings()
1199
1200         self.num_retries = num_retries if num_retries is not None else 0
1201         self._manifest_locator = None
1202         self._manifest_text = None
1203         self._api_response = None
1204         self._past_versions = set()
1205
1206         self.lock = threading.RLock()
1207         self.events = None
1208
1209         if manifest_locator_or_text:
1210             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1211                 self._manifest_locator = manifest_locator_or_text
1212             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1213                 self._manifest_locator = manifest_locator_or_text
1214             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1215                 self._manifest_text = manifest_locator_or_text
1216             else:
1217                 raise errors.ArgumentError(
1218                     "Argument to CollectionReader is not a manifest or a collection UUID")
1219
1220             try:
1221                 self._populate()
1222             except (IOError, errors.SyntaxError) as e:
1223                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1224
1225     def root_collection(self):
1226         return self
1227
1228     def stream_name(self):
1229         return "."
1230
1231     def writable(self):
1232         return True
1233
1234     @synchronized
1235     def known_past_version(self, modified_at_and_portable_data_hash):
1236         return modified_at_and_portable_data_hash in self._past_versions
1237
1238     @synchronized
1239     @retry_method
1240     def update(self, other=None, num_retries=None):
1241         """Merge the latest collection on the API server with the current collection."""
1242
1243         if other is None:
1244             if self._manifest_locator is None:
1245                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1246             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1247             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1248                 response.get("portable_data_hash") != self.portable_data_hash()):
1249                 # The record on the server is different from our current one, but we've seen it before,
1250                 # so ignore it because it's already been merged.
1251                 # However, if it's the same as our current record, proceed with the update, because we want to update
1252                 # our tokens.
1253                 return
1254             else:
1255                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1256             other = CollectionReader(response["manifest_text"])
1257         baseline = CollectionReader(self._manifest_text)
1258         self.apply(baseline.diff(other))
1259         self._manifest_text = self.manifest_text()
1260
1261     @synchronized
1262     def _my_api(self):
1263         if self._api_client is None:
1264             self._api_client = ThreadSafeApiCache(self._config)
1265             if self._keep_client is None:
1266                 self._keep_client = self._api_client.keep
1267         return self._api_client
1268
1269     @synchronized
1270     def _my_keep(self):
1271         if self._keep_client is None:
1272             if self._api_client is None:
1273                 self._my_api()
1274             else:
1275                 self._keep_client = KeepClient(api_client=self._api_client)
1276         return self._keep_client
1277
1278     @synchronized
1279     def _my_block_manager(self):
1280         if self._block_manager is None:
1281             copies = (self.replication_desired or
1282                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1283                                                    2))
1284             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1285         return self._block_manager
1286
1287     def _remember_api_response(self, response):
1288         self._api_response = response
1289         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1290
1291     def _populate_from_api_server(self):
1292         # As in KeepClient itself, we must wait until the last
1293         # possible moment to instantiate an API client, in order to
1294         # avoid tripping up clients that don't have access to an API
1295         # server.  If we do build one, make sure our Keep client uses
1296         # it.  If instantiation fails, we'll fall back to the except
1297         # clause, just like any other Collection lookup
1298         # failure. Return an exception, or None if successful.
1299         try:
1300             self._remember_api_response(self._my_api().collections().get(
1301                 uuid=self._manifest_locator).execute(
1302                     num_retries=self.num_retries))
1303             self._manifest_text = self._api_response['manifest_text']
1304             # If not overriden via kwargs, we should try to load the
1305             # replication_desired from the API server
1306             if self.replication_desired is None:
1307                 self.replication_desired = self._api_response.get('replication_desired', None)
1308             return None
1309         except Exception as e:
1310             return e
1311
1312     def _populate_from_keep(self):
1313         # Retrieve a manifest directly from Keep. This has a chance of
1314         # working if [a] the locator includes a permission signature
1315         # or [b] the Keep services are operating in world-readable
1316         # mode. Return an exception, or None if successful.
1317         try:
1318             self._manifest_text = self._my_keep().get(
1319                 self._manifest_locator, num_retries=self.num_retries)
1320         except Exception as e:
1321             return e
1322
1323     def _populate(self):
1324         if self._manifest_locator is None and self._manifest_text is None:
1325             return
1326         error_via_api = None
1327         error_via_keep = None
1328         should_try_keep = ((self._manifest_text is None) and
1329                            util.keep_locator_pattern.match(
1330                                self._manifest_locator))
1331         if ((self._manifest_text is None) and
1332             util.signed_locator_pattern.match(self._manifest_locator)):
1333             error_via_keep = self._populate_from_keep()
1334         if self._manifest_text is None:
1335             error_via_api = self._populate_from_api_server()
1336             if error_via_api is not None and not should_try_keep:
1337                 raise error_via_api
1338         if ((self._manifest_text is None) and
1339             not error_via_keep and
1340             should_try_keep):
1341             # Looks like a keep locator, and we didn't already try keep above
1342             error_via_keep = self._populate_from_keep()
1343         if self._manifest_text is None:
1344             # Nothing worked!
1345             raise errors.NotFoundError(
1346                 ("Failed to retrieve collection '{}' " +
1347                  "from either API server ({}) or Keep ({})."
1348                  ).format(
1349                     self._manifest_locator,
1350                     error_via_api,
1351                     error_via_keep))
1352         # populate
1353         self._baseline_manifest = self._manifest_text
1354         self._import_manifest(self._manifest_text)
1355
1356
1357     def _has_collection_uuid(self):
1358         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1359
1360     def __enter__(self):
1361         return self
1362
1363     def __exit__(self, exc_type, exc_value, traceback):
1364         """Support scoped auto-commit in a with: block."""
1365         if exc_type is None:
1366             if self.writable() and self._has_collection_uuid():
1367                 self.save()
1368         self.stop_threads()
1369
1370     def stop_threads(self):
1371         if self._block_manager is not None:
1372             self._block_manager.stop_threads()
1373
1374     @synchronized
1375     def manifest_locator(self):
1376         """Get the manifest locator, if any.
1377
1378         The manifest locator will be set when the collection is loaded from an
1379         API server record or the portable data hash of a manifest.
1380
1381         The manifest locator will be None if the collection is newly created or
1382         was created directly from manifest text.  The method `save_new()` will
1383         assign a manifest locator.
1384
1385         """
1386         return self._manifest_locator
1387
1388     @synchronized
1389     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1390         if new_config is None:
1391             new_config = self._config
1392         if readonly:
1393             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1394         else:
1395             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1396
1397         newcollection._clonefrom(self)
1398         return newcollection
1399
1400     @synchronized
1401     def api_response(self):
1402         """Returns information about this Collection fetched from the API server.
1403
1404         If the Collection exists in Keep but not the API server, currently
1405         returns None.  Future versions may provide a synthetic response.
1406
1407         """
1408         return self._api_response
1409
1410     def find_or_create(self, path, create_type):
1411         """See `RichCollectionBase.find_or_create`"""
1412         if path == ".":
1413             return self
1414         else:
1415             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1416
1417     def find(self, path):
1418         """See `RichCollectionBase.find`"""
1419         if path == ".":
1420             return self
1421         else:
1422             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1423
1424     def remove(self, path, recursive=False):
1425         """See `RichCollectionBase.remove`"""
1426         if path == ".":
1427             raise errors.ArgumentError("Cannot remove '.'")
1428         else:
1429             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1430
1431     @must_be_writable
1432     @synchronized
1433     @retry_method
1434     def save(self, merge=True, num_retries=None):
1435         """Save collection to an existing collection record.
1436
1437         Commit pending buffer blocks to Keep, merge with remote record (if
1438         merge=True, the default), and update the collection record.  Returns
1439         the current manifest text.
1440
1441         Will raise AssertionError if not associated with a collection record on
1442         the API server.  If you want to save a manifest to Keep only, see
1443         `save_new()`.
1444
1445         :merge:
1446           Update and merge remote changes before saving.  Otherwise, any
1447           remote changes will be ignored and overwritten.
1448
1449         :num_retries:
1450           Retry count on API calls (if None,  use the collection default)
1451
1452         """
1453         if not self.committed():
1454             if not self._has_collection_uuid():
1455                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1456
1457             self._my_block_manager().commit_all()
1458
1459             if merge:
1460                 self.update()
1461
1462             text = self.manifest_text(strip=False)
1463             self._remember_api_response(self._my_api().collections().update(
1464                 uuid=self._manifest_locator,
1465                 body={'manifest_text': text}
1466                 ).execute(
1467                     num_retries=num_retries))
1468             self._manifest_text = self._api_response["manifest_text"]
1469             self.set_committed()
1470
1471         return self._manifest_text
1472
1473
1474     @must_be_writable
1475     @synchronized
1476     @retry_method
1477     def save_new(self, name=None,
1478                  create_collection_record=True,
1479                  owner_uuid=None,
1480                  ensure_unique_name=False,
1481                  num_retries=None):
1482         """Save collection to a new collection record.
1483
1484         Commit pending buffer blocks to Keep and, when create_collection_record
1485         is True (default), create a new collection record.  After creating a
1486         new collection record, this Collection object will be associated with
1487         the new record used by `save()`.  Returns the current manifest text.
1488
1489         :name:
1490           The collection name.
1491
1492         :create_collection_record:
1493            If True, create a collection record on the API server.
1494            If False, only commit blocks to Keep and return the manifest text.
1495
1496         :owner_uuid:
1497           the user, or project uuid that will own this collection.
1498           If None, defaults to the current user.
1499
1500         :ensure_unique_name:
1501           If True, ask the API server to rename the collection
1502           if it conflicts with a collection with the same name and owner.  If
1503           False, a name conflict will result in an error.
1504
1505         :num_retries:
1506           Retry count on API calls (if None,  use the collection default)
1507
1508         """
1509         self._my_block_manager().commit_all()
1510         text = self.manifest_text(strip=False)
1511
1512         if create_collection_record:
1513             if name is None:
1514                 name = "New collection"
1515                 ensure_unique_name = True
1516
1517             body = {"manifest_text": text,
1518                     "name": name,
1519                     "replication_desired": self.replication_desired}
1520             if owner_uuid:
1521                 body["owner_uuid"] = owner_uuid
1522
1523             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1524             text = self._api_response["manifest_text"]
1525
1526             self._manifest_locator = self._api_response["uuid"]
1527
1528             self._manifest_text = text
1529             self.set_committed()
1530
1531         return text
1532
1533     @synchronized
1534     def _import_manifest(self, manifest_text):
1535         """Import a manifest into a `Collection`.
1536
1537         :manifest_text:
1538           The manifest text to import from.
1539
1540         """
1541         if len(self) > 0:
1542             raise ArgumentError("Can only import manifest into an empty collection")
1543
1544         STREAM_NAME = 0
1545         BLOCKS = 1
1546         SEGMENTS = 2
1547
1548         stream_name = None
1549         state = STREAM_NAME
1550
1551         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1552             tok = token_and_separator.group(1)
1553             sep = token_and_separator.group(2)
1554
1555             if state == STREAM_NAME:
1556                 # starting a new stream
1557                 stream_name = tok.replace('\\040', ' ')
1558                 blocks = []
1559                 segments = []
1560                 streamoffset = 0L
1561                 state = BLOCKS
1562                 self.find_or_create(stream_name, COLLECTION)
1563                 continue
1564
1565             if state == BLOCKS:
1566                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1567                 if block_locator:
1568                     blocksize = long(block_locator.group(1))
1569                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1570                     streamoffset += blocksize
1571                 else:
1572                     state = SEGMENTS
1573
1574             if state == SEGMENTS:
1575                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1576                 if file_segment:
1577                     pos = long(file_segment.group(1))
1578                     size = long(file_segment.group(2))
1579                     name = file_segment.group(3).replace('\\040', ' ')
1580                     filepath = os.path.join(stream_name, name)
1581                     afile = self.find_or_create(filepath, FILE)
1582                     if isinstance(afile, ArvadosFile):
1583                         afile.add_segment(blocks, pos, size)
1584                     else:
1585                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1586                 else:
1587                     # error!
1588                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1589
1590             if sep == "\n":
1591                 stream_name = None
1592                 state = STREAM_NAME
1593
1594         self.set_committed()
1595
1596     @synchronized
1597     def notify(self, event, collection, name, item):
1598         if self._callback:
1599             self._callback(event, collection, name, item)
1600
1601
1602 class Subcollection(RichCollectionBase):
1603     """This is a subdirectory within a collection that doesn't have its own API
1604     server record.
1605
1606     Subcollection locking falls under the umbrella lock of its root collection.
1607
1608     """
1609
1610     def __init__(self, parent, name):
1611         super(Subcollection, self).__init__(parent)
1612         self.lock = self.root_collection().lock
1613         self._manifest_text = None
1614         self.name = name
1615         self.num_retries = parent.num_retries
1616
1617     def root_collection(self):
1618         return self.parent.root_collection()
1619
1620     def writable(self):
1621         return self.root_collection().writable()
1622
1623     def _my_api(self):
1624         return self.root_collection()._my_api()
1625
1626     def _my_keep(self):
1627         return self.root_collection()._my_keep()
1628
1629     def _my_block_manager(self):
1630         return self.root_collection()._my_block_manager()
1631
1632     def stream_name(self):
1633         return os.path.join(self.parent.stream_name(), self.name)
1634
1635     @synchronized
1636     def clone(self, new_parent, new_name):
1637         c = Subcollection(new_parent, new_name)
1638         c._clonefrom(self)
1639         return c
1640
1641     @must_be_writable
1642     @synchronized
1643     def _reparent(self, newparent, newname):
1644         self._committed = False
1645         self.flush()
1646         self.parent.remove(self.name, recursive=True)
1647         self.parent = newparent
1648         self.name = newname
1649         self.lock = self.parent.root_collection().lock
1650
1651
1652 class CollectionReader(Collection):
1653     """A read-only collection object.
1654
1655     Initialize from an api collection record locator, a portable data hash of a
1656     manifest, or raw manifest text.  See `Collection` constructor for detailed
1657     options.
1658
1659     """
1660     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1661         self._in_init = True
1662         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1663         self._in_init = False
1664
1665         # Forego any locking since it should never change once initialized.
1666         self.lock = NoopLock()
1667
1668         # Backwards compatability with old CollectionReader
1669         # all_streams() and all_files()
1670         self._streams = None
1671
1672     def writable(self):
1673         return self._in_init
1674
1675     def _populate_streams(orig_func):
1676         @functools.wraps(orig_func)
1677         def populate_streams_wrapper(self, *args, **kwargs):
1678             # Defer populating self._streams until needed since it creates a copy of the manifest.
1679             if self._streams is None:
1680                 if self._manifest_text:
1681                     self._streams = [sline.split()
1682                                      for sline in self._manifest_text.split("\n")
1683                                      if sline]
1684                 else:
1685                     self._streams = []
1686             return orig_func(self, *args, **kwargs)
1687         return populate_streams_wrapper
1688
1689     @_populate_streams
1690     def normalize(self):
1691         """Normalize the streams returned by `all_streams`.
1692
1693         This method is kept for backwards compatability and only affects the
1694         behavior of `all_streams()` and `all_files()`
1695
1696         """
1697
1698         # Rearrange streams
1699         streams = {}
1700         for s in self.all_streams():
1701             for f in s.all_files():
1702                 streamname, filename = split(s.name() + "/" + f.name())
1703                 if streamname not in streams:
1704                     streams[streamname] = {}
1705                 if filename not in streams[streamname]:
1706                     streams[streamname][filename] = []
1707                 for r in f.segments:
1708                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1709
1710         self._streams = [normalize_stream(s, streams[s])
1711                          for s in sorted(streams)]
1712     @_populate_streams
1713     def all_streams(self):
1714         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1715                 for s in self._streams]
1716
1717     @_populate_streams
1718     def all_files(self):
1719         for s in self.all_streams():
1720             for f in s.all_files():
1721                 yield f