1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
import ciso8601
import datetime
import errno
import functools
import hashlib
import io
import logging
import os
import re
import sys
import threading
import time

from collections import deque
from stat import *
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream, escape
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
35 import arvados.events as events
36 from arvados.retry import retry_method
38 _logger = logging.getLogger('arvados.collection')
40 class CollectionBase(object):
41 """Abstract base class for Collection classes."""
    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
51 self._keep_client = KeepClient(api_client=self._api_client,
52 num_retries=self.num_retries)
53 return self._keep_client
55 def stripped_manifest(self):
56 """Get the manifest with locator hints stripped.
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and any hints
        other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(arvados.util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
76 class _WriterFile(_FileLikeObjectBase):
77 def __init__(self, coll_writer, name):
78 super(_WriterFile, self).__init__(name, 'wb')
79 self.dest = coll_writer
82 super(_WriterFile, self).close()
83 self.dest.finish_current_file()
85 @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)
89 @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for s in seq:
            self.write(s)
94 @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()
99 class CollectionWriter(CollectionBase):
100 """Deprecated, use Collection instead."""
102 @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
103 def __init__(self, api_client=None, num_retries=0, replication=None):
104 """Instantiate a CollectionWriter.
106 CollectionWriter lets you build a new Arvados Collection from scratch.
107 Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.
        * api_client: The API client to use to write Collections. If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
115 * num_retries: The default number of times to retry failed
116 service requests. Default 0. You may change this value
117 after instantiation, but note those changes may not
118 propagate to related objects like the Keep client.
119 * replication: The number of copies of each block to store.
120 If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
123 self._api_client = api_client
124 self.num_retries = num_retries
125 self.replication = (2 if replication is None else replication)
126 self._keep_client = None
127 self._data_buffer = []
128 self._data_buffer_len = 0
129 self._current_stream_files = []
130 self._current_stream_length = 0
131 self._current_stream_locators = []
132 self._current_stream_name = '.'
133 self._current_file_name = None
134 self._current_file_pos = 0
135 self._finished_streams = []
136 self._close_file = None
137 self._queued_file = None
138 self._queued_dirents = deque()
139 self._queued_trees = deque()
140 self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()
146 def do_queued_work(self):
147 # The work queue consists of three pieces:
148 # * _queued_file: The file object we're currently writing to the
150 # * _queued_dirents: Entries under the current directory
151 # (_queued_trees[0]) that we want to write or recurse through.
152 # This may contain files from subdirectories if
153 # max_manifest_depth == 0 for this directory.
154 # * _queued_trees: Directories that should be written as separate
155 # streams to the Collection.
156 # This function handles the smallest piece of work currently queued
157 # (current file, then current directory, then next directory) until
158 # no work remains. The _work_THING methods each do a unit of work on
159 # THING. _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break
    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
178 self._queued_file.close()
179 self._close_file = None
180 self._queued_file = None
182 def _work_dirents(self):
183 path, stream_name, max_manifest_depth = self._queued_trees[0]
184 if stream_name != self.current_stream_name():
185 self.start_new_stream(stream_name)
186 while self._queued_dirents:
187 dirent = self._queued_dirents.popleft()
188 target = os.path.join(path, dirent)
189 if os.path.isdir(target):
190 self._queue_tree(target,
191 os.path.join(stream_name, dirent),
192 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
196 if not self._queued_dirents:
197 self._queued_trees.popleft()
199 def _work_trees(self):
200 path, stream_name, max_manifest_depth = self._queued_trees[0]
201 d = arvados.util.listdir_recursive(
202 path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()
208 def _queue_file(self, source, filename=None):
209 assert (self._queued_file is None), "tried to queue more than one file"
210 if not hasattr(source, 'read'):
211 source = open(source, 'rb')
212 self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
217 self.start_new_file(filename)
218 self._queued_file = source
220 def _queue_dirents(self, stream_name, dirents):
221 assert (not self._queued_dirents), "tried to queue more than one tree"
222 self._queued_dirents = deque(sorted(dirents))
224 def _queue_tree(self, path, stream_name, max_manifest_depth):
225 self._queued_trees.append((path, stream_name, max_manifest_depth))
227 def write_file(self, source, filename=None):
228 self._queue_file(source, filename)
229 self.do_queued_work()
231 def write_directory_tree(self,
232 path, stream_name='.', max_manifest_depth=-1):
233 self._queue_tree(path, stream_name, max_manifest_depth)
234 self.do_queued_work()
236 def write(self, newdata):
237 if isinstance(newdata, bytes):
            pass
        elif isinstance(newdata, str):
240 newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
245 self._data_buffer.append(newdata)
246 self._data_buffer_len += len(newdata)
247 self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()
251 def open(self, streampath, filename=None):
252 """open(streampath[, filename]) -> file-like object
254 Pass in the path of a file to write to the Collection, either as a
255 single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.
259 You may only have one file object from the Collection open at a time,
260 so be sure to close the object when you're done. Using the object in
261 a with statement makes that easy::
263 with cwriter.open('./doc/page1.txt') as outfile:
264 outfile.write(page1_data)
265 with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
269 streampath, filename = split(streampath)
270 if self._last_open and not self._last_open.closed:
271 raise errors.AssertionError(
272 u"can't open '{}' when '{}' is still open".format(
273 filename, self._last_open.name))
274 if streampath != self.current_stream_name():
275 self.start_new_stream(streampath)
276 self.set_current_file_name(filename)
277 self._last_open = _WriterFile(self, filename)
278 return self._last_open
280 def flush_data(self):
281 data_buffer = b''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
287 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
288 self._data_buffer_len = len(self._data_buffer[0])
290 def start_new_file(self, newfilename=None):
291 self.finish_current_file()
292 self.set_current_file_name(newfilename)
294 def set_current_file_name(self, newfilename):
295 if re.search(r'[\t\n]', newfilename):
296 raise errors.AssertionError(
297 "Manifest filenames cannot contain whitespace: %s" %
299 elif re.search(r'\x00', newfilename):
300 raise errors.AssertionError(
301 "Manifest filenames cannot contain NUL characters: %s" %
303 self._current_file_name = newfilename
305 def current_file_name(self):
306 return self._current_file_name
308 def finish_current_file(self):
309 if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
312 raise errors.AssertionError(
313 "Cannot finish an unnamed file " +
314 "(%d bytes at offset %d in '%s' stream)" %
315 (self._current_stream_length - self._current_file_pos,
316 self._current_file_pos,
317 self._current_stream_name))
318 self._current_stream_files.append([
319 self._current_file_pos,
320 self._current_stream_length - self._current_file_pos,
321 self._current_file_name])
322 self._current_file_pos = self._current_stream_length
323 self._current_file_name = None
325 def start_new_stream(self, newstreamname='.'):
326 self.finish_current_stream()
327 self.set_current_stream_name(newstreamname)
329 def set_current_stream_name(self, newstreamname):
330 if re.search(r'[\t\n]', newstreamname):
331 raise errors.AssertionError(
332 "Manifest stream names cannot contain whitespace: '%s'" %
334 self._current_stream_name = '.' if newstreamname=='' else newstreamname
336 def current_stream_name(self):
337 return self._current_stream_name
339 def finish_current_stream(self):
340 self.finish_current_file()
342 if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
345 raise errors.AssertionError(
346 "Cannot finish an unnamed stream (%d bytes in %d files)" %
347 (self._current_stream_length, len(self._current_stream_files)))
349 if not self._current_stream_locators:
350 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
351 self._finished_streams.append([self._current_stream_name,
352 self._current_stream_locators,
353 self._current_stream_files])
354 self._current_stream_files = []
355 self._current_stream_length = 0
356 self._current_stream_locators = []
357 self._current_stream_name = None
358 self._current_file_pos = 0
359 self._current_file_name = None
362 """Store the manifest in Keep and return its locator.
364 This is useful for storing manifest fragments (task outputs)
365 temporarily in Keep during a Crunch job.
367 In other cases you should make a collection instead, by
368 sending manifest_text() to the API server's "create
369 collection" endpoint.
371 return self._my_keep().put(self.manifest_text().encode(),
372 copies=self.replication)
374 def portable_data_hash(self):
375 stripped = self.stripped_manifest().encode()
376 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
378 def manifest_text(self):
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
385 manifest += stream[0].replace(' ', '\\040')
386 manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"
        return manifest
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
398 def save_new(self, name=None):
399 return self._api_client.collections().create(
            ensure_unique_name=True,
            body={
                'name': name,
                'manifest_text': self.manifest_text(),
404 }).execute(num_retries=self.num_retries)
407 class ResumableCollectionWriter(CollectionWriter):
408 """Deprecated, use Collection instead."""
410 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
411 '_current_stream_locators', '_current_stream_name',
412 '_current_file_name', '_current_file_pos', '_close_file',
413 '_data_buffer', '_dependencies', '_finished_streams',
414 '_queued_dirents', '_queued_trees']
416 @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
417 def __init__(self, api_client=None, **kwargs):
418 self._dependencies = {}
419 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
423 # Try to build a new writer from scratch with the given state.
424 # If the state is not suitable to resume (because files have changed,
425 # been deleted, aren't predictable, etc.), raise a
426 # StaleWriterStateError. Otherwise, return the initialized writer.
427 # The caller is responsible for calling writer.do_queued_work()
428 # appropriately after it's returned.
429 writer = cls(*init_args, **init_kwargs)
430 for attr_name in cls.STATE_PROPS:
431 attr_value = state[attr_name]
432 attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # it has one.
435 if attr_class not in (type(None), attr_value.__class__):
436 attr_value = attr_class(attr_value)
437 setattr(writer, attr_name, attr_value)
438 # Check dependencies before we try to resume anything.
439 if any(KeepLocator(ls).permission_expired()
440 for ls in writer._current_stream_locators):
441 raise errors.StaleWriterStateError(
442 "locators include expired permission hint")
443 writer.check_dependencies()
444 if state['_current_file'] is not None:
445 path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
448 writer._queued_file.seek(pos)
449 except IOError as error:
450 raise errors.StaleWriterStateError(
451 u"failed to reopen active file {}: {}".format(path, error))
454 def check_dependencies(self):
455 for path, orig_stat in listitems(self._dependencies):
456 if not S_ISREG(orig_stat[ST_MODE]):
457 raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
460 except OSError as error:
461 raise errors.StaleWriterStateError(
462 u"failed to stat {}: {}".format(path, error))
463 if ((not S_ISREG(now_stat[ST_MODE])) or
464 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
465 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
466 raise errors.StaleWriterStateError(u"{} changed".format(path))
468 def dump_state(self, copy_func=lambda x: x):
469 state = {attr: copy_func(getattr(self, attr))
470 for attr in self.STATE_PROPS}
471 if self._queued_file is None:
472 state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
478 def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
487 super(ResumableCollectionWriter, self)._queue_file(source, filename)
488 fd_stat = os.fstat(self._queued_file.fileno())
489 if not S_ISREG(fd_stat.st_mode):
490 # We won't be able to resume from this cache anyway, so don't
491 # worry about further checks.
492 self._dependencies[source] = tuple(fd_stat)
493 elif path_stat is None:
494 raise errors.AssertionError(
495 u"could not stat {}: {}".format(source, stat_error))
496 elif path_stat.st_ino != fd_stat.st_ino:
497 raise errors.AssertionError(
498 u"{} changed between open and stat calls".format(source))
500 self._dependencies[src_path] = tuple(fd_stat)
502 def write(self, data):
503 if self._queued_file is None:
504 raise errors.AssertionError(
505 "resumable writer can't accept unsourced data")
506 return super(ResumableCollectionWriter, self).write(data)
ADD = "add"
DEL = "del"
MOD = "mod"
TOK = "tok"
FILE = "file"
COLLECTION = "collection"
516 class RichCollectionBase(CollectionBase):
517 """Base class for Collections and Subcollections.
    Implements the majority of functionality relating to accessing items in the
    Collection.
    """
524 def __init__(self, parent=None):
        self._items = {}
        self._committed = False
527 self._has_remote_blocks = False
528 self._callback = None
    def writable(self):
        raise NotImplementedError()
    def _my_api(self):
        raise NotImplementedError()
537 def _my_block_manager(self):
538 raise NotImplementedError()
    def _my_keep(self):
        raise NotImplementedError()
543 def root_collection(self):
544 raise NotImplementedError()
546 def notify(self, event, collection, name, item):
547 raise NotImplementedError()
549 def stream_name(self):
550 raise NotImplementedError()
554 def has_remote_blocks(self):
555 """Recursively check for a +R segment locator signature."""
        if self._has_remote_blocks:
            return True
        for item in self:
            if self[item].has_remote_blocks():
                return True
        return False
565 def set_has_remote_blocks(self, val):
566 self._has_remote_blocks = val
        if self.parent:
            self.parent.set_has_remote_blocks(val)
572 def find_or_create(self, path, create_type):
573 """Recursively search the specified file path.
575 May return either a `Collection` or `ArvadosFile`. If not found, will
576 create a new item at the specified path based on `create_type`. Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
582 `arvados.collection.COLLECTION`. If the path is not found, and value
583 of create_type is FILE then create and return a new ArvadosFile for
584 the last path component. If COLLECTION, then create and return a new
585 Collection for the last path component.
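
        Example (a sketch; assumes `c` is a writable Collection)::

            # Intermediate directories are created as needed, like mkdir -p.
            out = c.find_or_create('subdir/out.txt', arvados.collection.FILE)
        """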
589 pathcomponents = path.split("/", 1)
590 if pathcomponents[0]:
591 item = self._items.get(pathcomponents[0])
592 if len(pathcomponents) == 1:
            if item is None:
                if create_type == COLLECTION:
596 item = Subcollection(self, pathcomponents[0])
                else:
                    item = ArvadosFile(self, pathcomponents[0])
599 self._items[pathcomponents[0]] = item
600 self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
            return item
        else:
            if item is None:
605 # create new collection
606 item = Subcollection(self, pathcomponents[0])
607 self._items[pathcomponents[0]] = item
608 self.set_committed(False)
609 self.notify(ADD, self, pathcomponents[0], item)
610 if isinstance(item, RichCollectionBase):
611 return item.find_or_create(pathcomponents[1], create_type)
            else:
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
618 def find(self, path):
619 """Recursively search the specified file path.
        May return either a Collection or ArvadosFile. Return None if not
        found.
        If path is invalid (ex: starts with '/'), an IOError exception will be
        raised.
        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")
630 pathcomponents = path.split("/", 1)
631 if pathcomponents[0] == '':
632 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
634 item = self._items.get(pathcomponents[0])
        if item is None:
            return None
        elif len(pathcomponents) == 1:
            return item
        else:
640 if isinstance(item, RichCollectionBase):
641 if pathcomponents[1]:
642 return item.find(pathcomponents[1])
                else:
                    return item
            else:
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
649 def mkdirs(self, path):
650 """Recursive subcollection create.
652 Like `os.makedirs()`. Will create intermediate subcollections needed
653 to contain the leaf subcollection path.
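
        Example (a sketch; assumes `c` is a writable Collection)::

            inner = c.mkdirs('a/b/c')   # returns the leaf Subcollection
        """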
        if self.find(path) is not None:
658 raise IOError(errno.EEXIST, "Directory or file exists", path)
660 return self.find_or_create(path, COLLECTION)
662 def open(self, path, mode="r", encoding=None):
663 """Open a file-like object for access.
        :path:
          path to a file in the collection

        :mode:
          a string consisting of "r", "w", or "a", optionally followed
          by "b" or "t", optionally followed by "+".
          :"b":
            binary mode: write() accepts bytes, read() returns bytes.

          :"t":
            text mode (default): write() accepts strings, read() returns strings.

          :"r+":
            opens for reading and writing. Reads/writes share a file pointer.

          :"w", "w+":
            truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.

          :"a", "a+":
            opens for reading and writing. All writes are appended to
            the end of the file. Writing does not affect the file pointer for
            reads.
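
        Example (a sketch; assumes `c` is a writable Collection)::

            with c.open('subdir/notes.txt', 'w') as f:
                f.write('hello')
            with c.open('subdir/notes.txt') as f:
                print(f.read())
        """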
687 if not re.search(r'^[rwa][bt]?\+?$', mode):
688 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
690 if mode[0] == 'r' and '+' not in mode:
691 fclass = ArvadosFileReader
692 arvfile = self.find(path)
693 elif not self.writable():
694 raise IOError(errno.EROFS, "Collection is read only")
        else:
            fclass = ArvadosFileWriter
697 arvfile = self.find_or_create(path, FILE)
        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found", path)
701 if not isinstance(arvfile, ArvadosFile):
702 raise IOError(errno.EISDIR, "Is a directory", path)
707 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
708 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
        if 'b' not in mode:
            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
        return f
715 """Determine if the collection has been modified since last commited."""
716 return not self.committed()
720 """Determine if the collection has been committed to the API server."""
721 return self._committed
724 def set_committed(self, value=True):
725 """Recursively set committed flag.
727 If value is True, set committed to be True for this and all children.
729 If value is False, set committed to be False for this and all parents.
        if value == self._committed:
            return
        if value:
            for k, v in listitems(self._items):
                v.set_committed(True)
            self._committed = True
        else:
            self._committed = False
            if self.parent is not None:
                self.parent.set_committed(False)
744 """Iterate over names of files and collections contained in this collection."""
745 return iter(viewkeys(self._items))
748 def __getitem__(self, k):
749 """Get a file or collection that is directly contained by this collection.
751 If you want to search a path, use `find()` instead.
754 return self._items[k]
757 def __contains__(self, k):
758 """Test if there is a file or collection a directly contained by this collection."""
759 return k in self._items
763 """Get the number of items directly contained in this collection."""
764 return len(self._items)
768 def __delitem__(self, p):
769 """Delete an item by name which is directly contained by this collection."""
        del self._items[p]
        self.set_committed(False)
772 self.notify(DEL, self, p, None)
776 """Get a list of names of files and collections directly contained in this collection."""
777 return self._items.keys()
781 """Get a list of files and collection objects directly contained in this collection."""
782 return listvalues(self._items)
786 """Get a list of (name, object) tuples directly contained in this collection."""
787 return listitems(self._items)
789 def exists(self, path):
790 """Test if there is a file or collection at `path`."""
791 return self.find(path) is not None
795 def remove(self, path, recursive=False):
796 """Remove the file or subcollection (directory) at `path`.
        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
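
        Example (a sketch; assumes `c` is a writable Collection)::

            c.remove('subdir/out.txt')
            c.remove('subdir', recursive=True)
        """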
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")
805 pathcomponents = path.split("/", 1)
806 item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError(errno.ENOENT, "File not found", path)
809 if len(pathcomponents) == 1:
810 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
811 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
812 deleteditem = self._items[pathcomponents[0]]
813 del self._items[pathcomponents[0]]
814 self.set_committed(False)
815 self.notify(DEL, self, pathcomponents[0], deleteditem)
        else:
            item.remove(pathcomponents[1], recursive=recursive)
819 def _clonefrom(self, source):
820 for k,v in listitems(source):
821 self._items[k] = v.clone(self, k)
    def clone(self, new_parent, new_name):
        raise NotImplementedError()
828 def add(self, source_obj, target_name, overwrite=False, reparent=False):
829 """Copy or move a file or subcollection to this collection.
        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name. If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
          unmodified.
        """
848 if target_name in self and not overwrite:
849 raise IOError(errno.EEXIST, "File already exists", target_name)
        modified_from = None
        if target_name in self:
853 modified_from = self[target_name]
855 # Actually make the move or copy.
        if reparent:
            source_obj._reparent(self, target_name)
            item = source_obj
        else:
            item = source_obj.clone(self, target_name)
862 self._items[target_name] = item
863 self.set_committed(False)
864 if not self._has_remote_blocks and source_obj.has_remote_blocks():
865 self.set_has_remote_blocks(True)
        if modified_from:
            self.notify(MOD, self, target_name, (modified_from, item))
        else:
            self.notify(ADD, self, target_name, item)
872 def _get_src_target(self, source, target_path, source_collection, create_dest):
873 if source_collection is None:
874 source_collection = self
877 if isinstance(source, basestring):
878 source_obj = source_collection.find(source)
879 if source_obj is None:
880 raise IOError(errno.ENOENT, "File not found", source)
881 sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None
        # Find the parent collection of the target path.
887 targetcomponents = target_path.split("/")
889 # Determine the name to use.
890 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
        if create_dest:
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        else:
            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))
            else:
                target_dir = self
903 if target_dir is None:
904 raise IOError(errno.ENOENT, "Target directory not found", target_name)
906 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
907 target_dir = target_dir[target_name]
908 target_name = sourcecomponents[-1]
910 return (source_obj, target_dir, target_name)
914 def copy(self, source, target_path, source_collection=None, overwrite=False):
915 """Copy a file or subcollection to a new path in this collection.
        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
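
        Example (a sketch; assumes `c` is a writable Collection)::

            c.copy('data/input.txt', 'backup/input.txt', overwrite=True)
        """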
933 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
934 target_dir.add(source_obj, target_name, overwrite, False)
938 def rename(self, source, target_path, source_collection=None, overwrite=False):
939 """Move a file or subcollection from `source_collection` to a new path in this collection.
        :source:
          A string with a path to source file or subcollection.

        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
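
        Example (a sketch; assumes `c` is a writable Collection)::

            c.rename('draft.txt', 'final.txt')
        """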
957 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
958 if not source_obj.writable():
959 raise IOError(errno.EROFS, "Source collection is read only", source)
960 target_dir.add(source_obj, target_name, overwrite, True)
962 def portable_manifest_text(self, stream_name="."):
963 """Get the manifest text for this collection, sub collections and files.
965 This method does not flush outstanding blocks to Keep. It will return
966 a normalized manifest with access tokens stripped.
        :stream_name:
          Name to use for this stream (directory)
        """
972 return self._get_manifest_text(stream_name, True, True)
975 def manifest_text(self, stream_name=".", strip=False, normalize=False,
976 only_committed=False):
977 """Get the manifest text for this collection, sub collections and files.
979 This method will flush outstanding blocks to Keep. By default, it will
980 not normalize an unmodified manifest or strip access tokens.
        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, don't commit pending blocks.
        """
1000 if not only_committed:
1001 self._my_block_manager().commit_all()
1002 return self._get_manifest_text(stream_name, strip, normalize,
1003 only_committed=only_committed)
1006 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1007 """Get the manifest text for this collection, sub collections and files.
        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.
        """
1027 if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
1031 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Build a stream entry for each file `filename`.
1033 arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
1036 loc = segment.locator
1037 if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        if only_committed:
                            continue
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
1043 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1044 segment.segment_offset, segment.range_size))
1045 stream[filename] = filestream
1047 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1048 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1049 buf.append(self[dirname].manifest_text(
1050 stream_name=os.path.join(stream_name, dirname),
1051 strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
1060 def _copy_remote_blocks(self, remote_blocks={}):
1061 """Scan through the entire collection and ask Keep to copy remote blocks.
1063 When accessing a remote collection, blocks will have a remote signature
1064 (+R instead of +A). Collect these signatures and request Keep to copy the
1065 blocks to the local cluster, returning local (+A) signatures.
        :remote_blocks:
          Shared cache of remote to local block mappings. This is used to avoid
          doing extra work when blocks are shared by more than one file in
          different subdirectories.
        """
        for item in self:
            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1075 return remote_blocks
1078 def diff(self, end_collection, prefix=".", holding_collection=None):
1079 """Generate list of add/modify/delete actions.
        When given to `apply`, will change `self` to match `end_collection`.
        """
        changes = []
        if holding_collection is None:
1086 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        for k in self:
            if k not in end_collection:
1089 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1090 for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1093 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1094 elif end_collection[k] != self[k]:
1095 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                else:
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes
1104 def apply(self, changes):
1105 """Apply changes from `diff`.
1107 If a change conflicts with a local change, it will be saved to an
1108 alternate path indicating the conflict.
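
        Example (a sketch; assumes `c` and `other` are Collections)::

            changes = c.diff(other)  # actions that would make c match other
            c.apply(changes)         # apply them; conflicts get saved aside
        """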
        if changes:
            self.set_committed(False)
1113 for change in changes:
1114 event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.localtime()))
1120 if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
1123 self.copy(initial, path)
1124 elif local is not None and local != initial:
1125 # There is already local file and it is different:
1126 # save change to conflict file.
1127 self.copy(initial, conflictpath)
1128 elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
1131 # Local matches the "initial" item so it has not
1132 # changed locally and is safe to update.
1133 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1134 # Replace contents of local file with new contents
1135 local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
1138 # path was a file and is now a collection or vice versa
1139 self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
1142 # match the "start" value, so save change to conflict file
1143 self.copy(final, conflictpath)
1144 elif event_type == DEL:
1145 if local == initial:
1146 # Local item matches "initial" value, so it is safe to remove.
1147 self.remove(path, recursive=True)
1148 # else, the file is modified or already removed, in either
1149 # case we don't want to try to remove it.
1151 def portable_data_hash(self):
1152 """Get the portable data hash for this collection's manifest."""
1153 if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's
            # committed, then return the API server's PDH response.
1156 return self._portable_data_hash
        else:
            stripped = self.portable_manifest_text().encode()
1159 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1162 def subscribe(self, callback):
1163 if self._callback is None:
1164 self._callback = callback
1166 raise errors.ArgumentError("A callback is already set on this collection.")
1169 def unsubscribe(self):
1170 if self._callback is not None:
1171 self._callback = None
1174 def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)
1177 self.root_collection().notify(event, collection, name, item)
1180 def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, RichCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True
1194 def __ne__(self, other):
1195 return not self.__eq__(other)
1199 """Flush bufferblocks to Keep."""
1200 for e in listvalues(self):
1204 class Collection(RichCollectionBase):
1205 """Represents the root of an Arvados Collection.
1207 This class is threadsafe. The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).
1214 :To read an existing file:
1215 `c.open("myfile", "r")`
1217 :To write a new file:
1218 `c.open("myfile", "w")`
1220 :To determine if a file exists:
1221 `c.find("myfile") is not None`
1224 `c.copy("source", "dest")`
1227 `c.remove("myfile")`
    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`
1238 Must be associated with an API server Collection record (during
1239 initialization, or using `save_new`) to use `save` or `update`
    def __init__(self, manifest_locator_or_text=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=10,
                 parent=None,
                 apiconfig=None,
                 block_manager=None,
                 replication_desired=None,
                 storage_classes_desired=None,
                 put_threads=None):
1253 """Collection constructor.
1255 :manifest_locator_or_text:
1256 An Arvados collection UUID, portable data hash, raw manifest
1257 text, or (if creating an empty collection) None.
        :parent:
          the parent Collection, may be None.

        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.

        :api_client:
          The API client object to use for requests. If not specified, create one using `apiconfig`.

        :keep_client:
          the Keep client to use for requests. If not specified, create one using `apiconfig`.

        :num_retries:
          the number of retries for API and Keep requests.

        :block_manager:
          the block manager to use. If not specified, create one.
1279 :replication_desired:
1280 How many copies should Arvados maintain. If None, API server default
1281 configuration applies. If not None, this value will also be used
1282 for determining the number of block copies being written.
1284 :storage_classes_desired:
1285 A list of storage class names where to upload the data. If None,
1286 the keep client is expected to store the data into the cluster's
1287 default storage class(es).
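
        Example (a sketch; the UUID is a made-up placeholder)::

            import arvados.collection
            # Load an existing collection record and add a file; the with
            # block saves the record on a clean exit.
            with arvados.collection.Collection('zzzzz-4zz18-zzzzzzzzzzzzzzz') as c:
                with c.open('greeting.txt', 'w') as f:
                    f.write('hello')
        """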
1291 if storage_classes_desired and type(storage_classes_desired) is not list:
1292 raise errors.ArgumentError("storage_classes_desired must be list type.")
1294 super(Collection, self).__init__(parent)
1295 self._api_client = api_client
1296 self._keep_client = keep_client
1298 # Use the keep client from ThreadSafeApiCache
1299 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1300 self._keep_client = self._api_client.keep
1302 self._block_manager = block_manager
1303 self.replication_desired = replication_desired
1304 self._storage_classes_desired = storage_classes_desired
1305 self.put_threads = put_threads
        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()
1312 self.num_retries = num_retries
1313 self._manifest_locator = None
1314 self._manifest_text = None
1315 self._portable_data_hash = None
1316 self._api_response = None
1317 self._past_versions = set()
1319 self.lock = threading.RLock()
1322 if manifest_locator_or_text:
1323 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1324 self._manifest_locator = manifest_locator_or_text
1325 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1326 self._manifest_locator = manifest_locator_or_text
1327 if not self._has_local_collection_uuid():
1328 self._has_remote_blocks = True
1329 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1330 self._manifest_text = manifest_locator_or_text
1331 if '+R' in self._manifest_text:
1332 self._has_remote_blocks = True
1334 raise errors.ArgumentError(
1335 "Argument to CollectionReader is not a manifest or a collection UUID")
        try:
            self._populate()
        except errors.SyntaxError as e:
            raise errors.ArgumentError("Error processing manifest text: %s" % str(e)) from None
1342 def storage_classes_desired(self):
1343 return self._storage_classes_desired or []
    def root_collection(self):
        return self
1348 def get_properties(self):
1349 if self._api_response and self._api_response["properties"]:
            return self._api_response["properties"]
        else:
            return {}
1354 def get_trash_at(self):
1355 if self._api_response and self._api_response["trash_at"]:
            try:
                return ciso8601.parse_datetime(self._api_response["trash_at"])
            except ValueError:
                return None
        else:
            return None
    def stream_name(self):
        return "."
1370 def known_past_version(self, modified_at_and_portable_data_hash):
1371 return modified_at_and_portable_data_hash in self._past_versions
1375 def update(self, other=None, num_retries=None):
1376 """Merge the latest collection on the API server with the current collection."""
        if other is None:
            if self._manifest_locator is None:
1380 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1381 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1382 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1383 response.get("portable_data_hash") != self.portable_data_hash()):
1384 # The record on the server is different from our current one, but we've seen it before,
1385 # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the
                # update, because we want to update our tokens.
                return
            else:
1390 self._remember_api_response(response)
1391 other = CollectionReader(response["manifest_text"])
1392 baseline = CollectionReader(self._manifest_text)
1393 self.apply(baseline.diff(other))
1394 self._manifest_text = self.manifest_text()
1398 if self._api_client is None:
1399 self._api_client = ThreadSafeApiCache(self._config, version='v1')
1400 if self._keep_client is None:
1401 self._keep_client = self._api_client.keep
1402 return self._api_client
1406 if self._keep_client is None:
1407 if self._api_client is None:
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
1411 return self._keep_client
1414 def _my_block_manager(self):
1415 if self._block_manager is None:
1416 copies = (self.replication_desired or
                      self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                   2))
1419 self._block_manager = _BlockManager(self._my_keep(),
                                                copies=copies,
                                                put_threads=self.put_threads,
1422 num_retries=self.num_retries,
1423 storage_classes_func=self.storage_classes_desired)
1424 return self._block_manager
1426 def _remember_api_response(self, response):
1427 self._api_response = response
1428 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1430 def _populate_from_api_server(self):
1431 # As in KeepClient itself, we must wait until the last
1432 # possible moment to instantiate an API client, in order to
1433 # avoid tripping up clients that don't have access to an API
1434 # server. If we do build one, make sure our Keep client uses
1435 # it. If instantiation fails, we'll fall back to the except
1436 # clause, just like any other Collection lookup
1437 # failure. Return an exception, or None if successful.
1438 self._remember_api_response(self._my_api().collections().get(
1439 uuid=self._manifest_locator).execute(
1440 num_retries=self.num_retries))
1441 self._manifest_text = self._api_response['manifest_text']
1442 self._portable_data_hash = self._api_response['portable_data_hash']
        # If not overridden via kwargs, we should try to load the
        # replication_desired and storage_classes_desired from the API server.
1445 if self.replication_desired is None:
1446 self.replication_desired = self._api_response.get('replication_desired', None)
1447 if self._storage_classes_desired is None:
1448 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1450 def _populate(self):
1451 if self._manifest_text is None:
1452 if self._manifest_locator is None:
                return
            else:
                self._populate_from_api_server()
1456 self._baseline_manifest = self._manifest_text
1457 self._import_manifest(self._manifest_text)
1459 def _has_collection_uuid(self):
1460 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1462 def _has_local_collection_uuid(self):
        return self._has_collection_uuid() and \
1464 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
    def __enter__(self):
        return self
1469 def __exit__(self, exc_type, exc_value, traceback):
1470 """Support scoped auto-commit in a with: block."""
1471 if exc_type is None:
            if self.writable() and self._has_collection_uuid():
                self.save()
        self.stop_threads()
1476 def stop_threads(self):
1477 if self._block_manager is not None:
1478 self._block_manager.stop_threads()
1481 def manifest_locator(self):
1482 """Get the manifest locator, if any.
1484 The manifest locator will be set when the collection is loaded from an
1485 API server record or the portable data hash of a manifest.
1487 The manifest locator will be None if the collection is newly created or
1488 was created directly from manifest text. The method `save_new()` will
1489 assign a manifest locator.
1492 return self._manifest_locator
1495 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1496 if new_config is None:
1497 new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
            newcollection = Collection(parent=new_parent, apiconfig=new_config)
1503 newcollection._clonefrom(self)
1504 return newcollection
1507 def api_response(self):
1508 """Returns information about this Collection fetched from the API server.
1510 If the Collection exists in Keep but not the API server, currently
1511 returns None. Future versions may provide a synthetic response.
1514 return self._api_response
1516 def find_or_create(self, path, create_type):
1517 """See `RichCollectionBase.find_or_create`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1523 def find(self, path):
1524 """See `RichCollectionBase.find`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1530 def remove(self, path, recursive=False):
1531 """See `RichCollectionBase.remove`"""
        if path == ".":
            raise errors.ArgumentError("Cannot remove '.'")
        else:
            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
    def save(self,
             properties=None,
             storage_classes=None,
             trash_at=None,
             merge=True,
             num_retries=None,
             preserve_version=False):
1547 """Save collection to an existing collection record.
1549 Commit pending buffer blocks to Keep, merge with remote record (if
1550 merge=True, the default), and update the collection record. Returns
1551 the current manifest text.
1553 Will raise AssertionError if not associated with a collection record on
        the API server. If you want to save a manifest to Keep only, see
        `save_new()`.

        :properties:
          Additional properties of collection. This value will replace any existing
          properties of collection.

        :storage_classes:
          Specify desirable storage classes to be used when writing data to Keep.

        :trash_at:
          A collection is *expiring* when it has a *trash_at* time in the future.
          An expiring collection can be accessed as normal,
          but is scheduled to be trashed automatically at the *trash_at* time.

        :merge:
          Update and merge remote changes before saving. Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        :preserve_version:
          If True, indicate that the collection content being saved right now
          should be preserved in a version snapshot if the collection record is
          updated in the future. Requires that the API server has
          Collections.CollectionVersioning enabled; if not, setting this will
          raise an error.
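
        Example (a sketch; assumes `c` was loaded from an existing record)::

            with c.open('results.txt', 'w') as f:
                f.write('done')
            c.save(properties={'type': 'output'})
        """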
1584 if properties and type(properties) is not dict:
1585 raise errors.ArgumentError("properties must be dictionary type.")
1587 if storage_classes and type(storage_classes) is not list:
1588 raise errors.ArgumentError("storage_classes must be list type.")
        if storage_classes:
            self._storage_classes_desired = storage_classes
1592 if trash_at and type(trash_at) is not datetime.datetime:
1593 raise errors.ArgumentError("trash_at must be datetime type.")
1595 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1596 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1600 body["properties"] = properties
1601 if self.storage_classes_desired():
1602 body["storage_classes_desired"] = self.storage_classes_desired()
        if trash_at:
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1605 body["trash_at"] = t
1606 if preserve_version:
1607 body["preserve_version"] = preserve_version
1609 if not self.committed():
1610 if self._has_remote_blocks:
1611 # Copy any remote blocks to the local cluster.
1612 self._copy_remote_blocks(remote_blocks={})
1613 self._has_remote_blocks = False
1614 if not self._has_collection_uuid():
1615 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1616 elif not self._has_local_collection_uuid():
1617 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1619 self._my_block_manager().commit_all()
            if merge:
                self.update()

            text = self.manifest_text(strip=False)
1625 body['manifest_text'] = text
1627 self._remember_api_response(self._my_api().collections().update(
1628 uuid=self._manifest_locator,
                body=body
            ).execute(num_retries=num_retries))
1631 self._manifest_text = self._api_response["manifest_text"]
1632 self._portable_data_hash = self._api_response["portable_data_hash"]
1633 self.set_committed(True)
        elif body:
            self._remember_api_response(self._my_api().collections().update(
1636 uuid=self._manifest_locator,
                body=body
            ).execute(num_retries=num_retries))
1640 return self._manifest_text
    def save_new(self, name=None,
                 create_collection_record=True,
                 owner_uuid=None,
                 properties=None,
                 storage_classes=None,
                 trash_at=None,
                 ensure_unique_name=False,
                 num_retries=None,
                 preserve_version=False):
1655 """Save collection to a new collection record.
1657 Commit pending buffer blocks to Keep and, when create_collection_record
1658 is True (default), create a new collection record. After creating a
1659 new collection record, this Collection object will be associated with
1660 the new record used by `save()`. Returns the current manifest text.
        :name:
          The collection name.

        :create_collection_record:
          If True, create a collection record on the API server.
          If False, only commit blocks to Keep and return the manifest text.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :properties:
          Additional properties of collection. This value will replace any existing
          properties of collection.

        :storage_classes:
          Specify desirable storage classes to be used when writing data to Keep.

        :trash_at:
          A collection is *expiring* when it has a *trash_at* time in the future.
          An expiring collection can be accessed as normal,
          but is scheduled to be trashed automatically at the *trash_at* time.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner. If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        :preserve_version:
          If True, indicate that the collection content being saved right now
          should be preserved in a version snapshot if the collection record is
          updated in the future. Requires that the API server has
          Collections.CollectionVersioning enabled; if not, setting this will
          raise an error.
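
        Example (a sketch; the project UUID is a made-up placeholder)::

            c.save_new(name='my results',
                       owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz',
                       ensure_unique_name=True)
        """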
1701 if properties and type(properties) is not dict:
1702 raise errors.ArgumentError("properties must be dictionary type.")
1704 if storage_classes and type(storage_classes) is not list:
1705 raise errors.ArgumentError("storage_classes must be list type.")
1707 if trash_at and type(trash_at) is not datetime.datetime:
1708 raise errors.ArgumentError("trash_at must be datetime type.")
1710 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1711 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
1713 if self._has_remote_blocks:
1714 # Copy any remote blocks to the local cluster.
1715 self._copy_remote_blocks(remote_blocks={})
1716 self._has_remote_blocks = False
        if storage_classes:
            self._storage_classes_desired = storage_classes
1721 self._my_block_manager().commit_all()
1722 text = self.manifest_text(strip=False)
1724 if create_collection_record:
            if name is None:
                name = "New collection"
1727 ensure_unique_name = True
1729 body = {"manifest_text": text,
1731 "replication_desired": self.replication_desired}
1733 body["owner_uuid"] = owner_uuid
1735 body["properties"] = properties
1736 if self.storage_classes_desired():
1737 body["storage_classes_desired"] = self.storage_classes_desired()
            if trash_at:
                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1740 body["trash_at"] = t
1741 if preserve_version:
1742 body["preserve_version"] = preserve_version
1744 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1745 text = self._api_response["manifest_text"]
1747 self._manifest_locator = self._api_response["uuid"]
1748 self._portable_data_hash = self._api_response["portable_data_hash"]
1750 self._manifest_text = text
        self.set_committed(True)

        return text
1755 _token_re = re.compile(r'(\S+)(\s+|$)')
1756 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1757 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1759 def _unescape_manifest_path(self, path):
1760 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
1763 def _import_manifest(self, manifest_text):
1764 """Import a manifest into a `Collection`.
        :manifest_text:
          The manifest text to import from.
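
        A manifest is a sequence of streams: a stream name, one or more
        block locators, and one or more "pos:size:filename" segments. For
        example (the block hash here is the md5 of "foo", shown for
        illustration)::

            . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
        """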
        if len(self) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")
        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in self._token_re.finditer(manifest_text):
1781 tok = token_and_separator.group(1)
1782 sep = token_and_separator.group(2)
1784 if state == STREAM_NAME:
1785 # starting a new stream
1786 stream_name = self._unescape_manifest_path(tok)
                blocks = []
                streamoffset = 0
                state = BLOCKS
                self.find_or_create(stream_name, COLLECTION)
                continue
            if state == BLOCKS:
                block_locator = self._block_re.match(tok)
                if block_locator:
1797 blocksize = int(block_locator.group(1))
1798 blocks.append(Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize
                else:
                    state = SEGMENTS
1803 if state == SEGMENTS:
1804 file_segment = self._segment_re.match(tok)
                if file_segment:
                    pos = int(file_segment.group(1))
1807 size = int(file_segment.group(2))
1808 name = self._unescape_manifest_path(file_segment.group(3))
1809 if name.split('/')[-1] == '.':
1810 # placeholder for persisting an empty directory, not a real file
                        if len(name) > 2:
                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                    else:
1814 filepath = os.path.join(stream_name, name)
                        try:
                            afile = self.find_or_create(filepath, FILE)
1817 except IOError as e:
1818 if e.errno == errno.ENOTDIR:
                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name." % filepath) from None
                            else:
                                raise
1822 if isinstance(afile, ArvadosFile):
1823 afile.add_segment(blocks, pos, size)
                        else:
                            raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
                else:
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

            if sep == "\n":
                stream_name = None
                state = STREAM_NAME
1834 self.set_committed(True)
1837 def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)
1842 class Subcollection(RichCollectionBase):
1843 """This is a subdirectory within a collection that doesn't have its own API
1846 Subcollection locking falls under the umbrella lock of its root collection.
1850 def __init__(self, parent, name):
1851 super(Subcollection, self).__init__(parent)
1852 self.lock = self.root_collection().lock
1853 self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries
1857 def root_collection(self):
1858 return self.parent.root_collection()
    def writable(self):
        return self.root_collection().writable()
    def _my_api(self):
        return self.root_collection()._my_api()
    def _my_keep(self):
        return self.root_collection()._my_keep()
1869 def _my_block_manager(self):
1870 return self.root_collection()._my_block_manager()
1872 def stream_name(self):
1873 return os.path.join(self.parent.stream_name(), self.name)
1876 def clone(self, new_parent, new_name):
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c
1883 def _reparent(self, newparent, newname):
1884 self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
1889 self.lock = self.parent.root_collection().lock
1892 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1893 """Encode empty directories by using an \056-named (".") empty file"""
1894 if len(self._items) == 0:
1895 return "%s %s 0:0:\\056\n" % (
1896 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super(Subcollection, self)._get_manifest_text(stream_name,
                                                             strip, normalize,
                                                             only_committed)
1902 class CollectionReader(Collection):
1903 """A read-only collection object.
1905 Initialize from a collection UUID or portable data hash, or raw
1906 manifest text. See `Collection` constructor for detailed options.
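
    Example (a sketch; the UUID is a made-up placeholder)::

        reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
        for name in reader:
            print(name)
        with reader.open('foo.txt') as f:
            data = f.read()
    """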
1909 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1910 self._in_init = True
1911 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1912 self._in_init = False
1914 # Forego any locking since it should never change once initialized.
1915 self.lock = NoopLock()
        # Backwards compatibility with old CollectionReader
        # all_streams() and all_files()
1919 self._streams = None
    def writable(self):
        return self._in_init
1924 def _populate_streams(orig_func):
1925 @functools.wraps(orig_func)
1926 def populate_streams_wrapper(self, *args, **kwargs):
1927 # Defer populating self._streams until needed since it creates a copy of the manifest.
1928 if self._streams is None:
1929 if self._manifest_text:
1930 self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
                                     if sline]
                else:
                    self._streams = []
1935 return orig_func(self, *args, **kwargs)
1936 return populate_streams_wrapper
1939 def normalize(self):
1940 """Normalize the streams returned by `all_streams`.
        This method is kept for backwards compatibility and only affects the
        behavior of `all_streams()` and `all_files()`.
        """
        streams = {}
1949 for s in self.all_streams():
1950 for f in s.all_files():
1951 streamname, filename = split(s.name() + "/" + f.name())
1952 if streamname not in streams:
1953 streams[streamname] = {}
1954 if filename not in streams[streamname]:
1955 streams[streamname][filename] = []
1956 for r in f.segments:
1957 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1959 self._streams = [normalize_stream(s, streams[s])
1960 for s in sorted(streams)]
1962 @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_streams(self):
1965 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1966 for s in self._streams]
1968 @arvados.util._deprecated('3.0', 'Collection iteration')
    @_populate_streams
    def all_files(self):
1971 for s in self.all_streams():
            for f in s.all_files():
                yield f