9463: Several changes to the PySDK, as described below:
[arvados.git] / sdk / python / arvados / collection.py
1 import functools
2 import logging
3 import os
4 import re
5 import errno
6 import hashlib
7 import time
8 import threading
9
10 from collections import deque
11 from stat import *
12
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
19 import config
20 import errors
21 import util
22 import events
23 from arvados.retry import retry_method
24
25 _logger = logging.getLogger('arvados.collection')
26
27 class CollectionBase(object):
28     def __enter__(self):
29         return self
30
31     def __exit__(self, exc_type, exc_value, traceback):
32         pass
33
34     def _my_keep(self):
35         if self._keep_client is None:
36             self._keep_client = KeepClient(api_client=self._api_client,
37                                            num_retries=self.num_retries)
38         return self._keep_client
39
40     def stripped_manifest(self):
41         """Get the manifest with locator hints stripped.
42
43         Return the manifest for the current collection with all
44         non-portable hints (i.e., permission signatures and other
45         hints other than size hints) removed from the locators.
46         """
47         raw = self.manifest_text()
48         clean = []
49         for line in raw.split("\n"):
50             fields = line.split()
51             if fields:
52                 clean_fields = fields[:1] + [
53                     (re.sub(r'\+[^\d][^\+]*', '', x)
54                      if re.match(util.keep_locator_pattern, x)
55                      else x)
56                     for x in fields[1:]]
57                 clean += [' '.join(clean_fields), "\n"]
58         return ''.join(clean)
59
60
61 class _WriterFile(_FileLikeObjectBase):
62     def __init__(self, coll_writer, name):
63         super(_WriterFile, self).__init__(name, 'wb')
64         self.dest = coll_writer
65
66     def close(self):
67         super(_WriterFile, self).close()
68         self.dest.finish_current_file()
69
70     @_FileLikeObjectBase._before_close
71     def write(self, data):
72         self.dest.write(data)
73
74     @_FileLikeObjectBase._before_close
75     def writelines(self, seq):
76         for data in seq:
77             self.write(data)
78
79     @_FileLikeObjectBase._before_close
80     def flush(self):
81         self.dest.flush_data()
82
83
84 class CollectionWriter(CollectionBase):
85     def __init__(self, api_client=None, num_retries=0, replication=None):
86         """Instantiate a CollectionWriter.
87
88         CollectionWriter lets you build a new Arvados Collection from scratch.
89         Write files to it.  The CollectionWriter will upload data to Keep as
90         appropriate, and provide you with the Collection manifest text when
91         you're finished.
92
93         Arguments:
94         * api_client: The API client to use to look up Collections.  If not
95           provided, CollectionReader will build one from available Arvados
96           configuration.
97         * num_retries: The default number of times to retry failed
98           service requests.  Default 0.  You may change this value
99           after instantiation, but note those changes may not
100           propagate to related objects like the Keep client.
101         * replication: The number of copies of each block to store.
102           If this argument is None or not supplied, replication is
103           the server-provided default if available, otherwise 2.
104         """
105         self._api_client = api_client
106         self.num_retries = num_retries
107         self.replication = (2 if replication is None else replication)
108         self._keep_client = None
109         self._data_buffer = []
110         self._data_buffer_len = 0
111         self._current_stream_files = []
112         self._current_stream_length = 0
113         self._current_stream_locators = []
114         self._current_stream_name = '.'
115         self._current_file_name = None
116         self._current_file_pos = 0
117         self._finished_streams = []
118         self._close_file = None
119         self._queued_file = None
120         self._queued_dirents = deque()
121         self._queued_trees = deque()
122         self._last_open = None
123
124     def __exit__(self, exc_type, exc_value, traceback):
125         if exc_type is None:
126             self.finish()
127
128     def do_queued_work(self):
129         # The work queue consists of three pieces:
130         # * _queued_file: The file object we're currently writing to the
131         #   Collection.
132         # * _queued_dirents: Entries under the current directory
133         #   (_queued_trees[0]) that we want to write or recurse through.
134         #   This may contain files from subdirectories if
135         #   max_manifest_depth == 0 for this directory.
136         # * _queued_trees: Directories that should be written as separate
137         #   streams to the Collection.
138         # This function handles the smallest piece of work currently queued
139         # (current file, then current directory, then next directory) until
140         # no work remains.  The _work_THING methods each do a unit of work on
141         # THING.  _queue_THING methods add a THING to the work queue.
142         while True:
143             if self._queued_file:
144                 self._work_file()
145             elif self._queued_dirents:
146                 self._work_dirents()
147             elif self._queued_trees:
148                 self._work_trees()
149             else:
150                 break
151
152     def _work_file(self):
153         while True:
154             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
155             if not buf:
156                 break
157             self.write(buf)
158         self.finish_current_file()
159         if self._close_file:
160             self._queued_file.close()
161         self._close_file = None
162         self._queued_file = None
163
164     def _work_dirents(self):
165         path, stream_name, max_manifest_depth = self._queued_trees[0]
166         if stream_name != self.current_stream_name():
167             self.start_new_stream(stream_name)
168         while self._queued_dirents:
169             dirent = self._queued_dirents.popleft()
170             target = os.path.join(path, dirent)
171             if os.path.isdir(target):
172                 self._queue_tree(target,
173                                  os.path.join(stream_name, dirent),
174                                  max_manifest_depth - 1)
175             else:
176                 self._queue_file(target, dirent)
177                 break
178         if not self._queued_dirents:
179             self._queued_trees.popleft()
180
181     def _work_trees(self):
182         path, stream_name, max_manifest_depth = self._queued_trees[0]
183         d = util.listdir_recursive(
184             path, max_depth = (None if max_manifest_depth == 0 else 0))
185         if d:
186             self._queue_dirents(stream_name, d)
187         else:
188             self._queued_trees.popleft()
189
190     def _queue_file(self, source, filename=None):
191         assert (self._queued_file is None), "tried to queue more than one file"
192         if not hasattr(source, 'read'):
193             source = open(source, 'rb')
194             self._close_file = True
195         else:
196             self._close_file = False
197         if filename is None:
198             filename = os.path.basename(source.name)
199         self.start_new_file(filename)
200         self._queued_file = source
201
202     def _queue_dirents(self, stream_name, dirents):
203         assert (not self._queued_dirents), "tried to queue more than one tree"
204         self._queued_dirents = deque(sorted(dirents))
205
206     def _queue_tree(self, path, stream_name, max_manifest_depth):
207         self._queued_trees.append((path, stream_name, max_manifest_depth))
208
209     def write_file(self, source, filename=None):
210         self._queue_file(source, filename)
211         self.do_queued_work()
212
213     def write_directory_tree(self,
214                              path, stream_name='.', max_manifest_depth=-1):
215         self._queue_tree(path, stream_name, max_manifest_depth)
216         self.do_queued_work()
217
218     def write(self, newdata):
219         if hasattr(newdata, '__iter__'):
220             for s in newdata:
221                 self.write(s)
222             return
223         self._data_buffer.append(newdata)
224         self._data_buffer_len += len(newdata)
225         self._current_stream_length += len(newdata)
226         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
227             self.flush_data()
228
229     def open(self, streampath, filename=None):
230         """open(streampath[, filename]) -> file-like object
231
232         Pass in the path of a file to write to the Collection, either as a
233         single string or as two separate stream name and file name arguments.
234         This method returns a file-like object you can write to add it to the
235         Collection.
236
237         You may only have one file object from the Collection open at a time,
238         so be sure to close the object when you're done.  Using the object in
239         a with statement makes that easy::
240
241           with cwriter.open('./doc/page1.txt') as outfile:
242               outfile.write(page1_data)
243           with cwriter.open('./doc/page2.txt') as outfile:
244               outfile.write(page2_data)
245         """
246         if filename is None:
247             streampath, filename = split(streampath)
248         if self._last_open and not self._last_open.closed:
249             raise errors.AssertionError(
250                 "can't open '{}' when '{}' is still open".format(
251                     filename, self._last_open.name))
252         if streampath != self.current_stream_name():
253             self.start_new_stream(streampath)
254         self.set_current_file_name(filename)
255         self._last_open = _WriterFile(self, filename)
256         return self._last_open
257
258     def flush_data(self):
259         data_buffer = ''.join(self._data_buffer)
260         if data_buffer:
261             self._current_stream_locators.append(
262                 self._my_keep().put(
263                     data_buffer[0:config.KEEP_BLOCK_SIZE],
264                     copies=self.replication))
265             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
266             self._data_buffer_len = len(self._data_buffer[0])
267
268     def start_new_file(self, newfilename=None):
269         self.finish_current_file()
270         self.set_current_file_name(newfilename)
271
272     def set_current_file_name(self, newfilename):
273         if re.search(r'[\t\n]', newfilename):
274             raise errors.AssertionError(
275                 "Manifest filenames cannot contain whitespace: %s" %
276                 newfilename)
277         elif re.search(r'\x00', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain NUL characters: %s" %
280                 newfilename)
281         self._current_file_name = newfilename
282
283     def current_file_name(self):
284         return self._current_file_name
285
286     def finish_current_file(self):
287         if self._current_file_name is None:
288             if self._current_file_pos == self._current_stream_length:
289                 return
290             raise errors.AssertionError(
291                 "Cannot finish an unnamed file " +
292                 "(%d bytes at offset %d in '%s' stream)" %
293                 (self._current_stream_length - self._current_file_pos,
294                  self._current_file_pos,
295                  self._current_stream_name))
296         self._current_stream_files.append([
297                 self._current_file_pos,
298                 self._current_stream_length - self._current_file_pos,
299                 self._current_file_name])
300         self._current_file_pos = self._current_stream_length
301         self._current_file_name = None
302
303     def start_new_stream(self, newstreamname='.'):
304         self.finish_current_stream()
305         self.set_current_stream_name(newstreamname)
306
307     def set_current_stream_name(self, newstreamname):
308         if re.search(r'[\t\n]', newstreamname):
309             raise errors.AssertionError(
310                 "Manifest stream names cannot contain whitespace")
311         self._current_stream_name = '.' if newstreamname=='' else newstreamname
312
313     def current_stream_name(self):
314         return self._current_stream_name
315
316     def finish_current_stream(self):
317         self.finish_current_file()
318         self.flush_data()
319         if not self._current_stream_files:
320             pass
321         elif self._current_stream_name is None:
322             raise errors.AssertionError(
323                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
324                 (self._current_stream_length, len(self._current_stream_files)))
325         else:
326             if not self._current_stream_locators:
327                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
328             self._finished_streams.append([self._current_stream_name,
329                                            self._current_stream_locators,
330                                            self._current_stream_files])
331         self._current_stream_files = []
332         self._current_stream_length = 0
333         self._current_stream_locators = []
334         self._current_stream_name = None
335         self._current_file_pos = 0
336         self._current_file_name = None
337
338     def finish(self):
339         """Store the manifest in Keep and return its locator.
340
341         This is useful for storing manifest fragments (task outputs)
342         temporarily in Keep during a Crunch job.
343
344         In other cases you should make a collection instead, by
345         sending manifest_text() to the API server's "create
346         collection" endpoint.
347         """
348         return self._my_keep().put(self.manifest_text(), copies=self.replication)
349
350     def portable_data_hash(self):
351         stripped = self.stripped_manifest()
352         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
353
354     def manifest_text(self):
355         self.finish_current_stream()
356         manifest = ''
357
358         for stream in self._finished_streams:
359             if not re.search(r'^\.(/.*)?$', stream[0]):
360                 manifest += './'
361             manifest += stream[0].replace(' ', '\\040')
362             manifest += ' ' + ' '.join(stream[1])
363             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
364             manifest += "\n"
365
366         return manifest
367
368     def data_locators(self):
369         ret = []
370         for name, locators, files in self._finished_streams:
371             ret += locators
372         return ret
373
374
375 class ResumableCollectionWriter(CollectionWriter):
376     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
377                    '_current_stream_locators', '_current_stream_name',
378                    '_current_file_name', '_current_file_pos', '_close_file',
379                    '_data_buffer', '_dependencies', '_finished_streams',
380                    '_queued_dirents', '_queued_trees']
381
382     def __init__(self, api_client=None, **kwargs):
383         self._dependencies = {}
384         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
385
386     @classmethod
387     def from_state(cls, state, *init_args, **init_kwargs):
388         # Try to build a new writer from scratch with the given state.
389         # If the state is not suitable to resume (because files have changed,
390         # been deleted, aren't predictable, etc.), raise a
391         # StaleWriterStateError.  Otherwise, return the initialized writer.
392         # The caller is responsible for calling writer.do_queued_work()
393         # appropriately after it's returned.
394         writer = cls(*init_args, **init_kwargs)
395         for attr_name in cls.STATE_PROPS:
396             attr_value = state[attr_name]
397             attr_class = getattr(writer, attr_name).__class__
398             # Coerce the value into the same type as the initial value, if
399             # needed.
400             if attr_class not in (type(None), attr_value.__class__):
401                 attr_value = attr_class(attr_value)
402             setattr(writer, attr_name, attr_value)
403         # Check dependencies before we try to resume anything.
404         if any(KeepLocator(ls).permission_expired()
405                for ls in writer._current_stream_locators):
406             raise errors.StaleWriterStateError(
407                 "locators include expired permission hint")
408         writer.check_dependencies()
409         if state['_current_file'] is not None:
410             path, pos = state['_current_file']
411             try:
412                 writer._queued_file = open(path, 'rb')
413                 writer._queued_file.seek(pos)
414             except IOError as error:
415                 raise errors.StaleWriterStateError(
416                     "failed to reopen active file {}: {}".format(path, error))
417         return writer
418
419     def check_dependencies(self):
420         for path, orig_stat in self._dependencies.items():
421             if not S_ISREG(orig_stat[ST_MODE]):
422                 raise errors.StaleWriterStateError("{} not file".format(path))
423             try:
424                 now_stat = tuple(os.stat(path))
425             except OSError as error:
426                 raise errors.StaleWriterStateError(
427                     "failed to stat {}: {}".format(path, error))
428             if ((not S_ISREG(now_stat[ST_MODE])) or
429                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
430                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
431                 raise errors.StaleWriterStateError("{} changed".format(path))
432
433     def dump_state(self, copy_func=lambda x: x):
434         state = {attr: copy_func(getattr(self, attr))
435                  for attr in self.STATE_PROPS}
436         if self._queued_file is None:
437             state['_current_file'] = None
438         else:
439             state['_current_file'] = (os.path.realpath(self._queued_file.name),
440                                       self._queued_file.tell())
441         return state
442
443     def _queue_file(self, source, filename=None):
444         try:
445             src_path = os.path.realpath(source)
446         except Exception:
447             raise errors.AssertionError("{} not a file path".format(source))
448         try:
449             path_stat = os.stat(src_path)
450         except OSError as stat_error:
451             path_stat = None
452         super(ResumableCollectionWriter, self)._queue_file(source, filename)
453         fd_stat = os.fstat(self._queued_file.fileno())
454         if not S_ISREG(fd_stat.st_mode):
455             # We won't be able to resume from this cache anyway, so don't
456             # worry about further checks.
457             self._dependencies[source] = tuple(fd_stat)
458         elif path_stat is None:
459             raise errors.AssertionError(
460                 "could not stat {}: {}".format(source, stat_error))
461         elif path_stat.st_ino != fd_stat.st_ino:
462             raise errors.AssertionError(
463                 "{} changed between open and stat calls".format(source))
464         else:
465             self._dependencies[src_path] = tuple(fd_stat)
466
467     def write(self, data):
468         if self._queued_file is None:
469             raise errors.AssertionError(
470                 "resumable writer can't accept unsourced data")
471         return super(ResumableCollectionWriter, self).write(data)
472
473
474 ADD = "add"
475 DEL = "del"
476 MOD = "mod"
477 TOK = "tok"
478 FILE = "file"
479 COLLECTION = "collection"
480
481 class RichCollectionBase(CollectionBase):
482     """Base class for Collections and Subcollections.
483
484     Implements the majority of functionality relating to accessing items in the
485     Collection.
486
487     """
488
489     def __init__(self, parent=None):
490         self.parent = parent
491         self._committed = False
492         self._callback = None
493         self._items = {}
494
495     def _my_api(self):
496         raise NotImplementedError()
497
498     def _my_keep(self):
499         raise NotImplementedError()
500
501     def _my_block_manager(self):
502         raise NotImplementedError()
503
504     def writable(self):
505         raise NotImplementedError()
506
507     def root_collection(self):
508         raise NotImplementedError()
509
510     def notify(self, event, collection, name, item):
511         raise NotImplementedError()
512
513     def stream_name(self):
514         raise NotImplementedError()
515
516     @must_be_writable
517     @synchronized
518     def find_or_create(self, path, create_type):
519         """Recursively search the specified file path.
520
521         May return either a `Collection` or `ArvadosFile`.  If not found, will
522         create a new item at the specified path based on `create_type`.  Will
523         create intermediate subcollections needed to contain the final item in
524         the path.
525
526         :create_type:
527           One of `arvados.collection.FILE` or
528           `arvados.collection.COLLECTION`.  If the path is not found, and value
529           of create_type is FILE then create and return a new ArvadosFile for
530           the last path component.  If COLLECTION, then create and return a new
531           Collection for the last path component.
532
533         """
534
535         pathcomponents = path.split("/", 1)
536         if pathcomponents[0]:
537             item = self._items.get(pathcomponents[0])
538             if len(pathcomponents) == 1:
539                 if item is None:
540                     # create new file
541                     if create_type == COLLECTION:
542                         item = Subcollection(self, pathcomponents[0])
543                     else:
544                         item = ArvadosFile(self, pathcomponents[0])
545                     self._items[pathcomponents[0]] = item
546                     self._committed = False
547                     self.notify(ADD, self, pathcomponents[0], item)
548                 return item
549             else:
550                 if item is None:
551                     # create new collection
552                     item = Subcollection(self, pathcomponents[0])
553                     self._items[pathcomponents[0]] = item
554                     self._committed = False
555                     self.notify(ADD, self, pathcomponents[0], item)
556                 if isinstance(item, RichCollectionBase):
557                     return item.find_or_create(pathcomponents[1], create_type)
558                 else:
559                     raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
560         else:
561             return self
562
563     @synchronized
564     def find(self, path):
565         """Recursively search the specified file path.
566
567         May return either a Collection or ArvadosFile.  Return None if not
568         found.
569
570         """
571         if not path:
572             raise errors.ArgumentError("Parameter 'path' is empty.")
573
574         pathcomponents = path.split("/", 1)
575         item = self._items.get(pathcomponents[0])
576         if len(pathcomponents) == 1:
577             return item
578         else:
579             if isinstance(item, RichCollectionBase):
580                 if pathcomponents[1]:
581                     return item.find(pathcomponents[1])
582                 else:
583                     return item
584             else:
585                 raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
586
587     @synchronized
588     def mkdirs(self, path):
589         """Recursive subcollection create.
590
591         Like `os.makedirs()`.  Will create intermediate subcollections needed
592         to contain the leaf subcollection path.
593
594         """
595
596         if self.find(path) != None:
597             raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
598
599         return self.find_or_create(path, COLLECTION)
600
601     def open(self, path, mode="r"):
602         """Open a file-like object for access.
603
604         :path:
605           path to a file in the collection
606         :mode:
607           one of "r", "r+", "w", "w+", "a", "a+"
608           :"r":
609             opens for reading
610           :"r+":
611             opens for reading and writing.  Reads/writes share a file pointer.
612           :"w", "w+":
613             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
614           :"a", "a+":
615             opens for reading and writing.  All writes are appended to
616             the end of the file.  Writing does not affect the file pointer for
617             reading.
618         """
619         mode = mode.replace("b", "")
620         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
621             raise errors.ArgumentError("Bad mode '%s'" % mode)
622         create = (mode != "r")
623
624         if create and not self.writable():
625             raise IOError(errno.EROFS, "Collection is read only")
626
627         if create:
628             arvfile = self.find_or_create(path, FILE)
629         else:
630             arvfile = self.find(path)
631
632         if arvfile is None:
633             raise IOError(errno.ENOENT, "File not found")
634         if not isinstance(arvfile, ArvadosFile):
635             raise IOError(errno.EISDIR, "Is a directory: %s" % path)
636
637         if mode[0] == "w":
638             arvfile.truncate(0)
639
640         name = os.path.basename(path)
641
642         if mode == "r":
643             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
644         else:
645             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
646
647     def modified(self):
648         """Determine if the collection has been modified since last commited."""
649         return not self.committed()
650
651     @synchronized
652     def committed(self):
653         """Determine if the collection has been committed to the API server."""
654
655         if self._committed is False:
656             return False
657         for v in self._items.values():
658             if v.committed() is False:
659                 return False
660         return True
661
662     @synchronized
663     def set_committed(self):
664         """Recursively set committed flag to True."""
665         self._committed = True
666         for k,v in self._items.items():
667             v.set_committed()
668
669     @synchronized
670     def __iter__(self):
671         """Iterate over names of files and collections contained in this collection."""
672         return iter(self._items.keys())
673
674     @synchronized
675     def __getitem__(self, k):
676         """Get a file or collection that is directly contained by this collection.
677
678         If you want to search a path, use `find()` instead.
679
680         """
681         return self._items[k]
682
683     @synchronized
684     def __contains__(self, k):
685         """Test if there is a file or collection a directly contained by this collection."""
686         return k in self._items
687
688     @synchronized
689     def __len__(self):
690         """Get the number of items directly contained in this collection."""
691         return len(self._items)
692
693     @must_be_writable
694     @synchronized
695     def __delitem__(self, p):
696         """Delete an item by name which is directly contained by this collection."""
697         del self._items[p]
698         self._committed = False
699         self.notify(DEL, self, p, None)
700
701     @synchronized
702     def keys(self):
703         """Get a list of names of files and collections directly contained in this collection."""
704         return self._items.keys()
705
706     @synchronized
707     def values(self):
708         """Get a list of files and collection objects directly contained in this collection."""
709         return self._items.values()
710
711     @synchronized
712     def items(self):
713         """Get a list of (name, object) tuples directly contained in this collection."""
714         return self._items.items()
715
716     def exists(self, path):
717         """Test if there is a file or collection at `path`."""
718         return self.find(path) is not None
719
720     @must_be_writable
721     @synchronized
722     def remove(self, path, recursive=False):
723         """Remove the file or subcollection (directory) at `path`.
724
725         :recursive:
726           Specify whether to remove non-empty subcollections (True), or raise an error (False).
727         """
728
729         if not path:
730             raise errors.ArgumentError("Parameter 'path' is empty.")
731
732         pathcomponents = path.split("/", 1)
733         item = self._items.get(pathcomponents[0])
734         if item is None:
735             raise IOError(errno.ENOENT, "File not found")
736         if len(pathcomponents) == 1:
737             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
738                 raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
739             deleteditem = self._items[pathcomponents[0]]
740             del self._items[pathcomponents[0]]
741             self._committed = False
742             self.notify(DEL, self, pathcomponents[0], deleteditem)
743         else:
744             item.remove(pathcomponents[1])
745
746     def _clonefrom(self, source):
747         for k,v in source.items():
748             self._items[k] = v.clone(self, k)
749
750     def clone(self):
751         raise NotImplementedError()
752
753     @must_be_writable
754     @synchronized
755     def add(self, source_obj, target_name, overwrite=False, reparent=False):
756         """Copy or move a file or subcollection to this collection.
757
758         :source_obj:
759           An ArvadosFile, or Subcollection object
760
761         :target_name:
762           Destination item name.  If the target name already exists and is a
763           file, this will raise an error unless you specify `overwrite=True`.
764
765         :overwrite:
766           Whether to overwrite target file if it already exists.
767
768         :reparent:
769           If True, source_obj will be moved from its parent collection to this collection.
770           If False, source_obj will be copied and the parent collection will be
771           unmodified.
772
773         """
774
775         if target_name in self and not overwrite:
776             raise IOError(errno.EEXIST, "File already exists")
777
778         modified_from = None
779         if target_name in self:
780             modified_from = self[target_name]
781
782         # Actually make the move or copy.
783         if reparent:
784             source_obj._reparent(self, target_name)
785             item = source_obj
786         else:
787             item = source_obj.clone(self, target_name)
788
789         self._items[target_name] = item
790         self._committed = False
791
792         if modified_from:
793             self.notify(MOD, self, target_name, (modified_from, item))
794         else:
795             self.notify(ADD, self, target_name, item)
796
797     def _get_src_target(self, source, target_path, source_collection, create_dest):
798         if source_collection is None:
799             source_collection = self
800
801         # Find the object
802         if isinstance(source, basestring):
803             source_obj = source_collection.find(source)
804             if source_obj is None:
805                 raise IOError(errno.ENOENT, "File not found")
806             sourcecomponents = source.split("/")
807         else:
808             source_obj = source
809             sourcecomponents = None
810
811         # Find parent collection the target path
812         targetcomponents = target_path.split("/")
813
814         # Determine the name to use.
815         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
816
817         if not target_name:
818             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
819
820         if create_dest:
821             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
822         else:
823             if len(targetcomponents) > 1:
824                 target_dir = self.find("/".join(targetcomponents[0:-1]))
825             else:
826                 target_dir = self
827
828         if target_dir is None:
829             raise IOError(errno.ENOENT, "Target directory not found.")
830
831         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
832             target_dir = target_dir[target_name]
833             target_name = sourcecomponents[-1]
834
835         return (source_obj, target_dir, target_name)
836
837     @must_be_writable
838     @synchronized
839     def copy(self, source, target_path, source_collection=None, overwrite=False):
840         """Copy a file or subcollection to a new path in this collection.
841
842         :source:
843           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
844
845         :target_path:
846           Destination file or path.  If the target path already exists and is a
847           subcollection, the item will be placed inside the subcollection.  If
848           the target path already exists and is a file, this will raise an error
849           unless you specify `overwrite=True`.
850
851         :source_collection:
852           Collection to copy `source_path` from (default `self`)
853
854         :overwrite:
855           Whether to overwrite target file if it already exists.
856         """
857
858         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
859         target_dir.add(source_obj, target_name, overwrite, False)
860
861     @must_be_writable
862     @synchronized
863     def rename(self, source, target_path, source_collection=None, overwrite=False):
864         """Move a file or subcollection from `source_collection` to a new path in this collection.
865
866         :source:
867           A string with a path to source file or subcollection.
868
869         :target_path:
870           Destination file or path.  If the target path already exists and is a
871           subcollection, the item will be placed inside the subcollection.  If
872           the target path already exists and is a file, this will raise an error
873           unless you specify `overwrite=True`.
874
875         :source_collection:
876           Collection to copy `source_path` from (default `self`)
877
878         :overwrite:
879           Whether to overwrite target file if it already exists.
880         """
881
882         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
883         if not source_obj.writable():
884             raise IOError(errno.EROFS, "Source collection is read only.")
885         target_dir.add(source_obj, target_name, overwrite, True)
886
887     def portable_manifest_text(self, stream_name="."):
888         """Get the manifest text for this collection, sub collections and files.
889
890         This method does not flush outstanding blocks to Keep.  It will return
891         a normalized manifest with access tokens stripped.
892
893         :stream_name:
894           Name to use for this stream (directory)
895
896         """
897         return self._get_manifest_text(stream_name, True, True)
898
899     @synchronized
900     def manifest_text(self, stream_name=".", strip=False, normalize=False):
901         """Get the manifest text for this collection, sub collections and files.
902
903         This method will flush outstanding blocks to Keep.  By default, it will
904         not normalize an unmodified manifest or strip access tokens.
905
906         :stream_name:
907           Name to use for this stream (directory)
908
909         :strip:
910           If True, remove signing tokens from block locators if present.
911           If False (default), block locators are left unchanged.
912
913         :normalize:
914           If True, always export the manifest text in normalized form
915           even if the Collection is not modified.  If False (default) and the collection
916           is not modified, return the original manifest text even if it is not
917           in normalized form.
918
919         """
920
921         self._my_block_manager().commit_all()
922         return self._get_manifest_text(stream_name, strip, normalize)
923
924     @synchronized
925     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
926         """Get the manifest text for this collection, sub collections and files.
927
928         :stream_name:
929           Name to use for this stream (directory)
930
931         :strip:
932           If True, remove signing tokens from block locators if present.
933           If False (default), block locators are left unchanged.
934
935         :normalize:
936           If True, always export the manifest text in normalized form
937           even if the Collection is not modified.  If False (default) and the collection
938           is not modified, return the original manifest text even if it is not
939           in normalized form.
940
941         :only_committed:
942           If True, only include blocks that were already committed to Keep.
943
944         """
945
946         if not self.committed() or self._manifest_text is None or normalize:
947             stream = {}
948             buf = []
949             sorted_keys = sorted(self.keys())
950             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
951                 # Create a stream per file `k`
952                 arvfile = self[filename]
953                 filestream = []
954                 for segment in arvfile.segments():
955                     loc = segment.locator
956                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
957                         if only_committed:
958                             continue
959                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
960                     if strip:
961                         loc = KeepLocator(loc).stripped()
962                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
963                                          segment.segment_offset, segment.range_size))
964                 stream[filename] = filestream
965             if stream:
966                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
967             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
968                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True))
969             return "".join(buf)
970         else:
971             if strip:
972                 return self.stripped_manifest()
973             else:
974                 return self._manifest_text
975
976     @synchronized
977     def diff(self, end_collection, prefix=".", holding_collection=None):
978         """Generate list of add/modify/delete actions.
979
980         When given to `apply`, will change `self` to match `end_collection`
981
982         """
983         changes = []
984         if holding_collection is None:
985             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
986         for k in self:
987             if k not in end_collection:
988                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
989         for k in end_collection:
990             if k in self:
991                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
992                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
993                 elif end_collection[k] != self[k]:
994                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
995                 else:
996                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
997             else:
998                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
999         return changes
1000
1001     @must_be_writable
1002     @synchronized
1003     def apply(self, changes):
1004         """Apply changes from `diff`.
1005
1006         If a change conflicts with a local change, it will be saved to an
1007         alternate path indicating the conflict.
1008
1009         """
1010         if changes:
1011             self._committed = False
1012         for change in changes:
1013             event_type = change[0]
1014             path = change[1]
1015             initial = change[2]
1016             local = self.find(path)
1017             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1018                                                                     time.gmtime()))
1019             if event_type == ADD:
1020                 if local is None:
1021                     # No local file at path, safe to copy over new file
1022                     self.copy(initial, path)
1023                 elif local is not None and local != initial:
1024                     # There is already local file and it is different:
1025                     # save change to conflict file.
1026                     self.copy(initial, conflictpath)
1027             elif event_type == MOD or event_type == TOK:
1028                 final = change[3]
1029                 if local == initial:
1030                     # Local matches the "initial" item so it has not
1031                     # changed locally and is safe to update.
1032                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1033                         # Replace contents of local file with new contents
1034                         local.replace_contents(final)
1035                     else:
1036                         # Overwrite path with new item; this can happen if
1037                         # path was a file and is now a collection or vice versa
1038                         self.copy(final, path, overwrite=True)
1039                 else:
1040                     # Local is missing (presumably deleted) or local doesn't
1041                     # match the "start" value, so save change to conflict file
1042                     self.copy(final, conflictpath)
1043             elif event_type == DEL:
1044                 if local == initial:
1045                     # Local item matches "initial" value, so it is safe to remove.
1046                     self.remove(path, recursive=True)
1047                 # else, the file is modified or already removed, in either
1048                 # case we don't want to try to remove it.
1049
1050     def portable_data_hash(self):
1051         """Get the portable data hash for this collection's manifest."""
1052         stripped = self.portable_manifest_text()
1053         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1054
1055     @synchronized
1056     def subscribe(self, callback):
1057         if self._callback is None:
1058             self._callback = callback
1059         else:
1060             raise errors.ArgumentError("A callback is already set on this collection.")
1061
1062     @synchronized
1063     def unsubscribe(self):
1064         if self._callback is not None:
1065             self._callback = None
1066
1067     @synchronized
1068     def notify(self, event, collection, name, item):
1069         if self._callback:
1070             self._callback(event, collection, name, item)
1071         self.root_collection().notify(event, collection, name, item)
1072
1073     @synchronized
1074     def __eq__(self, other):
1075         if other is self:
1076             return True
1077         if not isinstance(other, RichCollectionBase):
1078             return False
1079         if len(self._items) != len(other):
1080             return False
1081         for k in self._items:
1082             if k not in other:
1083                 return False
1084             if self._items[k] != other[k]:
1085                 return False
1086         return True
1087
1088     def __ne__(self, other):
1089         return not self.__eq__(other)
1090
1091     @synchronized
1092     def flush(self):
1093         """Flush bufferblocks to Keep."""
1094         for e in self.values():
1095             e.flush()
1096
1097
1098 class Collection(RichCollectionBase):
1099     """Represents the root of an Arvados Collection.
1100
1101     This class is threadsafe.  The root collection object, all subcollections
1102     and files are protected by a single lock (i.e. each access locks the entire
1103     collection).
1104
1105     Brief summary of
1106     useful methods:
1107
1108     :To read an existing file:
1109       `c.open("myfile", "r")`
1110
1111     :To write a new file:
1112       `c.open("myfile", "w")`
1113
1114     :To determine if a file exists:
1115       `c.find("myfile") is not None`
1116
1117     :To copy a file:
1118       `c.copy("source", "dest")`
1119
1120     :To delete a file:
1121       `c.remove("myfile")`
1122
1123     :To save to an existing collection record:
1124       `c.save()`
1125
1126     :To save a new collection record:
1127     `c.save_new()`
1128
1129     :To merge remote changes into this object:
1130       `c.update()`
1131
1132     Must be associated with an API server Collection record (during
1133     initialization, or using `save_new`) to use `save` or `update`
1134
1135     """
1136
1137     def __init__(self, manifest_locator_or_text=None,
1138                  api_client=None,
1139                  keep_client=None,
1140                  num_retries=None,
1141                  parent=None,
1142                  apiconfig=None,
1143                  block_manager=None,
1144                  replication_desired=None):
1145         """Collection constructor.
1146
1147         :manifest_locator_or_text:
1148           One of Arvados collection UUID, block locator of
1149           a manifest, raw manifest text, or None (to create an empty collection).
1150         :parent:
1151           the parent Collection, may be None.
1152
1153         :apiconfig:
1154           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1155           Prefer this over supplying your own api_client and keep_client (except in testing).
1156           Will use default config settings if not specified.
1157
1158         :api_client:
1159           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1160
1161         :keep_client:
1162           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1163
1164         :num_retries:
1165           the number of retries for API and Keep requests.
1166
1167         :block_manager:
1168           the block manager to use.  If not specified, create one.
1169
1170         :replication_desired:
1171           How many copies should Arvados maintain. If None, API server default
1172           configuration applies. If not None, this value will also be used
1173           for determining the number of block copies being written.
1174
1175         """
1176         super(Collection, self).__init__(parent)
1177         self._api_client = api_client
1178         self._keep_client = keep_client
1179         self._block_manager = block_manager
1180         self.replication_desired = replication_desired
1181
1182         if apiconfig:
1183             self._config = apiconfig
1184         else:
1185             self._config = config.settings()
1186
1187         self.num_retries = num_retries if num_retries is not None else 0
1188         self._manifest_locator = None
1189         self._manifest_text = None
1190         self._api_response = None
1191         self._past_versions = set()
1192
1193         self.lock = threading.RLock()
1194         self.events = None
1195
1196         if manifest_locator_or_text:
1197             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1198                 self._manifest_locator = manifest_locator_or_text
1199             elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1200                 self._manifest_locator = manifest_locator_or_text
1201             elif re.match(util.manifest_pattern, manifest_locator_or_text):
1202                 self._manifest_text = manifest_locator_or_text
1203             else:
1204                 raise errors.ArgumentError(
1205                     "Argument to CollectionReader is not a manifest or a collection UUID")
1206
1207             try:
1208                 self._populate()
1209             except (IOError, errors.SyntaxError) as e:
1210                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1211
1212     def root_collection(self):
1213         return self
1214
1215     def stream_name(self):
1216         return "."
1217
1218     def writable(self):
1219         return True
1220
1221     @synchronized
1222     def known_past_version(self, modified_at_and_portable_data_hash):
1223         return modified_at_and_portable_data_hash in self._past_versions
1224
1225     @synchronized
1226     @retry_method
1227     def update(self, other=None, num_retries=None):
1228         """Merge the latest collection on the API server with the current collection."""
1229
1230         if other is None:
1231             if self._manifest_locator is None:
1232                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1233             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1234             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1235                 response.get("portable_data_hash") != self.portable_data_hash()):
1236                 # The record on the server is different from our current one, but we've seen it before,
1237                 # so ignore it because it's already been merged.
1238                 # However, if it's the same as our current record, proceed with the update, because we want to update
1239                 # our tokens.
1240                 return
1241             else:
1242                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1243             other = CollectionReader(response["manifest_text"])
1244         baseline = CollectionReader(self._manifest_text)
1245         self.apply(baseline.diff(other))
1246         self._manifest_text = self.manifest_text()
1247
1248     @synchronized
1249     def _my_api(self):
1250         if self._api_client is None:
1251             self._api_client = ThreadSafeApiCache(self._config)
1252             if self._keep_client is None:
1253                 self._keep_client = self._api_client.keep
1254         return self._api_client
1255
1256     @synchronized
1257     def _my_keep(self):
1258         if self._keep_client is None:
1259             if self._api_client is None:
1260                 self._my_api()
1261             else:
1262                 self._keep_client = KeepClient(api_client=self._api_client)
1263         return self._keep_client
1264
1265     @synchronized
1266     def _my_block_manager(self):
1267         if self._block_manager is None:
1268             copies = (self.replication_desired or
1269                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1270                                                    2))
1271             self._block_manager = _BlockManager(self._my_keep(), copies=copies)
1272         return self._block_manager
1273
1274     def _remember_api_response(self, response):
1275         self._api_response = response
1276         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1277
1278     def _populate_from_api_server(self):
1279         # As in KeepClient itself, we must wait until the last
1280         # possible moment to instantiate an API client, in order to
1281         # avoid tripping up clients that don't have access to an API
1282         # server.  If we do build one, make sure our Keep client uses
1283         # it.  If instantiation fails, we'll fall back to the except
1284         # clause, just like any other Collection lookup
1285         # failure. Return an exception, or None if successful.
1286         try:
1287             self._remember_api_response(self._my_api().collections().get(
1288                 uuid=self._manifest_locator).execute(
1289                     num_retries=self.num_retries))
1290             self._manifest_text = self._api_response['manifest_text']
1291             # If not overriden via kwargs, we should try to load the
1292             # replication_desired from the API server
1293             if self.replication_desired is None:
1294                 self.replication_desired = self._api_response.get('replication_desired', None)
1295             return None
1296         except Exception as e:
1297             return e
1298
1299     def _populate_from_keep(self):
1300         # Retrieve a manifest directly from Keep. This has a chance of
1301         # working if [a] the locator includes a permission signature
1302         # or [b] the Keep services are operating in world-readable
1303         # mode. Return an exception, or None if successful.
1304         try:
1305             self._manifest_text = self._my_keep().get(
1306                 self._manifest_locator, num_retries=self.num_retries)
1307         except Exception as e:
1308             return e
1309
1310     def _populate(self):
1311         if self._manifest_locator is None and self._manifest_text is None:
1312             return
1313         error_via_api = None
1314         error_via_keep = None
1315         should_try_keep = ((self._manifest_text is None) and
1316                            util.keep_locator_pattern.match(
1317                                self._manifest_locator))
1318         if ((self._manifest_text is None) and
1319             util.signed_locator_pattern.match(self._manifest_locator)):
1320             error_via_keep = self._populate_from_keep()
1321         if self._manifest_text is None:
1322             error_via_api = self._populate_from_api_server()
1323             if error_via_api is not None and not should_try_keep:
1324                 raise error_via_api
1325         if ((self._manifest_text is None) and
1326             not error_via_keep and
1327             should_try_keep):
1328             # Looks like a keep locator, and we didn't already try keep above
1329             error_via_keep = self._populate_from_keep()
1330         if self._manifest_text is None:
1331             # Nothing worked!
1332             raise errors.NotFoundError(
1333                 ("Failed to retrieve collection '{}' " +
1334                  "from either API server ({}) or Keep ({})."
1335                  ).format(
1336                     self._manifest_locator,
1337                     error_via_api,
1338                     error_via_keep))
1339         # populate
1340         self._baseline_manifest = self._manifest_text
1341         self._import_manifest(self._manifest_text)
1342
1343
1344     def _has_collection_uuid(self):
1345         return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1346
1347     def __enter__(self):
1348         return self
1349
1350     def __exit__(self, exc_type, exc_value, traceback):
1351         """Support scoped auto-commit in a with: block."""
1352         if exc_type is None:
1353             if self.writable() and self._has_collection_uuid():
1354                 self.save()
1355         self.stop_threads()
1356
1357     def stop_threads(self):
1358         if self._block_manager is not None:
1359             self._block_manager.stop_threads()
1360
1361     @synchronized
1362     def manifest_locator(self):
1363         """Get the manifest locator, if any.
1364
1365         The manifest locator will be set when the collection is loaded from an
1366         API server record or the portable data hash of a manifest.
1367
1368         The manifest locator will be None if the collection is newly created or
1369         was created directly from manifest text.  The method `save_new()` will
1370         assign a manifest locator.
1371
1372         """
1373         return self._manifest_locator
1374
1375     @synchronized
1376     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1377         if new_config is None:
1378             new_config = self._config
1379         if readonly:
1380             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1381         else:
1382             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1383
1384         newcollection._clonefrom(self)
1385         return newcollection
1386
1387     @synchronized
1388     def api_response(self):
1389         """Returns information about this Collection fetched from the API server.
1390
1391         If the Collection exists in Keep but not the API server, currently
1392         returns None.  Future versions may provide a synthetic response.
1393
1394         """
1395         return self._api_response
1396
1397     def find_or_create(self, path, create_type):
1398         """See `RichCollectionBase.find_or_create`"""
1399         if path == ".":
1400             return self
1401         else:
1402             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1403
1404     def find(self, path):
1405         """See `RichCollectionBase.find`"""
1406         if path == ".":
1407             return self
1408         else:
1409             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1410
1411     def remove(self, path, recursive=False):
1412         """See `RichCollectionBase.remove`"""
1413         if path == ".":
1414             raise errors.ArgumentError("Cannot remove '.'")
1415         else:
1416             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1417
1418     @must_be_writable
1419     @synchronized
1420     @retry_method
1421     def save(self, merge=True, num_retries=None):
1422         """Save collection to an existing collection record.
1423
1424         Commit pending buffer blocks to Keep, merge with remote record (if
1425         merge=True, the default), and update the collection record.  Returns
1426         the current manifest text.
1427
1428         Will raise AssertionError if not associated with a collection record on
1429         the API server.  If you want to save a manifest to Keep only, see
1430         `save_new()`.
1431
1432         :merge:
1433           Update and merge remote changes before saving.  Otherwise, any
1434           remote changes will be ignored and overwritten.
1435
1436         :num_retries:
1437           Retry count on API calls (if None,  use the collection default)
1438
1439         """
1440         if not self.committed():
1441             if not self._has_collection_uuid():
1442                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1443
1444             self._my_block_manager().commit_all()
1445
1446             if merge:
1447                 self.update()
1448
1449             text = self.manifest_text(strip=False)
1450             self._remember_api_response(self._my_api().collections().update(
1451                 uuid=self._manifest_locator,
1452                 body={'manifest_text': text}
1453                 ).execute(
1454                     num_retries=num_retries))
1455             self._manifest_text = self._api_response["manifest_text"]
1456             self.set_committed()
1457
1458         return self._manifest_text
1459
1460
1461     @must_be_writable
1462     @synchronized
1463     @retry_method
1464     def save_new(self, name=None,
1465                  create_collection_record=True,
1466                  owner_uuid=None,
1467                  ensure_unique_name=False,
1468                  num_retries=None):
1469         """Save collection to a new collection record.
1470
1471         Commit pending buffer blocks to Keep and, when create_collection_record
1472         is True (default), create a new collection record.  After creating a
1473         new collection record, this Collection object will be associated with
1474         the new record used by `save()`.  Returns the current manifest text.
1475
1476         :name:
1477           The collection name.
1478
1479         :create_collection_record:
1480            If True, create a collection record on the API server.
1481            If False, only commit blocks to Keep and return the manifest text.
1482
1483         :owner_uuid:
1484           the user, or project uuid that will own this collection.
1485           If None, defaults to the current user.
1486
1487         :ensure_unique_name:
1488           If True, ask the API server to rename the collection
1489           if it conflicts with a collection with the same name and owner.  If
1490           False, a name conflict will result in an error.
1491
1492         :num_retries:
1493           Retry count on API calls (if None,  use the collection default)
1494
1495         """
1496         self._my_block_manager().commit_all()
1497         text = self.manifest_text(strip=False)
1498
1499         if create_collection_record:
1500             if name is None:
1501                 name = "New collection"
1502                 ensure_unique_name = True
1503
1504             body = {"manifest_text": text,
1505                     "name": name,
1506                     "replication_desired": self.replication_desired}
1507             if owner_uuid:
1508                 body["owner_uuid"] = owner_uuid
1509
1510             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1511             text = self._api_response["manifest_text"]
1512
1513             self._manifest_locator = self._api_response["uuid"]
1514
1515             self._manifest_text = text
1516             self.set_committed()
1517
1518         return text
1519
1520     @synchronized
1521     def _import_manifest(self, manifest_text):
1522         """Import a manifest into a `Collection`.
1523
1524         :manifest_text:
1525           The manifest text to import from.
1526
1527         """
1528         if len(self) > 0:
1529             raise ArgumentError("Can only import manifest into an empty collection")
1530
1531         STREAM_NAME = 0
1532         BLOCKS = 1
1533         SEGMENTS = 2
1534
1535         stream_name = None
1536         state = STREAM_NAME
1537
1538         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1539             tok = token_and_separator.group(1)
1540             sep = token_and_separator.group(2)
1541
1542             if state == STREAM_NAME:
1543                 # starting a new stream
1544                 stream_name = tok.replace('\\040', ' ')
1545                 blocks = []
1546                 segments = []
1547                 streamoffset = 0L
1548                 state = BLOCKS
1549                 self.find_or_create(stream_name, COLLECTION)
1550                 continue
1551
1552             if state == BLOCKS:
1553                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1554                 if block_locator:
1555                     blocksize = long(block_locator.group(1))
1556                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1557                     streamoffset += blocksize
1558                 else:
1559                     state = SEGMENTS
1560
1561             if state == SEGMENTS:
1562                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1563                 if file_segment:
1564                     pos = long(file_segment.group(1))
1565                     size = long(file_segment.group(2))
1566                     name = file_segment.group(3).replace('\\040', ' ')
1567                     filepath = os.path.join(stream_name, name)
1568                     afile = self.find_or_create(filepath, FILE)
1569                     if isinstance(afile, ArvadosFile):
1570                         afile.add_segment(blocks, pos, size)
1571                     else:
1572                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1573                 else:
1574                     # error!
1575                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1576
1577             if sep == "\n":
1578                 stream_name = None
1579                 state = STREAM_NAME
1580
1581         self.set_committed()
1582
1583     @synchronized
1584     def notify(self, event, collection, name, item):
1585         if self._callback:
1586             self._callback(event, collection, name, item)
1587
1588
1589 class Subcollection(RichCollectionBase):
1590     """This is a subdirectory within a collection that doesn't have its own API
1591     server record.
1592
1593     Subcollection locking falls under the umbrella lock of its root collection.
1594
1595     """
1596
1597     def __init__(self, parent, name):
1598         super(Subcollection, self).__init__(parent)
1599         self.lock = self.root_collection().lock
1600         self._manifest_text = None
1601         self.name = name
1602         self.num_retries = parent.num_retries
1603
1604     def root_collection(self):
1605         return self.parent.root_collection()
1606
1607     def writable(self):
1608         return self.root_collection().writable()
1609
1610     def _my_api(self):
1611         return self.root_collection()._my_api()
1612
1613     def _my_keep(self):
1614         return self.root_collection()._my_keep()
1615
1616     def _my_block_manager(self):
1617         return self.root_collection()._my_block_manager()
1618
1619     def stream_name(self):
1620         return os.path.join(self.parent.stream_name(), self.name)
1621
1622     @synchronized
1623     def clone(self, new_parent, new_name):
1624         c = Subcollection(new_parent, new_name)
1625         c._clonefrom(self)
1626         return c
1627
1628     @must_be_writable
1629     @synchronized
1630     def _reparent(self, newparent, newname):
1631         self._committed = False
1632         self.flush()
1633         self.parent.remove(self.name, recursive=True)
1634         self.parent = newparent
1635         self.name = newname
1636         self.lock = self.parent.root_collection().lock
1637
1638
1639 class CollectionReader(Collection):
1640     """A read-only collection object.
1641
1642     Initialize from an api collection record locator, a portable data hash of a
1643     manifest, or raw manifest text.  See `Collection` constructor for detailed
1644     options.
1645
1646     """
1647     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1648         self._in_init = True
1649         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1650         self._in_init = False
1651
1652         # Forego any locking since it should never change once initialized.
1653         self.lock = NoopLock()
1654
1655         # Backwards compatability with old CollectionReader
1656         # all_streams() and all_files()
1657         self._streams = None
1658
1659     def writable(self):
1660         return self._in_init
1661
1662     def _populate_streams(orig_func):
1663         @functools.wraps(orig_func)
1664         def populate_streams_wrapper(self, *args, **kwargs):
1665             # Defer populating self._streams until needed since it creates a copy of the manifest.
1666             if self._streams is None:
1667                 if self._manifest_text:
1668                     self._streams = [sline.split()
1669                                      for sline in self._manifest_text.split("\n")
1670                                      if sline]
1671                 else:
1672                     self._streams = []
1673             return orig_func(self, *args, **kwargs)
1674         return populate_streams_wrapper
1675
1676     @_populate_streams
1677     def normalize(self):
1678         """Normalize the streams returned by `all_streams`.
1679
1680         This method is kept for backwards compatability and only affects the
1681         behavior of `all_streams()` and `all_files()`
1682
1683         """
1684
1685         # Rearrange streams
1686         streams = {}
1687         for s in self.all_streams():
1688             for f in s.all_files():
1689                 streamname, filename = split(s.name() + "/" + f.name())
1690                 if streamname not in streams:
1691                     streams[streamname] = {}
1692                 if filename not in streams[streamname]:
1693                     streams[streamname][filename] = []
1694                 for r in f.segments:
1695                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1696
1697         self._streams = [normalize_stream(s, streams[s])
1698                          for s in sorted(streams)]
1699     @_populate_streams
1700     def all_streams(self):
1701         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1702                 for s in self._streams]
1703
1704     @_populate_streams
1705     def all_files(self):
1706         for s in self.all_streams():
1707             for f in s.all_files():
1708                 yield f