]> git.arvados.org - arvados.git/blob - sdk/python/arvados/collection.py
11308: Eliminate old_div().
[arvados.git] / sdk / python / arvados / collection.py
1 from __future__ import absolute_import
2 from builtins import str
3 from past.builtins import basestring
4 from builtins import object
5 import functools
6 import logging
7 import os
8 import re
9 import errno
10 import hashlib
11 import time
12 import threading
13
14 from collections import deque
15 from stat import *
16
17 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
18 from .keep import KeepLocator, KeepClient
19 from .stream import StreamReader
20 from ._normalize_stream import normalize_stream
21 from ._ranges import Range, LocatorAndRange
22 from .safeapi import ThreadSafeApiCache
23 import arvados.config as config
24 import arvados.errors as errors
25 import arvados.util
26 import arvados.events as events
27 from arvados.retry import retry_method
28
29 _logger = logging.getLogger('arvados.collection')
30
31 class CollectionBase(object):
32     def __enter__(self):
33         return self
34
35     def __exit__(self, exc_type, exc_value, traceback):
36         pass
37
38     def _my_keep(self):
39         if self._keep_client is None:
40             self._keep_client = KeepClient(api_client=self._api_client,
41                                            num_retries=self.num_retries)
42         return self._keep_client
43
44     def stripped_manifest(self):
45         """Get the manifest with locator hints stripped.
46
47         Return the manifest for the current collection with all
48         non-portable hints (i.e., permission signatures and other
49         hints other than size hints) removed from the locators.
50         """
51         raw = self.manifest_text()
52         clean = []
53         for line in raw.split("\n"):
54             fields = line.split()
55             if fields:
56                 clean_fields = fields[:1] + [
57                     (re.sub(r'\+[^\d][^\+]*', '', x)
58                      if re.match(arvados.util.keep_locator_pattern, x)
59                      else x)
60                     for x in fields[1:]]
61                 clean += [' '.join(clean_fields), "\n"]
62         return ''.join(clean)
63
64
65 class _WriterFile(_FileLikeObjectBase):
66     def __init__(self, coll_writer, name):
67         super(_WriterFile, self).__init__(name, 'wb')
68         self.dest = coll_writer
69
70     def close(self):
71         super(_WriterFile, self).close()
72         self.dest.finish_current_file()
73
74     @_FileLikeObjectBase._before_close
75     def write(self, data):
76         self.dest.write(data)
77
78     @_FileLikeObjectBase._before_close
79     def writelines(self, seq):
80         for data in seq:
81             self.write(data)
82
83     @_FileLikeObjectBase._before_close
84     def flush(self):
85         self.dest.flush_data()
86
87
88 class CollectionWriter(CollectionBase):
89     def __init__(self, api_client=None, num_retries=0, replication=None):
90         """Instantiate a CollectionWriter.
91
92         CollectionWriter lets you build a new Arvados Collection from scratch.
93         Write files to it.  The CollectionWriter will upload data to Keep as
94         appropriate, and provide you with the Collection manifest text when
95         you're finished.
96
97         Arguments:
98         * api_client: The API client to use to look up Collections.  If not
99           provided, CollectionReader will build one from available Arvados
100           configuration.
101         * num_retries: The default number of times to retry failed
102           service requests.  Default 0.  You may change this value
103           after instantiation, but note those changes may not
104           propagate to related objects like the Keep client.
105         * replication: The number of copies of each block to store.
106           If this argument is None or not supplied, replication is
107           the server-provided default if available, otherwise 2.
108         """
109         self._api_client = api_client
110         self.num_retries = num_retries
111         self.replication = (2 if replication is None else replication)
112         self._keep_client = None
113         self._data_buffer = []
114         self._data_buffer_len = 0
115         self._current_stream_files = []
116         self._current_stream_length = 0
117         self._current_stream_locators = []
118         self._current_stream_name = '.'
119         self._current_file_name = None
120         self._current_file_pos = 0
121         self._finished_streams = []
122         self._close_file = None
123         self._queued_file = None
124         self._queued_dirents = deque()
125         self._queued_trees = deque()
126         self._last_open = None
127
128     def __exit__(self, exc_type, exc_value, traceback):
129         if exc_type is None:
130             self.finish()
131
132     def do_queued_work(self):
133         # The work queue consists of three pieces:
134         # * _queued_file: The file object we're currently writing to the
135         #   Collection.
136         # * _queued_dirents: Entries under the current directory
137         #   (_queued_trees[0]) that we want to write or recurse through.
138         #   This may contain files from subdirectories if
139         #   max_manifest_depth == 0 for this directory.
140         # * _queued_trees: Directories that should be written as separate
141         #   streams to the Collection.
142         # This function handles the smallest piece of work currently queued
143         # (current file, then current directory, then next directory) until
144         # no work remains.  The _work_THING methods each do a unit of work on
145         # THING.  _queue_THING methods add a THING to the work queue.
146         while True:
147             if self._queued_file:
148                 self._work_file()
149             elif self._queued_dirents:
150                 self._work_dirents()
151             elif self._queued_trees:
152                 self._work_trees()
153             else:
154                 break
155
156     def _work_file(self):
157         while True:
158             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
159             if not buf:
160                 break
161             self.write(buf)
162         self.finish_current_file()
163         if self._close_file:
164             self._queued_file.close()
165         self._close_file = None
166         self._queued_file = None
167
168     def _work_dirents(self):
169         path, stream_name, max_manifest_depth = self._queued_trees[0]
170         if stream_name != self.current_stream_name():
171             self.start_new_stream(stream_name)
172         while self._queued_dirents:
173             dirent = self._queued_dirents.popleft()
174             target = os.path.join(path, dirent)
175             if os.path.isdir(target):
176                 self._queue_tree(target,
177                                  os.path.join(stream_name, dirent),
178                                  max_manifest_depth - 1)
179             else:
180                 self._queue_file(target, dirent)
181                 break
182         if not self._queued_dirents:
183             self._queued_trees.popleft()
184
185     def _work_trees(self):
186         path, stream_name, max_manifest_depth = self._queued_trees[0]
187         d = arvados.util.listdir_recursive(
188             path, max_depth = (None if max_manifest_depth == 0 else 0))
189         if d:
190             self._queue_dirents(stream_name, d)
191         else:
192             self._queued_trees.popleft()
193
194     def _queue_file(self, source, filename=None):
195         assert (self._queued_file is None), "tried to queue more than one file"
196         if not hasattr(source, 'read'):
197             source = open(source, 'rb')
198             self._close_file = True
199         else:
200             self._close_file = False
201         if filename is None:
202             filename = os.path.basename(source.name)
203         self.start_new_file(filename)
204         self._queued_file = source
205
206     def _queue_dirents(self, stream_name, dirents):
207         assert (not self._queued_dirents), "tried to queue more than one tree"
208         self._queued_dirents = deque(sorted(dirents))
209
210     def _queue_tree(self, path, stream_name, max_manifest_depth):
211         self._queued_trees.append((path, stream_name, max_manifest_depth))
212
213     def write_file(self, source, filename=None):
214         self._queue_file(source, filename)
215         self.do_queued_work()
216
217     def write_directory_tree(self,
218                              path, stream_name='.', max_manifest_depth=-1):
219         self._queue_tree(path, stream_name, max_manifest_depth)
220         self.do_queued_work()
221
222     def write(self, newdata):
223         if hasattr(newdata, '__iter__'):
224             for s in newdata:
225                 self.write(s)
226             return
227         self._data_buffer.append(newdata)
228         self._data_buffer_len += len(newdata)
229         self._current_stream_length += len(newdata)
230         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
231             self.flush_data()
232
233     def open(self, streampath, filename=None):
234         """open(streampath[, filename]) -> file-like object
235
236         Pass in the path of a file to write to the Collection, either as a
237         single string or as two separate stream name and file name arguments.
238         This method returns a file-like object you can write to add it to the
239         Collection.
240
241         You may only have one file object from the Collection open at a time,
242         so be sure to close the object when you're done.  Using the object in
243         a with statement makes that easy::
244
245           with cwriter.open('./doc/page1.txt') as outfile:
246               outfile.write(page1_data)
247           with cwriter.open('./doc/page2.txt') as outfile:
248               outfile.write(page2_data)
249         """
250         if filename is None:
251             streampath, filename = split(streampath)
252         if self._last_open and not self._last_open.closed:
253             raise errors.AssertionError(
254                 "can't open '{}' when '{}' is still open".format(
255                     filename, self._last_open.name))
256         if streampath != self.current_stream_name():
257             self.start_new_stream(streampath)
258         self.set_current_file_name(filename)
259         self._last_open = _WriterFile(self, filename)
260         return self._last_open
261
262     def flush_data(self):
263         data_buffer = ''.join(self._data_buffer)
264         if data_buffer:
265             self._current_stream_locators.append(
266                 self._my_keep().put(
267                     data_buffer[0:config.KEEP_BLOCK_SIZE],
268                     copies=self.replication))
269             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
270             self._data_buffer_len = len(self._data_buffer[0])
271
272     def start_new_file(self, newfilename=None):
273         self.finish_current_file()
274         self.set_current_file_name(newfilename)
275
276     def set_current_file_name(self, newfilename):
277         if re.search(r'[\t\n]', newfilename):
278             raise errors.AssertionError(
279                 "Manifest filenames cannot contain whitespace: %s" %
280                 newfilename)
281         elif re.search(r'\x00', newfilename):
282             raise errors.AssertionError(
283                 "Manifest filenames cannot contain NUL characters: %s" %
284                 newfilename)
285         self._current_file_name = newfilename
286
287     def current_file_name(self):
288         return self._current_file_name
289
290     def finish_current_file(self):
291         if self._current_file_name is None:
292             if self._current_file_pos == self._current_stream_length:
293                 return
294             raise errors.AssertionError(
295                 "Cannot finish an unnamed file " +
296                 "(%d bytes at offset %d in '%s' stream)" %
297                 (self._current_stream_length - self._current_file_pos,
298                  self._current_file_pos,
299                  self._current_stream_name))
300         self._current_stream_files.append([
301                 self._current_file_pos,
302                 self._current_stream_length - self._current_file_pos,
303                 self._current_file_name])
304         self._current_file_pos = self._current_stream_length
305         self._current_file_name = None
306
307     def start_new_stream(self, newstreamname='.'):
308         self.finish_current_stream()
309         self.set_current_stream_name(newstreamname)
310
311     def set_current_stream_name(self, newstreamname):
312         if re.search(r'[\t\n]', newstreamname):
313             raise errors.AssertionError(
314                 "Manifest stream names cannot contain whitespace: '%s'" %
315                 (newstreamname))
316         self._current_stream_name = '.' if newstreamname=='' else newstreamname
317
318     def current_stream_name(self):
319         return self._current_stream_name
320
321     def finish_current_stream(self):
322         self.finish_current_file()
323         self.flush_data()
324         if not self._current_stream_files:
325             pass
326         elif self._current_stream_name is None:
327             raise errors.AssertionError(
328                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
329                 (self._current_stream_length, len(self._current_stream_files)))
330         else:
331             if not self._current_stream_locators:
332                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
333             self._finished_streams.append([self._current_stream_name,
334                                            self._current_stream_locators,
335                                            self._current_stream_files])
336         self._current_stream_files = []
337         self._current_stream_length = 0
338         self._current_stream_locators = []
339         self._current_stream_name = None
340         self._current_file_pos = 0
341         self._current_file_name = None
342
343     def finish(self):
344         """Store the manifest in Keep and return its locator.
345
346         This is useful for storing manifest fragments (task outputs)
347         temporarily in Keep during a Crunch job.
348
349         In other cases you should make a collection instead, by
350         sending manifest_text() to the API server's "create
351         collection" endpoint.
352         """
353         return self._my_keep().put(self.manifest_text(), copies=self.replication)
354
355     def portable_data_hash(self):
356         stripped = self.stripped_manifest()
357         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
358
359     def manifest_text(self):
360         self.finish_current_stream()
361         manifest = ''
362
363         for stream in self._finished_streams:
364             if not re.search(r'^\.(/.*)?$', stream[0]):
365                 manifest += './'
366             manifest += stream[0].replace(' ', '\\040')
367             manifest += ' ' + ' '.join(stream[1])
368             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
369             manifest += "\n"
370
371         return manifest
372
373     def data_locators(self):
374         ret = []
375         for name, locators, files in self._finished_streams:
376             ret += locators
377         return ret
378
379
380 class ResumableCollectionWriter(CollectionWriter):
381     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
382                    '_current_stream_locators', '_current_stream_name',
383                    '_current_file_name', '_current_file_pos', '_close_file',
384                    '_data_buffer', '_dependencies', '_finished_streams',
385                    '_queued_dirents', '_queued_trees']
386
387     def __init__(self, api_client=None, **kwargs):
388         self._dependencies = {}
389         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
390
391     @classmethod
392     def from_state(cls, state, *init_args, **init_kwargs):
393         # Try to build a new writer from scratch with the given state.
394         # If the state is not suitable to resume (because files have changed,
395         # been deleted, aren't predictable, etc.), raise a
396         # StaleWriterStateError.  Otherwise, return the initialized writer.
397         # The caller is responsible for calling writer.do_queued_work()
398         # appropriately after it's returned.
399         writer = cls(*init_args, **init_kwargs)
400         for attr_name in cls.STATE_PROPS:
401             attr_value = state[attr_name]
402             attr_class = getattr(writer, attr_name).__class__
403             # Coerce the value into the same type as the initial value, if
404             # needed.
405             if attr_class not in (type(None), attr_value.__class__):
406                 attr_value = attr_class(attr_value)
407             setattr(writer, attr_name, attr_value)
408         # Check dependencies before we try to resume anything.
409         if any(KeepLocator(ls).permission_expired()
410                for ls in writer._current_stream_locators):
411             raise errors.StaleWriterStateError(
412                 "locators include expired permission hint")
413         writer.check_dependencies()
414         if state['_current_file'] is not None:
415             path, pos = state['_current_file']
416             try:
417                 writer._queued_file = open(path, 'rb')
418                 writer._queued_file.seek(pos)
419             except IOError as error:
420                 raise errors.StaleWriterStateError(
421                     "failed to reopen active file {}: {}".format(path, error))
422         return writer
423
424     def check_dependencies(self):
425         for path, orig_stat in list(self._dependencies.items()):
426             if not S_ISREG(orig_stat[ST_MODE]):
427                 raise errors.StaleWriterStateError("{} not file".format(path))
428             try:
429                 now_stat = tuple(os.stat(path))
430             except OSError as error:
431                 raise errors.StaleWriterStateError(
432                     "failed to stat {}: {}".format(path, error))
433             if ((not S_ISREG(now_stat[ST_MODE])) or
434                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
435                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
436                 raise errors.StaleWriterStateError("{} changed".format(path))
437
438     def dump_state(self, copy_func=lambda x: x):
439         state = {attr: copy_func(getattr(self, attr))
440                  for attr in self.STATE_PROPS}
441         if self._queued_file is None:
442             state['_current_file'] = None
443         else:
444             state['_current_file'] = (os.path.realpath(self._queued_file.name),
445                                       self._queued_file.tell())
446         return state
447
448     def _queue_file(self, source, filename=None):
449         try:
450             src_path = os.path.realpath(source)
451         except Exception:
452             raise errors.AssertionError("{} not a file path".format(source))
453         try:
454             path_stat = os.stat(src_path)
455         except OSError as stat_error:
456             path_stat = None
457         super(ResumableCollectionWriter, self)._queue_file(source, filename)
458         fd_stat = os.fstat(self._queued_file.fileno())
459         if not S_ISREG(fd_stat.st_mode):
460             # We won't be able to resume from this cache anyway, so don't
461             # worry about further checks.
462             self._dependencies[source] = tuple(fd_stat)
463         elif path_stat is None:
464             raise errors.AssertionError(
465                 "could not stat {}: {}".format(source, stat_error))
466         elif path_stat.st_ino != fd_stat.st_ino:
467             raise errors.AssertionError(
468                 "{} changed between open and stat calls".format(source))
469         else:
470             self._dependencies[src_path] = tuple(fd_stat)
471
472     def write(self, data):
473         if self._queued_file is None:
474             raise errors.AssertionError(
475                 "resumable writer can't accept unsourced data")
476         return super(ResumableCollectionWriter, self).write(data)
477
478
479 ADD = "add"
480 DEL = "del"
481 MOD = "mod"
482 TOK = "tok"
483 FILE = "file"
484 COLLECTION = "collection"
485
486 class RichCollectionBase(CollectionBase):
487     """Base class for Collections and Subcollections.
488
489     Implements the majority of functionality relating to accessing items in the
490     Collection.
491
492     """
493
494     def __init__(self, parent=None):
495         self.parent = parent
496         self._committed = False
497         self._callback = None
498         self._items = {}
499
500     def _my_api(self):
501         raise NotImplementedError()
502
503     def _my_keep(self):
504         raise NotImplementedError()
505
506     def _my_block_manager(self):
507         raise NotImplementedError()
508
509     def writable(self):
510         raise NotImplementedError()
511
512     def root_collection(self):
513         raise NotImplementedError()
514
515     def notify(self, event, collection, name, item):
516         raise NotImplementedError()
517
518     def stream_name(self):
519         raise NotImplementedError()
520
521     @must_be_writable
522     @synchronized
523     def find_or_create(self, path, create_type):
524         """Recursively search the specified file path.
525
526         May return either a `Collection` or `ArvadosFile`.  If not found, will
527         create a new item at the specified path based on `create_type`.  Will
528         create intermediate subcollections needed to contain the final item in
529         the path.
530
531         :create_type:
532           One of `arvados.collection.FILE` or
533           `arvados.collection.COLLECTION`.  If the path is not found, and value
534           of create_type is FILE then create and return a new ArvadosFile for
535           the last path component.  If COLLECTION, then create and return a new
536           Collection for the last path component.
537
538         """
539
540         pathcomponents = path.split("/", 1)
541         if pathcomponents[0]:
542             item = self._items.get(pathcomponents[0])
543             if len(pathcomponents) == 1:
544                 if item is None:
545                     # create new file
546                     if create_type == COLLECTION:
547                         item = Subcollection(self, pathcomponents[0])
548                     else:
549                         item = ArvadosFile(self, pathcomponents[0])
550                     self._items[pathcomponents[0]] = item
551                     self.set_committed(False)
552                     self.notify(ADD, self, pathcomponents[0], item)
553                 return item
554             else:
555                 if item is None:
556                     # create new collection
557                     item = Subcollection(self, pathcomponents[0])
558                     self._items[pathcomponents[0]] = item
559                     self.set_committed(False)
560                     self.notify(ADD, self, pathcomponents[0], item)
561                 if isinstance(item, RichCollectionBase):
562                     return item.find_or_create(pathcomponents[1], create_type)
563                 else:
564                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
565         else:
566             return self
567
568     @synchronized
569     def find(self, path):
570         """Recursively search the specified file path.
571
572         May return either a Collection or ArvadosFile. Return None if not
573         found.
574         If path is invalid (ex: starts with '/'), an IOError exception will be
575         raised.
576
577         """
578         if not path:
579             raise errors.ArgumentError("Parameter 'path' is empty.")
580
581         pathcomponents = path.split("/", 1)
582         if pathcomponents[0] == '':
583             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
584
585         item = self._items.get(pathcomponents[0])
586         if item is None:
587             return None
588         elif len(pathcomponents) == 1:
589             return item
590         else:
591             if isinstance(item, RichCollectionBase):
592                 if pathcomponents[1]:
593                     return item.find(pathcomponents[1])
594                 else:
595                     return item
596             else:
597                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
598
599     @synchronized
600     def mkdirs(self, path):
601         """Recursive subcollection create.
602
603         Like `os.makedirs()`.  Will create intermediate subcollections needed
604         to contain the leaf subcollection path.
605
606         """
607
608         if self.find(path) != None:
609             raise IOError(errno.EEXIST, "Directory or file exists", path)
610
611         return self.find_or_create(path, COLLECTION)
612
613     def open(self, path, mode="r"):
614         """Open a file-like object for access.
615
616         :path:
617           path to a file in the collection
618         :mode:
619           one of "r", "r+", "w", "w+", "a", "a+"
620           :"r":
621             opens for reading
622           :"r+":
623             opens for reading and writing.  Reads/writes share a file pointer.
624           :"w", "w+":
625             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
626           :"a", "a+":
627             opens for reading and writing.  All writes are appended to
628             the end of the file.  Writing does not affect the file pointer for
629             reading.
630         """
631         mode = mode.replace("b", "")
632         if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
633             raise errors.ArgumentError("Bad mode '%s'" % mode)
634         create = (mode != "r")
635
636         if create and not self.writable():
637             raise IOError(errno.EROFS, "Collection is read only")
638
639         if create:
640             arvfile = self.find_or_create(path, FILE)
641         else:
642             arvfile = self.find(path)
643
644         if arvfile is None:
645             raise IOError(errno.ENOENT, "File not found", path)
646         if not isinstance(arvfile, ArvadosFile):
647             raise IOError(errno.EISDIR, "Is a directory", path)
648
649         if mode[0] == "w":
650             arvfile.truncate(0)
651
652         name = os.path.basename(path)
653
654         if mode == "r":
655             return ArvadosFileReader(arvfile, num_retries=self.num_retries)
656         else:
657             return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
658
659     def modified(self):
660         """Determine if the collection has been modified since last commited."""
661         return not self.committed()
662
663     @synchronized
664     def committed(self):
665         """Determine if the collection has been committed to the API server."""
666         return self._committed
667
668     @synchronized
669     def set_committed(self, value=True):
670         """Recursively set committed flag.
671
672         If value is True, set committed to be True for this and all children.
673
674         If value is False, set committed to be False for this and all parents.
675         """
676         if value == self._committed:
677             return
678         if value:
679             for k,v in list(self._items.items()):
680                 v.set_committed(True)
681             self._committed = True
682         else:
683             self._committed = False
684             if self.parent is not None:
685                 self.parent.set_committed(False)
686
687     @synchronized
688     def __iter__(self):
689         """Iterate over names of files and collections contained in this collection."""
690         return iter(list(self._items.keys()))
691
692     @synchronized
693     def __getitem__(self, k):
694         """Get a file or collection that is directly contained by this collection.
695
696         If you want to search a path, use `find()` instead.
697
698         """
699         return self._items[k]
700
701     @synchronized
702     def __contains__(self, k):
703         """Test if there is a file or collection a directly contained by this collection."""
704         return k in self._items
705
706     @synchronized
707     def __len__(self):
708         """Get the number of items directly contained in this collection."""
709         return len(self._items)
710
711     @must_be_writable
712     @synchronized
713     def __delitem__(self, p):
714         """Delete an item by name which is directly contained by this collection."""
715         del self._items[p]
716         self.set_committed(False)
717         self.notify(DEL, self, p, None)
718
719     @synchronized
720     def keys(self):
721         """Get a list of names of files and collections directly contained in this collection."""
722         return list(self._items.keys())
723
724     @synchronized
725     def values(self):
726         """Get a list of files and collection objects directly contained in this collection."""
727         return list(self._items.values())
728
729     @synchronized
730     def items(self):
731         """Get a list of (name, object) tuples directly contained in this collection."""
732         return list(self._items.items())
733
734     def exists(self, path):
735         """Test if there is a file or collection at `path`."""
736         return self.find(path) is not None
737
738     @must_be_writable
739     @synchronized
740     def remove(self, path, recursive=False):
741         """Remove the file or subcollection (directory) at `path`.
742
743         :recursive:
744           Specify whether to remove non-empty subcollections (True), or raise an error (False).
745         """
746
747         if not path:
748             raise errors.ArgumentError("Parameter 'path' is empty.")
749
750         pathcomponents = path.split("/", 1)
751         item = self._items.get(pathcomponents[0])
752         if item is None:
753             raise IOError(errno.ENOENT, "File not found", path)
754         if len(pathcomponents) == 1:
755             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
756                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
757             deleteditem = self._items[pathcomponents[0]]
758             del self._items[pathcomponents[0]]
759             self.set_committed(False)
760             self.notify(DEL, self, pathcomponents[0], deleteditem)
761         else:
762             item.remove(pathcomponents[1])
763
764     def _clonefrom(self, source):
765         for k,v in list(source.items()):
766             self._items[k] = v.clone(self, k)
767
768     def clone(self):
769         raise NotImplementedError()
770
771     @must_be_writable
772     @synchronized
773     def add(self, source_obj, target_name, overwrite=False, reparent=False):
774         """Copy or move a file or subcollection to this collection.
775
776         :source_obj:
777           An ArvadosFile, or Subcollection object
778
779         :target_name:
780           Destination item name.  If the target name already exists and is a
781           file, this will raise an error unless you specify `overwrite=True`.
782
783         :overwrite:
784           Whether to overwrite target file if it already exists.
785
786         :reparent:
787           If True, source_obj will be moved from its parent collection to this collection.
788           If False, source_obj will be copied and the parent collection will be
789           unmodified.
790
791         """
792
793         if target_name in self and not overwrite:
794             raise IOError(errno.EEXIST, "File already exists", target_name)
795
796         modified_from = None
797         if target_name in self:
798             modified_from = self[target_name]
799
800         # Actually make the move or copy.
801         if reparent:
802             source_obj._reparent(self, target_name)
803             item = source_obj
804         else:
805             item = source_obj.clone(self, target_name)
806
807         self._items[target_name] = item
808         self.set_committed(False)
809
810         if modified_from:
811             self.notify(MOD, self, target_name, (modified_from, item))
812         else:
813             self.notify(ADD, self, target_name, item)
814
815     def _get_src_target(self, source, target_path, source_collection, create_dest):
816         if source_collection is None:
817             source_collection = self
818
819         # Find the object
820         if isinstance(source, basestring):
821             source_obj = source_collection.find(source)
822             if source_obj is None:
823                 raise IOError(errno.ENOENT, "File not found", source)
824             sourcecomponents = source.split("/")
825         else:
826             source_obj = source
827             sourcecomponents = None
828
829         # Find parent collection the target path
830         targetcomponents = target_path.split("/")
831
832         # Determine the name to use.
833         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
834
835         if not target_name:
836             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
837
838         if create_dest:
839             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
840         else:
841             if len(targetcomponents) > 1:
842                 target_dir = self.find("/".join(targetcomponents[0:-1]))
843             else:
844                 target_dir = self
845
846         if target_dir is None:
847             raise IOError(errno.ENOENT, "Target directory not found", target_name)
848
849         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
850             target_dir = target_dir[target_name]
851             target_name = sourcecomponents[-1]
852
853         return (source_obj, target_dir, target_name)
854
855     @must_be_writable
856     @synchronized
857     def copy(self, source, target_path, source_collection=None, overwrite=False):
858         """Copy a file or subcollection to a new path in this collection.
859
860         :source:
861           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
862
863         :target_path:
864           Destination file or path.  If the target path already exists and is a
865           subcollection, the item will be placed inside the subcollection.  If
866           the target path already exists and is a file, this will raise an error
867           unless you specify `overwrite=True`.
868
869         :source_collection:
870           Collection to copy `source_path` from (default `self`)
871
872         :overwrite:
873           Whether to overwrite target file if it already exists.
874         """
875
876         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
877         target_dir.add(source_obj, target_name, overwrite, False)
878
879     @must_be_writable
880     @synchronized
881     def rename(self, source, target_path, source_collection=None, overwrite=False):
882         """Move a file or subcollection from `source_collection` to a new path in this collection.
883
884         :source:
885           A string with a path to source file or subcollection.
886
887         :target_path:
888           Destination file or path.  If the target path already exists and is a
889           subcollection, the item will be placed inside the subcollection.  If
890           the target path already exists and is a file, this will raise an error
891           unless you specify `overwrite=True`.
892
893         :source_collection:
894           Collection to copy `source_path` from (default `self`)
895
896         :overwrite:
897           Whether to overwrite target file if it already exists.
898         """
899
900         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
901         if not source_obj.writable():
902             raise IOError(errno.EROFS, "Source collection is read only", source)
903         target_dir.add(source_obj, target_name, overwrite, True)
904
905     def portable_manifest_text(self, stream_name="."):
906         """Get the manifest text for this collection, sub collections and files.
907
908         This method does not flush outstanding blocks to Keep.  It will return
909         a normalized manifest with access tokens stripped.
910
911         :stream_name:
912           Name to use for this stream (directory)
913
914         """
915         return self._get_manifest_text(stream_name, True, True)
916
917     @synchronized
918     def manifest_text(self, stream_name=".", strip=False, normalize=False,
919                       only_committed=False):
920         """Get the manifest text for this collection, sub collections and files.
921
922         This method will flush outstanding blocks to Keep.  By default, it will
923         not normalize an unmodified manifest or strip access tokens.
924
925         :stream_name:
926           Name to use for this stream (directory)
927
928         :strip:
929           If True, remove signing tokens from block locators if present.
930           If False (default), block locators are left unchanged.
931
932         :normalize:
933           If True, always export the manifest text in normalized form
934           even if the Collection is not modified.  If False (default) and the collection
935           is not modified, return the original manifest text even if it is not
936           in normalized form.
937
938         :only_committed:
939           If True, don't commit pending blocks.
940
941         """
942
943         if not only_committed:
944             self._my_block_manager().commit_all()
945         return self._get_manifest_text(stream_name, strip, normalize,
946                                        only_committed=only_committed)
947
948     @synchronized
949     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
950         """Get the manifest text for this collection, sub collections and files.
951
952         :stream_name:
953           Name to use for this stream (directory)
954
955         :strip:
956           If True, remove signing tokens from block locators if present.
957           If False (default), block locators are left unchanged.
958
959         :normalize:
960           If True, always export the manifest text in normalized form
961           even if the Collection is not modified.  If False (default) and the collection
962           is not modified, return the original manifest text even if it is not
963           in normalized form.
964
965         :only_committed:
966           If True, only include blocks that were already committed to Keep.
967
968         """
969
970         if not self.committed() or self._manifest_text is None or normalize:
971             stream = {}
972             buf = []
973             sorted_keys = sorted(self.keys())
974             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
975                 # Create a stream per file `k`
976                 arvfile = self[filename]
977                 filestream = []
978                 for segment in arvfile.segments():
979                     loc = segment.locator
980                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
981                         if only_committed:
982                             continue
983                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
984                     if strip:
985                         loc = KeepLocator(loc).stripped()
986                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
987                                          segment.segment_offset, segment.range_size))
988                 stream[filename] = filestream
989             if stream:
990                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
991             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
992                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
993             return "".join(buf)
994         else:
995             if strip:
996                 return self.stripped_manifest()
997             else:
998                 return self._manifest_text
999
1000     @synchronized
1001     def diff(self, end_collection, prefix=".", holding_collection=None):
1002         """Generate list of add/modify/delete actions.
1003
1004         When given to `apply`, will change `self` to match `end_collection`
1005
1006         """
1007         changes = []
1008         if holding_collection is None:
1009             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1010         for k in self:
1011             if k not in end_collection:
1012                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1013         for k in end_collection:
1014             if k in self:
1015                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1016                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1017                 elif end_collection[k] != self[k]:
1018                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1019                 else:
1020                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1021             else:
1022                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1023         return changes
1024
1025     @must_be_writable
1026     @synchronized
1027     def apply(self, changes):
1028         """Apply changes from `diff`.
1029
1030         If a change conflicts with a local change, it will be saved to an
1031         alternate path indicating the conflict.
1032
1033         """
1034         if changes:
1035             self.set_committed(False)
1036         for change in changes:
1037             event_type = change[0]
1038             path = change[1]
1039             initial = change[2]
1040             local = self.find(path)
1041             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1042                                                                     time.gmtime()))
1043             if event_type == ADD:
1044                 if local is None:
1045                     # No local file at path, safe to copy over new file
1046                     self.copy(initial, path)
1047                 elif local is not None and local != initial:
1048                     # There is already local file and it is different:
1049                     # save change to conflict file.
1050                     self.copy(initial, conflictpath)
1051             elif event_type == MOD or event_type == TOK:
1052                 final = change[3]
1053                 if local == initial:
1054                     # Local matches the "initial" item so it has not
1055                     # changed locally and is safe to update.
1056                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1057                         # Replace contents of local file with new contents
1058                         local.replace_contents(final)
1059                     else:
1060                         # Overwrite path with new item; this can happen if
1061                         # path was a file and is now a collection or vice versa
1062                         self.copy(final, path, overwrite=True)
1063                 else:
1064                     # Local is missing (presumably deleted) or local doesn't
1065                     # match the "start" value, so save change to conflict file
1066                     self.copy(final, conflictpath)
1067             elif event_type == DEL:
1068                 if local == initial:
1069                     # Local item matches "initial" value, so it is safe to remove.
1070                     self.remove(path, recursive=True)
1071                 # else, the file is modified or already removed, in either
1072                 # case we don't want to try to remove it.
1073
1074     def portable_data_hash(self):
1075         """Get the portable data hash for this collection's manifest."""
1076         if self._manifest_locator and self.committed():
1077             # If the collection is already saved on the API server, and it's committed
1078             # then return API server's PDH response.
1079             return self._portable_data_hash
1080         else:
1081             stripped = self.portable_manifest_text()
1082             return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1083
1084     @synchronized
1085     def subscribe(self, callback):
1086         if self._callback is None:
1087             self._callback = callback
1088         else:
1089             raise errors.ArgumentError("A callback is already set on this collection.")
1090
1091     @synchronized
1092     def unsubscribe(self):
1093         if self._callback is not None:
1094             self._callback = None
1095
1096     @synchronized
1097     def notify(self, event, collection, name, item):
1098         if self._callback:
1099             self._callback(event, collection, name, item)
1100         self.root_collection().notify(event, collection, name, item)
1101
1102     @synchronized
1103     def __eq__(self, other):
1104         if other is self:
1105             return True
1106         if not isinstance(other, RichCollectionBase):
1107             return False
1108         if len(self._items) != len(other):
1109             return False
1110         for k in self._items:
1111             if k not in other:
1112                 return False
1113             if self._items[k] != other[k]:
1114                 return False
1115         return True
1116
1117     def __ne__(self, other):
1118         return not self.__eq__(other)
1119
1120     @synchronized
1121     def flush(self):
1122         """Flush bufferblocks to Keep."""
1123         for e in list(self.values()):
1124             e.flush()
1125
1126
1127 class Collection(RichCollectionBase):
1128     """Represents the root of an Arvados Collection.
1129
1130     This class is threadsafe.  The root collection object, all subcollections
1131     and files are protected by a single lock (i.e. each access locks the entire
1132     collection).
1133
1134     Brief summary of
1135     useful methods:
1136
1137     :To read an existing file:
1138       `c.open("myfile", "r")`
1139
1140     :To write a new file:
1141       `c.open("myfile", "w")`
1142
1143     :To determine if a file exists:
1144       `c.find("myfile") is not None`
1145
1146     :To copy a file:
1147       `c.copy("source", "dest")`
1148
1149     :To delete a file:
1150       `c.remove("myfile")`
1151
1152     :To save to an existing collection record:
1153       `c.save()`
1154
1155     :To save a new collection record:
1156     `c.save_new()`
1157
1158     :To merge remote changes into this object:
1159       `c.update()`
1160
1161     Must be associated with an API server Collection record (during
1162     initialization, or using `save_new`) to use `save` or `update`
1163
1164     """
1165
1166     def __init__(self, manifest_locator_or_text=None,
1167                  api_client=None,
1168                  keep_client=None,
1169                  num_retries=None,
1170                  parent=None,
1171                  apiconfig=None,
1172                  block_manager=None,
1173                  replication_desired=None,
1174                  put_threads=None):
1175         """Collection constructor.
1176
1177         :manifest_locator_or_text:
1178           One of Arvados collection UUID, block locator of
1179           a manifest, raw manifest text, or None (to create an empty collection).
1180         :parent:
1181           the parent Collection, may be None.
1182
1183         :apiconfig:
1184           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1185           Prefer this over supplying your own api_client and keep_client (except in testing).
1186           Will use default config settings if not specified.
1187
1188         :api_client:
1189           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1190
1191         :keep_client:
1192           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1193
1194         :num_retries:
1195           the number of retries for API and Keep requests.
1196
1197         :block_manager:
1198           the block manager to use.  If not specified, create one.
1199
1200         :replication_desired:
1201           How many copies should Arvados maintain. If None, API server default
1202           configuration applies. If not None, this value will also be used
1203           for determining the number of block copies being written.
1204
1205         """
1206         super(Collection, self).__init__(parent)
1207         self._api_client = api_client
1208         self._keep_client = keep_client
1209         self._block_manager = block_manager
1210         self.replication_desired = replication_desired
1211         self.put_threads = put_threads
1212
1213         if apiconfig:
1214             self._config = apiconfig
1215         else:
1216             self._config = config.settings()
1217
1218         self.num_retries = num_retries if num_retries is not None else 0
1219         self._manifest_locator = None
1220         self._manifest_text = None
1221         self._portable_data_hash = None
1222         self._api_response = None
1223         self._past_versions = set()
1224
1225         self.lock = threading.RLock()
1226         self.events = None
1227
1228         if manifest_locator_or_text:
1229             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1230                 self._manifest_locator = manifest_locator_or_text
1231             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1232                 self._manifest_locator = manifest_locator_or_text
1233             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1234                 self._manifest_text = manifest_locator_or_text
1235             else:
1236                 raise errors.ArgumentError(
1237                     "Argument to CollectionReader is not a manifest or a collection UUID")
1238
1239             try:
1240                 self._populate()
1241             except (IOError, errors.SyntaxError) as e:
1242                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1243
1244     def root_collection(self):
1245         return self
1246
1247     def stream_name(self):
1248         return "."
1249
1250     def writable(self):
1251         return True
1252
1253     @synchronized
1254     def known_past_version(self, modified_at_and_portable_data_hash):
1255         return modified_at_and_portable_data_hash in self._past_versions
1256
1257     @synchronized
1258     @retry_method
1259     def update(self, other=None, num_retries=None):
1260         """Merge the latest collection on the API server with the current collection."""
1261
1262         if other is None:
1263             if self._manifest_locator is None:
1264                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1265             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1266             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1267                 response.get("portable_data_hash") != self.portable_data_hash()):
1268                 # The record on the server is different from our current one, but we've seen it before,
1269                 # so ignore it because it's already been merged.
1270                 # However, if it's the same as our current record, proceed with the update, because we want to update
1271                 # our tokens.
1272                 return
1273             else:
1274                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1275             other = CollectionReader(response["manifest_text"])
1276         baseline = CollectionReader(self._manifest_text)
1277         self.apply(baseline.diff(other))
1278         self._manifest_text = self.manifest_text()
1279
1280     @synchronized
1281     def _my_api(self):
1282         if self._api_client is None:
1283             self._api_client = ThreadSafeApiCache(self._config)
1284             if self._keep_client is None:
1285                 self._keep_client = self._api_client.keep
1286         return self._api_client
1287
1288     @synchronized
1289     def _my_keep(self):
1290         if self._keep_client is None:
1291             if self._api_client is None:
1292                 self._my_api()
1293             else:
1294                 self._keep_client = KeepClient(api_client=self._api_client)
1295         return self._keep_client
1296
1297     @synchronized
1298     def _my_block_manager(self):
1299         if self._block_manager is None:
1300             copies = (self.replication_desired or
1301                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1302                                                    2))
1303             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1304         return self._block_manager
1305
1306     def _remember_api_response(self, response):
1307         self._api_response = response
1308         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1309
1310     def _populate_from_api_server(self):
1311         # As in KeepClient itself, we must wait until the last
1312         # possible moment to instantiate an API client, in order to
1313         # avoid tripping up clients that don't have access to an API
1314         # server.  If we do build one, make sure our Keep client uses
1315         # it.  If instantiation fails, we'll fall back to the except
1316         # clause, just like any other Collection lookup
1317         # failure. Return an exception, or None if successful.
1318         try:
1319             self._remember_api_response(self._my_api().collections().get(
1320                 uuid=self._manifest_locator).execute(
1321                     num_retries=self.num_retries))
1322             self._manifest_text = self._api_response['manifest_text']
1323             self._portable_data_hash = self._api_response['portable_data_hash']
1324             # If not overriden via kwargs, we should try to load the
1325             # replication_desired from the API server
1326             if self.replication_desired is None:
1327                 self.replication_desired = self._api_response.get('replication_desired', None)
1328             return None
1329         except Exception as e:
1330             return e
1331
1332     def _populate_from_keep(self):
1333         # Retrieve a manifest directly from Keep. This has a chance of
1334         # working if [a] the locator includes a permission signature
1335         # or [b] the Keep services are operating in world-readable
1336         # mode. Return an exception, or None if successful.
1337         try:
1338             self._manifest_text = self._my_keep().get(
1339                 self._manifest_locator, num_retries=self.num_retries)
1340         except Exception as e:
1341             return e
1342
1343     def _populate(self):
1344         if self._manifest_locator is None and self._manifest_text is None:
1345             return
1346         error_via_api = None
1347         error_via_keep = None
1348         should_try_keep = ((self._manifest_text is None) and
1349                            arvados.util.keep_locator_pattern.match(
1350                                self._manifest_locator))
1351         if ((self._manifest_text is None) and
1352             arvados.util.signed_locator_pattern.match(self._manifest_locator)):
1353             error_via_keep = self._populate_from_keep()
1354         if self._manifest_text is None:
1355             error_via_api = self._populate_from_api_server()
1356             if error_via_api is not None and not should_try_keep:
1357                 raise error_via_api
1358         if ((self._manifest_text is None) and
1359             not error_via_keep and
1360             should_try_keep):
1361             # Looks like a keep locator, and we didn't already try keep above
1362             error_via_keep = self._populate_from_keep()
1363         if self._manifest_text is None:
1364             # Nothing worked!
1365             raise errors.NotFoundError(
1366                 ("Failed to retrieve collection '{}' " +
1367                  "from either API server ({}) or Keep ({})."
1368                  ).format(
1369                     self._manifest_locator,
1370                     error_via_api,
1371                     error_via_keep))
1372         # populate
1373         self._baseline_manifest = self._manifest_text
1374         self._import_manifest(self._manifest_text)
1375
1376
1377     def _has_collection_uuid(self):
1378         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1379
1380     def __enter__(self):
1381         return self
1382
1383     def __exit__(self, exc_type, exc_value, traceback):
1384         """Support scoped auto-commit in a with: block."""
1385         if exc_type is None:
1386             if self.writable() and self._has_collection_uuid():
1387                 self.save()
1388         self.stop_threads()
1389
1390     def stop_threads(self):
1391         if self._block_manager is not None:
1392             self._block_manager.stop_threads()
1393
1394     @synchronized
1395     def manifest_locator(self):
1396         """Get the manifest locator, if any.
1397
1398         The manifest locator will be set when the collection is loaded from an
1399         API server record or the portable data hash of a manifest.
1400
1401         The manifest locator will be None if the collection is newly created or
1402         was created directly from manifest text.  The method `save_new()` will
1403         assign a manifest locator.
1404
1405         """
1406         return self._manifest_locator
1407
1408     @synchronized
1409     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1410         if new_config is None:
1411             new_config = self._config
1412         if readonly:
1413             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1414         else:
1415             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1416
1417         newcollection._clonefrom(self)
1418         return newcollection
1419
1420     @synchronized
1421     def api_response(self):
1422         """Returns information about this Collection fetched from the API server.
1423
1424         If the Collection exists in Keep but not the API server, currently
1425         returns None.  Future versions may provide a synthetic response.
1426
1427         """
1428         return self._api_response
1429
1430     def find_or_create(self, path, create_type):
1431         """See `RichCollectionBase.find_or_create`"""
1432         if path == ".":
1433             return self
1434         else:
1435             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1436
1437     def find(self, path):
1438         """See `RichCollectionBase.find`"""
1439         if path == ".":
1440             return self
1441         else:
1442             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1443
1444     def remove(self, path, recursive=False):
1445         """See `RichCollectionBase.remove`"""
1446         if path == ".":
1447             raise errors.ArgumentError("Cannot remove '.'")
1448         else:
1449             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1450
1451     @must_be_writable
1452     @synchronized
1453     @retry_method
1454     def save(self, merge=True, num_retries=None):
1455         """Save collection to an existing collection record.
1456
1457         Commit pending buffer blocks to Keep, merge with remote record (if
1458         merge=True, the default), and update the collection record.  Returns
1459         the current manifest text.
1460
1461         Will raise AssertionError if not associated with a collection record on
1462         the API server.  If you want to save a manifest to Keep only, see
1463         `save_new()`.
1464
1465         :merge:
1466           Update and merge remote changes before saving.  Otherwise, any
1467           remote changes will be ignored and overwritten.
1468
1469         :num_retries:
1470           Retry count on API calls (if None,  use the collection default)
1471
1472         """
1473         if not self.committed():
1474             if not self._has_collection_uuid():
1475                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1476
1477             self._my_block_manager().commit_all()
1478
1479             if merge:
1480                 self.update()
1481
1482             text = self.manifest_text(strip=False)
1483             self._remember_api_response(self._my_api().collections().update(
1484                 uuid=self._manifest_locator,
1485                 body={'manifest_text': text}
1486                 ).execute(
1487                     num_retries=num_retries))
1488             self._manifest_text = self._api_response["manifest_text"]
1489             self._portable_data_hash = self._api_response["portable_data_hash"]
1490             self.set_committed(True)
1491
1492         return self._manifest_text
1493
1494
1495     @must_be_writable
1496     @synchronized
1497     @retry_method
1498     def save_new(self, name=None,
1499                  create_collection_record=True,
1500                  owner_uuid=None,
1501                  ensure_unique_name=False,
1502                  num_retries=None):
1503         """Save collection to a new collection record.
1504
1505         Commit pending buffer blocks to Keep and, when create_collection_record
1506         is True (default), create a new collection record.  After creating a
1507         new collection record, this Collection object will be associated with
1508         the new record used by `save()`.  Returns the current manifest text.
1509
1510         :name:
1511           The collection name.
1512
1513         :create_collection_record:
1514            If True, create a collection record on the API server.
1515            If False, only commit blocks to Keep and return the manifest text.
1516
1517         :owner_uuid:
1518           the user, or project uuid that will own this collection.
1519           If None, defaults to the current user.
1520
1521         :ensure_unique_name:
1522           If True, ask the API server to rename the collection
1523           if it conflicts with a collection with the same name and owner.  If
1524           False, a name conflict will result in an error.
1525
1526         :num_retries:
1527           Retry count on API calls (if None,  use the collection default)
1528
1529         """
1530         self._my_block_manager().commit_all()
1531         text = self.manifest_text(strip=False)
1532
1533         if create_collection_record:
1534             if name is None:
1535                 name = "New collection"
1536                 ensure_unique_name = True
1537
1538             body = {"manifest_text": text,
1539                     "name": name,
1540                     "replication_desired": self.replication_desired}
1541             if owner_uuid:
1542                 body["owner_uuid"] = owner_uuid
1543
1544             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1545             text = self._api_response["manifest_text"]
1546
1547             self._manifest_locator = self._api_response["uuid"]
1548             self._portable_data_hash = self._api_response["portable_data_hash"]
1549
1550             self._manifest_text = text
1551             self.set_committed(True)
1552
1553         return text
1554
1555     @synchronized
1556     def _import_manifest(self, manifest_text):
1557         """Import a manifest into a `Collection`.
1558
1559         :manifest_text:
1560           The manifest text to import from.
1561
1562         """
1563         if len(self) > 0:
1564             raise ArgumentError("Can only import manifest into an empty collection")
1565
1566         STREAM_NAME = 0
1567         BLOCKS = 1
1568         SEGMENTS = 2
1569
1570         stream_name = None
1571         state = STREAM_NAME
1572
1573         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1574             tok = token_and_separator.group(1)
1575             sep = token_and_separator.group(2)
1576
1577             if state == STREAM_NAME:
1578                 # starting a new stream
1579                 stream_name = tok.replace('\\040', ' ')
1580                 blocks = []
1581                 segments = []
1582                 streamoffset = 0
1583                 state = BLOCKS
1584                 self.find_or_create(stream_name, COLLECTION)
1585                 continue
1586
1587             if state == BLOCKS:
1588                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1589                 if block_locator:
1590                     blocksize = int(block_locator.group(1))
1591                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1592                     streamoffset += blocksize
1593                 else:
1594                     state = SEGMENTS
1595
1596             if state == SEGMENTS:
1597                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1598                 if file_segment:
1599                     pos = int(file_segment.group(1))
1600                     size = int(file_segment.group(2))
1601                     name = file_segment.group(3).replace('\\040', ' ')
1602                     filepath = os.path.join(stream_name, name)
1603                     afile = self.find_or_create(filepath, FILE)
1604                     if isinstance(afile, ArvadosFile):
1605                         afile.add_segment(blocks, pos, size)
1606                     else:
1607                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1608                 else:
1609                     # error!
1610                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1611
1612             if sep == "\n":
1613                 stream_name = None
1614                 state = STREAM_NAME
1615
1616         self.set_committed(True)
1617
1618     @synchronized
1619     def notify(self, event, collection, name, item):
1620         if self._callback:
1621             self._callback(event, collection, name, item)
1622
1623
1624 class Subcollection(RichCollectionBase):
1625     """This is a subdirectory within a collection that doesn't have its own API
1626     server record.
1627
1628     Subcollection locking falls under the umbrella lock of its root collection.
1629
1630     """
1631
1632     def __init__(self, parent, name):
1633         super(Subcollection, self).__init__(parent)
1634         self.lock = self.root_collection().lock
1635         self._manifest_text = None
1636         self.name = name
1637         self.num_retries = parent.num_retries
1638
1639     def root_collection(self):
1640         return self.parent.root_collection()
1641
1642     def writable(self):
1643         return self.root_collection().writable()
1644
1645     def _my_api(self):
1646         return self.root_collection()._my_api()
1647
1648     def _my_keep(self):
1649         return self.root_collection()._my_keep()
1650
1651     def _my_block_manager(self):
1652         return self.root_collection()._my_block_manager()
1653
1654     def stream_name(self):
1655         return os.path.join(self.parent.stream_name(), self.name)
1656
1657     @synchronized
1658     def clone(self, new_parent, new_name):
1659         c = Subcollection(new_parent, new_name)
1660         c._clonefrom(self)
1661         return c
1662
1663     @must_be_writable
1664     @synchronized
1665     def _reparent(self, newparent, newname):
1666         self.set_committed(False)
1667         self.flush()
1668         self.parent.remove(self.name, recursive=True)
1669         self.parent = newparent
1670         self.name = newname
1671         self.lock = self.parent.root_collection().lock
1672
1673
1674 class CollectionReader(Collection):
1675     """A read-only collection object.
1676
1677     Initialize from an api collection record locator, a portable data hash of a
1678     manifest, or raw manifest text.  See `Collection` constructor for detailed
1679     options.
1680
1681     """
1682     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1683         self._in_init = True
1684         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1685         self._in_init = False
1686
1687         # Forego any locking since it should never change once initialized.
1688         self.lock = NoopLock()
1689
1690         # Backwards compatability with old CollectionReader
1691         # all_streams() and all_files()
1692         self._streams = None
1693
1694     def writable(self):
1695         return self._in_init
1696
1697     def _populate_streams(orig_func):
1698         @functools.wraps(orig_func)
1699         def populate_streams_wrapper(self, *args, **kwargs):
1700             # Defer populating self._streams until needed since it creates a copy of the manifest.
1701             if self._streams is None:
1702                 if self._manifest_text:
1703                     self._streams = [sline.split()
1704                                      for sline in self._manifest_text.split("\n")
1705                                      if sline]
1706                 else:
1707                     self._streams = []
1708             return orig_func(self, *args, **kwargs)
1709         return populate_streams_wrapper
1710
1711     @_populate_streams
1712     def normalize(self):
1713         """Normalize the streams returned by `all_streams`.
1714
1715         This method is kept for backwards compatability and only affects the
1716         behavior of `all_streams()` and `all_files()`
1717
1718         """
1719
1720         # Rearrange streams
1721         streams = {}
1722         for s in self.all_streams():
1723             for f in s.all_files():
1724                 streamname, filename = split(s.name() + "/" + f.name())
1725                 if streamname not in streams:
1726                     streams[streamname] = {}
1727                 if filename not in streams[streamname]:
1728                     streams[streamname][filename] = []
1729                 for r in f.segments:
1730                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1731
1732         self._streams = [normalize_stream(s, streams[s])
1733                          for s in sorted(streams)]
1734     @_populate_streams
1735     def all_streams(self):
1736         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1737                 for s in self._streams]
1738
1739     @_populate_streams
1740     def all_files(self):
1741         for s in self.all_streams():
1742             for f in s.all_files():
1743                 yield f