Merge branch '13108-cwl-parallel-submit' closes #13108
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import functools
11 import logging
12 import os
13 import re
14 import errno
15 import hashlib
16 import time
17 import threading
18
19 from collections import deque
20 from stat import *
21
22 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
23 from .keep import KeepLocator, KeepClient
24 from .stream import StreamReader
25 from ._normalize_stream import normalize_stream
26 from ._ranges import Range, LocatorAndRange
27 from .safeapi import ThreadSafeApiCache
28 import arvados.config as config
29 import arvados.errors as errors
30 import arvados.util
31 import arvados.events as events
32 from arvados.retry import retry_method
33
34 _logger = logging.getLogger('arvados.collection')
35
36 class CollectionBase(object):
37     """Abstract base class for Collection classes."""
38
39     def __enter__(self):
40         return self
41
42     def __exit__(self, exc_type, exc_value, traceback):
43         pass
44
45     def _my_keep(self):
46         if self._keep_client is None:
47             self._keep_client = KeepClient(api_client=self._api_client,
48                                            num_retries=self.num_retries)
49         return self._keep_client
50
51     def stripped_manifest(self):
52         """Get the manifest with locator hints stripped.
53
54         Return the manifest for the current collection with all
55         non-portable hints (i.e., permission signatures and other
56         hints other than size hints) removed from the locators.
57         """
58         raw = self.manifest_text()
59         clean = []
60         for line in raw.split("\n"):
61             fields = line.split()
62             if fields:
63                 clean_fields = fields[:1] + [
64                     (re.sub(r'\+[^\d][^\+]*', '', x)
65                      if re.match(arvados.util.keep_locator_pattern, x)
66                      else x)
67                     for x in fields[1:]]
68                 clean += [' '.join(clean_fields), "\n"]
69         return ''.join(clean)
70
71
72 class _WriterFile(_FileLikeObjectBase):
73     def __init__(self, coll_writer, name):
74         super(_WriterFile, self).__init__(name, 'wb')
75         self.dest = coll_writer
76
77     def close(self):
78         super(_WriterFile, self).close()
79         self.dest.finish_current_file()
80
81     @_FileLikeObjectBase._before_close
82     def write(self, data):
83         self.dest.write(data)
84
85     @_FileLikeObjectBase._before_close
86     def writelines(self, seq):
87         for data in seq:
88             self.write(data)
89
90     @_FileLikeObjectBase._before_close
91     def flush(self):
92         self.dest.flush_data()
93
94
95 class CollectionWriter(CollectionBase):
96     """Deprecated, use Collection instead."""
97
98     def __init__(self, api_client=None, num_retries=0, replication=None):
99         """Instantiate a CollectionWriter.
100
101         CollectionWriter lets you build a new Arvados Collection from scratch.
102         Write files to it.  The CollectionWriter will upload data to Keep as
103         appropriate, and provide you with the Collection manifest text when
104         you're finished.
105
106         Arguments:
107         * api_client: The API client to use to look up Collections.  If not
108           provided, CollectionReader will build one from available Arvados
109           configuration.
110         * num_retries: The default number of times to retry failed
111           service requests.  Default 0.  You may change this value
112           after instantiation, but note those changes may not
113           propagate to related objects like the Keep client.
114         * replication: The number of copies of each block to store.
115           If this argument is None or not supplied, replication is
116           the server-provided default if available, otherwise 2.
117         """
118         self._api_client = api_client
119         self.num_retries = num_retries
120         self.replication = (2 if replication is None else replication)
121         self._keep_client = None
122         self._data_buffer = []
123         self._data_buffer_len = 0
124         self._current_stream_files = []
125         self._current_stream_length = 0
126         self._current_stream_locators = []
127         self._current_stream_name = '.'
128         self._current_file_name = None
129         self._current_file_pos = 0
130         self._finished_streams = []
131         self._close_file = None
132         self._queued_file = None
133         self._queued_dirents = deque()
134         self._queued_trees = deque()
135         self._last_open = None
136
137     def __exit__(self, exc_type, exc_value, traceback):
138         if exc_type is None:
139             self.finish()
140
141     def do_queued_work(self):
142         # The work queue consists of three pieces:
143         # * _queued_file: The file object we're currently writing to the
144         #   Collection.
145         # * _queued_dirents: Entries under the current directory
146         #   (_queued_trees[0]) that we want to write or recurse through.
147         #   This may contain files from subdirectories if
148         #   max_manifest_depth == 0 for this directory.
149         # * _queued_trees: Directories that should be written as separate
150         #   streams to the Collection.
151         # This function handles the smallest piece of work currently queued
152         # (current file, then current directory, then next directory) until
153         # no work remains.  The _work_THING methods each do a unit of work on
154         # THING.  _queue_THING methods add a THING to the work queue.
155         while True:
156             if self._queued_file:
157                 self._work_file()
158             elif self._queued_dirents:
159                 self._work_dirents()
160             elif self._queued_trees:
161                 self._work_trees()
162             else:
163                 break
164
165     def _work_file(self):
166         while True:
167             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
168             if not buf:
169                 break
170             self.write(buf)
171         self.finish_current_file()
172         if self._close_file:
173             self._queued_file.close()
174         self._close_file = None
175         self._queued_file = None
176
177     def _work_dirents(self):
178         path, stream_name, max_manifest_depth = self._queued_trees[0]
179         if stream_name != self.current_stream_name():
180             self.start_new_stream(stream_name)
181         while self._queued_dirents:
182             dirent = self._queued_dirents.popleft()
183             target = os.path.join(path, dirent)
184             if os.path.isdir(target):
185                 self._queue_tree(target,
186                                  os.path.join(stream_name, dirent),
187                                  max_manifest_depth - 1)
188             else:
189                 self._queue_file(target, dirent)
190                 break
191         if not self._queued_dirents:
192             self._queued_trees.popleft()
193
194     def _work_trees(self):
195         path, stream_name, max_manifest_depth = self._queued_trees[0]
196         d = arvados.util.listdir_recursive(
197             path, max_depth = (None if max_manifest_depth == 0 else 0))
198         if d:
199             self._queue_dirents(stream_name, d)
200         else:
201             self._queued_trees.popleft()
202
203     def _queue_file(self, source, filename=None):
204         assert (self._queued_file is None), "tried to queue more than one file"
205         if not hasattr(source, 'read'):
206             source = open(source, 'rb')
207             self._close_file = True
208         else:
209             self._close_file = False
210         if filename is None:
211             filename = os.path.basename(source.name)
212         self.start_new_file(filename)
213         self._queued_file = source
214
215     def _queue_dirents(self, stream_name, dirents):
216         assert (not self._queued_dirents), "tried to queue more than one tree"
217         self._queued_dirents = deque(sorted(dirents))
218
219     def _queue_tree(self, path, stream_name, max_manifest_depth):
220         self._queued_trees.append((path, stream_name, max_manifest_depth))
221
222     def write_file(self, source, filename=None):
223         self._queue_file(source, filename)
224         self.do_queued_work()
225
226     def write_directory_tree(self,
227                              path, stream_name='.', max_manifest_depth=-1):
228         self._queue_tree(path, stream_name, max_manifest_depth)
229         self.do_queued_work()
230
231     def write(self, newdata):
232         if isinstance(newdata, bytes):
233             pass
234         elif isinstance(newdata, str):
235             newdata = newdata.encode()
236         elif hasattr(newdata, '__iter__'):
237             for s in newdata:
238                 self.write(s)
239             return
240         self._data_buffer.append(newdata)
241         self._data_buffer_len += len(newdata)
242         self._current_stream_length += len(newdata)
243         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
244             self.flush_data()
245
246     def open(self, streampath, filename=None):
247         """open(streampath[, filename]) -> file-like object
248
249         Pass in the path of a file to write to the Collection, either as a
250         single string or as two separate stream name and file name arguments.
251         This method returns a file-like object you can write to add it to the
252         Collection.
253
254         You may only have one file object from the Collection open at a time,
255         so be sure to close the object when you're done.  Using the object in
256         a with statement makes that easy::
257
258           with cwriter.open('./doc/page1.txt') as outfile:
259               outfile.write(page1_data)
260           with cwriter.open('./doc/page2.txt') as outfile:
261               outfile.write(page2_data)
262         """
263         if filename is None:
264             streampath, filename = split(streampath)
265         if self._last_open and not self._last_open.closed:
266             raise errors.AssertionError(
267                 "can't open '{}' when '{}' is still open".format(
268                     filename, self._last_open.name))
269         if streampath != self.current_stream_name():
270             self.start_new_stream(streampath)
271         self.set_current_file_name(filename)
272         self._last_open = _WriterFile(self, filename)
273         return self._last_open
274
275     def flush_data(self):
276         data_buffer = b''.join(self._data_buffer)
277         if data_buffer:
278             self._current_stream_locators.append(
279                 self._my_keep().put(
280                     data_buffer[0:config.KEEP_BLOCK_SIZE],
281                     copies=self.replication))
282             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
283             self._data_buffer_len = len(self._data_buffer[0])
284
285     def start_new_file(self, newfilename=None):
286         self.finish_current_file()
287         self.set_current_file_name(newfilename)
288
289     def set_current_file_name(self, newfilename):
290         if re.search(r'[\t\n]', newfilename):
291             raise errors.AssertionError(
292                 "Manifest filenames cannot contain whitespace: %s" %
293                 newfilename)
294         elif re.search(r'\x00', newfilename):
295             raise errors.AssertionError(
296                 "Manifest filenames cannot contain NUL characters: %s" %
297                 newfilename)
298         self._current_file_name = newfilename
299
300     def current_file_name(self):
301         return self._current_file_name
302
303     def finish_current_file(self):
304         if self._current_file_name is None:
305             if self._current_file_pos == self._current_stream_length:
306                 return
307             raise errors.AssertionError(
308                 "Cannot finish an unnamed file " +
309                 "(%d bytes at offset %d in '%s' stream)" %
310                 (self._current_stream_length - self._current_file_pos,
311                  self._current_file_pos,
312                  self._current_stream_name))
313         self._current_stream_files.append([
314                 self._current_file_pos,
315                 self._current_stream_length - self._current_file_pos,
316                 self._current_file_name])
317         self._current_file_pos = self._current_stream_length
318         self._current_file_name = None
319
320     def start_new_stream(self, newstreamname='.'):
321         self.finish_current_stream()
322         self.set_current_stream_name(newstreamname)
323
324     def set_current_stream_name(self, newstreamname):
325         if re.search(r'[\t\n]', newstreamname):
326             raise errors.AssertionError(
327                 "Manifest stream names cannot contain whitespace: '%s'" %
328                 (newstreamname))
329         self._current_stream_name = '.' if newstreamname=='' else newstreamname
330
331     def current_stream_name(self):
332         return self._current_stream_name
333
334     def finish_current_stream(self):
335         self.finish_current_file()
336         self.flush_data()
337         if not self._current_stream_files:
338             pass
339         elif self._current_stream_name is None:
340             raise errors.AssertionError(
341                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
342                 (self._current_stream_length, len(self._current_stream_files)))
343         else:
344             if not self._current_stream_locators:
345                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
346             self._finished_streams.append([self._current_stream_name,
347                                            self._current_stream_locators,
348                                            self._current_stream_files])
349         self._current_stream_files = []
350         self._current_stream_length = 0
351         self._current_stream_locators = []
352         self._current_stream_name = None
353         self._current_file_pos = 0
354         self._current_file_name = None
355
356     def finish(self):
357         """Store the manifest in Keep and return its locator.
358
359         This is useful for storing manifest fragments (task outputs)
360         temporarily in Keep during a Crunch job.
361
362         In other cases you should make a collection instead, by
363         sending manifest_text() to the API server's "create
364         collection" endpoint.
365         """
366         return self._my_keep().put(self.manifest_text().encode(),
367                                    copies=self.replication)
368
369     def portable_data_hash(self):
370         stripped = self.stripped_manifest().encode()
371         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
372
373     def manifest_text(self):
374         self.finish_current_stream()
375         manifest = ''
376
377         for stream in self._finished_streams:
378             if not re.search(r'^\.(/.*)?$', stream[0]):
379                 manifest += './'
380             manifest += stream[0].replace(' ', '\\040')
381             manifest += ' ' + ' '.join(stream[1])
382             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
383             manifest += "\n"
384
385         return manifest
386
387     def data_locators(self):
388         ret = []
389         for name, locators, files in self._finished_streams:
390             ret += locators
391         return ret
392
393     def save_new(self, name=None):
394         return self._api_client.collections().create(
395             ensure_unique_name=True,
396             body={
397                 'name': name,
398                 'manifest_text': self.manifest_text(),
399             }).execute(num_retries=self.num_retries)
400
401
402 class ResumableCollectionWriter(CollectionWriter):
403     """Deprecated, use Collection instead."""
404
405     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
406                    '_current_stream_locators', '_current_stream_name',
407                    '_current_file_name', '_current_file_pos', '_close_file',
408                    '_data_buffer', '_dependencies', '_finished_streams',
409                    '_queued_dirents', '_queued_trees']
410
411     def __init__(self, api_client=None, **kwargs):
412         self._dependencies = {}
413         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
414
415     @classmethod
416     def from_state(cls, state, *init_args, **init_kwargs):
417         # Try to build a new writer from scratch with the given state.
418         # If the state is not suitable to resume (because files have changed,
419         # been deleted, aren't predictable, etc.), raise a
420         # StaleWriterStateError.  Otherwise, return the initialized writer.
421         # The caller is responsible for calling writer.do_queued_work()
422         # appropriately after it's returned.
423         writer = cls(*init_args, **init_kwargs)
424         for attr_name in cls.STATE_PROPS:
425             attr_value = state[attr_name]
426             attr_class = getattr(writer, attr_name).__class__
427             # Coerce the value into the same type as the initial value, if
428             # needed.
429             if attr_class not in (type(None), attr_value.__class__):
430                 attr_value = attr_class(attr_value)
431             setattr(writer, attr_name, attr_value)
432         # Check dependencies before we try to resume anything.
433         if any(KeepLocator(ls).permission_expired()
434                for ls in writer._current_stream_locators):
435             raise errors.StaleWriterStateError(
436                 "locators include expired permission hint")
437         writer.check_dependencies()
438         if state['_current_file'] is not None:
439             path, pos = state['_current_file']
440             try:
441                 writer._queued_file = open(path, 'rb')
442                 writer._queued_file.seek(pos)
443             except IOError as error:
444                 raise errors.StaleWriterStateError(
445                     "failed to reopen active file {}: {}".format(path, error))
446         return writer
447
448     def check_dependencies(self):
449         for path, orig_stat in listitems(self._dependencies):
450             if not S_ISREG(orig_stat[ST_MODE]):
451                 raise errors.StaleWriterStateError("{} not file".format(path))
452             try:
453                 now_stat = tuple(os.stat(path))
454             except OSError as error:
455                 raise errors.StaleWriterStateError(
456                     "failed to stat {}: {}".format(path, error))
457             if ((not S_ISREG(now_stat[ST_MODE])) or
458                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
459                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
460                 raise errors.StaleWriterStateError("{} changed".format(path))
461
462     def dump_state(self, copy_func=lambda x: x):
463         state = {attr: copy_func(getattr(self, attr))
464                  for attr in self.STATE_PROPS}
465         if self._queued_file is None:
466             state['_current_file'] = None
467         else:
468             state['_current_file'] = (os.path.realpath(self._queued_file.name),
469                                       self._queued_file.tell())
470         return state
471
472     def _queue_file(self, source, filename=None):
473         try:
474             src_path = os.path.realpath(source)
475         except Exception:
476             raise errors.AssertionError("{} not a file path".format(source))
477         try:
478             path_stat = os.stat(src_path)
479         except OSError as stat_error:
480             path_stat = None
481         super(ResumableCollectionWriter, self)._queue_file(source, filename)
482         fd_stat = os.fstat(self._queued_file.fileno())
483         if not S_ISREG(fd_stat.st_mode):
484             # We won't be able to resume from this cache anyway, so don't
485             # worry about further checks.
486             self._dependencies[source] = tuple(fd_stat)
487         elif path_stat is None:
488             raise errors.AssertionError(
489                 "could not stat {}: {}".format(source, stat_error))
490         elif path_stat.st_ino != fd_stat.st_ino:
491             raise errors.AssertionError(
492                 "{} changed between open and stat calls".format(source))
493         else:
494             self._dependencies[src_path] = tuple(fd_stat)
495
496     def write(self, data):
497         if self._queued_file is None:
498             raise errors.AssertionError(
499                 "resumable writer can't accept unsourced data")
500         return super(ResumableCollectionWriter, self).write(data)
501
502
503 ADD = "add"
504 DEL = "del"
505 MOD = "mod"
506 TOK = "tok"
507 FILE = "file"
508 COLLECTION = "collection"
509
510 class RichCollectionBase(CollectionBase):
511     """Base class for Collections and Subcollections.
512
513     Implements the majority of functionality relating to accessing items in the
514     Collection.
515
516     """
517
518     def __init__(self, parent=None):
519         self.parent = parent
520         self._committed = False
521         self._callback = None
522         self._items = {}
523
524     def _my_api(self):
525         raise NotImplementedError()
526
527     def _my_keep(self):
528         raise NotImplementedError()
529
530     def _my_block_manager(self):
531         raise NotImplementedError()
532
533     def writable(self):
534         raise NotImplementedError()
535
536     def root_collection(self):
537         raise NotImplementedError()
538
539     def notify(self, event, collection, name, item):
540         raise NotImplementedError()
541
542     def stream_name(self):
543         raise NotImplementedError()
544
545     @must_be_writable
546     @synchronized
547     def find_or_create(self, path, create_type):
548         """Recursively search the specified file path.
549
550         May return either a `Collection` or `ArvadosFile`.  If not found, will
551         create a new item at the specified path based on `create_type`.  Will
552         create intermediate subcollections needed to contain the final item in
553         the path.
554
555         :create_type:
556           One of `arvados.collection.FILE` or
557           `arvados.collection.COLLECTION`.  If the path is not found, and value
558           of create_type is FILE then create and return a new ArvadosFile for
559           the last path component.  If COLLECTION, then create and return a new
560           Collection for the last path component.
561
562         """
563
564         pathcomponents = path.split("/", 1)
565         if pathcomponents[0]:
566             item = self._items.get(pathcomponents[0])
567             if len(pathcomponents) == 1:
568                 if item is None:
569                     # create new file
570                     if create_type == COLLECTION:
571                         item = Subcollection(self, pathcomponents[0])
572                     else:
573                         item = ArvadosFile(self, pathcomponents[0])
574                     self._items[pathcomponents[0]] = item
575                     self.set_committed(False)
576                     self.notify(ADD, self, pathcomponents[0], item)
577                 return item
578             else:
579                 if item is None:
580                     # create new collection
581                     item = Subcollection(self, pathcomponents[0])
582                     self._items[pathcomponents[0]] = item
583                     self.set_committed(False)
584                     self.notify(ADD, self, pathcomponents[0], item)
585                 if isinstance(item, RichCollectionBase):
586                     return item.find_or_create(pathcomponents[1], create_type)
587                 else:
588                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
589         else:
590             return self
591
592     @synchronized
593     def find(self, path):
594         """Recursively search the specified file path.
595
596         May return either a Collection or ArvadosFile. Return None if not
597         found.
598         If path is invalid (ex: starts with '/'), an IOError exception will be
599         raised.
600
601         """
602         if not path:
603             raise errors.ArgumentError("Parameter 'path' is empty.")
604
605         pathcomponents = path.split("/", 1)
606         if pathcomponents[0] == '':
607             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
608
609         item = self._items.get(pathcomponents[0])
610         if item is None:
611             return None
612         elif len(pathcomponents) == 1:
613             return item
614         else:
615             if isinstance(item, RichCollectionBase):
616                 if pathcomponents[1]:
617                     return item.find(pathcomponents[1])
618                 else:
619                     return item
620             else:
621                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
622
623     @synchronized
624     def mkdirs(self, path):
625         """Recursive subcollection create.
626
627         Like `os.makedirs()`.  Will create intermediate subcollections needed
628         to contain the leaf subcollection path.
629
630         """
631
632         if self.find(path) != None:
633             raise IOError(errno.EEXIST, "Directory or file exists", path)
634
635         return self.find_or_create(path, COLLECTION)
636
637     def open(self, path, mode="r"):
638         """Open a file-like object for access.
639
640         :path:
641           path to a file in the collection
642         :mode:
643           a string consisting of "r", "w", or "a", optionally followed
644           by "b" or "t", optionally followed by "+".
645           :"b":
646             binary mode: write() accepts bytes, read() returns bytes.
647           :"t":
648             text mode (default): write() accepts strings, read() returns strings.
649           :"r":
650             opens for reading
651           :"r+":
652             opens for reading and writing.  Reads/writes share a file pointer.
653           :"w", "w+":
654             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
655           :"a", "a+":
656             opens for reading and writing.  All writes are appended to
657             the end of the file.  Writing does not affect the file pointer for
658             reading.
659         """
660
661         if not re.search(r'^[rwa][bt]?\+?$', mode):
662             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
663
664         if mode[0] == 'r' and '+' not in mode:
665             fclass = ArvadosFileReader
666             arvfile = self.find(path)
667         elif not self.writable():
668             raise IOError(errno.EROFS, "Collection is read only")
669         else:
670             fclass = ArvadosFileWriter
671             arvfile = self.find_or_create(path, FILE)
672
673         if arvfile is None:
674             raise IOError(errno.ENOENT, "File not found", path)
675         if not isinstance(arvfile, ArvadosFile):
676             raise IOError(errno.EISDIR, "Is a directory", path)
677
678         if mode[0] == 'w':
679             arvfile.truncate(0)
680
681         return fclass(arvfile, mode=mode, num_retries=self.num_retries)
682
683     def modified(self):
684         """Determine if the collection has been modified since last commited."""
685         return not self.committed()
686
687     @synchronized
688     def committed(self):
689         """Determine if the collection has been committed to the API server."""
690         return self._committed
691
692     @synchronized
693     def set_committed(self, value=True):
694         """Recursively set committed flag.
695
696         If value is True, set committed to be True for this and all children.
697
698         If value is False, set committed to be False for this and all parents.
699         """
700         if value == self._committed:
701             return
702         if value:
703             for k,v in listitems(self._items):
704                 v.set_committed(True)
705             self._committed = True
706         else:
707             self._committed = False
708             if self.parent is not None:
709                 self.parent.set_committed(False)
710
711     @synchronized
712     def __iter__(self):
713         """Iterate over names of files and collections contained in this collection."""
714         return iter(viewkeys(self._items))
715
716     @synchronized
717     def __getitem__(self, k):
718         """Get a file or collection that is directly contained by this collection.
719
720         If you want to search a path, use `find()` instead.
721
722         """
723         return self._items[k]
724
725     @synchronized
726     def __contains__(self, k):
727         """Test if there is a file or collection a directly contained by this collection."""
728         return k in self._items
729
730     @synchronized
731     def __len__(self):
732         """Get the number of items directly contained in this collection."""
733         return len(self._items)
734
735     @must_be_writable
736     @synchronized
737     def __delitem__(self, p):
738         """Delete an item by name which is directly contained by this collection."""
739         del self._items[p]
740         self.set_committed(False)
741         self.notify(DEL, self, p, None)
742
743     @synchronized
744     def keys(self):
745         """Get a list of names of files and collections directly contained in this collection."""
746         return self._items.keys()
747
748     @synchronized
749     def values(self):
750         """Get a list of files and collection objects directly contained in this collection."""
751         return listvalues(self._items)
752
753     @synchronized
754     def items(self):
755         """Get a list of (name, object) tuples directly contained in this collection."""
756         return listitems(self._items)
757
758     def exists(self, path):
759         """Test if there is a file or collection at `path`."""
760         return self.find(path) is not None
761
762     @must_be_writable
763     @synchronized
764     def remove(self, path, recursive=False):
765         """Remove the file or subcollection (directory) at `path`.
766
767         :recursive:
768           Specify whether to remove non-empty subcollections (True), or raise an error (False).
769         """
770
771         if not path:
772             raise errors.ArgumentError("Parameter 'path' is empty.")
773
774         pathcomponents = path.split("/", 1)
775         item = self._items.get(pathcomponents[0])
776         if item is None:
777             raise IOError(errno.ENOENT, "File not found", path)
778         if len(pathcomponents) == 1:
779             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
780                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
781             deleteditem = self._items[pathcomponents[0]]
782             del self._items[pathcomponents[0]]
783             self.set_committed(False)
784             self.notify(DEL, self, pathcomponents[0], deleteditem)
785         else:
786             item.remove(pathcomponents[1])
787
788     def _clonefrom(self, source):
789         for k,v in listitems(source):
790             self._items[k] = v.clone(self, k)
791
792     def clone(self):
793         raise NotImplementedError()
794
795     @must_be_writable
796     @synchronized
797     def add(self, source_obj, target_name, overwrite=False, reparent=False):
798         """Copy or move a file or subcollection to this collection.
799
800         :source_obj:
801           An ArvadosFile, or Subcollection object
802
803         :target_name:
804           Destination item name.  If the target name already exists and is a
805           file, this will raise an error unless you specify `overwrite=True`.
806
807         :overwrite:
808           Whether to overwrite target file if it already exists.
809
810         :reparent:
811           If True, source_obj will be moved from its parent collection to this collection.
812           If False, source_obj will be copied and the parent collection will be
813           unmodified.
814
815         """
816
817         if target_name in self and not overwrite:
818             raise IOError(errno.EEXIST, "File already exists", target_name)
819
820         modified_from = None
821         if target_name in self:
822             modified_from = self[target_name]
823
824         # Actually make the move or copy.
825         if reparent:
826             source_obj._reparent(self, target_name)
827             item = source_obj
828         else:
829             item = source_obj.clone(self, target_name)
830
831         self._items[target_name] = item
832         self.set_committed(False)
833
834         if modified_from:
835             self.notify(MOD, self, target_name, (modified_from, item))
836         else:
837             self.notify(ADD, self, target_name, item)
838
839     def _get_src_target(self, source, target_path, source_collection, create_dest):
840         if source_collection is None:
841             source_collection = self
842
843         # Find the object
844         if isinstance(source, basestring):
845             source_obj = source_collection.find(source)
846             if source_obj is None:
847                 raise IOError(errno.ENOENT, "File not found", source)
848             sourcecomponents = source.split("/")
849         else:
850             source_obj = source
851             sourcecomponents = None
852
853         # Find parent collection the target path
854         targetcomponents = target_path.split("/")
855
856         # Determine the name to use.
857         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
858
859         if not target_name:
860             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
861
862         if create_dest:
863             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
864         else:
865             if len(targetcomponents) > 1:
866                 target_dir = self.find("/".join(targetcomponents[0:-1]))
867             else:
868                 target_dir = self
869
870         if target_dir is None:
871             raise IOError(errno.ENOENT, "Target directory not found", target_name)
872
873         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
874             target_dir = target_dir[target_name]
875             target_name = sourcecomponents[-1]
876
877         return (source_obj, target_dir, target_name)
878
879     @must_be_writable
880     @synchronized
881     def copy(self, source, target_path, source_collection=None, overwrite=False):
882         """Copy a file or subcollection to a new path in this collection.
883
884         :source:
885           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
886
887         :target_path:
888           Destination file or path.  If the target path already exists and is a
889           subcollection, the item will be placed inside the subcollection.  If
890           the target path already exists and is a file, this will raise an error
891           unless you specify `overwrite=True`.
892
893         :source_collection:
894           Collection to copy `source_path` from (default `self`)
895
896         :overwrite:
897           Whether to overwrite target file if it already exists.
898         """
899
900         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
901         target_dir.add(source_obj, target_name, overwrite, False)
902
903     @must_be_writable
904     @synchronized
905     def rename(self, source, target_path, source_collection=None, overwrite=False):
906         """Move a file or subcollection from `source_collection` to a new path in this collection.
907
908         :source:
909           A string with a path to source file or subcollection.
910
911         :target_path:
912           Destination file or path.  If the target path already exists and is a
913           subcollection, the item will be placed inside the subcollection.  If
914           the target path already exists and is a file, this will raise an error
915           unless you specify `overwrite=True`.
916
917         :source_collection:
918           Collection to copy `source_path` from (default `self`)
919
920         :overwrite:
921           Whether to overwrite target file if it already exists.
922         """
923
924         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
925         if not source_obj.writable():
926             raise IOError(errno.EROFS, "Source collection is read only", source)
927         target_dir.add(source_obj, target_name, overwrite, True)
928
929     def portable_manifest_text(self, stream_name="."):
930         """Get the manifest text for this collection, sub collections and files.
931
932         This method does not flush outstanding blocks to Keep.  It will return
933         a normalized manifest with access tokens stripped.
934
935         :stream_name:
936           Name to use for this stream (directory)
937
938         """
939         return self._get_manifest_text(stream_name, True, True)
940
941     @synchronized
942     def manifest_text(self, stream_name=".", strip=False, normalize=False,
943                       only_committed=False):
944         """Get the manifest text for this collection, sub collections and files.
945
946         This method will flush outstanding blocks to Keep.  By default, it will
947         not normalize an unmodified manifest or strip access tokens.
948
949         :stream_name:
950           Name to use for this stream (directory)
951
952         :strip:
953           If True, remove signing tokens from block locators if present.
954           If False (default), block locators are left unchanged.
955
956         :normalize:
957           If True, always export the manifest text in normalized form
958           even if the Collection is not modified.  If False (default) and the collection
959           is not modified, return the original manifest text even if it is not
960           in normalized form.
961
962         :only_committed:
963           If True, don't commit pending blocks.
964
965         """
966
967         if not only_committed:
968             self._my_block_manager().commit_all()
969         return self._get_manifest_text(stream_name, strip, normalize,
970                                        only_committed=only_committed)
971
972     @synchronized
973     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
974         """Get the manifest text for this collection, sub collections and files.
975
976         :stream_name:
977           Name to use for this stream (directory)
978
979         :strip:
980           If True, remove signing tokens from block locators if present.
981           If False (default), block locators are left unchanged.
982
983         :normalize:
984           If True, always export the manifest text in normalized form
985           even if the Collection is not modified.  If False (default) and the collection
986           is not modified, return the original manifest text even if it is not
987           in normalized form.
988
989         :only_committed:
990           If True, only include blocks that were already committed to Keep.
991
992         """
993
994         if not self.committed() or self._manifest_text is None or normalize:
995             stream = {}
996             buf = []
997             sorted_keys = sorted(self.keys())
998             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
999                 # Create a stream per file `k`
1000                 arvfile = self[filename]
1001                 filestream = []
1002                 for segment in arvfile.segments():
1003                     loc = segment.locator
1004                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
1005                         if only_committed:
1006                             continue
1007                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1008                     if strip:
1009                         loc = KeepLocator(loc).stripped()
1010                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1011                                          segment.segment_offset, segment.range_size))
1012                 stream[filename] = filestream
1013             if stream:
1014                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1015             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1016                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1017             return "".join(buf)
1018         else:
1019             if strip:
1020                 return self.stripped_manifest()
1021             else:
1022                 return self._manifest_text
1023
1024     @synchronized
1025     def diff(self, end_collection, prefix=".", holding_collection=None):
1026         """Generate list of add/modify/delete actions.
1027
1028         When given to `apply`, will change `self` to match `end_collection`
1029
1030         """
1031         changes = []
1032         if holding_collection is None:
1033             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1034         for k in self:
1035             if k not in end_collection:
1036                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1037         for k in end_collection:
1038             if k in self:
1039                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1040                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1041                 elif end_collection[k] != self[k]:
1042                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1043                 else:
1044                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1045             else:
1046                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1047         return changes
1048
1049     @must_be_writable
1050     @synchronized
1051     def apply(self, changes):
1052         """Apply changes from `diff`.
1053
1054         If a change conflicts with a local change, it will be saved to an
1055         alternate path indicating the conflict.
1056
1057         """
1058         if changes:
1059             self.set_committed(False)
1060         for change in changes:
1061             event_type = change[0]
1062             path = change[1]
1063             initial = change[2]
1064             local = self.find(path)
1065             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1066                                                                     time.gmtime()))
1067             if event_type == ADD:
1068                 if local is None:
1069                     # No local file at path, safe to copy over new file
1070                     self.copy(initial, path)
1071                 elif local is not None and local != initial:
1072                     # There is already local file and it is different:
1073                     # save change to conflict file.
1074                     self.copy(initial, conflictpath)
1075             elif event_type == MOD or event_type == TOK:
1076                 final = change[3]
1077                 if local == initial:
1078                     # Local matches the "initial" item so it has not
1079                     # changed locally and is safe to update.
1080                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1081                         # Replace contents of local file with new contents
1082                         local.replace_contents(final)
1083                     else:
1084                         # Overwrite path with new item; this can happen if
1085                         # path was a file and is now a collection or vice versa
1086                         self.copy(final, path, overwrite=True)
1087                 else:
1088                     # Local is missing (presumably deleted) or local doesn't
1089                     # match the "start" value, so save change to conflict file
1090                     self.copy(final, conflictpath)
1091             elif event_type == DEL:
1092                 if local == initial:
1093                     # Local item matches "initial" value, so it is safe to remove.
1094                     self.remove(path, recursive=True)
1095                 # else, the file is modified or already removed, in either
1096                 # case we don't want to try to remove it.
1097
1098     def portable_data_hash(self):
1099         """Get the portable data hash for this collection's manifest."""
1100         if self._manifest_locator and self.committed():
1101             # If the collection is already saved on the API server, and it's committed
1102             # then return API server's PDH response.
1103             return self._portable_data_hash
1104         else:
1105             stripped = self.portable_manifest_text().encode()
1106             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1107
1108     @synchronized
1109     def subscribe(self, callback):
1110         if self._callback is None:
1111             self._callback = callback
1112         else:
1113             raise errors.ArgumentError("A callback is already set on this collection.")
1114
1115     @synchronized
1116     def unsubscribe(self):
1117         if self._callback is not None:
1118             self._callback = None
1119
1120     @synchronized
1121     def notify(self, event, collection, name, item):
1122         if self._callback:
1123             self._callback(event, collection, name, item)
1124         self.root_collection().notify(event, collection, name, item)
1125
1126     @synchronized
1127     def __eq__(self, other):
1128         if other is self:
1129             return True
1130         if not isinstance(other, RichCollectionBase):
1131             return False
1132         if len(self._items) != len(other):
1133             return False
1134         for k in self._items:
1135             if k not in other:
1136                 return False
1137             if self._items[k] != other[k]:
1138                 return False
1139         return True
1140
1141     def __ne__(self, other):
1142         return not self.__eq__(other)
1143
1144     @synchronized
1145     def flush(self):
1146         """Flush bufferblocks to Keep."""
1147         for e in listvalues(self):
1148             e.flush()
1149
1150
1151 class Collection(RichCollectionBase):
1152     """Represents the root of an Arvados Collection.
1153
1154     This class is threadsafe.  The root collection object, all subcollections
1155     and files are protected by a single lock (i.e. each access locks the entire
1156     collection).
1157
1158     Brief summary of
1159     useful methods:
1160
1161     :To read an existing file:
1162       `c.open("myfile", "r")`
1163
1164     :To write a new file:
1165       `c.open("myfile", "w")`
1166
1167     :To determine if a file exists:
1168       `c.find("myfile") is not None`
1169
1170     :To copy a file:
1171       `c.copy("source", "dest")`
1172
1173     :To delete a file:
1174       `c.remove("myfile")`
1175
1176     :To save to an existing collection record:
1177       `c.save()`
1178
1179     :To save a new collection record:
1180     `c.save_new()`
1181
1182     :To merge remote changes into this object:
1183       `c.update()`
1184
1185     Must be associated with an API server Collection record (during
1186     initialization, or using `save_new`) to use `save` or `update`
1187
1188     """
1189
1190     def __init__(self, manifest_locator_or_text=None,
1191                  api_client=None,
1192                  keep_client=None,
1193                  num_retries=None,
1194                  parent=None,
1195                  apiconfig=None,
1196                  block_manager=None,
1197                  replication_desired=None,
1198                  put_threads=None):
1199         """Collection constructor.
1200
1201         :manifest_locator_or_text:
1202           An Arvados collection UUID, portable data hash, raw manifest
1203           text, or (if creating an empty collection) None.
1204
1205         :parent:
1206           the parent Collection, may be None.
1207
1208         :apiconfig:
1209           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1210           Prefer this over supplying your own api_client and keep_client (except in testing).
1211           Will use default config settings if not specified.
1212
1213         :api_client:
1214           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1215
1216         :keep_client:
1217           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1218
1219         :num_retries:
1220           the number of retries for API and Keep requests.
1221
1222         :block_manager:
1223           the block manager to use.  If not specified, create one.
1224
1225         :replication_desired:
1226           How many copies should Arvados maintain. If None, API server default
1227           configuration applies. If not None, this value will also be used
1228           for determining the number of block copies being written.
1229
1230         """
1231         super(Collection, self).__init__(parent)
1232         self._api_client = api_client
1233         self._keep_client = keep_client
1234         self._block_manager = block_manager
1235         self.replication_desired = replication_desired
1236         self.put_threads = put_threads
1237
1238         if apiconfig:
1239             self._config = apiconfig
1240         else:
1241             self._config = config.settings()
1242
1243         self.num_retries = num_retries if num_retries is not None else 0
1244         self._manifest_locator = None
1245         self._manifest_text = None
1246         self._portable_data_hash = None
1247         self._api_response = None
1248         self._past_versions = set()
1249
1250         self.lock = threading.RLock()
1251         self.events = None
1252
1253         if manifest_locator_or_text:
1254             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1255                 self._manifest_locator = manifest_locator_or_text
1256             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1257                 self._manifest_locator = manifest_locator_or_text
1258             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1259                 self._manifest_text = manifest_locator_or_text
1260             else:
1261                 raise errors.ArgumentError(
1262                     "Argument to CollectionReader is not a manifest or a collection UUID")
1263
1264             try:
1265                 self._populate()
1266             except (IOError, errors.SyntaxError) as e:
1267                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1268
1269     def root_collection(self):
1270         return self
1271
1272     def stream_name(self):
1273         return "."
1274
1275     def writable(self):
1276         return True
1277
1278     @synchronized
1279     def known_past_version(self, modified_at_and_portable_data_hash):
1280         return modified_at_and_portable_data_hash in self._past_versions
1281
1282     @synchronized
1283     @retry_method
1284     def update(self, other=None, num_retries=None):
1285         """Merge the latest collection on the API server with the current collection."""
1286
1287         if other is None:
1288             if self._manifest_locator is None:
1289                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1290             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1291             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1292                 response.get("portable_data_hash") != self.portable_data_hash()):
1293                 # The record on the server is different from our current one, but we've seen it before,
1294                 # so ignore it because it's already been merged.
1295                 # However, if it's the same as our current record, proceed with the update, because we want to update
1296                 # our tokens.
1297                 return
1298             else:
1299                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1300             other = CollectionReader(response["manifest_text"])
1301         baseline = CollectionReader(self._manifest_text)
1302         self.apply(baseline.diff(other))
1303         self._manifest_text = self.manifest_text()
1304
1305     @synchronized
1306     def _my_api(self):
1307         if self._api_client is None:
1308             self._api_client = ThreadSafeApiCache(self._config)
1309             if self._keep_client is None:
1310                 self._keep_client = self._api_client.keep
1311         return self._api_client
1312
1313     @synchronized
1314     def _my_keep(self):
1315         if self._keep_client is None:
1316             if self._api_client is None:
1317                 self._my_api()
1318             else:
1319                 self._keep_client = KeepClient(api_client=self._api_client)
1320         return self._keep_client
1321
1322     @synchronized
1323     def _my_block_manager(self):
1324         if self._block_manager is None:
1325             copies = (self.replication_desired or
1326                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1327                                                    2))
1328             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1329         return self._block_manager
1330
1331     def _remember_api_response(self, response):
1332         self._api_response = response
1333         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1334
1335     def _populate_from_api_server(self):
1336         # As in KeepClient itself, we must wait until the last
1337         # possible moment to instantiate an API client, in order to
1338         # avoid tripping up clients that don't have access to an API
1339         # server.  If we do build one, make sure our Keep client uses
1340         # it.  If instantiation fails, we'll fall back to the except
1341         # clause, just like any other Collection lookup
1342         # failure. Return an exception, or None if successful.
1343         self._remember_api_response(self._my_api().collections().get(
1344             uuid=self._manifest_locator).execute(
1345                 num_retries=self.num_retries))
1346         self._manifest_text = self._api_response['manifest_text']
1347         self._portable_data_hash = self._api_response['portable_data_hash']
1348         # If not overriden via kwargs, we should try to load the
1349         # replication_desired from the API server
1350         if self.replication_desired is None:
1351             self.replication_desired = self._api_response.get('replication_desired', None)
1352
1353     def _populate(self):
1354         if self._manifest_text is None:
1355             if self._manifest_locator is None:
1356                 return
1357             else:
1358                 self._populate_from_api_server()
1359         self._baseline_manifest = self._manifest_text
1360         self._import_manifest(self._manifest_text)
1361
1362     def _has_collection_uuid(self):
1363         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1364
1365     def __enter__(self):
1366         return self
1367
1368     def __exit__(self, exc_type, exc_value, traceback):
1369         """Support scoped auto-commit in a with: block."""
1370         if exc_type is None:
1371             if self.writable() and self._has_collection_uuid():
1372                 self.save()
1373         self.stop_threads()
1374
1375     def stop_threads(self):
1376         if self._block_manager is not None:
1377             self._block_manager.stop_threads()
1378
1379     @synchronized
1380     def manifest_locator(self):
1381         """Get the manifest locator, if any.
1382
1383         The manifest locator will be set when the collection is loaded from an
1384         API server record or the portable data hash of a manifest.
1385
1386         The manifest locator will be None if the collection is newly created or
1387         was created directly from manifest text.  The method `save_new()` will
1388         assign a manifest locator.
1389
1390         """
1391         return self._manifest_locator
1392
1393     @synchronized
1394     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1395         if new_config is None:
1396             new_config = self._config
1397         if readonly:
1398             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1399         else:
1400             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1401
1402         newcollection._clonefrom(self)
1403         return newcollection
1404
1405     @synchronized
1406     def api_response(self):
1407         """Returns information about this Collection fetched from the API server.
1408
1409         If the Collection exists in Keep but not the API server, currently
1410         returns None.  Future versions may provide a synthetic response.
1411
1412         """
1413         return self._api_response
1414
1415     def find_or_create(self, path, create_type):
1416         """See `RichCollectionBase.find_or_create`"""
1417         if path == ".":
1418             return self
1419         else:
1420             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1421
1422     def find(self, path):
1423         """See `RichCollectionBase.find`"""
1424         if path == ".":
1425             return self
1426         else:
1427             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1428
1429     def remove(self, path, recursive=False):
1430         """See `RichCollectionBase.remove`"""
1431         if path == ".":
1432             raise errors.ArgumentError("Cannot remove '.'")
1433         else:
1434             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1435
1436     @must_be_writable
1437     @synchronized
1438     @retry_method
1439     def save(self, merge=True, num_retries=None):
1440         """Save collection to an existing collection record.
1441
1442         Commit pending buffer blocks to Keep, merge with remote record (if
1443         merge=True, the default), and update the collection record.  Returns
1444         the current manifest text.
1445
1446         Will raise AssertionError if not associated with a collection record on
1447         the API server.  If you want to save a manifest to Keep only, see
1448         `save_new()`.
1449
1450         :merge:
1451           Update and merge remote changes before saving.  Otherwise, any
1452           remote changes will be ignored and overwritten.
1453
1454         :num_retries:
1455           Retry count on API calls (if None,  use the collection default)
1456
1457         """
1458         if not self.committed():
1459             if not self._has_collection_uuid():
1460                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1461
1462             self._my_block_manager().commit_all()
1463
1464             if merge:
1465                 self.update()
1466
1467             text = self.manifest_text(strip=False)
1468             self._remember_api_response(self._my_api().collections().update(
1469                 uuid=self._manifest_locator,
1470                 body={'manifest_text': text}
1471                 ).execute(
1472                     num_retries=num_retries))
1473             self._manifest_text = self._api_response["manifest_text"]
1474             self._portable_data_hash = self._api_response["portable_data_hash"]
1475             self.set_committed(True)
1476
1477         return self._manifest_text
1478
1479
1480     @must_be_writable
1481     @synchronized
1482     @retry_method
1483     def save_new(self, name=None,
1484                  create_collection_record=True,
1485                  owner_uuid=None,
1486                  ensure_unique_name=False,
1487                  num_retries=None):
1488         """Save collection to a new collection record.
1489
1490         Commit pending buffer blocks to Keep and, when create_collection_record
1491         is True (default), create a new collection record.  After creating a
1492         new collection record, this Collection object will be associated with
1493         the new record used by `save()`.  Returns the current manifest text.
1494
1495         :name:
1496           The collection name.
1497
1498         :create_collection_record:
1499            If True, create a collection record on the API server.
1500            If False, only commit blocks to Keep and return the manifest text.
1501
1502         :owner_uuid:
1503           the user, or project uuid that will own this collection.
1504           If None, defaults to the current user.
1505
1506         :ensure_unique_name:
1507           If True, ask the API server to rename the collection
1508           if it conflicts with a collection with the same name and owner.  If
1509           False, a name conflict will result in an error.
1510
1511         :num_retries:
1512           Retry count on API calls (if None,  use the collection default)
1513
1514         """
1515         self._my_block_manager().commit_all()
1516         text = self.manifest_text(strip=False)
1517
1518         if create_collection_record:
1519             if name is None:
1520                 name = "New collection"
1521                 ensure_unique_name = True
1522
1523             body = {"manifest_text": text,
1524                     "name": name,
1525                     "replication_desired": self.replication_desired}
1526             if owner_uuid:
1527                 body["owner_uuid"] = owner_uuid
1528
1529             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1530             text = self._api_response["manifest_text"]
1531
1532             self._manifest_locator = self._api_response["uuid"]
1533             self._portable_data_hash = self._api_response["portable_data_hash"]
1534
1535             self._manifest_text = text
1536             self.set_committed(True)
1537
1538         return text
1539
1540     _token_re = re.compile(r'(\S+)(\s+|$)')
1541     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1542     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1543
1544     @synchronized
1545     def _import_manifest(self, manifest_text):
1546         """Import a manifest into a `Collection`.
1547
1548         :manifest_text:
1549           The manifest text to import from.
1550
1551         """
1552         if len(self) > 0:
1553             raise ArgumentError("Can only import manifest into an empty collection")
1554
1555         STREAM_NAME = 0
1556         BLOCKS = 1
1557         SEGMENTS = 2
1558
1559         stream_name = None
1560         state = STREAM_NAME
1561
1562         for token_and_separator in self._token_re.finditer(manifest_text):
1563             tok = token_and_separator.group(1)
1564             sep = token_and_separator.group(2)
1565
1566             if state == STREAM_NAME:
1567                 # starting a new stream
1568                 stream_name = tok.replace('\\040', ' ')
1569                 blocks = []
1570                 segments = []
1571                 streamoffset = 0
1572                 state = BLOCKS
1573                 self.find_or_create(stream_name, COLLECTION)
1574                 continue
1575
1576             if state == BLOCKS:
1577                 block_locator = self._block_re.match(tok)
1578                 if block_locator:
1579                     blocksize = int(block_locator.group(1))
1580                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1581                     streamoffset += blocksize
1582                 else:
1583                     state = SEGMENTS
1584
1585             if state == SEGMENTS:
1586                 file_segment = self._segment_re.match(tok)
1587                 if file_segment:
1588                     pos = int(file_segment.group(1))
1589                     size = int(file_segment.group(2))
1590                     name = file_segment.group(3).replace('\\040', ' ')
1591                     filepath = os.path.join(stream_name, name)
1592                     afile = self.find_or_create(filepath, FILE)
1593                     if isinstance(afile, ArvadosFile):
1594                         afile.add_segment(blocks, pos, size)
1595                     else:
1596                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1597                 else:
1598                     # error!
1599                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1600
1601             if sep == "\n":
1602                 stream_name = None
1603                 state = STREAM_NAME
1604
1605         self.set_committed(True)
1606
1607     @synchronized
1608     def notify(self, event, collection, name, item):
1609         if self._callback:
1610             self._callback(event, collection, name, item)
1611
1612
1613 class Subcollection(RichCollectionBase):
1614     """This is a subdirectory within a collection that doesn't have its own API
1615     server record.
1616
1617     Subcollection locking falls under the umbrella lock of its root collection.
1618
1619     """
1620
1621     def __init__(self, parent, name):
1622         super(Subcollection, self).__init__(parent)
1623         self.lock = self.root_collection().lock
1624         self._manifest_text = None
1625         self.name = name
1626         self.num_retries = parent.num_retries
1627
1628     def root_collection(self):
1629         return self.parent.root_collection()
1630
1631     def writable(self):
1632         return self.root_collection().writable()
1633
1634     def _my_api(self):
1635         return self.root_collection()._my_api()
1636
1637     def _my_keep(self):
1638         return self.root_collection()._my_keep()
1639
1640     def _my_block_manager(self):
1641         return self.root_collection()._my_block_manager()
1642
1643     def stream_name(self):
1644         return os.path.join(self.parent.stream_name(), self.name)
1645
1646     @synchronized
1647     def clone(self, new_parent, new_name):
1648         c = Subcollection(new_parent, new_name)
1649         c._clonefrom(self)
1650         return c
1651
1652     @must_be_writable
1653     @synchronized
1654     def _reparent(self, newparent, newname):
1655         self.set_committed(False)
1656         self.flush()
1657         self.parent.remove(self.name, recursive=True)
1658         self.parent = newparent
1659         self.name = newname
1660         self.lock = self.parent.root_collection().lock
1661
1662
1663 class CollectionReader(Collection):
1664     """A read-only collection object.
1665
1666     Initialize from a collection UUID or portable data hash, or raw
1667     manifest text.  See `Collection` constructor for detailed options.
1668
1669     """
1670     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1671         self._in_init = True
1672         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1673         self._in_init = False
1674
1675         # Forego any locking since it should never change once initialized.
1676         self.lock = NoopLock()
1677
1678         # Backwards compatability with old CollectionReader
1679         # all_streams() and all_files()
1680         self._streams = None
1681
1682     def writable(self):
1683         return self._in_init
1684
1685     def _populate_streams(orig_func):
1686         @functools.wraps(orig_func)
1687         def populate_streams_wrapper(self, *args, **kwargs):
1688             # Defer populating self._streams until needed since it creates a copy of the manifest.
1689             if self._streams is None:
1690                 if self._manifest_text:
1691                     self._streams = [sline.split()
1692                                      for sline in self._manifest_text.split("\n")
1693                                      if sline]
1694                 else:
1695                     self._streams = []
1696             return orig_func(self, *args, **kwargs)
1697         return populate_streams_wrapper
1698
1699     @_populate_streams
1700     def normalize(self):
1701         """Normalize the streams returned by `all_streams`.
1702
1703         This method is kept for backwards compatability and only affects the
1704         behavior of `all_streams()` and `all_files()`
1705
1706         """
1707
1708         # Rearrange streams
1709         streams = {}
1710         for s in self.all_streams():
1711             for f in s.all_files():
1712                 streamname, filename = split(s.name() + "/" + f.name())
1713                 if streamname not in streams:
1714                     streams[streamname] = {}
1715                 if filename not in streams[streamname]:
1716                     streams[streamname][filename] = []
1717                 for r in f.segments:
1718                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1719
1720         self._streams = [normalize_stream(s, streams[s])
1721                          for s in sorted(streams)]
1722     @_populate_streams
1723     def all_streams(self):
1724         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1725                 for s in self._streams]
1726
1727     @_populate_streams
1728     def all_files(self):
1729         for s in self.all_streams():
1730             for f in s.all_files():
1731                 yield f