1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
23 from collections import deque
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
35 import arvados.events as events
36 from arvados.retry import retry_method
38 _logger = logging.getLogger('arvados.collection')
41 if sys.version_info >= (3, 0):
42 TextIOWrapper = io.TextIOWrapper
44 class TextIOWrapper(io.TextIOWrapper):
45 """To maintain backward compatibility, cast str to unicode in
49 def write(self, data):
50 if isinstance(data, basestring):
52 return super(TextIOWrapper, self).write(data)
55 class CollectionBase(object):
56 """Abstract base class for Collection classes."""
61 def __exit__(self, exc_type, exc_value, traceback):
65 if self._keep_client is None:
66 self._keep_client = KeepClient(api_client=self._api_client,
67 num_retries=self.num_retries)
68 return self._keep_client
70 def stripped_manifest(self):
71 """Get the manifest with locator hints stripped.
73 Return the manifest for the current collection with all
74 non-portable hints (i.e., permission signatures and any hints
75 other than size hints) removed from the locators.
77 raw = self.manifest_text()
79 for line in raw.split("\n"):
82 clean_fields = fields[:1] + [
83 (re.sub(r'\+[^\d][^\+]*', '', x)
84 if re.match(arvados.util.keep_locator_pattern, x)
87 clean += [' '.join(clean_fields), "\n"]
91 class _WriterFile(_FileLikeObjectBase):
92 def __init__(self, coll_writer, name):
93 super(_WriterFile, self).__init__(name, 'wb')
94 self.dest = coll_writer
97 super(_WriterFile, self).close()
98 self.dest.finish_current_file()
100 @_FileLikeObjectBase._before_close
101 def write(self, data):
102 self.dest.write(data)
104 @_FileLikeObjectBase._before_close
105 def writelines(self, seq):
109 @_FileLikeObjectBase._before_close
111 self.dest.flush_data()
114 class CollectionWriter(CollectionBase):
115 """Deprecated, use Collection instead."""
117 def __init__(self, api_client=None, num_retries=0, replication=None):
118 """Instantiate a CollectionWriter.
120 CollectionWriter lets you build a new Arvados Collection from scratch.
121 Write files to it. The CollectionWriter will upload data to Keep as
122 appropriate, and provide you with the Collection manifest text when
126 * api_client: The API client to use to look up Collections. If not
127 provided, CollectionWriter will build one from available Arvados
129 * num_retries: The default number of times to retry failed
130 service requests. Default 0. You may change this value
131 after instantiation, but note those changes may not
132 propagate to related objects like the Keep client.
133 * replication: The number of copies of each block to store.
134 If this argument is None or not supplied, replication is
135 the server-provided default if available, otherwise 2.
137 self._api_client = api_client
138 self.num_retries = num_retries
139 self.replication = (2 if replication is None else replication)
140 self._keep_client = None
141 self._data_buffer = []
142 self._data_buffer_len = 0
143 self._current_stream_files = []
144 self._current_stream_length = 0
145 self._current_stream_locators = []
146 self._current_stream_name = '.'
147 self._current_file_name = None
148 self._current_file_pos = 0
149 self._finished_streams = []
150 self._close_file = None
151 self._queued_file = None
152 self._queued_dirents = deque()
153 self._queued_trees = deque()
154 self._last_open = None
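# Illustrative usage sketch (comment only, not executed): building a
# collection with this deprecated writer. The file name and data are
# hypothetical examples.
#
#   cwriter = CollectionWriter(num_retries=2)
#   with cwriter.open('./docs/readme.txt') as outfile:
#       outfile.write('hello\n')
#   manifest = cwriter.manifest_text()
#
# New code should use the Collection class instead.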
156 def __exit__(self, exc_type, exc_value, traceback):
160 def do_queued_work(self):
161 # The work queue consists of three pieces:
162 # * _queued_file: The file object we're currently writing to the
164 # * _queued_dirents: Entries under the current directory
165 # (_queued_trees[0]) that we want to write or recurse through.
166 # This may contain files from subdirectories if
167 # max_manifest_depth == 0 for this directory.
168 # * _queued_trees: Directories that should be written as separate
169 # streams to the Collection.
170 # This function handles the smallest piece of work currently queued
171 # (current file, then current directory, then next directory) until
172 # no work remains. The _work_THING methods each do a unit of work on
173 # THING. _queue_THING methods add a THING to the work queue.
175 if self._queued_file:
177 elif self._queued_dirents:
179 elif self._queued_trees:
184 def _work_file(self):
186 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
190 self.finish_current_file()
192 self._queued_file.close()
193 self._close_file = None
194 self._queued_file = None
196 def _work_dirents(self):
197 path, stream_name, max_manifest_depth = self._queued_trees[0]
198 if stream_name != self.current_stream_name():
199 self.start_new_stream(stream_name)
200 while self._queued_dirents:
201 dirent = self._queued_dirents.popleft()
202 target = os.path.join(path, dirent)
203 if os.path.isdir(target):
204 self._queue_tree(target,
205 os.path.join(stream_name, dirent),
206 max_manifest_depth - 1)
208 self._queue_file(target, dirent)
210 if not self._queued_dirents:
211 self._queued_trees.popleft()
213 def _work_trees(self):
214 path, stream_name, max_manifest_depth = self._queued_trees[0]
215 d = arvados.util.listdir_recursive(
216 path, max_depth=(None if max_manifest_depth == 0 else 0))
218 self._queue_dirents(stream_name, d)
220 self._queued_trees.popleft()
222 def _queue_file(self, source, filename=None):
223 assert (self._queued_file is None), "tried to queue more than one file"
224 if not hasattr(source, 'read'):
225 source = open(source, 'rb')
226 self._close_file = True
228 self._close_file = False
230 filename = os.path.basename(source.name)
231 self.start_new_file(filename)
232 self._queued_file = source
234 def _queue_dirents(self, stream_name, dirents):
235 assert (not self._queued_dirents), "tried to queue more than one tree"
236 self._queued_dirents = deque(sorted(dirents))
238 def _queue_tree(self, path, stream_name, max_manifest_depth):
239 self._queued_trees.append((path, stream_name, max_manifest_depth))
241 def write_file(self, source, filename=None):
242 self._queue_file(source, filename)
243 self.do_queued_work()
245 def write_directory_tree(self,
246 path, stream_name='.', max_manifest_depth=-1):
247 self._queue_tree(path, stream_name, max_manifest_depth)
248 self.do_queued_work()
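# Illustrative usage sketch (comment only, not executed): queueing whole
# files and directory trees for upload. The local paths are hypothetical.
#
#   cwriter.write_file('/tmp/report.csv', 'report.csv')
#   cwriter.write_directory_tree('/tmp/results', stream_name='./results')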
250 def write(self, newdata):
251 if isinstance(newdata, bytes):
253 elif isinstance(newdata, str):
254 newdata = newdata.encode()
255 elif hasattr(newdata, '__iter__'):
259 self._data_buffer.append(newdata)
260 self._data_buffer_len += len(newdata)
261 self._current_stream_length += len(newdata)
262 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
265 def open(self, streampath, filename=None):
266 """open(streampath[, filename]) -> file-like object
268 Pass in the path of a file to write to the Collection, either as a
269 single string or as two separate stream name and file name arguments.
270 This method returns a file-like object you can write to add it to the
273 You may only have one file object from the Collection open at a time,
274 so be sure to close the object when you're done. Using the object in
275 a with statement makes that easy::
277 with cwriter.open('./doc/page1.txt') as outfile:
278 outfile.write(page1_data)
279 with cwriter.open('./doc/page2.txt') as outfile:
280 outfile.write(page2_data)
283 streampath, filename = split(streampath)
284 if self._last_open and not self._last_open.closed:
285 raise errors.AssertionError(
286 "can't open '{}' when '{}' is still open".format(
287 filename, self._last_open.name))
288 if streampath != self.current_stream_name():
289 self.start_new_stream(streampath)
290 self.set_current_file_name(filename)
291 self._last_open = _WriterFile(self, filename)
292 return self._last_open
294 def flush_data(self):
295 data_buffer = b''.join(self._data_buffer)
297 self._current_stream_locators.append(
299 data_buffer[0:config.KEEP_BLOCK_SIZE],
300 copies=self.replication))
301 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
302 self._data_buffer_len = len(self._data_buffer[0])
304 def start_new_file(self, newfilename=None):
305 self.finish_current_file()
306 self.set_current_file_name(newfilename)
308 def set_current_file_name(self, newfilename):
309 if re.search(r'[\t\n]', newfilename):
310 raise errors.AssertionError(
311 "Manifest filenames cannot contain whitespace: %s" %
313 elif re.search(r'\x00', newfilename):
314 raise errors.AssertionError(
315 "Manifest filenames cannot contain NUL characters: %s" %
317 self._current_file_name = newfilename
319 def current_file_name(self):
320 return self._current_file_name
322 def finish_current_file(self):
323 if self._current_file_name is None:
324 if self._current_file_pos == self._current_stream_length:
326 raise errors.AssertionError(
327 "Cannot finish an unnamed file " +
328 "(%d bytes at offset %d in '%s' stream)" %
329 (self._current_stream_length - self._current_file_pos,
330 self._current_file_pos,
331 self._current_stream_name))
332 self._current_stream_files.append([
333 self._current_file_pos,
334 self._current_stream_length - self._current_file_pos,
335 self._current_file_name])
336 self._current_file_pos = self._current_stream_length
337 self._current_file_name = None
339 def start_new_stream(self, newstreamname='.'):
340 self.finish_current_stream()
341 self.set_current_stream_name(newstreamname)
343 def set_current_stream_name(self, newstreamname):
344 if re.search(r'[\t\n]', newstreamname):
345 raise errors.AssertionError(
346 "Manifest stream names cannot contain whitespace: '%s'" %
348 self._current_stream_name = '.' if newstreamname == '' else newstreamname
350 def current_stream_name(self):
351 return self._current_stream_name
353 def finish_current_stream(self):
354 self.finish_current_file()
356 if not self._current_stream_files:
358 elif self._current_stream_name is None:
359 raise errors.AssertionError(
360 "Cannot finish an unnamed stream (%d bytes in %d files)" %
361 (self._current_stream_length, len(self._current_stream_files)))
363 if not self._current_stream_locators:
364 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
365 self._finished_streams.append([self._current_stream_name,
366 self._current_stream_locators,
367 self._current_stream_files])
368 self._current_stream_files = []
369 self._current_stream_length = 0
370 self._current_stream_locators = []
371 self._current_stream_name = None
372 self._current_file_pos = 0
373 self._current_file_name = None
376 """Store the manifest in Keep and return its locator.
378 This is useful for storing manifest fragments (task outputs)
379 temporarily in Keep during a Crunch job.
381 In other cases you should make a collection instead, by
382 sending manifest_text() to the API server's "create
383 collection" endpoint.
385 return self._my_keep().put(self.manifest_text().encode(),
386 copies=self.replication)
388 def portable_data_hash(self):
389 stripped = self.stripped_manifest().encode()
390 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
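# Note: the portable data hash is the MD5 hex digest of the stripped
# (signature-free) manifest text, a "+", and the manifest length in bytes.
# For example, an empty manifest yields "d41d8cd98f00b204e9800998ecf8427e+0".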
392 def manifest_text(self):
393 self.finish_current_stream()
396 for stream in self._finished_streams:
397 if not re.search(r'^\.(/.*)?$', stream[0]):
399 manifest += stream[0].replace(' ', '\\040')
400 manifest += ' ' + ' '.join(stream[1])
401 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
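# Each manifest line produced here has the form:
#   <stream name> <block locator(s)> <file segment(s) as pos:size:name>
# For example (hypothetical single-block stream):
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt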
406 def data_locators(self):
408 for name, locators, files in self._finished_streams:
412 def save_new(self, name=None):
413 return self._api_client.collections().create(
414 ensure_unique_name=True,
417 'manifest_text': self.manifest_text(),
418 }).execute(num_retries=self.num_retries)
421 class ResumableCollectionWriter(CollectionWriter):
422 """Deprecated, use Collection instead."""
424 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
425 '_current_stream_locators', '_current_stream_name',
426 '_current_file_name', '_current_file_pos', '_close_file',
427 '_data_buffer', '_dependencies', '_finished_streams',
428 '_queued_dirents', '_queued_trees']
430 def __init__(self, api_client=None, **kwargs):
431 self._dependencies = {}
432 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
435 def from_state(cls, state, *init_args, **init_kwargs):
436 # Try to build a new writer from scratch with the given state.
437 # If the state is not suitable to resume (because files have changed,
438 # been deleted, aren't predictable, etc.), raise a
439 # StaleWriterStateError. Otherwise, return the initialized writer.
440 # The caller is responsible for calling writer.do_queued_work()
441 # appropriately after it's returned.
442 writer = cls(*init_args, **init_kwargs)
443 for attr_name in cls.STATE_PROPS:
444 attr_value = state[attr_name]
445 attr_class = getattr(writer, attr_name).__class__
446 # Coerce the value into the same type as the initial value, if
448 if attr_class not in (type(None), attr_value.__class__):
449 attr_value = attr_class(attr_value)
450 setattr(writer, attr_name, attr_value)
451 # Check dependencies before we try to resume anything.
452 if any(KeepLocator(ls).permission_expired()
453 for ls in writer._current_stream_locators):
454 raise errors.StaleWriterStateError(
455 "locators include expired permission hint")
456 writer.check_dependencies()
457 if state['_current_file'] is not None:
458 path, pos = state['_current_file']
460 writer._queued_file = open(path, 'rb')
461 writer._queued_file.seek(pos)
462 except IOError as error:
463 raise errors.StaleWriterStateError(
464 "failed to reopen active file {}: {}".format(path, error))
467 def check_dependencies(self):
468 for path, orig_stat in listitems(self._dependencies):
469 if not S_ISREG(orig_stat[ST_MODE]):
470 raise errors.StaleWriterStateError("{} not file".format(path))
472 now_stat = tuple(os.stat(path))
473 except OSError as error:
474 raise errors.StaleWriterStateError(
475 "failed to stat {}: {}".format(path, error))
476 if ((not S_ISREG(now_stat[ST_MODE])) or
477 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
478 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
479 raise errors.StaleWriterStateError("{} changed".format(path))
481 def dump_state(self, copy_func=lambda x: x):
482 state = {attr: copy_func(getattr(self, attr))
483 for attr in self.STATE_PROPS}
484 if self._queued_file is None:
485 state['_current_file'] = None
487 state['_current_file'] = (os.path.realpath(self._queued_file.name),
488 self._queued_file.tell())
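# Illustrative checkpoint/resume sketch (comment only, not executed); the
# use of json and the checkpoint file path are hypothetical choices.
#
#   state = writer.dump_state()
#   with open('checkpoint.json', 'w') as f:
#       json.dump(state, f)
#   # ...later, in a new process...
#   with open('checkpoint.json') as f:
#       writer = ResumableCollectionWriter.from_state(json.load(f))
#   writer.do_queued_work()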
491 def _queue_file(self, source, filename=None):
493 src_path = os.path.realpath(source)
495 raise errors.AssertionError("{} not a file path".format(source))
497 path_stat = os.stat(src_path)
498 except OSError as stat_error:
500 super(ResumableCollectionWriter, self)._queue_file(source, filename)
501 fd_stat = os.fstat(self._queued_file.fileno())
502 if not S_ISREG(fd_stat.st_mode):
503 # We won't be able to resume from this cache anyway, so don't
504 # worry about further checks.
505 self._dependencies[source] = tuple(fd_stat)
506 elif path_stat is None:
507 raise errors.AssertionError(
508 "could not stat {}: {}".format(source, stat_error))
509 elif path_stat.st_ino != fd_stat.st_ino:
510 raise errors.AssertionError(
511 "{} changed between open and stat calls".format(source))
513 self._dependencies[src_path] = tuple(fd_stat)
515 def write(self, data):
516 if self._queued_file is None:
517 raise errors.AssertionError(
518 "resumable writer can't accept unsourced data")
519 return super(ResumableCollectionWriter, self).write(data)
527 COLLECTION = "collection"
529 class RichCollectionBase(CollectionBase):
530 """Base class for Collections and Subcollections.
532 Implements the majority of functionality relating to accessing items in the
537 def __init__(self, parent=None):
539 self._committed = False
540 self._has_remote_blocks = False
541 self._callback = None
545 raise NotImplementedError()
548 raise NotImplementedError()
550 def _my_block_manager(self):
551 raise NotImplementedError()
554 raise NotImplementedError()
556 def root_collection(self):
557 raise NotImplementedError()
559 def notify(self, event, collection, name, item):
560 raise NotImplementedError()
562 def stream_name(self):
563 raise NotImplementedError()
566 def has_remote_blocks(self):
567 """Recursively check for a +R segment locator signature."""
569 if self._has_remote_blocks:
572 if self[item].has_remote_blocks():
577 def set_has_remote_blocks(self, val):
578 self._has_remote_blocks = val
580 self.parent.set_has_remote_blocks(val)
584 def find_or_create(self, path, create_type):
585 """Recursively search the specified file path.
587 May return either a `Collection` or `ArvadosFile`. If not found, will
588 create a new item at the specified path based on `create_type`. Will
589 create intermediate subcollections needed to contain the final item in
593 One of `arvados.collection.FILE` or
594 `arvados.collection.COLLECTION`. If the path is not found and the value
595 of create_type is FILE, then create and return a new ArvadosFile for
596 the last path component. If COLLECTION, then create and return a new
597 Collection for the last path component.
601 pathcomponents = path.split("/", 1)
602 if pathcomponents[0]:
603 item = self._items.get(pathcomponents[0])
604 if len(pathcomponents) == 1:
607 if create_type == COLLECTION:
608 item = Subcollection(self, pathcomponents[0])
610 item = ArvadosFile(self, pathcomponents[0])
611 self._items[pathcomponents[0]] = item
612 self.set_committed(False)
613 self.notify(ADD, self, pathcomponents[0], item)
617 # create new collection
618 item = Subcollection(self, pathcomponents[0])
619 self._items[pathcomponents[0]] = item
620 self.set_committed(False)
621 self.notify(ADD, self, pathcomponents[0], item)
622 if isinstance(item, RichCollectionBase):
623 return item.find_or_create(pathcomponents[1], create_type)
625 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
630 def find(self, path):
631 """Recursively search the specified file path.
633 May return either a Collection or ArvadosFile. Return None if not
635 If path is invalid (e.g., starts with '/'), an IOError exception will be
640 raise errors.ArgumentError("Parameter 'path' is empty.")
642 pathcomponents = path.split("/", 1)
643 if pathcomponents[0] == '':
644 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
646 item = self._items.get(pathcomponents[0])
649 elif len(pathcomponents) == 1:
652 if isinstance(item, RichCollectionBase):
653 if pathcomponents[1]:
654 return item.find(pathcomponents[1])
658 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
661 def mkdirs(self, path):
662 """Recursive subcollection create.
664 Like `os.makedirs()`. Will create intermediate subcollections needed
665 to contain the leaf subcollection path.
669 if self.find(path) is not None:
670 raise IOError(errno.EEXIST, "Directory or file exists", path)
672 return self.find_or_create(path, COLLECTION)
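# Illustrative usage sketch (comment only, not executed); paths are
# hypothetical.
#
#   c.mkdirs('analysis/results')          # creates intermediate levels too
#   c.exists('analysis/results')          # -> True
#   c.find('analysis/results')            # -> Subcollection object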
674 def open(self, path, mode="r", encoding=None):
675 """Open a file-like object for access.
678 path to a file in the collection
680 a string consisting of "r", "w", or "a", optionally followed
681 by "b" or "t", optionally followed by "+".
683 binary mode: write() accepts bytes, read() returns bytes.
685 text mode (default): write() accepts strings, read() returns strings.
689 opens for reading and writing. Reads/writes share a file pointer.
691 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
693 opens for reading and writing. All writes are appended to
694 the end of the file. Writing does not affect the file pointer for
699 if not re.search(r'^[rwa][bt]?\+?$', mode):
700 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
702 if mode[0] == 'r' and '+' not in mode:
703 fclass = ArvadosFileReader
704 arvfile = self.find(path)
705 elif not self.writable():
706 raise IOError(errno.EROFS, "Collection is read only")
708 fclass = ArvadosFileWriter
709 arvfile = self.find_or_create(path, FILE)
712 raise IOError(errno.ENOENT, "File not found", path)
713 if not isinstance(arvfile, ArvadosFile):
714 raise IOError(errno.EISDIR, "Is a directory", path)
719 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
720 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
722 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
723 f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
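# Illustrative usage sketch (comment only, not executed): text and binary
# access through open(). File names are hypothetical.
#
#   with c.open('notes.txt', 'w') as f:   # text mode: write() accepts str
#       f.write('hello\n')
#   with c.open('data.bin', 'rb') as f:   # binary mode: read() returns bytes
#       chunk = f.read(1024)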
727 """Determine if the collection has been modified since last commited."""
728 return not self.committed()
732 """Determine if the collection has been committed to the API server."""
733 return self._committed
736 def set_committed(self, value=True):
737 """Recursively set committed flag.
739 If value is True, set committed to be True for this and all children.
741 If value is False, set committed to be False for this and all parents.
743 if value == self._committed:
746 for k,v in listitems(self._items):
747 v.set_committed(True)
748 self._committed = True
750 self._committed = False
751 if self.parent is not None:
752 self.parent.set_committed(False)
756 """Iterate over names of files and collections contained in this collection."""
757 return iter(viewkeys(self._items))
760 def __getitem__(self, k):
761 """Get a file or collection that is directly contained by this collection.
763 If you want to search a path, use `find()` instead.
766 return self._items[k]
769 def __contains__(self, k):
770 """Test if there is a file or collection a directly contained by this collection."""
771 return k in self._items
775 """Get the number of items directly contained in this collection."""
776 return len(self._items)
780 def __delitem__(self, p):
781 """Delete an item by name which is directly contained by this collection."""
783 self.set_committed(False)
784 self.notify(DEL, self, p, None)
788 """Get a list of names of files and collections directly contained in this collection."""
789 return self._items.keys()
793 """Get a list of files and collection objects directly contained in this collection."""
794 return listvalues(self._items)
798 """Get a list of (name, object) tuples directly contained in this collection."""
799 return listitems(self._items)
801 def exists(self, path):
802 """Test if there is a file or collection at `path`."""
803 return self.find(path) is not None
807 def remove(self, path, recursive=False):
808 """Remove the file or subcollection (directory) at `path`.
811 Specify whether to remove non-empty subcollections (True), or raise an error (False).
815 raise errors.ArgumentError("Parameter 'path' is empty.")
817 pathcomponents = path.split("/", 1)
818 item = self._items.get(pathcomponents[0])
820 raise IOError(errno.ENOENT, "File not found", path)
821 if len(pathcomponents) == 1:
822 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
823 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
824 deleteditem = self._items[pathcomponents[0]]
825 del self._items[pathcomponents[0]]
826 self.set_committed(False)
827 self.notify(DEL, self, pathcomponents[0], deleteditem)
829 item.remove(pathcomponents[1])
831 def _clonefrom(self, source):
832 for k,v in listitems(source):
833 self._items[k] = v.clone(self, k)
836 raise NotImplementedError()
840 def add(self, source_obj, target_name, overwrite=False, reparent=False):
841 """Copy or move a file or subcollection to this collection.
844 An ArvadosFile or Subcollection object
847 Destination item name. If the target name already exists and is a
848 file, this will raise an error unless you specify `overwrite=True`.
851 Whether to overwrite target file if it already exists.
854 If True, source_obj will be moved from its parent collection to this collection.
855 If False, source_obj will be copied and the parent collection will be
860 if target_name in self and not overwrite:
861 raise IOError(errno.EEXIST, "File already exists", target_name)
864 if target_name in self:
865 modified_from = self[target_name]
867 # Actually make the move or copy.
869 source_obj._reparent(self, target_name)
872 item = source_obj.clone(self, target_name)
874 self._items[target_name] = item
875 self.set_committed(False)
876 if not self._has_remote_blocks and source_obj.has_remote_blocks():
877 self.set_has_remote_blocks(True)
880 self.notify(MOD, self, target_name, (modified_from, item))
882 self.notify(ADD, self, target_name, item)
884 def _get_src_target(self, source, target_path, source_collection, create_dest):
885 if source_collection is None:
886 source_collection = self
889 if isinstance(source, basestring):
890 source_obj = source_collection.find(source)
891 if source_obj is None:
892 raise IOError(errno.ENOENT, "File not found", source)
893 sourcecomponents = source.split("/")
896 sourcecomponents = None
898 # Find the parent collection of the target path
899 targetcomponents = target_path.split("/")
901 # Determine the name to use.
902 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
905 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
908 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
910 if len(targetcomponents) > 1:
911 target_dir = self.find("/".join(targetcomponents[0:-1]))
915 if target_dir is None:
916 raise IOError(errno.ENOENT, "Target directory not found", target_name)
918 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
919 target_dir = target_dir[target_name]
920 target_name = sourcecomponents[-1]
922 return (source_obj, target_dir, target_name)
926 def copy(self, source, target_path, source_collection=None, overwrite=False):
927 """Copy a file or subcollection to a new path in this collection.
930 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
933 Destination file or path. If the target path already exists and is a
934 subcollection, the item will be placed inside the subcollection. If
935 the target path already exists and is a file, this will raise an error
936 unless you specify `overwrite=True`.
939 Collection to copy `source` from (default `self`)
942 Whether to overwrite target file if it already exists.
945 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
946 target_dir.add(source_obj, target_name, overwrite, False)
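# Illustrative usage sketch (comment only, not executed); paths are
# hypothetical.
#
#   c.copy('input/sample.fastq', 'backup/')    # copy under ./backup
#   c.rename('backup/sample.fastq', 'backup/sample1.fastq', overwrite=True)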
950 def rename(self, source, target_path, source_collection=None, overwrite=False):
951 """Move a file or subcollection from `source_collection` to a new path in this collection.
954 A string with a path to source file or subcollection.
957 Destination file or path. If the target path already exists and is a
958 subcollection, the item will be placed inside the subcollection. If
959 the target path already exists and is a file, this will raise an error
960 unless you specify `overwrite=True`.
963 Collection to move `source` from (default `self`)
966 Whether to overwrite target file if it already exists.
969 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
970 if not source_obj.writable():
971 raise IOError(errno.EROFS, "Source collection is read only", source)
972 target_dir.add(source_obj, target_name, overwrite, True)
974 def portable_manifest_text(self, stream_name="."):
975 """Get the manifest text for this collection, sub collections and files.
977 This method does not flush outstanding blocks to Keep. It will return
978 a normalized manifest with access tokens stripped.
981 Name to use for this stream (directory)
984 return self._get_manifest_text(stream_name, True, True)
987 def manifest_text(self, stream_name=".", strip=False, normalize=False,
988 only_committed=False):
989 """Get the manifest text for this collection, sub collections and files.
991 This method will flush outstanding blocks to Keep. By default, it will
992 not normalize an unmodified manifest or strip access tokens.
995 Name to use for this stream (directory)
998 If True, remove signing tokens from block locators if present.
999 If False (default), block locators are left unchanged.
1002 If True, always export the manifest text in normalized form
1003 even if the Collection is not modified. If False (default) and the collection
1004 is not modified, return the original manifest text even if it is not
1008 If True, don't commit pending blocks.
1012 if not only_committed:
1013 self._my_block_manager().commit_all()
1014 return self._get_manifest_text(stream_name, strip, normalize,
1015 only_committed=only_committed)
1018 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1019 """Get the manifest text for this collection, sub collections and files.
1022 Name to use for this stream (directory)
1025 If True, remove signing tokens from block locators if present.
1026 If False (default), block locators are left unchanged.
1029 If True, always export the manifest text in normalized form
1030 even if the Collection is not modified. If False (default) and the collection
1031 is not modified, return the original manifest text even if it is not
1035 If True, only include blocks that were already committed to Keep.
1039 if not self.committed() or self._manifest_text is None or normalize:
1042 sorted_keys = sorted(self.keys())
1043 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1044 # Create a stream per file `filename`
1045 arvfile = self[filename]
1047 for segment in arvfile.segments():
1048 loc = segment.locator
1049 if arvfile.parent._my_block_manager().is_bufferblock(loc):
1052 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1054 loc = KeepLocator(loc).stripped()
1055 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1056 segment.segment_offset, segment.range_size))
1057 stream[filename] = filestream
1059 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1060 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1061 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1065 return self.stripped_manifest()
1067 return self._manifest_text
1070 def _copy_remote_blocks(self, remote_blocks={}):
1071 """Scan through the entire collection and ask Keep to copy remote blocks.
1073 When accessing a remote collection, blocks will have a remote signature
1074 (+R instead of +A). Collect these signatures and request Keep to copy the
1075 blocks to the local cluster, returning local (+A) signatures.
1078 Shared cache of remote to local block mappings. This is used to avoid
1079 doing extra work when blocks are shared by more than one file in
1080 different subdirectories.
1084 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1085 return remote_blocks
1088 def diff(self, end_collection, prefix=".", holding_collection=None):
1089 """Generate list of add/modify/delete actions.
1091 When given to `apply`, will change `self` to match `end_collection`
1095 if holding_collection is None:
1096 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1098 if k not in end_collection:
1099 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1100 for k in end_collection:
1102 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1103 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1104 elif end_collection[k] != self[k]:
1105 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1107 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1109 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
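# Illustrative usage sketch (comment only, not executed): synchronizing one
# collection object with another, as update() does internally.
#
#   changes = local_coll.diff(remote_coll)
#   local_coll.apply(changes)   # local_coll now matches remote_coll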
1114 def apply(self, changes):
1115 """Apply changes from `diff`.
1117 If a change conflicts with a local change, it will be saved to an
1118 alternate path indicating the conflict.
1122 self.set_committed(False)
1123 for change in changes:
1124 event_type = change[0]
1127 local = self.find(path)
1128 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1130 if event_type == ADD:
1132 # No local file at path, safe to copy over new file
1133 self.copy(initial, path)
1134 elif local is not None and local != initial:
1135 # There is already local file and it is different:
1136 # save change to conflict file.
1137 self.copy(initial, conflictpath)
1138 elif event_type == MOD or event_type == TOK:
1140 if local == initial:
1141 # Local matches the "initial" item so it has not
1142 # changed locally and is safe to update.
1143 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1144 # Replace contents of local file with new contents
1145 local.replace_contents(final)
1147 # Overwrite path with new item; this can happen if
1148 # path was a file and is now a collection or vice versa
1149 self.copy(final, path, overwrite=True)
1151 # Local is missing (presumably deleted) or local doesn't
1152 # match the "start" value, so save change to conflict file
1153 self.copy(final, conflictpath)
1154 elif event_type == DEL:
1155 if local == initial:
1156 # Local item matches "initial" value, so it is safe to remove.
1157 self.remove(path, recursive=True)
1158 # else, the file is modified or already removed, in either
1159 # case we don't want to try to remove it.
1161 def portable_data_hash(self):
1162 """Get the portable data hash for this collection's manifest."""
1163 if self._manifest_locator and self.committed():
1164 # If the collection is already saved on the API server and committed,
1165 # then return the API server's PDH response.
1166 return self._portable_data_hash
1168 stripped = self.portable_manifest_text().encode()
1169 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1172 def subscribe(self, callback):
1173 if self._callback is None:
1174 self._callback = callback
1176 raise errors.ArgumentError("A callback is already set on this collection.")
1179 def unsubscribe(self):
1180 if self._callback is not None:
1181 self._callback = None
1184 def notify(self, event, collection, name, item):
1186 self._callback(event, collection, name, item)
1187 self.root_collection().notify(event, collection, name, item)
1190 def __eq__(self, other):
1193 if not isinstance(other, RichCollectionBase):
1195 if len(self._items) != len(other):
1197 for k in self._items:
1200 if self._items[k] != other[k]:
1204 def __ne__(self, other):
1205 return not self.__eq__(other)
1209 """Flush bufferblocks to Keep."""
1210 for e in listvalues(self):
1214 class Collection(RichCollectionBase):
1215 """Represents the root of an Arvados Collection.
1217 This class is threadsafe. The root collection object, all subcollections
1218 and files are protected by a single lock (i.e. each access locks the entire
1224 :To read an existing file:
1225 `c.open("myfile", "r")`
1227 :To write a new file:
1228 `c.open("myfile", "w")`
1230 :To determine if a file exists:
1231 `c.find("myfile") is not None`
1234 `c.copy("source", "dest")`
1237 `c.remove("myfile")`
1239 :To save to an existing collection record:
1242 :To save a new collection record:
1245 :To merge remote changes into this object:
1248 Must be associated with an API server Collection record (during
1249 initialization, or using `save_new`) to use `save` or `update`
1253 def __init__(self, manifest_locator_or_text=None,
1260 replication_desired=None,
1262 """Collection constructor.
1264 :manifest_locator_or_text:
1265 An Arvados collection UUID, portable data hash, raw manifest
1266 text, or (if creating an empty collection) None.
1269 The parent Collection; may be None.
1272 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1273 Prefer this over supplying your own api_client and keep_client (except in testing).
1274 Will use default config settings if not specified.
1277 The API client object to use for requests. If not specified, create one using `apiconfig`.
1280 The Keep client to use for requests. If not specified, create one using `apiconfig`.
1283 The number of retries for API and Keep requests.
1286 The block manager to use. If not specified, create one.
1288 :replication_desired:
1289 How many copies should Arvados maintain. If None, API server default
1290 configuration applies. If not None, this value will also be used
1291 for determining the number of block copies being written.
1294 super(Collection, self).__init__(parent)
1295 self._api_client = api_client
1296 self._keep_client = keep_client
1297 self._block_manager = block_manager
1298 self.replication_desired = replication_desired
1299 self.put_threads = put_threads
1302 self._config = apiconfig
1304 self._config = config.settings()
1306 self.num_retries = num_retries if num_retries is not None else 0
1307 self._manifest_locator = None
1308 self._manifest_text = None
1309 self._portable_data_hash = None
1310 self._api_response = None
1311 self._past_versions = set()
1313 self.lock = threading.RLock()
1316 if manifest_locator_or_text:
1317 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1318 self._manifest_locator = manifest_locator_or_text
1319 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1320 self._manifest_locator = manifest_locator_or_text
1321 if not self._has_local_collection_uuid():
1322 self._has_remote_blocks = True
1323 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1324 self._manifest_text = manifest_locator_or_text
1325 if '+R' in self._manifest_text:
1326 self._has_remote_blocks = True
1328 raise errors.ArgumentError(
1329 "Argument to CollectionReader is not a manifest or a collection UUID")
1333 except (IOError, errors.SyntaxError) as e:
1334 raise errors.ArgumentError("Error processing manifest text: %s" % e)
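# Illustrative construction sketch (comment only, not executed). The UUID,
# hash, and manifest below are hypothetical placeholders.
#
#   c = Collection()                                      # new, empty collection
#   c = Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')         # by collection UUID
#   c = Collection('d41d8cd98f00b204e9800998ecf8427e+0')  # by portable data hash
#   c = Collection('. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n')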
1336 def root_collection(self):
1339 def get_properties(self):
1340 if self._api_response and self._api_response["properties"]:
1341 return self._api_response["properties"]
1345 def get_trash_at(self):
1346 if self._api_response and self._api_response["trash_at"]:
1347 return ciso8601.parse_datetime(self._api_response["trash_at"])
1351 def stream_name(self):
1358 def known_past_version(self, modified_at_and_portable_data_hash):
1359 return modified_at_and_portable_data_hash in self._past_versions
1363 def update(self, other=None, num_retries=None):
1364 """Merge the latest collection on the API server with the current collection."""
1367 if self._manifest_locator is None:
1368 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1369 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1370 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1371 response.get("portable_data_hash") != self.portable_data_hash()):
1372 # The record on the server is different from our current one, but we've seen it before,
1373 # so ignore it because it's already been merged.
1374 # However, if it's the same as our current record, proceed with the update, because we want to update
1378 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1379 other = CollectionReader(response["manifest_text"])
1380 baseline = CollectionReader(self._manifest_text)
1381 self.apply(baseline.diff(other))
1382 self._manifest_text = self.manifest_text()
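# Illustrative usage sketch (comment only, not executed); the UUID is a
# hypothetical placeholder.
#
#   c = Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')
#   # ...the record changes on the API server...
#   c.update()   # merge the latest server record into this object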
1386 if self._api_client is None:
1387 self._api_client = ThreadSafeApiCache(self._config)
1388 if self._keep_client is None:
1389 self._keep_client = self._api_client.keep
1390 return self._api_client
1394 if self._keep_client is None:
1395 if self._api_client is None:
1398 self._keep_client = KeepClient(api_client=self._api_client)
1399 return self._keep_client
1402 def _my_block_manager(self):
1403 if self._block_manager is None:
1404 copies = (self.replication_desired or
1405 self._my_api()._rootDesc.get('defaultCollectionReplication',
1407 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1408 return self._block_manager
1410 def _remember_api_response(self, response):
1411 self._api_response = response
1412 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1414 def _populate_from_api_server(self):
1415 # As in KeepClient itself, we must wait until the last
1416 # possible moment to instantiate an API client, in order to
1417 # avoid tripping up clients that don't have access to an API
1418 # server. If we do build one, make sure our Keep client uses
1419 # it. If instantiation fails, we'll fall back to the except
1420 # clause, just like any other Collection lookup
1421 # failure. Return an exception, or None if successful.
1422 self._remember_api_response(self._my_api().collections().get(
1423 uuid=self._manifest_locator).execute(
1424 num_retries=self.num_retries))
1425 self._manifest_text = self._api_response['manifest_text']
1426 self._portable_data_hash = self._api_response['portable_data_hash']
1427 # If not overridden via kwargs, we should try to load the
1428 # replication_desired from the API server
1429 if self.replication_desired is None:
1430 self.replication_desired = self._api_response.get('replication_desired', None)
1432 def _populate(self):
1433 if self._manifest_text is None:
1434 if self._manifest_locator is None:
1437 self._populate_from_api_server()
1438 self._baseline_manifest = self._manifest_text
1439 self._import_manifest(self._manifest_text)
1441 def _has_collection_uuid(self):
1442 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1444 def _has_local_collection_uuid(self):
1445 return self._has_collection_uuid() and \
1446 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
1448 def __enter__(self):
1451 def __exit__(self, exc_type, exc_value, traceback):
1452 """Support scoped auto-commit in a with: block."""
1453 if exc_type is None:
1454 if self.writable() and self._has_collection_uuid():
1458 def stop_threads(self):
1459 if self._block_manager is not None:
1460 self._block_manager.stop_threads()
1463 def manifest_locator(self):
1464 """Get the manifest locator, if any.
1466 The manifest locator will be set when the collection is loaded from an
1467 API server record or the portable data hash of a manifest.
1469 The manifest locator will be None if the collection is newly created or
1470 was created directly from manifest text. The method `save_new()` will
1471 assign a manifest locator.
1474 return self._manifest_locator
1477 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1478 if new_config is None:
1479 new_config = self._config
1481 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1483 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1485 newcollection._clonefrom(self)
1486 return newcollection
1489 def api_response(self):
1490 """Returns information about this Collection fetched from the API server.
1492 If the Collection exists in Keep but not the API server, currently
1493 returns None. Future versions may provide a synthetic response.
1496 return self._api_response
1498 def find_or_create(self, path, create_type):
1499 """See `RichCollectionBase.find_or_create`"""
1503 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1505 def find(self, path):
1506 """See `RichCollectionBase.find`"""
1510 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1512 def remove(self, path, recursive=False):
1513 """See `RichCollectionBase.remove`"""
1515 raise errors.ArgumentError("Cannot remove '.'")
1517 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1524 storage_classes=None,
1528 """Save collection to an existing collection record.
1530 Commit pending buffer blocks to Keep, merge with remote record (if
1531 merge=True, the default), and update the collection record. Returns
1532 the current manifest text.
1534 Will raise AssertionError if not associated with a collection record on
1535 the API server. If you want to save a manifest to Keep only, see
1539 Additional properties of the collection. This value will replace any existing
1540 properties of the collection.
1543 Specify the desired storage classes to be used when writing data to Keep.
1546 A collection is *expiring* when it has a *trash_at* time in the future.
1547 An expiring collection can be accessed as normal,
1548 but is scheduled to be trashed automatically at the *trash_at* time.
1551 Update and merge remote changes before saving. Otherwise, any
1552 remote changes will be ignored and overwritten.
1555 Retry count on API calls (if None, use the collection default)
1558 if properties and type(properties) is not dict:
1559 raise errors.ArgumentError("properties must be dictionary type.")
1561 if storage_classes and type(storage_classes) is not list:
1562 raise errors.ArgumentError("storage_classes must be list type.")
1564 if trash_at and type(trash_at) is not datetime.datetime:
1565 raise errors.ArgumentError("trash_at must be datetime type.")
1569 body["properties"] = properties
1571 body["storage_classes_desired"] = storage_classes
1573 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1574 body["trash_at"] = t
1576 if not self.committed():
1577 if self._has_remote_blocks:
1578 # Copy any remote blocks to the local cluster.
1579 self._copy_remote_blocks(remote_blocks={})
1580 self._has_remote_blocks = False
1581 if not self._has_collection_uuid():
1582 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1583 elif not self._has_local_collection_uuid():
1584 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1586 self._my_block_manager().commit_all()
1591 text = self.manifest_text(strip=False)
1592 body['manifest_text'] = text
1594 self._remember_api_response(self._my_api().collections().update(
1595 uuid=self._manifest_locator,
1597 ).execute(num_retries=num_retries))
1598 self._manifest_text = self._api_response["manifest_text"]
1599 self._portable_data_hash = self._api_response["portable_data_hash"]
1600 self.set_committed(True)
1602 self._remember_api_response(self._my_api().collections().update(
1603 uuid=self._manifest_locator,
1605 ).execute(num_retries=num_retries))
1607 return self._manifest_text
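# Illustrative usage sketch (comment only, not executed); the property and
# storage class values are hypothetical.
#
#   with c.open('results.txt', 'w') as f:
#       f.write('done\n')
#   c.save(properties={'type': 'log'}, storage_classes=['default'])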
1613 def save_new(self, name=None,
1614 create_collection_record=True,
1617 storage_classes=None,
1619 ensure_unique_name=False,
1621 """Save collection to a new collection record.
1623 Commit pending buffer blocks to Keep and, when create_collection_record
1624 is True (default), create a new collection record. After creating a
1625 new collection record, this Collection object will be associated with
1626 the new record used by `save()`. Returns the current manifest text.
1629 The collection name.
1631 :create_collection_record:
1632 If True, create a collection record on the API server.
1633 If False, only commit blocks to Keep and return the manifest text.
1636 The user or project UUID that will own this collection.
1637 If None, defaults to the current user.
1640 Additional properties of the collection. This value will replace any existing
1641 properties of the collection.
1644 Specify the desired storage classes to be used when writing data to Keep.
1647 A collection is *expiring* when it has a *trash_at* time in the future.
1648 An expiring collection can be accessed as normal,
1649 but is scheduled to be trashed automatically at the *trash_at* time.
1651 :ensure_unique_name:
1652 If True, ask the API server to rename the collection
1653 if it conflicts with a collection with the same name and owner. If
1654 False, a name conflict will result in an error.
1657 Retry count on API calls (if None, use the collection default)
1660 if properties and type(properties) is not dict:
1661 raise errors.ArgumentError("properties must be dictionary type.")
1663 if storage_classes and type(storage_classes) is not list:
1664 raise errors.ArgumentError("storage_classes must be list type.")
1666 if trash_at and type(trash_at) is not datetime.datetime:
1667 raise errors.ArgumentError("trash_at must be datetime type.")
1669 if self._has_remote_blocks:
1670 # Copy any remote blocks to the local cluster.
1671 self._copy_remote_blocks(remote_blocks={})
1672 self._has_remote_blocks = False
1674 self._my_block_manager().commit_all()
1675 text = self.manifest_text(strip=False)
1677 if create_collection_record:
1679 name = "New collection"
1680 ensure_unique_name = True
1682 body = {"manifest_text": text,
1684 "replication_desired": self.replication_desired}
1686 body["owner_uuid"] = owner_uuid
1688 body["properties"] = properties
1690 body["storage_classes_desired"] = storage_classes
1692 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1693 body["trash_at"] = t
1695 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1696 text = self._api_response["manifest_text"]
1698 self._manifest_locator = self._api_response["uuid"]
1699 self._portable_data_hash = self._api_response["portable_data_hash"]
1701 self._manifest_text = text
1702 self.set_committed(True)
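# Illustrative usage sketch (comment only, not executed); the collection
# name is a hypothetical example.
#
#   c = Collection()
#   with c.open('hello.txt', 'w') as f:
#       f.write('hello\n')
#   c.save_new(name='my new collection', ensure_unique_name=True)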
1706 _token_re = re.compile(r'(\S+)(\s+|$)')
1707 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1708 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1710 def _unescape_manifest_path(self, path):
1711 return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1714 def _import_manifest(self, manifest_text):
1715 """Import a manifest into a `Collection`.
1718 The manifest text to import from.
1722 raise errors.ArgumentError("Can only import manifest into an empty collection")
1731 for token_and_separator in self._token_re.finditer(manifest_text):
1732 tok = token_and_separator.group(1)
1733 sep = token_and_separator.group(2)
1735 if state == STREAM_NAME:
1736 # starting a new stream
1737 stream_name = self._unescape_manifest_path(tok)
1742 self.find_or_create(stream_name, COLLECTION)
1746 block_locator = self._block_re.match(tok)
1748 blocksize = int(block_locator.group(1))
1749 blocks.append(Range(tok, streamoffset, blocksize, 0))
1750 streamoffset += blocksize
1754 if state == SEGMENTS:
1755 file_segment = self._segment_re.match(tok)
1757 pos = int(file_segment.group(1))
1758 size = int(file_segment.group(2))
1759 name = self._unescape_manifest_path(file_segment.group(3))
1760 if name.split('/')[-1] == '.':
1761 # placeholder for persisting an empty directory, not a real file
1763 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1765 filepath = os.path.join(stream_name, name)
1766 afile = self.find_or_create(filepath, FILE)
1767 if isinstance(afile, ArvadosFile):
1768 afile.add_segment(blocks, pos, size)
1770 raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
1773 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1779 self.set_committed(True)
1782 def notify(self, event, collection, name, item):
1784 self._callback(event, collection, name, item)
1787 class Subcollection(RichCollectionBase):
1788 """This is a subdirectory within a collection that doesn't have its own API
1791 Subcollection locking falls under the umbrella lock of its root collection.
1795 def __init__(self, parent, name):
1796 super(Subcollection, self).__init__(parent)
1797 self.lock = self.root_collection().lock
1798 self._manifest_text = None
1800 self.num_retries = parent.num_retries
1802 def root_collection(self):
1803 return self.parent.root_collection()
1806 return self.root_collection().writable()
1809 return self.root_collection()._my_api()
1812 return self.root_collection()._my_keep()
1814 def _my_block_manager(self):
1815 return self.root_collection()._my_block_manager()
1817 def stream_name(self):
1818 return os.path.join(self.parent.stream_name(), self.name)
1821 def clone(self, new_parent, new_name):
1822 c = Subcollection(new_parent, new_name)
1828 def _reparent(self, newparent, newname):
1829 self.set_committed(False)
1831 self.parent.remove(self.name, recursive=True)
1832 self.parent = newparent
1834 self.lock = self.parent.root_collection().lock
1837 class CollectionReader(Collection):
1838 """A read-only collection object.
1840 Initialize from a collection UUID or portable data hash, or raw
1841 manifest text. See `Collection` constructor for detailed options.
1844 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1845 self._in_init = True
1846 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1847 self._in_init = False
1849 # Forgo any locking since it should never change once initialized.
1850 self.lock = NoopLock()
1852 # Backwards compatibility with old CollectionReader
1853 # all_streams() and all_files()
1854 self._streams = None
1857 return self._in_init
1859 def _populate_streams(orig_func):
1860 @functools.wraps(orig_func)
1861 def populate_streams_wrapper(self, *args, **kwargs):
1862 # Defer populating self._streams until needed since it creates a copy of the manifest.
1863 if self._streams is None:
1864 if self._manifest_text:
1865 self._streams = [sline.split()
1866 for sline in self._manifest_text.split("\n")
1870 return orig_func(self, *args, **kwargs)
1871 return populate_streams_wrapper
1874 def normalize(self):
1875 """Normalize the streams returned by `all_streams`.
1877 This method is kept for backwards compatibility and only affects the
1878 behavior of `all_streams()` and `all_files()`
1884 for s in self.all_streams():
1885 for f in s.all_files():
1886 streamname, filename = split(s.name() + "/" + f.name())
1887 if streamname not in streams:
1888 streams[streamname] = {}
1889 if filename not in streams[streamname]:
1890 streams[streamname][filename] = []
1891 for r in f.segments:
1892 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1894 self._streams = [normalize_stream(s, streams[s])
1895 for s in sorted(streams)]
1897 def all_streams(self):
1898 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1899 for s in self._streams]
1902 def all_files(self):
1903 for s in self.all_streams():
1904 for f in s.all_files():