1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
from builtins import object

import errno
import functools
import hashlib
import logging
import os
import re
import threading
import time

from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE
19 from collections import deque
22 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
23 from .keep import KeepLocator, KeepClient
24 from .stream import StreamReader
25 from ._normalize_stream import normalize_stream
26 from ._ranges import Range, LocatorAndRange
27 from .safeapi import ThreadSafeApiCache
28 import arvados.config as config
29 import arvados.errors as errors
31 import arvados.events as events
from arvados.retry import retry_method
import arvados.util
34 _logger = logging.getLogger('arvados.collection')
36 class CollectionBase(object):
    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
44 if self._keep_client is None:
45 self._keep_client = KeepClient(api_client=self._api_client,
46 num_retries=self.num_retries)
47 return self._keep_client
49 def stripped_manifest(self):
50 """Get the manifest with locator hints stripped.
52 Return the manifest for the current collection with all
53 non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(arvados.util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
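    # Illustrative sketch (hedged; the hash and signature below are invented):
    # a signed locator such as
    #     acbd18db4cc2f85cedef654fccc4a4d8+3+Adeadbeef...@565f20a7
    # is reduced by stripped_manifest() to
    #     acbd18db4cc2f85cedef654fccc4a4d8+3
    # because the size hint is the only portable hint.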
70 class _WriterFile(_FileLikeObjectBase):
71 def __init__(self, coll_writer, name):
72 super(_WriterFile, self).__init__(name, 'wb')
73 self.dest = coll_writer
    def close(self, flush=True):
        super(_WriterFile, self).close()
77 self.dest.finish_current_file()
79 @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)
83 @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for s in seq:
            self.write(s)
88 @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()
93 class CollectionWriter(CollectionBase):
94 def __init__(self, api_client=None, num_retries=0, replication=None):
95 """Instantiate a CollectionWriter.
97 CollectionWriter lets you build a new Arvados Collection from scratch.
98 Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
103 * api_client: The API client to use to look up Collections. If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
106 * num_retries: The default number of times to retry failed
107 service requests. Default 0. You may change this value
108 after instantiation, but note those changes may not
109 propagate to related objects like the Keep client.
110 * replication: The number of copies of each block to store.
111 If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
114 self._api_client = api_client
115 self.num_retries = num_retries
116 self.replication = (2 if replication is None else replication)
117 self._keep_client = None
118 self._data_buffer = []
119 self._data_buffer_len = 0
120 self._current_stream_files = []
121 self._current_stream_length = 0
122 self._current_stream_locators = []
123 self._current_stream_name = '.'
124 self._current_file_name = None
125 self._current_file_pos = 0
126 self._finished_streams = []
127 self._close_file = None
128 self._queued_file = None
129 self._queued_dirents = deque()
130 self._queued_trees = deque()
131 self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()
137 def do_queued_work(self):
138 # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
141 # * _queued_dirents: Entries under the current directory
142 # (_queued_trees[0]) that we want to write or recurse through.
143 # This may contain files from subdirectories if
144 # max_manifest_depth == 0 for this directory.
145 # * _queued_trees: Directories that should be written as separate
146 # streams to the Collection.
147 # This function handles the smallest piece of work currently queued
148 # (current file, then current directory, then next directory) until
149 # no work remains. The _work_THING methods each do a unit of work on
150 # THING. _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break
161 def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
170 self._close_file = None
171 self._queued_file = None
173 def _work_dirents(self):
174 path, stream_name, max_manifest_depth = self._queued_trees[0]
175 if stream_name != self.current_stream_name():
176 self.start_new_stream(stream_name)
177 while self._queued_dirents:
178 dirent = self._queued_dirents.popleft()
179 target = os.path.join(path, dirent)
180 if os.path.isdir(target):
181 self._queue_tree(target,
182 os.path.join(stream_name, dirent),
183 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
187 if not self._queued_dirents:
188 self._queued_trees.popleft()
190 def _work_trees(self):
191 path, stream_name, max_manifest_depth = self._queued_trees[0]
192 d = arvados.util.listdir_recursive(
193 path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()
199 def _queue_file(self, source, filename=None):
200 assert (self._queued_file is None), "tried to queue more than one file"
201 if not hasattr(source, 'read'):
202 source = open(source, 'rb')
203 self._close_file = True
        else:
            self._close_file = False
        if filename is None:
207 filename = os.path.basename(source.name)
208 self.start_new_file(filename)
209 self._queued_file = source
211 def _queue_dirents(self, stream_name, dirents):
212 assert (not self._queued_dirents), "tried to queue more than one tree"
213 self._queued_dirents = deque(sorted(dirents))
215 def _queue_tree(self, path, stream_name, max_manifest_depth):
216 self._queued_trees.append((path, stream_name, max_manifest_depth))
218 def write_file(self, source, filename=None):
219 self._queue_file(source, filename)
220 self.do_queued_work()
222 def write_directory_tree(self,
223 path, stream_name='.', max_manifest_depth=-1):
224 self._queue_tree(path, stream_name, max_manifest_depth)
225 self.do_queued_work()
    def write(self, newdata):
        if isinstance(newdata, bytes):
            pass
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()
242 def open(self, streampath, filename=None):
243 """open(streampath[, filename]) -> file-like object
245 Pass in the path of a file to write to the Collection, either as a
246 single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.
250 You may only have one file object from the Collection open at a time,
251 so be sure to close the object when you're done. Using the object in
252 a with statement makes that easy::
254 with cwriter.open('./doc/page1.txt') as outfile:
255 outfile.write(page1_data)
256 with cwriter.open('./doc/page2.txt') as outfile:
257 outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
261 if self._last_open and not self._last_open.closed:
262 raise errors.AssertionError(
263 "can't open '{}' when '{}' is still open".format(
264 filename, self._last_open.name))
265 if streampath != self.current_stream_name():
266 self.start_new_stream(streampath)
267 self.set_current_file_name(filename)
268 self._last_open = _WriterFile(self, filename)
269 return self._last_open
271 def flush_data(self):
        data_buffer = b''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
281 def start_new_file(self, newfilename=None):
282 self.finish_current_file()
283 self.set_current_file_name(newfilename)
285 def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename
296 def current_file_name(self):
297 return self._current_file_name
299 def finish_current_file(self):
300 if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
303 raise errors.AssertionError(
304 "Cannot finish an unnamed file " +
305 "(%d bytes at offset %d in '%s' stream)" %
306 (self._current_stream_length - self._current_file_pos,
307 self._current_file_pos,
308 self._current_stream_name))
309 self._current_stream_files.append([
310 self._current_file_pos,
311 self._current_stream_length - self._current_file_pos,
312 self._current_file_name])
313 self._current_file_pos = self._current_stream_length
314 self._current_file_name = None
316 def start_new_stream(self, newstreamname='.'):
317 self.finish_current_stream()
318 self.set_current_stream_name(newstreamname)
320 def set_current_stream_name(self, newstreamname):
321 if re.search(r'[\t\n]', newstreamname):
322 raise errors.AssertionError(
323 "Manifest stream names cannot contain whitespace: '%s'" %
325 self._current_stream_name = '.' if newstreamname=='' else newstreamname
327 def current_stream_name(self):
328 return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
345 self._current_stream_files = []
346 self._current_stream_length = 0
347 self._current_stream_locators = []
348 self._current_stream_name = None
349 self._current_file_pos = 0
350 self._current_file_name = None
353 """Store the manifest in Keep and return its locator.
355 This is useful for storing manifest fragments (task outputs)
356 temporarily in Keep during a Crunch job.
358 In other cases you should make a collection instead, by
359 sending manifest_text() to the API server's "create
        collection" endpoint.
        """
362 return self._my_keep().put(self.manifest_text().encode(),
363 copies=self.replication)
365 def portable_data_hash(self):
366 stripped = self.stripped_manifest().encode()
367 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
389 def save_new(self, name=None):
390 return self._api_client.collections().create(
            ensure_unique_name=True,
            body={
                'name': name,
394 'manifest_text': self.manifest_text(),
395 }).execute(num_retries=self.num_retries)
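# A minimal usage sketch for CollectionWriter (hedged: assumes default
# Arvados credentials are available via the environment or config files;
# file names are illustrative):
#
#     cwriter = CollectionWriter(num_retries=3)
#     with cwriter.open('./doc/page1.txt') as outfile:
#         outfile.write(b'page one text')
#     manifest = cwriter.manifest_text()
#     record = cwriter.save_new(name='example collection')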
398 class ResumableCollectionWriter(CollectionWriter):
399 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
400 '_current_stream_locators', '_current_stream_name',
401 '_current_file_name', '_current_file_pos', '_close_file',
402 '_data_buffer', '_dependencies', '_finished_streams',
403 '_queued_dirents', '_queued_trees']
405 def __init__(self, api_client=None, **kwargs):
406 self._dependencies = {}
407 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
411 # Try to build a new writer from scratch with the given state.
412 # If the state is not suitable to resume (because files have changed,
413 # been deleted, aren't predictable, etc.), raise a
414 # StaleWriterStateError. Otherwise, return the initialized writer.
415 # The caller is responsible for calling writer.do_queued_work()
416 # appropriately after it's returned.
417 writer = cls(*init_args, **init_kwargs)
418 for attr_name in cls.STATE_PROPS:
419 attr_value = state[attr_name]
420 attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # it has one.
423 if attr_class not in (type(None), attr_value.__class__):
424 attr_value = attr_class(attr_value)
425 setattr(writer, attr_name, attr_value)
426 # Check dependencies before we try to resume anything.
427 if any(KeepLocator(ls).permission_expired()
428 for ls in writer._current_stream_locators):
429 raise errors.StaleWriterStateError(
430 "locators include expired permission hint")
431 writer.check_dependencies()
432 if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
435 writer._queued_file = open(path, 'rb')
436 writer._queued_file.seek(pos)
437 except IOError as error:
438 raise errors.StaleWriterStateError(
439 "failed to reopen active file {}: {}".format(path, error))
442 def check_dependencies(self):
443 for path, orig_stat in listitems(self._dependencies):
444 if not S_ISREG(orig_stat[ST_MODE]):
445 raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
448 except OSError as error:
449 raise errors.StaleWriterStateError(
450 "failed to stat {}: {}".format(path, error))
451 if ((not S_ISREG(now_stat[ST_MODE])) or
452 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
453 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
454 raise errors.StaleWriterStateError("{} changed".format(path))
456 def dump_state(self, copy_func=lambda x: x):
457 state = {attr: copy_func(getattr(self, attr))
458 for attr in self.STATE_PROPS}
459 if self._queued_file is None:
460 state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
466 def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
475 super(ResumableCollectionWriter, self)._queue_file(source, filename)
476 fd_stat = os.fstat(self._queued_file.fileno())
477 if not S_ISREG(fd_stat.st_mode):
478 # We won't be able to resume from this cache anyway, so don't
479 # worry about further checks.
480 self._dependencies[source] = tuple(fd_stat)
481 elif path_stat is None:
482 raise errors.AssertionError(
483 "could not stat {}: {}".format(source, stat_error))
484 elif path_stat.st_ino != fd_stat.st_ino:
485 raise errors.AssertionError(
486 "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)
490 def write(self, data):
491 if self._queued_file is None:
492 raise errors.AssertionError(
493 "resumable writer can't accept unsourced data")
494 return super(ResumableCollectionWriter, self).write(data)
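# A checkpoint/resume sketch (hedged: assumes the queued source files are
# unchanged on disk between dump_state() and from_state(); copy.deepcopy is
# just one possible copy_func, and the file name is illustrative):
#
#     import copy
#     writer = ResumableCollectionWriter()
#     writer.write_file('data.bin')
#     state = writer.dump_state(copy.deepcopy)
#     ...
#     resumed = ResumableCollectionWriter.from_state(state)
#     resumed.do_queued_work()   # the caller must restart queued work
#     manifest = resumed.manifest_text()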
ADD = "add"
DEL = "del"
MOD = "mod"
TOK = "tok"
FILE = "file"
COLLECTION = "collection"
504 class RichCollectionBase(CollectionBase):
505 """Base class for Collections and Subcollections.
507 Implements the majority of functionality relating to accessing items in the
512 def __init__(self, parent=None):
        self.parent = parent
        self._committed = False
        self._callback = None
        self._items = {}
    def _my_api(self):
        raise NotImplementedError()
    def _my_keep(self):
        raise NotImplementedError()
524 def _my_block_manager(self):
525 raise NotImplementedError()
    def writable(self):
        raise NotImplementedError()
530 def root_collection(self):
531 raise NotImplementedError()
533 def notify(self, event, collection, name, item):
534 raise NotImplementedError()
536 def stream_name(self):
537 raise NotImplementedError()
541 def find_or_create(self, path, create_type):
542 """Recursively search the specified file path.
544 May return either a `Collection` or `ArvadosFile`. If not found, will
545 create a new item at the specified path based on `create_type`. Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
551 `arvados.collection.COLLECTION`. If the path is not found, and value
552 of create_type is FILE then create and return a new ArvadosFile for
553 the last path component. If COLLECTION, then create and return a new
554 Collection for the last path component.
        """
        pathcomponents = path.split("/", 1)
559 if pathcomponents[0]:
560 item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self, pathcomponents[0])
                    else:
                        item = ArvadosFile(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self, pathcomponents[0])
                    self._items[pathcomponents[0]] = item
                    self.set_committed(False)
                    self.notify(ADD, self, pathcomponents[0], item)
                if isinstance(item, RichCollectionBase):
                    return item.find_or_create(pathcomponents[1], create_type)
                else:
                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        else:
            return self
587 def find(self, path):
588 """Recursively search the specified file path.
        May return either a Collection or ArvadosFile. Return None if not
        found.

        If path is invalid (ex: starts with '/'), an IOError exception will be
        raised.

        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")
599 pathcomponents = path.split("/", 1)
600 if pathcomponents[0] == '':
601 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        item = self._items.get(pathcomponents[0])
        if item is None:
            return None
        elif len(pathcomponents) == 1:
            return item
        else:
609 if isinstance(item, RichCollectionBase):
610 if pathcomponents[1]:
                    return item.find(pathcomponents[1])
                else:
                    return item
            else:
                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
618 def mkdirs(self, path):
619 """Recursive subcollection create.
621 Like `os.makedirs()`. Will create intermediate subcollections needed
622 to contain the leaf subcollection path.
        """
        if self.find(path) is not None:
            raise IOError(errno.EEXIST, "Directory or file exists", path)
629 return self.find_or_create(path, COLLECTION)
631 def open(self, path, mode="r"):
632 """Open a file-like object for access.
635 path to a file in the collection
637 a string consisting of "r", "w", or "a", optionally followed
638 by "b" or "t", optionally followed by "+".
          :"b":
            binary mode: write() accepts bytes, read() returns bytes.
          :"t":
            text mode (default): write() accepts strings, read() returns strings.
          :"r":
            opens for reading only.
          :"r+":
            opens for reading and writing. Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
          :"a", "a+":
            opens for reading and writing. All writes are appended to
            the end of the file. Writing does not affect the file pointer for
            reading.

        """
655 if not re.search(r'^[rwa][bt]?\+?$', mode):
656 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
658 if mode[0] == 'r' and '+' not in mode:
659 fclass = ArvadosFileReader
660 arvfile = self.find(path)
661 elif not self.writable():
662 raise IOError(errno.EROFS, "Collection is read only")
        else:
            fclass = ArvadosFileWriter
665 arvfile = self.find_or_create(path, FILE)
        if arvfile is None:
            raise IOError(errno.ENOENT, "File not found", path)
669 if not isinstance(arvfile, ArvadosFile):
670 raise IOError(errno.EISDIR, "Is a directory", path)
        if mode[0] == 'w':
            arvfile.truncate(0)

        return fclass(arvfile, mode=mode, num_retries=self.num_retries)
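    # Usage sketch (hedged: assumes `coll` is a writable collection object and
    # the path is illustrative):
    #
    #     with coll.open('subdir/notes.txt', 'w') as f:
    #         f.write('hello')
    #     with coll.open('subdir/notes.txt', 'r') as f:
    #         text = f.read()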
678 """Determine if the collection has been modified since last commited."""
679 return not self.committed()
683 """Determine if the collection has been committed to the API server."""
684 return self._committed
687 def set_committed(self, value=True):
688 """Recursively set committed flag.
690 If value is True, set committed to be True for this and all children.
692 If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            return
        if value:
697 for k,v in listitems(self._items):
698 v.set_committed(True)
            self._committed = True
        else:
701 self._committed = False
702 if self.parent is not None:
703 self.parent.set_committed(False)
707 """Iterate over names of files and collections contained in this collection."""
708 return iter(viewkeys(self._items))
711 def __getitem__(self, k):
712 """Get a file or collection that is directly contained by this collection.
714 If you want to search a path, use `find()` instead.
        """
        return self._items[k]
720 def __contains__(self, k):
721 """Test if there is a file or collection a directly contained by this collection."""
722 return k in self._items
726 """Get the number of items directly contained in this collection."""
727 return len(self._items)
731 def __delitem__(self, p):
732 """Delete an item by name which is directly contained by this collection."""
734 self.set_committed(False)
735 self.notify(DEL, self, p, None)
739 """Get a list of names of files and collections directly contained in this collection."""
740 return self._items.keys()
744 """Get a list of files and collection objects directly contained in this collection."""
745 return listvalues(self._items)
749 """Get a list of (name, object) tuples directly contained in this collection."""
750 return listitems(self._items)
752 def exists(self, path):
753 """Test if there is a file or collection at `path`."""
754 return self.find(path) is not None
758 def remove(self, path, recursive=False):
759 """Remove the file or subcollection (directory) at `path`.
        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).

        """
        if not path:
            raise errors.ArgumentError("Parameter 'path' is empty.")
768 pathcomponents = path.split("/", 1)
769 item = self._items.get(pathcomponents[0])
        if item is None:
            raise IOError(errno.ENOENT, "File not found", path)
772 if len(pathcomponents) == 1:
773 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
774 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
775 deleteditem = self._items[pathcomponents[0]]
776 del self._items[pathcomponents[0]]
777 self.set_committed(False)
778 self.notify(DEL, self, pathcomponents[0], deleteditem)
        else:
            item.remove(pathcomponents[1])
782 def _clonefrom(self, source):
783 for k,v in listitems(source):
784 self._items[k] = v.clone(self, k)
    def clone(self):
        raise NotImplementedError()
791 def add(self, source_obj, target_name, overwrite=False, reparent=False):
792 """Copy or move a file or subcollection to this collection.
        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name. If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
          unmodified.

        """
811 if target_name in self and not overwrite:
812 raise IOError(errno.EEXIST, "File already exists", target_name)
        modified_from = None
        if target_name in self:
816 modified_from = self[target_name]
818 # Actually make the move or copy.
        if reparent:
            source_obj._reparent(self, target_name)
            item = source_obj
        else:
823 item = source_obj.clone(self, target_name)
825 self._items[target_name] = item
826 self.set_committed(False)
        if modified_from:
            self.notify(MOD, self, target_name, (modified_from, item))
        else:
831 self.notify(ADD, self, target_name, item)
833 def _get_src_target(self, source, target_path, source_collection, create_dest):
834 if source_collection is None:
835 source_collection = self
838 if isinstance(source, basestring):
839 source_obj = source_collection.find(source)
840 if source_obj is None:
841 raise IOError(errno.ENOENT, "File not found", source)
842 sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None
        # Find the parent collection of the target path.
848 targetcomponents = target_path.split("/")
850 # Determine the name to use.
851 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
        if create_dest:
            target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        else:
859 if len(targetcomponents) > 1:
                target_dir = self.find("/".join(targetcomponents[0:-1]))
            else:
                target_dir = self
864 if target_dir is None:
865 raise IOError(errno.ENOENT, "Target directory not found", target_name)
867 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
868 target_dir = target_dir[target_name]
869 target_name = sourcecomponents[-1]
871 return (source_obj, target_dir, target_name)
875 def copy(self, source, target_path, source_collection=None, overwrite=False):
876 """Copy a file or subcollection to a new path in this collection.
        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
          Destination file or path. If the target path already exists and is a
883 subcollection, the item will be placed inside the subcollection. If
884 the target path already exists and is a file, this will raise an error
885 unless you specify `overwrite=True`.
        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.

        """
894 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
895 target_dir.add(source_obj, target_name, overwrite, False)
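    # Example sketch (hedged; `coll` and the paths are illustrative): copy a
    # file into a subdirectory, replacing any existing file at the target:
    #
    #     coll.copy('input.txt', 'backup/input.txt', overwrite=True)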
899 def rename(self, source, target_path, source_collection=None, overwrite=False):
900 """Move a file or subcollection from `source_collection` to a new path in this collection.
        :source:
          A string with a path to source file or subcollection.

        :target_path:
          Destination file or path. If the target path already exists and is a
907 subcollection, the item will be placed inside the subcollection. If
908 the target path already exists and is a file, this will raise an error
909 unless you specify `overwrite=True`.
        :source_collection:
          Collection to move `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.

        """
918 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
919 if not source_obj.writable():
920 raise IOError(errno.EROFS, "Source collection is read only", source)
921 target_dir.add(source_obj, target_name, overwrite, True)
923 def portable_manifest_text(self, stream_name="."):
924 """Get the manifest text for this collection, sub collections and files.
926 This method does not flush outstanding blocks to Keep. It will return
927 a normalized manifest with access tokens stripped.
        :stream_name:
          Name to use for this stream (directory)

        """
933 return self._get_manifest_text(stream_name, True, True)
936 def manifest_text(self, stream_name=".", strip=False, normalize=False,
937 only_committed=False):
938 """Get the manifest text for this collection, sub collections and files.
940 This method will flush outstanding blocks to Keep. By default, it will
941 not normalize an unmodified manifest or strip access tokens.
        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, don't commit pending blocks.

        """
961 if not only_committed:
962 self._my_block_manager().commit_all()
963 return self._get_manifest_text(stream_name, strip, normalize,
964 only_committed=only_committed)
967 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
968 """Get the manifest text for this collection, sub collections and files.
        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.

        """
        if not self.committed() or self._manifest_text is None or normalize:
            stream = {}
            buf = []
991 sorted_keys = sorted(self.keys())
992 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
993 # Create a stream per file `k`
                arvfile = self[filename]
                filestream = []
996 for segment in arvfile.segments():
997 loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        if only_committed:
                            continue
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    if strip:
                        loc = KeepLocator(loc).stripped()
1004 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1005 segment.segment_offset, segment.range_size))
1006 stream[filename] = filestream
            if stream:
                buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1009 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
            return "".join(buf)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text
1019 def diff(self, end_collection, prefix=".", holding_collection=None):
1020 """Generate list of add/modify/delete actions.
        When given to `apply`, will change `self` to match `end_collection`.

        """
        changes = []
1026 if holding_collection is None:
1027 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        for k in self:
            if k not in end_collection:
1030 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            if k in self:
1033 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1034 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1035 elif end_collection[k] != self[k]:
1036 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                else:
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        return changes
1045 def apply(self, changes):
1046 """Apply changes from `diff`.
1048 If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.

        """
        if changes:
            self.set_committed(False)
1054 for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
1058 local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
1063 # No local file at path, safe to copy over new file
1064 self.copy(initial, path)
1065 elif local is not None and local != initial:
1066 # There is already local file and it is different:
1067 # save change to conflict file.
1068 self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                final = change[3]
1071 if local == initial:
1072 # Local matches the "initial" item so it has not
1073 # changed locally and is safe to update.
1074 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1075 # Replace contents of local file with new contents
1076 local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
1079 # path was a file and is now a collection or vice versa
1080 self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
1083 # match the "start" value, so save change to conflict file
1084 self.copy(final, conflictpath)
1085 elif event_type == DEL:
1086 if local == initial:
1087 # Local item matches "initial" value, so it is safe to remove.
1088 self.remove(path, recursive=True)
1089 # else, the file is modified or already removed, in either
1090 # case we don't want to try to remove it.
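    # Merge sketch (hedged: this mirrors what update() does further below;
    # `last_known_manifest_text` and `latest_manifest_text` are placeholders
    # for manifests you already hold):
    #
    #     baseline = CollectionReader(last_known_manifest_text)
    #     latest = CollectionReader(latest_manifest_text)
    #     c.apply(baseline.diff(latest))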
1092 def portable_data_hash(self):
1093 """Get the portable data hash for this collection's manifest."""
1094 if self._manifest_locator and self.committed():
1095 # If the collection is already saved on the API server, and it's committed
1096 # then return API server's PDH response.
1097 return self._portable_data_hash
        else:
            stripped = self.portable_manifest_text().encode()
1100 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1103 def subscribe(self, callback):
1104 if self._callback is None:
1105 self._callback = callback
        else:
            raise errors.ArgumentError("A callback is already set on this collection.")
1110 def unsubscribe(self):
1111 if self._callback is not None:
1112 self._callback = None
1115 def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)
1118 self.root_collection().notify(event, collection, name, item)
    @synchronized
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, RichCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)
1140 """Flush bufferblocks to Keep."""
1141 for e in listvalues(self):
1145 class Collection(RichCollectionBase):
1146 """Represents the root of an Arvados Collection.
1148 This class is threadsafe. The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).
1155 :To read an existing file:
1156 `c.open("myfile", "r")`
1158 :To write a new file:
1159 `c.open("myfile", "w")`
1161 :To determine if a file exists:
1162 `c.find("myfile") is not None`
    :To copy a file:
      `c.copy("source", "dest")`
    :To delete a file:
      `c.remove("myfile")`
    :To save to an existing collection record:
      `c.save()`
    :To save a new collection record:
      `c.save_new()`
    :To merge remote changes into this object:
      `c.update()`
1179 Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`.

    """
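    # End-to-end sketch (hedged: assumes ARVADOS_API_HOST/ARVADOS_API_TOKEN
    # are configured; file and collection names are illustrative):
    #
    #     with Collection() as c:
    #         with c.open('hello.txt', 'w') as f:
    #             f.write('hello world')
    #         c.save_new(name='example collection')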
    def __init__(self, manifest_locator_or_text=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=None,
                 parent=None,
                 apiconfig=None,
                 block_manager=None,
                 replication_desired=None,
                 put_threads=None):
1193 """Collection constructor.
1195 :manifest_locator_or_text:
1196 An Arvados collection UUID, portable data hash, raw manifest
1197 text, or (if creating an empty collection) None.
        :parent:
          the parent Collection, may be None.

        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1204 Prefer this over supplying your own api_client and keep_client (except in testing).
1205 Will use default config settings if not specified.
        :api_client:
          The API client object to use for requests. If not specified, create one using `apiconfig`.

        :keep_client:
          the Keep client to use for requests. If not specified, create one using `apiconfig`.

        :num_retries:
          the number of retries for API and Keep requests.

        :block_manager:
          the block manager to use. If not specified, create one.
1219 :replication_desired:
1220 How many copies should Arvados maintain. If None, API server default
1221 configuration applies. If not None, this value will also be used
          for determining the number of block copies being written.

        """
1225 super(Collection, self).__init__(parent)
1226 self._api_client = api_client
1227 self._keep_client = keep_client
1228 self._block_manager = block_manager
1229 self.replication_desired = replication_desired
1230 self.put_threads = put_threads
        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()
1237 self.num_retries = num_retries if num_retries is not None else 0
1238 self._manifest_locator = None
1239 self._manifest_text = None
1240 self._portable_data_hash = None
1241 self._api_response = None
1242 self._past_versions = set()
1244 self.lock = threading.RLock()
1247 if manifest_locator_or_text:
1248 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1249 self._manifest_locator = manifest_locator_or_text
1250 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1251 self._manifest_locator = manifest_locator_or_text
1252 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1253 self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
1256 "Argument to CollectionReader is not a manifest or a collection UUID")
1260 except (IOError, errors.SyntaxError) as e:
1261 raise errors.ArgumentError("Error processing manifest text: %s", e)
    def root_collection(self):
        return self

    def stream_name(self):
        return "."

    def writable(self):
        return True

    @synchronized
1273 def known_past_version(self, modified_at_and_portable_data_hash):
1274 return modified_at_and_portable_data_hash in self._past_versions
    @synchronized
    @retry_method
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection."""
        if other is None:
1282 if self._manifest_locator is None:
1283 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1284 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1285 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1286 response.get("portable_data_hash") != self.portable_data_hash()):
1287 # The record on the server is different from our current one, but we've seen it before,
1288 # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
                # our tokens.
                return
            else:
1293 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1294 other = CollectionReader(response["manifest_text"])
1295 baseline = CollectionReader(self._manifest_text)
1296 self.apply(baseline.diff(other))
1297 self._manifest_text = self.manifest_text()
    @synchronized
    def _my_api(self):
        if self._api_client is None:
1302 self._api_client = ThreadSafeApiCache(self._config)
1303 if self._keep_client is None:
1304 self._keep_client = self._api_client.keep
1305 return self._api_client
    @synchronized
    def _my_keep(self):
        if self._keep_client is None:
1310 if self._api_client is None:
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
1314 return self._keep_client
1317 def _my_block_manager(self):
1318 if self._block_manager is None:
1319 copies = (self.replication_desired or
                      self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                   2))
1322 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1323 return self._block_manager
1325 def _remember_api_response(self, response):
1326 self._api_response = response
1327 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1329 def _populate_from_api_server(self):
1330 # As in KeepClient itself, we must wait until the last
1331 # possible moment to instantiate an API client, in order to
1332 # avoid tripping up clients that don't have access to an API
1333 # server. If we do build one, make sure our Keep client uses
1334 # it. If instantiation fails, we'll fall back to the except
1335 # clause, just like any other Collection lookup
1336 # failure. Return an exception, or None if successful.
1337 self._remember_api_response(self._my_api().collections().get(
1338 uuid=self._manifest_locator).execute(
1339 num_retries=self.num_retries))
1340 self._manifest_text = self._api_response['manifest_text']
1341 self._portable_data_hash = self._api_response['portable_data_hash']
        # If not overridden via kwargs, we should try to load the
1343 # replication_desired from the API server
1344 if self.replication_desired is None:
1345 self.replication_desired = self._api_response.get('replication_desired', None)
1347 def _populate(self):
1348 if self._manifest_text is None:
            if self._manifest_locator is None:
                return
            else:
1352 self._populate_from_api_server()
1353 self._baseline_manifest = self._manifest_text
1354 self._import_manifest(self._manifest_text)
1356 def _has_collection_uuid(self):
1357 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
    def __enter__(self):
        return self
1362 def __exit__(self, exc_type, exc_value, traceback):
1363 """Support scoped auto-commit in a with: block."""
1364 if exc_type is None:
            if self.writable() and self._has_collection_uuid():
                self.save()
        self.stop_threads()
1369 def stop_threads(self):
1370 if self._block_manager is not None:
1371 self._block_manager.stop_threads()
1374 def manifest_locator(self):
1375 """Get the manifest locator, if any.
1377 The manifest locator will be set when the collection is loaded from an
1378 API server record or the portable data hash of a manifest.
1380 The manifest locator will be None if the collection is newly created or
1381 was created directly from manifest text. The method `save_new()` will
1382 assign a manifest locator.
        """
        return self._manifest_locator
1388 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1389 if new_config is None:
1390 new_config = self._config
        if readonly:
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
        else:
1394 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1396 newcollection._clonefrom(self)
1397 return newcollection
1400 def api_response(self):
1401 """Returns information about this Collection fetched from the API server.
1403 If the Collection exists in Keep but not the API server, currently
1404 returns None. Future versions may provide a synthetic response.
        """
        return self._api_response
1409 def find_or_create(self, path, create_type):
1410 """See `RichCollectionBase.find_or_create`"""
1414 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1416 def find(self, path):
1417 """See `RichCollectionBase.find`"""
1421 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1423 def remove(self, path, recursive=False):
1424 """See `RichCollectionBase.remove`"""
1426 raise errors.ArgumentError("Cannot remove '.'")
1428 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
    @must_be_writable
    @synchronized
    @retry_method
    def save(self, merge=True, num_retries=None):
1434 """Save collection to an existing collection record.
1436 Commit pending buffer blocks to Keep, merge with remote record (if
1437 merge=True, the default), and update the collection record. Returns
1438 the current manifest text.
1440 Will raise AssertionError if not associated with a collection record on
1441 the API server. If you want to save a manifest to Keep only, see
1445 Update and merge remote changes before saving. Otherwise, any
1446 remote changes will be ignored and overwritten.
        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
1452 if not self.committed():
1453 if not self._has_collection_uuid():
1454 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
            self._my_block_manager().commit_all()

            if merge:
                self.update()
1461 text = self.manifest_text(strip=False)
1462 self._remember_api_response(self._my_api().collections().update(
1463 uuid=self._manifest_locator,
1464 body={'manifest_text': text}
                ).execute(
                    num_retries=num_retries))
1467 self._manifest_text = self._api_response["manifest_text"]
1468 self._portable_data_hash = self._api_response["portable_data_hash"]
1469 self.set_committed(True)
1471 return self._manifest_text
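    # Usage sketch (hedged; the UUID and file name below are placeholders):
    #
    #     c = Collection('zzzzz-4zz18-xxxxxxxxxxxxxxx')
    #     c.remove('obsolete.txt')
    #     c.save()   # merge remote changes, then update the record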
    @must_be_writable
    @synchronized
    @retry_method
    def save_new(self, name=None,
                 create_collection_record=True,
                 owner_uuid=None,
                 ensure_unique_name=False,
                 num_retries=None):
        """Save collection to a new collection record.
1484 Commit pending buffer blocks to Keep and, when create_collection_record
1485 is True (default), create a new collection record. After creating a
1486 new collection record, this Collection object will be associated with
1487 the new record used by `save()`. Returns the current manifest text.
        :name:
          The collection name.
1492 :create_collection_record:
1493 If True, create a collection record on the API server.
1494 If False, only commit blocks to Keep and return the manifest text.
        :owner_uuid:
          the user, or project uuid that will own this collection.
1498 If None, defaults to the current user.
1500 :ensure_unique_name:
1501 If True, ask the API server to rename the collection
1502 if it conflicts with a collection with the same name and owner. If
1503 False, a name conflict will result in an error.
        :num_retries:
          Retry count on API calls (if None, use the collection default)

        """
1509 self._my_block_manager().commit_all()
1510 text = self.manifest_text(strip=False)
1512 if create_collection_record:
            if name is None:
                name = "New collection"
                ensure_unique_name = True
1517 body = {"manifest_text": text,
1519 "replication_desired": self.replication_desired}
1521 body["owner_uuid"] = owner_uuid
1523 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1524 text = self._api_response["manifest_text"]
1526 self._manifest_locator = self._api_response["uuid"]
1527 self._portable_data_hash = self._api_response["portable_data_hash"]
1529 self._manifest_text = text
        self.set_committed(True)

        return text
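    # Usage sketch (hedged; the owner UUID is a placeholder project UUID):
    #
    #     c.save_new(name='results',
    #                owner_uuid='zzzzz-j7d0g-xxxxxxxxxxxxxxx',
    #                ensure_unique_name=True)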
1534 _token_re = re.compile(r'(\S+)(\s+|$)')
1535 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1536 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
    @synchronized
    def _import_manifest(self, manifest_text):
1540 """Import a manifest into a `Collection`.
        :manifest_text:
          The manifest text to import from.

        """
        if len(self) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")
        STREAM_NAME = 0
        BLOCKS = 1
        SEGMENTS = 2

        stream_name = None
        state = STREAM_NAME

        for token_and_separator in self._token_re.finditer(manifest_text):
1557 tok = token_and_separator.group(1)
1558 sep = token_and_separator.group(2)
1560 if state == STREAM_NAME:
1561 # starting a new stream
                stream_name = tok.replace('\\040', ' ')
                blocks = []
                segments = []
                streamoffset = 0
                state = BLOCKS
                self.find_or_create(stream_name, COLLECTION)
                continue

            if state == BLOCKS:
                block_locator = self._block_re.match(tok)
                if block_locator:
                    blocksize = int(block_locator.group(1))
1574 blocks.append(Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize
                else:
                    state = SEGMENTS
1579 if state == SEGMENTS:
                file_segment = self._segment_re.match(tok)
                if file_segment:
                    pos = int(file_segment.group(1))
1583 size = int(file_segment.group(2))
1584 name = file_segment.group(3).replace('\\040', ' ')
1585 filepath = os.path.join(stream_name, name)
1586 afile = self.find_or_create(filepath, FILE)
1587 if isinstance(afile, ArvadosFile):
1588 afile.add_segment(blocks, pos, size)
                    else:
                        raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
                else:
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

            if sep == "\n":
                stream_name = None
                state = STREAM_NAME
1599 self.set_committed(True)
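        # Manifest format illustration (hedged; the locator and file names
        # are invented for the example):
        #
        #     . acbd18db4cc2f85cedef654fccc4a4d8+33 0:12:foo.txt 12:21:bar.txt\n
        #
        # i.e. a stream name, one or more block locators, then pos:size:name
        # file segments, which is what the regexps above tokenize.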
    @synchronized
    def notify(self, event, collection, name, item):
        if self._callback:
            self._callback(event, collection, name, item)
1607 class Subcollection(RichCollectionBase):
1608 """This is a subdirectory within a collection that doesn't have its own API
1611 Subcollection locking falls under the umbrella lock of its root collection.
1615 def __init__(self, parent, name):
1616 super(Subcollection, self).__init__(parent)
1617 self.lock = self.root_collection().lock
        self._manifest_text = None
        self.name = name
1620 self.num_retries = parent.num_retries
1622 def root_collection(self):
1623 return self.parent.root_collection()
    def writable(self):
        return self.root_collection().writable()
    def _my_api(self):
        return self.root_collection()._my_api()
    def _my_keep(self):
        return self.root_collection()._my_keep()
1634 def _my_block_manager(self):
1635 return self.root_collection()._my_block_manager()
1637 def stream_name(self):
1638 return os.path.join(self.parent.stream_name(), self.name)
1641 def clone(self, new_parent, new_name):
        c = Subcollection(new_parent, new_name)
        c._clonefrom(self)
        return c
1648 def _reparent(self, newparent, newname):
1649 self.set_committed(False)
        self.flush()
        self.parent.remove(self.name, recursive=True)
1652 self.parent = newparent
        self.name = newname
        self.lock = self.parent.root_collection().lock
1657 class CollectionReader(Collection):
1658 """A read-only collection object.
1660 Initialize from a collection UUID or portable data hash, or raw
    manifest text. See `Collection` constructor for detailed options.

    """
1664 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1665 self._in_init = True
1666 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1667 self._in_init = False
        # Forgo any locking since it should never change once initialized.
1670 self.lock = NoopLock()
        # Backwards compatibility with old CollectionReader
1673 # all_streams() and all_files()
1674 self._streams = None
    def writable(self):
        return self._in_init
1679 def _populate_streams(orig_func):
1680 @functools.wraps(orig_func)
1681 def populate_streams_wrapper(self, *args, **kwargs):
1682 # Defer populating self._streams until needed since it creates a copy of the manifest.
1683 if self._streams is None:
1684 if self._manifest_text:
1685 self._streams = [sline.split()
                                 for sline in self._manifest_text.split("\n")
                                 if sline]
            else:
                self._streams = []
1690 return orig_func(self, *args, **kwargs)
1691 return populate_streams_wrapper
    @_populate_streams
    def normalize(self):
1695 """Normalize the streams returned by `all_streams`.
1697 This method is kept for backwards compatability and only affects the
        behavior of `all_streams()` and `all_files()`.

        """
        streams = {}
1704 for s in self.all_streams():
1705 for f in s.all_files():
1706 streamname, filename = split(s.name() + "/" + f.name())
1707 if streamname not in streams:
1708 streams[streamname] = {}
1709 if filename not in streams[streamname]:
1710 streams[streamname][filename] = []
1711 for r in f.segments:
1712 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1714 self._streams = [normalize_stream(s, streams[s])
1715 for s in sorted(streams)]
    @_populate_streams
    def all_streams(self):
1718 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1719 for s in self._streams]
    @_populate_streams
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f