# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import absolute_import
from future.utils import listitems, listvalues, viewkeys
from builtins import str
from past.builtins import basestring
from builtins import object

import errno
import functools
import hashlib
import logging
import os
import re
import threading
import time

from collections import deque
from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE

from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader
from ._normalize_stream import normalize_stream
from ._ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
import arvados
import arvados.config as config
import arvados.errors as errors
import arvados.util
import arvados.events as events
from arvados.retry import retry_method

_logger = logging.getLogger('arvados.collection')

class CollectionBase(object):
    """Abstract base class for Collection classes."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(arvados.util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)

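    # A minimal illustration (an assumed example, not output from a real
    # cluster): stripping removes every hint except the size hint, so a
    # signed locator such as
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+Aabcdef0123456789@565e7a12
    # becomes its portable form
    #   acbd18db4cc2f85cedef654fccc4a4d8+3
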
class _WriterFile(_FileLikeObjectBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()

class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = arvados.util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if isinstance(newdata, bytes):
            pass
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = b''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
                (newstreamname))
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

353 """Store the manifest in Keep and return its locator.
355 This is useful for storing manifest fragments (task outputs)
356 temporarily in Keep during a Crunch job.
358 In other cases you should make a collection instead, by
359 sending manifest_text() to the API server's "create
360 collection" endpoint.
362 return self._my_keep().put(self.manifest_text().encode(),
363 copies=self.replication)
365 def portable_data_hash(self):
366 stripped = self.stripped_manifest().encode()
367 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
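    # A quick sketch of what portable_data_hash() computes: the MD5 of the
    # stripped manifest text plus its length in bytes, so the same value can
    # be recomputed locally without contacting the API server (hypothetical
    # one-file manifest shown):
    #
    #   import hashlib
    #   manifest = ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"
    #   pdh = '{}+{}'.format(
    #       hashlib.md5(manifest.encode()).hexdigest(), len(manifest))
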
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

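    # For reference, each manifest line is "<stream> <locator...> <segment...>",
    # and spaces in stream or file names are escaped as \040.  A stream named
    # "my docs" containing "page one.txt" would render roughly as (assumed
    # example locator):
    #
    #   ./my\040docs acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:page\040one.txt
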
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)

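# A minimal checkpoint/resume sketch for ResumableCollectionWriter.  The
# pickle round-trip and file names here are assumptions for illustration;
# dump_state()/from_state() are the real entry points.
#
#   import pickle
#   writer = ResumableCollectionWriter()
#   writer.write_file('/tmp/example.dat')
#   with open('writer.state', 'wb') as f:
#       pickle.dump(writer.dump_state(), f)
#   # ...later, possibly in a new process...
#   with open('writer.state', 'rb') as f:
#       writer = ResumableCollectionWriter.from_state(pickle.load(f))
#   writer.do_queued_work()
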
# Change types reported by diff()/notify(), and item types accepted by
# find_or_create().
ADD = "add"
DEL = "del"
MOD = "mod"
TOK = "tok"
FILE = "file"
COLLECTION = "collection"

class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    Collection.

    """

    def __init__(self, parent=None):
        self.parent = parent
        self._committed = False
        self._callback = None
        self._items = {}

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def writable(self):
        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()

    @synchronized
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`.  If not found, will
        create a new item at the specified path based on `create_type`.  Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`.  If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component.  If COLLECTION, then create and return a new
          Collection for the last path component.

        """

        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        else:
            return self

    @synchronized
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile.  Return None if not
        found.
        If path is invalid (ex: starts with '/'), an IOError exception will be
        raised.

        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        if pathcomponents[0] == '':
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

        item = self._items.get(pathcomponents[0])
        if item is None:
            return None
        elif len(pathcomponents) == 1:
            return item
        else:
            if isinstance(item, RichCollectionBase):
                if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                else:
                    return item
            else:
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

    @synchronized
    def mkdirs(self, path):
        """Recursive subcollection create.

        Like `os.makedirs()`.  Will create intermediate subcollections needed
        to contain the leaf subcollection path.

        """
        if self.find(path) is not None:
            raise IOError(errno.EEXIST, "Directory or file exists", path)

        return self.find_or_create(path, COLLECTION)

    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          a string consisting of "r", "w", or "a", optionally followed
          by "b" or "t", optionally followed by "+".
          :"b":
            binary mode: write() accepts bytes, read() returns bytes.
          :"t":
            text mode (default): write() accepts strings, read() returns strings.
          :"r":
            opens for reading
          :"r+":
            opens for reading and writing.  Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
          :"a", "a+":
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer for
            reading.
        """

        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        if mode[0] == 'r' and '+' not in mode:
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        else:
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)

        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        if mode[0] == 'w':
            arvfile.truncate(0)

        return fclass(arvfile, mode=mode, num_retries=self.num_retries)

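    # Typical usage, assuming `coll` is a writable Collection (hypothetical
    # paths; mode strings follow the table above):
    #
    #   with coll.open('output/results.txt', 'w') as f:
    #       f.write('hello\n')
    #   with coll.open('output/results.txt', 'r') as f:
    #       assert f.read() == 'hello\n'
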
670 """Determine if the collection has been modified since last commited."""
671 return not self.committed()
675 """Determine if the collection has been committed to the API server."""
676 return self._committed
679 def set_committed(self, value=True):
680 """Recursively set committed flag.
682 If value is True, set committed to be True for this and all children.
684 If value is False, set committed to be False for this and all parents.
686 if value == self._committed:
689 for k,v in listitems(self._items):
690 v.set_committed(True)
691 self._committed = True
693 self._committed = False
694 if self.parent is not None:
695 self.parent.set_committed(False)
699 """Iterate over names of files and collections contained in this collection."""
700 return iter(viewkeys(self._items))
703 def __getitem__(self, k):
704 """Get a file or collection that is directly contained by this collection.
706 If you want to search a path, use `find()` instead.
709 return self._items[k]
712 def __contains__(self, k):
713 """Test if there is a file or collection a directly contained by this collection."""
714 return k in self._items
718 """Get the number of items directly contained in this collection."""
719 return len(self._items)
    @must_be_writable
    @synchronized
    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        del self._items[p]
        self.set_committed(False)
        self.notify(DEL, self, p, None)

    @synchronized
    def keys(self):
        """Get a list of names of files and collections directly contained in this collection."""
        return self._items.keys()

    @synchronized
    def values(self):
        """Get a list of files and collection objects directly contained in this collection."""
        return listvalues(self._items)

    @synchronized
    def items(self):
        """Get a list of (name, object) tuples directly contained in this collection."""
        return listitems(self._items)

    def exists(self, path):
        """Test if there is a file or collection at `path`."""
        return self.find(path) is not None

    @must_be_writable
    @synchronized
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """

        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError(errno.ENOENT, "File not found", path)
        if len(pathcomponents) == 1:
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self.set_committed(False)
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        else:
            item.remove(pathcomponents[1])

    def _clonefrom(self, source):
        for k,v in listitems(source):
            self._items[k] = v.clone(self, k)

    def clone(self):
        raise NotImplementedError()

    @must_be_writable
    @synchronized
    def add(self, source_obj, target_name, overwrite=False, reparent=False):
        """Copy or move a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name.  If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
          unmodified.

        """
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        modified_from = None
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        if reparent:
            source_obj._reparent(self, target_name)
            item = source_obj
        else:
            item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        if modified_from:
            self.notify(MOD, self, target_name, (modified_from, item))
        else:
            self.notify(ADD, self, target_name, item)

    def _get_src_target(self, source, target_path, source_collection, create_dest):
        if source_collection is None:
            source_collection = self

        # Find the object to copy
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find the parent collection of the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")

        if create_dest:
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        else:
            if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))
            else:
                target_dir = self

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)

    @must_be_writable
    @synchronized
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
          Destination file or path.  If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection.  If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """

        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)

    @must_be_writable
    @synchronized
    def rename(self, source, target_path, source_collection=None, overwrite=False):
        """Move a file or subcollection from `source_collection` to a new path in this collection.

        :source:
          A string with a path to source file or subcollection.

        :target_path:
          Destination file or path.  If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection.  If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """

        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only", source)
        target_dir.add(source_obj, target_name, overwrite, True)

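    # Copy vs. rename in practice (hypothetical paths; `coll` is a writable
    # Collection).  copy() leaves the source in place, rename() detaches it
    # from its parent:
    #
    #   coll.copy('data/input.csv', 'backup/input.csv')
    #   coll.rename('data/input.csv', 'archive/input.csv')
    #   assert coll.exists('backup/input.csv')
    #   assert not coll.exists('data/input.csv')
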
    @synchronized
    def portable_manifest_text(self, stream_name="."):
        """Get the manifest text for this collection, sub collections and files.

        This method does not flush outstanding blocks to Keep.  It will return
        a normalized manifest with access tokens stripped.

        :stream_name:
          Name to use for this stream (directory)

        """
        return self._get_manifest_text(stream_name, True, True)

    @synchronized
    def manifest_text(self, stream_name=".", strip=False, normalize=False,
                      only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        This method will flush outstanding blocks to Keep.  By default, it will
        not normalize an unmodified manifest or strip access tokens.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, don't commit pending blocks.

        """

        if not only_committed:
            self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=only_committed)

    @synchronized
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """

        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                filestream = []
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        if only_committed:
                            continue
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                      segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text

    @synchronized
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`

        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        for k in self:
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                else:
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes

    @must_be_writable
    @synchronized
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.

        """
        if changes:
            self.set_committed(False)
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.

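    # Sketch of the diff/apply round trip (hypothetical collections):
    # diff() computes a change list against `other`, and apply() replays it
    # onto this collection, diverting conflicting updates to
    # "<path>~<timestamp>~conflict~" files instead of overwriting them.
    #
    #   other = CollectionReader(remote_manifest_text)
    #   changes = coll.diff(other)
    #   coll.apply(changes)   # coll now matches `other`, except conflicts
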
    @synchronized
    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's committed
            # then return API server's PDH response.
            return self._portable_data_hash
        else:
            stripped = self.portable_manifest_text().encode()
            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    @synchronized
    def subscribe(self, callback):
        if self._callback is None:
            self._callback = callback
        else:
            raise errors.ArgumentError("A callback is already set on this collection.")

    @synchronized
    def unsubscribe(self):
        if self._callback is not None:
            self._callback = None

    @synchronized
    def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)

    @synchronized
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, RichCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    @synchronized
    def flush(self):
        """Flush bufferblocks to Keep."""
        for e in listvalues(self):
            e.flush()


class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe.  The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).

    Brief summary of useful methods:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`

    """

    def __init__(self, manifest_locator_or_text=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=None,
                 parent=None,
                 apiconfig=None,
                 block_manager=None,
                 replication_desired=None,
                 put_threads=None):
        """Collection constructor.

        :manifest_locator_or_text:
          One of Arvados collection UUID, block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
        :parent:
          the parent Collection, may be None.

        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.

        :api_client:
          The API client object to use for requests.  If not specified, create one using `apiconfig`.

        :keep_client:
          the Keep client to use for requests.  If not specified, create one using `apiconfig`.

        :num_retries:
          the number of retries for API and Keep requests.

        :block_manager:
          the block manager to use.  If not specified, create one.

        :replication_desired:
          How many copies should Arvados maintain.  If None, API server default
          configuration applies.  If not None, this value will also be used
          for determining the number of block copies being written.

        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager
        self.replication_desired = replication_desired
        self.put_threads = put_threads

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries if num_retries is not None else 0
        self._manifest_locator = None
        self._manifest_text = None
        self._portable_data_hash = None
        self._api_response = None
        self._past_versions = set()

        self.lock = threading.RLock()
        self.events = None

        if manifest_locator_or_text:
            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
                    "Argument to CollectionReader is not a manifest or a collection UUID")

            try:
                self._populate()
            except (IOError, errors.SyntaxError) as e:
                raise errors.ArgumentError("Error processing manifest text: %s", e)

    def root_collection(self):
        return self

    def stream_name(self):
        return "."

    def writable(self):
        return True

    @synchronized
    def known_past_version(self, modified_at_and_portable_data_hash):
        return modified_at_and_portable_data_hash in self._past_versions

    @synchronized
    @retry_method
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection."""
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
            other = CollectionReader(response["manifest_text"])
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()

    @synchronized
    def _my_api(self):
        if self._api_client is None:
            self._api_client = ThreadSafeApiCache(self._config)
            if self._keep_client is None:
                self._keep_client = self._api_client.keep
        return self._api_client

    @synchronized
    def _my_keep(self):
        if self._keep_client is None:
            if self._api_client is None:
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client

    @synchronized
    def _my_block_manager(self):
        if self._block_manager is None:
            copies = (self.replication_desired or
                      self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                   2))
            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
        return self._block_manager

    def _remember_api_response(self, response):
        self._api_response = response
        self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            self._remember_api_response(self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries))
            self._manifest_text = self._api_response['manifest_text']
            self._portable_data_hash = self._api_response['portable_data_hash']
            # If not overridden via kwargs, we should try to load the
            # replication_desired from the API server
            if self.replication_desired is None:
                self.replication_desired = self._api_response.get('replication_desired', None)
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries).decode()
            return None
        except Exception as e:
            return e

    def _populate(self):
        if self._manifest_locator is None and self._manifest_text is None:
            # Nothing to do
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           arvados.util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            arvados.util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # populate
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)

    def _has_collection_uuid(self):
        return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        if exc_type is None:
            if self.writable() and self._has_collection_uuid():
                self.save()
        self.stop_threads()

    def stop_threads(self):
        if self._block_manager is not None:
            self._block_manager.stop_threads()

    @synchronized
    def manifest_locator(self):
        """Get the manifest locator, if any.

        The manifest locator will be set when the collection is loaded from an
        API server record or the portable data hash of a manifest.

        The manifest locator will be None if the collection is newly created or
        was created directly from manifest text.  The method `save_new()` will
        assign a manifest locator.

        """
        return self._manifest_locator

    @synchronized
    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
        if new_config is None:
            new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
            newcollection = Collection(parent=new_parent, apiconfig=new_config)

        newcollection._clonefrom(self)
        return newcollection

    @synchronized
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.

        """
        return self._api_response

    def find_or_create(self, path, create_type):
        """See `RichCollectionBase.find_or_create`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    def find(self, path):
        """See `RichCollectionBase.find`"""
        if path == ".":
            return self
        else:
            return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    def remove(self, path, recursive=False):
        """See `RichCollectionBase.remove`"""
        if path == ".":
            raise errors.ArgumentError("Cannot remove '.'")
        else:
            return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)

    @must_be_writable
    @synchronized
    @retry_method
    def save(self, merge=True, num_retries=None):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True, the default), and update the collection record.  Returns
        the current manifest text.

        Will raise AssertionError if not associated with a collection record on
        the API server.  If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving.  Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        if not self.committed():
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")

            self._my_block_manager().commit_all()

            if merge:
                self.update()

            text = self.manifest_text(strip=False)
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                body={'manifest_text': text}
                ).execute(
                    num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)

        return self._manifest_text

    @must_be_writable
    @synchronized
    @retry_method
    def save_new(self, name=None,
                 create_collection_record=True,
                 owner_uuid=None,
                 ensure_unique_name=False,
                 num_retries=None):
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep and, when create_collection_record
        is True (default), create a new collection record.  After creating a
        new collection record, this Collection object will be associated with
        the new record used by `save()`.  Returns the current manifest text.

        :name:
          The collection name.

        :create_collection_record:
          If True, create a collection record on the API server.
          If False, only commit blocks to Keep and return the manifest text.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner.  If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "name": name,
                    "replication_desired": self.replication_desired}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            text = self._api_response["manifest_text"]

            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)

        return text

    @synchronized
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        """
        if len(self) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")

        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = tok.replace('\\040', ' ')
                blocks = []
                streamoffset = 0
                state = BLOCKS
                self.find_or_create(stream_name, COLLECTION)
                continue

            if state == BLOCKS:
                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                if block_locator:
                    blocksize = int(block_locator.group(1))
                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize
                else:
                    state = SEGMENTS

            if state == SEGMENTS:
                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
                if file_segment:
                    pos = int(file_segment.group(1))
                    size = int(file_segment.group(2))
                    name = file_segment.group(3).replace('\\040', ' ')
                    filepath = os.path.join(stream_name, name)
                    afile = self.find_or_create(filepath, FILE)
                    if isinstance(afile, ArvadosFile):
                        afile.add_segment(blocks, pos, size)
                    else:
                        raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                else:
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

            if sep == "\n":
                stream_name = None
                state = STREAM_NAME

        self.set_committed(True)

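    # Manifest grammar parsed above, one stream per line:
    #
    #   <stream name> <block locator>... <pos:size:filename>...
    #
    # For example, a stream "." with one 3-byte block and a single file
    # spanning it (the locator is the MD5 of the block data plus "+<size>"):
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n
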
    @synchronized
    def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)


class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    Subcollection locking falls under the umbrella lock of its root collection.

    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
        self.num_retries = parent.num_retries

    def root_collection(self):
        return self.parent.root_collection()

    def writable(self):
        return self.root_collection().writable()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def stream_name(self):
        return os.path.join(self.parent.stream_name(), self.name)

    @synchronized
    def clone(self, new_parent, new_name):
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c

    @must_be_writable
    @synchronized
    def _reparent(self, newparent, newname):
        self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock


class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from an api collection record locator, a portable data hash of a
    manifest, or raw manifest text.  See `Collection` constructor for detailed
    options.

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatibility with old CollectionReader
        # all_streams() and all_files()
        self._streams = None

    def writable(self):
        return self._in_init

    def _populate_streams(orig_func):
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
                                     if sline]
                else:
                    self._streams = []
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper

    @_populate_streams
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatibility and only affects the
        behavior of `all_streams()` and `all_files()`.

        """

        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

    @_populate_streams
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @_populate_streams
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f