Merge branch '12123-arv-ws-keyerror'
[arvados.git] / sdk / python / arvados / collection.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
10 import functools
11 import logging
12 import os
13 import re
14 import errno
15 import hashlib
16 import time
17 import threading
18
19 from collections import deque
20 from stat import *
21
22 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
23 from .keep import KeepLocator, KeepClient
24 from .stream import StreamReader
25 from ._normalize_stream import normalize_stream
26 from ._ranges import Range, LocatorAndRange
27 from .safeapi import ThreadSafeApiCache
28 import arvados.config as config
29 import arvados.errors as errors
30 import arvados.util
31 import arvados.events as events
32 from arvados.retry import retry_method
33
34 _logger = logging.getLogger('arvados.collection')
35
36 class CollectionBase(object):
37     def __enter__(self):
38         return self
39
40     def __exit__(self, exc_type, exc_value, traceback):
41         pass
42
43     def _my_keep(self):
44         if self._keep_client is None:
45             self._keep_client = KeepClient(api_client=self._api_client,
46                                            num_retries=self.num_retries)
47         return self._keep_client
48
49     def stripped_manifest(self):
50         """Get the manifest with locator hints stripped.
51
52         Return the manifest for the current collection with all
53         non-portable hints (i.e., permission signatures and other
54         hints other than size hints) removed from the locators.
55         """
56         raw = self.manifest_text()
57         clean = []
58         for line in raw.split("\n"):
59             fields = line.split()
60             if fields:
61                 clean_fields = fields[:1] + [
62                     (re.sub(r'\+[^\d][^\+]*', '', x)
63                      if re.match(arvados.util.keep_locator_pattern, x)
64                      else x)
65                     for x in fields[1:]]
66                 clean += [' '.join(clean_fields), "\n"]
67         return ''.join(clean)
68
69
70 class _WriterFile(_FileLikeObjectBase):
71     def __init__(self, coll_writer, name):
72         super(_WriterFile, self).__init__(name, 'wb')
73         self.dest = coll_writer
74
75     def close(self):
76         super(_WriterFile, self).close()
77         self.dest.finish_current_file()
78
79     @_FileLikeObjectBase._before_close
80     def write(self, data):
81         self.dest.write(data)
82
83     @_FileLikeObjectBase._before_close
84     def writelines(self, seq):
85         for data in seq:
86             self.write(data)
87
88     @_FileLikeObjectBase._before_close
89     def flush(self):
90         self.dest.flush_data()
91
92
93 class CollectionWriter(CollectionBase):
94     def __init__(self, api_client=None, num_retries=0, replication=None):
95         """Instantiate a CollectionWriter.
96
97         CollectionWriter lets you build a new Arvados Collection from scratch.
98         Write files to it.  The CollectionWriter will upload data to Keep as
99         appropriate, and provide you with the Collection manifest text when
100         you're finished.
101
102         Arguments:
103         * api_client: The API client to use to look up Collections.  If not
104           provided, CollectionReader will build one from available Arvados
105           configuration.
106         * num_retries: The default number of times to retry failed
107           service requests.  Default 0.  You may change this value
108           after instantiation, but note those changes may not
109           propagate to related objects like the Keep client.
110         * replication: The number of copies of each block to store.
111           If this argument is None or not supplied, replication is
112           the server-provided default if available, otherwise 2.
113         """
114         self._api_client = api_client
115         self.num_retries = num_retries
116         self.replication = (2 if replication is None else replication)
117         self._keep_client = None
118         self._data_buffer = []
119         self._data_buffer_len = 0
120         self._current_stream_files = []
121         self._current_stream_length = 0
122         self._current_stream_locators = []
123         self._current_stream_name = '.'
124         self._current_file_name = None
125         self._current_file_pos = 0
126         self._finished_streams = []
127         self._close_file = None
128         self._queued_file = None
129         self._queued_dirents = deque()
130         self._queued_trees = deque()
131         self._last_open = None
132
133     def __exit__(self, exc_type, exc_value, traceback):
134         if exc_type is None:
135             self.finish()
136
137     def do_queued_work(self):
138         # The work queue consists of three pieces:
139         # * _queued_file: The file object we're currently writing to the
140         #   Collection.
141         # * _queued_dirents: Entries under the current directory
142         #   (_queued_trees[0]) that we want to write or recurse through.
143         #   This may contain files from subdirectories if
144         #   max_manifest_depth == 0 for this directory.
145         # * _queued_trees: Directories that should be written as separate
146         #   streams to the Collection.
147         # This function handles the smallest piece of work currently queued
148         # (current file, then current directory, then next directory) until
149         # no work remains.  The _work_THING methods each do a unit of work on
150         # THING.  _queue_THING methods add a THING to the work queue.
151         while True:
152             if self._queued_file:
153                 self._work_file()
154             elif self._queued_dirents:
155                 self._work_dirents()
156             elif self._queued_trees:
157                 self._work_trees()
158             else:
159                 break
160
161     def _work_file(self):
162         while True:
163             buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
164             if not buf:
165                 break
166             self.write(buf)
167         self.finish_current_file()
168         if self._close_file:
169             self._queued_file.close()
170         self._close_file = None
171         self._queued_file = None
172
173     def _work_dirents(self):
174         path, stream_name, max_manifest_depth = self._queued_trees[0]
175         if stream_name != self.current_stream_name():
176             self.start_new_stream(stream_name)
177         while self._queued_dirents:
178             dirent = self._queued_dirents.popleft()
179             target = os.path.join(path, dirent)
180             if os.path.isdir(target):
181                 self._queue_tree(target,
182                                  os.path.join(stream_name, dirent),
183                                  max_manifest_depth - 1)
184             else:
185                 self._queue_file(target, dirent)
186                 break
187         if not self._queued_dirents:
188             self._queued_trees.popleft()
189
190     def _work_trees(self):
191         path, stream_name, max_manifest_depth = self._queued_trees[0]
192         d = arvados.util.listdir_recursive(
193             path, max_depth = (None if max_manifest_depth == 0 else 0))
194         if d:
195             self._queue_dirents(stream_name, d)
196         else:
197             self._queued_trees.popleft()
198
199     def _queue_file(self, source, filename=None):
200         assert (self._queued_file is None), "tried to queue more than one file"
201         if not hasattr(source, 'read'):
202             source = open(source, 'rb')
203             self._close_file = True
204         else:
205             self._close_file = False
206         if filename is None:
207             filename = os.path.basename(source.name)
208         self.start_new_file(filename)
209         self._queued_file = source
210
211     def _queue_dirents(self, stream_name, dirents):
212         assert (not self._queued_dirents), "tried to queue more than one tree"
213         self._queued_dirents = deque(sorted(dirents))
214
215     def _queue_tree(self, path, stream_name, max_manifest_depth):
216         self._queued_trees.append((path, stream_name, max_manifest_depth))
217
218     def write_file(self, source, filename=None):
219         self._queue_file(source, filename)
220         self.do_queued_work()
221
222     def write_directory_tree(self,
223                              path, stream_name='.', max_manifest_depth=-1):
224         self._queue_tree(path, stream_name, max_manifest_depth)
225         self.do_queued_work()
226
227     def write(self, newdata):
228         if isinstance(newdata, bytes):
229             pass
230         elif isinstance(newdata, str):
231             newdata = newdata.encode()
232         elif hasattr(newdata, '__iter__'):
233             for s in newdata:
234                 self.write(s)
235             return
236         self._data_buffer.append(newdata)
237         self._data_buffer_len += len(newdata)
238         self._current_stream_length += len(newdata)
239         while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
240             self.flush_data()
241
242     def open(self, streampath, filename=None):
243         """open(streampath[, filename]) -> file-like object
244
245         Pass in the path of a file to write to the Collection, either as a
246         single string or as two separate stream name and file name arguments.
247         This method returns a file-like object you can write to add it to the
248         Collection.
249
250         You may only have one file object from the Collection open at a time,
251         so be sure to close the object when you're done.  Using the object in
252         a with statement makes that easy::
253
254           with cwriter.open('./doc/page1.txt') as outfile:
255               outfile.write(page1_data)
256           with cwriter.open('./doc/page2.txt') as outfile:
257               outfile.write(page2_data)
258         """
259         if filename is None:
260             streampath, filename = split(streampath)
261         if self._last_open and not self._last_open.closed:
262             raise errors.AssertionError(
263                 "can't open '{}' when '{}' is still open".format(
264                     filename, self._last_open.name))
265         if streampath != self.current_stream_name():
266             self.start_new_stream(streampath)
267         self.set_current_file_name(filename)
268         self._last_open = _WriterFile(self, filename)
269         return self._last_open
270
271     def flush_data(self):
272         data_buffer = b''.join(self._data_buffer)
273         if data_buffer:
274             self._current_stream_locators.append(
275                 self._my_keep().put(
276                     data_buffer[0:config.KEEP_BLOCK_SIZE],
277                     copies=self.replication))
278             self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
279             self._data_buffer_len = len(self._data_buffer[0])
280
281     def start_new_file(self, newfilename=None):
282         self.finish_current_file()
283         self.set_current_file_name(newfilename)
284
285     def set_current_file_name(self, newfilename):
286         if re.search(r'[\t\n]', newfilename):
287             raise errors.AssertionError(
288                 "Manifest filenames cannot contain whitespace: %s" %
289                 newfilename)
290         elif re.search(r'\x00', newfilename):
291             raise errors.AssertionError(
292                 "Manifest filenames cannot contain NUL characters: %s" %
293                 newfilename)
294         self._current_file_name = newfilename
295
296     def current_file_name(self):
297         return self._current_file_name
298
299     def finish_current_file(self):
300         if self._current_file_name is None:
301             if self._current_file_pos == self._current_stream_length:
302                 return
303             raise errors.AssertionError(
304                 "Cannot finish an unnamed file " +
305                 "(%d bytes at offset %d in '%s' stream)" %
306                 (self._current_stream_length - self._current_file_pos,
307                  self._current_file_pos,
308                  self._current_stream_name))
309         self._current_stream_files.append([
310                 self._current_file_pos,
311                 self._current_stream_length - self._current_file_pos,
312                 self._current_file_name])
313         self._current_file_pos = self._current_stream_length
314         self._current_file_name = None
315
316     def start_new_stream(self, newstreamname='.'):
317         self.finish_current_stream()
318         self.set_current_stream_name(newstreamname)
319
320     def set_current_stream_name(self, newstreamname):
321         if re.search(r'[\t\n]', newstreamname):
322             raise errors.AssertionError(
323                 "Manifest stream names cannot contain whitespace: '%s'" %
324                 (newstreamname))
325         self._current_stream_name = '.' if newstreamname=='' else newstreamname
326
327     def current_stream_name(self):
328         return self._current_stream_name
329
330     def finish_current_stream(self):
331         self.finish_current_file()
332         self.flush_data()
333         if not self._current_stream_files:
334             pass
335         elif self._current_stream_name is None:
336             raise errors.AssertionError(
337                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
338                 (self._current_stream_length, len(self._current_stream_files)))
339         else:
340             if not self._current_stream_locators:
341                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
342             self._finished_streams.append([self._current_stream_name,
343                                            self._current_stream_locators,
344                                            self._current_stream_files])
345         self._current_stream_files = []
346         self._current_stream_length = 0
347         self._current_stream_locators = []
348         self._current_stream_name = None
349         self._current_file_pos = 0
350         self._current_file_name = None
351
352     def finish(self):
353         """Store the manifest in Keep and return its locator.
354
355         This is useful for storing manifest fragments (task outputs)
356         temporarily in Keep during a Crunch job.
357
358         In other cases you should make a collection instead, by
359         sending manifest_text() to the API server's "create
360         collection" endpoint.
361         """
362         return self._my_keep().put(self.manifest_text().encode(),
363                                    copies=self.replication)
364
365     def portable_data_hash(self):
366         stripped = self.stripped_manifest().encode()
367         return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
368
369     def manifest_text(self):
370         self.finish_current_stream()
371         manifest = ''
372
373         for stream in self._finished_streams:
374             if not re.search(r'^\.(/.*)?$', stream[0]):
375                 manifest += './'
376             manifest += stream[0].replace(' ', '\\040')
377             manifest += ' ' + ' '.join(stream[1])
378             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
379             manifest += "\n"
380
381         return manifest
382
383     def data_locators(self):
384         ret = []
385         for name, locators, files in self._finished_streams:
386             ret += locators
387         return ret
388
389
390 class ResumableCollectionWriter(CollectionWriter):
391     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
392                    '_current_stream_locators', '_current_stream_name',
393                    '_current_file_name', '_current_file_pos', '_close_file',
394                    '_data_buffer', '_dependencies', '_finished_streams',
395                    '_queued_dirents', '_queued_trees']
396
397     def __init__(self, api_client=None, **kwargs):
398         self._dependencies = {}
399         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
400
401     @classmethod
402     def from_state(cls, state, *init_args, **init_kwargs):
403         # Try to build a new writer from scratch with the given state.
404         # If the state is not suitable to resume (because files have changed,
405         # been deleted, aren't predictable, etc.), raise a
406         # StaleWriterStateError.  Otherwise, return the initialized writer.
407         # The caller is responsible for calling writer.do_queued_work()
408         # appropriately after it's returned.
409         writer = cls(*init_args, **init_kwargs)
410         for attr_name in cls.STATE_PROPS:
411             attr_value = state[attr_name]
412             attr_class = getattr(writer, attr_name).__class__
413             # Coerce the value into the same type as the initial value, if
414             # needed.
415             if attr_class not in (type(None), attr_value.__class__):
416                 attr_value = attr_class(attr_value)
417             setattr(writer, attr_name, attr_value)
418         # Check dependencies before we try to resume anything.
419         if any(KeepLocator(ls).permission_expired()
420                for ls in writer._current_stream_locators):
421             raise errors.StaleWriterStateError(
422                 "locators include expired permission hint")
423         writer.check_dependencies()
424         if state['_current_file'] is not None:
425             path, pos = state['_current_file']
426             try:
427                 writer._queued_file = open(path, 'rb')
428                 writer._queued_file.seek(pos)
429             except IOError as error:
430                 raise errors.StaleWriterStateError(
431                     "failed to reopen active file {}: {}".format(path, error))
432         return writer
433
434     def check_dependencies(self):
435         for path, orig_stat in listitems(self._dependencies):
436             if not S_ISREG(orig_stat[ST_MODE]):
437                 raise errors.StaleWriterStateError("{} not file".format(path))
438             try:
439                 now_stat = tuple(os.stat(path))
440             except OSError as error:
441                 raise errors.StaleWriterStateError(
442                     "failed to stat {}: {}".format(path, error))
443             if ((not S_ISREG(now_stat[ST_MODE])) or
444                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
445                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
446                 raise errors.StaleWriterStateError("{} changed".format(path))
447
448     def dump_state(self, copy_func=lambda x: x):
449         state = {attr: copy_func(getattr(self, attr))
450                  for attr in self.STATE_PROPS}
451         if self._queued_file is None:
452             state['_current_file'] = None
453         else:
454             state['_current_file'] = (os.path.realpath(self._queued_file.name),
455                                       self._queued_file.tell())
456         return state
457
458     def _queue_file(self, source, filename=None):
459         try:
460             src_path = os.path.realpath(source)
461         except Exception:
462             raise errors.AssertionError("{} not a file path".format(source))
463         try:
464             path_stat = os.stat(src_path)
465         except OSError as stat_error:
466             path_stat = None
467         super(ResumableCollectionWriter, self)._queue_file(source, filename)
468         fd_stat = os.fstat(self._queued_file.fileno())
469         if not S_ISREG(fd_stat.st_mode):
470             # We won't be able to resume from this cache anyway, so don't
471             # worry about further checks.
472             self._dependencies[source] = tuple(fd_stat)
473         elif path_stat is None:
474             raise errors.AssertionError(
475                 "could not stat {}: {}".format(source, stat_error))
476         elif path_stat.st_ino != fd_stat.st_ino:
477             raise errors.AssertionError(
478                 "{} changed between open and stat calls".format(source))
479         else:
480             self._dependencies[src_path] = tuple(fd_stat)
481
482     def write(self, data):
483         if self._queued_file is None:
484             raise errors.AssertionError(
485                 "resumable writer can't accept unsourced data")
486         return super(ResumableCollectionWriter, self).write(data)
487
488
489 ADD = "add"
490 DEL = "del"
491 MOD = "mod"
492 TOK = "tok"
493 FILE = "file"
494 COLLECTION = "collection"
495
496 class RichCollectionBase(CollectionBase):
497     """Base class for Collections and Subcollections.
498
499     Implements the majority of functionality relating to accessing items in the
500     Collection.
501
502     """
503
504     def __init__(self, parent=None):
505         self.parent = parent
506         self._committed = False
507         self._callback = None
508         self._items = {}
509
510     def _my_api(self):
511         raise NotImplementedError()
512
513     def _my_keep(self):
514         raise NotImplementedError()
515
516     def _my_block_manager(self):
517         raise NotImplementedError()
518
519     def writable(self):
520         raise NotImplementedError()
521
522     def root_collection(self):
523         raise NotImplementedError()
524
525     def notify(self, event, collection, name, item):
526         raise NotImplementedError()
527
528     def stream_name(self):
529         raise NotImplementedError()
530
531     @must_be_writable
532     @synchronized
533     def find_or_create(self, path, create_type):
534         """Recursively search the specified file path.
535
536         May return either a `Collection` or `ArvadosFile`.  If not found, will
537         create a new item at the specified path based on `create_type`.  Will
538         create intermediate subcollections needed to contain the final item in
539         the path.
540
541         :create_type:
542           One of `arvados.collection.FILE` or
543           `arvados.collection.COLLECTION`.  If the path is not found, and value
544           of create_type is FILE then create and return a new ArvadosFile for
545           the last path component.  If COLLECTION, then create and return a new
546           Collection for the last path component.
547
548         """
549
550         pathcomponents = path.split("/", 1)
551         if pathcomponents[0]:
552             item = self._items.get(pathcomponents[0])
553             if len(pathcomponents) == 1:
554                 if item is None:
555                     # create new file
556                     if create_type == COLLECTION:
557                         item = Subcollection(self, pathcomponents[0])
558                     else:
559                         item = ArvadosFile(self, pathcomponents[0])
560                     self._items[pathcomponents[0]] = item
561                     self.set_committed(False)
562                     self.notify(ADD, self, pathcomponents[0], item)
563                 return item
564             else:
565                 if item is None:
566                     # create new collection
567                     item = Subcollection(self, pathcomponents[0])
568                     self._items[pathcomponents[0]] = item
569                     self.set_committed(False)
570                     self.notify(ADD, self, pathcomponents[0], item)
571                 if isinstance(item, RichCollectionBase):
572                     return item.find_or_create(pathcomponents[1], create_type)
573                 else:
574                     raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
575         else:
576             return self
577
578     @synchronized
579     def find(self, path):
580         """Recursively search the specified file path.
581
582         May return either a Collection or ArvadosFile. Return None if not
583         found.
584         If path is invalid (ex: starts with '/'), an IOError exception will be
585         raised.
586
587         """
588         if not path:
589             raise errors.ArgumentError("Parameter 'path' is empty.")
590
591         pathcomponents = path.split("/", 1)
592         if pathcomponents[0] == '':
593             raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
594
595         item = self._items.get(pathcomponents[0])
596         if item is None:
597             return None
598         elif len(pathcomponents) == 1:
599             return item
600         else:
601             if isinstance(item, RichCollectionBase):
602                 if pathcomponents[1]:
603                     return item.find(pathcomponents[1])
604                 else:
605                     return item
606             else:
607                 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
608
609     @synchronized
610     def mkdirs(self, path):
611         """Recursive subcollection create.
612
613         Like `os.makedirs()`.  Will create intermediate subcollections needed
614         to contain the leaf subcollection path.
615
616         """
617
618         if self.find(path) != None:
619             raise IOError(errno.EEXIST, "Directory or file exists", path)
620
621         return self.find_or_create(path, COLLECTION)
622
623     def open(self, path, mode="r"):
624         """Open a file-like object for access.
625
626         :path:
627           path to a file in the collection
628         :mode:
629           a string consisting of "r", "w", or "a", optionally followed
630           by "b" or "t", optionally followed by "+".
631           :"b":
632             binary mode: write() accepts bytes, read() returns bytes.
633           :"t":
634             text mode (default): write() accepts strings, read() returns strings.
635           :"r":
636             opens for reading
637           :"r+":
638             opens for reading and writing.  Reads/writes share a file pointer.
639           :"w", "w+":
640             truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
641           :"a", "a+":
642             opens for reading and writing.  All writes are appended to
643             the end of the file.  Writing does not affect the file pointer for
644             reading.
645         """
646
647         if not re.search(r'^[rwa][bt]?\+?$', mode):
648             raise errors.ArgumentError("Invalid mode {!r}".format(mode))
649
650         if mode[0] == 'r' and '+' not in mode:
651             fclass = ArvadosFileReader
652             arvfile = self.find(path)
653         elif not self.writable():
654             raise IOError(errno.EROFS, "Collection is read only")
655         else:
656             fclass = ArvadosFileWriter
657             arvfile = self.find_or_create(path, FILE)
658
659         if arvfile is None:
660             raise IOError(errno.ENOENT, "File not found", path)
661         if not isinstance(arvfile, ArvadosFile):
662             raise IOError(errno.EISDIR, "Is a directory", path)
663
664         if mode[0] == 'w':
665             arvfile.truncate(0)
666
667         return fclass(arvfile, mode=mode, num_retries=self.num_retries)
668
669     def modified(self):
670         """Determine if the collection has been modified since last commited."""
671         return not self.committed()
672
673     @synchronized
674     def committed(self):
675         """Determine if the collection has been committed to the API server."""
676         return self._committed
677
678     @synchronized
679     def set_committed(self, value=True):
680         """Recursively set committed flag.
681
682         If value is True, set committed to be True for this and all children.
683
684         If value is False, set committed to be False for this and all parents.
685         """
686         if value == self._committed:
687             return
688         if value:
689             for k,v in listitems(self._items):
690                 v.set_committed(True)
691             self._committed = True
692         else:
693             self._committed = False
694             if self.parent is not None:
695                 self.parent.set_committed(False)
696
697     @synchronized
698     def __iter__(self):
699         """Iterate over names of files and collections contained in this collection."""
700         return iter(viewkeys(self._items))
701
702     @synchronized
703     def __getitem__(self, k):
704         """Get a file or collection that is directly contained by this collection.
705
706         If you want to search a path, use `find()` instead.
707
708         """
709         return self._items[k]
710
711     @synchronized
712     def __contains__(self, k):
713         """Test if there is a file or collection a directly contained by this collection."""
714         return k in self._items
715
716     @synchronized
717     def __len__(self):
718         """Get the number of items directly contained in this collection."""
719         return len(self._items)
720
721     @must_be_writable
722     @synchronized
723     def __delitem__(self, p):
724         """Delete an item by name which is directly contained by this collection."""
725         del self._items[p]
726         self.set_committed(False)
727         self.notify(DEL, self, p, None)
728
729     @synchronized
730     def keys(self):
731         """Get a list of names of files and collections directly contained in this collection."""
732         return self._items.keys()
733
734     @synchronized
735     def values(self):
736         """Get a list of files and collection objects directly contained in this collection."""
737         return listvalues(self._items)
738
739     @synchronized
740     def items(self):
741         """Get a list of (name, object) tuples directly contained in this collection."""
742         return listitems(self._items)
743
744     def exists(self, path):
745         """Test if there is a file or collection at `path`."""
746         return self.find(path) is not None
747
748     @must_be_writable
749     @synchronized
750     def remove(self, path, recursive=False):
751         """Remove the file or subcollection (directory) at `path`.
752
753         :recursive:
754           Specify whether to remove non-empty subcollections (True), or raise an error (False).
755         """
756
757         if not path:
758             raise errors.ArgumentError("Parameter 'path' is empty.")
759
760         pathcomponents = path.split("/", 1)
761         item = self._items.get(pathcomponents[0])
762         if item is None:
763             raise IOError(errno.ENOENT, "File not found", path)
764         if len(pathcomponents) == 1:
765             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
766                 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
767             deleteditem = self._items[pathcomponents[0]]
768             del self._items[pathcomponents[0]]
769             self.set_committed(False)
770             self.notify(DEL, self, pathcomponents[0], deleteditem)
771         else:
772             item.remove(pathcomponents[1])
773
774     def _clonefrom(self, source):
775         for k,v in listitems(source):
776             self._items[k] = v.clone(self, k)
777
778     def clone(self):
779         raise NotImplementedError()
780
781     @must_be_writable
782     @synchronized
783     def add(self, source_obj, target_name, overwrite=False, reparent=False):
784         """Copy or move a file or subcollection to this collection.
785
786         :source_obj:
787           An ArvadosFile, or Subcollection object
788
789         :target_name:
790           Destination item name.  If the target name already exists and is a
791           file, this will raise an error unless you specify `overwrite=True`.
792
793         :overwrite:
794           Whether to overwrite target file if it already exists.
795
796         :reparent:
797           If True, source_obj will be moved from its parent collection to this collection.
798           If False, source_obj will be copied and the parent collection will be
799           unmodified.
800
801         """
802
803         if target_name in self and not overwrite:
804             raise IOError(errno.EEXIST, "File already exists", target_name)
805
806         modified_from = None
807         if target_name in self:
808             modified_from = self[target_name]
809
810         # Actually make the move or copy.
811         if reparent:
812             source_obj._reparent(self, target_name)
813             item = source_obj
814         else:
815             item = source_obj.clone(self, target_name)
816
817         self._items[target_name] = item
818         self.set_committed(False)
819
820         if modified_from:
821             self.notify(MOD, self, target_name, (modified_from, item))
822         else:
823             self.notify(ADD, self, target_name, item)
824
825     def _get_src_target(self, source, target_path, source_collection, create_dest):
826         if source_collection is None:
827             source_collection = self
828
829         # Find the object
830         if isinstance(source, basestring):
831             source_obj = source_collection.find(source)
832             if source_obj is None:
833                 raise IOError(errno.ENOENT, "File not found", source)
834             sourcecomponents = source.split("/")
835         else:
836             source_obj = source
837             sourcecomponents = None
838
839         # Find parent collection the target path
840         targetcomponents = target_path.split("/")
841
842         # Determine the name to use.
843         target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
844
845         if not target_name:
846             raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")
847
848         if create_dest:
849             target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
850         else:
851             if len(targetcomponents) > 1:
852                 target_dir = self.find("/".join(targetcomponents[0:-1]))
853             else:
854                 target_dir = self
855
856         if target_dir is None:
857             raise IOError(errno.ENOENT, "Target directory not found", target_name)
858
859         if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
860             target_dir = target_dir[target_name]
861             target_name = sourcecomponents[-1]
862
863         return (source_obj, target_dir, target_name)
864
865     @must_be_writable
866     @synchronized
867     def copy(self, source, target_path, source_collection=None, overwrite=False):
868         """Copy a file or subcollection to a new path in this collection.
869
870         :source:
871           A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
872
873         :target_path:
874           Destination file or path.  If the target path already exists and is a
875           subcollection, the item will be placed inside the subcollection.  If
876           the target path already exists and is a file, this will raise an error
877           unless you specify `overwrite=True`.
878
879         :source_collection:
880           Collection to copy `source_path` from (default `self`)
881
882         :overwrite:
883           Whether to overwrite target file if it already exists.
884         """
885
886         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
887         target_dir.add(source_obj, target_name, overwrite, False)
888
889     @must_be_writable
890     @synchronized
891     def rename(self, source, target_path, source_collection=None, overwrite=False):
892         """Move a file or subcollection from `source_collection` to a new path in this collection.
893
894         :source:
895           A string with a path to source file or subcollection.
896
897         :target_path:
898           Destination file or path.  If the target path already exists and is a
899           subcollection, the item will be placed inside the subcollection.  If
900           the target path already exists and is a file, this will raise an error
901           unless you specify `overwrite=True`.
902
903         :source_collection:
904           Collection to copy `source_path` from (default `self`)
905
906         :overwrite:
907           Whether to overwrite target file if it already exists.
908         """
909
910         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
911         if not source_obj.writable():
912             raise IOError(errno.EROFS, "Source collection is read only", source)
913         target_dir.add(source_obj, target_name, overwrite, True)
914
915     def portable_manifest_text(self, stream_name="."):
916         """Get the manifest text for this collection, sub collections and files.
917
918         This method does not flush outstanding blocks to Keep.  It will return
919         a normalized manifest with access tokens stripped.
920
921         :stream_name:
922           Name to use for this stream (directory)
923
924         """
925         return self._get_manifest_text(stream_name, True, True)
926
927     @synchronized
928     def manifest_text(self, stream_name=".", strip=False, normalize=False,
929                       only_committed=False):
930         """Get the manifest text for this collection, sub collections and files.
931
932         This method will flush outstanding blocks to Keep.  By default, it will
933         not normalize an unmodified manifest or strip access tokens.
934
935         :stream_name:
936           Name to use for this stream (directory)
937
938         :strip:
939           If True, remove signing tokens from block locators if present.
940           If False (default), block locators are left unchanged.
941
942         :normalize:
943           If True, always export the manifest text in normalized form
944           even if the Collection is not modified.  If False (default) and the collection
945           is not modified, return the original manifest text even if it is not
946           in normalized form.
947
948         :only_committed:
949           If True, don't commit pending blocks.
950
951         """
952
953         if not only_committed:
954             self._my_block_manager().commit_all()
955         return self._get_manifest_text(stream_name, strip, normalize,
956                                        only_committed=only_committed)
957
958     @synchronized
959     def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
960         """Get the manifest text for this collection, sub collections and files.
961
962         :stream_name:
963           Name to use for this stream (directory)
964
965         :strip:
966           If True, remove signing tokens from block locators if present.
967           If False (default), block locators are left unchanged.
968
969         :normalize:
970           If True, always export the manifest text in normalized form
971           even if the Collection is not modified.  If False (default) and the collection
972           is not modified, return the original manifest text even if it is not
973           in normalized form.
974
975         :only_committed:
976           If True, only include blocks that were already committed to Keep.
977
978         """
979
980         if not self.committed() or self._manifest_text is None or normalize:
981             stream = {}
982             buf = []
983             sorted_keys = sorted(self.keys())
984             for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
985                 # Create a stream per file `k`
986                 arvfile = self[filename]
987                 filestream = []
988                 for segment in arvfile.segments():
989                     loc = segment.locator
990                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
991                         if only_committed:
992                             continue
993                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
994                     if strip:
995                         loc = KeepLocator(loc).stripped()
996                     filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
997                                          segment.segment_offset, segment.range_size))
998                 stream[filename] = filestream
999             if stream:
1000                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1001             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1002                 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1003             return "".join(buf)
1004         else:
1005             if strip:
1006                 return self.stripped_manifest()
1007             else:
1008                 return self._manifest_text
1009
1010     @synchronized
1011     def diff(self, end_collection, prefix=".", holding_collection=None):
1012         """Generate list of add/modify/delete actions.
1013
1014         When given to `apply`, will change `self` to match `end_collection`
1015
1016         """
1017         changes = []
1018         if holding_collection is None:
1019             holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1020         for k in self:
1021             if k not in end_collection:
1022                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1023         for k in end_collection:
1024             if k in self:
1025                 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1026                     changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1027                 elif end_collection[k] != self[k]:
1028                     changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1029                 else:
1030                     changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1031             else:
1032                 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1033         return changes
1034
1035     @must_be_writable
1036     @synchronized
1037     def apply(self, changes):
1038         """Apply changes from `diff`.
1039
1040         If a change conflicts with a local change, it will be saved to an
1041         alternate path indicating the conflict.
1042
1043         """
1044         if changes:
1045             self.set_committed(False)
1046         for change in changes:
1047             event_type = change[0]
1048             path = change[1]
1049             initial = change[2]
1050             local = self.find(path)
1051             conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1052                                                                     time.gmtime()))
1053             if event_type == ADD:
1054                 if local is None:
1055                     # No local file at path, safe to copy over new file
1056                     self.copy(initial, path)
1057                 elif local is not None and local != initial:
1058                     # There is already local file and it is different:
1059                     # save change to conflict file.
1060                     self.copy(initial, conflictpath)
1061             elif event_type == MOD or event_type == TOK:
1062                 final = change[3]
1063                 if local == initial:
1064                     # Local matches the "initial" item so it has not
1065                     # changed locally and is safe to update.
1066                     if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1067                         # Replace contents of local file with new contents
1068                         local.replace_contents(final)
1069                     else:
1070                         # Overwrite path with new item; this can happen if
1071                         # path was a file and is now a collection or vice versa
1072                         self.copy(final, path, overwrite=True)
1073                 else:
1074                     # Local is missing (presumably deleted) or local doesn't
1075                     # match the "start" value, so save change to conflict file
1076                     self.copy(final, conflictpath)
1077             elif event_type == DEL:
1078                 if local == initial:
1079                     # Local item matches "initial" value, so it is safe to remove.
1080                     self.remove(path, recursive=True)
1081                 # else, the file is modified or already removed, in either
1082                 # case we don't want to try to remove it.
1083
1084     def portable_data_hash(self):
1085         """Get the portable data hash for this collection's manifest."""
1086         if self._manifest_locator and self.committed():
1087             # If the collection is already saved on the API server, and it's committed
1088             # then return API server's PDH response.
1089             return self._portable_data_hash
1090         else:
1091             stripped = self.portable_manifest_text().encode()
1092             return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1093
1094     @synchronized
1095     def subscribe(self, callback):
1096         if self._callback is None:
1097             self._callback = callback
1098         else:
1099             raise errors.ArgumentError("A callback is already set on this collection.")
1100
1101     @synchronized
1102     def unsubscribe(self):
1103         if self._callback is not None:
1104             self._callback = None
1105
1106     @synchronized
1107     def notify(self, event, collection, name, item):
1108         if self._callback:
1109             self._callback(event, collection, name, item)
1110         self.root_collection().notify(event, collection, name, item)
1111
1112     @synchronized
1113     def __eq__(self, other):
1114         if other is self:
1115             return True
1116         if not isinstance(other, RichCollectionBase):
1117             return False
1118         if len(self._items) != len(other):
1119             return False
1120         for k in self._items:
1121             if k not in other:
1122                 return False
1123             if self._items[k] != other[k]:
1124                 return False
1125         return True
1126
1127     def __ne__(self, other):
1128         return not self.__eq__(other)
1129
1130     @synchronized
1131     def flush(self):
1132         """Flush bufferblocks to Keep."""
1133         for e in listvalues(self):
1134             e.flush()
1135
1136
1137 class Collection(RichCollectionBase):
1138     """Represents the root of an Arvados Collection.
1139
1140     This class is threadsafe.  The root collection object, all subcollections
1141     and files are protected by a single lock (i.e. each access locks the entire
1142     collection).
1143
1144     Brief summary of
1145     useful methods:
1146
1147     :To read an existing file:
1148       `c.open("myfile", "r")`
1149
1150     :To write a new file:
1151       `c.open("myfile", "w")`
1152
1153     :To determine if a file exists:
1154       `c.find("myfile") is not None`
1155
1156     :To copy a file:
1157       `c.copy("source", "dest")`
1158
1159     :To delete a file:
1160       `c.remove("myfile")`
1161
1162     :To save to an existing collection record:
1163       `c.save()`
1164
1165     :To save a new collection record:
1166     `c.save_new()`
1167
1168     :To merge remote changes into this object:
1169       `c.update()`
1170
1171     Must be associated with an API server Collection record (during
1172     initialization, or using `save_new`) to use `save` or `update`
1173
1174     """
1175
1176     def __init__(self, manifest_locator_or_text=None,
1177                  api_client=None,
1178                  keep_client=None,
1179                  num_retries=None,
1180                  parent=None,
1181                  apiconfig=None,
1182                  block_manager=None,
1183                  replication_desired=None,
1184                  put_threads=None):
1185         """Collection constructor.
1186
1187         :manifest_locator_or_text:
1188           One of Arvados collection UUID, block locator of
1189           a manifest, raw manifest text, or None (to create an empty collection).
1190         :parent:
1191           the parent Collection, may be None.
1192
1193         :apiconfig:
1194           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1195           Prefer this over supplying your own api_client and keep_client (except in testing).
1196           Will use default config settings if not specified.
1197
1198         :api_client:
1199           The API client object to use for requests.  If not specified, create one using `apiconfig`.
1200
1201         :keep_client:
1202           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
1203
1204         :num_retries:
1205           the number of retries for API and Keep requests.
1206
1207         :block_manager:
1208           the block manager to use.  If not specified, create one.
1209
1210         :replication_desired:
1211           How many copies should Arvados maintain. If None, API server default
1212           configuration applies. If not None, this value will also be used
1213           for determining the number of block copies being written.
1214
1215         """
1216         super(Collection, self).__init__(parent)
1217         self._api_client = api_client
1218         self._keep_client = keep_client
1219         self._block_manager = block_manager
1220         self.replication_desired = replication_desired
1221         self.put_threads = put_threads
1222
1223         if apiconfig:
1224             self._config = apiconfig
1225         else:
1226             self._config = config.settings()
1227
1228         self.num_retries = num_retries if num_retries is not None else 0
1229         self._manifest_locator = None
1230         self._manifest_text = None
1231         self._portable_data_hash = None
1232         self._api_response = None
1233         self._past_versions = set()
1234
1235         self.lock = threading.RLock()
1236         self.events = None
1237
1238         if manifest_locator_or_text:
1239             if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1240                 self._manifest_locator = manifest_locator_or_text
1241             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1242                 self._manifest_locator = manifest_locator_or_text
1243             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1244                 self._manifest_text = manifest_locator_or_text
1245             else:
1246                 raise errors.ArgumentError(
1247                     "Argument to CollectionReader is not a manifest or a collection UUID")
1248
1249             try:
1250                 self._populate()
1251             except (IOError, errors.SyntaxError) as e:
1252                 raise errors.ArgumentError("Error processing manifest text: %s", e)
1253
1254     def root_collection(self):
1255         return self
1256
1257     def stream_name(self):
1258         return "."
1259
1260     def writable(self):
1261         return True
1262
1263     @synchronized
1264     def known_past_version(self, modified_at_and_portable_data_hash):
1265         return modified_at_and_portable_data_hash in self._past_versions
1266
1267     @synchronized
1268     @retry_method
1269     def update(self, other=None, num_retries=None):
1270         """Merge the latest collection on the API server with the current collection."""
1271
1272         if other is None:
1273             if self._manifest_locator is None:
1274                 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1275             response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1276             if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1277                 response.get("portable_data_hash") != self.portable_data_hash()):
1278                 # The record on the server is different from our current one, but we've seen it before,
1279                 # so ignore it because it's already been merged.
1280                 # However, if it's the same as our current record, proceed with the update, because we want to update
1281                 # our tokens.
1282                 return
1283             else:
1284                 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1285             other = CollectionReader(response["manifest_text"])
1286         baseline = CollectionReader(self._manifest_text)
1287         self.apply(baseline.diff(other))
1288         self._manifest_text = self.manifest_text()
1289
1290     @synchronized
1291     def _my_api(self):
1292         if self._api_client is None:
1293             self._api_client = ThreadSafeApiCache(self._config)
1294             if self._keep_client is None:
1295                 self._keep_client = self._api_client.keep
1296         return self._api_client
1297
1298     @synchronized
1299     def _my_keep(self):
1300         if self._keep_client is None:
1301             if self._api_client is None:
1302                 self._my_api()
1303             else:
1304                 self._keep_client = KeepClient(api_client=self._api_client)
1305         return self._keep_client
1306
1307     @synchronized
1308     def _my_block_manager(self):
1309         if self._block_manager is None:
1310             copies = (self.replication_desired or
1311                       self._my_api()._rootDesc.get('defaultCollectionReplication',
1312                                                    2))
1313             self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1314         return self._block_manager
1315
1316     def _remember_api_response(self, response):
1317         self._api_response = response
1318         self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1319
1320     def _populate_from_api_server(self):
1321         # As in KeepClient itself, we must wait until the last
1322         # possible moment to instantiate an API client, in order to
1323         # avoid tripping up clients that don't have access to an API
1324         # server.  If we do build one, make sure our Keep client uses
1325         # it.  If instantiation fails, we'll fall back to the except
1326         # clause, just like any other Collection lookup
1327         # failure. Return an exception, or None if successful.
1328         try:
1329             self._remember_api_response(self._my_api().collections().get(
1330                 uuid=self._manifest_locator).execute(
1331                     num_retries=self.num_retries))
1332             self._manifest_text = self._api_response['manifest_text']
1333             self._portable_data_hash = self._api_response['portable_data_hash']
1334             # If not overriden via kwargs, we should try to load the
1335             # replication_desired from the API server
1336             if self.replication_desired is None:
1337                 self.replication_desired = self._api_response.get('replication_desired', None)
1338             return None
1339         except Exception as e:
1340             return e
1341
1342     def _populate_from_keep(self):
1343         # Retrieve a manifest directly from Keep. This has a chance of
1344         # working if [a] the locator includes a permission signature
1345         # or [b] the Keep services are operating in world-readable
1346         # mode. Return an exception, or None if successful.
1347         try:
1348             self._manifest_text = self._my_keep().get(
1349                 self._manifest_locator, num_retries=self.num_retries).decode()
1350         except Exception as e:
1351             return e
1352
1353     def _populate(self):
1354         if self._manifest_locator is None and self._manifest_text is None:
1355             return
1356         error_via_api = None
1357         error_via_keep = None
1358         should_try_keep = ((self._manifest_text is None) and
1359                            arvados.util.keep_locator_pattern.match(
1360                                self._manifest_locator))
1361         if ((self._manifest_text is None) and
1362             arvados.util.signed_locator_pattern.match(self._manifest_locator)):
1363             error_via_keep = self._populate_from_keep()
1364         if self._manifest_text is None:
1365             error_via_api = self._populate_from_api_server()
1366             if error_via_api is not None and not should_try_keep:
1367                 raise error_via_api
1368         if ((self._manifest_text is None) and
1369             not error_via_keep and
1370             should_try_keep):
1371             # Looks like a keep locator, and we didn't already try keep above
1372             error_via_keep = self._populate_from_keep()
1373         if self._manifest_text is None:
1374             # Nothing worked!
1375             raise errors.NotFoundError(
1376                 ("Failed to retrieve collection '{}' " +
1377                  "from either API server ({}) or Keep ({})."
1378                  ).format(
1379                     self._manifest_locator,
1380                     error_via_api,
1381                     error_via_keep))
1382         # populate
1383         self._baseline_manifest = self._manifest_text
1384         self._import_manifest(self._manifest_text)
1385
1386
1387     def _has_collection_uuid(self):
1388         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1389
1390     def __enter__(self):
1391         return self
1392
1393     def __exit__(self, exc_type, exc_value, traceback):
1394         """Support scoped auto-commit in a with: block."""
1395         if exc_type is None:
1396             if self.writable() and self._has_collection_uuid():
1397                 self.save()
1398         self.stop_threads()
1399
1400     def stop_threads(self):
1401         if self._block_manager is not None:
1402             self._block_manager.stop_threads()
1403
1404     @synchronized
1405     def manifest_locator(self):
1406         """Get the manifest locator, if any.
1407
1408         The manifest locator will be set when the collection is loaded from an
1409         API server record or the portable data hash of a manifest.
1410
1411         The manifest locator will be None if the collection is newly created or
1412         was created directly from manifest text.  The method `save_new()` will
1413         assign a manifest locator.
1414
1415         """
1416         return self._manifest_locator
1417
1418     @synchronized
1419     def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1420         if new_config is None:
1421             new_config = self._config
1422         if readonly:
1423             newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1424         else:
1425             newcollection = Collection(parent=new_parent, apiconfig=new_config)
1426
1427         newcollection._clonefrom(self)
1428         return newcollection
1429
1430     @synchronized
1431     def api_response(self):
1432         """Returns information about this Collection fetched from the API server.
1433
1434         If the Collection exists in Keep but not the API server, currently
1435         returns None.  Future versions may provide a synthetic response.
1436
1437         """
1438         return self._api_response
1439
1440     def find_or_create(self, path, create_type):
1441         """See `RichCollectionBase.find_or_create`"""
1442         if path == ".":
1443             return self
1444         else:
1445             return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1446
1447     def find(self, path):
1448         """See `RichCollectionBase.find`"""
1449         if path == ".":
1450             return self
1451         else:
1452             return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1453
1454     def remove(self, path, recursive=False):
1455         """See `RichCollectionBase.remove`"""
1456         if path == ".":
1457             raise errors.ArgumentError("Cannot remove '.'")
1458         else:
1459             return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1460
1461     @must_be_writable
1462     @synchronized
1463     @retry_method
1464     def save(self, merge=True, num_retries=None):
1465         """Save collection to an existing collection record.
1466
1467         Commit pending buffer blocks to Keep, merge with remote record (if
1468         merge=True, the default), and update the collection record.  Returns
1469         the current manifest text.
1470
1471         Will raise AssertionError if not associated with a collection record on
1472         the API server.  If you want to save a manifest to Keep only, see
1473         `save_new()`.
1474
1475         :merge:
1476           Update and merge remote changes before saving.  Otherwise, any
1477           remote changes will be ignored and overwritten.
1478
1479         :num_retries:
1480           Retry count on API calls (if None,  use the collection default)
1481
1482         """
1483         if not self.committed():
1484             if not self._has_collection_uuid():
1485                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
1486
1487             self._my_block_manager().commit_all()
1488
1489             if merge:
1490                 self.update()
1491
1492             text = self.manifest_text(strip=False)
1493             self._remember_api_response(self._my_api().collections().update(
1494                 uuid=self._manifest_locator,
1495                 body={'manifest_text': text}
1496                 ).execute(
1497                     num_retries=num_retries))
1498             self._manifest_text = self._api_response["manifest_text"]
1499             self._portable_data_hash = self._api_response["portable_data_hash"]
1500             self.set_committed(True)
1501
1502         return self._manifest_text
1503
1504
1505     @must_be_writable
1506     @synchronized
1507     @retry_method
1508     def save_new(self, name=None,
1509                  create_collection_record=True,
1510                  owner_uuid=None,
1511                  ensure_unique_name=False,
1512                  num_retries=None):
1513         """Save collection to a new collection record.
1514
1515         Commit pending buffer blocks to Keep and, when create_collection_record
1516         is True (default), create a new collection record.  After creating a
1517         new collection record, this Collection object will be associated with
1518         the new record used by `save()`.  Returns the current manifest text.
1519
1520         :name:
1521           The collection name.
1522
1523         :create_collection_record:
1524            If True, create a collection record on the API server.
1525            If False, only commit blocks to Keep and return the manifest text.
1526
1527         :owner_uuid:
1528           the user, or project uuid that will own this collection.
1529           If None, defaults to the current user.
1530
1531         :ensure_unique_name:
1532           If True, ask the API server to rename the collection
1533           if it conflicts with a collection with the same name and owner.  If
1534           False, a name conflict will result in an error.
1535
1536         :num_retries:
1537           Retry count on API calls (if None,  use the collection default)
1538
1539         """
1540         self._my_block_manager().commit_all()
1541         text = self.manifest_text(strip=False)
1542
1543         if create_collection_record:
1544             if name is None:
1545                 name = "New collection"
1546                 ensure_unique_name = True
1547
1548             body = {"manifest_text": text,
1549                     "name": name,
1550                     "replication_desired": self.replication_desired}
1551             if owner_uuid:
1552                 body["owner_uuid"] = owner_uuid
1553
1554             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1555             text = self._api_response["manifest_text"]
1556
1557             self._manifest_locator = self._api_response["uuid"]
1558             self._portable_data_hash = self._api_response["portable_data_hash"]
1559
1560             self._manifest_text = text
1561             self.set_committed(True)
1562
1563         return text
1564
1565     @synchronized
1566     def _import_manifest(self, manifest_text):
1567         """Import a manifest into a `Collection`.
1568
1569         :manifest_text:
1570           The manifest text to import from.
1571
1572         """
1573         if len(self) > 0:
1574             raise ArgumentError("Can only import manifest into an empty collection")
1575
1576         STREAM_NAME = 0
1577         BLOCKS = 1
1578         SEGMENTS = 2
1579
1580         stream_name = None
1581         state = STREAM_NAME
1582
1583         for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1584             tok = token_and_separator.group(1)
1585             sep = token_and_separator.group(2)
1586
1587             if state == STREAM_NAME:
1588                 # starting a new stream
1589                 stream_name = tok.replace('\\040', ' ')
1590                 blocks = []
1591                 segments = []
1592                 streamoffset = 0
1593                 state = BLOCKS
1594                 self.find_or_create(stream_name, COLLECTION)
1595                 continue
1596
1597             if state == BLOCKS:
1598                 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1599                 if block_locator:
1600                     blocksize = int(block_locator.group(1))
1601                     blocks.append(Range(tok, streamoffset, blocksize, 0))
1602                     streamoffset += blocksize
1603                 else:
1604                     state = SEGMENTS
1605
1606             if state == SEGMENTS:
1607                 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1608                 if file_segment:
1609                     pos = int(file_segment.group(1))
1610                     size = int(file_segment.group(2))
1611                     name = file_segment.group(3).replace('\\040', ' ')
1612                     filepath = os.path.join(stream_name, name)
1613                     afile = self.find_or_create(filepath, FILE)
1614                     if isinstance(afile, ArvadosFile):
1615                         afile.add_segment(blocks, pos, size)
1616                     else:
1617                         raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1618                 else:
1619                     # error!
1620                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1621
1622             if sep == "\n":
1623                 stream_name = None
1624                 state = STREAM_NAME
1625
1626         self.set_committed(True)
1627
1628     @synchronized
1629     def notify(self, event, collection, name, item):
1630         if self._callback:
1631             self._callback(event, collection, name, item)
1632
1633
1634 class Subcollection(RichCollectionBase):
1635     """This is a subdirectory within a collection that doesn't have its own API
1636     server record.
1637
1638     Subcollection locking falls under the umbrella lock of its root collection.
1639
1640     """
1641
1642     def __init__(self, parent, name):
1643         super(Subcollection, self).__init__(parent)
1644         self.lock = self.root_collection().lock
1645         self._manifest_text = None
1646         self.name = name
1647         self.num_retries = parent.num_retries
1648
1649     def root_collection(self):
1650         return self.parent.root_collection()
1651
1652     def writable(self):
1653         return self.root_collection().writable()
1654
1655     def _my_api(self):
1656         return self.root_collection()._my_api()
1657
1658     def _my_keep(self):
1659         return self.root_collection()._my_keep()
1660
1661     def _my_block_manager(self):
1662         return self.root_collection()._my_block_manager()
1663
1664     def stream_name(self):
1665         return os.path.join(self.parent.stream_name(), self.name)
1666
1667     @synchronized
1668     def clone(self, new_parent, new_name):
1669         c = Subcollection(new_parent, new_name)
1670         c._clonefrom(self)
1671         return c
1672
1673     @must_be_writable
1674     @synchronized
1675     def _reparent(self, newparent, newname):
1676         self.set_committed(False)
1677         self.flush()
1678         self.parent.remove(self.name, recursive=True)
1679         self.parent = newparent
1680         self.name = newname
1681         self.lock = self.parent.root_collection().lock
1682
1683
1684 class CollectionReader(Collection):
1685     """A read-only collection object.
1686
1687     Initialize from an api collection record locator, a portable data hash of a
1688     manifest, or raw manifest text.  See `Collection` constructor for detailed
1689     options.
1690
1691     """
1692     def __init__(self, manifest_locator_or_text, *args, **kwargs):
1693         self._in_init = True
1694         super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1695         self._in_init = False
1696
1697         # Forego any locking since it should never change once initialized.
1698         self.lock = NoopLock()
1699
1700         # Backwards compatability with old CollectionReader
1701         # all_streams() and all_files()
1702         self._streams = None
1703
1704     def writable(self):
1705         return self._in_init
1706
1707     def _populate_streams(orig_func):
1708         @functools.wraps(orig_func)
1709         def populate_streams_wrapper(self, *args, **kwargs):
1710             # Defer populating self._streams until needed since it creates a copy of the manifest.
1711             if self._streams is None:
1712                 if self._manifest_text:
1713                     self._streams = [sline.split()
1714                                      for sline in self._manifest_text.split("\n")
1715                                      if sline]
1716                 else:
1717                     self._streams = []
1718             return orig_func(self, *args, **kwargs)
1719         return populate_streams_wrapper
1720
1721     @_populate_streams
1722     def normalize(self):
1723         """Normalize the streams returned by `all_streams`.
1724
1725         This method is kept for backwards compatability and only affects the
1726         behavior of `all_streams()` and `all_files()`
1727
1728         """
1729
1730         # Rearrange streams
1731         streams = {}
1732         for s in self.all_streams():
1733             for f in s.all_files():
1734                 streamname, filename = split(s.name() + "/" + f.name())
1735                 if streamname not in streams:
1736                     streams[streamname] = {}
1737                 if filename not in streams[streamname]:
1738                     streams[streamname][filename] = []
1739                 for r in f.segments:
1740                     streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1741
1742         self._streams = [normalize_stream(s, streams[s])
1743                          for s in sorted(streams)]
1744     @_populate_streams
1745     def all_streams(self):
1746         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1747                 for s in self._streams]
1748
1749     @_populate_streams
1750     def all_files(self):
1751         for s in self.all_streams():
1752             for f in s.all_files():
1753                 yield f