10684: Add Arvados-specific search path to Python SDK arvados.util.ca_certs_path.
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace: '%s'" %
311                 (newstreamname))
312         self._current_stream_name = '.' if newstreamname=='' else newstreamname
313
314     def current_stream_name(self):
315         return self._current_stream_name
316
317     def finish_current_stream(self):
318         self.finish_current_file()
319         self.flush_data()
320         if not self._current_stream_files:
321             pass
322         elif self._current_stream_name is None:
323             raise errors.AssertionError(
324                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
325                 (self._current_stream_length, len(self._current_stream_files)))
326         else:
327             if not self._current_stream_locators:
328                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
329             self._finished_streams.append([self._current_stream_name,
330                                            self._current_stream_locators,
331                                            self._current_stream_files])
332         self._current_stream_files = []
333         self._current_stream_length = 0
334         self._current_stream_locators = []
335         self._current_stream_name = None
336         self._current_file_pos = 0
337         self._current_file_name = None
338
339     def finish(self):
340         """Store the manifest in Keep and return its locator.
341
342         This is useful for storing manifest fragments (task outputs)
343         temporarily in Keep during a Crunch job.
344
345         In other cases you should make a collection instead, by
346         sending manifest_text() to the API server's "create
347         collection" endpoint.
348         """
349         return self._my_keep().put(self.manifest_text(), copies=self.replication)
350
351     def portable_data_hash(self):
352         stripped = self.stripped_manifest()
353         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
354
355     def manifest_text(self):
356         self.finish_current_stream()
357         manifest = ''
358
359         for stream in self._finished_streams:
360             if not re.search(r'^\.(/.*)?$', stream[0]):
361                 manifest += './'
362             manifest += stream[0].replace(' ', '\\040')
363             manifest += ' ' + ' '.join(stream[1])
364             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
365             manifest += "\n"
366
367         return manifest
368
369     def data_locators(self):
370         ret = []
371         for name, locators, files in self._finished_streams:
372             ret += locators
373         return ret
374
375
376 class ResumableCollectionWriter(CollectionWriter):
377     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
378                    '_current_stream_locators', '_current_stream_name',
379                    '_current_file_name', '_current_file_pos', '_close_file',
380                    '_data_buffer', '_dependencies', '_finished_streams',
381                    '_queued_dirents', '_queued_trees']
382
383     def __init__(self, api_client=None, **kwargs):
384         self._dependencies = {}
385         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
386
387     @classmethod
388     def from_state(cls, state, *init_args, **init_kwargs):
389         # Try to build a new writer from scratch with the given state.
390         # If the state is not suitable to resume (because files have changed,
391         # been deleted, aren't predictable, etc.), raise a
392         # StaleWriterStateError.  Otherwise, return the initialized writer.
393         # The caller is responsible for calling writer.do_queued_work()
394         # appropriately after it's returned.
395         writer = cls(*init_args, **init_kwargs)
396         for attr_name in cls.STATE_PROPS:
397             attr_value = state[attr_name]
398             attr_class = getattr(writer, attr_name).__class__
399             # Coerce the value into the same type as the initial value, if
400             # needed.
401             if attr_class not in (type(None), attr_value.__class__):
402                 attr_value = attr_class(attr_value)
403             setattr(writer, attr_name, attr_value)
404         # Check dependencies before we try to resume anything.
405         if any(KeepLocator(ls).permission_expired()
406                for ls in writer._current_stream_locators):
407             raise errors.StaleWriterStateError(
408                 "locators include expired permission hint")
409         writer.check_dependencies()
410         if state['_current_file'] is not None:
411             path, pos = state['_current_file']
412             try:
413                 writer._queued_file = open(path, 'rb')
414                 writer._queued_file.seek(pos)
415             except IOError as error:
416                 raise errors.StaleWriterStateError(
417                     "failed to reopen active file {}: {}".format(path, error))
418         return writer
419
420     def check_dependencies(self):
421         for path, orig_stat in self._dependencies.items():
422             if not S_ISREG(orig_stat[ST_MODE]):
423                 raise errors.StaleWriterStateError("{} not file".format(path))
424             try:
425                 now_stat = tuple(os.stat(path))
426             except OSError as error:
427                 raise errors.StaleWriterStateError(
428                     "failed to stat {}: {}".format(path, error))
429             if ((not S_ISREG(now_stat[ST_MODE])) or
430                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
431                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
432                 raise errors.StaleWriterStateError("{} changed".format(path))
433
434     def dump_state(self, copy_func=lambda x: x):
435         state = {attr: copy_func(getattr(self, attr))
436                  for attr in self.STATE_PROPS}
437         if self._queued_file is None:
438             state['_current_file'] = None
439         else:
440             state['_current_file'] = (os.path.realpath(self._queued_file.name),
441                                       self._queued_file.tell())
442         return state
443
444     def _queue_file(self, source, filename=None):
445         try:
446             src_path = os.path.realpath(source)
447         except Exception:
448             raise errors.AssertionError("{} not a file path".format(source))
449         try:
450             path_stat = os.stat(src_path)
451         except OSError as stat_error:
452             path_stat = None
453         super(ResumableCollectionWriter, self)._queue_file(source, filename)
454         fd_stat = os.fstat(self._queued_file.fileno())
455         if not S_ISREG(fd_stat.st_mode):
456             # We won't be able to resume from this cache anyway, so don't
457             # worry about further checks.
458             self._dependencies[source] = tuple(fd_stat)
459         elif path_stat is None:
460             raise errors.AssertionError(
461                 "could not stat {}: {}".format(source, stat_error))
462         elif path_stat.st_ino != fd_stat.st_ino:
463             raise errors.AssertionError(
464                 "{} changed between open and stat calls".format(source))
465         else:
466             self._dependencies[src_path] = tuple(fd_stat)
467
468     def write(self, data):
469         if self._queued_file is None:
470             raise errors.AssertionError(
471                 "resumable writer can't accept unsourced data")
472         return super(ResumableCollectionWriter, self).write(data)
473
474
475 ADD = "add"
476 DEL = "del"
477 MOD = "mod"
478 TOK = "tok"
479 FILE = "file"
480 COLLECTION = "collection"
481
482 class RichCollectionBase(CollectionBase):
483     """Base class for Collections and Subcollections.
484
485     Implements the majority of functionality relating to accessing items in the
486     Collection.
487
488     """
489
490     def __init__(self, parent=None):
491         self.parent = parent
492         self._committed = False
493         self._callback = None
494         self._items = {}
495
496     def _my_api(self):
497         raise NotImplementedError()
498
499     def _my_keep(self):
500         raise NotImplementedError()
501
502     def _my_block_manager(self):
503         raise NotImplementedError()
504
505     def writable(self):
506         raise NotImplementedError()
507
508     def root_collection(self):
509         raise NotImplementedError()
510
511     def notify(self, event, collection, name, item):
512         raise NotImplementedError()
513
514     def stream_name(self):
515         raise NotImplementedError()
516
517     @must_be_writable
518     @synchronized
519     def find_or_create(self, path, create_type):
520         """Recursively search the specified file path.
521
522         May return either a `Collection` or `ArvadosFile`.  If not found, will
523         create a new item at the specified path based on `create_type`.  Will
524         create intermediate subcollections needed to contain the final item in
525         the path.
526
527         :create_type:
528           One of `arvados.collection.FILE` or
529           `arvados.collection.COLLECTION`.  If the path is not found, and value
530           of create_type is FILE then create and return a new ArvadosFile for
531           the last path component.  If COLLECTION, then create and return a new
532           Collection for the last path component.
533
534         """
535
536         pathcomponents = path.split("/", 1)
537         if pathcomponents[0]:
538             item = self._items.get(pathcomponents[0])
539             if len(pathcomponents) == 1:
540                 if item is None:
541                     # create new file
542                     if create_type == COLLECTION:
543                         item = Subcollection(self, pathcomponents[0])
544                     else:
545                         item = ArvadosFile(self, pathcomponents[0])
546                     self._items[pathcomponents[0]] = item
547                     self._committed = False
548                     self.notify(ADD, self, pathcomponents[0], item)
549                 return item
550             else:
551                 if item is None:
552                     # create new collection
553                     item = Subcollection(self, pathcomponents[0])
554                     self._items[pathcomponents[0]] = item
555                     self._committed = False
556                     self.notify(ADD, self, pathcomponents[0], item)
557                 if isinstance(item, RichCollectionBase):
558                     return item.find_or_create(pathcomponents[1], create_type)
559                 else:
560                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
561         else:
562             return self
563
564     @synchronized
565     def find(self, path):
566         """Recursively search the specified file path.
567
568         May return either a Collection or ArvadosFile.  Return None if not
569         found.
570
571         """
572         if not path:
573             raise errors.ArgumentError("Parameter 'path' is empty.")
574
575         pathcomponents = path.split("/", 1)
576         item = self._items.get(pathcomponents[0])
577         if len(pathcomponents) == 1:
578             return item
579         else:
580             if isinstance(item, RichCollectionBase):
581                 if pathcomponents[1]:
582                     return item.find(pathcomponents[1])
583                 else:
584                     return item
585             else:
586                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
587
588     @synchronized
589     def mkdirs(self, path):
590         """Recursive subcollection create.
591
592         Like `os.makedirs()`.  Will create intermediate subcollections needed
593         to contain the leaf subcollection path.
594
595         """
596
597         if self.find(path) != None:
598             raise IOError(errno.EEXIST, "Directory or file exists", path)
599
600         return self.find_or_create(path, COLLECTION)
601
602     def open(self, path, mode="r"):
603         """Open a file-like object for access.
604
605         :path:
606           path to a file in the collection
607         :mode:
608           one of "r", "r+", "w", "w+", "a", "a+"
609           :"r":
610             opens for reading
611           :"r+":
612             opens for reading and writing.  Reads/writes share a file pointer.
613           :"w", "w+":
614             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
615           :"a", "a+":
616             opens for reading and writing.  All writes are appended to
617             the end of the file.  Writing does not affect the file pointer for
618             reading.
619         """
620         mode = mode.replace("b", "")
621         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
622             raise errors.ArgumentError("Bad mode '%s'" % mode)
623         create = (mode != "r")
624
625         if create and not self.writable():
626             raise IOError(errno.EROFS, "Collection is read only")
627
628         if create:
629             arvfile = self.find_or_create(path, FILE)
630         else:
631             arvfile = self.find(path)
632
633         if arvfile is None:
634             raise IOError(errno.ENOENT, "File not found", path)
635         if not isinstance(arvfile, ArvadosFile):
636             raise IOError(errno.EISDIR, "Is a directory", path)
637
638         if mode[0] == "w":
639             arvfile.truncate(0)
640
641         name = os.path.basename(path)
642
643         if mode == "r":
644             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
645         else:
646             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
647
648     def modified(self):
649         """Determine if the collection has been modified since last commited."""
650         return not self.committed()
651
652     @synchronized
653     def committed(self):
654         """Determine if the collection has been committed to the API server."""
655
656         if self._committed is False:
657             return False
658         for v in self._items.values():
659             if v.committed() is False:
660                 return False
661         return True
662
663     @synchronized
664     def set_committed(self):
665         """Recursively set committed flag to True."""
666         self._committed = True
667         for k,v in self._items.items():
668             v.set_committed()
669
670     @synchronized
671     def __iter__(self):
672         """Iterate over names of files and collections contained in this collection."""
673         return iter(self._items.keys())
674
675     @synchronized
676     def __getitem__(self, k):
677         """Get a file or collection that is directly contained by this collection.
678
679         If you want to search a path, use `find()` instead.
680
681         """
682         return self._items[k]
683
684     @synchronized
685     def __contains__(self, k):
686         """Test if there is a file or collection a directly contained by this collection."""
687         return k in self._items
688
689     @synchronized
690     def __len__(self):
691         """Get the number of items directly contained in this collection."""
692         return len(self._items)
693
694     @must_be_writable
695     @synchronized
696     def __delitem__(self, p):
697         """Delete an item by name which is directly contained by this collection."""
698         del self._items[p]
699         self._committed = False
700         self.notify(DEL, self, p, None)
701
702     @synchronized
703     def keys(self):
704         """Get a list of names of files and collections directly contained in this collection."""
705         return self._items.keys()
706
707     @synchronized
708     def values(self):
709         """Get a list of files and collection objects directly contained in this collection."""
710         return self._items.values()
711
712     @synchronized
713     def items(self):
714         """Get a list of (name, object) tuples directly contained in this collection."""
715         return self._items.items()
716
717     def exists(self, path):
718         """Test if there is a file or collection at `path`."""
719         return self.find(path) is not None
720
721     @must_be_writable
722     @synchronized
723     def remove(self, path, recursive=False):
724         """Remove the file or subcollection (directory) at `path`.
725
726         :recursive:
727           Specify whether to remove non-empty subcollections (True), or raise an error (False).
728         """
729
730         if not path:
731             raise errors.ArgumentError("Parameter 'path' is empty.")
732
733         pathcomponents = path.split("/", 1)
734         item = self._items.get(pathcomponents[0])
735         if item is None:
736             raise IOError(errno.ENOENT, "File not found", path)
737         if len(pathcomponents) == 1:
738             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
739                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
740             deleteditem = self._items[pathcomponents[0]]
741             del self._items[pathcomponents[0]]
742             self._committed = False
743             self.notify(DEL, self, pathcomponents[0], deleteditem)
744         else:
745             item.remove(pathcomponents[1])
746
747     def _clonefrom(self, source):
748         for k,v in source.items():
749             self._items[k] = v.clone(self, k)
750
751     def clone(self):
752         raise NotImplementedError()
753
754     @must_be_writable
755     @synchronized
756     def add(self, source_obj, target_name, overwrite=False, reparent=False):
757         """Copy or move a file or subcollection to this collection.
758
759         :source_obj:
760           An ArvadosFile, or Subcollection object
761
762         :target_name:
763           Destination item name.  If the target name already exists and is a
764           file, this will raise an error unless you specify `overwrite=True`.
765
766         :overwrite:
767           Whether to overwrite target file if it already exists.
768
769         :reparent:
770           If True, source_obj will be moved from its parent collection to this collection.
771           If False, source_obj will be copied and the parent collection will be
772           unmodified.
773
774         """
775
776         if target_name in self and not overwrite:
777             raise IOError(errno.EEXIST, "File already exists", target_name)
778
779         modified_from = None
780         if target_name in self:
781             modified_from = self[target_name]
782
783         # Actually make the move or copy.
784         if reparent:
785             source_obj._reparent(self, target_name)
786             item = source_obj
787         else:
788             item = source_obj.clone(self, target_name)
789
790         self._items[target_name] = item
791         self._committed = False
792
793         if modified_from:
794             self.notify(MOD, self, target_name, (modified_from, item))
795         else:
796             self.notify(ADD, self, target_name, item)
797
798     def _get_src_target(self, source, target_path, source_collection, create_dest):
799         if source_collection is None:
800             source_collection = self
801
802         # Find the object
803         if isinstance(source, basestring):
804             source_obj = source_collection.find(source)
805             if source_obj is None:
806                 raise IOError(errno.ENOENT, "File not found", source)
807             sourcecomponents = source.split("/")
808         else:
809             source_obj = source
810             sourcecomponents = None
811
812         # Find parent collection the target path
813         targetcomponents = target_path.split("/")
814
815         # Determine the name to use.
816         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
817
818         if not target_name:
819             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
820
821         if create_dest:
822             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
823         else:
824             if len(targetcomponents) > 1:
825                 target_dir = self.find("/".join(targetcomponents[0:-1]))
826             else:
827                 target_dir = self
828
829         if target_dir is None:
830             raise IOError(errno.ENOENT, "Target directory not found", target_name)
831
832         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
833             target_dir = target_dir[target_name]
834             target_name = sourcecomponents[-1]
835
836         return (source_obj, target_dir, target_name)
837
838     @must_be_writable
839     @synchronized
840     def copy(self, source, target_path, source_collection=None, overwrite=False):
841         """Copy a file or subcollection to a new path in this collection.
842
843         :source:
844           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
845
846         :target_path:
847           Destination file or path.  If the target path already exists and is a
848           subcollection, the item will be placed inside the subcollection.  If
849           the target path already exists and is a file, this will raise an error
850           unless you specify `overwrite=True`.
851
852         :source_collection:
853           Collection to copy `source_path` from (default `self`)
854
855         :overwrite:
856           Whether to overwrite target file if it already exists.
857         """
858
859         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
860         target_dir.add(source_obj, target_name, overwrite, False)
861
862     @must_be_writable
863     @synchronized
864     def rename(self, source, target_path, source_collection=None, overwrite=False):
865         """Move a file or subcollection from `source_collection` to a new path in this collection.
866
867         :source:
868           A string with a path to source file or subcollection.
869
870         :target_path:
871           Destination file or path.  If the target path already exists and is a
872           subcollection, the item will be placed inside the subcollection.  If
873           the target path already exists and is a file, this will raise an error
874           unless you specify `overwrite=True`.
875
876         :source_collection:
877           Collection to copy `source_path` from (default `self`)
878
879         :overwrite:
880           Whether to overwrite target file if it already exists.
881         """
882
883         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
884         if not source_obj.writable():
885             raise IOError(errno.EROFS, "Source collection is read only", source)
886         target_dir.add(source_obj, target_name, overwrite, True)
887
888     def portable_manifest_text(self, stream_name="."):
889         """Get the manifest text for this collection, sub collections and files.
890
891         This method does not flush outstanding blocks to Keep.  It will return
892         a normalized manifest with access tokens stripped.
893
894         :stream_name:
895           Name to use for this stream (directory)
896
897         """
898         return self._get_manifest_text(stream_name, True, True)
899
900     @synchronized
901     def manifest_text(self, stream_name=".", strip=False, normalize=False):
902         """Get the manifest text for this collection, sub collections and files.
903
904         This method will flush outstanding blocks to Keep.  By default, it will
905         not normalize an unmodified manifest or strip access tokens.
906
907         :stream_name:
908           Name to use for this stream (directory)
909
910         :strip:
911           If True, remove signing tokens from block locators if present.
912           If False (default), block locators are left unchanged.
913
914         :normalize:
915           If True, always export the manifest text in normalized form
916           even if the Collection is not modified.  If False (default) and the collection
917           is not modified, return the original manifest text even if it is not
918           in normalized form.
919
920         """
921
922         self._my_block_manager().commit_all()
923         return self._get_manifest_text(stream_name, strip, normalize)
924
925     @synchronized
926     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
927         """Get the manifest text for this collection, sub collections and files.
928
929         :stream_name:
930           Name to use for this stream (directory)
931
932         :strip:
933           If True, remove signing tokens from block locators if present.
934           If False (default), block locators are left unchanged.
935
936         :normalize:
937           If True, always export the manifest text in normalized form
938           even if the Collection is not modified.  If False (default) and the collection
939           is not modified, return the original manifest text even if it is not
940           in normalized form.
941
942         :only_committed:
943           If True, only include blocks that were already committed to Keep.
944
945         """
946
947         if not self.committed() or self._manifest_text is None or normalize:
948             stream = {}
949             buf = []
950             sorted_keys = sorted(self.keys())
951             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
952                 # Create a stream per file `k`
953                 arvfile = self[filename]
954                 filestream = []
955                 for segment in arvfile.segments():
956                     loc = segment.locator
957                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
958                         if only_committed:
959                             continue
960                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
961                     if strip:
962                         loc = KeepLocator(loc).stripped()
963                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
964                                          segment.segment_offset, segment.range_size))
965                 stream[filename] = filestream
966             if stream:
967                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
968             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
969                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
970             return "".join(buf)
971         else:
972             if strip:
973                 return self.stripped_manifest()
974             else:
975                 return self._manifest_text
976
977     @synchronized
978     def diff(self, end_collection, prefix=".", holding_collection=None):
979         """Generate list of add/modify/delete actions.
980
981         When given to `apply`, will change `self` to match `end_collection`
982
983         """
984         changes = []
985         if holding_collection is None:
986             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
987         for k in self:
988             if k not in end_collection:
989                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
990         for k in end_collection:
991             if k in self:
992                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
993                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
994                 elif end_collection[k] != self[k]:
995                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
996                 else:
997                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
998             else:
999                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1000         return changes
1001
1002     @must_be_writable
1003     @synchronized
1004     def apply(self, changes):
1005         """Apply changes from `diff`.
1006
1007         If a change conflicts with a local change, it will be saved to an
1008         alternate path indicating the conflict.
1009
1010         """
1011         if changes:
1012             self._committed = False
1013         for change in changes:
1014             event_type = change[0]
1015             path = change[1]
1016             initial = change[2]
1017             local = self.find(path)
1018             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1019                                                                     time.gmtime()))
1020             if event_type == ADD:
1021                 if local is None:
1022                     # No local file at path, safe to copy over new file
1023                     self.copy(initial, path)
1024                 elif local is not None and local != initial:
1025                     # There is already local file and it is different:
1026                     # save change to conflict file.
1027                     self.copy(initial, conflictpath)
1028             elif event_type == MOD or event_type == TOK:
1029                 final = change[3]
1030                 if local == initial:
1031                     # Local matches the "initial" item so it has not
1032                     # changed locally and is safe to update.
1033                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1034                         # Replace contents of local file with new contents
1035                         local.replace_contents(final)
1036                     else:
1037                         # Overwrite path with new item; this can happen if
1038                         # path was a file and is now a collection or vice versa
1039                         self.copy(final, path, overwrite=True)
1040                 else:
1041                     # Local is missing (presumably deleted) or local doesn't
1042                     # match the "start" value, so save change to conflict file
1043                     self.copy(final, conflictpath)
1044             elif event_type == DEL:
1045                 if local == initial:
1046                     # Local item matches "initial" value, so it is safe to remove.
1047                     self.remove(path, recursive=True)
1048                 # else, the file is modified or already removed, in either
1049                 # case we don't want to try to remove it.
1050
1051     def portable_data_hash(self):
1052         """Get the portable data hash for this collection's manifest."""
1053         stripped = self.portable_manifest_text()
1054         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1055
1056     @synchronized
1057     def subscribe(self, callback):
1058         if self._callback is None:
1059             self._callback = callback
1060         else:
1061             raise errors.ArgumentError("A callback is already set on this collection.")
1062
1063     @synchronized
1064     def unsubscribe(self):
1065         if self._callback is not None:
1066             self._callback = None
1067
1068     @synchronized
1069     def notify(self, event, collection, name, item):
1070         if self._callback:
1071             self._callback(event, collection, name, item)
1072         self.root_collection().notify(event, collection, name, item)
1073
1074     @synchronized
1075     def __eq__(self, other):
1076         if other is self:
1077             return True
1078         if not isinstance(other, RichCollectionBase):
1079             return False
1080         if len(self._items) != len(other):
1081             return False
1082         for k in self._items:
1083             if k not in other:
1084                 return False
1085             if self._items[k] != other[k]:
1086                 return False
1087         return True
1088
1089     def __ne__(self, other):
1090         return not self.__eq__(other)
1091
1092     @synchronized
1093     def flush(self):
1094         """Flush bufferblocks to Keep."""
1095         for e in self.values():
1096             e.flush()
1097
1098
1099 class Collection(RichCollectionBase):
1100     """Represents the root of an Arvados Collection.
1101
1102     This class is threadsafe.  The root collection object, all subcollections
1103     and files are protected by a single lock (i.e. each access locks the entire
1104     collection).
1105
1106     Brief summary of
1107     useful methods:
1108
1109     :To read an existing file:
1110       `c.open("myfile", "r")`
1111
1112     :To write a new file:
1113       `c.open("myfile", "w")`
1114
1115     :To determine if a file exists:
1116       `c.find("myfile") is not None`
1117
1118     :To copy a file:
1119       `c.copy("source", "dest")`
1120
1121     :To delete a file:
1122       `c.remove("myfile")`
1123
1124     :To save to an existing collection record:
1125       `c.save()`
1126
1127     :To save a new collection record:
1128     `c.save_new()`
1129
1130     :To merge remote changes into this object:
1131       `c.update()`
1132
1133     Must be associated with an API server Collection record (during
1134     initialization, or using `save_new`) to use `save` or `update`
1135
1136     """
1137
1138     def __init__(self, manifest_locator_or_text=None,
1139                  api_client=None,
1140                  keep_client=None,
1141                  num_retries=None,
1142                  parent=None,
1143                  apiconfig=None,
1144                  block_manager=None,
1145                  replication_desired=None):
1146         """Collection constructor.
1147
1148         :manifest_locator_or_text:
1149           One of Arvados collection UUID, block locator of
1150           a manifest, raw manifest text, or None (to create an empty collection).
1151         :parent:
1152           the parent Collection, may be None.
1153
1154         :apiconfig:
1155           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1156           Prefer this over supplying your own api_client and keep_client (except in testing).
1157           Will use default config settings if not specified.
1158
1159         :api_client:
1160           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1161
1162         :keep_client:
1163           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1164
1165         :num_retries:
1166           the number of retries for API and Keep requests.
1167
1168         :block_manager:
1169           the block manager to use.  If not specified, create one.
1170
1171         :replication_desired:
1172           How many copies should Arvados maintain. If None, API server default
1173           configuration applies. If not None, this value will also be used
1174           for determining the number of block copies being written.
1175
1176         """
1177         super(Collection, self).__init__(parent)
1178         self._api_client = api_client
1179         self._keep_client = keep_client
1180         self._block_manager = block_manager
1181         self.replication_desired = replication_desired
1182
1183         if apiconfig:
1184             self._config = apiconfig
1185         else:
1186             self._config = config.settings()
1187
1188         self.num_retries = num_retries if num_retries is not None else 0
1189         self._manifest_locator = None
1190         self._manifest_text = None
1191         self._api_response = None
1192         self._past_versions = set()
1193
1194         self.lock = threading.RLock()
1195         self.events = None
1196
1197         if manifest_locator_or_text:
1198             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1199                 self._manifest_locator = manifest_locator_or_text
1200             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1201                 self._manifest_locator = manifest_locator_or_text
1202             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1203                 self._manifest_text = manifest_locator_or_text
1204             else:
1205                 raise errors.ArgumentError(
1206                     "Argument to CollectionReader is not a manifest or a collection UUID")
1207
1208             try:
1209                 self._populate()
1210             except (IOError, errors.SyntaxError) as e:
1211                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1212
1213     def root_collection(self):
1214         return self
1215
1216     def stream_name(self):
1217         return "."
1218
1219     def writable(self):
1220         return True
1221
1222     @synchronized
1223     def known_past_version(self, modified_at_and_portable_data_hash):
1224         return modified_at_and_portable_data_hash in self._past_versions
1225
1226     @synchronized
1227     @retry_method
1228     def update(self, other=None, num_retries=None):
1229         """Merge the latest collection on the API server with the current collection."""
1230
1231         if other is None:
1232             if self._manifest_locator is None:
1233                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1234             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1235             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1236                 response.get("portable_data_hash") != self.portable_data_hash()):
1237                 # The record on the server is different from our current one, but we've seen it before,
1238                 # so ignore it because it's already been merged.
1239                 # However, if it's the same as our current record, proceed with the update, because we want to update
1240                 # our tokens.
1241                 return
1242             else:
1243                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1244             other = CollectionReader(response["manifest_text"])
1245         baseline = CollectionReader(self._manifest_text)
1246         self.apply(baseline.diff(other))
1247         self._manifest_text = self.manifest_text()
1248
1249     @synchronized
1250     def _my_api(self):
1251         if self._api_client is None:
1252             self._api_client = ThreadSafeApiCache(self._config)
1253             if self._keep_client is None:
1254                 self._keep_client = self._api_client.keep
1255         return self._api_client
1256
1257     @synchronized
1258     def _my_keep(self):
1259         if self._keep_client is None:
1260             if self._api_client is None:
1261                 self._my_api()
1262             else:
1263                 self._keep_client = KeepClient(api_client=self._api_client)
1264         return self._keep_client
1265
1266     @synchronized
1267     def _my_block_manager(self):
1268         if self._block_manager is None:
1269             copies = (self.replication_desired or
1270                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1271                                                    2))
1272             self._block_manager = _BlockManager(self._my_keep(), copies=copies)
1273         return self._block_manager
1274
1275     def _remember_api_response(self, response):
1276         self._api_response = response
1277         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1278
1279     def _populate_from_api_server(self):
1280         # As in KeepClient itself, we must wait until the last
1281         # possible moment to instantiate an API client, in order to
1282         # avoid tripping up clients that don't have access to an API
1283         # server.  If we do build one, make sure our Keep client uses
1284         # it.  If instantiation fails, we'll fall back to the except
1285         # clause, just like any other Collection lookup
1286         # failure. Return an exception, or None if successful.
1287         try:
1288             self._remember_api_response(self._my_api().collections().get(
1289                 uuid=self._manifest_locator).execute(
1290                     num_retries=self.num_retries))
1291             self._manifest_text = self._api_response['manifest_text']
1292             # If not overriden via kwargs, we should try to load the
1293             # replication_desired from the API server
1294             if self.replication_desired is None:
1295                 self.replication_desired = self._api_response.get('replication_desired', None)
1296             return None
1297         except Exception as e:
1298             return e
1299
1300     def _populate_from_keep(self):
1301         # Retrieve a manifest directly from Keep. This has a chance of
1302         # working if [a] the locator includes a permission signature
1303         # or [b] the Keep services are operating in world-readable
1304         # mode. Return an exception, or None if successful.
1305         try:
1306             self._manifest_text = self._my_keep().get(
1307                 self._manifest_locator, num_retries=self.num_retries)
1308         except Exception as e:
1309             return e
1310
1311     def _populate(self):
1312         if self._manifest_locator is None and self._manifest_text is None:
1313             return
1314         error_via_api = None
1315         error_via_keep = None
1316         should_try_keep = ((self._manifest_text is None) and
1317                            util.keep_locator_pattern.match(
1318                                self._manifest_locator))
1319         if ((self._manifest_text is None) and
1320             util.signed_locator_pattern.match(self._manifest_locator)):
1321             error_via_keep = self._populate_from_keep()
1322         if self._manifest_text is None:
1323             error_via_api = self._populate_from_api_server()
1324             if error_via_api is not None and not should_try_keep:
1325                 raise error_via_api
1326         if ((self._manifest_text is None) and
1327             not error_via_keep and
1328             should_try_keep):
1329             # Looks like a keep locator, and we didn't already try keep above
1330             error_via_keep = self._populate_from_keep()
1331         if self._manifest_text is None:
1332             # Nothing worked!
1333             raise errors.NotFoundError(
1334                 ("Failed to retrieve collection '{}' " +
1335                  "from either API server ({}) or Keep ({})."
1336                  ).format(
1337                     self._manifest_locator,
1338                     error_via_api,
1339                     error_via_keep))
1340         # populate
1341         self._baseline_manifest = self._manifest_text
1342         self._import_manifest(self._manifest_text)
1343
1344
1345     def _has_collection_uuid(self):
1346         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1347
1348     def __enter__(self):
1349         return self
1350
1351     def __exit__(self, exc_type, exc_value, traceback):
1352         """Support scoped auto-commit in a with: block."""
1353         if exc_type is None:
1354             if self.writable() and self._has_collection_uuid():
1355                 self.save()
1356         self.stop_threads()
1357
1358     def stop_threads(self):
1359         if self._block_manager is not None:
1360             self._block_manager.stop_threads()
1361
1362     @synchronized
1363     def manifest_locator(self):
1364         """Get the manifest locator, if any.
1365
1366         The manifest locator will be set when the collection is loaded from an
1367         API server record or the portable data hash of a manifest.
1368
1369         The manifest locator will be None if the collection is newly created or
1370         was created directly from manifest text.  The method `save_new()` will
1371         assign a manifest locator.
1372
1373         """
1374         return self._manifest_locator
1375
1376     @synchronized
1377     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1378         if new_config is None:
1379             new_config = self._config
1380         if readonly:
1381             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1382         else:
1383             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1384
1385         newcollection._clonefrom(self)
1386         return newcollection
1387
1388     @synchronized
1389     def api_response(self):
1390         """Returns information about this Collection fetched from the API server.
1391
1392         If the Collection exists in Keep but not the API server, currently
1393         returns None.  Future versions may provide a synthetic response.
1394
1395         """
1396         return self._api_response
1397
1398     def find_or_create(self, path, create_type):
1399         """See `RichCollectionBase.find_or_create`"""
1400         if path == ".":
1401             return self
1402         else:
1403             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1404
1405     def find(self, path):
1406         """See `RichCollectionBase.find`"""
1407         if path == ".":
1408             return self
1409         else:
1410             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1411
1412     def remove(self, path, recursive=False):
1413         """See `RichCollectionBase.remove`"""
1414         if path == ".":
1415             raise errors.ArgumentError("Cannot remove '.'")
1416         else:
1417             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1418
1419     @must_be_writable
1420     @synchronized
1421     @retry_method
1422     def save(self, merge=True, num_retries=None):
1423         """Save collection to an existing collection record.
1424
1425         Commit pending buffer blocks to Keep, merge with remote record (if
1426         merge=True, the default), and update the collection record.  Returns
1427         the current manifest text.
1428
1429         Will raise AssertionError if not associated with a collection record on
1430         the API server.  If you want to save a manifest to Keep only, see
1431         `save_new()`.
1432
1433         :merge:
1434           Update and merge remote changes before saving.  Otherwise, any
1435           remote changes will be ignored and overwritten.
1436
1437         :num_retries:
1438           Retry count on API calls (if None,  use the collection default)
1439
1440         """
1441         if not self.committed():
1442             if not self._has_collection_uuid():
1443                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1444
1445             self._my_block_manager().commit_all()
1446
1447             if merge:
1448                 self.update()
1449
1450             text = self.manifest_text(strip=False)
1451             self._remember_api_response(self._my_api().collections().update(
1452                 uuid=self._manifest_locator,
1453                 body={'manifest_text': text}
1454                 ).execute(
1455                     num_retries=num_retries))
1456             self._manifest_text = self._api_response["manifest_text"]
1457             self.set_committed()
1458
1459         return self._manifest_text
1460
1461
1462     @must_be_writable
1463     @synchronized
1464     @retry_method
1465     def save_new(self, name=None,
1466                  create_collection_record=True,
1467                  owner_uuid=None,
1468                  ensure_unique_name=False,
1469                  num_retries=None):
1470         """Save collection to a new collection record.
1471
1472         Commit pending buffer blocks to Keep and, when create_collection_record
1473         is True (default), create a new collection record.  After creating a
1474         new collection record, this Collection object will be associated with
1475         the new record used by `save()`.  Returns the current manifest text.
1476
1477         :name:
1478           The collection name.
1479
1480         :create_collection_record:
1481            If True, create a collection record on the API server.
1482            If False, only commit blocks to Keep and return the manifest text.
1483
1484         :owner_uuid:
1485           the user, or project uuid that will own this collection.
1486           If None, defaults to the current user.
1487
1488         :ensure_unique_name:
1489           If True, ask the API server to rename the collection
1490           if it conflicts with a collection with the same name and owner.  If
1491           False, a name conflict will result in an error.
1492
1493         :num_retries:
1494           Retry count on API calls (if None,  use the collection default)
1495
1496         """
1497         self._my_block_manager().commit_all()
1498         text = self.manifest_text(strip=False)
1499
1500         if create_collection_record:
1501             if name is None:
1502                 name = "New collection"
1503                 ensure_unique_name = True
1504
1505             body = {"manifest_text": text,
1506                     "name": name,
1507                     "replication_desired": self.replication_desired}
1508             if owner_uuid:
1509                 body["owner_uuid"] = owner_uuid
1510
1511             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1512             text = self._api_response["manifest_text"]
1513
1514             self._manifest_locator = self._api_response["uuid"]
1515
1516             self._manifest_text = text
1517             self.set_committed()
1518
1519         return text
1520
1521     @synchronized
1522     def _import_manifest(self, manifest_text):
1523         """Import a manifest into a `Collection`.
1524
1525         :manifest_text:
1526           The manifest text to import from.
1527
1528         """
1529         if len(self) > 0:
1530             raise ArgumentError("Can only import manifest into an empty collection")
1531
1532         STREAM_NAME = 0
1533         BLOCKS = 1
1534         SEGMENTS = 2
1535
1536         stream_name = None
1537         state = STREAM_NAME
1538
1539         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1540             tok = token_and_separator.group(1)
1541             sep = token_and_separator.group(2)
1542
1543             if state == STREAM_NAME:
1544                 # starting a new stream
1545                 stream_name = tok.replace('\\040', ' ')
1546                 blocks = []
1547                 segments = []
1548                 streamoffset = 0L
1549                 state = BLOCKS
1550                 self.find_or_create(stream_name, COLLECTION)
1551                 continue
1552
1553             if state == BLOCKS:
1554                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1555                 if block_locator:
1556                     blocksize = long(block_locator.group(1))
1557                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1558                     streamoffset += blocksize
1559                 else:
1560                     state = SEGMENTS
1561
1562             if state == SEGMENTS:
1563                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1564                 if file_segment:
1565                     pos = long(file_segment.group(1))
1566                     size = long(file_segment.group(2))
1567                     name = file_segment.group(3).replace('\\040', ' ')
1568                     filepath = os.path.join(stream_name, name)
1569                     afile = self.find_or_create(filepath, FILE)
1570                     if isinstance(afile, ArvadosFile):
1571                         afile.add_segment(blocks, pos, size)
1572                     else:
1573                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1574                 else:
1575                     # error!
1576                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1577
1578             if sep == "\n":
1579                 stream_name = None
1580                 state = STREAM_NAME
1581
1582         self.set_committed()
1583
1584     @synchronized
1585     def notify(self, event, collection, name, item):
1586         if self._callback:
1587             self._callback(event, collection, name, item)
1588
1589
1590 class Subcollection(RichCollectionBase):
1591     """This is a subdirectory within a collection that doesn't have its own API
1592     server record.
1593
1594     Subcollection locking falls under the umbrella lock of its root collection.
1595
1596     """
1597
1598     def __init__(self, parent, name):
1599         super(Subcollection, self).__init__(parent)
1600         self.lock = self.root_collection().lock
1601         self._manifest_text = None
1602         self.name = name
1603         self.num_retries = parent.num_retries
1604
1605     def root_collection(self):
1606         return self.parent.root_collection()
1607
1608     def writable(self):
1609         return self.root_collection().writable()
1610
1611     def _my_api(self):
1612         return self.root_collection()._my_api()
1613
1614     def _my_keep(self):
1615         return self.root_collection()._my_keep()
1616
1617     def _my_block_manager(self):
1618         return self.root_collection()._my_block_manager()
1619
1620     def stream_name(self):
1621         return os.path.join(self.parent.stream_name(), self.name)
1622
1623     @synchronized
1624     def clone(self, new_parent, new_name):
1625         c = Subcollection(new_parent, new_name)
1626         c._clonefrom(self)
1627         return c
1628
1629     @must_be_writable
1630     @synchronized
1631     def _reparent(self, newparent, newname):
1632         self._committed = False
1633         self.flush()
1634         self.parent.remove(self.name, recursive=True)
1635         self.parent = newparent
1636         self.name = newname
1637         self.lock = self.parent.root_collection().lock
1638
1639
1640 class CollectionReader(Collection):
1641     """A read-only collection object.
1642
1643     Initialize from an api collection record locator, a portable data hash of a
1644     manifest, or raw manifest text.  See `Collection` constructor for detailed
1645     options.
1646
1647     """
1648     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1649         self._in_init = True
1650         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1651         self._in_init = False
1652
1653         # Forego any locking since it should never change once initialized.
1654         self.lock = NoopLock()
1655
1656         # Backwards compatability with old CollectionReader
1657         # all_streams() and all_files()
1658         self._streams = None
1659
1660     def writable(self):
1661         return self._in_init
1662
1663     def _populate_streams(orig_func):
1664         @functools.wraps(orig_func)
1665         def populate_streams_wrapper(self, *args, **kwargs):
1666             # Defer populating self._streams until needed since it creates a copy of the manifest.
1667             if self._streams is None:
1668                 if self._manifest_text:
1669                     self._streams = [sline.split()
1670                                      for sline in self._manifest_text.split("\n")
1671                                      if sline]
1672                 else:
1673                     self._streams = []
1674             return orig_func(self, *args, **kwargs)
1675         return populate_streams_wrapper
1676
1677     @_populate_streams
1678     def normalize(self):
1679         """Normalize the streams returned by `all_streams`.
1680
1681         This method is kept for backwards compatability and only affects the
1682         behavior of `all_streams()` and `all_files()`
1683
1684         """
1685
1686         # Rearrange streams
1687         streams = {}
1688         for s in self.all_streams():
1689             for f in s.all_files():
1690                 streamname, filename = split(s.name() + "/" + f.name())
1691                 if streamname not in streams:
1692                     streams[streamname] = {}
1693                 if filename not in streams[streamname]:
1694                     streams[streamname][filename] = []
1695                 for r in f.segments:
1696                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1697
1698         self._streams = [normalize_stream(s, streams[s])
1699                          for s in sorted(streams)]
1700     @_populate_streams
1701     def all_streams(self):
1702         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1703                 for s in self._streams]
1704
1705     @_populate_streams
1706     def all_files(self):
1707         for s in self.all_streams():
1708             for f in s.all_files():
1709                 yield f