closes #10903
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace: '%s'" %
311                 (newstreamname))
312         self._current_stream_name = '.' if newstreamname=='' else newstreamname
313
314     def current_stream_name(self):
315         return self._current_stream_name
316
317     def finish_current_stream(self):
318         self.finish_current_file()
319         self.flush_data()
320         if not self._current_stream_files:
321             pass
322         elif self._current_stream_name is None:
323             raise errors.AssertionError(
324                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
325                 (self._current_stream_length, len(self._current_stream_files)))
326         else:
327             if not self._current_stream_locators:
328                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
329             self._finished_streams.append([self._current_stream_name,
330                                            self._current_stream_locators,
331                                            self._current_stream_files])
332         self._current_stream_files = []
333         self._current_stream_length = 0
334         self._current_stream_locators = []
335         self._current_stream_name = None
336         self._current_file_pos = 0
337         self._current_file_name = None
338
339     def finish(self):
340         """Store the manifest in Keep and return its locator.
341
342         This is useful for storing manifest fragments (task outputs)
343         temporarily in Keep during a Crunch job.
344
345         In other cases you should make a collection instead, by
346         sending manifest_text() to the API server's "create
347         collection" endpoint.
348         """
349         return self._my_keep().put(self.manifest_text(), copies=self.replication)
350
351     def portable_data_hash(self):
352         stripped = self.stripped_manifest()
353         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
354
355     def manifest_text(self):
356         self.finish_current_stream()
357         manifest = ''
358
359         for stream in self._finished_streams:
360             if not re.search(r'^\.(/.*)?$', stream[0]):
361                 manifest += './'
362             manifest += stream[0].replace(' ', '\\040')
363             manifest += ' ' + ' '.join(stream[1])
364             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
365             manifest += "\n"
366
367         return manifest
368
369     def data_locators(self):
370         ret = []
371         for name, locators, files in self._finished_streams:
372             ret += locators
373         return ret
374
375
376 class ResumableCollectionWriter(CollectionWriter):
377     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
378                    '_current_stream_locators', '_current_stream_name',
379                    '_current_file_name', '_current_file_pos', '_close_file',
380                    '_data_buffer', '_dependencies', '_finished_streams',
381                    '_queued_dirents', '_queued_trees']
382
383     def __init__(self, api_client=None, **kwargs):
384         self._dependencies = {}
385         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
386
387     @classmethod
388     def from_state(cls, state, *init_args, **init_kwargs):
389         # Try to build a new writer from scratch with the given state.
390         # If the state is not suitable to resume (because files have changed,
391         # been deleted, aren't predictable, etc.), raise a
392         # StaleWriterStateError.  Otherwise, return the initialized writer.
393         # The caller is responsible for calling writer.do_queued_work()
394         # appropriately after it's returned.
395         writer = cls(*init_args, **init_kwargs)
396         for attr_name in cls.STATE_PROPS:
397             attr_value = state[attr_name]
398             attr_class = getattr(writer, attr_name).__class__
399             # Coerce the value into the same type as the initial value, if
400             # needed.
401             if attr_class not in (type(None), attr_value.__class__):
402                 attr_value = attr_class(attr_value)
403             setattr(writer, attr_name, attr_value)
404         # Check dependencies before we try to resume anything.
405         if any(KeepLocator(ls).permission_expired()
406                for ls in writer._current_stream_locators):
407             raise errors.StaleWriterStateError(
408                 "locators include expired permission hint")
409         writer.check_dependencies()
410         if state['_current_file'] is not None:
411             path, pos = state['_current_file']
412             try:
413                 writer._queued_file = open(path, 'rb')
414                 writer._queued_file.seek(pos)
415             except IOError as error:
416                 raise errors.StaleWriterStateError(
417                     "failed to reopen active file {}: {}".format(path, error))
418         return writer
419
420     def check_dependencies(self):
421         for path, orig_stat in self._dependencies.items():
422             if not S_ISREG(orig_stat[ST_MODE]):
423                 raise errors.StaleWriterStateError("{} not file".format(path))
424             try:
425                 now_stat = tuple(os.stat(path))
426             except OSError as error:
427                 raise errors.StaleWriterStateError(
428                     "failed to stat {}: {}".format(path, error))
429             if ((not S_ISREG(now_stat[ST_MODE])) or
430                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
431                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
432                 raise errors.StaleWriterStateError("{} changed".format(path))
433
434     def dump_state(self, copy_func=lambda x: x):
435         state = {attr: copy_func(getattr(self, attr))
436                  for attr in self.STATE_PROPS}
437         if self._queued_file is None:
438             state['_current_file'] = None
439         else:
440             state['_current_file'] = (os.path.realpath(self._queued_file.name),
441                                       self._queued_file.tell())
442         return state
443
444     def _queue_file(self, source, filename=None):
445         try:
446             src_path = os.path.realpath(source)
447         except Exception:
448             raise errors.AssertionError("{} not a file path".format(source))
449         try:
450             path_stat = os.stat(src_path)
451         except OSError as stat_error:
452             path_stat = None
453         super(ResumableCollectionWriter, self)._queue_file(source, filename)
454         fd_stat = os.fstat(self._queued_file.fileno())
455         if not S_ISREG(fd_stat.st_mode):
456             # We won't be able to resume from this cache anyway, so don't
457             # worry about further checks.
458             self._dependencies[source] = tuple(fd_stat)
459         elif path_stat is None:
460             raise errors.AssertionError(
461                 "could not stat {}: {}".format(source, stat_error))
462         elif path_stat.st_ino != fd_stat.st_ino:
463             raise errors.AssertionError(
464                 "{} changed between open and stat calls".format(source))
465         else:
466             self._dependencies[src_path] = tuple(fd_stat)
467
468     def write(self, data):
469         if self._queued_file is None:
470             raise errors.AssertionError(
471                 "resumable writer can't accept unsourced data")
472         return super(ResumableCollectionWriter, self).write(data)
473
474
475 ADD = "add"
476 DEL = "del"
477 MOD = "mod"
478 TOK = "tok"
479 FILE = "file"
480 COLLECTION = "collection"
481
482 class RichCollectionBase(CollectionBase):
483     """Base class for Collections and Subcollections.
484
485     Implements the majority of functionality relating to accessing items in the
486     Collection.
487
488     """
489
490     def __init__(self, parent=None):
491         self.parent = parent
492         self._committed = False
493         self._callback = None
494         self._items = {}
495
496     def _my_api(self):
497         raise NotImplementedError()
498
499     def _my_keep(self):
500         raise NotImplementedError()
501
502     def _my_block_manager(self):
503         raise NotImplementedError()
504
505     def writable(self):
506         raise NotImplementedError()
507
508     def root_collection(self):
509         raise NotImplementedError()
510
511     def notify(self, event, collection, name, item):
512         raise NotImplementedError()
513
514     def stream_name(self):
515         raise NotImplementedError()
516
517     @must_be_writable
518     @synchronized
519     def find_or_create(self, path, create_type):
520         """Recursively search the specified file path.
521
522         May return either a `Collection` or `ArvadosFile`.  If not found, will
523         create a new item at the specified path based on `create_type`.  Will
524         create intermediate subcollections needed to contain the final item in
525         the path.
526
527         :create_type:
528           One of `arvados.collection.FILE` or
529           `arvados.collection.COLLECTION`.  If the path is not found, and value
530           of create_type is FILE then create and return a new ArvadosFile for
531           the last path component.  If COLLECTION, then create and return a new
532           Collection for the last path component.
533
534         """
535
536         pathcomponents = path.split("/", 1)
537         if pathcomponents[0]:
538             item = self._items.get(pathcomponents[0])
539             if len(pathcomponents) == 1:
540                 if item is None:
541                     # create new file
542                     if create_type == COLLECTION:
543                         item = Subcollection(self, pathcomponents[0])
544                     else:
545                         item = ArvadosFile(self, pathcomponents[0])
546                     self._items[pathcomponents[0]] = item
547                     self._committed = False
548                     self.notify(ADD, self, pathcomponents[0], item)
549                 return item
550             else:
551                 if item is None:
552                     # create new collection
553                     item = Subcollection(self, pathcomponents[0])
554                     self._items[pathcomponents[0]] = item
555                     self._committed = False
556                     self.notify(ADD, self, pathcomponents[0], item)
557                 if isinstance(item, RichCollectionBase):
558                     return item.find_or_create(pathcomponents[1], create_type)
559                 else:
560                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
561         else:
562             return self
563
564     @synchronized
565     def find(self, path):
566         """Recursively search the specified file path.
567
568         May return either a Collection or ArvadosFile. Return None if not
569         found.
570         If path is invalid (ex: starts with '/'), an IOError exception will be
571         raised.
572
573         """
574         if not path:
575             raise errors.ArgumentError("Parameter 'path' is empty.")
576
577         pathcomponents = path.split("/", 1)
578         if pathcomponents[0] == '':
579             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
580
581         item = self._items.get(pathcomponents[0])
582         if item is None:
583             return None
584         elif len(pathcomponents) == 1:
585             return item
586         else:
587             if isinstance(item, RichCollectionBase):
588                 if pathcomponents[1]:
589                     return item.find(pathcomponents[1])
590                 else:
591                     return item
592             else:
593                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
594
595     @synchronized
596     def mkdirs(self, path):
597         """Recursive subcollection create.
598
599         Like `os.makedirs()`.  Will create intermediate subcollections needed
600         to contain the leaf subcollection path.
601
602         """
603
604         if self.find(path) != None:
605             raise IOError(errno.EEXIST, "Directory or file exists", path)
606
607         return self.find_or_create(path, COLLECTION)
608
609     def open(self, path, mode="r"):
610         """Open a file-like object for access.
611
612         :path:
613           path to a file in the collection
614         :mode:
615           one of "r", "r+", "w", "w+", "a", "a+"
616           :"r":
617             opens for reading
618           :"r+":
619             opens for reading and writing.  Reads/writes share a file pointer.
620           :"w", "w+":
621             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
622           :"a", "a+":
623             opens for reading and writing.  All writes are appended to
624             the end of the file.  Writing does not affect the file pointer for
625             reading.
626         """
627         mode = mode.replace("b", "")
628         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
629             raise errors.ArgumentError("Bad mode '%s'" % mode)
630         create = (mode != "r")
631
632         if create and not self.writable():
633             raise IOError(errno.EROFS, "Collection is read only")
634
635         if create:
636             arvfile = self.find_or_create(path, FILE)
637         else:
638             arvfile = self.find(path)
639
640         if arvfile is None:
641             raise IOError(errno.ENOENT, "File not found", path)
642         if not isinstance(arvfile, ArvadosFile):
643             raise IOError(errno.EISDIR, "Is a directory", path)
644
645         if mode[0] == "w":
646             arvfile.truncate(0)
647
648         name = os.path.basename(path)
649
650         if mode == "r":
651             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
652         else:
653             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
654
655     def modified(self):
656         """Determine if the collection has been modified since last commited."""
657         return not self.committed()
658
659     @synchronized
660     def committed(self):
661         """Determine if the collection has been committed to the API server."""
662
663         if self._committed is False:
664             return False
665         for v in self._items.values():
666             if v.committed() is False:
667                 return False
668         return True
669
670     @synchronized
671     def set_committed(self):
672         """Recursively set committed flag to True."""
673         self._committed = True
674         for k,v in self._items.items():
675             v.set_committed()
676
677     @synchronized
678     def __iter__(self):
679         """Iterate over names of files and collections contained in this collection."""
680         return iter(self._items.keys())
681
682     @synchronized
683     def __getitem__(self, k):
684         """Get a file or collection that is directly contained by this collection.
685
686         If you want to search a path, use `find()` instead.
687
688         """
689         return self._items[k]
690
691     @synchronized
692     def __contains__(self, k):
693         """Test if there is a file or collection a directly contained by this collection."""
694         return k in self._items
695
696     @synchronized
697     def __len__(self):
698         """Get the number of items directly contained in this collection."""
699         return len(self._items)
700
701     @must_be_writable
702     @synchronized
703     def __delitem__(self, p):
704         """Delete an item by name which is directly contained by this collection."""
705         del self._items[p]
706         self._committed = False
707         self.notify(DEL, self, p, None)
708
709     @synchronized
710     def keys(self):
711         """Get a list of names of files and collections directly contained in this collection."""
712         return self._items.keys()
713
714     @synchronized
715     def values(self):
716         """Get a list of files and collection objects directly contained in this collection."""
717         return self._items.values()
718
719     @synchronized
720     def items(self):
721         """Get a list of (name, object) tuples directly contained in this collection."""
722         return self._items.items()
723
724     def exists(self, path):
725         """Test if there is a file or collection at `path`."""
726         return self.find(path) is not None
727
728     @must_be_writable
729     @synchronized
730     def remove(self, path, recursive=False):
731         """Remove the file or subcollection (directory) at `path`.
732
733         :recursive:
734           Specify whether to remove non-empty subcollections (True), or raise an error (False).
735         """
736
737         if not path:
738             raise errors.ArgumentError("Parameter 'path' is empty.")
739
740         pathcomponents = path.split("/", 1)
741         item = self._items.get(pathcomponents[0])
742         if item is None:
743             raise IOError(errno.ENOENT, "File not found", path)
744         if len(pathcomponents) == 1:
745             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
746                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
747             deleteditem = self._items[pathcomponents[0]]
748             del self._items[pathcomponents[0]]
749             self._committed = False
750             self.notify(DEL, self, pathcomponents[0], deleteditem)
751         else:
752             item.remove(pathcomponents[1])
753
754     def _clonefrom(self, source):
755         for k,v in source.items():
756             self._items[k] = v.clone(self, k)
757
758     def clone(self):
759         raise NotImplementedError()
760
761     @must_be_writable
762     @synchronized
763     def add(self, source_obj, target_name, overwrite=False, reparent=False):
764         """Copy or move a file or subcollection to this collection.
765
766         :source_obj:
767           An ArvadosFile, or Subcollection object
768
769         :target_name:
770           Destination item name.  If the target name already exists and is a
771           file, this will raise an error unless you specify `overwrite=True`.
772
773         :overwrite:
774           Whether to overwrite target file if it already exists.
775
776         :reparent:
777           If True, source_obj will be moved from its parent collection to this collection.
778           If False, source_obj will be copied and the parent collection will be
779           unmodified.
780
781         """
782
783         if target_name in self and not overwrite:
784             raise IOError(errno.EEXIST, "File already exists", target_name)
785
786         modified_from = None
787         if target_name in self:
788             modified_from = self[target_name]
789
790         # Actually make the move or copy.
791         if reparent:
792             source_obj._reparent(self, target_name)
793             item = source_obj
794         else:
795             item = source_obj.clone(self, target_name)
796
797         self._items[target_name] = item
798         self._committed = False
799
800         if modified_from:
801             self.notify(MOD, self, target_name, (modified_from, item))
802         else:
803             self.notify(ADD, self, target_name, item)
804
805     def _get_src_target(self, source, target_path, source_collection, create_dest):
806         if source_collection is None:
807             source_collection = self
808
809         # Find the object
810         if isinstance(source, basestring):
811             source_obj = source_collection.find(source)
812             if source_obj is None:
813                 raise IOError(errno.ENOENT, "File not found", source)
814             sourcecomponents = source.split("/")
815         else:
816             source_obj = source
817             sourcecomponents = None
818
819         # Find parent collection the target path
820         targetcomponents = target_path.split("/")
821
822         # Determine the name to use.
823         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
824
825         if not target_name:
826             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
827
828         if create_dest:
829             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
830         else:
831             if len(targetcomponents) > 1:
832                 target_dir = self.find("/".join(targetcomponents[0:-1]))
833             else:
834                 target_dir = self
835
836         if target_dir is None:
837             raise IOError(errno.ENOENT, "Target directory not found", target_name)
838
839         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
840             target_dir = target_dir[target_name]
841             target_name = sourcecomponents[-1]
842
843         return (source_obj, target_dir, target_name)
844
845     @must_be_writable
846     @synchronized
847     def copy(self, source, target_path, source_collection=None, overwrite=False):
848         """Copy a file or subcollection to a new path in this collection.
849
850         :source:
851           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
852
853         :target_path:
854           Destination file or path.  If the target path already exists and is a
855           subcollection, the item will be placed inside the subcollection.  If
856           the target path already exists and is a file, this will raise an error
857           unless you specify `overwrite=True`.
858
859         :source_collection:
860           Collection to copy `source_path` from (default `self`)
861
862         :overwrite:
863           Whether to overwrite target file if it already exists.
864         """
865
866         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
867         target_dir.add(source_obj, target_name, overwrite, False)
868
869     @must_be_writable
870     @synchronized
871     def rename(self, source, target_path, source_collection=None, overwrite=False):
872         """Move a file or subcollection from `source_collection` to a new path in this collection.
873
874         :source:
875           A string with a path to source file or subcollection.
876
877         :target_path:
878           Destination file or path.  If the target path already exists and is a
879           subcollection, the item will be placed inside the subcollection.  If
880           the target path already exists and is a file, this will raise an error
881           unless you specify `overwrite=True`.
882
883         :source_collection:
884           Collection to copy `source_path` from (default `self`)
885
886         :overwrite:
887           Whether to overwrite target file if it already exists.
888         """
889
890         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
891         if not source_obj.writable():
892             raise IOError(errno.EROFS, "Source collection is read only", source)
893         target_dir.add(source_obj, target_name, overwrite, True)
894
895     def portable_manifest_text(self, stream_name="."):
896         """Get the manifest text for this collection, sub collections and files.
897
898         This method does not flush outstanding blocks to Keep.  It will return
899         a normalized manifest with access tokens stripped.
900
901         :stream_name:
902           Name to use for this stream (directory)
903
904         """
905         return self._get_manifest_text(stream_name, True, True)
906
907     @synchronized
908     def manifest_text(self, stream_name=".", strip=False, normalize=False,
909                       only_committed=False):
910         """Get the manifest text for this collection, sub collections and files.
911
912         This method will flush outstanding blocks to Keep.  By default, it will
913         not normalize an unmodified manifest or strip access tokens.
914
915         :stream_name:
916           Name to use for this stream (directory)
917
918         :strip:
919           If True, remove signing tokens from block locators if present.
920           If False (default), block locators are left unchanged.
921
922         :normalize:
923           If True, always export the manifest text in normalized form
924           even if the Collection is not modified.  If False (default) and the collection
925           is not modified, return the original manifest text even if it is not
926           in normalized form.
927
928         :only_committed:
929           If True, don't commit pending blocks.
930
931         """
932
933         if not only_committed:
934             self._my_block_manager().commit_all()
935         return self._get_manifest_text(stream_name, strip, normalize,
936                                        only_committed=only_committed)
937
938     @synchronized
939     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
940         """Get the manifest text for this collection, sub collections and files.
941
942         :stream_name:
943           Name to use for this stream (directory)
944
945         :strip:
946           If True, remove signing tokens from block locators if present.
947           If False (default), block locators are left unchanged.
948
949         :normalize:
950           If True, always export the manifest text in normalized form
951           even if the Collection is not modified.  If False (default) and the collection
952           is not modified, return the original manifest text even if it is not
953           in normalized form.
954
955         :only_committed:
956           If True, only include blocks that were already committed to Keep.
957
958         """
959
960         if not self.committed() or self._manifest_text is None or normalize:
961             stream = {}
962             buf = []
963             sorted_keys = sorted(self.keys())
964             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
965                 # Create a stream per file `k`
966                 arvfile = self[filename]
967                 filestream = []
968                 for segment in arvfile.segments():
969                     loc = segment.locator
970                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
971                         if only_committed:
972                             continue
973                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
974                     if strip:
975                         loc = KeepLocator(loc).stripped()
976                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
977                                          segment.segment_offset, segment.range_size))
978                 stream[filename] = filestream
979             if stream:
980                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
981             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
982                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
983             return "".join(buf)
984         else:
985             if strip:
986                 return self.stripped_manifest()
987             else:
988                 return self._manifest_text
989
990     @synchronized
991     def diff(self, end_collection, prefix=".", holding_collection=None):
992         """Generate list of add/modify/delete actions.
993
994         When given to `apply`, will change `self` to match `end_collection`
995
996         """
997         changes = []
998         if holding_collection is None:
999             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1000         for k in self:
1001             if k not in end_collection:
1002                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1003         for k in end_collection:
1004             if k in self:
1005                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1006                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1007                 elif end_collection[k] != self[k]:
1008                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1009                 else:
1010                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1011             else:
1012                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1013         return changes
1014
1015     @must_be_writable
1016     @synchronized
1017     def apply(self, changes):
1018         """Apply changes from `diff`.
1019
1020         If a change conflicts with a local change, it will be saved to an
1021         alternate path indicating the conflict.
1022
1023         """
1024         if changes:
1025             self._committed = False
1026         for change in changes:
1027             event_type = change[0]
1028             path = change[1]
1029             initial = change[2]
1030             local = self.find(path)
1031             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1032                                                                     time.gmtime()))
1033             if event_type == ADD:
1034                 if local is None:
1035                     # No local file at path, safe to copy over new file
1036                     self.copy(initial, path)
1037                 elif local is not None and local != initial:
1038                     # There is already local file and it is different:
1039                     # save change to conflict file.
1040                     self.copy(initial, conflictpath)
1041             elif event_type == MOD or event_type == TOK:
1042                 final = change[3]
1043                 if local == initial:
1044                     # Local matches the "initial" item so it has not
1045                     # changed locally and is safe to update.
1046                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1047                         # Replace contents of local file with new contents
1048                         local.replace_contents(final)
1049                     else:
1050                         # Overwrite path with new item; this can happen if
1051                         # path was a file and is now a collection or vice versa
1052                         self.copy(final, path, overwrite=True)
1053                 else:
1054                     # Local is missing (presumably deleted) or local doesn't
1055                     # match the "start" value, so save change to conflict file
1056                     self.copy(final, conflictpath)
1057             elif event_type == DEL:
1058                 if local == initial:
1059                     # Local item matches "initial" value, so it is safe to remove.
1060                     self.remove(path, recursive=True)
1061                 # else, the file is modified or already removed, in either
1062                 # case we don't want to try to remove it.
1063
1064     def portable_data_hash(self):
1065         """Get the portable data hash for this collection's manifest."""
1066         if self._manifest_locator and self.committed():
1067             # If the collection is already saved on the API server, and it's committed
1068             # then return API server's PDH response.
1069             return self._portable_data_hash
1070         else:
1071             stripped = self.portable_manifest_text()
1072             return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1073
1074     @synchronized
1075     def subscribe(self, callback):
1076         if self._callback is None:
1077             self._callback = callback
1078         else:
1079             raise errors.ArgumentError("A callback is already set on this collection.")
1080
1081     @synchronized
1082     def unsubscribe(self):
1083         if self._callback is not None:
1084             self._callback = None
1085
1086     @synchronized
1087     def notify(self, event, collection, name, item):
1088         if self._callback:
1089             self._callback(event, collection, name, item)
1090         self.root_collection().notify(event, collection, name, item)
1091
1092     @synchronized
1093     def __eq__(self, other):
1094         if other is self:
1095             return True
1096         if not isinstance(other, RichCollectionBase):
1097             return False
1098         if len(self._items) != len(other):
1099             return False
1100         for k in self._items:
1101             if k not in other:
1102                 return False
1103             if self._items[k] != other[k]:
1104                 return False
1105         return True
1106
1107     def __ne__(self, other):
1108         return not self.__eq__(other)
1109
1110     @synchronized
1111     def flush(self):
1112         """Flush bufferblocks to Keep."""
1113         for e in self.values():
1114             e.flush()
1115
1116
1117 class Collection(RichCollectionBase):
1118     """Represents the root of an Arvados Collection.
1119
1120     This class is threadsafe.  The root collection object, all subcollections
1121     and files are protected by a single lock (i.e. each access locks the entire
1122     collection).
1123
1124     Brief summary of
1125     useful methods:
1126
1127     :To read an existing file:
1128       `c.open("myfile", "r")`
1129
1130     :To write a new file:
1131       `c.open("myfile", "w")`
1132
1133     :To determine if a file exists:
1134       `c.find("myfile") is not None`
1135
1136     :To copy a file:
1137       `c.copy("source", "dest")`
1138
1139     :To delete a file:
1140       `c.remove("myfile")`
1141
1142     :To save to an existing collection record:
1143       `c.save()`
1144
1145     :To save a new collection record:
1146     `c.save_new()`
1147
1148     :To merge remote changes into this object:
1149       `c.update()`
1150
1151     Must be associated with an API server Collection record (during
1152     initialization, or using `save_new`) to use `save` or `update`
1153
1154     """
1155
1156     def __init__(self, manifest_locator_or_text=None,
1157                  api_client=None,
1158                  keep_client=None,
1159                  num_retries=None,
1160                  parent=None,
1161                  apiconfig=None,
1162                  block_manager=None,
1163                  replication_desired=None,
1164                  put_threads=None):
1165         """Collection constructor.
1166
1167         :manifest_locator_or_text:
1168           One of Arvados collection UUID, block locator of
1169           a manifest, raw manifest text, or None (to create an empty collection).
1170         :parent:
1171           the parent Collection, may be None.
1172
1173         :apiconfig:
1174           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1175           Prefer this over supplying your own api_client and keep_client (except in testing).
1176           Will use default config settings if not specified.
1177
1178         :api_client:
1179           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1180
1181         :keep_client:
1182           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1183
1184         :num_retries:
1185           the number of retries for API and Keep requests.
1186
1187         :block_manager:
1188           the block manager to use.  If not specified, create one.
1189
1190         :replication_desired:
1191           How many copies should Arvados maintain. If None, API server default
1192           configuration applies. If not None, this value will also be used
1193           for determining the number of block copies being written.
1194
1195         """
1196         super(Collection, self).__init__(parent)
1197         self._api_client = api_client
1198         self._keep_client = keep_client
1199         self._block_manager = block_manager
1200         self.replication_desired = replication_desired
1201         self.put_threads = put_threads
1202
1203         if apiconfig:
1204             self._config = apiconfig
1205         else:
1206             self._config = config.settings()
1207
1208         self.num_retries = num_retries if num_retries is not None else 0
1209         self._manifest_locator = None
1210         self._manifest_text = None
1211         self._portable_data_hash = None
1212         self._api_response = None
1213         self._past_versions = set()
1214
1215         self.lock = threading.RLock()
1216         self.events = None
1217
1218         if manifest_locator_or_text:
1219             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1220                 self._manifest_locator = manifest_locator_or_text
1221             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1222                 self._manifest_locator = manifest_locator_or_text
1223             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1224                 self._manifest_text = manifest_locator_or_text
1225             else:
1226                 raise errors.ArgumentError(
1227                     "Argument to CollectionReader is not a manifest or a collection UUID")
1228
1229             try:
1230                 self._populate()
1231             except (IOError, errors.SyntaxError) as e:
1232                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1233
1234     def root_collection(self):
1235         return self
1236
1237     def stream_name(self):
1238         return "."
1239
1240     def writable(self):
1241         return True
1242
1243     @synchronized
1244     def known_past_version(self, modified_at_and_portable_data_hash):
1245         return modified_at_and_portable_data_hash in self._past_versions
1246
1247     @synchronized
1248     @retry_method
1249     def update(self, other=None, num_retries=None):
1250         """Merge the latest collection on the API server with the current collection."""
1251
1252         if other is None:
1253             if self._manifest_locator is None:
1254                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1255             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1256             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1257                 response.get("portable_data_hash") != self.portable_data_hash()):
1258                 # The record on the server is different from our current one, but we've seen it before,
1259                 # so ignore it because it's already been merged.
1260                 # However, if it's the same as our current record, proceed with the update, because we want to update
1261                 # our tokens.
1262                 return
1263             else:
1264                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1265             other = CollectionReader(response["manifest_text"])
1266         baseline = CollectionReader(self._manifest_text)
1267         self.apply(baseline.diff(other))
1268         self._manifest_text = self.manifest_text()
1269
1270     @synchronized
1271     def _my_api(self):
1272         if self._api_client is None:
1273             self._api_client = ThreadSafeApiCache(self._config)
1274             if self._keep_client is None:
1275                 self._keep_client = self._api_client.keep
1276         return self._api_client
1277
1278     @synchronized
1279     def _my_keep(self):
1280         if self._keep_client is None:
1281             if self._api_client is None:
1282                 self._my_api()
1283             else:
1284                 self._keep_client = KeepClient(api_client=self._api_client)
1285         return self._keep_client
1286
1287     @synchronized
1288     def _my_block_manager(self):
1289         if self._block_manager is None:
1290             copies = (self.replication_desired or
1291                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1292                                                    2))
1293             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1294         return self._block_manager
1295
1296     def _remember_api_response(self, response):
1297         self._api_response = response
1298         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1299
1300     def _populate_from_api_server(self):
1301         # As in KeepClient itself, we must wait until the last
1302         # possible moment to instantiate an API client, in order to
1303         # avoid tripping up clients that don't have access to an API
1304         # server.  If we do build one, make sure our Keep client uses
1305         # it.  If instantiation fails, we'll fall back to the except
1306         # clause, just like any other Collection lookup
1307         # failure. Return an exception, or None if successful.
1308         try:
1309             self._remember_api_response(self._my_api().collections().get(
1310                 uuid=self._manifest_locator).execute(
1311                     num_retries=self.num_retries))
1312             self._manifest_text = self._api_response['manifest_text']
1313             # If not overriden via kwargs, we should try to load the
1314             # replication_desired from the API server
1315             if self.replication_desired is None:
1316                 self.replication_desired = self._api_response.get('replication_desired', None)
1317             return None
1318         except Exception as e:
1319             return e
1320
1321     def _populate_from_keep(self):
1322         # Retrieve a manifest directly from Keep. This has a chance of
1323         # working if [a] the locator includes a permission signature
1324         # or [b] the Keep services are operating in world-readable
1325         # mode. Return an exception, or None if successful.
1326         try:
1327             self._manifest_text = self._my_keep().get(
1328                 self._manifest_locator, num_retries=self.num_retries)
1329         except Exception as e:
1330             return e
1331
1332     def _populate(self):
1333         if self._manifest_locator is None and self._manifest_text is None:
1334             return
1335         error_via_api = None
1336         error_via_keep = None
1337         should_try_keep = ((self._manifest_text is None) and
1338                            util.keep_locator_pattern.match(
1339                                self._manifest_locator))
1340         if ((self._manifest_text is None) and
1341             util.signed_locator_pattern.match(self._manifest_locator)):
1342             error_via_keep = self._populate_from_keep()
1343         if self._manifest_text is None:
1344             error_via_api = self._populate_from_api_server()
1345             if error_via_api is not None and not should_try_keep:
1346                 raise error_via_api
1347         if ((self._manifest_text is None) and
1348             not error_via_keep and
1349             should_try_keep):
1350             # Looks like a keep locator, and we didn't already try keep above
1351             error_via_keep = self._populate_from_keep()
1352         if self._manifest_text is None:
1353             # Nothing worked!
1354             raise errors.NotFoundError(
1355                 ("Failed to retrieve collection '{}' " +
1356                  "from either API server ({}) or Keep ({})."
1357                  ).format(
1358                     self._manifest_locator,
1359                     error_via_api,
1360                     error_via_keep))
1361         # populate
1362         self._baseline_manifest = self._manifest_text
1363         self._import_manifest(self._manifest_text)
1364
1365
1366     def _has_collection_uuid(self):
1367         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1368
1369     def __enter__(self):
1370         return self
1371
1372     def __exit__(self, exc_type, exc_value, traceback):
1373         """Support scoped auto-commit in a with: block."""
1374         if exc_type is None:
1375             if self.writable() and self._has_collection_uuid():
1376                 self.save()
1377         self.stop_threads()
1378
1379     def stop_threads(self):
1380         if self._block_manager is not None:
1381             self._block_manager.stop_threads()
1382
1383     @synchronized
1384     def manifest_locator(self):
1385         """Get the manifest locator, if any.
1386
1387         The manifest locator will be set when the collection is loaded from an
1388         API server record or the portable data hash of a manifest.
1389
1390         The manifest locator will be None if the collection is newly created or
1391         was created directly from manifest text.  The method `save_new()` will
1392         assign a manifest locator.
1393
1394         """
1395         return self._manifest_locator
1396
1397     @synchronized
1398     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1399         if new_config is None:
1400             new_config = self._config
1401         if readonly:
1402             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1403         else:
1404             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1405
1406         newcollection._clonefrom(self)
1407         return newcollection
1408
1409     @synchronized
1410     def api_response(self):
1411         """Returns information about this Collection fetched from the API server.
1412
1413         If the Collection exists in Keep but not the API server, currently
1414         returns None.  Future versions may provide a synthetic response.
1415
1416         """
1417         return self._api_response
1418
1419     def find_or_create(self, path, create_type):
1420         """See `RichCollectionBase.find_or_create`"""
1421         if path == ".":
1422             return self
1423         else:
1424             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1425
1426     def find(self, path):
1427         """See `RichCollectionBase.find`"""
1428         if path == ".":
1429             return self
1430         else:
1431             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1432
1433     def remove(self, path, recursive=False):
1434         """See `RichCollectionBase.remove`"""
1435         if path == ".":
1436             raise errors.ArgumentError("Cannot remove '.'")
1437         else:
1438             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1439
1440     @must_be_writable
1441     @synchronized
1442     @retry_method
1443     def save(self, merge=True, num_retries=None):
1444         """Save collection to an existing collection record.
1445
1446         Commit pending buffer blocks to Keep, merge with remote record (if
1447         merge=True, the default), and update the collection record.  Returns
1448         the current manifest text.
1449
1450         Will raise AssertionError if not associated with a collection record on
1451         the API server.  If you want to save a manifest to Keep only, see
1452         `save_new()`.
1453
1454         :merge:
1455           Update and merge remote changes before saving.  Otherwise, any
1456           remote changes will be ignored and overwritten.
1457
1458         :num_retries:
1459           Retry count on API calls (if None,  use the collection default)
1460
1461         """
1462         if not self.committed():
1463             if not self._has_collection_uuid():
1464                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1465
1466             self._my_block_manager().commit_all()
1467
1468             if merge:
1469                 self.update()
1470
1471             text = self.manifest_text(strip=False)
1472             self._remember_api_response(self._my_api().collections().update(
1473                 uuid=self._manifest_locator,
1474                 body={'manifest_text': text}
1475                 ).execute(
1476                     num_retries=num_retries))
1477             self._manifest_text = self._api_response["manifest_text"]
1478             self._portable_data_hash = self._api_response["portable_data_hash"]
1479             self.set_committed()
1480
1481         return self._manifest_text
1482
1483
1484     @must_be_writable
1485     @synchronized
1486     @retry_method
1487     def save_new(self, name=None,
1488                  create_collection_record=True,
1489                  owner_uuid=None,
1490                  ensure_unique_name=False,
1491                  num_retries=None):
1492         """Save collection to a new collection record.
1493
1494         Commit pending buffer blocks to Keep and, when create_collection_record
1495         is True (default), create a new collection record.  After creating a
1496         new collection record, this Collection object will be associated with
1497         the new record used by `save()`.  Returns the current manifest text.
1498
1499         :name:
1500           The collection name.
1501
1502         :create_collection_record:
1503            If True, create a collection record on the API server.
1504            If False, only commit blocks to Keep and return the manifest text.
1505
1506         :owner_uuid:
1507           the user, or project uuid that will own this collection.
1508           If None, defaults to the current user.
1509
1510         :ensure_unique_name:
1511           If True, ask the API server to rename the collection
1512           if it conflicts with a collection with the same name and owner.  If
1513           False, a name conflict will result in an error.
1514
1515         :num_retries:
1516           Retry count on API calls (if None,  use the collection default)
1517
1518         """
1519         self._my_block_manager().commit_all()
1520         text = self.manifest_text(strip=False)
1521
1522         if create_collection_record:
1523             if name is None:
1524                 name = "New collection"
1525                 ensure_unique_name = True
1526
1527             body = {"manifest_text": text,
1528                     "name": name,
1529                     "replication_desired": self.replication_desired}
1530             if owner_uuid:
1531                 body["owner_uuid"] = owner_uuid
1532
1533             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1534             text = self._api_response["manifest_text"]
1535
1536             self._manifest_locator = self._api_response["uuid"]
1537             self._portable_data_hash = self._api_response["portable_data_hash"]
1538
1539             self._manifest_text = text
1540             self.set_committed()
1541
1542         return text
1543
1544     @synchronized
1545     def _import_manifest(self, manifest_text):
1546         """Import a manifest into a `Collection`.
1547
1548         :manifest_text:
1549           The manifest text to import from.
1550
1551         """
1552         if len(self) > 0:
1553             raise ArgumentError("Can only import manifest into an empty collection")
1554
1555         STREAM_NAME = 0
1556         BLOCKS = 1
1557         SEGMENTS = 2
1558
1559         stream_name = None
1560         state = STREAM_NAME
1561
1562         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1563             tok = token_and_separator.group(1)
1564             sep = token_and_separator.group(2)
1565
1566             if state == STREAM_NAME:
1567                 # starting a new stream
1568                 stream_name = tok.replace('\\040', ' ')
1569                 blocks = []
1570                 segments = []
1571                 streamoffset = 0L
1572                 state = BLOCKS
1573                 self.find_or_create(stream_name, COLLECTION)
1574                 continue
1575
1576             if state == BLOCKS:
1577                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1578                 if block_locator:
1579                     blocksize = long(block_locator.group(1))
1580                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1581                     streamoffset += blocksize
1582                 else:
1583                     state = SEGMENTS
1584
1585             if state == SEGMENTS:
1586                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1587                 if file_segment:
1588                     pos = long(file_segment.group(1))
1589                     size = long(file_segment.group(2))
1590                     name = file_segment.group(3).replace('\\040', ' ')
1591                     filepath = os.path.join(stream_name, name)
1592                     afile = self.find_or_create(filepath, FILE)
1593                     if isinstance(afile, ArvadosFile):
1594                         afile.add_segment(blocks, pos, size)
1595                     else:
1596                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1597                 else:
1598                     # error!
1599                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1600
1601             if sep == "\n":
1602                 stream_name = None
1603                 state = STREAM_NAME
1604
1605         self.set_committed()
1606
1607     @synchronized
1608     def notify(self, event, collection, name, item):
1609         if self._callback:
1610             self._callback(event, collection, name, item)
1611
1612
1613 class Subcollection(RichCollectionBase):
1614     """This is a subdirectory within a collection that doesn't have its own API
1615     server record.
1616
1617     Subcollection locking falls under the umbrella lock of its root collection.
1618
1619     """
1620
1621     def __init__(self, parent, name):
1622         super(Subcollection, self).__init__(parent)
1623         self.lock = self.root_collection().lock
1624         self._manifest_text = None
1625         self.name = name
1626         self.num_retries = parent.num_retries
1627
1628     def root_collection(self):
1629         return self.parent.root_collection()
1630
1631     def writable(self):
1632         return self.root_collection().writable()
1633
1634     def _my_api(self):
1635         return self.root_collection()._my_api()
1636
1637     def _my_keep(self):
1638         return self.root_collection()._my_keep()
1639
1640     def _my_block_manager(self):
1641         return self.root_collection()._my_block_manager()
1642
1643     def stream_name(self):
1644         return os.path.join(self.parent.stream_name(), self.name)
1645
1646     @synchronized
1647     def clone(self, new_parent, new_name):
1648         c = Subcollection(new_parent, new_name)
1649         c._clonefrom(self)
1650         return c
1651
1652     @must_be_writable
1653     @synchronized
1654     def _reparent(self, newparent, newname):
1655         self._committed = False
1656         self.flush()
1657         self.parent.remove(self.name, recursive=True)
1658         self.parent = newparent
1659         self.name = newname
1660         self.lock = self.parent.root_collection().lock
1661
1662
1663 class CollectionReader(Collection):
1664     """A read-only collection object.
1665
1666     Initialize from an api collection record locator, a portable data hash of a
1667     manifest, or raw manifest text.  See `Collection` constructor for detailed
1668     options.
1669
1670     """
1671     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1672         self._in_init = True
1673         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1674         self._in_init = False
1675
1676         # Forego any locking since it should never change once initialized.
1677         self.lock = NoopLock()
1678
1679         # Backwards compatability with old CollectionReader
1680         # all_streams() and all_files()
1681         self._streams = None
1682
1683     def writable(self):
1684         return self._in_init
1685
1686     def _populate_streams(orig_func):
1687         @functools.wraps(orig_func)
1688         def populate_streams_wrapper(self, *args, **kwargs):
1689             # Defer populating self._streams until needed since it creates a copy of the manifest.
1690             if self._streams is None:
1691                 if self._manifest_text:
1692                     self._streams = [sline.split()
1693                                      for sline in self._manifest_text.split("\n")
1694                                      if sline]
1695                 else:
1696                     self._streams = []
1697             return orig_func(self, *args, **kwargs)
1698         return populate_streams_wrapper
1699
1700     @_populate_streams
1701     def normalize(self):
1702         """Normalize the streams returned by `all_streams`.
1703
1704         This method is kept for backwards compatability and only affects the
1705         behavior of `all_streams()` and `all_files()`
1706
1707         """
1708
1709         # Rearrange streams
1710         streams = {}
1711         for s in self.all_streams():
1712             for f in s.all_files():
1713                 streamname, filename = split(s.name() + "/" + f.name())
1714                 if streamname not in streams:
1715                     streams[streamname] = {}
1716                 if filename not in streams[streamname]:
1717                     streams[streamname][filename] = []
1718                 for r in f.segments:
1719                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1720
1721         self._streams = [normalize_stream(s, streams[s])
1722                          for s in sorted(streams)]
1723     @_populate_streams
1724     def all_streams(self):
1725         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1726                 for s in self._streams]
1727
1728     @_populate_streams
1729     def all_files(self):
1730         for s in self.all_streams():
1731             for f in s.all_files():
1732                 yield f