1 from __future__ import absolute_import
2 from future.utils import listitems, listvalues, viewkeys
3 from builtins import str
4 from past.builtins import basestring
5 from builtins import object
15 from collections import deque
18 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
19 from .keep import KeepLocator, KeepClient
20 from .stream import StreamReader
21 from ._normalize_stream import normalize_stream
22 from ._ranges import Range, LocatorAndRange
23 from .safeapi import ThreadSafeApiCache
24 import arvados.config as config
25 import arvados.errors as errors
27 import arvados.events as events
28 from arvados.retry import retry_method
# Module-level logger shared by the collection classes in this module.
_logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    """Abstract base class shared by collection readers/writers.

    NOTE(review): this excerpt is missing interior lines; method bodies
    below are incomplete fragments.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        # Context-manager exit; body not visible in this excerpt.

        # NOTE(review): the statements below appear to belong to a lazy
        # Keep-client accessor (upstream `_my_keep`) whose `def` line is
        # not visible in this excerpt — confirm.
        if self._keep_client is None:
            # Build the Keep client on first use, reusing this object's
            # API client and retry count.
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client
    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        # NOTE(review): initialization of `clean` and the per-line
        # `fields = line.split()` are missing from this excerpt.
        for line in raw.split("\n"):
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(arvados.util.keep_locator_pattern, x)
                 # NOTE(review): the `else x)` arm of this conditional and
                 # the closing of the list are missing from this excerpt.
            clean += [' '.join(clean_fields), "\n"]
            # NOTE(review): the final `return ''.join(clean)` is missing
            # from this excerpt.
class _WriterFile(_FileLikeObjectBase):
    """Write-only file-like object returned by CollectionWriter.open();
    forwards data to the owning CollectionWriter."""

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer  # the CollectionWriter receiving the data

        # NOTE(review): the `def close(...)` line is missing from this
        # excerpt; the two statements below are the tail of close().
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        # NOTE(review): body missing from this excerpt (presumably
        # forwards to self.dest.write(data) — confirm).

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        # NOTE(review): body missing from this excerpt.

    @_FileLikeObjectBase._before_close
    # NOTE(review): the `def flush(self):` line is missing from this excerpt.
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you are done.

        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        # Fall back to 2 copies when the caller does not specify replication.
        self.replication = (2 if replication is None else replication)
        self._keep_client = None            # built lazily on first use
        self._data_buffer = []              # chunks not yet sent to Keep
        self._data_buffer_len = 0           # total bytes buffered
        self._current_stream_files = []     # [pos, size, name] per file
        self._current_stream_length = 0     # bytes written to current stream
        self._current_stream_locators = []  # Keep locators for current stream
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []         # completed streams for manifest
        self._close_file = None             # whether we opened _queued_file
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None              # most recent _WriterFile handle
    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): body missing from this excerpt (presumably
        # finalizes the collection on context exit — confirm).
    def do_queued_work(self):
        """Drain the work queue: current file, then dirents, then trees."""
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains. The _work_THING methods each do a unit of work on
        # THING. _queue_THING methods add a THING to the work queue.
        # NOTE(review): the enclosing loop header (e.g. `while True:`) is
        # missing from this excerpt.
        if self._queued_file:
            # NOTE(review): call to self._work_file() missing from excerpt.
        elif self._queued_dirents:
            # NOTE(review): call to self._work_dirents() missing from excerpt.
        elif self._queued_trees:
            # NOTE(review): call to self._work_trees() and the terminating
            # `else: break` are missing from this excerpt.
    def _work_file(self):
        """Copy the queued file into the collection, then release it."""
        # NOTE(review): the read-loop header is missing from this excerpt.
        buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        # NOTE(review): the EOF check and self.write(buf) call are missing
        # from this excerpt.
        self.finish_current_file()
        # NOTE(review): an `if self._close_file:` guard appears to be
        # missing before the close() below — confirm.
        self._queued_file.close()
        self._close_file = None
        self._queued_file = None
    def _work_dirents(self):
        """Write or recurse into the queued entries of the current tree."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Subdirectories become their own queued trees/streams.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
                # NOTE(review): an `else:` line is missing from this
                # excerpt; the call below handles regular files.
                self._queue_file(target, dirent)
                # NOTE(review): a `break` after queueing likely belongs
                # here upstream — confirm.
        if not self._queued_dirents:
            self._queued_trees.popleft()
    def _work_trees(self):
        """Begin processing the next queued directory tree."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_depth=0 lists recursively; otherwise only the top level.
        d = arvados.util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        # NOTE(review): an `if d:` / `else:` structure appears to be
        # missing around the two statements below — confirm.
        self._queue_dirents(stream_name, d)
        self._queued_trees.popleft()
    def _queue_file(self, source, filename=None):
        """Queue a single file (path or file object) for writing."""
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # `source` is a path: we open it, so we must close it later.
            source = open(source, 'rb')
            self._close_file = True
            # NOTE(review): an `else:` line is missing from this excerpt.
            self._close_file = False
        # NOTE(review): an `if filename is None:` guard is missing from
        # this excerpt before the basename fallback below.
        filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
207 def _queue_dirents(self, stream_name, dirents):
208 assert (not self._queued_dirents), "tried to queue more than one tree"
209 self._queued_dirents = deque(sorted(dirents))
211 def _queue_tree(self, path, stream_name, max_manifest_depth):
212 self._queued_trees.append((path, stream_name, max_manifest_depth))
214 def write_file(self, source, filename=None):
215 self._queue_file(source, filename)
216 self.do_queued_work()
218 def write_directory_tree(self,
219 path, stream_name='.', max_manifest_depth=-1):
220 self._queue_tree(path, stream_name, max_manifest_depth)
221 self.do_queued_work()
    def write(self, newdata):
        """Buffer data for the current file, flushing full Keep blocks.

        Accepts bytes; str is encoded (default UTF-8); other iterables
        are handled by the branch whose body is missing below.
        """
        if isinstance(newdata, bytes):
            # NOTE(review): body (likely `pass`) missing from this excerpt.
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
            # NOTE(review): per-element recursion and `return` are missing
            # from this excerpt.
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            # NOTE(review): call to self.flush_data() missing from excerpt.
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to
        the Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done. Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # NOTE(review): an `if filename is None:` guard appears to be
        # missing before the split below — confirm.
        streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        """Send one full block of buffered data to Keep."""
        data_buffer = b''.join(self._data_buffer)
        # NOTE(review): an `if data_buffer:` guard and the
        # `self._my_keep().put(...)` call line are missing from this
        # excerpt; the append below records the returned locator.
        self._current_stream_locators.append(
            data_buffer[0:config.KEEP_BLOCK_SIZE],
            copies=self.replication))
        # Keep any overflow beyond one block for the next flush.
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])
277 def start_new_file(self, newfilename=None):
278 self.finish_current_file()
279 self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        """Validate and record the name for the file being written.

        Raises errors.AssertionError when the name contains characters
        that cannot appear in a manifest (tab, newline, NUL).
        """
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                # NOTE(review): the format operand (newfilename) is
                # missing from this excerpt.
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                # NOTE(review): format operand missing from this excerpt.
        self._current_file_name = newfilename
292 def current_file_name(self):
293 return self._current_file_name
    def finish_current_file(self):
        """Record the current file's extent in the stream and reset state."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # NOTE(review): a `return` is missing from this excerpt
                # (nothing was written, so there is nothing to finish).
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Record [offset, length, name] for later manifest assembly.
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
312 def start_new_stream(self, newstreamname='.'):
313 self.finish_current_stream()
314 self.set_current_stream_name(newstreamname)
    def set_current_stream_name(self, newstreamname):
        """Validate and record the stream (directory) name.

        Tabs/newlines are rejected; '' is normalized to '.'.
        """
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
                # NOTE(review): the format operand (newstreamname) is
                # missing from this excerpt.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname
323 def current_stream_name(self):
324 return self._current_stream_name
    def finish_current_stream(self):
        """Close out the current stream and append it to _finished_streams."""
        self.finish_current_file()
        # NOTE(review): a call to self.flush_data() appears to be missing
        # here — confirm.
        if not self._current_stream_files:
            # NOTE(review): body (likely `pass`) missing from this excerpt.
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        # NOTE(review): an `else:` line is missing before the block below.
        if not self._current_stream_locators:
            # An empty stream still needs one locator in the manifest.
            self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
        self._finished_streams.append([self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
349 """Store the manifest in Keep and return its locator.
351 This is useful for storing manifest fragments (task outputs)
352 temporarily in Keep during a Crunch job.
354 In other cases you should make a collection instead, by
355 sending manifest_text() to the API server's "create
356 collection" endpoint.
358 return self._my_keep().put(self.manifest_text().encode(),
359 copies=self.replication)
361 def portable_data_hash(self):
362 stripped = self.stripped_manifest().encode()
363 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
    def manifest_text(self):
        """Assemble and return the manifest text for all finished streams."""
        self.finish_current_stream()
        # NOTE(review): initialization of `manifest` (e.g. `manifest = ''`)
        # is missing from this excerpt.
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                # NOTE(review): body (prefixing './') missing from excerpt.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            # NOTE(review): the newline append and the final
            # `return manifest` are missing from this excerpt.
    def data_locators(self):
        """Return the Keep locators referenced by all finished streams."""
        # NOTE(review): accumulator initialization is missing from this
        # excerpt.
        for name, locators, files in self._finished_streams:
            # NOTE(review): loop body and the final return are missing
            # from this excerpt.
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize its state and later resume."""

    # Attributes serialized by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        # Maps source path -> stat tuple; used on resume to detect changes.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
    # NOTE(review): presumably decorated with @classmethod upstream; the
    # decorator line is missing from this excerpt.
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # they differ (e.g. deque restored from a plain list).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # NOTE(review): the `try:` line is missing from this excerpt.
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        # NOTE(review): the final `return writer` is missing from this
        # excerpt.
    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file changed."""
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # NOTE(review): the `try:` line is missing from this excerpt.
            now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            # A change of type, mtime, or size means we cannot safely resume.
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
    def dump_state(self, copy_func=lambda x: x):
        """Return a serializable snapshot of this writer's state.

        `copy_func` is applied to each attribute value (e.g. to deep-copy
        mutable state before returning it).
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
            # NOTE(review): an `else:` line is missing from this excerpt.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        # NOTE(review): the final `return state` is missing from this
        # excerpt.
    def _queue_file(self, source, filename=None):
        """Queue a file, recording stat info so resume can detect changes."""
        # NOTE(review): several lines are missing from this excerpt:
        # initialization of path_stat, the check that `source` is a string
        # path, and the try/except structure around os.stat below.
        src_path = os.path.realpath(source)
        raise errors.AssertionError("{} not a file path".format(source))
        path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        # NOTE(review): an `else:` line is missing from this excerpt.
        self._dependencies[src_path] = tuple(fd_stat)
478 def write(self, data):
479 if self._queued_file is None:
480 raise errors.AssertionError(
481 "resumable writer can't accept unsourced data")
482 return super(ResumableCollectionWriter, self).write(data)
490 COLLECTION = "collection"
class RichCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in
    the collection.
    """

    def __init__(self, parent=None):
        # NOTE(review): assignment of self.parent appears to be missing
        # from this excerpt — confirm.
        self._committed = False
        self._callback = None

        # NOTE(review): the `def` lines for two abstract methods are
        # missing from this excerpt; the bare raises below are their
        # bodies.
        raise NotImplementedError()

        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

        # NOTE(review): `def` line for this abstract method missing.
        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`. If not found, will
        create a new item at the specified path based on `create_type`. Will
        create intermediate subcollections needed to contain the final item.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`. If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component. If COLLECTION, then create and return a new
          Collection for the last path component.
        """
        pathcomponents = path.split("/", 1)
        if pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # NOTE(review): an `if item is None:` guard is missing
                # from this excerpt.
                if create_type == COLLECTION:
                    item = Subcollection(self, pathcomponents[0])
                    # NOTE(review): an `else:` line is missing here.
                    item = ArvadosFile(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
                # NOTE(review): a `return item` and the multi-component
                # `else` branch header are missing from this excerpt.
                # create new collection
                item = Subcollection(self, pathcomponents[0])
                self._items[pathcomponents[0]] = item
                self.set_committed(False)
                self.notify(ADD, self, pathcomponents[0], item)
            if isinstance(item, RichCollectionBase):
                return item.find_or_create(pathcomponents[1], create_type)
            # NOTE(review): an `else:` line is missing before the raise.
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
        # NOTE(review): the `else: return self` tail is missing from this
        # excerpt.
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile. Return None if not
        found.

        If path is invalid (ex: starts with '/'), an IOError exception will be
        raised.
        """
        # NOTE(review): an `if not path:` guard is missing before the raise.
        raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        if pathcomponents[0] == '':
            raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])

        item = self._items.get(pathcomponents[0])
        # NOTE(review): the `if item is None:` branch and its return are
        # missing from this excerpt.
        elif len(pathcomponents) == 1:
            # NOTE(review): `return item` missing from this excerpt.
        if isinstance(item, RichCollectionBase):
            if pathcomponents[1]:
                return item.find(pathcomponents[1])
            # NOTE(review): `else: return item` missing from this excerpt.
        raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
    def mkdirs(self, path):
        """Recursive subcollection create.

        Like `os.makedirs()`. Will create intermediate subcollections needed
        to contain the leaf subcollection path.
        """
        # NOTE(review): prefer `is not None` over `!= None` (PEP 8); left
        # unchanged because surrounding lines are missing from this excerpt.
        if self.find(path) != None:
            raise IOError(errno.EEXIST, "Directory or file exists", path)

        return self.find_or_create(path, COLLECTION)
    def open(self, path, mode="r"):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          a string consisting of "r", "w", or "a", optionally followed
          by "b" or "t", optionally followed by "+".
          "b": binary mode: write() accepts bytes, read() returns bytes.
          "t": text mode (default): write() accepts strings, read()
          returns strings.
          "r+": opens for reading and writing. Reads/writes share a file
          pointer.
          "w+": truncates to 0 and opens for reading and writing.
          Reads/writes share a file pointer.
          "a+": opens for reading and writing. All writes are appended to
          the end of the file. Writing does not affect the file pointer
          for reading.
        """
        if not re.search(r'^[rwa][bt]?\+?$', mode):
            raise errors.ArgumentError("Invalid mode {!r}".format(mode))

        if mode[0] == 'r' and '+' not in mode:
            fclass = ArvadosFileReader
            arvfile = self.find(path)
        elif not self.writable():
            raise IOError(errno.EROFS, "Collection is read only")
        # NOTE(review): an `else:` line is missing from this excerpt.
            fclass = ArvadosFileWriter
            arvfile = self.find_or_create(path, FILE)

        # NOTE(review): an `if arvfile is None:` guard is missing here.
            raise IOError(errno.ENOENT, "File not found", path)
        if not isinstance(arvfile, ArvadosFile):
            raise IOError(errno.EISDIR, "Is a directory", path)

        # NOTE(review): mode-specific truncate/seek handling appears to be
        # missing from this excerpt — confirm.
        return fclass(arvfile, mode=mode, num_retries=self.num_retries)
666 """Determine if the collection has been modified since last commited."""
667 return not self.committed()
671 """Determine if the collection has been committed to the API server."""
672 return self._committed
    def set_committed(self, value=True):
        """Recursively set committed flag.

        If value is True, set committed to be True for this and all children.

        If value is False, set committed to be False for this and all parents.
        """
        if value == self._committed:
            # NOTE(review): a `return` is missing from this excerpt.
        # NOTE(review): an `if value:` branch header is missing here.
        for k,v in listitems(self._items):
            v.set_committed(True)
        self._committed = True
        # NOTE(review): an `else:` line is missing from this excerpt.
        self._committed = False
        # Propagate "not committed" upward: a dirty child dirties parents.
        if self.parent is not None:
            self.parent.set_committed(False)
695 """Iterate over names of files and collections contained in this collection."""
696 return iter(viewkeys(self._items))
699 def __getitem__(self, k):
700 """Get a file or collection that is directly contained by this collection.
702 If you want to search a path, use `find()` instead.
705 return self._items[k]
708 def __contains__(self, k):
709 """Test if there is a file or collection a directly contained by this collection."""
710 return k in self._items
714 """Get the number of items directly contained in this collection."""
715 return len(self._items)
    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        # NOTE(review): the actual `del self._items[p]` statement is
        # missing from this excerpt.
        self.set_committed(False)
        self.notify(DEL, self, p, None)
727 """Get a list of names of files and collections directly contained in this collection."""
728 return self._items.keys()
732 """Get a list of files and collection objects directly contained in this collection."""
733 return listvalues(self._items)
737 """Get a list of (name, object) tuples directly contained in this collection."""
738 return listitems(self._items)
740 def exists(self, path):
741 """Test if there is a file or collection at `path`."""
742 return self.find(path) is not None
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        # NOTE(review): an `if not path:` guard is missing before the raise.
        raise errors.ArgumentError("Parameter 'path' is empty.")

        pathcomponents = path.split("/", 1)
        item = self._items.get(pathcomponents[0])
        # NOTE(review): an `if item is None:` guard is missing here.
        raise IOError(errno.ENOENT, "File not found", path)
        if len(pathcomponents) == 1:
            # Refuse to delete a non-empty directory unless recursive=True.
            if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
            deleteditem = self._items[pathcomponents[0]]
            del self._items[pathcomponents[0]]
            self.set_committed(False)
            self.notify(DEL, self, pathcomponents[0], deleteditem)
        # NOTE(review): an `else:` line is missing; recurse into the child.
        item.remove(pathcomponents[1])
770 def _clonefrom(self, source):
771 for k,v in listitems(source):
772 self._items[k] = v.clone(self, k)
    # NOTE(review): the `def writable(self):` line is missing from this
    # excerpt; the raise below is its abstract body.
        raise NotImplementedError()
    def add(self, source_obj, target_name, overwrite=False, reparent=False):
        """Copy or move a file or subcollection to this collection.

        :source_obj:
          An ArvadosFile, or Subcollection object

        :target_name:
          Destination item name. If the target name already exists and is a
          file, this will raise an error unless you specify `overwrite=True`.

        :overwrite:
          Whether to overwrite target file if it already exists.

        :reparent:
          If True, source_obj will be moved from its parent collection to this collection.
          If False, source_obj will be copied and the parent collection will be
          unmodified.
        """
        if target_name in self and not overwrite:
            raise IOError(errno.EEXIST, "File already exists", target_name)

        # NOTE(review): initialization of `modified_from` (to a falsy
        # default) is missing from this excerpt.
        if target_name in self:
            modified_from = self[target_name]

        # Actually make the move or copy.
        # NOTE(review): the `if reparent:` header is missing here.
        source_obj._reparent(self, target_name)
        # NOTE(review): `item = source_obj` and the `else:` line are
        # missing from this excerpt.
        item = source_obj.clone(self, target_name)

        self._items[target_name] = item
        self.set_committed(False)
        # NOTE(review): an `if modified_from:` header is missing here.
        self.notify(MOD, self, target_name, (modified_from, item))
        # NOTE(review): an `else:` line is missing from this excerpt.
        self.notify(ADD, self, target_name, item)
    def _get_src_target(self, source, target_path, source_collection, create_dest):
        """Resolve `source`/`target_path` into (source_obj, target_dir, target_name)."""
        if source_collection is None:
            source_collection = self

        # NOTE(review): several lines are missing throughout this method;
        # the comments below mark the apparent gaps.
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError(errno.ENOENT, "File not found", source)
            sourcecomponents = source.split("/")
        # NOTE(review): an `else:` line (source is an object) is missing.
            sourcecomponents = None

        # Find parent collection the target path
        targetcomponents = target_path.split("/")
        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]

        # NOTE(review): an `if not target_name:` guard is missing here.
        raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

        # NOTE(review): an `if create_dest:` header is missing here.
        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
        # NOTE(review): an `else:` line is missing here.
        if len(targetcomponents) > 1:
            target_dir = self.find("/".join(targetcomponents[0:-1]))
        # NOTE(review): an `else: target_dir = self` tail is missing here.

        if target_dir is None:
            raise IOError(errno.ENOENT, "Target directory not found", target_name)

        # Copying into an existing directory places the item inside it,
        # keeping the source's basename.
        if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
            target_dir = target_dir[target_name]
            target_name = sourcecomponents[-1]

        return (source_obj, target_dir, target_name)
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.

        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
        target_dir.add(source_obj, target_name, overwrite, False)
    def rename(self, source, target_path, source_collection=None, overwrite=False):
        """Move a file or subcollection from `source_collection` to a new path in this collection.

        :source:
          A string with a path to source file or subcollection.

        :target_path:
          Destination file or path. If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection. If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source_path` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
        if not source_obj.writable():
            raise IOError(errno.EROFS, "Source collection is read only", source)
        # reparent=True: moves instead of copies.
        target_dir.add(source_obj, target_name, overwrite, True)
    def portable_manifest_text(self, stream_name="."):
        """Get the manifest text for this collection, sub collections and files.

        This method does not flush outstanding blocks to Keep. It will return
        a normalized manifest with access tokens stripped.

        :stream_name:
          Name to use for this stream (directory)
        """
        return self._get_manifest_text(stream_name, True, True)
    def manifest_text(self, stream_name=".", strip=False, normalize=False,
                      only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        This method will flush outstanding blocks to Keep. By default, it will
        not normalize an unmodified manifest or strip access tokens.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the
          collection is not modified, return the original manifest text even
          if it is not in normalized form.

        :only_committed:
          If True, don't commit pending blocks.
        """
        if not only_committed:
            self._my_block_manager().commit_all()
        return self._get_manifest_text(stream_name, strip, normalize,
                                       only_committed=only_committed)
    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Get the manifest text for this collection, sub collections and files.

        :stream_name:
          Name to use for this stream (directory)

        :strip:
          If True, remove signing tokens from block locators if present.
          If False (default), block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False (default) and the
          collection is not modified, return the original manifest text even
          if it is not in normalized form.

        :only_committed:
          If True, only include blocks that were already committed to Keep.
        """
        if not self.committed() or self._manifest_text is None or normalize:
            # NOTE(review): initialization of `stream` (dict) and `buf`
            # (list) is missing from this excerpt.
            sorted_keys = sorted(self.keys())
            for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
                # Create a stream per file `k`
                arvfile = self[filename]
                # NOTE(review): `filestream = []` is missing here.
                for segment in arvfile.segments():
                    loc = segment.locator
                    if arvfile.parent._my_block_manager().is_bufferblock(loc):
                        # NOTE(review): the only_committed skip branch is
                        # missing from this excerpt.
                        loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                    # NOTE(review): an `if strip:` header is missing here.
                    loc = KeepLocator(loc).stripped()
                    filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                                      segment.segment_offset, segment.range_size))
                stream[filename] = filestream
            # NOTE(review): an `if stream:` guard is missing here.
            buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
            for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
            # NOTE(review): the `return "".join(buf)` and the strip/else
            # branches of the non-normalized path are missing here.
            return self.stripped_manifest()
            # NOTE(review): an `else:` line is missing from this excerpt.
            return self._manifest_text
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`.
        """
        # NOTE(review): `changes = []` initialization is missing from this
        # excerpt.
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
        # NOTE(review): a `for k in self:` header is missing here.
        if k not in end_collection:
            changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
            # NOTE(review): an `if k in self:` header is missing here.
            if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
            elif end_collection[k] != self[k]:
                changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            # NOTE(review): an `else:` line is missing here (TOK case).
            changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
            # NOTE(review): an `else:` (k not in self) is missing here.
            changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
        # NOTE(review): the final `return changes` is missing from this
        # excerpt.
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.
        """
        # NOTE(review): an `if changes:` guard appears to be missing
        # before set_committed below — confirm.
        self.set_committed(False)
        for change in changes:
            event_type = change[0]
            # NOTE(review): unpacking of `path`, `initial` (and later
            # `final`) from `change` is missing from this excerpt.
            local = self.find(path)
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
            # NOTE(review): the time.gmtime() argument and closing parens
            # are missing from this excerpt.
            if event_type == ADD:
                # NOTE(review): an `if local is None:` header is missing.
                # No local file at path, safe to copy over new file
                self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    # NOTE(review): an `else:` line is missing here.
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                # NOTE(review): an `else:` line is missing here.
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
1080 def portable_data_hash(self):
1081 """Get the portable data hash for this collection's manifest."""
1082 if self._manifest_locator and self.committed():
1083 # If the collection is already saved on the API server, and it's committed
1084 # then return API server's PDH response.
1085 return self._portable_data_hash
# Otherwise compute the PDH locally: md5 of the stripped (portable) manifest
# text followed by "+" and the manifest's byte length.
1087 stripped = self.portable_manifest_text().encode()
1088 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
1091 def subscribe(self, callback):
# Register a single change-notification callback (invoked by notify()).
# Raises errors.ArgumentError if a callback is already registered.
1092 if self._callback is None:
1093 self._callback = callback
1095 raise errors.ArgumentError("A callback is already set on this collection.")
1098 def unsubscribe(self):
# Remove the callback registered with subscribe(); no-op when none is set.
1099 if self._callback is not None:
1100 self._callback = None
1103 def notify(self, event, collection, name, item):
# Deliver a change event to the subscribed callback and propagate it up to
# the root collection so its subscribers also observe the change.
# NOTE(review): the `if self._callback:` guard appears elided here -- confirm.
1105 self._callback(event, collection, name, item)
1106 self.root_collection().notify(event, collection, name, item)
1109 def __eq__(self, other):
# Two collections are equal when they contain the same item names mapping to
# equal items; anything that is not a RichCollectionBase compares unequal.
1112 if not isinstance(other, RichCollectionBase):
1114 if len(self._items) != len(other):
1116 for k in self._items:
# NOTE(review): the intermediate `return False` lines and the final
# `return True` appear elided in this transcription -- confirm upstream.
1119 if self._items[k] != other[k]:
1123 def __ne__(self, other):
1124 return not self.__eq__(other)
1128 """Flush bufferblocks to Keep."""
1129 for e in listvalues(self):
1133 class Collection(RichCollectionBase):
1134 """Represents the root of an Arvados Collection.
1136 This class is threadsafe. The root collection object, all subcollections
1137 and files are protected by a single lock (i.e. each access locks the entire
1143 :To read an existing file:
1144 `c.open("myfile", "r")`
1146 :To write a new file:
1147 `c.open("myfile", "w")`
1149 :To determine if a file exists:
1150 `c.find("myfile") is not None`
1153 `c.copy("source", "dest")`
1156 `c.remove("myfile")`
1158 :To save to an existing collection record:
1161 :To save a new collection record:
1164 :To merge remote changes into this object:
1167 Must be associated with an API server Collection record (during
1168 initialization, or using `save_new`) to use `save` or `update`
1172 def __init__(self, manifest_locator_or_text=None,
1179 replication_desired=None,
1181 """Collection constructor.
1183 :manifest_locator_or_text:
1184 One of Arvados collection UUID, block locator of
1185 a manifest, raw manifest text, or None (to create an empty collection).
1187 the parent Collection, may be None.
1190 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1191 Prefer this over supplying your own api_client and keep_client (except in testing).
1192 Will use default config settings if not specified.
1195 The API client object to use for requests. If not specified, create one using `apiconfig`.
1198 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1201 the number of retries for API and Keep requests.
1204 the block manager to use. If not specified, create one.
1206 :replication_desired:
1207 How many copies should Arvados maintain. If None, API server default
1208 configuration applies. If not None, this value will also be used
1209 for determining the number of block copies being written.
1212 super(Collection, self).__init__(parent)
1213 self._api_client = api_client
1214 self._keep_client = keep_client
1215 self._block_manager = block_manager
1216 self.replication_desired = replication_desired
1217 self.put_threads = put_threads
# Use the caller-supplied API configuration when provided, otherwise fall
# back to the global arvados.config settings.
1220 self._config = apiconfig
1222 self._config = config.settings()
1224 self.num_retries = num_retries if num_retries is not None else 0
1225 self._manifest_locator = None
1226 self._manifest_text = None
1227 self._portable_data_hash = None
1228 self._api_response = None
1229 self._past_versions = set()
# The root collection owns a single re-entrant lock shared by all children.
1231 self.lock = threading.RLock()
# Classify the argument: Keep/PDH locator, collection UUID, or raw manifest.
1234 if manifest_locator_or_text:
1235 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1236 self._manifest_locator = manifest_locator_or_text
1237 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1238 self._manifest_locator = manifest_locator_or_text
1239 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1240 self._manifest_text = manifest_locator_or_text
1242 raise errors.ArgumentError(
1243 "Argument to CollectionReader is not a manifest or a collection UUID")
# NOTE(review): the `try:` around the initial _populate() call appears
# elided in this transcription -- confirm upstream.
1247 except (IOError, errors.SyntaxError) as e:
1248 raise errors.ArgumentError("Error processing manifest text: %s", e)
1250 def root_collection(self):
1253 def stream_name(self):
1260 def known_past_version(self, modified_at_and_portable_data_hash):
1261 return modified_at_and_portable_data_hash in self._past_versions
1265 def update(self, other=None, num_retries=None):
1266 """Merge the latest collection on the API server with the current collection."""
1269 if self._manifest_locator is None:
1270 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1271 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1272 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1273 response.get("portable_data_hash") != self.portable_data_hash()):
1274 # The record on the server is different from our current one, but we've seen it before,
1275 # so ignore it because it's already been merged.
1276 # However, if it's the same as our current record, proceed with the update, because we want to update
1280 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1281 other = CollectionReader(response["manifest_text"])
# Three-way merge: diff from our saved baseline to the remote record,
# applied on top of the (possibly locally modified) current state.
1282 baseline = CollectionReader(self._manifest_text)
1283 self.apply(baseline.diff(other))
1284 self._manifest_text = self.manifest_text()
# Body of _my_api(): lazily build a thread-safe API client from the stored
# configuration, and make the Keep client share it so both use the same
# credentials. NOTE(review): the `def _my_api(self):` line (and any
# decorator) appears elided in this transcription -- confirm upstream.
1288 if self._api_client is None:
1289 self._api_client = ThreadSafeApiCache(self._config)
1290 if self._keep_client is None:
1291 self._keep_client = self._api_client.keep
1292 return self._api_client
# Body of _my_keep(): lazily create the Keep client, constructing the API
# client first when needed. NOTE(review): the `def _my_keep(self):` line
# appears elided in this transcription -- confirm upstream.
1296 if self._keep_client is None:
1297 if self._api_client is None:
1300 self._keep_client = KeepClient(api_client=self._api_client)
1301 return self._keep_client
1304 def _my_block_manager(self):
# Lazily create the shared _BlockManager; block replication comes from
# replication_desired, falling back to the API server's advertised default.
1305 if self._block_manager is None:
1306 copies = (self.replication_desired or
1307 self._my_api()._rootDesc.get('defaultCollectionReplication',
1309 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1310 return self._block_manager
1312 def _remember_api_response(self, response):
1313 self._api_response = response
1314 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1316 def _populate_from_api_server(self):
1317 # As in KeepClient itself, we must wait until the last
1318 # possible moment to instantiate an API client, in order to
1319 # avoid tripping up clients that don't have access to an API
1320 # server. If we do build one, make sure our Keep client uses
1321 # it. If instantiation fails, we'll fall back to the except
1322 # clause, just like any other Collection lookup
1323 # failure. Return an exception, or None if successful.
# NOTE(review): the `try:` opener and the except body (returning `e`)
# appear elided in this transcription -- confirm upstream.
1325 self._remember_api_response(self._my_api().collections().get(
1326 uuid=self._manifest_locator).execute(
1327 num_retries=self.num_retries))
1328 self._manifest_text = self._api_response['manifest_text']
1329 self._portable_data_hash = self._api_response['portable_data_hash']
1330 # If not overridden via kwargs, we should try to load the
1331 # replication_desired from the API server
1332 if self.replication_desired is None:
1333 self.replication_desired = self._api_response.get('replication_desired', None)
1335 except Exception as e:
1338 def _populate_from_keep(self):
1339 # Retrieve a manifest directly from Keep. This has a chance of
1340 # working if [a] the locator includes a permission signature
1341 # or [b] the Keep services are operating in world-readable
1342 # mode. Return an exception, or None if successful.
# NOTE(review): the `try:` opener and the except body (returning `e`)
# appear elided in this transcription -- confirm upstream.
1344 self._manifest_text = self._my_keep().get(
1345 self._manifest_locator, num_retries=self.num_retries).decode()
1346 except Exception as e:
1349 def _populate(self):
# Load manifest text from the API server and/or Keep -- whichever sources the
# locator type makes viable -- then import it into this object's item tree.
1350 if self._manifest_locator is None and self._manifest_text is None:
1352 error_via_api = None
1353 error_via_keep = None
1354 should_try_keep = ((self._manifest_text is None) and
1355 arvados.util.keep_locator_pattern.match(
1356 self._manifest_locator))
1357 if ((self._manifest_text is None) and
1358 arvados.util.signed_locator_pattern.match(self._manifest_locator)):
# A signed locator can be fetched from Keep without API server help,
# so try that first.
1359 error_via_keep = self._populate_from_keep()
1360 if self._manifest_text is None:
1361 error_via_api = self._populate_from_api_server()
1362 if error_via_api is not None and not should_try_keep:
1364 if ((self._manifest_text is None) and
1365 not error_via_keep and
1367 # Looks like a keep locator, and we didn't already try keep above
1368 error_via_keep = self._populate_from_keep()
1369 if self._manifest_text is None:
# Both retrieval paths failed; surface both errors to the caller.
1371 raise errors.NotFoundError(
1372 ("Failed to retrieve collection '{}' " +
1373 "from either API server ({}) or Keep ({})."
1375 self._manifest_locator,
# Keep the loaded text as the merge baseline used by update()/diff().
1379 self._baseline_manifest = self._manifest_text
1380 self._import_manifest(self._manifest_text)
1383 def _has_collection_uuid(self):
# Truthy (a match object) when the stored locator is a collection UUID,
# as opposed to a portable data hash or raw manifest text.
1384 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1386 def __enter__(self):
# Context-manager entry. NOTE(review): the `return self` body appears
# elided in this transcription -- confirm against upstream.
1389 def __exit__(self, exc_type, exc_value, traceback):
1390 """Support scoped auto-commit in a with: block."""
# Only auto-save when the block exited cleanly AND this collection is
# writable with a real collection UUID on the API server.
1391 if exc_type is None:
1392 if self.writable() and self._has_collection_uuid():
# NOTE(review): the save() call and stop_threads() cleanup appear elided
# in this transcription -- confirm upstream.
1396 def stop_threads(self):
1397 if self._block_manager is not None:
1398 self._block_manager.stop_threads()
1401 def manifest_locator(self):
1402 """Get the manifest locator, if any.
1404 The manifest locator will be set when the collection is loaded from an
1405 API server record or the portable data hash of a manifest.
1407 The manifest locator will be None if the collection is newly created or
1408 was created directly from manifest text. The method `save_new()` will
1409 assign a manifest locator.
# Plain accessor for the stored locator (str or None).
1412 return self._manifest_locator
1415 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
# Deep-copy this collection. `readonly` selects a CollectionReader clone,
# otherwise a writable Collection is produced; `new_config` defaults to this
# collection's API configuration.
1416 if new_config is None:
1417 new_config = self._config
# NOTE(review): the `if readonly:`/`else:` branch lines appear elided in
# this transcription -- confirm upstream.
1419 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1421 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1423 newcollection._clonefrom(self)
1424 return newcollection
1427 def api_response(self):
1428 """Returns information about this Collection fetched from the API server.
1430 If the Collection exists in Keep but not the API server, currently
1431 returns None. Future versions may provide a synthetic response.
# Returns the last raw API record captured by _remember_api_response().
1434 return self._api_response
1436 def find_or_create(self, path, create_type):
1437 """See `RichCollectionBase.find_or_create`"""
# Strip a leading "./" so paths are resolved relative to the collection root.
1441 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1443 def find(self, path):
1444 """See `RichCollectionBase.find`"""
# Strip a leading "./" so lookups are relative to the collection root.
1448 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1450 def remove(self, path, recursive=False):
1451 """See `RichCollectionBase.remove`"""
# Removing the collection root itself is not allowed. NOTE(review): the
# guard condition (e.g. `if path == ".":`) appears elided here -- confirm.
1453 raise errors.ArgumentError("Cannot remove '.'")
1455 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1460 def save(self, merge=True, num_retries=None):
1461 """Save collection to an existing collection record.
1463 Commit pending buffer blocks to Keep, merge with remote record (if
1464 merge=True, the default), and update the collection record. Returns
1465 the current manifest text.
1467 Will raise AssertionError if not associated with a collection record on
1468 the API server. If you want to save a manifest to Keep only, see
1472 Update and merge remote changes before saving. Otherwise, any
1473 remote changes will be ignored and overwritten.
1476 Retry count on API calls (if None, use the collection default)
1479 if not self.committed():
1480 if not self._has_collection_uuid():
1481 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
# Flush any pending buffer blocks to Keep before uploading the manifest.
# NOTE(review): the `if merge: self.update()` step appears elided here.
1483 self._my_block_manager().commit_all()
1488 text = self.manifest_text(strip=False)
1489 self._remember_api_response(self._my_api().collections().update(
1490 uuid=self._manifest_locator,
1491 body={'manifest_text': text}
1493 num_retries=num_retries))
# Adopt the server's canonical manifest and PDH, then mark ourselves clean.
1494 self._manifest_text = self._api_response["manifest_text"]
1495 self._portable_data_hash = self._api_response["portable_data_hash"]
1496 self.set_committed(True)
1498 return self._manifest_text
1504 def save_new(self, name=None,
1505 create_collection_record=True,
1507 ensure_unique_name=False,
1509 """Save collection to a new collection record.
1511 Commit pending buffer blocks to Keep and, when create_collection_record
1512 is True (default), create a new collection record. After creating a
1513 new collection record, this Collection object will be associated with
1514 the new record used by `save()`. Returns the current manifest text.
1517 The collection name.
1519 :create_collection_record:
1520 If True, create a collection record on the API server.
1521 If False, only commit blocks to Keep and return the manifest text.
1524 the user, or project uuid that will own this collection.
1525 If None, defaults to the current user.
1527 :ensure_unique_name:
1528 If True, ask the API server to rename the collection
1529 if it conflicts with a collection with the same name and owner. If
1530 False, a name conflict will result in an error.
1533 Retry count on API calls (if None, use the collection default)
# Flush pending buffer blocks to Keep before constructing the manifest.
1536 self._my_block_manager().commit_all()
1537 text = self.manifest_text(strip=False)
1539 if create_collection_record:
# When no name is given, use a placeholder and let the server uniquify it.
# NOTE(review): the `if name is None:` guard appears elided here -- confirm.
1541 name = "New collection"
1542 ensure_unique_name = True
1544 body = {"manifest_text": text,
1546 "replication_desired": self.replication_desired}
1548 body["owner_uuid"] = owner_uuid
1550 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1551 text = self._api_response["manifest_text"]
# Bind this object to the newly created record so save() works afterwards.
1553 self._manifest_locator = self._api_response["uuid"]
1554 self._portable_data_hash = self._api_response["portable_data_hash"]
1556 self._manifest_text = text
1557 self.set_committed(True)
1562 def _import_manifest(self, manifest_text):
1563 """Import a manifest into a `Collection`.
1566 The manifest text to import from.
1570 raise ArgumentError("Can only import manifest into an empty collection")
# Tokenize the manifest with a small state machine: each stream line starts
# with a stream name, then block locators, then file segment descriptors.
# NOTE(review): the STREAM_NAME/BLOCKS/SEGMENTS state setup appears elided
# in this transcription -- confirm upstream.
1579 for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1580 tok = token_and_separator.group(1)
1581 sep = token_and_separator.group(2)
1583 if state == STREAM_NAME:
1584 # starting a new stream
# '\040' is the manifest escape sequence for a space character.
1585 stream_name = tok.replace('\\040', ' ')
1590 self.find_or_create(stream_name, COLLECTION)
# Block locators look like "<md5>+<size>[+hints...]"; track the running
# stream offset so file segments can be mapped onto blocks.
1594 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1596 blocksize = int(block_locator.group(1))
1597 blocks.append(Range(tok, streamoffset, blocksize, 0))
1598 streamoffset += blocksize
1602 if state == SEGMENTS:
# File segments are "<position>:<size>:<name>".
1603 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1605 pos = int(file_segment.group(1))
1606 size = int(file_segment.group(2))
1607 name = file_segment.group(3).replace('\\040', ' ')
1608 filepath = os.path.join(stream_name, name)
1609 afile = self.find_or_create(filepath, FILE)
1610 if isinstance(afile, ArvadosFile):
1611 afile.add_segment(blocks, pos, size)
1613 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1616 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
# Freshly imported state matches the manifest, so mark the tree committed.
1622 self.set_committed(True)
1625 def notify(self, event, collection, name, item):
# Root-collection notify: deliver the event to the subscribed callback.
# NOTE(review): the `if self._callback:` guard appears elided here -- confirm.
1627 self._callback(event, collection, name, item)
1630 class Subcollection(RichCollectionBase):
1631 """This is a subdirectory within a collection that doesn't have its own API
1634 Subcollection locking falls under the umbrella lock of its root collection.
# A Subcollection delegates API, Keep, block-manager and lock access to its
# root Collection; it never talks to the API server on its own.
1638 def __init__(self, parent, name):
1639 super(Subcollection, self).__init__(parent)
1640 self.lock = self.root_collection().lock
1641 self._manifest_text = None
# NOTE(review): the `self.name = name` assignment appears elided here.
1643 self.num_retries = parent.num_retries
1645 def root_collection(self):
# Walk up the parent chain to the owning root Collection.
1646 return self.parent.root_collection()
# NOTE(review): the `def` lines for writable(), _my_api() and _my_keep()
# appear elided in this transcription; each simply delegates to the root.
1649 return self.root_collection().writable()
1652 return self.root_collection()._my_api()
1655 return self.root_collection()._my_keep()
1657 def _my_block_manager(self):
1658 return self.root_collection()._my_block_manager()
1660 def stream_name(self):
# Stream names are path-joined down from the root (".", "./dir", ...).
1661 return os.path.join(self.parent.stream_name(), self.name)
1664 def clone(self, new_parent, new_name):
1665 c = Subcollection(new_parent, new_name)
# NOTE(review): the `c._clonefrom(self)` / `return c` lines appear elided.
1671 def _reparent(self, newparent, newname):
# Move this subtree under a new parent: mark it uncommitted, detach it from
# the old parent, and adopt the new root collection's lock.
1672 self.set_committed(False)
1674 self.parent.remove(self.name, recursive=True)
1675 self.parent = newparent
1677 self.lock = self.parent.root_collection().lock
1680 class CollectionReader(Collection):
1681 """A read-only collection object.
1683 Initialize from an api collection record locator, a portable data hash of a
1684 manifest, or raw manifest text. See `Collection` constructor for detailed
1688 def __init__(self, manifest_locator_or_text, *args, **kwargs):
# Flag that initialization is in progress so the reader can be populated
# before it becomes effectively read-only.
1689 self._in_init = True
1690 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1691 self._in_init = False
1693 # Forego any locking since it should never change once initialized.
1694 self.lock = NoopLock()
1696 # Backwards compatibility with old CollectionReader
1697 # all_streams() and all_files()
1698 self._streams = None
1701 return self._in_init
# Decorator: lazily split the manifest into per-stream token lists
# (self._streams) before invoking the wrapped legacy method.
1703 def _populate_streams(orig_func):
1704 @functools.wraps(orig_func)
1705 def populate_streams_wrapper(self, *args, **kwargs):
1706 # Defer populating self._streams until needed since it creates a copy of the manifest.
1707 if self._streams is None:
1708 if self._manifest_text:
1709 self._streams = [sline.split()
1710 for sline in self._manifest_text.split("\n")
1714 return orig_func(self, *args, **kwargs)
1715 return populate_streams_wrapper
1718 def normalize(self):
1719 """Normalize the streams returned by `all_streams`.
1721 This method is kept for backwards compatibility and only affects the
1722 behavior of `all_streams()` and `all_files()`
# Group every file's segments by (stream name, file name)...
# NOTE(review): the `streams = {}` initialization appears elided here.
1728 for s in self.all_streams():
1729 for f in s.all_files():
1730 streamname, filename = split(s.name() + "/" + f.name())
1731 if streamname not in streams:
1732 streams[streamname] = {}
1733 if filename not in streams[streamname]:
1734 streams[streamname][filename] = []
1735 for r in f.segments:
1736 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
# ...then rebuild the stream list in normalized, sorted order.
1738 self._streams = [normalize_stream(s, streams[s])
1739 for s in sorted(streams)]
1741 def all_streams(self):
# Legacy API: wrap each lazily-populated stream token list in a StreamReader.
# Presumably decorated with @_populate_streams upstream (decorator line not
# visible here) -- verify before relying on self._streams being set.
1742 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1743 for s in self._streams]
1746 def all_files(self):
1747 for s in self.all_streams():
1748 for f in s.all_files():